Skip to content

Support transient but not terminated by user errors in retry #156

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions neo4j/v1/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,12 @@

from neo4j.bolt import ProtocolError, ServiceUnavailable
from neo4j.compat import urlparse
from neo4j.exceptions import CypherError
from neo4j.exceptions import CypherError, TransientError

from .exceptions import DriverError, SessionError, SessionExpired, TransactionError


_warned_about_transaction_bookmarks = False


READ_ACCESS = "READ"
WRITE_ACCESS = "WRITE"

Expand All @@ -53,7 +51,6 @@ def retry_delay_generator(initial_delay, multiplier, jitter_factor):


class ValueSystem(object):

def hydrate(self, values):
""" Hydrate values from raw representations into client objects.
"""
Expand Down Expand Up @@ -435,6 +432,11 @@ def _run_transaction(self, access_mode, unit_of_work, *args, **kwargs):
return unit_of_work(tx, *args, **kwargs)
except (ServiceUnavailable, SessionExpired) as error:
last_error = error
except TransientError as error:
if is_retriable_transientError(error):
last_error = error
else:
raise error
sleep(next(retry_delay))
t1 = clock()
raise last_error
Expand All @@ -461,6 +463,14 @@ def __bookmark__(self, result):
pass


def is_retriable_transientError(error):
"""
:type error: TransientError
"""
return not (error.code in ("Neo.TransientError.Transaction.Terminated",
"Neo.TransientError.Transaction.LockClientStopped"))


class Transaction(object):
""" Container for multiple Cypher queries to be executed within
a single context. Transactions can be used within a :py:const:`with`
Expand Down
20 changes: 20 additions & 0 deletions test/stub/scripts/user_canceled_tx.script.script
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
!: AUTO INIT
!: AUTO RESET

C: RUN "BEGIN" {}
PULL_ALL
S: SUCCESS {"fields": []}
SUCCESS {}

C: RUN "RETURN 1" {}
PULL_ALL
S: FAILURE {"code": "Neo.TransientError.Transaction.LockClientStopped", "message": "X"}
IGNORED {}

C: ACK_FAILURE
S: SUCCESS {}

C: RUN "ROLLBACK" {}
PULL_ALL
S: SUCCESS {}
SUCCESS {}
24 changes: 23 additions & 1 deletion test/stub/test_accesslevel.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
# limitations under the License.


from neo4j.v1 import GraphDatabase, CypherError
from neo4j.v1 import GraphDatabase, CypherError, TransientError

from test.stub.tools import StubTestCase, StubCluster

Expand Down Expand Up @@ -159,3 +159,25 @@ def unit_of_work_2(tx):
assert value == 1
value = session.read_transaction(unit_of_work_2)
assert value == 2

def test_no_retry_read_on_user_canceled_tx(self):
with StubCluster({9001: "router.script", 9004: "user_canceled_tx.script.script"}):
uri = "bolt+routing://127.0.0.1:9001"
with GraphDatabase.driver(uri, auth=self.auth_token, encrypted=False) as driver:
with driver.session() as session:
def unit_of_work(tx):
tx.run("RETURN 1")

with self.assertRaises(TransientError):
_ = session.read_transaction(unit_of_work)

def test_no_retry_write_on_user_canceled_tx(self):
with StubCluster({9001: "router.script", 9006: "user_canceled_tx.script.script"}):
uri = "bolt+routing://127.0.0.1:9001"
with GraphDatabase.driver(uri, auth=self.auth_token, encrypted=False) as driver:
with driver.session() as session:
def unit_of_work(tx):
tx.run("RETURN 1")

with self.assertRaises(TransientError):
_ = session.write_transaction(unit_of_work)