Skip to content

Add shared error logic for all event handlers #574

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions src/eventHandlers/EventHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License.

import { AzureFunctionsRpcMessages as rpc } from '../../azure-functions-language-worker-protobuf/src/rpc';
import { WorkerChannel } from '../WorkerChannel';

export type SupportedRequestName =
| 'functionEnvironmentReloadRequest'
| 'functionLoadRequest'
| 'invocationRequest'
| 'workerInitRequest';
export type SupportedRequest = rpc.StreamingMessage[SupportedRequestName];

export type SupportedResponseName =
| 'functionEnvironmentReloadResponse'
| 'functionLoadResponse'
| 'invocationResponse'
| 'workerInitResponse';
export type SupportedResponse = rpc.StreamingMessage[SupportedResponseName];

export abstract class EventHandler<
TRequestName extends SupportedRequestName = SupportedRequestName,
TResponseName extends SupportedResponseName = SupportedResponseName,
TRequest = NonNullable<rpc.StreamingMessage[TRequestName]>,
TResponse = NonNullable<rpc.StreamingMessage[TResponseName]>
> {
abstract readonly responseName: TResponseName;

/**
* The default response with any properties unique to this request that should be set for both success & failure scenarios
*/
abstract getDefaultResponse(request: TRequest): TResponse;

/**
* Handles the event and returns the response
* NOTE: This method does not need to set the result/status. That will be handled in code common to all event handlers
*/
abstract handleEvent(channel: WorkerChannel, request: TRequest): Promise<TResponse>;
}
51 changes: 51 additions & 0 deletions src/eventHandlers/FunctionEnvironmentReloadHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License.

import { AzureFunctionsRpcMessages as rpc } from '../../azure-functions-language-worker-protobuf/src/rpc';
import { WorkerChannel } from '../WorkerChannel';
import { EventHandler } from './EventHandler';
import LogCategory = rpc.RpcLog.RpcLogCategory;
import LogLevel = rpc.RpcLog.Level;

/**
* Environment variables from the current process
*/
export class FunctionEnvironmentReloadHandler extends EventHandler<
'functionEnvironmentReloadRequest',
'functionEnvironmentReloadResponse'
> {
readonly responseName = 'functionEnvironmentReloadResponse';

getDefaultResponse(_msg: rpc.IFunctionEnvironmentReloadRequest): rpc.IFunctionEnvironmentReloadResponse {
return {};
}

async handleEvent(
channel: WorkerChannel,
msg: rpc.IFunctionEnvironmentReloadRequest
): Promise<rpc.IFunctionEnvironmentReloadResponse> {
const response = this.getDefaultResponse(msg);

// Add environment variables from incoming
const numVariables = (msg.environmentVariables && Object.keys(msg.environmentVariables).length) || 0;
channel.log({
message: `Reloading environment variables. Found ${numVariables} variables to reload.`,
level: LogLevel.Information,
logCategory: LogCategory.System,
});

process.env = Object.assign({}, msg.environmentVariables);
// Change current working directory
if (msg.functionAppDirectory) {
channel.log({
message: `Changing current working directory to ${msg.functionAppDirectory}`,
level: LogLevel.Information,
logCategory: LogCategory.System,
});
process.chdir(msg.functionAppDirectory);
await channel.updatePackageJson(msg.functionAppDirectory);
}

return response;
}
}
36 changes: 36 additions & 0 deletions src/eventHandlers/FunctionLoadHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License.

import { AzureFunctionsRpcMessages as rpc } from '../../azure-functions-language-worker-protobuf/src/rpc';
import { ensureErrorType } from '../utils/ensureErrorType';
import { nonNullProp } from '../utils/nonNull';
import { WorkerChannel } from '../WorkerChannel';
import { EventHandler } from './EventHandler';

/**
* Worker responds after loading required metadata to load function with the load result
*/
export class FunctionLoadHandler extends EventHandler<'functionLoadRequest', 'functionLoadResponse'> {
readonly responseName = 'functionLoadResponse';

getDefaultResponse(msg: rpc.IFunctionLoadRequest): rpc.IFunctionLoadResponse {
return { functionId: msg.functionId };
}

async handleEvent(channel: WorkerChannel, msg: rpc.IFunctionLoadRequest): Promise<rpc.IFunctionLoadResponse> {
const response = this.getDefaultResponse(msg);

const functionId = nonNullProp(msg, 'functionId');
const metadata = nonNullProp(msg, 'metadata');
try {
await channel.functionLoader.load(functionId, metadata, channel.packageJson);
} catch (err) {
const error = ensureErrorType(err);
error.isAzureFunctionsInternalException = true;
error.message = `Worker was unable to load function ${metadata.name}: '${error.message}'`;
throw error;
}

return response;
}
}
208 changes: 208 additions & 0 deletions src/eventHandlers/InvocationHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License.

import { AzureFunction } from '@azure/functions';
import { HookData, PostInvocationContext, PreInvocationContext } from '@azure/functions-core';
import { format } from 'util';
import { AzureFunctionsRpcMessages as rpc } from '../../azure-functions-language-worker-protobuf/src/rpc';
import { CreateContextAndInputs } from '../Context';
import { toTypedData } from '../converters/RpcConverters';
import { isError } from '../utils/ensureErrorType';
import { nonNullProp } from '../utils/nonNull';
import { WorkerChannel } from '../WorkerChannel';
import { EventHandler } from './EventHandler';
import LogCategory = rpc.RpcLog.RpcLogCategory;
import LogLevel = rpc.RpcLog.Level;

/**
* Host requests worker to invoke a Function
*/
export class InvocationHandler extends EventHandler<'invocationRequest', 'invocationResponse'> {
readonly responseName = 'invocationResponse';

getDefaultResponse(msg: rpc.IInvocationRequest): rpc.IInvocationResponse {
return { invocationId: msg.invocationId };
}

async handleEvent(channel: WorkerChannel, msg: rpc.IInvocationRequest): Promise<rpc.IInvocationResponse> {
const response = this.getDefaultResponse(msg);

const invocationId = nonNullProp(msg, 'invocationId');
const functionId = nonNullProp(msg, 'functionId');

// explicitly set outputData to empty array to concat later
response.outputData = [];

let isDone = false;
let isExecutingPostInvocationHooks = false;
let resultIsPromise = false;

const info = channel.functionLoader.getInfo(functionId);
const asyncDoneLearnMoreLink = 'https://go.microsoft.com/fwlink/?linkid=2097909';

const msgCategory = `${info.name}.Invocation`;
function log(level: LogLevel, logCategory: LogCategory, ...args: any[]) {
channel.log({
invocationId: invocationId,
category: msgCategory,
message: format.apply(null, <[any, any[]]>args),
level: level,
logCategory,
});
}
function systemLog(level: LogLevel, ...args: any[]) {
log(level, LogCategory.System, ...args);
}
function userLog(level: LogLevel, ...args: any[]) {
if (isDone && !isExecutingPostInvocationHooks) {
let badAsyncMsg =
"Warning: Unexpected call to 'log' on the context object after function execution has completed. Please check for asynchronous calls that are not awaited or calls to 'done' made before function execution completes. ";
badAsyncMsg += `Function name: ${info.name}. Invocation Id: ${invocationId}. `;
badAsyncMsg += `Learn more: ${asyncDoneLearnMoreLink}`;
systemLog(LogLevel.Warning, badAsyncMsg);
}
log(level, LogCategory.User, ...args);
}

// Log invocation details to ensure the invocation received by node worker
systemLog(LogLevel.Debug, 'Received FunctionInvocationRequest');

function onDone(): void {
if (isDone) {
const message = resultIsPromise
? `Error: Choose either to return a promise or call 'done'. Do not use both in your script. Learn more: ${asyncDoneLearnMoreLink}`
: "Error: 'done' has already been called. Please check your script for extraneous calls to 'done'.";
systemLog(LogLevel.Error, message);
}
isDone = true;
}

let { context, inputs, doneEmitter } = CreateContextAndInputs(info, msg, userLog);
try {
const legacyDoneTask = new Promise((resolve, reject) => {
doneEmitter.on('done', (err?: unknown, result?: any) => {
onDone();
if (isError(err)) {
reject(err);
} else {
resolve(result);
}
});
});

const hookData: HookData = {};
let userFunction = channel.functionLoader.getFunc(functionId);
const preInvocContext: PreInvocationContext = {
hookData,
invocationContext: context,
functionCallback: <AzureFunction>userFunction,
inputs,
};

await channel.executeHooks('preInvocation', preInvocContext, msg.invocationId, msgCategory);
inputs = preInvocContext.inputs;
userFunction = preInvocContext.functionCallback;

let rawResult = userFunction(context, ...inputs);
resultIsPromise = rawResult && typeof rawResult.then === 'function';
let resultTask: Promise<any>;
if (resultIsPromise) {
rawResult = Promise.resolve(rawResult).then((r) => {
onDone();
return r;
});
resultTask = Promise.race([rawResult, legacyDoneTask]);
} else {
resultTask = legacyDoneTask;
}

const postInvocContext: PostInvocationContext = {
hookData,
invocationContext: context,
inputs,
result: null,
error: null,
};
try {
postInvocContext.result = await resultTask;
} catch (err) {
postInvocContext.error = err;
}

try {
isExecutingPostInvocationHooks = true;
await channel.executeHooks('postInvocation', postInvocContext, msg.invocationId, msgCategory);
} finally {
isExecutingPostInvocationHooks = false;
}

if (isError(postInvocContext.error)) {
throw postInvocContext.error;
}
const result = postInvocContext.result;

// Allow HTTP response from context.res if HTTP response is not defined from the context.bindings object
if (info.httpOutputName && context.res && context.bindings[info.httpOutputName] === undefined) {
context.bindings[info.httpOutputName] = context.res;
}

// As legacy behavior, falsy values get serialized to `null` in AzFunctions.
// This breaks Durable Functions expectations, where customers expect any
// JSON-serializable values to be preserved by the framework,
// so we check if we're serializing for durable and, if so, ensure falsy
// values get serialized.
const isDurableBinding = info?.bindings?.name?.type == 'activityTrigger';

const returnBinding = info.getReturnBinding();
// Set results from return / context.done
if (result || (isDurableBinding && result != null)) {
// $return binding is found: return result data to $return binding
if (returnBinding) {
response.returnValue = returnBinding.converter(result);
// $return binding is not found: read result as object of outputs
} else {
response.outputData = Object.keys(info.outputBindings)
.filter((key) => result[key] !== undefined)
.map(
(key) =>
<rpc.IParameterBinding>{
name: key,
data: info.outputBindings[key].converter(result[key]),
}
);
}
// returned value does not match any output bindings (named or $return)
// if not http, pass along value
if (!response.returnValue && response.outputData.length == 0 && !info.hasHttpTrigger) {
response.returnValue = toTypedData(result);
}
}
// Set results from context.bindings
if (context.bindings) {
response.outputData = response.outputData.concat(
Object.keys(info.outputBindings)
// Data from return prioritized over data from context.bindings
.filter((key) => {
const definedInBindings: boolean = context.bindings[key] !== undefined;
const hasReturnValue = !!result;
const hasReturnBinding = !!returnBinding;
const definedInReturn: boolean =
hasReturnValue && !hasReturnBinding && result[key] !== undefined;
return definedInBindings && !definedInReturn;
})
.map(
(key) =>
<rpc.IParameterBinding>{
name: key,
data: info.outputBindings[key].converter(context.bindings[key]),
}
)
);
}
} finally {
isDone = true;
}

return response;
}
}
Loading