Skip to content

(feat): add support for default query handler #1639

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 55 additions & 1 deletion packages/test/src/test-integration-split-two.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
import { msToNumber, tsToMs } from '@temporalio/common/lib/time';
import { decode as payloadDecode, decodeFromPayloadsAtIndex } from '@temporalio/common/lib/internal-non-workflow';

import { condition, defineQuery, setHandler, sleep } from '@temporalio/workflow';
import { condition, defineQuery, defineSignal, setDefaultQueryHandler, setHandler, sleep } from '@temporalio/workflow';
import { configurableHelpers, createTestWorkflowBundle } from './helpers-integration';
import * as activities from './activities';
import * as workflows from './workflows';
Expand Down Expand Up @@ -697,3 +697,57 @@ test('Query does not cause condition to be triggered', configMacro, async (t, co
// Worker did not crash
t.pass();
});

const completeSignal = defineSignal('complete');
const definedQuery = defineQuery<QueryNameAndArgs>('query-handler-type');

interface QueryNameAndArgs {
name: string;
queryName?: string;
args: any[];
}

export async function workflowWithMaybeDefinedQuery(useDefinedQuery: boolean): Promise<void> {
let complete = false;
setHandler(completeSignal, () => {
complete = true;
});
setDefaultQueryHandler((queryName: string, ...args: any[]) => {
return { name: 'default', queryName, args };
});
if (useDefinedQuery) {
setHandler(definedQuery, (...args: any[]) => {
return { name: definedQuery.name, args };
});
}

await condition(() => complete);
}

test('default query handler is used if requested query does not exist', configMacro, async (t, config) => {
const { env, createWorkerWithDefaults } = config;
const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env);
const worker = await createWorkerWithDefaults(t, { activities });
const handle = await startWorkflow(workflowWithMaybeDefinedQuery, {
args: [false],
});
await worker.runUntil(async () => {
const args = ['test', 'args'];
const result = await handle.query(definedQuery, ...args);
t.deepEqual(result, { name: 'default', queryName: definedQuery.name, args });
});
});

test('default query handler is not used if requested query exists', configMacro, async (t, config) => {
const { env, createWorkerWithDefaults } = config;
const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env);
const worker = await createWorkerWithDefaults(t, { activities });
const handle = await startWorkflow(workflowWithMaybeDefinedQuery, {
args: [true],
});
await worker.runUntil(async () => {
const args = ['test', 'args'];
const result = await handle.query('query-handler-type', ...args);
t.deepEqual(result, { name: definedQuery.name, args });
});
});
5 changes: 5 additions & 0 deletions packages/workflow/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,11 @@ export type Handler<
*/
export type DefaultSignalHandler = (signalName: string, ...args: unknown[]) => void | Promise<void>;

/**
* A handler function accepting query calls for non-registered query names.
*/
export type DefaultQueryHandler = (queryName: string, ...args: unknown[]) => unknown;

/**
* A validation function capable of accepting the arguments for a given UpdateDefinition.
*/
Expand Down
13 changes: 12 additions & 1 deletion packages/workflow/src/internals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import {
WorkflowInfo,
WorkflowCreateOptionsInternal,
ActivationCompletion,
DefaultQueryHandler,
} from './interfaces';
import { type SinkCall } from './sinks';
import { untrackPromise } from './stack-helpers';
Expand Down Expand Up @@ -189,6 +190,11 @@ export class Activator implements ActivationHandler {
*/
defaultSignalHandler?: DefaultSignalHandler;

/**
* A query handler that catches calls for non-registered query names.
*/
defaultQueryHandler?: DefaultQueryHandler;

/**
* Source map file for looking up the source files in response to __enhanced_stack_trace
*/
Expand Down Expand Up @@ -611,7 +617,11 @@ export class Activator implements ActivationHandler {

// Intentionally non-async function so this handler doesn't show up in the stack trace
protected queryWorkflowNextHandler({ queryName, args }: QueryInput): Promise<unknown> {
const fn = this.queryHandlers.get(queryName)?.handler;
let fn = this.queryHandlers.get(queryName)?.handler;
if (fn === undefined && this.defaultQueryHandler !== undefined) {
fn = this.defaultQueryHandler.bind(this, queryName);
}
// No handler or default registered, fail.
if (fn === undefined) {
const knownQueryTypes = [...this.queryHandlers.keys()].join(' ');
// Fail the query
Expand All @@ -621,6 +631,7 @@ export class Activator implements ActivationHandler {
)
);
}
// Execute handler.
try {
const ret = fn(...args);
if (ret instanceof Promise) {
Expand Down
23 changes: 22 additions & 1 deletion packages/workflow/src/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import {
UpdateInfo,
encodeChildWorkflowCancellationType,
encodeParentClosePolicy,
DefaultQueryHandler,
} from './interfaces';
import { LocalActivityDoBackoff } from './errors';
import { assertInWorkflowContext, getActivator, maybeGetActivator } from './global-attributes';
Expand Down Expand Up @@ -1300,7 +1301,7 @@ export function setHandler<
*
* Signals are dispatched to the default signal handler in the order that they were accepted by the server.
*
* If this function is called multiple times for a given signal or query name the last handler will overwrite any previous calls.
* If this function is called multiple times for a given signal name the last handler will overwrite any previous calls.
*
* @param handler a function that will handle signals for non-registered signal names, or `undefined` to unset the handler.
*/
Expand All @@ -1318,6 +1319,26 @@ export function setDefaultSignalHandler(handler: DefaultSignalHandler | undefine
}
}

/**
* Set a query handler function that will handle query calls for non-registered query names.
*
* Queries are dispatched to the default query handler in the order that they were accepted by the server.
*
* If this function is called multiple times for a given query name the last handler will overwrite any previous calls.
*
* @param handler a function that will handle queries for non-registered query names, or `undefined` to unset the handler.
*/
export function setDefaultQueryHandler(handler: DefaultQueryHandler | undefined): void {
const activator = assertInWorkflowContext(
'Workflow.setDefaultQueryHandler(...) may only be used from a Workflow Execution.'
);
if (typeof handler === 'function' || handler === undefined) {
activator.defaultQueryHandler = handler;
} else {
throw new TypeError(`Expected handler to be either a function or 'undefined'. Got: '${typeof handler}'`);
}
}

/**
* Updates this Workflow's Search Attributes by merging the provided `searchAttributes` with the existing Search
* Attributes, `workflowInfo().searchAttributes`.
Expand Down
Loading