Skip to content

Commit 34bb0e0

Browse files
committed
moving transaction to core
1 parent 6bbaade commit 34bb0e0

13 files changed

+108
-541
lines changed

core/src/index.ts

+5-2
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ import ResultSummary, {
6666
} from './result-summary'
6767
import Result, { QueryResult, ResultObserver } from './result'
6868
import ConnectionProvider from './connection-provider'
69+
import Transaction from './transaction'
6970
import * as internal from './internal' // todo: removed afterwards
7071

7172
/**
@@ -123,7 +124,8 @@ const forExport = {
123124
Plan,
124125
ProfiledPlan,
125126
QueryStatistics,
126-
Result
127+
Result,
128+
Transaction
127129
}
128130

129131
export {
@@ -175,7 +177,8 @@ export {
175177
Result,
176178
QueryResult,
177179
ResultObserver,
178-
ConnectionProvider
180+
ConnectionProvider,
181+
Transaction
179182
}
180183

181184
export default forExport

core/src/internal/observers.ts

+75-4
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
* limitations under the License.
1818
*/
1919

20+
import { observer } from '.'
2021
import Record from '../record'
2122
import ResultSummary from '../result-summary'
2223

@@ -34,7 +35,7 @@ interface StreamObserver {
3435
* to it's onError method, otherwise set instance variable _error.
3536
* @param {Object} error - An error object
3637
*/
37-
onError?: (error: Error) => void
38+
onError: (error: Error) => void
3839
onCompleted?: (meta: any) => void
3940
}
4041

@@ -58,9 +59,9 @@ interface ResultObserver {
5859

5960
/**
6061
* Called when the result is fully received
61-
* @param {ResultSummary} summary The result summary
62+
* @param {ResultSummary| any} summary The result summary
6263
*/
63-
onCompleted?: (summary: ResultSummary) => void
64+
onCompleted?: (summary: ResultSummary | any) => void
6465

6566
/**
6667
* Called when some error occurs during the result proccess or query execution
@@ -105,4 +106,74 @@ export interface ResultStreamObserver extends StreamObserver {
105106
subscribe(observer: ResultObserver): void
106107
}
107108

108-
// @todo implement observers
109+
export class CompletedObserver implements ResultStreamObserver {
110+
subscribe(observer: ResultObserver): void {
111+
apply(observer, observer.onKeys, [])
112+
apply(observer, observer.onCompleted, {})
113+
}
114+
115+
cancel(): void {
116+
// do nothing
117+
}
118+
119+
prepareToHandleSingleResponse(): void {
120+
// do nothing
121+
}
122+
123+
markCompleted(): void {
124+
// do nothing
125+
}
126+
127+
onError(error: Error): void {
128+
// nothing to do, already finished
129+
throw Error('CompletedObserver not supposed to call onError')
130+
}
131+
}
132+
133+
export class FailedObserver implements ResultStreamObserver {
134+
private _error: Error
135+
private _beforeError?: (error: Error) => void
136+
private _observers: ResultObserver[]
137+
138+
constructor({
139+
error,
140+
onError
141+
}: {
142+
error: Error
143+
onError?: (error: Error) => void | Promise<void>
144+
}) {
145+
this._error = error
146+
this._beforeError = onError
147+
this._observers = []
148+
this.onError(error)
149+
}
150+
151+
subscribe(observer: ResultObserver): void {
152+
apply(observer, observer.onError, this._error)
153+
this._observers.push(observer)
154+
}
155+
156+
onError(error: Error): void {
157+
Promise.resolve(apply(this, this._beforeError, error)).then(() =>
158+
this._observers.forEach(o => apply(o, o.onError, error))
159+
)
160+
}
161+
162+
cancel(): void {
163+
// do nothing
164+
}
165+
166+
prepareToHandleSingleResponse(): void {
167+
// do nothing
168+
}
169+
170+
markCompleted(): void {
171+
// do nothing
172+
}
173+
}
174+
175+
function apply<T>(thisArg: any, func?: (param: T) => void, param?: T): void {
176+
if (func) {
177+
func.bind(thisArg)(param)
178+
}
179+
}

core/src/result.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import ResultSummary from './result-summary'
2121
import Record from './record'
2222
import { Query } from './types'
2323
import { observer, util, connectionHolder } from './internal'
24+
import { CompletedObserver, FailedObserver } from './internal/observers'
2425

2526
const { EMPTY_CONNECTION_HOLDER } = connectionHolder
2627

@@ -288,7 +289,9 @@ class Result implements Promise<QueryResult> {
288289
}
289290
observer.onError = onErrorWrapper
290291

291-
this._streamObserverPromise.then(o => o.subscribe(observer))
292+
this._streamObserverPromise.then(o => {
293+
return o.subscribe(observer)
294+
})
292295
}
293296

294297
/**

core/src/transacation.ts renamed to core/src/transaction.ts

+8-9
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,10 @@ class Transaction {
9393
_begin(bookmark: Bookmark | string | string[], txConfig: TxConfig) {
9494
this._connectionHolder
9595
.getConnection()
96-
.then(conn => {
97-
if (conn) {
98-
this._onConnection()
99-
100-
return conn.protocol().beginTransaction({
96+
.then(connection => {
97+
this._onConnection()
98+
if (connection) {
99+
return connection.protocol().beginTransaction({
101100
bookmark: bookmark,
102101
txConfig: txConfig,
103102
mode: this._connectionHolder.mode(),
@@ -120,7 +119,7 @@ class Transaction {
120119
* @param {Object} parameters - Map with parameters to use in query
121120
* @return {Result} New Result
122121
*/
123-
run(query: Query, parameters: any): Result {
122+
run(query: Query, parameters?: any): Result {
124123
const { validatedQuery, params } = validateQueryAndParameters(
125124
query,
126125
parameters
@@ -287,8 +286,8 @@ const _states = {
287286
const observerPromise = connectionHolder
288287
.getConnection()
289288
.then(conn => {
289+
onConnection()
290290
if (conn) {
291-
onConnection()
292291
return conn.protocol().run(query, parameters, {
293292
bookmark: Bookmark.empty(),
294293
txConfig: TxConfig.empty(),
@@ -507,7 +506,7 @@ function finishTransaction(
507506
onComplete: (metadata: any) => any,
508507
onConnection: () => any,
509508
pendingResults: Result[]
510-
) {
509+
): Result {
511510
const observerPromise = connectionHolder
512511
.getConnection()
513512
.then(connection => {
@@ -559,7 +558,7 @@ function newCompletedResult(
559558
query: Query,
560559
parameters: any,
561560
connectionHolder: ConnectionHolder = EMPTY_CONNECTION_HOLDER
562-
) {
561+
): Result {
563562
return new Result(
564563
Promise.resolve(observerPromise),
565564
query,

src/session.js

+1-2
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@
1717
* limitations under the License.
1818
*/
1919
import { ResultStreamObserver, FailedObserver } from './internal/bolt'
20-
import Transaction from './transaction'
21-
import { newError, internal, Result } from 'neo4j-driver-core'
20+
import { newError, internal, Result, Transaction } from 'neo4j-driver-core'
2221
import ConnectionHolder from './internal/connection-holder'
2322
import { ACCESS_MODE_READ, ACCESS_MODE_WRITE } from './internal/constants'
2423
import TransactionExecutor from './internal/transaction-executor'

src/transaction-rx.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
*/
1919
import { Observable } from 'rxjs'
2020
import RxResult from './result-rx'
21-
import Transaction from './transaction'
21+
import Transaction from 'neo4j-driver-core'
2222

2323
/**
2424
* A reactive transaction, which provides the same functionality as {@link Transaction} but through a Reactive API.

0 commit comments

Comments
 (0)