Skip to content

Commit 50768df

Browse files
authored
Apply eviction before completing activation (#466)
1 parent 5fe85be commit 50768df

File tree

1 file changed

+56
-47
lines changed

1 file changed

+56
-47
lines changed

temporalio/worker/_workflow.py

+56-47
Original file line numberDiff line numberDiff line change
@@ -174,15 +174,25 @@ async def _handle_activation(
174174
) -> None:
175175
global LOG_PROTOS
176176

177+
# Extract a couple of jobs from the activation
178+
cache_remove_job = None
179+
start_job = None
180+
for job in act.jobs:
181+
if job.HasField("remove_from_cache"):
182+
cache_remove_job = job.remove_from_cache
183+
elif job.HasField("start_workflow"):
184+
start_job = job.start_workflow
185+
cache_remove_only_activation = len(act.jobs) == 1 and cache_remove_job
186+
177187
# Build default success completion (e.g. remove-job-only activations)
178188
completion = (
179189
temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion()
180190
)
181191
completion.successful.SetInParent()
182-
remove_job = None
183192
try:
184-
# Decode the activation if there's a codec
185-
if self._data_converter.payload_codec:
193+
# Decode the activation if there's a codec and it's not a
194+
# cache-remove-only activation
195+
if self._data_converter.payload_codec and not cache_remove_only_activation:
186196
await temporalio.bridge.worker.decode_activation(
187197
act, self._data_converter.payload_codec
188198
)
@@ -191,15 +201,21 @@ async def _handle_activation(
191201
logger.debug("Received workflow activation:\n%s", act)
192202

193203
# We only have to run if there are any non-remove-from-cache jobs
194-
remove_job = next(
195-
(j for j in act.jobs if j.HasField("remove_from_cache")), None
196-
)
197-
if len(act.jobs) > 1 or not remove_job:
204+
if not cache_remove_only_activation:
198205
# If the workflow is not running yet, create it
199206
workflow = self._running_workflows.get(act.run_id)
200207
if not workflow:
201-
workflow = self._create_workflow_instance(act)
208+
# Must have a start job to create instance
209+
if not start_job:
210+
raise RuntimeError(
211+
"Missing start workflow, workflow could have unexpectedly been removed from cache"
212+
)
213+
workflow = self._create_workflow_instance(act, start_job)
202214
self._running_workflows[act.run_id] = workflow
215+
elif start_job:
216+
# This should never happen
217+
logger.warn("Cache already exists for activation with start job")
218+
203219
# Run activation in separate thread so we can check if it's
204220
# deadlocked
205221
activate_task = asyncio.get_running_loop().run_in_executor(
@@ -241,8 +257,25 @@ async def _handle_activation(
241257
# Always set the run ID on the completion
242258
completion.run_id = act.run_id
243259

244-
# Encode the completion if there's a codec
245-
if self._data_converter.payload_codec:
260+
# If there is a remove-from-cache job, do so
261+
if cache_remove_job:
262+
if act.run_id in self._running_workflows:
263+
logger.debug(
264+
"Evicting workflow with run ID %s, message: %s",
265+
act.run_id,
266+
cache_remove_job.message,
267+
)
268+
del self._running_workflows[act.run_id]
269+
else:
270+
logger.warn(
271+
"Eviction request on unknown workflow with run ID %s, message: %s",
272+
act.run_id,
273+
cache_remove_job.message,
274+
)
275+
276+
# Encode the completion if there's a codec and it's not a
277+
# cache-remove-only activation
278+
if self._data_converter.payload_codec and not cache_remove_only_activation:
246279
try:
247280
await temporalio.bridge.worker.encode_completion(
248281
completion, self._data_converter.payload_codec
@@ -265,54 +298,30 @@ async def _handle_activation(
265298
"Failed completing activation on workflow with run ID %s", act.run_id
266299
)
267300

268-
# If there is a remove-from-cache job, do so
269-
if remove_job:
270-
if act.run_id in self._running_workflows:
271-
logger.debug(
272-
"Evicting workflow with run ID %s, message: %s",
273-
act.run_id,
274-
remove_job.remove_from_cache.message,
275-
)
276-
del self._running_workflows[act.run_id]
277-
else:
278-
logger.debug(
279-
"Eviction request on unknown workflow with run ID %s, message: %s",
280-
act.run_id,
281-
remove_job.remove_from_cache.message,
282-
)
283-
# If we are failing on eviction, set the error and shutdown the
284-
# entire worker
285-
if self._on_eviction_hook is not None:
286-
try:
287-
self._on_eviction_hook(act.run_id, remove_job.remove_from_cache)
288-
except Exception as e:
289-
self._throw_after_activation = e
290-
logger.debug("Shutting down worker on eviction")
291-
self._bridge_worker().initiate_shutdown()
301+
# If there is a remove job and an eviction hook, run it
302+
if cache_remove_job and self._on_eviction_hook is not None:
303+
try:
304+
self._on_eviction_hook(act.run_id, cache_remove_job)
305+
except Exception as e:
306+
self._throw_after_activation = e
307+
logger.debug("Shutting down worker on eviction")
308+
self._bridge_worker().initiate_shutdown()
292309

293310
def _create_workflow_instance(
294-
self, act: temporalio.bridge.proto.workflow_activation.WorkflowActivation
311+
self,
312+
act: temporalio.bridge.proto.workflow_activation.WorkflowActivation,
313+
start: temporalio.bridge.proto.workflow_activation.StartWorkflow,
295314
) -> WorkflowInstance:
296-
# First find the start workflow job
297-
start_job = next((j for j in act.jobs if j.HasField("start_workflow")), None)
298-
if not start_job:
299-
raise RuntimeError(
300-
"Missing start workflow, workflow could have unexpectedly been removed from cache"
301-
)
302-
303315
# Get the definition
304-
defn = self._workflows.get(
305-
start_job.start_workflow.workflow_type, self._dynamic_workflow
306-
)
316+
defn = self._workflows.get(start.workflow_type, self._dynamic_workflow)
307317
if not defn:
308318
workflow_names = ", ".join(sorted(self._workflows.keys()))
309319
raise temporalio.exceptions.ApplicationError(
310-
f"Workflow class {start_job.start_workflow.workflow_type} is not registered on this worker, available workflows: {workflow_names}",
320+
f"Workflow class {start.workflow_type} is not registered on this worker, available workflows: {workflow_names}",
311321
type="NotFoundError",
312322
)
313323

314324
# Build info
315-
start = start_job.start_workflow
316325
parent: Optional[temporalio.workflow.ParentInfo] = None
317326
if start.HasField("parent_workflow_info"):
318327
parent = temporalio.workflow.ParentInfo(

0 commit comments

Comments
 (0)