Skip to content

Commit 59136a7

Browse files
authored
Fix bolt handshake not having a timeout (#905)
During the bolt version negotiation the socket was not constrained by any timeout. This could, on rare occasions, cause the driver to get stuck until some other mechanism (e.g., socket keep alive) would free it. The handshake is now limited by the `connection_acquisition_timeout`.
1 parent 53ade64 commit 59136a7

File tree

8 files changed

+131
-105
lines changed

8 files changed

+131
-105
lines changed

src/neo4j/_async/io/_bolt.py

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from ..._codec.hydration import v1 as hydration_v1
2929
from ..._codec.packstream import v1 as packstream_v1
3030
from ..._conf import PoolConfig
31+
from ..._deadline import Deadline
3132
from ..._exceptions import (
3233
BoltError,
3334
BoltHandshakeError,
@@ -289,17 +290,21 @@ def get_handshake(cls):
289290
return b"".join(version.to_bytes() for version in offered_versions).ljust(16, b"\x00")
290291

291292
@classmethod
292-
async def ping(cls, address, *, timeout=None, pool_config=None):
293+
async def ping(cls, address, *, deadline=None, pool_config=None):
293294
""" Attempt to establish a Bolt connection, returning the
294295
agreed Bolt protocol version if successful.
295296
"""
296297
if pool_config is None:
297298
pool_config = PoolConfig()
299+
if deadline is None:
300+
deadline = Deadline(None)
301+
298302
try:
299303
s, protocol_version, handshake, data = \
300304
await AsyncBoltSocket.connect(
301305
address,
302-
timeout=timeout,
306+
tcp_timeout=pool_config.connection_timeout,
307+
deadline=deadline,
303308
custom_resolver=pool_config.resolver,
304309
ssl_context=pool_config.get_ssl_context(),
305310
keep_alive=pool_config.keep_alive,
@@ -313,14 +318,14 @@ async def ping(cls, address, *, timeout=None, pool_config=None):
313318
# [bolt-version-bump] search tag when changing bolt version support
314319
@classmethod
315320
async def open(
316-
cls, address, *, auth=None, timeout=None, routing_context=None,
321+
cls, address, *, auth=None, deadline=None, routing_context=None,
317322
pool_config=None
318323
):
319324
"""Open a new Bolt connection to a given server address.
320325
321326
:param address:
322327
:param auth:
323-
:param timeout: the connection timeout in seconds
328+
:param deadline: how long to wait for the connection to be established
324329
:param routing_context: dict containing routing context
325330
:param pool_config:
326331
@@ -330,26 +335,17 @@ async def open(
330335
raised if the Bolt Protocol can not negotiate a protocol version.
331336
:raise ServiceUnavailable: raised if there was a connection issue.
332337
"""
333-
def time_remaining():
334-
if timeout is None:
335-
return None
336-
t = timeout - (perf_counter() - t0)
337-
return t if t > 0 else 0
338338

339-
t0 = perf_counter()
340339
if pool_config is None:
341340
pool_config = PoolConfig()
341+
if deadline is None:
342+
deadline = Deadline(None)
342343

343-
socket_connection_timeout = pool_config.connection_timeout
344-
if socket_connection_timeout is None:
345-
socket_connection_timeout = time_remaining()
346-
elif timeout is not None:
347-
socket_connection_timeout = min(pool_config.connection_timeout,
348-
time_remaining())
349344
s, protocol_version, handshake, data = \
350345
await AsyncBoltSocket.connect(
351346
address,
352-
timeout=socket_connection_timeout,
347+
tcp_timeout=pool_config.connection_timeout,
348+
deadline=deadline,
353349
custom_resolver=pool_config.resolver,
354350
ssl_context=pool_config.get_ssl_context(),
355351
keep_alive=pool_config.keep_alive,
@@ -410,7 +406,7 @@ def time_remaining():
410406
)
411407

412408
try:
413-
connection.socket.set_deadline(time_remaining())
409+
connection.socket.set_deadline(deadline)
414410
try:
415411
await connection.hello()
416412
finally:

src/neo4j/_async/io/_pool.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,7 @@ async def connection_creator():
135135
released_reservation = False
136136
try:
137137
try:
138-
connection = await self.opener(
139-
address, deadline.to_timeout()
140-
)
138+
connection = await self.opener(address, deadline)
141139
except ServiceUnavailable:
142140
await self.deactivate(address)
143141
raise
@@ -382,9 +380,9 @@ def open(cls, address, *, auth, pool_config, workspace_config):
382380
:returns: BoltPool
383381
"""
384382

385-
async def opener(addr, timeout):
383+
async def opener(addr, deadline):
386384
return await AsyncBolt.open(
387-
addr, auth=auth, timeout=timeout, routing_context=None,
385+
addr, auth=auth, deadline=deadline, routing_context=None,
388386
pool_config=pool_config
389387
)
390388

@@ -437,9 +435,9 @@ def open(cls, *addresses, auth, pool_config, workspace_config,
437435
raise ConfigurationError("The key 'address' is reserved for routing context.")
438436
routing_context["address"] = str(address)
439437

440-
async def opener(addr, timeout):
438+
async def opener(addr, deadline):
441439
return await AsyncBolt.open(
442-
addr, auth=auth, timeout=timeout,
440+
addr, auth=auth, deadline=deadline,
443441
routing_context=routing_context, pool_config=pool_config
444442
)
445443

src/neo4j/_async_compat/network/_bolt_socket.py

Lines changed: 72 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -272,11 +272,11 @@ async def _connect_secure(cls, resolved_address, timeout, keep_alive, ssl):
272272
) from error
273273
raise
274274

275-
async def _handshake(self, resolved_address):
275+
async def _handshake(self, resolved_address, deadline):
276276
"""
277277
278-
:param s: Socket
279278
:param resolved_address:
279+
:param deadline: Deadline for handshake
280280
281281
:returns: (socket, version, client_handshake, server_response_data)
282282
"""
@@ -296,47 +296,52 @@ async def _handshake(self, resolved_address):
296296
log.debug("[#%04X] C: <HANDSHAKE> %s %s %s %s", local_port,
297297
*supported_versions)
298298

299-
data = self.Bolt.MAGIC_PREAMBLE + self.Bolt.get_handshake()
300-
await self.sendall(data)
299+
request = self.Bolt.MAGIC_PREAMBLE + self.Bolt.get_handshake()
301300

302301
# Handle the handshake response
303302
original_timeout = self.gettimeout()
304-
if original_timeout is not None:
305-
self.settimeout(original_timeout + 1)
303+
self.settimeout(deadline.to_timeout())
306304
try:
307-
data = await self.recv(4)
305+
await self.sendall(request)
306+
response = await self.recv(4)
308307
except OSError as exc:
309308
raise ServiceUnavailable(
310-
"Failed to read any data from server {!r} "
311-
"after connected".format(resolved_address)) from exc
309+
f"Failed to read any data from server {resolved_address!r} "
310+
f"after connected (deadline {deadline})"
311+
) from exc
312312
finally:
313313
self.settimeout(original_timeout)
314-
data_size = len(data)
314+
data_size = len(response)
315315
if data_size == 0:
316316
# If no data is returned after a successful select
317317
# response, the server has closed the connection
318318
log.debug("[#%04X] S: <CLOSE>", local_port)
319319
await self.close()
320320
raise ServiceUnavailable(
321-
"Connection to {address} closed without handshake response".format(
322-
address=resolved_address))
321+
f"Connection to {resolved_address} closed without handshake "
322+
"response"
323+
)
323324
if data_size != 4:
324325
# Some garbled data has been received
325326
log.debug("[#%04X] S: @*#!", local_port)
326327
await self.close()
327328
raise BoltProtocolError(
328-
"Expected four byte Bolt handshake response from %r, received %r instead; check for incorrect port number" % (
329-
resolved_address, data), address=resolved_address)
330-
elif data == b"HTTP":
329+
"Expected four byte Bolt handshake response from "
330+
f"{resolved_address!r}, received {response!r} instead; "
331+
"check for incorrect port number"
332+
, address=resolved_address
333+
)
334+
elif response == b"HTTP":
331335
log.debug("[#%04X] S: <CLOSE>", local_port)
332336
await self.close()
333337
raise ServiceUnavailable(
334-
"Cannot to connect to Bolt service on {!r} "
335-
"(looks like HTTP)".format(resolved_address))
336-
agreed_version = data[-1], data[-2]
338+
f"Cannot to connect to Bolt service on {resolved_address!r} "
339+
"(looks like HTTP)"
340+
)
341+
agreed_version = response[-1], response[-2]
337342
log.debug("[#%04X] S: <HANDSHAKE> 0x%06X%02X", local_port,
338343
agreed_version[1], agreed_version[0])
339-
return self, agreed_version, handshake, data
344+
return self, agreed_version, handshake, response
340345

341346
@classmethod
342347
async def close_socket(cls, socket_):
@@ -356,8 +361,8 @@ async def close_socket(cls, socket_):
356361
pass
357362

358363
@classmethod
359-
async def connect(cls, address, *, timeout, custom_resolver, ssl_context,
360-
keep_alive):
364+
async def connect(cls, address, *, tcp_timeout, deadline,
365+
custom_resolver, ssl_context, keep_alive):
361366
""" Connect and perform a handshake and return a valid Connection object,
362367
assuming a protocol version can be agreed.
363368
"""
@@ -371,12 +376,18 @@ async def connect(cls, address, *, timeout, custom_resolver, ssl_context,
371376
addressing.Address(address), resolver=custom_resolver
372377
)
373378
async for resolved_address in resolved_addresses:
379+
deadline_timeout = deadline.to_timeout()
380+
if (
381+
deadline_timeout is not None
382+
and deadline_timeout <= tcp_timeout
383+
):
384+
tcp_timeout = deadline_timeout
374385
s = None
375386
try:
376387
s = await cls._connect_secure(
377-
resolved_address, timeout, keep_alive, ssl_context
388+
resolved_address, tcp_timeout, keep_alive, ssl_context
378389
)
379-
return await s._handshake(resolved_address)
390+
return await s._handshake(resolved_address, deadline)
380391
except (BoltError, DriverError, OSError) as error:
381392
try:
382393
local_port = s.getsockname()[1]
@@ -560,11 +571,12 @@ def _secure(cls, s, host, ssl_context):
560571
return s
561572

562573
@classmethod
563-
def _handshake(cls, s, resolved_address):
574+
def _handshake(cls, s, resolved_address, deadline):
564575
"""
565576
566577
:param s: Socket
567578
:param resolved_address:
579+
:param deadline:
568580
569581
:returns: (socket, version, client_handshake, server_response_data)
570582
"""
@@ -584,46 +596,52 @@ def _handshake(cls, s, resolved_address):
584596
log.debug("[#%04X] C: <HANDSHAKE> %s %s %s %s", local_port,
585597
*supported_versions)
586598

587-
data = cls.Bolt.MAGIC_PREAMBLE + cls.Bolt.get_handshake()
588-
s.sendall(data)
599+
request = cls.Bolt.MAGIC_PREAMBLE + cls.Bolt.get_handshake()
589600

590601
# Handle the handshake response
591-
ready_to_read = False
592-
with selectors.DefaultSelector() as selector:
593-
selector.register(s, selectors.EVENT_READ)
594-
selector.select(1)
602+
original_timeout = s.gettimeout()
603+
s.settimeout(deadline.to_timeout())
595604
try:
596-
data = s.recv(4)
605+
s.sendall(request)
606+
response = s.recv(4)
597607
except OSError as exc:
598608
raise ServiceUnavailable(
599-
"Failed to read any data from server {!r} "
600-
"after connected".format(resolved_address)) from exc
601-
data_size = len(data)
609+
f"Failed to read any data from server {resolved_address!r} "
610+
f"after connected (deadline {deadline})"
611+
) from exc
612+
finally:
613+
s.settimeout(original_timeout)
614+
data_size = len(response)
602615
if data_size == 0:
603616
# If no data is returned after a successful select
604617
# response, the server has closed the connection
605618
log.debug("[#%04X] S: <CLOSE>", local_port)
606619
cls.close_socket(s)
607620
raise ServiceUnavailable(
608-
"Connection to {address} closed without handshake response".format(
609-
address=resolved_address))
621+
f"Connection to {resolved_address} closed without handshake "
622+
"response"
623+
)
610624
if data_size != 4:
611625
# Some garbled data has been received
612626
log.debug("[#%04X] S: @*#!", local_port)
613627
cls.close_socket(s)
614628
raise BoltProtocolError(
615-
"Expected four byte Bolt handshake response from %r, received %r instead; check for incorrect port number" % (
616-
resolved_address, data), address=resolved_address)
617-
elif data == b"HTTP":
629+
"Expected four byte Bolt handshake response from "
630+
f"{resolved_address!r}, received {response!r} instead; "
631+
"check for incorrect port number"
632+
, address=resolved_address
633+
)
634+
elif response == b"HTTP":
618635
log.debug("[#%04X] S: <CLOSE>", local_port)
619636
cls.close_socket(s)
620637
raise ServiceUnavailable(
621-
"Cannot to connect to Bolt service on {!r} "
622-
"(looks like HTTP)".format(resolved_address))
623-
agreed_version = data[-1], data[-2]
638+
f"Cannot to connect to Bolt service on {resolved_address!r} "
639+
"(looks like HTTP)"
640+
)
641+
agreed_version = response[-1], response[-2]
624642
log.debug("[#%04X] S: <HANDSHAKE> 0x%06X%02X", local_port,
625643
agreed_version[1], agreed_version[0])
626-
return cls(s), agreed_version, handshake, data
644+
return cls(s), agreed_version, handshake, response
627645

628646
@classmethod
629647
def close_socket(cls, socket_):
@@ -639,8 +657,8 @@ def close_socket(cls, socket_):
639657
pass
640658

641659
@classmethod
642-
def connect(cls, address, *, timeout, custom_resolver, ssl_context,
643-
keep_alive):
660+
def connect(cls, address, *, tcp_timeout, deadline, custom_resolver,
661+
ssl_context, keep_alive):
644662
""" Connect and perform a handshake and return a valid Connection object,
645663
assuming a protocol version can be agreed.
646664
"""
@@ -653,12 +671,19 @@ def connect(cls, address, *, timeout, custom_resolver, ssl_context,
653671
addressing.Address(address), resolver=custom_resolver
654672
)
655673
for resolved_address in resolved_addresses:
674+
deadline_timeout = deadline.to_timeout()
675+
if (
676+
deadline_timeout is not None
677+
and deadline_timeout <= tcp_timeout
678+
):
679+
tcp_timeout = deadline_timeout
656680
s = None
657681
try:
658-
s = BoltSocket._connect(resolved_address, timeout, keep_alive)
682+
s = BoltSocket._connect(resolved_address, tcp_timeout,
683+
keep_alive)
659684
s = BoltSocket._secure(s, resolved_address._host_name,
660685
ssl_context)
661-
return BoltSocket._handshake(s, resolved_address)
686+
return BoltSocket._handshake(s, resolved_address, deadline)
662687
except (BoltError, DriverError, OSError) as error:
663688
try:
664689
local_port = s.getsockname()[1]

src/neo4j/_deadline.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ def from_timeout_or_deadline(cls, timeout):
7272
return timeout
7373
return cls(timeout)
7474

75+
def __str__(self):
76+
return f"Deadline(timeout={self._original_timeout})"
77+
7578

7679
merge_deadlines = min
7780

0 commit comments

Comments
 (0)