From 6116789d6bff17a4eef60473432e8035bfbb35f7 Mon Sep 17 00:00:00 2001 From: Robsdedude Date: Thu, 23 Nov 2023 18:43:51 +0100 Subject: [PATCH 1/3] ADR 021: add liveness_check_timeout driver config option --- docs/source/api.rst | 34 ++++++- src/neo4j/_async/driver.py | 38 ++++---- src/neo4j/_async/io/_bolt.py | 14 +-- src/neo4j/_async/io/_pool.py | 2 + src/neo4j/_async/work/session.py | 6 +- src/neo4j/_conf.py | 4 + src/neo4j/_deadline.py | 6 +- src/neo4j/_routing.py | 10 +- src/neo4j/_sync/driver.py | 38 ++++---- src/neo4j/_sync/io/_bolt.py | 14 +-- src/neo4j/_sync/io/_pool.py | 2 + src/neo4j/_sync/work/session.py | 6 +- testkitbackend/_async/requests.py | 11 ++- testkitbackend/_sync/requests.py | 11 ++- testkitbackend/test_config.json | 2 +- tests/unit/async_/io/test_direct.py | 141 ++++++++++++++-------------- tests/unit/async_/test_driver.py | 42 ++++++--- tests/unit/common/test_conf.py | 1 + tests/unit/mixed/io/test_direct.py | 36 ++++--- tests/unit/sync/io/test_direct.py | 141 ++++++++++++++-------------- tests/unit/sync/test_driver.py | 42 ++++++--- 21 files changed, 351 insertions(+), 250 deletions(-) diff --git a/docs/source/api.rst b/docs/source/api.rst index 960f25e71..4acb349df 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -388,6 +388,7 @@ Additional configuration can be provided via the :class:`neo4j.Driver` construct + :ref:`encrypted-ref` + :ref:`keep-alive-ref` + :ref:`max-connection-lifetime-ref` ++ :ref:`liveness-check-timeout-ref` + :ref:`max-connection-pool-size-ref` + :ref:`max-transaction-retry-time-ref` + :ref:`resolver-ref` @@ -463,6 +464,33 @@ The maximum duration in seconds that the driver will keep a connection for befor :Default: ``3600`` +.. _liveness-check-timeout-ref: + +``liveness_check_timeout`` +-------------------------- +Pooled connections that have been idle in the pool for longer than this timeout (specified in seconds) will be tested +before they are used again, to ensure they are still live. +If this option is set too low, additional network round trips will be incurred when acquiring a connection, which causes +a performance hit. + +If this is set high, you may receive sessions that are backed by no longer live connections, which will lead to +exceptions in your application. +Assuming the database is running, these exceptions will go away if you retry or use a driver API with built-in retries. + +Hence, this parameter tunes a balance between the likelihood of your application seeing connection problems, and +performance. + +You normally should not need to tune this parameter. +No connection liveliness check is done by default (:data:`None`). +A value of ``0`` means connections will always be tested for validity. +Negative values are not allowed. + +:Type: :class:`float` or :data:`None` +:Default: :data:`None` + +.. versionadded:: 5.15 + + .. _max-connection-pool-size-ref: ``max_connection_pool_size`` @@ -525,8 +553,8 @@ For example: resolver=custom_resolver) -:Type: ``Callable | None`` -:Default: ``None`` +:Type: ``Callable`` or :data:`None` +:Default: :data:`None` .. _trust-ref: @@ -611,7 +639,7 @@ custom ``ssl_context`` is configured. -------------- Specify the client agent name. -:Type: ``str`` +:Type: :class:`str` :Default: *The Python Driver will generate a user agent name.* diff --git a/src/neo4j/_async/driver.py b/src/neo4j/_async/driver.py index 596c95c5a..7da63d2d0 100644 --- a/src/neo4j/_async/driver.py +++ b/src/neo4j/_async/driver.py @@ -38,6 +38,7 @@ from .._async_compat.util import AsyncUtil from .._conf import ( Config, + ConfigurationError, PoolConfig, SessionConfig, TrustAll, @@ -123,11 +124,9 @@ def driver( cls, uri: str, *, - auth: t.Union[ - _TAuth, - AsyncAuthManager, - ] = ..., + auth: t.Union[_TAuth, AsyncAuthManager] = ..., max_connection_lifetime: float = ..., + liveness_check_timeout: t.Optional[float] = ..., max_connection_pool_size: int = ..., connection_timeout: float = ..., trust: t.Union[ @@ -149,9 +148,10 @@ def driver( notifications_disabled_categories: t.Optional[ t.Iterable[T_NotificationDisabledCategory] ] = ..., + telemetry_disabled: bool = ..., # undocumented/unsupported options - # they may be change or removed any time without prior notice + # they may be changed or removed any time without prior notice connection_acquisition_timeout: float = ..., max_transaction_retry_time: float = ..., initial_retry_delay: float = ..., @@ -162,7 +162,6 @@ def driver( impersonated_user: t.Optional[str] = ..., bookmark_manager: t.Union[AsyncBookmarkManager, BookmarkManager, None] = ..., - telemetry_disabled: bool = ..., ) -> AsyncDriver: ... @@ -170,11 +169,10 @@ def driver( @classmethod def driver( - cls, uri: str, *, - auth: t.Union[ - _TAuth, - AsyncAuthManager, - ] = None, + cls, + uri: str, + *, + auth: t.Union[_TAuth, AsyncAuthManager] = None, **config ) -> AsyncDriver: """Create a driver. @@ -200,7 +198,6 @@ def driver( TRUST_ALL_CERTIFICATES, TRUST_SYSTEM_CA_SIGNED_CERTIFICATES ): - from ..exceptions import ConfigurationError raise ConfigurationError( "The config setting `trust` values are {!r}" .format( @@ -214,8 +211,8 @@ def driver( if ("trusted_certificates" in config.keys() and not isinstance(config["trusted_certificates"], TrustStore)): - raise ConnectionError( - "The config setting `trusted_certificates` must be of " + raise ConfigurationError( + 'The config setting "trusted_certificates" must be of ' "type neo4j.TrustAll, neo4j.TrustCustomCAs, or" "neo4j.TrustSystemCAs but was {}".format( type(config["trusted_certificates"]) @@ -227,7 +224,6 @@ def driver( or "trust" in config.keys() or "trusted_certificates" in config.keys() or "ssl_context" in config.keys())): - from ..exceptions import ConfigurationError # TODO: 6.0 - remove "trust" from error message raise ConfigurationError( @@ -255,12 +251,22 @@ def driver( config["encrypted"] = True config["trusted_certificates"] = TrustAll() _normalize_notifications_config(config) + liveness_check_timeout = config.get("liveness_check_timeout") + if ( + liveness_check_timeout is not None + and liveness_check_timeout < 0 + ): + raise ConfigurationError( + 'The config setting "liveness_check_timeout" must be ' + "greater than or equal to 0 but was " + f"{liveness_check_timeout}." + ) assert driver_type in (DRIVER_BOLT, DRIVER_NEO4J) if driver_type == DRIVER_BOLT: if parse_routing_context(parsed.query): deprecation_warn( - "Creating a direct driver (`bolt://` scheme) with " + 'Creating a direct driver ("bolt://" scheme) with ' "routing context (URI parameters) is deprecated. They " "will be ignored. This will raise an error in a " 'future release. Given URI "{}"'.format(uri), diff --git a/src/neo4j/_async/io/_bolt.py b/src/neo4j/_async/io/_bolt.py index afff0f257..294e6104a 100644 --- a/src/neo4j/_async/io/_bolt.py +++ b/src/neo4j/_async/io/_bolt.py @@ -23,7 +23,7 @@ import typing as t from collections import deque from logging import getLogger -from time import perf_counter +from time import monotonic from ..._api import TelemetryAPI from ..._async_compat.network import AsyncBoltSocket @@ -161,9 +161,9 @@ def __init__(self, unresolved_address, sock, max_connection_lifetime, *, self.hydration_handler = self.HYDRATION_HANDLER_CLS() self.responses = deque() self._max_connection_lifetime = max_connection_lifetime - self._creation_timestamp = perf_counter() + self._creation_timestamp = monotonic() self.routing_context = routing_context - self.idle_since = perf_counter() + self.idle_since = monotonic() # Determine the user agent if user_agent: @@ -799,7 +799,7 @@ def _append(self, signature, fields=(), response=None, async def _send_all(self): if await self.outbox.flush(): - self.idle_since = perf_counter() + self.idle_since = monotonic() async def send_all(self): """ Send all queued messages to the server. @@ -849,7 +849,7 @@ async def fetch_message(self): hydration_hooks=self.responses[0].hydration_hooks ) res = await self._process_message(tag, fields) - self.idle_since = perf_counter() + self.idle_since = monotonic() return res async def fetch_all(self): @@ -930,7 +930,7 @@ async def _set_defunct(self, message, error=None, silent=False): def stale(self): return (self._stale or (0 <= self._max_connection_lifetime - <= perf_counter() - self._creation_timestamp)) + <= monotonic() - self._creation_timestamp)) _stale = False @@ -985,7 +985,7 @@ def is_idle_for(self, timeout): :rtype: bool """ - return perf_counter() - self.idle_since > timeout + return monotonic() - self.idle_since > timeout AsyncBoltSocket.Bolt = AsyncBolt # type: ignore diff --git a/src/neo4j/_async/io/_pool.py b/src/neo4j/_async/io/_pool.py index 0471c92fe..ad2786480 100644 --- a/src/neo4j/_async/io/_pool.py +++ b/src/neo4j/_async/io/_pool.py @@ -249,6 +249,8 @@ async def _acquire( auth = AcquireAuth(None) force_auth = auth.force_auth auth = auth.auth + if liveness_check_timeout is None: + liveness_check_timeout = self.pool_config.liveness_check_timeout async def health_check(connection_, deadline_): if (connection_.closed() diff --git a/src/neo4j/_async/work/session.py b/src/neo4j/_async/work/session.py index 4dd4bfef5..7a7aa4ddc 100644 --- a/src/neo4j/_async/work/session.py +++ b/src/neo4j/_async/work/session.py @@ -22,7 +22,7 @@ import typing as t from logging import getLogger from random import random -from time import perf_counter +from time import monotonic from ..._api import TelemetryAPI from ..._async_compat import async_sleep @@ -567,8 +567,8 @@ def api_success_cb(meta): return result if t0 == -1: # The timer should be started after the first attempt - t0 = perf_counter() - t1 = perf_counter() + t0 = monotonic() + t1 = monotonic() if t1 - t0 > self._config.max_transaction_retry_time: break delay = next(retry_delay) diff --git a/src/neo4j/_conf.py b/src/neo4j/_conf.py index e17b9ef36..9d235f6a6 100644 --- a/src/neo4j/_conf.py +++ b/src/neo4j/_conf.py @@ -357,6 +357,10 @@ class PoolConfig(Config): max_connection_lifetime = 3600 # seconds # The maximum duration the driver will keep a connection for before being removed from the pool. + #: Timeout after which idle connections will be checked for liveness + #: before returned from the pool. + liveness_check_timeout = None + #: Max Connection Pool Size max_connection_pool_size = 100 # The maximum total number of connections allowed, per host (i.e. cluster nodes), to be managed by the connection pool. diff --git a/src/neo4j/_deadline.py b/src/neo4j/_deadline.py index 560e55a54..e58864991 100644 --- a/src/neo4j/_deadline.py +++ b/src/neo4j/_deadline.py @@ -17,7 +17,7 @@ from contextlib import contextmanager -from time import perf_counter +from time import monotonic class Deadline: @@ -25,7 +25,7 @@ def __init__(self, timeout): if timeout is None or timeout == float("inf"): self._deadline = float("inf") else: - self._deadline = perf_counter() + timeout + self._deadline = monotonic() + timeout self._original_timeout = timeout @property @@ -38,7 +38,7 @@ def expired(self): def to_timeout(self): if self._deadline == float("inf"): return None - timeout = self._deadline - perf_counter() + timeout = self._deadline - monotonic() return timeout if timeout > 0 else 0 def __eq__(self, other): diff --git a/src/neo4j/_routing.py b/src/neo4j/_routing.py index 7d0a020e9..042fcbdae 100644 --- a/src/neo4j/_routing.py +++ b/src/neo4j/_routing.py @@ -18,7 +18,7 @@ from collections.abc import MutableSet from logging import getLogger -from time import perf_counter +from time import monotonic from .addressing import Address @@ -108,7 +108,7 @@ def __init__(self, *, database, routers=(), readers=(), writers=(), ttl=0): self.readers = OrderedSet(readers) self.writers = OrderedSet(writers) self.initialized_without_writers = not self.writers - self.last_updated_time = perf_counter() + self.last_updated_time = monotonic() self.ttl = ttl self.database = database @@ -129,7 +129,7 @@ def is_fresh(self, readonly=False): """ Indicator for whether routing information is still usable. """ assert isinstance(readonly, bool) - expired = self.last_updated_time + self.ttl <= perf_counter() + expired = self.last_updated_time + self.ttl <= monotonic() if readonly: has_server_for_mode = bool(self.readers) else: @@ -148,7 +148,7 @@ def should_be_purged_from_memory(self): :rtype: bool """ from ._conf import RoutingConfig - perf_time = perf_counter() + perf_time = monotonic() res = self.last_updated_time + self.ttl + RoutingConfig.routing_table_purge_delay <= perf_time log.debug("[#0000] _: purge check: " "last_updated_time=%r, ttl=%r, perf_time=%r => %r", @@ -163,7 +163,7 @@ def update(self, new_routing_table): self.readers.replace(new_routing_table.readers) self.writers.replace(new_routing_table.writers) self.initialized_without_writers = not self.writers - self.last_updated_time = perf_counter() + self.last_updated_time = monotonic() self.ttl = new_routing_table.ttl log.debug("[#0000] _: updated table=%r", self) diff --git a/src/neo4j/_sync/driver.py b/src/neo4j/_sync/driver.py index 44f388996..b7f29ca74 100644 --- a/src/neo4j/_sync/driver.py +++ b/src/neo4j/_sync/driver.py @@ -38,6 +38,7 @@ from .._async_compat.util import Util from .._conf import ( Config, + ConfigurationError, PoolConfig, SessionConfig, TrustAll, @@ -122,11 +123,9 @@ def driver( cls, uri: str, *, - auth: t.Union[ - _TAuth, - AuthManager, - ] = ..., + auth: t.Union[_TAuth, AuthManager] = ..., max_connection_lifetime: float = ..., + liveness_check_timeout: t.Optional[float] = ..., max_connection_pool_size: int = ..., connection_timeout: float = ..., trust: t.Union[ @@ -148,9 +147,10 @@ def driver( notifications_disabled_categories: t.Optional[ t.Iterable[T_NotificationDisabledCategory] ] = ..., + telemetry_disabled: bool = ..., # undocumented/unsupported options - # they may be change or removed any time without prior notice + # they may be changed or removed any time without prior notice connection_acquisition_timeout: float = ..., max_transaction_retry_time: float = ..., initial_retry_delay: float = ..., @@ -161,7 +161,6 @@ def driver( impersonated_user: t.Optional[str] = ..., bookmark_manager: t.Union[BookmarkManager, BookmarkManager, None] = ..., - telemetry_disabled: bool = ..., ) -> Driver: ... @@ -169,11 +168,10 @@ def driver( @classmethod def driver( - cls, uri: str, *, - auth: t.Union[ - _TAuth, - AuthManager, - ] = None, + cls, + uri: str, + *, + auth: t.Union[_TAuth, AuthManager] = None, **config ) -> Driver: """Create a driver. @@ -199,7 +197,6 @@ def driver( TRUST_ALL_CERTIFICATES, TRUST_SYSTEM_CA_SIGNED_CERTIFICATES ): - from ..exceptions import ConfigurationError raise ConfigurationError( "The config setting `trust` values are {!r}" .format( @@ -213,8 +210,8 @@ def driver( if ("trusted_certificates" in config.keys() and not isinstance(config["trusted_certificates"], TrustStore)): - raise ConnectionError( - "The config setting `trusted_certificates` must be of " + raise ConfigurationError( + 'The config setting "trusted_certificates" must be of ' "type neo4j.TrustAll, neo4j.TrustCustomCAs, or" "neo4j.TrustSystemCAs but was {}".format( type(config["trusted_certificates"]) @@ -226,7 +223,6 @@ def driver( or "trust" in config.keys() or "trusted_certificates" in config.keys() or "ssl_context" in config.keys())): - from ..exceptions import ConfigurationError # TODO: 6.0 - remove "trust" from error message raise ConfigurationError( @@ -254,12 +250,22 @@ def driver( config["encrypted"] = True config["trusted_certificates"] = TrustAll() _normalize_notifications_config(config) + liveness_check_timeout = config.get("liveness_check_timeout") + if ( + liveness_check_timeout is not None + and liveness_check_timeout < 0 + ): + raise ConfigurationError( + 'The config setting "liveness_check_timeout" must be ' + "greater than or equal to 0 but was " + f"{liveness_check_timeout}." + ) assert driver_type in (DRIVER_BOLT, DRIVER_NEO4J) if driver_type == DRIVER_BOLT: if parse_routing_context(parsed.query): deprecation_warn( - "Creating a direct driver (`bolt://` scheme) with " + 'Creating a direct driver ("bolt://" scheme) with ' "routing context (URI parameters) is deprecated. They " "will be ignored. This will raise an error in a " 'future release. Given URI "{}"'.format(uri), diff --git a/src/neo4j/_sync/io/_bolt.py b/src/neo4j/_sync/io/_bolt.py index 22c1efcaa..2b8576de2 100644 --- a/src/neo4j/_sync/io/_bolt.py +++ b/src/neo4j/_sync/io/_bolt.py @@ -23,7 +23,7 @@ import typing as t from collections import deque from logging import getLogger -from time import perf_counter +from time import monotonic from ..._api import TelemetryAPI from ..._async_compat.network import BoltSocket @@ -161,9 +161,9 @@ def __init__(self, unresolved_address, sock, max_connection_lifetime, *, self.hydration_handler = self.HYDRATION_HANDLER_CLS() self.responses = deque() self._max_connection_lifetime = max_connection_lifetime - self._creation_timestamp = perf_counter() + self._creation_timestamp = monotonic() self.routing_context = routing_context - self.idle_since = perf_counter() + self.idle_since = monotonic() # Determine the user agent if user_agent: @@ -799,7 +799,7 @@ def _append(self, signature, fields=(), response=None, def _send_all(self): if self.outbox.flush(): - self.idle_since = perf_counter() + self.idle_since = monotonic() def send_all(self): """ Send all queued messages to the server. @@ -849,7 +849,7 @@ def fetch_message(self): hydration_hooks=self.responses[0].hydration_hooks ) res = self._process_message(tag, fields) - self.idle_since = perf_counter() + self.idle_since = monotonic() return res def fetch_all(self): @@ -930,7 +930,7 @@ def _set_defunct(self, message, error=None, silent=False): def stale(self): return (self._stale or (0 <= self._max_connection_lifetime - <= perf_counter() - self._creation_timestamp)) + <= monotonic() - self._creation_timestamp)) _stale = False @@ -985,7 +985,7 @@ def is_idle_for(self, timeout): :rtype: bool """ - return perf_counter() - self.idle_since > timeout + return monotonic() - self.idle_since > timeout BoltSocket.Bolt = Bolt # type: ignore diff --git a/src/neo4j/_sync/io/_pool.py b/src/neo4j/_sync/io/_pool.py index 2964a7ef5..6b45875b8 100644 --- a/src/neo4j/_sync/io/_pool.py +++ b/src/neo4j/_sync/io/_pool.py @@ -246,6 +246,8 @@ def _acquire( auth = AcquireAuth(None) force_auth = auth.force_auth auth = auth.auth + if liveness_check_timeout is None: + liveness_check_timeout = self.pool_config.liveness_check_timeout def health_check(connection_, deadline_): if (connection_.closed() diff --git a/src/neo4j/_sync/work/session.py b/src/neo4j/_sync/work/session.py index d6b57240d..72773c845 100644 --- a/src/neo4j/_sync/work/session.py +++ b/src/neo4j/_sync/work/session.py @@ -22,7 +22,7 @@ import typing as t from logging import getLogger from random import random -from time import perf_counter +from time import monotonic from ..._api import TelemetryAPI from ..._async_compat import sleep @@ -567,8 +567,8 @@ def api_success_cb(meta): return result if t0 == -1: # The timer should be started after the first attempt - t0 = perf_counter() - t1 = perf_counter() + t0 = monotonic() + t1 = monotonic() if t1 - t0 > self._config.max_transaction_retry_time: break delay = next(retry_delay) diff --git a/testkitbackend/_async/requests.py b/testkitbackend/_async/requests.py index 853d5d7a0..512819e62 100644 --- a/testkitbackend/_async/requests.py +++ b/testkitbackend/_async/requests.py @@ -40,10 +40,6 @@ test_subtest_skips, totestkit, ) -from .._warning_check import ( - warning_check, - warnings_check, -) from ..exceptions import MarkdAsDriverException @@ -121,6 +117,7 @@ async def NewDriver(backend, data): ("connectionTimeoutMs", "connection_timeout"), ("maxTxRetryTimeMs", "max_transaction_retry_time"), ("connectionAcquisitionTimeoutMs", "connection_acquisition_timeout"), + ("livenessCheckTimeoutMs", "liveness_check_timeout"), ): if data.get(timeout_testkit) is not None: kwargs[timeout_driver] = data[timeout_testkit] / 1000 @@ -149,7 +146,6 @@ async def NewDriver(backend, data): for cert in data["trustedCertificates"]) kwargs["trusted_certificates"] = neo4j.TrustCustomCAs(*cert_paths) fromtestkit.set_notifications_config(kwargs, data) - data.mark_item_as_read_if_equals("livenessCheckTimeoutMs", None) driver = neo4j.AsyncGraphDatabase.driver( data["uri"], auth=auth, user_agent=data["userAgent"], **kwargs, @@ -372,6 +368,9 @@ async def ExecuteQuery(backend, data): else: bookmark_manager = backend.bookmark_managers[bookmark_manager_id] kwargs["bookmark_manager_"] = bookmark_manager + if "authorizationToken" in config: + kwargs["auth_"] = fromtestkit.to_auth_token(config, + "authorizationToken") eager_result = await driver.execute_query(cypher, params, **kwargs) await backend.send_response("EagerResult", { @@ -793,6 +792,7 @@ async def GetRoutingTable(backend, data): response_data[role] = list(map(str, addresses)) await backend.send_response("RoutingTable", response_data) + async def FakeTimeInstall(backend, _data): assert backend.fake_time is None assert backend.fake_time_ticker is None @@ -801,6 +801,7 @@ async def FakeTimeInstall(backend, _data): backend.fake_time_ticker = backend.fake_time.start() await backend.send_response("FakeTimeAck", {}) + async def FakeTimeTick(backend, data): assert backend.fake_time is not None assert backend.fake_time_ticker is not None diff --git a/testkitbackend/_sync/requests.py b/testkitbackend/_sync/requests.py index 620ca476b..f5e395b08 100644 --- a/testkitbackend/_sync/requests.py +++ b/testkitbackend/_sync/requests.py @@ -40,10 +40,6 @@ test_subtest_skips, totestkit, ) -from .._warning_check import ( - warning_check, - warnings_check, -) from ..exceptions import MarkdAsDriverException @@ -121,6 +117,7 @@ def NewDriver(backend, data): ("connectionTimeoutMs", "connection_timeout"), ("maxTxRetryTimeMs", "max_transaction_retry_time"), ("connectionAcquisitionTimeoutMs", "connection_acquisition_timeout"), + ("livenessCheckTimeoutMs", "liveness_check_timeout"), ): if data.get(timeout_testkit) is not None: kwargs[timeout_driver] = data[timeout_testkit] / 1000 @@ -149,7 +146,6 @@ def NewDriver(backend, data): for cert in data["trustedCertificates"]) kwargs["trusted_certificates"] = neo4j.TrustCustomCAs(*cert_paths) fromtestkit.set_notifications_config(kwargs, data) - data.mark_item_as_read_if_equals("livenessCheckTimeoutMs", None) driver = neo4j.GraphDatabase.driver( data["uri"], auth=auth, user_agent=data["userAgent"], **kwargs, @@ -372,6 +368,9 @@ def ExecuteQuery(backend, data): else: bookmark_manager = backend.bookmark_managers[bookmark_manager_id] kwargs["bookmark_manager_"] = bookmark_manager + if "authorizationToken" in config: + kwargs["auth_"] = fromtestkit.to_auth_token(config, + "authorizationToken") eager_result = driver.execute_query(cypher, params, **kwargs) backend.send_response("EagerResult", { @@ -793,6 +792,7 @@ def GetRoutingTable(backend, data): response_data[role] = list(map(str, addresses)) backend.send_response("RoutingTable", response_data) + def FakeTimeInstall(backend, _data): assert backend.fake_time is None assert backend.fake_time_ticker is None @@ -801,6 +801,7 @@ def FakeTimeInstall(backend, _data): backend.fake_time_ticker = backend.fake_time.start() backend.send_response("FakeTimeAck", {}) + def FakeTimeTick(backend, data): assert backend.fake_time is not None assert backend.fake_time_ticker is not None diff --git a/testkitbackend/test_config.json b/testkitbackend/test_config.json index 92af6d4ab..cfe31b1bf 100644 --- a/testkitbackend/test_config.json +++ b/testkitbackend/test_config.json @@ -25,7 +25,7 @@ "Feature:API:Driver.VerifyConnectivity": true, "Feature:API:Driver.SupportsSessionAuth": true, "Feature:API:Driver:NotificationsConfig": true, - "Feature:API:Liveness.Check": false, + "Feature:API:Liveness.Check": true, "Feature:API:Result.List": true, "Feature:API:Result.Peek": true, "Feature:API:Result.Single": true, diff --git a/tests/unit/async_/io/test_direct.py b/tests/unit/async_/io/test_direct.py index e9007d9f1..8d79e7ce1 100644 --- a/tests/unit/async_/io/test_direct.py +++ b/tests/unit/async_/io/test_direct.py @@ -36,67 +36,23 @@ from ...._async_compat import mark_async_test -class AsyncFakeSocket: - def __init__(self, address): - self.address = address - - def getpeername(self): - return self.address - - async def sendall(self, data): - return - - def close(self): - return - - -class AsyncQuickConnection: - def __init__(self, socket): - self.socket = socket - self.address = socket.getpeername() - self.local_port = self.address[1] - self.connection_id = "bolt-1234" - - @property - def is_reset(self): - return True - - def stale(self): - return False - - async def reset(self): - pass - - def re_auth(self, auth, auth_manager, force=False): - return False - - def close(self): - self.socket.close() - - def closed(self): - return False - - def defunct(self): - return False - - def timedout(self): - return False - - def assert_re_auth_support(self): - pass - - class AsyncFakeBoltPool(AsyncIOPool): is_direct_pool = False - def __init__(self, address, *, auth=None, **config): + def __init__(self, connection_gen, address, *, auth=None, **config): + self.buffered_connection_mocks = [] config["auth"] = static_auth(None) self.pool_config, self.workspace_config = Config.consume_chain(config, PoolConfig, WorkspaceConfig) if config: raise ValueError("Unexpected config keys: %s" % ", ".join(config.keys())) async def opener(addr, auth, timeout): - return AsyncQuickConnection(AsyncFakeSocket(addr)) + if self.buffered_connection_mocks: + mock = self.buffered_connection_mocks.pop() + else: + mock = connection_gen() + mock.address = addr + return mock super().__init__(opener, self.pool_config, self.workspace_config) self.address = address @@ -149,12 +105,14 @@ async def test_bolt_connection_ping_timeout(): @pytest.fixture -async def pool(): - async with AsyncFakeBoltPool(("127.0.0.1", 7687)) as pool: +async def pool(async_fake_connection_generator): + async with AsyncFakeBoltPool( + async_fake_connection_generator, ("127.0.0.1", 7687) + ) as pool: yield pool -def assert_pool_size( address, expected_active, expected_inactive, pool): +def assert_pool_size(address, expected_active, expected_inactive, pool): try: connections = pool.connections[address] except KeyError: @@ -227,8 +185,10 @@ async def test_pool_in_use_count(pool): @mark_async_test -async def test_pool_max_conn_pool_size(pool): - async with AsyncFakeBoltPool((), max_connection_pool_size=1) as pool: +async def test_pool_max_conn_pool_size(async_fake_connection_generator): + async with AsyncFakeBoltPool( + async_fake_connection_generator, (), max_connection_pool_size=1 + ) as pool: address = neo4j.Address(("127.0.0.1", 7687)) await pool._acquire(address, None, Deadline(0), None) assert pool.in_use_connection_count(address) == 1 @@ -239,22 +199,67 @@ async def test_pool_max_conn_pool_size(pool): @pytest.mark.parametrize("is_reset", (True, False)) @mark_async_test -async def test_pool_reset_when_released(is_reset, pool, mocker): +async def test_pool_reset_when_released( + is_reset, pool, async_fake_connection_generator +): + connection_mock = async_fake_connection_generator() + pool.buffered_connection_mocks.append(connection_mock) address = neo4j.Address(("127.0.0.1", 7687)) - quick_connection_name = AsyncQuickConnection.__name__ - is_reset_mock = mocker.patch( - f"{__name__}.{quick_connection_name}.is_reset", - new_callable=mocker.PropertyMock - ) - reset_mock = mocker.patch( - f"{__name__}.{quick_connection_name}.reset", - new_callable=mocker.AsyncMock - ) + is_reset_mock = connection_mock.is_reset_mock + reset_mock = connection_mock.reset is_reset_mock.return_value = is_reset connection = await pool._acquire(address, None, Deadline(3), None) - assert isinstance(connection, AsyncQuickConnection) assert is_reset_mock.call_count == 0 assert reset_mock.call_count == 0 await pool.release(connection) assert is_reset_mock.call_count == 1 assert reset_mock.call_count == int(not is_reset) + + +@pytest.mark.parametrize("config_timeout", (None, 0, 0.2, 1234)) +@pytest.mark.parametrize("acquire_timeout", (None, 0, 0.2, 1234)) +@mark_async_test +async def test_liveness_check( + config_timeout, acquire_timeout, async_fake_connection_generator +): + effective_timeout = config_timeout + if acquire_timeout is not None: + effective_timeout = acquire_timeout + async with AsyncFakeBoltPool( + async_fake_connection_generator, ("127.0.0.1", 7687), + liveness_check_timeout=config_timeout, + ) as pool: + address = neo4j.Address(("127.0.0.1", 7687)) + # pre-populate pool + cx1 = await pool._acquire(address, None, Deadline(3), None) + await pool.release(cx1) + cx1.reset.assert_not_called() + cx1.is_idle_for.assert_not_called() + + # simulate just before timeout + cx1.is_idle_for.return_value = False + + cx2 = await pool._acquire(address, None, Deadline(3), acquire_timeout) + assert cx2 is cx1 + if effective_timeout is not None: + cx1.is_idle_for.assert_called_once_with(effective_timeout) + else: + cx1.is_idle_for.assert_not_called() + await pool.release(cx1) + cx1.reset.assert_not_called() + + # simulate after timeout + cx1.is_idle_for.return_value = True + cx1.is_idle_for.reset_mock() + + cx2 = await pool._acquire(address, None, Deadline(3), acquire_timeout) + assert cx2 is cx1 + if effective_timeout is not None: + cx1.is_idle_for.assert_called_once_with(effective_timeout) + cx1.reset.assert_awaited_once() + else: + cx1.is_idle_for.assert_not_called() + cx1.reset.assert_not_called() + cx1.reset.reset_mock() + await pool.release(cx1) + cx1.reset.assert_not_called() diff --git a/tests/unit/async_/test_driver.py b/tests/unit/async_/test_driver.py index 432aa68f6..4473d34be 100644 --- a/tests/unit/async_/test_driver.py +++ b/tests/unit/async_/test_driver.py @@ -118,48 +118,48 @@ async def test_routing_driver_constructor(protocol, host, port, params, auth_tok @pytest.mark.parametrize( ("test_config", "expected_failure", "expected_failure_message"), ( - ({"encrypted": False}, ConfigurationError, "The config settings"), - ({"encrypted": True}, ConfigurationError, "The config settings"), + ({"encrypted": False}, ConfigurationError, '"encrypted"'), + ({"encrypted": True}, ConfigurationError, '"encrypted"'), ( {"encrypted": True, "trust": TRUST_ALL_CERTIFICATES}, - ConfigurationError, "The config settings" + ConfigurationError, '"encrypted"' ), ( {"trust": TRUST_ALL_CERTIFICATES}, - ConfigurationError, "The config settings" + ConfigurationError, '"trust"' ), ( {"trust": TRUST_SYSTEM_CA_SIGNED_CERTIFICATES}, - ConfigurationError, "The config settings" + ConfigurationError, '"trust"' ), ( {"encrypted": True, "trusted_certificates": TrustAll()}, - ConfigurationError, "The config settings" + ConfigurationError, '"encrypted"' ), ( {"trusted_certificates": TrustAll()}, - ConfigurationError, "The config settings" + ConfigurationError, '"trusted_certificates"' ), ( {"trusted_certificates": TrustSystemCAs()}, - ConfigurationError, "The config settings" + ConfigurationError, '"trusted_certificates"' ), ( {"trusted_certificates": TrustCustomCAs("foo", "bar")}, - ConfigurationError, "The config settings" + ConfigurationError, '"trusted_certificates"' ), ( {"ssl_context": None}, - ConfigurationError, "The config settings" + ConfigurationError, '"ssl_context"' ), ( {"ssl_context": ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)}, - ConfigurationError, "The config settings" + ConfigurationError, '"ssl_context"' ), ) ) @mark_async_test -async def test_driver_config_error( +async def test_driver_config_error_uri_conflict( test_uri, test_config, expected_failure, expected_failure_message ): def driver_builder(): @@ -170,7 +170,7 @@ def driver_builder(): return AsyncGraphDatabase.driver(test_uri, **test_config) if "+" in test_uri: - # `+s` and `+ssc` are short hand syntax for not having to configure the + # `+s` and `+ssc` are shorthand syntax for not having to configure the # encryption behavior of the driver. Specifying both is invalid. with pytest.raises(expected_failure, match=expected_failure_message): driver_builder() @@ -204,6 +204,22 @@ def test_driver_trust_config_error( AsyncGraphDatabase.driver("bolt://127.0.0.1:9001", **test_config) +@pytest.mark.parametrize( + ("test_config", "expected_failure", "expected_failure_message"), + ( + ( + {"liveness_check_timeout": -1}, + ConfigurationError, '"liveness_check_timeout"' + ), + ) +) +def test_driver_liveness_timeout_config_error( + test_config, expected_failure, expected_failure_message +): + with pytest.raises(expected_failure, match=expected_failure_message): + AsyncGraphDatabase.driver("bolt://127.0.0.1:9001", **test_config) + + @pytest.mark.parametrize("uri", ( "bolt://127.0.0.1:9000", "neo4j://127.0.0.1:9000", diff --git a/tests/unit/common/test_conf.py b/tests/unit/common/test_conf.py index 52e1def86..0f465a903 100644 --- a/tests/unit/common/test_conf.py +++ b/tests/unit/common/test_conf.py @@ -49,6 +49,7 @@ "connection_timeout": 30.0, "keep_alive": True, "max_connection_lifetime": 3600, + "liveness_check_timeout": None, "max_connection_pool_size": 100, "resolver": None, "encrypted": False, diff --git a/tests/unit/mixed/io/test_direct.py b/tests/unit/mixed/io/test_direct.py index 04e6b98db..d2e8bed89 100644 --- a/tests/unit/mixed/io/test_direct.py +++ b/tests/unit/mixed/io/test_direct.py @@ -31,15 +31,13 @@ from neo4j._async.io._pool import AcquireAuth as AsyncAcquireAuth from neo4j._deadline import Deadline from neo4j._sync.io._pool import AcquireAuth -from neo4j.auth_management import ( - AsyncAuthManagers, - AuthManagers, -) +from ...async_.conftest import async_fake_connection_generator from ...async_.io.test_direct import AsyncFakeBoltPool from ...async_.test_auth_manager import ( static_auth_manager as static_async_auth_manager, ) +from ...sync.conftest import fake_connection_generator from ...sync.io.test_direct import FakeBoltPool from ...sync.test_auth_manager import static_auth_manager from ._common import ( @@ -63,7 +61,7 @@ def assert_pool_size(self, address, expected_active, expected_inactive, == len([cx for cx in connections if not cx.in_use])) @pytest.mark.parametrize("pre_populated", (0, 3, 5)) - def test_multithread(self, pre_populated): + def test_multithread(self, pre_populated, fake_connection_generator): connections_lock = Lock() connections = [] pre_populated_connections = [] @@ -79,7 +77,9 @@ def acquire_release_conn(pool_, address_, acquired_counter_, release_event_.wait() pool_.release(conn_) - with FakeBoltPool((), max_connection_pool_size=5) as pool: + with FakeBoltPool( + fake_connection_generator, (), max_connection_pool_size=5 + ) as pool: address = ("127.0.0.1", 7687) acquired_counter = MultiEvent() release_event = Event() @@ -123,7 +123,7 @@ def acquire_release_conn(pool_, address_, acquired_counter_, # The pool size is still 5, but all are free self.assert_pool_size(address, 0, 5, pool) - def test_full_pool_re_auth(self, mocker): + def test_full_pool_re_auth(self, fake_connection_generator, mocker): address = ("127.0.0.1", 7687) acquire_auth1 = AcquireAuth(auth=static_auth_manager( ("user1", "pass1")) @@ -147,7 +147,6 @@ def acquire1(pool_): if waiters: break time.sleep(0.001) - cx.re_auth = mocker.Mock(spec=cx.re_auth) pool_.release(cx) def acquire2(pool_): @@ -158,7 +157,9 @@ def acquire2(pool_): assert auth2 in cx.re_auth.call_args.args pool_.release(cx) - with FakeBoltPool((), max_connection_pool_size=1) as pool: + with FakeBoltPool( + fake_connection_generator, (), max_connection_pool_size=1 + ) as pool: t1 = threading.Thread(target=acquire1, args=(pool,), daemon=True) t2 = threading.Thread(target=acquire2, args=(pool,), daemon=True) t1.start() @@ -168,7 +169,9 @@ def acquire2(pool_): @pytest.mark.parametrize("pre_populated", (0, 3, 5)) @pytest.mark.asyncio - async def test_multi_coroutine(self, pre_populated): + async def test_multi_coroutine( + self, pre_populated, async_fake_connection_generator + ): connections = [] pre_populated_connections = [] @@ -202,7 +205,9 @@ async def waiter(pool_, acquired_counter_, release_event_): # The pool size is still 5, but all are free self.assert_pool_size(address, 0, 5, pool_) - async with AsyncFakeBoltPool((), max_connection_pool_size=5) as pool: + async with AsyncFakeBoltPool( + async_fake_connection_generator, (), max_connection_pool_size=5 + ) as pool: address = ("127.0.0.1", 7687) acquired_counter = AsyncMultiEvent() release_event = AsyncEvent() @@ -229,7 +234,9 @@ async def waiter(pool_, acquired_counter_, release_event_): ) @pytest.mark.asyncio - async def test_full_pool_re_auth_async(self, mocker): + async def test_full_pool_re_auth_async( + self, async_fake_connection_generator, mocker + ): address = ("127.0.0.1", 7687) acquire_auth1 = AsyncAcquireAuth(auth=static_async_auth_manager( ("user1", "pass1")) @@ -244,7 +251,6 @@ async def acquire1(pool_): cx1 = cx while len(pool_.cond._waiters) == 0: await asyncio.sleep(0) - cx.re_auth = mocker.Mock(spec=cx.re_auth) await pool_.release(cx) async def acquire2(pool_): @@ -257,5 +263,7 @@ async def acquire2(pool_): assert auth2 in cx.re_auth.call_args.args await pool_.release(cx) - async with AsyncFakeBoltPool((), max_connection_pool_size=1) as pool: + async with AsyncFakeBoltPool( + async_fake_connection_generator, (), max_connection_pool_size=1 + ) as pool: await asyncio.gather(acquire1(pool), acquire2(pool)) diff --git a/tests/unit/sync/io/test_direct.py b/tests/unit/sync/io/test_direct.py index 738aed293..79dafc4d1 100644 --- a/tests/unit/sync/io/test_direct.py +++ b/tests/unit/sync/io/test_direct.py @@ -36,67 +36,23 @@ from ...._async_compat import mark_sync_test -class FakeSocket: - def __init__(self, address): - self.address = address - - def getpeername(self): - return self.address - - def sendall(self, data): - return - - def close(self): - return - - -class QuickConnection: - def __init__(self, socket): - self.socket = socket - self.address = socket.getpeername() - self.local_port = self.address[1] - self.connection_id = "bolt-1234" - - @property - def is_reset(self): - return True - - def stale(self): - return False - - def reset(self): - pass - - def re_auth(self, auth, auth_manager, force=False): - return False - - def close(self): - self.socket.close() - - def closed(self): - return False - - def defunct(self): - return False - - def timedout(self): - return False - - def assert_re_auth_support(self): - pass - - class FakeBoltPool(IOPool): is_direct_pool = False - def __init__(self, address, *, auth=None, **config): + def __init__(self, connection_gen, address, *, auth=None, **config): + self.buffered_connection_mocks = [] config["auth"] = static_auth(None) self.pool_config, self.workspace_config = Config.consume_chain(config, PoolConfig, WorkspaceConfig) if config: raise ValueError("Unexpected config keys: %s" % ", ".join(config.keys())) def opener(addr, auth, timeout): - return QuickConnection(FakeSocket(addr)) + if self.buffered_connection_mocks: + mock = self.buffered_connection_mocks.pop() + else: + mock = connection_gen() + mock.address = addr + return mock super().__init__(opener, self.pool_config, self.workspace_config) self.address = address @@ -149,12 +105,14 @@ def test_bolt_connection_ping_timeout(): @pytest.fixture -def pool(): - with FakeBoltPool(("127.0.0.1", 7687)) as pool: +def pool(fake_connection_generator): + with FakeBoltPool( + fake_connection_generator, ("127.0.0.1", 7687) + ) as pool: yield pool -def assert_pool_size( address, expected_active, expected_inactive, pool): +def assert_pool_size(address, expected_active, expected_inactive, pool): try: connections = pool.connections[address] except KeyError: @@ -227,8 +185,10 @@ def test_pool_in_use_count(pool): @mark_sync_test -def test_pool_max_conn_pool_size(pool): - with FakeBoltPool((), max_connection_pool_size=1) as pool: +def test_pool_max_conn_pool_size(fake_connection_generator): + with FakeBoltPool( + fake_connection_generator, (), max_connection_pool_size=1 + ) as pool: address = neo4j.Address(("127.0.0.1", 7687)) pool._acquire(address, None, Deadline(0), None) assert pool.in_use_connection_count(address) == 1 @@ -239,22 +199,67 @@ def test_pool_max_conn_pool_size(pool): @pytest.mark.parametrize("is_reset", (True, False)) @mark_sync_test -def test_pool_reset_when_released(is_reset, pool, mocker): +def test_pool_reset_when_released( + is_reset, pool, fake_connection_generator +): + connection_mock = fake_connection_generator() + pool.buffered_connection_mocks.append(connection_mock) address = neo4j.Address(("127.0.0.1", 7687)) - quick_connection_name = QuickConnection.__name__ - is_reset_mock = mocker.patch( - f"{__name__}.{quick_connection_name}.is_reset", - new_callable=mocker.PropertyMock - ) - reset_mock = mocker.patch( - f"{__name__}.{quick_connection_name}.reset", - new_callable=mocker.MagicMock - ) + is_reset_mock = connection_mock.is_reset_mock + reset_mock = connection_mock.reset is_reset_mock.return_value = is_reset connection = pool._acquire(address, None, Deadline(3), None) - assert isinstance(connection, QuickConnection) assert is_reset_mock.call_count == 0 assert reset_mock.call_count == 0 pool.release(connection) assert is_reset_mock.call_count == 1 assert reset_mock.call_count == int(not is_reset) + + +@pytest.mark.parametrize("config_timeout", (None, 0, 0.2, 1234)) +@pytest.mark.parametrize("acquire_timeout", (None, 0, 0.2, 1234)) +@mark_sync_test +def test_liveness_check( + config_timeout, acquire_timeout, fake_connection_generator +): + effective_timeout = config_timeout + if acquire_timeout is not None: + effective_timeout = acquire_timeout + with FakeBoltPool( + fake_connection_generator, ("127.0.0.1", 7687), + liveness_check_timeout=config_timeout, + ) as pool: + address = neo4j.Address(("127.0.0.1", 7687)) + # pre-populate pool + cx1 = pool._acquire(address, None, Deadline(3), None) + pool.release(cx1) + cx1.reset.assert_not_called() + cx1.is_idle_for.assert_not_called() + + # simulate just before timeout + cx1.is_idle_for.return_value = False + + cx2 = pool._acquire(address, None, Deadline(3), acquire_timeout) + assert cx2 is cx1 + if effective_timeout is not None: + cx1.is_idle_for.assert_called_once_with(effective_timeout) + else: + cx1.is_idle_for.assert_not_called() + pool.release(cx1) + cx1.reset.assert_not_called() + + # simulate after timeout + cx1.is_idle_for.return_value = True + cx1.is_idle_for.reset_mock() + + cx2 = pool._acquire(address, None, Deadline(3), acquire_timeout) + assert cx2 is cx1 + if effective_timeout is not None: + cx1.is_idle_for.assert_called_once_with(effective_timeout) + cx1.reset.assert_called_once() + else: + cx1.is_idle_for.assert_not_called() + cx1.reset.assert_not_called() + cx1.reset.reset_mock() + pool.release(cx1) + cx1.reset.assert_not_called() diff --git a/tests/unit/sync/test_driver.py b/tests/unit/sync/test_driver.py index 11a11e131..1e74fb663 100644 --- a/tests/unit/sync/test_driver.py +++ b/tests/unit/sync/test_driver.py @@ -117,48 +117,48 @@ def test_routing_driver_constructor(protocol, host, port, params, auth_token): @pytest.mark.parametrize( ("test_config", "expected_failure", "expected_failure_message"), ( - ({"encrypted": False}, ConfigurationError, "The config settings"), - ({"encrypted": True}, ConfigurationError, "The config settings"), + ({"encrypted": False}, ConfigurationError, '"encrypted"'), + ({"encrypted": True}, ConfigurationError, '"encrypted"'), ( {"encrypted": True, "trust": TRUST_ALL_CERTIFICATES}, - ConfigurationError, "The config settings" + ConfigurationError, '"encrypted"' ), ( {"trust": TRUST_ALL_CERTIFICATES}, - ConfigurationError, "The config settings" + ConfigurationError, '"trust"' ), ( {"trust": TRUST_SYSTEM_CA_SIGNED_CERTIFICATES}, - ConfigurationError, "The config settings" + ConfigurationError, '"trust"' ), ( {"encrypted": True, "trusted_certificates": TrustAll()}, - ConfigurationError, "The config settings" + ConfigurationError, '"encrypted"' ), ( {"trusted_certificates": TrustAll()}, - ConfigurationError, "The config settings" + ConfigurationError, '"trusted_certificates"' ), ( {"trusted_certificates": TrustSystemCAs()}, - ConfigurationError, "The config settings" + ConfigurationError, '"trusted_certificates"' ), ( {"trusted_certificates": TrustCustomCAs("foo", "bar")}, - ConfigurationError, "The config settings" + ConfigurationError, '"trusted_certificates"' ), ( {"ssl_context": None}, - ConfigurationError, "The config settings" + ConfigurationError, '"ssl_context"' ), ( {"ssl_context": ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)}, - ConfigurationError, "The config settings" + ConfigurationError, '"ssl_context"' ), ) ) @mark_sync_test -def test_driver_config_error( +def test_driver_config_error_uri_conflict( test_uri, test_config, expected_failure, expected_failure_message ): def driver_builder(): @@ -169,7 +169,7 @@ def driver_builder(): return GraphDatabase.driver(test_uri, **test_config) if "+" in test_uri: - # `+s` and `+ssc` are short hand syntax for not having to configure the + # `+s` and `+ssc` are shorthand syntax for not having to configure the # encryption behavior of the driver. Specifying both is invalid. with pytest.raises(expected_failure, match=expected_failure_message): driver_builder() @@ -203,6 +203,22 @@ def test_driver_trust_config_error( GraphDatabase.driver("bolt://127.0.0.1:9001", **test_config) +@pytest.mark.parametrize( + ("test_config", "expected_failure", "expected_failure_message"), + ( + ( + {"liveness_check_timeout": -1}, + ConfigurationError, '"liveness_check_timeout"' + ), + ) +) +def test_driver_liveness_timeout_config_error( + test_config, expected_failure, expected_failure_message +): + with pytest.raises(expected_failure, match=expected_failure_message): + GraphDatabase.driver("bolt://127.0.0.1:9001", **test_config) + + @pytest.mark.parametrize("uri", ( "bolt://127.0.0.1:9000", "neo4j://127.0.0.1:9000", From 59633e3770639f170d6faa879c6b3a77876c6b3c Mon Sep 17 00:00:00 2001 From: Robsdedude Date: Fri, 24 Nov 2023 14:17:22 +0100 Subject: [PATCH 2/3] TestKit backend: handle GetConnectionPoolMetrics request --- testkitbackend/_async/requests.py | 16 ++++++++++++++++ testkitbackend/_sync/requests.py | 16 ++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/testkitbackend/_async/requests.py b/testkitbackend/_async/requests.py index 512819e62..e4aac1f08 100644 --- a/testkitbackend/_async/requests.py +++ b/testkitbackend/_async/requests.py @@ -793,6 +793,22 @@ async def GetRoutingTable(backend, data): await backend.send_response("RoutingTable", response_data) +async def GetConnectionPoolMetrics(backend, data): + driver_id = data["driverId"] + address = neo4j.Address.parse(data["address"]) + driver = backend.drivers[driver_id] + connections = driver._pool.connections.get(address, ()) + in_use = ( + sum(c.in_use for c in connections) + + driver._pool.connections_reservations[address] + ) + idle = len(connections) - in_use + await backend.send_response("ConnectionPoolMetrics", { + "inUse": in_use, + "idle": idle, + }) + + async def FakeTimeInstall(backend, _data): assert backend.fake_time is None assert backend.fake_time_ticker is None diff --git a/testkitbackend/_sync/requests.py b/testkitbackend/_sync/requests.py index f5e395b08..c058af5f6 100644 --- a/testkitbackend/_sync/requests.py +++ b/testkitbackend/_sync/requests.py @@ -793,6 +793,22 @@ def GetRoutingTable(backend, data): backend.send_response("RoutingTable", response_data) +def GetConnectionPoolMetrics(backend, data): + driver_id = data["driverId"] + address = neo4j.Address.parse(data["address"]) + driver = backend.drivers[driver_id] + connections = driver._pool.connections.get(address, ()) + in_use = ( + sum(c.in_use for c in connections) + + driver._pool.connections_reservations[address] + ) + idle = len(connections) - in_use + backend.send_response("ConnectionPoolMetrics", { + "inUse": in_use, + "idle": idle, + }) + + def FakeTimeInstall(backend, _data): assert backend.fake_time is None assert backend.fake_time_ticker is None From 20126112dde1b82da0c441c0232ff756f0830b5b Mon Sep 17 00:00:00 2001 From: Robsdedude Date: Thu, 30 Nov 2023 11:12:10 +0100 Subject: [PATCH 3/3] Skip TestKit test: accepted outstanding unification --- testkitbackend/test_config.json | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/testkitbackend/test_config.json b/testkitbackend/test_config.json index cfe31b1bf..2f11e9d46 100644 --- a/testkitbackend/test_config.json +++ b/testkitbackend/test_config.json @@ -13,7 +13,9 @@ "'neo4j.datatypes.test_temporal_types.TestDataTypes.test_should_echo_all_timezone_ids'": "test_subtest_skips.dt_conversion", "'neo4j.datatypes.test_temporal_types.TestDataTypes.test_date_time_cypher_created_tz_id'": - "test_subtest_skips.tz_id" + "test_subtest_skips.tz_id", + "stub\\.routing\\.test_routing_v[0-9x]+\\.RoutingV[0-9x]+\\.test_should_drop_connections_failing_liveness_check": + "Liveness check error handling is not (yet) unified: https://github.com/neo-technology/drivers-adr/pull/83" }, "features": { "Feature:API:BookmarkManager": true,