Skip to content

Commit 28ae84b

Browse files
committed
Respond to upstream core proto / api change: fail on sync start
1 parent bb44a7d commit 28ae84b

File tree

2 files changed

+22
-22
lines changed

2 files changed

+22
-22
lines changed

temporalio/worker/_workflow_instance.py

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -857,23 +857,21 @@ def _apply_resolve_nexus_operation_start(
857857
f"Failed to find nexus operation handle for job sequence number {job.seq}"
858858
)
859859
if job.HasField("operation_token"):
860-
# The Nexus operation started asynchronously. A `ResolveNexusOperation` job
860+
# The nexus operation started asynchronously. A `ResolveNexusOperation` job
861861
# will follow in a future activation.
862862
handle._resolve_start_success(job.operation_token)
863863
elif job.HasField("started_sync"):
864-
# The Nexus operation 'started' in the sense that it's already resolved. A
864+
# The nexus operation 'started' in the sense that it's already resolved. A
865865
# `ResolveNexusOperation` job will be in the same activation.
866866
handle._resolve_start_success(None)
867-
elif job.HasField("cancelled_before_start"):
868-
# From proto docs: the operation was cancelled before it was ever
869-
# sent to server (same WFT). Note that core will still send a
870-
# `ResolveNexusOperation` job in the same activation, so there does
871-
# not need to be an exceptional case for this in lang.
872-
#
873-
# We do not resolve start here because it will be resolved in the by
874-
# handle._resolve_failure when handling the follow-up
875-
# ResolveNexusOperation from core.
876-
pass
867+
elif job.HasField("failed"):
868+
# The nexus operation start failed; no ResolveNexusOperation will follow.
869+
self._pending_nexus_operations.pop(job.seq, None)
870+
handle._resolve_failure(
871+
self._failure_converter.from_failure(
872+
job.failed, self._payload_converter
873+
)
874+
)
877875
else:
878876
raise ValueError(f"Unknown Nexus operation start status: {job}")
879877

tests/nexus/test_workflow_caller.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ def __init__(
260260
}[input.op_input.caller_reference],
261261
endpoint=make_nexus_endpoint_name(task_queue),
262262
)
263-
self._nexus_operation_started = False
263+
self._nexus_operation_start_resolved = False
264264
self._proceed = False
265265

266266
@workflow.run
@@ -271,12 +271,14 @@ async def run(
271271
task_queue: str,
272272
) -> CallerWfOutput:
273273
op_input = input.op_input
274-
op_handle = await self.nexus_client.start_operation(
275-
self._get_operation(op_input), # type: ignore[arg-type] # test uses non-public operation types
276-
op_input,
277-
headers=op_input.headers,
278-
)
279-
self._nexus_operation_started = True
274+
try:
275+
op_handle = await self.nexus_client.start_operation(
276+
self._get_operation(op_input), # type: ignore[arg-type] # test uses non-public operation types
277+
op_input,
278+
headers=op_input.headers,
279+
)
280+
finally:
281+
self._nexus_operation_start_resolved = True
280282
if not input.op_input.response_type.exception_in_operation_start:
281283
if isinstance(input.op_input.response_type, SyncResponse):
282284
assert (
@@ -295,8 +297,8 @@ async def run(
295297
return CallerWfOutput(op_output=OpOutput(value=op_output.value))
296298

297299
@workflow.update
298-
async def wait_nexus_operation_started(self) -> None:
299-
await workflow.wait_condition(lambda: self._nexus_operation_started)
300+
async def wait_nexus_operation_start_resolved(self) -> None:
301+
await workflow.wait_condition(lambda: self._nexus_operation_start_resolved)
300302

301303
@staticmethod
302304
def _get_operation(
@@ -720,7 +722,7 @@ async def _start_wf_and_nexus_op(
720722
)
721723

722724
await client.execute_update_with_start_workflow(
723-
CallerWorkflow.wait_nexus_operation_started,
725+
CallerWorkflow.wait_nexus_operation_start_resolved,
724726
start_workflow_operation=start_op,
725727
)
726728
caller_wf_handle = await start_op.workflow_handle()

0 commit comments

Comments
 (0)