88from abc import abstractmethod
99from itertools import chain
1010from queue import Empty , Full , LifoQueue
11- from typing import Any , Callable , Dict , List , Optional , Type , TypeVar , Union
11+ from typing import Any , Callable , Dict , List , Literal , Optional , Type , TypeVar , Union
1212from urllib .parse import parse_qs , unquote , urlparse
1313
1414from redis .cache import (
@@ -249,6 +249,13 @@ def maintenance_state(self, state: "MaintenanceState"):
249249 """
250250 pass
251251
252+ @abstractmethod
253+ def getpeername (self ):
254+ """
255+ Returns the peer name of the connection.
256+ """
257+ pass
258+
252259 @abstractmethod
253260 def mark_for_reconnect (self ):
254261 """
@@ -402,6 +409,7 @@ def __init__(
402409
403410 if maintenance_events_config and maintenance_events_config .enabled :
404411 if maintenance_events_pool_handler :
412+ maintenance_events_pool_handler .set_connection (self )
405413 self ._parser .set_node_moving_push_handler (
406414 maintenance_events_pool_handler .handle_event
407415 )
@@ -484,6 +492,7 @@ def set_parser(self, parser_class):
484492 def set_maintenance_event_pool_handler (
485493 self , maintenance_event_pool_handler : MaintenanceEventPoolHandler
486494 ):
495+ maintenance_event_pool_handler .set_connection (self )
487496 self ._parser .set_node_moving_push_handler (
488497 maintenance_event_pool_handler .handle_event
489498 )
@@ -867,6 +876,11 @@ def maintenance_state(self) -> MaintenanceState:
867876 def maintenance_state (self , state : "MaintenanceState" ):
868877 self ._maintenance_state = state
869878
879+ def getpeername (self ):
880+ if not self ._sock :
881+ return None
882+ return self ._sock .getpeername ()[0 ]
883+
870884 def mark_for_reconnect (self ):
871885 self ._should_reconnect = True
872886
@@ -1892,10 +1906,27 @@ def re_auth_callback(self, token: TokenInterface):
18921906 for conn in self ._in_use_connections :
18931907 conn .set_re_auth_token (token )
18941908
1895- def set_maintenance_state_for_all_connections (self , state : "MaintenanceState" ):
1909+ def set_maintenance_state_for_connections (
1910+ self ,
1911+ state : "MaintenanceState" ,
1912+ matching_address : Optional [str ] = None ,
1913+ address_type_to_match : Literal ["connected" , "configured" ] = "connected" ,
1914+ ):
18961915 for conn in self ._available_connections :
1916+ if address_type_to_match == "connected" :
1917+ if matching_address and conn .getpeername () != matching_address :
1918+ continue
1919+ else :
1920+ if matching_address and conn .host != matching_address :
1921+ continue
18971922 conn .maintenance_state = state
18981923 for conn in self ._in_use_connections :
1924+ if address_type_to_match == "connected" :
1925+ if matching_address and conn .getpeername () != matching_address :
1926+ continue
1927+ else :
1928+ if matching_address and conn .host != matching_address :
1929+ continue
18991930 conn .maintenance_state = state
19001931
19011932 def set_maintenance_state_in_connection_kwargs (self , state : "MaintenanceState" ):
@@ -1963,7 +1994,12 @@ def remove_tmp_config_from_connection_kwargs(self):
19631994 }
19641995 )
19651996
1966- def reset_connections_tmp_settings (self ):
1997+ def reset_connections_tmp_settings (
1998+ self ,
1999+ moving_address : Optional [str ] = None ,
2000+ reset_host_address : bool = False ,
2001+ reset_relax_timeout : bool = False ,
2002+ ):
19672003 """
19682004 Restore original settings from temporary configuration for all connections in the pool.
19692005
@@ -1978,16 +2014,25 @@ def reset_connections_tmp_settings(self):
19782014 """
19792015 with self ._lock :
19802016 for conn in self ._available_connections :
2017+ if moving_address and conn .host != moving_address :
2018+ continue
19812019 conn .reset_tmp_settings (
1982- reset_host_address = True , reset_relax_timeout = True
2020+ reset_host_address = reset_host_address ,
2021+ reset_relax_timeout = reset_relax_timeout ,
19832022 )
19842023 for conn in self ._in_use_connections :
2024+ if moving_address and conn .host != moving_address :
2025+ continue
19852026 conn .reset_tmp_settings (
1986- reset_host_address = True , reset_relax_timeout = True
2027+ reset_host_address = reset_host_address ,
2028+ reset_relax_timeout = reset_relax_timeout ,
19872029 )
19882030
19892031 def update_active_connections_for_reconnect (
1990- self , tmp_host_address : str , tmp_relax_timeout : Optional [float ] = None
2032+ self ,
2033+ tmp_host_address : str ,
2034+ tmp_relax_timeout : Optional [float ] = None ,
2035+ moving_address_src : Optional [str ] = None ,
19912036 ):
19922037 """
19932038 Mark all active connections for reconnect.
@@ -1999,6 +2044,8 @@ def update_active_connections_for_reconnect(
19992044 :param tmp_relax_timeout: The relax timeout to use for the connection.
20002045 """
20012046 for conn in self ._in_use_connections :
2047+ if moving_address_src and conn .getpeername () != moving_address_src :
2048+ continue
20022049 self ._update_connection_for_reconnect (
20032050 conn , tmp_host_address , tmp_relax_timeout
20042051 )
@@ -2007,6 +2054,7 @@ def disconnect_and_reconfigure_free_connections(
20072054 self ,
20082055 tmp_host_address : str ,
20092056 tmp_relax_timeout : Optional [float ] = None ,
2057+ moving_address_src : Optional [str ] = None ,
20102058 ):
20112059 """
20122060 Disconnect all free/available connections.
@@ -2019,13 +2067,17 @@ def disconnect_and_reconfigure_free_connections(
20192067 """
20202068
20212069 for conn in self ._available_connections :
2070+ if moving_address_src and conn .getpeername () != moving_address_src :
2071+ continue
20222072 self ._disconnect_and_update_connection_for_reconnect (
20232073 conn , tmp_host_address , tmp_relax_timeout
20242074 )
20252075
20262076 def update_connections_current_timeout (
20272077 self ,
20282078 relax_timeout : Optional [float ],
2079+ matching_address : Optional [str ] = None ,
2080+ address_type_to_match : Literal ["connected" , "configured" ] = "connected" ,
20292081 include_free_connections : bool = False ,
20302082 ):
20312083 """
@@ -2039,10 +2091,22 @@ def update_connections_current_timeout(
20392091 :param include_available_connections: Whether to include available connections in the update.
20402092 """
20412093 for conn in self ._in_use_connections :
2094+ if address_type_to_match == "connected" :
2095+ if matching_address and conn .getpeername () != matching_address :
2096+ continue
2097+ else :
2098+ if matching_address and conn .host != matching_address :
2099+ continue
20422100 conn .update_current_socket_timeout (relax_timeout )
20432101
20442102 if include_free_connections :
20452103 for conn in self ._available_connections :
2104+ if address_type_to_match == "connected" :
2105+ if matching_address and conn .getpeername () != matching_address :
2106+ continue
2107+ else :
2108+ if matching_address and conn .host != matching_address :
2109+ continue
20462110 conn .update_current_socket_timeout (relax_timeout )
20472111
20482112 def _update_connection_for_reconnect (
@@ -2308,7 +2372,10 @@ def disconnect(self):
23082372 self ._locked = False
23092373
23102374 def update_active_connections_for_reconnect (
2311- self , tmp_host_address : str , tmp_relax_timeout : Optional [float ] = None
2375+ self ,
2376+ tmp_host_address : str ,
2377+ tmp_relax_timeout : Optional [float ] = None ,
2378+ moving_address_src : Optional [str ] = None ,
23122379 ):
23132380 """
23142381 Mark all active connections for reconnect.
@@ -2323,6 +2390,8 @@ def update_active_connections_for_reconnect(
23232390 connections_in_queue = {conn for conn in self .pool .queue if conn }
23242391 for conn in self ._connections :
23252392 if conn not in connections_in_queue :
2393+ if moving_address_src and conn .getpeername () != moving_address_src :
2394+ continue
23262395 self ._update_connection_for_reconnect (
23272396 conn , tmp_host_address , tmp_relax_timeout
23282397 )
@@ -2331,6 +2400,7 @@ def disconnect_and_reconfigure_free_connections(
23312400 self ,
23322401 tmp_host_address : str ,
23332402 tmp_relax_timeout : Optional [Number ] = None ,
2403+ moving_address_src : Optional [str ] = None ,
23342404 ):
23352405 """
23362406 Disconnect all free/available connections.
@@ -2345,13 +2415,17 @@ def disconnect_and_reconfigure_free_connections(
23452415
23462416 for conn in existing_connections :
23472417 if conn :
2418+ if moving_address_src and conn .getpeername () != moving_address_src :
2419+ continue
23482420 self ._disconnect_and_update_connection_for_reconnect (
23492421 conn , tmp_host_address , tmp_relax_timeout
23502422 )
23512423
23522424 def update_connections_current_timeout (
23532425 self ,
23542426 relax_timeout : Optional [float ] = None ,
2427+ matching_address : Optional [str ] = None ,
2428+ address_type_to_match : Literal ["connected" , "configured" ] = "connected" ,
23552429 include_free_connections : bool = False ,
23562430 ):
23572431 """
@@ -2365,11 +2439,23 @@ def update_connections_current_timeout(
23652439 """
23662440 if include_free_connections :
23672441 for conn in tuple (self ._connections ):
2442+ if address_type_to_match == "connected" :
2443+ if matching_address and conn .getpeername () != matching_address :
2444+ continue
2445+ else :
2446+ if matching_address and conn .host != matching_address :
2447+ continue
23682448 conn .update_current_socket_timeout (relax_timeout )
23692449 else :
23702450 connections_in_queue = {conn for conn in self .pool .queue if conn }
23712451 for conn in self ._connections :
23722452 if conn not in connections_in_queue :
2453+ if address_type_to_match == "connected" :
2454+ if matching_address and conn .getpeername () != matching_address :
2455+ continue
2456+ else :
2457+ if matching_address and conn .host != matching_address :
2458+ continue
23732459 conn .update_current_socket_timeout (relax_timeout )
23742460
23752461 def _update_maintenance_events_config_for_connections (
@@ -2387,14 +2473,24 @@ def _update_maintenance_events_configs_for_connections(
23872473 conn .set_maintenance_event_pool_handler (maintenance_events_pool_handler )
23882474 conn .maintenance_events_config = maintenance_events_pool_handler .config
23892475
2390- def reset_connections_tmp_settings (self ):
2476+ def reset_connections_tmp_settings (
2477+ self ,
2478+ moving_address : Optional [str ] = None ,
2479+ reset_host_address : bool = False ,
2480+ reset_relax_timeout : bool = False ,
2481+ ):
23912482 """
23922483 Override base class method to work with BlockingConnectionPool's structure.
23932484
23942485 Restore original settings from temporary configuration for all connections in the pool.
23952486 """
23962487 for conn in tuple (self ._connections ):
2397- conn .reset_tmp_settings (reset_host_address = True , reset_relax_timeout = True )
2488+ if moving_address and conn .host != moving_address :
2489+ continue
2490+ conn .reset_tmp_settings (
2491+ reset_host_address = reset_host_address ,
2492+ reset_relax_timeout = reset_relax_timeout ,
2493+ )
23982494
23992495 def set_in_maintenance (self , in_maintenance : bool ):
24002496 """
@@ -2405,6 +2501,18 @@ def set_in_maintenance(self, in_maintenance: bool):
24052501 """
24062502 self ._in_maintenance = in_maintenance
24072503
2408- def set_maintenance_state_for_all_connections (self , state : "MaintenanceState" ):
2504+ def set_maintenance_state_for_connections (
2505+ self ,
2506+ state : "MaintenanceState" ,
2507+ matching_address : Optional [str ] = None ,
2508+ address_type_to_match : Literal ["connected" , "configured" ] = "connected" ,
2509+ ):
24092510 for conn in self ._connections :
2511+ if address_type_to_match == "connected" :
2512+ if matching_address and conn .getpeername () != matching_address :
2513+ continue
2514+ else :
2515+ if matching_address and conn .host != matching_address :
2516+ continue
2517+
24102518 conn .maintenance_state = state
0 commit comments