stream: pipeline should error if any stream is destroyed #36674


Closed
ronag opened this issue Dec 29, 2020 · 10 comments
Labels
good first issue: Issues that are suitable for first-time contributors.
stream: Issues and PRs related to the stream subsystem.

Comments

@ronag
Member

ronag commented Dec 29, 2020

pipeline should immediately fail with ERR_STREAM_DESTROYED when any of the streams have already been destroyed.

Readable might need a little extra consideration, since it's possible to read the data even after the stream has been destroyed. We should maybe check _readableState.errored and/or _readableState.ended.

Refs: #29227 (comment)
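
A rough sketch of the proposed fast-fail check (my illustration, using only public stream properties; assertPipelineUsable is a hypothetical helper, not pipeline's actual internals):

function assertPipelineUsable(streams) {
  for (const stream of streams) {
    // A destroyed stream can never complete, so fail fast.
    // A destroyed readable that has already ended may still be fine,
    // hence the extra ended check suggested above.
    if (stream.destroyed && !stream.readableEnded) {
      throw new Error('ERR_STREAM_DESTROYED');
    }
  }
}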

ronag added the stream and good first issue labels on Dec 29, 2020
@kalenikalexander

I would like to work on this problem.

@misos1

misos1 commented May 19, 2021

I also bumped into this. An HTTP client can easily trigger this against an HTTP server that uses pipeline (and this could cause really bad things):

let { PassThrough, pipeline } = require("stream");
let http = require("http");

let server = http.createServer(async function(req, res)
{
	// By the time this delay elapses, the client below has already destroyed the request.
	await new Promise(r => setTimeout(r, 1000));
	console.log("request destroyed", req.destroyed);
	let pass = new PassThrough();
	pipeline(req, pass, e => console.log("pipeline finished", e));
	for await (let chunk of pass) console.log("received", chunk.length);
	console.log("body processed");
	res.end();
});

(async function()
{
	await new Promise(resolve => server.listen(resolve));
	let req = http.request({ port: server.address().port, method: "POST" });
	req.on("error", () => null);
	req.write(Buffer.alloc(10000));
	// Destroy the request before the server starts consuming the body.
	setTimeout(() => req.destroy(), 500);
}());
Output:

request destroyed true
received 10000

Neither "pipeline finished" nor "body processed" shows up in this output.
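
One possible userland mitigation (a sketch of my own, not from the thread): check req.destroyed once the await resolves and bail out before wiring up the pipeline.

if (req.destroyed)
{
	// The client went away while the handler was waiting;
	// don't start the pipeline at all.
	res.destroy();
	return;
}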

@meixg
Member

meixg commented Feb 15, 2022

Can I continue to implement this? @ronag
I see this PR hasn't been updated for a long time: #36791

@meixg
Member

meixg commented Feb 17, 2022

I find that if one stream has already been destroyed, pipeline now calls the callback with an ERR_STREAM_PREMATURE_CLOSE.

I used this test case:

const {
  pipeline,
  PassThrough,
} = require('stream');

{
  const r = new PassThrough();
  const d = new PassThrough();

  d.on('data', (data) => {
    console.log(data);
  });

  r.write('aaa');
  r.destroy();

  // make sure r is destroyed
  process.nextTick(function () {
    pipeline([r, d], (err) => {
      console.log(err);
    });
  });
}

On Node.js 16.13.2, it outputs:

<Buffer 61 61 61>

On the master branch, it outputs:

Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
    at new NodeError (node:internal/errors:372:5)
    at onclose (node:internal/streams/end-of-stream:139:30)
    at processTicksAndRejections (node:internal/process/task_queues:77:11) {
  code: 'ERR_STREAM_PREMATURE_CLOSE'
}

It happens in the pipe function, which calls end-of-stream: https://github.com/nodejs/node/blob/master/lib/internal/streams/end-of-stream.js#L182

let closed = isClosed(stream);

// ...

if (closed) {
  process.nextTick(onclose);
}

// ... and inside onclose:
if (writable && !writableFinished) {
  if (!isWritableFinished(stream, false))
    return callback.call(stream,
                         new ERR_STREAM_PREMATURE_CLOSE());
}
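
The same check can be observed through the public stream.finished API; a small sketch I added for illustration (behavior as on recent Node versions):

const { finished, PassThrough } = require('stream');

const s = new PassThrough();
s.destroy();

process.nextTick(() => {
  // The stream closed without its writable side finishing, so
  // end-of-stream reports a premature close.
  finished(s, (err) => console.log(err && err.code)); // ERR_STREAM_PREMATURE_CLOSE
});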

Do I understand this issue correctly? Do we still need to throw an Error directly, as in https://github.com/nodejs/node/pull/36791/files#r665904922?

@ronag

@guymguym
Contributor

@meixg Does this work for you with the promisified version of stream.pipeline too?
We've seen behavior where this ends up with an unresolvable promise, which broke our async try/await/catch/finally code because of tc39/proposal-async-await#89

@guymguym
Contributor

Not sure if this is the same issue, but interestingly I see a difference between how this behaves inside an async function (which seems to fail fast, as expected) vs. how it runs in the REPL calling await.

This one works:
$ node -v
v16.13.2
$ node -e '(async () => {
  const pp = util.promisify(stream.pipeline);
  const r = new stream.PassThrough();
  const w = new stream.PassThrough();
  w.on("data", console.log);
  r.write("la la la");
  r.destroy();
  await pp([r,w]);
})().then(
  () => console.log("DONE"),
  err => console.error("FAILED",err)
)'

FAILED Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
    at new NodeError (node:internal/errors:371:5)
    at PassThrough.onclose (node:internal/streams/end-of-stream:135:30)
    at PassThrough.emit (node:events:402:35)
    at emitCloseNT (node:internal/streams/destroy:138:10)
    at processTicksAndRejections (node:internal/process/task_queues:82:21) {
  code: 'ERR_STREAM_PREMATURE_CLOSE'
}
But in the REPL it gets stuck like this ...
$ node
Welcome to Node.js v16.13.2.
Type ".help" for more information.
> pp = util.promisify(stream.pipeline);
[Function: pipeline]
> r = new stream.PassThrough();
PassThrough { ... }
> w = new stream.PassThrough();
PassThrough { ... }
> w.on("data", console.log);
PassThrough { ... }
> r.write("la la la");
true
> r.destroy();
PassThrough { ... }
> await pp([r,w]);
<Buffer 6c 61 20 6c 61 20 6c 61>

@@@ this is stuck forever here until I ctrl-c @@@

Uncaught:
Error [ERR_SCRIPT_EXECUTION_INTERRUPTED]: Script execution was interrupted by `SIGINT`
    at __node_internal_captureLargerStackTrace (node:internal/errors:464:5)
    at new NodeError (node:internal/errors:371:5)
    at sigintListener (node:repl:598:27)
    at REPLServer.onSigInt (node:repl:813:9)
    at REPLServer.emit (node:events:390:28)
    at REPLServer.emit (node:domain:475:12)
    at REPLServer.Interface._ttyWrite (node:readline:1081:16)
    at REPLServer.self._ttyWrite (node:repl:984:9)
    at ReadStream.onkeypress (node:readline:288:10)
    at ReadStream.emit (node:events:390:28)
    at ReadStream.emit (node:domain:475:12)
    at emitKeys (node:internal/readline/utils:358:14)
    at emitKeys.next (<anonymous>)
    at ReadStream.onData (node:internal/readline/emitKeypressEvents:61:36)
    at ReadStream.emit (node:events:390:28)
    at ReadStream.emit (node:domain:475:12)
    at addChunk (node:internal/streams/readable:315:12)
    at readableAddChunk (node:internal/streams/readable:289:9)
    at ReadStream.Readable.push (node:internal/streams/readable:228:10)
    at TTY.onStreamRead (node:internal/stream_base_commons:199:23)
    at TTY.callbackTrampoline (node:internal/async_hooks:130:17) {
  code: 'ERR_SCRIPT_EXECUTION_INTERRUPTED'
}
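
For reference (my addition, not from the thread): recent Node versions also expose a promise-based pipeline via stream/promises, which makes the try/catch/finally interaction explicit. A sketch, assuming a Node version where the destroyed-stream case rejects instead of hanging:

const { pipeline } = require('stream/promises');
const { PassThrough } = require('stream');

(async () => {
  const r = new PassThrough();
  const w = new PassThrough();
  w.resume(); // keep the destination flowing

  r.write('la la la');
  r.destroy();

  try {
    await pipeline(r, w);
    console.log('DONE');
  } catch (err) {
    console.error('FAILED', err.code); // ERR_STREAM_PREMATURE_CLOSE
  } finally {
    // Only reached once the promise settles; a never-settling promise
    // is exactly the hazard described above.
    console.log('cleanup');
  }
})();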

@ronag
Member Author

ronag commented Feb 17, 2022

I find that if one stream has been destroyed, the pipeline will call the callback with an ERR_STREAM_PREMATURE_CLOSE now. [...] Do I understand this issue correctly? Do we still need to throw an Error directly?

This looks correct to me. You are destroying the writable before it has ended, hence it will get a premature close.
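
For contrast, a sketch of my own (same assumptions as the test case above): if the source is ended instead of destroyed, the pipeline completes without an error.

const { pipeline, PassThrough } = require('stream');

const r = new PassThrough();
const d = new PassThrough();

d.on('data', (data) => console.log(data));

r.write('aaa');
r.end(); // graceful end instead of r.destroy()

process.nextTick(() => {
  pipeline(r, d, (err) => {
    console.log(err); // no error on graceful completion
  });
});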

@ronag
Member Author

ronag commented Feb 17, 2022

@guymguym Seems unrelated. Could you maybe open a separate issue?

@meixg
Member

meixg commented Feb 17, 2022

This looks correct to me. You are destroying the writable before it has ended, hence it will get a premature close.

Does the test case match the issue here?
If it matches, maybe this issue is resolved?

Or if it doesn't match, what should the test case look like? Maybe I can work on that.

@ronag
Member Author

ronag commented Feb 17, 2022

I don't think this is a problem anymore.
