Skip to content

Commit 79d7fd4

Browse files
committed
Refactoring routing table rediscovery routine
The goal of this change is prepare the terrain for loading the routing table using the routing message. Previously, the `Rediscovery` class was responsible for the parsing the result for the procedure applying the business rules and the `RoutingUtil` was responsible for calling the correct procedure depending on the protocol version and provide methods to parse the result. In the new design, * `Rediscovery` is responsible for orchestrating the `Session`, `RoutingTableGetterFactory` and `RoutingTableGetter` to get the `Table`. * `RoutingTableGetterFactory` is responsible for creating the correct `RoutingTableGetter` configuration due the protocol version. * `ProcedureRoutingTableGetter` is responsible for getting a valid `RoutingTable` running the configured procedure * `SingleDatabaseProcedureRunner` is responsible for running the procedure used for single database protocol and return the result * `MultiDatabaseProcedureRunner` is responsible for running the procedure used for multi database protocol and return the result To extend this model with a new RoutingTable source, you need to create a new getter which will have the same blackbox behaviour of `ProcedureRoutingTableGetter` and instantiate it properly on the factory.
1 parent c6513c4 commit 79d7fd4

18 files changed

+1455
-1012
lines changed

src/internal/connection-provider-routing.js

+4-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import { READ, WRITE } from '../driver'
2222
import Session from '../session'
2323
import RoutingTable from './routing-table'
2424
import Rediscovery from './rediscovery'
25-
import RoutingUtil from './routing-util'
25+
import { RoutingTableGetterFactory } from './routing-table-getter'
2626
import { HostNameResolver } from './node'
2727
import SingleConnectionProvider from './connection-provider-single'
2828
import PooledConnectionProvider from './connection-provider-pooled'
@@ -66,7 +66,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
6666
this._seedRouter = address
6767
this._routingTables = {}
6868
this._rediscovery = new Rediscovery(
69-
new RoutingUtil(routingContext, address.toString())
69+
new RoutingTableGetterFactory(routingContext, address.toString())
7070
)
7171
this._loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy(
7272
this._connectionPool
@@ -417,6 +417,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
417417
`unable to fetch routing table because of an error ${error}`
418418
)
419419
return null
420+
} finally {
421+
session.close()
420422
}
421423
} else {
422424
// unable to acquire connection and create session towards the current router

src/internal/rediscovery.js

+17-58
Original file line numberDiff line numberDiff line change
@@ -16,79 +16,38 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
2019
import RoutingTable from './routing-table'
21-
import RoutingUtil from './routing-util'
22-
import { newError, PROTOCOL_ERROR } from '../error'
20+
import Session from '../session'
21+
import { RoutingTableGetterFactory } from './routing-table-getter'
22+
import ServerAddress from './server-address'
2323

2424
export default class Rediscovery {
2525
/**
2626
* @constructor
27-
* @param {RoutingUtil} routingUtil the util to use.
27+
* @param {RoutingTableGetterFactory} routingTableGetterFactory the util to use.
2828
*/
29-
constructor (routingUtil) {
30-
this._routingUtil = routingUtil
29+
constructor (routingTableGetterFactory) {
30+
this._routingTableGetterFactory = routingTableGetterFactory
3131
}
3232

3333
/**
3434
* Try to fetch new routing table from the given router.
3535
* @param {Session} session the session to use.
3636
* @param {string} database the database for which to lookup routing table.
37-
* @param {string} routerAddress the URL of the router.
37+
* @param {ServerAddress} routerAddress the URL of the router.
3838
* @return {Promise<RoutingTable>} promise resolved with new routing table or null when connection error happened.
3939
*/
40-
async lookupRoutingTableOnRouter (session, database, routerAddress) {
41-
const records = await this._routingUtil.callRoutingProcedure(
42-
session,
43-
database,
44-
routerAddress
45-
)
46-
if (records === null) {
47-
// connection error happened, unable to retrieve routing table from this router, next one should be queried
48-
return null
49-
}
50-
51-
if (records.length !== 1) {
52-
throw newError(
53-
'Illegal response from router "' +
54-
routerAddress +
55-
'". ' +
56-
'Received ' +
57-
records.length +
58-
' records but expected only one.\n' +
59-
JSON.stringify(records),
60-
PROTOCOL_ERROR
40+
lookupRoutingTableOnRouter (session, database, routerAddress) {
41+
return session._acquireConnection(connection => {
42+
const routingTableGetter = this._routingTableGetterFactory.create(
43+
connection
6144
)
62-
}
63-
64-
const record = records[0]
65-
66-
const expirationTime = this._routingUtil.parseTtl(record, routerAddress)
67-
const { routers, readers, writers } = this._routingUtil.parseServers(
68-
record,
69-
routerAddress
70-
)
71-
72-
Rediscovery._assertNonEmpty(routers, 'routers', routerAddress)
73-
Rediscovery._assertNonEmpty(readers, 'readers', routerAddress)
74-
// case with no writers is processed higher in the promise chain because only RoutingDriver knows
75-
// how to deal with such table and how to treat router that returned such table
76-
77-
return new RoutingTable({
78-
database,
79-
routers,
80-
readers,
81-
writers,
82-
expirationTime
83-
})
84-
}
85-
86-
static _assertNonEmpty (serverAddressesArray, serversName, routerAddress) {
87-
if (serverAddressesArray.length === 0) {
88-
throw newError(
89-
'Received no ' + serversName + ' from router ' + routerAddress,
90-
PROTOCOL_ERROR
45+
return routingTableGetter.get(
46+
connection,
47+
database,
48+
routerAddress,
49+
session
9150
)
92-
}
51+
})
9352
}
9453
}
+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/**
2+
* Copyright (c) 2002-2020 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
import RoutingTableGetterFactory from './routing-table-getter-factory'
20+
21+
export { RoutingTableGetterFactory }
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/**
2+
* Copyright (c) 2002-2020 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
import RoutingProcedureRunner from './routing-procedure-runner'
20+
21+
const CONTEXT = 'context'
22+
const DATABASE = 'database'
23+
const CALL_GET_ROUTING_TABLE_MULTI_DB = `CALL dbms.routing.getRoutingTable($${CONTEXT}, $${DATABASE})`
24+
25+
/**
26+
* Runs the Multi-Database procedure to get the Routing Table
27+
*/
28+
export default class MultiDatabaseRoutingProcedureRunner extends RoutingProcedureRunner {
29+
constructor (initialAddress) {
30+
super()
31+
this._initialAddress = initialAddress
32+
}
33+
34+
/**
35+
* Run the procedure
36+
*
37+
* @param {Connection} connection The connection use
38+
* @param {string} database the database
39+
* @param {string} routerAddress the router address
40+
* @param {Session} session the session which was used to get the connection,
41+
* it will be used to get lastBookmark and other properties
42+
*
43+
* @returns {Result} the result of the query
44+
*/
45+
run (connection, database, context, session) {
46+
return this._runQuery(
47+
connection,
48+
CALL_GET_ROUTING_TABLE_MULTI_DB,
49+
{
50+
context: {
51+
...context,
52+
address: this._initialAddress
53+
},
54+
database: database || null
55+
},
56+
session
57+
)
58+
}
59+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/**
2+
* Copyright (c) 2002-2020 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
import RoutingProcedureRunner from './routing-procedure-runner'
20+
21+
const CONTEXT = 'context'
22+
const CALL_GET_ROUTING_TABLE = `CALL dbms.cluster.routing.getRoutingTable($${CONTEXT})`
23+
24+
/**
25+
* Runs the Single-Database procedure to get the Routing Table
26+
*/
27+
export default class SingleDatabaseRoutingProcedureRunner extends RoutingProcedureRunner {
28+
/**
29+
* Run the procedure
30+
*
31+
* @param {Connection} connection The connection use
32+
* @param {string} database the database
33+
* @param {string} routerAddress the router address
34+
* @param {Session} session the session which was used to get the connection,
35+
* it will be used to get lastBookmark and other properties
36+
*
37+
* @returns {Result} the result of the query
38+
*/
39+
run (connection, database, context, session) {
40+
return this._runQuery(
41+
connection,
42+
CALL_GET_ROUTING_TABLE,
43+
{ context },
44+
session
45+
)
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/**
2+
* Copyright (c) 2002-2020 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
import Result from '../../result'
20+
import TxConfig from '../tx-config'
21+
22+
export default class RoutingProcedureRunner {
23+
/**
24+
* @param {Connection} connection the connection
25+
* @param {string} database the database
26+
* @param {object} routerContext the router context
27+
* @param {Session} session the session which was used to get the connection,
28+
* it will be used to get lastBookmark and other properties
29+
*
30+
* @returns {Result} the result of the query
31+
*/
32+
run (connection, database, routerContext, session) {
33+
throw new Error('not implemented')
34+
}
35+
36+
/**
37+
* Run query using the connection
38+
* @param {Connection} connection the connectiom
39+
* @param {string} query the query
40+
* @param {object} params the query params
41+
* @param {Session} session the session which was used to get the connection,
42+
* it will be used to get lastBookmark and other properties
43+
*
44+
* @returns {Result} the result of the query
45+
*/
46+
_runQuery (connection, query, params, session) {
47+
const resultOberserver = connection.protocol().run(query, params, {
48+
bookmark: session._lastBookmark,
49+
txConfig: TxConfig.empty(),
50+
mode: session._mode,
51+
database: session._database,
52+
afterComplete: session._onComplete
53+
})
54+
return new Result(Promise.resolve(resultOberserver))
55+
}
56+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/**
2+
* Copyright (c) 2002-2020 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
import { BOLT_PROTOCOL_V4_0 } from '../constants'
20+
import Connection from '../connection'
21+
import ProcedureRoutingTableGetter from './routing-table-getter-procedure'
22+
import SingleDatabaseRoutingProcedureRunner from './routing-procedure-runner-single-database'
23+
import MultiDatabaseRoutingProcedureRunner from './routing-procedure-runner-multi-database'
24+
25+
/**
26+
* Constructs the RoutingTableGetter according to the correct protocol version.
27+
*/
28+
export default class RoutingTableGetterFactory {
29+
/**
30+
* Constructor
31+
* @param {Object} routingContext Context which the be used to define the routing table
32+
* @param {string} initialAddress The address that the driver is connecting to,
33+
* used by routing as a fallback when routing and clustering isn't configured.
34+
*/
35+
constructor (routingContext, initialAddress) {
36+
this._routingContext = routingContext
37+
this._initialAddress = initialAddress
38+
}
39+
40+
/**
41+
* Creates the RoutingTableGetter using the given session and database
42+
*
43+
* @param {Connection} connection the connection to use
44+
* @param {string} database the database name
45+
* @param {string} routerAddress the URL of the router.
46+
* @returns {ProcedureRoutingTableGetter} The routing table getter
47+
*/
48+
create (connection) {
49+
const runner =
50+
connection.protocol().version < BOLT_PROTOCOL_V4_0
51+
? new SingleDatabaseRoutingProcedureRunner()
52+
: new MultiDatabaseRoutingProcedureRunner(this._initialAddress)
53+
54+
return new ProcedureRoutingTableGetter(this._routingContext, runner)
55+
}
56+
}

0 commit comments

Comments
 (0)