Skip to content

Commit 7c6bc64

Browse files
authored
fix(stream): premature close when it is paused (#2416)
* Fix premature close * Add test for premature close of the stream when it is paused * Restore package-lock.json
1 parent 35c61d1 commit 7c6bc64

File tree

2 files changed

+14
-5
lines changed

2 files changed

+14
-5
lines changed

lib/commands/query.js

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class Query extends Command {
3030
this._receivedFieldsCount = 0;
3131
this._resultIndex = 0;
3232
this._localStream = null;
33-
this._unpipeStream = function() {};
33+
this._unpipeStream = function () { };
3434
this._streamFactory = options.infileStreamFactory;
3535
this._connection = null;
3636
}
@@ -155,7 +155,7 @@ class Query extends Command {
155155
const onPause = () => {
156156
this._localStream.pause();
157157
};
158-
const onData = function(data) {
158+
const onData = function (data) {
159159
const dataWithHeader = Buffer.allocUnsafe(data.length + 4);
160160
data.copy(dataWithHeader, 4);
161161
connection.writePacket(
@@ -227,7 +227,7 @@ class Query extends Command {
227227
}
228228

229229
/* eslint no-unused-vars: ["error", { "argsIgnorePattern": "^_" }] */
230-
row(packet, _connection) {
230+
row(packet, _connection) {
231231
if (packet.isEOF()) {
232232
const status = packet.eofStatusFlags();
233233
const moreResults = status & ServerStatus.SERVER_MORE_RESULTS_EXISTS;
@@ -279,11 +279,13 @@ class Query extends Command {
279279
});
280280
this.on('end', () => {
281281
stream.push(null); // pushing null, indicating EOF
282-
setImmediate(() => stream.emit('close')); // notify readers that query has completed
283282
});
284283
this.on('fields', fields => {
285284
stream.emit('fields', fields); // replicate old emitter
286285
});
286+
stream.on('end', () => {
287+
stream.emit('close');
288+
});
287289
return stream;
288290
}
289291

@@ -302,7 +304,7 @@ class Query extends Command {
302304
Timers.clearTimeout(this.queryTimeout);
303305
this.queryTimeout = null;
304306
}
305-
307+
306308
const err = new Error('Query inactivity timeout');
307309
err.errorno = 'PROTOCOL_SEQUENCE_TIMEOUT';
308310
err.code = 'PROTOCOL_SEQUENCE_TIMEOUT';

test/integration/connection/test-stream.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ let rows;
88
const rows1 = [];
99
const rows2 = [];
1010
const rows3 = [];
11+
const rows4 = [];
1112

1213
connection.query(
1314
[
@@ -65,11 +66,17 @@ connection.execute('SELECT * FROM announcements', async (err, _rows) => {
6566
for await (const row of s3) {
6667
rows3.push(row);
6768
}
69+
const s4 = connection.query('SELECT * FROM announcements').stream();
70+
for await (const row of s4) {
71+
await new Promise(resolve => setTimeout(resolve, 1000));
72+
rows4.push(row);
73+
}
6874
});
6975

7076
process.on('exit', () => {
7177
assert.deepEqual(rows.length, 2);
7278
assert.deepEqual(rows, rows1);
7379
assert.deepEqual(rows, rows2);
7480
assert.deepEqual(rows, rows3);
81+
assert.deepEqual(rows, rows4);
7582
});

0 commit comments

Comments
 (0)