Skip to content

Refactoring routing table rediscovery routine #645

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/internal/connection-provider-routing.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import { READ, WRITE } from '../driver'
import Session from '../session'
import RoutingTable from './routing-table'
import Rediscovery from './rediscovery'
import RoutingUtil from './routing-util'
import { RoutingTableGetterFactory } from './routing-table-getter'
import { HostNameResolver } from './node'
import SingleConnectionProvider from './connection-provider-single'
import PooledConnectionProvider from './connection-provider-pooled'
Expand Down Expand Up @@ -66,7 +66,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
this._seedRouter = address
this._routingTables = {}
this._rediscovery = new Rediscovery(
new RoutingUtil(routingContext, address.toString())
new RoutingTableGetterFactory(routingContext, address.toString())
)
this._loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy(
this._connectionPool
Expand Down Expand Up @@ -417,6 +417,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
`unable to fetch routing table because of an error ${error}`
)
return null
} finally {
session.close()
}
} else {
// unable to acquire connection and create session towards the current router
Expand Down
75 changes: 17 additions & 58 deletions src/internal/rediscovery.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,79 +16,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import RoutingTable from './routing-table'
import RoutingUtil from './routing-util'
import { newError, PROTOCOL_ERROR } from '../error'
import Session from '../session'
import { RoutingTableGetterFactory } from './routing-table-getter'
import ServerAddress from './server-address'

export default class Rediscovery {
/**
* @constructor
* @param {RoutingUtil} routingUtil the util to use.
* @param {RoutingTableGetterFactory} routingTableGetterFactory the util to use.
*/
constructor (routingUtil) {
this._routingUtil = routingUtil
constructor (routingTableGetterFactory) {
this._routingTableGetterFactory = routingTableGetterFactory
}

/**
* Try to fetch new routing table from the given router.
* @param {Session} session the session to use.
* @param {string} database the database for which to lookup routing table.
* @param {string} routerAddress the URL of the router.
* @param {ServerAddress} routerAddress the URL of the router.
* @return {Promise<RoutingTable>} promise resolved with new routing table or null when connection error happened.
*/
async lookupRoutingTableOnRouter (session, database, routerAddress) {
const records = await this._routingUtil.callRoutingProcedure(
session,
database,
routerAddress
)
if (records === null) {
// connection error happened, unable to retrieve routing table from this router, next one should be queried
return null
}

if (records.length !== 1) {
throw newError(
'Illegal response from router "' +
routerAddress +
'". ' +
'Received ' +
records.length +
' records but expected only one.\n' +
JSON.stringify(records),
PROTOCOL_ERROR
lookupRoutingTableOnRouter (session, database, routerAddress) {
return session._acquireConnection(connection => {
const routingTableGetter = this._routingTableGetterFactory.create(
connection
)
}

const record = records[0]

const expirationTime = this._routingUtil.parseTtl(record, routerAddress)
const { routers, readers, writers } = this._routingUtil.parseServers(
record,
routerAddress
)

Rediscovery._assertNonEmpty(routers, 'routers', routerAddress)
Rediscovery._assertNonEmpty(readers, 'readers', routerAddress)
// case with no writers is processed higher in the promise chain because only RoutingDriver knows
// how to deal with such table and how to treat router that returned such table

return new RoutingTable({
database,
routers,
readers,
writers,
expirationTime
})
}

static _assertNonEmpty (serverAddressesArray, serversName, routerAddress) {
if (serverAddressesArray.length === 0) {
throw newError(
'Received no ' + serversName + ' from router ' + routerAddress,
PROTOCOL_ERROR
return routingTableGetter.get(
connection,
database,
routerAddress,
session
)
}
})
}
}
21 changes: 21 additions & 0 deletions src/internal/routing-table-getter/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/**
* Copyright (c) 2002-2020 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import RoutingTableGetterFactory from './routing-table-getter-factory'

export { RoutingTableGetterFactory }
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Copyright (c) 2002-2020 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import RoutingProcedureRunner from './routing-procedure-runner'

const CONTEXT = 'context'
const DATABASE = 'database'
const CALL_GET_ROUTING_TABLE_MULTI_DB = `CALL dbms.routing.getRoutingTable($${CONTEXT}, $${DATABASE})`

/**
* Runs the Multi-Database procedure to get the Routing Table
*/
export default class MultiDatabaseRoutingProcedureRunner extends RoutingProcedureRunner {
constructor (initialAddress) {
super()
this._initialAddress = initialAddress
}

/**
* Run the procedure
*
* @param {Connection} connection The connection use
* @param {string} database the database
* @param {string} routerAddress the router address
* @param {Session} session the session which was used to get the connection,
* it will be used to get lastBookmark and other properties
*
* @returns {Result} the result of the query
*/
run (connection, database, context, session) {
return this._runQuery(
connection,
CALL_GET_ROUTING_TABLE_MULTI_DB,
{
context: {
...context,
address: this._initialAddress
},
database: database || null
},
session
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Copyright (c) 2002-2020 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import RoutingProcedureRunner from './routing-procedure-runner'

const CONTEXT = 'context'
const CALL_GET_ROUTING_TABLE = `CALL dbms.cluster.routing.getRoutingTable($${CONTEXT})`

/**
* Runs the Single-Database procedure to get the Routing Table
*/
export default class SingleDatabaseRoutingProcedureRunner extends RoutingProcedureRunner {
/**
* Run the procedure
*
* @param {Connection} connection The connection use
* @param {string} database the database
* @param {string} routerAddress the router address
* @param {Session} session the session which was used to get the connection,
* it will be used to get lastBookmark and other properties
*
* @returns {Result} the result of the query
*/
run (connection, database, context, session) {
return this._runQuery(
connection,
CALL_GET_ROUTING_TABLE,
{ context },
session
)
}
}
56 changes: 56 additions & 0 deletions src/internal/routing-table-getter/routing-procedure-runner.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Copyright (c) 2002-2020 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import Result from '../../result'
import TxConfig from '../tx-config'

export default class RoutingProcedureRunner {
/**
* @param {Connection} connection the connection
* @param {string} database the database
* @param {object} routerContext the router context
* @param {Session} session the session which was used to get the connection,
* it will be used to get lastBookmark and other properties
*
* @returns {Result} the result of the query
*/
run (connection, database, routerContext, session) {
throw new Error('not implemented')
}

/**
* Run query using the connection
* @param {Connection} connection the connectiom
* @param {string} query the query
* @param {object} params the query params
* @param {Session} session the session which was used to get the connection,
* it will be used to get lastBookmark and other properties
*
* @returns {Result} the result of the query
*/
_runQuery (connection, query, params, session) {
const resultOberserver = connection.protocol().run(query, params, {
bookmark: session._lastBookmark,
txConfig: TxConfig.empty(),
mode: session._mode,
database: session._database,
afterComplete: session._onComplete
})
return new Result(Promise.resolve(resultOberserver))
}
}
56 changes: 56 additions & 0 deletions src/internal/routing-table-getter/routing-table-getter-factory.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Copyright (c) 2002-2020 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { BOLT_PROTOCOL_V4_0 } from '../constants'
import Connection from '../connection'
import ProcedureRoutingTableGetter from './routing-table-getter-procedure'
import SingleDatabaseRoutingProcedureRunner from './routing-procedure-runner-single-database'
import MultiDatabaseRoutingProcedureRunner from './routing-procedure-runner-multi-database'

/**
* Constructs the RoutingTableGetter according to the correct protocol version.
*/
export default class RoutingTableGetterFactory {
/**
* Constructor
* @param {Object} routingContext Context which the be used to define the routing table
* @param {string} initialAddress The address that the driver is connecting to,
* used by routing as a fallback when routing and clustering isn't configured.
*/
constructor (routingContext, initialAddress) {
this._routingContext = routingContext
this._initialAddress = initialAddress
}

/**
* Creates the RoutingTableGetter using the given session and database
*
* @param {Connection} connection the connection to use
* @param {string} database the database name
* @param {string} routerAddress the URL of the router.
* @returns {ProcedureRoutingTableGetter} The routing table getter
*/
create (connection) {
const runner =
connection.protocol().version < BOLT_PROTOCOL_V4_0
? new SingleDatabaseRoutingProcedureRunner()
: new MultiDatabaseRoutingProcedureRunner(this._initialAddress)

return new ProcedureRoutingTableGetter(this._routingContext, runner)
}
}
Loading