From f2c57e819888857b3f657f3efddde8c998e6b221 Mon Sep 17 00:00:00 2001 From: Luigi Pinca Date: Tue, 3 Mar 2020 18:41:14 +0100 Subject: [PATCH] [fix] Call buffered send callbacks on abnormal closure If the socket is closed while data is being compressed, invoke the current send callback and the buffered send callbacks with an error before emitting the `'close'` event. Refs: https://github.com/nodejs/node/pull/30596 --- lib/permessage-deflate.js | 16 +++++--- lib/sender.js | 16 ++++++++ lib/websocket.js | 4 +- test/permessage-deflate.test.js | 12 ++++-- test/websocket.test.js | 67 ++++++++++++++++++++++++++++----- 5 files changed, 94 insertions(+), 21 deletions(-) diff --git a/lib/permessage-deflate.js b/lib/permessage-deflate.js index b9c5ef499..7bb7c98d7 100644 --- a/lib/permessage-deflate.js +++ b/lib/permessage-deflate.js @@ -131,12 +131,18 @@ class PerMessageDeflate { } if (this._deflate) { - if (this._deflate[kCallback]) { - this._deflate[kCallback](); - } + const callback = this._deflate[kCallback]; this._deflate.close(); this._deflate = null; + + if (callback) { + callback( + new Error( + 'The deflate stream was closed while data was being processed' + ) + ); + } } } @@ -314,9 +320,7 @@ class PerMessageDeflate { zlibLimiter.add((done) => { this._compress(data, fin, (err, result) => { done(); - if (err || result) { - callback(err, result); - } + callback(err, result); }); }); } diff --git a/lib/sender.js b/lib/sender.js index a9bcef201..f23042453 100644 --- a/lib/sender.js +++ b/lib/sender.js @@ -306,6 +306,22 @@ class Sender { this._deflating = true; perMessageDeflate.compress(data, options.fin, (_, buf) => { + if (this._socket.destroyed) { + const err = new Error( + 'The socket was closed while data was being compressed' + ); + + if (typeof cb === 'function') cb(err); + + for (let i = 0; i < this._queue.length; i++) { + const callback = this._queue[i][4]; + + if (typeof callback === 'function') callback(err); + } + + return; + } + this._deflating = false; options.readOnly = false; this.sendFrame(Sender.frame(buf, options), cb); diff --git a/lib/websocket.js b/lib/websocket.js index c157cfd2c..ca72ad76d 100644 --- a/lib/websocket.js +++ b/lib/websocket.js @@ -179,9 +179,8 @@ class WebSocket extends EventEmitter { * @private */ emitClose() { - this.readyState = WebSocket.CLOSED; - if (!this._socket) { + this.readyState = WebSocket.CLOSED; this.emit('close', this._closeCode, this._closeMessage); return; } @@ -191,6 +190,7 @@ class WebSocket extends EventEmitter { } this._receiver.removeAllListeners(); + this.readyState = WebSocket.CLOSED; this.emit('close', this._closeCode, this._closeMessage); } diff --git a/test/permessage-deflate.test.js b/test/permessage-deflate.test.js index 11f8555af..09681d96f 100644 --- a/test/permessage-deflate.test.js +++ b/test/permessage-deflate.test.js @@ -615,15 +615,19 @@ describe('PerMessageDeflate', () => { }); }); - it("doesn't call the callback if the deflate stream is closed prematurely", (done) => { + it('calls the callback if the deflate stream is closed prematurely', (done) => { const perMessageDeflate = new PerMessageDeflate({ threshold: 0 }); const buf = Buffer.from('A'.repeat(50)); perMessageDeflate.accept([{}]); - perMessageDeflate.compress(buf, true, () => { - done(new Error('Unexpected callback invocation')); + perMessageDeflate.compress(buf, true, (err) => { + assert.ok(err instanceof Error); + assert.strictEqual( + err.message, + 'The deflate stream was closed while data was being processed' + ); + done(); }); - perMessageDeflate._deflate.on('close', done); process.nextTick(() => perMessageDeflate.cleanup()); }); diff --git a/test/websocket.test.js b/test/websocket.test.js index 229b78715..b9275035d 100644 --- a/test/websocket.test.js +++ b/test/websocket.test.js @@ -2341,6 +2341,52 @@ describe('WebSocket', () => { ws.on('message', (message) => ws.send(message, { compress: true })); }); }); + + it('calls the callback if the socket is closed prematurely', (done) => { + const wss = new WebSocket.Server( + { perMessageDeflate: true, port: 0 }, + () => { + const called = []; + const ws = new WebSocket(`ws://localhost:${wss.address().port}`, { + perMessageDeflate: { threshold: 0 } + }); + + ws.on('open', () => { + ws.send('foo'); + ws.send('bar', (err) => { + called.push(1); + + assert.strictEqual(ws.readyState, WebSocket.CLOSING); + assert.ok(err instanceof Error); + assert.strictEqual( + err.message, + 'The socket was closed while data was being compressed' + ); + }); + ws.send('baz'); + ws.send('qux', (err) => { + called.push(2); + + assert.strictEqual(ws.readyState, WebSocket.CLOSING); + assert.ok(err instanceof Error); + assert.strictEqual( + err.message, + 'The socket was closed while data was being compressed' + ); + }); + }); + + ws.on('close', () => { + assert.deepStrictEqual(called, [1, 2]); + wss.close(done); + }); + } + ); + + wss.on('connection', (ws) => { + ws._socket.end(); + }); + }); }); describe('#terminate', () => { @@ -2356,19 +2402,22 @@ describe('WebSocket', () => { }); ws.on('open', () => { - ws.send('hi', () => - done(new Error('Unexpected callback invocation')) - ); + ws.send('hi', (err) => { + assert.strictEqual(ws.readyState, WebSocket.CLOSING); + assert.ok(err instanceof Error); + assert.strictEqual( + err.message, + 'The socket was closed while data was being compressed' + ); + + ws.on('close', () => { + wss.close(done); + }); + }); ws.terminate(); }); } ); - - wss.on('connection', (ws) => { - ws.on('close', () => { - wss.close(done); - }); - }); }); it('can be used while data is being decompressed', (done) => {