Skip to content

Commit 4e33fc9

Browse files
committed
Fix purging of connections on INIT failure
Commit moves waiting for connection initialization promise higher in the stack - from `ConnectionProvider` to `ConnectionHolder`. This is needed to be able to notify stream observer about used connection even if initialization fails. It is later possible to purge connections for the given server. Previously stream observer did not get to know the connection and pool was not purged on connectivity errors.
1 parent 3c87a27 commit 4e33fc9

8 files changed

+112
-94
lines changed

src/v1/internal/connection-holder.js

+7-3
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,14 @@ export default class ConnectionHolder {
4949

5050
/**
5151
* Get the current connection promise.
52+
* @param {StreamObserver} streamObserver an observer for this connection.
5253
* @return {Promise<Connection>} promise resolved with the current connection.
5354
*/
54-
getConnection() {
55-
return this._connectionPromise;
55+
getConnection(streamObserver) {
56+
return this._connectionPromise.then(connection => {
57+
streamObserver.resolveConnection(connection);
58+
return connection.initializationCompleted();
59+
});
5660
}
5761

5862
/**
@@ -117,7 +121,7 @@ class EmptyConnectionHolder extends ConnectionHolder {
117121
// nothing to initialize
118122
}
119123

120-
getConnection() {
124+
getConnection(streamObserver) {
121125
return Promise.reject(newError('This connection holder does not serve connections'));
122126
}
123127

src/v1/internal/connection-providers.js

+5-19
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ export class DirectConnectionProvider extends ConnectionProvider {
5454
}
5555

5656
acquireConnection(mode) {
57-
const connectionPromise = acquireConnectionFromPool(this._connectionPool, this._address);
57+
const connection = this._connectionPool.acquire(this._address);
58+
const connectionPromise = Promise.resolve(connection);
5859
return this._withAdditionalOnErrorCallback(connectionPromise, this._driverOnErrorCallback);
5960
}
6061
}
@@ -101,7 +102,7 @@ export class LoadBalancer extends ConnectionProvider {
101102
`Failed to obtain connection towards ${serverName} server. Known routing table is: ${this._routingTable}`,
102103
SESSION_EXPIRED));
103104
}
104-
return acquireConnectionFromPool(this._connectionPool, address);
105+
return this._connectionPool.acquire(address);
105106
}
106107

107108
_freshRoutingTable(accessMode) {
@@ -198,9 +199,10 @@ export class LoadBalancer extends ConnectionProvider {
198199
}
199200

200201
_createSessionForRediscovery(routerAddress) {
202+
const connection = this._connectionPool.acquire(routerAddress);
201203
// initialized connection is required for routing procedure call
202204
// server version needs to be known to decide which routing procedure to use
203-
const initializedConnectionPromise = acquireConnectionFromPool(this._connectionPool, routerAddress);
205+
const initializedConnectionPromise = connection.initializationCompleted();
204206
const connectionProvider = new SingleConnectionProvider(initializedConnectionPromise);
205207
return new Session(READ, connectionProvider);
206208
}
@@ -261,19 +263,3 @@ export class SingleConnectionProvider extends ConnectionProvider {
261263
return connectionPromise;
262264
}
263265
}
264-
265-
// todo: test that all connection providers return initialized connections
266-
267-
/**
268-
* Acquire an initialized connection from the given connection pool for the given address. Returned connection
269-
* promise will be resolved by a connection which completed initialization, i.e. received a SUCCESS response
270-
* for it's INIT message.
271-
* @param {Pool} connectionPool the connection pool to acquire connection from.
272-
* @param {string} address the server address.
273-
* @return {Promise<Connection>} the initialized connection.
274-
*/
275-
function acquireConnectionFromPool(connectionPool, address) {
276-
const connection = connectionPool.acquire(address);
277-
// initialized connection is required to be able to perform subsequent server version checks
278-
return connection.initializationCompleted();
279-
}

src/v1/internal/connector.js

+33-23
Original file line numberDiff line numberDiff line change
@@ -498,11 +498,15 @@ class ConnectionState {
498498
constructor(connection) {
499499
this._connection = connection;
500500

501-
this._initialized = false;
502-
this._initializationError = null;
503-
504-
this._resolvePromise = null;
505-
this._rejectPromise = null;
501+
this._initRequested = false;
502+
this._initError = null;
503+
504+
this._resolveInitPromise = null;
505+
this._rejectInitPromise = null;
506+
this._initPromise = new Promise((resolve, reject) => {
507+
this._resolveInitPromise = resolve;
508+
this._rejectInitPromise = reject;
509+
});
506510
}
507511

508512
/**
@@ -519,11 +523,7 @@ class ConnectionState {
519523
}
520524
},
521525
onError: error => {
522-
this._initializationError = error;
523-
if (this._rejectPromise) {
524-
this._rejectPromise(error);
525-
this._rejectPromise = null;
526-
}
526+
this._processFailure(error);
527527

528528
this._connection._updateCurrentObserver(); // make sure this same observer will not be called again
529529
try {
@@ -536,11 +536,8 @@ class ConnectionState {
536536
},
537537
onCompleted: metaData => {
538538
this._connection._markInitialized(metaData);
539-
this._initialized = true;
540-
if (this._resolvePromise) {
541-
this._resolvePromise(this._connection);
542-
this._resolvePromise = null;
543-
}
539+
this._resolveInitPromise(this._connection);
540+
544541
if (observer && observer.onCompleted) {
545542
observer.onCompleted(metaData);
546543
}
@@ -553,15 +550,28 @@ class ConnectionState {
553550
* @return {Promise<Connection>} the result of connection initialization.
554551
*/
555552
initializationCompleted() {
556-
if (this._initialized) {
557-
return Promise.resolve(this._connection);
558-
} else if (this._initializationError) {
559-
return Promise.reject(this._initializationError);
553+
this._initRequested = true;
554+
555+
if (this._initError) {
556+
const error = this._initError;
557+
this._initError = null; // to reject initPromise only once
558+
this._rejectInitPromise(error);
559+
}
560+
561+
return this._initPromise;
562+
}
563+
564+
/**
565+
* @private
566+
*/
567+
_processFailure(error) {
568+
if (this._initRequested) {
569+
// someone is waiting for initialization to complete, reject the promise
570+
this._rejectInitPromise(error);
560571
} else {
561-
return new Promise((resolve, reject) => {
562-
this._resolvePromise = resolve;
563-
this._rejectPromise = reject;
564-
});
572+
// no one is waiting for initialization, memorize the error but do not reject the promise
573+
// to avoid unnecessary unhandled promise rejection warnings
574+
this._initError = error;
565575
}
566576
}
567577
}

src/v1/session.js

+1-2
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,7 @@ class Session {
7575
const connectionHolder = this._connectionHolderWithMode(this._mode);
7676
if (!this._hasTx) {
7777
connectionHolder.initializeConnection();
78-
connectionHolder.getConnection().then(connection => {
79-
streamObserver.resolveConnection(connection);
78+
connectionHolder.getConnection(streamObserver).then(connection => {
8079
statementRunner(connection, streamObserver);
8180
connection.pullAll(streamObserver);
8281
connection.sync();

src/v1/transaction.js

+7-11
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,7 @@ class Transaction {
4343
params = {bookmark: bookmark};
4444
}
4545

46-
this._connectionHolder.getConnection().then(conn => {
47-
streamObserver.resolveConnection(conn);
46+
this._connectionHolder.getConnection(streamObserver).then(conn => {
4847
conn.run('BEGIN', params, streamObserver);
4948
conn.pullAll(streamObserver);
5049
}).catch(error => streamObserver.onError(error));
@@ -167,8 +166,7 @@ let _states = {
167166
return {result: _runPullAll("ROLLBACK", connectionHolder, observer), state: _states.ROLLED_BACK};
168167
},
169168
run: (connectionHolder, observer, statement, parameters) => {
170-
connectionHolder.getConnection().then(conn => {
171-
observer.resolveConnection(conn);
169+
connectionHolder.getConnection(observer).then(conn => {
172170
conn.run(statement, parameters || {}, observer);
173171
conn.pullAll(observer);
174172
conn.sync();
@@ -246,13 +244,11 @@ let _states = {
246244
};
247245

248246
function _runPullAll(msg, connectionHolder, observer) {
249-
connectionHolder.getConnection().then(
250-
conn => {
251-
observer.resolveConnection(conn);
252-
conn.run(msg, {}, observer);
253-
conn.pullAll(observer);
254-
conn.sync();
255-
}).catch(error => observer.onError(error));
247+
connectionHolder.getConnection(observer).then(conn => {
248+
conn.run(msg, {}, observer);
249+
conn.pullAll(observer);
250+
conn.sync();
251+
}).catch(error => observer.onError(error));
256252

257253
// for commit & rollback we need result that uses real connection holder and notifies it when
258254
// connection is not needed and can be safely released to the pool

test/internal/connection-holder.test.js

+49-3
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,12 @@ import ConnectionHolder, {EMPTY_CONNECTION_HOLDER} from '../../src/v1/internal/c
2121
import {SingleConnectionProvider} from '../../src/v1/internal/connection-providers';
2222
import {READ} from '../../src/v1/driver';
2323
import FakeConnection from './fake-connection';
24+
import StreamObserver from '../../src/v1/internal/stream-observer';
2425

2526
describe('EmptyConnectionHolder', () => {
2627

2728
it('should return rejected promise instead of connection', done => {
28-
EMPTY_CONNECTION_HOLDER.getConnection().catch(() => {
29+
EMPTY_CONNECTION_HOLDER.getConnection(new StreamObserver()).catch(() => {
2930
done();
3031
});
3132
});
@@ -62,8 +63,40 @@ describe('ConnectionHolder', () => {
6263

6364
connectionHolder.initializeConnection();
6465

65-
connectionHolder.getConnection().then(connection => {
66-
expect(connection).toBe(connection);
66+
connectionHolder.getConnection(new StreamObserver()).then(conn => {
67+
expect(conn).toBe(connection);
68+
verifyConnectionInitialized(conn);
69+
done();
70+
});
71+
});
72+
73+
it('should make stream observer aware about connection when initialization successful', done => {
74+
const connection = new FakeConnection().withServerVersion('Neo4j/9.9.9');
75+
const connectionProvider = newSingleConnectionProvider(connection);
76+
const connectionHolder = new ConnectionHolder(READ, connectionProvider);
77+
const streamObserver = new StreamObserver();
78+
79+
connectionHolder.initializeConnection();
80+
81+
connectionHolder.getConnection(streamObserver).then(conn => {
82+
verifyConnectionInitialized(conn);
83+
verifyConnection(streamObserver, 'Neo4j/9.9.9');
84+
done();
85+
});
86+
});
87+
88+
it('should make stream observer aware about connection when initialization fails', done => {
89+
const connection = new FakeConnection().withServerVersion('Neo4j/7.7.7').withFailedInitialization(new Error('Oh!'));
90+
const connectionProvider = newSingleConnectionProvider(connection);
91+
const connectionHolder = new ConnectionHolder(READ, connectionProvider);
92+
const streamObserver = new StreamObserver();
93+
94+
connectionHolder.initializeConnection();
95+
96+
connectionHolder.getConnection(streamObserver).catch(error => {
97+
expect(error.message).toEqual('Oh!');
98+
verifyConnectionInitialized(connection);
99+
verifyConnection(streamObserver, 'Neo4j/7.7.7');
67100
done();
68101
});
69102
});
@@ -195,3 +228,16 @@ class RecordingConnectionProvider extends SingleConnectionProvider {
195228
function newSingleConnectionProvider(connection) {
196229
return new SingleConnectionProvider(Promise.resolve(connection));
197230
}
231+
232+
function verifyConnectionInitialized(connection) {
233+
expect(connection.initializationInvoked).toEqual(1);
234+
}
235+
236+
function verifyConnection(streamObserver, expectedServerVersion) {
237+
expect(streamObserver._conn).toBeDefined();
238+
expect(streamObserver._conn).not.toBeNull();
239+
240+
// server version is taken from connection, verify it as well
241+
const metadata = streamObserver.serverMetadata();
242+
expect(metadata.server.version).toEqual(expectedServerVersion);
243+
}

test/internal/connection-providers.test.js

-32
Original file line numberDiff line numberDiff line change
@@ -44,16 +44,6 @@ describe('DirectConnectionProvider', () => {
4444
});
4545
});
4646

47-
it('returns an initialized connection', done => {
48-
const pool = newPool();
49-
const connectionProvider = newDirectConnectionProvider('localhost:123', pool);
50-
51-
connectionProvider.acquireConnection(READ).then(connection => {
52-
expect(connection.initialized).toBeTruthy();
53-
done();
54-
});
55-
});
56-
5747
});
5848

5949
describe('LoadBalancer', () => {
@@ -1059,26 +1049,6 @@ describe('LoadBalancer', () => {
10591049
});
10601050
});
10611051

1062-
it('returns an initialized connection', done => {
1063-
const pool = newPool();
1064-
const loadBalancer = newLoadBalancer(
1065-
['server-1', 'server-2'],
1066-
['server-3', 'server-4'],
1067-
['server-5', 'server-6'],
1068-
pool
1069-
);
1070-
1071-
loadBalancer.acquireConnection(READ).then(connection => {
1072-
expect(connection.initialized).toBeTruthy();
1073-
1074-
loadBalancer.acquireConnection(WRITE).then(connection => {
1075-
expect(connection.initialized).toBeTruthy();
1076-
1077-
done();
1078-
});
1079-
});
1080-
});
1081-
10821052
});
10831053

10841054
function newDirectConnectionProvider(address, pool) {
@@ -1156,15 +1126,13 @@ class FakeConnection {
11561126
constructor(address, release) {
11571127
this.address = address;
11581128
this.release = release;
1159-
this.initialized = false;
11601129
}
11611130

11621131
static create(address, release) {
11631132
return new FakeConnection(address, release);
11641133
}
11651134

11661135
initializationCompleted() {
1167-
this.initialized = true;
11681136
return Promise.resolve(this);
11691137
}
11701138
}

test/internal/fake-connection.js

+10-1
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,12 @@ export default class FakeConnection {
3131
this.resetAsyncInvoked = 0;
3232
this.syncInvoked = 0;
3333
this.releaseInvoked = 0;
34+
this.initializationInvoked = 0;
3435
this.seenStatements = [];
3536
this.seenParameters = [];
3637
this.server = {};
38+
39+
this._initializationPromise = Promise.resolve(this);
3740
}
3841

3942
run(statement, parameters) {
@@ -61,7 +64,8 @@ export default class FakeConnection {
6164
}
6265

6366
initializationCompleted() {
64-
return Promise.resolve(this);
67+
this.initializationInvoked++;
68+
return this._initializationPromise;
6569
}
6670

6771
isReleasedOnceOnSessionClose() {
@@ -94,4 +98,9 @@ export default class FakeConnection {
9498
this.server.version = version;
9599
return this;
96100
}
101+
102+
withFailedInitialization(error) {
103+
this._initializationPromise = Promise.reject(error);
104+
return this;
105+
}
97106
};

0 commit comments

Comments
 (0)