Skip to content

[BLOCKED] Activity reset #1730

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: activity_pause_unpause
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions packages/client/src/async-completion-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ export class ActivityCancelledError extends Error {}
@SymbolBasedInstanceOfError('ActivityPausedError')
export class ActivityPausedError extends Error {}

/**
* Thrown by {@link AsyncCompletionClient.heartbeat} when the reporting Activity
* has been reset.
*/
@SymbolBasedInstanceOfError('ActivityResetError')
export class ActivityResetError extends Error {}

/**
* Options used to configure {@link AsyncCompletionClient}
*/
Expand Down Expand Up @@ -219,6 +226,7 @@ export class AsyncCompletionClient extends BaseClient {
const payloads = await encodeToPayloads(this.dataConverter, details);
let cancelRequested = false;
let paused = false;
let reset = false;
try {
if (taskTokenOrFullActivityId instanceof Uint8Array) {
const response = await this.workflowService.recordActivityTaskHeartbeat({
Expand All @@ -229,6 +237,7 @@ export class AsyncCompletionClient extends BaseClient {
});
cancelRequested = !!response.cancelRequested;
paused = !!response.activityPaused;
reset = !!response.activityReset;
} else {
const response = await this.workflowService.recordActivityTaskHeartbeatById({
identity: this.options.identity,
Expand All @@ -238,13 +247,17 @@ export class AsyncCompletionClient extends BaseClient {
});
cancelRequested = !!response.cancelRequested;
paused = !!response.activityPaused;
reset = !!response.activityReset;
}
} catch (err) {
this.handleError(err);
}
if (cancelRequested) {
throw new ActivityCancelledError('cancelled');
}
if (reset) {
throw new ActivityResetError('reset');
}
if (paused) {
throw new ActivityPausedError('paused');
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export async function heartbeatCancellationDetailsActivity(
// eslint-disable-next-line no-constant-condition
while (true) {
try {
activity.heartbeat();
activity.heartbeat('heartbeated');
await activity.sleep(300);
} catch (err) {
if (err instanceof activity.CancelledFailure && catchErr) {
Expand Down
23 changes: 17 additions & 6 deletions packages/test/src/helpers-integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,11 @@ export async function assertPendingActivityExistsEventually(
return activityInfo as temporal.api.workflow.v1.IPendingActivityInfo;
}

export async function setActivityPauseState(handle: WorkflowHandle, activityId: string, pause: boolean): Promise<void> {
export async function setActivityState(
handle: WorkflowHandle,
activityId: string,
state: 'pause' | 'unpause' | 'reset'
): Promise<void> {
const desc = await handle.describe();
const req = {
namespace: handle.client.options.namespace,
Expand All @@ -313,17 +317,24 @@ export async function setActivityPauseState(handle: WorkflowHandle, activityId:
},
id: activityId,
};
if (pause) {
if (state === 'pause') {
await handle.client.workflowService.pauseActivity(req);
} else {
} else if (state === 'unpause') {
await handle.client.workflowService.unpauseActivity(req);
} else {
const resetReq = { ...req, resetHeartbeat: true };
await handle.client.workflowService.resetActivity(resetReq);
}
await waitUntil(async () => {
const info = await assertPendingActivityExistsEventually(handle, activityId, 10000);
if (pause) {
return info.paused ?? false;
if (state === 'pause') {
return info.paused === true;
} else if (state === 'unpause') {
return info.paused === false;
} else {
// Heartbeat details reset
return info.heartbeatDetails === null;
}
return !info.paused;
}, 10000);
}

Expand Down
2 changes: 0 additions & 2 deletions packages/test/src/test-integration-split-three.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ test(
await worker.runUntil(handle.result());
let firstChild = true;
const history = await handle.fetchHistory();
console.log('events');
for (const event of history?.events ?? []) {
switch (event.eventType) {
case temporal.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED:
Expand Down Expand Up @@ -184,7 +183,6 @@ test('workflow start without priorities sees undefined for the key', configMacro
const { env, createWorkerWithDefaults } = config;
const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env);
const worker = await createWorkerWithDefaults(t, { activities });
console.log('STARTING WORKFLOW');

const handle1 = await startWorkflow(workflows.priorityWorkflow, {
args: [true, undefined],
Expand Down
67 changes: 52 additions & 15 deletions packages/test/src/test-integration-workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import {
createLocalTestEnvironment,
helpers,
makeTestFunction,
setActivityPauseState,
setActivityState,
} from './helpers-integration';
import { overrideSdkInternalFlag } from './mock-internal-flags';
import { asSdkLoggerSink, loadHistory, RUN_TIME_SKIPPING_TESTS, waitUntil } from './helpers';
Expand Down Expand Up @@ -1424,7 +1424,7 @@ test('Workflow can return root workflow', async (t) => {
});
});

export async function heartbeatPauseWorkflow(
export async function heartbeatCancellationDetailsWorkflow(
activityId: string,
catchErr: boolean,
maximumAttempts: number
Expand Down Expand Up @@ -1464,14 +1464,14 @@ test('Activity pause returns expected cancellation details', async (t) => {

await worker.runUntil(async () => {
const testActivityId = randomUUID();
const handle = await startWorkflow(heartbeatPauseWorkflow, { args: [testActivityId, true, 1] });
const handle = await startWorkflow(heartbeatCancellationDetailsWorkflow, { args: [testActivityId, true, 1] });

const activityInfo = await assertPendingActivityExistsEventually(handle, testActivityId, 5000);
t.true(activityInfo.paused === false);
await setActivityPauseState(handle, testActivityId, true);
await setActivityState(handle, testActivityId, 'pause');
const activityInfo2 = await assertPendingActivityExistsEventually(handle, `${testActivityId}-2`, 5000);
t.true(activityInfo2.paused === false);
await setActivityPauseState(handle, `${testActivityId}-2`, true);
await setActivityState(handle, `${testActivityId}-2`, 'pause');
const result = await handle.result();
t.deepEqual(result[0], {
cancelRequested: false,
Expand All @@ -1494,12 +1494,12 @@ test('Activity pause returns expected cancellation details', async (t) => {

test('Activity can pause and unpause', async (t) => {
const { createWorker, startWorkflow } = helpers(t);
async function checkHeartbeatDetailsExist(handle: WorkflowHandle, activityId: string) {
async function checkHeartbeatDetailsExist(handle: WorkflowHandle, activityId: string, expectedDetails: string) {
const activityInfo = await assertPendingActivityExistsEventually(handle, activityId, 5000);
if (activityInfo.heartbeatDetails?.payloads) {
for (const payload of activityInfo.heartbeatDetails?.payloads || []) {
if (payload.data && payload.data?.length > 0) {
return true;
if (payload.data != null) {
return workflow.defaultPayloadConverter.fromPayload(payload) === expectedDetails;
}
}
}
Expand All @@ -1515,24 +1515,61 @@ test('Activity can pause and unpause', async (t) => {

await worker.runUntil(async () => {
const testActivityId = randomUUID();
const handle = await startWorkflow(heartbeatPauseWorkflow, { args: [testActivityId, false, 2] });
const handle = await startWorkflow(heartbeatCancellationDetailsWorkflow, { args: [testActivityId, false, 2] });
const activityInfo = await assertPendingActivityExistsEventually(handle, testActivityId, 5000);
t.true(activityInfo.paused === false);
await setActivityPauseState(handle, testActivityId, true);
await setActivityState(handle, testActivityId, 'pause');
await waitUntil(async () => {
return await checkHeartbeatDetailsExist(handle, testActivityId);
return await checkHeartbeatDetailsExist(handle, testActivityId, 'finally-complete');
}, 5000);
await setActivityPauseState(handle, testActivityId, false);
await setActivityState(handle, testActivityId, 'unpause');
const activityInfo2 = await assertPendingActivityExistsEventually(handle, `${testActivityId}-2`, 5000);
t.true(activityInfo2.paused === false);
await setActivityPauseState(handle, `${testActivityId}-2`, true);
await setActivityState(handle, `${testActivityId}-2`, 'pause');
await waitUntil(async () => {
return await checkHeartbeatDetailsExist(handle, `${testActivityId}-2`);
return await checkHeartbeatDetailsExist(handle, `${testActivityId}-2`, 'finally-complete');
}, 5000);
await setActivityPauseState(handle, `${testActivityId}-2`, false);
await setActivityState(handle, `${testActivityId}-2`, 'unpause');
const result = await handle.result();
// Undefined values are converted to null by data converter.
t.true(result[0] === null);
t.true(result[1] === null);
});
});

test('Activity reset returns expected cancellation details', async (t) => {
const { createWorker, startWorkflow } = helpers(t);
const worker = await createWorker({
activities: {
heartbeatCancellationDetailsActivity,
heartbeatCancellationDetailsActivity2: heartbeatCancellationDetailsActivity,
},
});

await worker.runUntil(async () => {
const testActivityId = randomUUID();
const handle = await startWorkflow(heartbeatCancellationDetailsWorkflow, { args: [testActivityId, true, 2] });

await assertPendingActivityExistsEventually(handle, testActivityId, 5000);
await setActivityState(handle, testActivityId, 'reset');
await assertPendingActivityExistsEventually(handle, `${testActivityId}-2`, 10000);
await setActivityState(handle, `${testActivityId}-2`, 'reset');
const result = await handle.result();
t.deepEqual(result[0], {
cancelRequested: false,
notFound: false,
paused: false,
timedOut: false,
workerShutdown: false,
reset: true,
});
t.deepEqual(result[1], {
cancelRequested: false,
notFound: false,
paused: false,
timedOut: false,
workerShutdown: false,
reset: true,
});
});
});
19 changes: 12 additions & 7 deletions packages/worker/src/activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ export class Activity {
(error instanceof CancelledFailure || isAbortError(error)) &&
this.context.cancellationSignal.aborted
) {
if (this.context.cancellationDetails.details?.paused) {
if (this.context.cancellationDetails.details?.reset) {
this.workerLogger.debug('Activity reset', { durationMs });
} else if (this.context.cancellationDetails.details?.paused) {
this.workerLogger.debug('Activity paused', { durationMs });
} else {
this.workerLogger.debug('Activity completed as cancelled', { durationMs });
Expand Down Expand Up @@ -180,14 +182,17 @@ export class Activity {
} else if (this.cancelReason) {
// Either a CancelledFailure that we threw or AbortError from AbortController
if (err instanceof CancelledFailure) {
// If cancel due to activity pause, emit an application failure for the pause.
if (this.context.cancellationDetails.details?.paused) {
// If cancel due to activity pause or reset, emit an application failure.
if (this.context.cancellationDetails.details?.paused || this.context.cancellationDetails.details?.reset) {
let message = 'Activity reset';
let errType = 'ActivityReset';
if (!this.context.cancellationDetails.details?.reset) {
message = 'Activity paused';
errType = 'ActivityPause';
}
return {
failed: {
failure: await encodeErrorToFailure(
this.dataConverter,
new ApplicationFailure('Activity paused', 'ActivityPause')
),
failure: await encodeErrorToFailure(this.dataConverter, new ApplicationFailure(message, errType)),
},
};
} else {
Expand Down
2 changes: 1 addition & 1 deletion packages/worker/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,7 @@ export class Worker {
details,
onError() {
// activity must be defined
// empty cancellation details, not corresponding detail for heartbeat detail conversion failure
// empty cancellation details, no corresponding detail for heartbeat detail conversion failure
activity?.cancel(
'HEARTBEAT_DETAILS_CONVERSION_FAILED',
ActivityCancellationDetails.fromProto(undefined)
Expand Down
Loading