Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 99 additions & 12 deletions core/src/worker/workflow/machines/nexus_operation_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use crate::worker::workflow::{
};
use itertools::Itertools;
use rustfsm::{MachineError, StateMachine, TransitionResult, fsm};
use temporal_sdk_core_protos::coresdk::nexus::NexusOperationCancellationType;
use temporal_sdk_core_protos::temporal::api::command::v1::command;
use temporal_sdk_core_protos::{
coresdk::{
nexus::{NexusOperationResult, nexus_operation_result},
Expand All @@ -16,7 +18,7 @@ use temporal_sdk_core_protos::{
workflow_commands::ScheduleNexusOperation,
},
temporal::api::{
command::v1::{RequestCancelNexusOperationCommandAttributes, command},
command::v1::RequestCancelNexusOperationCommandAttributes,
common::v1::Payload,
enums::v1::{CommandType, EventType},
failure::v1::{self as failure, Failure, failure::FailureInfo},
Expand Down Expand Up @@ -54,6 +56,7 @@ fsm! {
--(NexusOperationStarted(NexusOperationStartedEventAttributes), on_started)--> Started;

Started --(Cancel, shared on_issue_cancel)--> Started;
Started --(Cancel, shared on_issue_cancel)--> Cancelled;
Started --(CommandRequestCancelNexusOperation)--> Started;
Started --(NexusOperationCancelRequested)--> Started;
Started
Expand All @@ -65,10 +68,17 @@ fsm! {
Started
--(NexusOperationTimedOut(NexusOperationTimedOutEventAttributes), on_timed_out)--> TimedOut;

Cancelled --(Cancel)--> Cancelled;
Cancelled --(CommandRequestCancelNexusOperation)--> Cancelled;
Cancelled --(NexusOperationCancelRequested)--> Cancelled;
Cancelled --(NexusOperationCompleted(NexusOperationCompletedEventAttributes), shared on_completed)--> Cancelled;
Cancelled --(NexusOperationFailed(NexusOperationFailedEventAttributes), shared on_failed)--> Cancelled;
Cancelled --(NexusOperationTimedOut(NexusOperationTimedOutEventAttributes), shared on_timed_out)--> Cancelled;
Cancelled --(NexusOperationCanceled(NexusOperationCanceledEventAttributes))--> Cancelled;

// Ignore cancels in all terminal states
Completed --(Cancel)--> Completed;
Failed --(Cancel)--> Failed;
Cancelled --(Cancel)--> Cancelled;
TimedOut --(Cancel)--> TimedOut;
}

Expand Down Expand Up @@ -102,6 +112,7 @@ pub(super) struct SharedState {

cancelled_before_sent: bool,
cancel_sent: bool,
cancel_type: NexusOperationCancellationType,
}

impl NexusOperationMachine {
Expand All @@ -116,6 +127,7 @@ impl NexusOperationMachine {
operation: attribs.operation.clone(),
cancelled_before_sent: false,
cancel_sent: false,
cancel_type: attribs.cancellation_type(),
},
);
NewMachineWithCommand {
Expand Down Expand Up @@ -249,12 +261,20 @@ impl Started {
pub(crate) fn on_issue_cancel(
&self,
ss: &mut SharedState,
) -> NexusOperationMachineTransition<Started> {
) -> NexusOperationMachineTransition<StartedOrCancelled> {
if !ss.cancel_sent {
ss.cancel_sent = true;
NexusOperationMachineTransition::commands([NexusOperationCommand::IssueCancel])
let dest = if matches!(
ss.cancel_type,
NexusOperationCancellationType::Abandon | NexusOperationCancellationType::TryCancel
) {
StartedOrCancelled::Cancelled(Default::default())
} else {
StartedOrCancelled::Started(Default::default())
};
TransitionResult::ok([NexusOperationCommand::IssueCancel], dest)
} else {
NexusOperationMachineTransition::default()
TransitionResult::ok([], StartedOrCancelled::Started(Default::default()))
}
}

Expand Down Expand Up @@ -315,6 +335,49 @@ pub(super) struct TimedOut;
#[derive(Default, Clone)]
pub(super) struct Cancelled;

fn completion_of_not_abandoned_err() -> WFMachinesError {
WFMachinesError::Nondeterminism(
"Nexus operation which don't have the ABANDON cancellation type cannot complete after \
being cancelled."
.to_string(),
)
}

impl Cancelled {
pub(super) fn on_completed(
self,
ss: &mut SharedState,
_: NexusOperationCompletedEventAttributes,
) -> NexusOperationMachineTransition<Cancelled> {
if ss.cancel_type == NexusOperationCancellationType::Abandon {
return NexusOperationMachineTransition::Err(completion_of_not_abandoned_err());
}
NexusOperationMachineTransition::ok([], self)
}

pub(super) fn on_failed(
self,
ss: &mut SharedState,
_: NexusOperationFailedEventAttributes,
) -> NexusOperationMachineTransition<Cancelled> {
if ss.cancel_type == NexusOperationCancellationType::Abandon {
return NexusOperationMachineTransition::Err(completion_of_not_abandoned_err());
}
NexusOperationMachineTransition::ok([], self)
}

pub(super) fn on_timed_out(
self,
ss: &mut SharedState,
_: NexusOperationTimedOutEventAttributes,
) -> NexusOperationMachineTransition<Cancelled> {
if ss.cancel_type == NexusOperationCancellationType::Abandon {
return NexusOperationMachineTransition::Err(completion_of_not_abandoned_err());
}
NexusOperationMachineTransition::ok([], self)
}
}

impl TryFrom<HistEventData> for NexusOperationMachineEvents {
type Error = WFMachinesError;

Expand Down Expand Up @@ -497,14 +560,38 @@ impl WFMachinesAdapter for NexusOperationMachine {
]
}
NexusOperationCommand::IssueCancel => {
vec![MachineResponse::IssueNewCommand(
command::Attributes::RequestCancelNexusOperationCommandAttributes(
RequestCancelNexusOperationCommandAttributes {
scheduled_event_id: self.shared_state.scheduled_event_id,
},
let mut resps = vec![];
if self.shared_state.cancel_type != NexusOperationCancellationType::Abandon {
resps.push(MachineResponse::IssueNewCommand(
command::Attributes::RequestCancelNexusOperationCommandAttributes(
RequestCancelNexusOperationCommandAttributes {
scheduled_event_id: self.shared_state.scheduled_event_id,
},
)
.into(),
))
}
// Immediately resolve abandon/trycancel modes
if matches!(
self.shared_state.cancel_type,
NexusOperationCancellationType::Abandon
| NexusOperationCancellationType::TryCancel
) {
resps.push(
ResolveNexusOperation {
seq: self.shared_state.lang_seq_num,
result: Some(NexusOperationResult {
status: Some(nexus_operation_result::Status::Cancelled(
self.cancelled_failure(
"Nexus operation cancelled after starting".to_owned(),
),
)),
}),
}
.into(),
)
.into(),
)]
}
resps
}
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,19 @@ enum NexusTaskCancelReason {
TIMED_OUT = 0;
// The worker is shutting down
WORKER_SHUTDOWN = 1;
}
}

// Controls at which point to report back to lang when a nexus operation is cancelled
enum NexusOperationCancellationType {
// Do not request cancellation of the nexus operation if already scheduled
ABANDON = 0;

// Initiate a cancellation request for the Nexus operation and immediately report cancellation
// to the caller. Note that it doesn't guarantee that cancellation is delivered to the operation if calling workflow exits before the delivery is done.
// If you want to ensure that cancellation is delivered to the operation, use WAIT_CANCELLATION_REQUESTED.
TRY_CANCEL = 1;
// Request cancellation of the operation and wait for confirmation that the request was received.
WAIT_CANCELLATION_REQUESTED = 2;
// Wait for operation cancellation completion. Default.
WAIT_CANCELLATION_COMPLETED = 3;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import "temporal/api/enums/v1/workflow.proto";
import "temporal/api/failure/v1/message.proto";
import "temporal/api/sdk/v1/user_metadata.proto";
import "temporal/sdk/core/child_workflow/child_workflow.proto";
import "temporal/sdk/core/nexus/nexus.proto";
import "temporal/sdk/core/common/common.proto";

message WorkflowCommand {
Expand Down Expand Up @@ -375,6 +376,8 @@ message ScheduleNexusOperation {
// activities and child workflows, these are transmitted to Nexus operations that may be
// external and are not traditional payloads.
map<string, string> nexus_header = 7;
// Defines behaviour of the underlying nexus operation when operation cancellation has been requested.
nexus.NexusOperationCancellationType cancellation_type = 8;
}

// Request cancellation of a nexus operation started via `ScheduleNexusOperation`
Expand Down
7 changes: 7 additions & 0 deletions sdk/src/workflow_context/options.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{collections::HashMap, time::Duration};

use temporal_client::{Priority, WorkflowOptions};
use temporal_sdk_core_protos::coresdk::nexus::NexusOperationCancellationType;
use temporal_sdk_core_protos::{
coresdk::{
child_workflow::ChildWorkflowCancellationType,
Expand Down Expand Up @@ -397,6 +398,8 @@ pub struct NexusOperationOptions {
/// activities and child workflows, these are transmitted to Nexus operations that may be
/// external and are not traditional payloads.
pub nexus_header: HashMap<String, String>,
/// Cancellation type for the operation
pub cancellation_type: Option<NexusOperationCancellationType>,
}

impl IntoWorkflowCommand for NexusOperationOptions {
Expand All @@ -414,6 +417,10 @@ impl IntoWorkflowCommand for NexusOperationOptions {
.schedule_to_close_timeout
.and_then(|t| t.try_into().ok()),
nexus_header: self.nexus_header,
cancellation_type: self
.cancellation_type
.unwrap_or(NexusOperationCancellationType::WaitCancellationCompleted)
.into(),
}
.into(),
),
Expand Down
Loading
Loading