Skip to content

Remove timeout config options introduced in 4.4.5 #769

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@
`protocol_version` and `init_size`.
- Introduced `neo4j.exceptions.SessionError` that is raised when trying to
execute work on a closed or otherwise terminated session.
- Removed deprecated config options `update_routing_table_timeout` and
`session_connection_timeout`.
Server-side keep-alives communicated through configuration hints together with
the config option `connection_acquisition_timeout` are sufficient to avoid the
driver getting stuck.


## Version 4.4
Expand Down
3 changes: 0 additions & 3 deletions neo4j/_async/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ def driver(
max_connection_lifetime: float = ...,
max_connection_pool_size: int = ...,
connection_timeout: float = ...,
update_routing_table_timeout: float = ...,
trust: t.Union[
te.Literal["TRUST_ALL_CERTIFICATES"],
te.Literal["TRUST_SYSTEM_CA_SIGNED_CERTIFICATES"]
Expand All @@ -97,7 +96,6 @@ def driver(

# undocumented/unsupported options
# they may be change or removed any time without prior notice
session_connection_timeout: float = ...,
connection_acquisition_timeout: float = ...,
max_transaction_retry_time: float = ...,
initial_retry_delay: float = ...,
Expand Down Expand Up @@ -339,7 +337,6 @@ def encrypted(self) -> bool:

def session(
self,
session_connection_timeout: float = ...,
connection_acquisition_timeout: float = ...,
max_transaction_retry_time: float = ...,
database: t.Optional[str] = ...,
Expand Down
104 changes: 36 additions & 68 deletions neo4j/_async/io/_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,13 @@
AsyncRLock,
)
from ..._async_compat.network import AsyncNetworkUtil
from ..._async_compat.util import AsyncUtil
from ..._conf import (
PoolConfig,
WorkspaceConfig,
)
from ..._deadline import (
connection_deadline,
Deadline,
merge_deadlines,
merge_deadlines_and_timeouts,
)
from ..._exceptions import BoltError
from ..._routing import RoutingTable
Expand Down Expand Up @@ -222,18 +219,18 @@ async def health_check(connection_, deadline_):

@abc.abstractmethod
async def acquire(
self, access_mode, timeout, acquisition_timeout,
database, bookmarks, liveness_check_timeout
self, access_mode, timeout, database, bookmarks, liveness_check_timeout
):
""" Acquire a connection to a server that can satisfy a set of parameters.

:param access_mode:
:param timeout: total timeout (including potential preparation)
:param acquisition_timeout: timeout for actually acquiring a connection
:param timeout: timeout for the core acquisition
(excluding potential preparation like fetching routing tables).
:param database:
:param bookmarks:
:param liveness_check_timeout:
"""
...

def kill_and_release(self, *connections):
""" Release connections back into the pool after closing them.
Expand Down Expand Up @@ -397,12 +394,11 @@ def __repr__(self):
self.address)

async def acquire(
self, access_mode, timeout, acquisition_timeout,
database, bookmarks, liveness_check_timeout
self, access_mode, timeout, database, bookmarks, liveness_check_timeout
):
# The access_mode and database is not needed for a direct connection,
# it's just there for consistency.
deadline = merge_deadlines_and_timeouts(timeout, acquisition_timeout)
deadline = Deadline.from_timeout_or_deadline(timeout)
return await self._acquire(
self.address, deadline, liveness_check_timeout
)
Expand Down Expand Up @@ -464,22 +460,6 @@ def __repr__(self):
"""
return "<{} addresses={!r}>".format(self.__class__.__name__, self.get_default_database_initial_router_addresses())

@asynccontextmanager
async def _refresh_lock_deadline(self, deadline):
timeout = deadline.to_timeout()
if timeout == float("inf"):
timeout = -1
if not await self.refresh_lock.acquire(timeout=timeout):
raise ClientError(
"pool failed to update routing table within {!r}s (timeout)"
.format(deadline.original_timeout)
)

try:
yield
finally:
self.refresh_lock.release()

@property
def first_initial_routing_address(self):
return self.get_default_database_initial_router_addresses()[0]
Expand Down Expand Up @@ -513,7 +493,7 @@ async def get_or_create_routing_table(self, database):
return self.routing_tables[database]

async def fetch_routing_info(
self, address, database, imp_user, bookmarks, deadline
self, address, database, imp_user, bookmarks, acquisition_timeout
):
""" Fetch raw routing info from a given router address.

Expand All @@ -524,32 +504,32 @@ async def fetch_routing_info(
:type imp_user: str or None
:param bookmarks: iterable of bookmark values after which the routing
info should be fetched
:param deadline: connection acquisition deadline
:param acquisition_timeout: connection acquisition timeout

:return: list of routing records, or None if no connection
could be established or if no readers or writers are present
:raise ServiceUnavailable: if the server does not support
routing, or if routing support is broken or outdated
"""
deadline = Deadline.from_timeout_or_deadline(acquisition_timeout)
cx = await self._acquire(address, deadline, None)
try:
with connection_deadline(cx, deadline):
routing_table = await cx.route(
database or self.workspace_config.database,
imp_user or self.workspace_config.impersonated_user,
bookmarks
)
routing_table = await cx.route(
database or self.workspace_config.database,
imp_user or self.workspace_config.impersonated_user,
bookmarks
)
finally:
await self.release(cx)
return routing_table

async def fetch_routing_table(
self, *, address, deadline, database, imp_user, bookmarks
self, *, address, acquisition_timeout, database, imp_user, bookmarks
):
""" Fetch a routing table from a given router address.

:param address: router address
:param deadline: deadline
:param acquisition_timeout: connection acquisition timeout
:param database: the database name
:type: str
:param imp_user: the user to impersonate while fetching the routing
Expand All @@ -563,7 +543,7 @@ async def fetch_routing_table(
new_routing_info = None
try:
new_routing_info = await self.fetch_routing_info(
address, database, imp_user, bookmarks, deadline
address, database, imp_user, bookmarks, acquisition_timeout
)
except Neo4jError as e:
# checks if the code is an error that is caused by the client. In
Expand Down Expand Up @@ -606,7 +586,7 @@ async def fetch_routing_table(
return new_routing_table

async def _update_routing_table_from(
self, *routers, database, imp_user, bookmarks, deadline,
self, *routers, database, imp_user, bookmarks, acquisition_timeout,
database_callback
):
""" Try to update routing tables with the given routers.
Expand All @@ -621,11 +601,8 @@ async def _update_routing_table_from(
async for address in AsyncNetworkUtil.resolve_address(
router, resolver=self.pool_config.resolver
):
if deadline.expired():
return False
new_routing_table = await self.fetch_routing_table(
address=address,
deadline=deadline,
address=address, acquisition_timeout=acquisition_timeout,
database=database, imp_user=imp_user, bookmarks=bookmarks
)
if new_routing_table is not None:
Expand All @@ -645,7 +622,7 @@ async def _update_routing_table_from(
return False

async def update_routing_table(
self, *, database, imp_user, bookmarks, timeout=None,
self, *, database, imp_user, bookmarks, acquisition_timeout=None,
database_callback=None
):
""" Update the routing table from the first router able to provide
Expand All @@ -656,7 +633,7 @@ async def update_routing_table(
table
:type imp_user: str or None
:param bookmarks: bookmarks used when fetching routing table
:param timeout: timeout in seconds for how long to try updating
:param acquisition_timeout: connection acquisition timeout
:param database_callback: A callback function that will be called with
the database name as only argument when a new routing table has been
acquired. This database name might different from `database` if that
Expand All @@ -665,10 +642,7 @@ async def update_routing_table(

:raise neo4j.exceptions.ServiceUnavailable:
"""
deadline = merge_deadlines_and_timeouts(
timeout, self.pool_config.update_routing_table_timeout
)
async with self._refresh_lock_deadline(deadline):
async with self.refresh_lock:
routing_table = await self.get_or_create_routing_table(database)
# copied because it can be modified
existing_routers = set(routing_table.routers)
Expand All @@ -681,22 +655,24 @@ async def update_routing_table(
if await self._update_routing_table_from(
self.first_initial_routing_address, database=database,
imp_user=imp_user, bookmarks=bookmarks,
deadline=deadline, database_callback=database_callback
acquisition_timeout=acquisition_timeout,
database_callback=database_callback
):
# Why is only the first initial routing address used?
return
if await self._update_routing_table_from(
*(existing_routers - {self.first_initial_routing_address}),
database=database, imp_user=imp_user, bookmarks=bookmarks,
deadline=deadline, database_callback=database_callback
acquisition_timeout=acquisition_timeout,
database_callback=database_callback
):
return

if not prefer_initial_routing_address:
if await self._update_routing_table_from(
self.first_initial_routing_address, database=database,
imp_user=imp_user, bookmarks=bookmarks,
deadline=deadline,
acquisition_timeout=acquisition_timeout,
database_callback=database_callback
):
# Why is only the first initial routing address used?
Expand All @@ -714,8 +690,8 @@ async def update_connection_pool(self, *, database):
await super(AsyncNeo4jPool, self).deactivate(address)

async def ensure_routing_table_is_fresh(
self, *, access_mode, database, imp_user, bookmarks, deadline=None,
database_callback=None
self, *, access_mode, database, imp_user, bookmarks,
acquisition_timeout=None, database_callback=None
):
""" Update the routing table if stale.

Expand All @@ -730,15 +706,16 @@ async def ensure_routing_table_is_fresh(
:return: `True` if an update was required, `False` otherwise.
"""
from neo4j.api import READ_ACCESS
async with self._refresh_lock_deadline(deadline):
async with self.refresh_lock:
routing_table = await self.get_or_create_routing_table(database)
if routing_table.is_fresh(readonly=(access_mode == READ_ACCESS)):
# Readers are fresh.
return False

await self.update_routing_table(
database=database, imp_user=imp_user, bookmarks=bookmarks,
timeout=deadline, database_callback=database_callback
acquisition_timeout=acquisition_timeout,
database_callback=database_callback
)
await self.update_connection_pool(database=database)

Expand Down Expand Up @@ -778,34 +755,24 @@ async def _select_address(self, *, access_mode, database):
return choice(addresses_by_usage[min(addresses_by_usage)])

async def acquire(
self, access_mode, timeout, acquisition_timeout,
database, bookmarks, liveness_check_timeout
self, access_mode, timeout, database, bookmarks, liveness_check_timeout
):
if access_mode not in (WRITE_ACCESS, READ_ACCESS):
raise ClientError("Non valid 'access_mode'; {}".format(access_mode))
if not timeout:
raise ClientError("'timeout' must be a float larger than 0; {}"
.format(timeout))
if not acquisition_timeout:
raise ClientError("'acquisition_timeout' must be a float larger "
"than 0; {}".format(acquisition_timeout))
deadline = Deadline.from_timeout_or_deadline(timeout)

from neo4j.api import check_access_mode
access_mode = check_access_mode(access_mode)
async with self._refresh_lock_deadline(deadline):
async with self.refresh_lock:
log.debug("[#0000] C: <ROUTING TABLE ENSURE FRESH> %r",
self.routing_tables)
await self.ensure_routing_table_is_fresh(
access_mode=access_mode, database=database, imp_user=None,
bookmarks=bookmarks, deadline=deadline
bookmarks=bookmarks, acquisition_timeout=timeout
)

# Making sure the routing table is fresh is not considered part of the
# connection acquisition. Hence, the acquisition_timeout starts now!
deadline = merge_deadlines(
deadline, Deadline.from_timeout_or_deadline(acquisition_timeout)
)
while True:
try:
# Get an address for a connection that have the fewest in-use
Expand All @@ -817,6 +784,7 @@ async def acquire(
raise SessionExpired("Failed to obtain connection towards '%s' server." % access_mode) from err
try:
log.debug("[#0000] C: <ACQUIRE ADDRESS> database=%r address=%r", database, address)
deadline = Deadline.from_timeout_or_deadline(timeout)
# should always be a resolved address
connection = await self._acquire(
address, deadline, liveness_check_timeout
Expand Down
7 changes: 3 additions & 4 deletions neo4j/_async/work/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def _set_cached_database(self, database):
self._config.database = database

async def _connect(self, access_mode, **acquire_kwargs):
timeout = Deadline(self._config.session_connection_timeout)
acquisition_timeout = self._config.connection_acquisition_timeout
if self._connection:
# TODO: Investigate this
# log.warning("FIXME: should always disconnect before connect")
Expand All @@ -100,13 +100,12 @@ async def _connect(self, access_mode, **acquire_kwargs):
database=self._config.database,
imp_user=self._config.impersonated_user,
bookmarks=self._bookmarks,
timeout=timeout,
acquisition_timeout=acquisition_timeout,
database_callback=self._set_cached_database
)
acquire_kwargs_ = {
"access_mode": access_mode,
"timeout": timeout,
"acquisition_timeout": self._config.connection_acquisition_timeout,
"timeout": acquisition_timeout,
"database": self._config.database,
"bookmarks": self._bookmarks,
"liveness_check_timeout": None,
Expand Down
13 changes: 0 additions & 13 deletions neo4j/_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,13 +295,6 @@ class PoolConfig(Config):
connection_timeout = 30.0 # seconds
# The maximum amount of time to wait for a TCP connection to be established.

#: Update Routing Table Timout
update_routing_table_timeout = 90.0 # seconds
# The maximum amount of time to wait for updating the routing table.
# This includes everything necessary for this to happen.
# Including opening sockets, requesting and receiving the routing table,
# etc.

#: Trust
trust = DeprecatedAlternative(
"trusted_certificates", _trust_to_trusted_certificates
Expand Down Expand Up @@ -392,12 +385,6 @@ class WorkspaceConfig(Config):
""" WorkSpace configuration.
"""

#: Session Connection Timeout
session_connection_timeout = 120.0 # seconds
# The maximum amount of time to wait for a session to obtain a usable
# read/write connection. This includes everything necessary for this to
# happen. Including fetching routing tables, opening sockets, etc.

#: Connection Acquisition Timeout
connection_acquisition_timeout = 60.0 # seconds
# The maximum amount of time a session will wait when requesting a connection from the connection pool.
Expand Down
Loading