Skip to content

Commit a9e4534

Browse files
committed
Wait for activity completions on worker shutdown
Fixes #368
1 parent 31358d1 commit a9e4534

File tree

2 files changed

+13
-0
lines changed

2 files changed

+13
-0
lines changed

temporalio/worker/_activity.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,12 @@ async def drain_poll_queue(self) -> None:
195195
except temporalio.bridge.worker.PollShutdownError:
196196
return
197197

198+
# Only call this after run()/drain_poll_queue() have returned. This will not
199+
# raise an exception.
200+
async def wait_all_completed(self) -> None:
201+
running_tasks = [v.task for v in self._running_activities.values() if v.task]
202+
await asyncio.gather(*running_tasks, return_exceptions=False)
203+
198204
def _cancel(
199205
self, task_token: bytes, cancel: temporalio.bridge.proto.activity_task.Cancel
200206
) -> None:

temporalio/worker/_worker.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,13 @@ async def raise_on_shutdown():
467467
for task in tasks:
468468
task.cancel()
469469

470+
# If there's an activity worker, we have to let all activity completions
471+
# finish. We cannot guarantee that because poll shutdown completed
472+
# (which means activities completed) that they got flushed to the
473+
# server.
474+
if self._activity_worker:
475+
await self._activity_worker.wait_all_completed()
476+
470477
# Do final shutdown
471478
try:
472479
await self._bridge_worker.finalize_shutdown()

0 commit comments

Comments
 (0)