From 47ef4b1bf9a3751266bb45edee9c2b8fed8f9f3d Mon Sep 17 00:00:00 2001 From: David Fowler Date: Sat, 27 Mar 2021 02:23:58 -0700 Subject: [PATCH 1/3] Reduce the per connection overhead in SocketConnection - Removed 3 state machines (StartAsync, ProcessSends and ProcessReceives) - Use ValueTask to remove delegate allocation on both senders and receivers - Remove field from DoSend and DoReceive state machine --- .../src/Internal/SocketAwaitableEventArgs.cs | 63 +++--- .../src/Internal/SocketConnection.cs | 205 +++++++++--------- .../src/Internal/SocketReceiver.cs | 27 ++- .../src/Internal/SocketSender.cs | 27 ++- 4 files changed, 168 insertions(+), 154 deletions(-) diff --git a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketAwaitableEventArgs.cs b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketAwaitableEventArgs.cs index 5ed561b0ed6e..98cf634de588 100644 --- a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketAwaitableEventArgs.cs +++ b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketAwaitableEventArgs.cs @@ -2,22 +2,20 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; -using System.Diagnostics; using System.IO.Pipelines; using System.Net.Sockets; -using System.Runtime.CompilerServices; using System.Threading; -using System.Threading.Tasks; +using System.Threading.Tasks.Sources; namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal { - internal class SocketAwaitableEventArgs : SocketAsyncEventArgs, ICriticalNotifyCompletion + internal class SocketAwaitableEventArgs : SocketAsyncEventArgs, IValueTaskSource { - private static readonly Action _callbackCompleted = () => { }; + private static readonly Action _continuationCompleted = _ => { }; private readonly PipeScheduler _ioScheduler; - private Action? _callback; + private Action? _continuation; public SocketAwaitableEventArgs(PipeScheduler ioScheduler) : base(unsafeSuppressExecutionContextFlow: true) @@ -25,14 +23,23 @@ public SocketAwaitableEventArgs(PipeScheduler ioScheduler) _ioScheduler = ioScheduler; } - public SocketAwaitableEventArgs GetAwaiter() => this; - public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted); - - public int GetResult() + protected override void OnCompleted(SocketAsyncEventArgs _) { - Debug.Assert(ReferenceEquals(_callback, _callbackCompleted)); + var c = _continuation; + + if (c != null || (c = Interlocked.CompareExchange(ref _continuation, _continuationCompleted, null)) != null) + { + var continuationState = UserToken; + UserToken = null; + _continuation = _continuationCompleted; // in case someone's polling IsCompleted - _callback = null; + _ioScheduler.Schedule(c, continuationState); + } + } + + public int GetResult(short token) + { + _continuation = null; if (SocketError != SocketError.Success) { @@ -43,36 +50,30 @@ public int GetResult() static void ThrowSocketException(SocketError e) { - throw new SocketException((int)e); + throw CreateException(e); } } - public void OnCompleted(Action continuation) + protected static SocketException CreateException(SocketError e) { - if (ReferenceEquals(_callback, _callbackCompleted) || - ReferenceEquals(Interlocked.CompareExchange(ref _callback, continuation, null), _callbackCompleted)) - { - Task.Run(continuation); - } + return new SocketException((int)e); } - public void UnsafeOnCompleted(Action continuation) + public ValueTaskSourceStatus GetStatus(short token) { - OnCompleted(continuation); + return !ReferenceEquals(_continuation, _continuationCompleted) ? ValueTaskSourceStatus.Pending : + SocketError == SocketError.Success ? ValueTaskSourceStatus.Succeeded : + ValueTaskSourceStatus.Faulted; } - public void Complete() + public void OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) { - OnCompleted(this); - } - - protected override void OnCompleted(SocketAsyncEventArgs _) - { - var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted); - - if (continuation != null) + UserToken = state; + var prevContinuation = Interlocked.CompareExchange(ref _continuation, continuation, null); + if (ReferenceEquals(prevContinuation, _continuationCompleted)) { - _ioScheduler.Schedule(state => ((Action)state!)(), continuation); + UserToken = null; + ThreadPool.UnsafeQueueUserWorkItem(continuation, state, preferLocal: true); } } } diff --git a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.cs b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.cs index 4d8f6f27771b..57adcffb6505 100644 --- a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.cs +++ b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.cs @@ -28,7 +28,8 @@ internal sealed class SocketConnection : TransportConnection private readonly object _shutdownLock = new object(); private volatile bool _socketDisposed; private volatile Exception? _shutdownReason; - private Task? _processingTask; + private Task? _sendingTask; + private Task? _receivingTask; private readonly TaskCompletionSource _waitForConnectionClosedTcs = new TaskCompletionSource(); private bool _connectionClosed; private readonly bool _waitForData; @@ -78,28 +79,16 @@ internal SocketConnection(Socket socket, public override MemoryPool MemoryPool { get; } public void Start() - { - _processingTask = StartAsync(); - } - - private async Task StartAsync() { try { // Spawn send and receive logic - var receiveTask = DoReceive(); - var sendTask = DoSend(); - - // Now wait for both to complete - await receiveTask; - await sendTask; - - _receiver.Dispose(); - _sender?.Dispose(); + _receivingTask = DoReceive(); + _sendingTask = DoSend(); } catch (Exception ex) { - _trace.LogError(0, ex, $"Unexpected exception in {nameof(SocketConnection)}.{nameof(StartAsync)}."); + _trace.LogError(0, ex, $"Unexpected exception in {nameof(SocketConnection)}.{nameof(Start)}."); } } @@ -118,9 +107,28 @@ public override async ValueTask DisposeAsync() _originalTransport.Input.Complete(); _originalTransport.Output.Complete(); - if (_processingTask != null) + try + { + // Now wait for both to complete + if (_receivingTask != null) + { + await _receivingTask; + } + + if (_sendingTask != null) + { + await _sendingTask; + } + + } + catch (Exception ex) + { + _trace.LogError(0, ex, $"Unexpected exception in {nameof(SocketConnection)}.{nameof(Start)}."); + } + finally { - await _processingTask; + _receiver.Dispose(); + _sender?.Dispose(); } _connectionClosedTokenSource.Dispose(); @@ -129,10 +137,53 @@ public override async ValueTask DisposeAsync() private async Task DoReceive() { Exception? error = null; - + try { - await ProcessReceives(); + while (true) + { + if (_waitForData) + { + // Wait for data before allocating a buffer. + await _receiver.WaitForDataAsync(_socket); + } + + // Ensure we have some reasonable amount of buffer space + var buffer = Input.GetMemory(MinAllocBufferSize); + + var bytesReceived = await _receiver.ReceiveAsync(_socket, buffer); + + if (bytesReceived == 0) + { + // FIN + _trace.ConnectionReadFin(ConnectionId); + break; + } + + Input.Advance(bytesReceived); + + var flushTask = Input.FlushAsync(); + + var paused = !flushTask.IsCompleted; + + if (paused) + { + _trace.ConnectionPause(ConnectionId); + } + + var result = await flushTask; + + if (paused) + { + _trace.ConnectionResume(ConnectionId); + } + + if (result.IsCompleted || result.IsCanceled) + { + // Pipe consumer is shut down, do we stop writing + break; + } + } } catch (SocketException ex) when (IsConnectionResetError(ex.SocketErrorCode)) { @@ -176,56 +227,6 @@ private async Task DoReceive() } } - private async Task ProcessReceives() - { - // Resolve `input` PipeWriter via the IDuplexPipe interface prior to loop start for performance. - var input = Input; - while (true) - { - if (_waitForData) - { - // Wait for data before allocating a buffer. - await _receiver.WaitForDataAsync(_socket); - } - - // Ensure we have some reasonable amount of buffer space - var buffer = input.GetMemory(MinAllocBufferSize); - - var bytesReceived = await _receiver.ReceiveAsync(_socket, buffer); - - if (bytesReceived == 0) - { - // FIN - _trace.ConnectionReadFin(ConnectionId); - break; - } - - input.Advance(bytesReceived); - - var flushTask = input.FlushAsync(); - - var paused = !flushTask.IsCompleted; - - if (paused) - { - _trace.ConnectionPause(ConnectionId); - } - - var result = await flushTask; - - if (paused) - { - _trace.ConnectionResume(ConnectionId); - } - - if (result.IsCompleted || result.IsCanceled) - { - // Pipe consumer is shut down, do we stop writing - break; - } - } - } - private async Task DoSend() { Exception? shutdownReason = null; @@ -233,7 +234,33 @@ private async Task DoSend() try { - await ProcessSends(); + while (true) + { + var result = await Output.ReadAsync(); + + if (result.IsCanceled) + { + break; + } + var buffer = result.Buffer; + + if (!buffer.IsEmpty) + { + _sender = _socketSenderPool.Rent(); + await _sender.SendAsync(_socket, buffer); + // We don't return to the pool if there was an exception, and + // we keep the _sender assigned so that we can dispose it in StartAsync. + _socketSenderPool.Return(_sender); + _sender = null; + } + + Output.AdvanceTo(buffer.End); + + if (result.IsCompleted) + { + break; + } + } } catch (SocketException ex) when (IsConnectionResetError(ex.SocketErrorCode)) { @@ -265,42 +292,6 @@ private async Task DoSend() } } - private async Task ProcessSends() - { - // Resolve `output` PipeReader via the IDuplexPipe interface prior to loop start for performance. - var output = Output; - while (true) - { - var result = await output.ReadAsync(); - - if (result.IsCanceled) - { - break; - } - - var buffer = result.Buffer; - - var end = buffer.End; - var isCompleted = result.IsCompleted; - if (!buffer.IsEmpty) - { - _sender = _socketSenderPool.Rent(); - await _sender.SendAsync(_socket, buffer); - // We don't return to the pool if there was an exception, and - // we keep the _sender assigned so that we can dispose it in StartAsync. - _socketSenderPool.Return(_sender); - _sender = null; - } - - output.AdvanceTo(end); - - if (isCompleted) - { - break; - } - } - } - private void FireConnectionClosed() { // Guard against scheduling this multiple times diff --git a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketReceiver.cs b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketReceiver.cs index 91e1c8e5bd2d..8aed520fddfa 100644 --- a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketReceiver.cs +++ b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketReceiver.cs @@ -4,6 +4,7 @@ using System; using System.IO.Pipelines; using System.Net.Sockets; +using System.Threading.Tasks; namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal { @@ -13,28 +14,38 @@ public SocketReceiver(PipeScheduler ioScheduler) : base(ioScheduler) { } - public SocketAwaitableEventArgs WaitForDataAsync(Socket socket) + public ValueTask WaitForDataAsync(Socket socket) { SetBuffer(Memory.Empty); - if (!socket.ReceiveAsync(this)) + if (socket.ReceiveAsync(this)) { - Complete(); + return new ValueTask(this, 0); } - return this; + var bytesTransferred = BytesTransferred; + var error = SocketError; + + return error == SocketError.Success ? + new ValueTask(bytesTransferred) : + ValueTask.FromException(CreateException(error)); } - public SocketAwaitableEventArgs ReceiveAsync(Socket socket, Memory buffer) + public ValueTask ReceiveAsync(Socket socket, Memory buffer) { SetBuffer(buffer); - if (!socket.ReceiveAsync(this)) + if (socket.ReceiveAsync(this)) { - Complete(); + return new ValueTask(this, 0); } - return this; + var bytesTransferred = BytesTransferred; + var error = SocketError; + + return error == SocketError.Success ? + new ValueTask(bytesTransferred) : + ValueTask.FromException(CreateException(error)); } } } diff --git a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSender.cs b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSender.cs index 666d71776087..dc15e051229e 100644 --- a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSender.cs +++ b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSender.cs @@ -8,6 +8,7 @@ using System.IO.Pipelines; using System.Net.Sockets; using System.Runtime.InteropServices; +using System.Threading.Tasks; namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal { @@ -19,7 +20,7 @@ public SocketSender(PipeScheduler scheduler) : base(scheduler) { } - public SocketAwaitableEventArgs SendAsync(Socket socket, in ReadOnlySequence buffers) + public ValueTask SendAsync(Socket socket, in ReadOnlySequence buffers) { if (buffers.IsSingleSegment) { @@ -28,12 +29,17 @@ public SocketAwaitableEventArgs SendAsync(Socket socket, in ReadOnlySequence(this, 0); } - return this; + var bytesTransferred = BytesTransferred; + var error = SocketError; + + return error == SocketError.Success ? + new ValueTask(bytesTransferred) : + ValueTask.FromException(CreateException(error)); } public void Reset() @@ -53,16 +59,21 @@ public void Reset() } } - private SocketAwaitableEventArgs SendAsync(Socket socket, ReadOnlyMemory memory) + private ValueTask SendAsync(Socket socket, ReadOnlyMemory memory) { SetBuffer(MemoryMarshal.AsMemory(memory)); - if (!socket.SendAsync(this)) + if (socket.SendAsync(this)) { - Complete(); + return new ValueTask(this, 0); } - return this; + var bytesTransferred = BytesTransferred; + var error = SocketError; + + return error == SocketError.Success ? + new ValueTask(bytesTransferred) : + ValueTask.FromException(CreateException(error)); } private void SetBufferList(in ReadOnlySequence buffer) From c8ced2455f812f28be5b952d346cfbc6901c5dd2 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Sat, 27 Mar 2021 08:52:24 -0700 Subject: [PATCH 2/3] Remove space --- .../Kestrel/Transport.Sockets/src/Internal/SocketConnection.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.cs b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.cs index 57adcffb6505..d72ccc50f7a2 100644 --- a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.cs +++ b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.cs @@ -137,7 +137,7 @@ public override async ValueTask DisposeAsync() private async Task DoReceive() { Exception? error = null; - + try { while (true) From 0e59dc3b99234ebde65e7fce6040d8f17696b7f3 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Sat, 27 Mar 2021 09:20:39 -0700 Subject: [PATCH 3/3] Added a comment about the custom SAEA impl --- .../src/Internal/SocketAwaitableEventArgs.cs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketAwaitableEventArgs.cs b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketAwaitableEventArgs.cs index 98cf634de588..120a4f837e78 100644 --- a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketAwaitableEventArgs.cs +++ b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketAwaitableEventArgs.cs @@ -9,6 +9,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal { + // A slimmed down version of https://github.com/dotnet/runtime/blob/82ca681cbac89d813a3ce397e0c665e6c051ed67/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Tasks.cs#L798 that + // 1. Doesn't support any custom scheduling other than the PipeScheduler (no sync context, no task scheduler) + // 2. Doesn't do ValueTask validation using the token + // 3. Doesn't support usage outside of async/await (doesn't try to capture and restore the execution context) + // 4. Doesn't use cancellation tokens internal class SocketAwaitableEventArgs : SocketAsyncEventArgs, IValueTaskSource { private static readonly Action _continuationCompleted = _ => { };