Skip to content

fix(node-http-handler): http2 lets node exit #3541

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 102 additions & 14 deletions packages/node-http-handler/src/node-http2-handler.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { AbortController } from "@aws-sdk/abort-controller";
import { HttpRequest } from "@aws-sdk/protocol-http";
import { rejects } from "assert";
import http2, { constants, Http2Stream } from "http2";
import http2, { ClientHttp2Session, ClientHttp2Stream, constants, Http2Stream } from "http2";
import { Duplex } from "stream";
import { promisify } from "util";

Expand Down Expand Up @@ -44,7 +44,20 @@ describe(NodeHttp2Handler.name, () => {
});

describe("without options", () => {
let createdSessions!: ClientHttp2Session[];
const connectReal = http2.connect;
let connectSpy!: jest.SpiedFunction<typeof http2.connect>;

beforeEach(() => {
createdSessions = [];
connectSpy = jest.spyOn(http2, "connect").mockImplementation((...args) => {
const session = connectReal(...args);
jest.spyOn(session, "ref");
jest.spyOn(session, "unref");
createdSessions.push(session);
return session;
});

nodeH2Handler = new NodeHttp2Handler();
});

Expand All @@ -58,51 +71,99 @@ describe(NodeHttp2Handler.name, () => {

describe("number calls to http2.connect", () => {
it("is zero on initialization", () => {
const connectSpy = jest.spyOn(http2, "connect");
expect(connectSpy).not.toHaveBeenCalled();
});

it("is one when request is made", async () => {
const connectSpy = jest.spyOn(http2, "connect");

// Make single request.
await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {});
const response = await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {});
const responseBody = response.response.body as ClientHttp2Stream;

const authority = `${protocol}//${hostname}:${port}`;
expect(connectSpy).toHaveBeenCalledTimes(1);
expect(connectSpy).toHaveBeenCalledWith(authority);

// Keeping node alive while request is open.
expect(createdSessions[0].ref).toHaveBeenCalledTimes(1);
expect(createdSessions[0].unref).toHaveBeenCalledTimes(1);

const closed = new Promise<void>((resolve) => responseBody.once("close", resolve));
(response.response.body as ClientHttp2Stream).destroy();
await closed;

// No longer keeping node alive
expect(createdSessions[0].ref).toHaveBeenCalledTimes(1);
expect(createdSessions[0].unref).toHaveBeenCalledTimes(2);
});

it("is one if multiple requests are made on same URL", async () => {
const connectSpy = jest.spyOn(http2, "connect");

// Make two requests.
await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {});
await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {});
const response1 = await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {});
const response1Body = response1.response.body as ClientHttp2Stream;
const response2 = await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {});
const response2Body = response2.response.body as ClientHttp2Stream;

const authority = `${protocol}//${hostname}:${port}`;
expect(connectSpy).toHaveBeenCalledTimes(1);
expect(connectSpy).toHaveBeenCalledWith(authority);

// Keeping node alive while requests are open.
expect(createdSessions[0].ref).toHaveBeenCalledTimes(2);
expect(createdSessions[0].unref).toHaveBeenCalledTimes(1);

const closed1 = new Promise<void>((resolve) => response1Body.once("close", resolve));
response1Body.destroy();
await closed1;
const closed2 = new Promise<void>((resolve) => response2Body.once("close", resolve));
response2Body.destroy();
await closed2;

// No longer keeping node alive
expect(createdSessions[0].ref).toHaveBeenCalledTimes(2);
expect(createdSessions[0].unref).toHaveBeenCalledTimes(3);
});

it("is many if requests are made on different URLs", async () => {
const connectSpy = jest.spyOn(http2, "connect");

// Make first request on default URL.
await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {});
const response1 = await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {});
const response1Body = response1.response.body as ClientHttp2Stream;

const port2 = port + 1;
const mockH2Server2 = createMockHttp2Server().listen(port2);
mockH2Server2.on("request", createResponseFunction(mockResponse));

// Make second request on URL with port2.
await nodeH2Handler.handle(new HttpRequest({ ...getMockReqOptions(), port: port2 }), {});
const response2 = await nodeH2Handler.handle(new HttpRequest({ ...getMockReqOptions(), port: port2 }), {});
const response2Body = response2.response.body as ClientHttp2Stream;

const authorityPrefix = `${protocol}//${hostname}`;
expect(connectSpy).toHaveBeenCalledTimes(2);
expect(connectSpy).toHaveBeenNthCalledWith(1, `${authorityPrefix}:${port}`);
expect(connectSpy).toHaveBeenNthCalledWith(2, `${authorityPrefix}:${port2}`);
mockH2Server2.close();

// Keeping node alive while requests are open.
expect(createdSessions[0].ref).toHaveBeenCalledTimes(1);
expect(createdSessions[0].unref).toHaveBeenCalledTimes(1);
expect(createdSessions[1].ref).toHaveBeenCalledTimes(1);
expect(createdSessions[1].unref).toHaveBeenCalledTimes(1);

const closed1 = new Promise<void>((resolve) => response1Body.once("close", resolve));
response1Body.destroy();
await closed1;
const closed2 = new Promise<void>((resolve) => response2Body.once("close", resolve));
response2Body.destroy();
await closed2;

// No longer keeping node alive
expect(createdSessions[0].ref).toHaveBeenCalledTimes(1);
expect(createdSessions[0].unref).toHaveBeenCalledTimes(2);
expect(createdSessions[1].ref).toHaveBeenCalledTimes(1);
expect(createdSessions[1].unref).toHaveBeenCalledTimes(2);
});
});

Expand Down Expand Up @@ -151,26 +212,44 @@ describe(NodeHttp2Handler.name, () => {
expect(establishedConnections).toBe(3);
expect(numRequests).toBe(3);

// Not keeping node alive
expect(createdSessions).toHaveLength(3);
expect(createdSessions[0].ref).toHaveBeenCalledTimes(1);
expect(createdSessions[0].unref).toHaveBeenCalledTimes(2);
expect(createdSessions[1].ref).toHaveBeenCalledTimes(1);
expect(createdSessions[1].unref).toHaveBeenCalledTimes(2);
expect(createdSessions[2].ref).toHaveBeenCalledTimes(1);
expect(createdSessions[2].unref).toHaveBeenCalledTimes(2);

// should be able to recover from goaway after reconnecting to a server
// that doesn't send goaway, and reuse the TCP connection (Http2Session)
shouldSendGoAway = false;
mockH2Server3.on("request", createResponseFunction(mockResponse));
await nodeH2Handler.handle(req, {});
const result = await nodeH2Handler.handle(req, {});
const resultReader = result.response.body;

// Keeping node alive
expect(createdSessions).toHaveLength(4);
expect(createdSessions[3].ref).toHaveBeenCalledTimes(1);
expect(createdSessions[3].unref).toHaveBeenCalledTimes(1);

// ...and validate that the mocked response is received
const responseBody = await new Promise((resolve) => {
const buffers = [];
resultReader.on("data", (chunk) => buffers.push(chunk));
resultReader.on("end", () => {
resultReader.on("close", () => {
resolve(Buffer.concat(buffers).toString("utf8"));
});
});
expect(responseBody).toBe("test");
expect(establishedConnections).toBe(4);
expect(numRequests).toBe(5);
expect(numRequests).toBe(4);
mockH2Server3.close();

// Not keeping node alive
expect(createdSessions).toHaveLength(4);
expect(createdSessions[3].ref).toHaveBeenCalledTimes(1);
expect(createdSessions[3].unref).toHaveBeenCalledTimes(2);
});

it("handles connections destroyed by servers", async () => {
Expand Down Expand Up @@ -212,6 +291,15 @@ describe(NodeHttp2Handler.name, () => {
expect(establishedConnections).toBe(3);
expect(numRequests).toBe(3);
mockH2Server3.close();

// Not keeping node alive
expect(createdSessions).toHaveLength(3);
expect(createdSessions[0].ref).toHaveBeenCalledTimes(1);
expect(createdSessions[0].unref).toHaveBeenCalledTimes(2);
expect(createdSessions[1].ref).toHaveBeenCalledTimes(1);
expect(createdSessions[1].unref).toHaveBeenCalledTimes(2);
expect(createdSessions[2].ref).toHaveBeenCalledTimes(1);
expect(createdSessions[2].unref).toHaveBeenCalledTimes(2);
});
});

Expand Down Expand Up @@ -380,7 +468,7 @@ describe(NodeHttp2Handler.name, () => {
const authority = `${protocol}//${hostname}:${port}`;
// @ts-ignore: access private property
const session: ClientHttp2Session = nodeH2Handler.sessionCache.get(authority)[0];
const fakeStream = new Duplex();
const fakeStream = new Duplex() as ClientHttp2Stream;
const fakeRstCode = 1;
// @ts-ignore: fake result code
fakeStream.rstCode = fakeRstCode;
Expand All @@ -405,7 +493,7 @@ describe(NodeHttp2Handler.name, () => {
const authority = `${protocol}//${hostname}:${port}`;
// @ts-ignore: access private property
const session: ClientHttp2Session = nodeH2Handler.sessionCache.get(authority)[0];
const fakeStream = new Duplex();
const fakeStream = new Duplex() as ClientHttp2Stream;
jest.spyOn(session, "request").mockImplementation(() => fakeStream);
// @ts-ignore: access private property
nodeH2Handler.sessionCache.set(`${protocol}//${hostname}:${port}`, [session]);
Expand Down
7 changes: 7 additions & 0 deletions packages/node-http-handler/src/node-http2-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ export class NodeHttp2Handler implements HttpHandler {
[constants.HTTP2_HEADER_METHOD]: method,
});

// Keep node alive while request is in progress. Matched with unref() in close event.
session.ref();

req.on("response", (headers) => {
const httpResponse = new HttpResponse({
statusCode: headers[":status"] || -1,
Expand Down Expand Up @@ -137,6 +140,7 @@ export class NodeHttp2Handler implements HttpHandler {
// http2stream.rstCode property. If the code is any value other than NGHTTP2_NO_ERROR (0),
// an 'error' event will have also been emitted.
req.on("close", () => {
session.unref();
if (this.disableConcurrentStreams) {
session.destroy();
}
Expand Down Expand Up @@ -164,6 +168,9 @@ export class NodeHttp2Handler implements HttpHandler {
if (existingSessions.length > 0 && !disableConcurrentStreams) return existingSessions[0];

const newSession = connect(authority);
// AWS SDK does not expect server push streams, don't keep node alive without a request.
newSession.unref();

const destroySessionCb = () => {
this.destroySession(newSession);
this.deleteSessionFromCache(authority, newSession);
Expand Down