Skip to content

Commit 72cb50f

Browse files
4.0 pull and discard (#388)
* pull n are used to fetch results in batches with the fetch_size configuration * result.commit will now discard the rest if has more * replaced the api result.summary() with result.consume() * added new stub scripts * fixed format issue * using bolkit 4.0+ stub server Before it was set to use the legacy bolkit stub server * explicit version of boltkit in requirements * fetches in batches * added new stub server test scripts * graceful shutdown for session result consume * updated script to pass the bookmark to the next run * added more time to let the stub servers start * bolt 3 pull will now just ignore n and qid parameters * added discard behaviour for transaction.consume and transaction.roll_back
1 parent 43fb83e commit 72cb50f

35 files changed

+923
-177
lines changed

docs/source/index.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,12 @@ Argument Renaming Changes
157157
* :code:`StatementResultSummary.protocol_version` is now :code:`ResultSummary.server.protocol_version`
158158

159159

160+
API Changes
161+
=========================
162+
163+
* :code:`Result.summary()` have been replaced with :code:`Result.consume()`, this behaviour is to consume all remaining records in the buffer and returns the ResultSummary.
164+
165+
160166
Dependency Changes
161167
==================
162168

docs/source/results.rst

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ A :class:`.Result` is attached to an active connection, through a :class:`.Sessi
2525

2626
.. automethod:: records
2727

28-
.. automethod:: summary
29-
3028
.. automethod:: consume
3129

3230
.. automethod:: single

neo4j/__init__.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ def close(self):
323323
"""
324324
self._pool.close()
325325

326+
@experimental("The configuration may change in the future.")
326327
def verify_connectivity(self, **config):
327328
""" This verifies if the driver can connect to a remote server or a cluster
328329
by establishing a network connection with the remote and possibly exchanging
@@ -403,12 +404,14 @@ def pipeline(self, **config):
403404
PipelineConfig.consume(config) # Consume the config
404405
return Pipeline(self._pool, pipeline_config)
405406

407+
@experimental("The configuration may change in the future.")
406408
def verify_connectivity(self, **config):
407409
server_agent = None
410+
config["fetch_size"] = -1
408411
with self.session(**config) as session:
409412
result = session.run("RETURN 1 AS x")
410413
value = result.single().value()
411-
summary = result.summary()
414+
summary = result.consume()
412415
server_agent = summary.server.agent
413416
return server_agent
414417

@@ -445,6 +448,7 @@ def pipeline(self, **config):
445448
PipelineConfig.consume(config) # Consume the config
446449
return Pipeline(self._pool, pipeline_config)
447450

451+
@experimental("The configuration may change in the future.")
448452
def verify_connectivity(self, **config):
449453
"""
450454
:raise ServiceUnavailable: raised if the server does not support routing or if routing support is broken.

neo4j/io/_bolt3.py

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ def __init__(self, unresolved_address, sock, max_connection_lifetime, *, auth=No
8787
self.responses = deque()
8888
self._max_connection_lifetime = max_connection_lifetime
8989
self._creation_timestamp = perf_counter()
90+
self.state = None
9091

9192
# Determine the user agent
9293
if user_agent:
@@ -173,18 +174,12 @@ def run(self, query, parameters=None, mode=None, bookmarks=None, metadata=None,
173174
self._append(b"\x10", fields, Response(self, **handlers))
174175

175176
def discard(self, n=-1, qid=-1, **handlers):
176-
if n != -1:
177-
raise ValueError("Incremental discard is not supported in Bolt 3")
178-
if qid != -1:
179-
raise ValueError("Query selection on discard is not supported in Bolt 3")
177+
# Just ignore n and qid, it is not supported in the Bolt 3 Protocol.
180178
log.debug("[#%04X] C: DISCARD_ALL", self.local_port)
181179
self._append(b"\x2F", (), Response(self, **handlers))
182180

183181
def pull(self, n=-1, qid=-1, **handlers):
184-
if n != -1:
185-
raise ValueError("Incremental pull is not supported in Bolt 3")
186-
if qid != -1:
187-
raise ValueError("Query selection on pull is not supported in Bolt 3")
182+
# Just ignore n and qid, it is not supported in the Bolt 3 Protocol.
188183
log.debug("[#%04X] C: PULL_ALL", self.local_port)
189184
self._append(b"\x3F", (), Response(self, **handlers))
190185

@@ -305,7 +300,7 @@ def fetch_message(self):
305300
raise
306301

307302
if details:
308-
log.debug("[#%04X] S: RECORD * %d", self.local_port, len(details)) # TODO
303+
log.debug("[#%04X] S: RECORD * %d", self.local_port, len(details))
309304
self.responses[0].on_records(details)
310305

311306
if summary_signature is None:

neo4j/io/_bolt4x0.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ def __init__(self, unresolved_address, sock, max_connection_lifetime, *, auth=No
8686
self.responses = deque()
8787
self._max_connection_lifetime = max_connection_lifetime # self.pool_config.max_connection_lifetime
8888
self._creation_timestamp = perf_counter()
89+
self.state = None
8990

9091
# Determine the user agent
9192
if user_agent:
@@ -303,7 +304,7 @@ def fetch_message(self):
303304
raise
304305

305306
if details:
306-
log.debug("[#%04X] S: RECORD * %d", self.local_port, len(details)) # TODO
307+
log.debug("[#%04X] S: RECORD * %d", self.local_port, len(details))
307308
self.responses[0].on_records(details)
308309

309310
if summary_signature is None:
@@ -482,19 +483,30 @@ def __init__(self, connection, **handlers):
482483
def on_records(self, records):
483484
""" Called when one or more RECORD messages have been received.
484485
"""
486+
self.connection.state = "streaming"
485487
handler = self.handlers.get("on_records")
486488
if callable(handler):
487489
handler(records)
488490

489491
def on_success(self, metadata):
490492
""" Called when a SUCCESS message has been received.
491493
"""
492-
handler = self.handlers.get("on_success")
493-
if callable(handler):
494-
handler(metadata)
495-
handler = self.handlers.get("on_summary")
496-
if callable(handler):
497-
handler()
494+
if metadata.get("has_more"):
495+
if self.connection.state == "streaming_discard_all":
496+
handler = self.handlers.get("on_success_has_more_streaming_discard_all")
497+
self.connection.state = None
498+
if callable(handler):
499+
handler(self.connection, **self.handlers)
500+
else:
501+
self.connection.state = "streaming_has_more"
502+
else:
503+
self.connection.state = None
504+
handler = self.handlers.get("on_success")
505+
if callable(handler):
506+
handler(metadata)
507+
handler = self.handlers.get("on_summary")
508+
if callable(handler):
509+
handler()
498510

499511
def on_failure(self, metadata):
500512
""" Called when a FAILURE message has been received.

0 commit comments

Comments
 (0)