1
1
import { inspect } from '../jsutils/inspect' ;
2
2
import { isAsyncIterable } from '../jsutils/isAsyncIterable' ;
3
+ import { isPromise } from '../jsutils/isPromise' ;
3
4
import type { Maybe } from '../jsutils/Maybe' ;
5
+ import type { Path } from '../jsutils/Path' ;
4
6
import { addPath , pathToArray } from '../jsutils/Path' ;
7
+ import type { PromiseOrValue } from '../jsutils/PromiseOrValue' ;
5
8
6
9
import { GraphQLError } from '../error/GraphQLError' ;
7
10
import { locatedError } from '../error/locatedError' ;
8
11
9
- import type { DocumentNode } from '../language/ast' ;
12
+ import type { DocumentNode , FieldNode } from '../language/ast' ;
10
13
11
14
import type { GraphQLFieldResolver } from '../type/definition' ;
12
15
import type { GraphQLSchema } from '../type/schema' ;
@@ -47,9 +50,11 @@ import { getArgumentValues } from './values';
47
50
*
48
51
* Accepts either an object with named arguments, or individual arguments.
49
52
*/
50
- export async function subscribe (
53
+ export function subscribe (
51
54
args : ExecutionArgs ,
52
- ) : Promise < AsyncGenerator < ExecutionResult , void , void > | ExecutionResult > {
55
+ ) : PromiseOrValue <
56
+ AsyncGenerator < ExecutionResult , void , void > | ExecutionResult
57
+ > {
53
58
const {
54
59
schema,
55
60
document,
@@ -61,7 +66,7 @@ export async function subscribe(
61
66
subscribeFieldResolver,
62
67
} = args ;
63
68
64
- const resultOrStream = await createSourceEventStream (
69
+ const resultOrStream = createSourceEventStream (
65
70
schema ,
66
71
document ,
67
72
rootValue ,
@@ -71,6 +76,42 @@ export async function subscribe(
71
76
subscribeFieldResolver ,
72
77
) ;
73
78
79
+ if ( isPromise ( resultOrStream ) ) {
80
+ return resultOrStream . then ( ( resolvedResultOrStream ) =>
81
+ mapSourceToResponse (
82
+ schema ,
83
+ document ,
84
+ resolvedResultOrStream ,
85
+ contextValue ,
86
+ variableValues ,
87
+ operationName ,
88
+ fieldResolver ,
89
+ ) ,
90
+ ) ;
91
+ }
92
+
93
+ return mapSourceToResponse (
94
+ schema ,
95
+ document ,
96
+ resultOrStream ,
97
+ contextValue ,
98
+ variableValues ,
99
+ operationName ,
100
+ fieldResolver ,
101
+ ) ;
102
+ }
103
+
104
+ function mapSourceToResponse (
105
+ schema : GraphQLSchema ,
106
+ document : DocumentNode ,
107
+ resultOrStream : ExecutionResult | AsyncIterable < unknown > ,
108
+ contextValue ?: unknown ,
109
+ variableValues ?: Maybe < { readonly [ variable : string ] : unknown } > ,
110
+ operationName ?: Maybe < string > ,
111
+ fieldResolver ?: Maybe < GraphQLFieldResolver < any , any > > ,
112
+ ) : PromiseOrValue <
113
+ AsyncGenerator < ExecutionResult , void , void > | ExecutionResult
114
+ > {
74
115
if ( ! isAsyncIterable ( resultOrStream ) ) {
75
116
return resultOrStream ;
76
117
}
@@ -81,7 +122,7 @@ export async function subscribe(
81
122
// the GraphQL specification. The `execute` function provides the
82
123
// "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
83
124
// "ExecuteQuery" algorithm, for which `execute` is also used.
84
- const mapSourceToResponse = ( payload : unknown ) =>
125
+ return mapAsyncIterator ( resultOrStream , ( payload : unknown ) =>
85
126
execute ( {
86
127
schema,
87
128
document,
@@ -90,10 +131,8 @@ export async function subscribe(
90
131
variableValues,
91
132
operationName,
92
133
fieldResolver,
93
- } ) ;
94
-
95
- // Map every source value to a ExecutionResult value as described above.
96
- return mapAsyncIterator ( resultOrStream , mapSourceToResponse ) ;
134
+ } ) ,
135
+ ) ;
97
136
}
98
137
99
138
/**
@@ -124,15 +163,15 @@ export async function subscribe(
124
163
* or otherwise separating these two steps. For more on this, see the
125
164
* "Supporting Subscriptions at Scale" information in the GraphQL specification.
126
165
*/
127
- export async function createSourceEventStream (
166
+ export function createSourceEventStream (
128
167
schema : GraphQLSchema ,
129
168
document : DocumentNode ,
130
169
rootValue ?: unknown ,
131
170
contextValue ?: unknown ,
132
171
variableValues ?: Maybe < { readonly [ variable : string ] : unknown } > ,
133
172
operationName ?: Maybe < string > ,
134
173
subscribeFieldResolver ?: Maybe < GraphQLFieldResolver < any , any > > ,
135
- ) : Promise < AsyncIterable < unknown > | ExecutionResult > {
174
+ ) : PromiseOrValue < AsyncIterable < unknown > | ExecutionResult > {
136
175
// If arguments are missing or incorrectly typed, this is an internal
137
176
// developer mistake which should throw an early error.
138
177
assertValidExecutionArguments ( schema , document , variableValues ) ;
@@ -155,17 +194,20 @@ export async function createSourceEventStream(
155
194
}
156
195
157
196
try {
158
- const eventStream = await executeSubscription ( exeContext ) ;
197
+ const eventStream = executeSubscription ( exeContext ) ;
198
+ if ( isPromise ( eventStream ) ) {
199
+ return eventStream . then ( undefined , ( error ) => ( { errors : [ error ] } ) ) ;
200
+ }
159
201
160
202
return eventStream ;
161
203
} catch ( error ) {
162
204
return { errors : [ error ] } ;
163
205
}
164
206
}
165
207
166
- async function executeSubscription (
208
+ function executeSubscription (
167
209
exeContext : ExecutionContext ,
168
- ) : Promise < AsyncIterable < unknown > > {
210
+ ) : PromiseOrValue < AsyncIterable < unknown > | ExecutionResult > {
169
211
const { schema, fragments, operation, variableValues, rootValue } =
170
212
exeContext ;
171
213
@@ -220,22 +262,44 @@ async function executeSubscription(
220
262
// Call the `subscribe()` resolver or the default resolver to produce an
221
263
// AsyncIterable yielding raw payloads.
222
264
const resolveFn = fieldDef . subscribe ?? exeContext . subscribeFieldResolver ;
223
- const eventStream = await resolveFn ( rootValue , args , contextValue , info ) ;
224
-
225
- if ( eventStream instanceof Error ) {
226
- throw eventStream ;
227
- }
265
+ const eventStream = resolveFn ( rootValue , args , contextValue , info ) ;
228
266
229
- // Assert field returned an event stream, otherwise yield an error.
230
- if ( ! isAsyncIterable ( eventStream ) ) {
231
- throw new GraphQLError (
232
- 'Subscription field must return Async Iterable. ' +
233
- `Received: ${ inspect ( eventStream ) } .` ,
267
+ if ( isPromise ( eventStream ) ) {
268
+ return eventStream . then (
269
+ ( resolvedEventStream ) =>
270
+ ensureAsyncIterable ( resolvedEventStream , fieldNodes , path ) ,
271
+ ( error ) => {
272
+ throw locatedError ( error , fieldNodes , pathToArray ( path ) ) ;
273
+ } ,
234
274
) ;
235
275
}
236
276
237
- return eventStream ;
277
+ return ensureAsyncIterable ( eventStream , fieldNodes , path ) ;
238
278
} catch ( error ) {
239
279
throw locatedError ( error , fieldNodes , pathToArray ( path ) ) ;
240
280
}
241
281
}
282
+
283
+ function ensureAsyncIterable (
284
+ eventStream : unknown ,
285
+ fieldNodes : ReadonlyArray < FieldNode > ,
286
+ path : Path ,
287
+ ) : AsyncIterable < unknown > {
288
+ if ( eventStream instanceof Error ) {
289
+ throw locatedError ( eventStream , fieldNodes , pathToArray ( path ) ) ;
290
+ }
291
+
292
+ // Assert field returned an event stream, otherwise yield an error.
293
+ if ( ! isAsyncIterable ( eventStream ) ) {
294
+ throw locatedError (
295
+ new GraphQLError (
296
+ 'Subscription field must return Async Iterable. ' +
297
+ `Received: ${ inspect ( eventStream ) } .` ,
298
+ ) ,
299
+ fieldNodes ,
300
+ pathToArray ( path ) ,
301
+ ) ;
302
+ }
303
+
304
+ return eventStream ;
305
+ }
0 commit comments