diff --git a/src/Servers/Kestrel/Kestrel.slnf b/src/Servers/Kestrel/Kestrel.slnf index 14be661c19a9..c23b30b99bed 100644 --- a/src/Servers/Kestrel/Kestrel.slnf +++ b/src/Servers/Kestrel/Kestrel.slnf @@ -35,7 +35,8 @@ "src\\Servers\\Kestrel\\test\\Sockets.BindTests\\Sockets.BindTests.csproj", "src\\Servers\\Kestrel\\test\\Sockets.FunctionalTests\\Sockets.FunctionalTests.csproj", "src\\Servers\\Kestrel\\tools\\CodeGenerator\\CodeGenerator.csproj", - "src\\ObjectPool\\src\\Microsoft.Extensions.ObjectPool.csproj" + "src\\ObjectPool\\src\\Microsoft.Extensions.ObjectPool.csproj", + "src\\Testing\\src\\Microsoft.AspNetCore.Testing.csproj" ] } } \ No newline at end of file diff --git a/src/Servers/Kestrel/Transport.Sockets/src/Client/SocketConnectionFactory.cs b/src/Servers/Kestrel/Transport.Sockets/src/Client/SocketConnectionFactory.cs index c0c6772be049..7465ece15f98 100644 --- a/src/Servers/Kestrel/Transport.Sockets/src/Client/SocketConnectionFactory.cs +++ b/src/Servers/Kestrel/Transport.Sockets/src/Client/SocketConnectionFactory.cs @@ -22,6 +22,7 @@ internal class SocketConnectionFactory : IConnectionFactory, IAsyncDisposable private readonly SocketsTrace _trace; private readonly PipeOptions _inputOptions; private readonly PipeOptions _outputOptions; + private readonly SocketSenderPool _socketSenderPool; public SocketConnectionFactory(IOptions options, ILoggerFactory loggerFactory) { @@ -46,9 +47,12 @@ public SocketConnectionFactory(IOptions options, ILogger // These are the same, it's either the thread pool or inline var applicationScheduler = _options.UnsafePreferInlineScheduling ? PipeScheduler.Inline : PipeScheduler.ThreadPool; var transportScheduler = applicationScheduler; + // https://github.com/aspnet/KestrelHttpServer/issues/2573 + var awaiterScheduler = OperatingSystem.IsWindows() ? transportScheduler : PipeScheduler.Inline; _inputOptions = new PipeOptions(_memoryPool, applicationScheduler, transportScheduler, maxReadBufferSize, maxReadBufferSize / 2, useSynchronizationContext: false); _outputOptions = new PipeOptions(_memoryPool, transportScheduler, applicationScheduler, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false); + _socketSenderPool = new SocketSenderPool(awaiterScheduler); } public async ValueTask ConnectAsync(EndPoint endpoint, CancellationToken cancellationToken = default) @@ -72,6 +76,7 @@ public async ValueTask ConnectAsync(EndPoint endpoint, Cancel _memoryPool, _inputOptions.ReaderScheduler, // This is either threadpool or inline _trace, + _socketSenderPool, _inputOptions, _outputOptions, _options.WaitForDataBeforeAllocatingBuffer); diff --git a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketAwaitableEventArgs.cs b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketAwaitableEventArgs.cs index 50b8805d6c44..5ed561b0ed6e 100644 --- a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketAwaitableEventArgs.cs +++ b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketAwaitableEventArgs.cs @@ -11,7 +11,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal { - internal sealed class SocketAwaitableEventArgs : SocketAsyncEventArgs, ICriticalNotifyCompletion + internal class SocketAwaitableEventArgs : SocketAsyncEventArgs, ICriticalNotifyCompletion { private static readonly Action _callbackCompleted = () => { }; diff --git a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.cs b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.cs index cf81d90e372b..4d8f6f27771b 100644 --- a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.cs +++ b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.cs @@ -20,7 +20,8 @@ internal sealed class SocketConnection : TransportConnection private readonly Socket _socket; private readonly ISocketsTrace _trace; private readonly SocketReceiver _receiver; - private readonly SocketSender _sender; + private SocketSender? _sender; + private readonly SocketSenderPool _socketSenderPool; private readonly IDuplexPipe _originalTransport; private readonly CancellationTokenSource _connectionClosedTokenSource = new CancellationTokenSource(); @@ -36,6 +37,7 @@ internal SocketConnection(Socket socket, MemoryPool memoryPool, PipeScheduler transportScheduler, ISocketsTrace trace, + SocketSenderPool socketSenderPool, PipeOptions inputOptions, PipeOptions outputOptions, bool waitForData = true) @@ -48,6 +50,7 @@ internal SocketConnection(Socket socket, MemoryPool = memoryPool; _trace = trace; _waitForData = waitForData; + _socketSenderPool = socketSenderPool; LocalEndPoint = _socket.LocalEndPoint; RemoteEndPoint = _socket.RemoteEndPoint; @@ -59,8 +62,7 @@ internal SocketConnection(Socket socket, // https://github.com/aspnet/KestrelHttpServer/issues/2573 var awaiterScheduler = OperatingSystem.IsWindows() ? transportScheduler : PipeScheduler.Inline; - _receiver = new SocketReceiver(_socket, awaiterScheduler); - _sender = new SocketSender(_socket, awaiterScheduler); + _receiver = new SocketReceiver(awaiterScheduler); var pair = DuplexPipe.CreateConnectionPair(inputOptions, outputOptions); @@ -93,7 +95,7 @@ private async Task StartAsync() await sendTask; _receiver.Dispose(); - _sender.Dispose(); + _sender?.Dispose(); } catch (Exception ex) { @@ -183,13 +185,13 @@ private async Task ProcessReceives() if (_waitForData) { // Wait for data before allocating a buffer. - await _receiver.WaitForDataAsync(); + await _receiver.WaitForDataAsync(_socket); } // Ensure we have some reasonable amount of buffer space var buffer = input.GetMemory(MinAllocBufferSize); - var bytesReceived = await _receiver.ReceiveAsync(buffer); + var bytesReceived = await _receiver.ReceiveAsync(_socket, buffer); if (bytesReceived == 0) { @@ -282,7 +284,12 @@ private async Task ProcessSends() var isCompleted = result.IsCompleted; if (!buffer.IsEmpty) { - await _sender.SendAsync(buffer); + _sender = _socketSenderPool.Rent(); + await _sender.SendAsync(_socket, buffer); + // We don't return to the pool if there was an exception, and + // we keep the _sender assigned so that we can dispose it in StartAsync. + _socketSenderPool.Return(_sender); + _sender = null; } output.AdvanceTo(end); diff --git a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketReceiver.cs b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketReceiver.cs index c4b048fdf7af..91e1c8e5bd2d 100644 --- a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketReceiver.cs +++ b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketReceiver.cs @@ -7,34 +7,34 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal { - internal sealed class SocketReceiver : SocketSenderReceiverBase + internal sealed class SocketReceiver : SocketAwaitableEventArgs { - public SocketReceiver(Socket socket, PipeScheduler scheduler) : base(socket, scheduler) + public SocketReceiver(PipeScheduler ioScheduler) : base(ioScheduler) { } - public SocketAwaitableEventArgs WaitForDataAsync() + public SocketAwaitableEventArgs WaitForDataAsync(Socket socket) { - _awaitableEventArgs.SetBuffer(Memory.Empty); + SetBuffer(Memory.Empty); - if (!_socket.ReceiveAsync(_awaitableEventArgs)) + if (!socket.ReceiveAsync(this)) { - _awaitableEventArgs.Complete(); + Complete(); } - return _awaitableEventArgs; + return this; } - public SocketAwaitableEventArgs ReceiveAsync(Memory buffer) + public SocketAwaitableEventArgs ReceiveAsync(Socket socket, Memory buffer) { - _awaitableEventArgs.SetBuffer(buffer); + SetBuffer(buffer); - if (!_socket.ReceiveAsync(_awaitableEventArgs)) + if (!socket.ReceiveAsync(this)) { - _awaitableEventArgs.Complete(); + Complete(); } - return _awaitableEventArgs; + return this; } } } diff --git a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSender.cs b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSender.cs index 97cd0b685f77..666d71776087 100644 --- a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSender.cs +++ b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSender.cs @@ -11,55 +11,61 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal { - internal sealed class SocketSender : SocketSenderReceiverBase + internal sealed class SocketSender : SocketAwaitableEventArgs { private List>? _bufferList; - public SocketSender(Socket socket, PipeScheduler scheduler) : base(socket, scheduler) + public SocketSender(PipeScheduler scheduler) : base(scheduler) { } - public SocketAwaitableEventArgs SendAsync(in ReadOnlySequence buffers) + public SocketAwaitableEventArgs SendAsync(Socket socket, in ReadOnlySequence buffers) { if (buffers.IsSingleSegment) { - return SendAsync(buffers.First); + return SendAsync(socket, buffers.First); } - if (!_awaitableEventArgs.MemoryBuffer.Equals(Memory.Empty)) - { - _awaitableEventArgs.SetBuffer(null, 0, 0); - } - - _awaitableEventArgs.BufferList = GetBufferList(buffers); + SetBufferList(buffers); - if (!_socket.SendAsync(_awaitableEventArgs)) + if (!socket.SendAsync(this)) { - _awaitableEventArgs.Complete(); + Complete(); } - return _awaitableEventArgs; + return this; } - private SocketAwaitableEventArgs SendAsync(ReadOnlyMemory memory) + public void Reset() { - // The BufferList getter is much less expensive then the setter. - if (_awaitableEventArgs.BufferList != null) + // We clear the buffer and buffer list before we put it back into the pool + // it's a small performance hit but it removes the confusion when looking at dumps to see this still + // holds onto the buffer when it's back in the pool + if (BufferList != null) + { + BufferList = null; + + _bufferList?.Clear(); + } + else { - _awaitableEventArgs.BufferList = null; + SetBuffer(null, 0, 0); } + } - _awaitableEventArgs.SetBuffer(MemoryMarshal.AsMemory(memory)); + private SocketAwaitableEventArgs SendAsync(Socket socket, ReadOnlyMemory memory) + { + SetBuffer(MemoryMarshal.AsMemory(memory)); - if (!_socket.SendAsync(_awaitableEventArgs)) + if (!socket.SendAsync(this)) { - _awaitableEventArgs.Complete(); + Complete(); } - return _awaitableEventArgs; + return this; } - private List> GetBufferList(in ReadOnlySequence buffer) + private void SetBufferList(in ReadOnlySequence buffer) { Debug.Assert(!buffer.IsEmpty); Debug.Assert(!buffer.IsSingleSegment); @@ -68,18 +74,14 @@ private List> GetBufferList(in ReadOnlySequence buffer) { _bufferList = new List>(); } - else - { - // Buffers are pooled, so it's OK to root them until the next multi-buffer write. - _bufferList.Clear(); - } foreach (var b in buffer) { _bufferList.Add(b.GetArray()); } - return _bufferList; + // The act of setting this list, sets the buffers in the internal buffer list + BufferList = _bufferList; } } } diff --git a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSenderPool.cs b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSenderPool.cs new file mode 100644 index 000000000000..558fe16480f2 --- /dev/null +++ b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSenderPool.cs @@ -0,0 +1,61 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Collections.Concurrent; +using System.IO.Pipelines; +using System.Threading; + +namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal +{ + internal class SocketSenderPool : IDisposable + { + private const int MaxQueueSize = 1024; // REVIEW: Is this good enough? + + private readonly ConcurrentQueue _queue = new(); + private int _count; + private readonly PipeScheduler _scheduler; + private bool _disposed; + + public SocketSenderPool(PipeScheduler scheduler) + { + _scheduler = scheduler; + } + + public SocketSender Rent() + { + if (_queue.TryDequeue(out var sender)) + { + Interlocked.Decrement(ref _count); + return sender; + } + return new SocketSender(_scheduler); + } + + public void Return(SocketSender sender) + { + // This counting isn't accurate, but it's good enough for what we need to avoid using _queue.Count which could be expensive + if (_disposed || Interlocked.Increment(ref _count) > MaxQueueSize) + { + Interlocked.Decrement(ref _count); + sender.Dispose(); + return; + } + + sender.Reset(); + _queue.Enqueue(sender); + } + + public void Dispose() + { + if (!_disposed) + { + _disposed = true; + while (_queue.TryDequeue(out var sender)) + { + sender.Dispose(); + } + } + } + } +} diff --git a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSenderReceiverBase.cs b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSenderReceiverBase.cs deleted file mode 100644 index b28c7d798976..000000000000 --- a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSenderReceiverBase.cs +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using System; -using System.IO.Pipelines; -using System.Net.Sockets; - -namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal -{ - internal abstract class SocketSenderReceiverBase : IDisposable - { - protected readonly Socket _socket; - protected readonly SocketAwaitableEventArgs _awaitableEventArgs; - - protected SocketSenderReceiverBase(Socket socket, PipeScheduler scheduler) - { - _socket = socket; - _awaitableEventArgs = new SocketAwaitableEventArgs(scheduler); - } - - public void Dispose() => _awaitableEventArgs.Dispose(); - } -} diff --git a/src/Servers/Kestrel/Transport.Sockets/src/SocketConnectionListener.cs b/src/Servers/Kestrel/Transport.Sockets/src/SocketConnectionListener.cs index 0571ad4c8765..6b618e5d5f76 100644 --- a/src/Servers/Kestrel/Transport.Sockets/src/SocketConnectionListener.cs +++ b/src/Servers/Kestrel/Transport.Sockets/src/SocketConnectionListener.cs @@ -50,18 +50,23 @@ internal SocketConnectionListener( for (var i = 0; i < _settingsCount; i++) { var transportScheduler = options.UnsafePreferInlineScheduling ? PipeScheduler.Inline : new IOQueue(); + // https://github.com/aspnet/KestrelHttpServer/issues/2573 + var awaiterScheduler = OperatingSystem.IsWindows() ? transportScheduler : PipeScheduler.Inline; _settings[i] = new Settings { Scheduler = transportScheduler, InputOptions = new PipeOptions(_memoryPool, applicationScheduler, transportScheduler, maxReadBufferSize, maxReadBufferSize / 2, useSynchronizationContext: false), - OutputOptions = new PipeOptions(_memoryPool, transportScheduler, applicationScheduler, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false) + OutputOptions = new PipeOptions(_memoryPool, transportScheduler, applicationScheduler, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false), + SocketSenderPool = new SocketSenderPool(awaiterScheduler) }; } } else { var transportScheduler = options.UnsafePreferInlineScheduling ? PipeScheduler.Inline : PipeScheduler.ThreadPool; + // https://github.com/aspnet/KestrelHttpServer/issues/2573 + var awaiterScheduler = OperatingSystem.IsWindows() ? transportScheduler : PipeScheduler.Inline; var directScheduler = new Settings[] { @@ -69,7 +74,8 @@ internal SocketConnectionListener( { Scheduler = transportScheduler, InputOptions = new PipeOptions(_memoryPool, applicationScheduler, transportScheduler, maxReadBufferSize, maxReadBufferSize / 2, useSynchronizationContext: false), - OutputOptions = new PipeOptions(_memoryPool, transportScheduler, applicationScheduler, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false) + OutputOptions = new PipeOptions(_memoryPool, transportScheduler, applicationScheduler, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false), + SocketSenderPool = new SocketSenderPool(awaiterScheduler) } }; @@ -155,6 +161,7 @@ void BindSocket() _memoryPool, setting.Scheduler, _trace, + setting.SocketSenderPool, setting.InputOptions, setting.OutputOptions, waitForData: _options.WaitForDataBeforeAllocatingBuffer); @@ -199,6 +206,13 @@ public ValueTask DisposeAsync() // Dispose the memory pool _memoryPool.Dispose(); + + // Dispose any pooled senders + foreach (var setting in _settings) + { + setting.SocketSenderPool.Dispose(); + } + return default; } @@ -207,6 +221,7 @@ private class Settings public PipeScheduler Scheduler { get; init; } = default!; public PipeOptions InputOptions { get; init; } = default!; public PipeOptions OutputOptions { get; init; } = default!; + public SocketSenderPool SocketSenderPool { get; init; } = default!; } } }