Skip to content

Hitless handshake #3735

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
74 commits
Select commit Hold shift + click to select a range
092e33b
Handling of topology update push notifications for Standalone Redis c…
petyaslavova Jun 27, 2025
41a199e
Adding sequence id to the maintenance push notifications. Adding unit…
petyaslavova Jul 11, 2025
63d0c45
Adding integration-like tests for migrating/migrated events handling
petyaslavova Jul 11, 2025
5c71733
Removed unused imports
petyaslavova Jul 11, 2025
96c6e5d
Revert changing of the default retry object initialization for connec…
petyaslavova Jul 11, 2025
8691475
Complete migrating/migrated integration-like tests
petyaslavova Jul 14, 2025
7b57a22
Adding moving integration-like tests
petyaslavova Jul 15, 2025
bed2e40
Fixed BlockingConnectionPool locking strategy. Removed debug logging.…
petyaslavova Jul 17, 2025
0744ee5
Fixing linters
petyaslavova Jul 17, 2025
4c536f3
Applying Copilot's comments
petyaslavova Jul 17, 2025
6768d5d
Fixed type annotations not compatible with older python versions
petyaslavova Jul 17, 2025
ce31ec7
Add a few more tests and fix pool mock for python 3.9
petyaslavova Jul 17, 2025
d73cd35
Adding maintenance state to connections. Migrating and Migrated are n…
petyaslavova Jul 18, 2025
788cf52
Refactored the tmp host address and timeout storing and the way to ap…
petyaslavova Jul 22, 2025
6d496f0
Apply review comments
petyaslavova Jul 24, 2025
a8ba5ce
Adding handling of FAILING_OVER and FAILED_OVER events/push notificat…
petyaslavova Jul 24, 2025
2d3731f
Applying moving/moved only on connections to the same proxy.
petyaslavova Jul 26, 2025
d2288e9
Adds handshake for enabling server maintenance notifications
elena-kolevska Aug 1, 2025
b294db2
Adds tests
elena-kolevska Aug 7, 2025
a82cbfe
Merge branch 'master' into ps_hitless_upgrade_sync_redis
petyaslavova Aug 8, 2025
2cdfa75
Applying review comments.
petyaslavova Aug 8, 2025
822fccd
Refactor to have less methods in pool classes and made some of the ex…
petyaslavova Aug 11, 2025
2736aaa
Fixing lint errors
petyaslavova Aug 11, 2025
1e2b96d
Fixing tests
petyaslavova Aug 11, 2025
fb487c0
Adds handshake
elena-kolevska Aug 7, 2025
67bbee9
Fixes mock messages to follow the latest standard
elena-kolevska Aug 12, 2025
70688cc
Linters
elena-kolevska Aug 12, 2025
f9eec35
Hitless Upgrades enabled by default
elena-kolevska Aug 12, 2025
5fd2ddb
Fixing unit tests
petyaslavova Aug 13, 2025
e8785de
Applying review comments and moving resolving of conn ip in the Abstr…
petyaslavova Aug 13, 2025
c3caf6a
Fixing the docs of some of the new methods in connection pools. Handl…
petyaslavova Aug 14, 2025
8d7cc00
Merge branch 'ps_hitless_upgrade_sync_redis' into ps_add_fail_over_ev…
petyaslavova Aug 15, 2025
76eba1b
Merge branch 'ps_add_fail_over_events_handling' into hitless_handshake
petyaslavova Aug 15, 2025
579d032
Handling of topology update push notifications for Standalone Redis c…
petyaslavova Jun 27, 2025
8d27a86
Adding sequence id to the maintenance push notifications. Adding unit…
petyaslavova Jul 11, 2025
32a16f0
Adding integration-like tests for migrating/migrated events handling
petyaslavova Jul 11, 2025
8bfdf13
Removed unused imports
petyaslavova Jul 11, 2025
33d7295
Revert changing of the default retry object initialization for connec…
petyaslavova Jul 11, 2025
346097f
Complete migrating/migrated integration-like tests
petyaslavova Jul 14, 2025
f3a9a71
Adding moving integration-like tests
petyaslavova Jul 15, 2025
c0438c8
Fixed BlockingConnectionPool locking strategy. Removed debug logging.…
petyaslavova Jul 17, 2025
6ca514f
Fixing linters
petyaslavova Jul 17, 2025
778abdf
Applying Copilot's comments
petyaslavova Jul 17, 2025
667109b
Fixed type annotations not compatible with older python versions
petyaslavova Jul 17, 2025
ef1742a
Add a few more tests and fix pool mock for python 3.9
petyaslavova Jul 17, 2025
7b43890
Adding maintenance state to connections. Migrating and Migrated are n…
petyaslavova Jul 18, 2025
08f1585
Refactored the tmp host address and timeout storing and the way to ap…
petyaslavova Jul 22, 2025
9a31a71
Apply review comments
petyaslavova Jul 24, 2025
602bbe9
Applying moving/moved only on connections to the same proxy.
petyaslavova Jul 26, 2025
953b41a
Applying review comments.
petyaslavova Aug 8, 2025
2210fed
Refactor to have less methods in pool classes and made some of the ex…
petyaslavova Aug 11, 2025
1427d99
Fixing lint errors
petyaslavova Aug 11, 2025
a2744f3
Fixing tests
petyaslavova Aug 11, 2025
260b34e
Fixing the docs of some of the new methods in connection pools. Handl…
petyaslavova Aug 14, 2025
4c6eb44
Applying review comments
petyaslavova Aug 15, 2025
10ded34
Adding handling of FAILING_OVER and FAILED_OVER events/push notificat…
petyaslavova Jul 24, 2025
241be74
Remove unused parse_list_to_dict function from helpers (#3733)
mengxunQAQ Aug 12, 2025
ba7cb87
Typos in vectorset commands.py (#3719)
hunterhogan Aug 12, 2025
c01d331
Adding abstractmethod declaration for cache property setter in Evicti…
mengxunQAQ Aug 12, 2025
7a2c3fc
Fix async clients safety when used as an async context manager (#3512)
abrookins Aug 15, 2025
5024b5f
Applying review comments
petyaslavova Aug 15, 2025
5fd93c5
Adding handling of FAILING_OVER and FAILED_OVER events/push notificat…
petyaslavova Jul 24, 2025
efa9852
Fixes mock messages to follow the latest standard
elena-kolevska Aug 12, 2025
8a6402f
Applying some test fixes after rebase
petyaslavova Aug 15, 2025
07402d0
Merge branch 'feat/hitless-upgrade-sync-standalone' into ps_add_fail_…
petyaslavova Aug 18, 2025
b9afaf0
Fixing tests after merging with feature branch
petyaslavova Aug 18, 2025
058be2c
Fixing lint errors.
petyaslavova Aug 18, 2025
e4a8646
Update tests/test_maintenance_events_handling.py
petyaslavova Aug 18, 2025
51d24ba
Applying review comments
petyaslavova Aug 18, 2025
66c1fe0
Applying review comments
petyaslavova Aug 18, 2025
67aee8c
Merge branch 'ps_add_fail_over_events_handling' into hitless_handshake
petyaslavova Aug 18, 2025
a7dd150
Merge branch 'feat/hitless-upgrade-sync-standalone' into hitless_hand…
petyaslavova Aug 19, 2025
1388cb9
Fixing a check if we should send hitless handshake. Fixing merge issues.
petyaslavova Aug 19, 2025
97db940
Applying review comments
petyaslavova Aug 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 74 additions & 45 deletions redis/_parsers/base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import sys
from abc import ABC
from asyncio import IncompleteReadError, StreamReader, TimeoutError
Expand Down Expand Up @@ -56,6 +57,8 @@
"Client sent AUTH, but no password is set": AuthenticationError,
}

logger = logging.getLogger(__name__)


class BaseParser(ABC):
EXCEPTION_CLASSES = {
Expand Down Expand Up @@ -199,31 +202,42 @@ def handle_push_response(self, response, **kwargs):
*_MOVING_MESSAGE,
):
return self.pubsub_push_handler_func(response)
if msg_type in _INVALIDATION_MESSAGE and self.invalidation_push_handler_func:
return self.invalidation_push_handler_func(response)
if msg_type in _MOVING_MESSAGE and self.node_moving_push_handler_func:
# TODO: PARSE latest format when available
host, port = response[2].decode().split(":")
ttl = response[1]
id = 1 # Hardcoded value until the notification starts including the id
notification = NodeMovingEvent(id, host, port, ttl)
return self.node_moving_push_handler_func(notification)
if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func:
if msg_type in _MIGRATING_MESSAGE:
# TODO: PARSE latest format when available
ttl = response[1]
id = 2 # Hardcoded value until the notification starts including the id
notification = NodeMigratingEvent(id, ttl)
elif msg_type in _MIGRATED_MESSAGE:
# TODO: PARSE latest format when available
id = 3 # Hardcoded value until the notification starts including the id
notification = NodeMigratedEvent(id)
else:

try:
if (
msg_type in _INVALIDATION_MESSAGE
and self.invalidation_push_handler_func
):
return self.invalidation_push_handler_func(response)

if msg_type in _MOVING_MESSAGE and self.node_moving_push_handler_func:
# Expected message format is: MOVING <seq_number> <time> <endpoint>
id = response[1]
ttl = response[2]
host, port = response[3].decode().split(":")
notification = NodeMovingEvent(id, host, port, ttl)
return self.node_moving_push_handler_func(notification)

if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func:
notification = None
if notification is not None:
return self.maintenance_push_handler_func(notification)
else:
return None

if msg_type in _MIGRATING_MESSAGE:
# Expected message format is: MIGRATING <seq_number> <time> <shard_id-s>
id = response[1]
ttl = response[2]
notification = NodeMigratingEvent(id, ttl)
elif msg_type in _MIGRATED_MESSAGE:
id = response[1]
notification = NodeMigratedEvent(id)

if notification is not None:
return self.maintenance_push_handler_func(notification)
except Exception as e:
logger.error(
"Error handling {} message ({}): {}".format(msg_type, response, e)
)

return None

def set_pubsub_push_handler(self, pubsub_push_handler_func):
self.pubsub_push_handler_func = pubsub_push_handler_func
Expand Down Expand Up @@ -252,34 +266,49 @@ async def handle_pubsub_push_response(self, response):

async def handle_push_response(self, response, **kwargs):
"""Handle push responses asynchronously"""

msg_type = response[0]
if msg_type not in (
*_INVALIDATION_MESSAGE,
*_MAINTENANCE_MESSAGES,
*_MOVING_MESSAGE,
):
return await self.pubsub_push_handler_func(response)
if msg_type in _INVALIDATION_MESSAGE and self.invalidation_push_handler_func:
return await self.invalidation_push_handler_func(response)
if msg_type in _MOVING_MESSAGE and self.node_moving_push_handler_func:
# push notification from enterprise cluster for node moving
# TODO: PARSE latest format when available
host, port = response[2].split(":")
ttl = response[1]
id = 1 # Hardcoded value for async parser
notification = NodeMovingEvent(id, host, port, ttl)
return await self.node_moving_push_handler_func(notification)
if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func:
if msg_type in _MIGRATING_MESSAGE:
# TODO: PARSE latest format when available
ttl = response[1]
id = 2 # Hardcoded value for async parser
notification = NodeMigratingEvent(id, ttl)
elif msg_type in _MIGRATED_MESSAGE:
# TODO: PARSE latest format when available
id = 3 # Hardcoded value for async parser
notification = NodeMigratedEvent(id)
return await self.maintenance_push_handler_func(notification)

try:
if (
msg_type in _INVALIDATION_MESSAGE
and self.invalidation_push_handler_func
):
return await self.invalidation_push_handler_func(response)

if msg_type in _MOVING_MESSAGE and self.node_moving_push_handler_func:
# push notification from enterprise cluster for node moving
id = response[1]
ttl = response[2]
host, port = response[3].split(":")
notification = NodeMovingEvent(id, host, port, ttl)
return await self.node_moving_push_handler_func(notification)

if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func:
notification = None

if msg_type in _MIGRATING_MESSAGE:
id = response[1]
ttl = response[2]
notification = NodeMigratingEvent(id, ttl)
elif msg_type in _MIGRATED_MESSAGE:
id = response[1]
notification = NodeMigratedEvent(id)

if notification is not None:
return await self.maintenance_push_handler_func(notification)
except Exception as e:
logger.error(
"Error handling {} message ({}): {}".format(msg_type, response, e)
)

return None

def set_pubsub_push_handler(self, pubsub_push_handler_func):
"""Set the pubsub push handler function"""
Expand Down
164 changes: 139 additions & 25 deletions redis/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,13 @@ def should_reconnect(self):
"""
pass

@abstractmethod
def get_resolved_ip(self):
"""
Get resolved ip address for the connection.
"""
pass

@abstractmethod
def update_current_socket_timeout(self, relax_timeout: Optional[float] = None):
"""
Expand Down Expand Up @@ -421,32 +428,16 @@ def __init__(
parser_class = _RESP3Parser
self.set_parser(parser_class)

if maintenance_events_config and maintenance_events_config.enabled:
if maintenance_events_pool_handler:
maintenance_events_pool_handler.set_connection(self)
self._parser.set_node_moving_push_handler(
maintenance_events_pool_handler.handle_event
)
self._maintenance_event_connection_handler = (
MaintenanceEventConnectionHandler(self, maintenance_events_config)
)
self._parser.set_maintenance_push_handler(
self._maintenance_event_connection_handler.handle_event
)
self.maintenance_events_config = maintenance_events_config

# Set up maintenance events if enabled
self._configure_maintenance_events(
maintenance_events_pool_handler,
orig_host_address,
orig_socket_timeout,
orig_socket_connect_timeout,
)

self.orig_host_address = (
orig_host_address if orig_host_address else self.host
)
self.orig_socket_timeout = (
orig_socket_timeout if orig_socket_timeout else self.socket_timeout
)
self.orig_socket_connect_timeout = (
orig_socket_connect_timeout
if orig_socket_connect_timeout
else self.socket_connect_timeout
)
else:
self._maintenance_event_connection_handler = None
self._should_reconnect = False
self.maintenance_state = maintenance_state

Expand Down Expand Up @@ -505,6 +496,46 @@ def set_parser(self, parser_class):
"""
self._parser = parser_class(socket_read_size=self._socket_read_size)

def _configure_maintenance_events(
self,
maintenance_events_pool_handler=None,
orig_host_address=None,
orig_socket_timeout=None,
orig_socket_connect_timeout=None,
):
"""Enable maintenance events by setting up handlers and storing original connection parameters."""
if (
not self.maintenance_events_config
or not self.maintenance_events_config.enabled
):
self._maintenance_event_connection_handler = None
return

# Set up pool handler if available
if maintenance_events_pool_handler:
self._parser.set_node_moving_push_handler(
maintenance_events_pool_handler.handle_event
)

# Set up connection handler
self._maintenance_event_connection_handler = MaintenanceEventConnectionHandler(
self, self.maintenance_events_config
)
self._parser.set_maintenance_push_handler(
self._maintenance_event_connection_handler.handle_event
)

# Store original connection parameters
self.orig_host_address = orig_host_address if orig_host_address else self.host
self.orig_socket_timeout = (
orig_socket_timeout if orig_socket_timeout else self.socket_timeout
)
self.orig_socket_connect_timeout = (
orig_socket_connect_timeout
if orig_socket_connect_timeout
else self.socket_connect_timeout
)

def set_maintenance_event_pool_handler(
self, maintenance_event_pool_handler: MaintenanceEventPoolHandler
):
Expand Down Expand Up @@ -652,6 +683,39 @@ def on_connect_check_health(self, check_health: bool = True):
):
raise ConnectionError("Invalid RESP version")

# Send maintenance notifications handshake if RESP3 is active and maintenance events are enabled
# and we have a host to determine the endpoint type from
if (
self.protocol not in [2, "2"]
and self.maintenance_events_config
and self.maintenance_events_config.enabled
and self._maintenance_event_connection_handler
and hasattr(self, "host")
):
try:
endpoint_type = self.maintenance_events_config.get_endpoint_type(
self.host, self
)
self.send_command(
"CLIENT",
"MAINT_NOTIFICATIONS",
"ON",
"moving-endpoint-type",
endpoint_type.value,
check_health=check_health,
)
response = self.read_response()
if str_if_bytes(response) != "OK":
raise ConnectionError(
"The server doesn't support maintenance notifications"
)
except Exception as e:
# Log warning but don't fail the connection
import logging

logger = logging.getLogger(__name__)
logger.warning(f"Failed to enable maintenance notifications: {e}")

Copy link
Preview

Copilot AI Aug 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The broad exception handling catches all exceptions during the handshake process. Consider being more specific about which exceptions to handle and potentially re-raise authentication or connection errors that shouldn't be silently ignored.

Suggested change
except (AuthenticationError, ConnectionError) as e:
# Re-raise critical connection/authentication errors
raise
except ResponseError as e:
# Log warning but don't fail the connection for expected server response errors
import logging
logger = logging.getLogger(__name__)
logger.warning(f"Failed to enable maintenance notifications: {e}")

Copilot uses AI. Check for mistakes.

# if a client_name is given, set it
if self.client_name:
self.send_command(
Expand Down Expand Up @@ -888,6 +952,56 @@ def re_auth(self):
self.read_response()
self._re_auth_token = None

def get_resolved_ip(self) -> Optional[str]:
"""
Extract the resolved IP address from an
established connection or resolve it from the host.

First tries to get the actual IP from the socket (most accurate),
then falls back to DNS resolution if needed.

Args:
connection: The connection object to extract the IP from

Returns:
str: The resolved IP address, or None if it cannot be determined
"""

# Method 1: Try to get the actual IP from the established socket connection
# This is most accurate as it shows the exact IP being used
try:
if self._sock is not None:
peer_addr = self._sock.getpeername()
if peer_addr and len(peer_addr) >= 1:
# For TCP sockets, peer_addr is typically (host, port) tuple
# Return just the host part
return peer_addr[0]
except (AttributeError, OSError):
# Socket might not be connected or getpeername() might fail
pass

# Method 2: Fallback to DNS resolution of the host
# This is less accurate but works when socket is not available
try:
host = getattr(self, "host", "localhost")
port = getattr(self, "port", 6379)
if host:
# Use getaddrinfo to resolve the hostname to IP
# This mimics what the connection would do during _connect()
addr_info = socket.getaddrinfo(
host, port, socket.AF_UNSPEC, socket.SOCK_STREAM
)
if addr_info:
# Return the IP from the first result
# addr_info[0] is (family, socktype, proto, canonname, sockaddr)
# sockaddr[0] is the IP address
return addr_info[0][4][0]
except (AttributeError, OSError, socket.gaierror):
# DNS resolution might fail
pass

return None

@property
def maintenance_state(self) -> MaintenanceState:
return self._maintenance_state
Expand Down
Loading