Commit a607439

Authored by Rick Anderson
pipeline doc - (#14414)
* Fowlers pipeline doc
* work
* work
* work
* work
* work
* work
* Delete Pipes.AssemblyInfo.cs
* Delete Pipes.AssemblyInfoInputs.cache
* Delete project.assets.json
* Delete Pipes.csproj.nuget.g.targets
* Delete Pipes.csproj.nuget.dgspec.json
* Delete Pipes.csproj.nuget.cache
* Delete Pipes.csproj.nuget.g.props
* work
* work
* work
* work
* work
* work
* work
* work
* work
* work
* revert code samples
* move includes
* some fixes
* fix include name
* make article name lowercase
* fix errors
* acrolinx + xref updates
* remove code from repo + improvements
* minor clean up
* description: Shows how to ...
* Update docs/standard/io/pipelines.md Co-Authored-By: Maira Wenzel <[email protected]>
* Apply suggestions from code review halter73 suggestions Co-Authored-By: Stephen Halter <[email protected]>
* Apply suggestions from code review Co-Authored-By: Stephen Halter <[email protected]>
* Apply suggestions from code review Co-Authored-By: Stephen Halter <[email protected]>
* add halter73 suggestions
* add halter73 suggestions
* Apply suggestions from code review halter73 suggestions Co-Authored-By: Stephen Halter <[email protected]>
* add halter73 suggestions
* add halter73 suggestions
1 parent 760a590 commit a607439

5 files changed

+348
-0
lines changed


docs/standard/io/pipelines.md

Lines changed: 342 additions & 0 deletions
@@ -0,0 +1,342 @@
1+
---
2+
title: "I/O pipelines - .NET"
3+
description: Learn how to efficiently use I/O pipelines in .NET and avoid problems in your code.
4+
ms.date: "10/01/2019"
5+
ms.technology: dotnet-standard
6+
helpviewer_keywords:
7+
- "Pipelines"
8+
- "Pipelines I/O"
9+
- "I/O [.NET], Pipelines"
10+
author: rick-anderson
11+
ms.author: riande
12+
---
13+
# System.IO.Pipelines in .NET
14+
15+
<xref:System.IO.Pipelines> is a new library that is designed to make it easier to do high-performance I/O in .NET. It’s a library targeting .NET Standard that works on all .NET implementations.
16+
17+
<a name="solve"></a>
18+
19+
## What problem does System.IO.Pipelines solve
20+
<!-- corner case doesn't MT (machine translate) -->
21+
Apps that parse streaming data are composed of boilerplate code having many specialized and unusual code flows. The boilerplate and special case code is complex and difficult to maintain.
22+
23+
`System.IO.Pipelines` was architected to:
24+
25+
* Parse streaming data with high performance.
26+
* Reduce code complexity.
27+
28+
The following code is typical for a TCP server that receives line-delimited messages (delimited by `'\n'`) from a client:
29+
30+
```csharp
async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);

    // Process a single line from the buffer
    ProcessLine(buffer);
}
```
40+
41+
The preceding code has several problems:
42+
43+
* The entire message (end of line) might not be received in a single call to `ReadAsync`.
44+
* It's ignoring the result of `stream.ReadAsync`. `stream.ReadAsync` returns how much data was read.
45+
* It doesn't handle the case where multiple lines are read in a single `ReadAsync` call.
46+
* It allocates a `byte` array with each read.
47+
48+
To fix the preceding problems, the following changes are required:
49+
50+
* Buffer the incoming data until a new line is found.
51+
* Parse all the lines returned in the buffer.
52+
* It's possible that the line is bigger than 1 KB (1024 bytes). The code needs to resize the input buffer until a complete line is found.

    * If the buffer is resized, more buffer copies are made as longer lines appear in the input.
    * To reduce wasted space, compact the buffer used for reading lines.

* Consider using buffer pooling to avoid allocating memory repeatedly.

The following code addresses some of these problems:
59+
60+
[!code-csharp[](~/samples/snippets/csharp/pipelines/ProcessLinesAsync.cs?name=snippet)]
61+
62+
The previous code is complex and doesn't address all the problems identified. High-performance networking usually means writing very complex code to maximize performance. `System.IO.Pipelines` was designed to make writing this type of code easier.
63+
64+
## Pipe
65+
66+
The <xref:System.IO.Pipelines.Pipe> class can be used to create a `PipeWriter/PipeReader` pair. All data written into the `PipeWriter` is available in the `PipeReader`:
67+
68+
[!code-csharp[](~/samples/snippets/csharp/pipelines/Pipe.cs?name=snippet2)]
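
The referenced snippet isn't reproduced in this view. As a minimal sketch of the writer/reader relationship (not the snippet itself), the following code writes a few bytes to a `Pipe` and reads them back. The sample text and variable names are illustrative assumptions:

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Text;

var pipe = new Pipe();
PipeWriter writer = pipe.Writer;
PipeReader reader = pipe.Reader;

// Write five ASCII bytes; WriteAsync also flushes so the reader can see them.
await writer.WriteAsync(Encoding.ASCII.GetBytes("hello"));
writer.Complete();

// Read the same bytes back from the other end of the pipe.
ReadResult result = await reader.ReadAsync();
string text = Encoding.ASCII.GetString(result.Buffer.ToArray());

// Mark everything as consumed, then let the pipe release its memory.
reader.AdvanceTo(result.Buffer.End);
reader.Complete();

Console.WriteLine(text);
```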
69+
70+
<a name="pbu"></a>
71+
72+
### Pipe basic usage
73+
74+
[!code-csharp[](~/samples/snippets/csharp/pipelines/Pipe.cs?name=snippet)]
75+
76+
There are two loops:
77+
78+
* `FillPipeAsync` reads from the `Socket` and writes to the `PipeWriter`.
79+
* `ReadPipeAsync` reads from the `PipeReader` and parses incoming lines.
80+
81+
There are no explicit buffers allocated. All buffer management is delegated to the `PipeReader` and `PipeWriter` implementations. Delegating buffer management makes it easier for consuming code to focus solely on the business logic.
82+
83+
In the first loop:
84+
85+
* <xref:System.IO.Pipelines.PipeWriter.GetMemory(System.Int32)?displayProperty=nameWithType> is called to get memory from the underlying writer.
86+
* <xref:System.IO.Pipelines.PipeWriter.Advance(System.Int32)?displayProperty=nameWithType> is called to tell the `PipeWriter` how much data was written to the buffer.
88+
* <xref:System.IO.Pipelines.PipeWriter.FlushAsync%2A?displayProperty=nameWithType> is called to make the data available to the `PipeReader`.
89+
90+
In the second loop, the `PipeReader` consumes the buffers written by `PipeWriter`. The buffers come from the socket. The call to `PipeReader.ReadAsync`:
91+
92+
* Returns a <xref:System.IO.Pipelines.ReadResult> that contains two important pieces of information:

    * The data that was read in the form of `ReadOnlySequence<byte>`.
    * A boolean `IsCompleted` that indicates if the end of data (EOF) has been reached.
96+
97+
After finding the end of line (EOL) delimiter and parsing the line:
98+
99+
* The logic processes the buffer to skip what's already processed.
100+
* `PipeReader.AdvanceTo` is called to tell the `PipeReader` how much data has been consumed and examined.
101+
102+
The reader and writer loops end by calling `Complete`. `Complete` lets the underlying Pipe release the memory it allocated.
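
The two loops described above can be sketched as follows. This is an approximation under stated assumptions, not the referenced snippet: a `MemoryStream` stands in for the `Socket`, lines are collected into a list instead of being processed, and the method bodies are illustrative.

```csharp
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

// Assumption: a stream with three line-delimited messages replaces the socket.
var source = new MemoryStream(Encoding.UTF8.GetBytes("alpha\nbeta\ngamma\n"));
var lines = new List<string>();

var pipe = new Pipe();
Task writing = FillPipeAsync(source, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader, lines);
await Task.WhenAll(writing, reading);

Console.WriteLine(string.Join(",", lines));

static async Task FillPipeAsync(Stream source, PipeWriter writer)
{
    const int minimumBufferSize = 512;
    while (true)
    {
        // Ask the PipeWriter for memory instead of allocating a buffer.
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        int bytesRead = await source.ReadAsync(memory);
        if (bytesRead == 0)
        {
            break; // End of the source.
        }

        // Report how much was written, then make it available to the reader.
        writer.Advance(bytesRead);
        FlushResult result = await writer.FlushAsync();
        if (result.IsCompleted)
        {
            break; // The reader has completed.
        }
    }

    await writer.CompleteAsync();
}

static async Task ReadPipeAsync(PipeReader reader, List<string> lines)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        // Extract every complete line currently in the buffer.
        while (buffer.PositionOf((byte)'\n') is SequencePosition eol)
        {
            lines.Add(Encoding.UTF8.GetString(buffer.Slice(0, eol).ToArray()));
            buffer = buffer.Slice(buffer.GetPosition(1, eol));
        }

        // Consumed: up to the unparsed remainder. Examined: everything.
        reader.AdvanceTo(buffer.Start, buffer.End);

        if (result.IsCompleted)
        {
            break;
        }
    }

    await reader.CompleteAsync();
}
```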
103+
104+
### Backpressure and flow control
105+
106+
Ideally, reading and parsing work together:
107+
108+
* The reading thread consumes data from the network and puts it in buffers.
109+
* The parsing thread is responsible for constructing the appropriate data structures.
110+
111+
Typically, parsing takes more time than just copying blocks of data from the network:
112+
113+
* The reading thread gets ahead of the parsing thread.
114+
* The reading thread has to either slow down or allocate more memory to store the data for the parsing thread.
115+
116+
For optimal performance, there's a balance between frequent pauses and allocating more memory.
117+
118+
To solve the preceding problem, the `Pipe` has two settings to control the flow of data:
119+
120+
* <xref:System.IO.Pipelines.PipeOptions.PauseWriterThreshold>: Determines how much data should be buffered before calls to <xref:System.IO.Pipelines.PipeWriter.FlushAsync%2A> pause.
121+
* <xref:System.IO.Pipelines.PipeOptions.ResumeWriterThreshold>: Determines how much data the reader has to observe before calls to `PipeWriter.FlushAsync` resume.
122+
123+
![Diagram with ResumeWriterThreshold and PauseWriterThreshold](./media/pipelines/resume-pause.png)
124+
125+
<xref:System.IO.Pipelines.PipeWriter.FlushAsync%2A?displayProperty=nameWithType>:
126+
127+
* Returns an incomplete `ValueTask<FlushResult>` when the amount of data in the `Pipe` crosses `PauseWriterThreshold`.
128+
* Completes `ValueTask<FlushResult>` when it becomes lower than `ResumeWriterThreshold`.
129+
130+
Two values are used to prevent rapid cycling, which can occur if one value is used.
131+
132+
### Examples
133+
134+
```csharp
// Once 10 bytes are buffered, FlushAsync returns incomplete tasks until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
```
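
To see the thresholds in action, the following sketch buffers exactly 10 bytes, observes that the flush is pending, and then lets the reader drain the pipe. The variable names are illustrative, and the sketch assumes a single producer and consumer running in one method:

```csharp
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;

var pipe = new Pipe(new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5));

// Buffer 10 bytes, which reaches PauseWriterThreshold.
pipe.Writer.GetMemory(10).Span.Slice(0, 10).Fill(1);
pipe.Writer.Advance(10);
Task<FlushResult> flush = pipe.Writer.FlushAsync().AsTask();
bool pausedBeforeRead = !flush.IsCompleted;

// The reader consumes, and therefore examines, all 10 bytes.
ReadResult read = await pipe.Reader.ReadAsync();
long bytesSeen = read.Buffer.Length;
pipe.Reader.AdvanceTo(read.Buffer.End);

// The buffered amount is now below ResumeWriterThreshold, so the flush completes.
FlushResult flushResult = await flush;
Console.WriteLine($"paused before read: {pausedBeforeRead}, bytes seen: {bytesSeen}");

pipe.Writer.Complete();
pipe.Reader.Complete();
```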
140+
141+
### PipeScheduler
142+
143+
Typically when using `async` and `await`, asynchronous code resumes on either a <xref:System.Threading.Tasks.TaskScheduler> or the current <xref:System.Threading.SynchronizationContext>.
144+
145+
When doing I/O, it's important to have fine-grained control over where the I/O is performed. This control allows taking advantage of CPU caches effectively. Efficient caching is critical for high-performance apps like web servers. <xref:System.IO.Pipelines.PipeScheduler> provides control over where asynchronous callbacks run. By default:
146+
147+
* The current <xref:System.Threading.SynchronizationContext> is used.
148+
* If there's no `SynchronizationContext`, it uses the thread pool to run callbacks.
149+
150+
[!code-csharp[](~/samples/snippets/csharp/pipelines/Program.cs?name=snippet)]
151+
152+
[PipeScheduler.ThreadPool](xref:System.IO.Pipelines.PipeScheduler.ThreadPool) is the <xref:System.IO.Pipelines.PipeScheduler> implementation that queues callbacks to the thread pool. `PipeScheduler.ThreadPool` is the default and generally the best choice. [PipeScheduler.Inline](xref:System.IO.Pipelines.PipeScheduler.Inline) can cause unintended consequences such as deadlocks.
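
As a small configuration sketch (separate from the referenced snippet), the schedulers can be set explicitly through `PipeOptions`. Choosing the thread pool for both sides and opting out of the `SynchronizationContext` is an illustrative assumption, not a recommendation for every app:

```csharp
using System;
using System.IO.Pipelines;

// Run reader and writer callbacks on the thread pool and ignore any
// ambient SynchronizationContext.
var options = new PipeOptions(
    readerScheduler: PipeScheduler.ThreadPool,
    writerScheduler: PipeScheduler.ThreadPool,
    useSynchronizationContext: false);

var pipe = new Pipe(options);
Console.WriteLine(options.UseSynchronizationContext);
```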
153+
154+
### Pipe reset
155+
156+
It's frequently efficient to reuse the `Pipe` object. To reset the pipe, call <xref:System.IO.Pipelines.Pipe.Reset%2A?displayProperty=nameWithType> when both the `PipeReader` and `PipeWriter` are complete.
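
The following sketch reuses one `Pipe` for two round trips. `RoundTripAsync` is a hypothetical helper written for this example; it completes both halves of the pipe so that `Reset` can be called afterward:

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

var pipe = new Pipe();
string first = await RoundTripAsync(pipe, "first");

// Both halves were completed inside RoundTripAsync, so the pipe can be reset.
pipe.Reset();
string second = await RoundTripAsync(pipe, "second");

Console.WriteLine($"{first}, {second}");

// Hypothetical helper: writes text, reads it back, then completes both halves.
static async Task<string> RoundTripAsync(Pipe pipe, string text)
{
    await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes(text));
    pipe.Writer.Complete();

    ReadResult result = await pipe.Reader.ReadAsync();
    string roundTripped = Encoding.UTF8.GetString(result.Buffer.ToArray());
    pipe.Reader.AdvanceTo(result.Buffer.End);
    pipe.Reader.Complete();
    return roundTripped;
}
```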
157+
158+
## PipeReader
159+
160+
<xref:System.IO.Pipelines.PipeReader> manages memory on the caller's behalf. **Always** call <xref:System.IO.Pipelines.PipeReader.AdvanceTo%2A?displayProperty=nameWithType> after calling <xref:System.IO.Pipelines.PipeReader.ReadAsync%2A?displayProperty=nameWithType>. This lets the `PipeReader` know when the caller is done with the memory so that it can be tracked. The `ReadOnlySequence<byte>` returned from `PipeReader.ReadAsync` is only valid until the call to `PipeReader.AdvanceTo`. It's illegal to use `ReadOnlySequence<byte>` after calling `PipeReader.AdvanceTo`.
161+
162+
`PipeReader.AdvanceTo` takes two <xref:System.SequencePosition> arguments:
163+
164+
* The first argument determines how much memory was consumed.
165+
* The second argument determines how much of the buffer was observed.
166+
167+
Marking data as consumed means that the pipe can return the memory to the underlying buffer pool. Marking data as observed controls what the next call to `PipeReader.ReadAsync` does. Marking everything as observed means that the next call to `PipeReader.ReadAsync` won't return until there's more data written to the pipe. Any other value will make the next call to `PipeReader.ReadAsync` return immediately with the observed *and* unobserved data, but not data that has already been consumed.
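
The difference between the two arguments can be illustrated with a short sketch. It consumes nothing but marks everything as observed, so the next read waits for new data and then returns the old and new bytes together. The payloads and variable names are illustrative:

```csharp
using System;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

var pipe = new Pipe();
await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("ab"));

ReadResult first = await pipe.Reader.ReadAsync();
long firstLength = first.Buffer.Length;

// Consume nothing, but mark everything as observed.
pipe.Reader.AdvanceTo(first.Buffer.Start, first.Buffer.End);

// Because everything was observed, this read waits for new data.
Task<ReadResult> pending = pipe.Reader.ReadAsync().AsTask();
bool waitedForMoreData = !pending.IsCompleted;

await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("c"));
ReadResult second = await pending;

// The unconsumed "ab" is returned again together with the new "c".
long secondLength = second.Buffer.Length;
pipe.Reader.AdvanceTo(second.Buffer.End);

pipe.Writer.Complete();
pipe.Reader.Complete();
Console.WriteLine($"{firstLength} bytes, then {secondLength} bytes");
```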
168+
169+
### Read streaming data scenarios
170+
171+
There are a couple of typical patterns that emerge when trying to read streaming data:
172+
173+
* Given a stream of data, parse a single message.
174+
* Given a stream of data, parse all available messages.
175+
176+
The following examples use the `TryParseMessage` method for parsing messages from a `ReadOnlySequence<byte>`. `TryParseMessage` parses a single message and updates the input buffer to trim the parsed message from the buffer. `TryParseMessage` isn't part of .NET; it's a user-written method used in the following sections.
177+
178+
```csharp
bool TryParseMessage(ref ReadOnlySequence<byte> buffer, out Message message);
```
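
Because `TryParseMessage` is user-written, its body depends on the wire format. The following is one possible sketch, assuming that a message is a UTF-8 line terminated by `'\n'` and that a `string` stands in for the `Message` type:

```csharp
using System;
using System.Buffers;
using System.Text;

var buffer = new ReadOnlySequence<byte>(Encoding.UTF8.GetBytes("ping\npo"));

bool parsedFirst = TryParseMessage(ref buffer, out string firstMessage);
bool parsedSecond = TryParseMessage(ref buffer, out string secondMessage);

Console.WriteLine($"{parsedFirst}: {firstMessage}; {parsedSecond}; {buffer.Length} bytes left");

// Assumption: a message is a '\n'-terminated UTF-8 line, returned as a string.
static bool TryParseMessage(ref ReadOnlySequence<byte> buffer, out string message)
{
    SequencePosition? eol = buffer.PositionOf((byte)'\n');
    if (eol == null)
    {
        // No complete message yet; leave the buffer untouched.
        message = string.Empty;
        return false;
    }

    // Copy the payload out of the pipe-owned memory.
    message = Encoding.UTF8.GetString(buffer.Slice(0, eol.Value).ToArray());

    // Trim the parsed message and its delimiter from the input buffer.
    buffer = buffer.Slice(buffer.GetPosition(1, eol.Value));
    return true;
}
```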
181+
182+
### Read a single message
183+
184+
The following code reads a single message from a `PipeReader` and returns it to the caller.
185+
186+
[!code-csharp[ReadSingleMsg](~/samples/snippets/csharp/pipelines/ReadSingleMsg.cs?name=snippet)]
187+
188+
The preceding code:
189+
190+
* Parses a single message.
191+
* Updates the consumed `SequencePosition` and examined `SequencePosition` to point to the start of the trimmed input buffer.
192+
193+
The two `SequencePosition` arguments are updated because `TryParseMessage` removes the parsed message from the input buffer. Generally, when parsing a single message from the buffer, the examined position should be one of the following:
194+
195+
* The end of the message.
196+
* The end of the received buffer if no message was found.
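
The consumed and examined bookkeeping described above can be sketched as follows. This isn't the referenced snippet. It assumes the line-delimited `TryParseMessage` variant shown in the code, with a `string` in place of `Message`:

```csharp
#nullable enable
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

var pipe = new Pipe();
await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes("one\ntwo\n"));
pipe.Writer.Complete();

string? first = await ReadSingleMessageAsync(pipe.Reader);
string? second = await ReadSingleMessageAsync(pipe.Reader);
string? third = await ReadSingleMessageAsync(pipe.Reader); // End of data.
pipe.Reader.Complete();

Console.WriteLine($"{first}, {second}, {third ?? "(none)"}");

static async ValueTask<string?> ReadSingleMessageAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        // If no message is parsed, consume nothing and examine everything.
        SequencePosition consumed = buffer.Start;
        SequencePosition examined = buffer.End;

        try
        {
            if (TryParseMessage(ref buffer, out string message))
            {
                // The buffer was trimmed, so its start is the end of the message.
                consumed = buffer.Start;
                examined = consumed;
                return message;
            }

            if (result.IsCompleted)
            {
                if (buffer.Length > 0)
                {
                    throw new InvalidDataException("Incomplete message.");
                }

                return null; // No more data.
            }
        }
        finally
        {
            reader.AdvanceTo(consumed, examined);
        }
    }
}

// Assumption: a message is a '\n'-terminated UTF-8 line.
static bool TryParseMessage(ref ReadOnlySequence<byte> buffer, out string message)
{
    SequencePosition? eol = buffer.PositionOf((byte)'\n');
    if (eol == null)
    {
        message = string.Empty;
        return false;
    }

    message = Encoding.UTF8.GetString(buffer.Slice(0, eol.Value).ToArray());
    buffer = buffer.Slice(buffer.GetPosition(1, eol.Value));
    return true;
}
```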
197+
198+
The single message case has the most potential for errors. Passing the wrong values to *examined* can result in an out of memory exception or an infinite loop. For more information, see the [PipeReader common problems](#gotchas) section in this article.
199+
200+
### Reading multiple messages
201+
202+
The following code reads all messages from a `PipeReader` and calls `ProcessMessageAsync` on each.
203+
204+
[!code-csharp[MyConnection1](~/samples/snippets/csharp/pipelines/MyConnection1.cs?name=snippet)]
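
A read loop of this shape might look like the following sketch. It isn't the referenced snippet: `ProcessMessageAsync` here only records each message in a list, and `TryParseMessage` assumes `'\n'`-terminated UTF-8 lines.

```csharp
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

var pipe = new Pipe();
var messages = new List<string>();
await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes("a\nbb\nccc\n"));
pipe.Writer.Complete();

await ProcessMessagesAsync(pipe.Reader, messages);
Console.WriteLine(string.Join("|", messages));

static async Task ProcessMessagesAsync(PipeReader reader, List<string> sink)
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                // Process every complete message in the buffer.
                while (TryParseMessage(ref buffer, out string message))
                {
                    await ProcessMessageAsync(message, sink);
                }

                // Check for completion only after the buffer was processed.
                if (result.IsCompleted)
                {
                    if (buffer.Length > 0)
                    {
                        throw new InvalidDataException("Incomplete message.");
                    }

                    break;
                }
            }
            finally
            {
                // All parsed data is consumed; the whole buffer was examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

// Hypothetical message handler: records the message.
static Task ProcessMessageAsync(string message, List<string> sink)
{
    sink.Add(message);
    return Task.CompletedTask;
}

// Assumption: a message is a '\n'-terminated UTF-8 line.
static bool TryParseMessage(ref ReadOnlySequence<byte> buffer, out string message)
{
    SequencePosition? eol = buffer.PositionOf((byte)'\n');
    if (eol == null)
    {
        message = string.Empty;
        return false;
    }

    message = Encoding.UTF8.GetString(buffer.Slice(0, eol.Value).ToArray());
    buffer = buffer.Slice(buffer.GetPosition(1, eol.Value));
    return true;
}
```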
205+
206+
### Cancellation
207+
208+
`PipeReader.ReadAsync`:
209+
210+
* Supports passing a <xref:System.Threading.CancellationToken>.
211+
* Throws an <xref:System.OperationCanceledException> if the `CancellationToken` is canceled while there's a read pending.
212+
* Supports a way to cancel the current read operation via <xref:System.IO.Pipelines.PipeReader.CancelPendingRead%2A?displayProperty=nameWithType>, which avoids raising an exception. Calling `PipeReader.CancelPendingRead` causes the current or next call to `PipeReader.ReadAsync` to return a <xref:System.IO.Pipelines.ReadResult> with `IsCanceled` set to `true`. This can be useful for halting the existing read loop in a non-destructive and non-exceptional way.
213+
214+
[!code-csharp[MyConnection](~/samples/snippets/csharp/pipelines/MyConnection.cs?name=snippet)]
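
The non-throwing cancellation path can be sketched in isolation. In this illustrative example, nothing is ever written to the pipe, so the read stays pending until `CancelPendingRead` is called:

```csharp
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;

var pipe = new Pipe();

// No data has been written, so this read stays pending.
Task<ReadResult> pendingRead = pipe.Reader.ReadAsync().AsTask();
bool wasPending = !pendingRead.IsCompleted;

// Cancel the pending read without raising an exception.
pipe.Reader.CancelPendingRead();
ReadResult result = await pendingRead;
bool wasCanceled = result.IsCanceled;

// AdvanceTo is still called after the read, even though no data was returned.
pipe.Reader.AdvanceTo(result.Buffer.Start, result.Buffer.End);

pipe.Reader.Complete();
pipe.Writer.Complete();
Console.WriteLine($"IsCanceled: {wasCanceled}");
```

A read loop would typically check `IsCanceled` and exit before trying to parse the buffer.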
215+
216+
<a name="gotchas"></a>
217+
218+
### PipeReader common problems
219+
220+
* Passing the wrong values to `consumed` or `examined` may result in reading already read data.
221+
* Passing `buffer.End` as examined may result in:

    * Stalled data
    * Possibly an eventual Out of Memory (OOM) exception if data isn't consumed. For example, `PipeReader.AdvanceTo(position, buffer.End)` when processing a single message at a time from the buffer.
225+
226+
* Passing the wrong values to `consumed` or `examined` may result in an infinite loop. For example, `PipeReader.AdvanceTo(buffer.Start)` if `buffer.Start` hasn't changed will cause the next call to `PipeReader.ReadAsync` to return immediately before new data arrives.
227+
* Passing the wrong values to `consumed` or `examined` may result in infinite buffering (eventual OOM).
228+
* Using the `ReadOnlySequence<byte>` after calling `PipeReader.AdvanceTo` may result in memory corruption (use after free).
229+
* Failing to call `PipeReader.Complete/CompleteAsync` may result in a memory leak.
230+
* Checking <xref:System.IO.Pipelines.ReadResult.IsCompleted?displayProperty=nameWithType> and exiting the reading logic before processing the buffer results in data loss. The loop exit condition should be based on `ReadResult.Buffer.IsEmpty` and `ReadResult.IsCompleted`. Doing this incorrectly could result in an infinite loop.
231+
232+
#### Problematic code
233+
234+
**Data loss**
235+
236+
The `ReadResult` can return the final segment of data when `IsCompleted` is set to `true`. Not reading that data before exiting the read loop will result in data loss.
237+
238+
[!INCLUDE [pipelines-do-not-use-1](../../../includes/pipelines-do-not-use-1.md)]
239+
240+
[!code-csharp[DoNotUse#1](~/samples/snippets/csharp/pipelines/DoNotUse.cs?name=snippet)]
241+
242+
[!INCLUDE [pipelines-do-not-use-2](../../../includes/pipelines-do-not-use-2.md)]
243+
244+
**Infinite loop**
245+
246+
The following logic may result in an infinite loop if `ReadResult.IsCompleted` is `true` but there's never a complete message in the buffer.
247+
248+
[!INCLUDE [pipelines-do-not-use-1](../../../includes/pipelines-do-not-use-1.md)]
249+
250+
[!code-csharp[DoNotUse#2](~/samples/snippets/csharp/pipelines/DoNotUse.cs?name=snippet2)]
251+
252+
[!INCLUDE [pipelines-do-not-use-2](../../../includes/pipelines-do-not-use-2.md)]
253+
254+
Here's another piece of code with the same problem. It's checking for a non-empty buffer before checking `ReadResult.IsCompleted`. Because it's in an `else if`, it will loop forever if there's never a complete message in the buffer.
255+
256+
[!INCLUDE [pipelines-do-not-use-1](../../../includes/pipelines-do-not-use-1.md)]
257+
258+
[!code-csharp[DoNotUse#3](~/samples/snippets/csharp/pipelines/DoNotUse.cs?name=snippet3)]
259+
260+
[!INCLUDE [pipelines-do-not-use-2](../../../includes/pipelines-do-not-use-2.md)]
261+
262+
**Unexpected Hang**
263+
264+
Unconditionally calling `PipeReader.AdvanceTo` with `buffer.End` in the `examined` position may result in hangs when parsing a single message. The next call to `PipeReader.ReadAsync` won't return until:
265+
266+
* There's more data written to the pipe.
267+
* And the new data wasn't previously examined.
268+
269+
[!INCLUDE [pipelines-do-not-use-1](../../../includes/pipelines-do-not-use-1.md)]
270+
271+
[!code-csharp[DoNotUse#4](~/samples/snippets/csharp/pipelines/DoNotUse.cs?name=snippet4)]
272+
273+
[!INCLUDE [pipelines-do-not-use-2](../../../includes/pipelines-do-not-use-2.md)]
274+
275+
**Out of Memory (OOM)**
276+
277+
With the following conditions, the following code keeps buffering until an <xref:System.OutOfMemoryException> occurs:
278+
279+
* There's no maximum message size.
280+
* The data returned from the `PipeReader` doesn't make a complete message. For example, it doesn't make a complete message because the other side is writing a large message (for example, a 4-GB message).
281+
282+
[!INCLUDE [pipelines-do-not-use-1](../../../includes/pipelines-do-not-use-1.md)]
283+
284+
[!code-csharp[DoNotUse#5](~/samples/snippets/csharp/pipelines/DoNotUse.cs?name=snippet5)]
285+
286+
[!INCLUDE [pipelines-do-not-use-2](../../../includes/pipelines-do-not-use-2.md)]
287+
288+
**Memory Corruption**
289+
290+
When writing helpers that read the buffer, any returned payload should be copied before calling `AdvanceTo`. The following example returns memory that the `Pipe` has discarded and may reuse for the next operation (read/write).
291+
292+
[!INCLUDE [pipelines-do-not-use-1](../../../includes/pipelines-do-not-use-1.md)]
293+
294+
[!code-csharp[DoNotUse#Message](~/samples/snippets/csharp/pipelines/DoNotUse.cs?name=snippetMessage)]
295+
296+
[!code-csharp[DoNotUse#6](~/samples/snippets/csharp/pipelines/DoNotUse.cs?name=snippet6)]
297+
298+
[!INCLUDE [pipelines-do-not-use-2](../../../includes/pipelines-do-not-use-2.md)]
299+
300+
## PipeWriter
301+
302+
The <xref:System.IO.Pipelines.PipeWriter> manages buffers for writing on the caller's behalf. `PipeWriter` implements [`IBufferWriter<byte>`](xref:System.Buffers.IBufferWriter`1). `IBufferWriter<byte>` makes it possible to get access to buffers to perform writes without additional buffer copies.
303+
304+
[!code-csharp[MyPipeWriter](~/samples/snippets/csharp/pipelines/MyPipeWriter.cs?name=snippet)]
305+
306+
The previous code:
307+
308+
* Requests a buffer of at least 5 bytes from the `PipeWriter` using <xref:System.IO.Pipelines.PipeWriter.GetSpan%2A>.
309+
* Writes bytes for the ASCII string `"Hello"` to the returned `Span<byte>`.
310+
* Calls <xref:System.IO.Pipelines.PipeWriter.Advance%2A> to indicate how many bytes were written to the buffer.
311+
* Flushes the `PipeWriter`, which sends the bytes to the underlying device.
312+
313+
The previous method of writing uses the buffers provided by the `PipeWriter`. Alternatively, <xref:System.IO.Pipelines.PipeWriter.WriteAsync%2A?displayProperty=nameWithType>:
314+
315+
* Copies the existing buffer to the `PipeWriter`.
316+
* Calls `GetSpan` and `Advance` as appropriate, then calls <xref:System.IO.Pipelines.PipeWriter.FlushAsync%2A>.
317+
318+
[!code-csharp[MyPipeWriter#2](~/samples/snippets/csharp/pipelines/MyPipeWriter.cs?name=snippet2)]
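
Both ways of writing can be compared in one sketch. It isn't the referenced snippets: `WriteHello` is a hypothetical helper, kept synchronous so the `Span<byte>` never lives across an `await`, and the read at the end exists only to show what arrived.

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Text;

var pipe = new Pipe();

// Option 1: write into a buffer owned by the PipeWriter, then flush.
WriteHello(pipe.Writer);
await pipe.Writer.FlushAsync();

// Option 2: copy an existing buffer; WriteAsync also flushes.
byte[] existing = Encoding.ASCII.GetBytes("Hello");
await pipe.Writer.WriteAsync(existing);
pipe.Writer.Complete();

ReadResult result = await pipe.Reader.ReadAsync();
string received = Encoding.ASCII.GetString(result.Buffer.ToArray());
pipe.Reader.AdvanceTo(result.Buffer.End);
pipe.Reader.Complete();
Console.WriteLine(received);

// Hypothetical helper that writes "Hello" through GetSpan and Advance.
static void WriteHello(PipeWriter writer)
{
    // Request at least 5 bytes; the returned span may be larger.
    Span<byte> span = writer.GetSpan(5);
    int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), span);

    // Report exactly how many bytes were written.
    writer.Advance(written);
}
```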
319+
320+
### Cancellation
321+
322+
<xref:System.IO.Pipelines.PipeWriter.FlushAsync%2A> supports passing a <xref:System.Threading.CancellationToken>. Passing a `CancellationToken` results in an `OperationCanceledException` if the token is canceled while there's a flush pending. `PipeWriter.FlushAsync` supports a way to cancel the current flush operation via <xref:System.IO.Pipelines.PipeWriter.CancelPendingFlush%2A?displayProperty=nameWithType> without raising an exception. Calling `PipeWriter.CancelPendingFlush` causes the current or next call to `PipeWriter.FlushAsync` or `PipeWriter.WriteAsync` to return a <xref:System.IO.Pipelines.FlushResult> with `IsCanceled` set to `true`. This can be useful for halting the yielding flush in a non-destructive and non-exceptional way.
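
A minimal sketch of the non-throwing path follows. It calls `CancelPendingFlush` before the flush, relying on the statement above that the next call to `FlushAsync` is affected. The single byte written is an illustrative payload:

```csharp
using System;
using System.IO.Pipelines;

var pipe = new Pipe();

// Stage one byte so there's something to flush.
pipe.Writer.GetMemory(1).Span[0] = 42;
pipe.Writer.Advance(1);

// Request cancellation of the current or next flush.
pipe.Writer.CancelPendingFlush();
FlushResult result = await pipe.Writer.FlushAsync();
bool wasCanceled = result.IsCanceled;

pipe.Writer.Complete();
pipe.Reader.Complete();
Console.WriteLine($"IsCanceled: {wasCanceled}");
```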
323+
324+
<a name="pwcp"></a>
325+
326+
### PipeWriter common problems
327+
328+
* <xref:System.IO.Pipelines.PipeWriter.GetSpan%2A> and <xref:System.IO.Pipelines.PipeWriter.GetMemory%2A> return a buffer with at least the requested amount of memory. **Don't** assume exact buffer sizes.
329+
* There's no guarantee that successive calls will return the same buffer or the same-sized buffer.
330+
* A new buffer must be requested after calling <xref:System.IO.Pipelines.PipeWriter.Advance%2A> to continue writing more data. The previously acquired buffer can't be written to.
331+
* Calling `GetMemory` or `GetSpan` while there's an incomplete call to `FlushAsync` isn't safe.
332+
* Calling `Complete` or `CompleteAsync` while there's unflushed data can result in memory corruption.
333+
334+
## IDuplexPipe
335+
336+
The <xref:System.IO.Pipelines.IDuplexPipe> is a contract for types that support both reading and writing. For example, a network connection would be represented by an `IDuplexPipe`.
337+
338+
Unlike `Pipe`, which contains a `PipeReader` and a `PipeWriter`, `IDuplexPipe` represents a single side of a full duplex connection. That means what is written to the `PipeWriter` will not be read from the `PipeReader`.
339+
340+
## Streams
341+
342+
When reading or writing stream data, you typically read data using a de-serializer and write data using a serializer. Most of these read and write stream APIs have a `Stream` parameter. To make it easier to integrate with these existing APIs, `PipeReader` and `PipeWriter` expose the <xref:System.IO.Pipelines.PipeReader.AsStream%2A?displayProperty=nameWithType> and <xref:System.IO.Pipelines.PipeWriter.AsStream%2A?displayProperty=nameWithType> methods. Each returns a `Stream` implementation around the `PipeReader` or `PipeWriter`.
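
As an illustration, the following sketch uses `StreamWriter` and `StreamReader` as stand-ins for a serializer and de-serializer. The text payload and variable names are assumptions made for the example:

```csharp
using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;

var pipe = new Pipe();

// Stream-based write on top of the PipeWriter.
using (Stream writerStream = pipe.Writer.AsStream())
using (var streamWriter = new StreamWriter(writerStream))
{
    await streamWriter.WriteLineAsync("hello from a stream");
    await streamWriter.FlushAsync();
}
// Disposing the stream completes the PipeWriter because leaveOpen defaults to false.

// Stream-based read on top of the PipeReader.
using Stream readerStream = pipe.Reader.AsStream();
using var streamReader = new StreamReader(readerStream);
string line = await streamReader.ReadLineAsync() ?? string.Empty;

Console.WriteLine(line);
```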
