Skip to content

Commit e2448ec

Browse files
authored
ADR 021: add liveness_check_timeout driver config option (#992)
* ADR 021: add liveness_check_timeout driver config option
* TestKit backend: handle GetConnectionPoolMetrics request
* Skip TestKit test: accepted outstanding unification
1 parent 13e180f commit e2448ec

File tree

21 files changed

+386
-251
lines changed

21 files changed

+386
-251
lines changed

docs/source/api.rst

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,7 @@ Additional configuration can be provided via the :class:`neo4j.Driver` construct
396396
+ :ref:`encrypted-ref`
397397
+ :ref:`keep-alive-ref`
398398
+ :ref:`max-connection-lifetime-ref`
399+
+ :ref:`liveness-check-timeout-ref`
399400
+ :ref:`max-connection-pool-size-ref`
400401
+ :ref:`max-transaction-retry-time-ref`
401402
+ :ref:`resolver-ref`
@@ -471,6 +472,33 @@ The maximum duration in seconds that the driver will keep a connection for befor
471472
:Default: ``3600``
472473

473474

475+
.. _liveness-check-timeout-ref:
476+
477+
``liveness_check_timeout``
478+
--------------------------
479+
Pooled connections that have been idle in the pool for longer than this timeout (specified in seconds) will be tested
480+
before they are used again, to ensure they are still live.
481+
If this option is set too low, additional network round trips will be incurred when acquiring a connection, which causes
482+
a performance hit.
483+
484+
If this is set too high, you may receive sessions that are backed by connections that are no longer live, which will lead to
485+
exceptions in your application.
486+
Assuming the database is running, these exceptions will go away if you retry or use a driver API with built-in retries.
487+
488+
Hence, this parameter tunes a balance between the likelihood of your application seeing connection problems, and
489+
performance.
490+
491+
You normally should not need to tune this parameter.
492+
No connection liveness check is done by default (:data:`None`).
493+
A value of ``0`` means connections will always be tested for validity.
494+
Negative values are not allowed.
495+
496+
:Type: :class:`float` or :data:`None`
497+
:Default: :data:`None`
498+
499+
.. versionadded:: 5.15
500+
501+
474502
.. _max-connection-pool-size-ref:
475503

476504
``max_connection_pool_size``
@@ -533,8 +561,8 @@ For example:
533561
resolver=custom_resolver)
534562
535563
536-
:Type: ``Callable | None``
537-
:Default: ``None``
564+
:Type: ``Callable`` or :data:`None`
565+
:Default: :data:`None`
538566

539567

540568
.. _trust-ref:
@@ -619,7 +647,7 @@ custom ``ssl_context`` is configured.
619647
--------------
620648
Specify the client agent name.
621649

622-
:Type: ``str``
650+
:Type: :class:`str`
623651
:Default: *The Python Driver will generate a user agent name.*
624652

625653

src/neo4j/_async/driver.py

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
from .._async_compat.util import AsyncUtil
3737
from .._conf import (
3838
Config,
39+
ConfigurationError,
3940
PoolConfig,
4041
SessionConfig,
4142
TrustAll,
@@ -125,11 +126,9 @@ def driver(
125126
cls,
126127
uri: str,
127128
*,
128-
auth: t.Union[
129-
_TAuth,
130-
AsyncAuthManager,
131-
] = ...,
129+
auth: t.Union[_TAuth, AsyncAuthManager] = ...,
132130
max_connection_lifetime: float = ...,
131+
liveness_check_timeout: t.Optional[float] = ...,
133132
max_connection_pool_size: int = ...,
134133
connection_timeout: float = ...,
135134
trust: t.Union[
@@ -151,9 +150,10 @@ def driver(
151150
notifications_disabled_categories: t.Optional[
152151
t.Iterable[T_NotificationDisabledCategory]
153152
] = ...,
153+
telemetry_disabled: bool = ...,
154154

155155
# undocumented/unsupported options
156-
# they may be change or removed any time without prior notice
156+
# they may be changed or removed any time without prior notice
157157
connection_acquisition_timeout: float = ...,
158158
max_transaction_retry_time: float = ...,
159159
initial_retry_delay: float = ...,
@@ -164,19 +164,17 @@ def driver(
164164
impersonated_user: t.Optional[str] = ...,
165165
bookmark_manager: t.Union[AsyncBookmarkManager,
166166
BookmarkManager, None] = ...,
167-
telemetry_disabled: bool = ...,
168167
) -> AsyncDriver:
169168
...
170169

171170
else:
172171

173172
@classmethod
174173
def driver(
175-
cls, uri: str, *,
176-
auth: t.Union[
177-
_TAuth,
178-
AsyncAuthManager,
179-
] = None,
174+
cls,
175+
uri: str,
176+
*,
177+
auth: t.Union[_TAuth, AsyncAuthManager] = None,
180178
**config
181179
) -> AsyncDriver:
182180
"""Create a driver.
@@ -202,7 +200,6 @@ def driver(
202200
TRUST_ALL_CERTIFICATES,
203201
TRUST_SYSTEM_CA_SIGNED_CERTIFICATES
204202
):
205-
from ..exceptions import ConfigurationError
206203
raise ConfigurationError(
207204
"The config setting `trust` values are {!r}"
208205
.format(
@@ -216,8 +213,8 @@ def driver(
216213
if ("trusted_certificates" in config.keys()
217214
and not isinstance(config["trusted_certificates"],
218215
TrustStore)):
219-
raise ConnectionError(
220-
"The config setting `trusted_certificates` must be of "
216+
raise ConfigurationError(
217+
'The config setting "trusted_certificates" must be of '
221218
"type neo4j.TrustAll, neo4j.TrustCustomCAs, or"
222219
"neo4j.TrustSystemCAs but was {}".format(
223220
type(config["trusted_certificates"])
@@ -229,7 +226,6 @@ def driver(
229226
or "trust" in config.keys()
230227
or "trusted_certificates" in config.keys()
231228
or "ssl_context" in config.keys())):
232-
from ..exceptions import ConfigurationError
233229

234230
# TODO: 6.0 - remove "trust" from error message
235231
raise ConfigurationError(
@@ -257,12 +253,22 @@ def driver(
257253
config["encrypted"] = True
258254
config["trusted_certificates"] = TrustAll()
259255
_normalize_notifications_config(config)
256+
liveness_check_timeout = config.get("liveness_check_timeout")
257+
if (
258+
liveness_check_timeout is not None
259+
and liveness_check_timeout < 0
260+
):
261+
raise ConfigurationError(
262+
'The config setting "liveness_check_timeout" must be '
263+
"greater than or equal to 0 but was "
264+
f"{liveness_check_timeout}."
265+
)
260266

261267
assert driver_type in (DRIVER_BOLT, DRIVER_NEO4J)
262268
if driver_type == DRIVER_BOLT:
263269
if parse_routing_context(parsed.query):
264270
deprecation_warn(
265-
"Creating a direct driver (`bolt://` scheme) with "
271+
'Creating a direct driver ("bolt://" scheme) with '
266272
"routing context (URI parameters) is deprecated. They "
267273
"will be ignored. This will raise an error in a "
268274
'future release. Given URI "{}"'.format(uri),

src/neo4j/_async/io/_bolt.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import typing as t
2222
from collections import deque
2323
from logging import getLogger
24-
from time import perf_counter
24+
from time import monotonic
2525

2626
from ..._api import TelemetryAPI
2727
from ..._async_compat.network import AsyncBoltSocket
@@ -159,9 +159,9 @@ def __init__(self, unresolved_address, sock, max_connection_lifetime, *,
159159
self.hydration_handler = self.HYDRATION_HANDLER_CLS()
160160
self.responses = deque()
161161
self._max_connection_lifetime = max_connection_lifetime
162-
self._creation_timestamp = perf_counter()
162+
self._creation_timestamp = monotonic()
163163
self.routing_context = routing_context
164-
self.idle_since = perf_counter()
164+
self.idle_since = monotonic()
165165

166166
# Determine the user agent
167167
if user_agent:
@@ -797,7 +797,7 @@ def _append(self, signature, fields=(), response=None,
797797

798798
async def _send_all(self):
799799
if await self.outbox.flush():
800-
self.idle_since = perf_counter()
800+
self.idle_since = monotonic()
801801

802802
async def send_all(self):
803803
""" Send all queued messages to the server.
@@ -847,7 +847,7 @@ async def fetch_message(self):
847847
hydration_hooks=self.responses[0].hydration_hooks
848848
)
849849
res = await self._process_message(tag, fields)
850-
self.idle_since = perf_counter()
850+
self.idle_since = monotonic()
851851
return res
852852

853853
async def fetch_all(self):
@@ -928,7 +928,7 @@ async def _set_defunct(self, message, error=None, silent=False):
928928
def stale(self):
929929
return (self._stale
930930
or (0 <= self._max_connection_lifetime
931-
<= perf_counter() - self._creation_timestamp))
931+
<= monotonic() - self._creation_timestamp))
932932

933933
_stale = False
934934

@@ -983,7 +983,7 @@ def is_idle_for(self, timeout):
983983
984984
:rtype: bool
985985
"""
986-
return perf_counter() - self.idle_since > timeout
986+
return monotonic() - self.idle_since > timeout
987987

988988

989989
AsyncBoltSocket.Bolt = AsyncBolt # type: ignore

src/neo4j/_async/io/_pool.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,8 @@ async def _acquire(
247247
auth = AcquireAuth(None)
248248
force_auth = auth.force_auth
249249
auth = auth.auth
250+
if liveness_check_timeout is None:
251+
liveness_check_timeout = self.pool_config.liveness_check_timeout
250252

251253
async def health_check(connection_, deadline_):
252254
if (connection_.closed()

src/neo4j/_async/work/session.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import typing as t
2121
from logging import getLogger
2222
from random import random
23-
from time import perf_counter
23+
from time import monotonic
2424

2525
from ..._api import TelemetryAPI
2626
from ..._async_compat import async_sleep
@@ -570,8 +570,8 @@ def api_success_cb(meta):
570570
return result
571571
if t0 == -1:
572572
# The timer should be started after the first attempt
573-
t0 = perf_counter()
574-
t1 = perf_counter()
573+
t0 = monotonic()
574+
t1 = monotonic()
575575
if t1 - t0 > self._config.max_transaction_retry_time:
576576
break
577577
delay = next(retry_delay)

src/neo4j/_conf.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,10 @@ class PoolConfig(Config):
355355
max_connection_lifetime = 3600 # seconds
356356
# The maximum duration the driver will keep a connection for before being removed from the pool.
357357

358+
#: Timeout after which idle connections will be checked for liveness
359+
#: before being returned from the pool.
360+
liveness_check_timeout = None
361+
358362
#: Max Connection Pool Size
359363
max_connection_pool_size = 100
360364
# The maximum total number of connections allowed, per host (i.e. cluster nodes), to be managed by the connection pool.

src/neo4j/_deadline.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,15 @@
1515

1616

1717
from contextlib import contextmanager
18-
from time import perf_counter
18+
from time import monotonic
1919

2020

2121
class Deadline:
2222
def __init__(self, timeout):
2323
if timeout is None or timeout == float("inf"):
2424
self._deadline = float("inf")
2525
else:
26-
self._deadline = perf_counter() + timeout
26+
self._deadline = monotonic() + timeout
2727
self._original_timeout = timeout
2828

2929
@property
@@ -36,7 +36,7 @@ def expired(self):
3636
def to_timeout(self):
3737
if self._deadline == float("inf"):
3838
return None
39-
timeout = self._deadline - perf_counter()
39+
timeout = self._deadline - monotonic()
4040
return timeout if timeout > 0 else 0
4141

4242
def __eq__(self, other):

src/neo4j/_routing.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
from collections.abc import MutableSet
1818
from logging import getLogger
19-
from time import perf_counter
19+
from time import monotonic
2020

2121
from .addressing import Address
2222

@@ -106,7 +106,7 @@ def __init__(self, *, database, routers=(), readers=(), writers=(), ttl=0):
106106
self.readers = OrderedSet(readers)
107107
self.writers = OrderedSet(writers)
108108
self.initialized_without_writers = not self.writers
109-
self.last_updated_time = perf_counter()
109+
self.last_updated_time = monotonic()
110110
self.ttl = ttl
111111
self.database = database
112112

@@ -127,7 +127,7 @@ def is_fresh(self, readonly=False):
127127
""" Indicator for whether routing information is still usable.
128128
"""
129129
assert isinstance(readonly, bool)
130-
expired = self.last_updated_time + self.ttl <= perf_counter()
130+
expired = self.last_updated_time + self.ttl <= monotonic()
131131
if readonly:
132132
has_server_for_mode = bool(self.readers)
133133
else:
@@ -146,7 +146,7 @@ def should_be_purged_from_memory(self):
146146
:rtype: bool
147147
"""
148148
from ._conf import RoutingConfig
149-
perf_time = perf_counter()
149+
perf_time = monotonic()
150150
res = self.last_updated_time + self.ttl + RoutingConfig.routing_table_purge_delay <= perf_time
151151
log.debug("[#0000] _: <ROUTING> purge check: "
152152
"last_updated_time=%r, ttl=%r, perf_time=%r => %r",
@@ -161,7 +161,7 @@ def update(self, new_routing_table):
161161
self.readers.replace(new_routing_table.readers)
162162
self.writers.replace(new_routing_table.writers)
163163
self.initialized_without_writers = not self.writers
164-
self.last_updated_time = perf_counter()
164+
self.last_updated_time = monotonic()
165165
self.ttl = new_routing_table.ttl
166166
log.debug("[#0000] _: <ROUTING> updated table=%r", self)
167167

0 commit comments

Comments
 (0)