Skip to content

Commit e0bb062

Browse files
authored
Merge pull request #250 from lutovich/1.4-byte-arrays
Byte array support
2 parents 4578c2d + b44296c commit e0bb062

22 files changed

+661
-240
lines changed

gulpfile.babel.js

+15-4
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ var file = require('gulp-file');
4848
var semver = require('semver');
4949
var sharedNeo4j = require('./test/internal/shared-neo4j').default;
5050

51+
/**
52+
* Useful to investigate resource leaks in tests. Enable to see active sockets and file handles after the 'test' task.
53+
*/
54+
var enableActiveNodeHandlesLogging = false;
55+
5156
gulp.task('default', ["test"]);
5257

5358
gulp.task('browser', function(cb){
@@ -165,15 +170,15 @@ gulp.task('test-nodejs', ['install-driver-into-sandbox'], function () {
165170
.pipe(jasmine({
166171
includeStackTrace: true,
167172
verbose: true
168-
}));
173+
})).on('end', logActiveNodeHandles);
169174
});
170175

171176
gulp.task('test-boltkit', ['nodejs'], function () {
172177
return gulp.src('test/**/*.boltkit.it.js')
173178
.pipe(jasmine({
174179
includeStackTrace: true,
175180
verbose: true
176-
}));
181+
})).on('end', logActiveNodeHandles);
177182
});
178183

179184
gulp.task('test-browser', function (cb) {
@@ -210,7 +215,7 @@ gulp.task('run-tck', ['download-tck', 'nodejs'], function() {
210215
'steps': 'test/v1/tck/steps/*.js',
211216
'format': 'progress',
212217
'tags' : ['~@fixed_session_pool', '~@db', '~@equality', '~@streaming_and_cursor_navigation']
213-
}));
218+
})).on('end', logActiveNodeHandles);
214219
});
215220

216221
/** Set the project version, controls package.json and version.js */
@@ -248,5 +253,11 @@ gulp.task('run-stress-tests', function () {
248253
.pipe(jasmine({
249254
includeStackTrace: true,
250255
verbose: true
251-
}));
256+
})).on('end', logActiveNodeHandles);
252257
});
258+
259+
function logActiveNodeHandles() {
260+
if (enableActiveNodeHandlesLogging) {
261+
console.log('-- Active NodeJS handles START\n', process._getActiveHandles(), '\n-- Active NodeJS handles END');
262+
}
263+
}

src/v1/driver.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class Driver {
6262
/**
6363
* Reference to the connection provider. Initialized lazily by {@link _getOrCreateConnectionProvider}.
6464
* @type {ConnectionProvider}
65-
* @private
65+
* @protected
6666
*/
6767
this._connectionProvider = null;
6868
}

src/v1/internal/ch-dummy.js

+8
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919

2020
import {CombinedBuffer} from './buf';
21+
2122
const observer = {
2223
instance: null,
2324
updateInstance: (instance) => {
@@ -55,6 +56,13 @@ class DummyChannel {
5556
toBuffer () {
5657
return new CombinedBuffer( this.written );
5758
}
59+
60+
close(cb) {
61+
this.written = [];
62+
if (cb) {
63+
return cb();
64+
}
65+
}
5866
}
5967

6068
const channel = DummyChannel;

src/v1/internal/connection-holder.js

+7-3
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,14 @@ export default class ConnectionHolder {
4949

5050
/**
5151
* Get the current connection promise.
52+
* @param {StreamObserver} streamObserver an observer for this connection.
5253
* @return {Promise<Connection>} promise resolved with the current connection.
5354
*/
54-
getConnection() {
55-
return this._connectionPromise;
55+
getConnection(streamObserver) {
56+
return this._connectionPromise.then(connection => {
57+
streamObserver.resolveConnection(connection);
58+
return connection.initializationCompleted();
59+
});
5660
}
5761

5862
/**
@@ -117,7 +121,7 @@ class EmptyConnectionHolder extends ConnectionHolder {
117121
// nothing to initialize
118122
}
119123

120-
getConnection() {
124+
getConnection(streamObserver) {
121125
return Promise.reject(newError('This connection holder does not serve connections'));
122126
}
123127

src/v1/internal/connector.js

+46-27
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import {newError} from './../error';
2727
import ChannelConfig from './ch-config';
2828
import {parseHost, parsePort} from './util';
2929
import StreamObserver from './stream-observer';
30+
import {ServerVersion, VERSION_3_2_0} from './server-version';
3031

3132
let Channel;
3233
if( NodeChannel.available ) {
@@ -472,8 +473,18 @@ class Connection {
472473
return this._packer.packable(value, (err) => this._handleFatalError(err));
473474
}
474475

475-
setServerVersion(version) {
476-
this.server.version = version;
476+
/**
477+
* @protected
478+
*/
479+
_markInitialized(metadata) {
480+
const serverVersion = metadata ? metadata.server : null;
481+
if (!this.server.version) {
482+
this.server.version = serverVersion;
483+
const version = ServerVersion.fromString(serverVersion);
484+
if (version.compareTo(VERSION_3_2_0) < 0) {
485+
this._packer.disableByteArrays();
486+
}
487+
}
477488
}
478489
}
479490

@@ -486,11 +497,15 @@ class ConnectionState {
486497
constructor(connection) {
487498
this._connection = connection;
488499

489-
this._initialized = false;
490-
this._initializationError = null;
500+
this._initRequested = false;
501+
this._initError = null;
491502

492-
this._resolvePromise = null;
493-
this._rejectPromise = null;
503+
this._resolveInitPromise = null;
504+
this._rejectInitPromise = null;
505+
this._initPromise = new Promise((resolve, reject) => {
506+
this._resolveInitPromise = resolve;
507+
this._rejectInitPromise = reject;
508+
});
494509
}
495510

496511
/**
@@ -507,11 +522,7 @@ class ConnectionState {
507522
}
508523
},
509524
onError: error => {
510-
this._initializationError = error;
511-
if (this._rejectPromise) {
512-
this._rejectPromise(error);
513-
this._rejectPromise = null;
514-
}
525+
this._processFailure(error);
515526

516527
this._connection._updateCurrentObserver(); // make sure this same observer will not be called again
517528
try {
@@ -523,14 +534,9 @@ class ConnectionState {
523534
}
524535
},
525536
onCompleted: metaData => {
526-
if (metaData && metaData.server) {
527-
this._connection.setServerVersion(metaData.server);
528-
}
529-
this._initialized = true;
530-
if (this._resolvePromise) {
531-
this._resolvePromise(this._connection);
532-
this._resolvePromise = null;
533-
}
537+
this._connection._markInitialized(metaData);
538+
this._resolveInitPromise(this._connection);
539+
534540
if (observer && observer.onCompleted) {
535541
observer.onCompleted(metaData);
536542
}
@@ -543,15 +549,28 @@ class ConnectionState {
543549
* @return {Promise<Connection>} the result of connection initialization.
544550
*/
545551
initializationCompleted() {
546-
if (this._initialized) {
547-
return Promise.resolve(this._connection);
548-
} else if (this._initializationError) {
549-
return Promise.reject(this._initializationError);
552+
this._initRequested = true;
553+
554+
if (this._initError) {
555+
const error = this._initError;
556+
this._initError = null; // to reject initPromise only once
557+
this._rejectInitPromise(error);
558+
}
559+
560+
return this._initPromise;
561+
}
562+
563+
/**
564+
* @private
565+
*/
566+
_processFailure(error) {
567+
if (this._initRequested) {
568+
// someone is waiting for initialization to complete, reject the promise
569+
this._rejectInitPromise(error);
550570
} else {
551-
return new Promise((resolve, reject) => {
552-
this._resolvePromise = resolve;
553-
this._rejectPromise = reject;
554-
});
571+
// no one is waiting for initialization, memorize the error but do not reject the promise
572+
// to avoid unnecessary unhandled promise rejection warnings
573+
this._initError = error;
555574
}
556575
}
557576
}

0 commit comments

Comments
 (0)