diff --git a/neo4j/bolt/io.py b/neo4j/bolt/io.py index 296b736e7..6b32c3d4e 100644 --- a/neo4j/bolt/io.py +++ b/neo4j/bolt/io.py @@ -64,24 +64,31 @@ def read_int(self): def read(self, n): if n == 0 or self._current_pane == -1: return _empty_view - p, q = self._panes[self._current_pane] - size = q - p - remaining = size - self._current_offset - if n <= remaining: + value = bytearray(b"\x00" * n) + value_end = 0 + while n > 0 and self._current_pane >= 0: + p, q = self._panes[self._current_pane] + size = q - p + remaining = size - self._current_offset start = p + self._current_offset - end = start + n - if n < remaining: - self._current_offset += n + if n <= remaining: + end = start + n + if n < remaining: + self._current_offset += n + else: + self._next_pane() else: + end = q self._next_pane() - return memoryview(self._view[start:end]) - start = p + self._current_offset - end = q - value = bytearray(self._view[start:end]) - self._next_pane() - if len(value) < n and self._current_pane >= 0: - value.extend(self.read(n - (end - start))) - return value + part_size = end - start + next_value_end = value_end + part_size + value[value_end:next_value_end] = self._view[start:end] + n -= part_size + value_end = next_value_end + continue + if n > 0: + value = value[:value_end] + return memoryview(value) class ChunkedInputBuffer(object): @@ -191,7 +198,7 @@ def frame_message(self): p += 2 if chunk_size == 0: self._limit = p - self._frame = MessageFrame(memoryview(self._view[origin:self._limit]), panes) + self._frame = MessageFrame(memoryview(self._data[origin:self._limit]), panes) return True q = p + chunk_size panes.append((p - origin, q - origin)) @@ -224,23 +231,25 @@ def clear(self): self._data[0:2] = b"\x00\x00" def write(self, b): - data = self._data + new_data_start = 0 new_data_size = len(b) - chunk_size = self._end - self._start - max_chunk_size = self._max_chunk_size - chunk_remaining = max_chunk_size - chunk_size - if new_data_size > max_chunk_size: - self.write(b[:chunk_remaining]) - self.chunk() - self.write(b[chunk_remaining:]) - return - if new_data_size > chunk_remaining: - self.chunk() - new_end = self._end + new_data_size - new_chunk_size = new_end - self._start - data[self._end:new_end] = b - self._end = new_end - data[self._header:(self._header + 2)] = struct_pack(">H", new_chunk_size) + data_size = self._end - self._start + if data_size > new_data_size: + new_end = self._end + new_data_size + self._data[self._end:new_end] = bytearray(data_size) + while new_data_start < new_data_size: + chunk_occupied = self._end - self._start + chunk_remaining = self._max_chunk_size - chunk_occupied + if chunk_remaining == 0: + self.chunk() + chunk_remaining = self._max_chunk_size + chunk_write_size = min(chunk_remaining, new_data_size - new_data_start) + new_end = self._end + chunk_write_size + new_chunk_size = new_end - self._start + self._data[self._end:new_end] = b[new_data_start:(new_data_start + chunk_write_size)] + new_data_start += chunk_write_size + self._end = new_end + self._data[self._header:(self._header + 2)] = struct_pack(">H", new_chunk_size) def chunk(self): self._header = self._end diff --git a/test/performance/stress.py b/test/performance/stress.py index 70f09e990..c17fb1df2 100644 --- a/test/performance/stress.py +++ b/test/performance/stress.py @@ -23,6 +23,18 @@ def run(self): self.create_nodes() self.create_index() self.match_nodes() + self.read_large() + + def read_large(self): + for i in range(1, 7): + t0 = time() + with self.driver.session() as session: + try: + session.run("RETURN '{}'".format("x" * (i * 2 ** 20))).consume() + except CypherError: + pass + t1 = time() + stderr.write("Read %d MB in %fs\n" % (i, t1 - t0)) def drop_index(self): with self.driver.session() as session: