Skip to content

Commit 57092e9

Browse files
authored
Implement Kestrel Request PipeReader (#7603)
1 parent 5315446 commit 57092e9

33 files changed

+2996
-1238
lines changed

src/Servers/Kestrel/Core/src/Internal/Http/Http1ChunkedEncodingMessageBody.cs

Lines changed: 542 additions & 0 deletions
Large diffs are not rendered by default.

src/Servers/Kestrel/Core/src/Internal/Http/Http1Connection.cs

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,6 @@ public Http1Connection(HttpConnectionContext context)
4444
_keepAliveTicks = ServerOptions.Limits.KeepAliveTimeout.Ticks;
4545
_requestHeadersTimeoutTicks = ServerOptions.Limits.RequestHeadersTimeout.Ticks;
4646

47-
RequestBodyPipe = CreateRequestBodyPipe();
48-
4947
_http1Output = new Http1OutputProducer(
5048
_context.Transport.Output,
5149
_context.ConnectionId,
@@ -57,6 +55,7 @@ public Http1Connection(HttpConnectionContext context)
5755

5856
Input = _context.Transport.Input;
5957
Output = _http1Output;
58+
MemoryPool = _context.MemoryPool;
6059
}
6160

6261
public PipeReader Input { get; }
@@ -67,6 +66,8 @@ public Http1Connection(HttpConnectionContext context)
6766

6867
public MinDataRate MinResponseDataRate { get; set; }
6968

69+
public MemoryPool<byte> MemoryPool { get; }
70+
7071
protected override void OnRequestProcessingEnded()
7172
{
7273
Input.Complete();
@@ -531,17 +532,5 @@ protected override bool TryParseRequest(ReadResult result, out bool endConnectio
531532
}
532533

533534
void IRequestProcessor.Tick(DateTimeOffset now) { }
534-
535-
private Pipe CreateRequestBodyPipe()
536-
=> new Pipe(new PipeOptions
537-
(
538-
pool: _context.MemoryPool,
539-
readerScheduler: ServiceContext.Scheduler,
540-
writerScheduler: PipeScheduler.Inline,
541-
pauseWriterThreshold: 1,
542-
resumeWriterThreshold: 1,
543-
useSynchronizationContext: false,
544-
minimumSegmentSize: KestrelMemoryPool.MinimumSegmentSize
545-
));
546535
}
547536
}
Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.IO.Pipelines;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
using Microsoft.AspNetCore.Connections;
9+
10+
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
11+
{
12+
public class Http1ContentLengthMessageBody : Http1MessageBody
13+
{
14+
private readonly long _contentLength;
15+
private long _inputLength;
16+
private ReadResult _readResult;
17+
private bool _completed;
18+
private int _userCanceled;
19+
20+
public Http1ContentLengthMessageBody(bool keepAlive, long contentLength, Http1Connection context)
21+
: base(context)
22+
{
23+
RequestKeepAlive = keepAlive;
24+
_contentLength = contentLength;
25+
_inputLength = _contentLength;
26+
}
27+
28+
public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
29+
{
30+
ThrowIfCompleted();
31+
32+
if (_inputLength == 0)
33+
{
34+
_readResult = new ReadResult(default, isCanceled: false, isCompleted: true);
35+
return _readResult;
36+
}
37+
38+
TryStart();
39+
40+
// The while(true) loop is required because the Http1 connection calls CancelPendingRead to unblock
41+
// the call to StartTimingReadAsync to check if the request timed out.
42+
// However, if the user called CancelPendingRead, we want that to return a canceled ReadResult
43+
// We internally track an int for that.
44+
while (true)
45+
{
46+
// The issue is that TryRead can get a canceled read result
47+
// which is unknown to StartTimingReadAsync.
48+
if (_context.RequestTimedOut)
49+
{
50+
BadHttpRequestException.Throw(RequestRejectionReason.RequestBodyTimeout);
51+
}
52+
53+
try
54+
{
55+
var readAwaitable = _context.Input.ReadAsync(cancellationToken);
56+
_readResult = await StartTimingReadAsync(readAwaitable, cancellationToken);
57+
}
58+
catch (ConnectionAbortedException ex)
59+
{
60+
throw new TaskCanceledException("The request was aborted", ex);
61+
}
62+
63+
if (_context.RequestTimedOut)
64+
{
65+
BadHttpRequestException.Throw(RequestRejectionReason.RequestBodyTimeout);
66+
}
67+
68+
// Make sure to handle when this is canceled here.
69+
if (_readResult.IsCanceled)
70+
{
71+
if (Interlocked.Exchange(ref _userCanceled, 0) == 1)
72+
{
73+
// Ignore the readResult if it wasn't by the user.
74+
break;
75+
}
76+
else
77+
{
78+
// Reset the timing read here for the next call to read.
79+
StopTimingRead(0);
80+
continue;
81+
}
82+
}
83+
84+
var readableBuffer = _readResult.Buffer;
85+
var readableBufferLength = readableBuffer.Length;
86+
StopTimingRead(readableBufferLength);
87+
88+
CheckCompletedReadResult(_readResult);
89+
90+
if (readableBufferLength > 0)
91+
{
92+
CreateReadResultFromConnectionReadResult();
93+
94+
break;
95+
}
96+
}
97+
98+
return _readResult;
99+
}
100+
101+
public override bool TryRead(out ReadResult readResult)
102+
{
103+
ThrowIfCompleted();
104+
105+
if (_inputLength == 0)
106+
{
107+
readResult = new ReadResult(default, isCanceled: false, isCompleted: true);
108+
return true;
109+
}
110+
111+
TryStart();
112+
113+
if (!_context.Input.TryRead(out _readResult))
114+
{
115+
readResult = default;
116+
return false;
117+
}
118+
119+
if (_readResult.IsCanceled)
120+
{
121+
if (Interlocked.Exchange(ref _userCanceled, 0) == 0)
122+
{
123+
// Cancellation wasn't by the user, return default ReadResult
124+
readResult = default;
125+
return false;
126+
}
127+
}
128+
129+
CreateReadResultFromConnectionReadResult();
130+
131+
readResult = _readResult;
132+
133+
return true;
134+
}
135+
136+
private void ThrowIfCompleted()
137+
{
138+
if (_completed)
139+
{
140+
throw new InvalidOperationException("Reading is not allowed after the reader was completed.");
141+
}
142+
}
143+
144+
private void CreateReadResultFromConnectionReadResult()
145+
{
146+
if (_readResult.Buffer.Length > _inputLength)
147+
{
148+
_readResult = new ReadResult(_readResult.Buffer.Slice(0, _inputLength), _readResult.IsCanceled, isCompleted: true);
149+
}
150+
else if (_readResult.Buffer.Length == _inputLength)
151+
{
152+
_readResult = new ReadResult(_readResult.Buffer, _readResult.IsCanceled, isCompleted: true);
153+
}
154+
155+
if (_readResult.IsCompleted)
156+
{
157+
TryStop();
158+
}
159+
}
160+
161+
public override void AdvanceTo(SequencePosition consumed)
162+
{
163+
AdvanceTo(consumed, consumed);
164+
}
165+
166+
public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
167+
{
168+
if (_inputLength == 0)
169+
{
170+
return;
171+
}
172+
173+
var dataLength = _readResult.Buffer.Slice(_readResult.Buffer.Start, consumed).Length;
174+
175+
_inputLength -= dataLength;
176+
177+
_context.Input.AdvanceTo(consumed, examined);
178+
179+
OnDataRead(dataLength);
180+
}
181+
182+
protected override void OnReadStarting()
183+
{
184+
if (_contentLength > _context.MaxRequestBodySize)
185+
{
186+
BadHttpRequestException.Throw(RequestRejectionReason.RequestBodyTooLarge);
187+
}
188+
}
189+
190+
public override void Complete(Exception exception)
191+
{
192+
_context.ReportApplicationError(exception);
193+
_completed = true;
194+
}
195+
196+
public override void OnWriterCompleted(Action<Exception, object> callback, object state)
197+
{
198+
// TODO make this work with ContentLength.
199+
}
200+
201+
public override void CancelPendingRead()
202+
{
203+
Interlocked.Exchange(ref _userCanceled, 1);
204+
_context.Input.CancelPendingRead();
205+
}
206+
207+
protected override Task OnStopAsync()
208+
{
209+
Complete(null);
210+
return Task.CompletedTask;
211+
}
212+
}
213+
}

0 commit comments

Comments
 (0)