Skip to content

Commit c637770

Browse files
authored
Merge pull request #335 from lutovich/1.6-points
Bolt V2 with point support
2 parents 2723fe5 + 1a1b8af commit c637770

16 files changed

+897
-267
lines changed

src/v1/index.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import VERSION from '../version';
2929
import {assertString, isEmptyObjectOrNull} from './internal/util';
3030
import urlUtil from './internal/url-util';
3131
import HttpDriver from './internal/http/http-driver';
32+
import {Point} from './spatial-types';
3233

3334
/**
3435
* @property {function(username: string, password: string, realm: ?string)} basic the function to create a
@@ -205,7 +206,8 @@ const types = {
205206
Path,
206207
Result,
207208
ResultSummary,
208-
Record
209+
Record,
210+
Point
209211
};
210212

211213
/**

src/v1/internal/ch-dummy.js

+5-1
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,15 @@ class DummyChannel {
5858
}
5959

6060
close(cb) {
61-
this.written = [];
61+
this.clear();
6262
if (cb) {
6363
return cb();
6464
}
6565
}
66+
67+
clear() {
68+
this.written = [];
69+
}
6670
}
6771

6872
const channel = DummyChannel;

src/v1/internal/connector.js

+34-94
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,8 @@
2020
import WebSocketChannel from './ch-websocket';
2121
import NodeChannel from './ch-node';
2222
import {Chunker, Dechunker} from './chunking';
23-
import {Packer, Unpacker} from './packstream';
23+
import packStreamUtil from './packstream-util';
2424
import {alloc} from './buf';
25-
import {Node, Path, PathSegment, Relationship, UnboundRelationship} from '../graph-types';
2625
import {newError} from './../error';
2726
import ChannelConfig from './ch-config';
2827
import urlUtil from './url-util';
@@ -54,11 +53,6 @@ RECORD = 0x71, // 0111 0001 // RECORD <value>
5453
IGNORED = 0x7E, // 0111 1110 // IGNORED <metadata>
5554
FAILURE = 0x7F, // 0111 1111 // FAILURE <metadata>
5655

57-
// Signature bytes for higher-level graph objects
58-
NODE = 0x4E,
59-
RELATIONSHIP = 0x52,
60-
UNBOUND_RELATIONSHIP = 0x72,
61-
PATH = 0x50,
6256
//sent before version negotiation
6357
MAGIC_PREAMBLE = 0x6060B017,
6458
DEBUG = false;
@@ -85,66 +79,6 @@ let NO_OP_OBSERVER = {
8579
onError : NO_OP
8680
};
8781

88-
/** Maps from packstream structures to Neo4j domain objects */
89-
let _mappers = {
90-
node : ( unpacker, buf ) => {
91-
return new Node(
92-
unpacker.unpack(buf), // Identity
93-
unpacker.unpack(buf), // Labels
94-
unpacker.unpack(buf) // Properties
95-
);
96-
},
97-
rel : ( unpacker, buf ) => {
98-
return new Relationship(
99-
unpacker.unpack(buf), // Identity
100-
unpacker.unpack(buf), // Start Node Identity
101-
unpacker.unpack(buf), // End Node Identity
102-
unpacker.unpack(buf), // Type
103-
unpacker.unpack(buf) // Properties
104-
);
105-
},
106-
unboundRel : ( unpacker, buf ) => {
107-
return new UnboundRelationship(
108-
unpacker.unpack(buf), // Identity
109-
unpacker.unpack(buf), // Type
110-
unpacker.unpack(buf) // Properties
111-
);
112-
},
113-
path : ( unpacker, buf ) => {
114-
let nodes = unpacker.unpack(buf),
115-
rels = unpacker.unpack(buf),
116-
sequence = unpacker.unpack(buf);
117-
let prevNode = nodes[0],
118-
segments = [];
119-
120-
for (let i = 0; i < sequence.length; i += 2) {
121-
let relIndex = sequence[i],
122-
nextNode = nodes[sequence[i + 1]],
123-
rel;
124-
if (relIndex > 0) {
125-
rel = rels[relIndex - 1];
126-
if( rel instanceof UnboundRelationship ) {
127-
// To avoid duplication, relationships in a path do not contain
128-
// information about their start and end nodes, that's instead
129-
// inferred from the path sequence. This is us inferring (and,
130-
// for performance reasons remembering) the start/end of a rel.
131-
rels[relIndex - 1] = rel = rel.bind(prevNode.identity, nextNode.identity);
132-
}
133-
} else {
134-
rel = rels[-relIndex - 1];
135-
if( rel instanceof UnboundRelationship ) {
136-
// See above
137-
rels[-relIndex - 1] = rel = rel.bind(nextNode.identity, prevNode.identity);
138-
}
139-
}
140-
// Done hydrating one path segment.
141-
segments.push( new PathSegment( prevNode, rel, nextNode ) );
142-
prevNode = nextNode;
143-
}
144-
return new Path(nodes[0], nodes[nodes.length - 1], segments );
145-
}
146-
};
147-
14882
/**
14983
* A connection manages sending and recieving messages over a channel. A
15084
* connector is very closely tied to the Bolt protocol, it implements the
@@ -175,13 +109,16 @@ class Connection {
175109
this.url = url;
176110
this.server = {address: url};
177111
this.creationTimestamp = Date.now();
112+
this._disableLosslessIntegers = disableLosslessIntegers;
178113
this._pendingObservers = [];
179114
this._currentObserver = undefined;
180115
this._ch = channel;
181116
this._dechunker = new Dechunker();
182117
this._chunker = new Chunker( channel );
183-
this._packer = new Packer(this._chunker);
184-
this._unpacker = new Unpacker(disableLosslessIntegers);
118+
119+
// initially assume that database supports latest Bolt version, create latest packer and unpacker
120+
this._packer = packStreamUtil.createLatestPacker(this._chunker);
121+
this._unpacker = packStreamUtil.createLatestUnpacker(disableLosslessIntegers);
185122

186123
this._isHandlingFailure = false;
187124
this._currentFailure = null;
@@ -191,34 +128,18 @@ class Connection {
191128
// Set to true on fatal errors, to get this out of session pool.
192129
this._isBroken = false;
193130

194-
// For deserialization, explain to the unpacker how to unpack nodes, rels, paths;
195-
this._unpacker.structMappers[NODE] = _mappers.node;
196-
this._unpacker.structMappers[RELATIONSHIP] = _mappers.rel;
197-
this._unpacker.structMappers[UNBOUND_RELATIONSHIP] = _mappers.unboundRel;
198-
this._unpacker.structMappers[PATH] = _mappers.path;
199-
200-
let self = this;
201131
// TODO: Using `onmessage` and `onerror` came from the WebSocket API,
202132
// it reads poorly and has several annoying drawbacks. Swap to having
203133
// Channel extend EventEmitter instead, then we can use `on('data',..)`
204134
this._ch.onmessage = (buf) => {
205-
let proposed = buf.readInt32();
206-
if( proposed == 1 ) {
207-
// Ok, protocol running. Simply forward all messages past
208-
// this to the dechunker
209-
self._ch.onmessage = (buf) => {
210-
self._dechunker.write(buf);
211-
};
212-
213-
if( buf.hasRemaining() ) {
214-
self._dechunker.write(buf.readSlice( buf.remaining() ));
215-
}
135+
const proposed = buf.readInt32();
136+
if (proposed == 1 || proposed == 2) {
137+
this._initializeProtocol(proposed, buf);
216138
} else if (proposed == 1213486160) {//server responded 1213486160 == 0x48545450 == "HTTP"
217-
self._handleFatalError(newError("Server responded HTTP. Make sure you are not trying to connect to the http endpoint " +
218-
"(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)"));
219-
}
220-
else {
221-
self._handleFatalError(newError("Unknown Bolt protocol version: " + proposed));
139+
this._handleFatalError(newError('Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' +
140+
'(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)'));
141+
} else {
142+
this._handleFatalError(newError('Unknown Bolt protocol version: ' + proposed));
222143
}
223144
};
224145

@@ -235,21 +156,40 @@ class Connection {
235156
}
236157

237158
this._dechunker.onmessage = (buf) => {
238-
self._handleMessage( self._unpacker.unpack( buf ) );
159+
this._handleMessage(this._unpacker.unpack(buf));
239160
};
240161

241162
let handshake = alloc( 5 * 4 );
242163
//magic preamble
243164
handshake.writeInt32( MAGIC_PREAMBLE );
244165
//proposed versions
166+
handshake.writeInt32( 2 );
245167
handshake.writeInt32( 1 );
246168
handshake.writeInt32( 0 );
247169
handshake.writeInt32( 0 );
248-
handshake.writeInt32( 0 );
249170
handshake.reset();
250171
this._ch.write( handshake );
251172
}
252173

174+
/**
175+
* Complete protocol initialization.
176+
* @param {number} version the selected protocol version.
177+
* @param {BaseBuffer} buffer the handshake response buffer.
178+
* @private
179+
*/
180+
_initializeProtocol(version, buffer) {
181+
// re-create packer and unpacker because version might be lower than we initially assumed
182+
this._packer = packStreamUtil.createPackerForProtocolVersion(version, this._chunker);
183+
this._unpacker = packStreamUtil.createUnpackerForProtocolVersion(version, this._disableLosslessIntegers);
184+
185+
// Ok, protocol running. Simply forward all messages to the dechunker
186+
this._ch.onmessage = buf => this._dechunker.write(buf);
187+
188+
if (buffer.hasRemaining()) {
189+
this._dechunker.write(buffer.readSlice(buffer.remaining()));
190+
}
191+
}
192+
253193
/**
254194
* "Fatal" means the connection is dead. Only call this if something
255195
* happens that cannot be recovered from. This will lead to all subscribers

src/v1/internal/packstream-util.js

+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/**
2+
* Copyright (c) 2002-2018 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import * as v1 from './packstream-v1';
21+
import * as v2 from './packstream-v2';
22+
23+
const PACKER_CONSTRUCTORS_BY_VERSION = [null, v1.Packer, v2.Packer];
24+
const UNPACKER_CONSTRUCTORS_BY_VERSION = [null, v1.Unpacker, v2.Unpacker];
25+
26+
function createLatestPacker(chunker) {
27+
return createPackerForProtocolVersion(PACKER_CONSTRUCTORS_BY_VERSION.length - 1, chunker);
28+
}
29+
30+
function createLatestUnpacker(disableLosslessIntegers) {
31+
return createUnpackerForProtocolVersion(UNPACKER_CONSTRUCTORS_BY_VERSION.length - 1, disableLosslessIntegers);
32+
}
33+
34+
function createPackerForProtocolVersion(version, chunker) {
35+
const packerConstructor = PACKER_CONSTRUCTORS_BY_VERSION[version];
36+
if (!packerConstructor) {
37+
throw new Error(`Packer can't be created for protocol version ${version}`);
38+
}
39+
return new packerConstructor(chunker);
40+
}
41+
42+
function createUnpackerForProtocolVersion(version, disableLosslessIntegers) {
43+
const unpackerConstructor = UNPACKER_CONSTRUCTORS_BY_VERSION[version];
44+
if (!unpackerConstructor) {
45+
throw new Error(`Unpacker can't be created for protocol version ${version}`);
46+
}
47+
return new unpackerConstructor(disableLosslessIntegers);
48+
}
49+
50+
export default {
51+
createLatestPacker: createLatestPacker,
52+
createLatestUnpacker: createLatestUnpacker,
53+
createPackerForProtocolVersion: createPackerForProtocolVersion,
54+
createUnpackerForProtocolVersion: createUnpackerForProtocolVersion
55+
};

0 commit comments

Comments
 (0)