
Commit 35b476d

drewhoskins-temporal, drewhoskins, and dandavison authored Jul 24, 2024
Workflow Update and Signal handlers concurrency sample (#123)
* Atomic message handlers sample
* Remove resize jobs to reduce code size
* Misc polish
* Add test
* Format code
* Continue as new
* Formatting
* Feedback, readme, restructure files and directories
* Format
* More feedback. Add test-continue-as-new flag.
* Feedback; throw ApplicationFailures from update handlers
* Formatting
* __init__.py
* Fix lint issues
* Dan Feedback
* More typehints
* s/atomic/safe/
* Fix and demo idempotency
* Compatibility with 3.8
* More feedback
* Re-add tests
* Fix flaky test
* Improve update and tests
* list -> set to fix a test
* Return a struct rather than a raw value from the list for better hygiene
* Remove test dependency on race conditions between health check and adding nodes.
* Ruff linting
* Use consistent verbs, improve health check
* poe format
* Minor sample improvements
* Skip update tests under Java test server

---------

Co-authored-by: Drew Hoskins <[email protected]>
Co-authored-by: Dan Davison <[email protected]>
1 parent 5ae603e commit 35b476d

File tree

12 files changed: +620 −12 lines
 

‎README.md‎

Lines changed: 1 addition & 0 deletions
@@ -67,6 +67,7 @@ Some examples require extra dependencies. See each sample's directory for specifics.
 * [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource, awaiting its successful completion.
 * [prometheus](prometheus) - Configure Prometheus metrics on clients/workers.
 * [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models.
+* [safe_message_handlers](updates_and_signals/safe_message_handlers/) - Safely handling updates and signals.
 * [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule.
 * [sentry](sentry) - Report errors to Sentry.
 * [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers.

‎polling/frequent/README.md‎

Lines changed: 5 additions & 4 deletions
@@ -8,10 +8,11 @@ To run, first see [README.md](../../README.md) for prerequisites.
 
 Then, run the following from this directory to run the sample:
 
-```bash
-poetry run python run_worker.py
-poetry run python run_frequent.py
-```
+    poetry run python run_worker.py
+
+Then, in another terminal, run the following to execute the workflow:
+
+    poetry run python run_frequent.py
 
 The Workflow will continue to poll the service and heartbeat on every iteration until it succeeds.
 
‎polling/infrequent/README.md‎

Lines changed: 6 additions & 4 deletions
@@ -13,10 +13,12 @@ To run, first see [README.md](../../README.md) for prerequisites.
 
 Then, run the following from this directory to run the sample:
 
-```bash
-poetry run python run_worker.py
-poetry run python run_infrequent.py
-```
+    poetry run python run_worker.py
+
+Then, in another terminal, run the following to execute the workflow:
+
+    poetry run python run_infrequent.py
+
 
 Since the test service simulates being _down_ for four polling attempts and then returns _OK_ on the fifth poll attempt, the Workflow will perform four Activity retries with a 60-second poll interval, and then return the service result on the successful fifth attempt.
 

‎polling/periodic_sequence/README.md‎

Lines changed: 6 additions & 4 deletions
@@ -8,10 +8,12 @@ To run, first see [README.md](../../README.md) for prerequisites.
 
 Then, run the following from this directory to run the sample:
 
-```bash
-poetry run python run_worker.py
-poetry run python run_periodic.py
-```
+    poetry run python run_worker.py
+
+Then, in another terminal, run the following to execute the workflow:
+
+    poetry run python run_periodic.py
+
 
 This will start a Workflow and Child Workflow to periodically poll an Activity.
 The Parent Workflow is not aware of the Child Workflow calling Continue-As-New, and it gets notified when it completes (or fails).
Lines changed: 155 additions & 0 deletions (new test file)

@@ -0,0 +1,155 @@

```python
import asyncio
import uuid

import pytest
from temporalio.client import Client, WorkflowUpdateFailedError
from temporalio.exceptions import ApplicationError
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

from updates_and_signals.safe_message_handlers.activities import (
    assign_nodes_to_job,
    find_bad_nodes,
    unassign_nodes_for_job,
)
from updates_and_signals.safe_message_handlers.workflow import (
    ClusterManagerAssignNodesToJobInput,
    ClusterManagerDeleteJobInput,
    ClusterManagerInput,
    ClusterManagerWorkflow,
)


async def test_safe_message_handlers(client: Client, env: WorkflowEnvironment):
    if env.supports_time_skipping:
        pytest.skip(
            "Java test server: https://github.com/temporalio/sdk-java/issues/1903"
        )
    task_queue = f"tq-{uuid.uuid4()}"
    async with Worker(
        client,
        task_queue=task_queue,
        workflows=[ClusterManagerWorkflow],
        activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
    ):
        cluster_manager_handle = await client.start_workflow(
            ClusterManagerWorkflow.run,
            ClusterManagerInput(),
            id=f"ClusterManagerWorkflow-{uuid.uuid4()}",
            task_queue=task_queue,
        )
        await cluster_manager_handle.signal(ClusterManagerWorkflow.start_cluster)

        allocation_updates = []
        for i in range(6):
            allocation_updates.append(
                cluster_manager_handle.execute_update(
                    ClusterManagerWorkflow.assign_nodes_to_job,
                    ClusterManagerAssignNodesToJobInput(
                        total_num_nodes=2, job_name=f"task-{i}"
                    ),
                )
            )
        results = await asyncio.gather(*allocation_updates)
        for result in results:
            assert len(result.nodes_assigned) == 2

        await asyncio.sleep(1)

        deletion_updates = []
        for i in range(6):
            deletion_updates.append(
                cluster_manager_handle.execute_update(
                    ClusterManagerWorkflow.delete_job,
                    ClusterManagerDeleteJobInput(job_name=f"task-{i}"),
                )
            )
        await asyncio.gather(*deletion_updates)

        await cluster_manager_handle.signal(ClusterManagerWorkflow.shutdown_cluster)

        result = await cluster_manager_handle.result()
        assert result.num_currently_assigned_nodes == 0


async def test_update_idempotency(client: Client, env: WorkflowEnvironment):
    if env.supports_time_skipping:
        pytest.skip(
            "Java test server: https://github.com/temporalio/sdk-java/issues/1903"
        )
    task_queue = f"tq-{uuid.uuid4()}"
    async with Worker(
        client,
        task_queue=task_queue,
        workflows=[ClusterManagerWorkflow],
        activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
    ):
        cluster_manager_handle = await client.start_workflow(
            ClusterManagerWorkflow.run,
            ClusterManagerInput(),
            id=f"ClusterManagerWorkflow-{uuid.uuid4()}",
            task_queue=task_queue,
        )

        await cluster_manager_handle.signal(ClusterManagerWorkflow.start_cluster)

        result_1 = await cluster_manager_handle.execute_update(
            ClusterManagerWorkflow.assign_nodes_to_job,
            ClusterManagerAssignNodesToJobInput(
                total_num_nodes=5, job_name="jobby-job"
            ),
        )
        # Call the update a second time to simulate a client retry; the
        # operation should be idempotent.
        result_2 = await cluster_manager_handle.execute_update(
            ClusterManagerWorkflow.assign_nodes_to_job,
            ClusterManagerAssignNodesToJobInput(
                total_num_nodes=5, job_name="jobby-job"
            ),
        )
        # The second call should not assign more nodes (it may return fewer if
        # the health check finds bad nodes between the two updates).
        assert result_1.nodes_assigned >= result_2.nodes_assigned


async def test_update_failure(client: Client, env: WorkflowEnvironment):
    if env.supports_time_skipping:
        pytest.skip(
            "Java test server: https://github.com/temporalio/sdk-java/issues/1903"
        )
    task_queue = f"tq-{uuid.uuid4()}"
    async with Worker(
        client,
        task_queue=task_queue,
        workflows=[ClusterManagerWorkflow],
        activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
    ):
        cluster_manager_handle = await client.start_workflow(
            ClusterManagerWorkflow.run,
            ClusterManagerInput(),
            id=f"ClusterManagerWorkflow-{uuid.uuid4()}",
            task_queue=task_queue,
        )

        await cluster_manager_handle.signal(ClusterManagerWorkflow.start_cluster)

        await cluster_manager_handle.execute_update(
            ClusterManagerWorkflow.assign_nodes_to_job,
            ClusterManagerAssignNodesToJobInput(
                total_num_nodes=24, job_name="big-task"
            ),
        )
        try:
            # Try to assign too many nodes
            await cluster_manager_handle.execute_update(
                ClusterManagerWorkflow.assign_nodes_to_job,
                ClusterManagerAssignNodesToJobInput(
                    total_num_nodes=3, job_name="little-task"
                ),
            )
        except WorkflowUpdateFailedError as e:
            assert isinstance(e.cause, ApplicationError)
            assert e.cause.message == "Cannot assign 3 nodes; have only 1 available"
        finally:
            await cluster_manager_handle.signal(ClusterManagerWorkflow.shutdown_cluster)
            result = await cluster_manager_handle.result()
            assert result.num_currently_assigned_nodes + result.num_bad_nodes == 24
```
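A side note on `test_update_failure`: the `try`/`except` form passes silently if the second update unexpectedly succeeds. A tighter equivalent uses `pytest.raises` — a minimal sketch, reusing the handle and input types from the test (a drop-in for the `try` block, inside the same test function):

```python
import pytest
from temporalio.client import WorkflowUpdateFailedError
from temporalio.exceptions import ApplicationError

# Inside test_update_failure, after the first (successful) 24-node update:
with pytest.raises(WorkflowUpdateFailedError) as err:
    await cluster_manager_handle.execute_update(
        ClusterManagerWorkflow.assign_nodes_to_job,
        ClusterManagerAssignNodesToJobInput(total_num_nodes=3, job_name="little-task"),
    )
# The update is rejected with an ApplicationError surfaced as the cause.
assert isinstance(err.value.cause, ApplicationError)
assert err.value.cause.message == "Cannot assign 3 nodes; have only 1 available"
```

Because `pytest.raises` fails the test when no error is raised, the shutdown and result assertions can then run unconditionally instead of in a `finally` block.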

‎updates_and_signals/__init__.py‎

Whitespace-only changes.
updates_and_signals/safe_message_handlers/README.md

Lines changed: 22 additions & 0 deletions

@@ -0,0 +1,22 @@

# Safe message handlers

This sample demonstrates important techniques for handling signals and updates, aka messages. In particular, it illustrates how message handlers can interleave, or fail to complete before the workflow completes, and how you can manage that.

* Using `workflow.wait_condition`, signal and update handlers operate only while the workflow is within a certain state--between cluster_started and cluster_shutdown.
* You can call start_workflow with an initial signal that is handled before anything else except the workflow's constructor. This pattern is known as "signal-with-start." (See the client-side sketch after this README.)
* Message handlers can block, and their actions can interleave with one another and with the main workflow. This can easily cause bugs, so we use a lock to protect shared state from interleaved access.
* Message handlers should also finish before the workflow run completes. One option is to use a lock.
* An "entity" workflow, i.e. a long-lived workflow, periodically "continues as new". It must do this to prevent its history from growing too large, and it passes its state to the next run. You can check `workflow.info().is_continue_as_new_suggested()` to see when it's time. Just make sure message handlers have finished before doing so.
* Message handlers can be made idempotent. See the `ClusterManagerWorkflow.assign_nodes_to_job` update.

To run, first see [README.md](../../README.md) for prerequisites.

Then, run the following from this directory to run the worker:

    poetry run python worker.py

Then, in another terminal, run the following to execute the workflow:

    poetry run python starter.py

This will start a worker to run your workflow and activities, then start a ClusterManagerWorkflow and put it through its paces.
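For reference, here is a minimal client-side sketch of the "signal-with-start" pattern mentioned above, against this sample's workflow and task queue. The `start_signal` argument takes the signal's name; the helper function name is illustrative, and note that this sample's own starter.py instead signals separately after starting:

```python
import uuid

from temporalio.client import Client

from updates_and_signals.safe_message_handlers.workflow import (
    ClusterManagerInput,
    ClusterManagerWorkflow,
)


async def start_cluster_with_signal() -> None:
    client = await Client.connect("localhost:7233")
    # Signal-with-start: starts the workflow if it is not already running, and
    # delivers start_cluster before any other message handler runs.
    await client.start_workflow(
        ClusterManagerWorkflow.run,
        ClusterManagerInput(),
        id=f"ClusterManagerWorkflow-{uuid.uuid4()}",
        task_queue="safe-message-handlers-task-queue",
        start_signal="start_cluster",
    )
```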

‎updates_and_signals/safe_message_handlers/__init__.py‎

Whitespace-only changes.
updates_and_signals/safe_message_handlers/activities.py

Lines changed: 45 additions & 0 deletions

@@ -0,0 +1,45 @@

```python
import asyncio
from dataclasses import dataclass
from typing import List, Set

from temporalio import activity


@dataclass
class AssignNodesToJobInput:
    nodes: List[str]
    job_name: str


@activity.defn
async def assign_nodes_to_job(input: AssignNodesToJobInput) -> None:
    print(f"Assigning nodes {input.nodes} to job {input.job_name}")
    await asyncio.sleep(0.1)


@dataclass
class UnassignNodesForJobInput:
    nodes: List[str]
    job_name: str


@activity.defn
async def unassign_nodes_for_job(input: UnassignNodesForJobInput) -> None:
    print(f"Deallocating nodes {input.nodes} from job {input.job_name}")
    await asyncio.sleep(0.1)


@dataclass
class FindBadNodesInput:
    nodes_to_check: Set[str]


@activity.defn
async def find_bad_nodes(input: FindBadNodesInput) -> Set[str]:
    await asyncio.sleep(0.1)
    # Simulate failure detection: node names are numeric strings, and every
    # fifth node is reported bad.
    bad_nodes = set([n for n in input.nodes_to_check if int(n) % 5 == 0])
    if bad_nodes:
        print(f"Found bad nodes: {bad_nodes}")
    else:
        print("No new bad nodes found.")
    return bad_nodes
```
updates_and_signals/safe_message_handlers/starter.py

Lines changed: 84 additions & 0 deletions

@@ -0,0 +1,84 @@

```python
import argparse
import asyncio
import logging
import uuid
from typing import Optional

from temporalio import common
from temporalio.client import Client, WorkflowHandle

from updates_and_signals.safe_message_handlers.workflow import (
    ClusterManagerAssignNodesToJobInput,
    ClusterManagerDeleteJobInput,
    ClusterManagerInput,
    ClusterManagerWorkflow,
)


async def do_cluster_lifecycle(wf: WorkflowHandle, delay_seconds: Optional[int] = None):
    await wf.signal(ClusterManagerWorkflow.start_cluster)

    print("Assigning jobs to nodes...")
    allocation_updates = []
    for i in range(6):
        allocation_updates.append(
            wf.execute_update(
                ClusterManagerWorkflow.assign_nodes_to_job,
                ClusterManagerAssignNodesToJobInput(
                    total_num_nodes=2, job_name=f"task-{i}"
                ),
            )
        )
    await asyncio.gather(*allocation_updates)

    if delay_seconds:
        print(f"Sleeping for {delay_seconds} second(s)")
        await asyncio.sleep(delay_seconds)

    print("Deleting jobs...")
    deletion_updates = []
    for i in range(6):
        deletion_updates.append(
            wf.execute_update(
                ClusterManagerWorkflow.delete_job,
                ClusterManagerDeleteJobInput(job_name=f"task-{i}"),
            )
        )
    await asyncio.gather(*deletion_updates)

    await wf.signal(ClusterManagerWorkflow.shutdown_cluster)


async def main(should_test_continue_as_new: bool):
    # Connect to Temporal
    client = await Client.connect("localhost:7233")

    print("Starting cluster")
    cluster_manager_handle = await client.start_workflow(
        ClusterManagerWorkflow.run,
        ClusterManagerInput(test_continue_as_new=should_test_continue_as_new),
        id=f"ClusterManagerWorkflow-{uuid.uuid4()}",
        task_queue="safe-message-handlers-task-queue",
        id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING,
    )
    delay_seconds = 10 if should_test_continue_as_new else 1
    await do_cluster_lifecycle(cluster_manager_handle, delay_seconds=delay_seconds)
    result = await cluster_manager_handle.result()
    print(
        "Cluster shut down successfully."
        f" It had {result.num_currently_assigned_nodes} nodes assigned at the end."
    )


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    parser = argparse.ArgumentParser(description="Safe message handlers")
    parser.add_argument(
        "--test-continue-as-new",
        help="Make the ClusterManagerWorkflow continue as new before shutting down",
        action="store_true",
        default=False,
    )
    args = parser.parse_args()
    asyncio.run(main(args.test_continue_as_new))
```
updates_and_signals/safe_message_handlers/worker.py

Lines changed: 40 additions & 0 deletions

@@ -0,0 +1,40 @@

```python
import asyncio
import logging

from temporalio.client import Client
from temporalio.worker import Worker

from updates_and_signals.safe_message_handlers.workflow import (
    ClusterManagerWorkflow,
    assign_nodes_to_job,
    find_bad_nodes,
    unassign_nodes_for_job,
)

interrupt_event = asyncio.Event()


async def main():
    # Connect client
    client = await Client.connect("localhost:7233")

    async with Worker(
        client,
        task_queue="safe-message-handlers-task-queue",
        workflows=[ClusterManagerWorkflow],
        activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
    ):
        # Wait until interrupted
        logging.info("ClusterManagerWorkflow worker started, ctrl+c to exit")
        await interrupt_event.wait()
        logging.info("Shutting down")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main())
    except KeyboardInterrupt:
        interrupt_event.set()
        loop.run_until_complete(loop.shutdown_asyncgens())
```
updates_and_signals/safe_message_handlers/workflow.py

Lines changed: 256 additions & 0 deletions

@@ -0,0 +1,256 @@

```python
import asyncio
import dataclasses
from dataclasses import dataclass
from datetime import timedelta
from typing import Dict, List, Optional, Set

from temporalio import workflow
from temporalio.common import RetryPolicy
from temporalio.exceptions import ApplicationError

from updates_and_signals.safe_message_handlers.activities import (
    AssignNodesToJobInput,
    FindBadNodesInput,
    UnassignNodesForJobInput,
    assign_nodes_to_job,
    find_bad_nodes,
    unassign_nodes_for_job,
)


# In workflows that continue-as-new, it's convenient to store all your state in one
# serializable structure to make it easier to pass between runs.
@dataclass
class ClusterManagerState:
    cluster_started: bool = False
    cluster_shutdown: bool = False
    nodes: Dict[str, Optional[str]] = dataclasses.field(default_factory=dict)
    jobs_assigned: Set[str] = dataclasses.field(default_factory=set)


@dataclass
class ClusterManagerInput:
    state: Optional[ClusterManagerState] = None
    test_continue_as_new: bool = False


@dataclass
class ClusterManagerResult:
    num_currently_assigned_nodes: int
    num_bad_nodes: int


# Be in the habit of storing message inputs and outputs in serializable structures.
# This makes it easier to add more fields over time in a backward-compatible way.
@dataclass
class ClusterManagerAssignNodesToJobInput:
    # If larger or smaller than previous amounts, will resize the job.
    total_num_nodes: int
    job_name: str


@dataclass
class ClusterManagerDeleteJobInput:
    job_name: str


@dataclass
class ClusterManagerAssignNodesToJobResult:
    nodes_assigned: Set[str]


# ClusterManagerWorkflow keeps track of the assignments of a cluster of nodes.
# Via signals, the cluster can be started and shut down.
# Via updates, clients can also assign jobs to nodes and delete jobs.
# These updates must run atomically.
@workflow.defn
class ClusterManagerWorkflow:
    def __init__(self) -> None:
        self.state = ClusterManagerState()
        # Protects workflow state from interleaved access
        self.nodes_lock = asyncio.Lock()
        self.max_history_length: Optional[int] = None
        self.sleep_interval_seconds: int = 600

    @workflow.signal
    async def start_cluster(self) -> None:
        self.state.cluster_started = True
        self.state.nodes = {str(k): None for k in range(25)}
        workflow.logger.info("Cluster started")

    @workflow.signal
    async def shutdown_cluster(self) -> None:
        await workflow.wait_condition(lambda: self.state.cluster_started)
        self.state.cluster_shutdown = True
        workflow.logger.info("Cluster shut down")

    # This is an update, as opposed to a signal, because the client may want to wait
    # for nodes to be allocated before sending work to those nodes.
    # Returns the set of node names that were allocated to the job.
    @workflow.update
    async def assign_nodes_to_job(
        self, input: ClusterManagerAssignNodesToJobInput
    ) -> ClusterManagerAssignNodesToJobResult:
        await workflow.wait_condition(lambda: self.state.cluster_started)
        if self.state.cluster_shutdown:
            # If you want the client to receive a failure, either add an update
            # validator and throw the exception from there, or raise an
            # ApplicationError. Other exceptions in the main handler will cause the
            # workflow to keep retrying and get it stuck.
            raise ApplicationError(
                "Cannot assign nodes to a job: Cluster is already shut down"
            )

        async with self.nodes_lock:
            # Idempotency guard.
            if input.job_name in self.state.jobs_assigned:
                return ClusterManagerAssignNodesToJobResult(
                    self.get_assigned_nodes(job_name=input.job_name)
                )
            unassigned_nodes = self.get_unassigned_nodes()
            if len(unassigned_nodes) < input.total_num_nodes:
                # If you want the client to receive a failure, either add an update
                # validator and throw the exception from there, or raise an
                # ApplicationError. Other exceptions in the main handler will cause
                # the workflow to keep retrying and get it stuck.
                raise ApplicationError(
                    f"Cannot assign {input.total_num_nodes} nodes; have only {len(unassigned_nodes)} available"
                )
            nodes_to_assign = unassigned_nodes[: input.total_num_nodes]
            # This await would be dangerous without nodes_lock because it yields
            # control and allows interleaving with delete_job and
            # perform_health_checks, which both touch self.state.nodes.
            await self._assign_nodes_to_job(nodes_to_assign, input.job_name)
            return ClusterManagerAssignNodesToJobResult(
                nodes_assigned=self.get_assigned_nodes(job_name=input.job_name)
            )

    async def _assign_nodes_to_job(
        self, assigned_nodes: List[str], job_name: str
    ) -> None:
        await workflow.execute_activity(
            assign_nodes_to_job,
            AssignNodesToJobInput(nodes=assigned_nodes, job_name=job_name),
            start_to_close_timeout=timedelta(seconds=10),
        )
        for node in assigned_nodes:
            self.state.nodes[node] = job_name
        self.state.jobs_assigned.add(job_name)

    # Even though it returns nothing, this is an update because the client may want
    # to track it, for example to wait for nodes to be unassigned before reassigning
    # them.
    @workflow.update
    async def delete_job(self, input: ClusterManagerDeleteJobInput) -> None:
        await workflow.wait_condition(lambda: self.state.cluster_started)
        if self.state.cluster_shutdown:
            # If you want the client to receive a failure, either add an update
            # validator and throw the exception from there, or raise an
            # ApplicationError. Other exceptions in the main handler will cause the
            # workflow to keep retrying and get it stuck.
            raise ApplicationError("Cannot delete a job: Cluster is already shut down")

        async with self.nodes_lock:
            nodes_to_unassign = [
                k for k, v in self.state.nodes.items() if v == input.job_name
            ]
            # This await would be dangerous without nodes_lock because it yields
            # control and allows interleaving with assign_nodes_to_job and
            # perform_health_checks, which both touch self.state.nodes.
            await self._unassign_nodes_for_job(nodes_to_unassign, input.job_name)

    async def _unassign_nodes_for_job(
        self, nodes_to_unassign: List[str], job_name: str
    ):
        await workflow.execute_activity(
            unassign_nodes_for_job,
            UnassignNodesForJobInput(nodes=nodes_to_unassign, job_name=job_name),
            start_to_close_timeout=timedelta(seconds=10),
        )
        for node in nodes_to_unassign:
            self.state.nodes[node] = None

    def get_unassigned_nodes(self) -> List[str]:
        return [k for k, v in self.state.nodes.items() if v is None]

    def get_bad_nodes(self) -> Set[str]:
        return set([k for k, v in self.state.nodes.items() if v == "BAD!"])

    def get_assigned_nodes(self, *, job_name: Optional[str] = None) -> Set[str]:
        if job_name:
            return set([k for k, v in self.state.nodes.items() if v == job_name])
        else:
            return set(
                [
                    k
                    for k, v in self.state.nodes.items()
                    if v is not None and v != "BAD!"
                ]
            )

    async def perform_health_checks(self) -> None:
        async with self.nodes_lock:
            assigned_nodes = self.get_assigned_nodes()
            try:
                # This await would be dangerous without nodes_lock because it yields
                # control and allows interleaving with assign_nodes_to_job and
                # delete_job, which both touch self.state.nodes.
                bad_nodes = await workflow.execute_activity(
                    find_bad_nodes,
                    FindBadNodesInput(nodes_to_check=assigned_nodes),
                    start_to_close_timeout=timedelta(seconds=10),
                    # This health check is optional, and our lock would block the
                    # whole workflow if we let it retry forever.
                    retry_policy=RetryPolicy(maximum_attempts=1),
                )
                for node in bad_nodes:
                    self.state.nodes[node] = "BAD!"
            except Exception as e:
                workflow.logger.warning(
                    f"Health check failed with error {type(e).__name__}:{e}"
                )

    # The cluster manager is a long-running "entity" workflow, so we need to
    # periodically checkpoint its state and continue-as-new.
    def init(self, input: ClusterManagerInput) -> None:
        if input.state:
            self.state = input.state
        if input.test_continue_as_new:
            self.max_history_length = 120
            self.sleep_interval_seconds = 1

    def should_continue_as_new(self) -> bool:
        # We don't want to continue-as-new if we're in the middle of an update.
        if self.nodes_lock.locked():
            return False
        if workflow.info().is_continue_as_new_suggested():
            return True
        # This is just for ease of testing. In production, we trust Temporal to tell
        # us when to continue as new.
        if (
            self.max_history_length
            and workflow.info().get_current_history_length() > self.max_history_length
        ):
            return True
        return False

    @workflow.run
    async def run(self, input: ClusterManagerInput) -> ClusterManagerResult:
        self.init(input)
        await workflow.wait_condition(lambda: self.state.cluster_started)
        # Perform health checks at intervals.
        while True:
            await self.perform_health_checks()
            try:
                await workflow.wait_condition(
                    lambda: self.state.cluster_shutdown
                    or self.should_continue_as_new(),
                    timeout=timedelta(seconds=self.sleep_interval_seconds),
                )
            except asyncio.TimeoutError:
                pass
            if self.state.cluster_shutdown:
                break
            if self.should_continue_as_new():
                workflow.logger.info("Continuing as new")
                workflow.continue_as_new(
                    ClusterManagerInput(
                        state=self.state,
                        test_continue_as_new=input.test_continue_as_new,
                    )
                )
        return ClusterManagerResult(
            len(self.get_assigned_nodes()),
            len(self.get_bad_nodes()),
        )
```
