-
Notifications
You must be signed in to change notification settings - Fork 1.1k
PYTHON-5536 Avoid clearing the connection pool when the server connection rate limiter triggers #2509
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PYTHON-5536 Avoid clearing the connection pool when the server connection rate limiter triggers #2509
Changes from 54 commits
d24b4a5
3a26119
db3d3c7
f7b94be
9a9a65c
5e96353
e08284b
ddf9508
3ebd934
cd4e5db
9892e1b
1179c5c
bc91967
8c361be
0d4c84e
f51e8a5
c1fe2e3
7584d2d
f1544aa
9d34e52
bb5ac35
957a87d
9d0af17
da0c0e5
70b4113
c974d36
845f17a
09fc66d
84478d0
532c1b8
6890c73
6f38a9b
0997248
320cb54
7734af7
452cdd6
1f44c48
fa5c151
0ab78e4
07d0233
771570d
6623261
f602d4c
64aa0af
7f6335e
6db793d
8c2eb91
a84a181
c5ce8dd
679807e
7e9f19f
b0b5800
a033c58
ded90b0
2cd3c18
0b4b265
7548f7b
5e34bdc
f1294dc
6a47fc6
6d57549
9fa3a6f
47e55b4
4f06bb6
3c760e7
4cc6619
eaab484
65832a7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,7 +37,7 @@ | |
from bson import DEFAULT_CODEC_OPTIONS | ||
from pymongo import _csot, helpers_shared | ||
from pymongo.asynchronous.client_session import _validate_session_write_concern | ||
from pymongo.asynchronous.helpers import _handle_reauth | ||
from pymongo.asynchronous.helpers import _backoff, _handle_reauth | ||
from pymongo.asynchronous.network import command | ||
from pymongo.common import ( | ||
MAX_BSON_SIZE, | ||
|
@@ -788,9 +788,9 @@ def __init__( | |
# Enforces: maxConnecting | ||
# Also used for: clearing the wait queue | ||
self._max_connecting_cond = _async_create_condition(self.lock) | ||
self._max_connecting = self.opts.max_connecting | ||
self._pending = 0 | ||
self._client_id = client_id | ||
self._backoff = 0 | ||
if self.enabled_for_cmap: | ||
assert self.opts._event_listeners is not None | ||
self.opts._event_listeners.publish_pool_created( | ||
|
@@ -846,6 +846,8 @@ async def _reset( | |
async with self.size_cond: | ||
if self.closed: | ||
return | ||
# Clear the backoff state. | ||
self._backoff = 0 | ||
if self.opts.pause_enabled and pause and not self.opts.load_balanced: | ||
old_state, self.state = self.state, PoolState.PAUSED | ||
self.gen.inc(service_id) | ||
|
@@ -928,6 +930,11 @@ async def _reset( | |
for conn in sockets: | ||
await conn.close_conn(ConnectionClosedReason.STALE) | ||
|
||
@property
def max_connecting(self) -> int:
    """Return how many connections the pool may establish at once.

    While a backoff is in effect (``self._backoff`` is non-zero) the limit
    is 1; otherwise it is the configured ``self.opts.max_connecting``.
    """
    if self._backoff:
        return 1
    return self.opts.max_connecting
|
||
async def update_is_writable(self, is_writable: Optional[bool]) -> None: | ||
"""Updates the is_writable attribute on all sockets currently in the | ||
Pool. | ||
|
@@ -994,7 +1001,7 @@ async def remove_stale_sockets(self, reference_generation: int) -> None: | |
async with self._max_connecting_cond: | ||
# If maxConnecting connections are already being created | ||
# by this pool then try again later instead of waiting. | ||
if self._pending >= self._max_connecting: | ||
if self._pending >= self.max_connecting: | ||
return | ||
self._pending += 1 | ||
incremented = True | ||
|
@@ -1022,6 +1029,20 @@ async def remove_stale_sockets(self, reference_generation: int) -> None: | |
self.requests -= 1 | ||
self.size_cond.notify() | ||
|
||
def _handle_connection_error(self, error: Exception, phase: str) -> None:
    """Detect a suspected server-overload failure and extend the backoff.

    An error counts as an overload signal when this is not an SDAM pool and
    its message contains "connection reset by peer", or contains
    "connection closed" while a backoff is already in effect. In that case
    ``self._backoff`` is incremented and, when the error object supports
    labels, it is tagged "SystemOverloadedError" and "RetryableError".

    :param error: The exception raised while establishing the connection.
    :param phase: Name of the connection stage that failed (e.g.
        "handshake" or "hello"). Kept for caller compatibility; it is not
        used now that the debug print has been removed.
    """
    # SDAM pools never take part in backoff. The guard is explicit so that it
    # covers both message checks instead of relying on and/or precedence.
    if self.is_sdam:
        return

    message = str(error).lower()
    overloaded = "connection reset by peer" in message or (
        "connection closed" in message and self._backoff
    )
    if not overloaded:
        return

    self._backoff += 1

    # The handshake path passes the raw OSError/IOError before it is wrapped,
    # and such errors have no label support; skip labelling rather than
    # masking the original failure with an AttributeError.
    add_label = getattr(error, "_add_error_label", None)
    if callable(add_label):
        add_label("SystemOverloadedError")
        add_label("RetryableError")
|
||
|
||
async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> AsyncConnection: | ||
"""Connect to Mongo and return a new AsyncConnection. | ||
|
||
|
@@ -1051,6 +1072,10 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A | |
driverConnectionId=conn_id, | ||
) | ||
|
||
# Apply backoff if applicable. | ||
if self._backoff: | ||
await asyncio.sleep(_backoff(self._backoff)) | ||
|
||
try: | ||
networking_interface = await _configured_protocol_interface(self.address, self.opts) | ||
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup. | ||
|
@@ -1073,10 +1098,10 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A | |
reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), | ||
error=ConnectionClosedReason.ERROR, | ||
) | ||
self._handle_connection_error(error, "handshake") | ||
if isinstance(error, (IOError, OSError, *SSLErrors)): | ||
details = _get_timeout_details(self.opts) | ||
_raise_connection_failure(self.address, error, timeout_details=details) | ||
|
||
raise | ||
|
||
conn = AsyncConnection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type] | ||
|
@@ -1094,15 +1119,18 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A | |
|
||
await conn.authenticate() | ||
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup. | ||
except BaseException: | ||
except BaseException as e: | ||
async with self.lock: | ||
self.active_contexts.discard(conn.cancel_context) | ||
self._handle_connection_error(e, "hello") | ||
await conn.close_conn(ConnectionClosedReason.ERROR) | ||
raise | ||
|
||
if handler: | ||
await handler.client._topology.receive_cluster_time(conn._cluster_time) | ||
|
||
# Clear the backoff state. | ||
self._backoff = 0 | ||
return conn | ||
|
||
@contextlib.asynccontextmanager | ||
|
@@ -1279,12 +1307,12 @@ async def _get_conn( | |
# to be checked back into the pool. | ||
async with self._max_connecting_cond: | ||
self._raise_if_not_ready(checkout_started_time, emit_event=False) | ||
while not (self.conns or self._pending < self._max_connecting): | ||
while not (self.conns or self._pending < self.max_connecting): | ||
timeout = deadline - time.monotonic() if deadline else None | ||
if not await _async_cond_wait(self._max_connecting_cond, timeout): | ||
# Timed out, notify the next thread to ensure a | ||
# timeout doesn't consume the condition. | ||
if self.conns or self._pending < self._max_connecting: | ||
if self.conns or self._pending < self.max_connecting: | ||
self._max_connecting_cond.notify() | ||
emitted_event = True | ||
self._raise_wait_queue_timeout(checkout_started_time) | ||
|
@@ -1425,8 +1453,8 @@ async def _perished(self, conn: AsyncConnection) -> bool: | |
:class:`~pymongo.errors.AutoReconnect` exceptions on server | ||
hiccups, etc. We only check if the socket was closed by an external | ||
error if it has been > 1 second since the socket was checked into the | ||
pool, to keep performance reasonable - we can't avoid AutoReconnects | ||
completely anyway. | ||
pool, or we are in backoff mode, to keep performance reasonable - | ||
we can't avoid AutoReconnects completely anyway. | ||
""" | ||
idle_time_seconds = conn.idle_time_seconds() | ||
# If socket is idle, open a new one. | ||
|
@@ -1437,8 +1465,11 @@ async def _perished(self, conn: AsyncConnection) -> bool: | |
await conn.close_conn(ConnectionClosedReason.IDLE) | ||
return True | ||
|
||
if self._check_interval_seconds is not None and ( | ||
self._check_interval_seconds == 0 or idle_time_seconds > self._check_interval_seconds | ||
check_interval_seconds = self._check_interval_seconds | ||
if self._backoff: | ||
check_interval_seconds = 0 | ||
if check_interval_seconds is not None and ( | ||
check_interval_seconds == 0 or idle_time_seconds > check_interval_seconds | ||
): | ||
if conn.conn_closed(): | ||
await conn.close_conn(ConnectionClosedReason.ERROR) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -256,6 +256,7 @@ def __init__(self, timeout: Optional[float] = None): | |
self._timeout = timeout | ||
self._closed = asyncio.get_running_loop().create_future() | ||
self._connection_lost = False | ||
self._closing_exception = None | ||
|
||
def settimeout(self, timeout: float | None) -> None: | ||
self._timeout = timeout | ||
|
@@ -269,9 +270,11 @@ def close(self, exc: Optional[Exception] = None) -> None: | |
self.transport.abort() | ||
self._resolve_pending(exc) | ||
self._connection_lost = True | ||
self._closing_exception = exc | ||
|
||
def connection_lost(self, exc: Optional[Exception] = None) -> None: | ||
self._resolve_pending(exc) | ||
self._closing_exception = exc | ||
if not self._closed.done(): | ||
self._closed.set_result(None) | ||
|
||
|
@@ -335,8 +338,11 @@ async def read(self, request_id: Optional[int], max_message_size: int) -> tuple[ | |
if self._done_messages: | ||
message = await self._done_messages.popleft() | ||
else: | ||
if self.transport and self.transport.is_closing(): | ||
raise OSError("connection is already closed") | ||
if self._closed.done(): | ||
Review comment: Is calling […] (inline code and the rest of the question were lost in extraction) — Reply: Hmm, let me try that. — Reply: No, it is ambiguous as to whether […] (remainder lost in extraction) |
||
if self._closing_exception: | ||
raise self._closing_exception | ||
else: | ||
raise OSError("connection closed") | ||
read_waiter = asyncio.get_running_loop().create_future() | ||
self._pending_messages.append(read_waiter) | ||
try: | ||
|
@@ -474,6 +480,7 @@ def _resolve_pending(self, exc: Optional[Exception] = None) -> None: | |
else: | ||
msg.set_exception(exc) | ||
self._done_messages.append(msg) | ||
self._pending_messages.clear() | ||
|
||
|
||
class PyMongoKMSProtocol(PyMongoBaseProtocol): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you merge backpressure? Originally I added the incorrect labels here. It should be "SystemOverloadedError" and "RetryableError"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done