From 9204d78bcd7488dcff7be17b5e60ed8aa3127752 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Tue, 22 Nov 2022 15:27:24 +0200 Subject: [PATCH 1/3] extract publisher --- src/execution/execute.ts | 291 +++++++++++++++++++++++---------------- 1 file changed, 172 insertions(+), 119 deletions(-) diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 1bc6c4267b..d33ee96a57 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -121,7 +121,11 @@ export interface ExecutionContext { typeResolver: GraphQLTypeResolver; subscribeFieldResolver: GraphQLFieldResolver; errors: Array; - subsequentPayloads: Set; + publisher: Publisher< + AsyncPayloadRecord, + IncrementalResult, + SubsequentIncrementalExecutionResult + >; } /** @@ -357,13 +361,14 @@ function executeImpl( return result.then( (data) => { const initialResult = buildResponse(data, exeContext.errors); - if (exeContext.subsequentPayloads.size > 0) { + const publisher = exeContext.publisher; + if (publisher.hasNext()) { return { initialResult: { ...initialResult, hasNext: true, }, - subsequentResults: yieldSubsequentPayloads(exeContext), + subsequentResults: publisher.subscribe(), }; } return initialResult; @@ -375,13 +380,14 @@ function executeImpl( ); } const initialResult = buildResponse(result, exeContext.errors); - if (exeContext.subsequentPayloads.size > 0) { + const publisher = exeContext.publisher; + if (publisher.hasNext()) { return { initialResult: { ...initialResult, hasNext: true, }, - subsequentResults: yieldSubsequentPayloads(exeContext), + subsequentResults: publisher.subscribe(), }; } return initialResult; @@ -503,7 +509,7 @@ export function buildExecutionContext( fieldResolver: fieldResolver ?? defaultFieldResolver, typeResolver: typeResolver ?? defaultTypeResolver, subscribeFieldResolver: subscribeFieldResolver ?? defaultFieldResolver, - subsequentPayloads: new Set(), + publisher: new Publisher(resultFromAsyncPayloadRecord, payloadFromResults), errors: [], }; } @@ -515,7 +521,7 @@ function buildPerEventExecutionContext( return { ...exeContext, rootValue: payload, - subsequentPayloads: new Set(), + publisher: new Publisher(resultFromAsyncPayloadRecord, payloadFromResults), errors: [], }; } @@ -2038,132 +2044,49 @@ function filterSubsequentPayloads( currentAsyncRecord: AsyncPayloadRecord | undefined, ): void { const nullPathArray = pathToArray(nullPath); - exeContext.subsequentPayloads.forEach((asyncRecord) => { + exeContext.publisher.filter((asyncRecord) => { if (asyncRecord === currentAsyncRecord) { // don't remove payload from where error originates - return; + return true; } for (let i = 0; i < nullPathArray.length; i++) { if (asyncRecord.path[i] !== nullPathArray[i]) { // asyncRecord points to a path unaffected by this payload - return; + return true; } } - // asyncRecord path points to nulled error field - if (isStreamPayload(asyncRecord) && asyncRecord.iterator?.return) { - asyncRecord.iterator.return().catch(() => { - // ignore error - }); - } - exeContext.subsequentPayloads.delete(asyncRecord); + + return false; }); } -function getCompletedIncrementalResults( - exeContext: ExecutionContext, -): Array { - const incrementalResults: Array = []; - for (const asyncPayloadRecord of exeContext.subsequentPayloads) { - const incrementalResult: IncrementalResult = {}; - if (!asyncPayloadRecord.isCompleted) { - continue; - } - exeContext.subsequentPayloads.delete(asyncPayloadRecord); - if (isStreamPayload(asyncPayloadRecord)) { - const items = asyncPayloadRecord.items; - if (asyncPayloadRecord.isCompletedIterator) { - // async iterable resolver just finished but there may be pending payloads - continue; - } - (incrementalResult as IncrementalStreamResult).items = items; - } else { - const data = asyncPayloadRecord.data; - (incrementalResult as IncrementalDeferResult).data = data ?? null; - } - - incrementalResult.path = asyncPayloadRecord.path; - if (asyncPayloadRecord.label) { - incrementalResult.label = asyncPayloadRecord.label; - } - if (asyncPayloadRecord.errors.length > 0) { - incrementalResult.errors = asyncPayloadRecord.errors; - } - incrementalResults.push(incrementalResult); +function resultFromAsyncPayloadRecord( + asyncPayloadRecord: AsyncPayloadRecord, +): IncrementalResult { + const incrementalResult: IncrementalResult = {}; + if (isStreamPayload(asyncPayloadRecord)) { + const items = asyncPayloadRecord.items; + (incrementalResult as IncrementalStreamResult).items = items; + } else { + const data = asyncPayloadRecord.data; + (incrementalResult as IncrementalDeferResult).data = data ?? null; } - return incrementalResults; -} - -function yieldSubsequentPayloads( - exeContext: ExecutionContext, -): AsyncGenerator { - let isDone = false; - - async function next(): Promise< - IteratorResult - > { - if (isDone) { - return { value: undefined, done: true }; - } - - await Promise.race( - Array.from(exeContext.subsequentPayloads).map((p) => p.promise), - ); - - if (isDone) { - // a different call to next has exhausted all payloads - return { value: undefined, done: true }; - } - const incremental = getCompletedIncrementalResults(exeContext); - const hasNext = exeContext.subsequentPayloads.size > 0; - - if (!incremental.length && hasNext) { - return next(); - } - - if (!hasNext) { - isDone = true; - } - - return { - value: incremental.length ? { incremental, hasNext } : { hasNext }, - done: false, - }; + incrementalResult.path = asyncPayloadRecord.path; + if (asyncPayloadRecord.label) { + incrementalResult.label = asyncPayloadRecord.label; } - - function returnStreamIterators() { - const promises: Array>> = []; - exeContext.subsequentPayloads.forEach((asyncPayloadRecord) => { - if ( - isStreamPayload(asyncPayloadRecord) && - asyncPayloadRecord.iterator?.return - ) { - promises.push(asyncPayloadRecord.iterator.return()); - } - }); - return Promise.all(promises); + if (asyncPayloadRecord.errors.length > 0) { + incrementalResult.errors = asyncPayloadRecord.errors; } + return incrementalResult; +} - return { - [Symbol.asyncIterator]() { - return this; - }, - next, - async return(): Promise< - IteratorResult - > { - await returnStreamIterators(); - isDone = true; - return { value: undefined, done: true }; - }, - async throw( - error?: unknown, - ): Promise> { - await returnStreamIterators(); - isDone = true; - return Promise.reject(error); - }, - }; +function payloadFromResults( + incremental: ReadonlyArray, + hasNext: boolean, +): SubsequentIncrementalExecutionResult { + return incremental.length ? { incremental, hasNext } : { hasNext }; } class DeferredFragmentRecord { @@ -2189,7 +2112,7 @@ class DeferredFragmentRecord { this.parentContext = opts.parentContext; this.errors = []; this._exeContext = opts.exeContext; - this._exeContext.subsequentPayloads.add(this); + this._exeContext.publisher.add(this); this.isCompleted = false; this.data = null; this.promise = new Promise | null>((resolve) => { @@ -2240,7 +2163,7 @@ class StreamRecord { this.iterator = opts.iterator; this.errors = []; this._exeContext = opts.exeContext; - this._exeContext.subsequentPayloads.add(this); + this._exeContext.publisher.add(this); this.isCompleted = false; this.items = null; this.promise = new Promise | null>((resolve) => { @@ -2274,3 +2197,133 @@ function isStreamPayload( ): asyncPayload is StreamRecord { return asyncPayload.type === 'stream'; } + +interface Source { + promise: Promise; + isCompleted: boolean; + isCompletedIterator?: boolean | undefined; + iterator?: AsyncIterator | undefined; +} + +type ToIncrementalResult = ( + source: TSource, +) => TIncremental; + +type ToPayload = ( + incremental: ReadonlyArray, + hasNext: boolean, +) => TPayload; + +/** + * @internal + */ +export class Publisher { + sources: Set; + toIncrementalResult: ToIncrementalResult; + toPayload: ToPayload; + + constructor( + toIncrementalResult: ToIncrementalResult, + toPayload: ToPayload, + ) { + this.sources = new Set(); + this.toIncrementalResult = toIncrementalResult; + this.toPayload = toPayload; + } + + add(source: TSource) { + this.sources.add(source); + } + + hasNext(): boolean { + return this.sources.size > 0; + } + + filter(predicate: (source: TSource) => boolean): void { + this.sources.forEach((source) => { + if (predicate(source)) { + return; + } + if (source.iterator?.return) { + source.iterator.return().catch(() => { + // ignore error + }); + } + this.sources.delete(source); + }); + } + + _getCompletedIncrementalResults(): Array { + const incrementalResults: Array = []; + for (const source of this.sources) { + if (!source.isCompleted) { + continue; + } + this.sources.delete(source); + if (source.isCompletedIterator) { + continue; + } + incrementalResults.push(this.toIncrementalResult(source)); + } + return incrementalResults; + } + + subscribe(): AsyncGenerator { + let isDone = false; + + const next = async (): Promise> => { + if (isDone) { + return { value: undefined, done: true }; + } + + await Promise.race(Array.from(this.sources).map((p) => p.promise)); + + if (isDone) { + return { value: undefined, done: true }; + } + + const incremental = this._getCompletedIncrementalResults(); + const hasNext = this.sources.size > 0; + + if (!incremental.length && hasNext) { + return next(); + } + + if (!hasNext) { + isDone = true; + } + + return { + value: this.toPayload(incremental, hasNext), + done: false, + }; + }; + + const returnIterators = () => { + const promises: Array>> = []; + this.sources.forEach((source) => { + if (source.iterator?.return) { + promises.push(source.iterator.return()); + } + }); + return Promise.all(promises); + }; + + return { + [Symbol.asyncIterator]() { + return this; + }, + next, + async return(): Promise> { + await returnIterators(); + isDone = true; + return { value: undefined, done: true }; + }, + async throw(error?: unknown): Promise> { + await returnIterators(); + isDone = true; + return Promise.reject(error); + }, + }; + } +} From c537db419c6edad7b776b0c1e1d3f89a00dbcc52 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Thu, 5 Jan 2023 12:15:49 +0200 Subject: [PATCH 2/3] move publisher to separate file --- src/execution/execute.ts | 131 +------------------------------------ src/execution/publisher.ts | 129 ++++++++++++++++++++++++++++++++++++ 2 files changed, 130 insertions(+), 130 deletions(-) create mode 100644 src/execution/publisher.ts diff --git a/src/execution/execute.ts b/src/execution/execute.ts index d33ee96a57..a57247ebec 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -53,6 +53,7 @@ import { collectSubfields as _collectSubfields, } from './collectFields.js'; import { mapAsyncIterable } from './mapAsyncIterable.js'; +import { Publisher } from './publisher.js'; import { getArgumentValues, getDirectiveValues, @@ -2197,133 +2198,3 @@ function isStreamPayload( ): asyncPayload is StreamRecord { return asyncPayload.type === 'stream'; } - -interface Source { - promise: Promise; - isCompleted: boolean; - isCompletedIterator?: boolean | undefined; - iterator?: AsyncIterator | undefined; -} - -type ToIncrementalResult = ( - source: TSource, -) => TIncremental; - -type ToPayload = ( - incremental: ReadonlyArray, - hasNext: boolean, -) => TPayload; - -/** - * @internal - */ -export class Publisher { - sources: Set; - toIncrementalResult: ToIncrementalResult; - toPayload: ToPayload; - - constructor( - toIncrementalResult: ToIncrementalResult, - toPayload: ToPayload, - ) { - this.sources = new Set(); - this.toIncrementalResult = toIncrementalResult; - this.toPayload = toPayload; - } - - add(source: TSource) { - this.sources.add(source); - } - - hasNext(): boolean { - return this.sources.size > 0; - } - - filter(predicate: (source: TSource) => boolean): void { - this.sources.forEach((source) => { - if (predicate(source)) { - return; - } - if (source.iterator?.return) { - source.iterator.return().catch(() => { - // ignore error - }); - } - this.sources.delete(source); - }); - } - - _getCompletedIncrementalResults(): Array { - const incrementalResults: Array = []; - for (const source of this.sources) { - if (!source.isCompleted) { - continue; - } - this.sources.delete(source); - if (source.isCompletedIterator) { - continue; - } - incrementalResults.push(this.toIncrementalResult(source)); - } - return incrementalResults; - } - - subscribe(): AsyncGenerator { - let isDone = false; - - const next = async (): Promise> => { - if (isDone) { - return { value: undefined, done: true }; - } - - await Promise.race(Array.from(this.sources).map((p) => p.promise)); - - if (isDone) { - return { value: undefined, done: true }; - } - - const incremental = this._getCompletedIncrementalResults(); - const hasNext = this.sources.size > 0; - - if (!incremental.length && hasNext) { - return next(); - } - - if (!hasNext) { - isDone = true; - } - - return { - value: this.toPayload(incremental, hasNext), - done: false, - }; - }; - - const returnIterators = () => { - const promises: Array>> = []; - this.sources.forEach((source) => { - if (source.iterator?.return) { - promises.push(source.iterator.return()); - } - }); - return Promise.all(promises); - }; - - return { - [Symbol.asyncIterator]() { - return this; - }, - next, - async return(): Promise> { - await returnIterators(); - isDone = true; - return { value: undefined, done: true }; - }, - async throw(error?: unknown): Promise> { - await returnIterators(); - isDone = true; - return Promise.reject(error); - }, - }; - } -} diff --git a/src/execution/publisher.ts b/src/execution/publisher.ts new file mode 100644 index 0000000000..0378aae34e --- /dev/null +++ b/src/execution/publisher.ts @@ -0,0 +1,129 @@ +interface Source { + promise: Promise; + isCompleted: boolean; + isCompletedIterator?: boolean | undefined; + iterator?: AsyncIterator | undefined; +} + +type ToIncrementalResult = ( + source: TSource, +) => TIncremental; + +type ToPayload = ( + incremental: ReadonlyArray, + hasNext: boolean, +) => TPayload; + +/** + * @internal + */ +export class Publisher { + sources: Set; + toIncrementalResult: ToIncrementalResult; + toPayload: ToPayload; + + constructor( + toIncrementalResult: ToIncrementalResult, + toPayload: ToPayload, + ) { + this.sources = new Set(); + this.toIncrementalResult = toIncrementalResult; + this.toPayload = toPayload; + } + + add(source: TSource) { + this.sources.add(source); + } + + hasNext(): boolean { + return this.sources.size > 0; + } + + filter(predicate: (source: TSource) => boolean): void { + this.sources.forEach((source) => { + if (predicate(source)) { + return; + } + if (source.iterator?.return) { + source.iterator.return().catch(() => { + // ignore error + }); + } + this.sources.delete(source); + }); + } + + _getCompletedIncrementalResults(): Array { + const incrementalResults: Array = []; + for (const source of this.sources) { + if (!source.isCompleted) { + continue; + } + this.sources.delete(source); + if (source.isCompletedIterator) { + continue; + } + incrementalResults.push(this.toIncrementalResult(source)); + } + return incrementalResults; + } + + subscribe(): AsyncGenerator { + let isDone = false; + + const next = async (): Promise> => { + if (isDone) { + return { value: undefined, done: true }; + } + + await Promise.race(Array.from(this.sources).map((p) => p.promise)); + + if (isDone) { + return { value: undefined, done: true }; + } + + const incremental = this._getCompletedIncrementalResults(); + const hasNext = this.sources.size > 0; + + if (!incremental.length && hasNext) { + return next(); + } + + if (!hasNext) { + isDone = true; + } + + return { + value: this.toPayload(incremental, hasNext), + done: false, + }; + }; + + const returnIterators = () => { + const promises: Array>> = []; + this.sources.forEach((source) => { + if (source.iterator?.return) { + promises.push(source.iterator.return()); + } + }); + return Promise.all(promises); + }; + + return { + [Symbol.asyncIterator]() { + return this; + }, + next, + async return(): Promise> { + await returnIterators(); + isDone = true; + return { value: undefined, done: true }; + }, + async throw(error?: unknown): Promise> { + await returnIterators(); + isDone = true; + return Promise.reject(error); + }, + }; + } +} From 9feafd824560523f1a9c050ccfb1293f251aa3f4 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Fri, 25 Nov 2022 09:32:30 +0200 Subject: [PATCH 3/3] introduce new Publisher for incremental delivery Depends on #3784 The proposed new Publisher: 1. does not use the event loop to manage AsyncRecord dependencies 2. performs as much work as possible synchronously 2. uses separate sets to store pending vs ready AsyncRecords 3. does not use `Promise.race` -- No event loop for managing AsyncRecord dependencies The current Publisher wraps every AsyncRecord result in a promise that only resolves after the parent AsyncRecord resolves. If multiple items within a stream are released for publishing because their parent has just been published, the stream cannot be in its entirety until after all of these promises unwind. The new Publisher keeps track of dependencies manually. When an AsyncRecord is pushed by the publisher, all of its dependent AsyncRecords are synchronously pushed, repeating as necessary, without using the event loop. In general, the new publisher aims to perform as much work as possible synchronously. -- Separate sets for pending vs ready AsyncRecords The current publisher inspects all pending AsyncRecords whenever any of them resolves. All that are completed are added to the response. The new Publisher moves AsyncRecords from the pending set to the ready set as they are pushed, so that on each call to next, only the ready set must be iterated. As a side-effect of this change, the incremental array is ordered by which items are ready for delivery first, and not by the initial document. This seems like a worthwhile tradeoff, and is still adherent to the spec, as far as I can tell. -- No `Promise.race` The old Publisher uses `Promise.race` as the trigger to determine whether payloads are ready. The new Publisher uses a single triggering promise that is triggered whenever an AsyncRecord is pushed, and then reset. This may be beneficial as the implementation of `Promise.race` within V8 has a known memory leak for long-running promises. (see https://bugs.chromium.org/p/v8/issues/detail?id=9858). An alternative would be to utilize @brainkim 's memory-safe version detailed in that issue. --- src/execution/__tests__/defer-test.ts | 10 -- src/execution/__tests__/stream-test.ts | 97 +++----------- src/execution/execute.ts | 112 +++++++--------- src/execution/publisher.ts | 175 +++++++++++++++++++++---- 4 files changed, 215 insertions(+), 179 deletions(-) diff --git a/src/execution/__tests__/defer-test.ts b/src/execution/__tests__/defer-test.ts index 5cad95bbc3..f9c0639306 100644 --- a/src/execution/__tests__/defer-test.ts +++ b/src/execution/__tests__/defer-test.ts @@ -605,11 +605,6 @@ describe('Execute: defer directive', () => { data: { slowField: 'slow', friends: [{}, {}, {}] }, path: ['hero'], }, - ], - hasNext: true, - }, - { - incremental: [ { data: { name: 'Han' }, path: ['hero', 'friends', 0] }, { data: { name: 'Leia' }, path: ['hero', 'friends', 1] }, { data: { name: 'C-3PO' }, path: ['hero', 'friends', 2] }, @@ -653,11 +648,6 @@ describe('Execute: defer directive', () => { }, path: ['hero'], }, - ], - hasNext: true, - }, - { - incremental: [ { data: { name: 'Han' }, path: ['hero', 'friends', 0] }, { data: { name: 'Leia' }, path: ['hero', 'friends', 1] }, { data: { name: 'C-3PO' }, path: ['hero', 'friends', 2] }, diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index aed5211ae1..65d7f67381 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -151,11 +151,10 @@ describe('Execute: stream directive', () => { hasNext: true, }, { - incremental: [{ items: ['banana'], path: ['scalarList', 1] }], - hasNext: true, - }, - { - incremental: [{ items: ['coconut'], path: ['scalarList', 2] }], + incremental: [ + { items: ['banana'], path: ['scalarList', 1] }, + { items: ['coconut'], path: ['scalarList', 2] }, + ], hasNext: false, }, ]); @@ -173,15 +172,11 @@ describe('Execute: stream directive', () => { hasNext: true, }, { - incremental: [{ items: ['apple'], path: ['scalarList', 0] }], - hasNext: true, - }, - { - incremental: [{ items: ['banana'], path: ['scalarList', 1] }], - hasNext: true, - }, - { - incremental: [{ items: ['coconut'], path: ['scalarList', 2] }], + incremental: [ + { items: ['apple'], path: ['scalarList', 0] }, + { items: ['banana'], path: ['scalarList', 1] }, + { items: ['coconut'], path: ['scalarList', 2] }, + ], hasNext: false, }, ]); @@ -230,11 +225,6 @@ describe('Execute: stream directive', () => { path: ['scalarList', 1], label: 'scalar-stream', }, - ], - hasNext: true, - }, - { - incremental: [ { items: ['coconut'], path: ['scalarList', 2], @@ -296,11 +286,6 @@ describe('Execute: stream directive', () => { items: [['banana', 'banana', 'banana']], path: ['scalarListList', 1], }, - ], - hasNext: true, - }, - { - incremental: [ { items: [['coconut', 'coconut', 'coconut']], path: ['scalarListList', 2], @@ -379,20 +364,10 @@ describe('Execute: stream directive', () => { items: [{ name: 'Luke', id: '1' }], path: ['friendList', 0], }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ name: 'Han', id: '2' }], path: ['friendList', 1], }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ name: 'Leia', id: '3' }], path: ['friendList', 2], @@ -531,11 +506,6 @@ describe('Execute: stream directive', () => { }, ], }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ name: 'Leia', id: '3' }], path: ['friendList', 2], @@ -707,7 +677,12 @@ describe('Execute: stream directive', () => { hasNext: true, }, }, - { done: false, value: { hasNext: false } }, + { + done: false, + value: { + hasNext: false, + }, + }, { done: true, value: undefined }, ]); }); @@ -935,11 +910,6 @@ describe('Execute: stream directive', () => { }, ], }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ nonNullName: 'Han' }], path: ['friendList', 2], @@ -984,11 +954,6 @@ describe('Execute: stream directive', () => { }, ], }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ nonNullName: 'Han' }], path: ['friendList', 2], @@ -1117,11 +1082,6 @@ describe('Execute: stream directive', () => { }, ], }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ nonNullName: 'Han' }], path: ['friendList', 2], @@ -1407,9 +1367,6 @@ describe('Execute: stream directive', () => { ], }, ], - hasNext: true, - }, - { hasNext: false, }, ]); @@ -1556,9 +1513,6 @@ describe('Execute: stream directive', () => { path: ['friendList', 2], }, ], - hasNext: true, - }, - { hasNext: false, }, ]); @@ -1612,15 +1566,6 @@ describe('Execute: stream directive', () => { data: { scalarField: 'slow', nestedFriendList: [] }, path: ['nestedObject'], }, - ], - hasNext: true, - }, - done: false, - }); - const result3 = await iterator.next(); - expectJSON(result3).toDeepEqual({ - value: { - incremental: [ { items: [{ name: 'Luke' }], path: ['nestedObject', 'nestedFriendList', 0], @@ -1630,8 +1575,8 @@ describe('Execute: stream directive', () => { }, done: false, }); - const result4 = await iterator.next(); - expectJSON(result4).toDeepEqual({ + const result3 = await iterator.next(); + expectJSON(result3).toDeepEqual({ value: { incremental: [ { @@ -1643,13 +1588,13 @@ describe('Execute: stream directive', () => { }, done: false, }); - const result5 = await iterator.next(); - expectJSON(result5).toDeepEqual({ + const result4 = await iterator.next(); + expectJSON(result4).toDeepEqual({ value: { hasNext: false }, done: false, }); - const result6 = await iterator.next(); - expectJSON(result6).toDeepEqual({ + const result5 = await iterator.next(); + expectJSON(result5).toDeepEqual({ value: undefined, done: true, }); diff --git a/src/execution/execute.ts b/src/execution/execute.ts index a57247ebec..6ce7964736 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -1798,16 +1798,21 @@ function executeDeferredFragment( fields, asyncPayloadRecord, ); - - if (isPromise(promiseOrData)) { - promiseOrData = promiseOrData.then(null, (e) => { - asyncPayloadRecord.errors.push(e); - return null; - }); - } } catch (e) { asyncPayloadRecord.errors.push(e); - promiseOrData = null; + asyncPayloadRecord.addData(null); + return; + } + + if (isPromise(promiseOrData)) { + promiseOrData.then( + (value) => asyncPayloadRecord.addData(value), + (error) => { + asyncPayloadRecord.errors.push(error); + asyncPayloadRecord.addData(null); + }, + ); + return; } asyncPayloadRecord.addData(promiseOrData); } @@ -1830,7 +1835,7 @@ function executeStreamField( exeContext, }); if (isPromise(item)) { - const completedItems = completePromisedValue( + completePromisedValue( exeContext, itemType, fieldNodes, @@ -1839,15 +1844,14 @@ function executeStreamField( item, asyncPayloadRecord, ).then( - (value) => [value], + (value) => asyncPayloadRecord.addItems([value]), (error) => { asyncPayloadRecord.errors.push(error); filterSubsequentPayloads(exeContext, path, asyncPayloadRecord); - return null; + asyncPayloadRecord.addItems(null); }, ); - asyncPayloadRecord.addItems(completedItems); return asyncPayloadRecord; } @@ -1880,7 +1884,7 @@ function executeStreamField( } if (isPromise(completedItem)) { - const completedItems = completedItem + completedItem .then(undefined, (rawError) => { const error = locatedError(rawError, fieldNodes, pathToArray(itemPath)); const handledError = handleFieldError( @@ -1892,15 +1896,14 @@ function executeStreamField( return handledError; }) .then( - (value) => [value], + (value) => asyncPayloadRecord.addItems([value]), (error) => { asyncPayloadRecord.errors.push(error); filterSubsequentPayloads(exeContext, path, asyncPayloadRecord); - return null; + asyncPayloadRecord.addItems(null); }, ); - asyncPayloadRecord.addItems(completedItems); return asyncPayloadRecord; } @@ -2015,22 +2018,19 @@ async function executeStreamIterator( const { done, value: completedItem } = iteration; - let completedItems: PromiseOrValue | null>; if (isPromise(completedItem)) { - completedItems = completedItem.then( - (value) => [value], + completedItem.then( + (resolvedItem) => asyncPayloadRecord.addItems([resolvedItem]), (error) => { asyncPayloadRecord.errors.push(error); filterSubsequentPayloads(exeContext, path, asyncPayloadRecord); - return null; + asyncPayloadRecord.addItems(null); }, ); } else { - completedItems = [completedItem]; + asyncPayloadRecord.addItems([completedItem]); } - asyncPayloadRecord.addItems(completedItems); - if (done) { break; } @@ -2095,12 +2095,14 @@ class DeferredFragmentRecord { errors: Array; label: string | undefined; path: Array; - promise: Promise; data: ObjMap | null; parentContext: AsyncPayloadRecord | undefined; - isCompleted: boolean; - _exeContext: ExecutionContext; - _resolve?: (arg: PromiseOrValue | null>) => void; + _publisher: Publisher< + AsyncPayloadRecord, + IncrementalResult, + SubsequentIncrementalExecutionResult + >; + constructor(opts: { label: string | undefined; path: Path | undefined; @@ -2112,27 +2114,14 @@ class DeferredFragmentRecord { this.path = pathToArray(opts.path); this.parentContext = opts.parentContext; this.errors = []; - this._exeContext = opts.exeContext; - this._exeContext.publisher.add(this); - this.isCompleted = false; + this._publisher = opts.exeContext.publisher; + this._publisher.add(this); this.data = null; - this.promise = new Promise | null>((resolve) => { - this._resolve = (promiseOrValue) => { - resolve(promiseOrValue); - }; - }).then((data) => { - this.data = data; - this.isCompleted = true; - }); } - addData(data: PromiseOrValue | null>) { - const parentData = this.parentContext?.promise; - if (parentData) { - this._resolve?.(parentData.then(() => data)); - return; - } - this._resolve?.(data); + addData(data: ObjMap | null) { + this.data = data; + this._publisher.complete(this); } } @@ -2142,13 +2131,15 @@ class StreamRecord { label: string | undefined; path: Array; items: Array | null; - promise: Promise; parentContext: AsyncPayloadRecord | undefined; iterator: AsyncIterator | undefined; isCompletedIterator?: boolean; - isCompleted: boolean; - _exeContext: ExecutionContext; - _resolve?: (arg: PromiseOrValue | null>) => void; + _publisher: Publisher< + AsyncPayloadRecord, + IncrementalResult, + SubsequentIncrementalExecutionResult + >; + constructor(opts: { label: string | undefined; path: Path | undefined; @@ -2163,27 +2154,14 @@ class StreamRecord { this.parentContext = opts.parentContext; this.iterator = opts.iterator; this.errors = []; - this._exeContext = opts.exeContext; - this._exeContext.publisher.add(this); - this.isCompleted = false; + this._publisher = opts.exeContext.publisher; + this._publisher.add(this); this.items = null; - this.promise = new Promise | null>((resolve) => { - this._resolve = (promiseOrValue) => { - resolve(promiseOrValue); - }; - }).then((items) => { - this.items = items; - this.isCompleted = true; - }); } - addItems(items: PromiseOrValue | null>) { - const parentData = this.parentContext?.promise; - if (parentData) { - this._resolve?.(parentData.then(() => items)); - return; - } - this._resolve?.(items); + addItems(items: Array | null) { + this.items = items; + this._publisher.complete(this); } setIsCompletedIterator() { diff --git a/src/execution/publisher.ts b/src/execution/publisher.ts index 0378aae34e..7fe4f37074 100644 --- a/src/execution/publisher.ts +++ b/src/execution/publisher.ts @@ -1,10 +1,17 @@ interface Source { - promise: Promise; - isCompleted: boolean; + parentContext: this | undefined; isCompletedIterator?: boolean | undefined; iterator?: AsyncIterator | undefined; } +interface HasParent { + parentContext: T; +} + +function hasParent(value: T): value is T & HasParent { + return (value as HasParent).parentContext !== undefined; +} + type ToIncrementalResult = ( source: TSource, ) => TIncremental; @@ -18,7 +25,14 @@ type ToPayload = ( * @internal */ export class Publisher { - sources: Set; + // This is safe because a promise executor within the constructor will assign this. + trigger!: () => void; + signal: Promise; + pending: Set; + waiting: Set>; + waitingByParent: Map>>; + pushed: WeakSet; + current: Set; toIncrementalResult: ToIncrementalResult; toPayload: ToPayload; @@ -26,40 +40,118 @@ export class Publisher { toIncrementalResult: ToIncrementalResult, toPayload: ToPayload, ) { - this.sources = new Set(); + this.signal = new Promise((resolve) => { + this.trigger = resolve; + }); + this.pending = new Set(); + this.waiting = new Set(); + this.waitingByParent = new Map(); + this.pushed = new WeakSet(); + this.current = new Set(); this.toIncrementalResult = toIncrementalResult; this.toPayload = toPayload; } - add(source: TSource) { - this.sources.add(source); + add(source: TSource): void { + this.pending.add(source); + } + + complete(source: TSource): void { + // if source has been filtered, ignore completion + if (!this.pending.has(source)) { + return; + } + + this.pending.delete(source); + + if (!hasParent(source)) { + this._push(source); + this.trigger(); + return; + } + + const parentContext = source.parentContext; + if (this.pushed.has(source.parentContext)) { + this._push(source); + this.trigger(); + return; + } + + this.waiting.add(source); + + const waitingByParent = this.waitingByParent.get(parentContext); + if (waitingByParent) { + waitingByParent.add(source); + return; + } + + this.waitingByParent.set(parentContext, new Set([source])); + } + + _push(source: TSource): void { + this.pushed.add(source); + this.current.add(source); + + const waitingByParent = this.waitingByParent.get(source); + if (waitingByParent === undefined) { + return; + } + + for (const child of waitingByParent) { + this.waitingByParent.delete(child); + this.waiting.delete(child); + this._push(child); + } } hasNext(): boolean { - return this.sources.size > 0; + return ( + this.pending.size > 0 || this.waiting.size > 0 || this.current.size > 0 + ); } filter(predicate: (source: TSource) => boolean): void { - this.sources.forEach((source) => { + const iterators = new Set>(); + for (const set of [this.pending, this.current]) { + set.forEach((source) => { + if (predicate(source)) { + return; + } + if (source.iterator?.return) { + iterators.add(source.iterator); + } + set.delete(source); + }); + } + + this.waiting.forEach((source) => { if (predicate(source)) { return; } + if (source.iterator?.return) { - source.iterator.return().catch(() => { - // ignore error - }); + iterators.add(source.iterator); } - this.sources.delete(source); + + this.waiting.delete(source); + + const parentContext = source.parentContext; + const children = this.waitingByParent.get(parentContext); + // TODO: children can never be undefined, but TS doesn't know that + children?.delete(source); }); + + for (const iterator of iterators) { + iterator.return?.().catch(() => { + // ignore error + }); + } } _getCompletedIncrementalResults(): Array { const incrementalResults: Array = []; - for (const source of this.sources) { - if (!source.isCompleted) { - continue; - } - this.sources.delete(source); + for (const source of this.current) { + this.current.delete(source); if (source.isCompletedIterator) { continue; } @@ -76,17 +168,39 @@ export class Publisher { return { value: undefined, done: true }; } - await Promise.race(Array.from(this.sources).map((p) => p.promise)); + const incremental = this._getCompletedIncrementalResults(); + if (!incremental.length) { + return onSignal(); + } + + const hasNext = this.hasNext(); + + if (!hasNext) { + isDone = true; + } + + return { + value: this.toPayload(incremental, hasNext), + done: false, + }; + }; + + const onSignal = async (): Promise> => { + await this.signal; if (isDone) { return { value: undefined, done: true }; } const incremental = this._getCompletedIncrementalResults(); - const hasNext = this.sources.size > 0; + this.signal = new Promise((resolve) => { + this.trigger = resolve; + }); + + const hasNext = this.hasNext(); if (!incremental.length && hasNext) { - return next(); + return onSignal(); } if (!hasNext) { @@ -100,12 +214,21 @@ export class Publisher { }; const returnIterators = () => { + const iterators = new Set>(); + for (const set of [this.pending, this.waiting, this.current]) { + for (const source of set) { + if (source.iterator?.return) { + iterators.add(source.iterator); + } + } + } + const promises: Array>> = []; - this.sources.forEach((source) => { - if (source.iterator?.return) { - promises.push(source.iterator.return()); + for (const iterator of iterators) { + if (iterator?.return) { + promises.push(iterator.return()); } - }); + } return Promise.all(promises); }; @@ -115,13 +238,13 @@ export class Publisher { }, next, async return(): Promise> { - await returnIterators(); isDone = true; + await returnIterators(); return { value: undefined, done: true }; }, async throw(error?: unknown): Promise> { - await returnIterators(); isDone = true; + await returnIterators(); return Promise.reject(error); }, };