pipeline + generator - Premature close #33792

Open
urugator opened this issue Jun 8, 2020 · 7 comments
Labels
stream Issues and PRs related to the stream subsystem.

Comments

@urugator

urugator commented Jun 8, 2020

Version: 14.4.0
Platform: 64-bit (Windows)
Subsystem: stream

pipeline throws Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close when:

  1. There is a destination function and the transformer function throws from inside the `for await` block.
// Expecting: Error: transformer
// Got: Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close 
Stream.pipeline(
  Stream.Readable.from(['a', 'b', 'c']),
  async function* (readable) {    
    for await (const chunk of readable) {
      // If this line is moved before or after the `for await` a correct error is thrown
      throw new Error('transformer');
    }
  },
  // If the destination function is removed, the correct error is thrown
  async function (readable) {
    let result = '';
    for await (const chunk of readable) {
      result += chunk;
    }
    return result;
  },
  (error, val) => error ? console.error(error) : console.log(val)
)
  2. The destination function throws from inside the `for await` block.
// Expecting: Error: destination
// Got: Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close 
Stream.pipeline(
  Stream.Readable.from(['a', 'b', 'c']),
  async function (readable) {
    let result = '';
    for await (const chunk of readable) {
      // If this line is moved before or after the `for await` a correct error is thrown
      throw new Error('destination');	
      result += chunk;      	
    }
    return result;
  },
  (error, val) => error ? console.error(error) : console.log(val)
)
  3. The transformer or destination returns from inside the `for await` block.
// Expecting: pipeline resolves with the value returned from the destination, and unfinished streams are silently destroyed
// Got: Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close 
Stream.pipeline(
  Stream.Readable.from(['a', 'b', 'c']),
  async function (readable) {
    for await (const chunk of readable) {
      // If this line is moved BEFORE `for await` - callback is NOT called AT ALL (node simply exits if it has nothing else to do)
      return 'foo';
    }
  },
  (error, val) => error ? console.error(error) : console.log(val)
)
@benjamingr
Member

@urugator first of all hey 👋 fancy seeing you here.

Second, cc @mcollina @ronag

Third - I'm not sure what the right behavior here is; I think this is because `return` or `throw` in a generator calls `return()` on it.

You are iterating a Readable; when you `.throw()` or `return`, that finishes the stream.

pipeline also closes the stream (so that you don't have streams "hanging"). Since you are explicitly closing the readable and it's also implicitly closed by pipeline, you are getting ERR_STREAM_PREMATURE_CLOSE.
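
For reference, a minimal sketch (not from the thread) of the mechanism described above: exiting a `for await` over a Readable calls `return()` on its async iterator, which destroys the stream.

const Stream = require('stream');

const readable = Stream.Readable.from(['a', 'b', 'c']);
readable.on('close', () => console.log('readable closed'));

(async () => {
  for await (const chunk of readable) {
    // Breaking out early (like the return/throw in the examples above)
    // calls the iterator's return(), which destroys the Readable.
    break;
  }
  console.log('destroyed:', readable.destroyed); // true
})();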

@benjamingr
Member

Hmm, weirdly, if there is a second stage between the two and `.return()` is called on a plain async iterator rather than a readable, it passes.

This passes

const Stream = require ('stream');

const stream = Stream.pipeline(
    Stream.Readable.from(['a', 'b', 'c']),
    // If destination function is removed a correct error is thrown
    fakeIterator,    
    async function (readable) {
        for await (const chunk of readable) {
          // If this line is moved BEFORE `for await` - callback is NOT called AT ALL (node simply exits if it has nothing else to do)
          return 'foo';
        }
      },
    (error, val) => error ? console.error("errored", error) : console.log("got", val)
  );

  stream.on('data', (chunk) => console.log('got chunk', chunk));

  function fakeIterator(stream) {
      console.log('fakeReadable called');
      let value = 0;
      const asyncIterator = ({
          next() {
              value++;
              console.log('next called', value);
              return { done: value > 10, value }
          },
          return() {
              console.log('return called');
              return { done: true };
          },
          throw() {
              console.log('throw called');
              return { done: true };
          }
      });
      return { [Symbol.asyncIterator]() { return asyncIterator } }
  }

@urugator
Author

urugator commented Jun 8, 2020

When iterating manually, it also seems to propagate the correct `Error: transformer`:

Stream.pipeline(
  Stream.Readable.from(['a', 'b', 'c']),
  async function* (readable) {
    const iter = readable[Symbol.asyncIterator]() 
    let chunk = await iter.next();
    throw new Error('transformer');    
  },
  async function (readable) {
    let result = '';
    for await (const chunk of readable) {
      result += chunk;
    }
    return result;
  },
  (error, val) => error ? console.error(error) : console.log(val)
)

I was hoping I could use pipeline to avoid the complexity of stream handling, but the current behavior seems hardly useful and quite confusing/inconsistent.
For illustration, here is the original use case:

function limitStreamLength(maxLength = 0) {
  return async function* limitStreamLength(readable) {
    let length = 0;
    for await (const chunk of readable) {
      length += chunk.length;
      if (length > maxLength) {
        // Throw something the user can catch and react to
        throw new Error('Max length reached');
        // Alternatively don't throw, just truncate the result:
        // return '';
      }
      yield chunk;
    }
  }
}

async function streamToString(readable) {
  readable.setEncoding('utf8');
  let result = '';
  for await (const chunk of readable) {
    result += chunk;
  }
  return result;
}

const result = await pipeline(
  Stream.Readable.from(['a', 'b', 'c']),
  limitStreamLength(1),
  streamToString,
)
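
(A minimal sketch, assuming Node 14 and `util.promisify`, of how the promise-returning `pipeline` used above can be obtained:)

const Stream = require('stream');
const { promisify } = require('util');

// Promise-returning variant of Stream.pipeline, so it can be awaited.
const pipeline = promisify(Stream.pipeline);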

@mcollina
Member

mcollina commented Jun 8, 2020

First of all, there are a lot of layers here; we are trying to simplify things, and this is a new feature. Also, software has bugs.

but the current behavior seems hardly useful and quite confusing/inconsistent

Secondly, I would recommend you rethink this line of reasoning, mainly because it implies a judgement on your part about why we added this in the first place.

Last but not least, I don’t fully understand how this would be changed. Again, this is a complex subject with 3 different layers intersecting, and it might end up being timing-dependent.

I would consider most of your problems as bugs until we assess how we could improve the situation.

If you think you can make this feature more useful, please send a PR.

@benjamingr
Member

@mcollina I mean, the current behavior in the OP just looks like a use case that wasn't considered with the feature.

@urugator
Author

urugator commented Jun 8, 2020

it implies a judgement on your part on why we added this in the first place.

Which is exactly why I am mentioning it, because I am hoping that someone tells me whether my judgment or expectations are wrong. It's completely possible the pipeline isn't intended to be used like this, but I am unable to tell from available sources (docs/issues/google).
I am sorry if it sounds offensive or something, I very much respect all the work that is being put into this.

@mcollina
Member

mcollina commented Jun 9, 2020

I mean, the current behavior in the OP just looks like a use case that wasn't considered with the feature.

I think the 3rd case is a proper bug, and the other two are some error cases we should handle somehow.
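
For comparison, a minimal sketch (an editorial illustration, not from the thread) of what the reporter expects in the 3rd case: the same early return, but without pipeline, resolves with 'foo' and the unread source is simply destroyed by the `for await` teardown.

const Stream = require('stream');

(async () => {
  const readable = Stream.Readable.from(['a', 'b', 'c']);
  const consume = async function (readable) {
    for await (const chunk of readable) {
      return 'foo'; // early return from the for await block
    }
  };
  console.log(await consume(readable)); // 'foo', no premature-close error
})();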
