Skip to content

Commit c6d0a17

Browse files
committed
move transaction-executor to core
1 parent 34bb0e0 commit c6d0a17

File tree

3 files changed

+298
-239
lines changed

3 files changed

+298
-239
lines changed

core/src/internal/index.ts

+3-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import * as bookmark from './bookmark'
2424
import * as constants from './constants'
2525
import * as connectionHolder from './connection-holder'
2626
import * as txConfig from './tx-config'
27+
import * as transactionExecutor from './transaction-executor'
2728

2829
export {
2930
util,
@@ -32,5 +33,6 @@ export {
3233
bookmark,
3334
constants,
3435
connectionHolder,
35-
txConfig
36+
txConfig,
37+
transactionExecutor
3638
}
+290
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
/**
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import {
21+
newError,
22+
Neo4jError,
23+
SERVICE_UNAVAILABLE,
24+
SESSION_EXPIRED
25+
} from '../error'
26+
import Transaction from '../transaction'
27+
28+
const DEFAULT_MAX_RETRY_TIME_MS = 30 * 1000 // 30 seconds
29+
const DEFAULT_INITIAL_RETRY_DELAY_MS = 1000 // 1 seconds
30+
const DEFAULT_RETRY_DELAY_MULTIPLIER = 2.0
31+
const DEFAULT_RETRY_DELAY_JITTER_FACTOR = 0.2
32+
33+
type TransactionCreator = () => Transaction
34+
type TransactionWork<T> = (tx: Transaction) => T | Promise<T>
35+
type Resolve<T> = (value: T | PromiseLike<T>) => void
36+
type Reject = (value: any) => void
37+
38+
export class TransactionExecutor {
39+
private _maxRetryTimeMs: number
40+
private _initialRetryDelayMs: number
41+
private _multiplier: number
42+
private _jitterFactor: number
43+
private _inFlightTimeoutIds: NodeJS.Timeout[]
44+
45+
constructor(
46+
maxRetryTimeMs?: number,
47+
initialRetryDelayMs?: number,
48+
multiplier?: number,
49+
jitterFactor?: number
50+
) {
51+
this._maxRetryTimeMs = _valueOrDefault(
52+
maxRetryTimeMs,
53+
DEFAULT_MAX_RETRY_TIME_MS
54+
)
55+
this._initialRetryDelayMs = _valueOrDefault(
56+
initialRetryDelayMs,
57+
DEFAULT_INITIAL_RETRY_DELAY_MS
58+
)
59+
this._multiplier = _valueOrDefault(
60+
multiplier,
61+
DEFAULT_RETRY_DELAY_MULTIPLIER
62+
)
63+
this._jitterFactor = _valueOrDefault(
64+
jitterFactor,
65+
DEFAULT_RETRY_DELAY_JITTER_FACTOR
66+
)
67+
68+
this._inFlightTimeoutIds = []
69+
70+
this._verifyAfterConstruction()
71+
}
72+
73+
execute<T>(
74+
transactionCreator: TransactionCreator,
75+
transactionWork: TransactionWork<T>
76+
): Promise<T> {
77+
return new Promise<T>((resolve, reject) => {
78+
this._executeTransactionInsidePromise(
79+
transactionCreator,
80+
transactionWork,
81+
resolve,
82+
reject
83+
)
84+
}).catch(error => {
85+
const retryStartTimeMs = Date.now()
86+
const retryDelayMs = this._initialRetryDelayMs
87+
return this._retryTransactionPromise(
88+
transactionCreator,
89+
transactionWork,
90+
error,
91+
retryStartTimeMs,
92+
retryDelayMs
93+
)
94+
})
95+
}
96+
97+
close() {
98+
// cancel all existing timeouts to prevent further retries
99+
this._inFlightTimeoutIds.forEach(timeoutId => clearTimeout(timeoutId))
100+
this._inFlightTimeoutIds = []
101+
}
102+
103+
_retryTransactionPromise<T>(
104+
transactionCreator: TransactionCreator,
105+
transactionWork: TransactionWork<T>,
106+
error: Error,
107+
retryStartTime: number,
108+
retryDelayMs: number
109+
): Promise<T> {
110+
const elapsedTimeMs = Date.now() - retryStartTime
111+
112+
if (
113+
elapsedTimeMs > this._maxRetryTimeMs ||
114+
!TransactionExecutor._canRetryOn(error)
115+
) {
116+
return Promise.reject(error)
117+
}
118+
119+
return new Promise<T>((resolve, reject) => {
120+
const nextRetryTime = this._computeDelayWithJitter(retryDelayMs)
121+
const timeoutId = setTimeout(() => {
122+
// filter out this timeoutId when time has come and function is being executed
123+
this._inFlightTimeoutIds = this._inFlightTimeoutIds.filter(
124+
id => id !== timeoutId
125+
)
126+
this._executeTransactionInsidePromise(
127+
transactionCreator,
128+
transactionWork,
129+
resolve,
130+
reject
131+
)
132+
}, nextRetryTime)
133+
// add newly created timeoutId to the list of all in-flight timeouts
134+
this._inFlightTimeoutIds.push(timeoutId)
135+
}).catch(error => {
136+
const nextRetryDelayMs = retryDelayMs * this._multiplier
137+
return this._retryTransactionPromise(
138+
transactionCreator,
139+
transactionWork,
140+
error,
141+
retryStartTime,
142+
nextRetryDelayMs
143+
)
144+
})
145+
}
146+
147+
_executeTransactionInsidePromise<T>(
148+
transactionCreator: TransactionCreator,
149+
transactionWork: TransactionWork<T>,
150+
resolve: Resolve<T>,
151+
reject: Reject
152+
): void {
153+
let tx: Transaction
154+
try {
155+
tx = transactionCreator()
156+
} catch (error) {
157+
// failed to create a transaction
158+
reject(error)
159+
return
160+
}
161+
162+
const resultPromise = this._safeExecuteTransactionWork(tx, transactionWork)
163+
164+
resultPromise
165+
.then(result =>
166+
this._handleTransactionWorkSuccess(result, tx, resolve, reject)
167+
)
168+
.catch(error => this._handleTransactionWorkFailure(error, tx, reject))
169+
}
170+
171+
_safeExecuteTransactionWork<T>(
172+
tx: Transaction,
173+
transactionWork: TransactionWork<T>
174+
): Promise<T> {
175+
try {
176+
const result = transactionWork(tx)
177+
// user defined callback is supposed to return a promise, but it might not; so to protect against an
178+
// incorrect API usage we wrap the returned value with a resolved promise; this is effectively a
179+
// validation step without type checks
180+
return Promise.resolve(result)
181+
} catch (error) {
182+
return Promise.reject(error)
183+
}
184+
}
185+
186+
_handleTransactionWorkSuccess<T>(
187+
result: T,
188+
tx: Transaction,
189+
resolve: Resolve<T>,
190+
reject: Reject
191+
) {
192+
if (tx.isOpen()) {
193+
// transaction work returned resolved promise and transaction has not been committed/rolled back
194+
// try to commit the transaction
195+
tx.commit()
196+
.then(() => {
197+
// transaction was committed, return result to the user
198+
resolve(result)
199+
})
200+
.catch(error => {
201+
// transaction failed to commit, propagate the failure
202+
reject(error)
203+
})
204+
} else {
205+
// transaction work returned resolved promise and transaction is already committed/rolled back
206+
// return the result returned by given transaction work
207+
resolve(result)
208+
}
209+
}
210+
211+
_handleTransactionWorkFailure(error: any, tx: Transaction, reject: Reject) {
212+
if (tx.isOpen()) {
213+
// transaction work failed and the transaction is still open, roll it back and propagate the failure
214+
tx.rollback()
215+
.catch(ignore => {
216+
// ignore the rollback error
217+
})
218+
.then(() => reject(error)) // propagate the original error we got from the transaction work
219+
} else {
220+
// transaction is already rolled back, propagate the error
221+
reject(error)
222+
}
223+
}
224+
225+
_computeDelayWithJitter(delayMs: number): number {
226+
const jitter = delayMs * this._jitterFactor
227+
const min = delayMs - jitter
228+
const max = delayMs + jitter
229+
return Math.random() * (max - min) + min
230+
}
231+
232+
static _canRetryOn(error: any): boolean {
233+
return (
234+
error &&
235+
error instanceof Neo4jError &&
236+
error.code &&
237+
(error.code === SERVICE_UNAVAILABLE ||
238+
error.code === SESSION_EXPIRED ||
239+
this._isTransientError(error))
240+
)
241+
}
242+
243+
static _isTransientError(error: Neo4jError): boolean {
244+
// Retries should not happen when transaction was explicitly terminated by the user.
245+
// Termination of transaction might result in two different error codes depending on where it was
246+
// terminated. These are really client errors but classification on the server is not entirely correct and
247+
// they are classified as transient.
248+
249+
const code = error.code
250+
if (code.indexOf('TransientError') >= 0) {
251+
if (
252+
code === 'Neo.TransientError.Transaction.Terminated' ||
253+
code === 'Neo.TransientError.Transaction.LockClientStopped'
254+
) {
255+
return false
256+
}
257+
return true
258+
}
259+
return false
260+
}
261+
262+
_verifyAfterConstruction() {
263+
if (this._maxRetryTimeMs < 0) {
264+
throw newError('Max retry time should be >= 0: ' + this._maxRetryTimeMs)
265+
}
266+
if (this._initialRetryDelayMs < 0) {
267+
throw newError(
268+
'Initial retry delay should >= 0: ' + this._initialRetryDelayMs
269+
)
270+
}
271+
if (this._multiplier < 1.0) {
272+
throw newError('Multiplier should be >= 1.0: ' + this._multiplier)
273+
}
274+
if (this._jitterFactor < 0 || this._jitterFactor > 1) {
275+
throw newError(
276+
'Jitter factor should be in [0.0, 1.0]: ' + this._jitterFactor
277+
)
278+
}
279+
}
280+
}
281+
282+
function _valueOrDefault(
283+
value: number | undefined,
284+
defaultValue: number
285+
): number {
286+
if (value || value === 0) {
287+
return value
288+
}
289+
return defaultValue
290+
}

0 commit comments

Comments
 (0)