From e9d154c94a8a70709508bb5a00072dbb4d6c756d Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Wed, 25 Jan 2023 17:25:09 +0800 Subject: [PATCH 1/5] Multiple accept loops in named pipes transport --- .../Internal/NamedPipeConnectionListener.cs | 25 +++++--- .../src/NamedPipeTransportOptions.cs | 5 ++ .../src/PublicAPI.Unshipped.txt | 2 + .../test/NamedPipeConnectionListenerTests.cs | 57 +++++++++++++++++++ 4 files changed, 80 insertions(+), 9 deletions(-) diff --git a/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs b/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs index 5f05a46300ca..3f85b0de00b6 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs +++ b/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs @@ -26,7 +26,7 @@ internal sealed class NamedPipeConnectionListener : IConnectionListener private readonly PipeOptions _inputOptions; private readonly PipeOptions _outputOptions; private readonly Mutex _mutex; - private Task? _listeningTask; + private Task[]? _listeningTasks; private int _disposed; public NamedPipeConnectionListener( @@ -45,7 +45,7 @@ public NamedPipeConnectionListener( // The OS maintains a backlog of clients that are waiting to connect, so the app queue only stores a single connection. // We want to have a queue plus a background task that populates the queue, rather than creating NamedPipeServerStream // when AcceptAsync is called, so that the server is always the owner of the pipe name. - _acceptedQueue = Channel.CreateBounded(new BoundedChannelOptions(capacity: 1) { SingleWriter = true }); + _acceptedQueue = Channel.CreateBounded(new BoundedChannelOptions(capacity: 1)); var maxReadBufferSize = _options.MaxReadBufferSize ?? 0; var maxWriteBufferSize = _options.MaxWriteBufferSize ?? 0; @@ -56,17 +56,22 @@ public NamedPipeConnectionListener( public void Start() { - Debug.Assert(_listeningTask == null, "Already started"); + Debug.Assert(_listeningTasks == null, "Already started"); - // Start first stream inline to catch creation errors. - var initialStream = CreateServerStream(); + _listeningTasks = new Task[_options.AcceptQueueCount]; - _listeningTask = StartAsync(initialStream); + for (var i = 0; i < _listeningTasks.Length; i++) + { + // Start first stream inline to catch creation errors. + var initialStream = CreateServerStream(); + + _listeningTasks[i] = StartAsync(initialStream, i); + } } public EndPoint EndPoint => _endpoint; - private async Task StartAsync(NamedPipeServerStream nextStream) + private async Task StartAsync(NamedPipeServerStream nextStream, int index) { try { @@ -78,6 +83,8 @@ private async Task StartAsync(NamedPipeServerStream nextStream) await stream.WaitForConnectionAsync(_listeningToken); + _log.LogInformation("Connection accepted on " + index); + var connection = new NamedPipeConnection(stream, _endpoint, _log, _memoryPool, _inputOptions, _outputOptions); connection.Start(); @@ -182,9 +189,9 @@ public async ValueTask DisposeAsync() _listeningTokenSource.Dispose(); _mutex.Dispose(); - if (_listeningTask != null) + if (_listeningTasks != null) { - await _listeningTask; + await Task.WhenAll(_listeningTasks); } } } diff --git a/src/Servers/Kestrel/Transport.NamedPipes/src/NamedPipeTransportOptions.cs b/src/Servers/Kestrel/Transport.NamedPipes/src/NamedPipeTransportOptions.cs index 824c7ba74357..8c6e509988fa 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/src/NamedPipeTransportOptions.cs +++ b/src/Servers/Kestrel/Transport.NamedPipes/src/NamedPipeTransportOptions.cs @@ -11,6 +11,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes; /// public sealed class NamedPipeTransportOptions { + /// + /// + /// + public int AcceptQueueCount { get; set; } = Math.Clamp(Environment.ProcessorCount, 1, 16); + /// /// Gets or sets the maximum unconsumed incoming bytes the transport will buffer. /// diff --git a/src/Servers/Kestrel/Transport.NamedPipes/src/PublicAPI.Unshipped.txt b/src/Servers/Kestrel/Transport.NamedPipes/src/PublicAPI.Unshipped.txt index 74e0993c7a85..5657b929f27e 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/src/PublicAPI.Unshipped.txt +++ b/src/Servers/Kestrel/Transport.NamedPipes/src/PublicAPI.Unshipped.txt @@ -1,6 +1,8 @@ #nullable enable Microsoft.AspNetCore.Hosting.WebHostBuilderNamedPipeExtensions Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions +Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.AcceptQueueCount.get -> int +Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.AcceptQueueCount.set -> void Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.CurrentUserOnly.get -> bool Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.CurrentUserOnly.set -> void Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.MaxReadBufferSize.get -> long? diff --git a/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs b/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs index f0ba91815179..b1c00396fc55 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs +++ b/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs @@ -68,6 +68,63 @@ public async Task AcceptAsync_UnbindAfterCall_CleanExitAndLog() Assert.Contains(LogMessages, m => m.EventId.Name == "ConnectionListenerAborted"); } + [Fact] + public async Task AcceptAsync_ParallelConnections_ClientConnectionsSuccessfullyAccepted() + { + // Arrange + const int ParallelCount = 10; + const int ParallelCallCount = 5000; + const int TotalCallCount = ParallelCount * ParallelCallCount; + + var currentCallCount = 0; + var options = new NamedPipeTransportOptions(); + options.AcceptQueueCount = 16; + await using var connectionListener = await NamedPipeTestHelpers.CreateConnectionListenerFactory(LoggerFactory, options: options); + + // Act + var serverTask = Task.Run(async () => + { + while (currentCallCount < TotalCallCount) + { + var connection = await connectionListener.AcceptAsync().DefaultTimeout(); + await connection.DisposeAsync(); + + currentCallCount++; + + Logger.LogInformation($"Server accepted {currentCallCount} calls."); + } + + Logger.LogInformation($"Server task complete."); + }); + + var parallelTasks = new List(); + for (var i = 0; i < ParallelCount; i++) + { + parallelTasks.Add(Task.Run(async () => + { + var clientStreamCount = 0; + while (clientStreamCount < ParallelCallCount) + { + try + { + var clientStream = NamedPipeTestHelpers.CreateClientStream(connectionListener.EndPoint); + await clientStream.ConnectAsync(); + + await clientStream.WriteAsync(new byte[1]); + await clientStream.ReadAsync(new byte[1]); + clientStreamCount++; + } + catch (IOException ex) + { + Logger.LogInformation(ex, "Client exception."); + } + } + })); + } + + await serverTask.DefaultTimeout(); + } + [ConditionalFact] [OSSkipCondition(OperatingSystems.Linux | OperatingSystems.MacOSX, SkipReason = "Non-OS implementations use UDS with an unlimited accept limit.")] public async Task AcceptAsync_HitBacklogLimit_ClientConnectionsSuccessfullyAccepted() From 8bbe2fa6b0afbefcb5e6648a71201847a1233443 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Wed, 25 Jan 2023 20:01:50 +0800 Subject: [PATCH 2/5] Update --- .../src/Internal/NamedPipeConnectionListener.cs | 2 +- .../Transport.NamedPipes/src/NamedPipeTransportOptions.cs | 2 +- .../test/NamedPipeConnectionListenerTests.cs | 7 +++---- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs b/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs index 3f85b0de00b6..ae6a15bee57e 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs +++ b/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs @@ -65,7 +65,7 @@ public void Start() // Start first stream inline to catch creation errors. var initialStream = CreateServerStream(); - _listeningTasks[i] = StartAsync(initialStream, i); + _listeningTasks[i] = Task.Run(() => StartAsync(initialStream, i)); } } diff --git a/src/Servers/Kestrel/Transport.NamedPipes/src/NamedPipeTransportOptions.cs b/src/Servers/Kestrel/Transport.NamedPipes/src/NamedPipeTransportOptions.cs index 8c6e509988fa..53957af07fb4 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/src/NamedPipeTransportOptions.cs +++ b/src/Servers/Kestrel/Transport.NamedPipes/src/NamedPipeTransportOptions.cs @@ -14,7 +14,7 @@ public sealed class NamedPipeTransportOptions /// /// /// - public int AcceptQueueCount { get; set; } = Math.Clamp(Environment.ProcessorCount, 1, 16); + public int AcceptQueueCount { get; set; } = Math.Min(Environment.ProcessorCount, 16); /// /// Gets or sets the maximum unconsumed incoming bytes the transport will buffer. diff --git a/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs b/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs index b1c00396fc55..1c33e08a71c7 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs +++ b/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs @@ -78,7 +78,7 @@ public async Task AcceptAsync_ParallelConnections_ClientConnectionsSuccessfullyA var currentCallCount = 0; var options = new NamedPipeTransportOptions(); - options.AcceptQueueCount = 16; + options.AcceptQueueCount = 1; await using var connectionListener = await NamedPipeTestHelpers.CreateConnectionListenerFactory(LoggerFactory, options: options); // Act @@ -86,8 +86,7 @@ public async Task AcceptAsync_ParallelConnections_ClientConnectionsSuccessfullyA { while (currentCallCount < TotalCallCount) { - var connection = await connectionListener.AcceptAsync().DefaultTimeout(); - await connection.DisposeAsync(); + _ = await connectionListener.AcceptAsync(); currentCallCount++; @@ -111,7 +110,7 @@ public async Task AcceptAsync_ParallelConnections_ClientConnectionsSuccessfullyA await clientStream.ConnectAsync(); await clientStream.WriteAsync(new byte[1]); - await clientStream.ReadAsync(new byte[1]); + await clientStream.DisposeAsync(); clientStreamCount++; } catch (IOException ex) From a4b467e516b2b7069ca73f9fd01353caf991cba9 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Wed, 25 Jan 2023 20:52:16 +0800 Subject: [PATCH 3/5] Clean up --- .../src/Internal/NamedPipeConnectionListener.cs | 6 ++---- .../test/NamedPipeConnectionListenerTests.cs | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs b/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs index ae6a15bee57e..c766b2d856eb 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs +++ b/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs @@ -65,13 +65,13 @@ public void Start() // Start first stream inline to catch creation errors. var initialStream = CreateServerStream(); - _listeningTasks[i] = Task.Run(() => StartAsync(initialStream, i)); + _listeningTasks[i] = Task.Run(() => StartAsync(initialStream)); } } public EndPoint EndPoint => _endpoint; - private async Task StartAsync(NamedPipeServerStream nextStream, int index) + private async Task StartAsync(NamedPipeServerStream nextStream) { try { @@ -83,8 +83,6 @@ private async Task StartAsync(NamedPipeServerStream nextStream, int index) await stream.WaitForConnectionAsync(_listeningToken); - _log.LogInformation("Connection accepted on " + index); - var connection = new NamedPipeConnection(stream, _endpoint, _log, _memoryPool, _inputOptions, _outputOptions); connection.Start(); diff --git a/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs b/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs index 1c33e08a71c7..95da6c85ef7b 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs +++ b/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs @@ -78,7 +78,7 @@ public async Task AcceptAsync_ParallelConnections_ClientConnectionsSuccessfullyA var currentCallCount = 0; var options = new NamedPipeTransportOptions(); - options.AcceptQueueCount = 1; + options.AcceptQueueCount = 16; await using var connectionListener = await NamedPipeTestHelpers.CreateConnectionListenerFactory(LoggerFactory, options: options); // Act From 28a46b745fd566554e738ea1255ed700b9dda592 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Wed, 25 Jan 2023 21:29:38 +0800 Subject: [PATCH 4/5] Update --- .../src/Internal/NamedPipeConnectionListener.cs | 2 +- .../Transport.NamedPipes/src/NamedPipeTransportOptions.cs | 7 +++++-- .../Transport.NamedPipes/src/PublicAPI.Unshipped.txt | 4 ++-- .../test/NamedPipeConnectionListenerTests.cs | 2 +- 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs b/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs index c766b2d856eb..928aeea09030 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs +++ b/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs @@ -58,7 +58,7 @@ public void Start() { Debug.Assert(_listeningTasks == null, "Already started"); - _listeningTasks = new Task[_options.AcceptQueueCount]; + _listeningTasks = new Task[_options.ListenerQueueCount]; for (var i = 0; i < _listeningTasks.Length; i++) { diff --git a/src/Servers/Kestrel/Transport.NamedPipes/src/NamedPipeTransportOptions.cs b/src/Servers/Kestrel/Transport.NamedPipes/src/NamedPipeTransportOptions.cs index 53957af07fb4..612ef6256b68 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/src/NamedPipeTransportOptions.cs +++ b/src/Servers/Kestrel/Transport.NamedPipes/src/NamedPipeTransportOptions.cs @@ -12,9 +12,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes; public sealed class NamedPipeTransportOptions { /// - /// + /// The number of listener queues used to accept name pipe connections. /// - public int AcceptQueueCount { get; set; } = Math.Min(Environment.ProcessorCount, 16); + /// + /// Defaults to rounded down and clamped between 1 and 16. + /// + public int ListenerQueueCount { get; set; } = Math.Min(Environment.ProcessorCount, 16); /// /// Gets or sets the maximum unconsumed incoming bytes the transport will buffer. diff --git a/src/Servers/Kestrel/Transport.NamedPipes/src/PublicAPI.Unshipped.txt b/src/Servers/Kestrel/Transport.NamedPipes/src/PublicAPI.Unshipped.txt index 5657b929f27e..95024ad9a9e5 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/src/PublicAPI.Unshipped.txt +++ b/src/Servers/Kestrel/Transport.NamedPipes/src/PublicAPI.Unshipped.txt @@ -1,10 +1,10 @@ #nullable enable Microsoft.AspNetCore.Hosting.WebHostBuilderNamedPipeExtensions Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions -Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.AcceptQueueCount.get -> int -Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.AcceptQueueCount.set -> void Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.CurrentUserOnly.get -> bool Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.CurrentUserOnly.set -> void +Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.ListenerQueueCount.get -> int +Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.ListenerQueueCount.set -> void Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.MaxReadBufferSize.get -> long? Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.MaxReadBufferSize.set -> void Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.MaxWriteBufferSize.get -> long? diff --git a/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs b/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs index 95da6c85ef7b..262c142fd23f 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs +++ b/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs @@ -78,7 +78,7 @@ public async Task AcceptAsync_ParallelConnections_ClientConnectionsSuccessfullyA var currentCallCount = 0; var options = new NamedPipeTransportOptions(); - options.AcceptQueueCount = 16; + options.ListenerQueueCount = 16; await using var connectionListener = await NamedPipeTestHelpers.CreateConnectionListenerFactory(LoggerFactory, options: options); // Act From d8ed70a6471a6917767d671b1907ac06aeb9e497 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Mon, 6 Feb 2023 09:31:25 +0800 Subject: [PATCH 5/5] PR feedback --- .../test/NamedPipeConnectionListenerTests.cs | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs b/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs index 262c142fd23f..4c2540f81f13 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs +++ b/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs @@ -68,17 +68,20 @@ public async Task AcceptAsync_UnbindAfterCall_CleanExitAndLog() Assert.Contains(LogMessages, m => m.EventId.Name == "ConnectionListenerAborted"); } - [Fact] - public async Task AcceptAsync_ParallelConnections_ClientConnectionsSuccessfullyAccepted() + [Theory] + [InlineData(1)] + [InlineData(4)] + [InlineData(16)] + public async Task AcceptAsync_ParallelConnections_ClientConnectionsSuccessfullyAccepted(int listenerQueueCount) { // Arrange const int ParallelCount = 10; - const int ParallelCallCount = 5000; + const int ParallelCallCount = 250; const int TotalCallCount = ParallelCount * ParallelCallCount; var currentCallCount = 0; var options = new NamedPipeTransportOptions(); - options.ListenerQueueCount = 16; + options.ListenerQueueCount = listenerQueueCount; await using var connectionListener = await NamedPipeTestHelpers.CreateConnectionListenerFactory(LoggerFactory, options: options); // Act @@ -96,6 +99,7 @@ public async Task AcceptAsync_ParallelConnections_ClientConnectionsSuccessfullyA Logger.LogInformation($"Server task complete."); }); + var cts = new CancellationTokenSource(); var parallelTasks = new List(); for (var i = 0; i < ParallelCount; i++) { @@ -107,9 +111,9 @@ public async Task AcceptAsync_ParallelConnections_ClientConnectionsSuccessfullyA try { var clientStream = NamedPipeTestHelpers.CreateClientStream(connectionListener.EndPoint); - await clientStream.ConnectAsync(); + await clientStream.ConnectAsync(cts.Token); - await clientStream.WriteAsync(new byte[1]); + await clientStream.WriteAsync(new byte[1], cts.Token); await clientStream.DisposeAsync(); clientStreamCount++; } @@ -117,11 +121,18 @@ public async Task AcceptAsync_ParallelConnections_ClientConnectionsSuccessfullyA { Logger.LogInformation(ex, "Client exception."); } + catch (OperationCanceledException) + { + break; + } } })); } await serverTask.DefaultTimeout(); + + cts.Cancel(); + await Task.WhenAll(parallelTasks).DefaultTimeout(); } [ConditionalFact]