diff --git a/packages/test/src/test-workflows.ts b/packages/test/src/test-workflows.ts index 1c25f61bc..cc8ddab9a 100644 --- a/packages/test/src/test-workflows.ts +++ b/packages/test/src/test-workflows.ts @@ -365,6 +365,40 @@ function makeSetPatchMarker(myPatchId: string, deprecated: boolean): coresdk.wor }; } +function makeUpdateActivationJob( + id: string, + protocolInstanceId: string, + name: string, + input: unknown[] +): coresdk.workflow_activation.IWorkflowActivationJob { + return { + doUpdate: { + id, + protocolInstanceId, + name, + input: toPayloads(defaultPayloadConverter, ...input), + }, + }; +} + +function makeUpdateAcceptedResponse(id: string): coresdk.workflow_commands.IWorkflowCommand { + return { + updateResponse: { + protocolInstanceId: id, + accepted: {}, + }, + }; +} + +function makeUpdateCompleteResponse(id: string, result: unknown): coresdk.workflow_commands.IWorkflowCommand { + return { + updateResponse: { + protocolInstanceId: id, + completed: defaultPayloadConverter.toPayload(result), + }, + }; +} + test('random', async (t) => { const { logs, workflowType } = t.context; { @@ -2404,23 +2438,9 @@ test('Signals/Updates/Activities/Timers - Trace promises completion order - pre- ...makeActivation( undefined, makeSignalWorkflowJob('aaSignal', ['signal1']), - { - doUpdate: { - id: 'first', - name: 'aaUpdate', - protocolInstanceId: '1', - input: toPayloads(defaultPayloadConverter, ['update1']), - }, - }, + makeUpdateActivationJob('first', '1', 'aaUpdate', ['update1']), makeSignalWorkflowJob('aaSignal', ['signal2']), - { - doUpdate: { - id: 'second', - name: 'aaUpdate', - protocolInstanceId: '2', - input: toPayloads(defaultPayloadConverter, ['update2']), - }, - }, + makeUpdateActivationJob('second', '2', 'aaUpdate', ['update2']), makeFireTimerJob(1), makeResolveActivityJob(1, { completed: {} }) ), @@ -2483,23 +2503,9 @@ test('Signals/Updates/Activities/Timers - Trace promises completion order - 1.11 ...makeActivation( undefined, makeSignalWorkflowJob('aaSignal', ['signal1']), - { - doUpdate: { - id: 'first', - name: 'aaUpdate', - protocolInstanceId: '1', - input: toPayloads(defaultPayloadConverter, ['update1']), - }, - }, + makeUpdateActivationJob('first', '1', 'aaUpdate', ['update1']), makeSignalWorkflowJob('aaSignal', ['signal2']), - { - doUpdate: { - id: 'second', - name: 'aaUpdate', - protocolInstanceId: '2', - input: toPayloads(defaultPayloadConverter, ['update2']), - }, - }, + makeUpdateActivationJob('second', '2', 'aaUpdate', ['update2']), makeFireTimerJob(1), makeResolveActivityJob(1, { completed: {} }) ), @@ -2530,3 +2536,138 @@ test('Signals/Updates/Activities/Timers - Trace promises completion order - 1.11 ); } }); + +test('Buffered updates are dispatched in the correct order - updatesOrdering', async (t) => { + const { workflowType } = t.context; + { + const completion = await activate( + t, + makeActivation( + undefined, + makeInitializeWorkflowJob(workflowType), + makeUpdateActivationJob('1', '1', 'non-existant', [1]), + makeUpdateActivationJob('2', '2', 'updateA', [2]), + makeUpdateActivationJob('3', '3', 'updateA', [3]), + makeUpdateActivationJob('4', '4', 'updateC', [4]), + makeUpdateActivationJob('5', '5', 'updateB', [5]), + makeUpdateActivationJob('6', '6', 'non-existant', [6]), + makeUpdateActivationJob('7', '7', 'updateB', [7]) + ) + ); + + // The activation above: + // - initializes the workflow + // - buffers all its updates (we attempt update jobs first, but since there are no handlers, they get buffered) + // - enters the workflow code + // - workflow code sets handler for updateA + // - handler is registered for updateA + // - we attempt to dispatch buffered updates + // - buffered updates for handler A are *accepted* but not executed + // (executing an update is a promise/async, so it instead goes on the node event queue) + // - we continue/re-enter the workflow code + // - ...and do the same pattern for updateB, the default update handler, the updateC + // - once updates have been accepted, node processes the waiting events in its queue (the waiting updates) + // - these are processesed in FIFO order, so we get execution for updateA, then updateB, the default handler, then updateC + + // As such, the expected order of these updates is the order that the handlers were registered. + // Note that because the default handler was registered *before* updateC, all remaining buffered updates were dispatched + // to it, including the update for updateC. + + compareCompletion( + t, + completion, + makeSuccess( + [ + // FIFO accepted order + makeUpdateAcceptedResponse('2'), + makeUpdateAcceptedResponse('3'), + makeUpdateAcceptedResponse('5'), + makeUpdateAcceptedResponse('7'), + makeUpdateAcceptedResponse('1'), + makeUpdateAcceptedResponse('4'), + makeUpdateAcceptedResponse('6'), + // FIFO executed order + makeUpdateCompleteResponse('2', { handler: 'updateA', args: [2] }), + makeUpdateCompleteResponse('3', { handler: 'updateA', args: [3] }), + makeUpdateCompleteResponse('5', { handler: 'updateB', args: [5] }), + makeUpdateCompleteResponse('7', { handler: 'updateB', args: [7] }), + makeUpdateCompleteResponse('1', { handler: 'default', updateName: 'non-existant', args: [1] }), + // updateC handled by default handler. + makeUpdateCompleteResponse('4', { handler: 'default', updateName: 'updateC', args: [4] }), + makeUpdateCompleteResponse('6', { handler: 'default', updateName: 'non-existant', args: [6] }), + // No expected update response from updateC handler + makeCompleteWorkflowExecution(), + ] + // [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch] + ) + ); + } +}); + +test('Buffered updates are reentrant - updatesAreReentrant', async (t) => { + const { workflowType } = t.context; + { + const completion = await activate( + t, + makeActivation( + undefined, + makeInitializeWorkflowJob(workflowType), + makeUpdateActivationJob('1', '1', 'non-existant', [1]), + makeUpdateActivationJob('2', '2', 'updateA', [2]), + makeUpdateActivationJob('3', '3', 'updateA', [3]), + makeUpdateActivationJob('4', '4', 'updateC', [4]), + makeUpdateActivationJob('5', '5', 'updateB', [5]), + makeUpdateActivationJob('6', '6', 'non-existant', [6]), + makeUpdateActivationJob('7', '7', 'updateB', [7]), + makeUpdateActivationJob('8', '8', 'updateC', [8]) + ) + ); + + // The activation above: + // - initializes the workflow + // - buffers all its updates (we attempt update jobs first, but since there are no handlers, they get buffered) + // - enters the workflow code + // - workflow code sets handler for updateA + // - handler is registered for updateA + // - we attempt to dispatch buffered updates + // - buffered updates for handler A are *accepted* but not executed + // (executing an update is a promise/async, so it instead goes on the node event queue) + // - however, there is no more workflow code, node dequues event queue and we immediately run the update handler + // (we begin executing the update which...) + // - deletes the current handler and registers the next one (updateB) + // - this pattern repeats (updateA -> updateB -> updateC -> default) until there are no more updates to handle + // - at this point, all updates have been accepted and are executing + // - due to the call order in the workflow, the completion order of the updates follows the call stack, LIFO + + // This workflow is interesting in that updates are accepted FIFO, but executed LIFO + + compareCompletion( + t, + completion, + makeSuccess( + [ + // FIFO accepted order + makeUpdateAcceptedResponse('2'), + makeUpdateAcceptedResponse('5'), + makeUpdateAcceptedResponse('4'), + makeUpdateAcceptedResponse('1'), + makeUpdateAcceptedResponse('3'), + makeUpdateAcceptedResponse('7'), + makeUpdateAcceptedResponse('8'), + makeUpdateAcceptedResponse('6'), + // LIFO executed order + makeUpdateCompleteResponse('6', { handler: 'default', updateName: 'non-existant', args: [6] }), + makeUpdateCompleteResponse('8', { handler: 'updateC', args: [8] }), + makeUpdateCompleteResponse('7', { handler: 'updateB', args: [7] }), + makeUpdateCompleteResponse('3', { handler: 'updateA', args: [3] }), + makeUpdateCompleteResponse('1', { handler: 'default', updateName: 'non-existant', args: [1] }), + makeUpdateCompleteResponse('4', { handler: 'updateC', args: [4] }), + makeUpdateCompleteResponse('5', { handler: 'updateB', args: [5] }), + makeUpdateCompleteResponse('2', { handler: 'updateA', args: [2] }), + makeCompleteWorkflowExecution(), + ] + // [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch] + ) + ); + } +}); diff --git a/packages/test/src/workflows/index.ts b/packages/test/src/workflows/index.ts index 6df78ec71..a77256420 100644 --- a/packages/test/src/workflows/index.ts +++ b/packages/test/src/workflows/index.ts @@ -90,3 +90,4 @@ export * from './upsert-and-read-search-attributes'; export * from './wait-on-user'; export * from './workflow-cancellation-scenarios'; export * from './upsert-and-read-memo'; +export * from './updates-ordering'; diff --git a/packages/test/src/workflows/updates-ordering.ts b/packages/test/src/workflows/updates-ordering.ts new file mode 100644 index 000000000..d1ef53ee6 --- /dev/null +++ b/packages/test/src/workflows/updates-ordering.ts @@ -0,0 +1,77 @@ +import { defineUpdate, setDefaultUpdateHandler, setHandler } from '@temporalio/workflow'; + +const updateA = defineUpdate('updateA'); +const updateB = defineUpdate('updateB'); +const updateC = defineUpdate('updateC'); + +interface ProcessedUpdate { + handler: string; + updateName?: string; + args: unknown[]; +} + +/* + There's a surprising amount going on with the workflow below. Let's simplify it to just updateA and updateB + (no updateC or the default) and walk through it. + + 1. setHandler for updateA + - this is all synchronous code until we yield (.then), when we run execute(input) within doUpdateImpl + 2. queue execute for A on node event queue: [executeA] + 3. continue running the workflow code, which leads us to.. + 4. setHandler for updateB + - same deal as A + 5. queue execute for B on node event queue: [executeA, executeB] + 6. finished workflow code, go through the event queue + 7. execute update A, node event queue [executeB], command ordering [acceptA, acceptB, executeA] + 8. execute update B, node event queue [] (empty), command ordering [acceptA, acceptB, executeA, executeB] + + The only additional complexity with the workflow below is that once the default handler is registered, buffered updates for C will be + dispatched to the default handler. So in this scenario: + -> update queue = [updateC1, updateC2] + -> default handler registered + -> C handler registered + both C1 and C2 will be dispatched to the default handler, as it was registered prior to the C handler, and it is capable of handling + any update type (like a catch-all). + + It's worth noting that for this workflow specifically, none of the handlers are asynchronous, so they will execute synchronously. But + The description above serves generally for asynchronous updates, which are commonplace. +*/ +export async function updatesOrdering(): Promise { + setHandler(updateA, (...args: any[]) => { + return { handler: 'updateA', args }; + }); + setHandler(updateB, (...args: any[]) => { + return { handler: 'updateB', args }; + }); + setDefaultUpdateHandler((updateName, ...args: any[]) => { + return { handler: 'default', updateName, args }; + }); + setHandler(updateC, (...args: any[]) => { + return { handler: 'updateC', args }; + }); +} + +export async function updatesAreReentrant(): Promise { + function handlerA(...args: any[]) { + setHandler(updateA, undefined); + setHandler(updateB, handlerB); + return { handler: 'updateA', args }; + } + function handlerB(...args: any[]) { + setHandler(updateB, undefined); + setHandler(updateC, handlerC); + return { handler: 'updateB', args }; + } + function handlerC(...args: any[]) { + setHandler(updateC, undefined); + setDefaultUpdateHandler(defaultHandler); + return { handler: 'updateC', args }; + } + function defaultHandler(updateName: string, ...args: any[]) { + setDefaultUpdateHandler(undefined); + setHandler(updateA, handlerA); + return { handler: 'default', updateName, args }; + } + + setHandler(updateA, handlerA); +} diff --git a/packages/workflow/src/interfaces.ts b/packages/workflow/src/interfaces.ts index 87500de01..4847a2c37 100644 --- a/packages/workflow/src/interfaces.ts +++ b/packages/workflow/src/interfaces.ts @@ -586,6 +586,11 @@ export type Handler< */ export type DefaultSignalHandler = (signalName: string, ...args: unknown[]) => void | Promise; +/** + * A handler function accepting update calls for non-registered update names. + */ +export type DefaultUpdateHandler = (updateName: string, ...args: unknown[]) => Promise | unknown; + /** * A handler function accepting query calls for non-registered query names. */ diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index acc49985b..fcc503075 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -43,6 +43,7 @@ import { WorkflowInfo, WorkflowCreateOptionsInternal, ActivationCompletion, + DefaultUpdateHandler, DefaultQueryHandler, } from './interfaces'; import { type SinkCall } from './sinks'; @@ -192,6 +193,11 @@ export class Activator implements ActivationHandler { */ defaultSignalHandler?: DefaultSignalHandler; + /** + * A update handler that catches calls for non-registered update names. + */ + defaultUpdateHandler?: DefaultUpdateHandler; + /** * A query handler that catches calls for non-registered query names. */ @@ -631,7 +637,7 @@ export class Activator implements ActivationHandler { protected queryWorkflowNextHandler({ queryName, args }: QueryInput): Promise { let fn = this.queryHandlers.get(queryName)?.handler; if (fn === undefined && this.defaultQueryHandler !== undefined) { - fn = this.defaultQueryHandler.bind(this, queryName); + fn = this.defaultQueryHandler.bind(undefined, queryName); } // No handler or default registered, fail. if (fn === undefined) { @@ -688,8 +694,20 @@ export class Activator implements ActivationHandler { if (!protocolInstanceId) { throw new TypeError('Missing activation update protocolInstanceId'); } - const entry = this.updateHandlers.get(name); - if (!entry) { + + const entry = + this.updateHandlers.get(name) ?? + (this.defaultUpdateHandler + ? { + handler: this.defaultUpdateHandler.bind(undefined, name), + validator: undefined, + // Default to a warning policy. + unfinishedPolicy: HandlerUnfinishedPolicy.WARN_AND_ABANDON, + } + : null); + + // If we don't have an entry from either source, buffer and return + if (entry === null) { this.bufferedUpdates.push(activation); return; } @@ -781,13 +799,21 @@ export class Activator implements ActivationHandler { public dispatchBufferedUpdates(): void { const bufferedUpdates = this.bufferedUpdates; while (bufferedUpdates.length) { - const foundIndex = bufferedUpdates.findIndex((update) => this.updateHandlers.has(update.name as string)); - if (foundIndex === -1) { - // No buffered Updates have a handler yet. - break; + // We have a default update handler, so all updates are dispatchable. + if (this.defaultUpdateHandler) { + const update = bufferedUpdates.shift(); + // Logically, this must be defined as we're in the loop. + // But Typescript doesn't know that so we use a non-null assertion (!). + this.doUpdate(update!); + } else { + const foundIndex = bufferedUpdates.findIndex((update) => this.updateHandlers.has(update.name as string)); + if (foundIndex === -1) { + // No buffered Updates have a handler yet. + break; + } + const [update] = bufferedUpdates.splice(foundIndex, 1); + this.doUpdate(update); } - const [update] = bufferedUpdates.splice(foundIndex, 1); - this.doUpdate(update); } } diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index b5e6500d2..54b3069a0 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -57,6 +57,7 @@ import { UpdateInfo, encodeChildWorkflowCancellationType, encodeParentClosePolicy, + DefaultUpdateHandler, DefaultQueryHandler, } from './interfaces'; import { LocalActivityDoBackoff } from './errors'; @@ -1331,6 +1332,29 @@ export function setDefaultSignalHandler(handler: DefaultSignalHandler | undefine } } +/** + * Set a update handler function that will handle updates calls for non-registered update names. + * + * Updates are dispatched to the default update handler in the order that they were accepted by the server. + * + * If this function is called multiple times for a given update name the last handler will overwrite any previous calls. + * + * @param handler a function that will handle updates for non-registered update names, or `undefined` to unset the handler. + */ +export function setDefaultUpdateHandler(handler: DefaultUpdateHandler | undefined): void { + const activator = assertInWorkflowContext( + 'Workflow.setDefaultUpdateHandler(...) may only be used from a Workflow Execution.' + ); + if (typeof handler === 'function') { + activator.defaultUpdateHandler = handler; + activator.dispatchBufferedUpdates(); + } else if (handler == null) { + activator.defaultUpdateHandler = undefined; + } else { + throw new TypeError(`Expected handler to be either a function or 'undefined'. Got: '${typeof handler}'`); + } +} + /** * Set a query handler function that will handle query calls for non-registered query names. *