Draft
31 commits
d24b4a5
PYTHON-5503 Use uv to install just in GitHub Actions (#2490)
blink1073 Aug 19, 2025
3a26119
PYTHON-5502 Fix c extensions on OIDC VMs (#2489)
blink1073 Aug 19, 2025
db3d3c7
Prep for 4.14.1 release (#2495) [master] (#2496)
sleepyStick Aug 20, 2025
f7b94be
PYTHON-5143 Support auto encryption in unified tests (#2488)
blink1073 Aug 20, 2025
9a9a65c
PYTHON-5496 Update CSOT tests for change in dropIndex behavior in 8.3…
blink1073 Aug 20, 2025
5e96353
PYTHON-5508 - Add built-in DecimalEncoder and DecimalDecoder (#2499)
NoahStapp Aug 21, 2025
e08284b
PYTHON-5456 Support text indexes with auto encryption (#2500)
blink1073 Aug 21, 2025
ddf9508
PYTHON-5510 Fix server selection log message for commitTransaction (#…
ShaneHarvey Aug 22, 2025
3ebd934
PYTHON-5514 Specific assertions for "is" and "is not None" (#2502)
sleepyStick Aug 25, 2025
cd4e5db
Bump pyright from 1.1.403 to 1.1.404 (#2506)
dependabot[bot] Aug 25, 2025
9892e1b
Update coverage requirement from <=7.10.3,>=5 to >=5,<=7.10.5 (#2507)
dependabot[bot] Aug 25, 2025
1179c5c
DRIVERS-3218 Avoid clearing the connection pool when the server conne…
blink1073 Aug 26, 2025
bc91967
set to one byte
blink1073 Aug 26, 2025
8c361be
Bump the actions group with 5 updates (#2505)
dependabot[bot] Aug 26, 2025
0d4c84e
PYTHON-5519 Clean up uv handling (#2510)
blink1073 Aug 26, 2025
f51e8a5
update approach
blink1073 Aug 27, 2025
c1fe2e3
Merge branch 'master' of github.com:mongodb/mongo-python-driver into …
blink1073 Aug 27, 2025
7584d2d
Revert "Merge branch 'master' of github.com:mongodb/mongo-python-driv…
blink1073 Aug 27, 2025
f1544aa
undo topology changes
blink1073 Aug 27, 2025
9d34e52
improve sleep translation
blink1073 Aug 27, 2025
bb5ac35
improve sleep translation
blink1073 Aug 27, 2025
957a87d
PYTHON-5519 Clean up uv handling (#2510)
blink1073 Aug 26, 2025
9d0af17
add prose tests
blink1073 Aug 27, 2025
da0c0e5
debug
blink1073 Aug 27, 2025
70b4113
fix and update tests
blink1073 Aug 27, 2025
c974d36
fix logic
blink1073 Aug 27, 2025
845f17a
only backoff if conn is closed
blink1073 Aug 27, 2025
09fc66d
use AutoReconnect
blink1073 Aug 28, 2025
84478d0
update tests
blink1073 Aug 28, 2025
532c1b8
update handshake error tests
blink1073 Aug 28, 2025
6890c73
undo lock file changes
blink1073 Aug 28, 2025
5 changes: 1 addition & 4 deletions .evergreen/run-tests.sh
@@ -26,12 +26,9 @@ else
fi

# List the packages.
uv sync ${UV_ARGS} --reinstall
uv sync ${UV_ARGS} --reinstall --quiet
uv pip list

# Ensure we go back to base environment after the test.
trap "uv sync" EXIT HUP

# Start the test runner.
uv run ${UV_ARGS} .evergreen/scripts/run_tests.py "$@"

26 changes: 15 additions & 11 deletions justfile
@@ -2,7 +2,7 @@
set shell := ["bash", "-c"]

# Commonly used command segments.
uv_run := "uv run --isolated --frozen "
uv_run := "uv run --frozen "
typing_run := uv_run + "--group typing --extra aws --extra encryption --extra ocsp --extra snappy --extra test --extra zstd"
docs_run := uv_run + "--extra docs"
doc_build := "./doc/_build"
@@ -13,51 +13,55 @@ mypy_args := "--install-types --non-interactive"
default:
@just --list

[private]
resync:
@uv sync --quiet --frozen

install:
bash .evergreen/scripts/setup-dev-env.sh

[group('docs')]
docs:
docs: && resync
{{docs_run}} sphinx-build -W -b html doc {{doc_build}}/html

[group('docs')]
docs-serve:
docs-serve: && resync
{{docs_run}} sphinx-autobuild -W -b html doc --watch ./pymongo --watch ./bson --watch ./gridfs {{doc_build}}/serve

[group('docs')]
docs-linkcheck:
docs-linkcheck: && resync
{{docs_run}} sphinx-build -E -b linkcheck doc {{doc_build}}/linkcheck

[group('typing')]
typing:
typing: && resync
just typing-mypy
just typing-pyright

[group('typing')]
typing-mypy:
typing-mypy: && resync
{{typing_run}} mypy {{mypy_args}} bson gridfs tools pymongo
{{typing_run}} mypy {{mypy_args}} --config-file mypy_test.ini test
{{typing_run}} mypy {{mypy_args}} test/test_typing.py test/test_typing_strict.py

[group('typing')]
typing-pyright:
typing-pyright: && resync
{{typing_run}} pyright test/test_typing.py test/test_typing_strict.py
{{typing_run}} pyright -p strict_pyrightconfig.json test/test_typing_strict.py

[group('lint')]
lint:
lint: && resync
{{uv_run}} pre-commit run --all-files

[group('lint')]
lint-manual:
lint-manual: && resync
{{uv_run}} pre-commit run --all-files --hook-stage manual

[group('test')]
test *args="-v --durations=5 --maxfail=10":
test *args="-v --durations=5 --maxfail=10": && resync
{{uv_run}} --extra test pytest {{args}}

[group('test')]
run-tests *args:
run-tests *args: && resync
bash ./.evergreen/run-tests.sh {{args}}

[group('test')]
39 changes: 30 additions & 9 deletions pymongo/asynchronous/pool.py
@@ -37,7 +37,7 @@
from bson import DEFAULT_CODEC_OPTIONS
from pymongo import _csot, helpers_shared
from pymongo.asynchronous.client_session import _validate_session_write_concern
from pymongo.asynchronous.helpers import _handle_reauth
from pymongo.asynchronous.helpers import _backoff, _handle_reauth
from pymongo.asynchronous.network import command
from pymongo.common import (
MAX_BSON_SIZE,
@@ -791,6 +791,7 @@ def __init__(
self._max_connecting = self.opts.max_connecting
self._pending = 0
self._client_id = client_id
self._backoff = 0
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_pool_created(
@@ -846,6 +847,8 @@ async def _reset(
async with self.size_cond:
if self.closed:
return
# Clear the backoff state.
self._backoff = 0
if self.opts.pause_enabled and pause and not self.opts.load_balanced:
old_state, self.state = self.state, PoolState.PAUSED
self.gen.inc(service_id)
@@ -994,7 +997,8 @@ async def remove_stale_sockets(self, reference_generation: int) -> None:
async with self._max_connecting_cond:
# If maxConnecting connections are already being created
# by this pool then try again later instead of waiting.
if self._pending >= self._max_connecting:
max_connecting = 1 if self._backoff else self._max_connecting
if self._pending >= max_connecting:
return
self._pending += 1
incremented = True
@@ -1051,6 +1055,10 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
driverConnectionId=conn_id,
)

# Apply backoff if applicable.
if self._backoff:
await asyncio.sleep(_backoff(self._backoff))

try:
networking_interface = await _configured_protocol_interface(self.address, self.opts)
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
@@ -1094,15 +1102,24 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A

await conn.authenticate()
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
except BaseException:
except BaseException as e:
async with self.lock:
self.active_contexts.discard(conn.cancel_context)
# Enter backoff mode and reconnect on establishment failure.
if type(e) == AutoReconnect:
await conn.close_conn(ConnectionClosedReason.ERROR)
self._backoff += 1
# TODO: emit a message about backoff.
print("backing off", self._backoff) # noqa: T201
return await self.connect(handler)
await conn.close_conn(ConnectionClosedReason.ERROR)
raise

if handler:
await handler.client._topology.receive_cluster_time(conn._cluster_time)

# Clear the backoff state.
self._backoff = 0
return conn

@contextlib.asynccontextmanager
@@ -1279,12 +1296,13 @@ async def _get_conn(
# to be checked back into the pool.
async with self._max_connecting_cond:
self._raise_if_not_ready(checkout_started_time, emit_event=False)
while not (self.conns or self._pending < self._max_connecting):
max_connecting = 1 if self._backoff else self._max_connecting
while not (self.conns or self._pending < max_connecting):
timeout = deadline - time.monotonic() if deadline else None
if not await _async_cond_wait(self._max_connecting_cond, timeout):
# Timed out, notify the next thread to ensure a
# timeout doesn't consume the condition.
if self.conns or self._pending < self._max_connecting:
if self.conns or self._pending < max_connecting:
self._max_connecting_cond.notify()
emitted_event = True
self._raise_wait_queue_timeout(checkout_started_time)
@@ -1425,8 +1443,8 @@ async def _perished(self, conn: AsyncConnection) -> bool:
:class:`~pymongo.errors.AutoReconnect` exceptions on server
hiccups, etc. We only check if the socket was closed by an external
error if it has been > 1 second since the socket was checked into the
pool, to keep performance reasonable - we can't avoid AutoReconnects
completely anyway.
pool, or we are in backoff mode, to keep performance reasonable -
we can't avoid AutoReconnects completely anyway.
"""
idle_time_seconds = conn.idle_time_seconds()
# If socket is idle, open a new one.
@@ -1437,8 +1455,11 @@
await conn.close_conn(ConnectionClosedReason.IDLE)
return True

if self._check_interval_seconds is not None and (
self._check_interval_seconds == 0 or idle_time_seconds > self._check_interval_seconds
check_interval_seconds = self._check_interval_seconds
if self._backoff:
check_interval_seconds = 0
if check_interval_seconds is not None and (
check_interval_seconds == 0 or idle_time_seconds > check_interval_seconds
):
if conn.conn_closed():
await conn.close_conn(ConnectionClosedReason.ERROR)
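The asynchronous pool above (and its synchronous mirror below) imports a new _backoff helper from helpers, but the helper's body is not part of the hunks shown in this diff. As a rough sketch only — assuming an exponential backoff with full jitter, the usual pattern for avoiding reconnect storms, rather than the driver's actual implementation — it could look something like this:

import random


def _backoff(attempt: int, base: float = 0.05, cap: float = 10.0) -> float:
    """Illustrative exponential backoff with full jitter (an assumption, not the real helper).

    The delay grows with the attempt count, is capped, and is randomized so
    concurrent clients do not retry connection establishment in lockstep.
    """
    return random.random() * min(cap, base * (2**attempt))

Note that while self._backoff is non-zero the pool also limits itself to a single pending connection (max_connecting = 1), so only one establishment attempt sleeps and retries at a time.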
39 changes: 30 additions & 9 deletions pymongo/synchronous/pool.py
@@ -84,7 +84,7 @@
from pymongo.server_type import SERVER_TYPE
from pymongo.socket_checker import SocketChecker
from pymongo.synchronous.client_session import _validate_session_write_concern
from pymongo.synchronous.helpers import _handle_reauth
from pymongo.synchronous.helpers import _backoff, _handle_reauth
from pymongo.synchronous.network import command

if TYPE_CHECKING:
@@ -789,6 +789,7 @@ def __init__(
self._max_connecting = self.opts.max_connecting
self._pending = 0
self._client_id = client_id
self._backoff = 0
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_pool_created(
@@ -844,6 +845,8 @@ def _reset(
with self.size_cond:
if self.closed:
return
# Clear the backoff state.
self._backoff = 0
if self.opts.pause_enabled and pause and not self.opts.load_balanced:
old_state, self.state = self.state, PoolState.PAUSED
self.gen.inc(service_id)
@@ -990,7 +993,8 @@ def remove_stale_sockets(self, reference_generation: int) -> None:
with self._max_connecting_cond:
# If maxConnecting connections are already being created
# by this pool then try again later instead of waiting.
if self._pending >= self._max_connecting:
max_connecting = 1 if self._backoff else self._max_connecting
if self._pending >= max_connecting:
return
self._pending += 1
incremented = True
@@ -1047,6 +1051,10 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect
driverConnectionId=conn_id,
)

# Apply backoff if applicable.
if self._backoff:
time.sleep(_backoff(self._backoff))

try:
networking_interface = _configured_socket_interface(self.address, self.opts)
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
@@ -1090,15 +1098,24 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect

conn.authenticate()
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
except BaseException:
except BaseException as e:
with self.lock:
self.active_contexts.discard(conn.cancel_context)
# Enter backoff mode and reconnect on establishment failure.
if type(e) == AutoReconnect:
conn.close_conn(ConnectionClosedReason.ERROR)
self._backoff += 1
# TODO: emit a message about backoff.
print("backing off", self._backoff) # noqa: T201
return self.connect(handler)
conn.close_conn(ConnectionClosedReason.ERROR)
raise

if handler:
handler.client._topology.receive_cluster_time(conn._cluster_time)

# Clear the backoff state.
self._backoff = 0
return conn

@contextlib.contextmanager
@@ -1275,12 +1292,13 @@ def _get_conn(
# to be checked back into the pool.
with self._max_connecting_cond:
self._raise_if_not_ready(checkout_started_time, emit_event=False)
while not (self.conns or self._pending < self._max_connecting):
max_connecting = 1 if self._backoff else self._max_connecting
while not (self.conns or self._pending < max_connecting):
timeout = deadline - time.monotonic() if deadline else None
if not _cond_wait(self._max_connecting_cond, timeout):
# Timed out, notify the next thread to ensure a
# timeout doesn't consume the condition.
if self.conns or self._pending < self._max_connecting:
if self.conns or self._pending < max_connecting:
self._max_connecting_cond.notify()
emitted_event = True
self._raise_wait_queue_timeout(checkout_started_time)
@@ -1421,8 +1439,8 @@ def _perished(self, conn: Connection) -> bool:
:class:`~pymongo.errors.AutoReconnect` exceptions on server
hiccups, etc. We only check if the socket was closed by an external
error if it has been > 1 second since the socket was checked into the
pool, to keep performance reasonable - we can't avoid AutoReconnects
completely anyway.
pool, or we are in backoff mode, to keep performance reasonable -
we can't avoid AutoReconnects completely anyway.
"""
idle_time_seconds = conn.idle_time_seconds()
# If socket is idle, open a new one.
@@ -1433,8 +1451,11 @@
conn.close_conn(ConnectionClosedReason.IDLE)
return True

if self._check_interval_seconds is not None and (
self._check_interval_seconds == 0 or idle_time_seconds > self._check_interval_seconds
check_interval_seconds = self._check_interval_seconds
if self._backoff:
check_interval_seconds = 0
if check_interval_seconds is not None and (
check_interval_seconds == 0 or idle_time_seconds > check_interval_seconds
):
if conn.conn_closed():
conn.close_conn(ConnectionClosedReason.ERROR)
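Both connect() paths leave a "TODO: emit a message about backoff" and fall back to a temporary print() in the meantime. A minimal sketch of a stdlib-logging replacement — the logger name and message wording here are assumptions for illustration, not the driver's established connection-logging machinery — might look like:

import logging

# Hypothetical logger name, used only for illustration.
_BACKOFF_LOGGER = logging.getLogger("pymongo.pool")


def _log_backoff(attempt: int, address: tuple) -> None:
    # Would replace the temporary print() behind the TODO in Pool.connect().
    host, port = address
    _BACKOFF_LOGGER.debug(
        "Connection establishment to %s:%d failed; backing off (attempt %d)", host, port, attempt
    )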
61 changes: 61 additions & 0 deletions test/asynchronous/test_pooling.py
@@ -513,6 +513,67 @@ async def test_connection_timeout_message(self):
str(error.exception),
)

@async_client_context.require_failCommand_appName
async def test_pool_backoff_preserves_existing_collections(self):
client = await self.async_rs_or_single_client()
coll = self.db.t
pool = await async_get_pool(client)
await coll.insert_many([{"x": 1} for _ in range(10)])
t = SocketGetter(self.c, pool)
await t.start()
while t.state != "connection":
await asyncio.sleep(0.1)

assert not t.sock.conn_closed()

# Mock a session establishment overload.
mock_connection_fail = {
"configureFailPoint": "failCommand",
"mode": {"times": 1},
"data": {
"closeConnection": True,
},
}

async with self.fail_point(mock_connection_fail):
await coll.find_one({})

# Make sure the pool is out of backoff state.
assert pool._backoff == 0

# Make sure the existing socket was not affected.
assert not t.sock.conn_closed()

# Cleanup
await t.release_conn()
await t.join()
await pool.close()

async def test_pool_check_backoff(self):
# Test that Pool recovers from two connection failures in a row.
# This exercises code at the end of Pool._check().
cx_pool = await self.create_pool(max_pool_size=1, connect_timeout=1, wait_queue_timeout=1)
self.addAsyncCleanup(cx_pool.close)

async with cx_pool.checkout() as conn:
# Simulate a closed socket without telling the Connection it's
# closed.
await conn.conn.close()

# Enable backoff.
cx_pool._backoff = 1

# Swap pool's address with a bad one.
address, cx_pool.address = cx_pool.address, ("foo.com", 1234)
with self.assertRaises(AutoReconnect):
async with cx_pool.checkout():
pass

# Back to normal, semaphore was correctly released.
cx_pool.address = address
async with cx_pool.checkout():
pass


class TestPoolMaxSize(_TestPoolingBase):
async def test_max_pool_size(self):
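The new prose test drives the server-side failCommand fail point through the suite's fail_point() context manager. Outside the test harness, the same fail point can be toggled directly with an admin command; a standalone sketch, assuming a local mongod started with --setParameter enableTestCommands=1:

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")

# Fail the next "find" by closing the connection, simulating the kind of
# establishment failure the backoff logic is meant to absorb.
client.admin.command(
    {
        "configureFailPoint": "failCommand",
        "mode": {"times": 1},
        "data": {"failCommands": ["find"], "closeConnection": True},
    }
)
try:
    client.test.t.find_one({})
finally:
    # Always switch the fail point back off.
    client.admin.command({"configureFailPoint": "failCommand", "mode": "off"})

Unlike the test's data block, this sketch names failCommands explicitly so the fail point only targets find.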