Skip to content

Commit 1eb6b01

Browse files
committed
http2: use native pipe instead of synchronous I/O
This resolves the issue of using synchronous I/O for `respondWithFile()` and `respondWithFD()`, and enables scenarios in which the underlying file does not need to be a regular file. PR-URL: #18936 Reviewed-By: James M Snell <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent 67f1d76 commit 1eb6b01

File tree

4 files changed

+51
-162
lines changed

4 files changed

+51
-162
lines changed

lib/internal/http2/core.js

Lines changed: 49 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,13 @@
44

55
require('internal/util').assertCrypto();
66

7+
const { internalBinding } = require('internal/bootstrap_loaders');
78
const { async_id_symbol } = require('internal/async_hooks').symbols;
9+
const { UV_EOF } = process.binding('uv');
810
const http = require('http');
911
const binding = process.binding('http2');
12+
const { FileHandle } = process.binding('fs');
13+
const { StreamPipe } = internalBinding('stream_pipe');
1014
const assert = require('assert');
1115
const { Buffer } = require('buffer');
1216
const EventEmitter = require('events');
@@ -65,6 +69,7 @@ const { onServerStream,
6569
const { utcDate } = require('internal/http');
6670
const { promisify } = require('internal/util');
6771
const { isArrayBufferView } = require('internal/util/types');
72+
const { defaultTriggerAsyncIdScope } = require('internal/async_hooks');
6873
const { _connectionListener: httpConnectionListener } = require('http');
6974
const { createPromise, promiseResolve } = process.binding('util');
7075
const debug = util.debuglog('http2');
@@ -345,9 +350,7 @@ function onStreamClose(code) {
345350
stream.end();
346351
}
347352

348-
if (state.fd !== undefined)
349-
tryClose(state.fd);
350-
353+
state.fd = -1;
351354
// Defer destroy we actually emit end.
352355
if (stream._readableState.endEmitted || code !== NGHTTP2_NO_ERROR) {
353356
// If errored or ended, we can destroy immediately.
@@ -1928,6 +1931,26 @@ function processHeaders(headers) {
19281931
return headers;
19291932
}
19301933

1934+
function onFileCloseError(stream, err) {
1935+
stream.emit(err);
1936+
}
1937+
1938+
function onFileUnpipe() {
1939+
const stream = this.sink[kOwner];
1940+
if (stream.ownsFd)
1941+
this.source.close().catch(onFileCloseError.bind(stream));
1942+
else
1943+
this.source.releaseFD();
1944+
}
1945+
1946+
// This is only called once the pipe has returned back control, so
1947+
// it only has to handle errors and End-of-File.
1948+
function onPipedFileHandleRead(err) {
1949+
if (err < 0 && err !== UV_EOF) {
1950+
this.stream.close(NGHTTP2_INTERNAL_ERROR);
1951+
}
1952+
}
1953+
19311954
function processRespondWithFD(self, fd, headers, offset = 0, length = -1,
19321955
streamOptions = 0) {
19331956
const state = self[kState];
@@ -1940,18 +1963,32 @@ function processRespondWithFD(self, fd, headers, offset = 0, length = -1,
19401963
return;
19411964
}
19421965

1943-
1944-
// Close the writable side of the stream
1966+
// Close the writable side of the stream, but only as far as the writable
1967+
// stream implementation is concerned.
1968+
self._final = null;
19451969
self.end();
19461970

1947-
const ret = self[kHandle].respondFD(fd, headersList,
1948-
offset, length,
1949-
streamOptions);
1971+
const ret = self[kHandle].respond(headersList, streamOptions);
19501972

19511973
if (ret < 0) {
19521974
self.destroy(new NghttpError(ret));
19531975
return;
19541976
}
1977+
1978+
defaultTriggerAsyncIdScope(self[async_id_symbol], startFilePipe,
1979+
self, fd, offset, length);
1980+
}
1981+
1982+
function startFilePipe(self, fd, offset, length) {
1983+
const handle = new FileHandle(fd, offset, length);
1984+
handle.onread = onPipedFileHandleRead;
1985+
handle.stream = self;
1986+
1987+
const pipe = new StreamPipe(handle._externalStream,
1988+
self[kHandle]._externalStream);
1989+
pipe.onunpipe = onFileUnpipe;
1990+
pipe.start();
1991+
19551992
// exact length of the file doesn't matter here, since the
19561993
// stream is closing anyway - just use 1 to signify that
19571994
// a write does exist
@@ -2270,8 +2307,9 @@ class ServerHttp2Stream extends Http2Stream {
22702307
throw new ERR_INVALID_ARG_TYPE('fd', 'number');
22712308

22722309
debug(`Http2Stream ${this[kID]} [Http2Session ` +
2273-
`${sessionName(session[kType])}]: initiating response`);
2310+
`${sessionName(session[kType])}]: initiating response from fd`);
22742311
this[kUpdateTimer]();
2312+
this.ownsFd = false;
22752313

22762314
headers = processHeaders(headers);
22772315
const statusCode = headers[HTTP2_HEADER_STATUS] |= 0;
@@ -2333,9 +2371,9 @@ class ServerHttp2Stream extends Http2Stream {
23332371

23342372
const session = this[kSession];
23352373
debug(`Http2Stream ${this[kID]} [Http2Session ` +
2336-
`${sessionName(session[kType])}]: initiating response`);
2374+
`${sessionName(session[kType])}]: initiating response from file`);
23372375
this[kUpdateTimer]();
2338-
2376+
this.ownsFd = true;
23392377

23402378
headers = processHeaders(headers);
23412379
const statusCode = headers[HTTP2_HEADER_STATUS] |= 0;

src/node_http2.cc

Lines changed: 0 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -1888,28 +1888,6 @@ inline int Http2Stream::SubmitResponse(nghttp2_nv* nva,
18881888
}
18891889

18901890

1891-
// Initiate a response that contains data read from a file descriptor.
1892-
inline int Http2Stream::SubmitFile(int fd,
1893-
nghttp2_nv* nva, size_t len,
1894-
int64_t offset,
1895-
int64_t length,
1896-
int options) {
1897-
CHECK(!this->IsDestroyed());
1898-
Http2Scope h2scope(this);
1899-
DEBUG_HTTP2STREAM(this, "submitting file");
1900-
if (options & STREAM_OPTION_GET_TRAILERS)
1901-
flags_ |= NGHTTP2_STREAM_FLAG_TRAILERS;
1902-
1903-
if (offset > 0) fd_offset_ = offset;
1904-
if (length > -1) fd_length_ = length;
1905-
1906-
Http2Stream::Provider::FD prov(this, options, fd);
1907-
int ret = nghttp2_submit_response(session_->session(), id_, nva, len, *prov);
1908-
CHECK_NE(ret, NGHTTP2_ERR_NOMEM);
1909-
return ret;
1910-
}
1911-
1912-
19131891
// Submit informational headers for a stream.
19141892
inline int Http2Stream::SubmitInfo(nghttp2_nv* nva, size_t len) {
19151893
CHECK(!this->IsDestroyed());
@@ -2085,87 +2063,6 @@ Http2Stream::Provider::~Provider() {
20852063
provider_.source.ptr = nullptr;
20862064
}
20872065

2088-
// The FD Provider pulls data from a file descriptor using libuv. All of the
2089-
// data transfer occurs in C++, without any chunks being passed through JS
2090-
// land.
2091-
Http2Stream::Provider::FD::FD(Http2Stream* stream, int options, int fd)
2092-
: Http2Stream::Provider(stream, options) {
2093-
CHECK(!stream->IsDestroyed());
2094-
provider_.source.fd = fd;
2095-
provider_.read_callback = Http2Stream::Provider::FD::OnRead;
2096-
}
2097-
2098-
Http2Stream::Provider::FD::FD(int options, int fd)
2099-
: Http2Stream::Provider(options) {
2100-
provider_.source.fd = fd;
2101-
provider_.read_callback = Http2Stream::Provider::FD::OnRead;
2102-
}
2103-
2104-
ssize_t Http2Stream::Provider::FD::OnRead(nghttp2_session* handle,
2105-
int32_t id,
2106-
uint8_t* buf,
2107-
size_t length,
2108-
uint32_t* flags,
2109-
nghttp2_data_source* source,
2110-
void* user_data) {
2111-
Http2Session* session = static_cast<Http2Session*>(user_data);
2112-
Http2Stream* stream = session->FindStream(id);
2113-
if (stream->statistics_.first_byte_sent == 0)
2114-
stream->statistics_.first_byte_sent = uv_hrtime();
2115-
2116-
DEBUG_HTTP2SESSION2(session, "reading outbound file data for stream %d", id);
2117-
CHECK_EQ(id, stream->id());
2118-
2119-
int fd = source->fd;
2120-
int64_t offset = stream->fd_offset_;
2121-
ssize_t numchars = 0;
2122-
2123-
if (stream->fd_length_ >= 0 &&
2124-
stream->fd_length_ < static_cast<int64_t>(length))
2125-
length = stream->fd_length_;
2126-
2127-
uv_buf_t data;
2128-
data.base = reinterpret_cast<char*>(buf);
2129-
data.len = length;
2130-
2131-
uv_fs_t read_req;
2132-
2133-
if (length > 0) {
2134-
// TODO(addaleax): Never use synchronous I/O on the main thread.
2135-
numchars = uv_fs_read(session->event_loop(),
2136-
&read_req,
2137-
fd, &data, 1,
2138-
offset, nullptr);
2139-
uv_fs_req_cleanup(&read_req);
2140-
}
2141-
2142-
// Close the stream with an error if reading fails
2143-
if (numchars < 0)
2144-
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
2145-
2146-
// Update the read offset for the next read
2147-
stream->fd_offset_ += numchars;
2148-
stream->fd_length_ -= numchars;
2149-
2150-
DEBUG_HTTP2SESSION2(session, "sending %d bytes", numchars);
2151-
2152-
// if numchars < length, assume that we are done.
2153-
if (static_cast<size_t>(numchars) < length || length <= 0) {
2154-
DEBUG_HTTP2SESSION2(session, "no more data for stream %d", id);
2155-
*flags |= NGHTTP2_DATA_FLAG_EOF;
2156-
session->GetTrailers(stream, flags);
2157-
// If the stream or session gets destroyed during the GetTrailers
2158-
// callback, check that here and close down the stream
2159-
if (stream->IsDestroyed())
2160-
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
2161-
if (session->IsDestroyed())
2162-
return NGHTTP2_ERR_CALLBACK_FAILURE;
2163-
}
2164-
2165-
stream->statistics_.sent_bytes += numchars;
2166-
return numchars;
2167-
}
2168-
21692066
// The Stream Provider pulls data from a linked list of uv_buf_t structs
21702067
// built via the StreamBase API and the Streams js API.
21712068
Http2Stream::Provider::Stream::Stream(int options)
@@ -2508,27 +2405,6 @@ void Http2Stream::Respond(const FunctionCallbackInfo<Value>& args) {
25082405
DEBUG_HTTP2STREAM(stream, "response submitted");
25092406
}
25102407

2511-
// Initiates a response on the Http2Stream using a file descriptor to provide
2512-
// outbound DATA frames.
2513-
void Http2Stream::RespondFD(const FunctionCallbackInfo<Value>& args) {
2514-
Environment* env = Environment::GetCurrent(args);
2515-
Local<Context> context = env->context();
2516-
Isolate* isolate = env->isolate();
2517-
Http2Stream* stream;
2518-
ASSIGN_OR_RETURN_UNWRAP(&stream, args.Holder());
2519-
2520-
int fd = args[0]->Int32Value(context).ToChecked();
2521-
Local<Array> headers = args[1].As<Array>();
2522-
2523-
int64_t offset = args[2]->IntegerValue(context).ToChecked();
2524-
int64_t length = args[3]->IntegerValue(context).ToChecked();
2525-
int options = args[4]->IntegerValue(context).ToChecked();
2526-
2527-
Headers list(isolate, context, headers);
2528-
args.GetReturnValue().Set(stream->SubmitFile(fd, *list, list.length(),
2529-
offset, length, options));
2530-
DEBUG_HTTP2STREAM2(stream, "file response submitted for fd %d", fd);
2531-
}
25322408

25332409
// Submits informational headers on the Http2Stream
25342410
void Http2Stream::Info(const FunctionCallbackInfo<Value>& args) {
@@ -2891,7 +2767,6 @@ void Initialize(Local<Object> target,
28912767
env->SetProtoMethod(stream, "priority", Http2Stream::Priority);
28922768
env->SetProtoMethod(stream, "pushPromise", Http2Stream::PushPromise);
28932769
env->SetProtoMethod(stream, "info", Http2Stream::Info);
2894-
env->SetProtoMethod(stream, "respondFD", Http2Stream::RespondFD);
28952770
env->SetProtoMethod(stream, "respond", Http2Stream::Respond);
28962771
env->SetProtoMethod(stream, "rstStream", Http2Stream::RstStream);
28972772
env->SetProtoMethod(stream, "refreshState", Http2Stream::RefreshState);

src/node_http2.h

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -580,13 +580,6 @@ class Http2Stream : public AsyncWrap,
580580
size_t len,
581581
int options);
582582

583-
// Send data read from a file descriptor as the response on this stream.
584-
inline int SubmitFile(int fd,
585-
nghttp2_nv* nva, size_t len,
586-
int64_t offset,
587-
int64_t length,
588-
int options);
589-
590583
// Submit informational headers for this stream
591584
inline int SubmitInfo(nghttp2_nv* nva, size_t len);
592585

@@ -709,7 +702,6 @@ class Http2Stream : public AsyncWrap,
709702
static void PushPromise(const FunctionCallbackInfo<Value>& args);
710703
static void RefreshState(const FunctionCallbackInfo<Value>& args);
711704
static void Info(const FunctionCallbackInfo<Value>& args);
712-
static void RespondFD(const FunctionCallbackInfo<Value>& args);
713705
static void Respond(const FunctionCallbackInfo<Value>& args);
714706
static void RstStream(const FunctionCallbackInfo<Value>& args);
715707

@@ -753,8 +745,6 @@ class Http2Stream : public AsyncWrap,
753745
// waiting to be written out to the socket.
754746
std::queue<nghttp2_stream_write> queue_;
755747
size_t available_outbound_length_ = 0;
756-
int64_t fd_offset_ = 0;
757-
int64_t fd_length_ = -1;
758748

759749
Http2StreamListener stream_listener_;
760750

@@ -780,20 +770,6 @@ class Http2Stream::Provider {
780770
bool empty_ = false;
781771
};
782772

783-
class Http2Stream::Provider::FD : public Http2Stream::Provider {
784-
public:
785-
FD(int options, int fd);
786-
FD(Http2Stream* stream, int options, int fd);
787-
788-
static ssize_t OnRead(nghttp2_session* session,
789-
int32_t id,
790-
uint8_t* buf,
791-
size_t length,
792-
uint32_t* flags,
793-
nghttp2_data_source* source,
794-
void* user_data);
795-
};
796-
797773
class Http2Stream::Provider::Stream : public Http2Stream::Provider {
798774
public:
799775
Stream(Http2Stream* stream, int options);

test/parallel/test-http2-respond-with-fd-errors.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ const tests = specificTests.concat(genericTests);
4646

4747
let currentError;
4848

49-
// mock respondFD because we only care about testing error handling
50-
Http2Stream.prototype.respondFD = () => currentError.ngError;
49+
// mock `respond` because we only care about testing error handling
50+
Http2Stream.prototype.respond = () => currentError.ngError;
5151

5252
const server = http2.createServer();
5353
server.on('stream', common.mustCall((stream, headers) => {

0 commit comments

Comments
 (0)