From 3e5688734ec0ac8cfe8c3073bb59c1fe59362bfd Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Fri, 2 Jun 2023 11:46:50 +0300 Subject: [PATCH 1/3] introduce new IncrementalPublisher = iterate only through completed items = remove extra ticks by making the publisher manage changes to its state synchronously. = use children array instead of promises to manage hierarchy = have IncrementalPublisher instantiate new IncrementalDataRecords = The new publisher sometimes cause an empty `{ hasNext: false }` to be emitted. In particular, because the publisher is faster than it was, it may emit a stream result before the stream's asynchronous iterator has completed. = The new publisher may sometimes reduce the number of `{ hasNext: false }` records that are emitted. For example, when errors on the initial result filter all subsequent results, this now happens synchronously, and so the publisher knows immediately that there are no subsequent results, such that there is no need for an empty final payload. --- src/execution/IncrementalPublisher.ts | 467 +++++++++++++++++-------- src/execution/__tests__/stream-test.ts | 37 +- src/execution/execute.ts | 254 +++++++------- 3 files changed, 451 insertions(+), 307 deletions(-) diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index 081214c09d..7e11faf4a6 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -1,7 +1,6 @@ import type { ObjMap } from '../jsutils/ObjMap.js'; import type { Path } from '../jsutils/Path.js'; import { pathToArray } from '../jsutils/Path.js'; -import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js'; import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; import type { @@ -85,237 +84,401 @@ export type FormattedIncrementalResult< | FormattedIncrementalDeferResult | FormattedIncrementalStreamResult; -export function yieldSubsequentPayloads( - subsequentPayloads: Set, -): AsyncGenerator { - let isDone = false; +/** + * This class is used to publish incremental results to the client, enabling semi-concurrent + * execution while preserving result order. + * + * The internal publishing state is manages as follows: + * + * '_released': the set of Incremental Data records that are ready to be sent to the client, + * i.e. their parents have completed and they have also completed. + * + * `_pending`: the set of Incremental Data records that are definitely pending, i.e. their + * parents have completed so that they can no longer be filtered. This includes all Incremental + * Data records in `released`, as well as Incremental Data records that have not yet completed. + * + * `initialResult`: a record containing the state of the initial result, as follows: + * `isCompleted`: indicates whether the initial result has completed. + * `children`: the set of Incremental Data records that can be be published when the initial + * result is completed. + * + * Each Incremental Data record also contains similar metadata, i.e. these records also contain + * similar `isCompleted` and `children` properties. + * + * @internal + */ +export class IncrementalPublisher { + initialResult: { + children: Set; + isCompleted: boolean; + }; - async function next(): Promise< - IteratorResult - > { - if (isDone) { - return { value: undefined, done: true }; - } + _released: Set; + _pending: Set; - await Promise.race(Array.from(subsequentPayloads).map((p) => p.promise)); + // these are assigned within the Promise executor called synchronously within the constructor + _signalled!: Promise; + _resolve!: () => void; - if (isDone) { - // a different call to next has exhausted all payloads - return { value: undefined, done: true }; - } + constructor() { + this.initialResult = { + children: new Set(), + isCompleted: false, + }; + this._released = new Set(); + this._pending = new Set(); + this._reset(); + } - const incremental = getCompletedIncrementalResults(subsequentPayloads); - const hasNext = subsequentPayloads.size > 0; + _trigger() { + this._resolve(); + this._reset(); + } - if (!incremental.length && hasNext) { - return next(); - } + _reset() { + // promiseWithResolvers uses void only as a generic type parameter + // see: https://typescript-eslint.io/rules/no-invalid-void-type/ + // eslint-disable-next-line @typescript-eslint/no-invalid-void-type + const { promise: signalled, resolve } = promiseWithResolvers(); + this._resolve = resolve; + this._signalled = signalled; + } - if (!hasNext) { - isDone = true; + hasNext(): boolean { + return this._pending.size > 0; + } + + _introduce(item: IncrementalDataRecord) { + this._pending.add(item); + } + + _release(item: IncrementalDataRecord): void { + if (this._pending.has(item)) { + this._released.add(item); + this._trigger(); } + } - return { - value: incremental.length ? { incremental, hasNext } : { hasNext }, - done: false, - }; + _push(item: IncrementalDataRecord): void { + this._released.add(item); + this._pending.add(item); + this._trigger(); } - function returnStreamIterators() { - const promises: Array>> = []; - subsequentPayloads.forEach((incrementalDataRecord) => { - if ( - isStreamItemsRecord(incrementalDataRecord) && - incrementalDataRecord.asyncIterator?.return - ) { - promises.push(incrementalDataRecord.asyncIterator.return()); - } - }); - return Promise.all(promises); + _delete(item: IncrementalDataRecord) { + this._released.delete(item); + this._pending.delete(item); + this._trigger(); } - return { - [Symbol.asyncIterator]() { - return this; - }, - next, - async return(): Promise< + subscribe(): AsyncGenerator< + SubsequentIncrementalExecutionResult, + void, + void + > { + let isDone = false; + + const _next = async (): Promise< IteratorResult - > { - await returnStreamIterators(); + > => { + // eslint-disable-next-line no-constant-condition + while (true) { + if (isDone) { + return { value: undefined, done: true }; + } + + for (const item of this._released) { + this._pending.delete(item); + } + const released = this._released; + this._released = new Set(); + + const result = this._getIncrementalResult(released); + + if (!this.hasNext()) { + isDone = true; + } + + if (result !== undefined) { + return { value: result, done: false }; + } + + // eslint-disable-next-line no-await-in-loop + await this._signalled; + } + }; + + const returnStreamIterators = async (): Promise => { + const promises: Array>> = []; + this._pending.forEach((incrementalDataRecord) => { + if ( + isStreamItemsRecord(incrementalDataRecord) && + incrementalDataRecord.asyncIterator?.return + ) { + promises.push(incrementalDataRecord.asyncIterator.return()); + } + }); + await Promise.all(promises); + }; + + const _return = async (): Promise< + IteratorResult + > => { isDone = true; + await returnStreamIterators(); return { value: undefined, done: true }; - }, - async throw( + }; + + const _throw = async ( error?: unknown, - ): Promise> { - await returnStreamIterators(); + ): Promise> => { isDone = true; + await returnStreamIterators(); return Promise.reject(error); - }, - }; -} + }; -function getCompletedIncrementalResults( - subsequentPayloads: Set, -): Array { - const incrementalResults: Array = []; - for (const incrementalDataRecord of subsequentPayloads) { - const incrementalResult: IncrementalResult = {}; - if (!incrementalDataRecord.isCompleted) { - continue; - } - subsequentPayloads.delete(incrementalDataRecord); - if (isStreamItemsRecord(incrementalDataRecord)) { - const items = incrementalDataRecord.items; - if (incrementalDataRecord.isCompletedAsyncIterator) { - // async iterable resolver just finished but there may be pending payloads - continue; + return { + [Symbol.asyncIterator]() { + return this; + }, + next: _next, + return: _return, + throw: _throw, + }; + } + + _getIncrementalResult( + completedRecords: ReadonlySet, + ): SubsequentIncrementalExecutionResult | undefined { + const incrementalResults: Array = []; + let encounteredCompletedAsyncIterator = false; + for (const incrementalDataRecord of completedRecords) { + const incrementalResult: IncrementalResult = {}; + for (const child of incrementalDataRecord.children) { + this._publish(child); + } + if (isStreamItemsRecord(incrementalDataRecord)) { + const items = incrementalDataRecord.items; + if (incrementalDataRecord.isCompletedAsyncIterator) { + // async iterable resolver just finished but there may be pending payloads + encounteredCompletedAsyncIterator = true; + continue; + } + (incrementalResult as IncrementalStreamResult).items = items; + } else { + const data = incrementalDataRecord.data; + (incrementalResult as IncrementalDeferResult).data = data ?? null; + } + + incrementalResult.path = incrementalDataRecord.path; + if (incrementalDataRecord.label != null) { + incrementalResult.label = incrementalDataRecord.label; } - (incrementalResult as IncrementalStreamResult).items = items; + if (incrementalDataRecord.errors.length > 0) { + incrementalResult.errors = incrementalDataRecord.errors; + } + incrementalResults.push(incrementalResult); + } + + return incrementalResults.length + ? { incremental: incrementalResults, hasNext: this.hasNext() } + : encounteredCompletedAsyncIterator && !this.hasNext() + ? { hasNext: false } + : undefined; + } + + prepareNewDeferredFragmentRecord(opts: { + label: string | undefined; + path: Path | undefined; + parentContext: IncrementalDataRecord | undefined; + }): DeferredFragmentRecord { + const deferredFragmentRecord = new DeferredFragmentRecord(opts); + + const parentContext = opts.parentContext; + if (parentContext) { + parentContext.children.add(deferredFragmentRecord); } else { - const data = incrementalDataRecord.data; - (incrementalResult as IncrementalDeferResult).data = data ?? null; + this.initialResult.children.add(deferredFragmentRecord); } - incrementalResult.path = incrementalDataRecord.path; - if (incrementalDataRecord.label != null) { - incrementalResult.label = incrementalDataRecord.label; + return deferredFragmentRecord; + } + + prepareNewStreamItemsRecord(opts: { + label: string | undefined; + path: Path | undefined; + asyncIterator?: AsyncIterator; + parentContext: IncrementalDataRecord | undefined; + }): StreamItemsRecord { + const streamItemsRecord = new StreamItemsRecord(opts); + + const parentContext = opts.parentContext; + if (parentContext) { + parentContext.children.add(streamItemsRecord); + } else { + this.initialResult.children.add(streamItemsRecord); } - if (incrementalDataRecord.errors.length > 0) { - incrementalResult.errors = incrementalDataRecord.errors; + + return streamItemsRecord; + } + + completeDeferredFragmentRecord( + deferredFragmentRecord: DeferredFragmentRecord, + data: ObjMap | null, + ): void { + deferredFragmentRecord.data = data; + deferredFragmentRecord.isCompleted = true; + this._release(deferredFragmentRecord); + } + + completeStreamItemsRecord( + streamItemsRecord: StreamItemsRecord, + items: Array | null, + ) { + streamItemsRecord.items = items; + streamItemsRecord.isCompleted = true; + this._release(streamItemsRecord); + } + + setIsCompletedAsyncIterator(streamItemsRecord: StreamItemsRecord) { + streamItemsRecord.isCompletedAsyncIterator = true; + } + + addFieldError( + incrementalDataRecord: IncrementalDataRecord, + error: GraphQLError, + ) { + incrementalDataRecord.errors.push(error); + } + + publishInitial() { + for (const child of this.initialResult.children) { + this._publish(child); } - incrementalResults.push(incrementalResult); } - return incrementalResults; -} -export function filterSubsequentPayloads( - subsequentPayloads: Set, - nullPath: Path, - currentIncrementalDataRecord: IncrementalDataRecord | undefined, -): void { - const nullPathArray = pathToArray(nullPath); - subsequentPayloads.forEach((incrementalDataRecord) => { - if (incrementalDataRecord === currentIncrementalDataRecord) { - // don't remove payload from where error originates - return; + _publish(incrementalDataRecord: IncrementalDataRecord) { + if (incrementalDataRecord.isCompleted) { + this._push(incrementalDataRecord); + } else { + this._introduce(incrementalDataRecord); } - for (let i = 0; i < nullPathArray.length; i++) { - if (incrementalDataRecord.path[i] !== nullPathArray[i]) { - // incrementalDataRecord points to a path unaffected by this payload - return; + } + + filter( + nullPath: Path, + erroringIncrementalDataRecord: IncrementalDataRecord | undefined, + ) { + const nullPathArray = pathToArray(nullPath); + + const asyncIterators = new Set>(); + + const children = + erroringIncrementalDataRecord === undefined + ? this.initialResult.children + : erroringIncrementalDataRecord.children; + + for (const child of this._getDescendants(children)) { + if (!this._matchesPath(child.path, nullPathArray)) { + continue; + } + + this._delete(child); + const parent = + child.parentContext === undefined + ? this.initialResult + : child.parentContext; + parent.children.delete(child); + + if (isStreamItemsRecord(child)) { + if (child.asyncIterator !== undefined) { + asyncIterators.add(child.asyncIterator); + } } } - // incrementalDataRecord path points to nulled error field - if ( - isStreamItemsRecord(incrementalDataRecord) && - incrementalDataRecord.asyncIterator?.return - ) { - incrementalDataRecord.asyncIterator.return().catch(() => { + + asyncIterators.forEach((asyncIterator) => { + asyncIterator.return?.().catch(() => { // ignore error }); + }); + } + + _getDescendants( + children: ReadonlySet, + descendants = new Set(), + ): ReadonlySet { + for (const child of children) { + descendants.add(child); + this._getDescendants(child.children, descendants); + } + return descendants; + } + + _matchesPath( + testPath: Array, + basePath: Array, + ): boolean { + for (let i = 0; i < basePath.length; i++) { + if (basePath[i] !== testPath[i]) { + // testPath points to a path unaffected at basePath + return false; + } } - subsequentPayloads.delete(incrementalDataRecord); - }); + return true; + } } /** @internal */ export class DeferredFragmentRecord { - type: 'defer'; errors: Array; label: string | undefined; path: Array; - promise: Promise; data: ObjMap | null; parentContext: IncrementalDataRecord | undefined; + children: Set; isCompleted: boolean; - _subsequentPayloads: Set; - _resolve?: (arg: PromiseOrValue | null>) => void; constructor(opts: { label: string | undefined; path: Path | undefined; parentContext: IncrementalDataRecord | undefined; - subsequentPayloads: Set; }) { - this.type = 'defer'; this.label = opts.label; this.path = pathToArray(opts.path); this.parentContext = opts.parentContext; this.errors = []; - this._subsequentPayloads = opts.subsequentPayloads; - this._subsequentPayloads.add(this); + this.children = new Set(); this.isCompleted = false; this.data = null; - const { promise, resolve } = promiseWithResolvers | null>(); - this._resolve = resolve; - this.promise = promise.then((data) => { - this.data = data; - this.isCompleted = true; - }); - } - - addData(data: PromiseOrValue | null>) { - const parentData = this.parentContext?.promise; - if (parentData) { - this._resolve?.(parentData.then(() => data)); - return; - } - this._resolve?.(data); } } /** @internal */ export class StreamItemsRecord { - type: 'stream'; errors: Array; label: string | undefined; path: Array; items: Array | null; - promise: Promise; parentContext: IncrementalDataRecord | undefined; + children: Set; asyncIterator: AsyncIterator | undefined; isCompletedAsyncIterator?: boolean; isCompleted: boolean; - _subsequentPayloads: Set; - _resolve?: (arg: PromiseOrValue | null>) => void; constructor(opts: { label: string | undefined; path: Path | undefined; asyncIterator?: AsyncIterator; parentContext: IncrementalDataRecord | undefined; - subsequentPayloads: Set; }) { - this.type = 'stream'; this.items = null; this.label = opts.label; this.path = pathToArray(opts.path); this.parentContext = opts.parentContext; this.asyncIterator = opts.asyncIterator; this.errors = []; - this._subsequentPayloads = opts.subsequentPayloads; - this._subsequentPayloads.add(this); + this.children = new Set(); this.isCompleted = false; this.items = null; - const { promise, resolve } = promiseWithResolvers | null>(); - this._resolve = resolve; - this.promise = promise.then((items) => { - this.items = items; - this.isCompleted = true; - }); - } - - addItems(items: PromiseOrValue | null>) { - const parentData = this.parentContext?.promise; - if (parentData) { - this._resolve?.(parentData.then(() => items)); - return; - } - this._resolve?.(items); - } - - setIsCompletedAsyncIterator() { - this.isCompletedAsyncIterator = true; } } @@ -324,5 +487,5 @@ export type IncrementalDataRecord = DeferredFragmentRecord | StreamItemsRecord; function isStreamItemsRecord( incrementalDataRecord: IncrementalDataRecord, ): incrementalDataRecord is StreamItemsRecord { - return incrementalDataRecord.type === 'stream'; + return incrementalDataRecord instanceof StreamItemsRecord; } diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index 9f61adac1b..ce3b920895 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -1165,9 +1165,6 @@ describe('Execute: stream directive', () => { ], }, ], - hasNext: true, - }, - { hasNext: false, }, ]); @@ -1191,25 +1188,19 @@ describe('Execute: stream directive', () => { } /* c8 ignore stop */, }, }); - expectJSON(result).toDeepEqual([ - { - errors: [ - { - message: - 'Cannot return null for non-nullable field NestedObject.nonNullScalarField.', - locations: [{ line: 4, column: 11 }], - path: ['nestedObject', 'nonNullScalarField'], - }, - ], - data: { - nestedObject: null, + expectJSON(result).toDeepEqual({ + errors: [ + { + message: + 'Cannot return null for non-nullable field NestedObject.nonNullScalarField.', + locations: [{ line: 4, column: 11 }], + path: ['nestedObject', 'nonNullScalarField'], }, - hasNext: true, - }, - { - hasNext: false, + ], + data: { + nestedObject: null, }, - ]); + }); }); it('Filters payloads that are nulled by a later synchronous error', async () => { const document = parse(` @@ -1293,6 +1284,9 @@ describe('Execute: stream directive', () => { path: ['nestedObject', 'nestedFriendList', 0], }, ], + hasNext: true, + }, + { hasNext: false, }, ]); @@ -1350,9 +1344,6 @@ describe('Execute: stream directive', () => { ], }, ], - hasNext: true, - }, - { hasNext: false, }, ]); diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 8c9d8f9668..1ec11f72cc 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -56,14 +56,10 @@ import type { FormattedIncrementalResult, IncrementalDataRecord, IncrementalResult, - SubsequentIncrementalExecutionResult, -} from './IncrementalPublisher.js'; -import { - DeferredFragmentRecord, - filterSubsequentPayloads, StreamItemsRecord, - yieldSubsequentPayloads, + SubsequentIncrementalExecutionResult, } from './IncrementalPublisher.js'; +import { IncrementalPublisher } from './IncrementalPublisher.js'; import { mapAsyncIterable } from './mapAsyncIterable.js'; import { getArgumentValues, @@ -133,7 +129,7 @@ export interface ExecutionContext { typeResolver: GraphQLTypeResolver; subscribeFieldResolver: GraphQLFieldResolver; errors: Array; - subsequentPayloads: Set; + incrementalPublisher: IncrementalPublisher; } /** @@ -293,47 +289,46 @@ function executeImpl( // Errors from sub-fields of a NonNull type may propagate to the top level, // at which point we still log the error and null the parent field, which // in this case is the entire response. + const { incrementalPublisher, errors } = exeContext; try { const result = executeOperation(exeContext); if (isPromise(result)) { return result.then( (data) => { - const initialResult = buildResponse(data, exeContext.errors); - if (exeContext.subsequentPayloads.size > 0) { + const initialResult = buildResponse(data, errors); + incrementalPublisher.publishInitial(); + if (incrementalPublisher.hasNext()) { return { initialResult: { ...initialResult, hasNext: true, }, - subsequentResults: yieldSubsequentPayloads( - exeContext.subsequentPayloads, - ), + subsequentResults: incrementalPublisher.subscribe(), }; } return initialResult; }, (error) => { - exeContext.errors.push(error); - return buildResponse(null, exeContext.errors); + errors.push(error); + return buildResponse(null, errors); }, ); } - const initialResult = buildResponse(result, exeContext.errors); - if (exeContext.subsequentPayloads.size > 0) { + const initialResult = buildResponse(result, errors); + incrementalPublisher.publishInitial(); + if (incrementalPublisher.hasNext()) { return { initialResult: { ...initialResult, hasNext: true, }, - subsequentResults: yieldSubsequentPayloads( - exeContext.subsequentPayloads, - ), + subsequentResults: incrementalPublisher.subscribe(), }; } return initialResult; } catch (error) { - exeContext.errors.push(error); - return buildResponse(null, exeContext.errors); + errors.push(error); + return buildResponse(null, errors); } } @@ -449,7 +444,7 @@ export function buildExecutionContext( fieldResolver: fieldResolver ?? defaultFieldResolver, typeResolver: typeResolver ?? defaultTypeResolver, subscribeFieldResolver: subscribeFieldResolver ?? defaultFieldResolver, - subsequentPayloads: new Set(), + incrementalPublisher: new IncrementalPublisher(), errors: [], }; } @@ -461,7 +456,7 @@ function buildPerEventExecutionContext( return { ...exeContext, rootValue: payload, - subsequentPayloads: new Set(), + // no need to update incrementalPublisher, incremental delivery is not supported for subscriptions errors: [], }; } @@ -714,11 +709,7 @@ function executeField( path, incrementalDataRecord, ); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - path, - incrementalDataRecord, - ); + exeContext.incrementalPublisher.filter(path, incrementalDataRecord); return null; }); } @@ -732,11 +723,7 @@ function executeField( path, incrementalDataRecord, ); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - path, - incrementalDataRecord, - ); + exeContext.incrementalPublisher.filter(path, incrementalDataRecord); return null; } } @@ -937,11 +924,7 @@ async function completePromisedValue( path, incrementalDataRecord, ); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - path, - incrementalDataRecord, - ); + exeContext.incrementalPublisher.filter(path, incrementalDataRecord); return null; } } @@ -1217,8 +1200,7 @@ function completeListItemValue( itemPath, incrementalDataRecord, ); - filterSubsequentPayloads( - exeContext.subsequentPayloads, + exeContext.incrementalPublisher.filter( itemPath, incrementalDataRecord, ); @@ -1239,11 +1221,7 @@ function completeListItemValue( itemPath, incrementalDataRecord, ); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - itemPath, - incrementalDataRecord, - ); + exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); completedResults.push(null); } @@ -1773,12 +1751,14 @@ function executeDeferredFragment( path?: Path, parentContext?: IncrementalDataRecord, ): void { - const incrementalDataRecord = new DeferredFragmentRecord({ - label, - path, - parentContext, - subsequentPayloads: exeContext.subsequentPayloads, - }); + const incrementalPublisher = exeContext.incrementalPublisher; + const incrementalDataRecord = + incrementalPublisher.prepareNewDeferredFragmentRecord({ + label, + path, + parentContext, + }); + let promiseOrData; try { promiseOrData = executeFields( @@ -1791,16 +1771,33 @@ function executeDeferredFragment( ); if (isPromise(promiseOrData)) { - promiseOrData = promiseOrData.then(null, (e) => { - incrementalDataRecord.errors.push(e); - return null; - }); + promiseOrData = promiseOrData.then( + (resolved) => + incrementalPublisher.completeDeferredFragmentRecord( + incrementalDataRecord, + resolved, + ), + (e) => { + incrementalPublisher.addFieldError(incrementalDataRecord, e); + incrementalPublisher.completeDeferredFragmentRecord( + incrementalDataRecord, + null, + ); + }, + ); + } else { + incrementalPublisher.completeDeferredFragmentRecord( + incrementalDataRecord, + promiseOrData, + ); } } catch (e) { - incrementalDataRecord.errors.push(e); - promiseOrData = null; + incrementalPublisher.addFieldError(incrementalDataRecord, e); + incrementalPublisher.completeDeferredFragmentRecord( + incrementalDataRecord, + null, + ); } - incrementalDataRecord.addData(promiseOrData); } function executeStreamField( @@ -1814,14 +1811,16 @@ function executeStreamField( label?: string, parentContext?: IncrementalDataRecord, ): IncrementalDataRecord { - const incrementalDataRecord = new StreamItemsRecord({ - label, - path: itemPath, - parentContext, - subsequentPayloads: exeContext.subsequentPayloads, - }); + const incrementalPublisher = exeContext.incrementalPublisher; + const incrementalDataRecord = + incrementalPublisher.prepareNewStreamItemsRecord({ + label, + path: itemPath, + parentContext, + }); + if (isPromise(item)) { - const completedItems = completePromisedValue( + completePromisedValue( exeContext, itemType, fieldGroup, @@ -1830,19 +1829,21 @@ function executeStreamField( item, incrementalDataRecord, ).then( - (value) => [value], + (value) => + incrementalPublisher.completeStreamItemsRecord(incrementalDataRecord, [ + value, + ]), (error) => { - incrementalDataRecord.errors.push(error); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - path, + incrementalPublisher.addFieldError(incrementalDataRecord, error); + incrementalPublisher.filter(path, incrementalDataRecord); + incrementalPublisher.completeStreamItemsRecord( incrementalDataRecord, + null, ); return null; }, ); - incrementalDataRecord.addItems(completedItems); return incrementalDataRecord; } @@ -1868,25 +1869,17 @@ function executeStreamField( incrementalDataRecord, ); completedItem = null; - filterSubsequentPayloads( - exeContext.subsequentPayloads, - itemPath, - incrementalDataRecord, - ); + exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); } } catch (error) { - incrementalDataRecord.errors.push(error); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - path, - incrementalDataRecord, - ); - incrementalDataRecord.addItems(null); + incrementalPublisher.addFieldError(incrementalDataRecord, error); + incrementalPublisher.filter(path, incrementalDataRecord); + incrementalPublisher.completeStreamItemsRecord(incrementalDataRecord, null); return incrementalDataRecord; } if (isPromise(completedItem)) { - const completedItems = completedItem + completedItem .then(undefined, (rawError) => { handleFieldError( rawError, @@ -1896,31 +1889,31 @@ function executeStreamField( itemPath, incrementalDataRecord, ); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - itemPath, - incrementalDataRecord, - ); + exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); return null; }) .then( - (value) => [value], + (value) => + incrementalPublisher.completeStreamItemsRecord( + incrementalDataRecord, + [value], + ), (error) => { - incrementalDataRecord.errors.push(error); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - path, + incrementalPublisher.addFieldError(incrementalDataRecord, error); + incrementalPublisher.filter(path, incrementalDataRecord); + incrementalPublisher.completeStreamItemsRecord( incrementalDataRecord, + null, ); - return null; }, ); - incrementalDataRecord.addItems(completedItems); return incrementalDataRecord; } - incrementalDataRecord.addItems([completedItem]); + incrementalPublisher.completeStreamItemsRecord(incrementalDataRecord, [ + completedItem, + ]); return incrementalDataRecord; } @@ -1937,9 +1930,12 @@ async function executeStreamAsyncIteratorItem( let item; try { const { value, done } = await asyncIterator.next(); + if (done) { - incrementalDataRecord.setIsCompletedAsyncIterator(); - return { done, value: undefined }; + exeContext.incrementalPublisher.setIsCompletedAsyncIterator( + incrementalDataRecord, + ); + return { done: true, value: undefined }; } item = value; } catch (rawError) { @@ -1967,11 +1963,7 @@ async function executeStreamAsyncIteratorItem( itemPath, incrementalDataRecord, ); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - itemPath, - incrementalDataRecord, - ); + exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); return null; }); } @@ -1985,11 +1977,7 @@ async function executeStreamAsyncIteratorItem( itemPath, incrementalDataRecord, ); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - itemPath, - incrementalDataRecord, - ); + exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); return { done: false, value: null }; } } @@ -2005,18 +1993,19 @@ async function executeStreamAsyncIterator( label?: string, parentContext?: IncrementalDataRecord, ): Promise { + const incrementalPublisher = exeContext.incrementalPublisher; let index = initialIndex; let previousIncrementalDataRecord = parentContext ?? undefined; // eslint-disable-next-line no-constant-condition while (true) { const itemPath = addPath(path, index, undefined); - const incrementalDataRecord = new StreamItemsRecord({ - label, - path: itemPath, - parentContext: previousIncrementalDataRecord, - asyncIterator, - subsequentPayloads: exeContext.subsequentPayloads, - }); + const incrementalDataRecord = + incrementalPublisher.prepareNewStreamItemsRecord({ + label, + path: itemPath, + parentContext: previousIncrementalDataRecord, + asyncIterator, + }); let iteration; try { @@ -2032,13 +2021,12 @@ async function executeStreamAsyncIterator( itemPath, ); } catch (error) { - incrementalDataRecord.errors.push(error); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - path, + incrementalPublisher.addFieldError(incrementalDataRecord, error); + incrementalPublisher.filter(path, incrementalDataRecord); + incrementalPublisher.completeStreamItemsRecord( incrementalDataRecord, + null, ); - incrementalDataRecord.addItems(null); // entire stream has errored and bubbled upwards if (asyncIterator?.return) { asyncIterator.return().catch(() => { @@ -2050,26 +2038,28 @@ async function executeStreamAsyncIterator( const { done, value: completedItem } = iteration; - let completedItems: PromiseOrValue | null>; if (isPromise(completedItem)) { - completedItems = completedItem.then( - (value) => [value], + completedItem.then( + (value) => + incrementalPublisher.completeStreamItemsRecord( + incrementalDataRecord, + [value], + ), (error) => { - incrementalDataRecord.errors.push(error); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - path, + incrementalPublisher.addFieldError(incrementalDataRecord, error); + incrementalPublisher.filter(path, incrementalDataRecord); + incrementalPublisher.completeStreamItemsRecord( incrementalDataRecord, + null, ); - return null; }, ); } else { - completedItems = [completedItem]; + incrementalPublisher.completeStreamItemsRecord(incrementalDataRecord, [ + completedItem, + ]); } - incrementalDataRecord.addItems(completedItems); - if (done) { break; } From a27ea608619b1551fef4c4008f0ccd0d8eae5b9b Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Sat, 10 Jun 2023 22:27:26 +0300 Subject: [PATCH 2/3] add private modifier = requires reordering of methods because of eslint rule `@typescript-eslint/member-ordering` = adds underscore to private `_initialResult` property for consistency --- src/execution/IncrementalPublisher.ts | 196 +++++++++++++------------- 1 file changed, 98 insertions(+), 98 deletions(-) diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index 7e11faf4a6..ee29793f50 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -108,20 +108,20 @@ export type FormattedIncrementalResult< * @internal */ export class IncrementalPublisher { - initialResult: { + private _initialResult: { children: Set; isCompleted: boolean; }; - _released: Set; - _pending: Set; + private _released: Set; + private _pending: Set; // these are assigned within the Promise executor called synchronously within the constructor - _signalled!: Promise; - _resolve!: () => void; + private _signalled!: Promise; + private _resolve!: () => void; constructor() { - this.initialResult = { + this._initialResult = { children: new Set(), isCompleted: false, }; @@ -130,47 +130,10 @@ export class IncrementalPublisher { this._reset(); } - _trigger() { - this._resolve(); - this._reset(); - } - - _reset() { - // promiseWithResolvers uses void only as a generic type parameter - // see: https://typescript-eslint.io/rules/no-invalid-void-type/ - // eslint-disable-next-line @typescript-eslint/no-invalid-void-type - const { promise: signalled, resolve } = promiseWithResolvers(); - this._resolve = resolve; - this._signalled = signalled; - } - hasNext(): boolean { return this._pending.size > 0; } - _introduce(item: IncrementalDataRecord) { - this._pending.add(item); - } - - _release(item: IncrementalDataRecord): void { - if (this._pending.has(item)) { - this._released.add(item); - this._trigger(); - } - } - - _push(item: IncrementalDataRecord): void { - this._released.add(item); - this._pending.add(item); - this._trigger(); - } - - _delete(item: IncrementalDataRecord) { - this._released.delete(item); - this._pending.delete(item); - this._trigger(); - } - subscribe(): AsyncGenerator< SubsequentIncrementalExecutionResult, void, @@ -247,46 +210,6 @@ export class IncrementalPublisher { }; } - _getIncrementalResult( - completedRecords: ReadonlySet, - ): SubsequentIncrementalExecutionResult | undefined { - const incrementalResults: Array = []; - let encounteredCompletedAsyncIterator = false; - for (const incrementalDataRecord of completedRecords) { - const incrementalResult: IncrementalResult = {}; - for (const child of incrementalDataRecord.children) { - this._publish(child); - } - if (isStreamItemsRecord(incrementalDataRecord)) { - const items = incrementalDataRecord.items; - if (incrementalDataRecord.isCompletedAsyncIterator) { - // async iterable resolver just finished but there may be pending payloads - encounteredCompletedAsyncIterator = true; - continue; - } - (incrementalResult as IncrementalStreamResult).items = items; - } else { - const data = incrementalDataRecord.data; - (incrementalResult as IncrementalDeferResult).data = data ?? null; - } - - incrementalResult.path = incrementalDataRecord.path; - if (incrementalDataRecord.label != null) { - incrementalResult.label = incrementalDataRecord.label; - } - if (incrementalDataRecord.errors.length > 0) { - incrementalResult.errors = incrementalDataRecord.errors; - } - incrementalResults.push(incrementalResult); - } - - return incrementalResults.length - ? { incremental: incrementalResults, hasNext: this.hasNext() } - : encounteredCompletedAsyncIterator && !this.hasNext() - ? { hasNext: false } - : undefined; - } - prepareNewDeferredFragmentRecord(opts: { label: string | undefined; path: Path | undefined; @@ -298,7 +221,7 @@ export class IncrementalPublisher { if (parentContext) { parentContext.children.add(deferredFragmentRecord); } else { - this.initialResult.children.add(deferredFragmentRecord); + this._initialResult.children.add(deferredFragmentRecord); } return deferredFragmentRecord; @@ -316,7 +239,7 @@ export class IncrementalPublisher { if (parentContext) { parentContext.children.add(streamItemsRecord); } else { - this.initialResult.children.add(streamItemsRecord); + this._initialResult.children.add(streamItemsRecord); } return streamItemsRecord; @@ -352,19 +275,11 @@ export class IncrementalPublisher { } publishInitial() { - for (const child of this.initialResult.children) { + for (const child of this._initialResult.children) { this._publish(child); } } - _publish(incrementalDataRecord: IncrementalDataRecord) { - if (incrementalDataRecord.isCompleted) { - this._push(incrementalDataRecord); - } else { - this._introduce(incrementalDataRecord); - } - } - filter( nullPath: Path, erroringIncrementalDataRecord: IncrementalDataRecord | undefined, @@ -375,7 +290,7 @@ export class IncrementalPublisher { const children = erroringIncrementalDataRecord === undefined - ? this.initialResult.children + ? this._initialResult.children : erroringIncrementalDataRecord.children; for (const child of this._getDescendants(children)) { @@ -386,7 +301,7 @@ export class IncrementalPublisher { this._delete(child); const parent = child.parentContext === undefined - ? this.initialResult + ? this._initialResult : child.parentContext; parent.children.delete(child); @@ -404,7 +319,92 @@ export class IncrementalPublisher { }); } - _getDescendants( + private _trigger() { + this._resolve(); + this._reset(); + } + + private _reset() { + // promiseWithResolvers uses void only as a generic type parameter + // see: https://typescript-eslint.io/rules/no-invalid-void-type/ + // eslint-disable-next-line @typescript-eslint/no-invalid-void-type + const { promise: signalled, resolve } = promiseWithResolvers(); + this._resolve = resolve; + this._signalled = signalled; + } + + private _introduce(item: IncrementalDataRecord) { + this._pending.add(item); + } + + private _release(item: IncrementalDataRecord): void { + if (this._pending.has(item)) { + this._released.add(item); + this._trigger(); + } + } + + private _push(item: IncrementalDataRecord): void { + this._released.add(item); + this._pending.add(item); + this._trigger(); + } + + private _delete(item: IncrementalDataRecord) { + this._released.delete(item); + this._pending.delete(item); + this._trigger(); + } + + private _getIncrementalResult( + completedRecords: ReadonlySet, + ): SubsequentIncrementalExecutionResult | undefined { + const incrementalResults: Array = []; + let encounteredCompletedAsyncIterator = false; + for (const incrementalDataRecord of completedRecords) { + const incrementalResult: IncrementalResult = {}; + for (const child of incrementalDataRecord.children) { + this._publish(child); + } + if (isStreamItemsRecord(incrementalDataRecord)) { + const items = incrementalDataRecord.items; + if (incrementalDataRecord.isCompletedAsyncIterator) { + // async iterable resolver just finished but there may be pending payloads + encounteredCompletedAsyncIterator = true; + continue; + } + (incrementalResult as IncrementalStreamResult).items = items; + } else { + const data = incrementalDataRecord.data; + (incrementalResult as IncrementalDeferResult).data = data ?? null; + } + + incrementalResult.path = incrementalDataRecord.path; + if (incrementalDataRecord.label != null) { + incrementalResult.label = incrementalDataRecord.label; + } + if (incrementalDataRecord.errors.length > 0) { + incrementalResult.errors = incrementalDataRecord.errors; + } + incrementalResults.push(incrementalResult); + } + + return incrementalResults.length + ? { incremental: incrementalResults, hasNext: this.hasNext() } + : encounteredCompletedAsyncIterator && !this.hasNext() + ? { hasNext: false } + : undefined; + } + + private _publish(incrementalDataRecord: IncrementalDataRecord) { + if (incrementalDataRecord.isCompleted) { + this._push(incrementalDataRecord); + } else { + this._introduce(incrementalDataRecord); + } + } + + private _getDescendants( children: ReadonlySet, descendants = new Set(), ): ReadonlySet { @@ -415,7 +415,7 @@ export class IncrementalPublisher { return descendants; } - _matchesPath( + private _matchesPath( testPath: Array, basePath: Array, ): boolean { From 177cf6fcd761be915d2a0a9968a7953692632ec0 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Tue, 13 Jun 2023 13:34:34 +0300 Subject: [PATCH 3/3] typos --- src/execution/IncrementalPublisher.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index ee29793f50..f48e62e6a2 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -88,7 +88,7 @@ export type FormattedIncrementalResult< * This class is used to publish incremental results to the client, enabling semi-concurrent * execution while preserving result order. * - * The internal publishing state is manages as follows: + * The internal publishing state is managed as follows: * * '_released': the set of Incremental Data records that are ready to be sent to the client, * i.e. their parents have completed and they have also completed. @@ -97,7 +97,7 @@ export type FormattedIncrementalResult< * parents have completed so that they can no longer be filtered. This includes all Incremental * Data records in `released`, as well as Incremental Data records that have not yet completed. * - * `initialResult`: a record containing the state of the initial result, as follows: + * `_initialResult`: a record containing the state of the initial result, as follows: * `isCompleted`: indicates whether the initial result has completed. * `children`: the set of Incremental Data records that can be be published when the initial * result is completed.