Skip to content

Commit 1179c5c

Browse files
committed
DRIVERS-3218 Avoid clearing the connection pool when the server connection rate limiter triggers
1 parent 875c564 commit 1179c5c

File tree

6 files changed

+102
-20
lines changed

6 files changed

+102
-20
lines changed

pymongo/asynchronous/pool.py

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
from bson import DEFAULT_CODEC_OPTIONS
3838
from pymongo import _csot, helpers_shared
3939
from pymongo.asynchronous.client_session import _validate_session_write_concern
40-
from pymongo.asynchronous.helpers import _handle_reauth
40+
from pymongo.asynchronous.helpers import _backoff, _handle_reauth
4141
from pymongo.asynchronous.network import command
4242
from pymongo.common import (
4343
MAX_BSON_SIZE,
@@ -791,6 +791,7 @@ def __init__(
791791
self._max_connecting = self.opts.max_connecting
792792
self._pending = 0
793793
self._client_id = client_id
794+
self._backoff = 0
794795
if self.enabled_for_cmap:
795796
assert self.opts._event_listeners is not None
796797
self.opts._event_listeners.publish_pool_created(
@@ -846,6 +847,8 @@ async def _reset(
846847
async with self.size_cond:
847848
if self.closed:
848849
return
850+
# Clear the backoff state.
851+
self._backoff = 0
849852
if self.opts.pause_enabled and pause and not self.opts.load_balanced:
850853
old_state, self.state = self.state, PoolState.PAUSED
851854
self.gen.inc(service_id)
@@ -937,6 +940,12 @@ async def update_is_writable(self, is_writable: Optional[bool]) -> None:
937940
for _socket in self.conns:
938941
_socket.update_is_writable(self.is_writable) # type: ignore[arg-type]
939942

943+
async def backoff(self, service_id: Optional[ObjectId] = None) -> None:
944+
# Mark the pool as in backoff.
945+
# TODO: how to handle load balancers?
946+
self._backoff += 1
947+
# TODO: emit a message.
948+
940949
async def reset(
941950
self, service_id: Optional[ObjectId] = None, interrupt_connections: bool = False
942951
) -> None:
@@ -994,7 +1003,8 @@ async def remove_stale_sockets(self, reference_generation: int) -> None:
9941003
async with self._max_connecting_cond:
9951004
# If maxConnecting connections are already being created
9961005
# by this pool then try again later instead of waiting.
997-
if self._pending >= self._max_connecting:
1006+
max_connecting = 1 if self._backoff else self._max_connecting
1007+
if self._pending >= max_connecting:
9981008
return
9991009
self._pending += 1
10001010
incremented = True
@@ -1051,6 +1061,10 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
10511061
driverConnectionId=conn_id,
10521062
)
10531063

1064+
# Apply backoff if applicable.
1065+
if self._backoff:
1066+
await asyncio.sleep(_backoff(self._backoff))
1067+
10541068
try:
10551069
networking_interface = await _configured_protocol_interface(self.address, self.opts)
10561070
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
@@ -1103,6 +1117,8 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
11031117
if handler:
11041118
await handler.client._topology.receive_cluster_time(conn._cluster_time)
11051119

1120+
# Clear the backoff state.
1121+
self._backoff = 0
11061122
return conn
11071123

11081124
@contextlib.asynccontextmanager
@@ -1279,12 +1295,13 @@ async def _get_conn(
12791295
# to be checked back into the pool.
12801296
async with self._max_connecting_cond:
12811297
self._raise_if_not_ready(checkout_started_time, emit_event=False)
1282-
while not (self.conns or self._pending < self._max_connecting):
1298+
max_connecting = 1 if self._backoff else self._max_connecting
1299+
while not (self.conns or self._pending < max_connecting):
12831300
timeout = deadline - time.monotonic() if deadline else None
12841301
if not await _async_cond_wait(self._max_connecting_cond, timeout):
12851302
# Timed out, notify the next thread to ensure a
12861303
# timeout doesn't consume the condition.
1287-
if self.conns or self._pending < self._max_connecting:
1304+
if self.conns or self._pending < max_connecting:
12881305
self._max_connecting_cond.notify()
12891306
emitted_event = True
12901307
self._raise_wait_queue_timeout(checkout_started_time)
@@ -1395,6 +1412,20 @@ async def checkin(self, conn: AsyncConnection) -> None:
13951412
# Pool.reset().
13961413
if self.stale_generation(conn.generation, conn.service_id):
13971414
close_conn = True
1415+
# If in backoff state, check the conn's readiness.
1416+
elif self._backoff:
1417+
# Set a 1ms read timeout and attempt to read 1 byte from the connection.
1418+
# Expect it to block for 1ms and then raise a timeout error. If it
1419+
# raises any other error, the connection is not usable, so close it.
1420+
# If it doesn't raise an error and actually reads data, the connection is
1421+
# also not usable, so close it.
1422+
conn.conn.get_conn.settimeout(0.001)
1423+
close_conn = True
1424+
try:
1425+
conn.conn.get_conn.read()
1426+
except Exception as _:
1427+
# TODO: verify the exception
1428+
close_conn = False
13981429
else:
13991430
conn.update_last_checkin_time()
14001431
conn.update_is_writable(bool(self.is_writable))

pymongo/asynchronous/server.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ async def reset(self, service_id: Optional[ObjectId] = None) -> None:
9191
"""Clear the connection pool."""
9292
await self.pool.reset(service_id)
9393

94+
async def backoff(self, service_id: Optional[ObjectId] = None) -> None:
95+
"""Set the connection pool in backoff mode."""
96+
await self.pool.backoff(service_id)
97+
9498
async def close(self) -> None:
9599
"""Clear the connection pool and stop the monitor.
96100

pymongo/asynchronous/topology.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -896,12 +896,18 @@ async def _handle_error(self, address: _Address, err_ctx: _ErrorContext) -> None
896896
# ... MUST NOT request an immediate check of the server."
897897
if not self._settings.load_balanced:
898898
await self._process_change(ServerDescription(address, error=error))
899-
# Clear the pool.
900-
await server.reset(service_id)
901-
# "When a client marks a server Unknown from `Network error when
902-
# reading or writing`_, clients MUST cancel the hello check on
903-
# that server and close the current monitoring connection."
904-
server._monitor.cancel_check()
899+
900+
if err_ctx.completed_handshake:
901+
# Clear the pool.
902+
await server.reset(service_id)
903+
# "When a client marks a server Unknown from `Network error when
904+
# reading or writing`_, clients MUST cancel the hello check on
905+
# that server and close the current monitoring connection."
906+
server._monitor.cancel_check()
907+
return
908+
909+
# Set the pool into backoff mode.
910+
await server.backoff(service_id)
905911

906912
async def handle_error(self, address: _Address, err_ctx: _ErrorContext) -> None:
907913
"""Handle an application error.

pymongo/synchronous/pool.py

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@
8484
from pymongo.server_type import SERVER_TYPE
8585
from pymongo.socket_checker import SocketChecker
8686
from pymongo.synchronous.client_session import _validate_session_write_concern
87-
from pymongo.synchronous.helpers import _handle_reauth
87+
from pymongo.synchronous.helpers import _backoff, _handle_reauth
8888
from pymongo.synchronous.network import command
8989

9090
if TYPE_CHECKING:
@@ -789,6 +789,7 @@ def __init__(
789789
self._max_connecting = self.opts.max_connecting
790790
self._pending = 0
791791
self._client_id = client_id
792+
self._backoff = 0
792793
if self.enabled_for_cmap:
793794
assert self.opts._event_listeners is not None
794795
self.opts._event_listeners.publish_pool_created(
@@ -844,6 +845,8 @@ def _reset(
844845
with self.size_cond:
845846
if self.closed:
846847
return
848+
# Clear the backoff state.
849+
self._backoff = 0
847850
if self.opts.pause_enabled and pause and not self.opts.load_balanced:
848851
old_state, self.state = self.state, PoolState.PAUSED
849852
self.gen.inc(service_id)
@@ -935,6 +938,12 @@ def update_is_writable(self, is_writable: Optional[bool]) -> None:
935938
for _socket in self.conns:
936939
_socket.update_is_writable(self.is_writable) # type: ignore[arg-type]
937940

941+
def backoff(self, service_id: Optional[ObjectId] = None) -> None:
942+
# Mark the pool as in backoff.
943+
# TODO: how to handle load balancers?
944+
self._backoff += 1
945+
# TODO: emit a message.
946+
938947
def reset(
939948
self, service_id: Optional[ObjectId] = None, interrupt_connections: bool = False
940949
) -> None:
@@ -990,7 +999,8 @@ def remove_stale_sockets(self, reference_generation: int) -> None:
990999
with self._max_connecting_cond:
9911000
# If maxConnecting connections are already being created
9921001
# by this pool then try again later instead of waiting.
993-
if self._pending >= self._max_connecting:
1002+
max_connecting = 1 if self._backoff else self._max_connecting
1003+
if self._pending >= max_connecting:
9941004
return
9951005
self._pending += 1
9961006
incremented = True
@@ -1047,6 +1057,10 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect
10471057
driverConnectionId=conn_id,
10481058
)
10491059

1060+
# Apply backoff if applicable.
1061+
if self._backoff:
1062+
asyncio.sleep(_backoff(self._backoff))
1063+
10501064
try:
10511065
networking_interface = _configured_socket_interface(self.address, self.opts)
10521066
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
@@ -1099,6 +1113,8 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect
10991113
if handler:
11001114
handler.client._topology.receive_cluster_time(conn._cluster_time)
11011115

1116+
# Clear the backoff state.
1117+
self._backoff = 0
11021118
return conn
11031119

11041120
@contextlib.contextmanager
@@ -1275,12 +1291,13 @@ def _get_conn(
12751291
# to be checked back into the pool.
12761292
with self._max_connecting_cond:
12771293
self._raise_if_not_ready(checkout_started_time, emit_event=False)
1278-
while not (self.conns or self._pending < self._max_connecting):
1294+
max_connecting = 1 if self._backoff else self._max_connecting
1295+
while not (self.conns or self._pending < max_connecting):
12791296
timeout = deadline - time.monotonic() if deadline else None
12801297
if not _cond_wait(self._max_connecting_cond, timeout):
12811298
# Timed out, notify the next thread to ensure a
12821299
# timeout doesn't consume the condition.
1283-
if self.conns or self._pending < self._max_connecting:
1300+
if self.conns or self._pending < max_connecting:
12841301
self._max_connecting_cond.notify()
12851302
emitted_event = True
12861303
self._raise_wait_queue_timeout(checkout_started_time)
@@ -1391,6 +1408,20 @@ def checkin(self, conn: Connection) -> None:
13911408
# Pool.reset().
13921409
if self.stale_generation(conn.generation, conn.service_id):
13931410
close_conn = True
1411+
# If in backoff state, check the conn's readiness.
1412+
elif self._backoff:
1413+
# Set a 1ms read timeout and attempt to read 1 byte from the connection.
1414+
# Expect it to block for 1ms and then raise a timeout error. If it
1415+
# raises any other error, the connection is not usable, so close it.
1416+
# If it doesn't raise an error and actually reads data, the connection is
1417+
# also not usable, so close it.
1418+
conn.conn.get_conn.settimeout(0.001)
1419+
close_conn = True
1420+
try:
1421+
conn.conn.get_conn.read()
1422+
except Exception as _:
1423+
# TODO: verify the exception
1424+
close_conn = False
13941425
else:
13951426
conn.update_last_checkin_time()
13961427
conn.update_is_writable(bool(self.is_writable))

pymongo/synchronous/server.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ def reset(self, service_id: Optional[ObjectId] = None) -> None:
9191
"""Clear the connection pool."""
9292
self.pool.reset(service_id)
9393

94+
def backoff(self, service_id: Optional[ObjectId] = None) -> None:
95+
"""Set the connection pool in backoff mode."""
96+
self.pool.backoff(service_id)
97+
9498
def close(self) -> None:
9599
"""Clear the connection pool and stop the monitor.
96100

pymongo/synchronous/topology.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -894,12 +894,18 @@ def _handle_error(self, address: _Address, err_ctx: _ErrorContext) -> None:
894894
# ... MUST NOT request an immediate check of the server."
895895
if not self._settings.load_balanced:
896896
self._process_change(ServerDescription(address, error=error))
897-
# Clear the pool.
898-
server.reset(service_id)
899-
# "When a client marks a server Unknown from `Network error when
900-
# reading or writing`_, clients MUST cancel the hello check on
901-
# that server and close the current monitoring connection."
902-
server._monitor.cancel_check()
897+
898+
if err_ctx.completed_handshake:
899+
# Clear the pool.
900+
server.reset(service_id)
901+
# "When a client marks a server Unknown from `Network error when
902+
# reading or writing`_, clients MUST cancel the hello check on
903+
# that server and close the current monitoring connection."
904+
server._monitor.cancel_check()
905+
return
906+
907+
# Set the pool into backoff mode.
908+
server.backoff(service_id)
903909

904910
def handle_error(self, address: _Address, err_ctx: _ErrorContext) -> None:
905911
"""Handle an application error.

0 commit comments

Comments
 (0)