Skip to content

Commit 0a400b5

Browse files
committed
RDBC-948: Ensure topology handling matches C# Client.
Handles network errors during connection failover more reliably by catching both `requests`-level exceptions and OS-level socket errors. This mirrors the behavior of the C# client, so that an unresolvable DNS name or an unavailable server triggers failover instead of failing the connection outright. Also refactors the failover logic to improve retry behavior on failed requests: the failure is recorded for the node, the next preferred node is chosen and the request is retried on it, and the retry stops when that node has already failed, which avoids infinite loops. Listeners are notified about failed requests with full details, and commands are broadcast where applicable to improve efficiency. Finally, node selection now prefers the first member node that has no recorded failures. Adds tests to verify failover with an invalid DNS name among the configured URLs and with all endpoints unreachable.
1 parent 2161369 commit 0a400b5

File tree

3 files changed

+168
-41
lines changed

3 files changed

+168
-41
lines changed

ravendb/http/request_executor.py

Lines changed: 53 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
from http import HTTPStatus
3838

3939

40-
from typing import TYPE_CHECKING, List, Dict, Tuple
40+
from typing import TYPE_CHECKING, List, Dict, Tuple, Optional
4141

4242
if TYPE_CHECKING:
4343
from ravendb.documents.session import SessionInfo
@@ -633,7 +633,13 @@ def _send_request_to_server(
633633
self._throw_failed_to_contact_all_nodes(command, request)
634634

635635
return None
636-
except IOError as e:
636+
except (requests.RequestException, OSError) as e:
637+
# RDBC-948: https://issues.hibernatingrhinos.com/issue/RDBC-948/Python-client-connection-failover-breaks-with-unknown-DNS-name-or-server-is-down.
638+
# Handle failover on network errors from both requests and the OS:
639+
# - RequestException covers requests' network stack (connect, TLS, proxies, etc.).
640+
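# - OSError covers socket-level issues like DNS getaddrinfo on some platforms (requests.RequestException itself derives from IOError, i.e. OSError, so it is listed for explicitness rather than extra coverage).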
641+
# Different OS/resolvers surface the same fault differently; catching both mirrors the C# client
642+
# (HttpRequestException/SocketException) and makes failover reliable.
637643
if not should_retry:
638644
raise
639645

@@ -805,7 +811,8 @@ def _throw_failed_to_contact_all_nodes(self, command: RavenCommand, request: req
805811
)
806812

807813
if len(command.failed_nodes) == 1:
808-
raise command.failed_nodes.popitem()
814+
# raise the single recorded exception
815+
raise next(iter(command.failed_nodes.values()))
809816

810817
message = (
811818
f"Tried to send {command._result_class.__name__} request via {request.method}"
@@ -1159,81 +1166,87 @@ def __handle_server_down(
11591166
if command.failed_nodes is None:
11601167
command.failed_nodes = {}
11611168

1162-
return (
1163-
False # todo: command.failed_nodes[chosen_node] = self.__read_exception_from_server(request, response, e)
1164-
)
1169+
# record the failure for this node
1170+
if not command.is_failed_with_node(chosen_node):
1171+
command.failed_nodes[chosen_node] = self.__read_exception_from_server(request, response, e)
11651172

1173+
# If the node is not part of the topology, we can't failover using selector.
11661174
if node_index is None:
1167-
# We executed request over a node not in the topology. This means no failover...
11681175
return False
11691176

1177+
# If we don't have a selector yet, we also cannot failover.
11701178
if self._node_selector is None:
1171-
# todo: spawnHealthChecks(chosenNode, nodeIndex)
11721179
return False
11731180

1174-
# As the server is down, we discard the server version to ensure we update when it goes up.
1181+
# As the server is down, discard server version to ensure it updates when back up.
11751182
chosen_node.discard_server_version()
11761183

1184+
# Mark the node as failed to move selection forward
11771185
self._node_selector.on_failed_request(node_index)
11781186

1187+
# For broadcastable commands, attempt to broadcast instead of single-node retry.
11791188
if self.should_broadcast(command):
11801189
command.result = self.__broadcast(command, session_info)
11811190
return True
11821191

1183-
# todo: self.spawn_health_checks(chosen_node, node_index)
1184-
1192+
# Choose the next preferred node and retry
11851193
index_node_and_etag = self._node_selector.get_preferred_node_with_topology()
1194+
1195+
# If topology changed since we started, clear failed nodes record to allow retries
11861196
if command.failover_topology_etag != self.topology_etag:
11871197
command.failed_nodes.clear()
11881198
command.failover_topology_etag = self.topology_etag
11891199

1200+
# Avoid infinite loop if the next node is already marked as failed
11901201
if index_node_and_etag.current_node in command.failed_nodes:
11911202
return False
11921203

1193-
self.__on_failed_request_invoke(url, e, request, response)
1204+
# Notify listeners about the failed request with full details
1205+
self.__on_failed_request_invoke_details(url, e, request, response)
11941206

1207+
# Retry the command on the next node
11951208
self.execute(
11961209
index_node_and_etag.current_node, index_node_and_etag.current_index, command, should_retry, session_info
11971210
)
11981211

11991212
return True
12001213

12011214
@staticmethod
def __read_exception_from_server(
    request: requests.Request, response: requests.Response, e: Optional[Exception]
) -> Exception:
    """Build the exception that describes a failed request to a node.

    The server-provided error payload is preferred when the response has a
    body; otherwise the exception is synthesized from the local error ``e``.

    :param request: The request that failed; may be ``None``, in which case
        an empty URL is reported.
    :param response: The server response, or ``None`` when no response was
        received (e.g. DNS or connection failure).
    :param e: The local exception that interrupted the request, if any.
    :return: The exception produced by ``ExceptionDispatcher.get`` for the
        collected error details.
    """
    url = request.url if request is not None else ""

    # Compare against None explicitly: requests.Response is falsy for any
    # 4xx/5xx status (its truth value is `ok`), and those are exactly the
    # responses whose body carries the server-side error.
    if response is not None and response.content:
        raw = None
        try:
            raw = response.content.decode("utf-8")

            def _decode(d: dict) -> ExceptionDispatcher.ExceptionSchema:
                return ExceptionDispatcher.ExceptionSchema(
                    d.get("url"), d.get("class"), d.get("message"), d.get("error")
                )

            return ExceptionDispatcher.get(json.loads(raw, object_hook=_decode), response.status_code, e)
        except Exception:
            # The body could not be decoded/parsed (or dispatched); report it
            # verbatim. `raw` stays None when the UTF-8 decode itself failed.
            schema = ExceptionDispatcher.ExceptionSchema(
                url,
                "Unparsable Server Response",
                "Unrecognized response from server",
                raw,
            )
            return ExceptionDispatcher.get(schema, response.status_code, e)

    # Fallback when we have no usable response body
    exception_name = type(e).__name__ if e is not None else "RequestFailed"
    msg = str(e) if e is not None else "Request failed"
    details = f"An exception occurred while contacting {url}."
    if e is not None:
        details += f"{os.linesep}{msg}"

    schema = ExceptionDispatcher.ExceptionSchema(url, exception_name, msg, details)
    # Keep the real status of an error response with an empty body; only fall
    # back to 503 when there is no response at all.
    status = response.status_code if response is not None else HTTPStatus.SERVICE_UNAVAILABLE
    return ExceptionDispatcher.get(schema, status, e if e is not None else RuntimeError(msg))
12371250

12381251
class IndexAndResponse:
12391252
def __init__(self, index: int, response: requests.Response):

ravendb/http/topology.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,8 @@ def get_preferred_node_internal(cls, state: NodeSelector.__NodeSelectorState) ->
149149
server_nodes = state.nodes
150150
length = min(len(server_nodes), len(state_failures))
151151
for i in range(length):
152-
if state_failures[0] == 0:
152+
# pick the first member node without failures
153+
if state_failures[i] == 0 and server_nodes[i].server_role == ServerNode.Role.MEMBER:
153154
return CurrentIndexAndNode(i, server_nodes[i])
154155
return cls.unlikely_everyone_faulted_choice(state)
155156

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
from threading import Event
2+
from ravendb.documents.store.definition import DocumentStore
3+
from ravendb.documents.subscriptions.options import SubscriptionWorkerOptions
4+
from ravendb.exceptions.exceptions import AllTopologyNodesDownException
5+
from ravendb.infrastructure.entities import User
6+
from ravendb.serverwide.operations.common import GetDatabaseRecordOperation
7+
from ravendb.tests.test_base import TestBase
8+
from ravendb.http.topology import UpdateTopologyParameters
9+
from ravendb.http.server_node import ServerNode
10+
11+
12+
class TestRDBC948(TestBase):
    """Regression tests for RDBC-948: failover when a configured URL cannot be reached.

    Each test opens a second ``DocumentStore`` whose URL list contains at least
    one unreachable endpoint and checks how requests behave.
    ``self.store`` is presumably the store prepared by ``TestBase.setUp`` - confirm.
    """

    # Hostname that is expected not to resolve (exercises the DNS failure path).
    INVALID_DNS_URL = "http://thisnamedoesnotexist:8080"
    # Loopback endpoint that is expected to have no listener.
    CLOSED_PORT_URL = "http://127.0.0.1:1234"

    def _failover_urls(self):
        """Return the invalid-DNS URL followed by the first URL of the test store."""
        return [self.INVALID_DNS_URL, self.store.urls[0]]

    @staticmethod
    def _initialize_with_topology_updates(store):
        """Enable topology updates on ``store`` and initialize it."""
        store.conventions.disable_topology_updates = False
        store.initialize()

    def test_failover_with_invalid_dns_in_urls(self):
        """Session writes and reads succeed when the first URL has an invalid DNS name."""
        # One invalid DNS hostname and one valid server URL (from the embedded test server)
        with DocumentStore(urls=self._failover_urls(), database=self.store.database) as store2:
            self._initialize_with_topology_updates(store2)

            # Should succeed by failing over to the valid URL
            with store2.open_session() as session:
                session.store({"Name": "John"}, "users/1")
                session.save_changes()

            # Verify we can read it back (continues using the healthy node)
            with store2.open_session() as session:
                doc = session.load("users/1")
                self.assertIsNotNone(doc)
                self.assertEqual(doc.get("Name"), "John")

    def test_all_nodes_down_throws(self):
        """A load raises AllTopologyNodesDownException when no endpoint is reachable."""
        # Two unreachable endpoints: invalid DNS and a closed localhost port
        urls = [self.INVALID_DNS_URL, self.CLOSED_PORT_URL]

        with DocumentStore(urls=urls, database=self.store.database) as store2:
            self._initialize_with_topology_updates(store2)

            with self.assertRaises(AllTopologyNodesDownException):
                with store2.open_session() as session:
                    session.load("users/does-not-matter")

    def test_maintenance_operation_failover_with_invalid_dns(self):
        """A server-wide maintenance operation succeeds despite the invalid first URL."""
        database = self.store.database

        with DocumentStore(urls=self._failover_urls(), database=database) as store2:
            self._initialize_with_topology_updates(store2)

            # Perform maintenance call, should succeed by failing over
            record = store2.maintenance.server.send(GetDatabaseRecordOperation(database))
            self.assertIsNotNone(record)

    def test_request_executor_failover_with_invalid_dns(self):
        """After an explicit topology update the request executor points at the valid URL."""
        valid_url = self.store.urls[0]

        with DocumentStore(urls=self._failover_urls(), database=self.store.database) as store2:
            self._initialize_with_topology_updates(store2)

            # Explicitly refresh topology like C# tests via UpdateTopologyAsync.
            # This avoids racing the background first-topology-update and ensures the selector is initialized.
            req_ex = store2.get_request_executor()
            params = UpdateTopologyParameters(ServerNode(valid_url, store2.database))
            # NOTE(review): 5 ms is a very tight budget - confirm it is intentional.
            params.timeout_in_ms = 5
            params.debug_tag = "test-init"
            req_ex.update_topology_async(params).result()

            # Now URL should reflect the healthy node
            self.assertIsNotNone(req_ex.url, "request executor URL did not initialize")
            self.assertTrue(req_ex.url.startswith(valid_url), f"unexpected URL: {req_ex.url}")

            # And simple operations should succeed
            with store2.open_session() as session:
                session.store({"Name": "Jane"}, "users/2")
                session.save_changes()

    def test_subscription_failover_with_invalid_dns(self):
        """A subscription worker receives a document despite the invalid first URL."""
        with DocumentStore(urls=self._failover_urls(), database=self.store.database) as store2:
            self._initialize_with_topology_updates(store2)

            # Create a subscription and ensure worker connects and receives items
            sub_id = store2.subscriptions.create_for_class(User)
            with store2.subscriptions.get_subscription_worker(SubscriptionWorkerOptions(sub_id), User) as worker:
                got_item = Event()

                def _run(batch):
                    # Signal as soon as any delivered item carries a result.
                    for item in batch.items:
                        if item.result is not None:
                            got_item.set()

                worker.run(_run)

                # Add a document so the subscription has something to send
                with store2.open_session() as session:
                    session.store(User(name="SubUser"))
                    session.save_changes()

                # Wait up to 10 seconds for the worker callback to fire.
                self.assertTrue(got_item.wait(10))

0 commit comments

Comments
 (0)