Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 7fe6868

Browse files
committedMay 8, 2025·
Nexus
1 parent 64584f4 commit 7fe6868

19 files changed

+3956
-563
lines changed
 

‎README.md

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ informal introduction to the features and their implementation.
9696
- [Heartbeating and Cancellation](#heartbeating-and-cancellation)
9797
- [Worker Shutdown](#worker-shutdown)
9898
- [Testing](#testing-1)
99+
- [Nexus](#nexus)
100+
- [hello](#hello)
99101
- [Workflow Replay](#workflow-replay)
100102
- [Observability](#observability)
101103
- [Metrics](#metrics)
@@ -1314,6 +1316,71 @@ affect calls activity code might make to functions on the `temporalio.activity`
13141316
* `cancel()` can be invoked to simulate a cancellation of the activity
13151317
* `worker_shutdown()` can be invoked to simulate a worker shutdown during execution of the activity
13161318

1319+
1320+
### Nexus
1321+
1322+
See [docs.temporal.io/nexus](https://docs.temporal.io/nexus).
1323+
1324+
#### Service Interface Definition
1325+
1326+
A Nexus Service interface definition is a set of named operations, where each operation is an
1327+
`(input_type, output_type)` pair:
1328+
1329+
```python
1330+
@nexusrpc.service
1331+
class MyNexusService:
1332+
my_operation: nexusrpc.Operation[MyOpInput, MyOpOutput]
1333+
```
1334+
1335+
### Operation implementation
1336+
1337+
```python
1338+
@nexusrpc.service(interface.MyNexusService)
1339+
class MyNexusService:
1340+
1341+
@nexusrpc.sync_operation
1342+
def echo(self, input: EchoInput) -> EchoOutput:
1343+
return EchoOutput(message=input.message)
1344+
```
1345+
1346+
```python
1347+
@nexusrpc.service(interface.MyNexusService)
1348+
class MyNexusService:
1349+
1350+
@temporalio.nexus.workflow_operation
1351+
async def hello(
1352+
self, input: HelloInput
1353+
) -> AsyncWorkflowOperationResult[HelloOutput]:
1354+
return await temporalio.nexus.handler.start_workflow(HelloWorkflow.run, input)
1355+
```
1356+
1357+
1358+
### Request options
1359+
1360+
```python
1361+
@dataclass
1362+
class OperationOptions:
1363+
"""Options passed by the Nexus caller when starting an operation."""
1364+
1365+
# A callback URL is required to deliver the completion of an async operation. This URL should be
1366+
# called by a handler upon completion if the started operation is async.
1367+
callback_url: Optional[str] = None
1368+
1369+
# Optional header fields set by the caller to be attached to the callback request when an
1370+
# asynchronous operation completes.
1371+
callback_header: dict[str, str] = field(default_factory=dict)
1372+
1373+
# Request ID that may be used by the server handler to dedupe a start request.
1374+
# By default a v4 UUID will be generated by the client.
1375+
request_id: Optional[str] = None
1376+
1377+
# Links contain arbitrary caller information. Handlers may use these links as
1378+
# metadata on resources associated with an operation.
1379+
links: list[Link] = field(default_factory=list)
1380+
```
1381+
1382+
1383+
13171384
### Workflow Replay
13181385

13191386
Given a workflow's history, it can be replayed locally to check for things like non-determinism errors. For example,

‎pyproject.toml

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,12 @@ keywords = [
1111
"workflow",
1212
]
1313
dependencies = [
14+
"hyperlinked",
15+
"nexus-rpc",
16+
"pdbpp>=0.11.6",
1417
"protobuf>=3.20",
1518
"python-dateutil>=2.8.2,<3 ; python_version < '3.11'",
19+
"temporalio-xray",
1620
"types-protobuf>=3.20",
1721
"typing-extensions>=4.2.0,<5",
1822
]
@@ -40,7 +44,7 @@ dev = [
4044
"psutil>=5.9.3,<6",
4145
"pydocstyle>=6.3.0,<7",
4246
"pydoctor>=24.11.1,<25",
43-
"pyright==1.1.377",
47+
"pyright==1.1.400",
4448
"pytest~=7.4",
4549
"pytest-asyncio>=0.21,<0.22",
4650
"pytest-timeout~=2.2",
@@ -49,6 +53,7 @@ dev = [
4953
"twine>=4.0.1,<5",
5054
"ruff>=0.5.0,<0.6",
5155
"maturin>=1.8.2",
56+
"pytest-cov>=6.1.1",
5257
]
5358

5459
[tool.poe.tasks]
@@ -60,8 +65,8 @@ gen-protos = "uv run python scripts/gen_protos.py"
6065
lint = [
6166
{cmd = "uv run ruff check --select I"},
6267
{cmd = "uv run ruff format --check"},
63-
{ref = "lint-types"},
6468
{cmd = "uv run pyright"},
69+
{ref = "lint-types"},
6570
{ref = "lint-docs"},
6671
]
6772
bridge-lint = { cmd = "cargo clippy -- -D warnings", cwd = "temporalio/bridge" }
@@ -70,7 +75,7 @@ bridge-lint = { cmd = "cargo clippy -- -D warnings", cwd = "temporalio/bridge" }
7075
lint-docs = "uv run pydocstyle --ignore-decorators=overload"
7176
lint-types = "uv run mypy --namespace-packages --check-untyped-defs ."
7277
run-bench = "uv run python scripts/run_bench.py"
73-
test = "uv run pytest"
78+
test = "uv run pytest --cov temporalio --cov-report xml"
7479

7580

7681
[tool.pytest.ini_options]
@@ -83,8 +88,6 @@ testpaths = ["tests"]
8388
timeout = 600
8489
timeout_func_only = true
8590
filterwarnings = [
86-
"error::temporalio.workflow.UnfinishedUpdateHandlersWarning",
87-
"error::temporalio.workflow.UnfinishedSignalHandlersWarning",
8891
"ignore::pytest.PytestDeprecationWarning",
8992
"ignore::DeprecationWarning",
9093
]
@@ -157,6 +160,7 @@ exclude = [
157160
"tests/worker/workflow_sandbox/testmodules/proto",
158161
"temporalio/bridge/worker.py",
159162
"temporalio/contrib/opentelemetry.py",
163+
"temporalio/contrib/pydantic.py",
160164
"temporalio/converter.py",
161165
"temporalio/testing/_workflow.py",
162166
"temporalio/worker/_activity.py",
@@ -168,6 +172,10 @@ exclude = [
168172
"tests/api/test_grpc_stub.py",
169173
"tests/conftest.py",
170174
"tests/contrib/test_opentelemetry.py",
175+
"tests/contrib/pydantic/models.py",
176+
"tests/contrib/pydantic/models_2.py",
177+
"tests/contrib/pydantic/test_pydantic.py",
178+
"tests/contrib/pydantic/workflows.py",
171179
"tests/test_converter.py",
172180
"tests/test_service.py",
173181
"tests/test_workflow.py",
@@ -186,6 +194,9 @@ exclude = [
186194
[tool.ruff]
187195
target-version = "py39"
188196

197+
[tool.ruff.lint]
198+
extend-ignore = ["E741"] # Allow single-letter variable names like I, O
199+
189200
[build-system]
190201
requires = ["maturin>=1.0,<2.0"]
191202
build-backend = "maturin"
@@ -202,3 +213,8 @@ exclude = [
202213
[tool.uv]
203214
# Prevent uv commands from building the package by default
204215
package = false
216+
217+
[tool.uv.sources]
218+
nexus-rpc = { path = "../nexus-sdk-python", editable = true }
219+
temporalio-xray = { path = "../xray/sdks/python", editable = true }
220+
hyperlinked = { path = "../../hyperlinked/python", editable = true }

‎temporalio/bridge/src/worker.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use temporal_sdk_core_api::worker::{
2020
};
2121
use temporal_sdk_core_api::Worker;
2222
use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion;
23-
use temporal_sdk_core_protos::coresdk::{ActivityHeartbeat, ActivityTaskCompletion};
23+
use temporal_sdk_core_protos::coresdk::{ActivityHeartbeat, ActivityTaskCompletion, nexus::NexusTaskCompletion};
2424
use temporal_sdk_core_protos::temporal::api::history::v1::History;
2525
use tokio::sync::mpsc::{channel, Sender};
2626
use tokio_stream::wrappers::ReceiverStream;
@@ -570,6 +570,19 @@ impl WorkerRef {
570570
})
571571
}
572572

573+
fn poll_nexus_task<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
574+
let worker = self.worker.as_ref().unwrap().clone();
575+
self.runtime.future_into_py(py, async move {
576+
let bytes = match worker.poll_nexus_task().await {
577+
Ok(task) => task.encode_to_vec(),
578+
Err(PollError::ShutDown) => return Err(PollShutdownError::new_err(())),
579+
Err(err) => return Err(PyRuntimeError::new_err(format!("Poll failure: {}", err))),
580+
};
581+
let bytes: &[u8] = &bytes;
582+
Ok(Python::with_gil(|py| bytes.into_py(py)))
583+
})
584+
}
585+
573586
fn complete_workflow_activation<'p>(
574587
&self,
575588
py: Python<'p>,
@@ -600,6 +613,19 @@ impl WorkerRef {
600613
})
601614
}
602615

616+
fn complete_nexus_task<'p>(&self, py: Python<'p>, proto: &PyBytes) -> PyResult<&'p PyAny> {
617+
let worker = self.worker.as_ref().unwrap().clone();
618+
let completion = NexusTaskCompletion::decode(proto.as_bytes())
619+
.map_err(|err| PyValueError::new_err(format!("Invalid proto: {}", err)))?;
620+
self.runtime.future_into_py(py, async move {
621+
worker
622+
.complete_nexus_task(completion)
623+
.await
624+
.context("Completion failure")
625+
.map_err(Into::into)
626+
})
627+
}
628+
603629
fn record_activity_heartbeat(&self, proto: &PyBytes) -> PyResult<()> {
604630
enter_sync!(self.runtime);
605631
let heartbeat = ActivityHeartbeat::decode(proto.as_bytes())

‎temporalio/bridge/worker.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import temporalio.bridge.client
2727
import temporalio.bridge.proto
2828
import temporalio.bridge.proto.activity_task
29+
import temporalio.bridge.proto.nexus
2930
import temporalio.bridge.proto.workflow_activation
3031
import temporalio.bridge.proto.workflow_completion
3132
import temporalio.bridge.runtime
@@ -35,7 +36,7 @@
3536
from temporalio.bridge.temporal_sdk_bridge import (
3637
CustomSlotSupplier as BridgeCustomSlotSupplier,
3738
)
38-
from temporalio.bridge.temporal_sdk_bridge import PollShutdownError
39+
from temporalio.bridge.temporal_sdk_bridge import PollShutdownError # type: ignore
3940

4041

4142
@dataclass
@@ -216,6 +217,14 @@ async def poll_activity_task(
216217
await self._ref.poll_activity_task()
217218
)
218219

220+
async def poll_nexus_task(
221+
self,
222+
) -> temporalio.bridge.proto.nexus.NexusTask:
223+
"""Poll for a nexus task."""
224+
return temporalio.bridge.proto.nexus.NexusTask.FromString(
225+
await self._ref.poll_nexus_task()
226+
)
227+
219228
async def complete_workflow_activation(
220229
self,
221230
comp: temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion,
@@ -229,6 +238,12 @@ async def complete_activity_task(
229238
"""Complete an activity task."""
230239
await self._ref.complete_activity_task(comp.SerializeToString())
231240

241+
async def complete_nexus_task(
242+
self, comp: temporalio.bridge.proto.nexus.NexusTaskCompletion
243+
) -> None:
244+
"""Complete a nexus task."""
245+
await self._ref.complete_nexus_task(comp.SerializeToString())
246+
232247
def record_activity_heartbeat(
233248
self, comp: temporalio.bridge.proto.ActivityHeartbeat
234249
) -> None:

‎temporalio/client.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -459,8 +459,15 @@ async def start_workflow(
459459
rpc_metadata: Mapping[str, str] = {},
460460
rpc_timeout: Optional[timedelta] = None,
461461
request_eager_start: bool = False,
462-
stack_level: int = 2,
463462
priority: temporalio.common.Priority = temporalio.common.Priority.default,
463+
# The following options are deliberately not exposed in overloads
464+
stack_level: int = 2,
465+
nexus_completion_callbacks: Sequence[
466+
temporalio.common.NexusCompletionCallback
467+
] = [],
468+
workflow_event_links: Sequence[
469+
temporalio.api.common.v1.Link.WorkflowEvent
470+
] = [],
464471
) -> WorkflowHandle[Any, Any]:
465472
"""Start a workflow and return its handle.
466473
@@ -523,6 +530,11 @@ async def start_workflow(
523530
temporalio.workflow._Definition.get_name_and_result_type(workflow)
524531
)
525532

533+
for l in workflow_event_links:
534+
print(
535+
f"🌈@@ worker starting workflow with link: {google.protobuf.json_format.MessageToJson(l)}"
536+
)
537+
526538
return await self._impl.start_workflow(
527539
StartWorkflowInput(
528540
workflow=name,
@@ -549,6 +561,8 @@ async def start_workflow(
549561
rpc_timeout=rpc_timeout,
550562
request_eager_start=request_eager_start,
551563
priority=priority,
564+
nexus_completion_callbacks=nexus_completion_callbacks,
565+
workflow_event_links=workflow_event_links,
552566
)
553567
)
554568

@@ -5156,6 +5170,8 @@ class StartWorkflowInput:
51565170
rpc_timeout: Optional[timedelta]
51575171
request_eager_start: bool
51585172
priority: temporalio.common.Priority
5173+
nexus_completion_callbacks: Sequence[temporalio.common.NexusCompletionCallback]
5174+
workflow_event_links: Sequence[temporalio.api.common.v1.Link.WorkflowEvent]
51595175

51605176

51615177
@dataclass
@@ -5770,6 +5786,16 @@ async def _build_start_workflow_execution_request(
57705786
req = temporalio.api.workflowservice.v1.StartWorkflowExecutionRequest()
57715787
req.request_eager_execution = input.request_eager_start
57725788
await self._populate_start_workflow_execution_request(req, input)
5789+
for callback in input.nexus_completion_callbacks:
5790+
c = temporalio.api.common.v1.Callback()
5791+
c.nexus.url = callback.url
5792+
c.nexus.header.update(callback.header)
5793+
req.completion_callbacks.append(c)
5794+
5795+
req.links.extend(
5796+
temporalio.api.common.v1.Link(workflow_event=link)
5797+
for link in input.workflow_event_links
5798+
)
57735799
return req
57745800

57755801
async def _build_signal_with_start_workflow_execution_request(

‎temporalio/common.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from abc import ABC, abstractmethod
99
from dataclasses import dataclass
1010
from datetime import datetime, timedelta
11-
from enum import Enum, IntEnum
11+
from enum import IntEnum
1212
from typing import (
1313
Any,
1414
Callable,
@@ -195,6 +195,37 @@ def __setstate__(self, state: object) -> None:
195195
)
196196

197197

198+
@dataclass(frozen=True)
199+
class NexusCompletionCallback:
200+
"""Nexus callback to attach to events such as workflow completion."""
201+
202+
url: str
203+
"""Callback URL."""
204+
205+
header: Mapping[str, str]
206+
"""Header to attach to callback request."""
207+
208+
209+
@dataclass(frozen=True)
210+
class WorkflowEventLink:
211+
"""A link to a history event that can be attached to a different history event."""
212+
213+
namespace: str
214+
"""Namespace of the workflow to link to."""
215+
216+
workflow_id: str
217+
"""ID of the workflow to link to."""
218+
219+
run_id: str
220+
"""Run ID of the workflow to link to."""
221+
222+
event_type: temporalio.api.enums.v1.EventType
223+
"""Type of the event to link to."""
224+
225+
event_id: int
226+
"""ID of the event to link to."""
227+
228+
198229
# We choose to make this a list instead of an sequence so we can catch if people
199230
# are not sending lists each time but maybe accidentally sending a string (which
200231
# is a sequence)

‎temporalio/converter.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -901,13 +901,57 @@ def _error_to_failure(
901901
failure.child_workflow_execution_failure_info.retry_state = (
902902
temporalio.api.enums.v1.RetryState.ValueType(error.retry_state or 0)
903903
)
904+
# TODO(dan): test coverage for this
905+
elif isinstance(error, temporalio.exceptions.NexusOperationError):
906+
failure.nexus_operation_execution_failure_info.SetInParent()
907+
failure.nexus_operation_execution_failure_info.operation_token = (
908+
error.operation_token
909+
)
904910

905911
def from_failure(
906912
self,
907913
failure: temporalio.api.failure.v1.Failure,
908914
payload_converter: PayloadConverter,
909915
) -> BaseException:
910916
"""See base class."""
917+
918+
# message Failure {
919+
# string message = 1;
920+
# // The source this Failure originated in, e.g. TypeScriptSDK / JavaSDK
921+
# // In some SDKs this is used to rehydrate the stack trace into an exception object.
922+
# string source = 2;
923+
# string stack_trace = 3;
924+
# // Alternative way to supply `message` and `stack_trace` and possibly other attributes, used for encryption of
925+
# // errors originating in user code which might contain sensitive information.
926+
# // The `encoded_attributes` Payload could represent any serializable object, e.g. JSON object or a `Failure` proto
927+
# // message.
928+
# //
929+
# // SDK authors:
930+
# // - The SDK should provide a default `encodeFailureAttributes` and `decodeFailureAttributes` implementation that:
931+
# // - Uses a JSON object to represent `{ message, stack_trace }`.
932+
# // - Overwrites the original message with "Encoded failure" to indicate that more information could be extracted.
933+
# // - Overwrites the original stack_trace with an empty string.
934+
# // - The resulting JSON object is converted to Payload using the default PayloadConverter and should be processed
935+
# // by the user-provided PayloadCodec
936+
# //
937+
# // - If there's demand, we could allow overriding the default SDK implementation to encode other opaque Failure attributes.
938+
# // (-- api-linter: core::0203::optional=disabled --)
939+
# temporal.api.common.v1.Payload encoded_attributes = 20;
940+
# Failure cause = 4;
941+
# oneof failure_info {
942+
# ApplicationFailureInfo application_failure_info = 5;
943+
# TimeoutFailureInfo timeout_failure_info = 6;
944+
# CanceledFailureInfo canceled_failure_info = 7;
945+
# TerminatedFailureInfo terminated_failure_info = 8;
946+
# ServerFailureInfo server_failure_info = 9;
947+
# ResetWorkflowFailureInfo reset_workflow_failure_info = 10;
948+
# ActivityFailureInfo activity_failure_info = 11;
949+
# ChildWorkflowExecutionFailureInfo child_workflow_execution_failure_info = 12;
950+
# NexusOperationFailureInfo nexus_operation_execution_failure_info = 13;
951+
# NexusHandlerFailureInfo nexus_handler_failure_info = 14;
952+
# }
953+
# }
954+
911955
# If encoded attributes are present and have the fields we expect,
912956
# extract them
913957
if failure.HasField("encoded_attributes"):
@@ -978,6 +1022,15 @@ def from_failure(
9781022
else None,
9791023
)
9801024
elif failure.HasField("child_workflow_execution_failure_info"):
1025+
# message ChildWorkflowExecutionFailureInfo {
1026+
# string namespace = 1;
1027+
# temporal.api.common.v1.WorkflowExecution workflow_execution = 2;
1028+
# temporal.api.common.v1.WorkflowType workflow_type = 3;
1029+
# int64 initiated_event_id = 4;
1030+
# int64 started_event_id = 5;
1031+
# temporal.api.enums.v1.RetryState retry_state = 6;
1032+
# }
1033+
9811034
child_info = failure.child_workflow_execution_failure_info
9821035
err = temporalio.exceptions.ChildWorkflowError(
9831036
failure.message or "Child workflow error",
@@ -993,6 +1046,53 @@ def from_failure(
9931046
if child_info.retry_state
9941047
else None,
9951048
)
1049+
elif failure.HasField("nexus_handler_failure_info"):
1050+
# message NexusHandlerFailureInfo {
1051+
# // The Nexus error type as defined in the spec:
1052+
# // https://github.com/nexus-rpc/api/blob/main/SPEC.md#predefined-handler-errors.
1053+
# string type = 1;
1054+
# // Retry behavior, defaults to the retry behavior of the error type as defined in the spec.
1055+
# temporal.api.enums.v1.NexusHandlerErrorRetryBehavior retry_behavior = 2;
1056+
# }
1057+
# TODO(dan): check that handler-side phenomena are correctly turning into this vs nexus_operation_execution_failure_info
1058+
# TODO(dan): What exception should be raised for this vs nexus_operation_execution_failure_info?
1059+
nexus_handler_failure_info = failure.nexus_handler_failure_info
1060+
err = temporalio.exceptions.NexusHandlerError(
1061+
failure.message or "Nexus handler error",
1062+
type=nexus_handler_failure_info.type,
1063+
retryable={
1064+
temporalio.api.enums.v1.NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE: True,
1065+
temporalio.api.enums.v1.NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE: False,
1066+
}.get(nexus_handler_failure_info.retry_behavior),
1067+
)
1068+
elif failure.HasField("nexus_operation_execution_failure_info"):
1069+
# message NexusOperationFailureInfo {
1070+
# // The NexusOperationScheduled event ID.
1071+
# int64 scheduled_event_id = 1;
1072+
# // Endpoint name.
1073+
# string endpoint = 2;
1074+
# // Service name.
1075+
# string service = 3;
1076+
# // Operation name.
1077+
# string operation = 4;
1078+
# // Operation ID - may be empty if the operation completed synchronously.
1079+
# //
1080+
# // Deprecated: Renamed to operation_token.
1081+
# string operation_id = 5;
1082+
# // Operation token - may be empty if the operation completed synchronously.
1083+
# string operation_token = 6;
1084+
# }
1085+
# TODO(dan)
1086+
# This is covered by cancellation tests
1087+
nexus_op_failure_info = failure.nexus_operation_execution_failure_info
1088+
err = temporalio.exceptions.NexusOperationError(
1089+
failure.message or "Nexus operation error",
1090+
scheduled_event_id=nexus_op_failure_info.scheduled_event_id,
1091+
endpoint=nexus_op_failure_info.endpoint,
1092+
service=nexus_op_failure_info.service,
1093+
operation=nexus_op_failure_info.operation,
1094+
operation_token=nexus_op_failure_info.operation_token,
1095+
)
9961096
else:
9971097
err = temporalio.exceptions.FailureError(failure.message or "Failure error")
9981098
err._failure = failure

‎temporalio/exceptions.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,15 @@ def retry_state(self) -> Optional[RetryState]:
284284
class ChildWorkflowError(FailureError):
285285
"""Error raised on child workflow failure."""
286286

287+
# message ChildWorkflowExecutionFailureInfo {
288+
# string namespace = 1;
289+
# temporal.api.common.v1.WorkflowExecution workflow_execution = 2;
290+
# temporal.api.common.v1.WorkflowType workflow_type = 3;
291+
# int64 initiated_event_id = 4;
292+
# int64 started_event_id = 5;
293+
# temporal.api.enums.v1.RetryState retry_state = 6;
294+
# }
295+
287296
def __init__(
288297
self,
289298
message: str,
@@ -342,6 +351,94 @@ def retry_state(self) -> Optional[RetryState]:
342351
return self._retry_state
343352

344353

354+
class NexusHandlerError(FailureError):
355+
"""Error raised on Nexus handler failure."""
356+
357+
# message NexusHandlerFailureInfo {
358+
# // The Nexus error type as defined in the spec:
359+
# // https://github.com/nexus-rpc/api/blob/main/SPEC.md#predefined-handler-errors.
360+
# string type = 1;
361+
# // Retry behavior, defaults to the retry behavior of the error type as defined in the spec.
362+
# temporal.api.enums.v1.NexusHandlerErrorRetryBehavior retry_behavior = 2;
363+
# }
364+
365+
def __init__(
366+
self,
367+
message: str,
368+
*,
369+
type: str,
370+
retryable: Optional[bool] = None,
371+
):
372+
"""Initialize a Nexus handler error."""
373+
super().__init__(message)
374+
self._type = type
375+
self._retryable = retryable
376+
377+
378+
class NexusOperationError(FailureError):
379+
"""Error raised on Nexus operation failure."""
380+
381+
# message NexusOperationFailureInfo {
382+
# // The NexusOperationScheduled event ID.
383+
# int64 scheduled_event_id = 1;
384+
# // Endpoint name.
385+
# string endpoint = 2;
386+
# // Service name.
387+
# string service = 3;
388+
# // Operation name.
389+
# string operation = 4;
390+
# // Operation ID - may be empty if the operation completed synchronously.
391+
# //
392+
# // Deprecated: Renamed to operation_token.
393+
# string operation_id = 5;
394+
# // Operation token - may be empty if the operation completed synchronously.
395+
# string operation_token = 6;
396+
# }
397+
398+
def __init__(
399+
self,
400+
message: str,
401+
*,
402+
scheduled_event_id: int,
403+
endpoint: str,
404+
service: str,
405+
operation: str,
406+
operation_token: str,
407+
):
408+
"""Initialize a Nexus operation error."""
409+
super().__init__(message)
410+
self._scheduled_event_id = scheduled_event_id
411+
self._endpoint = endpoint
412+
self._service = service
413+
self._operation = operation
414+
self._operation_token = operation_token
415+
416+
@property
417+
def scheduled_event_id(self) -> int:
418+
"""The NexusOperationScheduled event ID for the failed operation."""
419+
return self._scheduled_event_id
420+
421+
@property
422+
def endpoint(self) -> str:
423+
"""The endpoint name for the failed operation."""
424+
return self._endpoint
425+
426+
@property
427+
def service(self) -> str:
428+
"""The service name for the failed operation."""
429+
return self._service
430+
431+
@property
432+
def operation(self) -> str:
433+
"""The name of the failed operation."""
434+
return self._operation
435+
436+
@property
437+
def operation_token(self) -> str:
438+
"""The operation token returned by the failed operation."""
439+
return self._operation_token
440+
441+
345442
def is_cancelled_exception(exception: BaseException) -> bool:
346443
"""Check whether the given exception is considered a cancellation exception
347444
according to Temporal.

‎temporalio/nexus/__init__.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import logging
2+
from collections.abc import Mapping
3+
from typing import Any, Optional
4+
5+
6+
class LoggerAdapter(logging.LoggerAdapter):
7+
def __init__(self, logger: logging.Logger, extra: Optional[Mapping[str, Any]]):
8+
super().__init__(logger, extra or {})
9+
10+
11+
logger = LoggerAdapter(logging.getLogger(__name__), None)
12+
"""Logger that has additional details regarding the current Nexus operation."""

‎temporalio/nexus/handler.py

Lines changed: 451 additions & 0 deletions
Large diffs are not rendered by default.

‎temporalio/worker/_activity.py

Lines changed: 234 additions & 178 deletions
Large diffs are not rendered by default.

‎temporalio/worker/_interceptor.py

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,26 @@
33
from __future__ import annotations
44

55
import concurrent.futures
6-
from dataclasses import dataclass
6+
from collections.abc import Callable, Mapping, MutableMapping
7+
from dataclasses import dataclass, field
78
from datetime import timedelta
89
from typing import (
910
Any,
1011
Awaitable,
11-
Callable,
12+
Generic,
1213
List,
13-
Mapping,
14-
MutableMapping,
1514
NoReturn,
1615
Optional,
1716
Sequence,
1817
Type,
18+
TypeVar,
1919
Union,
2020
)
2121

22+
import nexusrpc
23+
import nexusrpc.handler
24+
import nexusrpc.interface
25+
2226
import temporalio.activity
2327
import temporalio.api.common.v1
2428
import temporalio.common
@@ -285,6 +289,60 @@ class StartChildWorkflowInput:
285289
ret_type: Optional[Type]
286290

287291

292+
# TODO(dan): Put these in a better location. Type variance?
293+
I = TypeVar("I")
294+
O = TypeVar("O")
295+
296+
297+
@dataclass
298+
class StartNexusOperationInput(Generic[I, O]):
299+
"""Input for :py:meth:`WorkflowOutboundInterceptor.start_nexus_operation`."""
300+
301+
endpoint: str
302+
service: str
303+
operation: Union[
304+
nexusrpc.interface.Operation[I, O],
305+
Callable[[Any], nexusrpc.handler.Operation[I, O]],
306+
str,
307+
]
308+
input: I
309+
schedule_to_close_timeout: Optional[timedelta]
310+
headers: Optional[Mapping[str, str]]
311+
output_type: Optional[Type[O]] = None
312+
313+
_operation_name: str = field(init=False, repr=False)
314+
_input_type: Optional[Type[I]] = field(init=False, repr=False)
315+
316+
def __post_init__(self) -> None:
317+
if isinstance(self.operation, str):
318+
self._operation_name = self.operation
319+
self._input_type = None
320+
elif isinstance(self.operation, nexusrpc.interface.Operation):
321+
self._operation_name = self.operation.name
322+
self._input_type = self.operation.input_type
323+
self.output_type = self.operation.output_type
324+
elif isinstance(self.operation, Callable):
325+
defn = getattr(self.operation, "__nexus_operation__", None)
326+
if isinstance(defn, nexusrpc.handler.NexusOperationDefinition):
327+
self._operation_name = defn.name
328+
self._input_type = defn.input_type
329+
self.output_type = defn.output_type
330+
else:
331+
raise ValueError(
332+
f"Operation callable is not a Nexus operation: {self.operation}"
333+
)
334+
else:
335+
raise ValueError(f"Operation is not a Nexus operation: {self.operation}")
336+
337+
@property
338+
def operation_name(self) -> str:
339+
return self._operation_name
340+
341+
@property
342+
def input_type(self) -> Optional[Type[I]]:
343+
return self._input_type
344+
345+
288346
@dataclass
289347
class StartLocalActivityInput:
290348
"""Input for :py:meth:`WorkflowOutboundInterceptor.start_local_activity`."""
@@ -409,3 +467,9 @@ def start_local_activity(
409467
and :py:func:`temporalio.workflow.execute_local_activity` call.
410468
"""
411469
return self.next.start_local_activity(input)
470+
471+
async def start_nexus_operation(
472+
self, input: StartNexusOperationInput
473+
) -> temporalio.workflow.NexusOperationHandle[Any]:
474+
"""Called for every :py:func:`temporalio.workflow.start_nexus_operation` call."""
475+
return await self.next.start_nexus_operation(input)

‎temporalio/worker/_nexus.py

Lines changed: 478 additions & 0 deletions
Large diffs are not rendered by default.

‎temporalio/worker/_worker.py

Lines changed: 63 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141

4242
from ._activity import SharedStateManager, _ActivityWorker
4343
from ._interceptor import Interceptor
44+
from ._nexus import _NexusWorker
4445
from ._tuning import WorkerTuner
4546
from ._workflow import _WorkflowWorker
4647
from ._workflow_instance import UnsandboxedWorkflowRunner, WorkflowRunner
@@ -106,6 +107,7 @@ def __init__(
106107
*,
107108
task_queue: str,
108109
activities: Sequence[Callable] = [],
110+
nexus_services: Sequence[Any] = [],
109111
workflows: Sequence[Type] = [],
110112
activity_executor: Optional[concurrent.futures.Executor] = None,
111113
workflow_task_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None,
@@ -144,6 +146,8 @@ def __init__(
144146
maximum=5
145147
),
146148
) -> None:
149+
# TODO(dan): consider not allowing max_workers < max_concurrent_nexus_operations?
150+
# TODO(dan): Nexus tuner support?
147151
"""Create a worker to process workflows and/or activities.
148152
149153
Args:
@@ -153,10 +157,12 @@ def __init__(
153157
client's underlying service client. This client cannot be
154158
"lazy".
155159
task_queue: Required task queue for this worker.
156-
activities: Set of activity callables decorated with
160+
activities: Activity callables decorated with
157161
:py:func:`@activity.defn<temporalio.activity.defn>`. Activities
158162
may be async functions or non-async functions.
159-
workflows: Set of workflow classes decorated with
163+
nexus_services: Nexus service instances decorated with
164+
:py:func:`@nexusrpc.handler.service<nexusrpc.handler.service>`.
165+
workflows: Workflow classes decorated with
160166
:py:func:`@workflow.defn<temporalio.workflow.defn>`.
161167
activity_executor: Concurrent executor to use for non-async
162168
activities. This is required if any activities are non-async.
@@ -167,6 +173,14 @@ def __init__(
167173
executor should at least be ``max_concurrent_activities`` or a
168174
warning is issued. Note, a broken-executor failure from this
169175
executor will cause the worker to fail and shutdown.
176+
nexus_operation_executor: Concurrent executor to use for non-async
177+
Nexus operations. This is required if any operation start methods
178+
are non-async. :py:class:`concurrent.futures.ThreadPoolExecutor`
179+
is recommended. If this is a
180+
:py:class:`concurrent.futures.ProcessPoolExecutor`, all non-async
181+
start methods must be picklable. ``max_workers`` on the executor
182+
should at least be ``max_concurrent_nexus_operations`` or a warning
183+
is issued.
170184
workflow_task_executor: Thread pool executor for workflow tasks. If
171185
this is not present, a new
172186
:py:class:`concurrent.futures.ThreadPoolExecutor` will be
@@ -191,12 +205,14 @@ def __init__(
191205
identity is used.
192206
max_cached_workflows: If nonzero, workflows will be cached and
193207
sticky task queues will be used.
194-
max_concurrent_workflow_tasks: Maximum allowed number of workflow
195-
tasks that will ever be given to this worker at one time. Mutually exclusive with ``tuner``.
196208
max_concurrent_activities: Maximum number of activity tasks that
197-
will ever be given to this worker concurrently. Mutually exclusive with ``tuner``.
209+
will ever be given to the activity worker concurrently. Mutually exclusive with ``tuner``.
198210
max_concurrent_local_activities: Maximum number of local activity
199-
tasks that will ever be given to this worker concurrently. Mutually exclusive with ``tuner``.
211+
tasks that will ever be given to the activity worker concurrently. Mutually exclusive with ``tuner``.
212+
max_concurrent_nexus_operations: Maximum number of Nexus operations that
213+
will ever be given to the Nexus worker concurrently. Mutually exclusive with ``tuner``.
214+
max_concurrent_workflow_tasks: Maximum allowed number of
215+
tasks that will ever be given to the workflow worker at one time. Mutually exclusive with ``tuner``.
200216
tuner: Provide a custom :py:class:`WorkerTuner`. Mutually exclusive with the
201217
``max_concurrent_workflow_tasks``, ``max_concurrent_activities``, and
202218
``max_concurrent_local_activities`` arguments.
@@ -392,6 +408,17 @@ def __init__(
392408
interceptors=interceptors,
393409
metric_meter=self._runtime.metric_meter,
394410
)
411+
self._nexus_worker: Optional[_NexusWorker] = None
412+
if nexus_services:
413+
self._nexus_worker = _NexusWorker(
414+
bridge_worker=lambda: self._bridge_worker,
415+
client=client,
416+
task_queue=task_queue,
417+
nexus_services=nexus_services,
418+
data_converter=client_config["data_converter"],
419+
interceptors=interceptors,
420+
metric_meter=self._runtime.metric_meter,
421+
)
395422
self._workflow_worker: Optional[_WorkflowWorker] = None
396423
if workflows:
397424
should_enforce_versioning_behavior = (
@@ -606,21 +633,30 @@ async def raise_on_shutdown():
606633
except asyncio.CancelledError:
607634
pass
608635

609-
tasks: List[asyncio.Task] = [asyncio.create_task(raise_on_shutdown())]
636+
tasks: dict[
637+
Union[None, _ActivityWorker, _WorkflowWorker, _NexusWorker], asyncio.Task
638+
] = {None: asyncio.create_task(raise_on_shutdown())}
610639
# Create tasks for workers
611640
if self._activity_worker:
612-
tasks.append(asyncio.create_task(self._activity_worker.run()))
641+
tasks[self._activity_worker] = asyncio.create_task(
642+
self._activity_worker.run()
643+
)
613644
if self._workflow_worker:
614-
tasks.append(asyncio.create_task(self._workflow_worker.run()))
645+
tasks[self._workflow_worker] = asyncio.create_task(
646+
self._workflow_worker.run()
647+
)
648+
if self._nexus_worker:
649+
tasks[self._nexus_worker] = asyncio.create_task(self._nexus_worker.run())
615650

616651
# Wait for either worker or shutdown requested
617-
wait_task = asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
652+
wait_task = asyncio.wait(tasks.values(), return_when=asyncio.FIRST_EXCEPTION)
618653
try:
619654
await asyncio.shield(wait_task)
620655

621-
# If any of the last two tasks failed, we want to re-raise that as
622-
# the exception
623-
exception = next((t.exception() for t in tasks[1:] if t.done()), None)
656+
# If any of the worker tasks failed, re-raise that as the exception
657+
exception = next(
658+
(t.exception() for w, t in tasks.items() if w and t.done()), None
659+
)
624660
if exception:
625661
logger.error("Worker failed, shutting down", exc_info=exception)
626662
if self._config["on_fatal_error"]:
@@ -635,7 +671,7 @@ async def raise_on_shutdown():
635671
exception = user_cancel_err
636672

637673
# Cancel the shutdown task (safe if already done)
638-
tasks[0].cancel()
674+
tasks[None].cancel()
639675
graceful_timeout = self._config["graceful_shutdown_timeout"]
640676
logger.info(
641677
f"Beginning worker shutdown, will wait {graceful_timeout} before cancelling activities"
@@ -644,18 +680,10 @@ async def raise_on_shutdown():
644680
# Initiate core worker shutdown
645681
self._bridge_worker.initiate_shutdown()
646682

647-
# If any worker task had an exception, replace that task with a queue
648-
# drain (task at index 1 can be activity or workflow worker, task at
649-
# index 2 must be workflow worker if present)
650-
if tasks[1].done() and tasks[1].exception():
651-
if self._activity_worker:
652-
tasks[1] = asyncio.create_task(self._activity_worker.drain_poll_queue())
653-
else:
654-
assert self._workflow_worker
655-
tasks[1] = asyncio.create_task(self._workflow_worker.drain_poll_queue())
656-
if len(tasks) > 2 and tasks[2].done() and tasks[2].exception():
657-
assert self._workflow_worker
658-
tasks[2] = asyncio.create_task(self._workflow_worker.drain_poll_queue())
683+
# If any worker task had an exception, replace that task with a queue drain
684+
for worker, task in tasks.items():
685+
if worker and task.done() and task.exception():
686+
tasks[worker] = asyncio.create_task(worker.drain_poll_queue())
659687

660688
# Notify shutdown occurring
661689
if self._activity_worker:
@@ -664,20 +692,23 @@ async def raise_on_shutdown():
664692
self._workflow_worker.notify_shutdown()
665693

666694
# Wait for all tasks to complete (i.e. for poller loops to stop)
667-
await asyncio.wait(tasks)
695+
await asyncio.wait(tasks.values())
668696
# Sometimes both workers throw an exception and since we only take the
669697
# first, Python may complain with "Task exception was never retrieved"
670698
# if we don't get the others. Therefore we call cancel on each task
671699
# which suppresses this.
672-
for task in tasks:
700+
for task in tasks.values():
673701
task.cancel()
674702

675-
# If there's an activity worker, we have to let all activity completions
676-
# finish. We cannot guarantee that because poll shutdown completed
677-
# (which means activities completed) that they got flushed to the
678-
# server.
703+
# Let all activity / nexus operations completions finish. We cannot guarantee that
704+
# because poll shutdown completed (which means activities/operations completed)
705+
# that they got flushed to the server.
679706
if self._activity_worker:
680707
await self._activity_worker.wait_all_completed()
708+
if self._nexus_worker:
709+
await self._nexus_worker.wait_all_completed()
710+
711+
# TODO(dan): check that we do all appropriate things for nexus worker that we do for activity worker
681712

682713
# Do final shutdown
683714
try:

‎temporalio/worker/_workflow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ def __init__(
104104
if interceptor_class:
105105
self._interceptor_classes.append(interceptor_class)
106106
self._extern_functions.update(
107-
**_WorkflowExternFunctions(__temporal_get_metric_meter=lambda: metric_meter)
107+
**_WorkflowExternFunctions(__temporal_get_metric_meter=lambda: metric_meter) # pyright: ignore
108108
)
109109
self._workflow_failure_exception_types = workflow_failure_exception_types
110110
self._running_workflows: Dict[str, _RunningWorkflow] = {}

‎temporalio/worker/_workflow_instance.py

Lines changed: 382 additions & 6 deletions
Large diffs are not rendered by default.

‎temporalio/workflow.py

Lines changed: 175 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
import asyncio
66
import contextvars
7-
import dataclasses
87
import inspect
98
import logging
109
import threading
@@ -23,6 +22,7 @@
2322
Awaitable,
2423
Callable,
2524
Dict,
25+
Generator,
2626
Generic,
2727
Iterable,
2828
Iterator,
@@ -40,6 +40,8 @@
4040
overload,
4141
)
4242

43+
import nexusrpc
44+
import nexusrpc.handler
4345
from typing_extensions import (
4446
Concatenate,
4547
Literal,
@@ -832,6 +834,22 @@ def workflow_start_local_activity(
832834
activity_id: Optional[str],
833835
) -> ActivityHandle[Any]: ...
834836

837+
@abstractmethod
838+
async def workflow_start_nexus_operation(
839+
self,
840+
endpoint: str,
841+
service: str,
842+
operation: Union[
843+
nexusrpc.interface.Operation[I, O],
844+
Callable[[Any], nexusrpc.handler.Operation[I, O]],
845+
str,
846+
],
847+
input: Any,
848+
output_type: Optional[Type[O]] = None,
849+
schedule_to_close_timeout: Optional[timedelta] = None,
850+
headers: Optional[Mapping[str, str]] = None,
851+
) -> NexusOperationHandle[Any]: ...
852+
835853
@abstractmethod
836854
def workflow_time_ns(self) -> int: ...
837855

@@ -4340,6 +4358,77 @@ async def execute_child_workflow(
43404358
return await handle
43414359

43424360

4361+
I = TypeVar("I")
4362+
O = TypeVar("O")
4363+
S = TypeVar("S")
4364+
4365+
4366+
# TODO(dan): ABC?
4367+
class NexusOperationHandle(Generic[O]):
4368+
def cancel(self) -> bool:
4369+
# TODO(dan): docstring
4370+
"""
4371+
Call task.cancel() on the asyncio task that is backing this handle.
4372+
4373+
From asyncio docs:
4374+
4375+
Cancel the future and schedule callbacks.
4376+
4377+
If the future is already done or cancelled, return False. Otherwise, change the future's state to cancelled, schedule the callbacks and return True.
4378+
"""
4379+
raise NotImplementedError
4380+
4381+
def __await__(self) -> Generator[Any, Any, O]:
4382+
raise NotImplementedError
4383+
4384+
# TODO(dan): check SDK-wide philosophy on @property vs nullary accessor methods.
4385+
@property
4386+
def operation_token(self) -> Optional[str]:
4387+
raise NotImplementedError
4388+
4389+
4390+
async def start_nexus_operation(
4391+
endpoint: str,
4392+
service: str,
4393+
operation: Union[
4394+
nexusrpc.interface.Operation[I, O],
4395+
Callable[[Any], nexusrpc.handler.Operation[I, O]],
4396+
str,
4397+
],
4398+
input: Any,
4399+
*,
4400+
output_type: Optional[Type[O]] = None,
4401+
schedule_to_close_timeout: Optional[timedelta] = None,
4402+
headers: Optional[Mapping[str, str]] = None,
4403+
) -> NexusOperationHandle[Any]:
4404+
"""Start a Nexus operation and return its handle.
4405+
4406+
Args:
4407+
endpoint: The Nexus endpoint.
4408+
service: The Nexus service.
4409+
operation: The Nexus operation.
4410+
input: The Nexus operation input.
4411+
output_type: The Nexus operation output type.
4412+
schedule_to_close_timeout: Timeout for the entire operation attempt.
4413+
headers: Headers to send with the Nexus HTTP request.
4414+
4415+
Returns:
4416+
A handle to the Nexus operation. The result can be obtained as
4417+
```python
4418+
await handle.result()
4419+
```
4420+
"""
4421+
return await _Runtime.current().workflow_start_nexus_operation(
4422+
endpoint=endpoint,
4423+
service=service,
4424+
operation=operation,
4425+
input=input,
4426+
output_type=output_type,
4427+
schedule_to_close_timeout=schedule_to_close_timeout,
4428+
headers=headers,
4429+
)
4430+
4431+
43434432
class ExternalWorkflowHandle(Generic[SelfType]):
43444433
"""Handle for interacting with an external workflow.
43454434
@@ -5042,3 +5131,88 @@ def _to_proto(self) -> temporalio.bridge.proto.common.VersioningIntent.ValueType
50425131
elif self == VersioningIntent.DEFAULT:
50435132
return temporalio.bridge.proto.common.VersioningIntent.DEFAULT
50445133
return temporalio.bridge.proto.common.VersioningIntent.UNSPECIFIED
5134+
5135+
5136+
# Nexus
5137+
5138+
5139+
class NexusClient(Generic[S]):
5140+
def __init__(
5141+
self,
5142+
service: Union[
5143+
# TODO(dan): Type[S] is modeling the interface case as well the impl case, but
5144+
# the typevar S is used below only in the impl case. I think this is OK, but
5145+
# think about it again before deleting this TODO.
5146+
Type[S],
5147+
str,
5148+
],
5149+
endpoint: str,
5150+
schedule_to_close_timeout: Optional[timedelta] = None,
5151+
) -> None:
5152+
# If service is not a str, then it must be a service interface or implementation
5153+
# class.
5154+
if isinstance(service, str):
5155+
self._service_name = service
5156+
elif service_defn := getattr(service, "__nexus_service__", None):
5157+
self._service_name = service_defn.name
5158+
elif service_metadata := getattr(service, "__nexus_service_metadata__", None):
5159+
self._service_name = service_metadata.name
5160+
else:
5161+
raise ValueError(
5162+
f"`service` may be a name (str), or a class decorated with either "
5163+
f"@nexusrpc.handler.service or @nexusrpc.interface.service. "
5164+
f"Invalid service type: {type(service)}"
5165+
)
5166+
print(f"🌈 NexusClient using service name: {self._service_name}")
5167+
self._endpoint = endpoint
5168+
self._schedule_to_close_timeout = schedule_to_close_timeout
5169+
5170+
# TODO(dan): overloads: no-input, operation name, ret type
5171+
# TODO(dan): should it be an error to use a reference to a method on a class other than that supplied?
5172+
async def start_operation(
5173+
self,
5174+
operation: Union[
5175+
nexusrpc.interface.Operation[I, O],
5176+
Callable[[S], nexusrpc.handler.Operation[I, O]],
5177+
str,
5178+
],
5179+
input: I,
5180+
*,
5181+
output_type: Optional[Type[O]] = None,
5182+
schedule_to_close_timeout: Optional[timedelta] = None,
5183+
headers: Optional[Mapping[str, str]] = None,
5184+
) -> NexusOperationHandle[O]:
5185+
return await temporalio.workflow.start_nexus_operation(
5186+
endpoint=self._endpoint,
5187+
service=self._service_name,
5188+
operation=operation,
5189+
input=input,
5190+
output_type=output_type,
5191+
schedule_to_close_timeout=(
5192+
schedule_to_close_timeout or self._schedule_to_close_timeout
5193+
),
5194+
headers=headers,
5195+
)
5196+
5197+
# TODO(dan): overloads: no-input, operation name, ret type
5198+
async def execute_operation(
5199+
self,
5200+
operation: Union[
5201+
nexusrpc.interface.Operation[I, O],
5202+
Callable[[S], nexusrpc.handler.Operation[I, O]],
5203+
str,
5204+
],
5205+
input: I,
5206+
*,
5207+
output_type: Optional[Type[O]] = None,
5208+
schedule_to_close_timeout: Optional[timedelta] = None,
5209+
headers: Optional[Mapping[str, str]] = None,
5210+
) -> O:
5211+
handle: NexusOperationHandle[O] = await self.start_operation(
5212+
operation,
5213+
input,
5214+
output_type=output_type,
5215+
schedule_to_close_timeout=schedule_to_close_timeout,
5216+
headers=headers,
5217+
)
5218+
return await handle

‎tests/worker/test_nexus.py

Lines changed: 1090 additions & 0 deletions
Large diffs are not rendered by default.

‎uv.lock

Lines changed: 615 additions & 332 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)
Please sign in to comment.