Skip to content

Useful event ordering tests from discarded changes to event loop #729

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 107 additions & 2 deletions tests/worker/test_replayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,27 @@
from dataclasses import dataclass
from datetime import timedelta
from pathlib import Path
from typing import Dict
from typing import Any, Dict, Optional, Type

import pytest

from temporalio import activity, workflow
from temporalio.client import Client, WorkflowFailureError, WorkflowHistory
from temporalio.exceptions import ApplicationError
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Replayer, Worker
from temporalio.worker import (
ExecuteWorkflowInput,
Interceptor,
Replayer,
Worker,
WorkflowInboundInterceptor,
WorkflowInterceptorClassInput,
)
from tests.helpers import assert_eq_eventually
from tests.worker.test_workflow import (
ActivityAndSignalsWhileWorkflowDown,
SignalsActivitiesTimersUpdatesTracingWorkflow,
)


@activity.defn
Expand Down Expand Up @@ -385,3 +396,97 @@ async def test_replayer_command_reordering_backward_compatibility() -> None:
await Replayer(workflows=[UpdateCompletionAfterWorkflowReturn]).replay_workflow(
WorkflowHistory.from_json("fake", history)
)


test_replayer_workflow_res = None


class WorkerWorkflowResultInterceptor(Interceptor):
def workflow_interceptor_class(
self, input: WorkflowInterceptorClassInput
) -> Optional[Type[WorkflowInboundInterceptor]]:
return WorkflowResultInterceptor


class WorkflowResultInterceptor(WorkflowInboundInterceptor):
async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any:
global test_replayer_workflow_res
res = await super().execute_workflow(input)
test_replayer_workflow_res = res
return res


async def test_replayer_async_ordering() -> None:
"""
This test verifies that the order that asyncio tasks/coroutines are woken up matches the
order they were before changes to apply all jobs and then run the event loop, where previously
the event loop was ran after each "batch" of jobs.
"""
histories_and_expecteds = [
(
"test_replayer_event_tracing.json",
[
"sig-before-sync",
"sig-before-1",
"sig-before-2",
"timer-sync",
"act-sync",
"act-1",
"act-2",
"sig-1-sync",
"sig-1-1",
"sig-1-2",
"update-1-sync",
"update-1-1",
"update-1-2",
"timer-1",
"timer-2",
],
),
(
"test_replayer_event_tracing_double_sig_at_start.json",
[
"sig-before-sync",
"sig-before-1",
"sig-1-sync",
"sig-1-1",
"sig-before-2",
"sig-1-2",
"timer-sync",
"act-sync",
"update-1-sync",
"update-1-1",
"update-1-2",
"act-1",
"act-2",
"timer-1",
"timer-2",
],
),
]
for history, expected in histories_and_expecteds:
with Path(__file__).with_name(history).open() as f:
history = f.read()
await Replayer(
workflows=[SignalsActivitiesTimersUpdatesTracingWorkflow],
interceptors=[WorkerWorkflowResultInterceptor()],
).replay_workflow(WorkflowHistory.from_json("fake", history))
assert test_replayer_workflow_res == expected


async def test_replayer_alternate_async_ordering() -> None:
with Path(__file__).with_name(
"test_replayer_event_tracing_alternate.json"
).open() as f:
history = f.read()
await Replayer(
workflows=[ActivityAndSignalsWhileWorkflowDown],
interceptors=[WorkerWorkflowResultInterceptor()],
).replay_workflow(WorkflowHistory.from_json("fake", history))
assert test_replayer_workflow_res == [
"act-start",
"sig-1",
"sig-2",
"counter-2",
"act-done",
]
Loading
Loading