-
Notifications
You must be signed in to change notification settings - Fork 132
Standalone activity prototype #1138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
a1d3efe
to
23df3f8
Compare
temporalio/client.py
Outdated
self._id_or_token = ActivityIDReference(activity_id=id, run_id=run_id) | ||
self.run_id = run_id | ||
|
||
# TODO: do we support something like `follow_runs: bool`? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need there's no concept of an execution chain for activities.
temporalio/client.py
Outdated
handle = await self.start_activity(*args, **kwargs) | ||
return await handle.result() | ||
|
||
async def list_activities( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be interesting to see how we can model this to return both workflow and standalone "client" activities.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've made the iterator yield Union[ActivityExecution, WorkflowActivityExecution]
. Those two dataclasses share a few fields.
|
||
# - TODO: Overloads for no-param, single-param, multi-param | ||
# - TODO: Support sync and async activity functions | ||
async def start_activity( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also need execute_activity
but we can leave that for later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually execute_activity
is already present below.
temporalio/client.py
Outdated
|
||
|
||
@dataclass(frozen=True) | ||
class AsyncActivityIDReference: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider deprecating and renaming to WorkflowActivityIDReference
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or merging the two reference types where workflow_id
becomes an optional field.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've merged them into ActivityIDReference
and retained the AsyncActivityIDReference
name as an alias.
temporalio/client.py
Outdated
"""Handle representing an activity started by a workflow.""" | ||
|
||
def __init__( | ||
self, client: Client, id_or_token: Union[AsyncActivityIDReference, bytes] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should work with any activity IMHO.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. The PR:
- Makes
get_async_activity_handle
work for Standalone Activities and Workflow Activities. This is essentially a client appropriate for the "owner" of the activity, permitting manual completion/fail/cancellation/heartbeating - Introduces
Client.get_activity_handle
for SA only. This is a cliet appropriate for the caller of the activity: describe, poll, request cancellation, etc
temporalio/client.py
Outdated
self._id_or_token = id_or_token | ||
|
||
|
||
WorkflowActivityHandle = AsyncActivityHandle |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will also need pause
, reset
, etc...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we still want AsyncActivityHandle
because you can obtain one with a token and can't do anything else with the token but issue completion requests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added implementations of pause
/ unpause
/ reset
to the new ActivityHandle
for SAs.
The existing AsyncActivityHandle
is unperturbed by this PR: it just gains a constructor for SAs.
""" | ||
raise NotImplementedError | ||
|
||
# TODO: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also TODO: all of the async completion methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean heartbeat
, complete
, fail
, and report_cancellation
, right? Those are all inherited from _BaseActivityHandle
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm on the fence whether we want to expose these methods on the activity handle as opposed to having the async completion handle as a separate concept. The use cases are different for the two.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I also changed direction here: they are now on AsyncActivityHandle
, for SA as they are for WA.
temporalio/client.py
Outdated
) | ||
|
||
|
||
# TODO: This name is suboptimal now. We could deprecate it and introduce WorkflowActivityHandle as a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, let's do that. Which means that you would have to accept a WorkflowActivityHandle
where you accept AsyncActivityHandle
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've renamed, and am retaining the old name as an alias to the same class object.
temporalio/client.py
Outdated
|
||
class AsyncActivityHandle: | ||
"""Handle representing an external activity for completion and heartbeat.""" | ||
class _BaseActivityHandle: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would consider just duplicating instead of inheriting but don't have a strong opinion.
e53a5c2
to
0a21bf7
Compare
07c6917
to
49ae6c3
Compare
49ae6c3
to
818c417
Compare
818c417
to
375a6e9
Compare
2ca269e
to
c681c13
Compare
c681c13
to
ace675c
Compare
ace675c
to
741603f
Compare
This reverts commit 7127c0fe5296d6df4c174f2792447c98934a95ce.
) | ||
|
||
@classmethod | ||
def get_name_and_result_type( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we're extracting this common logic out of workflow_instance.py
, can we update workflow_instance.py
use this too? Also, then can we get rid of must_from_callable
and inline it into this method since it won't be called anywhere anymore?
# - TODO: Support sync and async activity functions | ||
async def start_activity( | ||
self, | ||
activity: Union[str, Callable[..., Awaitable[ReturnType]]], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think when we get to the overloads, this final form may not make sense to use the generic
) | ||
|
||
|
||
class IdReusePolicy(IntEnum): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think each top-level thing should have its own enumerate here. It doesn't make sense to have workflow ID reuse policy and not activity ID reuse policy. Same for ID conflict policy. It makes more sense from a user POV not to pretend like this is a common ID reuse policy when it is not (nor do we need it to be).
We should not eschew consistency just because we may have a NexusOperationIdReusePolicy
one day (and we'll be happy we kept them separate if they diverge). This is no different than cancellation type or any of these others.
retry_policy: Optional[temporalio.common.RetryPolicy] = None, | ||
search_attributes: Optional[ | ||
Union[ | ||
temporalio.common.SearchAttributes, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO we don't need to accept this deprecated form of search attributes for newer API, but it's not harmful
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, I meant to remove that. Removed.
|
||
def list_activities( | ||
self, | ||
query: Optional[str] = None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you confirm whether we expect a query specifically saying activity kind is "standalone" at this time so that when we add non-standalone one day it doesn't surprise users?
TIMED_OUT = 6 # ACTIVITY_EXECUTION_STATUS_TIMED_OUT | ||
|
||
|
||
class PendingActivityState(IntEnum): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this also may make sense in the client
module
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should go ahead and put here the expected changes to activity runtime. Specifically I assume all workflow_
-prefixed fields of Info
will become optional. I would also recommend either a "kind" enumerate for activities, or add an is_standalone
akin to is_local
so users can know it's not the traditional activity.
namespace: str | ||
"""Namespace.""" | ||
|
||
workflow_id: Optional[str] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What situations would workflow ID be optional here?
# TODO: This error class has required history event fields. I propose we retain it as | ||
# workflow-specific and introduce client.ActivityFailureError for an error in a standalone activity. | ||
# We could deprecate this name and introduce WorkflowActivityError as a preferred-going-forwards | ||
# alias. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with behavior, but disagree with the aliasing. This specifically maps to ActivityFailureInfo
in our API, I think we should keep that naming correlation (same for all failure errors). I expect similar for standalone Nexus operation failures (they don't use the named-in-proto failure messages, so they don't affect failure error things).
activity_id: Optional[str] | ||
"""Activity ID. Optional if this is an activity started from a workflow.""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In what situations is this optional? To confirm, is this not the activity ID for regular activities? We have to make sure that every value that is present on the deserialization side is present on the serialization side. So a workflow should always set this (right now it's defaulted in the constructor of the activity handle, but we can move it out if needed).
To test:
In
temporal
checkout the WIP server branch andThen in
sdk-python
, checkout this branch andSee