From b9a6e6714f6026e9c7e8a6145ae504eec74017a4 Mon Sep 17 00:00:00 2001 From: Nigel Small Date: Tue, 29 Nov 2016 12:40:37 +0000 Subject: [PATCH 1/4] Removed hex2 --- neo4j/v1/bolt.py | 9 ++++----- neo4j/v1/compat.py | 13 ------------- 2 files changed, 4 insertions(+), 18 deletions(-) diff --git a/neo4j/v1/bolt.py b/neo4j/v1/bolt.py index e49495fc9..1147b4ba8 100644 --- a/neo4j/v1/bolt.py +++ b/neo4j/v1/bolt.py @@ -39,7 +39,6 @@ from struct import pack as struct_pack, unpack as struct_unpack, unpack_from as struct_unpack_from from .constants import DEFAULT_USER_AGENT, KNOWN_HOSTS, MAGIC_PREAMBLE, TRUST_DEFAULT, TRUST_ON_FIRST_USE -from .compat import hex2 from .exceptions import ProtocolError, Unauthorized from .packstream import Packer, Unpacker from .ssl_compat import SSL_AVAILABLE, HAS_SNI, SSLError @@ -137,7 +136,7 @@ def send(self): """ data = self.raw.getvalue() if __debug__: - log_debug("C: %s", ":".join(map(hex2, data))) + log_debug("C: b%r", data) self.socket.sendall(data) self.raw.seek(self.raw.truncate(0)) @@ -150,7 +149,7 @@ def _recv(self, size): # Read up to the required amount remaining b = self.socket.recv(8192) if b: - if __debug__: log_debug("S: %s", ":".join(map(hex2, b))) + if __debug__: log_debug("S: b%r", b) else: if ready_to_read is not None: raise ProtocolError("Server closed connection") @@ -454,7 +453,7 @@ def connect(host_port, ssl_context=None, **config): handshake = [MAGIC_PREAMBLE] + supported_versions if __debug__: log_info("C: [HANDSHAKE] 0x%X %r", MAGIC_PREAMBLE, supported_versions) data = b"".join(struct_pack(">I", num) for num in handshake) - if __debug__: log_debug("C: %s", ":".join(map(hex2, data))) + if __debug__: log_debug("C: b%r", data) s.sendall(data) # Handle the handshake response @@ -469,7 +468,7 @@ def connect(host_port, ssl_context=None, **config): log_error("S: [CLOSE]") raise ProtocolError("Server closed connection without responding to handshake") if data_size == 4: - if __debug__: log_debug("S: %s", ":".join(map(hex2, data))) + if __debug__: log_debug("S: b%r", data) else: # Some other garbled data has been received log_error("S: @*#!") diff --git a/neo4j/v1/compat.py b/neo4j/v1/compat.py index 7bae32b6d..b9f24df70 100644 --- a/neo4j/v1/compat.py +++ b/neo4j/v1/compat.py @@ -45,12 +45,6 @@ def ustr(x): else: return str(x) - def hex2(x): - if x < 0x10: - return "0" + hex(x)[2:].upper() - else: - return hex(x)[2:].upper() - else: # Python 2 @@ -65,13 +59,6 @@ def ustr(x): else: return unicode(x) - def hex2(x): - x = ord(x) - if x < 0x10: - return "0" + hex(x)[2:].upper() - else: - return hex(x)[2:].upper() - try: from multiprocessing import Array, Process From 9fdaaacb0fc836139eaa388833645075f42e2375 Mon Sep 17 00:00:00 2001 From: Nigel Small Date: Wed, 30 Nov 2016 14:14:52 +0000 Subject: [PATCH 2/4] Slightly faster unpacking --- neo4j/v1/bolt.py | 120 +++++++------ neo4j/v1/packstream.py | 365 ++++++++++++++++++++-------------------- test/test_packstream.py | 18 +- test/test_session.py | 4 +- 4 files changed, 262 insertions(+), 245 deletions(-) diff --git a/neo4j/v1/bolt.py b/neo4j/v1/bolt.py index 1147b4ba8..c6f0e4cb8 100644 --- a/neo4j/v1/bolt.py +++ b/neo4j/v1/bolt.py @@ -80,6 +80,42 @@ log_error = log.error +class BufferingSocket(object): + + def __init__(self, socket): + self.socket = socket + self.buffer = bytearray() + + def fill(self): + ready_to_read, _, _ = select((self.socket,), (), (), 0) + received = self.socket.recv(65539) + if received: + if __debug__: + log_debug("S: b%r", received) + self.buffer[len(self.buffer):] = received + else: + if ready_to_read is not None: + raise ProtocolError("Server closed connection") + + def read_message(self): + message_data = bytearray() + p = 0 + size = -1 + while size != 0: + while len(self.buffer) - p < 2: + self.fill() + size = 0x100 * self.buffer[p] + self.buffer[p + 1] + p += 2 + if size > 0: + while len(self.buffer) - p < size: + self.fill() + end = p + size + message_data[len(message_data):] = self.buffer[p:end] + p = end + self.buffer = self.buffer[p:] + return message_data + + class ChunkChannel(object): """ Reader/writer for chunked data. @@ -141,40 +177,6 @@ def send(self): self.raw.seek(self.raw.truncate(0)) - def _recv(self, size): - # If data is needed, keep reading until all bytes have been received - remaining = size - len(self._recv_buffer) - ready_to_read = None - while remaining > 0: - # Read up to the required amount remaining - b = self.socket.recv(8192) - if b: - if __debug__: log_debug("S: b%r", b) - else: - if ready_to_read is not None: - raise ProtocolError("Server closed connection") - remaining -= len(b) - self._recv_buffer += b - - # If more is required, wait for available network data - if remaining > 0: - ready_to_read, _, _ = select((self.socket,), (), (), 0) - while not ready_to_read: - ready_to_read, _, _ = select((self.socket,), (), (), 0) - - # Split off the amount of data required and keep the rest in the buffer - data, self._recv_buffer = self._recv_buffer[:size], self._recv_buffer[size:] - return data - - def chunk_reader(self): - chunk_size = -1 - while chunk_size != 0: - chunk_header = self._recv(2) - chunk_size, = struct_unpack_from(">H", chunk_header) - if chunk_size > 0: - data = self._recv(chunk_size) - yield data - class Response(object): """ Subscriber object for a full response (zero or @@ -207,9 +209,12 @@ class Connection(object): """ def __init__(self, sock, **config): + self.socket = sock + self.buffering_socket = BufferingSocket(sock) self.defunct = False self.channel = ChunkChannel(sock) self.packer = Packer(self.channel) + self.unpacker = Unpacker() self.responses = deque() self.closed = False @@ -317,33 +322,38 @@ def fetch(self): raise ProtocolError("Cannot read from a closed connection") if self.defunct: raise ProtocolError("Cannot read from a defunct connection") - raw = BytesIO() - unpack = Unpacker(raw).unpack try: - raw.writelines(self.channel.chunk_reader()) + message_data = self.buffering_socket.read_message() except ProtocolError: self.defunct = True self.close() raise # Unpack from the raw byte stream and call the relevant message handler(s) - raw.seek(0) - response = self.responses[0] - for signature, fields in unpack(): - if __debug__: - log_info("S: %s %s", message_names[signature], " ".join(map(repr, fields))) - if signature in SUMMARY: - response.complete = True - self.responses.popleft() - if signature == FAILURE: - self.acknowledge_failure() - handler_name = "on_%s" % message_names[signature].lower() - try: - handler = getattr(response, handler_name) - except AttributeError: - pass - else: - handler(*fields) - raw.close() + self.unpacker.load(message_data) + size, signature = self.unpacker.unpack_structure_header() + fields = [self.unpacker.unpack() for _ in range(size)] + + if __debug__: + log_info("S: %s %r", message_names[signature], fields) + + if signature == SUCCESS: + response = self.responses.popleft() + response.complete = True + response.on_success(*fields) + elif signature == RECORD: + response = self.responses[0] + response.on_record(*fields) + elif signature == IGNORED: + response = self.responses.popleft() + response.complete = True + response.on_ignored(*fields) + elif signature == FAILURE: + response = self.responses.popleft() + response.complete = True + self.acknowledge_failure() + response.on_failure(*fields) + else: + raise ProtocolError("Unexpected response message with signature %02X" % signature) def fetch_all(self): while self.responses: diff --git a/neo4j/v1/packstream.py b/neo4j/v1/packstream.py index 76259a840..82fa6d285 100644 --- a/neo4j/v1/packstream.py +++ b/neo4j/v1/packstream.py @@ -320,7 +320,7 @@ INTEGER_TYPE = (int, long) STRING_TYPE = (str, unicode) -__all__ = ["Packer", "pack", "packb", "Unpacker", "unpack", "unpackb"] +__all__ = ["Packer", "pack", "packb", "Unpacker"] INFINITY = 1e309 @@ -397,37 +397,6 @@ UNPACKED_MARKERS.update({bytes(bytearray([z + 256])): z for z in range(MINUS_2_TO_THE_4, 0)}) -class List(list): - - def __init__(self, capacity): - self.capacity = capacity - - def append(self, item): - if item is END_OF_STREAM: - self.capacity = len(self) - else: - list.append(self, item) - - -class Map(dict): - - def __init__(self, capacity): - self.capacity = capacity - self.__key = NotImplemented - - def append(self, item): - key = self.__key - if key is NotImplemented: - if item is END_OF_STREAM: - self.capacity = len(self) - else: - self.__key = item - else: - self[key] = item - self.__key = NotImplemented - return key - - class Structure(list): def __init__(self, capacity, signature): @@ -640,161 +609,199 @@ def packb(*values): class Unpacker(object): - def __init__(self, stream): - self.stream = stream + def __init__(self): + self.buffer = None + self.pos = 0 + + def load(self, buffer): + self.buffer = buffer + self.pos = 0 + + def read(self, n=1): + available = len(self.buffer) - self.pos + start = self.pos + if n <= available: + self.pos += n + else: + self.pos = len(self.buffer) + return bytes(self.buffer[start:self.pos]) + + def read_marker(self): + if self.pos < len(self.buffer): + pos = self.pos + self.pos += 1 + return self.buffer[pos] + else: + return -1 def unpack(self): - current_collection = List(INFINITY) - current_capacity = current_collection.capacity - current_size = len(current_collection) - push_item = current_collection.append + stream_read = self.read + marker = self.read_marker() - collection_stack = [] - push_collection = collection_stack.append - pop_collection = collection_stack.pop + if marker == -1: + raise RuntimeError("Nothing to unpack") - stream_read = self.stream.read - while True: - marker_byte = stream_read(1) + # Tiny Integer + if 0x00 <= marker <= 0x7F: + return marker + elif 0xF0 <= marker <= 0xFF: + return marker - 0x100 - if not marker_byte: - break + # Null + elif marker == 0xC0: + return None - is_collection = False + # Float + elif marker == 0xC1: + return struct_unpack(DOUBLE_STRUCT, stream_read(8))[0] - try: - value = UNPACKED_MARKERS[marker_byte] # NULL, TRUE, FALSE and TINY_INT - - except KeyError: - marker = UNPACKED_UINT_8[marker_byte] - marker_high = marker & 0xF0 - - # Float - if marker_byte == FLOAT_64: - value = struct_unpack(DOUBLE_STRUCT, stream_read(8))[0] - - # Integer - elif marker_byte == INT_8: - value = UNPACKED_INT_8[stream_read(1)] - elif marker_byte == INT_16: - value = UNPACKED_INT_16[stream_read(2)] - elif marker_byte == INT_32: - value = struct_unpack(INT_32_STRUCT, stream_read(4))[0] - elif marker_byte == INT_64: - value = struct_unpack(INT_64_STRUCT, stream_read(8))[0] - - # Bytes - elif marker_byte == BYTES_8: - byte_size = UNPACKED_UINT_8[stream_read(1)] - value = stream_read(byte_size) - elif marker_byte == BYTES_16: - byte_size = UNPACKED_UINT_16[stream_read(2)] - value = stream_read(byte_size) - elif marker_byte == BYTES_32: - byte_size = struct_unpack(UINT_32_STRUCT, stream_read(4))[0] - value = stream_read(byte_size) - - # String - elif marker_high == 0x80: - value = stream_read(marker & 0x0F).decode(ENCODING) - elif marker_byte == STRING_8: - byte_size = UNPACKED_UINT_8[stream_read(1)] - value = stream_read(byte_size).decode(ENCODING) - elif marker_byte == STRING_16: - byte_size = UNPACKED_UINT_16[stream_read(2)] - value = stream_read(byte_size).decode(ENCODING) - elif marker_byte == STRING_32: - byte_size = struct_unpack(UINT_32_STRUCT, stream_read(4))[0] - value = stream_read(byte_size).decode(ENCODING) - - # List - elif marker_high == 0x90: - value = List(marker & 0x0F) - is_collection = True - elif marker_byte == LIST_8: - size = UNPACKED_UINT_8[stream_read(1)] - value = List(size) - is_collection = True - elif marker_byte == LIST_16: - size = UNPACKED_UINT_16[stream_read(2)] - value = List(size) - is_collection = True - elif marker_byte == LIST_32: - size = struct_unpack(UINT_32_STRUCT, stream_read(4))[0] - value = List(size) - is_collection = True - elif marker_byte == LIST_STREAM: - size = INFINITY - value = List(size) - is_collection = True - - # Map - elif marker_high == 0xA0: - value = Map(marker & 0x0F) - is_collection = True - elif marker_byte == MAP_8: - size = UNPACKED_UINT_8[stream_read(1)] - value = Map(size) - is_collection = True - elif marker_byte == MAP_16: - size = UNPACKED_UINT_16[stream_read(2)] - value = Map(size) - is_collection = True - elif marker_byte == MAP_32: - size = struct_unpack(UINT_32_STRUCT, stream_read(4))[0] - value = Map(size) - is_collection = True - elif marker_byte == MAP_STREAM: - size = INFINITY - value = Map(size) - is_collection = True - - # Structure - elif marker_high == 0xB0: - signature = stream_read(1) - value = Structure(marker & 0x0F, signature) - is_collection = True - elif marker_byte == STRUCT_8: - size, signature = stream_read(2) - value = Structure(UNPACKED_UINT_8[size], signature) - is_collection = True - elif marker_byte == STRUCT_16: - data = stream_read(3) - value = Structure(UNPACKED_UINT_16[data[0:2]], data[2]) - is_collection = True - - elif marker_byte == END_OF_STREAM: - value = END_OF_STREAM - - appended = False - while not appended: - if current_size >= current_capacity: - current_collection = pop_collection() - current_capacity = current_collection.capacity - current_size = len(current_collection) - push_item = current_collection.append - else: - if push_item(value) is not NotImplemented: - current_size += 1 - if is_collection: - push_collection(current_collection) - current_collection = value - current_capacity = current_collection.capacity - current_size = len(current_collection) - push_item = current_collection.append - appended = True - - if collection_stack: - return iter(collection_stack[0]) - else: - return iter(current_collection) + # Boolean + elif marker == 0xC2: + return False + elif marker == 0xC3: + return True + # Integer + elif marker == 0xC8: + return UNPACKED_INT_8[stream_read(1)] + elif marker == 0xC9: + return UNPACKED_INT_16[stream_read(2)] + elif marker == 0xCA: + return struct_unpack(INT_32_STRUCT, stream_read(4))[0] + elif marker == 0xCB: + return struct_unpack(INT_64_STRUCT, stream_read(8))[0] + + # Bytes + elif marker == 0xCC: + byte_size = UNPACKED_UINT_8[stream_read(1)] + return stream_read(byte_size) + elif marker == 0xCD: + byte_size = UNPACKED_UINT_16[stream_read(2)] + return stream_read(byte_size) + elif marker == 0xCE: + byte_size = struct_unpack(UINT_32_STRUCT, stream_read(4))[0] + return stream_read(byte_size) -def unpack(stream): - unpacker = Unpacker(stream) - for value in unpacker.unpack(): - yield value + else: + marker_high = marker & 0xF0 + unpack1 = self.unpack + + # String + if marker_high == 0x80: # TINY_STRING + return stream_read(marker & 0x0F).decode(ENCODING) + elif marker == 0xD0: # STRING_8: + byte_size = UNPACKED_UINT_8[stream_read(1)] + return stream_read(byte_size).decode(ENCODING) + elif marker == 0xD1: # STRING_16: + byte_size = UNPACKED_UINT_16[stream_read(2)] + return stream_read(byte_size).decode(ENCODING) + elif marker == 0xD2: # STRING_32: + byte_size = struct_unpack(UINT_32_STRUCT, stream_read(4))[0] + return stream_read(byte_size).decode(ENCODING) + + # List + elif marker_high == 0x90: + size = marker & 0x0F + return [unpack1() for _ in range(size)] + elif marker == 0xD4: # LIST_8: + size = UNPACKED_UINT_8[stream_read(1)] + return [unpack1() for _ in range(size)] + elif marker == 0xD5: # LIST_16: + size = UNPACKED_UINT_16[stream_read(2)] + return [unpack1() for _ in range(size)] + elif marker == 0xD6: # LIST_32: + size = struct_unpack(UINT_32_STRUCT, stream_read(4))[0] + return [unpack1() for _ in range(size)] + elif marker == 0xD7: # LIST_STREAM: + value = [] + item = None + while item != END_OF_STREAM: + item = unpack1() + if item != END_OF_STREAM: + value.append(item) + return value + + # Map + elif marker_high == 0xA0: + size = marker & 0x0F + value = {} + for _ in range(size): + key = unpack1() + value[key] = unpack1() + return value + elif marker == 0xD8: # MAP_8: + size = UNPACKED_UINT_8[stream_read(1)] + value = {} + for _ in range(size): + key = unpack1() + value[key] = unpack1() + return value + elif marker == 0xD9: # MAP_16: + size = UNPACKED_UINT_16[stream_read(2)] + value = {} + for _ in range(size): + key = unpack1() + value[key] = unpack1() + return value + elif marker == 0xDA: # MAP_32: + size = struct_unpack(UINT_32_STRUCT, stream_read(4))[0] + value = {} + for _ in range(size): + key = unpack1() + value[key] = unpack1() + return value + elif marker == 0xDB: # MAP_STREAM: + value = {} + key = None + while key != END_OF_STREAM: + key = unpack1() + if key != END_OF_STREAM: + value[key] = unpack1() + return value + + # Structure + elif marker_high == 0xB0: + signature = stream_read(1) + value = Structure(marker & 0x0F, signature) + for _ in range(value.capacity): + value.append(unpack1()) + return value + elif marker == 0xDC: #STRUCT_8: + size, signature = stream_read(2) + value = Structure(UNPACKED_UINT_8[size], signature) + for _ in range(value.capacity): + value.append(unpack1()) + return value + elif marker == 0xDD: #STRUCT_16: + data = stream_read(3) + value = Structure(UNPACKED_UINT_16[data[0:2]], data[2]) + for _ in range(value.capacity): + value.append(unpack1()) + return value + + elif marker == 0xDF: #END_OF_STREAM: + return END_OF_STREAM + else: + raise RuntimeError("Unknown PackStream marker %02X" % marker) -def unpackb(b): - return unpack(BytesIO(b)) + def unpack_structure_header(self): + marker = self.read_marker() + if marker == -1: + return None + else: + return self._unpack_structure_header(marker) + + def _unpack_structure_header(self, marker): + marker_high = marker & 0xF0 + if marker_high == 0xB0: # TINY_STRUCT + signature = self.read(1) + return marker & 0x0F, signature + elif marker == 0xDC: # STRUCT_8: + size, signature = self.read(2) + return UNPACKED_UINT_8[size], signature + elif marker == 0xDD: # STRUCT_16: + data = self.read(3) + return UNPACKED_UINT_16[data[0:2]], data[2] + else: + raise RuntimeError("Expected structure, found marker %02X" % marker) diff --git a/test/test_packstream.py b/test/test_packstream.py index 8c092c177..12b1e299e 100644 --- a/test/test_packstream.py +++ b/test/test_packstream.py @@ -37,9 +37,9 @@ def assert_packable(value, packed_value): except AssertionError: raise AssertionError("Packed value %r is %r instead of expected %r" % (value, packed, packed_value)) - stream_in = BytesIO(packed) - unpacker = Unpacker(stream_in) - unpacked = next(unpacker.unpack()) + unpacker = Unpacker() + unpacker.load(packed) + unpacked = unpacker.unpack() try: assert unpacked == value except AssertionError: @@ -200,9 +200,9 @@ def test_list_stream(self): except AssertionError: raise AssertionError("Packed value is %r instead of expected %r" % (packed, packed_value)) - stream_in = BytesIO(packed) - unpacker = Unpacker(stream_in) - unpacked = next(unpacker.unpack()) + unpacker = Unpacker() + unpacker.load(packed) + unpacked = unpacker.unpack() try: assert unpacked == unpacked_value except AssertionError: @@ -248,9 +248,9 @@ def test_map_stream(self): except AssertionError: raise AssertionError("Packed value is %r instead of expected %r" % (packed, packed_value)) - stream_in = BytesIO(packed) - unpacker = Unpacker(stream_in) - unpacked = next(unpacker.unpack()) + unpacker = Unpacker() + unpacker.load(packed) + unpacked = unpacker.unpack() try: assert unpacked == unpacked_value except AssertionError: diff --git a/test/test_session.py b/test/test_session.py index 9695f3d5a..09e785b46 100644 --- a/test/test_session.py +++ b/test/test_session.py @@ -345,10 +345,10 @@ def test_automatic_reset_after_failure(self): assert False, "A Cypher error should have occurred" def test_defunct(self): - from neo4j.v1.bolt import ChunkChannel, ProtocolError + from neo4j.v1.bolt import BufferingSocket, ProtocolError with GraphDatabase.driver("bolt://localhost", auth=auth_token).session() as session: assert not session.connection.defunct - with patch.object(ChunkChannel, "chunk_reader", side_effect=ProtocolError()): + with patch.object(BufferingSocket, "fill", side_effect=ProtocolError()): with self.assertRaises(ProtocolError): session.run("RETURN 1").consume() assert session.connection.defunct From bfbada35c9b13faeebbb2c6c811d91a238d13cfb Mon Sep 17 00:00:00 2001 From: Nigel Small Date: Wed, 30 Nov 2016 14:18:51 +0000 Subject: [PATCH 3/4] Moved ack_failure call --- neo4j/v1/bolt.py | 1 - neo4j/v1/session.py | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/neo4j/v1/bolt.py b/neo4j/v1/bolt.py index c6f0e4cb8..d55b8a938 100644 --- a/neo4j/v1/bolt.py +++ b/neo4j/v1/bolt.py @@ -350,7 +350,6 @@ def fetch(self): elif signature == FAILURE: response = self.responses.popleft() response.complete = True - self.acknowledge_failure() response.on_failure(*fields) else: raise ProtocolError("Unexpected response message with signature %02X" % signature) diff --git a/neo4j/v1/session.py b/neo4j/v1/session.py index 8f8891160..24db614ef 100644 --- a/neo4j/v1/session.py +++ b/neo4j/v1/session.py @@ -230,6 +230,7 @@ def on_footer(metadata): def on_failure(metadata): # Called on execution failure. + self.connection.acknowledge_failure() self._consumed = True raise CypherError(metadata) From c9b9ca5d8cf0a4d580567869b260d245f43ee1be Mon Sep 17 00:00:00 2001 From: Nigel Small Date: Wed, 30 Nov 2016 14:39:06 +0000 Subject: [PATCH 4/4] unpack1->unpack --- neo4j/v1/packstream.py | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/neo4j/v1/packstream.py b/neo4j/v1/packstream.py index 82fa6d285..1c79a96f5 100644 --- a/neo4j/v1/packstream.py +++ b/neo4j/v1/packstream.py @@ -684,7 +684,7 @@ def unpack(self): else: marker_high = marker & 0xF0 - unpack1 = self.unpack + unpack = self.unpack # String if marker_high == 0x80: # TINY_STRING @@ -702,21 +702,21 @@ def unpack(self): # List elif marker_high == 0x90: size = marker & 0x0F - return [unpack1() for _ in range(size)] + return [unpack() for _ in range(size)] elif marker == 0xD4: # LIST_8: size = UNPACKED_UINT_8[stream_read(1)] - return [unpack1() for _ in range(size)] + return [unpack() for _ in range(size)] elif marker == 0xD5: # LIST_16: size = UNPACKED_UINT_16[stream_read(2)] - return [unpack1() for _ in range(size)] + return [unpack() for _ in range(size)] elif marker == 0xD6: # LIST_32: size = struct_unpack(UINT_32_STRUCT, stream_read(4))[0] - return [unpack1() for _ in range(size)] + return [unpack() for _ in range(size)] elif marker == 0xD7: # LIST_STREAM: value = [] item = None while item != END_OF_STREAM: - item = unpack1() + item = unpack() if item != END_OF_STREAM: value.append(item) return value @@ -726,37 +726,37 @@ def unpack(self): size = marker & 0x0F value = {} for _ in range(size): - key = unpack1() - value[key] = unpack1() + key = unpack() + value[key] = unpack() return value elif marker == 0xD8: # MAP_8: size = UNPACKED_UINT_8[stream_read(1)] value = {} for _ in range(size): - key = unpack1() - value[key] = unpack1() + key = unpack() + value[key] = unpack() return value elif marker == 0xD9: # MAP_16: size = UNPACKED_UINT_16[stream_read(2)] value = {} for _ in range(size): - key = unpack1() - value[key] = unpack1() + key = unpack() + value[key] = unpack() return value elif marker == 0xDA: # MAP_32: size = struct_unpack(UINT_32_STRUCT, stream_read(4))[0] value = {} for _ in range(size): - key = unpack1() - value[key] = unpack1() + key = unpack() + value[key] = unpack() return value elif marker == 0xDB: # MAP_STREAM: value = {} key = None while key != END_OF_STREAM: - key = unpack1() + key = unpack() if key != END_OF_STREAM: - value[key] = unpack1() + value[key] = unpack() return value # Structure @@ -764,19 +764,19 @@ def unpack(self): signature = stream_read(1) value = Structure(marker & 0x0F, signature) for _ in range(value.capacity): - value.append(unpack1()) + value.append(unpack()) return value elif marker == 0xDC: #STRUCT_8: size, signature = stream_read(2) value = Structure(UNPACKED_UINT_8[size], signature) for _ in range(value.capacity): - value.append(unpack1()) + value.append(unpack()) return value elif marker == 0xDD: #STRUCT_16: data = stream_read(3) value = Structure(UNPACKED_UINT_16[data[0:2]], data[2]) for _ in range(value.capacity): - value.append(unpack1()) + value.append(unpack()) return value elif marker == 0xDF: #END_OF_STREAM: