Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1051,6 +1051,14 @@ private int ReadResponseContent(HttpResponseMessage response, Span<byte> buffer)
_responseDataPayloadRemaining -= copyLen;
_recvBuffer.Discard(copyLen);
buffer = buffer.Slice(copyLen);

// Stop, if we've reached the end of a data frame and start of the next data frame is not buffered yet
// Waiting for the next data frame may cause a hang, e.g. in echo scenario
// TODO: this is inefficient if data is already available in transport
if (_responseDataPayloadRemaining == 0 && _recvBuffer.ActiveLength == 0)
{
break;
}
}
else
{
Expand All @@ -1068,10 +1076,9 @@ private int ReadResponseContent(HttpResponseMessage response, Span<byte> buffer)
_responseDataPayloadRemaining -= bytesRead;
buffer = buffer.Slice(bytesRead);

if (_responseDataPayloadRemaining == 0)
{
break;
}
// Stop, even if we are in the middle of a data frame. Waiting for the next data may cause a hang
// TODO: this is inefficient if data is already available in transport
break;
}
}

Expand Down Expand Up @@ -1113,6 +1120,14 @@ private async ValueTask<int> ReadResponseContentAsync(HttpResponseMessage respon
_responseDataPayloadRemaining -= copyLen;
_recvBuffer.Discard(copyLen);
buffer = buffer.Slice(copyLen);

// Stop, if we've reached the end of a data frame and start of the next data frame is not buffered yet
// Waiting for the next data frame may cause a hang, e.g. in echo scenario
// TODO: this is inefficient if data is already available in transport
if (_responseDataPayloadRemaining == 0 && _recvBuffer.ActiveLength == 0)
{
break;
}
}
else
{
Expand All @@ -1130,10 +1145,9 @@ private async ValueTask<int> ReadResponseContentAsync(HttpResponseMessage respon
_responseDataPayloadRemaining -= bytesRead;
buffer = buffer.Slice(bytesRead);

if (_responseDataPayloadRemaining == 0)
{
break;
}
// Stop, even if we are in the middle of a data frame. Waiting for the next data may cause a hang
// TODO: this is inefficient if data is already available in transport
break;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,90 @@ public async Task StatusCodes_ReceiveSuccess(HttpStatusCode statusCode, bool qpa
connection.Dispose();
}

[Theory]
[InlineData(1)] // frame fits into Http3RequestStream buffer
[InlineData(10)]
[InlineData(100)] // frame doesn't fit into Http3RequestStream buffer
[InlineData(1000)]
public async Task EchoServerStreaming_DifferentMessageSize_Success(int messageSize)
{
int iters = 5;
var message = new byte[messageSize];
var readBuffer = new byte[5 * messageSize]; // bigger than message
var random = new Random(0);

using Http3LoopbackServer server = CreateHttp3LoopbackServer();
Http3LoopbackConnection connection = null;
Http3LoopbackStream serverStream = null;

Task serverTask = Task.Run(async () =>
{
connection = (Http3LoopbackConnection)await server.EstablishGenericConnectionAsync();
serverStream = await connection.AcceptRequestStreamAsync();

HttpRequestData requestData = await serverStream.ReadRequestDataAsync(readBody: false).WaitAsync(TimeSpan.FromSeconds(30));

await serverStream.SendResponseHeadersAsync().ConfigureAwait(false);

while (true)
{
(long? frameType, byte[] payload) = await serverStream.ReadFrameAsync();
if (frameType == null)
{
// EOS
break;
}
// echo back
await serverStream.SendDataFrameAsync(payload).WaitAsync(TimeSpan.FromSeconds(30));
}
// send FIN
await serverStream.SendResponseBodyAsync(Array.Empty<byte>(), isFinal: true);
});

StreamingHttpContent requestContent = new StreamingHttpContent();

using HttpClient client = CreateHttpClient();
using HttpRequestMessage request = new()
{
Method = HttpMethod.Post,
RequestUri = server.Address,
Version = HttpVersion30,
VersionPolicy = HttpVersionPolicy.RequestVersionExact,
Content = requestContent
};

var responseTask = client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead).WaitAsync(TimeSpan.FromSeconds(10));

Stream requestStream = await requestContent.GetStreamAsync();
// Send headers
await requestStream.FlushAsync();

using HttpResponseMessage response = await responseTask;

var responseStream = await response.Content.ReadAsStreamAsync();

for (int i = 0; i < iters; ++i)
{
random.NextBytes(message);
await requestStream.WriteAsync(message).AsTask().WaitAsync(TimeSpan.FromSeconds(10));
await requestStream.FlushAsync();

int bytesRead = await responseStream.ReadAsync(readBuffer).AsTask().WaitAsync(TimeSpan.FromSeconds(10));
Assert.Equal(bytesRead, messageSize);
Assert.Equal(message, readBuffer[..bytesRead]);
}
// Send FIN
requestContent.CompleteStream();
// Receive FIN
Assert.Equal(0, await responseStream.ReadAsync(readBuffer).AsTask().WaitAsync(TimeSpan.FromSeconds(10)));

await serverTask.WaitAsync(TimeSpan.FromSeconds(60));

serverStream.Dispose();
Assert.NotNull(connection);
connection.Dispose();
}

public static TheoryData<HttpStatusCode, bool> StatusCodesTestData()
{
var statuses = Enum.GetValues(typeof(HttpStatusCode)).Cast<HttpStatusCode>().Where(s => s >= HttpStatusCode.OK); // exclude informational
Expand Down