Skip to content

Add support for update admitted event #2041

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@

package io.temporal.internal.statemachines;

import com.google.common.base.Preconditions;
import io.temporal.api.enums.v1.EventType;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.internal.common.WorkflowExecutionUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/**
* This class buffers events between WorkflowTaskStarted events and return them in one chunk so any
Expand All @@ -42,9 +42,29 @@ private enum WFTState {
Closed,
}

public static class EventBatch {
private final List<HistoryEvent> events;
private final Optional<HistoryEvent> workflowTaskCompletedEvent;

public EventBatch(
Optional<HistoryEvent> workflowTaskCompletedEvent, List<HistoryEvent> events) {
this.workflowTaskCompletedEvent = workflowTaskCompletedEvent;
this.events = events;
}

public List<HistoryEvent> getEvents() {
return events;
}

public Optional<HistoryEvent> getWorkflowTaskCompletedEvent() {
return workflowTaskCompletedEvent;
}
}

private WFTState wftSequenceState = WFTState.None;

private final List<HistoryEvent> wftBuffer = new ArrayList<>();
private Optional<HistoryEvent> workflowTaskCompletedEvent = Optional.empty();
private final List<HistoryEvent> readyToFetch = new ArrayList<>();

/**
Expand Down Expand Up @@ -73,17 +93,20 @@ private void handleEvent(HistoryEvent event, boolean hasNextEvent) {
// This is the only way to enter into the WFT sequence -
// any WorkflowTaskStarted event that it not the last in the history
if (EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED.equals(event.getEventType())) {
// if there is something in wftBuffer, let's flush it
flushBuffer();
// and init a new sequence
// Init a new sequence
wftSequenceState = WFTState.Started;
addToBuffer(event);
return;
}

if (WFTState.Started.equals(wftSequenceState)
&& WorkflowExecutionUtils.isWorkflowTaskClosedEvent(event)) {
wftSequenceState = WFTState.Closed;
if (event.getEventType().equals(EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED)) {
workflowTaskCompletedEvent = Optional.of(event);
wftSequenceState = WFTState.Closed;
} else {
wftSequenceState = WFTState.None;
}
addToBuffer(event);
return;
}
Expand All @@ -94,14 +117,19 @@ private void handleEvent(HistoryEvent event, boolean hasNextEvent) {
// If event is WFT_STARTED or any of the Closing events, it's handled by if statements
// earlier, so it's safe to switch to None here, we are not inside WFT sequence
wftSequenceState = WFTState.None;
// no open WFT sequence, can't add to buffer, it's ok to add directly to readyToFetch, this
// event can't be EVENT_TYPE_WORKFLOW_TASK_STARTED because we checked it above.
readyToFetch.add(event);

addToBuffer(event);
return;
}
if (WFTState.Closed.equals(wftSequenceState) && WorkflowExecutionUtils.isCommandEvent(event)) {
// we are inside a closed WFT sequence, we can add to buffer
addToBuffer(event);
return;
}

if (WFTState.None.equals(wftSequenceState)) {
// we should be returning the events one by one, we are not inside a WFT sequence
if (WorkflowExecutionUtils.isCommandEvent(event)
|| WorkflowExecutionUtils.isWorkflowTaskClosedEvent(event)) {
flushBuffer();
readyToFetch.add(event);
} else {
addToBuffer(event);
Expand All @@ -114,21 +142,22 @@ private void flushBuffer() {
}

private void addToBuffer(HistoryEvent event) {
Preconditions.checkState(
!WFTState.None.equals(wftSequenceState),
"We should be inside an open WFT sequence to add to the buffer");
wftBuffer.add(event);
}

public List<HistoryEvent> fetch() {
public EventBatch fetch() {
if (readyToFetch.size() == 1) {
HistoryEvent event = readyToFetch.get(0);
Optional<HistoryEvent> wftStarted = workflowTaskCompletedEvent;
workflowTaskCompletedEvent = Optional.empty();
readyToFetch.clear();
return Collections.singletonList(event);
return new EventBatch(wftStarted, Collections.singletonList(event));
} else {
List<HistoryEvent> result = new ArrayList<>(readyToFetch);
Optional<HistoryEvent> wftStarted = workflowTaskCompletedEvent;
workflowTaskCompletedEvent = Optional.empty();
readyToFetch.clear();
return result;
return new EventBatch(wftStarted, result);
Comment on lines 149 to +160
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GH won't let me suggest here because it's a whiny baby about deleted lines, but looks like this could be:

    List<HistoryEvent> res;
    if (readyToFetch.size() == 1) {
      HistoryEvent event = readyToFetch.get(0);
      res = Collections.singletonList(event));
    } else {
      res = new ArrayList<>(readyToFetch);
    }
    Optional<HistoryEvent> wftStarted = workflowTaskCompletedEvent;
    workflowTaskCompletedEvent = Optional.empty();
    readyToFetch.clear();
    return new EventBatch(wftStarted, res);

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ enum HandleEventStatus {

private final Map<Long, EntityStateMachine> stateMachines = new HashMap<>();

/** Key is the protocol instance id */
private final Map<String, EntityStateMachine> protocolStateMachines = new HashMap<>();

private final Queue<Message> messageOutbox = new ArrayDeque<>();
Expand All @@ -126,9 +127,19 @@ enum HandleEventStatus {
private final Queue<CancellableCommand> cancellableCommands = new ArrayDeque<>();

/**
* Is workflow executing new code or replaying from the history. Note that this flag ALWAYS flips
* to true for the time when we apply events from the server even if the commands were created by
* an actual execution with replaying=false.
* Is workflow executing new code or replaying from the history. The definition of replaying here
* is that we are no longer replaying as soon as we see new events that have never been seen or
* produced by the SDK.
*
* <p>Specifically, replay ends once we have seen any non-command event (IE: events that aren't a
* result of something we produced in the SDK) on a WFT which has the final event in history
* (meaning we are processing the most recent WFT and there are no more subsequent WFTs). WFT
* Completed in this case does not count as a non-command event, because that will typically show
* up as the first event in an incremental history, and we want to ignore it and its associated
* commands since we "produced" them.
*
* <p>Note: that this flag ALWAYS flips to true for the time when we apply events from the server
* even if the commands were created by an actual execution with replaying=false.
*/
private boolean replaying;

Expand Down Expand Up @@ -160,6 +171,12 @@ enum HandleEventStatus {

private List<Message> messages = new ArrayList<>();

/**
* Set of accepted durably admitted updates by update id a "durably admitted" update is one with
* an UPDATE_ADMITTED event.
*/
private final Set<String> acceptedUpdates = new HashSet<>();

private final SdkFlags flags;

public WorkflowStateMachines(
Expand Down Expand Up @@ -277,18 +294,19 @@ public void handleEvent(HistoryEvent event, boolean hasNextEvent) {
* Handle an events batch for one workflow task. Events that are related to one workflow task
* during replay should be prefetched and supplied in one batch.
*
* @param events events belong to one workflow task
* @param hasNextEvent true if there are more events in the history follow this batch, false if
* @param eventBatch events belong to one workflow task
* @param hasNextBatch true if there are more events in the history follow this batch, false if
* this batch contains the last events of the history
*/
private void handleEventsBatch(List<HistoryEvent> events, boolean hasNextEvent) {
private void handleEventsBatch(WFTBuffer.EventBatch eventBatch, boolean hasNextBatch) {
List<HistoryEvent> events = eventBatch.getEvents();
if (EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED.equals(events.get(0).getEventType())) {
for (SdkFlag flag : initialFlags) {
flags.tryUseSdkFlag(flag);
}
}

if (EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED.equals(events.get(0).getEventType())) {
if (eventBatch.getWorkflowTaskCompletedEvent().isPresent()) {
for (HistoryEvent event : events) {
handleSingleEventLookahead(event);
}
Expand All @@ -304,7 +322,10 @@ private void handleEventsBatch(List<HistoryEvent> events, boolean hasNextEvent)
}

try {
handleSingleEvent(event, iterator.hasNext() || hasNextEvent);
boolean isLastTask =
!hasNextBatch && !eventBatch.getWorkflowTaskCompletedEvent().isPresent();
boolean hasNextEvent = iterator.hasNext() || hasNextBatch;
handleSingleEvent(event, isLastTask, hasNextEvent);
} catch (RuntimeException e) {
throw createEventProcessingException(e, event);
}
Expand All @@ -330,13 +351,20 @@ private void handleSingleEventLookahead(HistoryEvent event) {
// Look ahead to infer protocol messages
WorkflowExecutionUpdateAcceptedEventAttributes updateEvent =
event.getWorkflowExecutionUpdateAcceptedEventAttributes();
this.messages.add(
Message.newBuilder()
.setId(updateEvent.getAcceptedRequestMessageId())
.setProtocolInstanceId(updateEvent.getProtocolInstanceId())
.setEventId(updateEvent.getAcceptedRequestSequencingEventId())
.setBody(Any.pack(updateEvent.getAcceptedRequest()))
.build());
// If an EXECUTION_UPDATE_ACCEPTED event does not have an accepted request, then it
// must be from an admitted update. This is the only way to infer an admitted update was
// accepted.
if (!updateEvent.hasAcceptedRequest()) {
acceptedUpdates.add(updateEvent.getProtocolInstanceId());
} else {
messages.add(
Message.newBuilder()
.setId(updateEvent.getAcceptedRequestMessageId())
.setProtocolInstanceId(updateEvent.getProtocolInstanceId())
.setEventId(updateEvent.getAcceptedRequestSequencingEventId())
.setBody(Any.pack(updateEvent.getAcceptedRequest()))
.build());
}
break;
case EVENT_TYPE_WORKFLOW_TASK_COMPLETED:
WorkflowTaskCompletedEventAttributes completedEvent =
Expand All @@ -352,6 +380,9 @@ private void handleSingleEventLookahead(HistoryEvent event) {
}
flags.setSdkFlag(sdkFlag);
}
// Remove any finished update protocol state machines. We can't remove them on an event like
// other state machines because a rejected update produces no event in history.
protocolStateMachines.entrySet().removeIf(entry -> entry.getValue().isFinalState());
break;
}
}
Expand Down Expand Up @@ -416,16 +447,17 @@ private void handleSingleMessage(Message message) {
stateMachine.handleMessage(message);
}

private void handleSingleEvent(HistoryEvent event, boolean hasNextEvent) {
private void handleSingleEvent(HistoryEvent event, boolean lastTask, boolean hasNextEvent) {
if (isCommandEvent(event)) {
handleCommandEvent(event);
return;
}

// We don't explicitly check if the event is a command event here because it's already handled
// above.
if (replaying
&& !hasNextEvent
&& (event.getEventType() == EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED
|| WorkflowExecutionUtils.isWorkflowTaskClosedEvent(event))) {
&& lastTask
&& event.getEventType() != EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED) {
replaying = false;
}

Expand Down Expand Up @@ -705,6 +737,20 @@ private void handleNonStatefulEvent(HistoryEvent event, boolean hasNextEvent) {
case EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
callbacks.cancel(event);
break;
case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED:
WorkflowExecutionUpdateAdmittedEventAttributes admittedEvent =
event.getWorkflowExecutionUpdateAdmittedEventAttributes();
Message msg =
Message.newBuilder()
.setId(admittedEvent.getRequest().getMeta().getUpdateId() + "/request")
.setProtocolInstanceId(admittedEvent.getRequest().getMeta().getUpdateId())
.setEventId(event.getEventId())
.setBody(Any.pack(admittedEvent.getRequest()))
.build();
if (replaying && acceptedUpdates.remove(msg.getProtocolInstanceId()) || !replaying) {
messages.add(msg);
}
break;
case EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT:
case UNRECOGNIZED:
break;
Expand Down Expand Up @@ -1028,9 +1074,6 @@ public Functions.Proc scheduleLocalActivityTask(

/** Validates that command matches the event during replay. */
private void validateCommand(Command command, HistoryEvent event) {
// TODO(maxim): Add more thorough validation logic. For example check if activity IDs are
// matching.

// ProtocolMessageCommand is different from other commands because it can be associated with
// multiple types of events
// TODO(#1781) Validate protocol message is expected type.
Expand Down Expand Up @@ -1291,6 +1334,7 @@ private OptionalLong getInitialCommandEventId(HistoryEvent event) {
case EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT:
case EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
case EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED:
case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED:
return OptionalLong.of(event.getEventId());

default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,10 @@ private HistoryEvent newAttributes(EventType type, Object attributes) {
result.setChildWorkflowExecutionTerminatedEventAttributes(
(ChildWorkflowExecutionTerminatedEventAttributes) attributes);
break;
case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED:
result.setWorkflowExecutionUpdateAdmittedEventAttributes(
(WorkflowExecutionUpdateAdmittedEventAttributes) attributes);
break;
case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED:
result.setWorkflowExecutionUpdateAcceptedEventAttributes(
(WorkflowExecutionUpdateAcceptedEventAttributes) attributes);
Expand Down
Loading