-
Notifications
You must be signed in to change notification settings - Fork 59
Description
I discovered a rare issue that might exist in Python versions < 3.12. Very rarely, a test would hang for me:
tests/test_worker.py::TestWorkerPostgresQueue::test_startup_shutdown_hooks_many
After adding debugging information and studying the code, I started to think that the hanging occurs at the moment when Worker.process
calls queue.dequeue
, which in turn calls self._dequeue
. If at that moment, perhaps during the exit from context managers, likely at conn.transaction()
, worker.stop
is called and task.cancel()
is executed for all the worker's active tasks, then I believe one of the internal wait_for
calls in psycopg
just suppressed this CancelledError
. And if we do not have a dequeue.timeout
, we would hang indefinitely on the next async call in queue.dequeue()
.
I immediately googled my thoughts about wait_for and found the following bug:
https://bugs.python.org/issue43389
and:
python/cpython#28149
They show exactly the same approach to cancellation that we use in Worker.stop
and cancel_tasks
:
tasks = [...]
for t in tasks:
t.cancel()
results = await asyncio.gather(*tasks, return_exceptions=True)
Please double-check my assumptions, and if I'm right, maybe it makes sense to add a shutdown timeout in Worker.stop
to avoid potential hangs in Python versions < 3.12?
Another simpler fix for this problem is to set a dequeue timeout and not rely on its absence. This way, we might not cancel the task, but we will wait for it.
Also, I don't know if this issue might appear elsewhere in the saq library, so I would appreciate it if you could at least be aware of it.