Skip to content

Implement an optional Stage2 interface that allows such stages to be started more flexibly #20

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from

Conversation

mhagger
Copy link
Member

@mhagger mhagger commented Dec 17, 2023

This is basically one of two variants of this patch series. The other one modifies the Stage interface instead of creating a new Stage2 interface, but would require a major version bump. The two patch series have the first six patches in common and differ only in the last two.

The old Stage interface, and in particular its Start() method, is not ideal. Start() is responsible for creating its own stdout, without knowledge of what will be consuming it.

In practice, there are only two main stages:

  • commandStage ultimately runs a subprocess, which needs an *os.File as both stdin and stdout. The old code created its stdout using cmd.StdoutPipe(), which creates an *os.File.

  • goStage runs a Go function, which is happy with any kind of io.ReadCloser / io.WriteCloser for its stdin and stdout. The old code created its stdout using io.Pipe(), which doesn't return an *os.File.

There are some scenarios where the old behavior was not ideal:

  1. If a goStage was followed by a commandStage, the commandStage would had to consume the non-*os.File stdin that was created by the former. But since an external command requires an *os.File, exec.Cmd had to create an os.Pipe() internally and create an extra goroutine to copy from the io.Reader to the pipe. This is not only wasteful, but also meant that the goStage was not informed when the subprocess terminated or closed its stdin. (For example, the copy goroutine could block waiting to read from the io.Reader.)

  2. If Pipeline.stdout was set to an *os.File and the last stage was a commandStage, then an extra stage was needed to copy the output of the subprocess to Pipeline.stdout, when the subprocess could instead have written directly to the corresponding file descriptor. This was wasteful, and also lead to cases where the subprocess couldn't detect that Pipeline.stdout had been closed.

Problem (1) could have been fixed by changing goStage to always use os.Pipe() to create its stdout pipe. But that would be wasteful if two goStages were adjacent, in which case they could use a cheaper io.Pipe() instead. And it wouldn't solve problem (2) at all.

The problem can only be solved by considering both the producer and the consumer of the stdin and stdout of any stage. If either end is a commandStage, then it is preferable to us os.Pipe(). If both ends are goStages, then it is preferable to use io.Pipe(). And if Pipeline.Stdout is set, the last stage should write directly into it whenever possible.

This PR solves the problem by adding a new interface, Stage2, that can optionally be implemented by a Stage. The new interface includes two new methods,

Preferences() StagePreferences
Start2(
    ctx context.Context, env Env,
stdin io.ReadCloser, stdout io.WriteCloser,
) error

The first indicates what kind of stdin/stdout the stage prefers, and the second starts up the stage with a stdin and stdout that are provided by the caller, rather than letting the stage return its own stdout.

If a stage that implements Stage2 is added to a Pipeline, then Pipeline.Start() uses the first method to figure out what kind of pipes are preferred between this stage and its neighbors, and the second starts the stage with the preferred type of pipe if possible. It also passes Pipeline.stdout into the last stage rather than copying the data an extra time.

All of the stages that are defined in this package now implement both Stage and Stage2, so they get the benefit of this new behavior. Therefore, any callers that create stages in the usual way (using pipe.Command(), pipe.CommandStage(), pipe.Function(), pipe.LinewiseFunction(), etc.) will also get the benefit of the new behavior. For example, the benchmarks BenchmarkMoreDataBuffered and BenchmarkMoreDataUnbuffered (admittedly, worst cases for the old code) are sped up by roughly 2.25x and 6x, respectively:

snare:~/github/proj/go-pipe/git(main-bench)$ /bin/time go test -bench=. -benchtime=10s ./pipe/pipeline_test.go
goos: linux
goarch: amd64
cpu: Intel(R) Xeon(R) W-2255 CPU @ 3.70GHz
BenchmarkSingleProgram-20         	    8254	   1384888 ns/op
BenchmarkTenPrograms-20           	    2174	   5454223 ns/op
BenchmarkTenFunctions-20          	   37846	    327601 ns/op
BenchmarkTenMixedStages-20        	    3298	   3548630 ns/op
BenchmarkMoreDataUnbuffered-20    	      28	 400316217 ns/op
BenchmarkMoreDataBuffered-20      	      45	 259220902 ns/op
PASS
ok  	command-line-arguments	76.254s
172.01user 92.35system 1:16.73elapsed 344%CPU (0avgtext+0avgdata 107680maxresident)k
0inputs+7792outputs (42major+3771289minor)pagefaults 0swaps

snare:~/github/proj/go-pipe/git(stage2)$ /bin/time go test -bench=. -benchtime=10s ./pipe/pipeline_test.go
goos: linux
goarch: amd64
cpu: Intel(R) Xeon(R) W-2255 CPU @ 3.70GHz
BenchmarkSingleProgram-20         	    8586	   1362019 ns/op
BenchmarkTenPrograms-20           	    2234	   5308280 ns/op
BenchmarkTenFunctions-20          	   43003	    291655 ns/op
BenchmarkTenMixedStages-20        	    3441	   3468454 ns/op
BenchmarkMoreDataUnbuffered-20    	     175	  67083563 ns/op
BenchmarkMoreDataBuffered-20      	     100	 113872376 ns/op
PASS
ok  	command-line-arguments	83.116s
177.30user 143.48system 1:23.54elapsed 383%CPU (0avgtext+0avgdata 114560maxresident)k
0inputs+7808outputs (40major+3921427minor)pagefaults 0swaps

Also, look how much simpler testMemoryLimit() has become without the awkward workaround that was previously required.

Callers that define their own Stage types, on the other hand, will only benefit from the new behavior if they change their stages to also implement Stage2. Even if they don't do that, however, their old stages should continue to work as before.

In terms of backwards compatibility, some applications might notice a difference with the new pipe structure. The difference should usually be an improvement, for example lower resource consumption and less risk of deadlock. It is conceivable that some applications were in some way relying on the delayed completion of pipelines when an io.Pipe was closed, though I'm having trouble imagining scenarios like that in the real world.

The amount of code needed to support backwards compatibility is rather substantial, not to mention that any new Stage types would have to implement both Start() and Start2() to take advantage of the new system. That might be an argument for revving the package's major version number and getting rid of the old interface entirely. Most clients would not need changes because there's not much reason for a client to implement its own Stage type.

/cc @elhmn, @migue, @znull, @spraints as people who might have thoughts on this.

See also #21 for another alternative.

We want a stdin that is a pipe but not an `*os.File`? Just use
`io.Pipe()` to create it.
`WithStdoutCloser()` is a thing now. No need to do it by hand.
Add some benchmarks that move MB-scale data through pipelines
consisting of alternating commands and functions, one in small writes,
and one buffered into larger writes, then processing it one line at a
time. This is not so efficient, because every transition from
`Function` → `Command` requires an extra (hidden) goroutine that
copies the data from an `io.Reader` to a `*os.File`.

We can make this faster!
* Rename
  * `newNopCloser()` → `newReaderNopCloser()`
  * `nopCloser` → `readerNopCloser`
  * `nopCloserWriterTo` → `readerWriterToNopCloser`
  * `nopWriteCloser` → `writerNopCloser`

  to help keep readers and writers straight and because only the
  `Close()` part is a NOP.

* Move `writerNopCloser` to `nop_closer.go` to be with its siblings.
The old `Stage` interface, and in particular its `Start()` method, is
not ideal. `Start()` is responsible for creating its own stdout,
without knowledge of what will be consuming it.

In practice, there are only two main stages:

* `commandStage` ultimately runs a subprocess, which needs an
  `*os.File` as both stdin and stdout. The old code created its stdout
  using `cmd.StdoutPipe()`, which creates an `*os.File`.

* `goStage` runs a Go function, which is happy with any kind of
  `io.ReadCloser` / `io.WriteCloser` for its stdin and stdout. The old
  code created its stdout using `io.Pipe()`, which _doesn't_ return an
  `*os.File`.

There are some scenarios where the old behavior was not ideal:

1. If a `goStage` was followed by a `commandStage`, the `commandStage`
   would had to consume the non-`*os.File` stdin that was created by
   the former. But since an external command requires an `*os.File`,
   `exec.Cmd` had to create an `os.Pipe()` internally and create an
   extra goroutine to copy from the `io.Reader` to the pipe. This is
   not only wasteful, but also meant that the `goStage` was not
   informed when the subprocess terminated or closed its stdin. (For
   example, the copy goroutine could block waiting to read from the
   `io.Reader`.)

2. If `Pipeline.stdout` was set to an `*os.File` and the last stage
   was a `commandStage`, then an extra stage was needed to copy the
   output of the subprocess to `Pipeline.stdout`, when the subprocess
   could instead have written directly to the corresponding file
   descriptor. This was wasteful, and also lead to cases where the
   subprocess couldn't detect that `Pipeline.stdout` had been closed.

Problem (1) could have been fixed by changing `goStage` to always use
`os.Pipe()` to create its stdout pipe. But that would be wasteful if
two `goStage`s were adjacent, in which case they could use a cheaper
`io.Pipe()` instead. And it wouldn't solve problem (2) at all.

The problem can only be solved by considering both the producer _and_
the consumer of the stdin and stdout of any stage. If either end is a
`commandStage`, then it is preferable to us `os.Pipe()`. If both ends
are `goStage`s, then it is preferable to use `io.Pipe()`. And if
`Pipeline.Stdout` is set, the last stage should write directly into it
whenever possible.

This PR solves the problem by adding a new interface, `Stage2`, that
can optionally be implemented by a `Stage`. The new interface includes
two new methods,

    Preferences() StagePreferences
    Start2(
        ctx context.Context, env Env,
	stdin io.ReadCloser, stdout io.WriteCloser,
    ) error

The first indicates what kind of stdin/stdout the stage prefers, and
the second starts up the stage with a `stdin` and `stdout` that are
provided by the caller, rather than letting the stage return its own
stdout.

If a stage that implements `Stage2` is added to a `Pipeline`, then
`Pipeline.Start()` uses the first method to figure out what kind of
pipes are preferred between this stage and its neighbors, and the
second starts the stage with the preferred type of pipe if possible.
It also passes `Pipeline.stdout` into the last stage rather than
copying the data an extra time.

All of the stages that are defined in this package now implement both
`Stage` and `Stage2`, so they get the benefit of this new behavior.
Therefore, any callers that create stages in the usual way (using
`pipe.Command()`, `pipe.CommandStage()`, `pipe.Function()`,
`pipe.LinewiseFunction()`, etc.) will also get the benefit of the new
behavior. For example, the benchmarks `BenchmarkMoreDataBuffered` and
`BenchmarkMoreDataUnbuffered` (admittedly, worst cases for the old
code) are sped up by roughly 2.25x and 6x, respectively:

```
snare:~/github/proj/go-pipe/git(main-bench)$ /bin/time go test -bench=. -benchtime=10s ./pipe/pipeline_test.go
goos: linux
goarch: amd64
cpu: Intel(R) Xeon(R) W-2255 CPU @ 3.70GHz
BenchmarkSingleProgram-20         	    8254	   1384888 ns/op
BenchmarkTenPrograms-20           	    2174	   5454223 ns/op
BenchmarkTenFunctions-20          	   37846	    327601 ns/op
BenchmarkTenMixedStages-20        	    3298	   3548630 ns/op
BenchmarkMoreDataUnbuffered-20    	      28	 400316217 ns/op
BenchmarkMoreDataBuffered-20      	      45	 259220902 ns/op
PASS
ok  	command-line-arguments	76.254s
172.01user 92.35system 1:16.73elapsed 344%CPU (0avgtext+0avgdata 107680maxresident)k
0inputs+7792outputs (42major+3771289minor)pagefaults 0swaps

snare:~/github/proj/go-pipe/git(stage2)$ /bin/time go test -bench=. -benchtime=10s ./pipe/pipeline_test.go
goos: linux
goarch: amd64
cpu: Intel(R) Xeon(R) W-2255 CPU @ 3.70GHz
BenchmarkSingleProgram-20         	    8586	   1362019 ns/op
BenchmarkTenPrograms-20           	    2234	   5308280 ns/op
BenchmarkTenFunctions-20          	   43003	    291655 ns/op
BenchmarkTenMixedStages-20        	    3441	   3468454 ns/op
BenchmarkMoreDataUnbuffered-20    	     175	  67083563 ns/op
BenchmarkMoreDataBuffered-20      	     100	 113872376 ns/op
PASS
ok  	command-line-arguments	83.116s
177.30user 143.48system 1:23.54elapsed 383%CPU (0avgtext+0avgdata 114560maxresident)k
0inputs+7808outputs (40major+3921427minor)pagefaults 0swaps
```

Also, look how much simpler `testMemoryLimit()` has become without the
awkward workaround that was previously required.

Callers that define their own `Stage` types, on the other hand, will
only benefit from the new behavior if they change their stages to
_also_ implement `Stage2`. Even if they don't do that, however, their
old stages should continue to work as before.

In terms of backwards compatibility, some applications might notice a
difference with the new pipe structure. The difference should usually
be an improvement, for example lower resource consumption and less
risk of deadlock. It is conceivable that some applications were in
some way relying on the delayed completion of pipelines when an
`io.Pipe` was closed, though I'm having trouble imagining scenarios
like that in the real world.

The amount of code needed to support backwards compatibility is rather
substantial, not to mention that any new `Stage` types would have to
implement both `Start()` and `Start2()` to take advantage of the new
system. That might be an argument for revving the package's major
version number and getting rid of the old interface entirely. Most
clients would not need changes because there's not much reason for
a client to implement its own `Stage` type.
The most complicated code dealing with `Stage2` is the selection of
which types of stdin/stderr to pass to stages, and that's also the
main advantage of the `Stage2` interface. So add a bunch of tests that
the correct types (especially, `io.Pipe()` vs. `os.Pipe()`) are indeed
being selected.
// So instead, in this special case, we wrap `stdin` in our
// own `nopCloser`, which behaves like `io.NopCloser`, except
// that `pipe.CommandStage` knows how to unwrap it before
// passing it to `exec.Cmd`.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this is not enough in all cases as it leaves out the case where the passed stdin is some io.Reader or io.ReadCloser where the caller might never close, e.g. if it comes from a network connection that is kept open from the client side even after the conversation is over (of course ideally it would get closed, but sometimes it's not clear where/who should do this). If we did just have the *net.TCPConn it wouldn't be an issue, but the nature of the Hijack() means we might sometimes have data that's buffered so we can't skip on that.

In these cases we're forced to take the StdinPipe() and write into it from our own goroutine. Even passing one end of a io.Pipe doesn't work here because that stdlib goroutine will never finish (you could pass in a os.Pipe but that's extra cost again).

This is not new here, but it's still something that might require workarounds.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that StdinPipe() doesn't do anything more magical than create an os.Pipe() and fire up io.Copy() or its equivalent in a goroutine. So I think that the caller could do the same, and decide itself when to close the write end of the pipe (or not close it at all, if the command doesn't care about EOF).

If the process's stdin needs to combine two sources of data (e.g., an io.MultiReader()), then I think that the copying of the data can't be avoided. How could the extra os.Pipe() be avoided in this case, even in theory?

If you have a suggestion for how things could be simplified or made more general, please let me know.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, passing in an os.Pipe() should be the same as getting Cmd.StdinPipe(), I'm not sure if I was thinking both behaved like the io.Pipe() where you do need the extra goroutine. With a "real" pipe, it's passed directly into the child process so that's all fine and there isn't an extra one.

It's still annoying that you have to set this up extra instead of being able to pass in more complex types, but that remains a gotcha from the stdlib rather than this library.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what happened is that I was working against the latest tag, 1.0.2 which does not have the latest changes that know to unwrap things, so with the released code, passing in a is.Pipe() does still show the issue that it blocks forever.

mhagger added 3 commits August 2, 2024 19:51
* `Stage2` → `StageWithIO`
* `Stage2.Start()` → `StageWithIO.StartWithIO()`

Rename some other miscellaneous things analogously.
@mhagger
Copy link
Member Author

mhagger commented Aug 3, 2024

I just pushed some more commits. The only interesting difference is a rename of the new interface from Stage2 to StageWithIO and its Start2() method to StartWithIO().

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants