Skip to content

Enforce timeouts and add simple http2 client test cases #27

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ language: node_js
node_js:
- 8
- 10
- 12
- 14
after_success:
- npm install -g istanbul
- npm install -D coveralls
Expand Down
2 changes: 2 additions & 0 deletions doc/provider.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ Options:

- `connectionRetryLimit` {Number} The maximum number of connection failures that will be tolerated before `apn.Provider` will "give up". [See below.](#connection-retry-limit) (Defaults to: 3)

- `timeout` {Number} The timeout in milliseconds for a given HTTP/2 connection. If a request is sent out and no successful response has been received for *any* request within this timeout, the `apn.Client` will assume the connection to the server is dead and forcefully reconnect. (Defaults to: 10000)

#### Provider Certificates vs. Authentication Tokens

Apple have introduced a new means of authentication with the APNs - [Provider Authentication Tokens][provider-auth-tokens]. These replace the old-style Certificate/Key pairs with tokens based on the [JWT][jwt] standard. The new system is superior in a number of ways:
Expand Down
294 changes: 197 additions & 97 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,34 @@ module.exports = function (dependencies) {
HTTP2_METHOD_POST
} = http2.constants;

const safeCloseSession = (session, callback) => {
if (session && !session.destroyed && !session._nodeApnIsDestroying) {
session._nodeApnIsDestroying = true;
const startDestroying = () => {
if (!session.destroyed) {
session._nodeApnIsDestroying = true;
session.destroy();
}
if (callback) {
callback();
}
};
if (session.closed) {
startDestroying();
} else {
session.close(startDestroying);
}
} else {
if (callback) {
callback();
}
}
};

function Client (options) {
this.config = config(options);
this.healthCheckInterval = setInterval(() => {
if (this.session && !this.session.destroyed) {
if (this.session && !this.session.destroyed && !this.session.closed) {
this.session.ping((error, duration) => {
if (error) {
logger("No Ping response after " + duration + " ms");
Expand All @@ -31,115 +55,191 @@ module.exports = function (dependencies) {
}, this.config.heartBeat).unref();
}

Client.prototype.write = function write (notification, device, count) {
// Connect session
if (!this.session || this.session.destroyed) {
this.session = http2.connect(`https://${this.config.address}`, this.config);
Client.prototype._safeCloseSession = function(session, callback) {
safeCloseSession(session, callback);
if (session === this.session) {
this.session = null;
}
}

this.session.on("socketError", (error) => {
if (logger.enabled) {
logger(`Socket error: ${error}`);
}
if (this.session && !this.session.destroyed) {
this.session.destroy();
}
Client.prototype._createSession = function() {
const url = this._mockOverrideUrl || `https://${this.config.address}`;
// Get the reference to the current session so that
// we don't unintentionally destroy a different session on an async callback
const session = http2.connect(url, this.config);

session.on("socketError", (error) => {
if (logger.enabled) {
logger(`Socket error: ${error}`);
}
this._safeCloseSession(session);
});
session.on("error", (error) => {
if (logger.enabled) {
logger(`Session error: ${error}`);
}
this._safeCloseSession(session);
});

session.on("goaway", (errorCode, lastStreamId, opaqueData) => {
logger(`GOAWAY received: (errorCode ${errorCode}, lastStreamId: ${lastStreamId}, opaqueData: ${opaqueData})`);
// gracefully stop accepting new streams
// This may be redundant, since nodejs http2 client is supposed to shut down automatically on receiving a goaway frame
this._safeCloseSession(session);
});

if (logger.enabled) {
session.on("connect", () => {
logger("Session connected");
});
this.session.on("error", (error) => {
if (logger.enabled) {
logger(`Session error: ${error}`);
}
if (this.session && !this.session.destroyed) {
this.session.destroy();
}
session.on("close", () => {
logger("Session closed");
});
session.on("frameError", (frameType, errorCode, streamId) => {
logger(`Frame error: (frameType: ${frameType}, errorCode ${errorCode}, streamId: ${streamId})`);
});
}
/**
* Act as if the last successful apn response was now as a
* placeholder value to simplify subsequent checks.
*
* If we get a burst of requests on a network with limited bandwidth,
* it's possible that APNs may be working properly, but some requests will be
* delayed because of the network.
*
* To handle cases like that, the most recent successful response time is tracked
* when deciding if a connection to APNs is actually dead.
*/
session._lastSuccessfulApnResponseMillis = Date.now();
return session;
};

this.session.on("goaway", (errorCode, lastStreamId, opaqueData) => {
logger(`GOAWAY received: (errorCode ${errorCode}, lastStreamId: ${lastStreamId}, opaqueData: ${opaqueData})`);
// gracefully stop accepting new streams
const session = this.session;
this.session = undefined;
if (session && !session.destroyed) {
session.close(() => {
session.destroy();
});
}
});
/**
* @param {Notification} notification the notification data to send through APNs
* @param {string} device the device token
* @param {number} [count] the number of retries that have occurred so far
* @returns {Promise<{device:string, error?: VError}>} object with device, optional error.
*/
Client.prototype.write = function write (notification, device, count = 0) {
return new Promise((originalResolve) => {
let isResolved = false;
const resolve = (result) => {
isResolved = true;
originalResolve(result);
};

if (logger.enabled) {
this.session.on("connect", () => {
logger("Session connected");
});
this.session.on("close", () => {
logger("Session closed");
});
this.session.on("frameError", (frameType, errorCode, streamId) => {
logger(`Frame error: (frameType: ${frameType}, errorCode ${errorCode}, streamId: ${streamId})`);
});
// Connect session
if (!this.session || this.session.destroyed || this.session._nodeApnIsDestroying) {
logger('creating a new APNs session');
this.session = this._createSession();
}
}

let tokenGeneration = null;
let status = null;
let responseData = "";
let retryCount = count || 0;

const headers = extend({
[HTTP2_HEADER_SCHEME]: "https",
[HTTP2_HEADER_METHOD]: HTTP2_METHOD_POST,
[HTTP2_HEADER_AUTHORITY]: this.config.address,
[HTTP2_HEADER_PATH]: `/3/device/${device}`,
}, notification.headers);

if (this.config.token) {
if (this.config.token.isExpired(3300)) {
this.config.token.regenerate(this.config.token.generation);
let tokenGeneration = null;
let status = null;
let responseData = "";
let retryCount = count || 0;

const headers = extend({
[HTTP2_HEADER_SCHEME]: "https",
[HTTP2_HEADER_METHOD]: HTTP2_METHOD_POST,
[HTTP2_HEADER_AUTHORITY]: this.config.address,
[HTTP2_HEADER_PATH]: `/3/device/${device}`,
}, notification.headers);

if (this.config.token) {
if (this.config.token.isExpired(3300)) {
this.config.token.regenerate(this.config.token.generation);
}
headers.authorization = `bearer ${this.config.token.current}`;
tokenGeneration = this.config.token.generation;
}
headers.authorization = `bearer ${this.config.token.current}`;
tokenGeneration = this.config.token.generation;
}
const currentSession = this.session;

const request = this.session.request(headers)
const request = currentSession.request(headers)
// Timeout in milliseconds
const timeout = this.config.timeout || 10000;

request.setEncoding("utf8");
const timeoutCb = () => {
if (isResolved) {
// Assume that something emitted the 'error' event and this request/session finished.
return;
}
const newEndTime = currentSession._lastSuccessfulApnResponseMillis + timeout;
if (newEndTime >= Date.now()) {
// Postpone this by an arbitrary delay if there were recent successes
// for the current session in case there are a lot of in flight requests to APNs
// and the only reason for the delay was limited bandwidth.
//
// (e.g. sending out hundreds of thousands of notifications at once)
//
// Deliberately not shortening the delay to anywhere near 0
// in case that would lead to a spike in CPU usage if all in-flight requests did that.
const newTimeout = Math.max(timeout / 4, newEndTime - Date.now());
request.setTimeout(newTimeout, timeoutCb);
return;
}
const errorMessage = `Forcibly closing connection to APNs after reaching the request timeout of ${timeout} milliseconds`;
// The first call to resolve will be what the promise resolves to.
resolve({device, error: new VError(errorMessage)});
if (currentSession !== this.session) {
return;
}
if (currentSession.destroyed) {
return;
}
logger(errorMessage);
this._safeCloseSession(currentSession);
};

request.on("response", (headers) => {
status = headers[HTTP2_HEADER_STATUS];
});
request.setTimeout(timeout, timeoutCb);

request.on("data", (data) => {
responseData += data;
});
request.setEncoding("utf8");

request.on("response", (headers) => {
status = headers[HTTP2_HEADER_STATUS];
});

request.on("data", (data) => {
responseData += data;
});

request.write(notification.body);
request.write(notification.body);

return new Promise ( resolve => {
request.on("end", () => {
if (logger.enabled) {
logger(`Request ended with status ${status} and responseData: ${responseData}`);
}
currentSession._lastSuccessfulApnResponseMillis = Date.now();
try {
if (logger.enabled) {
logger(`Request ended with status ${status} and responseData: ${responseData}`);
}

if (status === 200) {
resolve({ device });
} else if (responseData !== "") {
const response = JSON.parse(responseData);

if (status === 403 && response.reason === "ExpiredProviderToken" && retryCount < 2) {
this.config.token.regenerate(tokenGeneration);
resolve(this.write(notification, device, retryCount + 1));
return;
} else if (status === 500 && response.reason === "InternalServerError") {
this.session.destroy();
let error = new VError("Error 500, stream ended unexpectedly");
if (status === 200) {
resolve({ device });
} else if (responseData !== "") {
const response = JSON.parse(responseData);

if (status === 403 && response.reason === "ExpiredProviderToken" && retryCount < 2) {
this.config.token.regenerate(tokenGeneration);
resolve(this.write(notification, device, retryCount + 1));
return;
} else if (status === 500 && response.reason === "InternalServerError") {
let error = new VError("Error 500, stream ended unexpectedly");
resolve({ device, error });

this._safeCloseSession(currentSession);
return;
}

resolve({ device, status, response });
} else {
let error = new VError("stream ended unexpectedly");
resolve({ device, error });
return;
}

resolve({ device, status, response });
} else {
let error = new VError("stream ended unexpectedly");
} catch (e) {
const error = new VError(e, 'Unexpected error processing APNs response');
logger(`Unexpected error processing APNs response: ${e.message}`);
resolve({ device, error });
}
})
});

request.on("error", (error) => {
if (logger.enabled) {
Expand All @@ -159,16 +259,16 @@ module.exports = function (dependencies) {
};

Client.prototype.shutdown = function shutdown(callback) {
logger('Called client.shutdown()');
if (this.healthCheckInterval) {
clearInterval(this.healthCheckInterval);
}
if (this.session && !this.session.destroyed) {
this.session.close(() => {
this.session.destroy();
if (callback) {
callback();
}
});
if (this.session) {
this._safeCloseSession(this.session, callback);
} else {
if (callback) {
callback();
}
}
};

Expand Down
2 changes: 2 additions & 0 deletions lib/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ module.exports = function(dependencies) {
rejectUnauthorized: true,
connectionRetryLimit: 10,
heartBeat: 60000,
// The timeout for any request to APNs.
timeout: 10000,
};

validateOptions(options);
Expand Down
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@
"node": true
},
"eslintConfig": {
"ecmaVersion": 6,
"env": {
"es6": true,
"es2017": true,
"node": true
}
},
Expand Down
Loading