Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
d24b4a5
PYTHON-5503 Use uv to install just in GitHub Actions (#2490)
blink1073 Aug 19, 2025
3a26119
PYTHON-5502 Fix c extensions on OIDC VMs (#2489)
blink1073 Aug 19, 2025
db3d3c7
Prep for 4.14.1 release (#2495) [master] (#2496)
sleepyStick Aug 20, 2025
f7b94be
PYTHON-5143 Support auto encryption in unified tests (#2488)
blink1073 Aug 20, 2025
9a9a65c
PYTHON-5496 Update CSOT tests for change in dropIndex behavior in 8.3…
blink1073 Aug 20, 2025
5e96353
PYTHON-5508 - Add built-in DecimalEncoder and DecimalDecoder (#2499)
NoahStapp Aug 21, 2025
e08284b
PYTHON-5456 Support text indexes with auto encryption (#2500)
blink1073 Aug 21, 2025
ddf9508
PYTHON-5510 Fix server selection log message for commitTransaction (#…
ShaneHarvey Aug 22, 2025
3ebd934
PYTHON-5514 Specific assertions for "is" and "is not None" (#2502)
sleepyStick Aug 25, 2025
cd4e5db
Bump pyright from 1.1.403 to 1.1.404 (#2506)
dependabot[bot] Aug 25, 2025
9892e1b
Update coverage requirement from <=7.10.3,>=5 to >=5,<=7.10.5 (#2507)
dependabot[bot] Aug 25, 2025
1179c5c
DRIVERS-3218 Avoid clearing the connection pool when the server conne…
blink1073 Aug 26, 2025
bc91967
set to one byte
blink1073 Aug 26, 2025
8c361be
Bump the actions group with 5 updates (#2505)
dependabot[bot] Aug 26, 2025
0d4c84e
PYTHON-5519 Clean up uv handling (#2510)
blink1073 Aug 26, 2025
f51e8a5
update approach
blink1073 Aug 27, 2025
c1fe2e3
Merge branch 'master' of github.com:mongodb/mongo-python-driver into …
blink1073 Aug 27, 2025
7584d2d
Revert "Merge branch 'master' of github.com:mongodb/mongo-python-driv…
blink1073 Aug 27, 2025
f1544aa
undo topology changes
blink1073 Aug 27, 2025
9d34e52
improve sleep translation
blink1073 Aug 27, 2025
bb5ac35
improve sleep translation
blink1073 Aug 27, 2025
957a87d
PYTHON-5519 Clean up uv handling (#2510)
blink1073 Aug 26, 2025
9d0af17
add prose tests
blink1073 Aug 27, 2025
da0c0e5
debug
blink1073 Aug 27, 2025
70b4113
fix and update tests
blink1073 Aug 27, 2025
c974d36
fix logic
blink1073 Aug 27, 2025
845f17a
only backoff if conn is closed
blink1073 Aug 27, 2025
09fc66d
use AutoReconnect
blink1073 Aug 28, 2025
84478d0
update tests
blink1073 Aug 28, 2025
532c1b8
update handshake error tests
blink1073 Aug 28, 2025
6890c73
undo lock file changes
blink1073 Aug 28, 2025
6f38a9b
add test for limiting maxConnecting
blink1073 Sep 2, 2025
0997248
add sdam check
blink1073 Sep 3, 2025
320cb54
fix pool backoff
blink1073 Sep 4, 2025
7734af7
do not clear the pool on SystemOverloaded
blink1073 Sep 4, 2025
452cdd6
add retryable label
blink1073 Sep 4, 2025
1f44c48
PYTHON-5521 - Update TestBsonSizeBatches.test_06_insert_fails_over_16…
NoahStapp Sep 3, 2025
fa5c151
fix test
blink1073 Sep 5, 2025
0ab78e4
fix test
blink1073 Sep 5, 2025
07d0233
Revert "update handshake error tests"
blink1073 Sep 5, 2025
771570d
update maxConnecting test
blink1073 Sep 5, 2025
6623261
fix maxconnecting test
blink1073 Sep 5, 2025
f602d4c
fix handling of maxConnecting
blink1073 Sep 5, 2025
64aa0af
update test
blink1073 Sep 5, 2025
7f6335e
fix test
blink1073 Sep 10, 2025
6db793d
undo lock file changes
blink1073 Sep 10, 2025
8c2eb91
PYTHON-5538 Clean up uv lock file handling (#2522)
blink1073 Sep 10, 2025
a84a181
wip
blink1073 Sep 10, 2025
c5ce8dd
wip
blink1073 Sep 11, 2025
679807e
update backoff criteria
blink1073 Sep 11, 2025
7e9f19f
update backoff criteria
blink1073 Sep 11, 2025
b0b5800
update backoff criteria
blink1073 Sep 11, 2025
a033c58
handle the already closed case
blink1073 Sep 11, 2025
ded90b0
handle another edge case
blink1073 Sep 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 35 additions & 4 deletions pymongo/asynchronous/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from bson import DEFAULT_CODEC_OPTIONS
from pymongo import _csot, helpers_shared
from pymongo.asynchronous.client_session import _validate_session_write_concern
from pymongo.asynchronous.helpers import _handle_reauth
from pymongo.asynchronous.helpers import _backoff, _handle_reauth
from pymongo.asynchronous.network import command
from pymongo.common import (
MAX_BSON_SIZE,
Expand Down Expand Up @@ -791,6 +791,7 @@ def __init__(
self._max_connecting = self.opts.max_connecting
self._pending = 0
self._client_id = client_id
self._backoff = 0
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_pool_created(
Expand Down Expand Up @@ -846,6 +847,8 @@ async def _reset(
async with self.size_cond:
if self.closed:
return
# Clear the backoff state.
self._backoff = 0
if self.opts.pause_enabled and pause and not self.opts.load_balanced:
old_state, self.state = self.state, PoolState.PAUSED
self.gen.inc(service_id)
Expand Down Expand Up @@ -937,6 +940,12 @@ async def update_is_writable(self, is_writable: Optional[bool]) -> None:
for _socket in self.conns:
_socket.update_is_writable(self.is_writable) # type: ignore[arg-type]

async def backoff(self, service_id: Optional[ObjectId] = None) -> None:
    """Record one more backoff step for this pool.

    Increments the pool's ``_backoff`` counter by one.

    :param service_id: Accepted for parity with ``reset``; not used yet.
    """
    # TODO: how to handle load balancers?
    # TODO: emit a message.
    self._backoff = self._backoff + 1

async def reset(
self, service_id: Optional[ObjectId] = None, interrupt_connections: bool = False
) -> None:
Expand Down Expand Up @@ -994,7 +1003,8 @@ async def remove_stale_sockets(self, reference_generation: int) -> None:
async with self._max_connecting_cond:
# If maxConnecting connections are already being created
# by this pool then try again later instead of waiting.
if self._pending >= self._max_connecting:
max_connecting = 1 if self._backoff else self._max_connecting
if self._pending >= max_connecting:
return
self._pending += 1
incremented = True
Expand Down Expand Up @@ -1051,6 +1061,10 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
driverConnectionId=conn_id,
)

# Apply backoff if applicable.
if self._backoff:
await asyncio.sleep(_backoff(self._backoff))

try:
networking_interface = await _configured_protocol_interface(self.address, self.opts)
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
Expand Down Expand Up @@ -1103,6 +1117,8 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
if handler:
await handler.client._topology.receive_cluster_time(conn._cluster_time)

# Clear the backoff state.
self._backoff = 0
return conn

@contextlib.asynccontextmanager
Expand Down Expand Up @@ -1279,12 +1295,13 @@ async def _get_conn(
# to be checked back into the pool.
async with self._max_connecting_cond:
self._raise_if_not_ready(checkout_started_time, emit_event=False)
while not (self.conns or self._pending < self._max_connecting):
max_connecting = 1 if self._backoff else self._max_connecting
while not (self.conns or self._pending < max_connecting):
timeout = deadline - time.monotonic() if deadline else None
if not await _async_cond_wait(self._max_connecting_cond, timeout):
# Timed out, notify the next thread to ensure a
# timeout doesn't consume the condition.
if self.conns or self._pending < self._max_connecting:
if self.conns or self._pending < max_connecting:
self._max_connecting_cond.notify()
emitted_event = True
self._raise_wait_queue_timeout(checkout_started_time)
Expand Down Expand Up @@ -1395,6 +1412,20 @@ async def checkin(self, conn: AsyncConnection) -> None:
# Pool.reset().
if self.stale_generation(conn.generation, conn.service_id):
close_conn = True
# If in backoff state, check the conn's readiness.
elif self._backoff:
# Set a 1ms read deadline and attempt to read 1 byte from the connection.
# Expect it to block for 1ms then return a deadline exceeded error. If it
# returns any other error, the connection is not usable, so return false.
# If it doesn't return an error and actually reads data, the connection is
# also not usable, so return false.
conn.conn.get_conn.settimeout(0.001)
close_conn = True
try:
conn.conn.get_conn.read(1)
except Exception as _:
# TODO: verify the exception
close_conn = False
Copy link
Member

@ShaneHarvey ShaneHarvey Aug 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 comments:

  1. I believe this logic needs to move to connection checkout. Here in connection check-in we already know the connection is usable because we're checking it back in after a successful command.
  2. Instead of a 1ms read, can we reuse the existing _perished() + conn_closed() methods?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

else:
conn.update_last_checkin_time()
conn.update_is_writable(bool(self.is_writable))
Expand Down
4 changes: 4 additions & 0 deletions pymongo/asynchronous/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ async def reset(self, service_id: Optional[ObjectId] = None) -> None:
"""Clear the connection pool."""
await self.pool.reset(service_id)

async def backoff(self, service_id: Optional[ObjectId] = None) -> None:
    """Put this server's connection pool into backoff mode.

    :param service_id: Forwarded unchanged to the pool's ``backoff``.
    """
    pool = self.pool
    await pool.backoff(service_id)

async def close(self) -> None:
"""Clear the connection pool and stop the monitor.

Expand Down
18 changes: 12 additions & 6 deletions pymongo/asynchronous/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -896,12 +896,18 @@ async def _handle_error(self, address: _Address, err_ctx: _ErrorContext) -> None
# ... MUST NOT request an immediate check of the server."
if not self._settings.load_balanced:
await self._process_change(ServerDescription(address, error=error))
# Clear the pool.
await server.reset(service_id)
# "When a client marks a server Unknown from `Network error when
# reading or writing`_, clients MUST cancel the hello check on
# that server and close the current monitoring connection."
server._monitor.cancel_check()

if err_ctx.completed_handshake:
# Clear the pool.
await server.reset(service_id)
# "When a client marks a server Unknown from `Network error when
# reading or writing`_, clients MUST cancel the hello check on
# that server and close the current monitoring connection."
server._monitor.cancel_check()
return

# Set the pool into backoff mode.
await server.backoff(service_id)

async def handle_error(self, address: _Address, err_ctx: _ErrorContext) -> None:
"""Handle an application error.
Expand Down
39 changes: 35 additions & 4 deletions pymongo/synchronous/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
from pymongo.server_type import SERVER_TYPE
from pymongo.socket_checker import SocketChecker
from pymongo.synchronous.client_session import _validate_session_write_concern
from pymongo.synchronous.helpers import _handle_reauth
from pymongo.synchronous.helpers import _backoff, _handle_reauth
from pymongo.synchronous.network import command

if TYPE_CHECKING:
Expand Down Expand Up @@ -789,6 +789,7 @@ def __init__(
self._max_connecting = self.opts.max_connecting
self._pending = 0
self._client_id = client_id
self._backoff = 0
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_pool_created(
Expand Down Expand Up @@ -844,6 +845,8 @@ def _reset(
with self.size_cond:
if self.closed:
return
# Clear the backoff state.
self._backoff = 0
if self.opts.pause_enabled and pause and not self.opts.load_balanced:
old_state, self.state = self.state, PoolState.PAUSED
self.gen.inc(service_id)
Expand Down Expand Up @@ -935,6 +938,12 @@ def update_is_writable(self, is_writable: Optional[bool]) -> None:
for _socket in self.conns:
_socket.update_is_writable(self.is_writable) # type: ignore[arg-type]

def backoff(self, service_id: Optional[ObjectId] = None) -> None:
    """Record one more backoff step for this pool.

    Increments the pool's ``_backoff`` counter by one.

    :param service_id: Accepted for parity with ``reset``; not used yet.
    """
    # TODO: how to handle load balancers?
    # TODO: emit a message.
    self._backoff = self._backoff + 1

def reset(
self, service_id: Optional[ObjectId] = None, interrupt_connections: bool = False
) -> None:
Expand Down Expand Up @@ -990,7 +999,8 @@ def remove_stale_sockets(self, reference_generation: int) -> None:
with self._max_connecting_cond:
# If maxConnecting connections are already being created
# by this pool then try again later instead of waiting.
if self._pending >= self._max_connecting:
max_connecting = 1 if self._backoff else self._max_connecting
if self._pending >= max_connecting:
return
self._pending += 1
incremented = True
Expand Down Expand Up @@ -1047,6 +1057,10 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect
driverConnectionId=conn_id,
)

# Apply backoff if applicable.
if self._backoff:
    # This is the synchronous pool, so block the calling thread with
    # time.sleep(). Calling asyncio.sleep() here without awaiting it only
    # creates a coroutine object: no delay happens and the backoff is
    # silently skipped.
    time.sleep(_backoff(self._backoff))

try:
networking_interface = _configured_socket_interface(self.address, self.opts)
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
Expand Down Expand Up @@ -1099,6 +1113,8 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect
if handler:
handler.client._topology.receive_cluster_time(conn._cluster_time)

# Clear the backoff state.
self._backoff = 0
return conn

@contextlib.contextmanager
Expand Down Expand Up @@ -1275,12 +1291,13 @@ def _get_conn(
# to be checked back into the pool.
with self._max_connecting_cond:
self._raise_if_not_ready(checkout_started_time, emit_event=False)
while not (self.conns or self._pending < self._max_connecting):
max_connecting = 1 if self._backoff else self._max_connecting
while not (self.conns or self._pending < max_connecting):
timeout = deadline - time.monotonic() if deadline else None
if not _cond_wait(self._max_connecting_cond, timeout):
# Timed out, notify the next thread to ensure a
# timeout doesn't consume the condition.
if self.conns or self._pending < self._max_connecting:
if self.conns or self._pending < max_connecting:
self._max_connecting_cond.notify()
emitted_event = True
self._raise_wait_queue_timeout(checkout_started_time)
Expand Down Expand Up @@ -1391,6 +1408,20 @@ def checkin(self, conn: Connection) -> None:
# Pool.reset().
if self.stale_generation(conn.generation, conn.service_id):
close_conn = True
# If in backoff state, check the conn's readiness.
elif self._backoff:
# Set a 1ms read deadline and attempt to read 1 byte from the connection.
# Expect it to block for 1ms then return a deadline exceeded error. If it
# returns any other error, the connection is not usable, so return false.
# If it doesn't return an error and actually reads data, the connection is
# also not usable, so return false.
conn.conn.get_conn.settimeout(0.001)
close_conn = True
try:
conn.conn.get_conn.read(1)
except Exception as _:
# TODO: verify the exception
close_conn = False
else:
conn.update_last_checkin_time()
conn.update_is_writable(bool(self.is_writable))
Expand Down
4 changes: 4 additions & 0 deletions pymongo/synchronous/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ def reset(self, service_id: Optional[ObjectId] = None) -> None:
"""Clear the connection pool."""
self.pool.reset(service_id)

def backoff(self, service_id: Optional[ObjectId] = None) -> None:
    """Put this server's connection pool into backoff mode.

    :param service_id: Forwarded unchanged to the pool's ``backoff``.
    """
    pool = self.pool
    pool.backoff(service_id)

def close(self) -> None:
"""Clear the connection pool and stop the monitor.

Expand Down
18 changes: 12 additions & 6 deletions pymongo/synchronous/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -894,12 +894,18 @@ def _handle_error(self, address: _Address, err_ctx: _ErrorContext) -> None:
# ... MUST NOT request an immediate check of the server."
if not self._settings.load_balanced:
self._process_change(ServerDescription(address, error=error))
# Clear the pool.
server.reset(service_id)
# "When a client marks a server Unknown from `Network error when
# reading or writing`_, clients MUST cancel the hello check on
# that server and close the current monitoring connection."
server._monitor.cancel_check()

if err_ctx.completed_handshake:
# Clear the pool.
server.reset(service_id)
# "When a client marks a server Unknown from `Network error when
# reading or writing`_, clients MUST cancel the hello check on
# that server and close the current monitoring connection."
server._monitor.cancel_check()
return

# Set the pool into backoff mode.
server.backoff(service_id)

def handle_error(self, address: _Address, err_ctx: _ErrorContext) -> None:
"""Handle an application error.
Expand Down
Loading