Skip to content

Commit 1d638de

Browse files
committed
fix(pubsub): read entire message before resetting
1 parent a084a4e commit 1d638de

File tree

3 files changed

+40
-3
lines changed

3 files changed

+40
-3
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
profile = "black"
77

88
[tool.pytest.ini_options]
9+
log_level = "INFO"
910
markers = [
1011
"slow: marks tests as slow (deselect with '-m \"not slow\"')",
1112
"serial",

src/sqlitecloud/driver.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -207,10 +207,10 @@ def _internal_setup_pubsub(
207207
def _internal_pubsub_thread(self, connection: SQLiteCloudConnect) -> None:
208208
blen = 2048
209209
buffer: bytes = b""
210+
tread = 0
210211

211212
try:
212213
while True:
213-
tread = 0
214214

215215
try:
216216
if not connection.pubsub_socket:
@@ -240,7 +240,6 @@ def _internal_pubsub_thread(self, connection: SQLiteCloudConnect) -> None:
240240

241241
nread = len(data)
242242
tread += nread
243-
blen -= nread
244243
buffer += data
245244

246245
sqlitecloud_number = self._internal_parse_number(buffer)
@@ -262,11 +261,16 @@ def _internal_pubsub_thread(self, connection: SQLiteCloudConnect) -> None:
262261
connection.pubsub_callback(
263262
connection, SQLiteCloudResultSet(result), connection.pubsub_data
264263
)
264+
265+
# reset after having read the message
266+
tread = 0
267+
buffer: bytes = b""
265268
except Exception as e:
266269
logging.error(f"An error occurred while parsing data: {e}.")
267270

268271
finally:
269-
connection.pubsub_callback(connection, None, connection.pubsub_data)
272+
if connection and connection.pubsub_callback:
273+
connection.pubsub_callback(connection, None, connection.pubsub_data)
270274

271275
def upload_database(
272276
self,

src/tests/integration/test_pubsub.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,38 @@ def assert_callback(conn, result, data):
4747

4848
assert callback_called
4949

50+
def test_notify_multiple_messages(self, sqlitecloud_connection):
51+
connection, _ = sqlitecloud_connection
52+
53+
called_times = 3
54+
flag = threading.Event()
55+
56+
def assert_callback(conn, result, data):
57+
nonlocal called_times
58+
nonlocal flag
59+
60+
if isinstance(result, SQLiteCloudResultSet):
61+
assert data == ["somedataX"]
62+
called_times -= 1
63+
if called_times == 0:
64+
flag.set()
65+
66+
pubsub = SQLiteCloudPubSub()
67+
subject_type = SQLITECLOUD_PUBSUB_SUBJECT.CHANNEL
68+
channel = "channel" + str(uuid.uuid4())
69+
70+
pubsub.create_channel(connection, channel)
71+
pubsub.listen(connection, subject_type, channel, assert_callback, ["somedataX"])
72+
73+
pubsub.notify_channel(connection, channel, "somedataX")
74+
pubsub.notify_channel(connection, channel, "somedataX")
75+
pubsub.notify_channel(connection, channel, "somedataX")
76+
77+
# wait for callback to be called
78+
flag.wait(30)
79+
80+
assert called_times == 0
81+
5082
def test_unlisten_channel(self, sqlitecloud_connection):
5183
connection, _ = sqlitecloud_connection
5284

0 commit comments

Comments
 (0)