Skip to content

New verify_connectivity + add get_server_info #654

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@
It now raises a `ResultConsumedError`.
- New method `Result.closed()` can be used to check for this condition if
necessary.
- `driver.verify_connectivity()`
- All keyword arguments have been deprecated (they were experimental).
They are now ignored and will be removed in a future release.
- The undocumented return value has been removed. If you need information
about the remote server, use `driver.get_server_info()` instead.


## Version 4.4
Expand Down
1 change: 1 addition & 0 deletions bin/make-unasync
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ def apply_unasync(files):
additional_test_replacements = {
"_async": "_sync",
"mark_async_test": "mark_sync_test",
"assert_awaited_once": "assert_called_once",
}
additional_testkit_backend_replacements = {}
rules = [
Expand Down
2 changes: 1 addition & 1 deletion docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ This object holds the details required to establish connections with a Neo4j dat
Closing a driver will immediately shut down all connections in the pool.

.. autoclass:: neo4j.Driver()
:members: session, encrypted, close
:members: session, encrypted, close, verify_connectivity, get_server_info


.. _driver-configuration-ref:
Expand Down
4 changes: 3 additions & 1 deletion docs/source/async_api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ Async API Documentation
This means everything documented on this page might be removed or change
its API at any time (including in patch releases).

.. versionadded:: 5.0

******************
AsyncGraphDatabase
******************
Expand Down Expand Up @@ -126,7 +128,7 @@ This object holds the details required to establish connections with a Neo4j dat
Closing a driver will immediately shut down all connections in the pool.

.. autoclass:: neo4j.AsyncDriver()
:members: session, encrypted, close
:members: session, encrypted, close, verify_connectivity, get_server_info


.. _async-driver-configuration-ref:
Expand Down
111 changes: 55 additions & 56 deletions neo4j/_async/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
SessionConfig,
WorkspaceConfig,
)
from ..exceptions import (
ServiceUnavailable,
SessionExpired,
)
from ..meta import (
deprecation_warn,
experimental,
Expand Down Expand Up @@ -236,9 +240,11 @@ class AsyncDriver:
#: Flag if the driver has been closed
_closed = False

def __init__(self, pool):
def __init__(self, pool, default_workspace_config):
assert pool is not None
assert default_workspace_config is not None
self._pool = pool
self._default_workspace_config = default_workspace_config

async def __aenter__(self):
return self
Expand Down Expand Up @@ -285,17 +291,56 @@ async def close(self):
await self._pool.close()
self._closed = True

@experimental("The configuration may change in the future.")
# TODO: 6.0 - remove config argument
async def verify_connectivity(self, **config):
""" This verifies if the driver can connect to a remote server or a cluster
by establishing a network connection with the remote and possibly exchanging
a few data before closing the connection. It throws exception if fails to connect.
"""Verify that the driver can establish a connection to the server.

This verifies if the driver can establish a reading connection to a
remote server or a cluster. Some data will be exchanged.

Use the exception to further understand the cause of the connectivity problem.
.. note::
Even if this method raises an exception, the driver still needs to
be closed via :meth:`close` to free up all resources.

Note: Even if this method throws an exception, the driver still need to be closed via close() to free up all resources.
:raises DriverError: if the driver cannot connect to the remote.
Use the exception to further understand the cause of the
connectivity problem.

.. versionchanged:: 5.0 the config parameters will be removed in
version 6 0. It has no effect starting in version 5.0.
"""
raise NotImplementedError
if config:
deprecation_warn(
"verify_connectivity() will not accept any configuration "
"parameters starting with version 6.0."
)

await self.get_server_info()

async def get_server_info(self):
"""Get information about the connected Neo4j server.

Try to establish a working read connection to the remote server or a
member of a cluster and exchange some data. Then return the contacted
server's information.

In a cluster, there is no guarantee about which server will be
contacted.

.. note::
Even if this method raises an exception, the driver still needs to
be closed via :meth:`close` to free up all resources.

:rtype: ServerInfo

:raises DriverError: if the driver cannot connect to the remote.
Use the exception to further understand the cause of the
connectivity problem.

.. versionadded:: 5.0
"""
async with self.session() as session:
return await session._get_server_info()

@experimental("Feature support query, based on Bolt Protocol Version and Neo4j Server Version will change in the future.")
async def supports_multi_db(self):
Expand Down Expand Up @@ -339,7 +384,7 @@ def open(cls, target, *, auth=None, **config):

def __init__(self, pool, default_workspace_config):
_Direct.__init__(self, pool.address)
AsyncDriver.__init__(self, pool)
AsyncDriver.__init__(self, pool, default_workspace_config)
self._default_workspace_config = default_workspace_config

def session(self, **config):
Expand All @@ -354,17 +399,6 @@ def session(self, **config):
SessionConfig.consume(config) # Consume the config
return AsyncSession(self._pool, session_config)

@experimental("The configuration may change in the future.")
async def verify_connectivity(self, **config):
server_agent = None
config["fetch_size"] = -1
async with self.session(**config) as session:
result = await session.run("RETURN 1 AS x")
value = await result.single().value()
summary = await result.consume()
server_agent = summary.server.agent
return server_agent


class AsyncNeo4jDriver(_Routing, AsyncDriver):
""":class:`.AsyncNeo4jDriver` is instantiated for ``neo4j`` URIs. The
Expand All @@ -387,45 +421,10 @@ def open(cls, *targets, auth=None, routing_context=None, **config):

def __init__(self, pool, default_workspace_config):
_Routing.__init__(self, pool.get_default_database_initial_router_addresses())
AsyncDriver.__init__(self, pool)
self._default_workspace_config = default_workspace_config
AsyncDriver.__init__(self, pool, default_workspace_config)

def session(self, **config):
from .work import AsyncSession
session_config = SessionConfig(self._default_workspace_config, config)
SessionConfig.consume(config) # Consume the config
return AsyncSession(self._pool, session_config)

@experimental("The configuration may change in the future.")
async def verify_connectivity(self, **config):
"""
:raise ServiceUnavailable: raised if the server does not support routing or if routing support is broken.
"""
# TODO: Improve and update Stub Test Server to be able to test.
return await self._verify_routing_connectivity()

async def _verify_routing_connectivity(self):
from ..exceptions import (
Neo4jError,
ServiceUnavailable,
SessionExpired,
)

table = self._pool.get_routing_table_for_default_database()
routing_info = {}
for ix in list(table.routers):
try:
routing_info[ix] = await self._pool.fetch_routing_info(
address=table.routers[0],
database=self._default_workspace_config.database,
imp_user=self._default_workspace_config.impersonated_user,
bookmarks=None,
timeout=self._default_workspace_config
.connection_acquisition_timeout
)
except (ServiceUnavailable, SessionExpired, Neo4jError):
routing_info[ix] = None
for key, val in routing_info.items():
if val is not None:
return routing_info
raise ServiceUnavailable("Could not connect to any routing servers.")
52 changes: 46 additions & 6 deletions neo4j/_async/io/_bolt.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ class AsyncBolt:
# The socket
in_use = False

# When the connection was last put back into the pool
idle_since = float("-inf")

# The socket
_closed = False

Expand Down Expand Up @@ -104,6 +107,7 @@ def __init__(self, unresolved_address, sock, max_connection_lifetime, *,
self._max_connection_lifetime = max_connection_lifetime
self._creation_timestamp = perf_counter()
self.routing_context = routing_context
self.idle_since = perf_counter()

# Determine the user agent
if user_agent:
Expand Down Expand Up @@ -456,29 +460,55 @@ async def _send_all(self):
except OSError as error:
await self._set_defunct_write(error)
self.outbox.clear()
self.idle_since = perf_counter()

async def send_all(self):
""" Send all queued messages to the server.
"""
if self.closed():
raise ServiceUnavailable("Failed to write to closed connection {!r} ({!r})".format(
self.unresolved_address, self.server_info.address))

raise ServiceUnavailable(
"Failed to write to closed connection {!r} ({!r})".format(
self.unresolved_address, self.server_info.address
)
)
if self.defunct():
raise ServiceUnavailable("Failed to write to defunct connection {!r} ({!r})".format(
self.unresolved_address, self.server_info.address))
raise ServiceUnavailable(
"Failed to write to defunct connection {!r} ({!r})".format(
self.unresolved_address, self.server_info.address
)
)

await self._send_all()

@abc.abstractmethod
async def fetch_message(self):
async def _fetch_message(self):
""" Receive at most one message from the server, if available.

:return: 2-tuple of number of detail messages and number of summary
messages fetched
"""
pass

async def fetch_message(self):
if self._closed:
raise ServiceUnavailable(
"Failed to read from closed connection {!r} ({!r})".format(
self.unresolved_address, self.server_info.address
)
)
if self._defunct:
raise ServiceUnavailable(
"Failed to read from defunct connection {!r} ({!r})".format(
self.unresolved_address, self.server_info.address
)
)
if not self.responses:
return 0, 0

res = await self._fetch_message()
self.idle_since = perf_counter()
return res

async def fetch_all(self):
""" Fetch all outstanding messages.

Expand Down Expand Up @@ -568,5 +598,15 @@ def closed(self):
def defunct(self):
pass

def is_idle_for(self, timeout):
"""Check if connection has been idle for at least the given timeout.

:param timeout: timeout in seconds
:type timeout: float

:rtype: bool
"""
return perf_counter() - self.idle_since > timeout


AsyncBoltSocket.Bolt = AsyncBolt
13 changes: 1 addition & 12 deletions neo4j/_async/io/_bolt3.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,23 +314,12 @@ def fail(metadata):
await self.send_all()
await self.fetch_all()

async def fetch_message(self):
async def _fetch_message(self):
""" Receive at most one message from the server, if available.

:return: 2-tuple of number of detail messages and number of summary
messages fetched
"""
if self._closed:
raise ServiceUnavailable("Failed to read from closed connection {!r} ({!r})".format(
self.unresolved_address, self.server_info.address))

if self._defunct:
raise ServiceUnavailable("Failed to read from defunct connection {!r} ({!r})".format(
self.unresolved_address, self.server_info.address))

if not self.responses:
return 0, 0

# Receive exactly one message
details, summary_signature, summary_metadata = \
await AsyncUtil.next(self.inbox)
Expand Down
13 changes: 1 addition & 12 deletions neo4j/_async/io/_bolt4.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,23 +265,12 @@ def fail(metadata):
await self.send_all()
await self.fetch_all()

async def fetch_message(self):
async def _fetch_message(self):
""" Receive at most one message from the server, if available.

:return: 2-tuple of number of detail messages and number of summary
messages fetched
"""
if self._closed:
raise ServiceUnavailable("Failed to read from closed connection {!r} ({!r})".format(
self.unresolved_address, self.server_info.address))

if self._defunct:
raise ServiceUnavailable("Failed to read from defunct connection {!r} ({!r})".format(
self.unresolved_address, self.server_info.address))

if not self.responses:
return 0, 0

# Receive exactly one message
details, summary_signature, summary_metadata = \
await AsyncUtil.next(self.inbox)
Expand Down
Loading