diff --git a/lib/_tls_wrap.js b/lib/_tls_wrap.js index 31cbe3e65edfa9..9051e4fe4ad40b 100644 --- a/lib/_tls_wrap.js +++ b/lib/_tls_wrap.js @@ -414,9 +414,8 @@ TLSSocket.prototype._init = function(socket, wrap) { var ssl = this._handle; // lib/net.js expect this value to be non-zero if write hasn't been flushed - // immediately - // TODO(indutny): rewise this solution, it might be 1 before handshake and - // represent real writeQueueSize during regular writes. + // immediately. After the handshake is done this will represent the actual + // write queue size ssl.writeQueueSize = 1; this.server = options.server; diff --git a/lib/net.js b/lib/net.js index c6005aeecfac69..56df03c4cab504 100644 --- a/lib/net.js +++ b/lib/net.js @@ -332,6 +332,16 @@ Socket.prototype.setTimeout = function(msecs, callback) { Socket.prototype._onTimeout = function() { + if (this._handle) { + // `.prevWriteQueueSize` !== `.updateWriteQueueSize()` means there is + // an active write in progress, so we suppress the timeout. + const prevWriteQueueSize = this._handle.writeQueueSize; + if (prevWriteQueueSize > 0 && + prevWriteQueueSize !== this._handle.updateWriteQueueSize()) { + this._unrefTimer(); + return; + } + } debug('_onTimeout'); this.emit('timeout'); }; diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index 602a3642cf7ddb..c30d35265a9fed 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -82,6 +82,7 @@ StreamWrap::StreamWrap(Environment* env, void StreamWrap::AddMethods(Environment* env, v8::Local target, int flags) { + env->SetProtoMethod(target, "updateWriteQueueSize", UpdateWriteQueueSize); env->SetProtoMethod(target, "setBlocking", SetBlocking); StreamBase::AddMethods(env, target, flags); } @@ -122,11 +123,14 @@ bool StreamWrap::IsIPCPipe() { } -void StreamWrap::UpdateWriteQueueSize() { +uint32_t StreamWrap::UpdateWriteQueueSize() { HandleScope scope(env()->isolate()); - Local write_queue_size = - Integer::NewFromUnsigned(env()->isolate(), stream()->write_queue_size); - object()->Set(env()->write_queue_size_string(), write_queue_size); + uint32_t write_queue_size = stream()->write_queue_size; + object()->Set(env()->context(), + env()->write_queue_size_string(), + Integer::NewFromUnsigned(env()->isolate(), + write_queue_size)).FromJust(); + return write_queue_size; } @@ -267,6 +271,16 @@ void StreamWrap::OnRead(uv_stream_t* handle, } +void StreamWrap::UpdateWriteQueueSize( + const FunctionCallbackInfo& args) { + StreamWrap* wrap; + ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); + + uint32_t write_queue_size = wrap->UpdateWriteQueueSize(); + args.GetReturnValue().Set(write_queue_size); +} + + void StreamWrap::SetBlocking(const FunctionCallbackInfo& args) { StreamWrap* wrap; ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); diff --git a/src/stream_wrap.h b/src/stream_wrap.h index b0d9986db50c5d..7fff7f7b25d23e 100644 --- a/src/stream_wrap.h +++ b/src/stream_wrap.h @@ -65,13 +65,15 @@ class StreamWrap : public HandleWrap, public StreamBase { } AsyncWrap* GetAsyncWrap() override; - void UpdateWriteQueueSize(); + uint32_t UpdateWriteQueueSize(); static void AddMethods(Environment* env, v8::Local target, int flags = StreamBase::kFlagNone); private: + static void UpdateWriteQueueSize( + const v8::FunctionCallbackInfo& args); static void SetBlocking(const v8::FunctionCallbackInfo& args); // Callbacks for libuv diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc index cab6a75c8c2a3e..55cf490c494b0b 100644 --- a/src/tls_wrap.cc +++ b/src/tls_wrap.cc @@ -283,6 +283,7 @@ void TLSWrap::EncOut() { // No data to write if (BIO_pending(enc_out_) == 0) { + UpdateWriteQueueSize(); if (clear_in_->Length() == 0) InvokeQueued(0); return; @@ -539,6 +540,18 @@ bool TLSWrap::IsClosing() { } +uint32_t TLSWrap::UpdateWriteQueueSize(uint32_t write_queue_size) { + HandleScope scope(env()->isolate()); + if (write_queue_size == 0) + write_queue_size = BIO_pending(enc_out_); + object()->Set(env()->context(), + env()->write_queue_size_string(), + Integer::NewFromUnsigned(env()->isolate(), + write_queue_size)).FromJust(); + return write_queue_size; +} + + int TLSWrap::ReadStart() { return stream_->ReadStart(); } @@ -580,8 +593,12 @@ int TLSWrap::DoWrite(WriteWrap* w, ClearOut(); // However, if there is any data that should be written to the socket, // the callback should not be invoked immediately - if (BIO_pending(enc_out_) == 0) + if (BIO_pending(enc_out_) == 0) { + // net.js expects writeQueueSize to be > 0 if the write isn't + // immediately flushed + UpdateWriteQueueSize(1); return stream_->DoWrite(w, bufs, count, send_handle); + } } // Queue callback to execute it on next tick @@ -635,13 +652,15 @@ int TLSWrap::DoWrite(WriteWrap* w, // Try writing data immediately EncOut(); + UpdateWriteQueueSize(); return 0; } void TLSWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) { - // Intentionally empty + TLSWrap* wrap = static_cast(ctx); + wrap->UpdateWriteQueueSize(); } @@ -906,6 +925,15 @@ int TLSWrap::SelectSNIContextCallback(SSL* s, int* ad, void* arg) { #endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB +void TLSWrap::UpdateWriteQueueSize(const FunctionCallbackInfo& args) { + TLSWrap* wrap; + ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); + + uint32_t write_queue_size = wrap->UpdateWriteQueueSize(); + args.GetReturnValue().Set(write_queue_size); +} + + void TLSWrap::Initialize(Local target, Local unused, Local context) { @@ -926,6 +954,7 @@ void TLSWrap::Initialize(Local target, env->SetProtoMethod(t, "enableSessionCallbacks", EnableSessionCallbacks); env->SetProtoMethod(t, "destroySSL", DestroySSL); env->SetProtoMethod(t, "enableCertCb", EnableCertCb); + env->SetProtoMethod(t, "updateWriteQueueSize", UpdateWriteQueueSize); StreamBase::AddMethods(env, t, StreamBase::kFlagHasWritev); SSLWrap::AddMethods(env, t); diff --git a/src/tls_wrap.h b/src/tls_wrap.h index 7c70d364bdbd38..ba6f318a47a8c5 100644 --- a/src/tls_wrap.h +++ b/src/tls_wrap.h @@ -107,6 +107,7 @@ class TLSWrap : public AsyncWrap, AsyncWrap* GetAsyncWrap() override; bool IsIPCPipe() override; + uint32_t UpdateWriteQueueSize(uint32_t write_queue_size = 0); // Resource implementation static void OnAfterWriteImpl(WriteWrap* w, void* ctx); @@ -163,6 +164,10 @@ class TLSWrap : public AsyncWrap, // If true - delivered EOF to the js-land, either after `close_notify`, or // after the `UV_EOF` on socket. bool eof_; + + private: + static void UpdateWriteQueueSize( + const v8::FunctionCallbackInfo& args); }; } // namespace node diff --git a/test/parallel/test-net-timeout-no-handle.js b/test/parallel/test-net-timeout-no-handle.js new file mode 100644 index 00000000000000..539f661cae8414 --- /dev/null +++ b/test/parallel/test-net-timeout-no-handle.js @@ -0,0 +1,17 @@ +'use strict'; + +const common = require('../common'); +const net = require('net'); +const assert = require('assert'); + +const socket = new net.Socket(); +socket.setTimeout(common.platformTimeout(50)); + +socket.on('timeout', common.mustCall(() => { + assert.strictEqual(socket._handle, null); +})); + +socket.on('connect', common.mustNotCall()); + +// since the timeout is unrefed, the code will exit without this +setTimeout(() => {}, common.platformTimeout(200)); diff --git a/test/parallel/test-tls-buffersize.js b/test/parallel/test-tls-buffersize.js new file mode 100644 index 00000000000000..5dd201981c1500 --- /dev/null +++ b/test/parallel/test-tls-buffersize.js @@ -0,0 +1,43 @@ +'use strict'; +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const assert = require('assert'); +const fs = require('fs'); +const tls = require('tls'); + +const iter = 10; +const overhead = 30; + +const server = tls.createServer({ + key: fs.readFileSync(common.fixturesDir + '/keys/agent2-key.pem'), + cert: fs.readFileSync(common.fixturesDir + '/keys/agent2-cert.pem') +}, common.mustCall((socket) => { + socket.on('readable', () => { + socket.read(); + }, 1); + + socket.on('end', common.mustCall(() => { + server.close(); + })); +})); + +server.listen(0, common.mustCall(() => { + const client = tls.connect({ + port: server.address().port, + rejectUnauthorized: false + }, common.mustCall(() => { + assert.strictEqual(client.bufferSize, 0); + + for (let i = 1; i < iter; i++) { + client.write('a'); + assert.strictEqual(client.bufferSize, i + overhead); + } + + client.on('finish', common.mustCall(() => { + assert.strictEqual(client.bufferSize, 0); + })); + + client.end(); + })); +})); diff --git a/test/sequential/test-http-keep-alive-large-write.js b/test/sequential/test-http-keep-alive-large-write.js new file mode 100644 index 00000000000000..6cdf5cde59132a --- /dev/null +++ b/test/sequential/test-http-keep-alive-large-write.js @@ -0,0 +1,80 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const http = require('http'); + +// This test assesses whether long-running writes can complete +// or timeout because the socket is not aware that the backing +// stream is still writing. +// To simulate a slow client, we write a really large chunk and +// then proceed through the following cycle: +// 1) Receive first 'data' event and record currently written size +// 2) Once we've read up to currently written size recorded above, +// we pause the stream and wait longer than the server timeout +// 3) Socket.prototype._onTimeout triggers and should confirm +// that the backing stream is still active and writing +// 4) Our timer fires, we resume the socket and start at 1) + +const minReadSize = 250000; +const serverTimeout = common.platformTimeout(500); +let offsetTimeout = common.platformTimeout(100); +let serverConnectionHandle; +let writeSize = 3000000; +let didReceiveData = false; +// this represents each cycles write size, where the cycle consists +// of `write > read > _onTimeout` +let currentWriteSize = 0; + +const server = http.createServer(common.mustCall((req, res) => { + const content = Buffer.alloc(writeSize, 0x44); + + res.writeHead(200, { + 'Content-Type': 'application/octet-stream', + 'Content-Length': content.length.toString(), + 'Vary': 'Accept-Encoding' + }); + + serverConnectionHandle = res.socket._handle; + res.write(content); + res.end(); +})); +server.setTimeout(serverTimeout); +server.on('timeout', () => { + assert.strictEqual(didReceiveData, false, 'Should not timeout'); +}); + +server.listen(0, common.mustCall(() => { + http.get({ + path: '/', + port: server.address().port + }, common.mustCall((res) => { + const resume = () => res.resume(); + let receivedBufferLength = 0; + let firstReceivedAt; + res.on('data', (buf) => { + if (receivedBufferLength === 0) { + currentWriteSize = Math.max( + minReadSize, + writeSize - serverConnectionHandle.writeQueueSize + ); + didReceiveData = false; + firstReceivedAt = Date.now(); + } + receivedBufferLength += buf.length; + if (receivedBufferLength >= currentWriteSize) { + didReceiveData = true; + writeSize = serverConnectionHandle.writeQueueSize; + receivedBufferLength = 0; + res.pause(); + setTimeout( + resume, + serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt) + ); + offsetTimeout = 0; + } + }); + res.on('end', common.mustCall(() => { + server.close(); + })); + })); +})); diff --git a/test/sequential/test-https-keep-alive-large-write.js b/test/sequential/test-https-keep-alive-large-write.js new file mode 100644 index 00000000000000..4a6af445fa4563 --- /dev/null +++ b/test/sequential/test-https-keep-alive-large-write.js @@ -0,0 +1,87 @@ +'use strict'; +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const assert = require('assert'); +const fs = require('fs'); +const https = require('https'); + +// This test assesses whether long-running writes can complete +// or timeout because the socket is not aware that the backing +// stream is still writing. +// To simulate a slow client, we write a really large chunk and +// then proceed through the following cycle: +// 1) Receive first 'data' event and record currently written size +// 2) Once we've read up to currently written size recorded above, +// we pause the stream and wait longer than the server timeout +// 3) Socket.prototype._onTimeout triggers and should confirm +// that the backing stream is still active and writing +// 4) Our timer fires, we resume the socket and start at 1) + +const minReadSize = 250000; +const serverTimeout = common.platformTimeout(500); +let offsetTimeout = common.platformTimeout(100); +let serverConnectionHandle; +let writeSize = 2000000; +let didReceiveData = false; +// this represents each cycles write size, where the cycle consists +// of `write > read > _onTimeout` +let currentWriteSize = 0; + +const server = https.createServer({ + key: fs.readFileSync(common.fixturesDir + '/keys/agent2-key.pem'), + cert: fs.readFileSync(common.fixturesDir + '/keys/agent2-cert.pem') +}, common.mustCall((req, res) => { + const content = Buffer.alloc(writeSize, 0x44); + + res.writeHead(200, { + 'Content-Type': 'application/octet-stream', + 'Content-Length': content.length.toString(), + 'Vary': 'Accept-Encoding' + }); + + serverConnectionHandle = res.socket._handle; + res.write(content); + res.end(); +})); +server.setTimeout(serverTimeout); +server.on('timeout', () => { + assert.strictEqual(didReceiveData, false, 'Should not timeout'); +}); + +server.listen(0, common.mustCall(() => { + https.get({ + path: '/', + port: server.address().port, + rejectUnauthorized: false + }, common.mustCall((res) => { + const resume = () => res.resume(); + let receivedBufferLength = 0; + let firstReceivedAt; + res.on('data', (buf) => { + if (receivedBufferLength === 0) { + currentWriteSize = Math.max( + minReadSize, + writeSize - serverConnectionHandle.writeQueueSize + ); + didReceiveData = false; + firstReceivedAt = Date.now(); + } + receivedBufferLength += buf.length; + if (receivedBufferLength >= currentWriteSize) { + didReceiveData = true; + writeSize = serverConnectionHandle.writeQueueSize; + receivedBufferLength = 0; + res.pause(); + setTimeout( + resume, + serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt) + ); + offsetTimeout = 0; + } + }); + res.on('end', common.mustCall(() => { + server.close(); + })); + })); +}));