Skip to content

Commit 0953a73

Browse files
authored
Merge pull request #101 from neo4j/1.1-profiling
Optimisations from profiling
2 parents f7c2f77 + c9b9ca5 commit 0953a73

File tree

6 files changed

+265
-262
lines changed

6 files changed

+265
-262
lines changed

neo4j/v1/bolt.py

Lines changed: 67 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
from struct import pack as struct_pack, unpack as struct_unpack, unpack_from as struct_unpack_from
4040

4141
from .constants import DEFAULT_USER_AGENT, KNOWN_HOSTS, MAGIC_PREAMBLE, TRUST_DEFAULT, TRUST_ON_FIRST_USE
42-
from .compat import hex2
4342
from .exceptions import ProtocolError, Unauthorized
4443
from .packstream import Packer, Unpacker
4544
from .ssl_compat import SSL_AVAILABLE, HAS_SNI, SSLError
@@ -81,6 +80,42 @@
8180
log_error = log.error
8281

8382

83+
class BufferingSocket(object):
84+
85+
def __init__(self, socket):
86+
self.socket = socket
87+
self.buffer = bytearray()
88+
89+
def fill(self):
90+
ready_to_read, _, _ = select((self.socket,), (), (), 0)
91+
received = self.socket.recv(65539)
92+
if received:
93+
if __debug__:
94+
log_debug("S: b%r", received)
95+
self.buffer[len(self.buffer):] = received
96+
else:
97+
if ready_to_read is not None:
98+
raise ProtocolError("Server closed connection")
99+
100+
def read_message(self):
101+
message_data = bytearray()
102+
p = 0
103+
size = -1
104+
while size != 0:
105+
while len(self.buffer) - p < 2:
106+
self.fill()
107+
size = 0x100 * self.buffer[p] + self.buffer[p + 1]
108+
p += 2
109+
if size > 0:
110+
while len(self.buffer) - p < size:
111+
self.fill()
112+
end = p + size
113+
message_data[len(message_data):] = self.buffer[p:end]
114+
p = end
115+
self.buffer = self.buffer[p:]
116+
return message_data
117+
118+
84119
class ChunkChannel(object):
85120
""" Reader/writer for chunked data.
86121
@@ -137,45 +172,11 @@ def send(self):
137172
"""
138173
data = self.raw.getvalue()
139174
if __debug__:
140-
log_debug("C: %s", ":".join(map(hex2, data)))
175+
log_debug("C: b%r", data)
141176
self.socket.sendall(data)
142177

143178
self.raw.seek(self.raw.truncate(0))
144179

145-
def _recv(self, size):
146-
# If data is needed, keep reading until all bytes have been received
147-
remaining = size - len(self._recv_buffer)
148-
ready_to_read = None
149-
while remaining > 0:
150-
# Read up to the required amount remaining
151-
b = self.socket.recv(8192)
152-
if b:
153-
if __debug__: log_debug("S: %s", ":".join(map(hex2, b)))
154-
else:
155-
if ready_to_read is not None:
156-
raise ProtocolError("Server closed connection")
157-
remaining -= len(b)
158-
self._recv_buffer += b
159-
160-
# If more is required, wait for available network data
161-
if remaining > 0:
162-
ready_to_read, _, _ = select((self.socket,), (), (), 0)
163-
while not ready_to_read:
164-
ready_to_read, _, _ = select((self.socket,), (), (), 0)
165-
166-
# Split off the amount of data required and keep the rest in the buffer
167-
data, self._recv_buffer = self._recv_buffer[:size], self._recv_buffer[size:]
168-
return data
169-
170-
def chunk_reader(self):
171-
chunk_size = -1
172-
while chunk_size != 0:
173-
chunk_header = self._recv(2)
174-
chunk_size, = struct_unpack_from(">H", chunk_header)
175-
if chunk_size > 0:
176-
data = self._recv(chunk_size)
177-
yield data
178-
179180

180181
class Response(object):
181182
""" Subscriber object for a full response (zero or
@@ -208,9 +209,12 @@ class Connection(object):
208209
"""
209210

210211
def __init__(self, sock, **config):
212+
self.socket = sock
213+
self.buffering_socket = BufferingSocket(sock)
211214
self.defunct = False
212215
self.channel = ChunkChannel(sock)
213216
self.packer = Packer(self.channel)
217+
self.unpacker = Unpacker()
214218
self.responses = deque()
215219
self.closed = False
216220

@@ -318,33 +322,37 @@ def fetch(self):
318322
raise ProtocolError("Cannot read from a closed connection")
319323
if self.defunct:
320324
raise ProtocolError("Cannot read from a defunct connection")
321-
raw = BytesIO()
322-
unpack = Unpacker(raw).unpack
323325
try:
324-
raw.writelines(self.channel.chunk_reader())
326+
message_data = self.buffering_socket.read_message()
325327
except ProtocolError:
326328
self.defunct = True
327329
self.close()
328330
raise
329331
# Unpack from the raw byte stream and call the relevant message handler(s)
330-
raw.seek(0)
331-
response = self.responses[0]
332-
for signature, fields in unpack():
333-
if __debug__:
334-
log_info("S: %s %s", message_names[signature], " ".join(map(repr, fields)))
335-
if signature in SUMMARY:
336-
response.complete = True
337-
self.responses.popleft()
338-
if signature == FAILURE:
339-
self.acknowledge_failure()
340-
handler_name = "on_%s" % message_names[signature].lower()
341-
try:
342-
handler = getattr(response, handler_name)
343-
except AttributeError:
344-
pass
345-
else:
346-
handler(*fields)
347-
raw.close()
332+
self.unpacker.load(message_data)
333+
size, signature = self.unpacker.unpack_structure_header()
334+
fields = [self.unpacker.unpack() for _ in range(size)]
335+
336+
if __debug__:
337+
log_info("S: %s %r", message_names[signature], fields)
338+
339+
if signature == SUCCESS:
340+
response = self.responses.popleft()
341+
response.complete = True
342+
response.on_success(*fields)
343+
elif signature == RECORD:
344+
response = self.responses[0]
345+
response.on_record(*fields)
346+
elif signature == IGNORED:
347+
response = self.responses.popleft()
348+
response.complete = True
349+
response.on_ignored(*fields)
350+
elif signature == FAILURE:
351+
response = self.responses.popleft()
352+
response.complete = True
353+
response.on_failure(*fields)
354+
else:
355+
raise ProtocolError("Unexpected response message with signature %02X" % signature)
348356

349357
def fetch_all(self):
350358
while self.responses:
@@ -454,7 +462,7 @@ def connect(host_port, ssl_context=None, **config):
454462
handshake = [MAGIC_PREAMBLE] + supported_versions
455463
if __debug__: log_info("C: [HANDSHAKE] 0x%X %r", MAGIC_PREAMBLE, supported_versions)
456464
data = b"".join(struct_pack(">I", num) for num in handshake)
457-
if __debug__: log_debug("C: %s", ":".join(map(hex2, data)))
465+
if __debug__: log_debug("C: b%r", data)
458466
s.sendall(data)
459467

460468
# Handle the handshake response
@@ -469,7 +477,7 @@ def connect(host_port, ssl_context=None, **config):
469477
log_error("S: [CLOSE]")
470478
raise ProtocolError("Server closed connection without responding to handshake")
471479
if data_size == 4:
472-
if __debug__: log_debug("S: %s", ":".join(map(hex2, data)))
480+
if __debug__: log_debug("S: b%r", data)
473481
else:
474482
# Some other garbled data has been received
475483
log_error("S: @*#!")

neo4j/v1/compat.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,6 @@ def ustr(x):
4545
else:
4646
return str(x)
4747

48-
def hex2(x):
49-
if x < 0x10:
50-
return "0" + hex(x)[2:].upper()
51-
else:
52-
return hex(x)[2:].upper()
53-
5448
else:
5549
# Python 2
5650

@@ -65,13 +59,6 @@ def ustr(x):
6559
else:
6660
return unicode(x)
6761

68-
def hex2(x):
69-
x = ord(x)
70-
if x < 0x10:
71-
return "0" + hex(x)[2:].upper()
72-
else:
73-
return hex(x)[2:].upper()
74-
7562

7663
try:
7764
from multiprocessing import Array, Process

0 commit comments

Comments
 (0)