Skip to content

Doc on Pipelines types #11347

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/standard/memory-and-spans/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ ms.technology: dotnet-standard
helpviewer_keywords:
- "Memory<T>"
- "Span<T>"
- buffers"
- "buffers"
- "pipeline processing"
author: "rpetrusha"
ms.author: "ronpet"
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
176 changes: 176 additions & 0 deletions docs/standard/memory-and-spans/media/pipelines/source/index.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
<!DOCTYPE html>
<html>
<head>
<title>Pipelines</title>
<script src="svg.min.js" src-cdn="https://cdnjs.cloudflare.com/ajax/libs/svg.js/2.6.4/svg.min.js" type="text/javascript"></script>
</head>
<body>
<div id="drawing" style="background-color: white;"></div>
<script type="text/javascript">
var strokeWidth = 4;
var fontSize = 16;

function getUser(draw) {
// https://commons.wikimedia.org/wiki/File:Linecons_user-avatar.svg
var user = draw.symbol().path(
"M28.921,26.344c-0.188-0.124-3.577-2.347-8.621-3.438c1.85-2.345,3.038-5.496,3.543-7.595" +
" c0.701-2.909,0.429-8.599-2.364-12.145C19.846,1.094,17.562,0,14.874,0C12.185,0,9.901,1.095,8.27,3.167" +
" c-2.793,3.545-3.064,9.235-2.364,12.144c0.505,2.098,1.692,5.25,3.543,7.595c-5.044,1.091-8.434,3.314-8.62,3.438" +
" c-0.681,0.454-0.986,1.303-0.748,2.084c0.236,0.786,0.96,1.323,1.779,1.323h26.031c0.819,0,1.541-0.537,1.779-1.321" +
" C29.907,27.645,29.602,26.797,28.921,26.344z M7.587,9.992c0.507-3.978,2.786-8.133,7.286-8.133s6.779,4.156,7.287,8.134" +
" c0.551,4.142-0.729,8.477-3.32,11.76l-0.303,0.386c-2.218,2.547-5.109,2.547-7.325,0l-0.304-0.386" +
" C8.319,18.473,7.034,14.132,7.587,9.992z M1.857,27.891c0.128-0.086,3.278-2.15,7.982-3.168l2.309-0.498" +
" c0.825,0.541,1.729,0.879,2.724,0.879c0.996,0,1.898-0.338,2.724-0.879l2.31,0.498c4.667,1.01,7.806,3.053,7.981,3.168H1.857z")
.translate(45, 45).scale(4);
return user;
}

function getCloud(draw) {
// https://commons.wikimedia.org/wiki/File:Linecons_small-cloud.svg
var cloud = draw.symbol().path(
"M25.913,12.642C25.476,8.08,21.677,4.5,16.999,4.5c-3.619,0-6.717,2.148-8.146,5.23" +
" C8.423,9.594,7.975,9.5,7.5,9.5C5.015,9.5,3,11.516,3,14c0,0.494,0.099,0.961,0.246,1.404C1.313,16.531,0,18.599,0,20.998" +
" c0,3.59,2.91,6.5,6.5,6.5V27.5h17.999v-0.002c4.143,0,7.5-3.357,7.5-7.5C31.999,16.341,29.378,13.305,25.913,12.642z" +
" M24.499,25.497V25.5H6.5C4.019,25.497,2,23.479,2,20.998c0-1.6,0.843-3.045,2.254-3.867c1.371-0.787,1.465-0.943,0.89-2.357" +
" C5.047,14.483,5,14.23,5,14.001c0-1.379,1.121-2.5,2.5-2.5c0,0,0.656-0.047,1.353,0.23c1.147,0.457,1.272,0.02,1.814-1.159" +
" C11.814,8.099,14.299,6.5,16.999,6.5c3.6,0,6.576,2.723,6.924,6.334c0.123,1.432,0.123,1.432,1.615,1.773" +
" c2.586,0.494,4.461,2.762,4.461,5.391C29.999,23.031,27.532,25.497,24.499,25.497z")
.translate(48, 30).scale(4);
return cloud;
}

function getArrowLeftToRight(draw, color) {
return draw.symbol().polygon("0,15 200,15 200,0 400,25 200,50 200,35 0,35").fill("none")
.stroke({ color, width: strokeWidth });
}

function getPipeLeftToRight(draw, color, suffix) {
if (suffix == null) suffix = "";
var arrowLeftToRight = getArrowLeftToRight(draw, color).translate(strokeWidth / 2, strokeWidth / 2 + 1);
var pipeLeftToRight = draw.symbol();
pipeLeftToRight.use(arrowLeftToRight);
pipeLeftToRight.plain("Pipe" + suffix + ".Writer").fill(color).font({ size: fontSize }).translate(0 + 40, 12);
pipeLeftToRight.plain("Pipe" + suffix + ".Reader").fill(color).font({ size: fontSize }).translate(349 - 40, 12);
return pipeLeftToRight;
}

function getPipeRightToLeft(draw, color, suffix) {
if (suffix == null) suffix = "";
var arrowRightToLeft = getArrowLeftToRight(draw, color).flip("x").translate(400, strokeWidth / 2 + 1);
var pipeRightToLeft = draw.symbol();
pipeRightToLeft.use(arrowRightToLeft);
pipeRightToLeft.plain("Pipe" + suffix + ".Reader").fill(color).font({ size: fontSize }).translate(-1 + 40, 55.5 + strokeWidth - 4);
pipeRightToLeft.plain("Pipe" + suffix + ".Writer").fill(color).font({ size: fontSize }).translate(359 - 40, 55.5 + strokeWidth - 4);
return pipeRightToLeft;
}

function getPipe(draw, color, party1, party2) {
var pipe = draw.symbol();
pipe.plain(party1 + ".Send()").fill("darkslategray")
.font({ size: fontSize, anchor: "end" }).translate(120, 12);
pipe.use(getPipeLeftToRight(draw, color)).translate(120, 0);
pipe.plain(party2 + ".Receive()").fill("darkslategray")
.font({ size: fontSize }).translate(120 + 402, 12);
return pipe;
}

function getEndPoint(draw, color) {
return draw.symbol().ellipse(50, 150).fill("none").stroke({ color, width: strokeWidth })
.translate(strokeWidth / 2, strokeWidth / 2);
}

function getDuplexPipeLeft(draw, color, prefix) {
if (prefix == null) prefix = "";
else prefix += ".";
var duplexPipeLeft = draw.symbol();
duplexPipeLeft.use(getEndPoint(draw, color)).translate(110 + 40, 0);
duplexPipeLeft.plain(prefix + "Output").fill(color).font({ size: fontSize, anchor: "end" })
.translate(110 + 40, 21 + 12);
duplexPipeLeft.plain(prefix + "Input").fill(color).font({ size: fontSize, anchor: "end" })
.translate(110 + 40, 21 + (55.5 + strokeWidth - 4) * 2);
return duplexPipeLeft;
}

function getDuplexPipeRight(draw, color, prefix) {
if (prefix == null) prefix = "";
else prefix += ".";
var duplexPipeRight = draw.symbol();
duplexPipeRight.use(getEndPoint(draw, color));
duplexPipeRight.plain(prefix + "Input").fill(color).font({ size: fontSize }).translate(50, 21 + 12);
duplexPipeRight.plain(prefix + "Output").fill(color).font({ size: fontSize })
.translate(51, 21 + (55.5 + strokeWidth - 4) * 2);
return duplexPipeRight;
}

function getDuplexPipePair(draw, duplexPipe1, duplexPipe2, party1, party2, instance) {
if (instance == null) instance = "";
var duplexPipePair = draw.symbol();
duplexPipePair.use(getDuplexPipeLeft(draw, "blue", duplexPipe1 + "")).translate(170, 0);
duplexPipePair.use(getDuplexPipeRight(draw, "purple", duplexPipe2 + "")).translate(170 + 110 + 443, 0);
duplexPipePair.use(getPipeLeftToRight(draw, "red", instance + "X")).translate(170 + 110 + 68, 21);
duplexPipePair.use(getPipeRightToLeft(draw, "green", instance + "Y")).translate(170 + 110 + 68, 77);
duplexPipePair.plain(party1 + instance + "1" + ".Send()").fill("darkslategray")
.font({ size: fontSize, anchor: "end" }).translate(160 + 15, 21 + 12);
duplexPipePair.plain(party1 + instance + "1" + ".Receive()").fill("darkslategray")
.font({ size: fontSize, anchor: "end" }).translate(160 + 15, 21 + (55.5 + strokeWidth - 4) * 2);
duplexPipePair.plain(party2 + instance + "2" + ".Receive()").fill("darkslategray")
.font({ size: fontSize }).translate(170 + 119 + 531 + 128 - 15, 21 + 12);
duplexPipePair.plain(party2 + instance + "2" + ".Send()").fill("darkslategray")
.font({ size: fontSize }).translate(170 + 119 + 531 + 128 - 15, 21 + (55.5 + strokeWidth - 4) * 2);
return duplexPipePair;
}

function getDuplexPipe(draw, name, party) {
var duplexPipe = draw.symbol();
duplexPipe.use(getDuplexPipePair(draw, name, "?", party, "?"));
duplexPipe.rect(400, 150 + strokeWidth).fill("white").opacity(0.9).translate(480 + 170, 0);
return duplexPipe;
}

function getConnectedDuplexPipePair(draw, party1, party2, instance) {
if (instance == null) instance = "";
var connectedDuplexPipePair = draw.symbol();
connectedDuplexPipePair.use(getUser(draw)).fill("darkslateblue").opacity(0.3).translate(160, (154 - 119) / 2);
connectedDuplexPipePair.use(getCloud(draw)).fill("darkslateblue").opacity(0.3)
.translate(160 + 119 + 531, (154 - 92) / 2);
connectedDuplexPipePair
.use(getDuplexPipePair(draw, "Transport" + instance + "1", "Application" + instance + "2", party1, party2, instance))
.translate(-10 - 110 + 119, 0);
return connectedDuplexPipePair;
}

function getDuplexConnection(draw) {
var duplexConnection = draw.symbol();
duplexConnection.polyline("750,53 840,53 840,309 750,309").fill("none")
.stroke({ color: "gray", width: strokeWidth, dasharray: "7, 3" });
duplexConnection.polyline("750,109 800,109 800,253 750,253").fill("none")
.stroke({ color: "gray", width: strokeWidth, dasharray: "7, 3" });
duplexConnection.polyline("830,167 840,187 850,167").fill("none")
.stroke({ color: "black", width: strokeWidth, dasharray: "7, 3" });
duplexConnection.polyline("790,190 800,170 810,190").fill("none")
.stroke({ color: "black", width: strokeWidth, dasharray: "7, 3" });
duplexConnection.rect(1105 - strokeWidth, 162 - strokeWidth).fill("none").opacity(0.3)
.stroke({ color: "darkgoldenrod", width: strokeWidth, dasharray: "7, 3" }).translate(strokeWidth / 2, strokeWidth / 2);
duplexConnection.rect(1105 - strokeWidth, 162 - strokeWidth).fill("none").opacity(0.3)
.stroke({ color: "darkgoldenrod", width: strokeWidth, dasharray: "7, 3" }).translate(strokeWidth / 2, strokeWidth / 2 + 200);
duplexConnection.plain("A").fill("darkgoldenrod").font({ size: fontSize * 2 }).translate(1120, (162 - strokeWidth) / 2 + fontSize);
duplexConnection.plain("B").fill("darkgoldenrod").font({ size: fontSize * 2 }).translate(1120, (162 - strokeWidth) / 2 + 200 + fontSize);
duplexConnection.use(getConnectedDuplexPipePair(draw, "Application", "Transport", "A")).translate(strokeWidth, strokeWidth);
duplexConnection.use(getConnectedDuplexPipePair(draw, "Application", "Transport", "B")).translate(strokeWidth, strokeWidth + 200);
return duplexConnection;
}

SVG.on(document,
"DOMContentLoaded",
function () {
var imgScale = 0.9;
var draw = window.SVG("drawing").size(1200, 360 + 500);

draw.use(getPipe(draw, "red", "Producer", "Consumer")).scale(imgScale, imgScale).translate(0, 0);
draw.use(getDuplexPipe(draw, "DuplexPipe", "Party")).scale(imgScale, imgScale).translate(0, 100);
draw.use(getDuplexPipePair(draw, "DuplexPipe1", "DuplexPipe2", "Party", "Party")).scale(imgScale, imgScale).translate(0, 300);
draw.use(getDuplexConnection(draw)).scale(imgScale, imgScale).translate(0, 500 * 1);
});
</script>
</body>
</html>

Large diffs are not rendered by default.

145 changes: 145 additions & 0 deletions docs/standard/memory-and-spans/pipelines.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
---
title: "Pipelines types"
ms.date: "03/17/2019"
ms.technology: dotnet-standard
helpviewer_keywords:
- "Pipe"
- "IDuplexPipe"
- "DuplexPipe"
- "Duplex Connection"
author: "KPixel"
---
# Pipelines types

## Pipe

We can use <xref:System.Span%601> and <xref:System.Memory%601> to implement various message-passing patterns.
A simple example is a `Pipe`.

![Pipe](media/pipelines/pipe.png)

A <xref:System.IO.Pipelines.Pipe%601> is a class that [a Producer can use to send data to a Consumer](https://en.wikipedia.org/wiki/Producer–consumer_problem) (one-way).
Conceptually, it looks a lot like a [Stream](https://docs.microsoft.com/dotnet/api/system.io.stream).

Although creating a new Pipe is simple, "closing" it properly is not. You must call `Complete()` on its `Writer` and `Reader`. This will gracefully end the `Receive()` and `Send()` loops (if you have them).

### Simple Pipe sample

```C#
var pipe = new Pipe();

// Producer
pipe.Writer.Write(new ReadOnlySpan<byte>(new byte[] { 0, 1, 2, 3 }));
await pipe.Writer.WriteAsync(new ReadOnlyMemory<byte>(new byte[] { 4, 5, 6, 7 }));

// Consumer
var result = await pipe.Reader.ReadAsync(); // == { 0, 1, 2, 3, 4, 5, 6, 7 }
Console.Out.WriteLine($"{result.Buffer.Length}"); // 8
```

### The `Receive()` loop

Receiving data from a Pipe should be done in a loop with specific patterns.

```C#
private static async Task ReceiveFromPipeLoop(PipeReader pipeReader)
{
try
{
while (true)
{
var result = await pipeReader.ReadAsync();
var buffer = result.Buffer;

try
{
if (!buffer.IsEmpty)
{
Console.Out.WriteLine($"Received {buffer.Length} bytes.");
}
else if (result.IsCompleted)
{
break;
}
}
finally
{
pipeReader.AdvanceTo(buffer.End);
}
}
}
finally
{
pipeReader.Complete();
}
}
```

Reading from the Pipe returns a buffer that contains the data. This buffer can also indicate if the Pipe has been canceled or completed (i.e. closed).

Note that this loop can be more complex if you need to buffer the data.

### Pipes in ASP.NET Core

If the Producer acquires the data that it sends through a network connection, it could do (pseudo-code): `var data = await network.ReceiveAsync(); await pipe.Writer.WriteAsync(data);`

However, new APIs have been added to .NET Core 2.1 (especially in the network stacks) so that it can do: `var memory = pipe.Writer.GetMemory(); await network.ReceiveAsync(memory);`
Here, the network connection writes its data straight into the pipe.

In ASP.NET Core, such a Producer can be called a Transport (Refer to the section "Duplex Connection").

## Introducing IDuplexPipe

![DuplexPipe](media/pipelines/duplexpipe.png)

With a `Pipe`, data flows one-way: One party sends data (the Producer), and the other party receives data (the Consumer).
If you want the data to flow both ways, that is, for both parties to be able to send and receive data from each other, you need DuplexPipes.

A `DuplexPipe` is an endpoint for a party to be a Producer and a Consumer at the same time. It holds the writer of one `Pipe` and the read of another `Pipe`.

A simple example where you would need DuplexPipes is for a [Request/Response pattern](https://en.wikipedia.org/wiki/Request–response).

### Creating a pair of DuplexPipes

![Pair of DuplexPipes](media/pipelines/duplexpipe-pair.png)

A pair is required so each party gets a complementary `DuplexPipe` that represents half of each of the two Pipes.

You typically won't need to implement the <xref:System.IO.Pipelines.IDuplexPipe%601> interface yourself. Instead, you can copy the default implementation provided by [Kestrel](https://github.com/aspnet/AspNetCore/blob/master/src/Servers/Kestrel/Core/src/Internal/DuplexPipe.cs) or by [SignalR](https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/common/Shared/DuplexPipe.cs) (they are identical).

```C#
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);

// Producer
pair.Transport.Output.Write(new byte[] { 0, 1, 2, 3 });
await pair.Transport.Output.WriteAsync(new ReadOnlyMemory<byte>(new byte[] { 4, 5, 6, 7 }));

// Consumer
var result = await pair.Application.Input.ReadAsync(); // == { 0, 1, 2, 3, 4, 5, 6, 7 }
Console.Out.WriteLine($"{result.Buffer.Length}"); // 8
```

This code is functionally identical to the one in the section "Simple Pipe sample"; however, it illustrates the names changes that occur with DuplexPipe.

### Duplex Connection

When using DuplexPipes to implement a network connection, the convention is to call the parties: Transport and Application.

The Transport is the DuplexPipe used to write (outgoing) data to the network and the Application is the DuplexPipe used by the network connection to write incoming data.

Note: For the Transport party, "Send" and "Receive" can mean two things: The Transport receives data from the network and sends it to the Application. And it receives data from the Application and sends it to the network.
In this tutorial, since we are focused on the pipelines, we use Send and Receive as meaning to/from the Application. However, in actual code, those words may be used for to/from the network.

These parties belong to a network node. So, in a connection between two nodes, Node A and Node B, you will have:

![Duplex Connection](media/pipelines/duplex-connection.png)

Node A is connected to Node B through their Transports.

Most of the time, Node A will be a computer or smartphone, and Node B will be a Server.
For example: In Kestrel (and other HTTP servers), Node A is the user's browser, Node B is the Kestrel server, and they are connected through a TCP connection. In SignalR, they are instead connected through WebSockets.

## See also

- [Pipelines namespace](xref:System.IO.Pipelines)
- [ASP.NET Core SignalR](https://docs.microsoft.com/aspnet/core/signalr/introduction)
1 change: 1 addition & 0 deletions docs/toc.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
### [Threading](standard/threading/)
## [Memory and span-related types](standard/memory-and-spans/index.md)
### [Memory<T> and Span<T> usage guidelines](standard/memory-and-spans/memory-t-usage-guidelines.md)
### [Pipelines](standard/memory-and-spans/pipelines.md)
## [Native interoperability](standard/native-interop/index.md)
### [P/Invoke](standard/native-interop/pinvoke.md)
### [Type marshalling](standard/native-interop/type-marshalling.md)
Expand Down