Skip to content

Commit 4597b29

Browse files
committed
Don't break Inbox when hydration or unpacking fails
1 parent f935fd0 commit 4597b29

File tree

2 files changed

+26
-22
lines changed

2 files changed

+26
-22
lines changed

neo4j/_async/io/_common.py

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import asyncio
2020
import logging
2121
import socket
22-
from contextlib import contextmanager
2322
from struct import pack as struct_pack
2423

2524
from ..._async_compat.util import AsyncUtil
@@ -45,7 +44,7 @@ def __init__(self, sock, on_error, unpacker_cls):
4544
self._unpacker = unpacker_cls(self._buffer)
4645
self._broken = False
4746

48-
async def pop(self, hydration_hooks):
47+
async def _buffer_one_chunk(self):
4948
assert not self._broken
5049
try:
5150
chunk_size = 0
@@ -64,20 +63,23 @@ async def pop(self, hydration_hooks):
6463

6564
if chunk_size == 0:
6665
# chunk_size was the end marker for the message
67-
size, tag = self._unpacker.unpack_structure_header()
68-
fields = [self._unpacker.unpack(hydration_hooks)
69-
for _ in range(size)]
70-
# Reset for new message
71-
self._unpacker.reset()
72-
return tag, fields
66+
return
7367

7468
except (OSError, socket.timeout, SocketDeadlineExceeded) as error:
7569
self._broken = True
7670
await AsyncUtil.callback(self.on_error, error)
7771
raise
78-
except Exception:
79-
self._broken = True
80-
raise
72+
73+
async def pop(self, hydration_hooks):
74+
await self._buffer_one_chunk()
75+
try:
76+
size, tag = self._unpacker.unpack_structure_header()
77+
fields = [self._unpacker.unpack(hydration_hooks)
78+
for _ in range(size)]
79+
return tag, fields
80+
finally:
81+
# Reset for new message
82+
self._unpacker.reset()
8183

8284

8385
class AsyncOutbox:

neo4j/_sync/io/_common.py

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import asyncio
2020
import logging
2121
import socket
22-
from contextlib import contextmanager
2322
from struct import pack as struct_pack
2423

2524
from ..._async_compat.util import Util
@@ -45,7 +44,7 @@ def __init__(self, sock, on_error, unpacker_cls):
4544
self._unpacker = unpacker_cls(self._buffer)
4645
self._broken = False
4746

48-
def pop(self, hydration_hooks):
47+
def _buffer_one_chunk(self):
4948
assert not self._broken
5049
try:
5150
chunk_size = 0
@@ -64,20 +63,23 @@ def pop(self, hydration_hooks):
6463

6564
if chunk_size == 0:
6665
# chunk_size was the end marker for the message
67-
size, tag = self._unpacker.unpack_structure_header()
68-
fields = [self._unpacker.unpack(hydration_hooks)
69-
for _ in range(size)]
70-
# Reset for new message
71-
self._unpacker.reset()
72-
return tag, fields
66+
return
7367

7468
except (OSError, socket.timeout, SocketDeadlineExceeded) as error:
7569
self._broken = True
7670
Util.callback(self.on_error, error)
7771
raise
78-
except Exception:
79-
self._broken = True
80-
raise
72+
73+
def pop(self, hydration_hooks):
74+
self._buffer_one_chunk()
75+
try:
76+
size, tag = self._unpacker.unpack_structure_header()
77+
fields = [self._unpacker.unpack(hydration_hooks)
78+
for _ in range(size)]
79+
return tag, fields
80+
finally:
81+
# Reset for new message
82+
self._unpacker.reset()
8183

8284

8385
class Outbox:

0 commit comments

Comments
 (0)