Skip to content

Commit 71ef973

Browse files
committed
http2: graceful session close when using .close()
This slightly alters the behaviour of session close by using .end() on a session socket instead of .destroy(). This allows the socket to finish transmitting data, receive proper FIN packet and avoid ECONNRESET errors upon graceful close. Now after the session is closed we call ReadStart() on the underlying stream to allow socket to receive the remaining data and FIN packet. Previously only ReadStop() was used therefore blocking the receival of FIN by the socket and 'end' event after .end() call. onStreamClose now directly calls stream.destroy() instead of kMaybeDestroy because the latter will first check that the stream has writableFinished set. And that may not be true as we have just (synchronously) called .end() on the stream if it was not closed and that doesn't give it enough time to finish. Furthermore there is no point in waiting for 'finish' as the other party have already closed the stream and we won't be able to write anyway. Few tests got changed because of this. They weren't correctly handling graceful session close. This includes: * not reading request data (on client side) * not reading push stream data (on client side) * relying on socket.destroy() (on client) to finish server session due to the destroy of the socket without closing the server session. As the goaway itself is *not* a session close. This also led to a few missing 'close' session events. Added appropriate mustCall checks.
1 parent b1e6a9e commit 71ef973

7 files changed

+92
-40
lines changed

lib/internal/http2/core.js

Lines changed: 74 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,10 @@ function onStreamClose(code) {
486486
if (!stream || stream.destroyed)
487487
return false;
488488

489-
debugStreamObj(stream, 'closed with code %d', code);
489+
debugStreamObj(
490+
stream, 'closed with code %d, closed %s, readable %s',
491+
code, stream.closed, stream.readable
492+
);
490493

491494
if (!stream.closed)
492495
closeStream(stream, code, kNoRstStream);
@@ -495,7 +498,7 @@ function onStreamClose(code) {
495498
// Defer destroy we actually emit end.
496499
if (!stream.readable || code !== NGHTTP2_NO_ERROR) {
497500
// If errored or ended, we can destroy immediately.
498-
stream[kMaybeDestroy](code);
501+
stream.destroy();
499502
} else {
500503
// Wait for end to destroy.
501504
stream.on('end', stream[kMaybeDestroy]);
@@ -983,24 +986,79 @@ function emitClose(self, error) {
983986
self.emit('close');
984987
}
985988

986-
function finishSessionDestroy(session, error) {
987-
debugSessionObj(session, 'finishSessionDestroy');
988-
989+
function cleanupSession(session) {
989990
const socket = session[kSocket];
990-
if (!socket.destroyed)
991-
socket.destroy(error);
992-
993991
session[kProxySocket] = undefined;
994992
session[kSocket] = undefined;
995993
session[kHandle] = undefined;
996994
session[kNativeFields] = new Uint8Array(kSessionUint8FieldCount);
997-
socket[kSession] = undefined;
998-
socket[kServer] = undefined;
995+
if (socket) {
996+
socket[kSession] = undefined;
997+
socket[kServer] = undefined;
998+
}
999+
}
1000+
1001+
function finishSessionDestroy(session, error) {
1002+
debugSessionObj(session, 'finishSessionDestroy');
1003+
1004+
const handle = session[kHandle];
1005+
const socket = session[kSocket];
1006+
if (handle) handle.ondone = null;
1007+
cleanupSession(session);
1008+
1009+
if (socket && !socket.destroyed)
1010+
socket.destroy(error);
9991011

10001012
// Finally, emit the close and error events (if necessary) on next tick.
10011013
process.nextTick(emitClose, session, error);
10021014
}
10031015

1016+
function finishSessionClose(session) {
1017+
debugSessionObj(session, 'finishSessionClose');
1018+
1019+
const socket = session[kSocket];
1020+
const handle = session[kHandle];
1021+
if (handle) handle.ondone = null;
1022+
cleanupSession(session);
1023+
1024+
if (!socket.destroyed) {
1025+
socket.end(() => emitClose(session));
1026+
} else {
1027+
process.nextTick(emitClose, session);
1028+
}
1029+
}
1030+
1031+
function sessionClose(session, code, error, finishfn) {
1032+
debugSessionObj(session, 'start closing/destroying');
1033+
1034+
const state = session[kState];
1035+
state.flags |= SESSION_FLAGS_DESTROYED;
1036+
state.destroyCode = code;
1037+
1038+
// Clear timeout and remove timeout listeners.
1039+
session.setTimeout(0);
1040+
session.removeAllListeners('timeout');
1041+
1042+
// Destroy any pending and open streams
1043+
if (state.pendingStreams.size > 0 || state.streams.size > 0) {
1044+
const cancel = new ERR_HTTP2_STREAM_CANCEL(error);
1045+
state.pendingStreams.forEach((stream) => stream.destroy(cancel));
1046+
state.streams.forEach((stream) => stream.destroy(error));
1047+
}
1048+
1049+
// Disassociate from the socket and server.
1050+
const socket = session[kSocket];
1051+
const handle = session[kHandle];
1052+
1053+
// Destroy the handle if it exists at this point.
1054+
if (handle !== undefined) {
1055+
handle.ondone = finishfn.bind(null, session, error);
1056+
handle.destroy(code, socket.destroyed);
1057+
} else {
1058+
finishfn(session, error);
1059+
}
1060+
}
1061+
10041062
// Upon creation, the Http2Session takes ownership of the socket. The session
10051063
// may not be ready to use immediately if the socket is not yet fully connected.
10061064
// In that case, the Http2Session will wait for the socket to connect. Once
@@ -1325,6 +1383,7 @@ class Http2Session extends EventEmitter {
13251383
destroy(error = NGHTTP2_NO_ERROR, code) {
13261384
if (this.destroyed)
13271385
return;
1386+
13281387
debugSessionObj(this, 'destroying');
13291388

13301389
if (typeof error === 'number') {
@@ -1336,30 +1395,7 @@ class Http2Session extends EventEmitter {
13361395
if (code === undefined && error != null)
13371396
code = NGHTTP2_INTERNAL_ERROR;
13381397

1339-
const state = this[kState];
1340-
state.flags |= SESSION_FLAGS_DESTROYED;
1341-
state.destroyCode = code;
1342-
1343-
// Clear timeout and remove timeout listeners
1344-
this.setTimeout(0);
1345-
this.removeAllListeners('timeout');
1346-
1347-
// Destroy any pending and open streams
1348-
const cancel = new ERR_HTTP2_STREAM_CANCEL(error);
1349-
state.pendingStreams.forEach((stream) => stream.destroy(cancel));
1350-
state.streams.forEach((stream) => stream.destroy(error));
1351-
1352-
// Disassociate from the socket and server
1353-
const socket = this[kSocket];
1354-
const handle = this[kHandle];
1355-
1356-
// Destroy the handle if it exists at this point
1357-
if (handle !== undefined) {
1358-
handle.ondone = finishSessionDestroy.bind(null, this, error);
1359-
handle.destroy(code, socket.destroyed);
1360-
} else {
1361-
finishSessionDestroy(this, error);
1362-
}
1398+
sessionClose(this, code, error, finishSessionDestroy);
13631399
}
13641400

13651401
// Closing the session will:
@@ -1405,12 +1441,15 @@ class Http2Session extends EventEmitter {
14051441
const state = this[kState];
14061442
// Do not destroy if we're not closed and there are pending/open streams
14071443
if (!this.closed ||
1444+
this.destroyed ||
14081445
state.streams.size > 0 ||
14091446
state.pendingStreams.size > 0) {
14101447
return;
14111448
}
1449+
sessionClose(this, NGHTTP2_NO_ERROR, null, finishSessionClose);
1450+
} else {
1451+
this.destroy(error);
14121452
}
1413-
this.destroy(error);
14141453
}
14151454

14161455
_onTimeout() {

src/node_http2.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -767,6 +767,7 @@ void Http2Session::Close(uint32_t code, bool socket_closed) {
767767
if ((flags_ & SESSION_STATE_WRITE_IN_PROGRESS) == 0) {
768768
Debug(this, "make done session callback");
769769
MakeCallback(env()->ondone_string(), 0, nullptr);
770+
if (stream_ != nullptr) stream_->ReadStart();
770771
}
771772

772773
// If there are outstanding pings, those will need to be canceled, do
@@ -1566,7 +1567,9 @@ void Http2Session::OnStreamAfterWrite(WriteWrap* w, int status) {
15661567

15671568
if ((flags_ & SESSION_STATE_READING_STOPPED) &&
15681569
!(flags_ & SESSION_STATE_WRITE_IN_PROGRESS) &&
1569-
nghttp2_session_want_read(session_)) {
1570+
(nghttp2_session_want_read(session_) ||
1571+
(flags_ & SESSION_STATE_CLOSED) != 0)) {
1572+
Debug(this, "OnStreamAfterWrite read start");
15701573
flags_ &= ~SESSION_STATE_READING_STOPPED;
15711574
stream_->ReadStart();
15721575
}

test/parallel/test-http2-capture-rejection.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ events.captureRejections = true;
7272
}));
7373
}
7474

75-
7675
{
7776
// Test error thrown in 'request' event
7877

@@ -136,6 +135,7 @@ events.captureRejections = true;
136135
const session = connect(`http://localhost:${port}`);
137136

138137
const req = session.request();
138+
req.resume();
139139

140140
session.on('stream', common.mustCall(async (stream) => {
141141
session.close();

test/parallel/test-http2-compat-client-upload-reject.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@ fs.readFile(loc, common.mustCall((err, data) => {
2323
res.end();
2424
});
2525
}));
26+
server.on('close', common.mustCall());
2627

2728
server.listen(0, common.mustCall(() => {
2829
const client = http2.connect(`http://localhost:${server.address().port}`);
30+
client.on('close', common.mustCall());
2931

3032
const req = client.request({ ':method': 'POST' });
3133
req.on('response', common.mustCall((headers) => {

test/parallel/test-http2-create-client-connect.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,13 @@ const URL = url.URL;
3838
const client =
3939
h2.connect.apply(null, i)
4040
.on('connect', common.mustCall(() => maybeClose(client)));
41+
client.on('close', common.mustCall());
4142
});
4243

4344
// Will fail because protocol does not match the server.
44-
h2.connect({ port: port, protocol: 'https:' })
45+
const client = h2.connect({ port: port, protocol: 'https:' })
4546
.on('error', common.mustCall(() => serverClose.dec()));
47+
client.on('close', common.mustCall());
4648
}));
4749
}
4850

test/parallel/test-http2-goaway-opaquedata.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,24 @@ const http2 = require('http2');
88

99
const server = http2.createServer();
1010
const data = Buffer.from([0x1, 0x2, 0x3, 0x4, 0x5]);
11+
let session;
1112

1213
server.on('stream', common.mustCall((stream) => {
13-
stream.session.goaway(0, 0, data);
14+
session = stream.session;
15+
session.on('close', common.mustCall());
16+
session.goaway(0, 0, data);
1417
stream.respond();
1518
stream.end();
1619
}));
20+
server.on('close', common.mustCall());
1721

1822
server.listen(0, () => {
19-
2023
const client = http2.connect(`http://localhost:${server.address().port}`);
2124
client.once('goaway', common.mustCall((code, lastStreamID, buf) => {
2225
assert.deepStrictEqual(code, 0);
2326
assert.deepStrictEqual(lastStreamID, 1);
2427
assert.deepStrictEqual(data, buf);
28+
session.close();
2529
server.close();
2630
}));
2731
const req = client.request();

test/parallel/test-http2-server-push-stream.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ server.listen(0, common.mustCall(() => {
5555
assert.strictEqual(headers['x-push-data'], 'pushed by server');
5656
}));
5757
stream.on('aborted', common.mustNotCall());
58+
// We have to read the data of the push stream to end gracefully.
59+
stream.resume();
5860
}));
5961

6062
let data = '';

0 commit comments

Comments
 (0)