From c0e27a6a203f1d4b4d7c7ba346a2864505e673f7 Mon Sep 17 00:00:00 2001
From: Dan Davison
Date: Wed, 24 Jul 2024 09:55:15 -0400
Subject: [PATCH 1/5] Remove health check and "bad" node concept

A concern was raised that our samples should not be demonstrating any form
of polling from within an entity workflow loop, even if the poll frequency
is low. Instead, we should point to long-running activity patterns.
---
 .../safe_message_handlers/workflow_test.py    | 12 ++---
 .../safe_message_handlers/activities.py       | 16 -------
 .../safe_message_handlers/worker.py           |  3 +-
 .../safe_message_handlers/workflow.py         | 46 ++-----------------
 4 files changed, 10 insertions(+), 67 deletions(-)

diff --git a/tests/updates_and_signals/safe_message_handlers/workflow_test.py b/tests/updates_and_signals/safe_message_handlers/workflow_test.py
index 852419ef..3aed8dd2 100644
--- a/tests/updates_and_signals/safe_message_handlers/workflow_test.py
+++ b/tests/updates_and_signals/safe_message_handlers/workflow_test.py
@@ -9,7 +9,6 @@
 
 from updates_and_signals.safe_message_handlers.activities import (
     assign_nodes_to_job,
-    find_bad_nodes,
     unassign_nodes_for_job,
 )
 from updates_and_signals.safe_message_handlers.workflow import (
@@ -30,7 +29,7 @@ async def test_safe_message_handlers(client: Client, env: WorkflowEnvironment):
         client,
         task_queue=task_queue,
         workflows=[ClusterManagerWorkflow],
-        activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
+        activities=[assign_nodes_to_job, unassign_nodes_for_job],
     ):
         cluster_manager_handle = await client.start_workflow(
             ClusterManagerWorkflow.run,
@@ -82,7 +81,7 @@ async def test_update_idempotency(client: Client, env: WorkflowEnvironment):
         client,
         task_queue=task_queue,
         workflows=[ClusterManagerWorkflow],
-        activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
+        activities=[assign_nodes_to_job, unassign_nodes_for_job],
     ):
         cluster_manager_handle = await client.start_workflow(
             ClusterManagerWorkflow.run,
@@ -106,8 +105,7 @@ async def test_update_idempotency(client: Client, env: WorkflowEnvironment):
                 total_num_nodes=5, job_name="jobby-job"
             ),
         )
-        # the second call should not assign more nodes (it may return fewer if the health check finds bad nodes
-        # in between the two signals.)
+        # the second call should not assign more nodes
         assert result_1.nodes_assigned >= result_2.nodes_assigned
 
 
@@ -121,7 +119,7 @@ async def test_update_failure(client: Client, env: WorkflowEnvironment):
         client,
         task_queue=task_queue,
         workflows=[ClusterManagerWorkflow],
-        activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
+        activities=[assign_nodes_to_job, unassign_nodes_for_job],
     ):
         cluster_manager_handle = await client.start_workflow(
             ClusterManagerWorkflow.run,
@@ -152,4 +150,4 @@ async def test_update_failure(client: Client, env: WorkflowEnvironment):
         finally:
             await cluster_manager_handle.signal(ClusterManagerWorkflow.shutdown_cluster)
             result = await cluster_manager_handle.result()
-            assert result.num_currently_assigned_nodes + result.num_bad_nodes == 24
+            assert result.num_currently_assigned_nodes == 24
diff --git a/updates_and_signals/safe_message_handlers/activities.py b/updates_and_signals/safe_message_handlers/activities.py
index 3a1c9cd2..48a619fe 100644
--- a/updates_and_signals/safe_message_handlers/activities.py
+++ b/updates_and_signals/safe_message_handlers/activities.py
@@ -27,19 +27,3 @@ class UnassignNodesForJobInput:
 async def unassign_nodes_for_job(input: UnassignNodesForJobInput) -> None:
     print(f"Deallocating nodes {input.nodes} from job {input.job_name}")
     await asyncio.sleep(0.1)
-
-
-@dataclass
-class FindBadNodesInput:
-    nodes_to_check: Set[str]
-
-
-@activity.defn
-async def find_bad_nodes(input: FindBadNodesInput) -> Set[str]:
-    await asyncio.sleep(0.1)
-    bad_nodes = set([n for n in input.nodes_to_check if int(n) % 5 == 0])
-    if bad_nodes:
-        print(f"Found bad nodes: {bad_nodes}")
-    else:
-        print("No new bad nodes found.")
-    return bad_nodes
diff --git a/updates_and_signals/safe_message_handlers/worker.py b/updates_and_signals/safe_message_handlers/worker.py
index 5e28eca3..098e892a 100644
--- a/updates_and_signals/safe_message_handlers/worker.py
+++ b/updates_and_signals/safe_message_handlers/worker.py
@@ -7,7 +7,6 @@
 from updates_and_signals.safe_message_handlers.workflow import (
     ClusterManagerWorkflow,
     assign_nodes_to_job,
-    find_bad_nodes,
     unassign_nodes_for_job,
 )
 
@@ -22,7 +21,7 @@ async def main():
         client,
         task_queue="safe-message-handlers-task-queue",
         workflows=[ClusterManagerWorkflow],
-        activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
+        activities=[assign_nodes_to_job, unassign_nodes_for_job],
     ):
         # Wait until interrupted
         logging.info("ClusterManagerWorkflow worker started, ctrl+c to exit")
diff --git a/updates_and_signals/safe_message_handlers/workflow.py b/updates_and_signals/safe_message_handlers/workflow.py
index b8230578..8f04173d 100644
--- a/updates_and_signals/safe_message_handlers/workflow.py
+++ b/updates_and_signals/safe_message_handlers/workflow.py
@@ -5,15 +5,12 @@
 from typing import Dict, List, Optional, Set
 
 from temporalio import workflow
-from temporalio.common import RetryPolicy
 from temporalio.exceptions import ApplicationError
 
 from updates_and_signals.safe_message_handlers.activities import (
     AssignNodesToJobInput,
-    FindBadNodesInput,
     UnassignNodesForJobInput,
     assign_nodes_to_job,
-    find_bad_nodes,
     unassign_nodes_for_job,
 )
 
@@ -37,7 +34,6 @@ class ClusterManagerInput:
 @dataclass
 class ClusterManagerResult:
     num_currently_assigned_nodes: int
-    num_bad_nodes: int
 
 
 # Be in the habit of storing message inputs and outputs in serializable structures.
@@ -116,7 +112,7 @@ async def assign_nodes_to_job(
             )
             nodes_to_assign = unassigned_nodes[: input.total_num_nodes]
             # This await would be dangerous without nodes_lock because it yields control and allows interleaving
-            # with delete_job and perform_health_checks, which both touch self.state.nodes.
+            # with delete_job, which touches self.state.nodes.
             await self._assign_nodes_to_job(nodes_to_assign, input.job_name)
             return ClusterManagerAssignNodesToJobResult(
                 nodes_assigned=self.get_assigned_nodes(job_name=input.job_name)
@@ -150,7 +146,7 @@ async def delete_job(self, input: ClusterManagerDeleteJobInput) -> None:
                 k for k, v in self.state.nodes.items() if v == input.job_name
             ]
             # This await would be dangerous without nodes_lock because it yields control and allows interleaving
-            # with assign_nodes_to_job and perform_health_checks, which all touch self.state.nodes.
+            # with assign_nodes_to_job, which touches self.state.nodes.
             await self._unassign_nodes_for_job(nodes_to_unassign, input.job_name)
 
     async def _unassign_nodes_for_job(
@@ -167,40 +163,11 @@ async def _unassign_nodes_for_job(
     def get_unassigned_nodes(self) -> List[str]:
         return [k for k, v in self.state.nodes.items() if v is None]
 
-    def get_bad_nodes(self) -> Set[str]:
-        return set([k for k, v in self.state.nodes.items() if v == "BAD!"])
-
     def get_assigned_nodes(self, *, job_name: Optional[str] = None) -> Set[str]:
         if job_name:
             return set([k for k, v in self.state.nodes.items() if v == job_name])
         else:
-            return set(
-                [
-                    k
-                    for k, v in self.state.nodes.items()
-                    if v is not None and v != "BAD!"
-                ]
-            )
-
-    async def perform_health_checks(self) -> None:
-        async with self.nodes_lock:
-            assigned_nodes = self.get_assigned_nodes()
-            try:
-                # This await would be dangerous without nodes_lock because it yields control and allows interleaving
-                # with assign_nodes_to_job and delete_job, which both touch self.state.nodes.
-                bad_nodes = await workflow.execute_activity(
-                    find_bad_nodes,
-                    FindBadNodesInput(nodes_to_check=assigned_nodes),
-                    start_to_close_timeout=timedelta(seconds=10),
-                    # This health check is optional, and our lock would block the whole workflow if we let it retry forever.
-                    retry_policy=RetryPolicy(maximum_attempts=1),
-                )
-                for node in bad_nodes:
-                    self.state.nodes[node] = "BAD!"
-            except Exception as e:
-                workflow.logger.warn(
-                    f"Health check failed with error {type(e).__name__}:{e}"
-                )
+            return set([k for k, v in self.state.nodes.items() if v is not None])
 
     # The cluster manager is a long-running "entity" workflow so we need to periodically checkpoint its state and
     # continue-as-new.
@@ -229,9 +196,7 @@ def should_continue_as_new(self) -> bool:
     async def run(self, input: ClusterManagerInput) -> ClusterManagerResult:
         self.init(input)
         await workflow.wait_condition(lambda: self.state.cluster_started)
-        # Perform health checks at intervals.
         while True:
-            await self.perform_health_checks()
             try:
                 await workflow.wait_condition(
                     lambda: self.state.cluster_shutdown
@@ -250,7 +215,4 @@ async def run(self, input: ClusterManagerInput) -> ClusterManagerResult:
                         test_continue_as_new=input.test_continue_as_new,
                     )
                 )
-        return ClusterManagerResult(
-            len(self.get_assigned_nodes()),
-            len(self.get_bad_nodes()),
-        )
+        return ClusterManagerResult(len(self.get_assigned_nodes()))

From 7ec0e726c433c9d5778ac6e774606f882e5c8337 Mon Sep 17 00:00:00 2001
From: Dan Davison
Date: Wed, 24 Jul 2024 10:25:05 -0400
Subject: [PATCH 2/5] Remove unnecessary control logic and otherwise simplify main loop

---
 .../safe_message_handlers/workflow.py | 34 +++++++------------
 1 file changed, 12 insertions(+), 22 deletions(-)

diff --git a/updates_and_signals/safe_message_handlers/workflow.py b/updates_and_signals/safe_message_handlers/workflow.py
index 8f04173d..7bfe993b 100644
--- a/updates_and_signals/safe_message_handlers/workflow.py
+++ b/updates_and_signals/safe_message_handlers/workflow.py
@@ -66,7 +66,6 @@ def __init__(self) -> None:
         # Protects workflow state from interleaved access
         self.nodes_lock = asyncio.Lock()
         self.max_history_length: Optional[int] = None
-        self.sleep_interval_seconds: int = 600
 
     @workflow.signal
     async def start_cluster(self) -> None:
@@ -176,7 +175,6 @@ def init(self, input: ClusterManagerInput) -> None:
             self.state = input.state
         if input.test_continue_as_new:
             self.max_history_length = 120
-            self.sleep_interval_seconds = 1
 
     def should_continue_as_new(self) -> bool:
         # We don't want to continue-as-new if we're in the middle of an update
@@ -195,24 +193,16 @@ def should_continue_as_new(self) -> bool:
     @workflow.run
     async def run(self, input: ClusterManagerInput) -> ClusterManagerResult:
         self.init(input)
-        await workflow.wait_condition(lambda: self.state.cluster_started)
-        while True:
-            try:
-                await workflow.wait_condition(
-                    lambda: self.state.cluster_shutdown
-                    or self.should_continue_as_new(),
-                    timeout=timedelta(seconds=self.sleep_interval_seconds),
-                )
-            except asyncio.TimeoutError:
-                pass
-            if self.state.cluster_shutdown:
-                break
-            if self.should_continue_as_new():
-                workflow.logger.info("Continuing as new")
-                workflow.continue_as_new(
-                    ClusterManagerInput(
-                        state=self.state,
-                        test_continue_as_new=input.test_continue_as_new,
-                    )
+        await workflow.wait_condition(
+            lambda: self.state.cluster_shutdown or self.should_continue_as_new()
+        )
+        if self.should_continue_as_new():
+            workflow.logger.info("Continuing as new")
+            workflow.continue_as_new(
+                ClusterManagerInput(
+                    state=self.state,
+                    test_continue_as_new=input.test_continue_as_new,
                 )
-        return ClusterManagerResult(len(self.get_assigned_nodes()))
+            )
+        else:
+            return ClusterManagerResult(len(self.get_assigned_nodes()))

From e3eba6e238b341d55fbd3c17268d79172bb6caa7 Mon Sep 17 00:00:00 2001
From: Dan Davison
Date: Wed, 24 Jul 2024 10:56:30 -0400
Subject: [PATCH 3/5] Reinstate loop

---
 .../safe_message_handlers/workflow.py | 26 ++++++++++---------
 1 file changed, 14 insertions(+), 12 deletions(-)

diff --git a/updates_and_signals/safe_message_handlers/workflow.py b/updates_and_signals/safe_message_handlers/workflow.py
index 7bfe993b..7b677311 100644
--- a/updates_and_signals/safe_message_handlers/workflow.py
+++ b/updates_and_signals/safe_message_handlers/workflow.py
@@ -193,16 +193,18 @@ def should_continue_as_new(self) -> bool:
     @workflow.run
     async def run(self, input: ClusterManagerInput) -> ClusterManagerResult:
         self.init(input)
-        await workflow.wait_condition(
-            lambda: self.state.cluster_shutdown or self.should_continue_as_new()
-        )
-        if self.should_continue_as_new():
-            workflow.logger.info("Continuing as new")
-            workflow.continue_as_new(
-                ClusterManagerInput(
-                    state=self.state,
-                    test_continue_as_new=input.test_continue_as_new,
-                )
+        while True:
+            await workflow.wait_condition(
+                lambda: self.state.cluster_shutdown or self.should_continue_as_new()
             )
-        else:
-            return ClusterManagerResult(len(self.get_assigned_nodes()))
+            if self.state.cluster_shutdown:
+                break
+            if self.should_continue_as_new():
+                workflow.logger.info("Continuing as new")
+                workflow.continue_as_new(
+                    ClusterManagerInput(
+                        state=self.state,
+                        test_continue_as_new=input.test_continue_as_new,
+                    )
+                )
+        return ClusterManagerResult(len(self.get_assigned_nodes()))

From d934ee11fabde0b951d6b6d7eca8998c83022853 Mon Sep 17 00:00:00 2001
From: Dan Davison
Date: Wed, 24 Jul 2024 11:01:47 -0400
Subject: [PATCH 4/5] Wait for all handlers to finish before CAN

---
 updates_and_signals/safe_message_handlers/workflow.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/updates_and_signals/safe_message_handlers/workflow.py b/updates_and_signals/safe_message_handlers/workflow.py
index 7b677311..942bb5c8 100644
--- a/updates_and_signals/safe_message_handlers/workflow.py
+++ b/updates_and_signals/safe_message_handlers/workflow.py
@@ -200,6 +200,7 @@ async def run(self, input: ClusterManagerInput) -> ClusterManagerResult:
             if self.state.cluster_shutdown:
                 break
             if self.should_continue_as_new():
+                await workflow.wait_condition(lambda: workflow.all_handlers_finished())
                 workflow.logger.info("Continuing as new")
                 workflow.continue_as_new(
                     ClusterManagerInput(

From ab9b2f93c8d0188e9c65fa2a8cc5e6331a224ca1 Mon Sep 17 00:00:00 2001
From: Dan Davison
Date: Wed, 24 Jul 2024 11:06:48 -0400
Subject: [PATCH 5/5] Update SDK version

---
 poetry.lock    | 2 +-
 pyproject.toml | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/poetry.lock b/poetry.lock
index 5fa72054..72b30261 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -3041,4 +3041,4 @@ testing = ["coverage (>=5.0.3)", "zope.event", "zope.testing"]
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.8"
-content-hash = "0497040b902f57971353bd95171ce201a3254a92a1994a5717ab3a6e3f7dc5c4"
+content-hash = "c6979928bb141377bd1aa63e9b991abf52c9b3e9f81707158a0d7f31fae2884d"
diff --git a/pyproject.toml b/pyproject.toml
index 1bdb7250..38e6fb85 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -17,7 +17,7 @@ packages = [
 
 [tool.poetry.dependencies]
 python = "^3.8"
-temporalio = "^1.5.0"
+temporalio = "^1.6.0"
 
 [tool.poetry.dev-dependencies]
 black = "^22.3.0"
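
Note on the long-running activity pattern that the first commit message points
to: a rough sketch of that alternative is below. It is not part of this patch
series; the names (check_node_health, NodeHealthInput, NodeHealthWatcherWorkflow)
and all timeout values are illustrative assumptions only. The idea is that the
polling loop and its heartbeats live inside an activity, and the workflow awaits
that single activity instead of sleeping and re-checking on a timer.

    import asyncio
    from dataclasses import dataclass
    from datetime import timedelta
    from typing import Set

    from temporalio import activity, workflow
    from temporalio.common import RetryPolicy


    @dataclass
    class NodeHealthInput:
        nodes_to_check: Set[str]


    @activity.defn
    async def check_node_health(input: NodeHealthInput) -> Set[str]:
        # Poll inside the activity rather than inside the workflow loop,
        # heartbeating so the server can detect a stuck or crashed worker.
        while True:
            # Stand-in check; a real implementation would call out to the cluster.
            bad_nodes = {n for n in input.nodes_to_check if int(n) % 5 == 0}
            if bad_nodes:
                return bad_nodes
            activity.heartbeat()
            await asyncio.sleep(60)


    @workflow.defn
    class NodeHealthWatcherWorkflow:
        @workflow.run
        async def run(self, nodes: Set[str]) -> Set[str]:
            # The workflow awaits one long-running activity; there is no
            # timer-based re-check loop in workflow code.
            return await workflow.execute_activity(
                check_node_health,
                NodeHealthInput(nodes_to_check=nodes),
                start_to_close_timeout=timedelta(hours=24),
                heartbeat_timeout=timedelta(minutes=5),
                retry_policy=RetryPolicy(maximum_interval=timedelta(minutes=1)),
            )

With this shape, stuck or failed pollers surface through the heartbeat timeout
and retry policy rather than through control flow inside the workflow.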