Skip to content

Routing context in URI #235

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions src/v1/driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,13 @@ class Driver {
Driver._validateConnection.bind(this),
config.connectionPoolSize
);
this._connectionProvider = this._createConnectionProvider(url, this._pool, this._driverOnErrorCallback.bind(this));

/**
* Reference to the connection provider. Initialized lazily by {@link _getOrCreateConnectionProvider}.
* @type {ConnectionProvider}
* @private
*/
this._connectionProvider = null;
}

/**
Expand Down Expand Up @@ -115,7 +121,8 @@ class Driver {
*/
session(mode, bookmark) {
const sessionMode = Driver._validateSessionMode(mode);
return this._createSession(sessionMode, this._connectionProvider, bookmark, this._config);
const connectionProvider = this._getOrCreateConnectionProvider();
return this._createSession(sessionMode, connectionProvider, bookmark, this._config);
}

static _validateSessionMode(rawMode) {
Expand All @@ -142,6 +149,14 @@ class Driver {
return SERVICE_UNAVAILABLE;
}

_getOrCreateConnectionProvider() {
if (!this._connectionProvider) {
const driverOnErrorCallback = this._driverOnErrorCallback.bind(this);
this._connectionProvider = this._createConnectionProvider(this._url, this._pool, driverOnErrorCallback);
}
return this._connectionProvider;
}

_driverOnErrorCallback(error) {
const userDefinedOnErrorCallback = this.onError;
if (userDefinedOnErrorCallback && error.code === SERVICE_UNAVAILABLE) {
Expand Down Expand Up @@ -189,8 +204,9 @@ class _ConnectionStreamObserver extends StreamObserver {
if (this._driver.onCompleted) {
this._driver.onCompleted(message);
}
if (this._conn && message && message.server) {
this._conn.setServerVersion(message.server);

if (this._observer && this._observer.onComplete) {
this._observer.onCompleted(message);
}
}
}
Expand Down
18 changes: 9 additions & 9 deletions src/v1/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ import Record from './record';
import {Driver, READ, WRITE} from './driver';
import RoutingDriver from './routing-driver';
import VERSION from '../version';
import {parseScheme, parseUrl} from './internal/connector';
import {assertString} from './internal/util';

import {assertString, isEmptyObjectOrNull, parseRoutingContext, parseScheme, parseUrl} from './internal/util';

const auth = {
basic: (username, password, realm = undefined) => {
Expand Down Expand Up @@ -129,17 +127,19 @@ const USER_AGENT = "neo4j-javascript/" + VERSION;
function driver(url, authToken, config = {}) {
assertString(url, 'Bolt URL');
const scheme = parseScheme(url);
if (scheme === "bolt+routing://") {
return new RoutingDriver(parseUrl(url), USER_AGENT, authToken, config);
} else if (scheme === "bolt://") {
const routingContext = parseRoutingContext(url);
if (scheme === 'bolt+routing://') {
return new RoutingDriver(parseUrl(url), routingContext, USER_AGENT, authToken, config);
} else if (scheme === 'bolt://') {
if (!isEmptyObjectOrNull(routingContext)) {
throw new Error(`Routing parameters are not supported with scheme 'bolt'. Given URL: '${url}'`);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove "Routing"

}
return new Driver(parseUrl(url), USER_AGENT, authToken, config);
} else {
throw new Error("Unknown scheme: " + scheme);

throw new Error(`Unknown scheme: ${scheme}`);
}
}


const types ={
Node,
Relationship,
Expand Down
11 changes: 7 additions & 4 deletions src/v1/internal/connection-providers.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import RoutingTable from './routing-table';
import Rediscovery from './rediscovery';
import hasFeature from './features';
import {DnsHostNameResolver, DummyHostNameResolver} from './host-name-resolvers';
import RoutingUtil from './routing-util';

class ConnectionProvider {

Expand Down Expand Up @@ -61,11 +62,11 @@ export class DirectConnectionProvider extends ConnectionProvider {

export class LoadBalancer extends ConnectionProvider {

constructor(address, connectionPool, driverOnErrorCallback) {
constructor(address, routingContext, connectionPool, driverOnErrorCallback) {
super();
this._seedRouter = address;
this._routingTable = new RoutingTable(new RoundRobinArray([this._seedRouter]));
this._rediscovery = new Rediscovery();
this._rediscovery = new Rediscovery(new RoutingUtil(routingContext));
this._connectionPool = connectionPool;
this._driverOnErrorCallback = driverOnErrorCallback;
this._hostNameResolver = LoadBalancer._createHostNameResolver();
Expand Down Expand Up @@ -171,8 +172,10 @@ export class LoadBalancer extends ConnectionProvider {

_createSessionForRediscovery(routerAddress) {
const connection = this._connectionPool.acquire(routerAddress);
const connectionPromise = Promise.resolve(connection);
const connectionProvider = new SingleConnectionProvider(connectionPromise);
// initialized connection is required for routing procedure call
// server version needs to be known to decide which routing procedure to use
const initializedConnectionPromise = connection.initializationCompleted();
const connectionProvider = new SingleConnectionProvider(initializedConnectionPromise);
return new Session(READ, connectionProvider);
}

Expand Down
122 changes: 91 additions & 31 deletions src/v1/internal/connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import WebSocketChannel from './ch-websocket';
import NodeChannel from './ch-node';
import {Chunker, Dechunker} from './chunking';
Expand All @@ -24,6 +25,8 @@ import {alloc} from './buf';
import {Node, Path, PathSegment, Relationship, UnboundRelationship} from '../graph-types';
import {newError} from './../error';
import ChannelConfig from './ch-config';
import {parseHost, parsePort} from './util';
import StreamObserver from './stream-observer';

let Channel;
if( NodeChannel.available ) {
Expand Down Expand Up @@ -59,29 +62,6 @@ PATH = 0x50,
MAGIC_PREAMBLE = 0x6060B017,
DEBUG = false;

let URLREGEX = new RegExp([
"([^/]+//)?", // scheme
"(([^:/?#]*)", // hostname
"(?::([0-9]+))?)", // port (optional)
".*"].join("")); // everything else

function parseScheme( url ) {
let scheme = url.match(URLREGEX)[1] || '';
return scheme.toLowerCase();
}

function parseUrl(url) {
return url.match( URLREGEX )[2];
}

function parseHost( url ) {
return url.match( URLREGEX )[3];
}

function parsePort( url ) {
return url.match( URLREGEX )[4];
}

/**
* Very rudimentary log handling, should probably be replaced by something proper at some point.
* @param actor the part that sent the message, 'S' for server and 'C' for client
Expand Down Expand Up @@ -204,6 +184,8 @@ class Connection {
this._isHandlingFailure = false;
this._currentFailure = null;

this._state = new ConnectionState(this);

// Set to true on fatal errors, to get this out of session pool.
this._isBroken = false;

Expand Down Expand Up @@ -351,7 +333,8 @@ class Connection {
/** Queue an INIT-message to be sent to the database */
initialize( clientName, token, observer ) {
log("C", "INIT", clientName, token);
this._queueObserver(observer);
const initObserver = this._state.wrap(observer);
this._queueObserver(initObserver);
this._packer.packStruct( INIT, [this._packable(clientName), this._packable(token)],
(err) => this._handleFatalError(err) );
this._chunker.messageBoundary();
Expand Down Expand Up @@ -437,6 +420,15 @@ class Connection {
}
}

/**
* Get promise resolved when connection initialization succeed or rejected when it fails.
* Connection is initialized using {@link initialize} function.
* @return {Promise<Connection>} the result of connection initialization.
*/
initializationCompleted() {
return this._state.initializationCompleted();
}

/**
* Synchronize - flush all queued outgoing messages and route their responses
* to their respective handlers.
Expand Down Expand Up @@ -471,6 +463,78 @@ class Connection {
}
}

class ConnectionState {

/**
* @constructor
* @param {Connection} connection the connection to track state for.
*/
constructor(connection) {
this._connection = connection;

this._initialized = false;
this._initializationError = null;

this._resolvePromise = null;
this._rejectPromise = null;
}

/**
* Wrap the given observer to track connection's initialization state.
* @param {StreamObserver} observer the observer used for INIT message.
* @return {StreamObserver} updated observer.
*/
wrap(observer) {
return {
onNext: record => {
if (observer && observer.onNext) {
observer.onNext(record);
}
},
onError: error => {
this._initializationError = error;
if (this._rejectPromise) {
this._rejectPromise(error);
this._rejectPromise = null;
}
if (observer && observer.onError) {
observer.onError(error);
}
},
onCompleted: metaData => {
if (metaData && metaData.server) {
this._connection.setServerVersion(metaData.server);
}
this._initialized = true;
if (this._resolvePromise) {
this._resolvePromise(this._connection);
this._resolvePromise = null;
}
if (observer && observer.onCompleted) {
observer.onCompleted(metaData);
}
}
};
}

/**
* Get promise resolved when connection initialization succeed or rejected when it fails.
* @return {Promise<Connection>} the result of connection initialization.
*/
initializationCompleted() {
if (this._initialized) {
return Promise.resolve(this._connection);
} else if (this._initializationError) {
return Promise.reject(this._initializationError);
} else {
return new Promise((resolve, reject) => {
this._resolvePromise = resolve;
this._rejectPromise = reject;
});
}
}
}

/**
* Crete new connection to the provided url.
* @access private
Expand All @@ -490,10 +554,6 @@ function connect(url, config = {}, connectionErrorCode = null) {
}

export {
connect,
parseScheme,
parseUrl,
parseHost,
parsePort,
Connection
}
connect,
Connection
};
2 changes: 1 addition & 1 deletion src/v1/internal/host-name-resolvers.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* limitations under the License.
*/

import {parseHost, parsePort} from './connector';
import {parseHost, parsePort} from './util';

class HostNameResolver {

Expand Down
25 changes: 17 additions & 8 deletions src/v1/internal/rediscovery.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,27 @@
* limitations under the License.
*/

import GetServersUtil from "./get-servers-util";
import RoutingTable from "./routing-table";
import {newError, PROTOCOL_ERROR} from "../error";
import RoutingTable from './routing-table';
import {newError, PROTOCOL_ERROR} from '../error';

export default class Rediscovery {

constructor(getServersUtil) {
this._getServersUtil = getServersUtil || new GetServersUtil();
/**
* @constructor
* @param {RoutingUtil} routingUtil the util to use.
*/
constructor(routingUtil) {
this._routingUtil = routingUtil;
}

/**
* Try to fetch new routing table from the given router.
* @param {Session} session the session to use.
* @param {string} routerAddress the URL of the router.
* @return {Promise<RoutingTable>} promise resolved with new routing table or null when connection error happened.
*/
lookupRoutingTableOnRouter(session, routerAddress) {
return this._getServersUtil.callGetServers(session, routerAddress).then(records => {
return this._routingUtil.callRoutingProcedure(session, routerAddress).then(records => {
if (records === null) {
// connection error happened, unable to retrieve routing table from this router, next one should be queried
return null;
Expand All @@ -42,8 +51,8 @@ export default class Rediscovery {

const record = records[0];

const expirationTime = this._getServersUtil.parseTtl(record, routerAddress);
const {routers, readers, writers} = this._getServersUtil.parseServers(record, routerAddress);
const expirationTime = this._routingUtil.parseTtl(record, routerAddress);
const {routers, readers, writers} = this._routingUtil.parseServers(record, routerAddress);

Rediscovery._assertNonEmpty(routers, 'routers', routerAddress);
Rediscovery._assertNonEmpty(readers, 'readers', routerAddress);
Expand Down
Loading