Skip to content

Commit 61260af

Browse files
committed
introduce new Publisher for incremental delivery
Depends on #3784 The proposed new Publisher: 1. does not use the event loop to manage AsyncRecord dependencies 2. uses separate sets to store pending vs ready AsyncRecords 3. does not use `Promise.race` ## No event loop for managing AsyncRecord dependencies The current Publisher wraps every AsyncRecord result in a promise that only resolves after the parent AsyncRecord resolves. If multiple items within a stream are released for publishing because their parent has just been published, the stream cannot be in its entirety until after all of these promises unwind. The new Publisher keeps track of dependencies manually. When an AsyncRecord is pushed by the publisher, all of its dependent AsyncRecords are synchronously pushed, repeating as necessary, without using the event loop. ## Separate sets for pending vs ready AsyncRecords The current publisher inspects all pending AsyncRecords whenever any of them resolves. All that are completed are added to the response. The new Publisher moves AsyncRecords from the pending set to the ready set as they are pushed, so that on each call to next, only the ready set must be iterated. As a side-effect of this change, the incremental array is ordered by which items are ready for delivery first, and not by the initial document. This seems like a worthwhile tradeoff, and is still adherent to the spec, as far as I can tell. ## No `Promise.race` The old Publisher uses `Promise.race` as the trigger to determine whether payloads are ready. The new Publisher uses a single triggering promise that is triggered whenever an AsyncRecord is pushed, and then reset. This may be beneficial as the implementation of `Promise.race` within V8 has a known memory leak for long-running promises. (see https://bugs.chromium.org/p/v8/issues/detail?id=9858). An alternative would be to utilize @brainkim 's memory-safe version detailed in that issue.
1 parent 189519a commit 61260af

File tree

4 files changed

+173
-154
lines changed

4 files changed

+173
-154
lines changed

src/execution/__tests__/defer-test.ts

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -605,11 +605,6 @@ describe('Execute: defer directive', () => {
605605
data: { slowField: 'slow', friends: [{}, {}, {}] },
606606
path: ['hero'],
607607
},
608-
],
609-
hasNext: true,
610-
},
611-
{
612-
incremental: [
613608
{ data: { name: 'Han' }, path: ['hero', 'friends', 0] },
614609
{ data: { name: 'Leia' }, path: ['hero', 'friends', 1] },
615610
{ data: { name: 'C-3PO' }, path: ['hero', 'friends', 2] },
@@ -653,11 +648,6 @@ describe('Execute: defer directive', () => {
653648
},
654649
path: ['hero'],
655650
},
656-
],
657-
hasNext: true,
658-
},
659-
{
660-
incremental: [
661651
{ data: { name: 'Han' }, path: ['hero', 'friends', 0] },
662652
{ data: { name: 'Leia' }, path: ['hero', 'friends', 1] },
663653
{ data: { name: 'C-3PO' }, path: ['hero', 'friends', 2] },

src/execution/__tests__/stream-test.ts

Lines changed: 21 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -151,11 +151,10 @@ describe('Execute: stream directive', () => {
151151
hasNext: true,
152152
},
153153
{
154-
incremental: [{ items: ['banana'], path: ['scalarList', 1] }],
155-
hasNext: true,
156-
},
157-
{
158-
incremental: [{ items: ['coconut'], path: ['scalarList', 2] }],
154+
incremental: [
155+
{ items: ['banana'], path: ['scalarList', 1] },
156+
{ items: ['coconut'], path: ['scalarList', 2] },
157+
],
159158
hasNext: false,
160159
},
161160
]);
@@ -173,15 +172,11 @@ describe('Execute: stream directive', () => {
173172
hasNext: true,
174173
},
175174
{
176-
incremental: [{ items: ['apple'], path: ['scalarList', 0] }],
177-
hasNext: true,
178-
},
179-
{
180-
incremental: [{ items: ['banana'], path: ['scalarList', 1] }],
181-
hasNext: true,
182-
},
183-
{
184-
incremental: [{ items: ['coconut'], path: ['scalarList', 2] }],
175+
incremental: [
176+
{ items: ['apple'], path: ['scalarList', 0] },
177+
{ items: ['banana'], path: ['scalarList', 1] },
178+
{ items: ['coconut'], path: ['scalarList', 2] },
179+
],
185180
hasNext: false,
186181
},
187182
]);
@@ -230,11 +225,6 @@ describe('Execute: stream directive', () => {
230225
path: ['scalarList', 1],
231226
label: 'scalar-stream',
232227
},
233-
],
234-
hasNext: true,
235-
},
236-
{
237-
incremental: [
238228
{
239229
items: ['coconut'],
240230
path: ['scalarList', 2],
@@ -296,11 +286,6 @@ describe('Execute: stream directive', () => {
296286
items: [['banana', 'banana', 'banana']],
297287
path: ['scalarListList', 1],
298288
},
299-
],
300-
hasNext: true,
301-
},
302-
{
303-
incremental: [
304289
{
305290
items: [['coconut', 'coconut', 'coconut']],
306291
path: ['scalarListList', 2],
@@ -379,20 +364,10 @@ describe('Execute: stream directive', () => {
379364
items: [{ name: 'Luke', id: '1' }],
380365
path: ['friendList', 0],
381366
},
382-
],
383-
hasNext: true,
384-
},
385-
{
386-
incremental: [
387367
{
388368
items: [{ name: 'Han', id: '2' }],
389369
path: ['friendList', 1],
390370
},
391-
],
392-
hasNext: true,
393-
},
394-
{
395-
incremental: [
396371
{
397372
items: [{ name: 'Leia', id: '3' }],
398373
path: ['friendList', 2],
@@ -580,9 +555,6 @@ describe('Execute: stream directive', () => {
580555
path: ['friendList', 2],
581556
},
582557
],
583-
hasNext: true,
584-
},
585-
{
586558
hasNext: false,
587559
},
588560
]);
@@ -622,7 +594,7 @@ describe('Execute: stream directive', () => {
622594
}
623595
}
624596
`);
625-
const result = await completeAsync(document, 3, {
597+
const result = await completeAsync(document, 2, {
626598
async *friendList() {
627599
yield await Promise.resolve(friends[0]);
628600
yield await Promise.resolve(friends[1]);
@@ -651,10 +623,9 @@ describe('Execute: stream directive', () => {
651623
path: ['friendList', 2],
652624
},
653625
],
654-
hasNext: true,
626+
hasNext: false,
655627
},
656628
},
657-
{ done: false, value: { hasNext: false } },
658629
{ done: true, value: undefined },
659630
]);
660631
});
@@ -882,11 +853,6 @@ describe('Execute: stream directive', () => {
882853
},
883854
],
884855
},
885-
],
886-
hasNext: true,
887-
},
888-
{
889-
incremental: [
890856
{
891857
items: [{ nonNullName: 'Han' }],
892858
path: ['friendList', 2],
@@ -975,11 +941,6 @@ describe('Execute: stream directive', () => {
975941
},
976942
],
977943
},
978-
],
979-
hasNext: true,
980-
},
981-
{
982-
incremental: [
983944
{
984945
items: [{ nonNullName: 'Han' }],
985946
path: ['friendList', 2],
@@ -1135,6 +1096,10 @@ describe('Execute: stream directive', () => {
11351096
},
11361097
{
11371098
incremental: [
1099+
{
1100+
items: [{ name: 'Luke' }],
1101+
path: ['nestedObject', 'nestedFriendList', 0],
1102+
},
11381103
{
11391104
data: { scalarField: null },
11401105
path: ['otherNestedObject'],
@@ -1146,10 +1111,6 @@ describe('Execute: stream directive', () => {
11461111
},
11471112
],
11481113
},
1149-
{
1150-
items: [{ name: 'Luke' }],
1151-
path: ['nestedObject', 'nestedFriendList', 0],
1152-
},
11531114
],
11541115
hasNext: false,
11551116
},
@@ -1253,9 +1214,6 @@ describe('Execute: stream directive', () => {
12531214
],
12541215
},
12551216
],
1256-
hasNext: true,
1257-
},
1258-
{
12591217
hasNext: false,
12601218
},
12611219
]);
@@ -1402,9 +1360,6 @@ describe('Execute: stream directive', () => {
14021360
path: ['friendList', 2],
14031361
},
14041362
],
1405-
hasNext: true,
1406-
},
1407-
{
14081363
hasNext: false,
14091364
},
14101365
]);
@@ -1458,15 +1413,6 @@ describe('Execute: stream directive', () => {
14581413
data: { scalarField: 'slow', nestedFriendList: [] },
14591414
path: ['nestedObject'],
14601415
},
1461-
],
1462-
hasNext: true,
1463-
},
1464-
done: false,
1465-
});
1466-
const result3 = await iterator.next();
1467-
expectJSON(result3).toDeepEqual({
1468-
value: {
1469-
incremental: [
14701416
{
14711417
items: [{ name: 'Luke' }],
14721418
path: ['nestedObject', 'nestedFriendList', 0],
@@ -1476,8 +1422,8 @@ describe('Execute: stream directive', () => {
14761422
},
14771423
done: false,
14781424
});
1479-
const result4 = await iterator.next();
1480-
expectJSON(result4).toDeepEqual({
1425+
const result3 = await iterator.next();
1426+
expectJSON(result3).toDeepEqual({
14811427
value: {
14821428
incremental: [
14831429
{
@@ -1489,13 +1435,13 @@ describe('Execute: stream directive', () => {
14891435
},
14901436
done: false,
14911437
});
1492-
const result5 = await iterator.next();
1493-
expectJSON(result5).toDeepEqual({
1438+
const result4 = await iterator.next();
1439+
expectJSON(result4).toDeepEqual({
14941440
value: { hasNext: false },
14951441
done: false,
14961442
});
1497-
const result6 = await iterator.next();
1498-
expectJSON(result6).toDeepEqual({
1443+
const result5 = await iterator.next();
1444+
expectJSON(result5).toDeepEqual({
14991445
value: undefined,
15001446
done: true,
15011447
});

src/execution/execute.ts

Lines changed: 20 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -2150,12 +2150,14 @@ class DeferredFragmentRecord {
21502150
errors: Array<GraphQLError>;
21512151
label: string | undefined;
21522152
path: Array<string | number>;
2153-
promise: Promise<void>;
21542153
data: ObjMap<unknown> | null;
21552154
parentContext: AsyncPayloadRecord | undefined;
2156-
isCompleted: boolean;
2157-
_exeContext: ExecutionContext;
2158-
_resolve?: (arg: PromiseOrValue<ObjMap<unknown> | null>) => void;
2155+
_publisher: Publisher<
2156+
AsyncPayloadRecord,
2157+
IncrementalResult,
2158+
SubsequentIncrementalExecutionResult
2159+
>;
2160+
21592161
constructor(opts: {
21602162
label: string | undefined;
21612163
path: Path | undefined;
@@ -2167,27 +2169,14 @@ class DeferredFragmentRecord {
21672169
this.path = pathToArray(opts.path);
21682170
this.parentContext = opts.parentContext;
21692171
this.errors = [];
2170-
this._exeContext = opts.exeContext;
2171-
this._exeContext.publisher.add(this);
2172-
this.isCompleted = false;
2172+
this._publisher = opts.exeContext.publisher;
2173+
this._publisher.add(this);
21732174
this.data = null;
2174-
this.promise = new Promise<ObjMap<unknown> | null>((resolve) => {
2175-
this._resolve = (promiseOrValue) => {
2176-
resolve(promiseOrValue);
2177-
};
2178-
}).then((data) => {
2179-
this.data = data;
2180-
this.isCompleted = true;
2181-
});
21822175
}
21832176

21842177
addData(data: ObjMap<unknown> | null) {
2185-
const parentData = this.parentContext?.promise;
2186-
if (parentData) {
2187-
this._resolve?.(parentData.then(() => data));
2188-
return;
2189-
}
2190-
this._resolve?.(data);
2178+
this.data = data;
2179+
this._publisher.complete(this);
21912180
}
21922181
}
21932182

@@ -2197,13 +2186,15 @@ class StreamRecord {
21972186
label: string | undefined;
21982187
path: Array<string | number>;
21992188
items: Array<unknown> | null;
2200-
promise: Promise<void>;
22012189
parentContext: AsyncPayloadRecord | undefined;
22022190
iterator: AsyncIterator<unknown> | undefined;
22032191
isCompletedIterator?: boolean;
2204-
isCompleted: boolean;
2205-
_exeContext: ExecutionContext;
2206-
_resolve?: (arg: PromiseOrValue<Array<unknown> | null>) => void;
2192+
_publisher: Publisher<
2193+
AsyncPayloadRecord,
2194+
IncrementalResult,
2195+
SubsequentIncrementalExecutionResult
2196+
>;
2197+
22072198
constructor(opts: {
22082199
label: string | undefined;
22092200
path: Path | undefined;
@@ -2218,27 +2209,14 @@ class StreamRecord {
22182209
this.parentContext = opts.parentContext;
22192210
this.iterator = opts.iterator;
22202211
this.errors = [];
2221-
this._exeContext = opts.exeContext;
2222-
this._exeContext.publisher.add(this);
2223-
this.isCompleted = false;
2212+
this._publisher = opts.exeContext.publisher;
2213+
this._publisher.add(this);
22242214
this.items = null;
2225-
this.promise = new Promise<Array<unknown> | null>((resolve) => {
2226-
this._resolve = (promiseOrValue) => {
2227-
resolve(promiseOrValue);
2228-
};
2229-
}).then((items) => {
2230-
this.items = items;
2231-
this.isCompleted = true;
2232-
});
22332215
}
22342216

22352217
addItems(items: Array<unknown> | null) {
2236-
const parentData = this.parentContext?.promise;
2237-
if (parentData) {
2238-
this._resolve?.(parentData.then(() => items));
2239-
return;
2240-
}
2241-
this._resolve?.(items);
2218+
this.items = items;
2219+
this._publisher.complete(this);
22422220
}
22432221

22442222
setIsCompletedIterator() {

0 commit comments

Comments
 (0)