Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ internal static class Constants
{
public const string NoContent = "[no-content-type]";
public const string UnreadableContent = "[unreadable-content-type]";
public const string ReadCancelled = "[read-cancelled]";
public const string ReadCancelledByTimeout = "[read-timeout]";
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ private static async ValueTask<string> ReadFromStreamWithTimeoutAsync(HttpReques
// when readTimeout occurred:
catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
{
return Constants.ReadCancelled;
return Constants.ReadCancelledByTimeout;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@

using System;
using System.Collections.Frozen;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.ObjectPool;
using Microsoft.IO;
using Microsoft.Shared.Diagnostics;
using Microsoft.Shared.Pools;

namespace Microsoft.Extensions.Http.Logging.Internal;

Expand All @@ -22,15 +22,18 @@ internal sealed class HttpResponseBodyReader
/// </summary>
internal readonly TimeSpan ResponseReadTimeout;

private static readonly ObjectPool<BufferWriter<byte>> _bufferWriterPool = BufferWriterPool.SharedBufferWriterPool;
// The chunk size of 8192 bytes (8 KB) is chosen as a balance between memory usage and performance.
// It is large enough to efficiently handle typical HTTP response sizes without excessive memory allocation,
// while still being small enough to avoid large object heap allocations and reduce memory fragmentation.
private const int ChunkSize = 8 * 1024;

private readonly FrozenSet<string> _readableResponseContentTypes;
private readonly int _responseReadLimit;

private readonly RecyclableMemoryStreamManager _streamManager;

public HttpResponseBodyReader(LoggingOptions responseOptions, IDebuggerState? debugger = null)
{
_streamManager = new RecyclableMemoryStreamManager();
_ = Throw.IfNull(responseOptions);

_readableResponseContentTypes = responseOptions.ResponseBodyContentTypes.ToFrozenSet(StringComparer.OrdinalIgnoreCase);
_responseReadLimit = responseOptions.BodySizeLimit;

Expand All @@ -43,7 +46,7 @@ public HttpResponseBodyReader(LoggingOptions responseOptions, IDebuggerState? de

public ValueTask<string> ReadAsync(HttpResponseMessage response, CancellationToken cancellationToken)
{
var contentType = response.Content.Headers.ContentType;
MediaTypeHeaderValue? contentType = response.Content.Headers.ContentType;
if (contentType == null)
{
return new(Constants.NoContent);
Expand All @@ -54,90 +57,186 @@ public ValueTask<string> ReadAsync(HttpResponseMessage response, CancellationTok
return new(Constants.UnreadableContent);
}

return ReadFromStreamWithTimeoutAsync(response, ResponseReadTimeout, _responseReadLimit, _streamManager,
cancellationToken).Preserve();
return ReadFromStreamWithTimeoutAsync(response, ResponseReadTimeout, _responseReadLimit, cancellationToken).Preserve();
}

private static async ValueTask<string> ReadFromStreamAsync(HttpResponseMessage response, int readSizeLimit,
RecyclableMemoryStreamManager streamManager, CancellationToken cancellationToken)
private static async ValueTask<string> ReadFromStreamWithTimeoutAsync(HttpResponseMessage response, TimeSpan readTimeout, int readSizeLimit, CancellationToken cancellationToken)
{
#if NET5_0_OR_GREATER
var streamToReadFrom = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
#else
var streamToReadFrom = await response.Content.ReadAsStreamAsync().WaitAsync(cancellationToken).ConfigureAwait(false);
#endif
using var joinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
joinedTokenSource.CancelAfter(readTimeout);

// TimeSpan.Zero cannot be set from user's code as
// validation prevents values less than one millisecond
// However, this is useful during unit tests
if (readTimeout <= TimeSpan.Zero)
{
// cancel immediately, async cancel not required in tests
#pragma warning disable CA1849 // Call async methods when in an async method
joinedTokenSource.Cancel();
#pragma warning restore CA1849 // Call async methods when in an async method
}

var bufferWriter = _bufferWriterPool.Get();
var memory = bufferWriter.GetMemory(readSizeLimit).Slice(0, readSizeLimit);
#if !NETCOREAPP3_1_OR_GREATER
byte[] buffer = memory.ToArray();
#endif
try
{
#if NETCOREAPP3_1_OR_GREATER
var charsWritten = await streamToReadFrom.ReadAsync(memory, cancellationToken).ConfigureAwait(false);
bufferWriter.Advance(charsWritten);
return Encoding.UTF8.GetString(memory.Slice(0, charsWritten).Span);
return await ReadFromStreamAsync(response, readSizeLimit, joinedTokenSource.Token).ConfigureAwait(false);
}

// when readTimeout occurred: joined token source is cancelled and cancellationToken is not
catch (OperationCanceledException) when (joinedTokenSource.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
{
return Constants.ReadCancelledByTimeout;
}
}

private static async ValueTask<string> ReadFromStreamAsync(HttpResponseMessage response, int readSizeLimit, CancellationToken cancellationToken)
{
#if NET6_0_OR_GREATER
Stream streamToReadFrom = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
#else
var charsWritten = await streamToReadFrom.ReadAsync(buffer, 0, readSizeLimit, cancellationToken).ConfigureAwait(false);
bufferWriter.Advance(charsWritten);
return Encoding.UTF8.GetString(buffer.AsMemory(0, charsWritten).ToArray());
Stream streamToReadFrom = await response.Content.ReadAsStreamAsync().WaitAsync(cancellationToken).ConfigureAwait(false);
#endif

var pipe = new Pipe();

string bufferedString = await BufferStreamAndWriteToPipeAsync(streamToReadFrom, pipe.Writer, readSizeLimit, cancellationToken).ConfigureAwait(false);

// if stream is seekable we can just rewind it and return the buffered string
if (streamToReadFrom.CanSeek)
{
streamToReadFrom.Seek(0, SeekOrigin.Begin);

await pipe.Reader.CompleteAsync().ConfigureAwait(false);

return bufferedString;
}
finally

// if stream is not seekable we need to write the rest of the stream to the pipe
// and create a new response content with the pipe reader as stream
_ = Task.Run(async () =>
{
if (streamToReadFrom.CanSeek)
await WriteStreamToPipeAsync(streamToReadFrom, pipe.Writer, cancellationToken).ConfigureAwait(false);
}, CancellationToken.None);

// use the pipe reader as stream for the new content
var newContent = new StreamContent(pipe.Reader.AsStream());
foreach (KeyValuePair<string, IEnumerable<string>> header in response.Content.Headers)
{
_ = newContent.Headers.TryAddWithoutValidation(header.Key, header.Value);
}

response.Content = newContent;

return bufferedString;
}

#if NET6_0_OR_GREATER
private static async Task<string> BufferStreamAndWriteToPipeAsync(Stream stream, PipeWriter writer, int bufferSize, CancellationToken cancellationToken)
{
Memory<byte> memory = writer.GetMemory(bufferSize)[..bufferSize];

#if NET8_0_OR_GREATER
int bytesRead = await stream.ReadAtLeastAsync(memory, bufferSize, false, cancellationToken).ConfigureAwait(false);
#else
int bytesRead = 0;
while (bytesRead < bufferSize)
{
int read = await stream.ReadAsync(memory.Slice(bytesRead), cancellationToken).ConfigureAwait(false);
if (read == 0)
{
streamToReadFrom.Seek(0, SeekOrigin.Begin);
break;
}
else
{
var freshStream = streamManager.GetStream();
#if NETCOREAPP3_1_OR_GREATER
var remainingSpace = memory.Slice(bufferWriter.WrittenCount, memory.Length - bufferWriter.WrittenCount);
var writtenCount = await streamToReadFrom.ReadAsync(remainingSpace, cancellationToken)
.ConfigureAwait(false);

await freshStream.WriteAsync(memory.Slice(0, writtenCount + bufferWriter.WrittenCount), cancellationToken)
.ConfigureAwait(false);
#else
var writtenCount = await streamToReadFrom.ReadAsync(buffer, bufferWriter.WrittenCount,
buffer.Length - bufferWriter.WrittenCount, cancellationToken).ConfigureAwait(false);

await freshStream.WriteAsync(buffer, 0, writtenCount + bufferWriter.WrittenCount, cancellationToken).ConfigureAwait(false);
bytesRead += read;
}
#endif
freshStream.Seek(0, SeekOrigin.Begin);

var newContent = new StreamContent(freshStream);
if (bytesRead == 0)
{
return string.Empty;
}

writer.Advance(bytesRead);

return Encoding.UTF8.GetString(memory[..bytesRead].Span);
}

foreach (var header in response.Content.Headers)
{
_ = newContent.Headers.TryAddWithoutValidation(header.Key, header.Value);
}
private static async Task WriteStreamToPipeAsync(Stream stream, PipeWriter writer, CancellationToken cancellationToken)
{
while (true)
{
Memory<byte> memory = writer.GetMemory(ChunkSize)[..ChunkSize];

response.Content = newContent;
int bytesRead = await stream.ReadAsync(memory, cancellationToken).ConfigureAwait(false);
if (bytesRead == 0)
{
break;
}

_bufferWriterPool.Return(bufferWriter);
writer.Advance(bytesRead);

FlushResult result = await writer.FlushAsync(cancellationToken).ConfigureAwait(false);
if (result.IsCompleted)
{
break;
}
}
}

private static async ValueTask<string> ReadFromStreamWithTimeoutAsync(HttpResponseMessage response, TimeSpan readTimeout,
int readSizeLimit, RecyclableMemoryStreamManager streamManager, CancellationToken cancellationToken)
await writer.CompleteAsync().ConfigureAwait(false);
}
#else
private static async Task<string> BufferStreamAndWriteToPipeAsync(Stream stream, PipeWriter writer, int bufferSize, CancellationToken cancellationToken)
{
using var joinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
joinedTokenSource.CancelAfter(readTimeout);
var sb = new StringBuilder();

try
int bytesRead = 0;

while (bytesRead < bufferSize)
{
return await ReadFromStreamAsync(response, readSizeLimit, streamManager, joinedTokenSource.Token)
.ConfigureAwait(false);
int chunkSize = Math.Min(ChunkSize, bufferSize - bytesRead);

Memory<byte> memory = writer.GetMemory(chunkSize).Slice(0, chunkSize);

byte[] buffer = memory.ToArray();

int read = await stream.ReadAsync(buffer, 0, chunkSize, cancellationToken).ConfigureAwait(false);
if (read == 0)
{
break;
}

bytesRead += read;

buffer.CopyTo(memory);

writer.Advance(read);

_ = sb.Append(Encoding.UTF8.GetString(buffer.AsMemory(0, read).ToArray()));
}

// when readTimeout occurred:
catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
return sb.ToString();
}

private static async Task WriteStreamToPipeAsync(Stream stream, PipeWriter writer, CancellationToken cancellationToken)
{
while (true)
{
return Constants.ReadCancelled;
Memory<byte> memory = writer.GetMemory(ChunkSize).Slice(0, ChunkSize);
byte[] buffer = memory.ToArray();

int bytesRead = await stream.ReadAsync(buffer, 0, ChunkSize, cancellationToken).ConfigureAwait(false);
if (bytesRead == 0)
{
break;
}

FlushResult result = await writer.WriteAsync(buffer.AsMemory(0, bytesRead), cancellationToken).ConfigureAwait(false);
if (result.IsCompleted)
{
break;
}
}

await writer.CompleteAsync().ConfigureAwait(false);
}
#endif
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.IO.RecyclableMemoryStream" />
<PackageReference Include="System.IO.Pipelines" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" />
<PackageReference Include="Microsoft.Extensions.Http" />
</ItemGroup>
Expand Down
Loading
Loading