Skip to content

introduce new IncrementalPublisher class #3894

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 17, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
473 changes: 318 additions & 155 deletions src/execution/IncrementalPublisher.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { ObjMap } from '../jsutils/ObjMap.js';
import type { Path } from '../jsutils/Path.js';
import { pathToArray } from '../jsutils/Path.js';
import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';
import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js';

import type {
@@ -85,237 +84,401 @@ export type FormattedIncrementalResult<
| FormattedIncrementalDeferResult<TData, TExtensions>
| FormattedIncrementalStreamResult<TData, TExtensions>;

export function yieldSubsequentPayloads(
subsequentPayloads: Set<IncrementalDataRecord>,
): AsyncGenerator<SubsequentIncrementalExecutionResult, void, void> {
let isDone = false;
/**
* This class is used to publish incremental results to the client, enabling semi-concurrent
* execution while preserving result order.
*
* The internal publishing state is managed as follows:
*
* '_released': the set of Incremental Data records that are ready to be sent to the client,
* i.e. their parents have completed and they have also completed.
*
* `_pending`: the set of Incremental Data records that are definitely pending, i.e. their
* parents have completed so that they can no longer be filtered. This includes all Incremental
* Data records in `released`, as well as Incremental Data records that have not yet completed.
*
* `_initialResult`: a record containing the state of the initial result, as follows:
* `isCompleted`: indicates whether the initial result has completed.
* `children`: the set of Incremental Data records that can be be published when the initial
* result is completed.
*
* Each Incremental Data record also contains similar metadata, i.e. these records also contain
* similar `isCompleted` and `children` properties.
*
* @internal
*/
export class IncrementalPublisher {
private _initialResult: {
children: Set<IncrementalDataRecord>;
isCompleted: boolean;
};

private _released: Set<IncrementalDataRecord>;
private _pending: Set<IncrementalDataRecord>;

// these are assigned within the Promise executor called synchronously within the constructor
private _signalled!: Promise<unknown>;
private _resolve!: () => void;

constructor() {
this._initialResult = {
children: new Set(),
isCompleted: false,
};
this._released = new Set();
this._pending = new Set();
this._reset();
}

hasNext(): boolean {
return this._pending.size > 0;
}

async function next(): Promise<
IteratorResult<SubsequentIncrementalExecutionResult, void>
subscribe(): AsyncGenerator<
SubsequentIncrementalExecutionResult,
void,
void
> {
if (isDone) {
return { value: undefined, done: true };
}
let isDone = false;

await Promise.race(Array.from(subsequentPayloads).map((p) => p.promise));
const _next = async (): Promise<
IteratorResult<SubsequentIncrementalExecutionResult, void>
> => {
// eslint-disable-next-line no-constant-condition
while (true) {
if (isDone) {
return { value: undefined, done: true };
}

if (isDone) {
// a different call to next has exhausted all payloads
return { value: undefined, done: true };
}
for (const item of this._released) {
this._pending.delete(item);
}
const released = this._released;
this._released = new Set();

const incremental = getCompletedIncrementalResults(subsequentPayloads);
const hasNext = subsequentPayloads.size > 0;
const result = this._getIncrementalResult(released);

if (!incremental.length && hasNext) {
return next();
}
if (!this.hasNext()) {
isDone = true;
}

if (!hasNext) {
isDone = true;
}
if (result !== undefined) {
return { value: result, done: false };
}

return {
value: incremental.length ? { incremental, hasNext } : { hasNext },
done: false,
// eslint-disable-next-line no-await-in-loop
await this._signalled;
}
};
}

function returnStreamIterators() {
const promises: Array<Promise<IteratorResult<unknown>>> = [];
subsequentPayloads.forEach((incrementalDataRecord) => {
if (
isStreamItemsRecord(incrementalDataRecord) &&
incrementalDataRecord.asyncIterator?.return
) {
promises.push(incrementalDataRecord.asyncIterator.return());
}
});
return Promise.all(promises);
}
const returnStreamIterators = async (): Promise<void> => {
const promises: Array<Promise<IteratorResult<unknown>>> = [];
this._pending.forEach((incrementalDataRecord) => {
if (
isStreamItemsRecord(incrementalDataRecord) &&
incrementalDataRecord.asyncIterator?.return
) {
promises.push(incrementalDataRecord.asyncIterator.return());
}
});
await Promise.all(promises);
};

return {
[Symbol.asyncIterator]() {
return this;
},
next,
async return(): Promise<
const _return = async (): Promise<
IteratorResult<SubsequentIncrementalExecutionResult, void>
> {
await returnStreamIterators();
> => {
isDone = true;
await returnStreamIterators();
return { value: undefined, done: true };
},
async throw(
};

const _throw = async (
error?: unknown,
): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> {
await returnStreamIterators();
): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> => {
isDone = true;
await returnStreamIterators();
return Promise.reject(error);
},
};
}
};

function getCompletedIncrementalResults(
subsequentPayloads: Set<IncrementalDataRecord>,
): Array<IncrementalResult> {
const incrementalResults: Array<IncrementalResult> = [];
for (const incrementalDataRecord of subsequentPayloads) {
const incrementalResult: IncrementalResult = {};
if (!incrementalDataRecord.isCompleted) {
continue;
}
subsequentPayloads.delete(incrementalDataRecord);
if (isStreamItemsRecord(incrementalDataRecord)) {
const items = incrementalDataRecord.items;
if (incrementalDataRecord.isCompletedAsyncIterator) {
// async iterable resolver just finished but there may be pending payloads
continue;
}
(incrementalResult as IncrementalStreamResult).items = items;
return {
[Symbol.asyncIterator]() {
return this;
},
next: _next,
return: _return,
throw: _throw,
};
}

prepareNewDeferredFragmentRecord(opts: {
label: string | undefined;
path: Path | undefined;
parentContext: IncrementalDataRecord | undefined;
}): DeferredFragmentRecord {
const deferredFragmentRecord = new DeferredFragmentRecord(opts);

const parentContext = opts.parentContext;
if (parentContext) {
parentContext.children.add(deferredFragmentRecord);
} else {
const data = incrementalDataRecord.data;
(incrementalResult as IncrementalDeferResult).data = data ?? null;
this._initialResult.children.add(deferredFragmentRecord);
}

incrementalResult.path = incrementalDataRecord.path;
if (incrementalDataRecord.label != null) {
incrementalResult.label = incrementalDataRecord.label;
}
if (incrementalDataRecord.errors.length > 0) {
incrementalResult.errors = incrementalDataRecord.errors;
return deferredFragmentRecord;
}

prepareNewStreamItemsRecord(opts: {
label: string | undefined;
path: Path | undefined;
asyncIterator?: AsyncIterator<unknown>;
parentContext: IncrementalDataRecord | undefined;
}): StreamItemsRecord {
const streamItemsRecord = new StreamItemsRecord(opts);

const parentContext = opts.parentContext;
if (parentContext) {
parentContext.children.add(streamItemsRecord);
} else {
this._initialResult.children.add(streamItemsRecord);
}
incrementalResults.push(incrementalResult);

return streamItemsRecord;
}

completeDeferredFragmentRecord(
deferredFragmentRecord: DeferredFragmentRecord,
data: ObjMap<unknown> | null,
): void {
deferredFragmentRecord.data = data;
deferredFragmentRecord.isCompleted = true;
this._release(deferredFragmentRecord);
}

completeStreamItemsRecord(
streamItemsRecord: StreamItemsRecord,
items: Array<unknown> | null,
) {
streamItemsRecord.items = items;
streamItemsRecord.isCompleted = true;
this._release(streamItemsRecord);
}

setIsCompletedAsyncIterator(streamItemsRecord: StreamItemsRecord) {
streamItemsRecord.isCompletedAsyncIterator = true;
}

addFieldError(
incrementalDataRecord: IncrementalDataRecord,
error: GraphQLError,
) {
incrementalDataRecord.errors.push(error);
}
return incrementalResults;
}

export function filterSubsequentPayloads(
subsequentPayloads: Set<IncrementalDataRecord>,
nullPath: Path,
currentIncrementalDataRecord: IncrementalDataRecord | undefined,
): void {
const nullPathArray = pathToArray(nullPath);
subsequentPayloads.forEach((incrementalDataRecord) => {
if (incrementalDataRecord === currentIncrementalDataRecord) {
// don't remove payload from where error originates
return;
publishInitial() {
for (const child of this._initialResult.children) {
this._publish(child);
}
for (let i = 0; i < nullPathArray.length; i++) {
if (incrementalDataRecord.path[i] !== nullPathArray[i]) {
// incrementalDataRecord points to a path unaffected by this payload
return;
}

filter(
nullPath: Path,
erroringIncrementalDataRecord: IncrementalDataRecord | undefined,
) {
const nullPathArray = pathToArray(nullPath);

const asyncIterators = new Set<AsyncIterator<unknown>>();

const children =
erroringIncrementalDataRecord === undefined
? this._initialResult.children
: erroringIncrementalDataRecord.children;

for (const child of this._getDescendants(children)) {
if (!this._matchesPath(child.path, nullPathArray)) {
continue;
}

this._delete(child);
const parent =
child.parentContext === undefined
? this._initialResult
: child.parentContext;
parent.children.delete(child);

if (isStreamItemsRecord(child)) {
if (child.asyncIterator !== undefined) {
asyncIterators.add(child.asyncIterator);
}
}
}
// incrementalDataRecord path points to nulled error field
if (
isStreamItemsRecord(incrementalDataRecord) &&
incrementalDataRecord.asyncIterator?.return
) {
incrementalDataRecord.asyncIterator.return().catch(() => {

asyncIterators.forEach((asyncIterator) => {
asyncIterator.return?.().catch(() => {
// ignore error
});
});
}

private _trigger() {
this._resolve();
this._reset();
}

private _reset() {
// promiseWithResolvers uses void only as a generic type parameter
// see: https://typescript-eslint.io/rules/no-invalid-void-type/
// eslint-disable-next-line @typescript-eslint/no-invalid-void-type
const { promise: signalled, resolve } = promiseWithResolvers<void>();
this._resolve = resolve;
this._signalled = signalled;
}

private _introduce(item: IncrementalDataRecord) {
this._pending.add(item);
}

private _release(item: IncrementalDataRecord): void {
if (this._pending.has(item)) {
this._released.add(item);
this._trigger();
}
}

private _push(item: IncrementalDataRecord): void {
this._released.add(item);
this._pending.add(item);
this._trigger();
}

private _delete(item: IncrementalDataRecord) {
this._released.delete(item);
this._pending.delete(item);
this._trigger();
}

private _getIncrementalResult(
completedRecords: ReadonlySet<IncrementalDataRecord>,
): SubsequentIncrementalExecutionResult | undefined {
const incrementalResults: Array<IncrementalResult> = [];
let encounteredCompletedAsyncIterator = false;
for (const incrementalDataRecord of completedRecords) {
const incrementalResult: IncrementalResult = {};
for (const child of incrementalDataRecord.children) {
this._publish(child);
}
if (isStreamItemsRecord(incrementalDataRecord)) {
const items = incrementalDataRecord.items;
if (incrementalDataRecord.isCompletedAsyncIterator) {
// async iterable resolver just finished but there may be pending payloads
encounteredCompletedAsyncIterator = true;
continue;
}
(incrementalResult as IncrementalStreamResult).items = items;
} else {
const data = incrementalDataRecord.data;
(incrementalResult as IncrementalDeferResult).data = data ?? null;
}

incrementalResult.path = incrementalDataRecord.path;
if (incrementalDataRecord.label != null) {
incrementalResult.label = incrementalDataRecord.label;
}
if (incrementalDataRecord.errors.length > 0) {
incrementalResult.errors = incrementalDataRecord.errors;
}
incrementalResults.push(incrementalResult);
}

return incrementalResults.length
? { incremental: incrementalResults, hasNext: this.hasNext() }
: encounteredCompletedAsyncIterator && !this.hasNext()
? { hasNext: false }
: undefined;
}

private _publish(incrementalDataRecord: IncrementalDataRecord) {
if (incrementalDataRecord.isCompleted) {
this._push(incrementalDataRecord);
} else {
this._introduce(incrementalDataRecord);
}
}

private _getDescendants(
children: ReadonlySet<IncrementalDataRecord>,
descendants = new Set<IncrementalDataRecord>(),
): ReadonlySet<IncrementalDataRecord> {
for (const child of children) {
descendants.add(child);
this._getDescendants(child.children, descendants);
}
return descendants;
}

private _matchesPath(
testPath: Array<string | number>,
basePath: Array<string | number>,
): boolean {
for (let i = 0; i < basePath.length; i++) {
if (basePath[i] !== testPath[i]) {
// testPath points to a path unaffected at basePath
return false;
}
}
subsequentPayloads.delete(incrementalDataRecord);
});
return true;
}
}

/** @internal */
export class DeferredFragmentRecord {
type: 'defer';
errors: Array<GraphQLError>;
label: string | undefined;
path: Array<string | number>;
promise: Promise<void>;
data: ObjMap<unknown> | null;
parentContext: IncrementalDataRecord | undefined;
children: Set<IncrementalDataRecord>;
isCompleted: boolean;
_subsequentPayloads: Set<IncrementalDataRecord>;
_resolve?: (arg: PromiseOrValue<ObjMap<unknown> | null>) => void;
constructor(opts: {
label: string | undefined;
path: Path | undefined;
parentContext: IncrementalDataRecord | undefined;
subsequentPayloads: Set<IncrementalDataRecord>;
}) {
this.type = 'defer';
this.label = opts.label;
this.path = pathToArray(opts.path);
this.parentContext = opts.parentContext;
this.errors = [];
this._subsequentPayloads = opts.subsequentPayloads;
this._subsequentPayloads.add(this);
this.children = new Set();
this.isCompleted = false;
this.data = null;
const { promise, resolve } = promiseWithResolvers<ObjMap<unknown> | null>();
this._resolve = resolve;
this.promise = promise.then((data) => {
this.data = data;
this.isCompleted = true;
});
}

addData(data: PromiseOrValue<ObjMap<unknown> | null>) {
const parentData = this.parentContext?.promise;
if (parentData) {
this._resolve?.(parentData.then(() => data));
return;
}
this._resolve?.(data);
}
}

/** @internal */
export class StreamItemsRecord {
type: 'stream';
errors: Array<GraphQLError>;
label: string | undefined;
path: Array<string | number>;
items: Array<unknown> | null;
promise: Promise<void>;
parentContext: IncrementalDataRecord | undefined;
children: Set<IncrementalDataRecord>;
asyncIterator: AsyncIterator<unknown> | undefined;
isCompletedAsyncIterator?: boolean;
isCompleted: boolean;
_subsequentPayloads: Set<IncrementalDataRecord>;
_resolve?: (arg: PromiseOrValue<Array<unknown> | null>) => void;
constructor(opts: {
label: string | undefined;
path: Path | undefined;
asyncIterator?: AsyncIterator<unknown>;
parentContext: IncrementalDataRecord | undefined;
subsequentPayloads: Set<IncrementalDataRecord>;
}) {
this.type = 'stream';
this.items = null;
this.label = opts.label;
this.path = pathToArray(opts.path);
this.parentContext = opts.parentContext;
this.asyncIterator = opts.asyncIterator;
this.errors = [];
this._subsequentPayloads = opts.subsequentPayloads;
this._subsequentPayloads.add(this);
this.children = new Set();
this.isCompleted = false;
this.items = null;
const { promise, resolve } = promiseWithResolvers<Array<unknown> | null>();
this._resolve = resolve;
this.promise = promise.then((items) => {
this.items = items;
this.isCompleted = true;
});
}

addItems(items: PromiseOrValue<Array<unknown> | null>) {
const parentData = this.parentContext?.promise;
if (parentData) {
this._resolve?.(parentData.then(() => items));
return;
}
this._resolve?.(items);
}

setIsCompletedAsyncIterator() {
this.isCompletedAsyncIterator = true;
}
}

@@ -324,5 +487,5 @@ export type IncrementalDataRecord = DeferredFragmentRecord | StreamItemsRecord;
function isStreamItemsRecord(
incrementalDataRecord: IncrementalDataRecord,
): incrementalDataRecord is StreamItemsRecord {
return incrementalDataRecord.type === 'stream';
return incrementalDataRecord instanceof StreamItemsRecord;
}
37 changes: 14 additions & 23 deletions src/execution/__tests__/stream-test.ts
Original file line number Diff line number Diff line change
@@ -1165,9 +1165,6 @@ describe('Execute: stream directive', () => {
],
},
],
hasNext: true,
},
{
hasNext: false,
},
]);
@@ -1191,25 +1188,19 @@ describe('Execute: stream directive', () => {
} /* c8 ignore stop */,
},
});
expectJSON(result).toDeepEqual([
{
errors: [
{
message:
'Cannot return null for non-nullable field NestedObject.nonNullScalarField.',
locations: [{ line: 4, column: 11 }],
path: ['nestedObject', 'nonNullScalarField'],
},
],
data: {
nestedObject: null,
expectJSON(result).toDeepEqual({
errors: [
{
message:
'Cannot return null for non-nullable field NestedObject.nonNullScalarField.',
locations: [{ line: 4, column: 11 }],
path: ['nestedObject', 'nonNullScalarField'],
},
hasNext: true,
},
{
hasNext: false,
],
data: {
nestedObject: null,
},
]);
});
});
it('Filters payloads that are nulled by a later synchronous error', async () => {
const document = parse(`
@@ -1293,6 +1284,9 @@ describe('Execute: stream directive', () => {
path: ['nestedObject', 'nestedFriendList', 0],
},
],
hasNext: true,
},
{
hasNext: false,
},
]);
@@ -1350,9 +1344,6 @@ describe('Execute: stream directive', () => {
],
},
],
hasNext: true,
},
{
hasNext: false,
},
]);
254 changes: 122 additions & 132 deletions src/execution/execute.ts
Original file line number Diff line number Diff line change
@@ -56,14 +56,10 @@ import type {
FormattedIncrementalResult,
IncrementalDataRecord,
IncrementalResult,
SubsequentIncrementalExecutionResult,
} from './IncrementalPublisher.js';
import {
DeferredFragmentRecord,
filterSubsequentPayloads,
StreamItemsRecord,
yieldSubsequentPayloads,
SubsequentIncrementalExecutionResult,
} from './IncrementalPublisher.js';
import { IncrementalPublisher } from './IncrementalPublisher.js';
import { mapAsyncIterable } from './mapAsyncIterable.js';
import {
getArgumentValues,
@@ -133,7 +129,7 @@ export interface ExecutionContext {
typeResolver: GraphQLTypeResolver<any, any>;
subscribeFieldResolver: GraphQLFieldResolver<any, any>;
errors: Array<GraphQLError>;
subsequentPayloads: Set<IncrementalDataRecord>;
incrementalPublisher: IncrementalPublisher;
}

/**
@@ -293,47 +289,46 @@ function executeImpl(
// Errors from sub-fields of a NonNull type may propagate to the top level,
// at which point we still log the error and null the parent field, which
// in this case is the entire response.
const { incrementalPublisher, errors } = exeContext;
try {
const result = executeOperation(exeContext);
if (isPromise(result)) {
return result.then(
(data) => {
const initialResult = buildResponse(data, exeContext.errors);
if (exeContext.subsequentPayloads.size > 0) {
const initialResult = buildResponse(data, errors);
incrementalPublisher.publishInitial();
if (incrementalPublisher.hasNext()) {
return {
initialResult: {
...initialResult,
hasNext: true,
},
subsequentResults: yieldSubsequentPayloads(
exeContext.subsequentPayloads,
),
subsequentResults: incrementalPublisher.subscribe(),
};
}
return initialResult;
},
(error) => {
exeContext.errors.push(error);
return buildResponse(null, exeContext.errors);
errors.push(error);
return buildResponse(null, errors);
},
);
}
const initialResult = buildResponse(result, exeContext.errors);
if (exeContext.subsequentPayloads.size > 0) {
const initialResult = buildResponse(result, errors);
incrementalPublisher.publishInitial();
if (incrementalPublisher.hasNext()) {
return {
initialResult: {
...initialResult,
hasNext: true,
},
subsequentResults: yieldSubsequentPayloads(
exeContext.subsequentPayloads,
),
subsequentResults: incrementalPublisher.subscribe(),
};
}
return initialResult;
} catch (error) {
exeContext.errors.push(error);
return buildResponse(null, exeContext.errors);
errors.push(error);
return buildResponse(null, errors);
}
}

@@ -449,7 +444,7 @@ export function buildExecutionContext(
fieldResolver: fieldResolver ?? defaultFieldResolver,
typeResolver: typeResolver ?? defaultTypeResolver,
subscribeFieldResolver: subscribeFieldResolver ?? defaultFieldResolver,
subsequentPayloads: new Set(),
incrementalPublisher: new IncrementalPublisher(),
errors: [],
};
}
@@ -461,7 +456,7 @@ function buildPerEventExecutionContext(
return {
...exeContext,
rootValue: payload,
subsequentPayloads: new Set(),
// no need to update incrementalPublisher, incremental delivery is not supported for subscriptions
errors: [],
};
}
@@ -714,11 +709,7 @@ function executeField(
path,
incrementalDataRecord,
);
filterSubsequentPayloads(
exeContext.subsequentPayloads,
path,
incrementalDataRecord,
);
exeContext.incrementalPublisher.filter(path, incrementalDataRecord);
return null;
});
}
@@ -732,11 +723,7 @@ function executeField(
path,
incrementalDataRecord,
);
filterSubsequentPayloads(
exeContext.subsequentPayloads,
path,
incrementalDataRecord,
);
exeContext.incrementalPublisher.filter(path, incrementalDataRecord);
return null;
}
}
@@ -937,11 +924,7 @@ async function completePromisedValue(
path,
incrementalDataRecord,
);
filterSubsequentPayloads(
exeContext.subsequentPayloads,
path,
incrementalDataRecord,
);
exeContext.incrementalPublisher.filter(path, incrementalDataRecord);
return null;
}
}
@@ -1217,8 +1200,7 @@ function completeListItemValue(
itemPath,
incrementalDataRecord,
);
filterSubsequentPayloads(
exeContext.subsequentPayloads,
exeContext.incrementalPublisher.filter(
itemPath,
incrementalDataRecord,
);
@@ -1239,11 +1221,7 @@ function completeListItemValue(
itemPath,
incrementalDataRecord,
);
filterSubsequentPayloads(
exeContext.subsequentPayloads,
itemPath,
incrementalDataRecord,
);
exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord);
completedResults.push(null);
}

@@ -1773,12 +1751,14 @@ function executeDeferredFragment(
path?: Path,
parentContext?: IncrementalDataRecord,
): void {
const incrementalDataRecord = new DeferredFragmentRecord({
label,
path,
parentContext,
subsequentPayloads: exeContext.subsequentPayloads,
});
const incrementalPublisher = exeContext.incrementalPublisher;
const incrementalDataRecord =
incrementalPublisher.prepareNewDeferredFragmentRecord({
label,
path,
parentContext,
});

let promiseOrData;
try {
promiseOrData = executeFields(
@@ -1791,16 +1771,33 @@ function executeDeferredFragment(
);

if (isPromise(promiseOrData)) {
promiseOrData = promiseOrData.then(null, (e) => {
incrementalDataRecord.errors.push(e);
return null;
});
promiseOrData = promiseOrData.then(
(resolved) =>
incrementalPublisher.completeDeferredFragmentRecord(
incrementalDataRecord,
resolved,
),
(e) => {
incrementalPublisher.addFieldError(incrementalDataRecord, e);
incrementalPublisher.completeDeferredFragmentRecord(
incrementalDataRecord,
null,
);
},
);
} else {
incrementalPublisher.completeDeferredFragmentRecord(
incrementalDataRecord,
promiseOrData,
);
}
} catch (e) {
incrementalDataRecord.errors.push(e);
promiseOrData = null;
incrementalPublisher.addFieldError(incrementalDataRecord, e);
incrementalPublisher.completeDeferredFragmentRecord(
incrementalDataRecord,
null,
);
}
incrementalDataRecord.addData(promiseOrData);
}

function executeStreamField(
@@ -1814,14 +1811,16 @@ function executeStreamField(
label?: string,
parentContext?: IncrementalDataRecord,
): IncrementalDataRecord {
const incrementalDataRecord = new StreamItemsRecord({
label,
path: itemPath,
parentContext,
subsequentPayloads: exeContext.subsequentPayloads,
});
const incrementalPublisher = exeContext.incrementalPublisher;
const incrementalDataRecord =
incrementalPublisher.prepareNewStreamItemsRecord({
label,
path: itemPath,
parentContext,
});

if (isPromise(item)) {
const completedItems = completePromisedValue(
completePromisedValue(
exeContext,
itemType,
fieldGroup,
@@ -1830,19 +1829,21 @@ function executeStreamField(
item,
incrementalDataRecord,
).then(
(value) => [value],
(value) =>
incrementalPublisher.completeStreamItemsRecord(incrementalDataRecord, [
value,
]),
(error) => {
incrementalDataRecord.errors.push(error);
filterSubsequentPayloads(
exeContext.subsequentPayloads,
path,
incrementalPublisher.addFieldError(incrementalDataRecord, error);
incrementalPublisher.filter(path, incrementalDataRecord);
incrementalPublisher.completeStreamItemsRecord(
incrementalDataRecord,
null,
);
return null;
},
);

incrementalDataRecord.addItems(completedItems);
return incrementalDataRecord;
}

@@ -1868,25 +1869,17 @@ function executeStreamField(
incrementalDataRecord,
);
completedItem = null;
filterSubsequentPayloads(
exeContext.subsequentPayloads,
itemPath,
incrementalDataRecord,
);
exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord);
}
} catch (error) {
incrementalDataRecord.errors.push(error);
filterSubsequentPayloads(
exeContext.subsequentPayloads,
path,
incrementalDataRecord,
);
incrementalDataRecord.addItems(null);
incrementalPublisher.addFieldError(incrementalDataRecord, error);
incrementalPublisher.filter(path, incrementalDataRecord);
incrementalPublisher.completeStreamItemsRecord(incrementalDataRecord, null);
return incrementalDataRecord;
}

if (isPromise(completedItem)) {
const completedItems = completedItem
completedItem
.then(undefined, (rawError) => {
handleFieldError(
rawError,
@@ -1896,31 +1889,31 @@ function executeStreamField(
itemPath,
incrementalDataRecord,
);
filterSubsequentPayloads(
exeContext.subsequentPayloads,
itemPath,
incrementalDataRecord,
);
exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord);
return null;
})
.then(
(value) => [value],
(value) =>
incrementalPublisher.completeStreamItemsRecord(
incrementalDataRecord,
[value],
),
(error) => {
incrementalDataRecord.errors.push(error);
filterSubsequentPayloads(
exeContext.subsequentPayloads,
path,
incrementalPublisher.addFieldError(incrementalDataRecord, error);
incrementalPublisher.filter(path, incrementalDataRecord);
incrementalPublisher.completeStreamItemsRecord(
incrementalDataRecord,
null,
);
return null;
},
);

incrementalDataRecord.addItems(completedItems);
return incrementalDataRecord;
}

incrementalDataRecord.addItems([completedItem]);
incrementalPublisher.completeStreamItemsRecord(incrementalDataRecord, [
completedItem,
]);
return incrementalDataRecord;
}

@@ -1937,9 +1930,12 @@ async function executeStreamAsyncIteratorItem(
let item;
try {
const { value, done } = await asyncIterator.next();

if (done) {
incrementalDataRecord.setIsCompletedAsyncIterator();
return { done, value: undefined };
exeContext.incrementalPublisher.setIsCompletedAsyncIterator(
incrementalDataRecord,
);
return { done: true, value: undefined };
}
item = value;
} catch (rawError) {
@@ -1967,11 +1963,7 @@ async function executeStreamAsyncIteratorItem(
itemPath,
incrementalDataRecord,
);
filterSubsequentPayloads(
exeContext.subsequentPayloads,
itemPath,
incrementalDataRecord,
);
exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord);
return null;
});
}
@@ -1985,11 +1977,7 @@ async function executeStreamAsyncIteratorItem(
itemPath,
incrementalDataRecord,
);
filterSubsequentPayloads(
exeContext.subsequentPayloads,
itemPath,
incrementalDataRecord,
);
exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord);
return { done: false, value: null };
}
}
@@ -2005,18 +1993,19 @@ async function executeStreamAsyncIterator(
label?: string,
parentContext?: IncrementalDataRecord,
): Promise<void> {
const incrementalPublisher = exeContext.incrementalPublisher;
let index = initialIndex;
let previousIncrementalDataRecord = parentContext ?? undefined;
// eslint-disable-next-line no-constant-condition
while (true) {
const itemPath = addPath(path, index, undefined);
const incrementalDataRecord = new StreamItemsRecord({
label,
path: itemPath,
parentContext: previousIncrementalDataRecord,
asyncIterator,
subsequentPayloads: exeContext.subsequentPayloads,
});
const incrementalDataRecord =
incrementalPublisher.prepareNewStreamItemsRecord({
label,
path: itemPath,
parentContext: previousIncrementalDataRecord,
asyncIterator,
});

let iteration;
try {
@@ -2032,13 +2021,12 @@ async function executeStreamAsyncIterator(
itemPath,
);
} catch (error) {
incrementalDataRecord.errors.push(error);
filterSubsequentPayloads(
exeContext.subsequentPayloads,
path,
incrementalPublisher.addFieldError(incrementalDataRecord, error);
incrementalPublisher.filter(path, incrementalDataRecord);
incrementalPublisher.completeStreamItemsRecord(
incrementalDataRecord,
null,
);
incrementalDataRecord.addItems(null);
// entire stream has errored and bubbled upwards
if (asyncIterator?.return) {
asyncIterator.return().catch(() => {
@@ -2050,26 +2038,28 @@ async function executeStreamAsyncIterator(

const { done, value: completedItem } = iteration;

let completedItems: PromiseOrValue<Array<unknown> | null>;
if (isPromise(completedItem)) {
completedItems = completedItem.then(
(value) => [value],
completedItem.then(
(value) =>
incrementalPublisher.completeStreamItemsRecord(
incrementalDataRecord,
[value],
),
(error) => {
incrementalDataRecord.errors.push(error);
filterSubsequentPayloads(
exeContext.subsequentPayloads,
path,
incrementalPublisher.addFieldError(incrementalDataRecord, error);
incrementalPublisher.filter(path, incrementalDataRecord);
incrementalPublisher.completeStreamItemsRecord(
incrementalDataRecord,
null,
);
return null;
},
);
} else {
completedItems = [completedItem];
incrementalPublisher.completeStreamItemsRecord(incrementalDataRecord, [
completedItem,
]);
}

incrementalDataRecord.addItems(completedItems);

if (done) {
break;
}