Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ protected override bool OnWriteResume(Utf8JsonWriter writer, TAsyncEnumerable va
// ensure the enumerator already is stored
// in the WriteStack for proper disposal.
moveNextTask = enumerator.MoveNextAsync();

if (!moveNextTask.IsCompleted)
{
// It is common for first-time MoveNextAsync() calls to return pending tasks,
// since typically that is when underlying network connections are being established.
// For this case only, suppress flushing the current buffer contents (e.g. the leading '[' token of the written array)
// to give the stream owner the ability to recover in case of a connection error.
state.SuppressFlush = true;
goto SuspendDueToPendingTask;
}
}
else
{
Expand All @@ -81,10 +91,11 @@ protected override bool OnWriteResume(Utf8JsonWriter writer, TAsyncEnumerable va
}
}

Debug.Assert(moveNextTask.IsCompleted);
JsonConverter<TElement> converter = GetElementConverter(ref state);

// iterate through the enumerator while elements are being returned synchronously
for (; moveNextTask.IsCompleted; moveNextTask = enumerator.MoveNextAsync())
do
{
if (!moveNextTask.Result)
{
Expand All @@ -105,8 +116,11 @@ protected override bool OnWriteResume(Utf8JsonWriter writer, TAsyncEnumerable va
{
return false;
}
}

moveNextTask = enumerator.MoveNextAsync();
} while (moveNextTask.IsCompleted);

SuspendDueToPendingTask:
// we have a pending MoveNextAsync() call;
// wrap inside a regular task so that it can be awaited multiple times;
// mark the current stackframe as pending completion.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ public static partial class JsonSerializer
// needs to be expanded\doubled because it is not large enough to write the current property or element.
// We check for flush after each JSON property and element is written to the buffer.
// Once the buffer is expanded to contain the largest single element\property, a 90% thresold
// means the buffer may be expanded a maximum of 4 times: 1-(1\(2^4))==.9375.
private const float FlushThreshold = .9f;
// means the buffer may be expanded a maximum of 4 times: 1-(1/(2^4))==.9375.
private const float FlushThreshold = .90f;

/// <summary>
/// Converts the provided value to UTF-8 encoded JSON text and write it to the <see cref="System.IO.Stream"/>.
Expand Down Expand Up @@ -329,8 +329,18 @@ private static async Task WriteStreamAsync<TValue>(
try
{
isFinalBlock = WriteCore(converter, writer, value, options, ref state);
await bufferWriter.WriteToStreamAsync(utf8Json, cancellationToken).ConfigureAwait(false);
bufferWriter.Clear();

if (state.SuppressFlush)
{
Debug.Assert(!isFinalBlock);
Debug.Assert(state.PendingTask is not null);
state.SuppressFlush = false;
}
else
{
await bufferWriter.WriteToStreamAsync(utf8Json, cancellationToken).ConfigureAwait(false);
bufferWriter.Clear();
}
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ internal struct WriteStack
/// </summary>
public CancellationToken CancellationToken;

/// <summary>
/// In the case of async serialization, used by resumable converters to signal that
/// the current buffer contents should not be flushed to the underlying stream.
/// </summary>
public bool SuppressFlush;

/// <summary>
/// Stores a pending task that a resumable converter depends on to continue work.
/// It must be awaited by the root context before serialization is resumed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,24 @@ static async IAsyncEnumerable<int> GetNumbersAsync()
}
}

[Fact]
public async Task RegressionTest_ExceptionOnFirstMoveNextShouldNotFlushBuffer()
{
// Regression test for https://github.com/dotnet/aspnetcore/issues/36977
using var stream = new MemoryStream();
await Assert.ThrowsAsync<NotImplementedException>(async () => await JsonSerializer.SerializeAsync(stream, new { Data = GetFailingAsyncEnumerable() }));
Assert.Equal(0, stream.Length);

static async IAsyncEnumerable<int> GetFailingAsyncEnumerable()
{
await Task.Yield();
throw new NotImplementedException();
#pragma warning disable CS0162 // Unreachable code detected
yield break;
#pragma warning restore CS0162 // Unreachable code detected
}
}

public class MockedAsyncEnumerable<TElement> : IAsyncEnumerable<TElement>, IEnumerable<TElement>
{
private readonly IEnumerable<TElement> _source;
Expand Down