Skip to content

[Bug] Activity completions may be dropped on worker shutdown #368

Closed
@cretz

Description

@cretz

Describe the bug

The activity completion, often a cancellation failure as a result of worker shutdown, may not get recorded before run/shutdown returns. This becomes evident if you call sys.exit() immediately after shutdown returns.

Minimal Reproduction

import asyncio
import logging
import signal
import sys
import os
from datetime import timedelta
from uuid import uuid4

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.common import RetryPolicy
from temporalio.worker import Worker


async def handler_stop_signals():
    """Shut down the module-level ``worker`` and then exit the process.

    The process exits as soon as the shutdown call returns, with no extra
    waiting in between; that immediate exit is what makes the dropped
    activity completion visible in this reproduction.
    """
    # Collect shutdown coroutines in a list so they are awaited together.
    pending = [worker.shutdown()]
    await asyncio.gather(*pending)
    sys.exit()

@activity.defn
async def long_activity() -> None:
    """Log the current attempt number, then sleep for 60 seconds.

    The sleep gives the worker shutdown a window in which to interrupt
    the activity while it is still running.
    """
    message = f"Activity waiting for cancel, attempt: {activity.info().attempt}"
    activity.logger.info(message)
    await asyncio.sleep(60)


@workflow.defn
class MyWorkflow:
    """Workflow whose only step is a single execution of ``long_activity``."""

    @workflow.run
    async def run(self):
        """Run ``long_activity`` and wait for it to finish.

        The activity gets a two-hour start-to-close timeout and a retry
        policy with ``maximum_attempts=1``.
        """
        close_timeout = timedelta(hours=2)
        single_attempt = RetryPolicy(maximum_attempts=1)
        await workflow.execute_activity(
            long_activity,
            start_to_close_timeout=close_timeout,
            retry_policy=single_attempt,
        )


async def main():
    """Run a worker and one workflow, then wait for the worker to stop.

    Steps, in order:
      1. Connect a client to ``localhost:7233``.
      2. Create the worker on a freshly named task queue and start
         ``worker.run()`` as a background task.
      3. Start ``MyWorkflow`` on that task queue.
      4. Install a SIGTERM handler that schedules ``handler_stop_signals()``.
      5. Await the worker's run task.
    """
    # The worker is stored in a module global so handler_stop_signals()
    # can reach it.
    global worker

    logging.basicConfig(level=logging.INFO)

    logging.info("Starting worker")
    temporal_client = await Client.connect("localhost:7233")
    queue_name = f"tq-{uuid4()}"
    worker = Worker(
        temporal_client,
        task_queue=queue_name,
        workflows=[MyWorkflow],
        activities=[long_activity],
        graceful_shutdown_timeout=timedelta(minutes=24),
    )
    worker_task = asyncio.create_task(worker.run())

    logging.info("Starting workflow")
    wf_handle = await temporal_client.start_workflow(
        MyWorkflow.run,
        id=f"wf-{uuid4()}",
        task_queue=queue_name,
    )
    logging.info("Started workflow with ID %s", wf_handle.id)
    # Log the PID so SIGTERM can be sent to this process by hand.
    logging.info(os.getpid())

    def _on_sigterm():
        # Signal callbacks are plain callables, so the async shutdown
        # handler is scheduled on the loop rather than awaited here.
        asyncio.ensure_future(handler_stop_signals())

    logging.info("Waiting for SIGTERM")
    event_loop = asyncio.get_event_loop()
    event_loop.add_signal_handler(signal.SIGTERM, _on_sigterm)
    await worker_task

    logging.info(
        "Worker shutdown, activity will fail, but workflow will remain until a worker is available to process failure"
    )


# Script entry point: run main() on a new asyncio event loop when this file
# is executed directly.
if __name__ == "__main__":
    asyncio.run(main())

Sometimes this records the activity failure and sometimes it doesn't. We need to properly finish running activities before closing the activity worker (we thought we were, but looking at the code, a running activity's completion may not get recorded).

Metadata

Metadata

Assignees

No one assigned

    Labels

    bug: Something isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions