Skip to content

Commit cd1c6b1

Browse files
THardy98mjameswh
andauthored
Add default update handler (#1640)
Co-authored-by: James Watkins-Harvey <[email protected]>
1 parent 1eaf822 commit cd1c6b1

File tree

6 files changed

+315
-41
lines changed

6 files changed

+315
-41
lines changed

packages/test/src/test-workflows.ts

Lines changed: 173 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,40 @@ function makeSetPatchMarker(myPatchId: string, deprecated: boolean): coresdk.wor
365365
};
366366
}
367367

368+
function makeUpdateActivationJob(
369+
id: string,
370+
protocolInstanceId: string,
371+
name: string,
372+
input: unknown[]
373+
): coresdk.workflow_activation.IWorkflowActivationJob {
374+
return {
375+
doUpdate: {
376+
id,
377+
protocolInstanceId,
378+
name,
379+
input: toPayloads(defaultPayloadConverter, ...input),
380+
},
381+
};
382+
}
383+
384+
function makeUpdateAcceptedResponse(id: string): coresdk.workflow_commands.IWorkflowCommand {
385+
return {
386+
updateResponse: {
387+
protocolInstanceId: id,
388+
accepted: {},
389+
},
390+
};
391+
}
392+
393+
function makeUpdateCompleteResponse(id: string, result: unknown): coresdk.workflow_commands.IWorkflowCommand {
394+
return {
395+
updateResponse: {
396+
protocolInstanceId: id,
397+
completed: defaultPayloadConverter.toPayload(result),
398+
},
399+
};
400+
}
401+
368402
test('random', async (t) => {
369403
const { logs, workflowType } = t.context;
370404
{
@@ -2404,23 +2438,9 @@ test('Signals/Updates/Activities/Timers - Trace promises completion order - pre-
24042438
...makeActivation(
24052439
undefined,
24062440
makeSignalWorkflowJob('aaSignal', ['signal1']),
2407-
{
2408-
doUpdate: {
2409-
id: 'first',
2410-
name: 'aaUpdate',
2411-
protocolInstanceId: '1',
2412-
input: toPayloads(defaultPayloadConverter, ['update1']),
2413-
},
2414-
},
2441+
makeUpdateActivationJob('first', '1', 'aaUpdate', ['update1']),
24152442
makeSignalWorkflowJob('aaSignal', ['signal2']),
2416-
{
2417-
doUpdate: {
2418-
id: 'second',
2419-
name: 'aaUpdate',
2420-
protocolInstanceId: '2',
2421-
input: toPayloads(defaultPayloadConverter, ['update2']),
2422-
},
2423-
},
2443+
makeUpdateActivationJob('second', '2', 'aaUpdate', ['update2']),
24242444
makeFireTimerJob(1),
24252445
makeResolveActivityJob(1, { completed: {} })
24262446
),
@@ -2483,23 +2503,9 @@ test('Signals/Updates/Activities/Timers - Trace promises completion order - 1.11
24832503
...makeActivation(
24842504
undefined,
24852505
makeSignalWorkflowJob('aaSignal', ['signal1']),
2486-
{
2487-
doUpdate: {
2488-
id: 'first',
2489-
name: 'aaUpdate',
2490-
protocolInstanceId: '1',
2491-
input: toPayloads(defaultPayloadConverter, ['update1']),
2492-
},
2493-
},
2506+
makeUpdateActivationJob('first', '1', 'aaUpdate', ['update1']),
24942507
makeSignalWorkflowJob('aaSignal', ['signal2']),
2495-
{
2496-
doUpdate: {
2497-
id: 'second',
2498-
name: 'aaUpdate',
2499-
protocolInstanceId: '2',
2500-
input: toPayloads(defaultPayloadConverter, ['update2']),
2501-
},
2502-
},
2508+
makeUpdateActivationJob('second', '2', 'aaUpdate', ['update2']),
25032509
makeFireTimerJob(1),
25042510
makeResolveActivityJob(1, { completed: {} })
25052511
),
@@ -2530,3 +2536,138 @@ test('Signals/Updates/Activities/Timers - Trace promises completion order - 1.11
25302536
);
25312537
}
25322538
});
2539+
2540+
test('Buffered updates are dispatched in the correct order - updatesOrdering', async (t) => {
2541+
const { workflowType } = t.context;
2542+
{
2543+
const completion = await activate(
2544+
t,
2545+
makeActivation(
2546+
undefined,
2547+
makeInitializeWorkflowJob(workflowType),
2548+
makeUpdateActivationJob('1', '1', 'non-existant', [1]),
2549+
makeUpdateActivationJob('2', '2', 'updateA', [2]),
2550+
makeUpdateActivationJob('3', '3', 'updateA', [3]),
2551+
makeUpdateActivationJob('4', '4', 'updateC', [4]),
2552+
makeUpdateActivationJob('5', '5', 'updateB', [5]),
2553+
makeUpdateActivationJob('6', '6', 'non-existant', [6]),
2554+
makeUpdateActivationJob('7', '7', 'updateB', [7])
2555+
)
2556+
);
2557+
2558+
// The activation above:
2559+
// - initializes the workflow
2560+
// - buffers all its updates (we attempt update jobs first, but since there are no handlers, they get buffered)
2561+
// - enters the workflow code
2562+
// - workflow code sets handler for updateA
2563+
// - handler is registered for updateA
2564+
// - we attempt to dispatch buffered updates
2565+
// - buffered updates for handler A are *accepted* but not executed
2566+
// (executing an update is a promise/async, so it instead goes on the node event queue)
2567+
// - we continue/re-enter the workflow code
2568+
// - ...and do the same pattern for updateB, the default update handler, the updateC
2569+
// - once updates have been accepted, node processes the waiting events in its queue (the waiting updates)
2570+
// - these are processesed in FIFO order, so we get execution for updateA, then updateB, the default handler, then updateC
2571+
2572+
// As such, the expected order of these updates is the order that the handlers were registered.
2573+
// Note that because the default handler was registered *before* updateC, all remaining buffered updates were dispatched
2574+
// to it, including the update for updateC.
2575+
2576+
compareCompletion(
2577+
t,
2578+
completion,
2579+
makeSuccess(
2580+
[
2581+
// FIFO accepted order
2582+
makeUpdateAcceptedResponse('2'),
2583+
makeUpdateAcceptedResponse('3'),
2584+
makeUpdateAcceptedResponse('5'),
2585+
makeUpdateAcceptedResponse('7'),
2586+
makeUpdateAcceptedResponse('1'),
2587+
makeUpdateAcceptedResponse('4'),
2588+
makeUpdateAcceptedResponse('6'),
2589+
// FIFO executed order
2590+
makeUpdateCompleteResponse('2', { handler: 'updateA', args: [2] }),
2591+
makeUpdateCompleteResponse('3', { handler: 'updateA', args: [3] }),
2592+
makeUpdateCompleteResponse('5', { handler: 'updateB', args: [5] }),
2593+
makeUpdateCompleteResponse('7', { handler: 'updateB', args: [7] }),
2594+
makeUpdateCompleteResponse('1', { handler: 'default', updateName: 'non-existant', args: [1] }),
2595+
// updateC handled by default handler.
2596+
makeUpdateCompleteResponse('4', { handler: 'default', updateName: 'updateC', args: [4] }),
2597+
makeUpdateCompleteResponse('6', { handler: 'default', updateName: 'non-existant', args: [6] }),
2598+
// No expected update response from updateC handler
2599+
makeCompleteWorkflowExecution(),
2600+
]
2601+
// [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch]
2602+
)
2603+
);
2604+
}
2605+
});
2606+
2607+
test('Buffered updates are reentrant - updatesAreReentrant', async (t) => {
2608+
const { workflowType } = t.context;
2609+
{
2610+
const completion = await activate(
2611+
t,
2612+
makeActivation(
2613+
undefined,
2614+
makeInitializeWorkflowJob(workflowType),
2615+
makeUpdateActivationJob('1', '1', 'non-existant', [1]),
2616+
makeUpdateActivationJob('2', '2', 'updateA', [2]),
2617+
makeUpdateActivationJob('3', '3', 'updateA', [3]),
2618+
makeUpdateActivationJob('4', '4', 'updateC', [4]),
2619+
makeUpdateActivationJob('5', '5', 'updateB', [5]),
2620+
makeUpdateActivationJob('6', '6', 'non-existant', [6]),
2621+
makeUpdateActivationJob('7', '7', 'updateB', [7]),
2622+
makeUpdateActivationJob('8', '8', 'updateC', [8])
2623+
)
2624+
);
2625+
2626+
// The activation above:
2627+
// - initializes the workflow
2628+
// - buffers all its updates (we attempt update jobs first, but since there are no handlers, they get buffered)
2629+
// - enters the workflow code
2630+
// - workflow code sets handler for updateA
2631+
// - handler is registered for updateA
2632+
// - we attempt to dispatch buffered updates
2633+
// - buffered updates for handler A are *accepted* but not executed
2634+
// (executing an update is a promise/async, so it instead goes on the node event queue)
2635+
// - however, there is no more workflow code, node dequues event queue and we immediately run the update handler
2636+
// (we begin executing the update which...)
2637+
// - deletes the current handler and registers the next one (updateB)
2638+
// - this pattern repeats (updateA -> updateB -> updateC -> default) until there are no more updates to handle
2639+
// - at this point, all updates have been accepted and are executing
2640+
// - due to the call order in the workflow, the completion order of the updates follows the call stack, LIFO
2641+
2642+
// This workflow is interesting in that updates are accepted FIFO, but executed LIFO
2643+
2644+
compareCompletion(
2645+
t,
2646+
completion,
2647+
makeSuccess(
2648+
[
2649+
// FIFO accepted order
2650+
makeUpdateAcceptedResponse('2'),
2651+
makeUpdateAcceptedResponse('5'),
2652+
makeUpdateAcceptedResponse('4'),
2653+
makeUpdateAcceptedResponse('1'),
2654+
makeUpdateAcceptedResponse('3'),
2655+
makeUpdateAcceptedResponse('7'),
2656+
makeUpdateAcceptedResponse('8'),
2657+
makeUpdateAcceptedResponse('6'),
2658+
// LIFO executed order
2659+
makeUpdateCompleteResponse('6', { handler: 'default', updateName: 'non-existant', args: [6] }),
2660+
makeUpdateCompleteResponse('8', { handler: 'updateC', args: [8] }),
2661+
makeUpdateCompleteResponse('7', { handler: 'updateB', args: [7] }),
2662+
makeUpdateCompleteResponse('3', { handler: 'updateA', args: [3] }),
2663+
makeUpdateCompleteResponse('1', { handler: 'default', updateName: 'non-existant', args: [1] }),
2664+
makeUpdateCompleteResponse('4', { handler: 'updateC', args: [4] }),
2665+
makeUpdateCompleteResponse('5', { handler: 'updateB', args: [5] }),
2666+
makeUpdateCompleteResponse('2', { handler: 'updateA', args: [2] }),
2667+
makeCompleteWorkflowExecution(),
2668+
]
2669+
// [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch]
2670+
)
2671+
);
2672+
}
2673+
});

packages/test/src/workflows/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,3 +90,4 @@ export * from './upsert-and-read-search-attributes';
9090
export * from './wait-on-user';
9191
export * from './workflow-cancellation-scenarios';
9292
export * from './upsert-and-read-memo';
93+
export * from './updates-ordering';
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
import { defineUpdate, setDefaultUpdateHandler, setHandler } from '@temporalio/workflow';
2+
3+
const updateA = defineUpdate<ProcessedUpdate, [number]>('updateA');
4+
const updateB = defineUpdate<ProcessedUpdate, [number]>('updateB');
5+
const updateC = defineUpdate<ProcessedUpdate, [number]>('updateC');
6+
7+
interface ProcessedUpdate {
8+
handler: string;
9+
updateName?: string;
10+
args: unknown[];
11+
}
12+
13+
/*
14+
There's a surprising amount going on with the workflow below. Let's simplify it to just updateA and updateB
15+
(no updateC or the default) and walk through it.
16+
17+
1. setHandler for updateA
18+
- this is all synchronous code until we yield (.then), when we run execute(input) within doUpdateImpl
19+
2. queue execute for A on node event queue: [executeA]
20+
3. continue running the workflow code, which leads us to..
21+
4. setHandler for updateB
22+
- same deal as A
23+
5. queue execute for B on node event queue: [executeA, executeB]
24+
6. finished workflow code, go through the event queue
25+
7. execute update A, node event queue [executeB], command ordering [acceptA, acceptB, executeA]
26+
8. execute update B, node event queue [] (empty), command ordering [acceptA, acceptB, executeA, executeB]
27+
28+
The only additional complexity with the workflow below is that once the default handler is registered, buffered updates for C will be
29+
dispatched to the default handler. So in this scenario:
30+
-> update queue = [updateC1, updateC2]
31+
-> default handler registered
32+
-> C handler registered
33+
both C1 and C2 will be dispatched to the default handler, as it was registered prior to the C handler, and it is capable of handling
34+
any update type (like a catch-all).
35+
36+
It's worth noting that for this workflow specifically, none of the handlers are asynchronous, so they will execute synchronously. But
37+
The description above serves generally for asynchronous updates, which are commonplace.
38+
*/
39+
export async function updatesOrdering(): Promise<void> {
40+
setHandler(updateA, (...args: any[]) => {
41+
return { handler: 'updateA', args };
42+
});
43+
setHandler(updateB, (...args: any[]) => {
44+
return { handler: 'updateB', args };
45+
});
46+
setDefaultUpdateHandler((updateName, ...args: any[]) => {
47+
return { handler: 'default', updateName, args };
48+
});
49+
setHandler(updateC, (...args: any[]) => {
50+
return { handler: 'updateC', args };
51+
});
52+
}
53+
54+
export async function updatesAreReentrant(): Promise<void> {
55+
function handlerA(...args: any[]) {
56+
setHandler(updateA, undefined);
57+
setHandler(updateB, handlerB);
58+
return { handler: 'updateA', args };
59+
}
60+
function handlerB(...args: any[]) {
61+
setHandler(updateB, undefined);
62+
setHandler(updateC, handlerC);
63+
return { handler: 'updateB', args };
64+
}
65+
function handlerC(...args: any[]) {
66+
setHandler(updateC, undefined);
67+
setDefaultUpdateHandler(defaultHandler);
68+
return { handler: 'updateC', args };
69+
}
70+
function defaultHandler(updateName: string, ...args: any[]) {
71+
setDefaultUpdateHandler(undefined);
72+
setHandler(updateA, handlerA);
73+
return { handler: 'default', updateName, args };
74+
}
75+
76+
setHandler(updateA, handlerA);
77+
}

packages/workflow/src/interfaces.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -586,6 +586,11 @@ export type Handler<
586586
*/
587587
export type DefaultSignalHandler = (signalName: string, ...args: unknown[]) => void | Promise<void>;
588588

589+
/**
590+
* A handler function accepting update calls for non-registered update names.
591+
*/
592+
export type DefaultUpdateHandler = (updateName: string, ...args: unknown[]) => Promise<unknown> | unknown;
593+
589594
/**
590595
* A handler function accepting query calls for non-registered query names.
591596
*/

0 commit comments

Comments
 (0)