Skip to content

Commit 503cae7

Browse files
Verify history is replayed up to StartedEventId (#1916)
Verify history is replayed up to StartedEventId
1 parent 8b3be3b commit 503cae7

File tree

3 files changed

+84
-6
lines changed

3 files changed

+84
-6
lines changed

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,20 +228,22 @@ private void handleWorkflowTaskImpl(
228228
workflowStateMachines.setWorklfowStartedEventId(workflowTask.getStartedEventId());
229229
workflowStateMachines.setReplaying(workflowTask.getPreviousStartedEventId() > 0);
230230
workflowStateMachines.setMessages(workflowTask.getMessagesList());
231-
applyServerHistory(historyIterator);
231+
applyServerHistory(workflowTask.getStartedEventId(), historyIterator);
232232
}
233233

234-
private void applyServerHistory(WorkflowHistoryIterator historyIterator) {
234+
private void applyServerHistory(long lastEventId, WorkflowHistoryIterator historyIterator) {
235235
Duration expiration = toJavaDuration(startedEvent.getWorkflowTaskTimeout());
236236
historyIterator.initDeadline(Deadline.after(expiration.toMillis(), TimeUnit.MILLISECONDS));
237237

238238
boolean timerStopped = false;
239239
Stopwatch sw = metricsScope.timer(MetricsType.WORKFLOW_TASK_REPLAY_LATENCY).start();
240+
long currentEventId = 0;
240241
try {
241242
while (historyIterator.hasNext()) {
242243
// iteration itself is intentionally left outside the try-catch below,
243244
// as gRPC exception happened during history iteration should never ever fail the workflow
244245
HistoryEvent event = historyIterator.next();
246+
currentEventId = event.getEventId();
245247
boolean hasNext = historyIterator.hasNext();
246248
try {
247249
workflowStateMachines.handleEvent(event, hasNext);
@@ -264,13 +266,27 @@ private void applyServerHistory(WorkflowHistoryIterator historyIterator) {
264266
timerStopped = true;
265267
}
266268
}
269+
verifyAllEventsProcessed(lastEventId, currentEventId);
267270
} finally {
268271
if (!timerStopped) {
269272
sw.stop();
270273
}
271274
}
272275
}
273276

277+
// Verify the received and processed all events up to the last one we knew about from the polled
278+
// task.
279+
// It is possible for the server to send fewer events than required if we are reading history from
280+
// a stale node.
281+
private void verifyAllEventsProcessed(long lastEventId, long processedEventId) {
282+
if (lastEventId != Long.MAX_VALUE && lastEventId > 0 && processedEventId < lastEventId) {
283+
throw new IllegalStateException(
284+
String.format(
285+
"Premature end of stream, expectedLastEventID=%d but no more events after eventID=%d",
286+
lastEventId, processedEventId));
287+
}
288+
}
289+
274290
private Map<String, WorkflowQueryResult> executeQueries(Map<String, WorkflowQuery> queries) {
275291
Map<String, WorkflowQueryResult> queryResults = new HashMap<>();
276292
for (Map.Entry<String, WorkflowQuery> entry : queries.entrySet()) {

temporal-sdk/src/test/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandlerTaskHandlerTests.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,18 +29,27 @@
2929
import static org.mockito.Mockito.mock;
3030
import static org.mockito.Mockito.when;
3131

32+
import com.google.protobuf.ByteString;
3233
import com.google.protobuf.util.Durations;
3334
import com.uber.m3.tally.NoopScope;
35+
import io.temporal.api.enums.v1.EventType;
36+
import io.temporal.api.history.v1.History;
37+
import io.temporal.api.history.v1.HistoryEvent;
3438
import io.temporal.api.taskqueue.v1.StickyExecutionAttributes;
39+
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
40+
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryResponse;
3541
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
42+
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
3643
import io.temporal.internal.common.InternalUtils;
3744
import io.temporal.internal.worker.SingleWorkerOptions;
3845
import io.temporal.internal.worker.WorkflowExecutorCache;
3946
import io.temporal.internal.worker.WorkflowRunLockManager;
4047
import io.temporal.internal.worker.WorkflowTaskHandler;
48+
import io.temporal.serviceclient.WorkflowServiceStubs;
4149
import io.temporal.testUtils.HistoryUtils;
4250
import io.temporal.testing.internal.SDKTestWorkflowRule;
4351
import java.time.Duration;
52+
import java.util.List;
4453
import java.util.Optional;
4554
import org.junit.Rule;
4655
import org.junit.Test;
@@ -77,6 +86,62 @@ public void ifStickyExecutionAttributesAreNotSetThenWorkflowsAreNotCached() thro
7786
assertFalse(result.getTaskCompleted().hasStickyAttributes());
7887
}
7988

89+
@Test
90+
public void workflowTaskFailOnIncompleteHistory() throws Throwable {
91+
assumeFalse("skipping for docker tests", SDKTestWorkflowRule.useExternalService);
92+
93+
WorkflowExecutorCache cache =
94+
new WorkflowExecutorCache(10, new WorkflowRunLockManager(), new NoopScope());
95+
WorkflowServiceStubs client = mock(WorkflowServiceStubs.class);
96+
when(client.getServerCapabilities())
97+
.thenReturn(() -> GetSystemInfoResponse.Capabilities.newBuilder().build());
98+
WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub =
99+
mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class);
100+
when(client.blockingStub()).thenReturn(blockingStub);
101+
when(blockingStub.withOption(any(), any())).thenReturn(blockingStub);
102+
103+
// Simulate a stale history node sending a workflow task with an incomplete history
104+
List<HistoryEvent> history =
105+
HistoryUtils.generateWorkflowTaskWithInitialHistory().getHistory().getEventsList();
106+
assertEquals(3, history.size());
107+
assertEquals(
108+
EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED, history.get(history.size() - 1).getEventType());
109+
history = history.subList(0, history.size() - 1);
110+
when(blockingStub.getWorkflowExecutionHistory(any()))
111+
.thenReturn(
112+
GetWorkflowExecutionHistoryResponse.newBuilder()
113+
.setHistory(History.newBuilder().addAllEvents(history).build())
114+
.build());
115+
116+
WorkflowTaskHandler taskHandler =
117+
new ReplayWorkflowTaskHandler(
118+
"namespace",
119+
setUpMockWorkflowFactory(),
120+
cache,
121+
SingleWorkerOptions.newBuilder().build(),
122+
null,
123+
Duration.ofSeconds(5),
124+
client,
125+
null);
126+
127+
// Send a poll with a partial history and no cached execution so the SDK will request a full
128+
// history
129+
WorkflowTaskHandler.Result result =
130+
taskHandler.handleWorkflowTask(
131+
HistoryUtils.generateWorkflowTaskWithInitialHistory().toBuilder()
132+
.setHistory(History.newBuilder().build())
133+
.setNextPageToken(ByteString.EMPTY)
134+
.build());
135+
136+
// Assert
137+
assertEquals(0, cache.size());
138+
assertNotNull(result.getTaskFailed());
139+
assertTrue(result.getTaskFailed().hasFailure());
140+
assertEquals(
141+
"Premature end of stream, expectedLastEventID=3 but no more events after eventID=2",
142+
result.getTaskFailed().getFailure().getMessage());
143+
}
144+
80145
@Test
81146
public void ifStickyExecutionAttributesAreSetThenWorkflowsAreCached() throws Throwable {
82147
assumeFalse("skipping for docker tests", SDKTestWorkflowRule.useExternalService);

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowStoreImpl.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -329,18 +329,15 @@ public void sendQueryTask(
329329
List<HistoryEvent> events = new ArrayList<>(historyStore.getEventsLocked());
330330
History.Builder history = History.newBuilder();
331331
PeekingIterator<HistoryEvent> iterator = Iterators.peekingIterator(events.iterator());
332-
long startedEventId = 0;
333332
long previousStaredEventId = 0;
334333
while (iterator.hasNext()) {
335334
HistoryEvent event = iterator.next();
336335
if (event.getEventType() == EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED) {
337336
if (!iterator.hasNext()
338337
|| iterator.peek().getEventType() == EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED) {
339338
previousStaredEventId = event.getEventId();
340-
startedEventId = previousStaredEventId;
341339
}
342340
} else if (WorkflowExecutionUtils.isWorkflowExecutionClosedEvent(event)) {
343-
startedEventId = 0;
344341
if (iterator.hasNext()) {
345342
throw Status.INTERNAL
346343
.withDescription("Unexpected event after the completion event: " + iterator.peek())
@@ -351,7 +348,7 @@ public void sendQueryTask(
351348
task.setPreviousStartedEventId(previousStaredEventId);
352349
// it's not a real workflow task and the server sends 0 for startedEventId for such a workflow
353350
// task
354-
task.setStartedEventId(startedEventId);
351+
task.setStartedEventId(0);
355352
if (taskQueue.getTaskQueueName().equals(task.getWorkflowExecutionTaskQueue().getName())) {
356353
history.addAllEvents(events);
357354
} else {

0 commit comments

Comments
 (0)