Skip to content

Commit d3b9670

Browse files
committed
Refactor source code to use async/await instead of done
1 parent f477c4b commit d3b9670

File tree

5 files changed

+156
-181
lines changed

5 files changed

+156
-181
lines changed

src/Context.ts

Lines changed: 16 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,12 @@ import {
2222
import { FunctionInfo } from './FunctionInfo';
2323
import { Request } from './http/Request';
2424
import { Response } from './http/Response';
25+
import EventEmitter = require('events');
2526
import LogLevel = rpc.RpcLog.Level;
26-
import LogCategory = rpc.RpcLog.RpcLogCategory;
2727

28-
export function CreateContextAndInputs(
29-
info: FunctionInfo,
30-
request: rpc.IInvocationRequest,
31-
logCallback: LogCallback,
32-
callback: ResultCallback
33-
) {
34-
const context = new InvocationContext(info, request, logCallback, callback);
28+
export function CreateContextAndInputs(info: FunctionInfo, request: rpc.IInvocationRequest, logCallback: LogCallback) {
29+
const doneEmitter = new EventEmitter();
30+
const context = new InvocationContext(info, request, logCallback, doneEmitter);
3531

3632
const bindings: ContextBindings = {};
3733
const inputs: any[] = [];
@@ -76,6 +72,7 @@ export function CreateContextAndInputs(
7672
return {
7773
context: <Context>context,
7874
inputs: inputs,
75+
doneEmitter,
7976
};
8077
}
8178

@@ -95,7 +92,7 @@ class InvocationContext implements Context {
9592
info: FunctionInfo,
9693
request: rpc.IInvocationRequest,
9794
logCallback: LogCallback,
98-
callback: ResultCallback
95+
doneEmitter: EventEmitter
9996
) {
10097
this.invocationId = <string>request.invocationId;
10198
this.traceContext = fromRpcTraceContext(request.traceContext);
@@ -107,89 +104,32 @@ class InvocationContext implements Context {
107104
};
108105
this.executionContext = executionContext;
109106
this.bindings = {};
110-
let _done = false;
111-
let _promise = false;
112107

113108
// Log message that is tied to function invocation
114-
this.log = Object.assign(
115-
(...args: any[]) => logWithAsyncCheck(_done, logCallback, LogLevel.Information, executionContext, ...args),
116-
{
117-
error: (...args: any[]) =>
118-
logWithAsyncCheck(_done, logCallback, LogLevel.Error, executionContext, ...args),
119-
warn: (...args: any[]) =>
120-
logWithAsyncCheck(_done, logCallback, LogLevel.Warning, executionContext, ...args),
121-
info: (...args: any[]) =>
122-
logWithAsyncCheck(_done, logCallback, LogLevel.Information, executionContext, ...args),
123-
verbose: (...args: any[]) =>
124-
logWithAsyncCheck(_done, logCallback, LogLevel.Trace, executionContext, ...args),
125-
}
126-
);
109+
this.log = Object.assign((...args: any[]) => logCallback(LogLevel.Information, ...args), {
110+
error: (...args: any[]) => logCallback(LogLevel.Error, ...args),
111+
warn: (...args: any[]) => logCallback(LogLevel.Warning, ...args),
112+
info: (...args: any[]) => logCallback(LogLevel.Information, ...args),
113+
verbose: (...args: any[]) => logCallback(LogLevel.Trace, ...args),
114+
});
127115

128116
this.bindingData = getNormalizedBindingData(request);
129117
this.bindingDefinitions = getBindingDefinitions(info);
130118

131-
// isPromise is a hidden parameter that we set to true in the event of a returned promise
132-
this.done = (err?: any, result?: any, isPromise?: boolean) => {
133-
_promise = isPromise === true;
134-
if (_done) {
135-
if (_promise) {
136-
logCallback(
137-
LogLevel.Error,
138-
LogCategory.User,
139-
"Error: Choose either to return a promise or call 'done'. Do not use both in your script."
140-
);
141-
} else {
142-
logCallback(
143-
LogLevel.Error,
144-
LogCategory.User,
145-
"Error: 'done' has already been called. Please check your script for extraneous calls to 'done'."
146-
);
147-
}
148-
return;
149-
}
150-
_done = true;
151-
152-
// Allow HTTP response from context.res if HTTP response is not defined from the context.bindings object
153-
if (info.httpOutputName && this.res && this.bindings[info.httpOutputName] === undefined) {
154-
this.bindings[info.httpOutputName] = this.res;
155-
}
156-
157-
callback(err, {
158-
return: result,
159-
bindings: this.bindings,
160-
});
119+
this.done = (err?: unknown, result?: any) => {
120+
doneEmitter.emit('done', err, result);
161121
};
162122
}
163123
}
164124

165-
// Emit warning if trying to log after function execution is done.
166-
function logWithAsyncCheck(
167-
done: boolean,
168-
log: LogCallback,
169-
level: LogLevel,
170-
executionContext: ExecutionContext,
171-
...args: any[]
172-
) {
173-
if (done) {
174-
let badAsyncMsg =
175-
"Warning: Unexpected call to 'log' on the context object after function execution has completed. Please check for asynchronous calls that are not awaited or calls to 'done' made before function execution completes. ";
176-
badAsyncMsg += `Function name: ${executionContext.functionName}. Invocation Id: ${executionContext.invocationId}. `;
177-
badAsyncMsg += `Learn more: https://go.microsoft.com/fwlink/?linkid=2097909 `;
178-
log(LogLevel.Warning, LogCategory.System, badAsyncMsg);
179-
}
180-
return log(level, LogCategory.User, ...args);
181-
}
182-
183125
export interface InvocationResult {
184126
return: any;
185127
bindings: ContextBindings;
186128
}
187129

188-
export type DoneCallback = (err?: Error | string, result?: any) => void;
189-
190-
export type LogCallback = (level: LogLevel, category: rpc.RpcLog.RpcLogCategory, ...args: any[]) => void;
130+
export type DoneCallback = (err?: unknown, result?: any) => void;
191131

192-
export type ResultCallback = (err?: any, result?: InvocationResult) => void;
132+
export type LogCallback = (level: LogLevel, ...args: any[]) => void;
193133

194134
export interface Dict<T> {
195135
[key: string]: T;

src/WorkerChannel.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,8 @@ export class WorkerChannel {
116116
* @param requestId gRPC message request id
117117
* @param msg gRPC message content
118118
*/
119-
public invocationRequest(requestId: string, msg: rpc.InvocationRequest) {
120-
invocationRequest(this, requestId, msg);
119+
public async invocationRequest(requestId: string, msg: rpc.InvocationRequest) {
120+
await invocationRequest(this, requestId, msg);
121121
}
122122

123123
/**

src/eventHandlers/invocationRequest.ts

Lines changed: 125 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33

44
import { format } from 'util';
55
import { AzureFunctionsRpcMessages as rpc } from '../../azure-functions-language-worker-protobuf/src/rpc';
6-
import { CreateContextAndInputs, LogCallback, ResultCallback } from '../Context';
6+
import { CreateContextAndInputs } from '../Context';
77
import { toTypedData } from '../converters';
8+
import { isError } from '../utils/ensureErrorType';
89
import { toRpcStatus } from '../utils/toRpcStatus';
910
import { WorkerChannel } from '../WorkerChannel';
1011
import LogCategory = rpc.RpcLog.RpcLogCategory;
@@ -15,28 +16,89 @@ import LogLevel = rpc.RpcLog.Level;
1516
* @param requestId gRPC message request id
1617
* @param msg gRPC message content
1718
*/
18-
export function invocationRequest(channel: WorkerChannel, requestId: string, msg: rpc.InvocationRequest) {
19+
export async function invocationRequest(channel: WorkerChannel, requestId: string, msg: rpc.InvocationRequest) {
20+
const response: rpc.IInvocationResponse = {
21+
invocationId: msg.invocationId,
22+
result: toRpcStatus(),
23+
};
24+
// explicitly set outputData to empty array to concat later
25+
response.outputData = [];
26+
27+
let isDone = false;
28+
let resultIsPromise = false;
29+
1930
const info = channel.functionLoader.getInfo(msg.functionId);
20-
const logCallback: LogCallback = (level, category, ...args) => {
31+
32+
function log(level: LogLevel, category: LogCategory, ...args: any[]) {
2133
channel.log({
2234
invocationId: msg.invocationId,
2335
category: `${info.name}.Invocation`,
2436
message: format.apply(null, <[any, any[]]>args),
2537
level: level,
2638
logCategory: category,
2739
});
28-
};
40+
}
41+
function systemLog(level: LogLevel, ...args: any[]) {
42+
log(level, LogCategory.System, ...args);
43+
}
44+
function userLog(level: LogLevel, ...args: any[]) {
45+
if (isDone) {
46+
let badAsyncMsg =
47+
"Warning: Unexpected call to 'log' on the context object after function execution has completed. Please check for asynchronous calls that are not awaited or calls to 'done' made before function execution completes. ";
48+
badAsyncMsg += `Function name: ${info.name}. Invocation Id: ${msg.invocationId}. `;
49+
badAsyncMsg += `Learn more: https://go.microsoft.com/fwlink/?linkid=2097909 `;
50+
systemLog(LogLevel.Warning, badAsyncMsg);
51+
}
52+
log(level, LogCategory.User, ...args);
53+
}
2954

3055
// Log invocation details to ensure the invocation received by node worker
31-
logCallback(LogLevel.Debug, LogCategory.System, 'Received FunctionInvocationRequest');
56+
systemLog(LogLevel.Debug, 'Received FunctionInvocationRequest');
3257

33-
const resultCallback: ResultCallback = (err: unknown, result) => {
34-
const response: rpc.IInvocationResponse = {
35-
invocationId: msg.invocationId,
36-
result: toRpcStatus(err),
37-
};
38-
// explicitly set outputData to empty array to concat later
39-
response.outputData = [];
58+
function onDone(): void {
59+
if (isDone) {
60+
const message = resultIsPromise
61+
? "Error: Choose either to return a promise or call 'done'. Do not use both in your script."
62+
: "Error: 'done' has already been called. Please check your script for extraneous calls to 'done'.";
63+
systemLog(LogLevel.Error, message);
64+
}
65+
isDone = true;
66+
}
67+
68+
const { context, inputs, doneEmitter } = CreateContextAndInputs(info, msg, userLog);
69+
try {
70+
const legacyDoneTask = new Promise((resolve, reject) => {
71+
doneEmitter.on('done', (err?: unknown, result?: any) => {
72+
onDone();
73+
if (isError(err)) {
74+
reject(err);
75+
} else {
76+
resolve(result);
77+
}
78+
});
79+
});
80+
81+
let userFunction = channel.functionLoader.getFunc(msg.functionId);
82+
userFunction = channel.runInvocationRequestBefore(context, userFunction);
83+
let rawResult = userFunction(context, ...inputs);
84+
resultIsPromise = rawResult && typeof rawResult.then === 'function';
85+
let resultTask: Promise<any>;
86+
if (resultIsPromise) {
87+
rawResult = Promise.resolve(rawResult).then((r) => {
88+
onDone();
89+
return r;
90+
});
91+
resultTask = Promise.race([rawResult, legacyDoneTask]);
92+
} else {
93+
resultTask = legacyDoneTask;
94+
}
95+
96+
const result = await resultTask;
97+
98+
// Allow HTTP response from context.res if HTTP response is not defined from the context.bindings object
99+
if (info.httpOutputName && context.res && context.bindings[info.httpOutputName] === undefined) {
100+
context.bindings[info.httpOutputName] = context.res;
101+
}
40102

41103
// As legacy behavior, falsy values get serialized to `null` in AzFunctions.
42104
// This breaks Durable Functions expectations, where customers expect any
@@ -45,86 +107,61 @@ export function invocationRequest(channel: WorkerChannel, requestId: string, msg
45107
// values get serialized.
46108
const isDurableBinding = info?.bindings?.name?.type == 'activityTrigger';
47109

48-
try {
49-
if (result || (isDurableBinding && result != null)) {
50-
const returnBinding = info.getReturnBinding();
51-
// Set results from return / context.done
52-
if (result.return || (isDurableBinding && result.return != null)) {
53-
// $return binding is found: return result data to $return binding
54-
if (returnBinding) {
55-
response.returnValue = returnBinding.converter(result.return);
56-
// $return binding is not found: read result as object of outputs
57-
} else {
58-
response.outputData = Object.keys(info.outputBindings)
59-
.filter((key) => result.return[key] !== undefined)
60-
.map(
61-
(key) =>
62-
<rpc.IParameterBinding>{
63-
name: key,
64-
data: info.outputBindings[key].converter(result.return[key]),
65-
}
66-
);
67-
}
68-
// returned value does not match any output bindings (named or $return)
69-
// if not http, pass along value
70-
if (!response.returnValue && response.outputData.length == 0 && !info.hasHttpTrigger) {
71-
response.returnValue = toTypedData(result.return);
72-
}
73-
}
74-
// Set results from context.bindings
75-
if (result.bindings) {
76-
response.outputData = response.outputData.concat(
77-
Object.keys(info.outputBindings)
78-
// Data from return prioritized over data from context.bindings
79-
.filter((key) => {
80-
const definedInBindings: boolean = result.bindings[key] !== undefined;
81-
const hasReturnValue = !!result.return;
82-
const hasReturnBinding = !!returnBinding;
83-
const definedInReturn: boolean =
84-
hasReturnValue && !hasReturnBinding && result.return[key] !== undefined;
85-
return definedInBindings && !definedInReturn;
86-
})
87-
.map(
88-
(key) =>
89-
<rpc.IParameterBinding>{
90-
name: key,
91-
data: info.outputBindings[key].converter(result.bindings[key]),
92-
}
93-
)
110+
const returnBinding = info.getReturnBinding();
111+
// Set results from return / context.done
112+
if (result || (isDurableBinding && result != null)) {
113+
// $return binding is found: return result data to $return binding
114+
if (returnBinding) {
115+
response.returnValue = returnBinding.converter(result);
116+
// $return binding is not found: read result as object of outputs
117+
} else {
118+
response.outputData = Object.keys(info.outputBindings)
119+
.filter((key) => result[key] !== undefined)
120+
.map(
121+
(key) =>
122+
<rpc.IParameterBinding>{
123+
name: key,
124+
data: info.outputBindings[key].converter(result[key]),
125+
}
94126
);
95-
}
96127
}
97-
} catch (err) {
98-
response.result = toRpcStatus(err);
128+
// returned value does not match any output bindings (named or $return)
129+
// if not http, pass along value
130+
if (!response.returnValue && response.outputData.length == 0 && !info.hasHttpTrigger) {
131+
response.returnValue = toTypedData(result);
132+
}
99133
}
100-
channel.eventStream.write({
101-
requestId: requestId,
102-
invocationResponse: response,
103-
});
104-
105-
channel.runInvocationRequestAfter(context);
106-
};
107-
108-
const { context, inputs } = CreateContextAndInputs(info, msg, logCallback, resultCallback);
109-
let userFunction = channel.functionLoader.getFunc(msg.functionId);
110-
111-
userFunction = channel.runInvocationRequestBefore(context, userFunction);
112-
113-
// catch user errors from the same async context in the event loop and correlate with invocation
114-
// throws from asynchronous work (setTimeout, etc) are caught by 'unhandledException' and cannot be correlated with invocation
115-
try {
116-
const result = userFunction(context, ...inputs);
117-
118-
if (result && typeof result.then === 'function') {
119-
result
120-
.then((result) => {
121-
(<any>context.done)(null, result, true);
122-
})
123-
.catch((err) => {
124-
(<any>context.done)(err, null, true);
125-
});
134+
// Set results from context.bindings
135+
if (context.bindings) {
136+
response.outputData = response.outputData.concat(
137+
Object.keys(info.outputBindings)
138+
// Data from return prioritized over data from context.bindings
139+
.filter((key) => {
140+
const definedInBindings: boolean = context.bindings[key] !== undefined;
141+
const hasReturnValue = !!result;
142+
const hasReturnBinding = !!returnBinding;
143+
const definedInReturn: boolean =
144+
hasReturnValue && !hasReturnBinding && result[key] !== undefined;
145+
return definedInBindings && !definedInReturn;
146+
})
147+
.map(
148+
(key) =>
149+
<rpc.IParameterBinding>{
150+
name: key,
151+
data: info.outputBindings[key].converter(context.bindings[key]),
152+
}
153+
)
154+
);
126155
}
127156
} catch (err) {
128-
resultCallback(err);
157+
response.result = toRpcStatus(err);
158+
isDone = true;
129159
}
160+
161+
channel.eventStream.write({
162+
requestId: requestId,
163+
invocationResponse: response,
164+
});
165+
166+
channel.runInvocationRequestAfter(context);
130167
}

0 commit comments

Comments
 (0)