diff --git a/src/internal/connection-provider-routing.js b/src/internal/connection-provider-routing.js index 86e990787..6cafc2229 100644 --- a/src/internal/connection-provider-routing.js +++ b/src/internal/connection-provider-routing.js @@ -22,7 +22,7 @@ import { READ, WRITE } from '../driver' import Session from '../session' import RoutingTable from './routing-table' import Rediscovery from './rediscovery' -import RoutingUtil from './routing-util' +import { RoutingTableGetterFactory } from './routing-table-getter' import { HostNameResolver } from './node' import SingleConnectionProvider from './connection-provider-single' import PooledConnectionProvider from './connection-provider-pooled' @@ -66,7 +66,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider this._seedRouter = address this._routingTables = {} this._rediscovery = new Rediscovery( - new RoutingUtil(routingContext, address.toString()) + new RoutingTableGetterFactory(routingContext, address.toString()) ) this._loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy( this._connectionPool @@ -417,6 +417,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider `unable to fetch routing table because of an error ${error}` ) return null + } finally { + session.close() } } else { // unable to acquire connection and create session towards the current router diff --git a/src/internal/rediscovery.js b/src/internal/rediscovery.js index 667c3cf88..e7bab622a 100644 --- a/src/internal/rediscovery.js +++ b/src/internal/rediscovery.js @@ -16,79 +16,38 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - import RoutingTable from './routing-table' -import RoutingUtil from './routing-util' -import { newError, PROTOCOL_ERROR } from '../error' +import Session from '../session' +import { RoutingTableGetterFactory } from './routing-table-getter' +import ServerAddress from './server-address' export default class Rediscovery { /** * @constructor - * @param {RoutingUtil} routingUtil the util to use. + * @param {RoutingTableGetterFactory} routingTableGetterFactory the util to use. */ - constructor (routingUtil) { - this._routingUtil = routingUtil + constructor (routingTableGetterFactory) { + this._routingTableGetterFactory = routingTableGetterFactory } /** * Try to fetch new routing table from the given router. * @param {Session} session the session to use. * @param {string} database the database for which to lookup routing table. - * @param {string} routerAddress the URL of the router. + * @param {ServerAddress} routerAddress the URL of the router. * @return {Promise} promise resolved with new routing table or null when connection error happened. */ - async lookupRoutingTableOnRouter (session, database, routerAddress) { - const records = await this._routingUtil.callRoutingProcedure( - session, - database, - routerAddress - ) - if (records === null) { - // connection error happened, unable to retrieve routing table from this router, next one should be queried - return null - } - - if (records.length !== 1) { - throw newError( - 'Illegal response from router "' + - routerAddress + - '". ' + - 'Received ' + - records.length + - ' records but expected only one.\n' + - JSON.stringify(records), - PROTOCOL_ERROR + lookupRoutingTableOnRouter (session, database, routerAddress) { + return session._acquireConnection(connection => { + const routingTableGetter = this._routingTableGetterFactory.create( + connection ) - } - - const record = records[0] - - const expirationTime = this._routingUtil.parseTtl(record, routerAddress) - const { routers, readers, writers } = this._routingUtil.parseServers( - record, - routerAddress - ) - - Rediscovery._assertNonEmpty(routers, 'routers', routerAddress) - Rediscovery._assertNonEmpty(readers, 'readers', routerAddress) - // case with no writers is processed higher in the promise chain because only RoutingDriver knows - // how to deal with such table and how to treat router that returned such table - - return new RoutingTable({ - database, - routers, - readers, - writers, - expirationTime - }) - } - - static _assertNonEmpty (serverAddressesArray, serversName, routerAddress) { - if (serverAddressesArray.length === 0) { - throw newError( - 'Received no ' + serversName + ' from router ' + routerAddress, - PROTOCOL_ERROR + return routingTableGetter.get( + connection, + database, + routerAddress, + session ) - } + }) } } diff --git a/src/internal/routing-table-getter/index.js b/src/internal/routing-table-getter/index.js new file mode 100644 index 000000000..4fee9f968 --- /dev/null +++ b/src/internal/routing-table-getter/index.js @@ -0,0 +1,21 @@ +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import RoutingTableGetterFactory from './routing-table-getter-factory' + +export { RoutingTableGetterFactory } diff --git a/src/internal/routing-table-getter/routing-procedure-runner-multi-database.js b/src/internal/routing-table-getter/routing-procedure-runner-multi-database.js new file mode 100644 index 000000000..9227dcf78 --- /dev/null +++ b/src/internal/routing-table-getter/routing-procedure-runner-multi-database.js @@ -0,0 +1,59 @@ +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import RoutingProcedureRunner from './routing-procedure-runner' + +const CONTEXT = 'context' +const DATABASE = 'database' +const CALL_GET_ROUTING_TABLE_MULTI_DB = `CALL dbms.routing.getRoutingTable($${CONTEXT}, $${DATABASE})` + +/** + * Runs the Multi-Database procedure to get the Routing Table + */ +export default class MultiDatabaseRoutingProcedureRunner extends RoutingProcedureRunner { + constructor (initialAddress) { + super() + this._initialAddress = initialAddress + } + + /** + * Run the procedure + * + * @param {Connection} connection The connection use + * @param {string} database the database + * @param {string} routerAddress the router address + * @param {Session} session the session which was used to get the connection, + * it will be used to get lastBookmark and other properties + * + * @returns {Result} the result of the query + */ + run (connection, database, context, session) { + return this._runQuery( + connection, + CALL_GET_ROUTING_TABLE_MULTI_DB, + { + context: { + ...context, + address: this._initialAddress + }, + database: database || null + }, + session + ) + } +} diff --git a/src/internal/routing-table-getter/routing-procedure-runner-single-database.js b/src/internal/routing-table-getter/routing-procedure-runner-single-database.js new file mode 100644 index 000000000..a9c079eea --- /dev/null +++ b/src/internal/routing-table-getter/routing-procedure-runner-single-database.js @@ -0,0 +1,47 @@ +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import RoutingProcedureRunner from './routing-procedure-runner' + +const CONTEXT = 'context' +const CALL_GET_ROUTING_TABLE = `CALL dbms.cluster.routing.getRoutingTable($${CONTEXT})` + +/** + * Runs the Single-Database procedure to get the Routing Table + */ +export default class SingleDatabaseRoutingProcedureRunner extends RoutingProcedureRunner { + /** + * Run the procedure + * + * @param {Connection} connection The connection use + * @param {string} database the database + * @param {string} routerAddress the router address + * @param {Session} session the session which was used to get the connection, + * it will be used to get lastBookmark and other properties + * + * @returns {Result} the result of the query + */ + run (connection, database, context, session) { + return this._runQuery( + connection, + CALL_GET_ROUTING_TABLE, + { context }, + session + ) + } +} diff --git a/src/internal/routing-table-getter/routing-procedure-runner.js b/src/internal/routing-table-getter/routing-procedure-runner.js new file mode 100644 index 000000000..14c844e52 --- /dev/null +++ b/src/internal/routing-table-getter/routing-procedure-runner.js @@ -0,0 +1,56 @@ +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Result from '../../result' +import TxConfig from '../tx-config' + +export default class RoutingProcedureRunner { + /** + * @param {Connection} connection the connection + * @param {string} database the database + * @param {object} routerContext the router context + * @param {Session} session the session which was used to get the connection, + * it will be used to get lastBookmark and other properties + * + * @returns {Result} the result of the query + */ + run (connection, database, routerContext, session) { + throw new Error('not implemented') + } + + /** + * Run query using the connection + * @param {Connection} connection the connectiom + * @param {string} query the query + * @param {object} params the query params + * @param {Session} session the session which was used to get the connection, + * it will be used to get lastBookmark and other properties + * + * @returns {Result} the result of the query + */ + _runQuery (connection, query, params, session) { + const resultOberserver = connection.protocol().run(query, params, { + bookmark: session._lastBookmark, + txConfig: TxConfig.empty(), + mode: session._mode, + database: session._database, + afterComplete: session._onComplete + }) + return new Result(Promise.resolve(resultOberserver)) + } +} diff --git a/src/internal/routing-table-getter/routing-table-getter-factory.js b/src/internal/routing-table-getter/routing-table-getter-factory.js new file mode 100644 index 000000000..5d29d6811 --- /dev/null +++ b/src/internal/routing-table-getter/routing-table-getter-factory.js @@ -0,0 +1,56 @@ +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { BOLT_PROTOCOL_V4_0 } from '../constants' +import Connection from '../connection' +import ProcedureRoutingTableGetter from './routing-table-getter-procedure' +import SingleDatabaseRoutingProcedureRunner from './routing-procedure-runner-single-database' +import MultiDatabaseRoutingProcedureRunner from './routing-procedure-runner-multi-database' + +/** + * Constructs the RoutingTableGetter according to the correct protocol version. + */ +export default class RoutingTableGetterFactory { + /** + * Constructor + * @param {Object} routingContext Context which the be used to define the routing table + * @param {string} initialAddress The address that the driver is connecting to, + * used by routing as a fallback when routing and clustering isn't configured. + */ + constructor (routingContext, initialAddress) { + this._routingContext = routingContext + this._initialAddress = initialAddress + } + + /** + * Creates the RoutingTableGetter using the given session and database + * + * @param {Connection} connection the connection to use + * @param {string} database the database name + * @param {string} routerAddress the URL of the router. + * @returns {ProcedureRoutingTableGetter} The routing table getter + */ + create (connection) { + const runner = + connection.protocol().version < BOLT_PROTOCOL_V4_0 + ? new SingleDatabaseRoutingProcedureRunner() + : new MultiDatabaseRoutingProcedureRunner(this._initialAddress) + + return new ProcedureRoutingTableGetter(this._routingContext, runner) + } +} diff --git a/src/internal/routing-table-getter/routing-table-getter-procedure.js b/src/internal/routing-table-getter/routing-table-getter-procedure.js new file mode 100644 index 000000000..9c6f001b6 --- /dev/null +++ b/src/internal/routing-table-getter/routing-table-getter-procedure.js @@ -0,0 +1,236 @@ +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE } from '../../error' +import ServerAddress from '../server-address' +import RoutingTable from '../routing-table' +import Integer, { int } from '../../integer' +import Session from '../../session' +import RoutingProcedureRunner from './routing-procedure-runner' + +const PROCEDURE_NOT_FOUND_CODE = 'Neo.ClientError.Procedure.ProcedureNotFound' +const DATABASE_NOT_FOUND_CODE = 'Neo.ClientError.Database.DatabaseNotFound' + +/** + * Get the routing table by running the procedure + */ +export default class ProcedureRoutingTableGetter { + /** + * Constructor + * @param {Object} routingContext + * @param {RoutingProcedureRunner} runner the procedure runner + */ + constructor (routingContext, runner) { + this._runner = runner + this._routingContext = routingContext + } + + /** + * Get the routing table by running the routing table procedure + * + * @param {Connection} connection The connection use + * @param {string} database the database + * @param {ServerAddress} routerAddress the router address + * @param {Session} session the session which was used to get the connection, + * it will be used to get lastBookmark and other properties + * + * @returns {Promise} The routing table + */ + async get (connection, database, routerAddress, session) { + const records = await this._runProcedure( + connection, + database, + routerAddress, + session + ) + if (records === null) { + return null // it should go to another layer to retry + } + + if (records.length !== 1) { + throw newError( + 'Illegal response from router "' + + routerAddress + + '". ' + + 'Received ' + + records.length + + ' records but expected only one.\n' + + JSON.stringify(records), + PROTOCOL_ERROR + ) + } + + const record = records[0] + + const expirationTime = this._parseTtl(record, routerAddress) + const { routers, readers, writers } = this._parseServers( + record, + routerAddress + ) + + assertNonEmpty(routers, 'routers', routerAddress) + assertNonEmpty(readers, 'readers', routerAddress) + + return new RoutingTable({ + database, + routers, + readers, + writers, + expirationTime + }) + } + + /** + * Run the routing query and fetch the records + * + * @param {Connection} connection the connection + * @param {string} database the database + * @param {string} routerAddress the router address + * @param {Session} session the session which was used to get the connection, + * it will be used to get lastBookmark and other properties + * + * @returns {Promise} the list of records fetched + */ + async _runProcedure (connection, database, routerAddress, session) { + try { + const result = await this._runner.run( + connection, + database, + this._routingContext, + session + ) + return result.records + } catch (error) { + if (error.code === DATABASE_NOT_FOUND_CODE) { + throw error + } else if (error.code === PROCEDURE_NOT_FOUND_CODE) { + // throw when getServers procedure not found because this is clearly a configuration issue + throw newError( + `Server at ${routerAddress.asHostPort()} can't perform routing. Make sure you are connecting to a causal cluster`, + SERVICE_UNAVAILABLE + ) + } else { + // return nothing when failed to connect because code higher in the callstack is still able to retry with a + // different session towards a different router + return null + } + } + } + + /** + * Parse the ttls from the record and return it + * + * @param {Record} record the record + * @param {string} routerAddress the router address + * @returns {number} the ttl + */ + _parseTtl (record, routerAddress) { + try { + const now = int(Date.now()) + const expires = int(record.get('ttl')) + .multiply(1000) + .add(now) + // if the server uses a really big expire time like Long.MAX_VALUE this may have overflowed + if (expires.lessThan(now)) { + return Integer.MAX_VALUE + } + return expires + } catch (error) { + throw newError( + `Unable to parse TTL entry from router ${routerAddress} from record:\n${JSON.stringify( + record + )}\nError message: ${error.message}`, + PROTOCOL_ERROR + ) + } + } + + /** + * Parse server from the Record. + * + * @param {Record} record the record + * @param {string} routerAddress the router address + * @returns {Object} The object with the list of routers, readers and writers + */ + _parseServers (record, routerAddress) { + try { + const servers = record.get('servers') + + let routers = [] + let readers = [] + let writers = [] + + servers.forEach(server => { + const role = server.role + const addresses = server.addresses + + if (role === 'ROUTE') { + routers = parseArray(addresses).map(address => + ServerAddress.fromUrl(address) + ) + } else if (role === 'WRITE') { + writers = parseArray(addresses).map(address => + ServerAddress.fromUrl(address) + ) + } else if (role === 'READ') { + readers = parseArray(addresses).map(address => + ServerAddress.fromUrl(address) + ) + } else { + throw newError('Unknown server role "' + role + '"', PROTOCOL_ERROR) + } + }) + + return { + routers: routers, + readers: readers, + writers: writers + } + } catch (error) { + throw newError( + `Unable to parse servers entry from router ${routerAddress} from record:\n${JSON.stringify( + record + )}\nError message: ${error.message}`, + PROTOCOL_ERROR + ) + } + } +} + +/** + * Assset if serverAddressesArray is not empty, throws and PROTOCOL_ERROR otherwise + * + * @param {string[]} serverAddressesArray array of addresses + * @param {string} serversName the server name + * @param {string} routerAddress the router address + */ +function assertNonEmpty (serverAddressesArray, serversName, routerAddress) { + if (serverAddressesArray.length === 0) { + throw newError( + 'Received no ' + serversName + ' from router ' + routerAddress, + PROTOCOL_ERROR + ) + } +} + +function parseArray (addresses) { + if (!Array.isArray(addresses)) { + throw new TypeError('Array expected but got: ' + addresses) + } + return Array.from(addresses) +} diff --git a/src/internal/routing-util.js b/src/internal/routing-util.js deleted file mode 100644 index 088676d55..000000000 --- a/src/internal/routing-util.js +++ /dev/null @@ -1,176 +0,0 @@ -/** - * Copyright (c) 2002-2020 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE } from '../error' -import Integer, { int } from '../integer' -import { ServerVersion, VERSION_4_0_0 } from './server-version' -import Bookmark from './bookmark' -import Session from '../session' -import TxConfig from './tx-config' -import ServerAddress from './server-address' - -const CONTEXT = 'context' -const CALL_GET_ROUTING_TABLE = `CALL dbms.cluster.routing.getRoutingTable($${CONTEXT})` -const DATABASE = 'database' -const CALL_GET_ROUTING_TABLE_MULTI_DB = `CALL dbms.routing.getRoutingTable($${CONTEXT}, $${DATABASE})` -const PROCEDURE_NOT_FOUND_CODE = 'Neo.ClientError.Procedure.ProcedureNotFound' -const DATABASE_NOT_FOUND_CODE = 'Neo.ClientError.Database.DatabaseNotFound' - -export default class RoutingUtil { - constructor (routingContext, initialAddress) { - this._routingContext = routingContext - // The address that the driver is connecting to, used by routing as a fallback when routing - // and clustering isn't configured. - this._initialAddress = initialAddress - } - - /** - * Invoke routing procedure using the given session. - * @param {Session} session the session to use. - * @param {string} routerAddress the URL of the router. - * @return {Promise} promise resolved with records returned by the procedure call or null if - * connection error happened. - */ - async callRoutingProcedure (session, database, routerAddress) { - try { - const result = await this._callAvailableRoutingProcedure( - session, - database - ) - - await session.close() - - return result.records - } catch (error) { - if (error.code === DATABASE_NOT_FOUND_CODE) { - throw error - } else if (error.code === PROCEDURE_NOT_FOUND_CODE) { - // throw when getServers procedure not found because this is clearly a configuration issue - throw newError( - `Server at ${routerAddress.asHostPort()} can't perform routing. Make sure you are connecting to a causal cluster`, - SERVICE_UNAVAILABLE - ) - } else { - // return nothing when failed to connect because code higher in the callstack is still able to retry with a - // different session towards a different router - return null - } - } - } - - parseTtl (record, routerAddress) { - try { - const now = int(Date.now()) - const expires = int(record.get('ttl')) - .multiply(1000) - .add(now) - // if the server uses a really big expire time like Long.MAX_VALUE this may have overflowed - if (expires.lessThan(now)) { - return Integer.MAX_VALUE - } - return expires - } catch (error) { - throw newError( - `Unable to parse TTL entry from router ${routerAddress} from record:\n${JSON.stringify( - record - )}\nError message: ${error.message}`, - PROTOCOL_ERROR - ) - } - } - - parseServers (record, routerAddress) { - try { - const servers = record.get('servers') - - let routers = [] - let readers = [] - let writers = [] - - servers.forEach(server => { - const role = server.role - const addresses = server.addresses - - if (role === 'ROUTE') { - routers = parseArray(addresses).map(address => - ServerAddress.fromUrl(address) - ) - } else if (role === 'WRITE') { - writers = parseArray(addresses).map(address => - ServerAddress.fromUrl(address) - ) - } else if (role === 'READ') { - readers = parseArray(addresses).map(address => - ServerAddress.fromUrl(address) - ) - } else { - throw newError('Unknown server role "' + role + '"', PROTOCOL_ERROR) - } - }) - - return { - routers: routers, - readers: readers, - writers: writers - } - } catch (error) { - throw newError( - `Unable to parse servers entry from router ${routerAddress} from record:\n${JSON.stringify( - record - )}\nError message: ${error.message}`, - PROTOCOL_ERROR - ) - } - } - - _callAvailableRoutingProcedure (session, database) { - return session._run(null, null, connection => { - let query - let params - - const protocolVersion = connection.protocol().version - if (protocolVersion >= 4.0) { - query = CALL_GET_ROUTING_TABLE_MULTI_DB - params = { - context: this._routingContext || {}, - database: database || null - } - params.context.address = this._initialAddress - } else { - query = CALL_GET_ROUTING_TABLE - params = { context: this._routingContext } - } - - return connection.protocol().run(query, params, { - bookmark: session._lastBookmark, - txConfig: TxConfig.empty(), - mode: session._mode, - database: session._database, - afterComplete: session._onComplete - }) - }) - } -} - -function parseArray (addresses) { - if (!Array.isArray(addresses)) { - throw new TypeError('Array expected but got: ' + addresses) - } - return Array.from(addresses) -} diff --git a/src/session.js b/src/session.js index 82051b0ae..42cccfa8c 100644 --- a/src/session.js +++ b/src/session.js @@ -146,6 +146,34 @@ class Session { return new Result(observerPromise, query, parameters, connectionHolder) } + async _acquireConnection (connectionConsumer) { + let promise + const connectionHolder = this._connectionHolderWithMode(this._mode) + if (!this._open) { + promise = Promise.reject( + newError('Cannot run query in a closed session.') + ) + } else if (!this._hasTx && connectionHolder.initializeConnection()) { + promise = connectionHolder + .getConnection() + .then(connection => connectionConsumer(connection)) + .then(async result => { + await connectionHolder.releaseConnection() + return result + }) + } else { + promise = Promise.reject( + newError( + 'Queries cannot be run directly on a ' + + 'session with an open transaction; either run from within the ' + + 'transaction or use a different session.' + ) + ) + } + + return promise + } + /** * Begin a new transaction in this session. A session can have at most one transaction running at a time, if you * want to run multiple concurrent transactions, you should use multiple concurrent sessions. diff --git a/test/internal/fake-connection.js b/test/internal/fake-connection.js index 03348c385..198aa26fd 100644 --- a/test/internal/fake-connection.js +++ b/test/internal/fake-connection.js @@ -132,6 +132,11 @@ export default class FakeConnection extends Connection { return this } + withProtocolVersion (version) { + this.protocolVersion = version + return this + } + withCreationTimestamp (value) { this.creationTimestamp = value return this diff --git a/test/internal/fake-session.js b/test/internal/fake-session.js new file mode 100644 index 000000000..c247339e0 --- /dev/null +++ b/test/internal/fake-session.js @@ -0,0 +1,75 @@ +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export default class FakeSession { + constructor (runResponse, fakeConnection) { + this._runResponse = runResponse + this._fakeConnection = fakeConnection + this._closed = false + } + + static successful (result) { + return new FakeSession(Promise.resolve(result), null) + } + + static failed (error) { + return new FakeSession(Promise.reject(error), null) + } + + static withFakeConnection (connection) { + return new FakeSession(null, connection) + } + + _run (ignoreQuery, ignoreParameters, queryRunner) { + if (this._runResponse) { + return this._runResponse + } + queryRunner(this._fakeConnection) + return Promise.resolve() + } + + withBookmark (bookmark) { + this._lastBookmark = bookmark + return this + } + + withDatabase (database) { + this._database = database || '' + return this + } + + withMode (mode) { + this._mode = mode + return this + } + + withOnComplete (onComplete) { + this._onComplete = onComplete + return this + } + + close () { + this._closed = true + return Promise.resolve() + } + + isClosed () { + return this._closed + } +} diff --git a/test/internal/rediscovery.test.js b/test/internal/rediscovery.test.js index d9f5b1ee5..c3503a40a 100644 --- a/test/internal/rediscovery.test.js +++ b/test/internal/rediscovery.test.js @@ -18,247 +18,175 @@ */ import Rediscovery from '../../src/internal/rediscovery' -import RoutingUtil from '../../src/internal/routing-util' -import { newError, PROTOCOL_ERROR } from '../../src/error' -import Record from '../../src/record' -import { int } from '../../src/integer' +import RoutingTable from '../../src/internal/routing-table' import ServerAddress from '../../src/internal/server-address' -const ROUTER_ADDRESS = 'neo4j://test.router.com' - describe('#unit Rediscovery', () => { - it('should return null when connection error happens', done => { - const util = new FakeRoutingUtil({ - callRoutingProcedure: () => null - }) - - lookupRoutingTableOnRouter(util).then(routingTable => { - expect(routingTable).toBeNull() - done() - }) - }) + it('should return the routing table when it available', async () => { + const expectedRoutingTable = new RoutingTable({ + database: 'db', + expirationTime: 113, + routers: [ServerAddress.fromUrl('bolt://localhost:7687')], + writers: [ServerAddress.fromUrl('bolt://localhost:7686')], + readers: [ServerAddress.fromUrl('bolt://localhost:7683')] + }) + const routingTableGetter = new FakeRoutingTableGetter( + Promise.resolve(expectedRoutingTable) + ) - it('should throw when no records are returned', done => { - const util = new FakeRoutingUtil({ - callRoutingProcedure: () => [] + const routingTable = await lookupRoutingTableOnRouter({ + routingTableGetter }) - lookupRoutingTableOnRouter(util).catch(error => { - expectProtocolError(error, 'Illegal response from router') - done() - }) + expect(routingTable).toBe(expectedRoutingTable) }) - it('should throw when multiple records are returned', done => { - const util = new FakeRoutingUtil({ - callRoutingProcedure: () => [ - new Record(['a'], ['aaa']), - new Record(['b'], ['bbb']) - ] - }) + it('should call getter once with correct arguments', async () => { + const expectedRoutingTable = new RoutingTable() + const connection = { connection: 'abc' } + const database = 'adb' + const session = new FakeSession(connection) + const routerAddress = ServerAddress.fromUrl('bolt://localhost:7682') + const routingTableGetter = new FakeRoutingTableGetter( + Promise.resolve(expectedRoutingTable) + ) - lookupRoutingTableOnRouter(util).catch(error => { - expectProtocolError(error, 'Illegal response from router') - done() - }) + await lookupRoutingTableOnRouter({ + routingTableGetter, + connection, + session, + database, + routerAddress + }) + + expect(routingTableGetter._called).toEqual(1) + expect(routingTableGetter._arguments).toEqual([ + connection, + database, + routerAddress, + session + ]) }) - it('should throw when ttl parsing throws', done => { - const util = new FakeRoutingUtil({ - callRoutingProcedure: () => [new Record(['a'], ['aaa'])], - parseTtl: () => { - throw newError('Unable to parse TTL', PROTOCOL_ERROR) - } - }) - - lookupRoutingTableOnRouter(util).catch(error => { - expectProtocolError(error, 'Unable to parse TTL') - done() - }) - }) + it('should acquire connection once', async () => { + const expectedRoutingTable = new RoutingTable() + const connection = { connection: 'abc' } + const database = 'adb' + const session = new FakeSession(connection) + const routerAddress = ServerAddress.fromUrl('bolt://localhost:7682') + const routingTableGetter = new FakeRoutingTableGetter( + Promise.resolve(expectedRoutingTable) + ) - it('should throw when servers parsing throws', done => { - const util = new FakeRoutingUtil({ - callRoutingProcedure: () => [new Record(['a'], ['aaa'])], - parseTtl: () => int(42), - parseServers: () => { - throw newError('Unable to parse servers', PROTOCOL_ERROR) - } + await lookupRoutingTableOnRouter({ + routingTableGetter, + connection, + session, + database, + routerAddress }) - lookupRoutingTableOnRouter(util).catch(error => { - expectProtocolError(error, 'Unable to parse servers') - done() - }) + expect(session._called).toEqual(1) }) - it('should throw when no routers', done => { - const util = new FakeRoutingUtil({ - callRoutingProcedure: () => [new Record(['a'], ['aaa'])], - parseTtl: () => int(42), - parseServers: () => { - return { - routers: [], - readers: ['reader1'], - writers: ['writer1'] - } - } - }) - - lookupRoutingTableOnRouter(util).catch(error => { - expectProtocolError(error, 'Received no routers') - done() - }) - }) + it('should create the routingTableGetter with the correct arguments', async () => { + const routingTable = new RoutingTable() + const connection = { connection: 'abc' } + const routingTableGetter = new FakeRoutingTableGetter( + Promise.resolve(routingTable) + ) + const factory = new FakeRoutingTableGetterFactory(routingTableGetter) - it('should throw when no readers', done => { - const util = new FakeRoutingUtil({ - callRoutingProcedure: () => [new Record(['a'], ['aaa'])], - parseTtl: () => int(42), - parseServers: () => { - return { - routers: ['router1'], - readers: [], - writers: ['writer1'] - } - } + await lookupRoutingTableOnRouter({ + routingTableGetter, + factory, + connection }) - lookupRoutingTableOnRouter(util).catch(error => { - expectProtocolError(error, 'Received no readers') - done() - }) + expect(factory._called).toEqual(1) + expect(factory._arguments).toEqual([connection]) }) - it('should return routing table when no writers', done => { - const util = new FakeRoutingUtil({ - callRoutingProcedure: () => [new Record(['a'], ['aaa'])], - parseTtl: () => int(42), - parseServers: () => { - return { - routers: ['router1'], - readers: ['reader1'], - writers: [] - } - } - }) + it('should return null when the getter resolves the table as null', async () => { + const routingTableGetter = new FakeRoutingTableGetter(Promise.resolve(null)) - lookupRoutingTableOnRouter(util).then(routingTable => { - expect(routingTable).toBeDefined() - expect(routingTable).not.toBeNull() - done() + const routingTable = await lookupRoutingTableOnRouter({ + routingTableGetter }) - }) - - it('should return valid routing table with 1 router, 1 reader and 1 writer', done => { - testValidRoutingTable( - ['router1:7687'], - ['reader1:7687'], - ['writer1:7687'], - int(42), - done - ) - }) - it('should return valid routing table with 2 routers, 2 readers and 2 writers', done => { - testValidRoutingTable( - ['router1:7687', 'router2:7687'], - ['reader1:7687', 'reader2:7687'], - ['writer1:7687', 'writer2:7687'], - int(Date.now()), - done - ) + expect(routingTable).toBeNull() }) - it('should return valid routing table with 1 router, 3 readers and 1 writer', done => { - testValidRoutingTable( - ['router1:7687'], - ['reader1:7687', 'reader2:7687', 'reader3:7687'], - ['writer1:7687'], - int(12345), - done - ) + it('should fail when the getter fails', async () => { + const expectedError = 'error' + try { + const routingTableGetter = new FakeRoutingTableGetter( + Promise.reject(expectedError) + ) + await lookupRoutingTableOnRouter({ routingTableGetter }) + fail('should not complete with success') + } catch (error) { + expect(error).toBe(expectedError) + } }) +}) - function testValidRoutingTable ( - routerAddresses, - readerAddresses, - writerAddresses, - expires, - done - ) { - const util = new FakeRoutingUtil({ - callRoutingProcedure: () => [new Record(['a'], ['aaa'])], - parseTtl: () => expires, - parseServers: () => { - return { - routers: routerAddresses.map(a => ServerAddress.fromUrl(a)), - readers: readerAddresses.map(a => ServerAddress.fromUrl(a)), - writers: writerAddresses.map(a => ServerAddress.fromUrl(a)) - } - } - }) - - lookupRoutingTableOnRouter(util).then(routingTable => { - expect(routingTable).toBeDefined() - expect(routingTable).not.toBeNull() - - expect(routingTable.expirationTime).toEqual(expires) - - const allServers = routingTable.allServers().sort() - const allExpectedServers = [ - ...routerAddresses, - ...readerAddresses, - ...writerAddresses - ].sort() - expect(allServers.map(s => s.asHostPort())).toEqual(allExpectedServers) - - done() - }) +function lookupRoutingTableOnRouter ({ + database = 'db', + routerAddress = ServerAddress.fromUrl('bolt://localhost:7687'), + routingTableGetter = new FakeRoutingTableGetter( + Promise.resolve(new RoutingTable()) + ), + session, + factory, + connection = {} +} = {}) { + const _factory = + factory || new FakeRoutingTableGetterFactory(routingTableGetter) + const _session = session || new FakeSession(connection) + const rediscovery = new Rediscovery(_factory) + + return rediscovery.lookupRoutingTableOnRouter( + _session, + database, + routerAddress + ) +} + +class FakeRoutingTableGetter { + constructor (result) { + this._result = result + this._called = 0 } - function lookupRoutingTableOnRouter (routingUtil) { - const rediscovery = new Rediscovery(routingUtil) - return rediscovery.lookupRoutingTableOnRouter(null, null, ROUTER_ADDRESS) + get () { + this._called++ + this._arguments = [...arguments] + return this._result } +} - function expectProtocolError (error, messagePrefix) { - expect(error.code).toEqual(PROTOCOL_ERROR) - expect(error.message.indexOf(messagePrefix)).toEqual(0) +class FakeRoutingTableGetterFactory { + constructor (routingTableGetter) { + this._routingTableGetter = routingTableGetter + this._called = 0 } - function shouldNotBeCalled () { - throw new Error('Should not be called') + create () { + this._called++ + this._arguments = [...arguments] + return this._routingTableGetter } +} - class FakeRoutingUtil extends RoutingUtil { - constructor ({ - callRoutingProcedure = shouldNotBeCalled, - parseTtl = shouldNotBeCalled, - parseServers = shouldNotBeCalled - }) { - super() - this._callAvailableRoutingProcedure = callRoutingProcedure - this._parseTtl = parseTtl - this._parseServers = parseServers - } - - callRoutingProcedure (session, routerAddress) { - return new Promise((resolve, reject) => { - try { - resolve(this._callAvailableRoutingProcedure()) - } catch (error) { - reject(error) - } - }) - } - - parseTtl (record, routerAddress) { - return this._parseTtl() - } +class FakeSession { + constructor (connection) { + this._connection = connection + this._called = 0 + } - parseServers (record, routerAddress) { - return this._parseServers() - } + _acquireConnection (callback) { + this._called++ + return callback(this._connection) } -}) +} diff --git a/test/internal/routing-table-getter/routing-procdeure-runner-multi-database.test.js b/test/internal/routing-table-getter/routing-procdeure-runner-multi-database.test.js new file mode 100644 index 000000000..bbba3f58e --- /dev/null +++ b/test/internal/routing-table-getter/routing-procdeure-runner-multi-database.test.js @@ -0,0 +1,83 @@ +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import MultiDatabaseRountingProcuderRunner from '../../../src/internal/routing-table-getter/routing-procedure-runner-multi-database' +import ServerAddress from '../../../src/internal/server-address' +import TxConfig from '../../../src/internal/tx-config' +import Result from '../../../src/result' +import FakeConnection from '../fake-connection' +import FakeSession from '../fake-session' + +describe('#unit MultiDatabaseRountingProcuderRunner', () => { + it('should run query over protocol with the correct params', () => { + const initialAddress = ServerAddress.fromUrl('bolt://127.0.0.1') + const bookmark = 'bookmark' + const mode = 'READ' + const database = 'adb' + const sessionDatabase = 'session.database' + const onComplete = () => 'nothing' + const connection = new FakeConnection().withProtocolVersion(3) + const context = { someContext: '1234' } + const session = new FakeSession(null, connection) + .withBookmark(bookmark) + .withMode(mode) + .withDatabase(sessionDatabase) + .withOnComplete(onComplete) + + run({ connection, context, session, database, initialAddress }) + + expect(connection.seenQueries).toEqual([ + 'CALL dbms.routing.getRoutingTable($context, $database)' + ]) + expect(connection.seenParameters).toEqual([ + { context: { ...context, address: initialAddress }, database } + ]) + expect(connection.seenProtocolOptions).toEqual([ + { + bookmark, + txConfig: TxConfig.empty(), + mode, + database: sessionDatabase, + afterComplete: onComplete + } + ]) + }) + + it('should return a result', () => { + const connection = new FakeConnection().withProtocolVersion(3) + const context = { someContext: '1234' } + const session = new FakeSession(null, connection) + + const result = run({ connection, context, session }) + + expect(result).toEqual(jasmine.any(Result)) + expect(result._streamObserverPromise).toEqual(jasmine.any(Promise)) + }) +}) + +function run ({ + connection, + database = 'adb', + context, + session, + initialAddress +}) { + const runner = new MultiDatabaseRountingProcuderRunner(initialAddress) + return runner.run(connection, database, context, session) +} diff --git a/test/internal/routing-table-getter/routing-procedure-runner-single-database.test.js b/test/internal/routing-table-getter/routing-procedure-runner-single-database.test.js new file mode 100644 index 000000000..461c573f0 --- /dev/null +++ b/test/internal/routing-table-getter/routing-procedure-runner-single-database.test.js @@ -0,0 +1,72 @@ +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import SingleDatabaseRountingProcuderRunner from '../../../src/internal/routing-table-getter/routing-procedure-runner-single-database' +import TxConfig from '../../../src/internal/tx-config' +import Result from '../../../src/result' +import FakeConnection from '../fake-connection' +import FakeSession from '../fake-session' + +describe('#unit SingleDatabaseRountingProcuderRunner', () => { + it('should run query over protocol with the correct params', () => { + const bookmark = 'bookmark' + const mode = 'READ' + const sessionDatabase = 'session.database' + const onComplete = () => 'nothing' + const connection = new FakeConnection().withProtocolVersion(3) + const context = { someContext: '1234' } + const session = new FakeSession(null, connection) + .withBookmark(bookmark) + .withMode(mode) + .withDatabase(sessionDatabase) + .withOnComplete(onComplete) + + run({ connection, context, session }) + + expect(connection.seenQueries).toEqual([ + 'CALL dbms.cluster.routing.getRoutingTable($context)' + ]) + expect(connection.seenParameters).toEqual([{ context }]) + expect(connection.seenProtocolOptions).toEqual([ + { + bookmark, + txConfig: TxConfig.empty(), + mode, + database: sessionDatabase, + afterComplete: onComplete + } + ]) + }) + + it('should return a result', () => { + const connection = new FakeConnection().withProtocolVersion(3) + const context = { someContext: '1234' } + const session = new FakeSession(null, connection) + + const result = run({ connection, context, session }) + + expect(result).toEqual(jasmine.any(Result)) + expect(result._streamObserverPromise).toEqual(jasmine.any(Promise)) + }) +}) + +function run ({ connection, database = 'adb', context, session }) { + const runner = new SingleDatabaseRountingProcuderRunner() + return runner.run(connection, database, context, session) +} diff --git a/test/internal/routing-table-getter/routing-table-getter-factory.test.js b/test/internal/routing-table-getter/routing-table-getter-factory.test.js new file mode 100644 index 000000000..2f5895b53 --- /dev/null +++ b/test/internal/routing-table-getter/routing-table-getter-factory.test.js @@ -0,0 +1,81 @@ +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import RoutingTableGetterFactory from '../../../src/internal/routing-table-getter/routing-table-getter-factory' +import ProcedureRoutingTableGetter from '../../../src/internal/routing-table-getter/routing-table-getter-procedure' +import MultiDatabaseRoutingProcedureRunner from '../../../src/internal/routing-table-getter/routing-procedure-runner-multi-database' +import SingleDatabaseRoutingProcedureRunner from '../../../src/internal/routing-table-getter/routing-procedure-runner-single-database' +import FakeConnection from '../fake-connection' + +const routingTableProcedureVersions = [3, 4, 4.1, 4.2] +const singleDatabaseProcedureVersion = routingTableProcedureVersions.filter( + version => version < 4 +) +const multiDatabaseProcedureVersion = routingTableProcedureVersions.filter( + version => version >= 4 +) + +describe('#unit RoutingTableGetterFactory', () => { + routingTableProcedureVersions.forEach(version => { + it(`should create ProcedureRoutingTableGetter when the protocol version is ${version}`, () => { + const connection = new FakeConnection().withProtocolVersion(version) + const routingContext = { region: 'china' } + const getter = createRoutingTableGetter({ connection, routingContext }) + + expect(getter).toEqual(jasmine.any(ProcedureRoutingTableGetter)) + expect(getter._routingContext).toEqual(routingContext) + expect(getter._) + }) + }) + + singleDatabaseProcedureVersion.forEach(version => { + it(`should configure SingleDatabaseRoutingProcedureRunner as the runner in the getter when the protocol version is ${version}`, () => { + const connection = new FakeConnection().withProtocolVersion(3) + const getter = createRoutingTableGetter({ connection }) + + expect(getter._runner).toEqual( + jasmine.any(SingleDatabaseRoutingProcedureRunner) + ) + }) + }) + + multiDatabaseProcedureVersion.forEach(version => { + it(`should configure MultiDatabaseRoutingProcedureRunner as the runner in the getter when the protocol version is ${version}`, () => { + const connection = new FakeConnection().withProtocolVersion(version) + const initialAddress = 'localhost' + const getter = createRoutingTableGetter({ connection, initialAddress }) + + expect(getter._runner).toEqual( + jasmine.any(MultiDatabaseRoutingProcedureRunner) + ) + expect(getter._runner._initialAddress).toEqual(initialAddress) + }) + }) + + function createRoutingTableGetter ({ + connection, + routingContext = {}, + initialAddress = '127.0.0.1' + }) { + const factory = new RoutingTableGetterFactory( + routingContext, + initialAddress + ) + return factory.create(connection) + } +}) diff --git a/test/internal/routing-table-getter/routing-table-getter-procedure.test.js b/test/internal/routing-table-getter/routing-table-getter-procedure.test.js new file mode 100644 index 000000000..e98e70312 --- /dev/null +++ b/test/internal/routing-table-getter/routing-table-getter-procedure.test.js @@ -0,0 +1,481 @@ +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import ProcedureRoutingTableGetter from '../../../src/internal/routing-table-getter/routing-table-getter-procedure' +import RoutingProcedurRunner from '../../../src/internal/routing-table-getter/routing-procedure-runner' +import FakeConnection from '../fake-connection' +import { + newError, + SERVICE_UNAVAILABLE, + PROTOCOL_ERROR +} from '../../../lib/error' +import ServerAddress from '../../../src/internal/server-address' +import Record from '../../../src/record' +import Integer, { int } from '../../../src/integer' +import RoutingTable from '../../../src/internal/routing-table' +import lolex from 'lolex' + +const invalidRecords = [ + [], + [newRecord(), newRecord()], + [newRecord(), newRecord(), newRecord()] +] + +const invalidAddressesFieldValues = [ + 'localhost:1234', + [{ address: 'localhost:1244' }], + null, + 23 +] + +const invalidTtlValues = [null, undefined] + +describe('#unit ProcedureRoutingTableGetter', () => { + it('should call the runner with the correct params', async () => { + const routerContext = { region: 'china' } + const connection = new FakeConnection() + const database = 'fake-db' + const session = { session: 'sec' } + + const runnerCalledWith = { + capturer (connection, database, routerContext, session) { + this.connection = connection + this.database = database + this.routerContext = routerContext + this.session = session + return fakeResolvedResult() + } + } + + try { + await callProcedureRoutingTableGetter({ + routerContext, + connection, + database, + session, + runner: new FakeRoutingProcedureRunner({ + run: runnerCalledWith.capturer.bind(runnerCalledWith) + }) + }) + } finally { + expect(runnerCalledWith.connection).toEqual(connection) + expect(runnerCalledWith.database).toEqual(database) + expect(runnerCalledWith.routerContext).toEqual(routerContext) + expect(runnerCalledWith.session).toEqual(session) + } + }) + + it('should return the routing table', () => + runWithClockAt(Date.now(), async currentTime => { + const ttl = int(42) + const routers = ['router:7699'] + const readers = ['reader1:7699', 'reader2:7699'] + const writers = ['writer1:7693', 'writer2:7692', 'writer3:7629'] + const database = 'db' + const result = await callProcedureRoutingTableGetter({ + database, + runner: new FakeRoutingProcedureRunner({ + run: () => + fakeResolvedResult([newRecord({ ttl, routers, readers, writers })]) + }) + }) + + expect(result).toEqual( + new RoutingTable({ + database, + readers: readers.map(r => ServerAddress.fromUrl(r)), + routers: routers.map(r => ServerAddress.fromUrl(r)), + writers: writers.map(w => ServerAddress.fromUrl(w)), + expirationTime: calculateExpirationTime(currentTime, ttl) + }) + ) + })) + + it('should return Integer.MAX_VALUE as expirationTime when ttl overflowed', async () => { + const ttl = int(Integer.MAX_VALUE - 2) + const routers = ['router:7699'] + const readers = ['reader1:7699', 'reader2:7699'] + const writers = ['writer1:7693', 'writer2:7692', 'writer3:7629'] + const database = 'db' + const result = await callProcedureRoutingTableGetter({ + database, + runner: new FakeRoutingProcedureRunner({ + run: () => + fakeResolvedResult([newRecord({ ttl, routers, readers, writers })]) + }) + }) + expect(result.expirationTime).toEqual(Integer.MAX_VALUE) + }) + + it('should return Integer.MAX_VALUE as expirationTime when ttl is negative', async () => { + const ttl = int(-2) + const routers = ['router:7699'] + const readers = ['reader1:7699', 'reader2:7699'] + const writers = ['writer1:7693', 'writer2:7692', 'writer3:7629'] + const database = 'db' + const result = await callProcedureRoutingTableGetter({ + database, + runner: new FakeRoutingProcedureRunner({ + run: () => + fakeResolvedResult([newRecord({ ttl, routers, readers, writers })]) + }) + }) + expect(result.expirationTime).toEqual(Integer.MAX_VALUE) + }) + + invalidTtlValues.forEach(invalidTtlValue => { + it(`should throw PROTOCOL_ERROR when ttl is not valid [${invalidTtlValue}]`, async () => { + try { + const ttl = invalidTtlValue + const routers = ['router:7699'] + const readers = ['reader1:7699', 'reader2:7699'] + const writers = ['writer1:7693', 'writer2:7692', 'writer3:7629'] + const database = 'db' + await callProcedureRoutingTableGetter({ + database, + runner: new FakeRoutingProcedureRunner({ + run: () => + fakeResolvedResult([ + newRecord({ ttl, routers, readers, writers }) + ]) + }) + }) + } catch (error) { + expect(error.code).toEqual(PROTOCOL_ERROR) + expect(error.message).toContain('Unable to parse TTL entry from router') + } + }) + }) + + it('should throw PROTOCOL_ERROR when ttl is not in the record', async () => { + try { + const database = 'db' + await callProcedureRoutingTableGetter({ + database, + runner: new FakeRoutingProcedureRunner({ + run: () => fakeResolvedResult([new Record(['noTtl'], ['1234'])]) + }) + }) + } catch (error) { + expect(error.code).toEqual(PROTOCOL_ERROR) + expect(error.message).toContain('Unable to parse TTL entry from router') + } + }) + + invalidAddressesFieldValues.forEach(invalidAddressFieldValue => { + it(`should throw PROTOCOL_ERROR when routers is not valid [${invalidAddressFieldValue}]`, async () => { + try { + const routers = invalidAddressFieldValue + const readers = ['reader1:7699', 'reader2:7699'] + const writers = ['writer1:7693', 'writer2:7692', 'writer3:7629'] + const database = 'db' + await callProcedureRoutingTableGetter({ + database, + runner: new FakeRoutingProcedureRunner({ + run: () => + fakeResolvedResult([newRecord({ routers, readers, writers })]) + }) + }) + fail('should not succeed') + } catch (error) { + expect(error.code).toEqual(PROTOCOL_ERROR) + expect(error.message).toContain( + 'Unable to parse servers entry from router' + ) + } + }) + + it(`should throw PROTOCOL_ERROR when readers is not valid [${invalidAddressFieldValue}]`, async () => { + try { + const routers = ['router:7699'] + const readers = invalidAddressFieldValue + const writers = ['writer1:7693', 'writer2:7692', 'writer3:7629'] + const database = 'db' + await callProcedureRoutingTableGetter({ + database, + runner: new FakeRoutingProcedureRunner({ + run: () => + fakeResolvedResult([newRecord({ routers, readers, writers })]) + }) + }) + fail('should not succeed') + } catch (error) { + expect(error.code).toEqual(PROTOCOL_ERROR) + expect(error.message).toContain( + 'Unable to parse servers entry from router' + ) + } + }) + + it(`should throw PROTOCOL_ERROR when writers is not valid [${invalidAddressFieldValue}]`, async () => { + try { + const routers = ['router:7699'] + const readers = ['reader1:7699', 'reader2:7699'] + const writers = invalidAddressFieldValue + const database = 'db' + await callProcedureRoutingTableGetter({ + database, + runner: new FakeRoutingProcedureRunner({ + run: () => + fakeResolvedResult([newRecord({ routers, readers, writers })]) + }) + }) + fail('should not succeed') + } catch (error) { + expect(error.code).toEqual(PROTOCOL_ERROR) + expect(error.message).toContain( + 'Unable to parse servers entry from router' + ) + } + }) + }) + + it('should throw PROTOCOL_ERROR when it has an alien role', async () => { + try { + const routers = ['router:7699'] + const readers = ['reader1:7699', 'reader2:7699'] + const writers = ['writer1:7693', 'writer2:7692', 'writer3:7629'] + const alienRole = { + role: 'ALIEN_ROLE', + addresses: ['alien:7699'] + } + const database = 'db' + await callProcedureRoutingTableGetter({ + database, + runner: new FakeRoutingProcedureRunner({ + run: () => + fakeResolvedResult([ + newRecord({ routers, readers, writers, extra: [alienRole] }) + ]) + }) + }) + fail('should not succeed') + } catch (error) { + expect(error.code).toEqual(PROTOCOL_ERROR) + expect(error.message).toContain( + 'Unable to parse servers entry from router' + ) + } + }) + + it('should throw PROTOCOL_ERROR when there is no routers', async () => { + try { + const routers = [] + const readers = ['reader1:7699', 'reader2:7699'] + const writers = ['writer1:7693', 'writer2:7692', 'writer3:7629'] + const database = 'db' + await callProcedureRoutingTableGetter({ + database, + runner: new FakeRoutingProcedureRunner({ + run: () => + fakeResolvedResult([newRecord({ routers, readers, writers })]) + }) + }) + fail('should not succeed') + } catch (error) { + expect(error.code).toEqual(PROTOCOL_ERROR) + expect(error.message).toContain('Received no') + } + }) + + it('should throw PROTOCOL_ERROR when there is no readers', async () => { + try { + const routers = ['router:7699'] + const readers = [] + const writers = ['writer1:7693', 'writer2:7692', 'writer3:7629'] + const database = 'db' + await callProcedureRoutingTableGetter({ + database, + runner: new FakeRoutingProcedureRunner({ + run: () => + fakeResolvedResult([newRecord({ routers, readers, writers })]) + }) + }) + fail('should not succeed') + } catch (error) { + expect(error.code).toEqual(PROTOCOL_ERROR) + expect(error.message).toContain('Received no') + } + }) + + it('should return the routing when there is no writers', async () => { + const routers = ['router:7699'] + const readers = ['reader1:7699', 'reader2:7699'] + const writers = [] + const database = 'db' + const routingTable = await callProcedureRoutingTableGetter({ + database, + runner: new FakeRoutingProcedureRunner({ + run: () => + fakeResolvedResult([newRecord({ routers, readers, writers })]) + }) + }) + expect(routingTable.readers).toEqual( + readers.map(r => ServerAddress.fromUrl(r)) + ) + expect(routingTable.routers).toEqual( + routers.map(r => ServerAddress.fromUrl(r)) + ) + expect(routingTable.writers).toEqual( + writers.map(r => ServerAddress.fromUrl(r)) + ) + }) + + invalidRecords.forEach(records => { + it(`should throws PROTOCOL_ERROR when records lenght is ${records.length}`, async () => { + try { + await callProcedureRoutingTableGetter({ + runner: new FakeRoutingProcedureRunner({ + run: () => fakeResolvedResult(records) + }) + }) + fail('should throws an exception') + } catch (error) { + expect(error.code).toEqual(PROTOCOL_ERROR) + expect(error.message).toContain('Illegal response from router') + } + }) + }) + + it('should throws Neo.ClientError.Database.DatabaseNotFound when this error occurs while run the query', async () => { + const expectedError = newError( + 'Some messsage', + 'Neo.ClientError.Database.DatabaseNotFound' + ) + try { + await callProcedureRoutingTableGetter({ + runner: new FakeRoutingProcedureRunner({ + run: () => fakeRejectedResult(expectedError) + }) + }) + fail('Expect to throws exception') + } catch (error) { + expect(error).toEqual(expectedError) + } + }) + + it('should throws SERVICE_UNAVAILABLE when Neo.ClientError.Procedure.ProcedureNotFound occurs', async () => { + try { + await callProcedureRoutingTableGetter({ + runner: new FakeRoutingProcedureRunner({ + run: () => + fakeRejectedResult( + newError( + 'Some messsage', + 'Neo.ClientError.Procedure.ProcedureNotFound' + ) + ) + }) + }) + fail('Expect to throws exception') + } catch (error) { + expect(error.code).toEqual(SERVICE_UNAVAILABLE) + } + }) + + it('should return null when another error ocurrs', async () => { + const result = await callProcedureRoutingTableGetter({ + runner: new FakeRoutingProcedureRunner({ + run: () => + fakeRejectedResult(newError('Some messsage', 'another error')) + }) + }) + expect(result).toBeNull() + }) + + function callProcedureRoutingTableGetter ({ + routerContext = {}, + runner = new FakeRoutingProcedureRunner(), + connection = new FakeConnection(), + database = 'adb', + routerAddress = 'neo4j://127.0.0.1:7687', + session = {} + }) { + const getter = new ProcedureRoutingTableGetter(routerContext, runner) + return getter.get( + connection, + database, + ServerAddress.fromUrl(routerAddress), + session + ) + } +}) + +class FakeRoutingProcedureRunner extends RoutingProcedurRunner { + constructor ({ run = shouldNotBeCalled }) { + super() + this._run = run + } + + run (connection, database, routerContext, session) { + return this._run(connection, database, routerContext, session) + } +} + +function newRecord ({ + ttl = int(42), + routers = [], + readers = [], + writers = [], + extra = [] +} = {}) { + const routersField = { + role: 'ROUTE', + addresses: routers + } + const readersField = { + role: 'READ', + addresses: readers + } + const writersField = { + role: 'WRITE', + addresses: writers + } + return new Record( + ['ttl', 'servers'], + [ttl, [...extra, routersField, readersField, writersField]] + ) +} + +async function runWithClockAt (currentTime, callback) { + const clock = lolex.install() + try { + clock.setSystemTime(currentTime) + return await callback(currentTime) + } finally { + clock.uninstall() + } +} + +function calculateExpirationTime (currentTime, ttl) { + return int(currentTime + ttl.toNumber() * 1000) +} + +function fakeResolvedResult (records = null) { + return Promise.resolve({ + records + }) +} + +function fakeRejectedResult (error) { + return Promise.reject(error) +} + +function shouldNotBeCalled () { + throw new Error('Should not be called') +} diff --git a/test/internal/routing-util.test.js b/test/internal/routing-util.test.js deleted file mode 100644 index 33a5cc3b9..000000000 --- a/test/internal/routing-util.test.js +++ /dev/null @@ -1,570 +0,0 @@ -/** - * Copyright (c) 2002-2020 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import RoutingUtil from '../../src/internal/routing-util' -import Record from '../../src/record' -import Integer, { int } from '../../src/integer' -import { - newError, - PROTOCOL_ERROR, - SERVICE_UNAVAILABLE, - SESSION_EXPIRED -} from '../../src/error' -import lolex from 'lolex' -import FakeConnection from './fake-connection' -import ServerAddress from '../../src/internal/server-address' -import Bookmark from '../../src/internal/bookmark' -import { - ACCESS_MODE_READ, - ACCESS_MODE_WRITE -} from '../../src/internal/constants' - -const ROUTER_ADDRESS = ServerAddress.fromUrl('test.router.com:4242') - -class FakeSession { - constructor (runResponse, fakeConnection) { - this._runResponse = runResponse - this._fakeConnection = fakeConnection - this._closed = false - } - - static successful (result) { - return new FakeSession(Promise.resolve(result), null) - } - - static failed (error) { - return new FakeSession(Promise.reject(error), null) - } - - static withFakeConnection (connection) { - return new FakeSession(null, connection) - } - - _run (ignoreQuery, ignoreParameters, queryRunner) { - if (this._runResponse) { - return this._runResponse - } - queryRunner(this._fakeConnection) - return Promise.resolve() - } - - withBookmark (bookmark) { - this._lastBookmark = bookmark - return this - } - - withDatabase (database) { - this._database = database || '' - return this - } - - withMode (mode) { - this._mode = mode - return this - } - - close () { - this._closed = true - return Promise.resolve() - } - - isClosed () { - return this._closed - } -} - -describe('#unit RoutingUtil', () => { - it('should return retrieved records when query succeeds', done => { - const session = FakeSession.successful({ records: ['foo', 'bar', 'baz'] }) - - callRoutingProcedure(session, '') - .then(records => { - expect(records).toEqual(['foo', 'bar', 'baz']) - done() - }) - .catch(console.log) - }) - - it('should close session when query succeeds', done => { - const session = FakeSession.successful({ records: ['foo', 'bar', 'baz'] }) - - callRoutingProcedure(session, '') - .then(() => { - expect(session.isClosed()).toBeTruthy() - done() - }) - .catch(console.log) - }) - - it('should not close session when query fails', done => { - const session = FakeSession.failed(newError('Oh no!', SESSION_EXPIRED)) - - callRoutingProcedure(session, '') - .then(() => { - expect(session.isClosed()).toBeFalsy() - done() - }) - .catch(console.log) - }) - - it('should return null on connection error', done => { - const session = FakeSession.failed(newError('Oh no!', SESSION_EXPIRED)) - - callRoutingProcedure(session, '') - .then(records => { - expect(records).toBeNull() - done() - }) - .catch(console.log) - }) - - it('should fail when procedure not found', done => { - const session = FakeSession.failed( - newError('Oh no!', 'Neo.ClientError.Procedure.ProcedureNotFound') - ) - - callRoutingProcedure(session, '').catch(error => { - expect(error.code).toBe(SERVICE_UNAVAILABLE) - expect(error.message).toBe( - `Server at ${ROUTER_ADDRESS} can't perform routing. Make sure you are connecting to a causal cluster` - ) - done() - }) - }) - - it('should use getRoutingTable procedure with empty routing context when server version is 3.2.0', done => { - const connection = new FakeConnection().withServerVersion('Neo4j/3.2.0') - const session = FakeSession.withFakeConnection(connection) - - callRoutingProcedure(session, '', {}).then(() => { - expect(connection.seenQueries).toEqual([ - 'CALL dbms.cluster.routing.getRoutingTable($context)' - ]) - expect(connection.seenParameters).toEqual([{ context: {} }]) - done() - }) - }) - - it('should use getRoutingTable procedure with routing context when server version is 3.2.0', done => { - const connection = new FakeConnection().withServerVersion('Neo4j/3.2.0') - const session = FakeSession.withFakeConnection(connection) - - callRoutingProcedure(session, '', { key1: 'value1', key2: 'value2' }).then( - () => { - expect(connection.seenQueries).toEqual([ - 'CALL dbms.cluster.routing.getRoutingTable($context)' - ]) - expect(connection.seenParameters).toEqual([ - { context: { key1: 'value1', key2: 'value2' } } - ]) - done() - } - ) - }) - - it('should use getRoutingTable procedure with empty routing context when server version is newer than 3.2.0', done => { - const connection = new FakeConnection().withServerVersion('Neo4j/3.3.5') - const session = FakeSession.withFakeConnection(connection) - - callRoutingProcedure(session, '', {}).then(() => { - expect(connection.seenQueries).toEqual([ - 'CALL dbms.cluster.routing.getRoutingTable($context)' - ]) - expect(connection.seenParameters).toEqual([{ context: {} }]) - done() - }) - }) - - it('should use getRoutingTable procedure with routing context when server version is newer than 3.2.0', done => { - const connection = new FakeConnection().withServerVersion('Neo4j/3.2.8') - const session = FakeSession.withFakeConnection(connection) - - callRoutingProcedure(session, '', { key1: 'foo', key2: 'bar' }).then(() => { - expect(connection.seenQueries).toEqual([ - 'CALL dbms.cluster.routing.getRoutingTable($context)' - ]) - expect(connection.seenParameters).toEqual([ - { context: { key1: 'foo', key2: 'bar' } } - ]) - done() - }) - }) - - it('should use getRoutingTable procedure without database and routing context when server version is newer than 4.0.0', done => { - testMultiDbRoutingProcedure({}, '', done) - }) - - it('should use getRoutingTable procedure without database but routing context when server version is newer than 4.0.0', done => { - testMultiDbRoutingProcedure({ key1: 'foo', key2: 'bar' }, '', done) - }) - - it('should use getRoutingTable procedure without routing context but database when server version is newer than 4.0.0', done => { - testMultiDbRoutingProcedure({}, 'myDatabase', done) - }) - - it('should use getRoutingTable procedure with database and routing context when server version is newer than 4.0.0', done => { - testMultiDbRoutingProcedure( - { key1: 'foo', key2: 'bar' }, - 'myDatabase', - done - ) - }) - - describe('should pass session access mode while invoking routing procedure', () => { - async function verifyAccessMode (mode) { - const connection = new FakeConnection().withServerVersion('Neo4j/4.0.0') - const session = FakeSession.withFakeConnection(connection).withMode(mode) - - await callRoutingProcedure(session, '', {}) - - expect(connection.seenQueries).toEqual([ - 'CALL dbms.routing.getRoutingTable($context, $database)' - ]) - expect(connection.seenParameters).toEqual([ - { context: { address: '127.0.0.1' }, database: null } - ]) - expect(connection.seenProtocolOptions).toEqual([ - jasmine.objectContaining({ - mode - }) - ]) - } - - it('READ', () => verifyAccessMode(ACCESS_MODE_READ)) - - it('WRITE', () => verifyAccessMode(ACCESS_MODE_WRITE)) - }) - - describe('should pass session database while invoking routing procedure', () => { - async function verifyDatabase (database) { - const connection = new FakeConnection().withServerVersion('Neo4j/4.0.0') - const session = FakeSession.withFakeConnection(connection).withDatabase( - database - ) - - await callRoutingProcedure(session, '', {}) - - expect(connection.seenQueries).toEqual([ - 'CALL dbms.routing.getRoutingTable($context, $database)' - ]) - expect(connection.seenParameters).toEqual([ - { context: { address: '127.0.0.1' }, database: null } - ]) - expect(connection.seenProtocolOptions).toEqual([ - jasmine.objectContaining({ - database - }) - ]) - } - it('systemdb', () => verifyDatabase('systemdb')) - - it('someOtherDb', () => verifyDatabase('someOtherDb')) - }) - - describe('should pass session bookmark while invoking routing procedure', () => { - async function verifyBookmark (bookmark) { - const connection = new FakeConnection().withServerVersion('Neo4j/4.0.0') - const session = FakeSession.withFakeConnection(connection).withBookmark( - bookmark - ) - - await callRoutingProcedure(session, '', {}) - - expect(connection.seenQueries).toEqual([ - 'CALL dbms.routing.getRoutingTable($context, $database)' - ]) - expect(connection.seenParameters).toEqual([ - { context: { address: '127.0.0.1' }, database: null } - ]) - expect(connection.seenProtocolOptions).toEqual([ - jasmine.objectContaining({ - bookmark - }) - ]) - } - it('empty', () => verifyBookmark(Bookmark.empty())) - - it('single', () => verifyBookmark(new Bookmark('bookmark1'))) - - it('single item', () => verifyBookmark(new Bookmark(['bookmark1']))) - - it('multiple items', () => - verifyBookmark(new Bookmark(['bookmark1', 'bookmark2', 'bookmark3']))) - - it('nested items', () => - verifyBookmark( - new Bookmark(['bookmark1', ['bookmark2'], ['bookmark3', 'bookmark4']]) - )) - }) - - it('should pass initial address while invoking routing procedure', async () => { - const connection = new FakeConnection().withServerVersion('Neo4j/4.1.0') - const session = FakeSession.withFakeConnection(connection) - const util = new RoutingUtil({}, 'initialAddr') - - await util.callRoutingProcedure(session, '', ROUTER_ADDRESS) - - expect(connection.seenParameters).toEqual([ - { context: { address: 'initialAddr' }, database: null } - ]) - }) - - it('should parse valid ttl', () => { - const clock = lolex.install() - try { - testValidTtlParsing(clock, 100, 5) - testValidTtlParsing(clock, Date.now(), 3600) // 1 hour - testValidTtlParsing(clock, Date.now(), 86400) // 24 hours - testValidTtlParsing(clock, Date.now(), 3628800) // 42 days - testValidTtlParsing(clock, 0, 1) - testValidTtlParsing(clock, 50, 0) - testValidTtlParsing(clock, Date.now(), 0) - } finally { - clock.uninstall() - } - }) - - it('should not overflow parsing huge ttl', () => { - const record = newRecord({ ttl: Integer.MAX_VALUE }) - const clock = lolex.install() - try { - clock.setSystemTime(42) - - const expirationTime = parseTtl(record) - - expect(expirationTime).toBe(Integer.MAX_VALUE) - } finally { - clock.uninstall() - } - }) - - it('should return valid value parsing negative ttl', () => { - const record = newRecord({ ttl: int(-42) }) - const clock = lolex.install() - try { - clock.setSystemTime(42) - - const expirationTime = parseTtl(record) - - expect(expirationTime).toBe(Integer.MAX_VALUE) - } finally { - clock.uninstall() - } - }) - - it('should throw when record does not have a ttl entry', done => { - const record = new Record(['notTtl', 'servers'], []) - expectProtocolError(() => parseTtl(record), done) - }) - - it('should parse servers', () => { - testValidServersParsing([], [], []) - - testValidServersParsing(['router1'], [], []) - testValidServersParsing([], ['reader1'], []) - testValidServersParsing([], [], ['writer1']) - - testValidServersParsing(['router1'], ['reader1'], []) - testValidServersParsing(['router1'], ['reader1'], ['writer1']) - testValidServersParsing([], ['reader1'], ['writer1']) - - testValidServersParsing(['router1'], ['reader1'], ['writer1']) - testValidServersParsing( - ['router1', 'router2'], - ['reader1', 'reader2'], - ['writer1'] - ) - testValidServersParsing( - ['router1', 'router2'], - ['reader1', 'reader2'], - ['writer1', 'writer2'] - ) - }) - - it('should fail to parse servers entry when record does not have servers', done => { - const record = new Record( - ['ttl', 'notServers'], - [int(42), [{ role: 'READ', addresses: ['1', '2', '3'] }]] - ) - expectProtocolError(() => parseServers(record), done) - }) - - it('should fail to parse servers entry without role', done => { - const record = new Record( - ['ttl', 'servers'], - [int(42), [{ notRole: 'READ', addresses: ['1', '2', '3'] }]] - ) - expectProtocolError(() => parseServers(record), done) - }) - - it('should fail to parse servers entry with illegal role', done => { - const record = new Record( - ['ttl', 'servers'], - [int(42), [{ role: 'ARBITER', addresses: ['1', '2', '3'] }]] - ) - expectProtocolError(() => parseServers(record), done) - }) - - it('should fail to parse servers entry with just ttl', done => { - const record = new Record(['ttl', 'servers'], [int(42), [{ role: 'READ' }]]) - expectProtocolError(() => parseServers(record), done) - }) - - it('should fail to parse servers entry without addresses', done => { - const record = new Record( - ['ttl', 'servers'], - [int(42), [{ role: 'WRITE', notAddresses: ['1', '2', '3'] }]] - ) - expectProtocolError(() => parseServers(record), done) - }) - - it('should fail to parse servers entry with string addresses', done => { - const record = new Record( - ['ttl', 'servers'], - [int(42), [{ role: 'WRITE', addresses: '' }]] - ) - expectProtocolError(() => parseServers(record), done) - }) - - it('should fail to parse servers entry with null addresses', done => { - const record = new Record( - ['ttl', 'servers'], - [int(42), [{ role: 'WRITE', addresses: null }]] - ) - expectProtocolError(() => parseServers(record), done) - }) - - it('should fail to parse servers entry with integer addresses', done => { - const record = new Record( - ['ttl', 'servers'], - [int(42), [{ role: 'WRITE', addresses: 12345 }]] - ) - expectProtocolError(() => parseServers(record), done) - }) - - it('should fail to parse servers entry with object addresses', done => { - const record = new Record( - ['ttl', 'servers'], - [int(42), [{ role: 'WRITE', addresses: { key: ['localhost'] } }]] - ) - expectProtocolError(() => parseServers(record), done) - }) - - function testMultiDbRoutingProcedure (context, database, done) { - const connection = new FakeConnection().withServerVersion('Neo4j/4.0.0') - const session = FakeSession.withFakeConnection(connection) - - callRoutingProcedure(session, database, context).then(() => { - expect(connection.seenQueries).toEqual([ - 'CALL dbms.routing.getRoutingTable($context, $database)' - ]) - expect(connection.seenParameters).toEqual([ - { context, database: database || null } - ]) - done() - }) - } - - function testValidTtlParsing (clock, currentTime, ttlSeconds) { - clock.setSystemTime(currentTime) - const expectedExpirationTime = currentTime + ttlSeconds * 1000 - - // verify parsing when TTL is an Integer - const record1 = newRecord({ ttl: int(ttlSeconds) }) - const expirationTime1 = parseTtl(record1).toNumber() - expect(expirationTime1).toEqual(expectedExpirationTime) - - // verify parsing when TTL is a JavaScript Number, this can happen when driver is configured with {disableLosslessIntegers: true} - const record2 = newRecord({ ttl: ttlSeconds }) - const expirationTime2 = parseTtl(record2).toNumber() - expect(expirationTime2).toEqual(expectedExpirationTime) - } - - function testValidServersParsing ( - routerAddresses, - readerAddresses, - writerAddresses - ) { - const record = newRecord({ - routers: routerAddresses, - readers: readerAddresses, - writers: writerAddresses - }) - - const { routers, readers, writers } = parseServers(record) - - expect(routers).toEqual(routerAddresses.map(r => ServerAddress.fromUrl(r))) - expect(readers).toEqual(readerAddresses.map(r => ServerAddress.fromUrl(r))) - expect(writers).toEqual(writerAddresses.map(w => ServerAddress.fromUrl(w))) - } - - function callRoutingProcedure (session, database, routingContext) { - const util = new RoutingUtil(routingContext || {}, '127.0.0.1') - return util.callRoutingProcedure(session, database, ROUTER_ADDRESS) - } - - function parseTtl (record) { - const util = new RoutingUtil() - return util.parseTtl(record, ROUTER_ADDRESS) - } - - function parseServers (record) { - const util = new RoutingUtil() - return util.parseServers(record, ROUTER_ADDRESS) - } - - function newRecord ({ - ttl = int(42), - routers = [], - readers = [], - writers = [] - }) { - const routersField = { - role: 'ROUTE', - addresses: routers - } - const readersField = { - role: 'READ', - addresses: readers - } - const writersField = { - role: 'WRITE', - addresses: writers - } - return new Record( - ['ttl', 'servers'], - [ttl, [routersField, readersField, writersField]] - ) - } - - function expectProtocolError (action, done) { - const promise = new Promise((resolve, reject) => { - try { - resolve(action()) - } catch (e) { - reject(e) - } - }) - - promise.catch(error => { - expect(error.code).toBe(PROTOCOL_ERROR) - done() - }) - } -})