Skip to content

ADR 021: add liveness_check_timeout driver config option #992

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ Additional configuration can be provided via the :class:`neo4j.Driver` construct
+ :ref:`encrypted-ref`
+ :ref:`keep-alive-ref`
+ :ref:`max-connection-lifetime-ref`
+ :ref:`liveness-check-timeout-ref`
+ :ref:`max-connection-pool-size-ref`
+ :ref:`max-transaction-retry-time-ref`
+ :ref:`resolver-ref`
Expand Down Expand Up @@ -471,6 +472,33 @@ The maximum duration in seconds that the driver will keep a connection for befor
:Default: ``3600``


.. _liveness-check-timeout-ref:

``liveness_check_timeout``
--------------------------
Pooled connections that have been idle in the pool for longer than this timeout (specified in seconds) will be tested
before they are used again, to ensure they are still live.
If this option is set too low, additional network round trips will be incurred when acquiring a connection, which causes
a performance hit.

If this is set too high, you may receive sessions that are backed by connections that are no longer live, which will
lead to exceptions in your application.
Assuming the database is running, these exceptions will go away if you retry or use a driver API with built-in retries.

Hence, this parameter tunes a balance between the likelihood of your application seeing connection problems, and
performance.

You normally should not need to tune this parameter.
No connection liveness check is done by default (:data:`None`).
A value of ``0`` means connections will always be tested for validity.
Negative values are not allowed.

:Type: :class:`float` or :data:`None`
:Default: :data:`None`

.. versionadded:: 5.15


.. _max-connection-pool-size-ref:

``max_connection_pool_size``
Expand Down Expand Up @@ -533,8 +561,8 @@ For example:
resolver=custom_resolver)


:Type: ``Callable | None``
:Default: ``None``
:Type: ``Callable`` or :data:`None`
:Default: :data:`None`


.. _trust-ref:
Expand Down Expand Up @@ -619,7 +647,7 @@ custom ``ssl_context`` is configured.
--------------
Specify the client agent name.

:Type: ``str``
:Type: :class:`str`
:Default: *The Python Driver will generate a user agent name.*


Expand Down
38 changes: 22 additions & 16 deletions src/neo4j/_async/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from .._async_compat.util import AsyncUtil
from .._conf import (
Config,
ConfigurationError,
PoolConfig,
SessionConfig,
TrustAll,
Expand Down Expand Up @@ -125,11 +126,9 @@ def driver(
cls,
uri: str,
*,
auth: t.Union[
_TAuth,
AsyncAuthManager,
] = ...,
auth: t.Union[_TAuth, AsyncAuthManager] = ...,
max_connection_lifetime: float = ...,
liveness_check_timeout: t.Optional[float] = ...,
max_connection_pool_size: int = ...,
connection_timeout: float = ...,
trust: t.Union[
Expand All @@ -151,9 +150,10 @@ def driver(
notifications_disabled_categories: t.Optional[
t.Iterable[T_NotificationDisabledCategory]
] = ...,
telemetry_disabled: bool = ...,

# undocumented/unsupported options
# they may be change or removed any time without prior notice
# they may be changed or removed any time without prior notice
connection_acquisition_timeout: float = ...,
max_transaction_retry_time: float = ...,
initial_retry_delay: float = ...,
Expand All @@ -164,19 +164,17 @@ def driver(
impersonated_user: t.Optional[str] = ...,
bookmark_manager: t.Union[AsyncBookmarkManager,
BookmarkManager, None] = ...,
telemetry_disabled: bool = ...,
) -> AsyncDriver:
...

else:

@classmethod
def driver(
cls, uri: str, *,
auth: t.Union[
_TAuth,
AsyncAuthManager,
] = None,
cls,
uri: str,
*,
auth: t.Union[_TAuth, AsyncAuthManager] = None,
**config
) -> AsyncDriver:
"""Create a driver.
Expand All @@ -202,7 +200,6 @@ def driver(
TRUST_ALL_CERTIFICATES,
TRUST_SYSTEM_CA_SIGNED_CERTIFICATES
):
from ..exceptions import ConfigurationError
raise ConfigurationError(
"The config setting `trust` values are {!r}"
.format(
Expand All @@ -216,8 +213,8 @@ def driver(
if ("trusted_certificates" in config.keys()
and not isinstance(config["trusted_certificates"],
TrustStore)):
raise ConnectionError(
"The config setting `trusted_certificates` must be of "
raise ConfigurationError(
'The config setting "trusted_certificates" must be of '
"type neo4j.TrustAll, neo4j.TrustCustomCAs, or"
"neo4j.TrustSystemCAs but was {}".format(
type(config["trusted_certificates"])
Expand All @@ -229,7 +226,6 @@ def driver(
or "trust" in config.keys()
or "trusted_certificates" in config.keys()
or "ssl_context" in config.keys())):
from ..exceptions import ConfigurationError

# TODO: 6.0 - remove "trust" from error message
raise ConfigurationError(
Expand Down Expand Up @@ -257,12 +253,22 @@ def driver(
config["encrypted"] = True
config["trusted_certificates"] = TrustAll()
_normalize_notifications_config(config)
liveness_check_timeout = config.get("liveness_check_timeout")
if (
liveness_check_timeout is not None
and liveness_check_timeout < 0
):
raise ConfigurationError(
'The config setting "liveness_check_timeout" must be '
"greater than or equal to 0 but was "
f"{liveness_check_timeout}."
)

assert driver_type in (DRIVER_BOLT, DRIVER_NEO4J)
if driver_type == DRIVER_BOLT:
if parse_routing_context(parsed.query):
deprecation_warn(
"Creating a direct driver (`bolt://` scheme) with "
'Creating a direct driver ("bolt://" scheme) with '
"routing context (URI parameters) is deprecated. They "
"will be ignored. This will raise an error in a "
'future release. Given URI "{}"'.format(uri),
Expand Down
14 changes: 7 additions & 7 deletions src/neo4j/_async/io/_bolt.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import typing as t
from collections import deque
from logging import getLogger
from time import perf_counter
from time import monotonic

from ..._api import TelemetryAPI
from ..._async_compat.network import AsyncBoltSocket
Expand Down Expand Up @@ -159,9 +159,9 @@ def __init__(self, unresolved_address, sock, max_connection_lifetime, *,
self.hydration_handler = self.HYDRATION_HANDLER_CLS()
self.responses = deque()
self._max_connection_lifetime = max_connection_lifetime
self._creation_timestamp = perf_counter()
self._creation_timestamp = monotonic()
self.routing_context = routing_context
self.idle_since = perf_counter()
self.idle_since = monotonic()

# Determine the user agent
if user_agent:
Expand Down Expand Up @@ -797,7 +797,7 @@ def _append(self, signature, fields=(), response=None,

async def _send_all(self):
if await self.outbox.flush():
self.idle_since = perf_counter()
self.idle_since = monotonic()

async def send_all(self):
""" Send all queued messages to the server.
Expand Down Expand Up @@ -847,7 +847,7 @@ async def fetch_message(self):
hydration_hooks=self.responses[0].hydration_hooks
)
res = await self._process_message(tag, fields)
self.idle_since = perf_counter()
self.idle_since = monotonic()
return res

async def fetch_all(self):
Expand Down Expand Up @@ -928,7 +928,7 @@ async def _set_defunct(self, message, error=None, silent=False):
def stale(self):
return (self._stale
or (0 <= self._max_connection_lifetime
<= perf_counter() - self._creation_timestamp))
<= monotonic() - self._creation_timestamp))

_stale = False

Expand Down Expand Up @@ -983,7 +983,7 @@ def is_idle_for(self, timeout):

:rtype: bool
"""
return perf_counter() - self.idle_since > timeout
return monotonic() - self.idle_since > timeout


AsyncBoltSocket.Bolt = AsyncBolt # type: ignore
Expand Down
2 changes: 2 additions & 0 deletions src/neo4j/_async/io/_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ async def _acquire(
auth = AcquireAuth(None)
force_auth = auth.force_auth
auth = auth.auth
if liveness_check_timeout is None:
liveness_check_timeout = self.pool_config.liveness_check_timeout

async def health_check(connection_, deadline_):
if (connection_.closed()
Expand Down
6 changes: 3 additions & 3 deletions src/neo4j/_async/work/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import typing as t
from logging import getLogger
from random import random
from time import perf_counter
from time import monotonic

from ..._api import TelemetryAPI
from ..._async_compat import async_sleep
Expand Down Expand Up @@ -570,8 +570,8 @@ def api_success_cb(meta):
return result
if t0 == -1:
# The timer should be started after the first attempt
t0 = perf_counter()
t1 = perf_counter()
t0 = monotonic()
t1 = monotonic()
if t1 - t0 > self._config.max_transaction_retry_time:
break
delay = next(retry_delay)
Expand Down
4 changes: 4 additions & 0 deletions src/neo4j/_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,10 @@ class PoolConfig(Config):
max_connection_lifetime = 3600 # seconds
# The maximum duration the driver will keep a connection for before being removed from the pool.

#: Timeout (in seconds) after which idle connections will be checked for
#: liveness before being returned from the pool.
liveness_check_timeout = None

#: Max Connection Pool Size
max_connection_pool_size = 100
# The maximum total number of connections allowed, per host (i.e. cluster nodes), to be managed by the connection pool.
Expand Down
6 changes: 3 additions & 3 deletions src/neo4j/_deadline.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@


from contextlib import contextmanager
from time import perf_counter
from time import monotonic


class Deadline:
def __init__(self, timeout):
if timeout is None or timeout == float("inf"):
self._deadline = float("inf")
else:
self._deadline = perf_counter() + timeout
self._deadline = monotonic() + timeout
self._original_timeout = timeout

@property
Expand All @@ -36,7 +36,7 @@ def expired(self):
def to_timeout(self):
if self._deadline == float("inf"):
return None
timeout = self._deadline - perf_counter()
timeout = self._deadline - monotonic()
return timeout if timeout > 0 else 0

def __eq__(self, other):
Expand Down
10 changes: 5 additions & 5 deletions src/neo4j/_routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from collections.abc import MutableSet
from logging import getLogger
from time import perf_counter
from time import monotonic

from .addressing import Address

Expand Down Expand Up @@ -106,7 +106,7 @@ def __init__(self, *, database, routers=(), readers=(), writers=(), ttl=0):
self.readers = OrderedSet(readers)
self.writers = OrderedSet(writers)
self.initialized_without_writers = not self.writers
self.last_updated_time = perf_counter()
self.last_updated_time = monotonic()
self.ttl = ttl
self.database = database

Expand All @@ -127,7 +127,7 @@ def is_fresh(self, readonly=False):
""" Indicator for whether routing information is still usable.
"""
assert isinstance(readonly, bool)
expired = self.last_updated_time + self.ttl <= perf_counter()
expired = self.last_updated_time + self.ttl <= monotonic()
if readonly:
has_server_for_mode = bool(self.readers)
else:
Expand All @@ -146,7 +146,7 @@ def should_be_purged_from_memory(self):
:rtype: bool
"""
from ._conf import RoutingConfig
perf_time = perf_counter()
perf_time = monotonic()
res = self.last_updated_time + self.ttl + RoutingConfig.routing_table_purge_delay <= perf_time
log.debug("[#0000] _: <ROUTING> purge check: "
"last_updated_time=%r, ttl=%r, perf_time=%r => %r",
Expand All @@ -161,7 +161,7 @@ def update(self, new_routing_table):
self.readers.replace(new_routing_table.readers)
self.writers.replace(new_routing_table.writers)
self.initialized_without_writers = not self.writers
self.last_updated_time = perf_counter()
self.last_updated_time = monotonic()
self.ttl = new_routing_table.ttl
log.debug("[#0000] _: <ROUTING> updated table=%r", self)

Expand Down
Loading