Skip to content

Add build id to workflow info #1964

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ boolean getVersion(
/**
* @return eventId of the last / currently active workflow task of this workflow
*/
long getCurrentWorkflowTaskStartedEventId();
long getLastWorkflowTaskStartedEventId();

/**
* @return size of Workflow history in bytes up until the current moment of execution. This value
Expand Down Expand Up @@ -405,4 +405,13 @@ boolean getVersion(
* @return true if this flag may currently be used.
*/
boolean tryUseSdkFlag(SdkFlag flag);

/**
* @return The Build ID of the worker which executed the current Workflow Task. May be empty the
* task was completed by a worker without a Build ID. If this worker is the one executing this
* task for the first time and has a Build ID set, then its ID will be used. This value may
* change over the lifetime of the workflow run, but is deterministic and safe to use for
* branching.
*/
Optional<String> getCurrentBuildId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,19 @@ public boolean tryUseSdkFlag(SdkFlag flag) {
return workflowStateMachines.tryUseSdkFlag(flag);
}

@Override
public Optional<String> getCurrentBuildId() {
String curTaskBID = workflowStateMachines.getCurrentTaskBuildId();
// The current task started id == 0 check is to avoid setting the build id to this worker's ID
// in the event we're
// servicing a query, in which case we do want to use the ID from history.
if (!workflowStateMachines.isReplaying()
&& workflowStateMachines.getCurrentWFTStartedEventId() != 0) {
curTaskBID = workerOptions.getBuildId();
}
return Optional.ofNullable(curTaskBID);
}

@Override
public Functions.Proc1<RuntimeException> newTimer(
Duration delay, Functions.Proc1<RuntimeException> callback) {
Expand Down Expand Up @@ -356,8 +369,8 @@ public Map<String, Payload> getHeader() {
}

@Override
public long getCurrentWorkflowTaskStartedEventId() {
return workflowStateMachines.getCurrentStartedEventId();
public long getLastWorkflowTaskStartedEventId() {
return workflowStateMachines.getLastWFTStartedEventId();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public WorkflowTaskResult handleWorkflowTask(
TimeUnit.NANOSECONDS);

if (workflowTask.getPreviousStartedEventId()
< workflowStateMachines.getCurrentStartedEventId()) {
< workflowStateMachines.getLastWFTStartedEventId()) {
// if previousStartedEventId < currentStartedEventId - the last workflow task handled by
// these state machines is ahead of the last handled workflow task known by the server.
// Something is off, the server lost progress.
Expand Down Expand Up @@ -219,7 +219,7 @@ public QueryResult handleDirectQueryWorkflowTask(

@Override
public void setCurrentStartedEvenId(Long eventId) {
workflowStateMachines.setCurrentStartedEventId(eventId);
workflowStateMachines.setLastWFTStartedEventId(eventId);
}

private void handleWorkflowTaskImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.temporal.workflow.Functions;
import java.nio.charset.StandardCharsets;
import java.util.*;
import javax.annotation.Nullable;

public final class WorkflowStateMachines {

Expand Down Expand Up @@ -79,7 +80,10 @@ enum HandleEventStatus {
private long workflowTaskStartedEventId;

/** EventId of the last WorkflowTaskStarted event handled by these state machines. */
private long currentStartedEventId;
private long lastWFTStartedEventId;

/** The Build ID used in the current WFT if already completed and set (may be null) */
private String currentTaskBuildId;

private long historySize;

Expand Down Expand Up @@ -201,27 +205,36 @@ public void setWorklfowStartedEventId(long workflowTaskStartedEventId) {
this.workflowTaskStartedEventId = workflowTaskStartedEventId;
}

public void setCurrentStartedEventId(long eventId) {
public void setLastWFTStartedEventId(long eventId) {
// We have to drop any state machines (which should only be one workflow task machine)
// created when handling the speculative workflow task
for (long i = this.lastHandledEventId; i > eventId; i--) {
stateMachines.remove(i);
}
this.currentStartedEventId = eventId;
this.lastWFTStartedEventId = eventId;
// When we reset the event ID on a speculative WFT we need to move this counter back
// to the last WFT completed to allow new tasks to be processed. Assume the WFT complete
// always follows the WFT started.
this.lastHandledEventId = eventId + 1;
}

public long getCurrentStartedEventId() {
return currentStartedEventId;
public long getLastWFTStartedEventId() {
return lastWFTStartedEventId;
}

public long getCurrentWFTStartedEventId() {
return workflowTaskStartedEventId;
}

public long getHistorySize() {
return historySize;
}

@Nullable
public String getCurrentTaskBuildId() {
return currentTaskBuildId;
}

public boolean isContinueAsNewSuggested() {
return isContinueAsNewSuggested;
}
Expand Down Expand Up @@ -329,6 +342,10 @@ private void handleSingleEventLookahead(HistoryEvent event) {
case EVENT_TYPE_WORKFLOW_TASK_COMPLETED:
WorkflowTaskCompletedEventAttributes completedEvent =
event.getWorkflowTaskCompletedEventAttributes();
String maybeBuildId = completedEvent.getWorkerVersion().getBuildId();
if (!maybeBuildId.isEmpty()) {
currentTaskBuildId = maybeBuildId;
}
for (Integer flag : completedEvent.getSdkMetadata().getLangUsedFlagsList()) {
SdkFlag sdkFlag = SdkFlag.getValue(flag);
if (sdkFlag.equals(SdkFlag.UNKNOWN)) {
Expand Down Expand Up @@ -703,7 +720,7 @@ private long setCurrentTimeMillis(long currentTimeMillis) {
}

public long getLastStartedEventId() {
return currentStartedEventId;
return lastWFTStartedEventId;
}

/**
Expand Down Expand Up @@ -1164,7 +1181,7 @@ public void workflowTaskStarted(
value.nonReplayWorkflowTaskStarted();
}
}
WorkflowStateMachines.this.currentStartedEventId = startedEventId;
WorkflowStateMachines.this.lastWFTStartedEventId = startedEventId;
WorkflowStateMachines.this.historySize = historySize;
WorkflowStateMachines.this.isContinueAsNewSuggested = isContinueAsNewSuggested;

Expand Down Expand Up @@ -1282,6 +1299,6 @@ private String createEventHandlingMessage(HistoryEvent event) {
private String createShortCurrentStateMessagePostfix() {
return String.format(
"{WorkflowTaskStartedEventId=%s, CurrentStartedEventId=%s}",
this.workflowTaskStartedEventId, this.currentStartedEventId);
this.workflowTaskStartedEventId, this.lastWFTStartedEventId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public String getCronSchedule() {

@Override
public long getHistoryLength() {
return context.getCurrentWorkflowTaskStartedEventId();
return context.getLastWorkflowTaskStartedEventId();
}

@Override
Expand All @@ -150,6 +150,11 @@ public boolean isContinueAsNewSuggested() {
return context.isContinueAsNewSuggested();
}

@Override
public Optional<String> getCurrentBuildId() {
return context.getCurrentBuildId();
}

@Override
public String toString() {
return "WorkflowInfo{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,13 @@ public interface WorkflowInfo {
* value changes during the lifetime of a Workflow Execution.
*/
boolean isContinueAsNewSuggested();

/**
* @return The Build ID of the worker which executed the current Workflow Task. May be empty the
* task was completed by a worker without a Build ID. If this worker is the one executing this
* task for the first time and has a Build ID set, then its ID will be used. This value may
* change over the lifetime of the workflow run, but is deterministic and safe to use for
* branching.
*/
Optional<String> getCurrentBuildId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,19 @@

import io.temporal.activity.Activity;
import io.temporal.activity.ActivityOptions;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.client.BuildIdOperation;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.internal.Signal;
import io.temporal.testing.internal.SDKTestOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowMethod;
import io.temporal.workflow.WorkflowQueue;
import io.temporal.workflow.*;
import io.temporal.workflow.shared.TestActivities;
import io.temporal.workflow.shared.TestWorkflows;
import java.time.Duration;
import java.util.UUID;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

Expand All @@ -45,7 +46,6 @@ public class BuildIdVersioningTest {
SDKTestWorkflowRule.newBuilder()
.setWorkerOptions(
WorkerOptions.newBuilder().setBuildId("1.0").setUseBuildIdForVersioning(true).build())
.setWorkflowTypes(BuildIdVersioningTest.TestVersioningWorkflowImpl.class)
.setActivityImplementations(new BuildIdVersioningTest.ActivityImpl())
.setDoNotStart(true)
.build();
Expand All @@ -57,6 +57,10 @@ public void testBuildIdVersioningDataSetProperly() {

String taskQueue = testWorkflowRule.getTaskQueue();
WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient();
testWorkflowRule
.getWorker()
.registerWorkflowImplementationTypes(
BuildIdVersioningTest.TestVersioningWorkflowImpl.class);

// Add 1.0 to the queue
workflowClient.updateWorkerBuildIdCompatability(
Expand Down Expand Up @@ -120,6 +124,75 @@ public void testBuildIdVersioningDataSetProperly() {
w2F.shutdown();
}

private static final Signal ACTIVITY_RAN = new Signal();

@Test
public void testCurrentBuildIDSetProperly() throws InterruptedException {
assumeTrue(
"Test Server doesn't support versioning yet", SDKTestWorkflowRule.useExternalService);

String taskQueue = testWorkflowRule.getTaskQueue();
WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient();
testWorkflowRule
.getWorker()
.registerWorkflowImplementationTypes(
BuildIdVersioningTest.TestCurrentBuildIdWorkflow.class);

// Add 1.0 to the queue
workflowClient.updateWorkerBuildIdCompatability(
taskQueue, BuildIdOperation.newIdInNewDefaultSet("1.0"));

// Now start the worker (to avoid poll timeout while queue is unversioned)
testWorkflowRule.getTestEnvironment().start();

// Start a workflow
String workflowId = "build-id-versioning-1.0-" + UUID.randomUUID();
WorkflowOptions options =
SDKTestOptions.newWorkflowOptionsWithTimeouts(taskQueue).toBuilder()
.setWorkflowId(workflowId)
.build();
TestWorkflows.QueryableWorkflow wf1 =
workflowClient.newWorkflowStub(TestWorkflows.QueryableWorkflow.class, options);
WorkflowClient.start(wf1::execute);

Assert.assertEquals("1.0", wf1.getState());

// Wait for activity to run
ACTIVITY_RAN.waitForSignal();
Assert.assertEquals("1.0", wf1.getState());

testWorkflowRule.getTestEnvironment().shutdown();
workflowClient
.getWorkflowServiceStubs()
.blockingStub()
.resetStickyTaskQueue(
io.temporal.api.workflowservice.v1.ResetStickyTaskQueueRequest.newBuilder()
.setNamespace(testWorkflowRule.getTestEnvironment().getNamespace())
.setExecution(WorkflowExecution.newBuilder().setWorkflowId(workflowId).build())
.build());

// Add 1.1 to the queue
workflowClient.updateWorkerBuildIdCompatability(
taskQueue, BuildIdOperation.newCompatibleVersion("1.1", "1.0"));

WorkerFactory w11F =
WorkerFactory.newInstance(workflowClient, testWorkflowRule.getWorkerFactoryOptions());
Worker w11 =
w11F.newWorker(
taskQueue,
WorkerOptions.newBuilder().setBuildId("1.1").setUseBuildIdForVersioning(true).build());
w11.registerWorkflowImplementationTypes(BuildIdVersioningTest.TestCurrentBuildIdWorkflow.class);
w11.registerActivitiesImplementations(new BuildIdVersioningTest.ActivityImpl());
w11F.start();

Assert.assertEquals("1.0", wf1.getState());
wf1.mySignal("finish");

Assert.assertEquals("1.1", wf1.getState());

w11F.shutdown();
}

public static class TestVersioningWorkflowImpl implements TestWorkflows.QueryableWorkflow {
WorkflowQueue<String> sigQueue = Workflow.newWorkflowQueue(1);
private final TestActivities.TestActivity1 activity =
Expand Down Expand Up @@ -156,4 +229,43 @@ public String execute(String input) {
return Activity.getExecutionContext().getInfo().getActivityType() + "-" + input;
}
}

public static class TestCurrentBuildIdWorkflow implements TestWorkflows.QueryableWorkflow {
private final TestActivities.TestActivity1 activity =
Workflow.newActivityStub(
TestActivities.TestActivity1.class,
ActivityOptions.newBuilder().setScheduleToCloseTimeout(Duration.ofSeconds(10)).build());
private boolean doFinish = false;
private String lastBuildId;

@WorkflowMethod
public String execute() {
updateBuildId();
Workflow.sleep(1);
updateBuildId();
if (Workflow.getInfo().getCurrentBuildId().orElse("").equals("1.0")) {
activity.execute("foo");
updateBuildId();
ACTIVITY_RAN.signal();
}
Workflow.await(() -> doFinish);
updateBuildId();
return "Yay done";
}

private void updateBuildId() {
lastBuildId = Workflow.getInfo().getCurrentBuildId().orElse("");
}

@Override
public void mySignal(String arg) {
doFinish = true;
}

@Override
public String getState() {
// Workflow.getInfo isn't accessible in queries, so we do this
return lastBuildId;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,11 @@ public boolean tryUseSdkFlag(SdkFlag flag) {
return false;
}

@Override
public Optional<String> getCurrentBuildId() {
return Optional.empty();
}

@Override
public int getAttempt() {
return 1;
Expand Down Expand Up @@ -353,7 +358,7 @@ public Map<String, Payload> getHeader() {
}

@Override
public long getCurrentWorkflowTaskStartedEventId() {
public long getLastWorkflowTaskStartedEventId() {
return 0;
}

Expand Down