From d7d6d394aacf46a944ea54ec6ce7fc991d25e959 Mon Sep 17 00:00:00 2001 From: Tyson Andre Date: Wed, 30 Sep 2020 10:43:56 -0400 Subject: [PATCH 1/7] Also run tests in Node 12 and 14 Node 14 will become an LTS release in October. --- .travis.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.travis.yml b/.travis.yml index c5670b1c..54ebff79 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,6 +2,8 @@ language: node_js node_js: - 8 - 10 +- 12 +- 14 after_success: - npm install -g istanbul - npm install -D coveralls From f070be3a5cec474c7d79dd247084691abf7bfa77 Mon Sep 17 00:00:00 2001 From: Tyson Andre Date: Thu, 1 Oct 2020 13:11:15 -0400 Subject: [PATCH 2/7] Enforce timeouts and add simple http2 client test cases Enforce a 10 second timeout - if any request takes longer than that, then assume that APNs is having networking or server issues so that the node-apn client can disconnect and reconnect in the background. Add tests of goaway, timeouts, protocol errors, connection termination by the server, and a few http response codes. Make eslintConfig work with modern eslint versions Arrow functions are newer than es6 --- doc/provider.markdown | 2 + lib/client.js | 250 +++++++++------- lib/config.js | 2 + package.json | 3 +- test/client.js | 648 ++++++++++++++++++++++++++++++++++-------- test/config.js | 1 + 6 files changed, 682 insertions(+), 224 deletions(-) diff --git a/doc/provider.markdown b/doc/provider.markdown index aecc1a4e..08706df9 100644 --- a/doc/provider.markdown +++ b/doc/provider.markdown @@ -25,6 +25,8 @@ Options: - `connectionRetryLimit` {Number} The maximum number of connection failures that will be tolerated before `apn.Provider` will "give up". [See below.](#connection-retry-limit) (Defaults to: 3) + - `timeout` {Number} The timeout in milliseconds for a given HTTP/2 request before the `apn.Client` will assume the server is dead and forcefully reconnect. (Defaults to: 10000) + #### Provider Certificates vs. Authentication Tokens Apple have introduced a new means of authentication with the APNs - [Provider Authentication Tokens][provider-auth-tokens]. These replace the old-style Certificate/Key pairs with tokens based on the [JWT][jwt] standard. The new system is superior in a number of ways: diff --git a/lib/client.js b/lib/client.js index f45b4261..07b09889 100644 --- a/lib/client.js +++ b/lib/client.js @@ -17,6 +17,30 @@ module.exports = function (dependencies) { HTTP2_METHOD_POST } = http2.constants; + const safeCloseSession = (session, callback) => { + if (session && !session.destroyed && !session._nodeApnIsDestroying) { + session._nodeApnIsDestroying = true; + const startDestroying = () => { + if (!session.destroyed) { + session._nodeApnIsDestroying = true; + session.destroy(); + } + if (callback) { + callback(); + } + }; + if (session.closed) { + startDestroying(); + } else { + session.close(startDestroying); + } + } else { + if (callback) { + callback(); + } + } + }; + function Client (options) { this.config = config(options); this.healthCheckInterval = setInterval(() => { @@ -31,115 +55,146 @@ module.exports = function (dependencies) { }, this.config.heartBeat).unref(); } - Client.prototype.write = function write (notification, device, count) { - // Connect session - if (!this.session || this.session.destroyed) { - this.session = http2.connect(`https://${this.config.address}`, this.config); + Client.prototype._createSession = function() { + const url = this._mockOverrideUrl || `https://${this.config.address}`; + // Get the reference to the current session so that + // we don't unintentionally destroy a different session on an async callback + const session = http2.connect(url, this.config); - this.session.on("socketError", (error) => { - if (logger.enabled) { - logger(`Socket error: ${error}`); - } - if (this.session && !this.session.destroyed) { - this.session.destroy(); - } + session.on("socketError", (error) => { + if (logger.enabled) { + logger(`Socket error: ${error}`); + } + safeCloseSession(session); + }); + session.on("error", (error) => { + if (logger.enabled) { + logger(`Session error: ${error}`); + } + safeCloseSession(session); + }); + + session.on("goaway", (errorCode, lastStreamId, opaqueData) => { + logger(`GOAWAY received: (errorCode ${errorCode}, lastStreamId: ${lastStreamId}, opaqueData: ${opaqueData})`); + // gracefully stop accepting new streams + // This may be redundant, since nodejs http2 client is supposed to shut down automatically on receiving a goaway frame + safeCloseSession(session); + }); + + if (logger.enabled) { + session.on("connect", () => { + logger("Session connected"); }); - this.session.on("error", (error) => { - if (logger.enabled) { - logger(`Session error: ${error}`); - } - if (this.session && !this.session.destroyed) { - this.session.destroy(); - } + session.on("close", () => { + logger("Session closed"); }); + session.on("frameError", (frameType, errorCode, streamId) => { + logger(`Frame error: (frameType: ${frameType}, errorCode ${errorCode}, streamId: ${streamId})`); + }); + } + return session; + }; - this.session.on("goaway", (errorCode, lastStreamId, opaqueData) => { - logger(`GOAWAY received: (errorCode ${errorCode}, lastStreamId: ${lastStreamId}, opaqueData: ${opaqueData})`); - // gracefully stop accepting new streams - const session = this.session; - this.session = undefined; - if (session && !session.destroyed) { - session.close(() => { - session.destroy(); - }); - } - }); - - if (logger.enabled) { - this.session.on("connect", () => { - logger("Session connected"); - }); - this.session.on("close", () => { - logger("Session closed"); - }); - this.session.on("frameError", (frameType, errorCode, streamId) => { - logger(`Frame error: (frameType: ${frameType}, errorCode ${errorCode}, streamId: ${streamId})`); - }); + /** + * @param {Notification} notification the notification data to send through APNs + * @param {string} device the device token + * @param {number} [count] the number of retries that have occurred so far + * @returns {Promise<{device:string, error?: VError}>} object with device, optional error. + */ + Client.prototype.write = function write (notification, device, count = 0) { + return new Promise((resolve) => { + // Connect session + if (!this.session || this.session.destroyed || this.session._nodeApnIsDestroying) { + logger('creating a new APNs session'); + this.session = this._createSession(); } - } - let tokenGeneration = null; - let status = null; - let responseData = ""; - let retryCount = count || 0; - - const headers = extend({ - [HTTP2_HEADER_SCHEME]: "https", - [HTTP2_HEADER_METHOD]: HTTP2_METHOD_POST, - [HTTP2_HEADER_AUTHORITY]: this.config.address, - [HTTP2_HEADER_PATH]: `/3/device/${device}`, - }, notification.headers); - - if (this.config.token) { - if (this.config.token.isExpired(3300)) { - this.config.token.regenerate(this.config.token.generation); + let tokenGeneration = null; + let status = null; + let responseData = ""; + let retryCount = count || 0; + + const headers = extend({ + [HTTP2_HEADER_SCHEME]: "https", + [HTTP2_HEADER_METHOD]: HTTP2_METHOD_POST, + [HTTP2_HEADER_AUTHORITY]: this.config.address, + [HTTP2_HEADER_PATH]: `/3/device/${device}`, + }, notification.headers); + + if (this.config.token) { + if (this.config.token.isExpired(3300)) { + this.config.token.regenerate(this.config.token.generation); + } + headers.authorization = `bearer ${this.config.token.current}`; + tokenGeneration = this.config.token.generation; } - headers.authorization = `bearer ${this.config.token.current}`; - tokenGeneration = this.config.token.generation; - } + const currentSession = this.session; - const request = this.session.request(headers) + const request = currentSession.request(headers) + const timeout = this.config.timeout || 10000; - request.setEncoding("utf8"); + request.setTimeout(timeout, () => { + const errorMessage = `Forcibly closing connection to APNs after reaching the request timeout of ${timeout} milliseconds`; + // The first call to resolve will be what the promise resolves to. + resolve({device, error: new VError(errorMessage)}); + if (currentSession !== this.session) { + return; + } + if (currentSession.destroyed) { + return; + } + logger(errorMessage); + safeCloseSession(currentSession); + this.session = null; + }); - request.on("response", (headers) => { - status = headers[HTTP2_HEADER_STATUS]; - }); + request.setEncoding("utf8"); - request.on("data", (data) => { - responseData += data; - }); + request.on("response", (headers) => { + status = headers[HTTP2_HEADER_STATUS]; + }); - request.write(notification.body); + request.on("data", (data) => { + responseData += data; + }); + + request.write(notification.body); - return new Promise ( resolve => { request.on("end", () => { - if (logger.enabled) { - logger(`Request ended with status ${status} and responseData: ${responseData}`); - } + try { + if (logger.enabled) { + logger(`Request ended with status ${status} and responseData: ${responseData}`); + } - if (status === 200) { - resolve({ device }); - } else if (responseData !== "") { - const response = JSON.parse(responseData); - - if (status === 403 && response.reason === "ExpiredProviderToken" && retryCount < 2) { - this.config.token.regenerate(tokenGeneration); - resolve(this.write(notification, device, retryCount + 1)); - return; - } else if (status === 500 && response.reason === "InternalServerError") { - this.session.destroy(); - let error = new VError("Error 500, stream ended unexpectedly"); + if (status === 200) { + resolve({ device }); + } else if (responseData !== "") { + const response = JSON.parse(responseData); + + if (status === 403 && response.reason === "ExpiredProviderToken" && retryCount < 2) { + this.config.token.regenerate(tokenGeneration); + resolve(this.write(notification, device, retryCount + 1)); + return; + } else if (status === 500 && response.reason === "InternalServerError") { + let error = new VError("Error 500, stream ended unexpectedly"); + resolve({ device, error }); + + safeCloseSession(currentSession); + this.session = null; + return; + } + + resolve({ device, status, response }); + } else { + let error = new VError("stream ended unexpectedly"); resolve({ device, error }); - return; } - - resolve({ device, status, response }); - } else { - let error = new VError("stream ended unexpectedly"); + } catch (e) { + const error = new VError(e, 'Unexpected error processing APNs response'); + logger(`Unexpected error processing APNs response: ${e.message}`); resolve({ device, error }); } - }) + }); request.on("error", (error) => { if (logger.enabled) { @@ -159,16 +214,17 @@ module.exports = function (dependencies) { }; Client.prototype.shutdown = function shutdown(callback) { + logger('Called client.shutdown()'); if (this.healthCheckInterval) { clearInterval(this.healthCheckInterval); } - if (this.session && !this.session.destroyed) { - this.session.close(() => { - this.session.destroy(); - if (callback) { - callback(); - } - }); + if (this.session) { + safeCloseSession(this.session, callback); + this.session = null; + } else { + if (callback) { + callback(); + } } }; diff --git a/lib/config.js b/lib/config.js index 4451cb5c..3ce0e517 100644 --- a/lib/config.js +++ b/lib/config.js @@ -28,6 +28,8 @@ module.exports = function(dependencies) { rejectUnauthorized: true, connectionRetryLimit: 10, heartBeat: 60000, + // The timeout for any request to APNs. + timeout: 10000, }; validateOptions(options); diff --git a/package.json b/package.json index 6a4f89e1..13cd52ba 100644 --- a/package.json +++ b/package.json @@ -40,9 +40,8 @@ "node": true }, "eslintConfig": { - "ecmaVersion": 6, "env": { - "es6": true, + "es2017": true, "node": true } }, diff --git a/test/client.js b/test/client.js index 1b00ebeb..0308c8c9 100644 --- a/test/client.js +++ b/test/client.js @@ -1,39 +1,437 @@ "use strict"; -const sinon = require("sinon"); -const stream = require("stream"); -const EventEmitter = require("events"); - -function builtNotification() { - return { - headers: {}, - body: JSON.stringify({ aps: { badge: 1 } }), +const VError = require("verror"); +const http2 = require("http2"); +const util = require("util"); + +const debug = require("debug")("apn"); +const credentials = require("../lib/credentials")({ + logger: debug +}); + +const TEST_PORT = 30939; + + +const config = require("../lib/config")({ + logger: debug, + prepareCertificate: () => ({}), // credentials.certificate, + prepareToken: credentials.token, + prepareCA: credentials.ca, +}); +const Client = require("../lib/client")({ + logger: debug, + config, + http2, +}); +debug.log = console.log.bind(console); + +// function builtNotification() { +// return { +// headers: {}, +// body: JSON.stringify({ aps: { badge: 1 } }), +// }; +// } + +// function FakeStream(deviceId, statusCode, response) { +// const fakeStream = new stream.Transform({ +// transform: sinon.spy(function(chunk, encoding, callback) { +// expect(this.headers).to.be.calledOnce; +// +// const headers = this.headers.firstCall.args[0]; +// expect(headers[":path"].substring(10)).to.equal(deviceId); +// +// this.emit("headers", { +// ":status": statusCode +// }); +// callback(null, Buffer.from(JSON.stringify(response) || "")); +// }) +// }); +// fakeStream.headers = sinon.stub(); +// +// return fakeStream; +// } + +// XXX these may be flaky in CI due to being sensitive to timing, +// and if a test case crashes, then others may get stuck. +// +// Try to fix this if any issues come up. +describe("Client", () => { + let server; + let client; + const MOCK_BODY = '{"mock-key":"mock-value"}'; + const MOCK_DEVICE_TOKEN = 'abcf0123abcf0123abcf0123abcf0123abcf0123abcf0123abcf0123abcf0123'; + + // Create an insecure http2 client for unit testing. + // (APNS would use https://, not http://) + // (It's probably possible to allow accepting invalid certificates instead, + // but that's not the most important point of these tests) + const createClient = (port) => { + let c = new Client({ + port: TEST_PORT, + address: '127.0.0.1', + timeout: 500, + }); + c._mockOverrideUrl = `http://127.0.0.1:${port}`; + c.config.port = port; + c.config.address = '127.0.0.1'; + return c; }; -} + // Create an insecure server for unit testing. + const createAndStartMockServer = (port, cb) => { + server = http2.createServer((req, res) => { + var buffers = []; + req.on('data', (data) => buffers.push(data)); + req.on('end', () => { + const requestBody = Buffer.concat(buffers).toString('utf-8'); + cb(req, res, requestBody); + }); + }); + server.listen(port); + server.on('error', (err) => { + expect.fail(`unexpected error ${err}`); + }); + // Don't block the tests if this server doesn't shut down properly + server.unref(); + return server; + }; + const createAndStartMockLowLevelServer = (port, cb) => { + server = http2.createServer(); + server.on('stream', cb); + server.listen(port); + server.on('error', (err) => { + expect.fail(`unexpected error ${err}`); + }); + // Don't block the tests if this server doesn't shut down properly + server.unref(); + return server; + }; + + afterEach((done) => { + let closeServer = () => { + if (server) { + server.close(); + server = null; + } + done(); + }; + if (client) { + client.shutdown(closeServer); + client = null; + } else { + closeServer(); + } + }); + + it("Treats HTTP 200 responses as successful", async () => { + let didRequest = false; + let establishedConnections = 0; + server = createAndStartMockServer(TEST_PORT, (req, res, requestBody) => { + expect(req.headers).to.deep.equal({ + ':authority': '127.0.0.1', + ':method': 'POST', + ':path': `/3/device/${MOCK_DEVICE_TOKEN}`, + ':scheme': 'https', + 'apns-someheader': 'somevalue', + }); + expect(requestBody).to.equal(MOCK_BODY); + // res.setHeader('X-Foo', 'bar'); + // res.writeHead(200, { 'Content-Type': 'text/plain; charset=utf-8' }); + res.writeHead(200); + res.end(''); + didRequest = true; + }); + server.on('connection', () => establishedConnections += 1); + await new Promise((resolve) => server.on('listening', resolve)); + + client = createClient(TEST_PORT); + + const runSuccessfulRequest = async () => { + const mockHeaders = {'apns-someheader': 'somevalue'}; + const mockNotification = { + headers: mockHeaders, + body: MOCK_BODY, + }; + const mockDevice = MOCK_DEVICE_TOKEN; + const result = await client.write( + mockNotification, + mockDevice, + ); + expect(result).to.deep.equal({ device: MOCK_DEVICE_TOKEN }); + expect(didRequest).to.be.true; + didRequest = false; + }; + await runSuccessfulRequest(); + await runSuccessfulRequest(); + expect(establishedConnections).to.equal(1); // should establish a connection to the server and reuse it + }); + + // https://developer.apple.com/documentation/usernotifications/setting_up_a_remote_notification_server/handling_notification_responses_from_apns + it("JSON decodes HTTP 400 responses", async () => { + let didRequest = false; + let establishedConnections = 0; + server = createAndStartMockServer(TEST_PORT, (req, res, requestBody) => { + expect(requestBody).to.equal(MOCK_BODY); + // res.setHeader('X-Foo', 'bar'); + // res.writeHead(200, { 'Content-Type': 'text/plain; charset=utf-8' }); + res.writeHead(400); + res.end('{"reason": "BadDeviceToken"}'); + didRequest = true; + }); + server.on('connection', () => establishedConnections += 1); + await new Promise((resolve) => server.on('listening', resolve)); + + client = createClient(TEST_PORT); + + const runRequestWithBadDeviceToken = async () => { + const mockHeaders = {'apns-someheader': 'somevalue'}; + const mockNotification = { + headers: mockHeaders, + body: MOCK_BODY, + }; + const mockDevice = MOCK_DEVICE_TOKEN; + const result = await client.write( + mockNotification, + mockDevice, + ); + expect(result).to.deep.equal({ + device: MOCK_DEVICE_TOKEN, + response: { + "reason": "BadDeviceToken", + }, + status: 400, + }); + expect(didRequest).to.be.true; + didRequest = false; + }; + await runRequestWithBadDeviceToken(); + await runRequestWithBadDeviceToken(); + expect(establishedConnections).to.equal(1); // should establish a connection to the server and reuse it + }); + + // node-apn started closing connections in response to a bug report where HTTP 500 responses + // persisted until a new connection was reopened + it("Closes connections when HTTP 500 responses are received", async () => { + let establishedConnections = 0; + let responseDelay = 0; + server = createAndStartMockServer(TEST_PORT, (req, res, requestBody) => { + // Wait 50ms before sending the responses in parallel + setTimeout(() => { + expect(requestBody).to.equal(MOCK_BODY); + res.writeHead(500); + res.end('{"reason": "InternalServerError"}'); + }, responseDelay); + }); + server.on('connection', () => establishedConnections += 1); + await new Promise((resolve) => server.on('listening', resolve)); + + client = createClient(TEST_PORT); + + const runRequestWithInternalServerError = async () => { + const mockHeaders = {'apns-someheader': 'somevalue'}; + const mockNotification = { + headers: mockHeaders, + body: MOCK_BODY, + }; + const mockDevice = MOCK_DEVICE_TOKEN; + const result = await client.write( + mockNotification, + mockDevice, + ); + expect(result).to.deep.equal({ + device: MOCK_DEVICE_TOKEN, + error: new VError("Error 500, stream ended unexpectedly"), + }); + }; + await runRequestWithInternalServerError(); + await runRequestWithInternalServerError(); + await runRequestWithInternalServerError(); + expect(establishedConnections).to.equal(3); // should close and establish new connections on http 500 + // Validate that nothing wrong happens when multiple HTTP 500s are received simultaneously. + // (no segfaults, all promises get resolved, etc.) + responseDelay = 50; + await Promise.all([ + runRequestWithInternalServerError(), + runRequestWithInternalServerError(), + runRequestWithInternalServerError(), + runRequestWithInternalServerError(), + ]); + expect(establishedConnections).to.equal(4); // should close and establish new connections on http 500 + }); -function FakeStream(deviceId, statusCode, response) { - const fakeStream = new stream.Transform({ - transform: sinon.spy(function(chunk, encoding, callback) { - expect(this.headers).to.be.calledOnce; + it("Handles unexpected invalid JSON responses", async () => { + let establishedConnections = 0; + let responseDelay = 0; + server = createAndStartMockServer(TEST_PORT, (req, res, requestBody) => { + // Wait 50ms before sending the responses in parallel + setTimeout(() => { + expect(requestBody).to.equal(MOCK_BODY); + res.writeHead(500); + res.end('PC LOAD LETTER'); + }, responseDelay); + }); + server.on('connection', () => establishedConnections += 1); + await new Promise((resolve) => server.on('listening', resolve)); + + client = createClient(TEST_PORT); + + const runRequestWithInternalServerError = async () => { + const mockHeaders = {'apns-someheader': 'somevalue'}; + const mockNotification = { + headers: mockHeaders, + body: MOCK_BODY, + }; + const mockDevice = MOCK_DEVICE_TOKEN; + const result = await client.write( + mockNotification, + mockDevice, + ); + // Should not happen, but if it does, the promise should resolve with an error + expect(result.device).to.equal(MOCK_DEVICE_TOKEN); + expect(result.error.message).to.equal('Unexpected error processing APNs response: Unexpected token P in JSON at position 0'); + }; + await runRequestWithInternalServerError(); + await runRequestWithInternalServerError(); + expect(establishedConnections).to.equal(1); // Currently reuses the connection. + }); - const headers = this.headers.firstCall.args[0]; - expect(headers[":path"].substring(10)).to.equal(deviceId); + it("Handles APNs timeouts", async () => { + let didGetRequest = false; + let didGetResponse = false; + server = createAndStartMockServer(TEST_PORT, (req, res, requestBody) => { + didGetRequest = true; + setTimeout(() => { + res.writeHead(200); + res.end(''); + didGetResponse = true; + }, 1900); + }); + client = createClient(TEST_PORT); + + const onListeningPromise = new Promise((resolve) => server.on('listening', resolve));; + await onListeningPromise; + + const mockHeaders = {'apns-someheader': 'somevalue'}; + const mockNotification = { + headers: mockHeaders, + body: MOCK_BODY, + }; + const mockDevice = MOCK_DEVICE_TOKEN; + const performRequestExpectingTimeout = async () => { + const result = await client.write( + mockNotification, + mockDevice, + ); + expect(result).to.deep.equal({ + device: MOCK_DEVICE_TOKEN, + error: new VError('Forcibly closing connection to APNs after reaching the request timeout of 500 milliseconds'), + }); + expect(didGetRequest).to.be.true; + expect(didGetResponse).to.be.false; + }; + await performRequestExpectingTimeout(); + didGetResponse = false; + didGetRequest = false; + // Should be able to have multiple in flight requests all get notified that the server is shutting down + await Promise.all([ + performRequestExpectingTimeout(), + performRequestExpectingTimeout(), + performRequestExpectingTimeout(), + performRequestExpectingTimeout(), + ]); + }); - this.emit("headers", { - ":status": statusCode + it("Handles goaway frames", async () => { + let didGetRequest = false; + let establishedConnections = 0; + server = createAndStartMockLowLevelServer(TEST_PORT, (stream) => { + const session = stream.session; + const errorCode = 1; + didGetRequest = true; + session.goaway(errorCode); + }); + server.on('connection', () => establishedConnections += 1); + client = createClient(TEST_PORT); + + const onListeningPromise = new Promise((resolve) => server.on('listening', resolve));; + await onListeningPromise; + + const mockHeaders = {'apns-someheader': 'somevalue'}; + const mockNotification = { + headers: mockHeaders, + body: MOCK_BODY, + }; + const mockDevice = MOCK_DEVICE_TOKEN; + const performRequestExpectingGoAway = async () => { + const result = await client.write( + mockNotification, + mockDevice, + ); + expect(result).to.deep.equal({ + device: MOCK_DEVICE_TOKEN, + error: new VError('stream ended unexpectedly'), }); - callback(null, Buffer.from(JSON.stringify(response) || "")); - }) + expect(didGetRequest).to.be.true; + didGetRequest = false; + }; + await performRequestExpectingGoAway(); + await performRequestExpectingGoAway(); + expect(establishedConnections).to.equal(2); }); - fakeStream.headers = sinon.stub(); - return fakeStream; -} + it("Handles unexpected protocol errors (no response sent)", async () => { + let didGetRequest = false; + let establishedConnections = 0; + let responseTimeout = 0; + server = createAndStartMockLowLevelServer(TEST_PORT, (stream) => { + setTimeout(() => { + const session = stream.session; + didGetRequest = true; + if (session) { + session.destroy(); + } + }, responseTimeout); + }); + server.on('connection', () => establishedConnections += 1); + client = createClient(TEST_PORT); + + const onListeningPromise = new Promise((resolve) => server.on('listening', resolve));; + await onListeningPromise; + + const mockHeaders = {'apns-someheader': 'somevalue'}; + const mockNotification = { + headers: mockHeaders, + body: MOCK_BODY, + }; + const mockDevice = MOCK_DEVICE_TOKEN; + const performRequestExpectingDisconnect = async () => { + const result = await client.write( + mockNotification, + mockDevice, + ); + expect(result).to.deep.equal({ + device: MOCK_DEVICE_TOKEN, + error: new VError('stream ended unexpectedly'), + }); + expect(didGetRequest).to.be.true; + }; + await performRequestExpectingDisconnect(); + didGetRequest = false; + await performRequestExpectingDisconnect(); + didGetRequest = false; + expect(establishedConnections).to.equal(2); + responseTimeout = 10; + await Promise.all([ + performRequestExpectingDisconnect(), + performRequestExpectingDisconnect(), + performRequestExpectingDisconnect(), + performRequestExpectingDisconnect(), + ]); + expect(establishedConnections).to.equal(3); + }); -describe("Client", function () { // let fakes, Client; - // beforeEach(function () { + // beforeEach(() => { // fakes = { // config: sinon.stub(), // EndpointManager: sinon.stub(), @@ -46,8 +444,8 @@ describe("Client", function () { // Client = require("../lib/client")(fakes); // }); - // describe("constructor", function () { - // it("prepares the configuration with passed options", function () { + // describe("constructor", () => { + // it("prepares the configuration with passed options", () => { // let options = { production: true }; // let client = new Client(options); @@ -55,14 +453,14 @@ describe("Client", function () { // }); // describe("EndpointManager instance", function() { - // it("is created", function () { + // it("is created", () => { // let client = new Client(); // expect(fakes.EndpointManager).to.be.calledOnce; // expect(fakes.EndpointManager).to.be.calledWithNew; // }); - // it("is passed the prepared configuration", function () { + // it("is passed the prepared configuration", () => { // const returnSentinel = { "configKey": "configValue"}; // fakes.config.returns(returnSentinel); @@ -72,37 +470,37 @@ describe("Client", function () { // }); // }); - describe("write", function () { - // beforeEach(function () { + describe("write", () => { + // beforeEach(() => { // fakes.config.returnsArg(0); // fakes.endpointManager.getStream = sinon.stub(); // fakes.EndpointManager.returns(fakes.endpointManager); // }); - // context("a stream is available", function () { + // context("a stream is available", () => { // let client; - // context("transmission succeeds", function () { - // beforeEach( function () { + // context("transmission succeeds", () => { + // beforeEach( () => { // client = new Client( { address: "testapi" } ); // fakes.stream = new FakeStream("abcd1234", "200"); // fakes.endpointManager.getStream.onCall(0).returns(fakes.stream); // }); - // it("attempts to acquire one stream", function () { + // it("attempts to acquire one stream", () => { // return client.write(builtNotification(), "abcd1234") - // .then(function () { + // .then(() => { // expect(fakes.endpointManager.getStream).to.be.calledOnce; // }); // }); - // describe("headers", function () { + // describe("headers", () => { - // it("sends the required HTTP/2 headers", function () { + // it("sends the required HTTP/2 headers", () => { // return client.write(builtNotification(), "abcd1234") - // .then(function () { + // .then(() => { // expect(fakes.stream.headers).to.be.calledWithMatch( { // ":scheme": "https", // ":method": "POST", @@ -112,16 +510,16 @@ describe("Client", function () { // }); // }); - // it("does not include apns headers when not required", function () { + // it("does not include apns headers when not required", () => { // return client.write(builtNotification(), "abcd1234") - // .then(function () { + // .then(() => { // ["apns-id", "apns-priority", "apns-expiration", "apns-topic"].forEach( header => { // expect(fakes.stream.headers).to.not.be.calledWithMatch(sinon.match.has(header)); // }); // }); // }); - // it("sends the notification-specific apns headers when specified", function () { + // it("sends the notification-specific apns headers when specified", () => { // let notification = builtNotification(); // notification.headers = { @@ -132,7 +530,7 @@ describe("Client", function () { // }; // return client.write(notification, "abcd1234") - // .then(function () { + // .then(() => { // expect(fakes.stream.headers).to.be.calledWithMatch( { // "apns-id": "123e4567-e89b-12d3-a456-42665544000", // "apns-priority": 5, @@ -142,8 +540,8 @@ describe("Client", function () { // }); // }); - // context("when token authentication is enabled", function () { - // beforeEach(function () { + // context("when token authentication is enabled", () => { + // beforeEach(() => { // fakes.token = { // generation: 0, // current: "fake-token", @@ -157,10 +555,10 @@ describe("Client", function () { // fakes.endpointManager.getStream.onCall(0).returns(fakes.stream); // }); - // it("sends the bearer token", function () { + // it("sends the bearer token", () => { // let notification = builtNotification(); - // return client.write(notification, "abcd1234").then(function () { + // return client.write(notification, "abcd1234").then(() => { // expect(fakes.stream.headers).to.be.calledWithMatch({ // authorization: "bearer fake-token", // }); @@ -168,51 +566,51 @@ describe("Client", function () { // }); // }); - // context("when token authentication is disabled", function () { - // beforeEach(function () { + // context("when token authentication is disabled", () => { + // beforeEach(() => { // client = new Client( { address: "testapi" } ); // fakes.stream = new FakeStream("abcd1234", "200"); // fakes.endpointManager.getStream.onCall(0).returns(fakes.stream); // }); - // it("does not set an authorization header", function () { + // it("does not set an authorization header", () => { // let notification = builtNotification(); - // return client.write(notification, "abcd1234").then(function () { + // return client.write(notification, "abcd1234").then(() => { // expect(fakes.stream.headers.firstCall.args[0]).to.not.have.property("authorization"); // }) // }); // }) // }); - // it("writes the notification data to the pipe", function () { + // it("writes the notification data to the pipe", () => { // const notification = builtNotification(); // return client.write(notification, "abcd1234") - // .then(function () { + // .then(() => { // expect(fakes.stream._transform).to.be.calledWithMatch(actual => actual.equals(Buffer.from(notification.body))); // }); // }); - // it("ends the stream", function () { + // it("ends the stream", () => { // sinon.spy(fakes.stream, "end"); // return client.write(builtNotification(), "abcd1234") - // .then(function () { + // .then(() => { // expect(fakes.stream.end).to.be.calledOnce; // }); // }); - // it("resolves with the device token", function () { + // it("resolves with the device token", () => { // return expect(client.write(builtNotification(), "abcd1234")) // .to.become({ device: "abcd1234" }); // }); // }); - // context("error occurs", function () { + // context("error occurs", () => { // let promise; - // context("general case", function () { - // beforeEach(function () { + // context("general case", () => { + // beforeEach(() => { // const client = new Client( { address: "testapi" } ); // fakes.stream = new FakeStream("abcd1234", "400", { "reason" : "BadDeviceToken" }); @@ -221,23 +619,23 @@ describe("Client", function () { // promise = client.write(builtNotification(), "abcd1234"); // }); - // it("resolves with the device token, status code and response", function () { + // it("resolves with the device token, status code and response", () => { // return expect(promise).to.eventually.deep.equal({ status: "400", device: "abcd1234", response: { reason: "BadDeviceToken" }}); // }); // }) - // context("ExpiredProviderToken", function () { - // beforeEach(function () { + // context("ExpiredProviderToken", () => { + // beforeEach(() => { // let tokenGenerator = sinon.stub().returns("fake-token"); // const client = new Client( { address: "testapi", token: tokenGenerator }); // }) // }); // }); - // context("stream ends without completing request", function () { + // context("stream ends without completing request", () => { // let promise; - // beforeEach(function () { + // beforeEach(() => { // const client = new Client( { address: "testapi" } ); // fakes.stream = new stream.Transform({ // transform: function(chunk, encoding, callback) {} @@ -251,11 +649,11 @@ describe("Client", function () { // fakes.stream.push(null); // }); - // it("resolves with an object containing the device token", function () { + // it("resolves with an object containing the device token", () => { // return expect(promise).to.eventually.have.property("device", "abcd1234"); // }); - // it("resolves with an object containing an error", function () { + // it("resolves with an object containing an error", () => { // return promise.then( (response) => { // expect(response).to.have.property("error"); // expect(response.error).to.be.an.instanceOf(Error); @@ -264,10 +662,10 @@ describe("Client", function () { // }); // }); - // context("stream is unprocessed", function () { + // context("stream is unprocessed", () => { // let promise; - // beforeEach(function () { + // beforeEach(() => { // const client = new Client( { address: "testapi" } ); // fakes.stream = new stream.Transform({ // transform: function(chunk, encoding, callback) {} @@ -293,15 +691,15 @@ describe("Client", function () { // }); // }); - // it("fulfills the promise", function () { + // it("fulfills the promise", () => { // return expect(promise).to.eventually.deep.equal({ device: "abcd1234" }); // }); // }); - // context("stream error occurs", function () { + // context("stream error occurs", () => { // let promise; - // beforeEach(function () { + // beforeEach(() => { // const client = new Client( { address: "testapi" } ); // fakes.stream = new stream.Transform({ // transform: function(chunk, encoding, callback) {} @@ -313,16 +711,16 @@ describe("Client", function () { // promise = client.write(builtNotification(), "abcd1234"); // }); - // context("passing an Error", function () { - // beforeEach(function () { + // context("passing an Error", () => { + // beforeEach(() => { // fakes.stream.emit("error", new Error("stream error")); // }); - // it("resolves with an object containing the device token", function () { + // it("resolves with an object containing the device token", () => { // return expect(promise).to.eventually.have.property("device", "abcd1234"); // }); - // it("resolves with an object containing a wrapped error", function () { + // it("resolves with an object containing a wrapped error", () => { // return promise.then( (response) => { // expect(response.error).to.be.an.instanceOf(Error); // expect(response.error).to.match(/apn write failed/); @@ -331,8 +729,8 @@ describe("Client", function () { // }); // }); - // context("passing a string", function () { - // it("resolves with the device token and an error", function () { + // context("passing a string", () => { + // it("resolves with the device token and an error", () => { // fakes.stream.emit("error", "stream error"); // return promise.then( (response) => { // expect(response).to.have.property("device", "abcd1234"); @@ -345,10 +743,10 @@ describe("Client", function () { // }); // }); - // context("no new stream is returned but the endpoint later wakes up", function () { + // context("no new stream is returned but the endpoint later wakes up", () => { // let notification, promise; - // beforeEach( function () { + // beforeEach( () => { // const client = new Client( { address: "testapi" } ); // fakes.stream = new FakeStream("abcd1234", "200"); @@ -365,7 +763,7 @@ describe("Client", function () { // return promise; // }); - // it("sends the required headers to the newly available stream", function () { + // it("sends the required headers to the newly available stream", () => { // expect(fakes.stream.headers).to.be.calledWithMatch( { // ":scheme": "https", // ":method": "POST", @@ -374,14 +772,14 @@ describe("Client", function () { // }); // }); - // it("writes the notification data to the pipe", function () { + // it("writes the notification data to the pipe", () => { // expect(fakes.stream._transform).to.be.calledWithMatch(actual => actual.equals(Buffer.from(notification.body))); // }); // }); - // context("when 5 successive notifications are sent", function () { + // context("when 5 successive notifications are sent", () => { - // beforeEach(function () { + // beforeEach(() => { // fakes.streams = [ // new FakeStream("abcd1234", "200"), // new FakeStream("adfe5969", "400", { reason: "MissingTopic" }), @@ -391,10 +789,10 @@ describe("Client", function () { // ]; // }); - // context("streams are always returned", function () { + // context("streams are always returned", () => { // let promises; - // beforeEach( function () { + // beforeEach( () => { // const client = new Client( { address: "testapi" } ); // fakes.endpointManager.getStream.onCall(0).returns(fakes.streams[0]); @@ -414,7 +812,7 @@ describe("Client", function () { // return promises; // }); - // it("sends the required headers for each stream", function () { + // it("sends the required headers for each stream", () => { // expect(fakes.streams[0].headers).to.be.calledWithMatch( { ":path": "/3/device/abcd1234" } ); // expect(fakes.streams[1].headers).to.be.calledWithMatch( { ":path": "/3/device/adfe5969" } ); // expect(fakes.streams[2].headers).to.be.calledWithMatch( { ":path": "/3/device/abcd1335" } ); @@ -422,13 +820,13 @@ describe("Client", function () { // expect(fakes.streams[4].headers).to.be.calledWithMatch( { ":path": "/3/device/aabbc788" } ); // }); - // it("writes the notification data for each stream", function () { + // it("writes the notification data for each stream", () => { // fakes.streams.forEach( stream => { // expect(stream._transform).to.be.calledWithMatch(actual => actual.equals(Buffer.from(builtNotification().body))); // }); // }); - // it("resolves with the notification outcomes", function () { + // it("resolves with the notification outcomes", () => { // return expect(promises).to.eventually.deep.equal([ // { device: "abcd1234"}, // { device: "adfe5969", status: "400", response: { reason: "MissingTopic" } }, @@ -439,7 +837,7 @@ describe("Client", function () { // }); // }); - // context("some streams return, others wake up later", function () { + // context("some streams return, others wake up later", () => { // let promises; // beforeEach( function() { @@ -456,14 +854,14 @@ describe("Client", function () { // client.write(builtNotification(), "aabbc788"), // ]); - // setTimeout(function () { + // setTimeout(() => { // fakes.endpointManager.getStream.reset(); // fakes.endpointManager.getStream.onCall(0).returns(fakes.streams[2]); // fakes.endpointManager.getStream.onCall(1).returns(null); // fakes.endpointManager.emit("wakeup"); // }, 1); - // setTimeout(function () { + // setTimeout(() => { // fakes.endpointManager.getStream.reset(); // fakes.endpointManager.getStream.onCall(0).returns(fakes.streams[3]); // fakes.endpointManager.getStream.onCall(1).returns(fakes.streams[4]); @@ -473,7 +871,7 @@ describe("Client", function () { // return promises; // }); - // it("sends the correct device ID for each stream", function () { + // it("sends the correct device ID for each stream", () => { // expect(fakes.streams[0].headers).to.be.calledWithMatch({":path": "/3/device/abcd1234"}); // expect(fakes.streams[1].headers).to.be.calledWithMatch({":path": "/3/device/adfe5969"}); // expect(fakes.streams[2].headers).to.be.calledWithMatch({":path": "/3/device/abcd1335"}); @@ -481,13 +879,13 @@ describe("Client", function () { // expect(fakes.streams[4].headers).to.be.calledWithMatch({":path": "/3/device/aabbc788"}); // }); - // it("writes the notification data for each stream", function () { + // it("writes the notification data for each stream", () => { // fakes.streams.forEach( stream => { // expect(stream._transform).to.be.calledWithMatch(actual => actual.equals(Buffer.from(builtNotification().body))); // }); // }); - // it("resolves with the notification reponses", function () { + // it("resolves with the notification reponses", () => { // return expect(promises).to.eventually.deep.equal([ // { device: "abcd1234"}, // { device: "adfe5969", status: "400", response: { reason: "MissingTopic" } }, @@ -498,7 +896,7 @@ describe("Client", function () { // }); // }); - // context("connection fails", function () { + // context("connection fails", () => { // let promises, client; // beforeEach( function() { @@ -512,7 +910,7 @@ describe("Client", function () { // client.write(builtNotification(), "abcd1335"), // ]); - // setTimeout(function () { + // setTimeout(() => { // fakes.endpointManager.getStream.reset(); // fakes.endpointManager.emit("error", new Error("endpoint failed")); // }, 1); @@ -520,20 +918,20 @@ describe("Client", function () { // return promises; // }); - // it("resolves with 1 success", function () { + // it("resolves with 1 success", () => { // return promises.then( response => { // expect(response[0]).to.deep.equal({ device: "abcd1234" }); // }); // }); - // it("resolves with 2 errors", function () { + // it("resolves with 2 errors", () => { // return promises.then( response => { // expect(response[1]).to.deep.equal({ device: "adfe5969", error: new Error("endpoint failed") }); // expect(response[2]).to.deep.equal({ device: "abcd1335", error: new Error("endpoint failed") }); // }) // }); - // it("clears the queue", function () { + // it("clears the queue", () => { // return promises.then( () => { // expect(client.queue.length).to.equal(0); // }); @@ -542,8 +940,8 @@ describe("Client", function () { // }); - // describe("token generator behaviour", function () { - // beforeEach(function () { + // describe("token generator behaviour", () => { + // beforeEach(() => { // fakes.token = { // generation: 0, // current: "fake-token", @@ -558,10 +956,10 @@ describe("Client", function () { // ]; // }); - // it("reuses the token", function () { + // it("reuses the token", () => { // const client = new Client( { address: "testapi", token: fakes.token } ); - // fakes.token.regenerate = function () { + // fakes.token.regenerate = () => { // fakes.token.generation = 1; // fakes.token.current = "second-token"; // } @@ -574,16 +972,16 @@ describe("Client", function () { // client.write(builtNotification(), "abcd1234"), // client.write(builtNotification(), "adfe5969"), // client.write(builtNotification(), "abcd1335"), - // ]).then(function () { + // ]).then(() => { // expect(fakes.streams[0].headers).to.be.calledWithMatch({ authorization: "bearer fake-token" }); // expect(fakes.streams[1].headers).to.be.calledWithMatch({ authorization: "bearer fake-token" }); // expect(fakes.streams[2].headers).to.be.calledWithMatch({ authorization: "bearer fake-token" }); // }); // }); - // context("token expires", function () { + // context("token expires", () => { - // beforeEach(function () { + // beforeEach(() => { // fakes.token.regenerate = function (generation) { // if (generation === fakes.token.generation) { // fakes.token.generation += 1; @@ -592,7 +990,7 @@ describe("Client", function () { // } // }); - // it("resends the notification with a new token", function () { + // it("resends the notification with a new token", () => { // fakes.streams = [ // new FakeStream("adfe5969", "403", { reason: "ExpiredProviderToken" }), // new FakeStream("adfe5969", "200"), @@ -604,19 +1002,19 @@ describe("Client", function () { // const promise = client.write(builtNotification(), "adfe5969"); - // setTimeout(function () { + // setTimeout(() => { // fakes.endpointManager.getStream.reset(); // fakes.endpointManager.getStream.onCall(0).returns(fakes.streams[1]); // fakes.endpointManager.emit("wakeup"); // }, 1); - // return promise.then(function () { + // return promise.then(() => { // expect(fakes.streams[0].headers).to.be.calledWithMatch({ authorization: "bearer fake-token" }); // expect(fakes.streams[1].headers).to.be.calledWithMatch({ authorization: "bearer token-1" }); // }); // }); - // it("only regenerates the token once per-expiry", function () { + // it("only regenerates the token once per-expiry", () => { // fakes.streams = [ // new FakeStream("abcd1234", "200"), // new FakeStream("adfe5969", "403", { reason: "ExpiredProviderToken" }), @@ -637,14 +1035,14 @@ describe("Client", function () { // client.write(builtNotification(), "abcd1335"), // ]); - // setTimeout(function () { + // setTimeout(() => { // fakes.endpointManager.getStream.reset(); // fakes.endpointManager.getStream.onCall(0).returns(fakes.streams[3]); // fakes.endpointManager.getStream.onCall(1).returns(fakes.streams[4]); // fakes.endpointManager.emit("wakeup"); // }, 1); - // return promises.then(function () { + // return promises.then(() => { // expect(fakes.streams[0].headers).to.be.calledWithMatch({ authorization: "bearer fake-token" }); // expect(fakes.streams[1].headers).to.be.calledWithMatch({ authorization: "bearer fake-token" }); // expect(fakes.streams[2].headers).to.be.calledWithMatch({ authorization: "bearer fake-token" }); @@ -653,7 +1051,7 @@ describe("Client", function () { // }); // }); - // it("abandons sending after 3 ExpiredProviderToken failures", function () { + // it("abandons sending after 3 ExpiredProviderToken failures", () => { // fakes.streams = [ // new FakeStream("adfe5969", "403", { reason: "ExpiredProviderToken" }), // new FakeStream("adfe5969", "403", { reason: "ExpiredProviderToken" }), @@ -669,7 +1067,7 @@ describe("Client", function () { // return expect(client.write(builtNotification(), "adfe5969")).to.eventually.have.property("status", "403"); // }); - // it("regenerate token", function () { + // it("regenerate token", () => { // fakes.stream = new FakeStream("abcd1234", "200"); // fakes.endpointManager.getStream.onCall(0).returns(fakes.stream); @@ -681,14 +1079,14 @@ describe("Client", function () { // address: "testapi", // token: fakes.token // }); - + // return client.write(builtNotification(), "abcd1234") - // .then(function () { + // .then(() => { // expect(fakes.token.generation).to.equal(1); // }); // }); - // it("internal server error", function () { + // it("internal server error", () => { // fakes.stream = new FakeStream("abcd1234", "500", { reason: "InternalServerError" }); // fakes.stream.connection = sinon.stub(); // fakes.stream.connection.close = sinon.stub(); @@ -698,23 +1096,23 @@ describe("Client", function () { // address: "testapi", // token: fakes.token // }); - + // return expect(client.write(builtNotification(), "abcd1234")).to.eventually.have.deep.property("error.jse_shortmsg","Error 500, stream ended unexpectedly"); // }); // }); // }); }); - describe("shutdown", function () { - // beforeEach(function () { + describe("shutdown", () => { + // beforeEach(() => { // fakes.config.returnsArg(0); // fakes.endpointManager.getStream = sinon.stub(); // fakes.EndpointManager.returns(fakes.endpointManager); // }); - // context("with no pending notifications", function () { - // it("invokes shutdown on endpoint manager", function () { + // context("with no pending notifications", () => { + // it("invokes shutdown on endpoint manager", () => { // let client = new Client(); // client.shutdown(); @@ -722,8 +1120,8 @@ describe("Client", function () { // }); // }); - // context("with pending notifications", function () { - // it("invokes shutdown on endpoint manager after queue drains", function () { + // context("with pending notifications", () => { + // it("invokes shutdown on endpoint manager after queue drains", () => { // let client = new Client({ address: "none" }); // fakes.streams = [ @@ -749,14 +1147,14 @@ describe("Client", function () { // expect(fakes.endpointManager.shutdown).to.not.be.called; - // setTimeout(function () { + // setTimeout(() => { // fakes.endpointManager.getStream.reset(); // fakes.endpointManager.getStream.onCall(0).returns(fakes.streams[2]); // fakes.endpointManager.getStream.onCall(1).returns(null); // fakes.endpointManager.emit("wakeup"); // }, 1); - // setTimeout(function () { + // setTimeout(() => { // fakes.endpointManager.getStream.reset(); // fakes.endpointManager.getStream.onCall(0).returns(fakes.streams[3]); // fakes.endpointManager.getStream.onCall(1).returns(fakes.streams[4]); diff --git a/test/config.js b/test/config.js index c00017cc..7c2185ca 100644 --- a/test/config.js +++ b/test/config.js @@ -31,6 +31,7 @@ describe("config", function () { rejectUnauthorized: true, connectionRetryLimit: 10, heartBeat: 60000, + timeout: 10000, }); }); From f41f08c303c599ab045eafd4e70a0e1fdb7a4186 Mon Sep 17 00:00:00 2001 From: Tyson Andre Date: Thu, 1 Oct 2020 17:34:03 -0400 Subject: [PATCH 3/7] Add test of immediately receiving concurrent requests --- test/client.js | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/test/client.js b/test/client.js index 0308c8c9..6b30c55b 100644 --- a/test/client.js +++ b/test/client.js @@ -125,6 +125,7 @@ describe("Client", () => { it("Treats HTTP 200 responses as successful", async () => { let didRequest = false; let establishedConnections = 0; + let requestsServed = 0; server = createAndStartMockServer(TEST_PORT, (req, res, requestBody) => { expect(req.headers).to.deep.equal({ ':authority': '127.0.0.1', @@ -138,6 +139,7 @@ describe("Client", () => { // res.writeHead(200, { 'Content-Type': 'text/plain; charset=utf-8' }); res.writeHead(200); res.end(''); + requestsServed += 1; didRequest = true; }); server.on('connection', () => establishedConnections += 1); @@ -158,11 +160,21 @@ describe("Client", () => { ); expect(result).to.deep.equal({ device: MOCK_DEVICE_TOKEN }); expect(didRequest).to.be.true; - didRequest = false; }; - await runSuccessfulRequest(); + expect(establishedConnections).to.equal(0); // should not establish a connection until it's needed + // Validate that when multiple valid requests arrive concurrently, + // only one HTTP/2 connection gets established + await Promise.all([ + runSuccessfulRequest(), + runSuccessfulRequest(), + runSuccessfulRequest(), + runSuccessfulRequest(), + runSuccessfulRequest(), + ]); + didRequest = false; await runSuccessfulRequest(); expect(establishedConnections).to.equal(1); // should establish a connection to the server and reuse it + expect(requestsServed).to.equal(6); }); // https://developer.apple.com/documentation/usernotifications/setting_up_a_remote_notification_server/handling_notification_responses_from_apns From 4550ac867b8e4379cf79dfe8dc7a41429713bb71 Mon Sep 17 00:00:00 2001 From: Tyson Andre Date: Thu, 1 Oct 2020 19:20:35 -0400 Subject: [PATCH 4/7] Refactor the way clients are cleaned up --- lib/client.js | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/lib/client.js b/lib/client.js index 07b09889..6277c92f 100644 --- a/lib/client.js +++ b/lib/client.js @@ -44,7 +44,7 @@ module.exports = function (dependencies) { function Client (options) { this.config = config(options); this.healthCheckInterval = setInterval(() => { - if (this.session && !this.session.destroyed) { + if (this.session && !this.session.destroyed && !this.session.closed) { this.session.ping((error, duration) => { if (error) { logger("No Ping response after " + duration + " ms"); @@ -55,6 +55,13 @@ module.exports = function (dependencies) { }, this.config.heartBeat).unref(); } + Client.prototype._safeCloseSession = function(session, callback) { + safeCloseSession(session, callback); + if (session === this.session) { + this.session = null; + } + } + Client.prototype._createSession = function() { const url = this._mockOverrideUrl || `https://${this.config.address}`; // Get the reference to the current session so that @@ -65,20 +72,20 @@ module.exports = function (dependencies) { if (logger.enabled) { logger(`Socket error: ${error}`); } - safeCloseSession(session); + this._safeCloseSession(session); }); session.on("error", (error) => { if (logger.enabled) { logger(`Session error: ${error}`); } - safeCloseSession(session); + this._safeCloseSession(session); }); session.on("goaway", (errorCode, lastStreamId, opaqueData) => { logger(`GOAWAY received: (errorCode ${errorCode}, lastStreamId: ${lastStreamId}, opaqueData: ${opaqueData})`); // gracefully stop accepting new streams // This may be redundant, since nodejs http2 client is supposed to shut down automatically on receiving a goaway frame - safeCloseSession(session); + this._safeCloseSession(session); }); if (logger.enabled) { @@ -144,8 +151,7 @@ module.exports = function (dependencies) { return; } logger(errorMessage); - safeCloseSession(currentSession); - this.session = null; + this._safeCloseSession(currentSession); }); request.setEncoding("utf8"); @@ -179,8 +185,7 @@ module.exports = function (dependencies) { let error = new VError("Error 500, stream ended unexpectedly"); resolve({ device, error }); - safeCloseSession(currentSession); - this.session = null; + this._safeCloseSession(currentSession); return; } @@ -219,8 +224,7 @@ module.exports = function (dependencies) { clearInterval(this.healthCheckInterval); } if (this.session) { - safeCloseSession(this.session, callback); - this.session = null; + this._safeCloseSession(this.session, callback); } else { if (callback) { callback(); From 021799b45e65aa3c81490a2b43f559b51334b22e Mon Sep 17 00:00:00 2001 From: Tyson Andre Date: Fri, 2 Oct 2020 11:29:37 -0400 Subject: [PATCH 5/7] Tolerate gradual responses over slow network connections Don't time out immediately on the slowest request. - That would just lead to constantly creating and tearing down connections to APNs and possibly getting blocked. Instead, check if the last successful response was received longer ago than the configured timeout. --- lib/client.js | 46 +++++++++++++++++++++++++-- test/client.js | 85 ++++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 126 insertions(+), 5 deletions(-) diff --git a/lib/client.js b/lib/client.js index 6277c92f..47fa5003 100644 --- a/lib/client.js +++ b/lib/client.js @@ -99,6 +99,18 @@ module.exports = function (dependencies) { logger(`Frame error: (frameType: ${frameType}, errorCode ${errorCode}, streamId: ${streamId})`); }); } + /** + * Act as if the last successful apn response was now as a + * placeholder value to simplify subsequent checks. + * + * If we get a burst of requests on a network with limited bandwidth, + * it's possible that APNs may be working properly, but some requests will be + * delayed because of the network. + * + * To handle cases like that, the most recent successful response time is tracked + * when deciding if a connection to APNs is actually dead. + */ + session._lastSuccessfulApnResponseMillis = Date.now(); return session; }; @@ -109,7 +121,13 @@ module.exports = function (dependencies) { * @returns {Promise<{device:string, error?: VError}>} object with device, optional error. */ Client.prototype.write = function write (notification, device, count = 0) { - return new Promise((resolve) => { + return new Promise((originalResolve) => { + let isResolved = false; + const resolve = (result) => { + isResolved = true; + originalResolve(result); + }; + // Connect session if (!this.session || this.session.destroyed || this.session._nodeApnIsDestroying) { logger('creating a new APNs session'); @@ -138,9 +156,28 @@ module.exports = function (dependencies) { const currentSession = this.session; const request = currentSession.request(headers) + // Timeout in milliseconds const timeout = this.config.timeout || 10000; - request.setTimeout(timeout, () => { + const timeoutCb = () => { + if (isResolved) { + // Assume that something emitted the 'error' event and this request/session finished. + return; + } + const newEndTime = currentSession._lastSuccessfulApnResponseMillis + timeout; + if (newEndTime >= Date.now()) { + // Postpone this by an arbitrary delay if there were recent successes + // for the current session in case there are a lot of in flight requests to APNs + // and the only reason for the delay was limited bandwidth. + // + // (e.g. sending out hundreds of thousands of notifications at once) + // + // Deliberately not shortening the delay to anywhere near 0 + // in case that would lead to a spike in CPU usage if all in-flight requests did that. + const newTimeout = Math.max(timeout / 4, newEndTime - Date.now()); + request.setTimeout(newTimeout, timeoutCb); + return; + } const errorMessage = `Forcibly closing connection to APNs after reaching the request timeout of ${timeout} milliseconds`; // The first call to resolve will be what the promise resolves to. resolve({device, error: new VError(errorMessage)}); @@ -152,7 +189,9 @@ module.exports = function (dependencies) { } logger(errorMessage); this._safeCloseSession(currentSession); - }); + }; + + request.setTimeout(timeout, timeoutCb); request.setEncoding("utf8"); @@ -167,6 +206,7 @@ module.exports = function (dependencies) { request.write(notification.body); request.on("end", () => { + currentSession._lastSuccessfulApnResponseMillis = Date.now(); try { if (logger.enabled) { logger(`Request ended with status ${status} and responseData: ${responseData}`); diff --git a/test/client.js b/test/client.js index 6b30c55b..7a4bfd63 100644 --- a/test/client.js +++ b/test/client.js @@ -65,11 +65,11 @@ describe("Client", () => { // (APNS would use https://, not http://) // (It's probably possible to allow accepting invalid certificates instead, // but that's not the most important point of these tests) - const createClient = (port) => { + const createClient = (port, timeout = 500) => { let c = new Client({ port: TEST_PORT, address: '127.0.0.1', - timeout: 500, + timeout, }); c._mockOverrideUrl = `http://127.0.0.1:${port}`; c.config.port = port; @@ -352,6 +352,87 @@ describe("Client", () => { ]); }); + it("Does not disconnect when there is limited bandwidth but responses are received", async () => { + let didGetRequest = false; + let didGetResponse = false; + let currentRequest = 0; + let establishedConnections = 0; + // Simulate responses getting sent at time 0, 50, 100, 150, ... 1000 + server = createAndStartMockServer(TEST_PORT, (req, res, requestBody) => { + setTimeout(() => { + res.writeHead(200); + res.end(''); + }, currentRequest * 50); + currentRequest += 1; + }); + server.on('connection', () => establishedConnections += 1); + const testTimeout = 200; + client = createClient(TEST_PORT, testTimeout); + + const onListeningPromise = new Promise((resolve) => server.on('listening', resolve));; + await onListeningPromise; + + const mockHeaders = {'apns-someheader': 'somevalue'}; + const mockNotification = { + headers: mockHeaders, + body: MOCK_BODY, + }; + const mockDevice = MOCK_DEVICE_TOKEN; + const performRequestExpectingResponse = async () => { + const result = await client.write( + mockNotification, + mockDevice, + ); + expect(result).to.deep.equal({ + device: MOCK_DEVICE_TOKEN, + }); + }; + // Should be able to have multiple in flight requests all get responses eventually + // when the bandwidth is limited or the process is under temporarily high load, + // as long ad the server keeps getting responses + await Promise.all([ + performRequestExpectingResponse(), + performRequestExpectingResponse(), + performRequestExpectingResponse(), + performRequestExpectingResponse(), + performRequestExpectingResponse(), + performRequestExpectingResponse(), + performRequestExpectingResponse(), + performRequestExpectingResponse(), + performRequestExpectingResponse(), + performRequestExpectingResponse(), + performRequestExpectingResponse(), + performRequestExpectingResponse(), + performRequestExpectingResponse(), + performRequestExpectingResponse(), + performRequestExpectingResponse(), + ]); + expect(currentRequest).to.equal(15); + expect(establishedConnections).to.equal(1); + + // Set the server timeout to 250 milliseconds + currentRequest = 5; + const performRequestExpectingTimeout = async () => { + const result = await client.write( + mockNotification, + mockDevice, + ); + expect(result).to.deep.equal({ + device: MOCK_DEVICE_TOKEN, + error: new VError('Forcibly closing connection to APNs after reaching the request timeout of 200 milliseconds'), + }); + }; + // If there is a timeout with no recent successful requests, + // then the server should disconnect and reconnect for subsequent attempts. + await performRequestExpectingTimeout(); + expect(currentRequest).to.equal(6); + expect(establishedConnections).to.equal(1); + + await performRequestExpectingTimeout(); + expect(currentRequest).to.equal(7); + expect(establishedConnections).to.equal(2); + }); + it("Handles goaway frames", async () => { let didGetRequest = false; let establishedConnections = 0; From 581b2e8e8ef11edcddffc569fdc086ed800cce11 Mon Sep 17 00:00:00 2001 From: Tyson Andre Date: Fri, 2 Oct 2020 11:36:51 -0400 Subject: [PATCH 6/7] [skip ci] Update timeout documentation --- doc/provider.markdown | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/provider.markdown b/doc/provider.markdown index 08706df9..e606ff48 100644 --- a/doc/provider.markdown +++ b/doc/provider.markdown @@ -25,7 +25,7 @@ Options: - `connectionRetryLimit` {Number} The maximum number of connection failures that will be tolerated before `apn.Provider` will "give up". [See below.](#connection-retry-limit) (Defaults to: 3) - - `timeout` {Number} The timeout in milliseconds for a given HTTP/2 request before the `apn.Client` will assume the server is dead and forcefully reconnect. (Defaults to: 10000) + - `timeout` {Number} The timeout in milliseconds for a given HTTP/2 connection. If a request is sent out and no successful response has been received for *any* request within this timeout, the `apn.Client` will assume the connection to the server is dead and forcefully reconnect. (Defaults to: 10000) #### Provider Certificates vs. Authentication Tokens From 3f9d10ac733b0f09f047d8b2f266539100550597 Mon Sep 17 00:00:00 2001 From: Tyson Andre Date: Fri, 2 Oct 2020 12:37:30 -0400 Subject: [PATCH 7/7] Add a simple load test --- test/client.js | 54 +++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 53 insertions(+), 1 deletion(-) diff --git a/test/client.js b/test/client.js index 7a4bfd63..fcc90d89 100644 --- a/test/client.js +++ b/test/client.js @@ -10,7 +10,7 @@ const credentials = require("../lib/credentials")({ }); const TEST_PORT = 30939; - +const LOAD_TEST_BATCH_SIZE = 2000; const config = require("../lib/config")({ logger: debug, @@ -177,6 +177,58 @@ describe("Client", () => { expect(requestsServed).to.equal(6); }); + // Assert that this doesn't crash when a large batch of requests are requested simultaneously + it("Treats HTTP 200 responses as successful (load test for a batch of requests)", async function () { + this.timeout(10000); + let establishedConnections = 0; + let requestsServed = 0; + server = createAndStartMockServer(TEST_PORT, (req, res, requestBody) => { + expect(req.headers).to.deep.equal({ + ':authority': '127.0.0.1', + ':method': 'POST', + ':path': `/3/device/${MOCK_DEVICE_TOKEN}`, + ':scheme': 'https', + 'apns-someheader': 'somevalue', + }); + expect(requestBody).to.equal(MOCK_BODY); + // Set a timeout of 100 to simulate latency to a remote server. + setTimeout(() => { + res.writeHead(200); + res.end(''); + requestsServed += 1; + }, 100); + }); + server.on('connection', () => establishedConnections += 1); + await new Promise((resolve) => server.on('listening', resolve)); + + client = createClient(TEST_PORT, 1500); + + const runSuccessfulRequest = async () => { + const mockHeaders = {'apns-someheader': 'somevalue'}; + const mockNotification = { + headers: mockHeaders, + body: MOCK_BODY, + }; + const mockDevice = MOCK_DEVICE_TOKEN; + const result = await client.write( + mockNotification, + mockDevice, + ); + expect(result).to.deep.equal({ device: MOCK_DEVICE_TOKEN }); + }; + expect(establishedConnections).to.equal(0); // should not establish a connection until it's needed + // Validate that when multiple valid requests arrive concurrently, + // only one HTTP/2 connection gets established + const promises = []; + for (let i = 0; i < LOAD_TEST_BATCH_SIZE; i++) { + promises.push(runSuccessfulRequest()); + } + + await Promise.all(promises); + expect(establishedConnections).to.equal(1); // should establish a connection to the server and reuse it + expect(requestsServed).to.equal(LOAD_TEST_BATCH_SIZE); + }); + // https://developer.apple.com/documentation/usernotifications/setting_up_a_remote_notification_server/handling_notification_responses_from_apns it("JSON decodes HTTP 400 responses", async () => { let didRequest = false;