Skip to content

Commit 83f84cd

Browse files
authored
Adding handling of FAILING_OVER and FAILED_OVER events/push notifications (#3716)
1 parent 4518ce8 commit 83f84cd

File tree

3 files changed

+385
-37
lines changed

3 files changed

+385
-37
lines changed

redis/maintenance_events.py

Lines changed: 123 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
class MaintenanceState(enum.Enum):
1212
NONE = "none"
1313
MOVING = "moving"
14-
MIGRATING = "migrating"
14+
MAINTENANCE = "maintenance"
1515

1616

1717
if TYPE_CHECKING:
@@ -261,6 +261,105 @@ def __hash__(self) -> int:
261261
return hash((self.__class__, self.id))
262262

263263

264+
class NodeFailingOverEvent(MaintenanceEvent):
265+
"""
266+
Event for when a Redis cluster node is in the process of failing over.
267+
268+
This event is received when a node starts a failover process during
269+
cluster maintenance operations or when handling node failures.
270+
271+
Args:
272+
id (int): Unique identifier for this event
273+
ttl (int): Time-to-live in seconds for this notification
274+
"""
275+
276+
def __init__(self, id: int, ttl: int):
277+
super().__init__(id, ttl)
278+
279+
def __repr__(self) -> str:
280+
expiry_time = self.creation_time + self.ttl
281+
remaining = max(0, expiry_time - time.monotonic())
282+
return (
283+
f"{self.__class__.__name__}("
284+
f"id={self.id}, "
285+
f"ttl={self.ttl}, "
286+
f"creation_time={self.creation_time}, "
287+
f"expires_at={expiry_time}, "
288+
f"remaining={remaining:.1f}s, "
289+
f"expired={self.is_expired()}"
290+
f")"
291+
)
292+
293+
def __eq__(self, other) -> bool:
294+
"""
295+
Two NodeFailingOverEvent events are considered equal if they have the same
296+
id and are of the same type.
297+
"""
298+
if not isinstance(other, NodeFailingOverEvent):
299+
return False
300+
return self.id == other.id and type(self) is type(other)
301+
302+
def __hash__(self) -> int:
303+
"""
304+
Return a hash value for the event to allow
305+
instances to be used in sets and as dictionary keys.
306+
307+
Returns:
308+
int: Hash value based on event type and id
309+
"""
310+
return hash((self.__class__, self.id))
311+
312+
313+
class NodeFailedOverEvent(MaintenanceEvent):
314+
"""
315+
Event for when a Redis cluster node has completed a failover.
316+
317+
This event is received when a node has finished the failover process
318+
during cluster maintenance operations or after handling node failures.
319+
320+
Args:
321+
id (int): Unique identifier for this event
322+
"""
323+
324+
DEFAULT_TTL = 5
325+
326+
def __init__(self, id: int):
327+
super().__init__(id, NodeFailedOverEvent.DEFAULT_TTL)
328+
329+
def __repr__(self) -> str:
330+
expiry_time = self.creation_time + self.ttl
331+
remaining = max(0, expiry_time - time.monotonic())
332+
return (
333+
f"{self.__class__.__name__}("
334+
f"id={self.id}, "
335+
f"ttl={self.ttl}, "
336+
f"creation_time={self.creation_time}, "
337+
f"expires_at={expiry_time}, "
338+
f"remaining={remaining:.1f}s, "
339+
f"expired={self.is_expired()}"
340+
f")"
341+
)
342+
343+
def __eq__(self, other) -> bool:
344+
"""
345+
Two NodeFailedOverEvent events are considered equal if they have the same
346+
id and are of the same type.
347+
"""
348+
if not isinstance(other, NodeFailedOverEvent):
349+
return False
350+
return self.id == other.id and type(self) is type(other)
351+
352+
def __hash__(self) -> int:
353+
"""
354+
Return a hash value for the event to allow
355+
instances to be used in sets and as dictionary keys.
356+
357+
Returns:
358+
int: Hash value based on event type and id
359+
"""
360+
return hash((self.__class__, self.id))
361+
362+
264363
class MaintenanceEventsConfig:
265364
"""
266365
Configuration class for maintenance events handling behaviour. Events are received through
@@ -457,40 +556,54 @@ def handle_node_moved_event(self, event: NodeMovingEvent):
457556

458557

459558
class MaintenanceEventConnectionHandler:
559+
# 1 = "starting maintenance" events, 0 = "completed maintenance" events
560+
_EVENT_TYPES: dict[type["MaintenanceEvent"], int] = {
561+
NodeMigratingEvent: 1,
562+
NodeFailingOverEvent: 1,
563+
NodeMigratedEvent: 0,
564+
NodeFailedOverEvent: 0,
565+
}
566+
460567
def __init__(
461568
self, connection: "ConnectionInterface", config: MaintenanceEventsConfig
462569
) -> None:
463570
self.connection = connection
464571
self.config = config
465572

466573
def handle_event(self, event: MaintenanceEvent):
467-
if isinstance(event, NodeMigratingEvent):
468-
return self.handle_migrating_event(event)
469-
elif isinstance(event, NodeMigratedEvent):
470-
return self.handle_migration_completed_event(event)
471-
else:
574+
# get the event type by checking its class in the _EVENT_TYPES dict
575+
event_type = self._EVENT_TYPES.get(event.__class__, None)
576+
577+
if event_type is None:
472578
logging.error(f"Unhandled event type: {event}")
579+
return
473580

474-
def handle_migrating_event(self, notification: NodeMigratingEvent):
581+
if event_type:
582+
self.handle_maintenance_start_event(MaintenanceState.MAINTENANCE)
583+
else:
584+
self.handle_maintenance_completed_event()
585+
586+
def handle_maintenance_start_event(self, maintenance_state: MaintenanceState):
475587
if (
476588
self.connection.maintenance_state == MaintenanceState.MOVING
477589
or not self.config.is_relax_timeouts_enabled()
478590
):
479591
return
480-
self.connection.maintenance_state = MaintenanceState.MIGRATING
592+
593+
self.connection.maintenance_state = maintenance_state
481594
self.connection.set_tmp_settings(tmp_relax_timeout=self.config.relax_timeout)
482595
# extend the timeout for all created connections
483596
self.connection.update_current_socket_timeout(self.config.relax_timeout)
484597

485-
def handle_migration_completed_event(self, notification: "NodeMigratedEvent"):
598+
def handle_maintenance_completed_event(self):
486599
# Only reset timeouts if state is not MOVING and relax timeouts are enabled
487600
if (
488601
self.connection.maintenance_state == MaintenanceState.MOVING
489602
or not self.config.is_relax_timeouts_enabled()
490603
):
491604
return
492605
self.connection.reset_tmp_settings(reset_relax_timeout=True)
493-
# Node migration completed - reset the connection
606+
# Maintenance completed - reset the connection
494607
# timeouts by providing -1 as the relax timeout
495608
self.connection.update_current_socket_timeout(-1)
496609
self.connection.maintenance_state = MaintenanceState.NONE

0 commit comments

Comments
 (0)