Skip to content

Introduce (Async)ManagedTransaction #658

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@
They are now ignored and will be removed in a future release.
- The undocumented return value has been removed. If you need information
about the remote server, use `driver.get_server_info()` instead.
- Transaction functions (a.k.a. managed transactions):
The first argument of transaction functions is now a `ManagedTransaction`
object. It behaves exactly like a regular `Transaction` object, except it
does not offer the `commit`, `rollback`, `close`, and `closed` methods.
Those methods would have caused a hard to interpreted error previously. Hence,
they have been removed.


## Version 4.4
Expand Down
1 change: 1 addition & 0 deletions bin/make-unasync
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ def apply_unasync(files):
"_async": "_sync",
"mark_async_test": "mark_sync_test",
"assert_awaited_once": "assert_called_once",
"assert_awaited_once_with": "assert_called_once_with",
}
additional_testkit_backend_replacements = {}
rules = [
Expand Down
17 changes: 12 additions & 5 deletions docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -422,16 +422,18 @@ Will result in:
***********************
Sessions & Transactions
***********************
All database activity is co-ordinated through two mechanisms: the :class:`neo4j.Session` and the :class:`neo4j.Transaction`.
All database activity is co-ordinated through two mechanisms:
**sessions** (:class:`neo4j.AsyncSession`) and **transactions**
(:class:`neo4j.Transaction`, :class:`neo4j.ManagedTransaction`).

A :class:`neo4j.Session` is a logical container for any number of causally-related transactional units of work.
A **session** is a logical container for any number of causally-related transactional units of work.
Sessions automatically provide guarantees of causal consistency within a clustered environment but multiple sessions can also be causally chained if required.
Sessions provide the top level of containment for database activity.
Session creation is a lightweight operation and *sessions are not thread safe*.

Connections are drawn from the :class:`neo4j.Driver` connection pool as required.

A :class:`neo4j.Transaction` is a unit of work that is either committed in its entirety or is rolled back on failure.
A **transaction** is a unit of work that is either committed in its entirety or is rolled back on failure.


.. _session-construction-ref:
Expand Down Expand Up @@ -724,7 +726,6 @@ Example:
node_id = create_person_node(tx)
set_person_name(tx, node_id, name)
tx.commit()
tx.close()

def create_person_node(tx):
query = "CREATE (a:Person { name: $name }) RETURN id(a) AS node_id"
Expand Down Expand Up @@ -753,6 +754,12 @@ This function is called one or more times, within a configurable time limit, unt
Results should be fully consumed within the function and only aggregate or status values should be returned.
Returning a live result object would prevent the driver from correctly managing connections and would break retry guarantees.

This function will receive a :class:`neo4j.ManagedTransaction` object as its first parameter.

.. autoclass:: neo4j.ManagedTransaction

.. automethod:: run

Example:

.. code-block:: python
Expand Down Expand Up @@ -811,7 +818,7 @@ A :class:`neo4j.Result` is attached to an active connection, through a :class:`n

.. automethod:: closed

See https://neo4j.com/docs/driver-manual/current/cypher-workflow/#driver-type-mapping for more about type mapping.
See https://neo4j.com/docs/python-manual/current/cypher-workflow/#python-driver-type-mapping for more about type mapping.


Graph
Expand Down
19 changes: 13 additions & 6 deletions docs/source/async_api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -235,16 +235,18 @@ Will result in:
*********************************
AsyncSessions & AsyncTransactions
*********************************
All database activity is co-ordinated through two mechanisms: the :class:`neo4j.AsyncSession` and the :class:`neo4j.AsyncTransaction`.
All database activity is co-ordinated through two mechanisms:
**sessions** (:class:`neo4j.AsyncSession`) and **transactions**
(:class:`neo4j.AsyncTransaction`, :class:`neo4j.AsyncManagedTransaction`).

A :class:`neo4j.AsyncSession` is a logical container for any number of causally-related transactional units of work.
A **session** is a logical container for any number of causally-related transactional units of work.
Sessions automatically provide guarantees of causal consistency within a clustered environment but multiple sessions can also be causally chained if required.
Sessions provide the top level of containment for database activity.
Session creation is a lightweight operation and *sessions cannot be shared between coroutines*.
Session creation is a lightweight operation and *sessions are not thread safe*.

Connections are drawn from the :class:`neo4j.AsyncDriver` connection pool as required.

A :class:`neo4j.AsyncTransaction` is a unit of work that is either committed in its entirety or is rolled back on failure.
A **transaction** is a unit of work that is either committed in its entirety or is rolled back on failure.


.. _async-session-construction-ref:
Expand Down Expand Up @@ -417,7 +419,6 @@ Example:
node_id = await create_person_node(tx)
await set_person_name(tx, node_id, name)
await tx.commit()
await tx.close()

async def create_person_node(tx):
query = "CREATE (a:Person { name: $name }) RETURN id(a) AS node_id"
Expand Down Expand Up @@ -447,6 +448,12 @@ This function is called one or more times, within a configurable time limit, unt
Results should be fully consumed within the function and only aggregate or status values should be returned.
Returning a live result object would prevent the driver from correctly managing connections and would break retry guarantees.

This function will receive a :class:`neo4j.AsyncManagedTransaction` object as its first parameter.

.. autoclass:: neo4j.AsyncManagedTransaction

.. automethod:: run

Example:

.. code-block:: python
Expand Down Expand Up @@ -505,4 +512,4 @@ A :class:`neo4j.AsyncResult` is attached to an active connection, through a :cla

.. automethod:: closed

See https://neo4j.com/docs/driver-manual/current/cypher-workflow/#driver-type-mapping for more about type mapping.
See https://neo4j.com/docs/python-manual/current/cypher-workflow/#python-driver-type-mapping for more about type mapping.
4 changes: 4 additions & 0 deletions neo4j/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"AsyncDriver",
"AsyncGraphDatabase",
"AsyncNeo4jDriver",
"AsyncManagedTransaction",
"AsyncResult",
"AsyncSession",
"AsyncTransaction",
Expand All @@ -42,6 +43,7 @@
"IPv4Address",
"IPv6Address",
"kerberos_auth",
"ManagedTransaction",
"Neo4jDriver",
"PoolConfig",
"Query",
Expand Down Expand Up @@ -72,6 +74,7 @@
AsyncNeo4jDriver,
)
from ._async.work import (
AsyncManagedTransaction,
AsyncResult,
AsyncSession,
AsyncTransaction,
Expand All @@ -83,6 +86,7 @@
Neo4jDriver,
)
from ._sync.work import (
ManagedTransaction,
Result,
Session,
Transaction,
Expand Down
6 changes: 5 additions & 1 deletion neo4j/_async/work/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@
from .session import (
AsyncResult,
AsyncSession,
AsyncTransaction,
AsyncWorkspace,
)
from .transaction import (
AsyncManagedTransaction,
AsyncTransaction,
)


__all__ = [
"AsyncResult",
"AsyncSession",
"AsyncTransaction",
"AsyncManagedTransaction",
"AsyncWorkspace",
]
23 changes: 14 additions & 9 deletions neo4j/_async/work/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
# limitations under the License.


import asyncio
from logging import getLogger
from random import random
from time import perf_counter
Expand Down Expand Up @@ -44,7 +43,10 @@
)
from ...work import Query
from .result import AsyncResult
from .transaction import AsyncTransaction
from .transaction import (
AsyncManagedTransaction,
AsyncTransaction,
)
from .workspace import AsyncWorkspace


Expand Down Expand Up @@ -157,8 +159,9 @@ async def close(self):
self._state_failed = True

if self._transaction:
if self._transaction.closed() is False:
await self._transaction.rollback() # roll back the transaction if it is not closed
if self._transaction._closed() is False:
# roll back the transaction if it is not closed
await self._transaction._rollback()
self._transaction = None

try:
Expand Down Expand Up @@ -306,7 +309,7 @@ async def last_bookmarks(self):
if self._auto_result:
await self._auto_result.consume()

if self._transaction and self._transaction._closed:
if self._transaction and self._transaction._closed():
self._collect_bookmark(self._transaction._bookmark)
self._transaction = None

Expand All @@ -323,10 +326,10 @@ async def _transaction_error_handler(self, _):
self._transaction = None
await self._disconnect()

async def _open_transaction(self, *, access_mode, metadata=None,
async def _open_transaction(self, *, tx_cls, access_mode, metadata=None,
timeout=None):
await self._connect(access_mode=access_mode)
self._transaction = AsyncTransaction(
self._transaction = tx_cls(
self._connection, self._config.fetch_size,
self._transaction_closed_handler,
self._transaction_error_handler
Expand Down Expand Up @@ -372,6 +375,7 @@ async def begin_transaction(self, metadata=None, timeout=None):
raise TransactionError("Explicit transaction already open")

await self._open_transaction(
tx_cls=AsyncTransaction,
access_mode=self._config.default_access_mode, metadata=metadata,
timeout=timeout
)
Expand All @@ -396,17 +400,18 @@ async def _run_transaction(
while True:
try:
await self._open_transaction(
tx_cls=AsyncManagedTransaction,
access_mode=access_mode, metadata=metadata,
timeout=timeout
)
tx = self._transaction
try:
result = await transaction_function(tx, *args, **kwargs)
except Exception:
await tx.close()
await tx._close()
raise
else:
await tx.commit()
await tx._commit()
except IncompleteCommit:
raise
except (ServiceUnavailable, SessionExpired) as error:
Expand Down
Loading