Skip to content

Commit 0cf6741

Browse files
authored
Fix #796: connection pool clogging up (#804)
The reservation count was also increased if the pool was full and the reservation would never be turned into a new connection. This lead to the pool clogging up after a while. Add unit tests
1 parent 8971b29 commit 0cf6741

File tree

4 files changed

+70
-12
lines changed

4 files changed

+70
-12
lines changed

neo4j/_async/io/_pool.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
defaultdict,
2424
deque,
2525
)
26-
from contextlib import asynccontextmanager
2726
from logging import getLogger
2827
from random import choice
2928

@@ -161,11 +160,10 @@ async def connection_creator():
161160
connections = self.connections[address]
162161
pool_size = (len(connections)
163162
+ self.connections_reservations[address])
164-
can_create_new_connection = (infinite_pool_size
165-
or pool_size < max_pool_size)
166-
self.connections_reservations[address] += 1
167-
if can_create_new_connection:
168-
return connection_creator
163+
if infinite_pool_size or pool_size < max_pool_size:
164+
# there's room for a new connection
165+
self.connections_reservations[address] += 1
166+
return connection_creator
169167
return None
170168

171169
async def _acquire(self, address, deadline, liveness_check_timeout):

neo4j/_sync/io/_pool.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
defaultdict,
2424
deque,
2525
)
26-
from contextlib import contextmanager
2726
from logging import getLogger
2827
from random import choice
2928

@@ -161,11 +160,10 @@ def connection_creator():
161160
connections = self.connections[address]
162161
pool_size = (len(connections)
163162
+ self.connections_reservations[address])
164-
can_create_new_connection = (infinite_pool_size
165-
or pool_size < max_pool_size)
166-
self.connections_reservations[address] += 1
167-
if can_create_new_connection:
168-
return connection_creator
163+
if infinite_pool_size or pool_size < max_pool_size:
164+
# there's room for a new connection
165+
self.connections_reservations[address] += 1
166+
return connection_creator
169167
return None
170168

171169
def _acquire(self, address, deadline, liveness_check_timeout):

tests/unit/async_/io/test_neo4j_pool.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
WRITE_ACCESS,
2626
)
2727
from neo4j._async.io import AsyncNeo4jPool
28+
from neo4j._async_compat.util import AsyncUtil
2829
from neo4j._conf import (
2930
PoolConfig,
3031
RoutingConfig,
@@ -437,3 +438,33 @@ async def test_failing_opener_leaves_connections_in_use_alone(opener):
437438
with pytest.raises((ServiceUnavailable, SessionExpired)):
438439
await pool.acquire(READ_ACCESS, 30, "test_db", None, None)
439440
assert not cx1.closed()
441+
442+
443+
@mark_async_test
444+
async def test__acquire_new_later_with_room(opener):
445+
config = PoolConfig()
446+
config.max_connection_pool_size = 1
447+
pool = AsyncNeo4jPool(
448+
opener, config, WorkspaceConfig(), ROUTER_ADDRESS
449+
)
450+
assert pool.connections_reservations[READER_ADDRESS] == 0
451+
creator = pool._acquire_new_later(READER_ADDRESS, Deadline(1))
452+
assert pool.connections_reservations[READER_ADDRESS] == 1
453+
assert callable(creator)
454+
if AsyncUtil.is_async_code:
455+
assert inspect.iscoroutinefunction(creator)
456+
457+
458+
@mark_async_test
459+
async def test__acquire_new_later_without_room(opener):
460+
config = PoolConfig()
461+
config.max_connection_pool_size = 1
462+
pool = AsyncNeo4jPool(
463+
opener, config, WorkspaceConfig(), ROUTER_ADDRESS
464+
)
465+
_ = await pool.acquire(READ_ACCESS, 30, "test_db", None, None)
466+
# pool is full now
467+
assert pool.connections_reservations[READER_ADDRESS] == 0
468+
creator = pool._acquire_new_later(READER_ADDRESS, Deadline(1))
469+
assert pool.connections_reservations[READER_ADDRESS] == 0
470+
assert creator is None

tests/unit/sync/io/test_neo4j_pool.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
READ_ACCESS,
2525
WRITE_ACCESS,
2626
)
27+
from neo4j._async_compat.util import Util
2728
from neo4j._conf import (
2829
PoolConfig,
2930
RoutingConfig,
@@ -437,3 +438,33 @@ def test_failing_opener_leaves_connections_in_use_alone(opener):
437438
with pytest.raises((ServiceUnavailable, SessionExpired)):
438439
pool.acquire(READ_ACCESS, 30, "test_db", None, None)
439440
assert not cx1.closed()
441+
442+
443+
@mark_sync_test
444+
def test__acquire_new_later_with_room(opener):
445+
config = PoolConfig()
446+
config.max_connection_pool_size = 1
447+
pool = Neo4jPool(
448+
opener, config, WorkspaceConfig(), ROUTER_ADDRESS
449+
)
450+
assert pool.connections_reservations[READER_ADDRESS] == 0
451+
creator = pool._acquire_new_later(READER_ADDRESS, Deadline(1))
452+
assert pool.connections_reservations[READER_ADDRESS] == 1
453+
assert callable(creator)
454+
if Util.is_async_code:
455+
assert inspect.iscoroutinefunction(creator)
456+
457+
458+
@mark_sync_test
459+
def test__acquire_new_later_without_room(opener):
460+
config = PoolConfig()
461+
config.max_connection_pool_size = 1
462+
pool = Neo4jPool(
463+
opener, config, WorkspaceConfig(), ROUTER_ADDRESS
464+
)
465+
_ = pool.acquire(READ_ACCESS, 30, "test_db", None, None)
466+
# pool is full now
467+
assert pool.connections_reservations[READER_ADDRESS] == 0
468+
creator = pool._acquire_new_later(READER_ADDRESS, Deadline(1))
469+
assert pool.connections_reservations[READER_ADDRESS] == 0
470+
assert creator is None

0 commit comments

Comments
 (0)