Skip to content
This repository was archived by the owner on Mar 20, 2023. It is now read-only.

Add support for defer and stream directives (feedback is welcome) #726

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integrationTests/ts/package.json
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@
"dependencies": {
"@types/node": "14.0.13",
"express-graphql": "file:../express-graphql.tgz",
"graphql": "14.7.0",
"graphql": "15.4.0-experimental-stream-defer.1",
"typescript-3.4": "npm:[email protected]",
"typescript-3.5": "npm:[email protected]",
"typescript-3.6": "npm:[email protected]",
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "express-graphql",
"version": "0.12.0",
"version": "0.12.0-experimental-stream-defer.1",
"description": "Production ready GraphQL HTTP middleware.",
"license": "MIT",
"private": true,
@@ -85,7 +85,7 @@
"express": "4.17.1",
"graphiql": "1.0.6",
"graphiql-subscriptions-fetcher": "0.0.2",
"graphql": "15.4.0",
"graphql": "15.4.0-experimental-stream-defer.1",
"mocha": "8.2.1",
"multer": "1.4.2",
"nyc": "15.1.0",
@@ -102,6 +102,6 @@
"unfetch": "4.2.0"
},
"peerDependencies": {
"graphql": "^14.7.0 || ^15.3.0"
"graphql": "^14.7.0 || ^15.3.0 || 15.4.0-experimental-stream-defer.1"
}
}
533 changes: 528 additions & 5 deletions src/__tests__/http-test.ts

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions src/__tests__/isAsyncIterable-test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { expect } from 'chai';
import { describe, it } from 'mocha';

import { isAsyncIterable } from '../isAsyncIterable';

describe('isAsyncIterable', () => {
it('returns false for null', () => {
expect(isAsyncIterable(null)).to.equal(false);
});
it('returns false for non-object', () => {
expect(isAsyncIterable(1)).to.equal(false);
});
it('returns true for async generator function', () => {
// istanbul ignore next: test function
// eslint-disable-next-line @typescript-eslint/no-empty-function
const myGen = async function* () {};
const result = myGen();
expect(isAsyncIterable(result)).to.equal(true);
});
});
83 changes: 83 additions & 0 deletions src/__tests__/simplePubSub-test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import { expect } from 'chai';
import { describe, it } from 'mocha';

import SimplePubSub from './simplePubSub';

describe('SimplePubSub', () => {
it('subscribe async-iterator mock', async () => {
const pubsub = new SimplePubSub();
const iterator = pubsub.getSubscriber();

// Queue up publishes
expect(pubsub.emit('Apple')).to.equal(true);
expect(pubsub.emit('Banana')).to.equal(true);

// Read payloads
expect(await iterator.next()).to.deep.equal({
done: false,
value: 'Apple',
});
expect(await iterator.next()).to.deep.equal({
done: false,
value: 'Banana',
});

// Read ahead
const i3 = iterator.next().then((x) => x);
const i4 = iterator.next().then((x) => x);

// Publish
expect(pubsub.emit('Coconut')).to.equal(true);
expect(pubsub.emit('Durian')).to.equal(true);

// Await out of order to get correct results
expect(await i4).to.deep.equal({ done: false, value: 'Durian' });
expect(await i3).to.deep.equal({ done: false, value: 'Coconut' });

// Read ahead
const i5 = iterator.next().then((x) => x);

// Terminate queue
await iterator.return();

// Publish is not caught after terminate
expect(pubsub.emit('Fig')).to.equal(false);

// Find that cancelled read-ahead got a "done" result
expect(await i5).to.deep.equal({ done: true, value: undefined });

// And next returns empty completion value
expect(await iterator.next()).to.deep.equal({
done: true,
value: undefined,
});
});
it('empties queue when thrown', async () => {
const pubsub = new SimplePubSub();
const iterator = pubsub.getSubscriber();

expect(pubsub.emit('Apple')).to.equal(true);

// Read payloads
expect(await iterator.next()).to.deep.equal({
done: false,
value: 'Apple',
});

// Terminate queue
try {
await iterator.throw(new Error('Thrown!'));
} catch (e: unknown) {
// ignore thrown error
}

// Publish is not caught after terminate
expect(pubsub.emit('Fig')).to.equal(false);

// And next returns empty completion value
expect(await iterator.next()).to.deep.equal({
done: true,
value: undefined,
});
});
});
75 changes: 75 additions & 0 deletions src/__tests__/simplePubSub.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* Create an AsyncIterator from an EventEmitter. Useful for mocking a
* PubSub system for tests.
*/
export default class SimplePubSub<T> {
subscribers: Set<(arg0: T) => void>;

constructor() {
this.subscribers = new Set();
}

emit(event: T): boolean {
for (const subscriber of this.subscribers) {
subscriber(event);
}
return this.subscribers.size > 0;
}

// Use custom return type to avoid checking for optional `return` method
getSubscriber(): AsyncGenerator<T, void, void> {
type EventResolve = (arg0: IteratorResult<T>) => void;

const pullQueue: Array<EventResolve> = [];
const pushQueue: Array<T> = [];
let listening = true;
this.subscribers.add(pushValue);

const emptyQueue = () => {
listening = false;
this.subscribers.delete(pushValue);
for (const resolve of pullQueue) {
resolve({ value: undefined, done: true });
}
pullQueue.length = 0;
pushQueue.length = 0;
};

return {
next(): Promise<IteratorResult<T>> {
if (!listening) {
return Promise.resolve({ value: undefined, done: true });
}

if (pushQueue.length > 0) {
return Promise.resolve({
value: pushQueue.shift() as T,
done: false,
});
}
return new Promise((resolve: EventResolve) => {
pullQueue.push(resolve);
});
},
return() {
emptyQueue();
return Promise.resolve({ value: undefined, done: true });
},
throw(error: unknown) {
emptyQueue();
return Promise.reject(error);
},
[Symbol.asyncIterator]() {
return this;
},
};

function pushValue(value: T): void {
if (pullQueue.length > 0) {
(pullQueue.shift() as EventResolve)({ value, done: false });
} else {
pushQueue.push(value);
}
}
}
}
172 changes: 154 additions & 18 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { IncomingMessage, ServerResponse } from 'http';
import type { IncomingMessage } from 'http';
import { ServerResponse } from 'http';

import type {
ASTVisitor,
@@ -8,6 +9,9 @@ import type {
ExecutionArgs,
ExecutionResult,
FormattedExecutionResult,
ExecutionPatchResult,
FormattedExecutionPatchResult,
AsyncExecutionResult,
GraphQLSchema,
GraphQLFieldResolver,
GraphQLTypeResolver,
@@ -30,12 +34,16 @@ import {

import type { GraphiQLOptions, GraphiQLData } from './renderGraphiQL';
import { parseBody } from './parseBody';
import { isAsyncIterable } from './isAsyncIterable';
import { renderGraphiQL } from './renderGraphiQL';

// `url` is always defined for IncomingMessage coming from http.Server
type Request = IncomingMessage & { url: string };

type Response = ServerResponse & { json?: (data: unknown) => void };
type Response = ServerResponse & {
json?: (data: unknown) => void;
flush?: () => void;
};
type MaybePromise<T> = Promise<T> | T;

/**
@@ -94,7 +102,9 @@ export interface OptionsData {
* An optional function which will be used to execute instead of default `execute`
* from `graphql-js`.
*/
customExecuteFn?: (args: ExecutionArgs) => MaybePromise<ExecutionResult>;
customExecuteFn?: (
args: ExecutionArgs,
) => MaybePromise<ExecutionResult | AsyncIterable<AsyncExecutionResult>>;

/**
* An optional function which will be used to format any errors produced by
@@ -172,7 +182,7 @@ export interface RequestInfo {
/**
* The result of executing the operation.
*/
result: FormattedExecutionResult;
result: AsyncExecutionResult;

/**
* A value to pass as the context to the graphql() function.
@@ -198,8 +208,12 @@ export function graphqlHTTP(options: Options): Middleware {
let showGraphiQL = false;
let graphiqlOptions;
let formatErrorFn = formatError;
let extensionsFn;
let pretty = false;
let documentAST: DocumentNode;
let executeResult;
let result: ExecutionResult;
let finishedIterable = false;

try {
// Parse the Request to get GraphQL request parameters.
@@ -227,7 +241,6 @@ export function graphqlHTTP(options: Options): Middleware {
const fieldResolver = optionsData.fieldResolver;
const typeResolver = optionsData.typeResolver;
const graphiql = optionsData.graphiql ?? false;
const extensionsFn = optionsData.extensions;
const context = optionsData.context ?? request;
const parseFn = optionsData.customParseFn ?? parse;
const executeFn = optionsData.customExecuteFn ?? execute;
@@ -258,6 +271,25 @@ export function graphqlHTTP(options: Options): Middleware {
graphiqlOptions = graphiql;
}

// Collect and apply any metadata extensions if a function was provided.
// https://graphql.github.io/graphql-spec/#sec-Response-Format
if (optionsData.extensions) {
extensionsFn = (payload: AsyncExecutionResult) => {
/* istanbul ignore else: condition not reachable, required for typescript */
if (optionsData.extensions) {
return optionsData.extensions({
document: documentAST,
variables,
operationName,
result: payload,
context,
});
}
/* istanbul ignore next: condition not reachable, required for typescript */
return undefined;
};
}

// If there is no query, but GraphiQL will be displayed, do not produce
// a result, otherwise return a 400: Bad Request.
if (query == null) {
@@ -277,7 +309,6 @@ export function graphqlHTTP(options: Options): Middleware {
}

// Parse source to AST, reporting any syntax error.
let documentAST;
try {
documentAST = parseFn(new Source(query, 'GraphQL request'));
} catch (syntaxError: unknown) {
@@ -323,7 +354,7 @@ export function graphqlHTTP(options: Options): Middleware {

// Perform the execution, reporting any errors creating the context.
try {
result = await executeFn({
executeResult = await executeFn({
schema,
document: documentAST,
rootValue,
@@ -333,23 +364,39 @@ export function graphqlHTTP(options: Options): Middleware {
fieldResolver,
typeResolver,
});

if (isAsyncIterable(executeResult)) {
// Get first payload from AsyncIterator. http status will reflect status
// of this payload.
const asyncIterator = getAsyncIterator<ExecutionResult>(
executeResult,
);

response.on('close', () => {
if (
!finishedIterable &&
typeof asyncIterator.return === 'function'
) {
asyncIterator.return().catch(() => {
// can't do anything with the error, connection is already closed
});
}
});

const { value } = await asyncIterator.next();
result = value;
} else {
result = executeResult;
}
} catch (contextError: unknown) {
// Return 400: Bad Request if any execution context errors exist.
throw httpError(400, 'GraphQL execution context error.', {
graphqlErrors: [contextError],
});
}

// Collect and apply any metadata extensions if a function was provided.
// https://graphql.github.io/graphql-spec/#sec-Response-Format
if (extensionsFn) {
const extensions = await extensionsFn({
document: documentAST,
variables,
operationName,
result,
context,
});
const extensions = await extensionsFn(result);

if (extensions != null) {
result = { ...result, extensions };
@@ -363,6 +410,7 @@ export function graphqlHTTP(options: Options): Middleware {
rawError instanceof Error ? rawError : String(rawError),
);

// eslint-disable-next-line require-atomic-updates
response.statusCode = error.status;

const { headers } = error;
@@ -381,9 +429,12 @@ export function graphqlHTTP(options: Options): Middleware {
undefined,
error,
);
result = { data: undefined, errors: [graphqlError] };
executeResult = result = { data: undefined, errors: [graphqlError] };
} else {
result = { data: undefined, errors: error.graphqlErrors };
executeResult = result = {
data: undefined,
errors: error.graphqlErrors,
};
}
}

@@ -393,6 +444,7 @@ export function graphqlHTTP(options: Options): Middleware {
// the resulting JSON payload.
// https://graphql.github.io/graphql-spec/#sec-Data
if (response.statusCode === 200 && result.data == null) {
// eslint-disable-next-line require-atomic-updates
response.statusCode = 500;
}

@@ -402,6 +454,41 @@ export function graphqlHTTP(options: Options): Middleware {
errors: result.errors?.map(formatErrorFn),
};

if (isAsyncIterable(executeResult)) {
response.setHeader('Content-Type', 'multipart/mixed; boundary="-"');
response.write('\r\n---\r\n');
sendPartialResponse(pretty, response, formattedResult);
try {
for await (let payload of executeResult) {
// Collect and apply any metadata extensions if a function was provided.
// https://graphql.github.io/graphql-spec/#sec-Response-Format
if (extensionsFn) {
const extensions = await extensionsFn(payload);

if (extensions != null) {
payload = { ...payload, extensions };
}
}
const formattedPayload: FormattedExecutionPatchResult = {
// first payload is already consumed, all subsequent payloads typed as ExecutionPatchResult
...(payload as ExecutionPatchResult),
errors: payload.errors?.map(formatErrorFn),
};
sendPartialResponse(pretty, response, formattedPayload);
}
} catch (rawError: unknown) {
const graphqlError = getGraphQlError(rawError);
sendPartialResponse(pretty, response, {
data: undefined,
errors: [formatErrorFn(graphqlError)],
hasNext: false,
});
}
response.end();
finishedIterable = true;
return;
}

// If allowed to show GraphiQL, present it instead of JSON.
if (showGraphiQL) {
return respondWithGraphiQL(
@@ -522,6 +609,34 @@ function canDisplayGraphiQL(request: Request, params: GraphQLParams): boolean {
return !params.raw && accepts(request).types(['json', 'html']) === 'html';
}

/**
* Helper function for sending part of a multi-part response using only the core Node server APIs.
*/
function sendPartialResponse(
pretty: boolean,
response: Response,
result: FormattedExecutionResult | FormattedExecutionPatchResult,
): void {
const json = JSON.stringify(result, null, pretty ? 2 : 0);
const chunk = Buffer.from(json, 'utf8');
const data = ['Content-Type: application/json; charset=utf-8', '', chunk];
// @ts-expect-error
if (result.hasNext === true) {
data.push('---\r\n');
} else {
data.push('-----\r\n');
}
response.write(data.join('\r\n'));
// flush response if compression middleware is used
if (
typeof response.flush === 'function' &&
// @ts-expect-error deprecated flush method is implemented on ServerResponse but not typed
response.flush !== ServerResponse.prototype.flush
) {
response.flush();
}
}

/**
* Helper function for sending a response using only the core Node server APIs.
*/
@@ -546,3 +661,24 @@ function devAssert(condition: unknown, message: string): void {
throw new TypeError(message);
}
}

function getAsyncIterator<T>(
asyncIterable: AsyncIterable<T>,
): AsyncIterator<T> {
const method = asyncIterable[Symbol.asyncIterator];
return method.call(asyncIterable);
}

function getGraphQlError(rawError: unknown) {
/* istanbul ignore next: Thrown by underlying library. */
const error =
rawError instanceof Error ? rawError : new Error(String(rawError));
return new GraphQLError(
error.message,
undefined,
undefined,
undefined,
undefined,
error,
);
}
8 changes: 8 additions & 0 deletions src/isAsyncIterable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
export function isAsyncIterable<T>(
maybeAsyncIterable: any,
): maybeAsyncIterable is AsyncIterable<T> {
if (maybeAsyncIterable == null || typeof maybeAsyncIterable !== 'object') {
return false;
}
return typeof maybeAsyncIterable[Symbol.asyncIterator] === 'function';
}