2
2
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3
3
4
4
using System ;
5
+ using System . Diagnostics ;
5
6
using System . Net . Quic ;
6
7
using System . Threading ;
7
8
using System . Threading . Tasks ;
@@ -14,6 +15,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Quic.Internal
14
15
{
15
16
internal class QuicConnectionContext : TransportMultiplexedConnection , IProtocolErrorCodeFeature
16
17
{
18
+ // Internal for testing.
19
+ internal QuicStreamStack StreamPool ;
20
+
21
+ private bool _streamPoolHeartbeatInitialized ;
22
+ // Ticks updated once per-second in heartbeat event.
23
+ private long _heartbeatTicks ;
24
+ private readonly object _poolLock = new object ( ) ;
25
+
17
26
private readonly QuicConnection _connection ;
18
27
private readonly QuicTransportContext _context ;
19
28
private readonly IQuicTrace _log ;
@@ -23,6 +32,10 @@ internal class QuicConnectionContext : TransportMultiplexedConnection, IProtocol
23
32
24
33
public long Error { get ; set ; }
25
34
35
+ internal const int InitialStreamPoolSize = 5 ;
36
+ internal const int MaxStreamPoolSize = 100 ;
37
+ internal const long StreamPoolExpiryTicks = TimeSpan . TicksPerSecond * 5 ;
38
+
26
39
public QuicConnectionContext ( QuicConnection connection , QuicTransportContext context )
27
40
{
28
41
_log = context . Log ;
@@ -31,6 +44,8 @@ public QuicConnectionContext(QuicConnection connection, QuicTransportContext con
31
44
ConnectionClosed = _connectionClosedTokenSource . Token ;
32
45
Features . Set < ITlsConnectionFeature > ( new FakeTlsConnectionFeature ( ) ) ;
33
46
Features . Set < IProtocolErrorCodeFeature > ( this ) ;
47
+
48
+ StreamPool = new QuicStreamStack ( InitialStreamPoolSize ) ;
34
49
}
35
50
36
51
public override async ValueTask DisposeAsync ( )
@@ -62,7 +77,25 @@ public override void Abort(ConnectionAbortedException abortReason)
62
77
try
63
78
{
64
79
var stream = await _connection . AcceptStreamAsync ( cancellationToken ) ;
65
- var context = new QuicStreamContext ( stream , this , _context ) ;
80
+
81
+ QuicStreamContext ? context = null ;
82
+
83
+ // Only use pool for bidirectional streams. Just a handful of unidirecitonal
84
+ // streams are created for a connection and they live for the lifetime of the connection.
85
+ if ( stream . CanRead && stream . CanWrite )
86
+ {
87
+ lock ( _poolLock )
88
+ {
89
+ StreamPool . TryPop ( out context ) ;
90
+ }
91
+ }
92
+
93
+ if ( context == null )
94
+ {
95
+ context = new QuicStreamContext ( this , _context ) ;
96
+ }
97
+
98
+ context . Initialize ( stream ) ;
66
99
context . Start ( ) ;
67
100
68
101
_log . AcceptedStream ( context ) ;
@@ -124,12 +157,61 @@ public override ValueTask<ConnectionContext> ConnectAsync(IFeatureCollection? fe
124
157
quicStream = _connection . OpenBidirectionalStream ( ) ;
125
158
}
126
159
127
- var context = new QuicStreamContext ( quicStream , this , _context ) ;
160
+ // Only a handful of control streams are created by the server and they last for the
161
+ // lifetime of the connection. No value in pooling them.
162
+ QuicStreamContext ? context = new QuicStreamContext ( this , _context ) ;
163
+ context . Initialize ( quicStream ) ;
128
164
context . Start ( ) ;
129
165
130
166
_log . ConnectedStream ( context ) ;
131
167
132
168
return new ValueTask < ConnectionContext > ( context ) ;
133
169
}
170
+
171
+ internal bool TryReturnStream ( QuicStreamContext stream )
172
+ {
173
+ lock ( _poolLock )
174
+ {
175
+ if ( ! _streamPoolHeartbeatInitialized )
176
+ {
177
+ // Heartbeat feature is added to connection features by Kestrel.
178
+ // No event is on the context is raised between feature being added and serving
179
+ // connections so initialize heartbeat the first time a stream is added to
180
+ // the connection's stream pool.
181
+ var heartbeatFeature = Features . Get < IConnectionHeartbeatFeature > ( ) ;
182
+ if ( heartbeatFeature != null )
183
+ {
184
+ heartbeatFeature . OnHeartbeat ( static state => ( ( QuicConnectionContext ) state ) . RemoveExpiredStreams ( ) , this ) ;
185
+ }
186
+
187
+ // Set ticks for the first time. Ticks are then updated in heartbeat.
188
+ var now = _context . Options . SystemClock . UtcNow . Ticks ;
189
+ Volatile . Write ( ref _heartbeatTicks , now ) ;
190
+
191
+ _streamPoolHeartbeatInitialized = true ;
192
+ }
193
+
194
+ if ( stream . CanReuse && StreamPool . Count < MaxStreamPoolSize )
195
+ {
196
+ stream . PoolExpirationTicks = Volatile . Read ( ref _heartbeatTicks ) + StreamPoolExpiryTicks ;
197
+ StreamPool . Push ( stream ) ;
198
+ return true ;
199
+ }
200
+ }
201
+
202
+ return false ;
203
+ }
204
+
205
+ private void RemoveExpiredStreams ( )
206
+ {
207
+ lock ( _poolLock )
208
+ {
209
+ // Update ticks on heartbeat. A precise value isn't necessary.
210
+ var now = _context . Options . SystemClock . UtcNow . Ticks ;
211
+ Volatile . Write ( ref _heartbeatTicks , now ) ;
212
+
213
+ StreamPool . RemoveExpired ( now ) ;
214
+ }
215
+ }
134
216
}
135
217
}
0 commit comments