Skip to content

Commit 343470c

Browse files
committed
Merge pull request #21 from pontusmelke/tx
Support for transactions in js driver
2 parents 1eff84f + cdedd34 commit 343470c

File tree

8 files changed

+531
-23
lines changed

8 files changed

+531
-23
lines changed

README.md

+33-8
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,11 @@ session
4747
.subscribe({
4848
onNext: function(record) {
4949
console.log(record);
50-
},
50+
},
5151
onCompleted: function() {
5252
// Completed!
5353
session.close();
54-
},
54+
},
5555
onError: function(error) {
5656
console.log(error);
5757
}
@@ -72,6 +72,31 @@ session
7272
.catch(function(error) {
7373
console.log(error);
7474
});
75+
76+
//run statement in a transaction
77+
var tx = session.beginTransaction();
78+
tx.run("CREATE (alice {name : {nameParam} })", { nameParam:'Alice'}");
79+
tx.run("MATCH (alice {name : {nameParam} }) RETURN alice.age", { nameParam:'Alice'}");
80+
//decide if the transaction should be committed or rolled back
81+
var success = ...
82+
...
83+
if (success) {
84+
tx.commit()
85+
.subscribe({
86+
onCompleted: function() {
87+
// Completed!
88+
session.close();
89+
},
90+
onError: function(error) {
91+
console.log(error);
92+
}
93+
});
94+
} else {
95+
//transaction is rolled black nothing is created in the database
96+
tx.rollback();
97+
}
98+
99+
75100
```
76101

77102
## Building
@@ -89,25 +114,25 @@ See files under `examples/` on how to use.
89114
This runs the test suite against a fresh download of Neo4j.
90115
Or `npm test` if you already have a running version of a compatible Neo4j server.
91116

92-
For development, you can have the build tool rerun the tests each time you change
117+
For development, you can have the build tool rerun the tests each time you change
93118
the source code:
94119

95120
gulp watch-n-test
96121

97122
### Testing on windows
98-
Running tests on windows requires PhantomJS installed and its bin folder added in windows system variable `Path`.
99-
To run the same test suite, run `.\runTest.ps1` instead in powershell with admin right.
100-
The admin right is required to start/stop Neo4j properly as a system service.
123+
Running tests on windows requires PhantomJS installed and its bin folder added in windows system variable `Path`.
124+
To run the same test suite, run `.\runTest.ps1` instead in powershell with admin right.
125+
The admin right is required to start/stop Neo4j properly as a system service.
101126
While there is no need to grab admin right if you are running tests against an existing Neo4j server using `npm test`.
102127

103128
## A note on numbers and the Integer type
104129
The Neo4j type system includes 64-bit integer values.
105-
However, Javascript can only safely represent integers between `-(2`<sup>`53`</sup>` - 1)` and `(2`<sup>`53`</sup>` - 1)`.
130+
However, Javascript can only safely represent integers between `-(2`<sup>`53`</sup>` - 1)` and `(2`<sup>`53`</sup>` - 1)`.
106131
In order to support the full Neo4j type system, the driver includes an explicit Integer types.
107132
Any time the driver recieves an Integer value from Neo4j, it will be represented with the Integer type by the driver.
108133

109134
### Write integers
110-
Number written directly e.g. `session.run("CREATE (n:Node {age: {age}})", {age: 22})` will be of type `Float` in Neo4j.
135+
Number written directly e.g. `session.run("CREATE (n:Node {age: {age}})", {age: 22})` will be of type `Float` in Neo4j.
111136
To write the `age` as an integer the `neo4j.int` method should be used:
112137

113138
```javascript

src/v1/internal/connector.js

+15-1
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ class Connection {
190190
// this to the dechunker
191191
self._ch.onmessage = (buf) => {
192192
self._dechunker.write(buf);
193-
}
193+
};
194194

195195
if( buf.hasRemaining() ) {
196196
self._dechunker.write(buf.readSlice( buf.remaining() ));
@@ -219,6 +219,7 @@ class Connection {
219219
}
220220

221221
_handleMessage( msg ) {
222+
222223
switch( msg.signature ) {
223224
case RECORD:
224225
this._currentObserver.onNext( msg.fields[0] );
@@ -233,6 +234,7 @@ class Connection {
233234
case FAILURE:
234235
try {
235236
this._currentObserver.onError( msg );
237+
this._errorMsg = msg;
236238
} finally {
237239
this._currentObserver = this._pendingObservers.shift();
238240
// Things are now broken. Pending observers will get FAILURE messages routed until
@@ -256,6 +258,18 @@ class Connection {
256258
}
257259
}
258260
break;
261+
case IGNORED:
262+
try {
263+
if (this._errorMsg)
264+
this._currentObserver.onError(this._errorMsg);
265+
else
266+
this._currentObserver.onError(msg);
267+
} finally {
268+
this._currentObserver = this._pendingObservers.shift();
269+
}
270+
break;
271+
default:
272+
console.log("UNKNOWN MESSAGE: ", msg);
259273
}
260274
}
261275

src/v1/internal/stream-observer.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
20-
/**
19+
20+
/**
2121
* Handles a RUN/PULL_ALL, or RUN/DISCARD_ALL requests, maps the responses
2222
* in a way that a user-provided observer can see these as a clean Stream
2323
* of records.
@@ -106,7 +106,7 @@ class StreamObserver {
106106
if( this._queuedRecords.length > 0 ) {
107107
for (var i = 0; i < _queuedRecords.length; i++) {
108108
observer.onNext( _queuedRecords[i] );
109-
};
109+
}
110110
}
111111
if( this._tail ) {
112112
observer.onCompleted( this._tail );

src/v1/result.js

+4-6
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class Result {
3838
this._p = null;
3939
this._statement = statement;
4040
this._parameters = parameters;
41-
this.summary = {}
41+
this.summary = {};
4242
}
4343

4444
/**
@@ -56,7 +56,7 @@ class Result {
5656
onNext: (record) => { records.push(record); },
5757
onCompleted: () => { resolve(records); },
5858
onError: (error) => { reject(error); }
59-
}
59+
};
6060
self.subscribe(observer);
6161
});
6262
}
@@ -99,7 +99,7 @@ class Result {
9999
let onCompletedWrapper = (metadata) => {
100100
this.summary = new ResultSummary(this._statement, this._parameters, metadata);
101101
onCompletedOriginal.call(observer);
102-
}
102+
};
103103
observer.onCompleted = onCompletedWrapper;
104104
this._streamObserver.subscribe(observer);
105105
}
@@ -113,6 +113,4 @@ class Result {
113113
}
114114
}
115115

116-
export default {
117-
Result
118-
}
116+
export default Result

src/v1/session.js

+30-5
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
19+
2020
import StreamObserver from './internal/stream-observer';
21-
import {Result} from './result';
21+
import Result from './result';
22+
import Transaction from './transaction';
2223

2324
/**
2425
* A Session instance is used for handling the connection and
@@ -35,6 +36,7 @@ class Session {
3536
constructor( conn, onClose ) {
3637
this._conn = conn;
3738
this._onClose = onClose;
39+
this._hasTx = false;
3840
}
3941

4042
/**
@@ -51,12 +53,34 @@ class Session {
5153
statement = statement.text;
5254
}
5355
let streamObserver = new StreamObserver();
54-
this._conn.run( statement, parameters || {}, streamObserver );
55-
this._conn.pullAll( streamObserver );
56-
this._conn.sync();
56+
if (!this._hasTx) {
57+
this._conn.run(statement, parameters || {}, streamObserver);
58+
this._conn.pullAll(streamObserver);
59+
this._conn.sync();
60+
} else {
61+
streamObserver.onError({error: "Please close the currently open transaction object before running " +
62+
"more statements/transactions in the current session." });
63+
}
5764
return new Result( streamObserver, statement, parameters );
5865
}
5966

67+
/**
68+
* Begin a new transaction in this session. A session can have at most one transaction running at a time, if you
69+
* want to run multiple concurrent transactions, you should use multiple concurrent sessions.
70+
*
71+
* While a transaction is open the session cannot be used to run statements.
72+
*
73+
* @returns {Transaction} - New Transaction
74+
*/
75+
beginTransaction() {
76+
if (this._hasTx) {
77+
throw new Error("Cannot have multiple transactions open for the session. Use multiple sessions or close the transaction before opening a new one.")
78+
}
79+
80+
this._hasTx = true;
81+
return new Transaction(this._conn, () => {this._hasTx = false});
82+
}
83+
6084
/**
6185
* Close connection
6286
* @param {function()} cb - Function to be called on connection close
@@ -67,4 +91,5 @@ class Session {
6791
this._conn.close(cb);
6892
}
6993
}
94+
7095
export default Session;

0 commit comments

Comments
 (0)