Skip to content

introduce new Publisher for incremental delivery #3786

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions src/execution/__tests__/defer-test.ts
Original file line number Diff line number Diff line change
@@ -605,11 +605,6 @@ describe('Execute: defer directive', () => {
data: { slowField: 'slow', friends: [{}, {}, {}] },
path: ['hero'],
},
],
hasNext: true,
},
{
incremental: [
{ data: { name: 'Han' }, path: ['hero', 'friends', 0] },
{ data: { name: 'Leia' }, path: ['hero', 'friends', 1] },
{ data: { name: 'C-3PO' }, path: ['hero', 'friends', 2] },
@@ -653,11 +648,6 @@ describe('Execute: defer directive', () => {
},
path: ['hero'],
},
],
hasNext: true,
},
{
incremental: [
{ data: { name: 'Han' }, path: ['hero', 'friends', 0] },
{ data: { name: 'Leia' }, path: ['hero', 'friends', 1] },
{ data: { name: 'C-3PO' }, path: ['hero', 'friends', 2] },
97 changes: 21 additions & 76 deletions src/execution/__tests__/stream-test.ts
Original file line number Diff line number Diff line change
@@ -151,11 +151,10 @@ describe('Execute: stream directive', () => {
hasNext: true,
},
{
incremental: [{ items: ['banana'], path: ['scalarList', 1] }],
hasNext: true,
},
{
incremental: [{ items: ['coconut'], path: ['scalarList', 2] }],
incremental: [
{ items: ['banana'], path: ['scalarList', 1] },
{ items: ['coconut'], path: ['scalarList', 2] },
],
hasNext: false,
},
]);
@@ -173,15 +172,11 @@ describe('Execute: stream directive', () => {
hasNext: true,
},
{
incremental: [{ items: ['apple'], path: ['scalarList', 0] }],
hasNext: true,
},
{
incremental: [{ items: ['banana'], path: ['scalarList', 1] }],
hasNext: true,
},
{
incremental: [{ items: ['coconut'], path: ['scalarList', 2] }],
incremental: [
{ items: ['apple'], path: ['scalarList', 0] },
{ items: ['banana'], path: ['scalarList', 1] },
{ items: ['coconut'], path: ['scalarList', 2] },
],
hasNext: false,
},
]);
@@ -230,11 +225,6 @@ describe('Execute: stream directive', () => {
path: ['scalarList', 1],
label: 'scalar-stream',
},
],
hasNext: true,
},
{
incremental: [
{
items: ['coconut'],
path: ['scalarList', 2],
@@ -296,11 +286,6 @@ describe('Execute: stream directive', () => {
items: [['banana', 'banana', 'banana']],
path: ['scalarListList', 1],
},
],
hasNext: true,
},
{
incremental: [
{
items: [['coconut', 'coconut', 'coconut']],
path: ['scalarListList', 2],
@@ -379,20 +364,10 @@ describe('Execute: stream directive', () => {
items: [{ name: 'Luke', id: '1' }],
path: ['friendList', 0],
},
],
hasNext: true,
},
{
incremental: [
{
items: [{ name: 'Han', id: '2' }],
path: ['friendList', 1],
},
],
hasNext: true,
},
{
incremental: [
{
items: [{ name: 'Leia', id: '3' }],
path: ['friendList', 2],
@@ -531,11 +506,6 @@ describe('Execute: stream directive', () => {
},
],
},
],
hasNext: true,
},
{
incremental: [
{
items: [{ name: 'Leia', id: '3' }],
path: ['friendList', 2],
@@ -707,7 +677,12 @@ describe('Execute: stream directive', () => {
hasNext: true,
},
},
{ done: false, value: { hasNext: false } },
{
done: false,
value: {
hasNext: false,
},
},
{ done: true, value: undefined },
]);
});
@@ -935,11 +910,6 @@ describe('Execute: stream directive', () => {
},
],
},
],
hasNext: true,
},
{
incremental: [
{
items: [{ nonNullName: 'Han' }],
path: ['friendList', 2],
@@ -984,11 +954,6 @@ describe('Execute: stream directive', () => {
},
],
},
],
hasNext: true,
},
{
incremental: [
{
items: [{ nonNullName: 'Han' }],
path: ['friendList', 2],
@@ -1117,11 +1082,6 @@ describe('Execute: stream directive', () => {
},
],
},
],
hasNext: true,
},
{
incremental: [
{
items: [{ nonNullName: 'Han' }],
path: ['friendList', 2],
@@ -1407,9 +1367,6 @@ describe('Execute: stream directive', () => {
],
},
],
hasNext: true,
},
{
hasNext: false,
},
]);
@@ -1556,9 +1513,6 @@ describe('Execute: stream directive', () => {
path: ['friendList', 2],
},
],
hasNext: true,
},
{
hasNext: false,
},
]);
@@ -1612,15 +1566,6 @@ describe('Execute: stream directive', () => {
data: { scalarField: 'slow', nestedFriendList: [] },
path: ['nestedObject'],
},
],
hasNext: true,
},
done: false,
});
const result3 = await iterator.next();
expectJSON(result3).toDeepEqual({
value: {
incremental: [
{
items: [{ name: 'Luke' }],
path: ['nestedObject', 'nestedFriendList', 0],
@@ -1630,8 +1575,8 @@ describe('Execute: stream directive', () => {
},
done: false,
});
const result4 = await iterator.next();
expectJSON(result4).toDeepEqual({
const result3 = await iterator.next();
expectJSON(result3).toDeepEqual({
value: {
incremental: [
{
@@ -1643,13 +1588,13 @@ describe('Execute: stream directive', () => {
},
done: false,
});
const result5 = await iterator.next();
expectJSON(result5).toDeepEqual({
const result4 = await iterator.next();
expectJSON(result4).toDeepEqual({
value: { hasNext: false },
done: false,
});
const result6 = await iterator.next();
expectJSON(result6).toDeepEqual({
const result5 = await iterator.next();
expectJSON(result5).toDeepEqual({
value: undefined,
done: true,
});
270 changes: 86 additions & 184 deletions src/execution/execute.ts
Original file line number Diff line number Diff line change
@@ -53,6 +53,7 @@ import {
collectSubfields as _collectSubfields,
} from './collectFields.js';
import { mapAsyncIterable } from './mapAsyncIterable.js';
import { Publisher } from './publisher.js';
import {
getArgumentValues,
getDirectiveValues,
@@ -121,7 +122,11 @@ export interface ExecutionContext {
typeResolver: GraphQLTypeResolver<any, any>;
subscribeFieldResolver: GraphQLFieldResolver<any, any>;
errors: Array<GraphQLError>;
subsequentPayloads: Set<AsyncPayloadRecord>;
publisher: Publisher<
AsyncPayloadRecord,
IncrementalResult,
SubsequentIncrementalExecutionResult
>;
}

/**
@@ -357,13 +362,14 @@ function executeImpl(
return result.then(
(data) => {
const initialResult = buildResponse(data, exeContext.errors);
if (exeContext.subsequentPayloads.size > 0) {
const publisher = exeContext.publisher;
if (publisher.hasNext()) {
return {
initialResult: {
...initialResult,
hasNext: true,
},
subsequentResults: yieldSubsequentPayloads(exeContext),
subsequentResults: publisher.subscribe(),
};
}
return initialResult;
@@ -375,13 +381,14 @@ function executeImpl(
);
}
const initialResult = buildResponse(result, exeContext.errors);
if (exeContext.subsequentPayloads.size > 0) {
const publisher = exeContext.publisher;
if (publisher.hasNext()) {
return {
initialResult: {
...initialResult,
hasNext: true,
},
subsequentResults: yieldSubsequentPayloads(exeContext),
subsequentResults: publisher.subscribe(),
};
}
return initialResult;
@@ -503,7 +510,7 @@ export function buildExecutionContext(
fieldResolver: fieldResolver ?? defaultFieldResolver,
typeResolver: typeResolver ?? defaultTypeResolver,
subscribeFieldResolver: subscribeFieldResolver ?? defaultFieldResolver,
subsequentPayloads: new Set(),
publisher: new Publisher(resultFromAsyncPayloadRecord, payloadFromResults),
errors: [],
};
}
@@ -515,7 +522,7 @@ function buildPerEventExecutionContext(
return {
...exeContext,
rootValue: payload,
subsequentPayloads: new Set(),
publisher: new Publisher(resultFromAsyncPayloadRecord, payloadFromResults),
errors: [],
};
}
@@ -1791,16 +1798,21 @@ function executeDeferredFragment(
fields,
asyncPayloadRecord,
);

if (isPromise(promiseOrData)) {
promiseOrData = promiseOrData.then(null, (e) => {
asyncPayloadRecord.errors.push(e);
return null;
});
}
} catch (e) {
asyncPayloadRecord.errors.push(e);
promiseOrData = null;
asyncPayloadRecord.addData(null);
return;
}

if (isPromise(promiseOrData)) {
promiseOrData.then(
(value) => asyncPayloadRecord.addData(value),
(error) => {
asyncPayloadRecord.errors.push(error);
asyncPayloadRecord.addData(null);
},
);
return;
}
asyncPayloadRecord.addData(promiseOrData);
}
@@ -1823,7 +1835,7 @@ function executeStreamField(
exeContext,
});
if (isPromise(item)) {
const completedItems = completePromisedValue(
completePromisedValue(
exeContext,
itemType,
fieldNodes,
@@ -1832,15 +1844,14 @@ function executeStreamField(
item,
asyncPayloadRecord,
).then(
(value) => [value],
(value) => asyncPayloadRecord.addItems([value]),
(error) => {
asyncPayloadRecord.errors.push(error);
filterSubsequentPayloads(exeContext, path, asyncPayloadRecord);
return null;
asyncPayloadRecord.addItems(null);
},
);

asyncPayloadRecord.addItems(completedItems);
return asyncPayloadRecord;
}

@@ -1873,7 +1884,7 @@ function executeStreamField(
}

if (isPromise(completedItem)) {
const completedItems = completedItem
completedItem
.then(undefined, (rawError) => {
const error = locatedError(rawError, fieldNodes, pathToArray(itemPath));
const handledError = handleFieldError(
@@ -1885,15 +1896,14 @@ function executeStreamField(
return handledError;
})
.then(
(value) => [value],
(value) => asyncPayloadRecord.addItems([value]),
(error) => {
asyncPayloadRecord.errors.push(error);
filterSubsequentPayloads(exeContext, path, asyncPayloadRecord);
return null;
asyncPayloadRecord.addItems(null);
},
);

asyncPayloadRecord.addItems(completedItems);
return asyncPayloadRecord;
}

@@ -2008,22 +2018,19 @@ async function executeStreamIterator(

const { done, value: completedItem } = iteration;

let completedItems: PromiseOrValue<Array<unknown> | null>;
if (isPromise(completedItem)) {
completedItems = completedItem.then(
(value) => [value],
completedItem.then(
(resolvedItem) => asyncPayloadRecord.addItems([resolvedItem]),
(error) => {
asyncPayloadRecord.errors.push(error);
filterSubsequentPayloads(exeContext, path, asyncPayloadRecord);
return null;
asyncPayloadRecord.addItems(null);
},
);
} else {
completedItems = [completedItem];
asyncPayloadRecord.addItems([completedItem]);
}

asyncPayloadRecord.addItems(completedItems);

if (done) {
break;
}
@@ -2038,145 +2045,64 @@ function filterSubsequentPayloads(
currentAsyncRecord: AsyncPayloadRecord | undefined,
): void {
const nullPathArray = pathToArray(nullPath);
exeContext.subsequentPayloads.forEach((asyncRecord) => {
exeContext.publisher.filter((asyncRecord) => {
if (asyncRecord === currentAsyncRecord) {
// don't remove payload from where error originates
return;
return true;
}
for (let i = 0; i < nullPathArray.length; i++) {
if (asyncRecord.path[i] !== nullPathArray[i]) {
// asyncRecord points to a path unaffected by this payload
return;
return true;
}
}
// asyncRecord path points to nulled error field
if (isStreamPayload(asyncRecord) && asyncRecord.iterator?.return) {
asyncRecord.iterator.return().catch(() => {
// ignore error
});
}
exeContext.subsequentPayloads.delete(asyncRecord);

return false;
});
}

function getCompletedIncrementalResults(
exeContext: ExecutionContext,
): Array<IncrementalResult> {
const incrementalResults: Array<IncrementalResult> = [];
for (const asyncPayloadRecord of exeContext.subsequentPayloads) {
const incrementalResult: IncrementalResult = {};
if (!asyncPayloadRecord.isCompleted) {
continue;
}
exeContext.subsequentPayloads.delete(asyncPayloadRecord);
if (isStreamPayload(asyncPayloadRecord)) {
const items = asyncPayloadRecord.items;
if (asyncPayloadRecord.isCompletedIterator) {
// async iterable resolver just finished but there may be pending payloads
continue;
}
(incrementalResult as IncrementalStreamResult).items = items;
} else {
const data = asyncPayloadRecord.data;
(incrementalResult as IncrementalDeferResult).data = data ?? null;
}

incrementalResult.path = asyncPayloadRecord.path;
if (asyncPayloadRecord.label) {
incrementalResult.label = asyncPayloadRecord.label;
}
if (asyncPayloadRecord.errors.length > 0) {
incrementalResult.errors = asyncPayloadRecord.errors;
}
incrementalResults.push(incrementalResult);
function resultFromAsyncPayloadRecord(
asyncPayloadRecord: AsyncPayloadRecord,
): IncrementalResult {
const incrementalResult: IncrementalResult = {};
if (isStreamPayload(asyncPayloadRecord)) {
const items = asyncPayloadRecord.items;
(incrementalResult as IncrementalStreamResult).items = items;
} else {
const data = asyncPayloadRecord.data;
(incrementalResult as IncrementalDeferResult).data = data ?? null;
}
return incrementalResults;
}

function yieldSubsequentPayloads(
exeContext: ExecutionContext,
): AsyncGenerator<SubsequentIncrementalExecutionResult, void, void> {
let isDone = false;

async function next(): Promise<
IteratorResult<SubsequentIncrementalExecutionResult, void>
> {
if (isDone) {
return { value: undefined, done: true };
}

await Promise.race(
Array.from(exeContext.subsequentPayloads).map((p) => p.promise),
);

if (isDone) {
// a different call to next has exhausted all payloads
return { value: undefined, done: true };
}

const incremental = getCompletedIncrementalResults(exeContext);
const hasNext = exeContext.subsequentPayloads.size > 0;

if (!incremental.length && hasNext) {
return next();
}

if (!hasNext) {
isDone = true;
}

return {
value: incremental.length ? { incremental, hasNext } : { hasNext },
done: false,
};
incrementalResult.path = asyncPayloadRecord.path;
if (asyncPayloadRecord.label) {
incrementalResult.label = asyncPayloadRecord.label;
}

function returnStreamIterators() {
const promises: Array<Promise<IteratorResult<unknown>>> = [];
exeContext.subsequentPayloads.forEach((asyncPayloadRecord) => {
if (
isStreamPayload(asyncPayloadRecord) &&
asyncPayloadRecord.iterator?.return
) {
promises.push(asyncPayloadRecord.iterator.return());
}
});
return Promise.all(promises);
if (asyncPayloadRecord.errors.length > 0) {
incrementalResult.errors = asyncPayloadRecord.errors;
}
return incrementalResult;
}

return {
[Symbol.asyncIterator]() {
return this;
},
next,
async return(): Promise<
IteratorResult<SubsequentIncrementalExecutionResult, void>
> {
await returnStreamIterators();
isDone = true;
return { value: undefined, done: true };
},
async throw(
error?: unknown,
): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> {
await returnStreamIterators();
isDone = true;
return Promise.reject(error);
},
};
function payloadFromResults(
incremental: ReadonlyArray<IncrementalResult>,
hasNext: boolean,
): SubsequentIncrementalExecutionResult {
return incremental.length ? { incremental, hasNext } : { hasNext };
}

class DeferredFragmentRecord {
type: 'defer';
errors: Array<GraphQLError>;
label: string | undefined;
path: Array<string | number>;
promise: Promise<void>;
data: ObjMap<unknown> | null;
parentContext: AsyncPayloadRecord | undefined;
isCompleted: boolean;
_exeContext: ExecutionContext;
_resolve?: (arg: PromiseOrValue<ObjMap<unknown> | null>) => void;
_publisher: Publisher<
AsyncPayloadRecord,
IncrementalResult,
SubsequentIncrementalExecutionResult
>;

constructor(opts: {
label: string | undefined;
path: Path | undefined;
@@ -2188,27 +2114,14 @@ class DeferredFragmentRecord {
this.path = pathToArray(opts.path);
this.parentContext = opts.parentContext;
this.errors = [];
this._exeContext = opts.exeContext;
this._exeContext.subsequentPayloads.add(this);
this.isCompleted = false;
this._publisher = opts.exeContext.publisher;
this._publisher.add(this);
this.data = null;
this.promise = new Promise<ObjMap<unknown> | null>((resolve) => {
this._resolve = (promiseOrValue) => {
resolve(promiseOrValue);
};
}).then((data) => {
this.data = data;
this.isCompleted = true;
});
}

addData(data: PromiseOrValue<ObjMap<unknown> | null>) {
const parentData = this.parentContext?.promise;
if (parentData) {
this._resolve?.(parentData.then(() => data));
return;
}
this._resolve?.(data);
addData(data: ObjMap<unknown> | null) {
this.data = data;
this._publisher.complete(this);
}
}

@@ -2218,13 +2131,15 @@ class StreamRecord {
label: string | undefined;
path: Array<string | number>;
items: Array<unknown> | null;
promise: Promise<void>;
parentContext: AsyncPayloadRecord | undefined;
iterator: AsyncIterator<unknown> | undefined;
isCompletedIterator?: boolean;
isCompleted: boolean;
_exeContext: ExecutionContext;
_resolve?: (arg: PromiseOrValue<Array<unknown> | null>) => void;
_publisher: Publisher<
AsyncPayloadRecord,
IncrementalResult,
SubsequentIncrementalExecutionResult
>;

constructor(opts: {
label: string | undefined;
path: Path | undefined;
@@ -2239,27 +2154,14 @@ class StreamRecord {
this.parentContext = opts.parentContext;
this.iterator = opts.iterator;
this.errors = [];
this._exeContext = opts.exeContext;
this._exeContext.subsequentPayloads.add(this);
this.isCompleted = false;
this._publisher = opts.exeContext.publisher;
this._publisher.add(this);
this.items = null;
this.promise = new Promise<Array<unknown> | null>((resolve) => {
this._resolve = (promiseOrValue) => {
resolve(promiseOrValue);
};
}).then((items) => {
this.items = items;
this.isCompleted = true;
});
}

addItems(items: PromiseOrValue<Array<unknown> | null>) {
const parentData = this.parentContext?.promise;
if (parentData) {
this._resolve?.(parentData.then(() => items));
return;
}
this._resolve?.(items);
addItems(items: Array<unknown> | null) {
this.items = items;
this._publisher.complete(this);
}

setIsCompletedIterator() {
252 changes: 252 additions & 0 deletions src/execution/publisher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
interface Source {
parentContext: this | undefined;
isCompletedIterator?: boolean | undefined;
iterator?: AsyncIterator<unknown> | undefined;
}

interface HasParent<T> {
parentContext: T;
}

function hasParent<T>(value: T): value is T & HasParent<T> {
return (value as HasParent<T>).parentContext !== undefined;
}

type ToIncrementalResult<TSource extends Source, TIncremental> = (
source: TSource,
) => TIncremental;

type ToPayload<TIncremental, TPayload> = (
incremental: ReadonlyArray<TIncremental>,
hasNext: boolean,
) => TPayload;

/**
* @internal
*/
export class Publisher<TSource extends Source, TIncremental, TPayload> {
// This is safe because a promise executor within the constructor will assign this.
trigger!: () => void;
signal: Promise<void>;
pending: Set<TSource>;
waiting: Set<TSource & HasParent<TSource>>;
waitingByParent: Map<TSource, Set<TSource & HasParent<TSource>>>;
pushed: WeakSet<TSource>;
current: Set<TSource>;
toIncrementalResult: ToIncrementalResult<TSource, TIncremental>;
toPayload: ToPayload<TIncremental, TPayload>;

constructor(
toIncrementalResult: ToIncrementalResult<TSource, TIncremental>,
toPayload: ToPayload<TIncremental, TPayload>,
) {
this.signal = new Promise((resolve) => {
this.trigger = resolve;
});
this.pending = new Set();
this.waiting = new Set();
this.waitingByParent = new Map();
this.pushed = new WeakSet();
this.current = new Set();
this.toIncrementalResult = toIncrementalResult;
this.toPayload = toPayload;
}

add(source: TSource): void {
this.pending.add(source);
}

complete(source: TSource): void {
// if source has been filtered, ignore completion
if (!this.pending.has(source)) {
return;
}

this.pending.delete(source);

if (!hasParent(source)) {
this._push(source);
this.trigger();
return;
}

const parentContext = source.parentContext;
if (this.pushed.has(source.parentContext)) {
this._push(source);
this.trigger();
return;
}

this.waiting.add(source);

const waitingByParent = this.waitingByParent.get(parentContext);
if (waitingByParent) {
waitingByParent.add(source);
return;
}

this.waitingByParent.set(parentContext, new Set([source]));
}

_push(source: TSource): void {
this.pushed.add(source);
this.current.add(source);

const waitingByParent = this.waitingByParent.get(source);
if (waitingByParent === undefined) {
return;
}

for (const child of waitingByParent) {
this.waitingByParent.delete(child);
this.waiting.delete(child);
this._push(child);
}
}

hasNext(): boolean {
return (
this.pending.size > 0 || this.waiting.size > 0 || this.current.size > 0
);
}

filter(predicate: (source: TSource) => boolean): void {
const iterators = new Set<AsyncIterator<unknown>>();
for (const set of [this.pending, this.current]) {
set.forEach((source) => {
if (predicate(source)) {
return;
}
if (source.iterator?.return) {
iterators.add(source.iterator);
}
set.delete(source);
});
}

this.waiting.forEach((source) => {
if (predicate(source)) {
return;
}

if (source.iterator?.return) {
iterators.add(source.iterator);
}

this.waiting.delete(source);

const parentContext = source.parentContext;
const children = this.waitingByParent.get(parentContext);
// TODO: children can never be undefined, but TS doesn't know that
children?.delete(source);
});

for (const iterator of iterators) {
iterator.return?.().catch(() => {
// ignore error
});
}
}

_getCompletedIncrementalResults(): Array<TIncremental> {
const incrementalResults: Array<TIncremental> = [];
for (const source of this.current) {
this.current.delete(source);
if (source.isCompletedIterator) {
continue;
}
incrementalResults.push(this.toIncrementalResult(source));
}
return incrementalResults;
}

subscribe(): AsyncGenerator<TPayload, void, void> {
let isDone = false;

const next = async (): Promise<IteratorResult<TPayload, void>> => {
if (isDone) {
return { value: undefined, done: true };
}

const incremental = this._getCompletedIncrementalResults();
if (!incremental.length) {
return onSignal();
}

const hasNext = this.hasNext();

if (!hasNext) {
isDone = true;
}

return {
value: this.toPayload(incremental, hasNext),
done: false,
};
};

const onSignal = async (): Promise<IteratorResult<TPayload, void>> => {
await this.signal;

if (isDone) {
return { value: undefined, done: true };
}

const incremental = this._getCompletedIncrementalResults();

this.signal = new Promise((resolve) => {
this.trigger = resolve;
});

const hasNext = this.hasNext();
if (!incremental.length && hasNext) {
return onSignal();
}

if (!hasNext) {
isDone = true;
}

return {
value: this.toPayload(incremental, hasNext),
done: false,
};
};

const returnIterators = () => {
const iterators = new Set<AsyncIterator<unknown>>();
for (const set of [this.pending, this.waiting, this.current]) {
for (const source of set) {
if (source.iterator?.return) {
iterators.add(source.iterator);
}
}
}

const promises: Array<Promise<IteratorResult<unknown>>> = [];
for (const iterator of iterators) {
if (iterator?.return) {
promises.push(iterator.return());
}
}
return Promise.all(promises);
};

return {
[Symbol.asyncIterator]() {
return this;
},
next,
async return(): Promise<IteratorResult<TPayload, void>> {
isDone = true;
await returnIterators();
return { value: undefined, done: true };
},
async throw(error?: unknown): Promise<IteratorResult<TPayload, void>> {
isDone = true;
await returnIterators();
return Promise.reject(error);
},
};
}
}