@@ -122,6 +122,7 @@ export interface ExecutionContext {
122
122
subscribeFieldResolver : GraphQLFieldResolver < any , any > ;
123
123
errors : Array < GraphQLError > ;
124
124
subsequentPayloads : Set < AsyncPayloadRecord > ;
125
+ streams : Set < StreamContext > ;
125
126
}
126
127
127
128
/**
@@ -504,6 +505,7 @@ export function buildExecutionContext(
504
505
typeResolver : typeResolver ?? defaultTypeResolver ,
505
506
subscribeFieldResolver : subscribeFieldResolver ?? defaultFieldResolver ,
506
507
subsequentPayloads : new Set ( ) ,
508
+ streams : new Set ( ) ,
507
509
errors : [ ] ,
508
510
} ;
509
511
}
@@ -516,6 +518,7 @@ function buildPerEventExecutionContext(
516
518
...exeContext ,
517
519
rootValue : payload ,
518
520
subsequentPayloads : new Set ( ) ,
521
+ streams : new Set ( ) ,
519
522
errors : [ ] ,
520
523
} ;
521
524
}
@@ -1036,6 +1039,11 @@ async function completeAsyncIteratorValue(
1036
1039
typeof stream . initialCount === 'number' &&
1037
1040
index >= stream . initialCount
1038
1041
) {
1042
+ const streamContext : StreamContext = {
1043
+ path : pathToArray ( path ) ,
1044
+ iterator,
1045
+ } ;
1046
+ exeContext . streams . add ( streamContext ) ;
1039
1047
// eslint-disable-next-line @typescript-eslint/no-floating-promises
1040
1048
executeStreamIterator (
1041
1049
index ,
@@ -1045,6 +1053,7 @@ async function completeAsyncIteratorValue(
1045
1053
info ,
1046
1054
itemType ,
1047
1055
path ,
1056
+ streamContext ,
1048
1057
stream . label ,
1049
1058
asyncPayloadRecord ,
1050
1059
) ;
@@ -1129,6 +1138,7 @@ function completeListValue(
1129
1138
let previousAsyncPayloadRecord = asyncPayloadRecord ;
1130
1139
const completedResults : Array < unknown > = [ ] ;
1131
1140
let index = 0 ;
1141
+ let streamContext : StreamContext | undefined ;
1132
1142
for ( const item of result ) {
1133
1143
// No need to modify the info object containing the path,
1134
1144
// since from here on it is not ever accessed by resolver functions.
@@ -1139,6 +1149,8 @@ function completeListValue(
1139
1149
typeof stream . initialCount === 'number' &&
1140
1150
index >= stream . initialCount
1141
1151
) {
1152
+ streamContext = { path : pathToArray ( path ) } ;
1153
+ exeContext . streams . add ( streamContext ) ;
1142
1154
previousAsyncPayloadRecord = executeStreamField (
1143
1155
path ,
1144
1156
itemPath ,
@@ -1147,6 +1159,7 @@ function completeListValue(
1147
1159
fieldNodes ,
1148
1160
info ,
1149
1161
itemType ,
1162
+ streamContext ,
1150
1163
stream . label ,
1151
1164
previousAsyncPayloadRecord ,
1152
1165
) ;
@@ -1173,6 +1186,10 @@ function completeListValue(
1173
1186
index ++ ;
1174
1187
}
1175
1188
1189
+ if ( streamContext ) {
1190
+ exeContext . streams . delete ( streamContext ) ;
1191
+ }
1192
+
1176
1193
return containsPromise ? Promise . all ( completedResults ) : completedResults ;
1177
1194
}
1178
1195
@@ -1813,6 +1830,7 @@ function executeStreamField(
1813
1830
fieldNodes : ReadonlyArray < FieldNode > ,
1814
1831
info : GraphQLResolveInfo ,
1815
1832
itemType : GraphQLOutputType ,
1833
+ streamContext : StreamContext ,
1816
1834
label ?: string ,
1817
1835
parentContext ?: AsyncPayloadRecord ,
1818
1836
) : AsyncPayloadRecord {
@@ -1835,6 +1853,8 @@ function executeStreamField(
1835
1853
( value ) => [ value ] ,
1836
1854
( error ) => {
1837
1855
asyncPayloadRecord . errors . push ( error ) ;
1856
+ returnStreamIteratorIgnoringErrors ( streamContext ) ;
1857
+ exeContext . streams . delete ( streamContext ) ;
1838
1858
filterSubsequentPayloads ( exeContext , path , asyncPayloadRecord ) ;
1839
1859
return null ;
1840
1860
} ,
@@ -1867,6 +1887,8 @@ function executeStreamField(
1867
1887
}
1868
1888
} catch ( error ) {
1869
1889
asyncPayloadRecord . errors . push ( error ) ;
1890
+ returnStreamIteratorIgnoringErrors ( streamContext ) ;
1891
+ exeContext . streams . delete ( streamContext ) ;
1870
1892
filterSubsequentPayloads ( exeContext , path , asyncPayloadRecord ) ;
1871
1893
asyncPayloadRecord . addItems ( null ) ;
1872
1894
return asyncPayloadRecord ;
@@ -1887,6 +1909,8 @@ function executeStreamField(
1887
1909
. then (
1888
1910
( value ) => [ value ] ,
1889
1911
( error ) => {
1912
+ returnStreamIteratorIgnoringErrors ( streamContext ) ;
1913
+ exeContext . streams . delete ( streamContext ) ;
1890
1914
asyncPayloadRecord . errors . push ( error ) ;
1891
1915
filterSubsequentPayloads ( exeContext , path , asyncPayloadRecord ) ;
1892
1916
return null ;
@@ -1965,19 +1989,18 @@ async function executeStreamIterator(
1965
1989
info : GraphQLResolveInfo ,
1966
1990
itemType : GraphQLOutputType ,
1967
1991
path : Path ,
1992
+ streamContext : StreamContext ,
1968
1993
label ?: string ,
1969
1994
parentContext ?: AsyncPayloadRecord ,
1970
1995
) : Promise < void > {
1971
1996
let index = initialIndex ;
1972
1997
let previousAsyncPayloadRecord = parentContext ?? undefined ;
1973
- // eslint-disable-next-line no-constant-condition
1974
- while ( true ) {
1998
+ while ( exeContext . streams . has ( streamContext ) ) {
1975
1999
const itemPath = addPath ( path , index , undefined ) ;
1976
2000
const asyncPayloadRecord = new StreamRecord ( {
1977
2001
label,
1978
2002
path : itemPath ,
1979
2003
parentContext : previousAsyncPayloadRecord ,
1980
- iterator,
1981
2004
exeContext,
1982
2005
} ) ;
1983
2006
@@ -1995,14 +2018,10 @@ async function executeStreamIterator(
1995
2018
) ;
1996
2019
} catch ( error ) {
1997
2020
asyncPayloadRecord . errors . push ( error ) ;
2021
+ returnStreamIteratorIgnoringErrors ( streamContext ) ;
2022
+ exeContext . streams . delete ( streamContext ) ;
1998
2023
filterSubsequentPayloads ( exeContext , path , asyncPayloadRecord ) ;
1999
2024
asyncPayloadRecord . addItems ( null ) ;
2000
- // entire stream has errored and bubbled upwards
2001
- if ( iterator ?. return ) {
2002
- iterator . return ( ) . catch ( ( ) => {
2003
- // ignore errors
2004
- } ) ;
2005
- }
2006
2025
return ;
2007
2026
}
2008
2027
@@ -2014,6 +2033,8 @@ async function executeStreamIterator(
2014
2033
( value ) => [ value ] ,
2015
2034
( error ) => {
2016
2035
asyncPayloadRecord . errors . push ( error ) ;
2036
+ returnStreamIteratorIgnoringErrors ( streamContext ) ;
2037
+ exeContext . streams . delete ( streamContext ) ;
2017
2038
filterSubsequentPayloads ( exeContext , path , asyncPayloadRecord ) ;
2018
2039
return null ;
2019
2040
} ,
@@ -2025,6 +2046,7 @@ async function executeStreamIterator(
2025
2046
asyncPayloadRecord . addItems ( completedItems ) ;
2026
2047
2027
2048
if ( done ) {
2049
+ exeContext . streams . delete ( streamContext ) ;
2028
2050
break ;
2029
2051
}
2030
2052
previousAsyncPayloadRecord = asyncPayloadRecord ;
@@ -2038,6 +2060,16 @@ function filterSubsequentPayloads(
2038
2060
currentAsyncRecord : AsyncPayloadRecord | undefined ,
2039
2061
) : void {
2040
2062
const nullPathArray = pathToArray ( nullPath ) ;
2063
+ exeContext . streams . forEach ( ( stream ) => {
2064
+ for ( let i = 0 ; i < nullPathArray . length ; i ++ ) {
2065
+ if ( stream . path [ i ] !== nullPathArray [ i ] ) {
2066
+ // stream points to a path unaffected by this payload
2067
+ return ;
2068
+ }
2069
+ }
2070
+ returnStreamIteratorIgnoringErrors ( stream ) ;
2071
+ exeContext . streams . delete ( stream ) ;
2072
+ } ) ;
2041
2073
exeContext . subsequentPayloads . forEach ( ( asyncRecord ) => {
2042
2074
if ( asyncRecord === currentAsyncRecord ) {
2043
2075
// don't remove payload from where error originates
@@ -2049,16 +2081,21 @@ function filterSubsequentPayloads(
2049
2081
return ;
2050
2082
}
2051
2083
}
2052
- // asyncRecord path points to nulled error field
2053
- if ( isStreamPayload ( asyncRecord ) && asyncRecord . iterator ?. return ) {
2054
- asyncRecord . iterator . return ( ) . catch ( ( ) => {
2055
- // ignore error
2056
- } ) ;
2057
- }
2058
2084
exeContext . subsequentPayloads . delete ( asyncRecord ) ;
2059
2085
} ) ;
2060
2086
}
2061
2087
2088
+ function returnStreamIteratorIgnoringErrors (
2089
+ streamContext : StreamContext ,
2090
+ ) : void {
2091
+ const returnFn = streamContext . iterator ?. return ;
2092
+ if ( returnFn ) {
2093
+ returnFn ( ) . catch ( ( ) => {
2094
+ // ignore error
2095
+ } ) ;
2096
+ }
2097
+ }
2098
+
2062
2099
function getCompletedIncrementalResults (
2063
2100
exeContext : ExecutionContext ,
2064
2101
) : Array < IncrementalResult > {
@@ -2133,12 +2170,9 @@ function yieldSubsequentPayloads(
2133
2170
2134
2171
function returnStreamIterators ( ) {
2135
2172
const promises : Array < Promise < IteratorResult < unknown > > > = [ ] ;
2136
- exeContext . subsequentPayloads . forEach ( ( asyncPayloadRecord ) => {
2137
- if (
2138
- isStreamPayload ( asyncPayloadRecord ) &&
2139
- asyncPayloadRecord . iterator ?. return
2140
- ) {
2141
- promises . push ( asyncPayloadRecord . iterator . return ( ) ) ;
2173
+ exeContext . streams . forEach ( ( stream ) => {
2174
+ if ( stream . iterator ?. return ) {
2175
+ promises . push ( stream . iterator . return ( ) ) ;
2142
2176
}
2143
2177
} ) ;
2144
2178
return Promise . all ( promises ) ;
@@ -2211,6 +2245,10 @@ class DeferredFragmentRecord {
2211
2245
this . _resolve ?.( data ) ;
2212
2246
}
2213
2247
}
2248
+ interface StreamContext {
2249
+ path : Array < string | number > ;
2250
+ iterator ?: AsyncIterator < unknown > | undefined ;
2251
+ }
2214
2252
2215
2253
class StreamRecord {
2216
2254
type : 'stream' ;
@@ -2220,15 +2258,13 @@ class StreamRecord {
2220
2258
items : Array < unknown > | null ;
2221
2259
promise : Promise < void > ;
2222
2260
parentContext : AsyncPayloadRecord | undefined ;
2223
- iterator : AsyncIterator < unknown > | undefined ;
2224
2261
isCompletedIterator ?: boolean ;
2225
2262
isCompleted : boolean ;
2226
2263
_exeContext : ExecutionContext ;
2227
2264
_resolve ?: ( arg : PromiseOrValue < Array < unknown > | null > ) => void ;
2228
2265
constructor ( opts : {
2229
2266
label : string | undefined ;
2230
2267
path : Path | undefined ;
2231
- iterator ?: AsyncIterator < unknown > ;
2232
2268
parentContext : AsyncPayloadRecord | undefined ;
2233
2269
exeContext : ExecutionContext ;
2234
2270
} ) {
@@ -2237,7 +2273,6 @@ class StreamRecord {
2237
2273
this . label = opts . label ;
2238
2274
this . path = pathToArray ( opts . path ) ;
2239
2275
this . parentContext = opts . parentContext ;
2240
- this . iterator = opts . iterator ;
2241
2276
this . errors = [ ] ;
2242
2277
this . _exeContext = opts . exeContext ;
2243
2278
this . _exeContext . subsequentPayloads . add ( this ) ;
0 commit comments