From 35b7e2f08f565fd67718bacfbccd6f8c208f260a Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Sat, 17 Mar 2018 17:52:57 +0100 Subject: [PATCH 1/4] net: track bytesWritten in C++ land Move tracking of `socket.bytesWritten` to C++ land. This makes it easier to provide this functionality for all `StreamBase` instances, and in particular should keep working when they have been 'consumed' in C++ in some way (e.g. for the network sockets that are underlying to TLS or HTTP2 streams). Also, this parallels `socket.bytesRead` a lot more now. --- lib/net.js | 23 +++++++++++++++-------- src/env.h | 1 + src/stream_base-inl.h | 28 +++++++++++++++++++++++++++- src/stream_base.cc | 1 + src/stream_base.h | 4 ++++ 5 files changed, 48 insertions(+), 9 deletions(-) diff --git a/lib/net.js b/lib/net.js index aa5709981ca3da..96ce990d8c7df8 100644 --- a/lib/net.js +++ b/lib/net.js @@ -206,7 +206,6 @@ function normalizeArgs(args) { // called when creating new Socket, or when re-using a closed Socket function initSocketHandle(self) { self._undestroy(); - self._bytesDispatched = 0; self._sockname = null; // Handle creation may be deferred to bind() or connect() time. @@ -222,7 +221,8 @@ function initSocketHandle(self) { } -const BYTES_READ = Symbol('bytesRead'); +const kBytesRead = Symbol('kBytesRead'); +const kBytesWritten = Symbol('kBytesWritten'); function Socket(options) { @@ -316,7 +316,8 @@ function Socket(options) { this._server = null; // Used after `.destroy()` - this[BYTES_READ] = 0; + this[kBytesRead] = 0; + this[kBytesWritten] = 0; } util.inherits(Socket, stream.Duplex); @@ -588,8 +589,9 @@ Socket.prototype._destroy = function(exception, cb) { if (this !== process.stderr) debug('close handle'); var isException = exception ? true : false; - // `bytesRead` should be accessible after `.destroy()` - this[BYTES_READ] = this._handle.bytesRead; + // `bytesRead` and `kBytesWritten` should be accessible after `.destroy()` + this[kBytesRead] = this._handle.bytesRead; + this[kBytesWritten] = this._handle.bytesWritten; this._handle.close(() => { debug('emit close'); @@ -689,7 +691,7 @@ function protoGetter(name, callback) { } protoGetter('bytesRead', function bytesRead() { - return this._handle ? this._handle.bytesRead : this[BYTES_READ]; + return this._handle ? this._handle.bytesRead : this[kBytesRead]; }); protoGetter('remoteAddress', function remoteAddress() { @@ -761,8 +763,6 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) { // Bail out if handle.write* returned an error if (ret) return ret; - this._bytesDispatched += req.bytes; - if (!req.async) { cb(); return; @@ -782,6 +782,13 @@ Socket.prototype._write = function(data, encoding, cb) { this._writeGeneric(false, data, encoding, cb); }; + +// Legacy alias. Having this is probably being overly cautious, but it doesn't +// really hurt anyone either. This can probably be removed safely if desired. +protoGetter('_bytesDispatched', function _bytesDispatched() { + return this._handle ? this._handle.bytesWritten : this[kBytesWritten]; +}); + protoGetter('bytesWritten', function bytesWritten() { var bytes = this._bytesDispatched; const state = this._writableState; diff --git a/src/env.h b/src/env.h index 19079aa5f0742c..75c530af454572 100644 --- a/src/env.h +++ b/src/env.h @@ -117,6 +117,7 @@ struct PackageConfig { V(bytes_string, "bytes") \ V(bytes_parsed_string, "bytesParsed") \ V(bytes_read_string, "bytesRead") \ + V(bytes_written_string, "bytesWritten") \ V(cached_data_string, "cachedData") \ V(cached_data_produced_string, "cachedDataProduced") \ V(cached_data_rejected_string, "cachedDataRejected") \ diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h index f4c228d7c5956f..35e49dfea2c721 100644 --- a/src/stream_base-inl.h +++ b/src/stream_base-inl.h @@ -193,6 +193,10 @@ inline StreamWriteResult StreamBase::Write( v8::Local req_wrap_obj) { Environment* env = stream_env(); int err; + + for (size_t i = 0; i < count; ++i) + bytes_written_ += bufs[i].len; + if (send_handle == nullptr) { err = DoTryWrite(&bufs, &count); if (err != 0 || count == 0) { @@ -301,6 +305,12 @@ void StreamBase::AddMethods(Environment* env, env->as_external(), signature); + Local get_bytes_written_templ = + FunctionTemplate::New(env->isolate(), + GetBytesWritten, + env->as_external(), + signature); + t->PrototypeTemplate()->SetAccessorProperty(env->fd_string(), get_fd_templ, Local(), @@ -316,6 +326,11 @@ void StreamBase::AddMethods(Environment* env, Local(), attributes); + t->PrototypeTemplate()->SetAccessorProperty(env->bytes_written_string(), + get_bytes_written_templ, + Local(), + attributes); + env->SetProtoMethod(t, "readStart", JSMethod); env->SetProtoMethod(t, "readStop", JSMethod); if ((flags & kFlagNoShutdown) == 0) @@ -357,7 +372,6 @@ void StreamBase::GetFD(const FunctionCallbackInfo& args) { template void StreamBase::GetBytesRead(const FunctionCallbackInfo& args) { - // The handle instance hasn't been set. So no bytes could have been read. Base* handle; ASSIGN_OR_RETURN_UNWRAP(&handle, args.This(), @@ -368,6 +382,18 @@ void StreamBase::GetBytesRead(const FunctionCallbackInfo& args) { args.GetReturnValue().Set(static_cast(wrap->bytes_read_)); } +template +void StreamBase::GetBytesWritten(const FunctionCallbackInfo& args) { + Base* handle; + ASSIGN_OR_RETURN_UNWRAP(&handle, + args.This(), + args.GetReturnValue().Set(0)); + + StreamBase* wrap = static_cast(handle); + // uint64_t -> double. 53bits is enough for all real cases. + args.GetReturnValue().Set(static_cast(wrap->bytes_written_)); +} + template void StreamBase::GetExternal(const FunctionCallbackInfo& args) { Base* handle; diff --git a/src/stream_base.cc b/src/stream_base.cc index 8838a1a6dfb6b3..7b27a48c16f4a4 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -243,6 +243,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo& args) { uv_buf_t* bufs = &buf; size_t count = 1; err = DoTryWrite(&bufs, &count); + bytes_written_ += data_size; // Immediate failure or success if (err != 0 || count == 0) { diff --git a/src/stream_base.h b/src/stream_base.h index d5a759bd8ded30..4fe4a8c48c31bc 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -247,6 +247,7 @@ class StreamResource { StreamListener* listener_ = nullptr; uint64_t bytes_read_ = 0; + uint64_t bytes_written_ = 0; friend class StreamListener; }; @@ -324,6 +325,9 @@ class StreamBase : public StreamResource { template static void GetBytesRead(const v8::FunctionCallbackInfo& args); + template + static void GetBytesWritten(const v8::FunctionCallbackInfo& args); + template & args)> From 0c5dea867d08324b4d431e10f88550a878d10d13 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Sat, 24 Mar 2018 19:23:51 +0100 Subject: [PATCH 2/4] =?UTF-8?q?[squash]=20fix=20windows=20=F0=9F=A4=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/internal/net.js | 2 +- lib/net.js | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/internal/net.js b/lib/internal/net.js index 9c2602b79e9208..78e155e055a820 100644 --- a/lib/internal/net.js +++ b/lib/internal/net.js @@ -32,7 +32,7 @@ function makeSyncWrite(fd) { if (enc !== 'buffer') chunk = Buffer.from(chunk, enc); - this._bytesDispatched += chunk.length; + this._handle.bytesWritten += chunk.length; const ctx = {}; writeBuffer(fd, chunk, 0, chunk.length, null, undefined, ctx); diff --git a/lib/net.js b/lib/net.js index 96ce990d8c7df8..5b460befa49374 100644 --- a/lib/net.js +++ b/lib/net.js @@ -278,6 +278,11 @@ function Socket(options) { this._writev = null; this._write = makeSyncWrite(fd); + // makeSyncWrite adjusts this value like the original handle would, so + // we need to let it do that by turning it into a writable, own property. + Object.defineProperty(this._handle, 'bytesWritten', { + value: 0, writable: true + }); } } else { // these will be set once there is a connection From 0b824ed47b3281482cd75f40500e1bfc379cf18c Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Sat, 24 Mar 2018 17:02:34 +0100 Subject: [PATCH 3/4] test: add regression test for large write Fixes: https://github.com/nodejs/node/issues/19562 --- test/parallel/test-net-bytes-written-large.js | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 test/parallel/test-net-bytes-written-large.js diff --git a/test/parallel/test-net-bytes-written-large.js b/test/parallel/test-net-bytes-written-large.js new file mode 100644 index 00000000000000..79a997ec5a38f1 --- /dev/null +++ b/test/parallel/test-net-bytes-written-large.js @@ -0,0 +1,67 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const net = require('net'); + +// Regression test for https://github.com/nodejs/node/issues/19562: +// Writing to a socket first tries to push through as much data as possible +// without blocking synchronously, and, if that is not enough, queues more +// data up for asynchronous writing. +// Check that `bytesWritten` accounts for both parts of a write. + +const N = 10000000; +{ + // Variant 1: Write a Buffer. + const server = net.createServer(common.mustCall((socket) => { + socket.end(Buffer.alloc(N), common.mustCall(() => { + assert.strictEqual(socket.bytesWritten, N); + })); + assert.strictEqual(socket.bytesWritten, N); + })).listen(0, common.mustCall(() => { + const client = net.connect(server.address().port); + client.resume(); + client.on('close', common.mustCall(() => { + assert.strictEqual(client.bytesRead, N); + server.close(); + })); + })); +} + +{ + // Variant 2: Write a string. + const server = net.createServer(common.mustCall((socket) => { + socket.end('a'.repeat(N), common.mustCall(() => { + assert.strictEqual(socket.bytesWritten, N); + })); + assert.strictEqual(socket.bytesWritten, N); + })).listen(0, common.mustCall(() => { + const client = net.connect(server.address().port); + client.resume(); + client.on('close', common.mustCall(() => { + assert.strictEqual(client.bytesRead, N); + server.close(); + })); + })); +} + +{ + // Variant 2: writev() with mixed data. + const server = net.createServer(common.mustCall((socket) => { + socket.cork(); + socket.write('a'.repeat(N)); + assert.strictEqual(socket.bytesWritten, N); + socket.write(Buffer.alloc(N)); + assert.strictEqual(socket.bytesWritten, 2 * N); + socket.end('', common.mustCall(() => { + assert.strictEqual(socket.bytesWritten, 2 * N); + })); + socket.uncork(); + })).listen(0, common.mustCall(() => { + const client = net.connect(server.address().port); + client.resume(); + client.on('close', common.mustCall(() => { + assert.strictEqual(client.bytesRead, 2 * N); + server.close(); + })); + })); +} From 6f6f00f81c78257e11bd189395fb805be34b953c Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Sat, 24 Mar 2018 17:21:14 +0100 Subject: [PATCH 4/4] src: clean up `req.bytes` tracking Simply always tell the caller how many bytes were written, rather than letting them track it. In the case of writing a string, also keep track of the bytes written by the earlier `DoTryWrite()`. Refs: https://github.com/nodejs/node/issues/19562 --- src/stream_base-inl.h | 8 +++++--- src/stream_base.cc | 23 ++++++++++++----------- src/stream_base.h | 1 + 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h index 35e49dfea2c721..392dc2c87c3ca3 100644 --- a/src/stream_base-inl.h +++ b/src/stream_base-inl.h @@ -194,13 +194,15 @@ inline StreamWriteResult StreamBase::Write( Environment* env = stream_env(); int err; + size_t total_bytes = 0; for (size_t i = 0; i < count; ++i) - bytes_written_ += bufs[i].len; + total_bytes += bufs[i].len; + bytes_written_ += total_bytes; if (send_handle == nullptr) { err = DoTryWrite(&bufs, &count); if (err != 0 || count == 0) { - return StreamWriteResult { false, err, nullptr }; + return StreamWriteResult { false, err, nullptr, total_bytes }; } } @@ -230,7 +232,7 @@ inline StreamWriteResult StreamBase::Write( ClearError(); } - return StreamWriteResult { async, err, req_wrap }; + return StreamWriteResult { async, err, req_wrap, total_bytes }; } template diff --git a/src/stream_base.cc b/src/stream_base.cc index 7b27a48c16f4a4..263943d2b03420 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -60,12 +60,11 @@ int StreamBase::Shutdown(const FunctionCallbackInfo& args) { inline void SetWriteResultPropertiesOnWrapObject( Environment* env, Local req_wrap_obj, - const StreamWriteResult& res, - size_t bytes) { + const StreamWriteResult& res) { req_wrap_obj->Set( env->context(), env->bytes_string(), - Number::New(env->isolate(), bytes)).FromJust(); + Number::New(env->isolate(), res.bytes)).FromJust(); req_wrap_obj->Set( env->context(), env->async(), @@ -91,7 +90,6 @@ int StreamBase::Writev(const FunctionCallbackInfo& args) { MaybeStackBuffer bufs(count); size_t storage_size = 0; - uint32_t bytes = 0; size_t offset; if (!all_buffers) { @@ -123,7 +121,6 @@ int StreamBase::Writev(const FunctionCallbackInfo& args) { Local chunk = chunks->Get(i); bufs[i].base = Buffer::Data(chunk); bufs[i].len = Buffer::Length(chunk); - bytes += bufs[i].len; } } @@ -140,7 +137,6 @@ int StreamBase::Writev(const FunctionCallbackInfo& args) { if (Buffer::HasInstance(chunk)) { bufs[i].base = Buffer::Data(chunk); bufs[i].len = Buffer::Length(chunk); - bytes += bufs[i].len; continue; } @@ -160,12 +156,11 @@ int StreamBase::Writev(const FunctionCallbackInfo& args) { bufs[i].base = str_storage; bufs[i].len = str_size; offset += str_size; - bytes += str_size; } } StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj); - SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res, bytes); + SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res); if (res.wrap != nullptr && storage) { res.wrap->SetAllocatedStorage(storage.release(), storage_size); } @@ -193,7 +188,7 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo& args) { if (res.async) req_wrap_obj->Set(env->context(), env->buffer_string(), args[1]).FromJust(); - SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res, buf.len); + SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res); return res.err; } @@ -228,6 +223,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo& args) { // Try writing immediately if write size isn't too big char stack_storage[16384]; // 16kb size_t data_size; + size_t synchronously_written = 0; uv_buf_t buf; bool try_write = storage_size <= sizeof(stack_storage) && @@ -243,7 +239,11 @@ int StreamBase::WriteString(const FunctionCallbackInfo& args) { uv_buf_t* bufs = &buf; size_t count = 1; err = DoTryWrite(&bufs, &count); - bytes_written_ += data_size; + // Keep track of the bytes written here, because we're taking a shortcut + // by using `DoTryWrite()` directly instead of using the utilities + // provided by `Write()`. + synchronously_written = count == 0 ? data_size : data_size - buf.len; + bytes_written_ += synchronously_written; // Immediate failure or success if (err != 0 || count == 0) { @@ -299,8 +299,9 @@ int StreamBase::WriteString(const FunctionCallbackInfo& args) { } StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj); + res.bytes += synchronously_written; - SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res, data_size); + SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res); if (res.wrap != nullptr) { res.wrap->SetAllocatedStorage(data.release(), data_size); } diff --git a/src/stream_base.h b/src/stream_base.h index 4fe4a8c48c31bc..dfce7df44a5ebf 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -23,6 +23,7 @@ struct StreamWriteResult { bool async; int err; WriteWrap* wrap; + size_t bytes; };