Skip to content

Commit f368693

Browse files
committed
More fixes and boltkit tests for routing
1 parent c2cfedf commit f368693

19 files changed

+520
-62
lines changed

package.json

+2
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,12 @@
5252
"lolex": "^1.5.2",
5353
"merge-stream": "^1.0.0",
5454
"minimist": "^1.2.0",
55+
"mustache": "^2.3.0",
5556
"phantomjs-prebuilt": "^2.1.7 ",
5657
"run-sequence": "^1.1.4",
5758
"semver": "^5.3.0",
5859
"through2": "~2.0.0",
60+
"tmp": "0.0.31",
5961
"vinyl-buffer": "^1.0.0",
6062
"vinyl-source-stream": "^1.1.0"
6163
},

src/v1/internal/get-servers-util.js

+13-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ export default class GetServersUtil {
3131
session.close();
3232
return result.records;
3333
}).catch(error => {
34-
if (error.code === PROCEDURE_NOT_FOUND_CODE) {
34+
if (this._isProcedureNotFoundError(error)) {
3535
// throw when getServers procedure not found because this is clearly a configuration issue
3636
throw newError('Server ' + routerAddress + ' could not perform routing. ' +
3737
'Make sure you are connecting to a causal cluster', SERVICE_UNAVAILABLE);
@@ -92,4 +92,16 @@ export default class GetServersUtil {
9292
PROTOCOL_ERROR);
9393
}
9494
}
95+
96+
_isProcedureNotFoundError(error) {
97+
let errorCode = error.code;
98+
if (!errorCode) {
99+
try {
100+
errorCode = error.fields[0].code;
101+
} catch (e) {
102+
errorCode = 'UNKNOWN';
103+
}
104+
}
105+
return errorCode === PROCEDURE_NOT_FOUND_CODE;
106+
}
95107
}

src/v1/internal/rediscovery.js

+2-5
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,8 @@ export default class Rediscovery {
4747

4848
Rediscovery._assertNonEmpty(routers, 'routers', routerAddress);
4949
Rediscovery._assertNonEmpty(readers, 'readers', routerAddress);
50-
51-
if (writers.isEmpty()) {
52-
// retrieved routing table has no writers, next router should be queried
53-
return null;
54-
}
50+
// case with no writers is processed higher in the promise chain because only RoutingDriver knows
51+
// how to deal with such table and how to treat router that returned such table
5552

5653
return new RoutingTable(routers, readers, writers, expirationTime);
5754
});

src/v1/internal/routing-table.js

+8
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,14 @@ export default class RoutingTable {
3838
this.writers.remove(address);
3939
}
4040

41+
forgetRouter(address) {
42+
this.routers.remove(address);
43+
}
44+
45+
forgetWriter(address) {
46+
this.writers.remove(address);
47+
}
48+
4149
serversDiff(otherRoutingTable) {
4250
const oldServers = new Set(this._allServers());
4351
const newServers = otherRoutingTable._allServers();

src/v1/routing-driver.js

+24-16
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,10 @@ class RoutingDriver extends Driver {
7070
let url = 'UNKNOWN';
7171
if (conn) {
7272
url = conn.url;
73-
this._routingTable.writers.remove(conn.url);
73+
this._routingTable.forgetWriter(conn.url);
7474
} else {
7575
connectionPromise.then((conn) => {
76-
this._routingTable.writers.remove(conn.url);
76+
this._routingTable.forgetWriter(conn.url);
7777
}).catch(() => {/*ignore*/});
7878
}
7979
return newError("No longer possible to write to server at " + url, SESSION_EXPIRED);
@@ -118,35 +118,43 @@ class RoutingDriver extends Driver {
118118
const refreshedTablePromise = knownRouters.reduce((refreshedTablePromise, currentRouter, currentIndex) => {
119119
return refreshedTablePromise.then(newRoutingTable => {
120120
if (newRoutingTable) {
121-
// correct routing table was fetched, just return it
122-
return newRoutingTable
123-
}
124-
125-
// returned routing table was undefined, this means a connection error happened and we need to forget the
126-
// previous router and try the next one
127-
const previousRouter = knownRouters[currentIndex - 1];
128-
if (previousRouter) {
129-
this._forget(previousRouter);
121+
if (!newRoutingTable.writers.isEmpty()) {
122+
// valid routing table was fetched - just return it, try next router otherwise
123+
return newRoutingTable;
124+
}
125+
} else {
126+
// returned routing table was undefined, this means a connection error happened and we need to forget the
127+
// previous router and try the next one
128+
const previousRouter = knownRouters[currentIndex - 1];
129+
if (previousRouter) {
130+
currentRoutingTable.forgetRouter(previousRouter);
131+
}
130132
}
131133

132-
const connection = this._pool.acquire(currentRouter);
133-
const connectionPromise = Promise.resolve(connection);
134-
const session = this._createSession(connectionPromise, this._releaseConnection(connectionPromise));
135-
136134
// try next router
135+
const session = this._createSessionForRediscovery(currentRouter);
137136
return this._rediscovery.lookupRoutingTableOnRouter(session, currentRouter);
138137
})
139138
}, Promise.resolve(null));
140139

141140
return refreshedTablePromise.then(newRoutingTable => {
142-
if (newRoutingTable) {
141+
if (newRoutingTable && !newRoutingTable.writers.isEmpty()) {
143142
this._updateRoutingTable(newRoutingTable);
144143
return newRoutingTable
145144
}
146145
throw newError('Could not perform discovery. No routing servers available.', SERVICE_UNAVAILABLE);
147146
});
148147
}
149148

149+
_createSessionForRediscovery(routerAddress) {
150+
const connection = this._pool.acquire(routerAddress);
151+
const connectionPromise = Promise.resolve(connection);
152+
// error transformer here is a no-op unlike the one in a regular session, this is so because errors are
153+
// handled in the rediscovery promise chain and we do not need to do this in the error transformer
154+
const errorTransformer = error => error;
155+
return new RoutingSession(connectionPromise, this._releaseConnection(connectionPromise), errorTransformer);
156+
}
157+
150158
_forget(url) {
151159
this._routingTable.forget(url);
152160
this._pool.purge(url);

test/internal/rediscovery.test.js

+3-2
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ describe('rediscovery', () => {
129129
});
130130
});
131131

132-
it('should return null when no writers', done => {
132+
it('should return routing table when no writers', done => {
133133
const util = new FakeGetServersUtil({
134134
callGetServers: () => [new Record(['a'], ['aaa'])],
135135
parseTtl: () => int(42),
@@ -143,7 +143,8 @@ describe('rediscovery', () => {
143143
});
144144

145145
lookupRoutingTableOnRouter(util).then(routingTable => {
146-
expect(routingTable).toBeNull();
146+
expect(routingTable).toBeDefined();
147+
expect(routingTable).not.toBeNull();
147148
done();
148149
});
149150
});

test/internal/routing-table.test.js

+20
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,26 @@ describe('routing-table', () => {
8787
expect(table.writers.toArray()).toEqual([]);
8888
});
8989

90+
it('should forget router', () => {
91+
const table = createTable([1, 2], [1, 3], [4, 1], notExpired());
92+
93+
table.forgetRouter(1);
94+
95+
expect(table.routers.toArray()).toEqual([2]);
96+
expect(table.readers.toArray()).toEqual([1, 3]);
97+
expect(table.writers.toArray()).toEqual([4, 1]);
98+
});
99+
100+
it('should forget writer', () => {
101+
const table = createTable([1, 2, 3], [2, 1, 5], [5, 1], notExpired());
102+
103+
table.forgetWriter(1);
104+
105+
expect(table.routers.toArray()).toEqual([1, 2, 3]);
106+
expect(table.readers.toArray()).toEqual([2, 1, 5]);
107+
expect(table.writers.toArray()).toEqual([5]);
108+
});
109+
90110
it('should return all servers in diff when other table is empty', () => {
91111
const oldTable = createTable([1, 2], [3, 4], [5, 6], notExpired());
92112
const newTable = createTable([], [], [], notExpired());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
!: AUTO INIT
2+
!: AUTO RESET
3+
!: AUTO PULL_ALL
4+
5+
C: RUN "CALL dbms.cluster.routing.getServers" {}
6+
PULL_ALL
7+
S: SUCCESS {"fields": ["ttl", "servers"]}
8+
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9002","127.0.0.1:9003"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002"], "role": "ROUTE"}]]
9+
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9005"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9003"], "role": "READ"},{"addresses": ["127.0.0.1:9001"], "role": "ROUTE"}]]
10+
SUCCESS {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
!: AUTO INIT
2+
!: AUTO RESET
3+
!: AUTO PULL_ALL
4+
5+
C: RUN "CALL dbms.cluster.routing.getServers" {}
6+
PULL_ALL
7+
S: SUCCESS {"fields": ["ttl", "servers"]}
8+
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001"],"role": "WRITE"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]]
9+
SUCCESS {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
!: AUTO INIT
2+
!: AUTO RESET
3+
!: AUTO PULL_ALL
4+
5+
C: RUN "CALL dbms.cluster.routing.getServers" {}
6+
PULL_ALL
7+
S: SUCCESS {"fields": ["ttl", "servers"]}
8+
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001"],"role": "WRITE"},{"addresses": ["127.0.0.1:9001"], "role": "READ"}]]
9+
SUCCESS {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
!: AUTO INIT
2+
!: AUTO RESET
3+
!: AUTO PULL_ALL
4+
5+
C: RUN "CALL dbms.cluster.routing.getServers" {}
6+
PULL_ALL
7+
S: SUCCESS {"fields": ["ttl", "notServers"]}
8+
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9002","127.0.0.1:9003"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]]
9+
SUCCESS {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
!: AUTO INIT
2+
!: AUTO RESET
3+
!: AUTO PULL_ALL
4+
5+
C: RUN "CALL dbms.cluster.routing.getServers" {}
6+
PULL_ALL
7+
S: SUCCESS {"fields": ["notTtl", "servers"]}
8+
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9002","127.0.0.1:9003"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]]
9+
SUCCESS {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
!: AUTO INIT
2+
!: AUTO RESET
3+
!: AUTO PULL_ALL
4+
5+
C: RUN "CALL dbms.cluster.routing.getServers" {}
6+
PULL_ALL
7+
S: SUCCESS {"fields": ["ttl", "servers"]}
8+
RECORD [9223372036854775807, [{"addresses": {{{writers}}},"role": "WRITE"}, {"addresses": {{{readers}}}, "role": "READ"},{"addresses": {{{routers}}}, "role": "ROUTE"}]]
9+
SUCCESS {}
10+
C: RUN "MATCH (n) RETURN n.name" {}
11+
PULL_ALL
12+
S: SUCCESS {"fields": ["n.name"]}
13+
SUCCESS {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
!: AUTO INIT
2+
!: AUTO RESET
3+
!: AUTO PULL_ALL
4+
5+
C: RUN "CALL dbms.cluster.routing.getServers" {}
6+
PULL_ALL
7+
S: SUCCESS {"fields": ["ttl", "servers"]}
8+
RECORD [0, [{"addresses": ["127.0.0.1:9090","127.0.0.1:9091","127.0.0.1:9092","127.0.0.1:9000"],"role": "ROUTE"}, {"addresses": ["127.0.0.1:9000"], "role": "READ"},{"addresses": ["127.0.0.1:9000"], "role": "WRITE"}]]
9+
SUCCESS {}
10+
C: RUN "MATCH (n) RETURN n" {}
11+
PULL_ALL
12+
S: SUCCESS {"fields": ["n"]}
13+
SUCCESS {}
14+
C: RUN "CALL dbms.cluster.routing.getServers" {}
15+
PULL_ALL
16+
S: SUCCESS {"fields": ["ttl", "servers"]}
17+
RECORD [0, [{"addresses": ["127.0.0.1:9000"],"role": "ROUTE"}, {"addresses": ["127.0.0.1:9000"], "role": "READ"},{"addresses": ["127.0.0.1:9000"], "role": "WRITE"}]]
18+
SUCCESS {}
19+
C: RUN "MATCH (n) RETURN n" {}
20+
PULL_ALL
21+
S: SUCCESS {"fields": ["n"]}
22+
SUCCESS {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
!: AUTO INIT
2+
!: AUTO RESET
3+
!: AUTO PULL_ALL
4+
5+
C: RUN "CALL dbms.cluster.routing.getServers" {}
6+
PULL_ALL
7+
S: SUCCESS {"fields": ["ttl", "servers"]}
8+
RECORD [9223372036854775807, [{"notAddresses": ["127.0.0.1:9001"],"memberRole": "WRITER"}, {"notAddresses": ["127.0.0.1:9002","127.0.0.1:9003"], "memberRole": "READER"},{"notAddresses": ["127.0.0.1:9001","127.0.0.1:9002"], "memberRole": "ROUTER"}]]
9+
SUCCESS {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
!: AUTO INIT
2+
!: AUTO RESET
3+
!: AUTO PULL_ALL
4+
5+
C: RUN "CALL dbms.cluster.routing.getServers" {}
6+
PULL_ALL
7+
S: SUCCESS {"fields": ["ttl", "servers"]}
8+
RECORD [{ttl: 9223372036854775807}, [{"addresses": ["127.0.0.1:9001"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9002","127.0.0.1:9003"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002"], "role": "ROUTE"}]]
9+
SUCCESS {}

test/v1/boltkit.js

+12-1
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,23 @@
1717
* limitations under the License.
1818
*/
1919

20-
var childProcess = require("child_process");
20+
var childProcess = require('child_process');
21+
var Mustache = require('Mustache');
22+
var fs = require('fs');
23+
var tmp = require('tmp');
2124

2225
var BoltKit = function (verbose) {
2326
this.verbose = verbose || false;
2427
};
2528

29+
BoltKit.prototype.startWithTemplate = function (scriptTemplate, parameters, port) {
30+
var template = fs.readFileSync(scriptTemplate, 'utf-8');
31+
var scriptContents = Mustache.render(template, parameters);
32+
var script = tmp.fileSync().name;
33+
fs.writeFileSync(script, scriptContents, 'utf-8');
34+
return this.start(script, port);
35+
};
36+
2637
BoltKit.prototype.start = function(script, port) {
2738
var spawn = childProcess.spawn, server, code = -1;
2839

0 commit comments

Comments
 (0)