Skip to content

Commit a9a9339

Browse files
committed
add tests
1 parent 9f7f0da commit a9a9339

File tree

4 files changed

+193
-4
lines changed

4 files changed

+193
-4
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import { ActivityCancellationDetails } from '@temporalio/common';
2+
import * as activity from '@temporalio/activity';
3+
4+
export async function heartbeatCancellationDetailsActivity(
5+
catchErr: boolean
6+
): Promise<ActivityCancellationDetails | undefined> {
7+
// Exit early if we've already run this activity.
8+
if (activity.activityInfo().heartbeatDetails === 'finally-complete') {
9+
return activity.cancellationDetails();
10+
}
11+
// eslint-disable-next-line no-constant-condition
12+
while (true) {
13+
try {
14+
activity.heartbeat();
15+
await activity.sleep(300);
16+
} catch (err) {
17+
if (err instanceof activity.CancelledFailure && catchErr) {
18+
return activity.cancellationDetails();
19+
}
20+
activity.heartbeat('finally-complete');
21+
throw err;
22+
}
23+
}
24+
}

packages/test/src/helpers-integration.ts

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,15 @@ import * as workflow from '@temporalio/workflow';
3030
import { temporal } from '@temporalio/proto';
3131
import { defineSearchAttributeKey, SearchAttributeType } from '@temporalio/common/lib/search-attributes';
3232
import { ConnectionInjectorInterceptor } from './activities/interceptors';
33-
import { Worker, TestWorkflowEnvironment, test as anyTest, bundlerOptions } from './helpers';
33+
import { Worker, TestWorkflowEnvironment, test as anyTest, bundlerOptions, waitUntil } from './helpers';
3434

3535
export interface Context {
3636
env: TestWorkflowEnvironment;
3737
workflowBundle: WorkflowBundle;
3838
}
3939

4040
const defaultDynamicConfigOptions = [
41+
'frontend.activityAPIsEnabled=true',
4142
'frontend.enableExecuteMultiOperation=true',
4243
'frontend.workerVersioningDataAPIs=true',
4344
'frontend.workerVersioningWorkflowAPIs=true',
@@ -284,6 +285,48 @@ export function configurableHelpers<T>(
284285
};
285286
}
286287

288+
export async function assertPendingActivityExistsEventually(
289+
handle: WorkflowHandle<workflow.Workflow>,
290+
activityId: string,
291+
timeoutMs: number
292+
): Promise<temporal.api.workflow.v1.IPendingActivityInfo> {
293+
let activityInfo: temporal.api.workflow.v1.IPendingActivityInfo | undefined;
294+
try {
295+
await waitUntil(async () => {
296+
const desc = await handle.describe();
297+
activityInfo = desc.raw.pendingActivities?.find((pa) => pa.activityId === activityId);
298+
return activityInfo !== undefined;
299+
}, timeoutMs);
300+
} catch {
301+
throw new Error(`Unable to find pending activity for activity ${activityId}`);
302+
}
303+
return activityInfo as temporal.api.workflow.v1.IPendingActivityInfo;
304+
}
305+
306+
export async function setActivityPauseState(handle: WorkflowHandle, activityId: string, pause: boolean): Promise<void> {
307+
const desc = await handle.describe();
308+
const req = {
309+
namespace: handle.client.options.namespace,
310+
execution: {
311+
workflowId: desc.raw.workflowExecutionInfo?.execution?.workflowId,
312+
runId: desc.raw.workflowExecutionInfo?.execution?.runId,
313+
},
314+
id: activityId,
315+
};
316+
if (pause) {
317+
await handle.client.workflowService.pauseActivity(req);
318+
} else {
319+
await handle.client.workflowService.unpauseActivity(req);
320+
}
321+
await waitUntil(async () => {
322+
const info = await assertPendingActivityExistsEventually(handle, activityId, 10000);
323+
if (pause) {
324+
return info.paused ?? false;
325+
}
326+
return !info.paused;
327+
}, 10000);
328+
}
329+
287330
export function helpers(t: ExecutionContext<Context>, testEnv: TestWorkflowEnvironment = t.context.env): Helpers {
288331
return configurableHelpers(t, t.context.workflowBundle, testEnv);
289332
}

packages/test/src/test-integration-workflows.ts

Lines changed: 124 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { setTimeout as setTimeoutPromise } from 'timers/promises';
22
import { randomUUID } from 'crypto';
33
import { ExecutionContext } from 'ava';
44
import { firstValueFrom, Subject } from 'rxjs';
5-
import { WorkflowFailedError } from '@temporalio/client';
5+
import { WorkflowFailedError, WorkflowHandle } from '@temporalio/client';
66
import * as activity from '@temporalio/activity';
77
import { msToNumber, tsToMs } from '@temporalio/common/lib/time';
88
import { TestWorkflowEnvironment } from '@temporalio/testing';
@@ -11,6 +11,7 @@ import * as workflow from '@temporalio/workflow';
1111
import { defineQuery, defineSignal } from '@temporalio/workflow';
1212
import { SdkFlags } from '@temporalio/workflow/lib/flags';
1313
import {
14+
ActivityCancellationDetails,
1415
ActivityCancellationType,
1516
ApplicationFailure,
1617
defineSearchAttributeKey,
@@ -22,9 +23,17 @@ import {
2223
import { signalSchedulingWorkflow } from './activities/helpers';
2324
import { activityStartedSignal } from './workflows/definitions';
2425
import * as workflows from './workflows';
25-
import { Context, createLocalTestEnvironment, helpers, makeTestFunction } from './helpers-integration';
26+
import {
27+
assertPendingActivityExistsEventually,
28+
Context,
29+
createLocalTestEnvironment,
30+
helpers,
31+
makeTestFunction,
32+
setActivityPauseState,
33+
} from './helpers-integration';
2634
import { overrideSdkInternalFlag } from './mock-internal-flags';
2735
import { asSdkLoggerSink, loadHistory, RUN_TIME_SKIPPING_TESTS, waitUntil } from './helpers';
36+
import { heartbeatCancellationDetailsActivity } from './activities/heartbeat-cancellation-details';
2837

2938
const test = makeTestFunction({
3039
workflowsPath: __filename,
@@ -1414,3 +1423,116 @@ test('Workflow can return root workflow', async (t) => {
14141423
t.deepEqual(result, 'empty test-root-workflow-length');
14151424
});
14161425
});
1426+
1427+
export async function heartbeatPauseWorkflow(
1428+
activityId: string,
1429+
catchErr: boolean,
1430+
maximumAttempts: number
1431+
): Promise<Array<ActivityCancellationDetails | undefined>> {
1432+
const { heartbeatCancellationDetailsActivity } = workflow.proxyActivities({
1433+
startToCloseTimeout: '5s',
1434+
activityId,
1435+
retry: {
1436+
maximumAttempts,
1437+
},
1438+
heartbeatTimeout: '1s',
1439+
});
1440+
const { heartbeatCancellationDetailsActivity2 } = workflow.proxyActivities({
1441+
startToCloseTimeout: '5s',
1442+
activityId: `${activityId}-2`,
1443+
retry: {
1444+
maximumAttempts,
1445+
},
1446+
heartbeatTimeout: '1s',
1447+
});
1448+
const results = [];
1449+
results.push(
1450+
await heartbeatCancellationDetailsActivity(catchErr),
1451+
await heartbeatCancellationDetailsActivity2(catchErr)
1452+
);
1453+
return results;
1454+
}
1455+
1456+
test('Activity pause returns expected cancellation details', async (t) => {
1457+
const { createWorker, startWorkflow } = helpers(t);
1458+
const worker = await createWorker({
1459+
activities: {
1460+
heartbeatCancellationDetailsActivity,
1461+
heartbeatCancellationDetailsActivity2: heartbeatCancellationDetailsActivity,
1462+
},
1463+
});
1464+
1465+
await worker.runUntil(async () => {
1466+
const testActivityId = randomUUID();
1467+
const handle = await startWorkflow(heartbeatPauseWorkflow, { args: [testActivityId, true, 1] });
1468+
1469+
const activityInfo = await assertPendingActivityExistsEventually(handle, testActivityId, 5000);
1470+
t.true(activityInfo.paused === false);
1471+
await setActivityPauseState(handle, testActivityId, true);
1472+
const activityInfo2 = await assertPendingActivityExistsEventually(handle, `${testActivityId}-2`, 5000);
1473+
t.true(activityInfo2.paused === false);
1474+
await setActivityPauseState(handle, `${testActivityId}-2`, true);
1475+
const result = await handle.result();
1476+
t.deepEqual(result[0], {
1477+
cancelRequested: false,
1478+
notFound: false,
1479+
paused: true,
1480+
timedOut: false,
1481+
workerShutdown: false,
1482+
reset: false,
1483+
});
1484+
t.deepEqual(result[1], {
1485+
cancelRequested: false,
1486+
notFound: false,
1487+
paused: true,
1488+
timedOut: false,
1489+
workerShutdown: false,
1490+
reset: false,
1491+
});
1492+
});
1493+
});
1494+
1495+
test('Activity can pause and unpause', async (t) => {
1496+
const { createWorker, startWorkflow } = helpers(t);
1497+
async function checkHeartbeatDetailsExist(handle: WorkflowHandle, activityId: string) {
1498+
const activityInfo = await assertPendingActivityExistsEventually(handle, activityId, 5000);
1499+
if (activityInfo.heartbeatDetails?.payloads) {
1500+
for (const payload of activityInfo.heartbeatDetails?.payloads || []) {
1501+
if (payload.data && payload.data?.length > 0) {
1502+
return true;
1503+
}
1504+
}
1505+
}
1506+
return false;
1507+
}
1508+
1509+
const worker = await createWorker({
1510+
activities: {
1511+
heartbeatCancellationDetailsActivity,
1512+
heartbeatCancellationDetailsActivity2: heartbeatCancellationDetailsActivity,
1513+
},
1514+
});
1515+
1516+
await worker.runUntil(async () => {
1517+
const testActivityId = randomUUID();
1518+
const handle = await startWorkflow(heartbeatPauseWorkflow, { args: [testActivityId, false, 2] });
1519+
const activityInfo = await assertPendingActivityExistsEventually(handle, testActivityId, 5000);
1520+
t.true(activityInfo.paused === false);
1521+
await setActivityPauseState(handle, testActivityId, true);
1522+
await waitUntil(async () => {
1523+
return await checkHeartbeatDetailsExist(handle, testActivityId);
1524+
}, 5000);
1525+
await setActivityPauseState(handle, testActivityId, false);
1526+
const activityInfo2 = await assertPendingActivityExistsEventually(handle, `${testActivityId}-2`, 5000);
1527+
t.true(activityInfo2.paused === false);
1528+
await setActivityPauseState(handle, `${testActivityId}-2`, true);
1529+
await waitUntil(async () => {
1530+
return await checkHeartbeatDetailsExist(handle, `${testActivityId}-2`);
1531+
}, 5000);
1532+
await setActivityPauseState(handle, `${testActivityId}-2`, false);
1533+
const result = await handle.result();
1534+
// Undefined values are converted to null by data converter.
1535+
t.true(result[0] === null);
1536+
t.true(result[1] === null);
1537+
});
1538+
});

packages/testing/src/mocking-activity-environment.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ export interface MockActivityEnvironmentOptions {
3030
* will immediately be in a cancelled state.
3131
*/
3232
export class MockActivityEnvironment extends events.EventEmitter {
33-
public cancel: (reason?: any) => void = () => undefined;
33+
public cancel: (reason?: any, details?: any) => void = () => undefined;
3434
public readonly context: activity.Context;
3535
private readonly activity: Activity;
3636

0 commit comments

Comments
 (0)