Skip to content

Change the Stage interface to make stdin/stdout handling more flexible #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from

Conversation

mhagger
Copy link
Member

@mhagger mhagger commented Dec 17, 2023

This is basically one of two versions of this patch series. The other one adds an optional second Stage2 interface instead of modifying Stage, thereby remaining backwards compatible and avoiding the need for a major version bump. This version is less awkward and requires less code, but is not entirely backwards compatible, so it would require a major version bump. The two patch series have the first six patches in common and differ only in the last two.

The old Stage interface, and in particular its Start() method, was not ideal. Start() was responsible for creating its own stdout, without knowledge of what will be consuming it.

In practice, there are only two main stages:

  • commandStage ultimately runs a subprocess, which needs an *os.File as both stdin and stdout. The old code created its stdout using cmd.StdoutPipe(), which creates an *os.File.

  • goStage runs a Go function, which should be happy with any kind of io.ReadCloser / io.WriteCloser for its stdin and stdout. The old code created its stdout using io.Pipe(), which doesn't return an *os.File.

There are some scenarios where the old behavior was not ideal:

  1. If a goStage was followed by a commandStage, the commandStage would had to consume the non-*os.File stdin that was created by the former. But since an external command requires an *os.File, exec.Cmd had to create an os.Pipe() internally and create an extra goroutine to copy from the io.Reader to the pipe. This is not only wasteful, but also meant that the goStage was not informed when the subprocess terminated or closed its stdin. (For example, the copy goroutine could block waiting to read from the io.Reader.)

  2. If Pipeline.stdout was set, then an extra stage was always needed to copy from the output of the last stage to Pipeline.stdout. But:

    • If the last stage was a commandStage and Pipeline.stdout was an *os.File, then this copy was unnecessary; the subprocess could instead have written directly to the corresponding file descriptor. This was wasteful, and also lead to cases where the subprocess couldn't detect that Pipeline.stdout had been closed.

    • If the last stage was a goStage, then the copy was also unnecessary; the stage could have written directly to Pipeline.stdout whatever its type.

Problem (1) could have been fixed by changing goStage to always use os.Pipe() to create its stdout pipe. But that would be wasteful if two goStages were adjacent, in which case they could use a cheaper io.Pipe() instead. And it wouldn't solve problem (2) at all.

Both problems can only be solved by considering both the producer and the consumer of the stdin and stdout of any stage. If either end is a commandStage, then it is preferable to us os.Pipe(). If both ends are goStages, then it is preferable to use io.Pipe(). And if Pipeline.Stdout is set, the last stage should write directly into it whenever possible.

This PR solves the problem by changing the Stage interface to add a Preferences() method and change the signature of the Start() method:

Preferences() StagePreferences
Start(
    ctx context.Context, env Env,
stdin io.ReadCloser, stdout io.WriteCloser,
) error

The first indicates what kind of stdin/stdout the stage prefers, and the second starts up the stage with a stdin and stdout that are provided by the caller, rather than letting the stage return its own stdout.

Now, when a stage is added to a Pipeline, then Pipeline.Start() uses the first method to figure out what kind of pipes are preferred between this stage and its neighbors, then the second is called to start the stage with the preferred type of pipe if possible. It also passes Pipeline.stdout into the last stage rather than copying the data an extra time.

Note that this is a backwards-incompatible change, and thus will require a change to v2. Any clients that implement their own Stage will have to change their stage to conform to the new interface.

However, clients that only create stages using the functions in this package (e.g., pipe.Command(), pipe.CommandStage(), pipe.Function(), pipe.LinewiseFunction(), etc.) should continue to work without changes, since those functions' signatures haven't changed. Such clients will get the benefit of the new behavior. For example, the benchmarks BenchmarkMoreDataBuffered and BenchmarkMoreDataUnbuffered (admittedly, worst cases for the old code) are sped up by roughly 2.25x and 6.6x, respectively:

snare:~/github/proj/go-pipe/git(main-bench)$ /bin/time go test -bench=. -benchtime=10s ./pipe/pipeline_test.go
goos: linux
goarch: amd64
cpu: Intel(R) Xeon(R) W-2255 CPU @ 3.70GHz
BenchmarkSingleProgram-20         	    8497	   1383275 ns/op
BenchmarkTenPrograms-20           	    2186	   5388075 ns/op
BenchmarkTenFunctions-20          	   37605	    324808 ns/op
BenchmarkTenMixedStages-20        	    3380	   3565218 ns/op
BenchmarkMoreDataUnbuffered-20    	      25	 423838490 ns/op
BenchmarkMoreDataBuffered-20      	      44	 261734773 ns/op
PASS
ok  	command-line-arguments	76.120s
172.91user 91.15system 1:16.56elapsed 344%CPU (0avgtext+0avgdata 114080maxresident)k
0inputs+7768outputs (40major+3819487minor)pagefaults 0swaps

snare:~/github/proj/go-pipe/git(version-2)$ /bin/time go test -bench=. -benchtime=10s ./pipe/pipeline_test.go
goos: linux
goarch: amd64
cpu: Intel(R) Xeon(R) W-2255 CPU @ 3.70GHz
BenchmarkSingleProgram-20         	    8458	   1366214 ns/op
BenchmarkTenPrograms-20           	    2233	   5296019 ns/op
BenchmarkTenFunctions-20          	   42453	    289761 ns/op
BenchmarkTenMixedStages-20        	    3398	   3497226 ns/op
BenchmarkMoreDataUnbuffered-20    	     177	  64410211 ns/op
BenchmarkMoreDataBuffered-20      	     100	 115728132 ns/op
PASS
ok  	command-line-arguments	82.751s
175.42user 142.81system 1:23.21elapsed 382%CPU (0avgtext+0avgdata 114080maxresident)k
0inputs+7776outputs (42major+3883888minor)pagefaults 0swaps

Also, look how much simpler testMemoryLimit() has become, since it doesn't need the awkward workaround that was previously required.

In terms of backwards compatibility, some applications might notice a difference with the new pipe structure. The difference should usually be an improvement, for example lower resource consumption and less risk of deadlock. It is conceivable that some applications were in some way relying on the delayed completion of pipelines when an io.Pipe was closed, though I'm having trouble imagining scenarios like that in the real world.

/cc @elhmn, @migue, @znull, @spraints as people who might have thoughts on this.

See also #20 for another alternative.

We want a stdin that is a pipe but not an `*os.File`? Just use
`io.Pipe()` to create it.
`WithStdoutCloser()` is a thing now. No need to do it by hand.
Add some benchmarks that move MB-scale data through pipelines
consisting of alternating commands and functions, one in small writes,
and one buffered into larger writes, then processing it one line at a
time. This is not so efficient, because every transition from
`Function` → `Command` requires an extra (hidden) goroutine that
copies the data from an `io.Reader` to a `*os.File`.

We can make this faster!
* Rename
  * `newNopCloser()` → `newReaderNopCloser()`
  * `nopCloser` → `readerNopCloser`
  * `nopCloserWriterTo` → `readerWriterToNopCloser`
  * `nopWriteCloser` → `writerNopCloser`

  to help keep readers and writers straight and because only the
  `Close()` part is a NOP.

* Move `writerNopCloser` to `nop_closer.go` to be with its siblings.
The old `Stage` interface, and in particular its `Start()` method, was
not ideal. `Start()` was responsible for creating its own stdout,
without knowledge of what will be consuming it.

In practice, there are only two main stages:

* `commandStage` ultimately runs a subprocess, which needs an
  `*os.File` as both stdin and stdout. The old code created its stdout
  using `cmd.StdoutPipe()`, which creates an `*os.File`.

* `goStage` runs a Go function, which should be happy with any kind of
  `io.ReadCloser` / `io.WriteCloser` for its stdin and stdout. The old
  code created its stdout using `io.Pipe()`, which _doesn't_ return an
  `*os.File`.

There are some scenarios where the old behavior was not ideal:

1. If a `goStage` was followed by a `commandStage`, the `commandStage`
   would had to consume the non-`*os.File` stdin that was created by
   the former. But since an external command requires an `*os.File`,
   `exec.Cmd` had to create an `os.Pipe()` internally and create an
   extra goroutine to copy from the `io.Reader` to the pipe. This is
   not only wasteful, but also meant that the `goStage` was not
   informed when the subprocess terminated or closed its stdin. (For
   example, the copy goroutine could block waiting to read from the
   `io.Reader`.)

2. If `Pipeline.stdout` was set, then an extra stage was always needed
   to copy from the output of the last stage to `Pipeline.stdout`. But:

   * If the last stage was a `commandStage` and `Pipeline.stdout` was
     an `*os.File`, then this copy was unnecessary; the subprocess
     could instead have written directly to the corresponding file
     descriptor. This was wasteful, and also lead to cases where the
     subprocess couldn't detect that `Pipeline.stdout` had been
     closed.

   * If the last stage was a `goStage`, then the copy was also
     unnecessary; the stage could have written directly to
     `Pipeline.stdout` whatever its type.

Problem (1) could have been fixed by changing `goStage` to always use
`os.Pipe()` to create its stdout pipe. But that would be wasteful if
two `goStage`s were adjacent, in which case they could use a cheaper
`io.Pipe()` instead. And it wouldn't solve problem (2) at all.

Both problems can only be solved by considering both the producer
_and_ the consumer of the stdin and stdout of any stage. If either end
is a `commandStage`, then it is preferable to us `os.Pipe()`. If both
ends are `goStage`s, then it is preferable to use `io.Pipe()`. And if
`Pipeline.Stdout` is set, the last stage should write directly into it
whenever possible.

This PR solves the problem by changing the `Stage` interface to add a
`Preferences()` method and change the signature of the `Start()`
method:

    Preferences() StagePreferences
    Start(
        ctx context.Context, env Env,
	stdin io.ReadCloser, stdout io.WriteCloser,
    ) error

The first indicates what kind of stdin/stdout the stage prefers, and
the second starts up the stage with a `stdin` and `stdout` that are
provided by the caller, rather than letting the stage return its own
stdout.

Now, when a stage is added to a `Pipeline`, then `Pipeline.Start()`
uses the first method to figure out what kind of pipes are preferred
between this stage and its neighbors, then the second is called to
start the stage with the preferred type of pipe if possible. It also
passes `Pipeline.stdout` into the last stage rather than copying the
data an extra time.

Note that this is a backwards-incompatible change, and thus will
require a change to v2. Any clients that implement their own `Stage`
will have to change their stage to conform to the new interface.

However, clients that only create stages using the functions in this
package (e.g., `pipe.Command()`, `pipe.CommandStage()`,
`pipe.Function()`, `pipe.LinewiseFunction()`, etc.) should continue to
work without changes, since those functions' signatures haven't
changed. Such clients will get the benefit of the new behavior. For
example, the benchmarks `BenchmarkMoreDataBuffered` and
`BenchmarkMoreDataUnbuffered` (admittedly, worst cases for the old
code) are sped up by roughly 2.25x and 6.6x, respectively:

```
snare:~/github/proj/go-pipe/git(main-bench)$ /bin/time go test -bench=. -benchtime=10s ./pipe/pipeline_test.go
goos: linux
goarch: amd64
cpu: Intel(R) Xeon(R) W-2255 CPU @ 3.70GHz
BenchmarkSingleProgram-20         	    8497	   1383275 ns/op
BenchmarkTenPrograms-20           	    2186	   5388075 ns/op
BenchmarkTenFunctions-20          	   37605	    324808 ns/op
BenchmarkTenMixedStages-20        	    3380	   3565218 ns/op
BenchmarkMoreDataUnbuffered-20    	      25	 423838490 ns/op
BenchmarkMoreDataBuffered-20      	      44	 261734773 ns/op
PASS
ok  	command-line-arguments	76.120s
172.91user 91.15system 1:16.56elapsed 344%CPU (0avgtext+0avgdata 114080maxresident)k
0inputs+7768outputs (40major+3819487minor)pagefaults 0swaps

snare:~/github/proj/go-pipe/git(version-2)$ /bin/time go test -bench=. -benchtime=10s ./pipe/pipeline_test.go
goos: linux
goarch: amd64
cpu: Intel(R) Xeon(R) W-2255 CPU @ 3.70GHz
BenchmarkSingleProgram-20         	    8458	   1366214 ns/op
BenchmarkTenPrograms-20           	    2233	   5296019 ns/op
BenchmarkTenFunctions-20          	   42453	    289761 ns/op
BenchmarkTenMixedStages-20        	    3398	   3497226 ns/op
BenchmarkMoreDataUnbuffered-20    	     177	  64410211 ns/op
BenchmarkMoreDataBuffered-20      	     100	 115728132 ns/op
PASS
ok  	command-line-arguments	82.751s
175.42user 142.81system 1:23.21elapsed 382%CPU (0avgtext+0avgdata 114080maxresident)k
0inputs+7776outputs (42major+3883888minor)pagefaults 0swaps
```

Also, look how much simpler `testMemoryLimit()` has become, since it
doesn't need the awkward workaround that was previously required.

In terms of backwards compatibility, some applications might notice a
difference with the new pipe structure. The difference should usually
be an improvement, for example lower resource consumption and less
risk of deadlock. It is conceivable that some applications were in
some way relying on the delayed completion of pipelines when an
`io.Pipe` was closed, though I'm having trouble imagining scenarios
like that in the real world.
The most complicated code dealing with the change to `Stage.Start()`
is the selection of which types of stdin/stderr to pass to stages, and
that's also the main advantage of the new interface. So add a bunch of
tests that the correct types (especially, `io.Pipe()` vs. `os.Pipe()`)
are indeed being selected.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

1 participant