Skip to content

Change to an activity based timeout #1289

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docs/docfx/articles/config-files.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,10 @@ For additional fields see [ClusterConfig](xref:Yarp.ReverseProxy.Configuration.C
"RequestHeaderEncoding" : "Latin1" // How to interpret non ASCII characters in header values
},
"HttpRequest" : { // Options for sending request to destination
"Timeout" : "00:02:00",
"ActivityTimeout" : "00:02:00",
"Version" : "2",
"VersionPolicy" : "RequestVersionOrLower"
"VersionPolicy" : "RequestVersionOrLower",
"AllowResponseBuffering" : "false"
},
"MetaData" : { // Custom Key value pairs
"TransportFailureRateHealthPolicy.RateLimit": "0.5", // Used by Passive health policy
Expand Down
4 changes: 2 additions & 2 deletions docs/docfx/articles/direct-forwarding.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void Configure(IApplicationBuilder app, IHttpForwarder forwarder)
UseCookies = false
});
var transformer = new CustomTransformer(); // or HttpTransformer.Default;
var requestOptions = new RequestProxyOptions { Timeout = TimeSpan.FromSeconds(100) };
var requestConfig = new ForwarderRequestConfig { ActivityTimeout = TimeSpan.FromSeconds(100) };

app.UseRouting();
app.UseAuthorization();
Expand All @@ -65,7 +65,7 @@ public void Configure(IApplicationBuilder app, IHttpForwarder forwarder)
endpoints.Map("/{**catch-all}", async httpContext =>
{
var error = await forwarder.SendAsync(httpContext, "https://localhost:10000/",
httpClient, requestOptions, transformer);
httpClient, requestConfig, transformer);
// Check if the operation was successful
if (error != ForwarderError.None)
{
Expand Down
6 changes: 3 additions & 3 deletions docs/docfx/articles/http-client-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,15 @@ At the moment, there is no solution for changing encoding for response headers i
HTTP request configuration is based on [ForwarderRequestConfig](xref:Yarp.ReverseProxy.Forwarder.ForwarderRequestConfig) and represented by the following configuration schema.
```JSON
"HttpRequest": {
"Timeout": "<timespan>",
"ActivityTimeout": "<timespan>",
"Version": "<string>",
"VersionPolicy": ["RequestVersionOrLower", "RequestVersionOrHigher", "RequestVersionExact"],
"AllowResponseBuffering": "<bool>"
}
```

Configuration settings:
- Timeout - the timeout for the outgoing request sent by [HttpMessageInvoker.SendAsync](https://docs.microsoft.com/dotnet/api/system.net.http.httpmessageinvoker.sendasync). If not specified, 100 seconds is used.
- ActivityTimeout - how long a request is allowed to remain idle between any operation completing, after which it will be canceled. The default is 100 seconds. The timeout will reset when response headers are received or after successfully reading or writing any request, response, or streaming data like gRPC or WebSockets. TCP keep-alives and HTTP/2 protocol pings will not reset the timeout, but WebSocket pings will.
- Version - outgoing request [version](https://docs.microsoft.com/dotnet/api/system.net.http.httprequestmessage.version). The supported values at the moment are `1.0`, `1.1` and `2`. Default value is 2.
- VersionPolicy - defines how the final version is selected for the outgoing requests. **This feature is available from .NET 5.0**, see [HttpRequestMessage.VersionPolicy](https://docs.microsoft.com/dotnet/api/system.net.http.httprequestmessage.versionpolicy). The default value is `RequestVersionOrLower`.
- AllowResponseBuffering - allows to use write buffering when sending a response back to the client, if the server hosting YARP (e.g. IIS) supports it. **NOTE**: enabling it can break SSE (server side event) scenarios.
Expand All @@ -115,7 +115,7 @@ The below example shows 2 samples of HTTP client and request configurations for
"DangerousAcceptAnyServerCertificate": "true"
},
"HttpRequest": {
"Timeout": "00:00:30"
"ActivityTimeout": "00:00:30"
},
"Destinations": {
"cluster1/destination1": {
Expand Down
2 changes: 1 addition & 1 deletion samples/ReverseProxy.Direct.Sample/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void Configure(IApplicationBuilder app, IHttpForwarder forwarder)

// Setup our own request transform class
var transformer = new CustomTransformer(); // or HttpTransformer.Default;
var requestOptions = new ForwarderRequestConfig { Timeout = TimeSpan.FromSeconds(100) };
var requestOptions = new ForwarderRequestConfig { ActivityTimeout = TimeSpan.FromSeconds(100) };

app.UseRouting();
app.UseEndpoints(endpoints =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ private static RouteQueryParameter CreateRouteQueryParameter(IConfigurationSecti

return new ForwarderRequestConfig
{
Timeout = section.ReadTimeSpan(nameof(ForwarderRequestConfig.Timeout)),
ActivityTimeout = section.ReadTimeSpan(nameof(ForwarderRequestConfig.ActivityTimeout)),
Version = section.ReadVersion(nameof(ForwarderRequestConfig.Version)),
#if NET
VersionPolicy = section.ReadEnum<HttpVersionPolicy>(nameof(ForwarderRequestConfig.VersionPolicy)),
Expand Down
2 changes: 1 addition & 1 deletion src/ReverseProxy/Configuration/HeaderMatchMode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public enum HeaderMatchMode
/// </summary>
Contains,

// <summary>
/// <summary>
/// The header name must exist and the value must be non-empty and not match, subject to case sensitivity settings.
/// If there are multiple values then it needs to not contain ANY of the values
/// Only single headers are supported. If there are multiple headers with the same name then the match fails.
Expand Down
12 changes: 7 additions & 5 deletions src/ReverseProxy/Forwarder/ForwarderRequestConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ public sealed record ForwarderRequestConfig
public static ForwarderRequestConfig Empty { get; } = new();

/// <summary>
/// The time allowed to send the request and receive the response headers. This may include
/// the time needed to send the request body. The default is 100 seconds.
/// How long a request is allowed to remain idle between any operation completing, after which it will be canceled.
/// The default is 100 seconds. The timeout will reset when response headers are received or after successfully reading or
/// writing any request, response, or streaming data like gRPC or WebSockets. TCP keep-alives and HTTP/2 protocol pings will
/// not reset the timeout, but WebSocket pings will.
/// </summary>
public TimeSpan? Timeout { get; init; }
public TimeSpan? ActivityTimeout { get; init; }

/// <summary>
/// Preferred version of the outgoing request.
Expand Down Expand Up @@ -50,7 +52,7 @@ public bool Equals(ForwarderRequestConfig? other)
return false;
}

return Timeout == other.Timeout
return ActivityTimeout == other.ActivityTimeout
#if NET
&& VersionPolicy == other.VersionPolicy
#endif
Expand All @@ -60,7 +62,7 @@ public bool Equals(ForwarderRequestConfig? other)

public override int GetHashCode()
{
return HashCode.Combine(Timeout,
return HashCode.Combine(ActivityTimeout,
#if NET
VersionPolicy,
#endif
Expand Down
50 changes: 25 additions & 25 deletions src/ReverseProxy/Forwarder/HttpForwarder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ public async ValueTask<ForwarderError> SendAsync(

ForwarderTelemetry.Log.ForwarderStart(destinationPrefix);

var requestCancellationSource = CancellationTokenSource.CreateLinkedTokenSource(context.RequestAborted);
requestCancellationSource.CancelAfter(requestConfig?.Timeout ?? DefaultTimeout);
var activityCancellationSource = CancellationTokenSource.CreateLinkedTokenSource(context.RequestAborted);
var activityTimeout = requestConfig?.ActivityTimeout ?? DefaultTimeout;
activityCancellationSource.CancelAfter(activityTimeout);
try
{
var isClientHttp2 = ProtocolHelper.IsHttp2(context.Request.Protocol);
Expand All @@ -118,22 +119,22 @@ public async ValueTask<ForwarderError> SendAsync(

// :: Step 1-3: Create outgoing HttpRequestMessage
var (destinationRequest, requestContent) = await CreateRequestMessageAsync(
context, destinationPrefix, transformer, requestConfig, isStreamingRequest, requestCancellationSource.Token);
context, destinationPrefix, transformer, requestConfig, isStreamingRequest, activityCancellationSource, activityTimeout);

// :: Step 4: Send the outgoing request using HttpClient
HttpResponseMessage destinationResponse;
try
{
ForwarderTelemetry.Log.ForwarderStage(ForwarderStage.SendAsyncStart);
destinationResponse = await httpClient.SendAsync(destinationRequest, requestCancellationSource.Token);
destinationResponse = await httpClient.SendAsync(destinationRequest, activityCancellationSource.Token);
ForwarderTelemetry.Log.ForwarderStage(ForwarderStage.SendAsyncStop);

// Remove the timeout, only listen to the linked token / manual Cancel from now on
requestCancellationSource.CancelAfter(Timeout.Infinite);
// Reset the timeout since we received the response headers.
activityCancellationSource.CancelAfter(activityTimeout);
}
catch (Exception requestException)
{
return await HandleRequestFailureAsync(context, requestContent, requestException, transformer, requestCancellationSource);
return await HandleRequestFailureAsync(context, requestContent, requestException, transformer, activityCancellationSource);
}

// Detect connection downgrade, which may be problematic for e.g. gRPC.
Expand All @@ -156,7 +157,7 @@ public async ValueTask<ForwarderError> SendAsync(

if (requestContent is not null && requestContent.InProgress)
{
requestCancellationSource.Cancel();
activityCancellationSource.Cancel();
await requestContent.ConsumptionTask;
}

Expand All @@ -169,7 +170,7 @@ public async ValueTask<ForwarderError> SendAsync(

if (requestContent is not null && requestContent.InProgress)
{
requestCancellationSource.Cancel();
activityCancellationSource.Cancel();
await requestContent.ConsumptionTask;
}

Expand All @@ -184,7 +185,7 @@ public async ValueTask<ForwarderError> SendAsync(
if (destinationResponse.StatusCode == HttpStatusCode.SwitchingProtocols)
{
Debug.Assert(requestContent?.Started != true);
return await HandleUpgradedResponse(context, destinationResponse, context.RequestAborted);
return await HandleUpgradedResponse(context, destinationResponse, activityCancellationSource, activityTimeout);
}

// NOTE: it may *seem* wise to call `context.Response.StartAsync()` at this point
Expand All @@ -198,11 +199,11 @@ public async ValueTask<ForwarderError> SendAsync(
// and clients misbehave if the initial headers response does not indicate stream end.

// :: Step 7-B: Copy response body Client ◄-- Proxy ◄-- Destination
var (responseBodyCopyResult, responseBodyException) = await CopyResponseBodyAsync(destinationResponse.Content, context.Response.Body, context.RequestAborted);
var (responseBodyCopyResult, responseBodyException) = await CopyResponseBodyAsync(destinationResponse.Content, context.Response.Body, activityCancellationSource, activityTimeout);

if (responseBodyCopyResult != StreamCopyResult.Success)
{
return await HandleResponseBodyErrorAsync(context, requestContent, responseBodyCopyResult, responseBodyException!, requestCancellationSource);
return await HandleResponseBodyErrorAsync(context, requestContent, responseBodyCopyResult, responseBodyException!, activityCancellationSource);
}

// :: Step 8: Copy response trailer headers and finish response Client ◄-- Proxy ◄-- Destination
Expand Down Expand Up @@ -247,15 +248,15 @@ public async ValueTask<ForwarderError> SendAsync(
}
finally
{
requestCancellationSource.Dispose();
activityCancellationSource.Dispose();
ForwarderTelemetry.Log.ForwarderStop(context.Response.StatusCode);
}

return ForwarderError.None;
}

private async ValueTask<(HttpRequestMessage, StreamCopyHttpContent?)> CreateRequestMessageAsync(HttpContext context, string destinationPrefix,
HttpTransformer transformer, ForwarderRequestConfig? requestConfig, bool isStreamingRequest, CancellationToken contentCancellation)
HttpTransformer transformer, ForwarderRequestConfig? requestConfig, bool isStreamingRequest, CancellationTokenSource activityToken, TimeSpan activityTimeout)
{
// "http://a".Length = 8
if (destinationPrefix == null || destinationPrefix.Length < 8)
Expand Down Expand Up @@ -287,7 +288,7 @@ public async ValueTask<ForwarderError> SendAsync(

// :: Step 2: Setup copy of request body (background) Client --► Proxy --► Destination
// Note that we must do this before step (3) because step (3) may also add headers to the HttpContent that we set up here.
var requestContent = SetupRequestBodyCopy(context.Request, isStreamingRequest, contentCancellation);
var requestContent = SetupRequestBodyCopy(context.Request, isStreamingRequest, activityToken, activityTimeout);
destinationRequest.Content = requestContent;

// :: Step 3: Copy request headers Client --► Proxy --► Destination
Expand Down Expand Up @@ -333,7 +334,7 @@ private static void RestoreUpgradeHeaders(HttpContext context, HttpRequestMessag
}
}

private StreamCopyHttpContent? SetupRequestBodyCopy(HttpRequest request, bool isStreamingRequest, CancellationToken contentCancellation)
private StreamCopyHttpContent? SetupRequestBodyCopy(HttpRequest request, bool isStreamingRequest, CancellationTokenSource activityToken, TimeSpan activityTimeout)
{
// If we generate an HttpContent without a Content-Length then for HTTP/1.1 HttpClient will add a Transfer-Encoding: chunked header
// even if it's a GET request. Some servers reject requests containing a Transfer-Encoding header if they're not expecting a body.
Expand Down Expand Up @@ -409,7 +410,8 @@ private static void RestoreUpgradeHeaders(HttpContext context, HttpRequestMessag
source: request.Body,
autoFlushHttpClientOutgoingStream: isStreamingRequest,
clock: _clock,
cancellation: contentCancellation);
activityToken,
activityTimeout);
}

return null;
Expand Down Expand Up @@ -544,7 +546,7 @@ private static void RestoreUpgradeHeaders(HttpContext context, HttpResponseMessa
}

private async ValueTask<ForwarderError> HandleUpgradedResponse(HttpContext context, HttpResponseMessage destinationResponse,
CancellationToken longCancellation)
CancellationTokenSource activityCancellationSource, TimeSpan activityTimeout)
{
ForwarderTelemetry.Log.ForwarderStage(ForwarderStage.ResponseUpgrade);

Expand Down Expand Up @@ -576,10 +578,8 @@ private async ValueTask<ForwarderError> HandleUpgradedResponse(HttpContext conte
// :: Step 7-A-2: Copy duplex streams
using var destinationStream = await destinationResponse.Content.ReadAsStreamAsync();

using var abortTokenSource = CancellationTokenSource.CreateLinkedTokenSource(longCancellation);

var requestTask = StreamCopier.CopyAsync(isRequest: true, clientStream, destinationStream, _clock, abortTokenSource.Token).AsTask();
var responseTask = StreamCopier.CopyAsync(isRequest: false, destinationStream, clientStream, _clock, abortTokenSource.Token).AsTask();
var requestTask = StreamCopier.CopyAsync(isRequest: true, clientStream, destinationStream, _clock, activityCancellationSource, activityTimeout).AsTask();
var responseTask = StreamCopier.CopyAsync(isRequest: false, destinationStream, clientStream, _clock, activityCancellationSource, activityTimeout).AsTask();

// Make sure we report the first failure.
var firstTask = await Task.WhenAny(requestTask, responseTask);
Expand All @@ -593,7 +593,7 @@ private async ValueTask<ForwarderError> HandleUpgradedResponse(HttpContext conte
{
error = ReportResult(context, requestFinishedFirst, firstResult, firstException);
// Cancel the other direction
abortTokenSource.Cancel();
activityCancellationSource.Cancel();
// Wait for this to finish before exiting so the resources get cleaned up properly.
await secondTask;
}
Expand Down Expand Up @@ -627,7 +627,7 @@ ForwarderError ReportResult(HttpContext context, bool reqeuest, StreamCopyResult
}

private async ValueTask<(StreamCopyResult, Exception?)> CopyResponseBodyAsync(HttpContent destinationResponseContent, Stream clientResponseStream,
CancellationToken cancellation)
CancellationTokenSource activityCancellationSource, TimeSpan activityTimeout)
{
// SocketHttpHandler and similar transports always provide an HttpContent object, even if it's empty.
// In 3.1 this is only likely to return null in tests.
Expand All @@ -636,7 +636,7 @@ ForwarderError ReportResult(HttpContext context, bool reqeuest, StreamCopyResult
if (destinationResponseContent != null)
{
using var destinationResponseStream = await destinationResponseContent.ReadAsStreamAsync();
return await StreamCopier.CopyAsync(isRequest: false, destinationResponseStream, clientResponseStream, _clock, cancellation);
return await StreamCopier.CopyAsync(isRequest: false, destinationResponseStream, clientResponseStream, _clock, activityCancellationSource, activityTimeout);
}

return (StreamCopyResult.Success, null);
Expand Down
Loading