diff --git a/src/eventHandlers/EventHandler.ts b/src/eventHandlers/EventHandler.ts new file mode 100644 index 00000000..d1ecab01 --- /dev/null +++ b/src/eventHandlers/EventHandler.ts @@ -0,0 +1,39 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. + +import { AzureFunctionsRpcMessages as rpc } from '../../azure-functions-language-worker-protobuf/src/rpc'; +import { WorkerChannel } from '../WorkerChannel'; + +export type SupportedRequestName = + | 'functionEnvironmentReloadRequest' + | 'functionLoadRequest' + | 'invocationRequest' + | 'workerInitRequest'; +export type SupportedRequest = rpc.StreamingMessage[SupportedRequestName]; + +export type SupportedResponseName = + | 'functionEnvironmentReloadResponse' + | 'functionLoadResponse' + | 'invocationResponse' + | 'workerInitResponse'; +export type SupportedResponse = rpc.StreamingMessage[SupportedResponseName]; + +export abstract class EventHandler< + TRequestName extends SupportedRequestName = SupportedRequestName, + TResponseName extends SupportedResponseName = SupportedResponseName, + TRequest = NonNullable, + TResponse = NonNullable +> { + abstract readonly responseName: TResponseName; + + /** + * The default response with any properties unique to this request that should be set for both success & failure scenarios + */ + abstract getDefaultResponse(request: TRequest): TResponse; + + /** + * Handles the event and returns the response + * NOTE: This method does not need to set the result/status. That will be handled in code common to all event handlers + */ + abstract handleEvent(channel: WorkerChannel, request: TRequest): Promise; +} diff --git a/src/eventHandlers/FunctionEnvironmentReloadHandler.ts b/src/eventHandlers/FunctionEnvironmentReloadHandler.ts new file mode 100644 index 00000000..ccf14b2a --- /dev/null +++ b/src/eventHandlers/FunctionEnvironmentReloadHandler.ts @@ -0,0 +1,51 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. + +import { AzureFunctionsRpcMessages as rpc } from '../../azure-functions-language-worker-protobuf/src/rpc'; +import { WorkerChannel } from '../WorkerChannel'; +import { EventHandler } from './EventHandler'; +import LogCategory = rpc.RpcLog.RpcLogCategory; +import LogLevel = rpc.RpcLog.Level; + +/** + * Environment variables from the current process + */ +export class FunctionEnvironmentReloadHandler extends EventHandler< + 'functionEnvironmentReloadRequest', + 'functionEnvironmentReloadResponse' +> { + readonly responseName = 'functionEnvironmentReloadResponse'; + + getDefaultResponse(_msg: rpc.IFunctionEnvironmentReloadRequest): rpc.IFunctionEnvironmentReloadResponse { + return {}; + } + + async handleEvent( + channel: WorkerChannel, + msg: rpc.IFunctionEnvironmentReloadRequest + ): Promise { + const response = this.getDefaultResponse(msg); + + // Add environment variables from incoming + const numVariables = (msg.environmentVariables && Object.keys(msg.environmentVariables).length) || 0; + channel.log({ + message: `Reloading environment variables. Found ${numVariables} variables to reload.`, + level: LogLevel.Information, + logCategory: LogCategory.System, + }); + + process.env = Object.assign({}, msg.environmentVariables); + // Change current working directory + if (msg.functionAppDirectory) { + channel.log({ + message: `Changing current working directory to ${msg.functionAppDirectory}`, + level: LogLevel.Information, + logCategory: LogCategory.System, + }); + process.chdir(msg.functionAppDirectory); + await channel.updatePackageJson(msg.functionAppDirectory); + } + + return response; + } +} diff --git a/src/eventHandlers/FunctionLoadHandler.ts b/src/eventHandlers/FunctionLoadHandler.ts new file mode 100644 index 00000000..1f66a4c9 --- /dev/null +++ b/src/eventHandlers/FunctionLoadHandler.ts @@ -0,0 +1,36 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. + +import { AzureFunctionsRpcMessages as rpc } from '../../azure-functions-language-worker-protobuf/src/rpc'; +import { ensureErrorType } from '../utils/ensureErrorType'; +import { nonNullProp } from '../utils/nonNull'; +import { WorkerChannel } from '../WorkerChannel'; +import { EventHandler } from './EventHandler'; + +/** + * Worker responds after loading required metadata to load function with the load result + */ +export class FunctionLoadHandler extends EventHandler<'functionLoadRequest', 'functionLoadResponse'> { + readonly responseName = 'functionLoadResponse'; + + getDefaultResponse(msg: rpc.IFunctionLoadRequest): rpc.IFunctionLoadResponse { + return { functionId: msg.functionId }; + } + + async handleEvent(channel: WorkerChannel, msg: rpc.IFunctionLoadRequest): Promise { + const response = this.getDefaultResponse(msg); + + const functionId = nonNullProp(msg, 'functionId'); + const metadata = nonNullProp(msg, 'metadata'); + try { + await channel.functionLoader.load(functionId, metadata, channel.packageJson); + } catch (err) { + const error = ensureErrorType(err); + error.isAzureFunctionsInternalException = true; + error.message = `Worker was unable to load function ${metadata.name}: '${error.message}'`; + throw error; + } + + return response; + } +} diff --git a/src/eventHandlers/InvocationHandler.ts b/src/eventHandlers/InvocationHandler.ts new file mode 100644 index 00000000..8049ea7d --- /dev/null +++ b/src/eventHandlers/InvocationHandler.ts @@ -0,0 +1,208 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. + +import { AzureFunction } from '@azure/functions'; +import { HookData, PostInvocationContext, PreInvocationContext } from '@azure/functions-core'; +import { format } from 'util'; +import { AzureFunctionsRpcMessages as rpc } from '../../azure-functions-language-worker-protobuf/src/rpc'; +import { CreateContextAndInputs } from '../Context'; +import { toTypedData } from '../converters/RpcConverters'; +import { isError } from '../utils/ensureErrorType'; +import { nonNullProp } from '../utils/nonNull'; +import { WorkerChannel } from '../WorkerChannel'; +import { EventHandler } from './EventHandler'; +import LogCategory = rpc.RpcLog.RpcLogCategory; +import LogLevel = rpc.RpcLog.Level; + +/** + * Host requests worker to invoke a Function + */ +export class InvocationHandler extends EventHandler<'invocationRequest', 'invocationResponse'> { + readonly responseName = 'invocationResponse'; + + getDefaultResponse(msg: rpc.IInvocationRequest): rpc.IInvocationResponse { + return { invocationId: msg.invocationId }; + } + + async handleEvent(channel: WorkerChannel, msg: rpc.IInvocationRequest): Promise { + const response = this.getDefaultResponse(msg); + + const invocationId = nonNullProp(msg, 'invocationId'); + const functionId = nonNullProp(msg, 'functionId'); + + // explicitly set outputData to empty array to concat later + response.outputData = []; + + let isDone = false; + let isExecutingPostInvocationHooks = false; + let resultIsPromise = false; + + const info = channel.functionLoader.getInfo(functionId); + const asyncDoneLearnMoreLink = 'https://go.microsoft.com/fwlink/?linkid=2097909'; + + const msgCategory = `${info.name}.Invocation`; + function log(level: LogLevel, logCategory: LogCategory, ...args: any[]) { + channel.log({ + invocationId: invocationId, + category: msgCategory, + message: format.apply(null, <[any, any[]]>args), + level: level, + logCategory, + }); + } + function systemLog(level: LogLevel, ...args: any[]) { + log(level, LogCategory.System, ...args); + } + function userLog(level: LogLevel, ...args: any[]) { + if (isDone && !isExecutingPostInvocationHooks) { + let badAsyncMsg = + "Warning: Unexpected call to 'log' on the context object after function execution has completed. Please check for asynchronous calls that are not awaited or calls to 'done' made before function execution completes. "; + badAsyncMsg += `Function name: ${info.name}. Invocation Id: ${invocationId}. `; + badAsyncMsg += `Learn more: ${asyncDoneLearnMoreLink}`; + systemLog(LogLevel.Warning, badAsyncMsg); + } + log(level, LogCategory.User, ...args); + } + + // Log invocation details to ensure the invocation received by node worker + systemLog(LogLevel.Debug, 'Received FunctionInvocationRequest'); + + function onDone(): void { + if (isDone) { + const message = resultIsPromise + ? `Error: Choose either to return a promise or call 'done'. Do not use both in your script. Learn more: ${asyncDoneLearnMoreLink}` + : "Error: 'done' has already been called. Please check your script for extraneous calls to 'done'."; + systemLog(LogLevel.Error, message); + } + isDone = true; + } + + let { context, inputs, doneEmitter } = CreateContextAndInputs(info, msg, userLog); + try { + const legacyDoneTask = new Promise((resolve, reject) => { + doneEmitter.on('done', (err?: unknown, result?: any) => { + onDone(); + if (isError(err)) { + reject(err); + } else { + resolve(result); + } + }); + }); + + const hookData: HookData = {}; + let userFunction = channel.functionLoader.getFunc(functionId); + const preInvocContext: PreInvocationContext = { + hookData, + invocationContext: context, + functionCallback: userFunction, + inputs, + }; + + await channel.executeHooks('preInvocation', preInvocContext, msg.invocationId, msgCategory); + inputs = preInvocContext.inputs; + userFunction = preInvocContext.functionCallback; + + let rawResult = userFunction(context, ...inputs); + resultIsPromise = rawResult && typeof rawResult.then === 'function'; + let resultTask: Promise; + if (resultIsPromise) { + rawResult = Promise.resolve(rawResult).then((r) => { + onDone(); + return r; + }); + resultTask = Promise.race([rawResult, legacyDoneTask]); + } else { + resultTask = legacyDoneTask; + } + + const postInvocContext: PostInvocationContext = { + hookData, + invocationContext: context, + inputs, + result: null, + error: null, + }; + try { + postInvocContext.result = await resultTask; + } catch (err) { + postInvocContext.error = err; + } + + try { + isExecutingPostInvocationHooks = true; + await channel.executeHooks('postInvocation', postInvocContext, msg.invocationId, msgCategory); + } finally { + isExecutingPostInvocationHooks = false; + } + + if (isError(postInvocContext.error)) { + throw postInvocContext.error; + } + const result = postInvocContext.result; + + // Allow HTTP response from context.res if HTTP response is not defined from the context.bindings object + if (info.httpOutputName && context.res && context.bindings[info.httpOutputName] === undefined) { + context.bindings[info.httpOutputName] = context.res; + } + + // As legacy behavior, falsy values get serialized to `null` in AzFunctions. + // This breaks Durable Functions expectations, where customers expect any + // JSON-serializable values to be preserved by the framework, + // so we check if we're serializing for durable and, if so, ensure falsy + // values get serialized. + const isDurableBinding = info?.bindings?.name?.type == 'activityTrigger'; + + const returnBinding = info.getReturnBinding(); + // Set results from return / context.done + if (result || (isDurableBinding && result != null)) { + // $return binding is found: return result data to $return binding + if (returnBinding) { + response.returnValue = returnBinding.converter(result); + // $return binding is not found: read result as object of outputs + } else { + response.outputData = Object.keys(info.outputBindings) + .filter((key) => result[key] !== undefined) + .map( + (key) => + { + name: key, + data: info.outputBindings[key].converter(result[key]), + } + ); + } + // returned value does not match any output bindings (named or $return) + // if not http, pass along value + if (!response.returnValue && response.outputData.length == 0 && !info.hasHttpTrigger) { + response.returnValue = toTypedData(result); + } + } + // Set results from context.bindings + if (context.bindings) { + response.outputData = response.outputData.concat( + Object.keys(info.outputBindings) + // Data from return prioritized over data from context.bindings + .filter((key) => { + const definedInBindings: boolean = context.bindings[key] !== undefined; + const hasReturnValue = !!result; + const hasReturnBinding = !!returnBinding; + const definedInReturn: boolean = + hasReturnValue && !hasReturnBinding && result[key] !== undefined; + return definedInBindings && !definedInReturn; + }) + .map( + (key) => + { + name: key, + data: info.outputBindings[key].converter(context.bindings[key]), + } + ) + ); + } + } finally { + isDone = true; + } + + return response; + } +} diff --git a/src/eventHandlers/WorkerInitHandler.ts b/src/eventHandlers/WorkerInitHandler.ts new file mode 100644 index 00000000..2541763c --- /dev/null +++ b/src/eventHandlers/WorkerInitHandler.ts @@ -0,0 +1,99 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. + +import { access, constants } from 'fs'; +import * as path from 'path'; +import { AzureFunctionsRpcMessages as rpc } from '../../azure-functions-language-worker-protobuf/src/rpc'; +import { isError } from '../utils/ensureErrorType'; +import { InternalException } from '../utils/InternalException'; +import { systemError } from '../utils/Logger'; +import { WorkerChannel } from '../WorkerChannel'; +import { EventHandler } from './EventHandler'; +import LogCategory = rpc.RpcLog.RpcLogCategory; +import LogLevel = rpc.RpcLog.Level; + +/** + * Host sends capabilities/init data to worker and requests the worker to initialize itself + */ +export class WorkerInitHandler extends EventHandler<'workerInitRequest', 'workerInitResponse'> { + readonly responseName = 'workerInitResponse'; + + getDefaultResponse(_msg: rpc.IWorkerInitRequest): rpc.IWorkerInitResponse { + return {}; + } + + async handleEvent(channel: WorkerChannel, msg: rpc.IWorkerInitRequest): Promise { + const response = this.getDefaultResponse(msg); + + // Validate version + const version = process.version; + if ( + (version.startsWith('v17.') || version.startsWith('v15.')) && + process.env.AZURE_FUNCTIONS_ENVIRONMENT == 'Development' + ) { + const msg = + 'Node.js version used (' + + version + + ') is not officially supported. You may use it during local development, but must use an officially supported version on Azure:' + + ' https://aka.ms/functions-node-versions'; + channel.log({ + message: msg, + level: LogLevel.Warning, + logCategory: LogCategory.System, + }); + } else if (!(version.startsWith('v14.') || version.startsWith('v16.'))) { + const errorMsg = + 'Incompatible Node.js version' + + ' (' + + version + + ').' + + ' The version of the Azure Functions runtime you are using (v4) supports Node.js v14.x or Node.js v16.x' + + ' Refer to our documentation to see the Node.js versions supported by each version of Azure Functions: https://aka.ms/functions-node-versions'; + systemError(errorMsg); + throw new InternalException(errorMsg); + } + + logColdStartWarning(channel); + if (msg.functionAppDirectory) { + await channel.updatePackageJson(msg.functionAppDirectory); + } + + response.capabilities = { + RpcHttpTriggerMetadataRemoved: 'true', + RpcHttpBodyOnly: 'true', + IgnoreEmptyValuedRpcHttpHeaders: 'true', + UseNullableValueDictionaryForHttp: 'true', + WorkerStatus: 'true', + TypedDataCollection: 'true', + }; + + return response; + } +} + +export function logColdStartWarning(channel: WorkerChannel, delayInMs?: number): void { + // On reading a js file with function code('require') NodeJs tries to find 'package.json' all the way up to the file system root. + // In Azure files it causes a delay during cold start as connection to Azure Files is an expensive operation. + if ( + process.env.WEBSITE_CONTENTAZUREFILECONNECTIONSTRING && + process.env.WEBSITE_CONTENTSHARE && + process.env.AzureWebJobsScriptRoot + ) { + // Add delay to avoid affecting coldstart + if (!delayInMs) { + delayInMs = 5000; + } + setTimeout(() => { + access(path.join(process.env.AzureWebJobsScriptRoot!, 'package.json'), constants.F_OK, (err) => { + if (isError(err)) { + channel.log({ + message: + 'package.json is not found at the root of the Function App in Azure Files - cold start for NodeJs can be affected.', + level: LogLevel.Debug, + logCategory: LogCategory.System, + }); + } + }); + }, delayInMs); + } +} diff --git a/src/eventHandlers/functionEnvironmentReloadRequest.ts b/src/eventHandlers/functionEnvironmentReloadRequest.ts deleted file mode 100644 index d7639fad..00000000 --- a/src/eventHandlers/functionEnvironmentReloadRequest.ts +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. - -import { AzureFunctionsRpcMessages as rpc } from '../../azure-functions-language-worker-protobuf/src/rpc'; -import { toRpcStatus } from '../utils/toRpcStatus'; -import { WorkerChannel } from '../WorkerChannel'; -import LogCategory = rpc.RpcLog.RpcLogCategory; -import LogLevel = rpc.RpcLog.Level; - -/** - * Environment variables from the current process - */ -export async function functionEnvironmentReloadRequest( - channel: WorkerChannel, - requestId: string, - msg: rpc.IFunctionEnvironmentReloadRequest -): Promise { - // Add environment variables from incoming - const numVariables = (msg.environmentVariables && Object.keys(msg.environmentVariables).length) || 0; - channel.log({ - message: `Reloading environment variables. Found ${numVariables} variables to reload.`, - level: LogLevel.Information, - logCategory: LogCategory.System, - }); - - let error: unknown; - try { - process.env = Object.assign({}, msg.environmentVariables); - // Change current working directory - if (msg.functionAppDirectory) { - channel.log({ - message: `Changing current working directory to ${msg.functionAppDirectory}`, - level: LogLevel.Information, - logCategory: LogCategory.System, - }); - process.chdir(msg.functionAppDirectory); - await channel.updatePackageJson(msg.functionAppDirectory); - } - } catch (err) { - error = err; - } - - const functionEnvironmentReloadResponse: rpc.IFunctionEnvironmentReloadResponse = { - result: toRpcStatus(error), - }; - - channel.eventStream.write({ - requestId: requestId, - functionEnvironmentReloadResponse, - }); -} diff --git a/src/eventHandlers/functionLoadRequest.ts b/src/eventHandlers/functionLoadRequest.ts deleted file mode 100644 index c4e1e93e..00000000 --- a/src/eventHandlers/functionLoadRequest.ts +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. - -import { AzureFunctionsRpcMessages as rpc } from '../../azure-functions-language-worker-protobuf/src/rpc'; -import { ensureErrorType } from '../utils/ensureErrorType'; -import { toRpcStatus } from '../utils/toRpcStatus'; -import { WorkerChannel } from '../WorkerChannel'; -import LogCategory = rpc.RpcLog.RpcLogCategory; -import LogLevel = rpc.RpcLog.Level; - -/** - * Worker responds after loading required metadata to load function with the load result - * @param requestId gRPC message request id - * @param msg gRPC message content - */ -export async function functionLoadRequest(channel: WorkerChannel, requestId: string, msg: rpc.IFunctionLoadRequest) { - if (msg.functionId && msg.metadata) { - let error: Error | null | undefined; - let errorMessage: string | undefined; - try { - await channel.functionLoader.load(msg.functionId, msg.metadata, channel.packageJson); - } catch (err) { - error = ensureErrorType(err); - errorMessage = `Worker was unable to load function ${msg.metadata.name}: '${error.message}'`; - channel.log({ - message: errorMessage, - level: LogLevel.Error, - logCategory: LogCategory.System, - }); - } - - channel.eventStream.write({ - requestId: requestId, - functionLoadResponse: { - functionId: msg.functionId, - result: toRpcStatus(error, errorMessage), - }, - }); - } -} diff --git a/src/eventHandlers/invocationRequest.ts b/src/eventHandlers/invocationRequest.ts deleted file mode 100644 index 018f29d2..00000000 --- a/src/eventHandlers/invocationRequest.ts +++ /dev/null @@ -1,205 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. - -import { AzureFunction } from '@azure/functions'; -import { HookData, PostInvocationContext, PreInvocationContext } from '@azure/functions-core'; -import { format } from 'util'; -import { AzureFunctionsRpcMessages as rpc } from '../../azure-functions-language-worker-protobuf/src/rpc'; -import { CreateContextAndInputs } from '../Context'; -import { toTypedData } from '../converters/RpcConverters'; -import { isError } from '../utils/ensureErrorType'; -import { nonNullProp } from '../utils/nonNull'; -import { toRpcStatus } from '../utils/toRpcStatus'; -import { WorkerChannel } from '../WorkerChannel'; -import LogCategory = rpc.RpcLog.RpcLogCategory; -import LogLevel = rpc.RpcLog.Level; - -/** - * Host requests worker to invoke a Function - * @param requestId gRPC message request id - * @param msg gRPC message content - */ -export async function invocationRequest(channel: WorkerChannel, requestId: string, msg: rpc.IInvocationRequest) { - const response: rpc.IInvocationResponse = { - invocationId: msg.invocationId, - result: toRpcStatus(), - }; - // explicitly set outputData to empty array to concat later - response.outputData = []; - - let isDone = false; - let isExecutingPostInvocationHooks = false; - let resultIsPromise = false; - - const info = channel.functionLoader.getInfo(nonNullProp(msg, 'functionId')); - const asyncDoneLearnMoreLink = 'https://go.microsoft.com/fwlink/?linkid=2097909'; - - const msgCategory = `${info.name}.Invocation`; - function log(level: LogLevel, logCategory: LogCategory, ...args: any[]) { - channel.log({ - invocationId: msg.invocationId, - category: msgCategory, - message: format.apply(null, <[any, any[]]>args), - level: level, - logCategory, - }); - } - function systemLog(level: LogLevel, ...args: any[]) { - log(level, LogCategory.System, ...args); - } - function userLog(level: LogLevel, ...args: any[]) { - if (isDone && !isExecutingPostInvocationHooks) { - let badAsyncMsg = - "Warning: Unexpected call to 'log' on the context object after function execution has completed. Please check for asynchronous calls that are not awaited or calls to 'done' made before function execution completes. "; - badAsyncMsg += `Function name: ${info.name}. Invocation Id: ${msg.invocationId}. `; - badAsyncMsg += `Learn more: ${asyncDoneLearnMoreLink}`; - systemLog(LogLevel.Warning, badAsyncMsg); - } - log(level, LogCategory.User, ...args); - } - - // Log invocation details to ensure the invocation received by node worker - systemLog(LogLevel.Debug, 'Received FunctionInvocationRequest'); - - function onDone(): void { - if (isDone) { - const message = resultIsPromise - ? `Error: Choose either to return a promise or call 'done'. Do not use both in your script. Learn more: ${asyncDoneLearnMoreLink}` - : "Error: 'done' has already been called. Please check your script for extraneous calls to 'done'."; - systemLog(LogLevel.Error, message); - } - isDone = true; - } - - let { context, inputs, doneEmitter } = CreateContextAndInputs(info, msg, userLog); - try { - const legacyDoneTask = new Promise((resolve, reject) => { - doneEmitter.on('done', (err?: unknown, result?: any) => { - onDone(); - if (isError(err)) { - reject(err); - } else { - resolve(result); - } - }); - }); - - const hookData: HookData = {}; - let userFunction = channel.functionLoader.getFunc(nonNullProp(msg, 'functionId')); - const preInvocContext: PreInvocationContext = { - hookData, - invocationContext: context, - functionCallback: userFunction, - inputs, - }; - - await channel.executeHooks('preInvocation', preInvocContext, msg.invocationId, msgCategory); - inputs = preInvocContext.inputs; - userFunction = preInvocContext.functionCallback; - - let rawResult = userFunction(context, ...inputs); - resultIsPromise = rawResult && typeof rawResult.then === 'function'; - let resultTask: Promise; - if (resultIsPromise) { - rawResult = Promise.resolve(rawResult).then((r) => { - onDone(); - return r; - }); - resultTask = Promise.race([rawResult, legacyDoneTask]); - } else { - resultTask = legacyDoneTask; - } - - const postInvocContext: PostInvocationContext = { - hookData, - invocationContext: context, - inputs, - result: null, - error: null, - }; - try { - postInvocContext.result = await resultTask; - } catch (err) { - postInvocContext.error = err; - } - - try { - isExecutingPostInvocationHooks = true; - await channel.executeHooks('postInvocation', postInvocContext, msg.invocationId, msgCategory); - } finally { - isExecutingPostInvocationHooks = false; - } - - if (isError(postInvocContext.error)) { - throw postInvocContext.error; - } - const result = postInvocContext.result; - - // Allow HTTP response from context.res if HTTP response is not defined from the context.bindings object - if (info.httpOutputName && context.res && context.bindings[info.httpOutputName] === undefined) { - context.bindings[info.httpOutputName] = context.res; - } - - // As legacy behavior, falsy values get serialized to `null` in AzFunctions. - // This breaks Durable Functions expectations, where customers expect any - // JSON-serializable values to be preserved by the framework, - // so we check if we're serializing for durable and, if so, ensure falsy - // values get serialized. - const isDurableBinding = info?.bindings?.name?.type == 'activityTrigger'; - - const returnBinding = info.getReturnBinding(); - // Set results from return / context.done - if (result || (isDurableBinding && result != null)) { - // $return binding is found: return result data to $return binding - if (returnBinding) { - response.returnValue = returnBinding.converter(result); - // $return binding is not found: read result as object of outputs - } else { - response.outputData = Object.keys(info.outputBindings) - .filter((key) => result[key] !== undefined) - .map( - (key) => - { - name: key, - data: info.outputBindings[key].converter(result[key]), - } - ); - } - // returned value does not match any output bindings (named or $return) - // if not http, pass along value - if (!response.returnValue && response.outputData.length == 0 && !info.hasHttpTrigger) { - response.returnValue = toTypedData(result); - } - } - // Set results from context.bindings - if (context.bindings) { - response.outputData = response.outputData.concat( - Object.keys(info.outputBindings) - // Data from return prioritized over data from context.bindings - .filter((key) => { - const definedInBindings: boolean = context.bindings[key] !== undefined; - const hasReturnValue = !!result; - const hasReturnBinding = !!returnBinding; - const definedInReturn: boolean = - hasReturnValue && !hasReturnBinding && result[key] !== undefined; - return definedInBindings && !definedInReturn; - }) - .map( - (key) => - { - name: key, - data: info.outputBindings[key].converter(context.bindings[key]), - } - ) - ); - } - } catch (err) { - response.result = toRpcStatus(err); - isDone = true; - } - - channel.eventStream.write({ - requestId: requestId, - invocationResponse: response, - }); -} diff --git a/src/eventHandlers/workerInitRequest.ts b/src/eventHandlers/workerInitRequest.ts deleted file mode 100644 index 19f66a18..00000000 --- a/src/eventHandlers/workerInitRequest.ts +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. - -import { access, constants } from 'fs'; -import * as path from 'path'; -import { AzureFunctionsRpcMessages as rpc } from '../../azure-functions-language-worker-protobuf/src/rpc'; -import { isError } from '../utils/ensureErrorType'; -import { InternalException } from '../utils/InternalException'; -import { systemError } from '../utils/Logger'; -import { toRpcStatus } from '../utils/toRpcStatus'; -import { WorkerChannel } from '../WorkerChannel'; -import LogCategory = rpc.RpcLog.RpcLogCategory; -import LogLevel = rpc.RpcLog.Level; - -/** - * Host sends capabilities/init data to worker and requests the worker to initialize itself - * @param requestId gRPC message request id - * @param msg gRPC message content - */ -export async function workerInitRequest(channel: WorkerChannel, requestId: string, msg: rpc.IWorkerInitRequest) { - // Validate version - const version = process.version; - if ( - (version.startsWith('v17.') || version.startsWith('v15.')) && - process.env.AZURE_FUNCTIONS_ENVIRONMENT == 'Development' - ) { - const msg = - 'Node.js version used (' + - version + - ') is not officially supported. You may use it during local development, but must use an officially supported version on Azure:' + - ' https://aka.ms/functions-node-versions'; - channel.log({ - message: msg, - level: LogLevel.Warning, - logCategory: LogCategory.System, - }); - } else if (!(version.startsWith('v14.') || version.startsWith('v16.'))) { - const errorMsg = - 'Incompatible Node.js version' + - ' (' + - version + - ').' + - ' The version of the Azure Functions runtime you are using (v4) supports Node.js v14.x or Node.js v16.x' + - ' Refer to our documentation to see the Node.js versions supported by each version of Azure Functions: https://aka.ms/functions-node-versions'; - systemError(errorMsg); - throw new InternalException(errorMsg); - } - - logColdStartWarning(channel); - if (msg.functionAppDirectory) { - await channel.updatePackageJson(msg.functionAppDirectory); - } - - const workerCapabilities = { - RpcHttpTriggerMetadataRemoved: 'true', - RpcHttpBodyOnly: 'true', - IgnoreEmptyValuedRpcHttpHeaders: 'true', - UseNullableValueDictionaryForHttp: 'true', - WorkerStatus: 'true', - TypedDataCollection: 'true', - }; - - channel.eventStream.write({ - requestId: requestId, - workerInitResponse: { - result: toRpcStatus(), - capabilities: workerCapabilities, - }, - }); -} - -export function logColdStartWarning(channel: WorkerChannel, delayInMs?: number): void { - // On reading a js file with function code('require') NodeJs tries to find 'package.json' all the way up to the file system root. - // In Azure files it causes a delay during cold start as connection to Azure Files is an expensive operation. - if ( - process.env.WEBSITE_CONTENTAZUREFILECONNECTIONSTRING && - process.env.WEBSITE_CONTENTSHARE && - process.env.AzureWebJobsScriptRoot - ) { - // Add delay to avoid affecting coldstart - if (!delayInMs) { - delayInMs = 5000; - } - setTimeout(() => { - access(path.join(process.env.AzureWebJobsScriptRoot!, 'package.json'), constants.F_OK, (err) => { - if (isError(err)) { - channel.log({ - message: - 'package.json is not found at the root of the Function App in Azure Files - cold start for NodeJs can be affected.', - level: LogLevel.Debug, - logCategory: LogCategory.System, - }); - } - }); - }, delayInMs); - } -} diff --git a/src/eventHandlers/workerStatusRequest.ts b/src/eventHandlers/workerStatusRequest.ts deleted file mode 100644 index 5e288dbc..00000000 --- a/src/eventHandlers/workerStatusRequest.ts +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. - -import { AzureFunctionsRpcMessages as rpc } from '../../azure-functions-language-worker-protobuf/src/rpc'; -import { WorkerChannel } from '../WorkerChannel'; - -/** - * Worker sends the host empty response to evaluate the worker's latency - */ -export function workerStatusRequest(channel: WorkerChannel, requestId: string, _msg: rpc.IWorkerStatusRequest): void { - const workerStatusResponse: rpc.IWorkerStatusResponse = {}; - channel.eventStream.write({ - requestId: requestId, - workerStatusResponse, - }); -} diff --git a/src/setupEventStream.ts b/src/setupEventStream.ts index 3d909c2d..735daa5d 100644 --- a/src/setupEventStream.ts +++ b/src/setupEventStream.ts @@ -2,11 +2,12 @@ // Licensed under the MIT License. import { AzureFunctionsRpcMessages as rpc } from '../azure-functions-language-worker-protobuf/src/rpc'; -import { functionEnvironmentReloadRequest } from './eventHandlers/functionEnvironmentReloadRequest'; -import { functionLoadRequest } from './eventHandlers/functionLoadRequest'; -import { invocationRequest } from './eventHandlers/invocationRequest'; -import { workerInitRequest } from './eventHandlers/workerInitRequest'; -import { workerStatusRequest } from './eventHandlers/workerStatusRequest'; +import { EventHandler, SupportedRequest } from './eventHandlers/EventHandler'; +import { FunctionEnvironmentReloadHandler } from './eventHandlers/FunctionEnvironmentReloadHandler'; +import { FunctionLoadHandler } from './eventHandlers/FunctionLoadHandler'; +import { InvocationHandler } from './eventHandlers/InvocationHandler'; +import { WorkerInitHandler } from './eventHandlers/WorkerInitHandler'; +import { ensureErrorType } from './utils/ensureErrorType'; import { InternalException } from './utils/InternalException'; import { systemError } from './utils/Logger'; import { nonNullProp } from './utils/nonNull'; @@ -22,23 +23,54 @@ import LogLevel = rpc.RpcLog.Level; */ export function setupEventStream(workerId: string, channel: WorkerChannel): void { channel.eventStream.on('data', (msg) => { - const eventName = msg.content; + void handleMessage(workerId, channel, msg); + }); + + channel.eventStream.on('error', function (err) { + systemError(`Worker ${workerId} encountered event stream error: `, err); + throw new InternalException(err); + }); + + // wrap event stream write to validate message correctness + const oldWrite = channel.eventStream.write; + channel.eventStream.write = function checkWrite(msg) { + const msgError = rpc.StreamingMessage.verify(msg); + if (msgError) { + systemError(`Worker ${workerId} malformed message`, msgError); + throw new InternalException(msgError); + } + oldWrite.apply(channel.eventStream, [msg]); + }; +} + +async function handleMessage(workerId: string, channel: WorkerChannel, inMsg: rpc.StreamingMessage): Promise { + const outMsg: rpc.IStreamingMessage = { + requestId: inMsg.requestId, + }; + + let eventHandler: EventHandler | undefined; + let request: SupportedRequest | undefined; + try { + const eventName = inMsg.content; switch (eventName) { case 'functionEnvironmentReloadRequest': - void functionEnvironmentReloadRequest(channel, msg.requestId, nonNullProp(msg, eventName)); + eventHandler = new FunctionEnvironmentReloadHandler(); break; case 'functionLoadRequest': - void functionLoadRequest(channel, msg.requestId, nonNullProp(msg, eventName)); + eventHandler = new FunctionLoadHandler(); break; case 'invocationRequest': - void invocationRequest(channel, msg.requestId, nonNullProp(msg, eventName)); + eventHandler = new InvocationHandler(); break; case 'workerInitRequest': - void workerInitRequest(channel, msg.requestId, nonNullProp(msg, eventName)); + eventHandler = new WorkerInitHandler(); break; case 'workerStatusRequest': - workerStatusRequest(channel, msg.requestId, nonNullProp(msg, eventName)); - break; + // Worker sends the host empty response to evaluate the worker's latency + // The response doesn't even allow a `result` property, which is why we don't implement an EventHandler class + outMsg.workerStatusResponse = {}; + channel.eventStream.write(outMsg); + return; case 'closeSharedMemoryResourcesRequest': case 'fileChangeEventRequest': case 'functionLoadRequestCollection': @@ -48,29 +80,39 @@ export function setupEventStream(workerId: string, channel: WorkerChannel): void case 'workerHeartbeat': case 'workerTerminate': // Not yet implemented - break; + return; default: - channel.log({ - message: `Worker ${workerId} had no handler for message '${eventName}'`, - level: LogLevel.Error, - logCategory: LogCategory.System, - }); + throw new InternalException(`Worker ${workerId} had no handler for message '${eventName}'`); } - }); - channel.eventStream.on('error', function (err) { - systemError(`Worker ${workerId} encountered event stream error: `, err); - throw new InternalException(err); - }); + request = nonNullProp(inMsg, eventName); + const response = await eventHandler.handleEvent(channel, request); + response.result = { status: rpc.StatusResult.Status.Success }; + outMsg[eventHandler.responseName] = response; + } catch (err) { + const error = ensureErrorType(err); + if (error.isAzureFunctionsInternalException) { + channel.log({ + message: error.message, + level: LogLevel.Error, + logCategory: LogCategory.System, + }); + } - // wrap event stream write to validate message correctness - const oldWrite = channel.eventStream.write; - channel.eventStream.write = function checkWrite(msg) { - const msgError = rpc.StreamingMessage.verify(msg); - if (msgError) { - systemError(`Worker ${workerId} malformed message`, msgError); - throw new InternalException(msgError); + if (eventHandler && request) { + const response = eventHandler.getDefaultResponse(request); + response.result = { + status: rpc.StatusResult.Status.Failure, + exception: { + message: error.message, + stackTrace: error.stack, + }, + }; + outMsg[eventHandler.responseName] = response; } - oldWrite.apply(channel.eventStream, [msg]); - }; + } + + if (eventHandler) { + channel.eventStream.write(outMsg); + } } diff --git a/src/utils/nonNull.ts b/src/utils/nonNull.ts index 432b1a61..231fabcc 100644 --- a/src/utils/nonNull.ts +++ b/src/utils/nonNull.ts @@ -1,6 +1,8 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. +import { InternalException } from './InternalException'; + /** * Retrieves a property by name from an object and checks that it's not null and not undefined. It is strongly typed * for the property and will give a compile error if the given name is not a property of the source. @@ -18,7 +20,7 @@ export function nonNullProp( */ export function nonNullValue(value: T | undefined, propertyNameOrMessage?: string): T { if (value === null || value === undefined) { - throw new Error( + throw new InternalException( 'Internal error: Expected value to be neither null nor undefined' + (propertyNameOrMessage ? `: ${propertyNameOrMessage}` : '') ); diff --git a/src/utils/toRpcStatus.ts b/src/utils/toRpcStatus.ts deleted file mode 100644 index cfbbf60f..00000000 --- a/src/utils/toRpcStatus.ts +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. - -import { AzureFunctionsRpcMessages as rpc } from '../../azure-functions-language-worker-protobuf/src/rpc'; -import { ensureErrorType, isError } from './ensureErrorType'; - -export function toRpcStatus(err?: unknown, errorMessage?: string): rpc.IStatusResult { - const status: rpc.IStatusResult = { - status: rpc.StatusResult.Status.Success, - }; - - if (isError(err)) { - const error = ensureErrorType(err); - status.status = rpc.StatusResult.Status.Failure; - status.exception = { - message: errorMessage || error.message, - stackTrace: error.stack, - }; - } - - return status; -} diff --git a/test/eventHandlers/functionEnvironmentReloadRequest.test.ts b/test/eventHandlers/FunctionEnvironmentReloadHandler.test.ts similarity index 96% rename from test/eventHandlers/functionEnvironmentReloadRequest.test.ts rename to test/eventHandlers/FunctionEnvironmentReloadHandler.test.ts index 559ffd57..0e1e9b1b 100644 --- a/test/eventHandlers/functionEnvironmentReloadRequest.test.ts +++ b/test/eventHandlers/FunctionEnvironmentReloadHandler.test.ts @@ -65,7 +65,7 @@ namespace Msg { ); } -describe('functionEnvironmentReloadRequest', () => { +describe('FunctionEnvironmentReloadHandler', () => { let stream: TestEventStream; let channel: WorkerChannel; diff --git a/test/eventHandlers/functionLoadRequest.test.ts b/test/eventHandlers/FunctionLoadHandler.test.ts similarity index 95% rename from test/eventHandlers/functionLoadRequest.test.ts rename to test/eventHandlers/FunctionLoadHandler.test.ts index 36a931b8..0ed4f781 100644 --- a/test/eventHandlers/functionLoadRequest.test.ts +++ b/test/eventHandlers/FunctionLoadHandler.test.ts @@ -11,7 +11,7 @@ import { TestEventStream } from './TestEventStream'; import LogCategory = rpc.RpcLog.RpcLogCategory; import LogLevel = rpc.RpcLog.Level; -describe('functionLoadRequest', () => { +describe('FunctionLoadHandler', () => { let stream: TestEventStream; let loader: sinon.SinonStubbedInstance; diff --git a/test/eventHandlers/invocationRequest.test.ts b/test/eventHandlers/InvocationHandler.test.ts similarity index 97% rename from test/eventHandlers/invocationRequest.test.ts rename to test/eventHandlers/InvocationHandler.test.ts index 2945fd79..309932c8 100644 --- a/test/eventHandlers/invocationRequest.test.ts +++ b/test/eventHandlers/InvocationHandler.test.ts @@ -239,7 +239,6 @@ namespace Msg { stackTrace: 'testErrorStack', }, }, - outputData: [], }, }; export function receivedInvocLog(): rpc.IStreamingMessage { @@ -320,7 +319,7 @@ namespace InputData { }; } -describe('invocationRequest', () => { +describe('InvocationHandler', () => { let stream: TestEventStream; let loader: sinon.SinonStubbedInstance; let coreApi: typeof coreTypes; diff --git a/test/eventHandlers/workerInitRequest.test.ts b/test/eventHandlers/WorkerInitHandler.test.ts similarity index 94% rename from test/eventHandlers/workerInitRequest.test.ts rename to test/eventHandlers/WorkerInitHandler.test.ts index 667e67ec..edf93d82 100644 --- a/test/eventHandlers/workerInitRequest.test.ts +++ b/test/eventHandlers/WorkerInitHandler.test.ts @@ -5,7 +5,7 @@ import { expect } from 'chai'; import 'mocha'; import * as mock from 'mock-fs'; import { AzureFunctionsRpcMessages as rpc } from '../../azure-functions-language-worker-protobuf/src/rpc'; -import { logColdStartWarning } from '../../src/eventHandlers/workerInitRequest'; +import { logColdStartWarning } from '../../src/eventHandlers/WorkerInitHandler'; import { WorkerChannel } from '../../src/WorkerChannel'; import { beforeEventHandlerSuite } from './beforeEventHandlerSuite'; import { TestEventStream } from './TestEventStream'; @@ -61,7 +61,7 @@ namespace Msg { }; } -describe('workerInitRequest', () => { +describe('WorkerInitHandler', () => { let channel: WorkerChannel; let stream: TestEventStream; diff --git a/test/eventHandlers/workerStatusRequest.test.ts b/test/eventHandlers/WorkerStatusHandler.test.ts similarity index 91% rename from test/eventHandlers/workerStatusRequest.test.ts rename to test/eventHandlers/WorkerStatusHandler.test.ts index a8598443..a99de7ff 100644 --- a/test/eventHandlers/workerStatusRequest.test.ts +++ b/test/eventHandlers/WorkerStatusHandler.test.ts @@ -5,7 +5,7 @@ import 'mocha'; import { beforeEventHandlerSuite } from './beforeEventHandlerSuite'; import { TestEventStream } from './TestEventStream'; -describe('workerStatusRequest', () => { +describe('WorkerStatusHandler', () => { let stream: TestEventStream; before(() => {