Skip to content

Commit dc0b496

Browse files
committed
Pool SocketSenders
- SocketAsyncEventArgs have lots of state on them and as a result are quite big (~350) bytes at runtime. We can pool these since sends are usually very fast and we can reduce the per connection overhead as a result. - We also allocate one per IOQueue to reduce contention. - Fixed buffer list management - Disposed pool when the transport is disposed - Added project to slnf so running tests in VS was possible - Clear the buffer and buffer list before returning to the pool - This cleans up dumps as the pooled senders don't see references to buffers while pooled in the queue
1 parent 7e33542 commit dc0b496

File tree

9 files changed

+129
-72
lines changed

9 files changed

+129
-72
lines changed

src/Servers/Kestrel/Kestrel.slnf

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535
"src\\Servers\\Kestrel\\test\\Sockets.BindTests\\Sockets.BindTests.csproj",
3636
"src\\Servers\\Kestrel\\test\\Sockets.FunctionalTests\\Sockets.FunctionalTests.csproj",
3737
"src\\Servers\\Kestrel\\tools\\CodeGenerator\\CodeGenerator.csproj",
38-
"src\\ObjectPool\\src\\Microsoft.Extensions.ObjectPool.csproj"
38+
"src\\ObjectPool\\src\\Microsoft.Extensions.ObjectPool.csproj",
39+
"src\\Testing\\src\\Microsoft.AspNetCore.Testing.csproj"
3940
]
4041
}
4142
}

src/Servers/Kestrel/Transport.Sockets/src/Client/SocketConnectionFactory.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ internal class SocketConnectionFactory : IConnectionFactory, IAsyncDisposable
2222
private readonly SocketsTrace _trace;
2323
private readonly PipeOptions _inputOptions;
2424
private readonly PipeOptions _outputOptions;
25+
private readonly SocketSenderPool _socketSenderPool;
2526

2627
public SocketConnectionFactory(IOptions<SocketTransportOptions> options, ILoggerFactory loggerFactory)
2728
{
@@ -46,9 +47,11 @@ public SocketConnectionFactory(IOptions<SocketTransportOptions> options, ILogger
4647
// These are the same, it's either the thread pool or inline
4748
var applicationScheduler = _options.UnsafePreferInlineScheduling ? PipeScheduler.Inline : PipeScheduler.ThreadPool;
4849
var transportScheduler = applicationScheduler;
50+
var awaiterScheduler = OperatingSystem.IsWindows() ? transportScheduler : PipeScheduler.Inline;
4951

5052
_inputOptions = new PipeOptions(_memoryPool, applicationScheduler, transportScheduler, maxReadBufferSize, maxReadBufferSize / 2, useSynchronizationContext: false);
5153
_outputOptions = new PipeOptions(_memoryPool, transportScheduler, applicationScheduler, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false);
54+
_socketSenderPool = new SocketSenderPool(awaiterScheduler);
5255
}
5356

5457
public async ValueTask<ConnectionContext> ConnectAsync(EndPoint endpoint, CancellationToken cancellationToken = default)
@@ -72,6 +75,7 @@ public async ValueTask<ConnectionContext> ConnectAsync(EndPoint endpoint, Cancel
7275
_memoryPool,
7376
_inputOptions.ReaderScheduler, // This is either threadpool or inline
7477
_trace,
78+
_socketSenderPool,
7579
_inputOptions,
7680
_outputOptions,
7781
_options.WaitForDataBeforeAllocatingBuffer);

src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketAwaitableEventArgs.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
1313
{
14-
internal sealed class SocketAwaitableEventArgs : SocketAsyncEventArgs, ICriticalNotifyCompletion
14+
internal class SocketAwaitableEventArgs : SocketAsyncEventArgs, ICriticalNotifyCompletion
1515
{
1616
private static readonly Action _callbackCompleted = () => { };
1717

src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.cs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ internal sealed class SocketConnection : TransportConnection
2020
private readonly Socket _socket;
2121
private readonly ISocketsTrace _trace;
2222
private readonly SocketReceiver _receiver;
23-
private readonly SocketSender _sender;
23+
private SocketSender? _sender;
24+
private readonly SocketSenderPool _socketSenderPool;
2425
private readonly IDuplexPipe _originalTransport;
2526
private readonly CancellationTokenSource _connectionClosedTokenSource = new CancellationTokenSource();
2627

@@ -36,6 +37,7 @@ internal SocketConnection(Socket socket,
3637
MemoryPool<byte> memoryPool,
3738
PipeScheduler transportScheduler,
3839
ISocketsTrace trace,
40+
SocketSenderPool socketSenderPool,
3941
PipeOptions inputOptions,
4042
PipeOptions outputOptions,
4143
bool waitForData = true)
@@ -48,8 +50,9 @@ internal SocketConnection(Socket socket,
4850
MemoryPool = memoryPool;
4951
_trace = trace;
5052
_waitForData = waitForData;
53+
_socketSenderPool = socketSenderPool;
5154

52-
LocalEndPoint = _socket.LocalEndPoint;
55+
LocalEndPoint = _socket.LocalEndPoint;
5356
RemoteEndPoint = _socket.RemoteEndPoint;
5457

5558
ConnectionClosed = _connectionClosedTokenSource.Token;
@@ -59,8 +62,7 @@ internal SocketConnection(Socket socket,
5962
// https://github.com/aspnet/KestrelHttpServer/issues/2573
6063
var awaiterScheduler = OperatingSystem.IsWindows() ? transportScheduler : PipeScheduler.Inline;
6164

62-
_receiver = new SocketReceiver(_socket, awaiterScheduler);
63-
_sender = new SocketSender(_socket, awaiterScheduler);
65+
_receiver = new SocketReceiver(awaiterScheduler);
6466

6567
var pair = DuplexPipe.CreateConnectionPair(inputOptions, outputOptions);
6668

@@ -93,7 +95,7 @@ private async Task StartAsync()
9395
await sendTask;
9496

9597
_receiver.Dispose();
96-
_sender.Dispose();
98+
_sender?.Dispose();
9799
}
98100
catch (Exception ex)
99101
{
@@ -183,13 +185,13 @@ private async Task ProcessReceives()
183185
if (_waitForData)
184186
{
185187
// Wait for data before allocating a buffer.
186-
await _receiver.WaitForDataAsync();
188+
await _receiver.WaitForDataAsync(_socket);
187189
}
188190

189191
// Ensure we have some reasonable amount of buffer space
190192
var buffer = input.GetMemory(MinAllocBufferSize);
191193

192-
var bytesReceived = await _receiver.ReceiveAsync(buffer);
194+
var bytesReceived = await _receiver.ReceiveAsync(_socket, buffer);
193195

194196
if (bytesReceived == 0)
195197
{
@@ -282,7 +284,10 @@ private async Task ProcessSends()
282284
var isCompleted = result.IsCompleted;
283285
if (!buffer.IsEmpty)
284286
{
285-
await _sender.SendAsync(buffer);
287+
_sender = _socketSenderPool.Rent();
288+
await _sender.SendAsync(_socket, buffer);
289+
_socketSenderPool.Return(_sender);
290+
_sender = null;
286291
}
287292

288293
output.AdvanceTo(end);

src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketReceiver.cs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,34 +7,34 @@
77

88
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
99
{
10-
internal sealed class SocketReceiver : SocketSenderReceiverBase
10+
internal sealed class SocketReceiver : SocketAwaitableEventArgs
1111
{
12-
public SocketReceiver(Socket socket, PipeScheduler scheduler) : base(socket, scheduler)
12+
public SocketReceiver(PipeScheduler ioScheduler) : base(ioScheduler)
1313
{
1414
}
1515

16-
public SocketAwaitableEventArgs WaitForDataAsync()
16+
public SocketAwaitableEventArgs WaitForDataAsync(Socket socket)
1717
{
18-
_awaitableEventArgs.SetBuffer(Memory<byte>.Empty);
18+
SetBuffer(Memory<byte>.Empty);
1919

20-
if (!_socket.ReceiveAsync(_awaitableEventArgs))
20+
if (!socket.ReceiveAsync(this))
2121
{
22-
_awaitableEventArgs.Complete();
22+
Complete();
2323
}
2424

25-
return _awaitableEventArgs;
25+
return this;
2626
}
2727

28-
public SocketAwaitableEventArgs ReceiveAsync(Memory<byte> buffer)
28+
public SocketAwaitableEventArgs ReceiveAsync(Socket socket, Memory<byte> buffer)
2929
{
30-
_awaitableEventArgs.SetBuffer(buffer);
30+
SetBuffer(buffer);
3131

32-
if (!_socket.ReceiveAsync(_awaitableEventArgs))
32+
if (!socket.ReceiveAsync(this))
3333
{
34-
_awaitableEventArgs.Complete();
34+
Complete();
3535
}
3636

37-
return _awaitableEventArgs;
37+
return this;
3838
}
3939
}
4040
}

src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSender.cs

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -11,55 +11,56 @@
1111

1212
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
1313
{
14-
internal sealed class SocketSender : SocketSenderReceiverBase
14+
internal sealed class SocketSender : SocketAwaitableEventArgs
1515
{
1616
private List<ArraySegment<byte>>? _bufferList;
1717

18-
public SocketSender(Socket socket, PipeScheduler scheduler) : base(socket, scheduler)
18+
public SocketSender(PipeScheduler scheduler) : base(scheduler)
1919
{
2020
}
2121

22-
public SocketAwaitableEventArgs SendAsync(in ReadOnlySequence<byte> buffers)
22+
public SocketAwaitableEventArgs SendAsync(Socket socket, in ReadOnlySequence<byte> buffers)
2323
{
2424
if (buffers.IsSingleSegment)
2525
{
26-
return SendAsync(buffers.First);
26+
return SendAsync(socket, buffers.First);
2727
}
2828

29-
if (!_awaitableEventArgs.MemoryBuffer.Equals(Memory<byte>.Empty))
30-
{
31-
_awaitableEventArgs.SetBuffer(null, 0, 0);
32-
}
33-
34-
_awaitableEventArgs.BufferList = GetBufferList(buffers);
29+
SetBufferList(buffers);
3530

36-
if (!_socket.SendAsync(_awaitableEventArgs))
31+
if (!socket.SendAsync(this))
3732
{
38-
_awaitableEventArgs.Complete();
33+
Complete();
3934
}
4035

41-
return _awaitableEventArgs;
36+
return this;
4237
}
4338

44-
private SocketAwaitableEventArgs SendAsync(ReadOnlyMemory<byte> memory)
39+
public void Reset()
4540
{
46-
// The BufferList getter is much less expensive then the setter.
47-
if (_awaitableEventArgs.BufferList != null)
48-
{
49-
_awaitableEventArgs.BufferList = null;
50-
}
41+
// We clear the buffer and buffer list before we put it back into the pool
42+
// it's a small performance hit but it removes the confusion when looking at dumps to see this still
43+
// holder onto the buffer when it's back in the pool
44+
BufferList = null;
5145

52-
_awaitableEventArgs.SetBuffer(MemoryMarshal.AsMemory(memory));
46+
SetBuffer(null, 0, 0);
47+
48+
_bufferList?.Clear();
49+
}
50+
51+
private SocketAwaitableEventArgs SendAsync(Socket socket, ReadOnlyMemory<byte> memory)
52+
{
53+
SetBuffer(MemoryMarshal.AsMemory(memory));
5354

54-
if (!_socket.SendAsync(_awaitableEventArgs))
55+
if (!socket.SendAsync(this))
5556
{
56-
_awaitableEventArgs.Complete();
57+
Complete();
5758
}
5859

59-
return _awaitableEventArgs;
60+
return this;
6061
}
6162

62-
private List<ArraySegment<byte>> GetBufferList(in ReadOnlySequence<byte> buffer)
63+
private void SetBufferList(in ReadOnlySequence<byte> buffer)
6364
{
6465
Debug.Assert(!buffer.IsEmpty);
6566
Debug.Assert(!buffer.IsSingleSegment);
@@ -79,7 +80,8 @@ private List<ArraySegment<byte>> GetBufferList(in ReadOnlySequence<byte> buffer)
7980
_bufferList.Add(b.GetArray());
8081
}
8182

82-
return _bufferList;
83+
// The act of setting this list, sets the buffers in the internal buffer list
84+
BufferList = _bufferList;
8385
}
8486
}
8587
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
using System;
2+
using System.Collections.Concurrent;
3+
using System.IO.Pipelines;
4+
5+
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
6+
{
7+
internal class SocketSenderPool : IDisposable
8+
{
9+
private const int MaxQueueSize = 1024; // REVIEW: Is this good enough?
10+
11+
private readonly ConcurrentQueue<SocketSender> _queue = new();
12+
private readonly PipeScheduler _scheduler;
13+
private bool _disposed;
14+
15+
public SocketSenderPool(PipeScheduler scheduler)
16+
{
17+
_scheduler = scheduler;
18+
}
19+
20+
public SocketSender Rent()
21+
{
22+
if (_queue.TryDequeue(out var sender))
23+
{
24+
return sender;
25+
}
26+
return new SocketSender(_scheduler);
27+
}
28+
29+
public void Return(SocketSender sender)
30+
{
31+
if (_disposed || _queue.Count > MaxQueueSize)
32+
{
33+
sender.Dispose();
34+
return;
35+
}
36+
37+
sender.Reset();
38+
39+
_queue.Enqueue(sender);
40+
}
41+
42+
public void Dispose()
43+
{
44+
if (!_disposed)
45+
{
46+
_disposed = true;
47+
while (_queue.TryDequeue(out var sender))
48+
{
49+
sender.Dispose();
50+
}
51+
}
52+
}
53+
54+
}
55+
}

src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSenderReceiverBase.cs

Lines changed: 0 additions & 23 deletions
This file was deleted.

0 commit comments

Comments
 (0)