Skip to content

Commit 9eb4049

Browse files
authored
Merge pull request #249 from redknightlois/RDBC-948
RDBC-948: Ensure topology handling matches C# Client.
2 parents be55a07 + 0a400b5 commit 9eb4049

File tree

3 files changed

+168
-41
lines changed

3 files changed

+168
-41
lines changed

ravendb/http/request_executor.py

Lines changed: 53 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
from http import HTTPStatus
3838

3939

40-
from typing import TYPE_CHECKING, List, Dict, Tuple
40+
from typing import TYPE_CHECKING, List, Dict, Tuple, Optional
4141

4242
if TYPE_CHECKING:
4343
from ravendb.documents.session import SessionInfo
@@ -633,7 +633,13 @@ def _send_request_to_server(
633633
self._throw_failed_to_contact_all_nodes(command, request)
634634

635635
return None
636-
except IOError as e:
636+
except (requests.RequestException, OSError) as e:
637+
# RDBC-948: https://issues.hibernatingrhinos.com/issue/RDBC-948/Python-client-connection-failover-breaks-with-unknown-DNS-name-or-server-is-down.
638+
# Handle failover on network errors from both requests and the OS:
639+
# - RequestException covers requests' network stack (connect, TLS, proxies, etc.).
640+
# - OSError covers socket-level issues like DNS getaddrinfo on some platforms.
641+
# Different OS/resolvers surface the same fault differently; catching both mirrors the C# client
642+
# (HttpRequestException/SocketException) and makes failover reliable.
637643
if not should_retry:
638644
raise
639645

@@ -805,7 +811,8 @@ def _throw_failed_to_contact_all_nodes(self, command: RavenCommand, request: req
805811
)
806812

807813
if len(command.failed_nodes) == 1:
808-
raise command.failed_nodes.popitem()
814+
# raise the single recorded exception
815+
raise next(iter(command.failed_nodes.values()))
809816

810817
message = (
811818
f"Tried to send {command._result_class.__name__} request via {request.method}"
@@ -1159,81 +1166,87 @@ def __handle_server_down(
11591166
if command.failed_nodes is None:
11601167
command.failed_nodes = {}
11611168

1162-
return (
1163-
False # todo: command.failed_nodes[chosen_node] = self.__read_exception_from_server(request, response, e)
1164-
)
1169+
# record the failure for this node
1170+
if not command.is_failed_with_node(chosen_node):
1171+
command.failed_nodes[chosen_node] = self.__read_exception_from_server(request, response, e)
11651172

1173+
# If the node is not part of the topology, we can't fail over using the selector.
11661174
if node_index is None:
1167-
# We executed request over a node not in the topology. This means no failover...
11681175
return False
11691176

1177+
# If we don't have a selector yet, we also cannot fail over.
11701178
if self._node_selector is None:
1171-
# todo: spawnHealthChecks(chosenNode, nodeIndex)
11721179
return False
11731180

1174-
# As the server is down, we discard the server version to ensure we update when it goes up.
1181+
# As the server is down, discard server version to ensure it updates when back up.
11751182
chosen_node.discard_server_version()
11761183

1184+
# Mark the node as failed to move selection forward
11771185
self._node_selector.on_failed_request(node_index)
11781186

1187+
# For broadcastable commands, attempt to broadcast instead of single-node retry.
11791188
if self.should_broadcast(command):
11801189
command.result = self.__broadcast(command, session_info)
11811190
return True
11821191

1183-
# todo: self.spawn_health_checks(chosen_node, node_index)
1184-
1192+
# Choose the next preferred node and retry
11851193
index_node_and_etag = self._node_selector.get_preferred_node_with_topology()
1194+
1195+
# If topology changed since we started, clear failed nodes record to allow retries
11861196
if command.failover_topology_etag != self.topology_etag:
11871197
command.failed_nodes.clear()
11881198
command.failover_topology_etag = self.topology_etag
11891199

1200+
# Avoid infinite loop if the next node is already marked as failed
11901201
if index_node_and_etag.current_node in command.failed_nodes:
11911202
return False
11921203

1193-
self.__on_failed_request_invoke(url, e, request, response)
1204+
# Notify listeners about the failed request with full details
1205+
self.__on_failed_request_invoke_details(url, e, request, response)
11941206

1207+
# Retry the command on the next node
11951208
self.execute(
11961209
index_node_and_etag.current_node, index_node_and_etag.current_index, command, should_retry, session_info
11971210
)
11981211

11991212
return True
12001213

12011214
@staticmethod
1202-
def __read_exception_from_server(request: requests.Request, response: requests.Response, e: Exception) -> Exception:
1203-
if response and response.content:
1204-
response_json = None
1215+
def __read_exception_from_server(
1216+
request: requests.Request, response: requests.Response, e: Optional[Exception]
1217+
) -> Exception:
1218+
# Prefer server-provided error when available
1219+
if response is not None and response.content:
1220+
raw = None
12051221
try:
1206-
response_json = response.content.decode("utf-8")
1222+
raw = response.content.decode("utf-8")
12071223

1208-
# todo: change this bs
1209-
def exception_schema_decoder(dictionary: dict) -> ExceptionDispatcher.ExceptionSchema:
1224+
def _decode(d: dict) -> ExceptionDispatcher.ExceptionSchema:
12101225
return ExceptionDispatcher.ExceptionSchema(
1211-
dictionary.get("url"),
1212-
dictionary.get("class"),
1213-
dictionary.get("message"),
1214-
dictionary.get("error"),
1226+
d.get("url"), d.get("class"), d.get("message"), d.get("error")
12151227
)
12161228

1217-
return ExceptionDispatcher.get(
1218-
json.loads(response_json, object_hook=exception_schema_decoder), response.status_code, e
1219-
)
1220-
except:
1221-
exception_schema = ExceptionDispatcher.ExceptionSchema(
1222-
request.url,
1229+
return ExceptionDispatcher.get(json.loads(raw, object_hook=_decode), response.status_code, e)
1230+
except Exception:
1231+
schema = ExceptionDispatcher.ExceptionSchema(
1232+
request.url if request else "",
12231233
"Unparsable Server Response",
1224-
"Get unrecognized response from the server",
1225-
response_json,
1234+
"Unrecognized response from server",
1235+
raw,
12261236
)
1227-
1228-
return ExceptionDispatcher.get(exception_schema, response.status_code, e)
1229-
1230-
exception_schema = ExceptionDispatcher.ExceptionSchema(
1231-
request.url,
1232-
e.__class__.__qualname__,
1233-
e.args[0],
1234-
f"An exception occurred while contacting {request.url}.{os.linesep}{str(e)}",
1235-
)
1236-
return ExceptionDispatcher.get(exception_schema, HTTPStatus.SERVICE_UNAVAILABLE, e)
1237+
return ExceptionDispatcher.get(schema, response.status_code, e)
1238+
1239+
# Fallback when we have no usable response body
1240+
url = request.url if request else ""
1241+
cls = type(e).__name__ if e else "RequestFailed"
1242+
msg = str(e) if e else "Request failed"
1243+
details = f"An exception occurred while contacting {url}."
1244+
if e:
1245+
details += f"{os.linesep}{msg}"
1246+
1247+
schema = ExceptionDispatcher.ExceptionSchema(url, cls, msg, details)
1248+
status = response.status_code if response else HTTPStatus.SERVICE_UNAVAILABLE
1249+
return ExceptionDispatcher.get(schema, status, e or RuntimeError(msg))
12371250

12381251
class IndexAndResponse:
12391252
def __init__(self, index: int, response: requests.Response):

ravendb/http/topology.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,8 @@ def get_preferred_node_internal(cls, state: NodeSelector.__NodeSelectorState) ->
149149
server_nodes = state.nodes
150150
length = min(len(server_nodes), len(state_failures))
151151
for i in range(length):
152-
if state_failures[0] == 0:
152+
# pick the first node that has no recorded failures and whose role is MEMBER
153+
if state_failures[i] == 0 and server_nodes[i].server_role == ServerNode.Role.MEMBER:
153154
return CurrentIndexAndNode(i, server_nodes[i])
154155
return cls.unlikely_everyone_faulted_choice(state)
155156

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
from threading import Event
2+
from ravendb.documents.store.definition import DocumentStore
3+
from ravendb.documents.subscriptions.options import SubscriptionWorkerOptions
4+
from ravendb.exceptions.exceptions import AllTopologyNodesDownException
5+
from ravendb.infrastructure.entities import User
6+
from ravendb.serverwide.operations.common import GetDatabaseRecordOperation
7+
from ravendb.tests.test_base import TestBase
8+
from ravendb.http.topology import UpdateTopologyParameters
9+
from ravendb.http.server_node import ServerNode
10+
11+
12+
class TestRDBC948(TestBase):
13+
def setUp(self):
14+
super().setUp()
15+
16+
def test_failover_with_invalid_dns_in_urls(self):
17+
# One invalid DNS hostname and one valid server URL (from the embedded test server)
18+
invalid_host = "http://thisnamedoesnotexist:8080"
19+
valid_url = self.store.urls[0]
20+
21+
with DocumentStore(urls=[invalid_host, valid_url], database=self.store.database) as store2:
22+
store2.conventions.disable_topology_updates = False
23+
store2.initialize()
24+
25+
# Should succeed by failing over to the valid URL
26+
with store2.open_session() as session:
27+
session.store({"Name": "John"}, "users/1")
28+
session.save_changes()
29+
30+
# Verify we can read it back (continues using the healthy node)
31+
with store2.open_session() as session:
32+
doc = session.load("users/1")
33+
self.assertIsNotNone(doc)
34+
self.assertEqual(doc.get("Name"), "John")
35+
36+
def test_all_nodes_down_throws(self):
37+
# Two unreachable endpoints: invalid DNS and a closed localhost port
38+
urls = [
39+
"http://thisnamedoesnotexist:8080",
40+
"http://127.0.0.1:1234",
41+
]
42+
43+
with DocumentStore(urls=urls, database=self.store.database) as store2:
44+
store2.conventions.disable_topology_updates = False
45+
store2.initialize()
46+
47+
with self.assertRaises(AllTopologyNodesDownException):
48+
with store2.open_session() as session:
49+
session.load("users/does-not-matter")
50+
51+
def test_maintenance_operation_failover_with_invalid_dns(self):
52+
invalid_host = "http://thisnamedoesnotexist:8080"
53+
valid_url = self.store.urls[0]
54+
database = self.store.database
55+
56+
with DocumentStore(urls=[invalid_host, valid_url], database=database) as store2:
57+
store2.conventions.disable_topology_updates = False
58+
store2.initialize()
59+
60+
# Perform maintenance call, should succeed by failing over
61+
record = store2.maintenance.server.send(GetDatabaseRecordOperation(database))
62+
self.assertIsNotNone(record)
63+
64+
def test_request_executor_failover_with_invalid_dns(self):
65+
invalid_host = "http://thisnamedoesnotexist:8080"
66+
valid_url = self.store.urls[0]
67+
68+
with DocumentStore(urls=[invalid_host, valid_url], database=self.store.database) as store2:
69+
store2.conventions.disable_topology_updates = False
70+
store2.initialize()
71+
72+
# Explicitly refresh topology like C# tests via UpdateTopologyAsync.
73+
# This avoids racing the background first-topology-update and ensures the selector is initialized.
74+
req_ex = store2.get_request_executor()
75+
params = UpdateTopologyParameters(ServerNode(valid_url, store2.database))
76+
params.timeout_in_ms = 5
77+
params.debug_tag = "test-init"
78+
req_ex.update_topology_async(params).result()
79+
# Now URL should reflect the healthy node
80+
self.assertIsNotNone(req_ex.url, "request executor URL did not initialize")
81+
self.assertTrue(req_ex.url.startswith(valid_url), f"unexpected URL: {req_ex.url}")
82+
83+
# And simple operations should succeed
84+
with store2.open_session() as session:
85+
session.store({"Name": "Jane"}, "users/2")
86+
session.save_changes()
87+
88+
def test_subscription_failover_with_invalid_dns(self):
89+
invalid_host = "http://thisnamedoesnotexist:8080"
90+
valid_url = self.store.urls[0]
91+
92+
with DocumentStore(urls=[invalid_host, valid_url], database=self.store.database) as store2:
93+
store2.conventions.disable_topology_updates = False
94+
store2.initialize()
95+
96+
# Create a subscription and ensure worker connects and receives items
97+
sub_id = store2.subscriptions.create_for_class(User)
98+
with store2.subscriptions.get_subscription_worker(SubscriptionWorkerOptions(sub_id), User) as worker:
99+
got_item = Event()
100+
101+
def _run(batch):
102+
for item in batch.items:
103+
if item.result is not None:
104+
got_item.set()
105+
106+
worker.run(_run)
107+
108+
# Add a document so the subscription has something to send
109+
with store2.open_session() as session:
110+
session.store(User(name="SubUser"))
111+
session.save_changes()
112+
113+
self.assertTrue(got_item.wait(10))

0 commit comments

Comments
 (0)