diff --git a/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs b/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs index 5f05a46300ca..928aeea09030 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs +++ b/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs @@ -26,7 +26,7 @@ internal sealed class NamedPipeConnectionListener : IConnectionListener private readonly PipeOptions _inputOptions; private readonly PipeOptions _outputOptions; private readonly Mutex _mutex; - private Task? _listeningTask; + private Task[]? _listeningTasks; private int _disposed; public NamedPipeConnectionListener( @@ -45,7 +45,7 @@ public NamedPipeConnectionListener( // The OS maintains a backlog of clients that are waiting to connect, so the app queue only stores a single connection. // We want to have a queue plus a background task that populates the queue, rather than creating NamedPipeServerStream // when AcceptAsync is called, so that the server is always the owner of the pipe name. - _acceptedQueue = Channel.CreateBounded(new BoundedChannelOptions(capacity: 1) { SingleWriter = true }); + _acceptedQueue = Channel.CreateBounded(new BoundedChannelOptions(capacity: 1)); var maxReadBufferSize = _options.MaxReadBufferSize ?? 0; var maxWriteBufferSize = _options.MaxWriteBufferSize ?? 0; @@ -56,12 +56,17 @@ public NamedPipeConnectionListener( public void Start() { - Debug.Assert(_listeningTask == null, "Already started"); + Debug.Assert(_listeningTasks == null, "Already started"); - // Start first stream inline to catch creation errors. - var initialStream = CreateServerStream(); + _listeningTasks = new Task[_options.ListenerQueueCount]; - _listeningTask = StartAsync(initialStream); + for (var i = 0; i < _listeningTasks.Length; i++) + { + // Start first stream inline to catch creation errors. + var initialStream = CreateServerStream(); + + _listeningTasks[i] = Task.Run(() => StartAsync(initialStream)); + } } public EndPoint EndPoint => _endpoint; @@ -182,9 +187,9 @@ public async ValueTask DisposeAsync() _listeningTokenSource.Dispose(); _mutex.Dispose(); - if (_listeningTask != null) + if (_listeningTasks != null) { - await _listeningTask; + await Task.WhenAll(_listeningTasks); } } } diff --git a/src/Servers/Kestrel/Transport.NamedPipes/src/NamedPipeTransportOptions.cs b/src/Servers/Kestrel/Transport.NamedPipes/src/NamedPipeTransportOptions.cs index 824c7ba74357..612ef6256b68 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/src/NamedPipeTransportOptions.cs +++ b/src/Servers/Kestrel/Transport.NamedPipes/src/NamedPipeTransportOptions.cs @@ -11,6 +11,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes; /// public sealed class NamedPipeTransportOptions { + /// + /// The number of listener queues used to accept name pipe connections. + /// + /// + /// Defaults to rounded down and clamped between 1 and 16. + /// + public int ListenerQueueCount { get; set; } = Math.Min(Environment.ProcessorCount, 16); + /// /// Gets or sets the maximum unconsumed incoming bytes the transport will buffer. /// diff --git a/src/Servers/Kestrel/Transport.NamedPipes/src/PublicAPI.Unshipped.txt b/src/Servers/Kestrel/Transport.NamedPipes/src/PublicAPI.Unshipped.txt index 74e0993c7a85..95024ad9a9e5 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/src/PublicAPI.Unshipped.txt +++ b/src/Servers/Kestrel/Transport.NamedPipes/src/PublicAPI.Unshipped.txt @@ -3,6 +3,8 @@ Microsoft.AspNetCore.Hosting.WebHostBuilderNamedPipeExtensions Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.CurrentUserOnly.get -> bool Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.CurrentUserOnly.set -> void +Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.ListenerQueueCount.get -> int +Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.ListenerQueueCount.set -> void Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.MaxReadBufferSize.get -> long? Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.MaxReadBufferSize.set -> void Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.MaxWriteBufferSize.get -> long? diff --git a/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs b/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs index f0ba91815179..4c2540f81f13 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs +++ b/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs @@ -68,6 +68,73 @@ public async Task AcceptAsync_UnbindAfterCall_CleanExitAndLog() Assert.Contains(LogMessages, m => m.EventId.Name == "ConnectionListenerAborted"); } + [Theory] + [InlineData(1)] + [InlineData(4)] + [InlineData(16)] + public async Task AcceptAsync_ParallelConnections_ClientConnectionsSuccessfullyAccepted(int listenerQueueCount) + { + // Arrange + const int ParallelCount = 10; + const int ParallelCallCount = 250; + const int TotalCallCount = ParallelCount * ParallelCallCount; + + var currentCallCount = 0; + var options = new NamedPipeTransportOptions(); + options.ListenerQueueCount = listenerQueueCount; + await using var connectionListener = await NamedPipeTestHelpers.CreateConnectionListenerFactory(LoggerFactory, options: options); + + // Act + var serverTask = Task.Run(async () => + { + while (currentCallCount < TotalCallCount) + { + _ = await connectionListener.AcceptAsync(); + + currentCallCount++; + + Logger.LogInformation($"Server accepted {currentCallCount} calls."); + } + + Logger.LogInformation($"Server task complete."); + }); + + var cts = new CancellationTokenSource(); + var parallelTasks = new List(); + for (var i = 0; i < ParallelCount; i++) + { + parallelTasks.Add(Task.Run(async () => + { + var clientStreamCount = 0; + while (clientStreamCount < ParallelCallCount) + { + try + { + var clientStream = NamedPipeTestHelpers.CreateClientStream(connectionListener.EndPoint); + await clientStream.ConnectAsync(cts.Token); + + await clientStream.WriteAsync(new byte[1], cts.Token); + await clientStream.DisposeAsync(); + clientStreamCount++; + } + catch (IOException ex) + { + Logger.LogInformation(ex, "Client exception."); + } + catch (OperationCanceledException) + { + break; + } + } + })); + } + + await serverTask.DefaultTimeout(); + + cts.Cancel(); + await Task.WhenAll(parallelTasks).DefaultTimeout(); + } + [ConditionalFact] [OSSkipCondition(OperatingSystems.Linux | OperatingSystems.MacOSX, SkipReason = "Non-OS implementations use UDS with an unlimited accept limit.")] public async Task AcceptAsync_HitBacklogLimit_ClientConnectionsSuccessfullyAccepted()