Skip to content

Commit e7512c0

Browse files
committed
do not initiate multiple streams at the same path
1 parent 6446050 commit e7512c0

File tree

2 files changed

+146
-21
lines changed

2 files changed

+146
-21
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -636,6 +636,107 @@ describe('Execute: stream directive', () => {
636636
},
637637
]);
638638
});
639+
it('Does not initiate multiple streams at the same path', async () => {
640+
const document = parse(`
641+
query {
642+
friendList @stream(initialCount: 2) {
643+
name
644+
id
645+
}
646+
... @defer {
647+
friendList @stream(initialCount: 2) {
648+
name
649+
id
650+
}
651+
}
652+
}
653+
`);
654+
const result = await complete(document, { friendList: friends });
655+
expectJSON(result).toDeepEqual([
656+
{
657+
data: {
658+
friendList: [
659+
{ name: 'Luke', id: '1' },
660+
{ name: 'Han', id: '2' },
661+
],
662+
},
663+
hasNext: true,
664+
},
665+
{
666+
incremental: [
667+
{
668+
items: [{ name: 'Leia', id: '3' }],
669+
path: ['friendList', 2],
670+
},
671+
{
672+
data: {
673+
friendList: [
674+
{ name: 'Luke', id: '1' },
675+
{ name: 'Han', id: '2' },
676+
],
677+
},
678+
path: [],
679+
},
680+
],
681+
hasNext: false,
682+
},
683+
]);
684+
});
685+
it('Does not initiate multiple streams at the same path for async iterables', async () => {
686+
const document = parse(`
687+
query {
688+
friendList @stream(initialCount: 2) {
689+
name
690+
id
691+
}
692+
... @defer {
693+
friendList @stream(initialCount: 2) {
694+
name
695+
id
696+
}
697+
}
698+
}
699+
`);
700+
const result = await complete(document, {
701+
async *friendList() {
702+
yield await Promise.resolve(friends[0]);
703+
yield await Promise.resolve(friends[1]);
704+
yield await Promise.resolve(friends[2]);
705+
},
706+
});
707+
expectJSON(result).toDeepEqual([
708+
{
709+
data: {
710+
friendList: [
711+
{ name: 'Luke', id: '1' },
712+
{ name: 'Han', id: '2' },
713+
],
714+
},
715+
hasNext: true,
716+
},
717+
{
718+
incremental: [
719+
{
720+
data: {
721+
friendList: [
722+
{ name: 'Luke', id: '1' },
723+
{ name: 'Han', id: '2' },
724+
],
725+
},
726+
path: [],
727+
},
728+
{
729+
items: [{ name: 'Leia', id: '3' }],
730+
path: ['friendList', 2],
731+
},
732+
],
733+
hasNext: true,
734+
},
735+
{
736+
hasNext: false,
737+
},
738+
]);
739+
});
639740
it('Negative values of initialCount throw field errors on a field that returns an async iterable', async () => {
640741
const document = parse(`
641742
query {

src/execution/execute.ts

Lines changed: 45 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ export interface ExecutionContext {
125125
errors: Array<GraphQLError>;
126126
subsequentPayloads: Set<AsyncPayloadRecord>;
127127
branches: WeakMap<GroupedFieldSet, Set<Path | undefined>>;
128+
streams: WeakMap<FieldGroup, Set<Path>>;
128129
}
129130

130131
/**
@@ -505,6 +506,7 @@ export function buildExecutionContext(
505506
addPath: createPathFactory(),
506507
subsequentPayloads: new Set(),
507508
branches: new WeakMap(),
509+
streams: new WeakMap(),
508510
errors: [],
509511
};
510512
}
@@ -519,6 +521,7 @@ function buildPerEventExecutionContext(
519521
addPath: createPathFactory(),
520522
subsequentPayloads: new Set(),
521523
branches: new WeakMap(),
524+
streams: new WeakMap(),
522525
errors: [],
523526
};
524527
}
@@ -540,6 +543,23 @@ function shouldBranch(
540543
return false;
541544
}
542545

546+
function shouldStream(
547+
fieldGroup: FieldGroup,
548+
exeContext: ExecutionContext,
549+
path: Path,
550+
): boolean {
551+
const set = exeContext.streams.get(fieldGroup);
552+
if (set === undefined) {
553+
exeContext.streams.set(fieldGroup, new Set([path]));
554+
return true;
555+
}
556+
if (!set.has(path)) {
557+
set.add(path);
558+
return true;
559+
}
560+
return false;
561+
}
562+
543563
/**
544564
* Implements the "Executing operations" section of the spec.
545565
*/
@@ -1102,17 +1122,19 @@ async function completeAsyncIteratorValue(
11021122
typeof stream.initialCount === 'number' &&
11031123
index >= stream.initialCount
11041124
) {
1105-
// eslint-disable-next-line @typescript-eslint/no-floating-promises
1106-
executeStreamAsyncIterator(
1107-
index,
1108-
asyncIterator,
1109-
exeContext,
1110-
fieldGroup,
1111-
info,
1112-
itemType,
1113-
path,
1114-
asyncPayloadRecord,
1115-
);
1125+
if (shouldStream(fieldGroup, exeContext, path)) {
1126+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
1127+
executeStreamAsyncIterator(
1128+
index,
1129+
asyncIterator,
1130+
exeContext,
1131+
fieldGroup,
1132+
info,
1133+
itemType,
1134+
path,
1135+
asyncPayloadRecord,
1136+
);
1137+
}
11161138
break;
11171139
}
11181140

@@ -1208,16 +1230,18 @@ function completeListValue(
12081230
typeof stream.initialCount === 'number' &&
12091231
index >= stream.initialCount
12101232
) {
1211-
executeStreamIterator(
1212-
index,
1213-
iterator,
1214-
exeContext,
1215-
fieldGroup,
1216-
info,
1217-
itemType,
1218-
path,
1219-
asyncPayloadRecord,
1220-
);
1233+
if (shouldStream(fieldGroup, exeContext, path)) {
1234+
executeStreamIterator(
1235+
index,
1236+
iterator,
1237+
exeContext,
1238+
fieldGroup,
1239+
info,
1240+
itemType,
1241+
path,
1242+
asyncPayloadRecord,
1243+
);
1244+
}
12211245
break;
12221246
}
12231247

0 commit comments

Comments
 (0)