Skip to content

Handle asyncio.CancelledError gracefully #761

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 1, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@
they have been removed.
- Deprecated Nodes' and Relationships' `id` property (`int`) in favor of
`element_id` (`str`).
This also affects `Graph` objects as `graph.nodes[...]` and
`graph.relationships[...]` now prefers strings over integers.
This also affects `Graph` objects as indexing `graph.nodes[...]` and
`graph.relationships[...]` with integers has been deprecated in favor of
indexing them with strings.
- `ServerInfo.connection_id` has been deprecated and will be removed in a
future release. There is no replacement as this is considered internal
information.
Expand Down Expand Up @@ -118,6 +119,8 @@
be used by client code. `Record` should be imported directly from `neo4j`
instead. `neo4j.data.DataHydrator` and `neo4j.data.DataDeydrator` have been
removed without replacement.
- Introduced `neo4j.exceptions.SessionError` that is raised when trying to
execute work on a closed or otherwise terminated session.


## Version 4.4
Expand Down
35 changes: 22 additions & 13 deletions docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,8 @@ To construct a :class:`neo4j.Session` use the :meth:`neo4j.Driver.session` metho


Sessions will often be created and destroyed using a *with block context*.
This is the recommended approach as it takes care of closing the session
properly even when an exception is raised.

.. code-block:: python

Expand All @@ -536,6 +538,8 @@ Session

.. automethod:: close

.. automethod:: closed

.. automethod:: run

.. automethod:: last_bookmarks
Expand Down Expand Up @@ -643,7 +647,7 @@ context of the impersonated user. For this, the user for which the
.. Note::

The server or all servers of the cluster need to support impersonation when.
Otherwise, the driver will raise :py:exc:`.ConfigurationError`
Otherwise, the driver will raise :exc:`.ConfigurationError`
as soon as it encounters a server that does not.


Expand Down Expand Up @@ -708,15 +712,15 @@ Neo4j supports three kinds of transaction:
+ :ref:`explicit-transactions-ref`
+ :ref:`managed-transactions-ref`

Each has pros and cons but if in doubt, use a managed transaction with a `transaction function`.
Each has pros and cons but if in doubt, use a managed transaction with a *transaction function*.


.. _auto-commit-transactions-ref:

Auto-commit Transactions
========================
Auto-commit transactions are the simplest form of transaction, available via
:py:meth:`neo4j.Session.run`. These are easy to use but support only one
:meth:`neo4j.Session.run`. These are easy to use but support only one
statement per transaction and are not automatically retried on failure.

Auto-commit transactions are also the only way to run ``PERIODIC COMMIT``
Expand Down Expand Up @@ -756,7 +760,7 @@ Example:

Explicit Transactions
=====================
Explicit transactions support multiple statements and must be created with an explicit :py:meth:`neo4j.Session.begin_transaction` call.
Explicit transactions support multiple statements and must be created with an explicit :meth:`neo4j.Session.begin_transaction` call.

This creates a new :class:`neo4j.Transaction` object that can be used to run Cypher.

Expand All @@ -766,16 +770,16 @@ It also gives applications the ability to directly control ``commit`` and ``roll

.. automethod:: run

.. automethod:: close

.. automethod:: closed

.. automethod:: commit

.. automethod:: rollback

.. automethod:: close

.. automethod:: closed

Closing an explicit transaction can either happen automatically at the end of a ``with`` block,
or can be explicitly controlled through the :py:meth:`neo4j.Transaction.commit`, :py:meth:`neo4j.Transaction.rollback` or :py:meth:`neo4j.Transaction.close` methods.
or can be explicitly controlled through the :meth:`neo4j.Transaction.commit`, :meth:`neo4j.Transaction.rollback` or :meth:`neo4j.Transaction.close` methods.

Explicit transactions are most useful for applications that need to distribute Cypher execution across multiple functions for the same transaction.

Expand Down Expand Up @@ -811,8 +815,8 @@ Managed Transactions (`transaction functions`)
==============================================
Transaction functions are the most powerful form of transaction, providing access mode override and retry capabilities.

+ :py:meth:`neo4j.Session.write_transaction`
+ :py:meth:`neo4j.Session.read_transaction`
+ :meth:`neo4j.Session.write_transaction`
+ :meth:`neo4j.Session.read_transaction`

These allow a function object representing the transactional unit of work to be passed as a parameter.
This function is called one or more times, within a configurable time limit, until it succeeds.
Expand Down Expand Up @@ -912,8 +916,8 @@ Record
.. autoclass:: neo4j.Record()

A :class:`neo4j.Record` is an immutable ordered collection of key-value
pairs. It is generally closer to a :py:class:`namedtuple` than to an
:py:class:`OrderedDict` inasmuch as iteration of the collection will
pairs. It is generally closer to a :class:`namedtuple` than to an
:class:`OrderedDict` inasmuch as iteration of the collection will
yield values rather than keys.

.. describe:: Record(iterable)
Expand Down Expand Up @@ -1313,6 +1317,8 @@ Client-side errors

* :class:`neo4j.exceptions.DriverError`

* :class:`neo4j.exceptions.SessionError`

* :class:`neo4j.exceptions.TransactionError`

* :class:`neo4j.exceptions.TransactionNestingError`
Expand Down Expand Up @@ -1347,6 +1353,9 @@ Client-side errors
.. autoclass:: neo4j.exceptions.DriverError
:members: is_retryable

.. autoclass:: neo4j.exceptions.SessionError
:show-inheritance:

.. autoclass:: neo4j.exceptions.TransactionError
:show-inheritance:

Expand Down
104 changes: 94 additions & 10 deletions docs/source/async_api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ To construct a :class:`neo4j.AsyncSession` use the :meth:`neo4j.AsyncDriver.sess


Sessions will often be created and destroyed using a *with block context*.
This is the recommended approach as it takes care of closing the session
properly even when an exception is raised.

.. code-block:: python

Expand All @@ -311,8 +313,43 @@ AsyncSession

.. autoclass:: neo4j.AsyncSession()

.. note::

Some asyncio utility functions (e.g., :func:`asyncio.wait_for` and
:func:`asyncio.shield`) will wrap work in a :class:`asyncio.Task`.
This introduces concurrency and can lead to undefined behavior as
:class:`AsyncSession` is not concurrency-safe.

Consider this **wrong** example::

async def dont_do_this(driver):
async with driver.session() as session:
await asyncio.shield(session.run("RETURN 1"))

If ``dont_do_this`` gets cancelled while waiting for ``session.run``,
``session.run`` itself won't get cancelled (it's shielded) so it will
continue to use the session in another Task. Concurrently, will the
async context manager (``async with driver.session()``) on exit clean
up the session. That's two Tasks handling the session concurrently.
Therefore, this yields undefined behavior.

In this particular example, the problem could be solved by shielding
the whole coroutine ``dont_do_this`` instead of only the
``session.run``. Like so::

async def thats_better(driver):
async def inner()
async with driver.session() as session:
await session.run("RETURN 1")

await asyncio.shield(inner())

.. automethod:: close

.. automethod:: cancel

.. automethod:: closed

.. automethod:: run

.. automethod:: last_bookmarks
Expand Down Expand Up @@ -346,15 +383,15 @@ Neo4j supports three kinds of async transaction:
+ :ref:`async-explicit-transactions-ref`
+ :ref:`async-managed-transactions-ref`

Each has pros and cons but if in doubt, use a managed transaction with a `transaction function`.
Each has pros and cons but if in doubt, use a managed transaction with a *transaction function*.


.. _async-auto-commit-transactions-ref:

Auto-commit Transactions
========================
Auto-commit transactions are the simplest form of transaction, available via
:py:meth:`neo4j.Session.run`. These are easy to use but support only one
:meth:`neo4j.Session.run`. These are easy to use but support only one
statement per transaction and are not automatically retried on failure.

Auto-commit transactions are also the only way to run ``PERIODIC COMMIT``
Expand Down Expand Up @@ -398,7 +435,7 @@ Example:

Explicit Async Transactions
===========================
Explicit transactions support multiple statements and must be created with an explicit :py:meth:`neo4j.AsyncSession.begin_transaction` call.
Explicit transactions support multiple statements and must be created with an explicit :meth:`neo4j.AsyncSession.begin_transaction` call.

This creates a new :class:`neo4j.AsyncTransaction` object that can be used to run Cypher.

Expand All @@ -408,16 +445,18 @@ It also gives applications the ability to directly control ``commit`` and ``roll

.. automethod:: run

.. automethod:: close

.. automethod:: closed

.. automethod:: commit

.. automethod:: rollback

.. automethod:: close

.. automethod:: cancel

.. automethod:: closed

Closing an explicit transaction can either happen automatically at the end of a ``async with`` block,
or can be explicitly controlled through the :py:meth:`neo4j.AsyncTransaction.commit`, :py:meth:`neo4j.AsyncTransaction.rollback` or :py:meth:`neo4j.AsyncTransaction.close` methods.
or can be explicitly controlled through the :meth:`neo4j.AsyncTransaction.commit`, :meth:`neo4j.AsyncTransaction.rollback`, :meth:`neo4j.AsyncTransaction.close` or :meth:`neo4j.AsyncTransaction.cancel` methods.

Explicit transactions are most useful for applications that need to distribute Cypher execution across multiple functions for the same transaction.

Expand Down Expand Up @@ -456,8 +495,8 @@ Managed Async Transactions (`transaction functions`)
====================================================
Transaction functions are the most powerful form of transaction, providing access mode override and retry capabilities.

+ :py:meth:`neo4j.AsyncSession.write_transaction`
+ :py:meth:`neo4j.AsyncSession.read_transaction`
+ :meth:`neo4j.AsyncSession.write_transaction`
+ :meth:`neo4j.AsyncSession.read_transaction`

These allow a function object representing the transactional unit of work to be passed as a parameter.
This function is called one or more times, within a configurable time limit, until it succeeds.
Expand Down Expand Up @@ -531,3 +570,48 @@ A :class:`neo4j.AsyncResult` is attached to an active connection, through a :cla
.. automethod:: closed

See https://neo4j.com/docs/python-manual/current/cypher-workflow/#python-driver-type-mapping for more about type mapping.



******************
Async Cancellation
******************

Async Python provides a mechanism for cancelling futures
(:meth:`asyncio.Future.cancel`). The driver and its components can handle this.
However, generally, it's not advised to rely on cancellation as it forces the
driver to close affected connections to avoid leaving them in an undefined
state. This makes the driver less efficient.

The easiest way to make sure your application code's interaction with the driver
is playing nicely with cancellation is to always use the async context manager
provided by :class:`neo4j.AsyncSession` like so: ::

async with driver.session() as session:
... # do what you need to do with the session

If, for whatever reason, you need handle the session manually, you can it like
so: ::

session = await with driver.session()
try:
... # do what you need to do with the session
except asyncio.CancelledError:
session.cancel()
raise
finally:
# this becomes a no-op if the session has been cancelled before
await session.close()

As mentioned above, any cancellation of I/O work will cause the driver to close
the affected connection. This will kill any :class:`neo4j.AsyncTransaction` and
:class:`neo4j.AsyncResult` objects that are attached to that connection. Hence,
after catching a :class:`asyncio.CancelledError`, you should not try to use
transactions or results created earlier. They are likely to not be valid
anymore.

Furthermore, there is no guarantee as to whether a piece of ongoing work got
successfully executed on the server side or not, when a cancellation happens:
``await transaction.commit()`` and other methods can throw
:exc:`asyncio.CancelledError` but still have managed to complete from the
server's perspective.
44 changes: 27 additions & 17 deletions neo4j/_async/io/_bolt.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import abc
import asyncio
import socket
from collections import deque
from logging import getLogger
from time import perf_counter
Expand Down Expand Up @@ -370,8 +371,9 @@ def time_remaining():
await connection.hello()
finally:
connection.socket.set_deadline(None)
except Exception:
await connection.close_non_blocking()
except Exception as e:
log.debug("[#%04X] C: <OPEN FAILED> %r", s.getsockname()[1], e)
connection.kill()
raise

return connection
Expand Down Expand Up @@ -678,22 +680,28 @@ async def _set_defunct_write(self, error=None, silent=False):
async def _set_defunct(self, message, error=None, silent=False):
from ._pool import AsyncBoltPool
direct_driver = isinstance(self.pool, AsyncBoltPool)
user_cancelled = isinstance(error, asyncio.CancelledError)

if error:
log.debug("[#%04X] %r", self.socket.getsockname()[1], error)
log.error(message)
log.debug("[#%04X] %r", self.local_port, error)
if not user_cancelled:
log.error(message)
# We were attempting to receive data but the connection
# has unexpectedly terminated. So, we need to close the
# connection from the client side, and remove the address
# from the connection pool.
self._defunct = True
if user_cancelled:
self.kill()
raise error # cancellation error should not be re-written
if not self._closing:
# If we fail while closing the connection, there is no need to
# remove the connection from the pool, nor to try to close the
# connection again.
await self.close()
if self.pool:
await self.pool.deactivate(address=self.unresolved_address)

# Iterate through the outstanding responses, and if any correspond
# to COMMIT requests then raise an error to signal that we are
# unable to confirm that the COMMIT completed successfully.
Expand Down Expand Up @@ -736,8 +744,9 @@ async def close(self):
self.goodbye()
try:
await self._send_all()
except (OSError, BoltError, DriverError):
pass
except (OSError, BoltError, DriverError) as exc:
log.debug("[#%04X] ignoring failed close %r",
self.local_port, exc)
log.debug("[#%04X] C: <CLOSE>", self.local_port)
try:
await self.socket.close()
Expand All @@ -746,18 +755,19 @@ async def close(self):
finally:
self._closed = True

async def close_non_blocking(self):
"""Set the socket to non-blocking and close it.

This will try to send the `GOODBYE` message (given the socket is not
marked as defunct). However, should the write operation require
blocking (e.g., a full network buffer), then the socket will be closed
immediately (without `GOODBYE` message).
"""
if self._closed or self._closing:
def kill(self):
"""Close the socket most violently. No flush, no goodbye, no mercy."""
if self._closed:
return
self.socket.settimeout(0)
await self.close()
log.debug("[#%04X] C: <KILL>", self.local_port)
self._closing = True
try:
self.socket.kill()
except OSError as exc:
log.debug("[#%04X] ignoring failed kill %r",
self.local_port, exc)
finally:
self._closed = True

def closed(self):
return self._closed
Expand Down
Loading