Skip to content

Commit e93cdf0

Browse files
committed
Routing context in Bolt URI
Commit makes it possible to specify routing parameters in Bolt URI query. These parameters are then send to server when doing rediscovery. This is true only for 3.2+ Neo4j database because it supports `getRoutingTable(context)` procedure. Renamed `GetServersUtil` to `RoutingUtil` because it now knows about both routing procedures `getServers` and `getRoutingTable`.
1 parent 2c05a76 commit e93cdf0

18 files changed

+504
-119
lines changed

src/v1/driver.js

+20-4
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,13 @@ class Driver {
5858
Driver._validateConnection.bind(this),
5959
config.connectionPoolSize
6060
);
61-
this._connectionProvider = this._createConnectionProvider(url, this._pool, this._driverOnErrorCallback.bind(this));
61+
62+
/**
63+
* Reference to the connection provider. Initialized lazily by {@link _getOrCreateConnectionProvider}.
64+
* @type {ConnectionProvider}
65+
* @private
66+
*/
67+
this._connectionProvider = null;
6268
}
6369

6470
/**
@@ -115,7 +121,8 @@ class Driver {
115121
*/
116122
session(mode, bookmark) {
117123
const sessionMode = Driver._validateSessionMode(mode);
118-
return this._createSession(sessionMode, this._connectionProvider, bookmark, this._config);
124+
const connectionProvider = this._getOrCreateConnectionProvider();
125+
return this._createSession(sessionMode, connectionProvider, bookmark, this._config);
119126
}
120127

121128
static _validateSessionMode(rawMode) {
@@ -142,6 +149,14 @@ class Driver {
142149
return SERVICE_UNAVAILABLE;
143150
}
144151

152+
_getOrCreateConnectionProvider() {
153+
if (!this._connectionProvider) {
154+
const driverOnErrorCallback = this._driverOnErrorCallback.bind(this);
155+
this._connectionProvider = this._createConnectionProvider(this._url, this._pool, driverOnErrorCallback);
156+
}
157+
return this._connectionProvider;
158+
}
159+
145160
_driverOnErrorCallback(error) {
146161
const userDefinedOnErrorCallback = this.onError;
147162
if (userDefinedOnErrorCallback && error.code === SERVICE_UNAVAILABLE) {
@@ -189,8 +204,9 @@ class _ConnectionStreamObserver extends StreamObserver {
189204
if (this._driver.onCompleted) {
190205
this._driver.onCompleted(message);
191206
}
192-
if (this._conn && message && message.server) {
193-
this._conn.setServerVersion(message.server);
207+
208+
if (this._observer && this._observer.onComplete) {
209+
this._observer.onCompleted(message);
194210
}
195211
}
196212
}

src/v1/index.js

+9-11
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,16 @@
1717
* limitations under the License.
1818
*/
1919

20-
import {int, isInt, inSafeRange, toNumber, toString} from './integer';
21-
import {Node, Relationship, UnboundRelationship, PathSegment, Path} from './graph-types'
22-
import {Neo4jError, SERVICE_UNAVAILABLE, SESSION_EXPIRED, PROTOCOL_ERROR} from './error';
20+
import {inSafeRange, int, isInt, toNumber, toString} from './integer';
21+
import {Node, Path, PathSegment, Relationship, UnboundRelationship} from './graph-types';
22+
import {Neo4jError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE, SESSION_EXPIRED} from './error';
2323
import Result from './result';
2424
import ResultSummary from './result-summary';
2525
import Record from './record';
2626
import {Driver, READ, WRITE} from './driver';
2727
import RoutingDriver from './routing-driver';
2828
import VERSION from '../version';
29-
import {assertString, isEmptyObjectOrNull, parseScheme, parseUrl, parseRoutingContext} from "./internal/util";
29+
import {assertString, isEmptyObjectOrNull, parseRoutingContext, parseScheme, parseUrl} from './internal/util';
3030

3131

3232
const auth ={
@@ -120,20 +120,18 @@ function driver(url, authToken, config = {}) {
120120
assertString(url, 'Bolt URL');
121121
const scheme = parseScheme(url);
122122
const routingContext = parseRoutingContext(url);
123-
if (scheme === "bolt+routing://") {
123+
if (scheme === 'bolt+routing://') {
124124
return new RoutingDriver(parseUrl(url), routingContext, USER_AGENT, authToken, config);
125-
} else if (scheme === "bolt://") {
126-
if(!isEmptyObjectOrNull(routingContext))
127-
{
128-
throw new Error("Routing context are not supported with scheme 'bolt'. Given URI: '" + url + "'");
125+
} else if (scheme === 'bolt://') {
126+
if (!isEmptyObjectOrNull(routingContext)) {
127+
throw new Error(`Routing parameters are not supported with scheme 'bolt'. Given URL: '${url}'`);
129128
}
130129
return new Driver(parseUrl(url), USER_AGENT, authToken, config);
131130
} else {
132-
throw new Error("Unknown scheme: " + scheme);
131+
throw new Error(`Unknown scheme: ${scheme}`);
133132
}
134133
}
135134

136-
137135
const types ={
138136
Node,
139137
Relationship,

src/v1/internal/connection-providers.js

+6-4
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import RoutingTable from './routing-table';
2525
import Rediscovery from './rediscovery';
2626
import hasFeature from './features';
2727
import {DnsHostNameResolver, DummyHostNameResolver} from './host-name-resolvers';
28-
import GetServersUtil from './get-servers-util';
28+
import RoutingUtil from './routing-util';
2929

3030
class ConnectionProvider {
3131

@@ -66,7 +66,7 @@ export class LoadBalancer extends ConnectionProvider {
6666
super();
6767
this._seedRouter = address;
6868
this._routingTable = new RoutingTable(new RoundRobinArray([this._seedRouter]));
69-
this._rediscovery = new Rediscovery(new GetServersUtil(routingContext));
69+
this._rediscovery = new Rediscovery(new RoutingUtil(routingContext));
7070
this._connectionPool = connectionPool;
7171
this._driverOnErrorCallback = driverOnErrorCallback;
7272
this._hostNameResolver = LoadBalancer._createHostNameResolver();
@@ -172,8 +172,10 @@ export class LoadBalancer extends ConnectionProvider {
172172

173173
_createSessionForRediscovery(routerAddress) {
174174
const connection = this._connectionPool.acquire(routerAddress);
175-
const connectionPromise = Promise.resolve(connection);
176-
const connectionProvider = new SingleConnectionProvider(connectionPromise);
175+
// initialized connection is required for routing procedure call
176+
// server version needs to be known to decide which routing procedure to use
177+
const initializedConnectionPromise = connection.initializationCompleted();
178+
const connectionProvider = new SingleConnectionProvider(initializedConnectionPromise);
177179
return new Session(READ, connectionProvider);
178180
}
179181

src/v1/internal/connector.js

+67-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import {Node, Path, PathSegment, Relationship, UnboundRelationship} from '../gra
2626
import {newError} from './../error';
2727
import ChannelConfig from './ch-config';
2828
import {parseHost, parsePort} from './util';
29+
import StreamObserver from './stream-observer';
2930

3031
let Channel;
3132
if( NodeChannel.available ) {
@@ -183,6 +184,8 @@ class Connection {
183184
this._isHandlingFailure = false;
184185
this._currentFailure = null;
185186

187+
this._state = new ConnectionState(this);
188+
186189
// Set to true on fatal errors, to get this out of session pool.
187190
this._isBroken = false;
188191

@@ -330,7 +333,8 @@ class Connection {
330333
/** Queue an INIT-message to be sent to the database */
331334
initialize( clientName, token, observer ) {
332335
log("C", "INIT", clientName, token);
333-
this._queueObserver(observer);
336+
const initObserver = this._state.wrap(observer);
337+
this._queueObserver(initObserver);
334338
this._packer.packStruct( INIT, [this._packable(clientName), this._packable(token)],
335339
(err) => this._handleFatalError(err) );
336340
this._chunker.messageBoundary();
@@ -416,6 +420,15 @@ class Connection {
416420
}
417421
}
418422

423+
/**
424+
* Get promise resolved when connection initialization succeed or rejected when it fails.
425+
* Connection is initialized using {@link initialize} function.
426+
* @return {Promise<Connection>} the result of connection initialization.
427+
*/
428+
initializationCompleted() {
429+
return this._state.initializationCompleted();
430+
}
431+
419432
/**
420433
* Synchronize - flush all queued outgoing messages and route their responses
421434
* to their respective handlers.
@@ -450,6 +463,59 @@ class Connection {
450463
}
451464
}
452465

466+
class ConnectionState {
467+
468+
/**
469+
* @constructor
470+
* @param {Connection} connection the connection to track state for.
471+
*/
472+
constructor(connection) {
473+
this._connection = connection;
474+
this._resolvePromise = null;
475+
this._promise = new Promise(resolve => {
476+
this._resolvePromise = resolve;
477+
});
478+
}
479+
480+
/**
481+
* Wrap the given observer to track connection's initialization state.
482+
* @param {StreamObserver} observer the observer used for INIT message.
483+
* @return {StreamObserver} updated observer.
484+
*/
485+
wrap(observer) {
486+
return {
487+
onNext: record => {
488+
if (observer && observer.onNext) {
489+
observer.onNext(record);
490+
}
491+
},
492+
onError: error => {
493+
this._resolvePromise(Promise.reject(error));
494+
if (observer && observer.onError) {
495+
observer.onError(error);
496+
}
497+
},
498+
onCompleted: metaData => {
499+
if (metaData && metaData.server) {
500+
this._connection.setServerVersion(metaData.server);
501+
}
502+
this._resolvePromise(this._connection);
503+
if (observer && observer.onCompleted) {
504+
observer.onCompleted(metaData);
505+
}
506+
}
507+
};
508+
}
509+
510+
/**
511+
* Get promise resolved when connection initialization succeed or rejected when it fails.
512+
* @return {Promise<Connection>} the result of connection initialization.
513+
*/
514+
initializationCompleted() {
515+
return this._promise;
516+
}
517+
}
518+
453519
/**
454520
* Crete new connection to the provided url.
455521
* @access private

src/v1/internal/rediscovery.js

+17-8
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,27 @@
1717
* limitations under the License.
1818
*/
1919

20-
import GetServersUtil from "./get-servers-util";
21-
import RoutingTable from "./routing-table";
22-
import {newError, PROTOCOL_ERROR} from "../error";
20+
import RoutingTable from './routing-table';
21+
import {newError, PROTOCOL_ERROR} from '../error';
2322

2423
export default class Rediscovery {
2524

26-
constructor(getServersUtil) {
27-
this._getServersUtil = getServersUtil;
25+
/**
26+
* @constructor
27+
* @param {RoutingUtil} routingUtil the util to use.
28+
*/
29+
constructor(routingUtil) {
30+
this._routingUtil = routingUtil;
2831
}
2932

33+
/**
34+
* Try to fetch new routing table from the given router.
35+
* @param {Session} session the session to use.
36+
* @param {string} routerAddress the URL of the router.
37+
* @return {Promise<RoutingTable>} promise resolved with new routing table or null when connection error happened.
38+
*/
3039
lookupRoutingTableOnRouter(session, routerAddress) {
31-
return this._getServersUtil.callGetServers(session, routerAddress).then(records => {
40+
return this._routingUtil.callRoutingProcedure(session, routerAddress).then(records => {
3241
if (records === null) {
3342
// connection error happened, unable to retrieve routing table from this router, next one should be queried
3443
return null;
@@ -42,8 +51,8 @@ export default class Rediscovery {
4251

4352
const record = records[0];
4453

45-
const expirationTime = this._getServersUtil.parseTtl(record, routerAddress);
46-
const {routers, readers, writers} = this._getServersUtil.parseServers(record, routerAddress);
54+
const expirationTime = this._routingUtil.parseTtl(record, routerAddress);
55+
const {routers, readers, writers} = this._routingUtil.parseServers(record, routerAddress);
4756

4857
Rediscovery._assertNonEmpty(routers, 'routers', routerAddress);
4958
Rediscovery._assertNonEmpty(readers, 'readers', routerAddress);

src/v1/internal/get-servers-util.js renamed to src/v1/internal/routing-util.js

+38-29
Original file line numberDiff line numberDiff line change
@@ -20,44 +20,39 @@
2020
import RoundRobinArray from './round-robin-array';
2121
import {newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE} from '../error';
2222
import Integer, {int} from '../integer';
23-
import {ServerVersion, VERSION3_2} from './server-version-util'
23+
import {ServerVersion, VERSION_3_2_0} from './server-version';
2424

2525
const CALL_GET_SERVERS = 'CALL dbms.cluster.routing.getServers';
26-
const GET_ROUTING_TABLE_PARAM = "context";
27-
const CALL_GET_ROUTING_TABLE = "CALL dbms.cluster.routing.getRoutingTable({"
28-
+ GET_ROUTING_TABLE_PARAM + "})";
26+
const GET_ROUTING_TABLE_PARAM = 'context';
27+
const CALL_GET_ROUTING_TABLE = 'CALL dbms.cluster.routing.getRoutingTable({' + GET_ROUTING_TABLE_PARAM + '})';
2928
const PROCEDURE_NOT_FOUND_CODE = 'Neo.ClientError.Procedure.ProcedureNotFound';
3029

31-
export default class GetServersUtil {
30+
export default class RoutingUtil {
3231

33-
constructor(routingContext={}) {
32+
constructor(routingContext) {
3433
this._routingContext = routingContext;
3534
}
3635

37-
callGetServers(session, routerAddress) {
38-
session.run("RETURN 1").then(result=>{
39-
let statement = {text:CALL_GET_SERVERS};
40-
41-
if(ServerVersion.fromString(result.summary.server.version).compare(VERSION3_2)>=0)
42-
{
43-
statement = {
44-
text:CALL_GET_ROUTING_TABLE,
45-
parameters:{GET_ROUTING_TABLE_PARAM: this._routingContext}};
36+
/**
37+
* Invoke routing procedure using the given session.
38+
* @param {Session} session the session to use.
39+
* @param {string} routerAddress the URL of the router.
40+
* @return {Promise<Record[]>} promise resolved with records returned by the procedure call or null if
41+
* connection error happened.
42+
*/
43+
callRoutingProcedure(session, routerAddress) {
44+
return this._callAvailableRoutingProcedure(session).then(result => {
45+
session.close();
46+
return result.records;
47+
}).catch(error => {
48+
if (error.code === PROCEDURE_NOT_FOUND_CODE) {
49+
// throw when getServers procedure not found because this is clearly a configuration issue
50+
throw newError('Server ' + routerAddress + ' could not perform routing. ' +
51+
'Make sure you are connecting to a causal cluster', SERVICE_UNAVAILABLE);
4652
}
47-
48-
return session.run(statement).then(result => {
49-
session.close();
50-
return result.records;
51-
}).catch(error => {
52-
if (error.code === PROCEDURE_NOT_FOUND_CODE) {
53-
// throw when getServers procedure not found because this is clearly a configuration issue
54-
throw newError('Server ' + routerAddress + ' could not perform routing. ' +
55-
'Make sure you are connecting to a causal cluster', SERVICE_UNAVAILABLE);
56-
}
57-
// return nothing when failed to connect because code higher in the callstack is still able to retry with a
58-
// different session towards a different router
59-
return null;
60-
});
53+
// return nothing when failed to connect because code higher in the callstack is still able to retry with a
54+
// different session towards a different router
55+
return null;
6156
});
6257
}
6358

@@ -111,4 +106,18 @@ export default class GetServersUtil {
111106
PROTOCOL_ERROR);
112107
}
113108
}
109+
110+
_callAvailableRoutingProcedure(session) {
111+
return session._run(null, null, (connection, streamObserver) => {
112+
const serverVersionString = connection.server.version;
113+
const serverVersion = ServerVersion.fromString(serverVersionString);
114+
115+
if (serverVersion.compareTo(VERSION_3_2_0) >= 0) {
116+
const params = {[GET_ROUTING_TABLE_PARAM]: this._routingContext};
117+
connection.run(CALL_GET_ROUTING_TABLE, params, streamObserver);
118+
} else {
119+
connection.run(CALL_GET_SERVERS, {}, streamObserver);
120+
}
121+
});
122+
}
114123
}

0 commit comments

Comments
 (0)