Skip to content

Commit cb8d177

Browse files
BrennanConroyjaviercn
authored andcommitted
ConcurrencyLimiter implementation (#387)
1 parent bff27c9 commit cb8d177

13 files changed

+744
-61
lines changed

src/RateLimiting/src/AggregatedRateLimiter.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
namespace System.Threading.RateLimiting
99
{
10+
#pragma warning disable 1591
1011
// Represent an aggregated resource (e.g. a resource limiter aggregated by IP)
1112
public abstract class AggregatedRateLimiter<TKey>
1213
{
@@ -19,4 +20,5 @@ public abstract class AggregatedRateLimiter<TKey>
1920
// Wait until the requested resources are available
2021
public abstract ValueTask<RateLimitLease> WaitAsync(TKey resourceID, int requestedCount, CancellationToken cancellationToken = default);
2122
}
23+
#pragma warning restore
2224
}

src/RateLimiting/src/ConcurrencyLimiter.cs

Lines changed: 118 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -4,125 +4,185 @@
44
// Pending dotnet API review
55

66
using System.Collections.Generic;
7+
using System.Diagnostics;
8+
using System.Diagnostics.CodeAnalysis;
79
using System.Threading.Tasks;
810

911
namespace System.Threading.RateLimiting
1012
{
13+
/// <summary>
14+
/// <see cref="RateLimiter"/> implementation that helps manage concurrent access to a resource.
15+
/// </summary>
1116
public sealed class ConcurrencyLimiter : RateLimiter
1217
{
1318
private int _permitCount;
1419
private int _queueCount;
1520

16-
private readonly object _lock = new object();
1721
private readonly ConcurrencyLimiterOptions _options;
1822
private readonly Deque<RequestRegistration> _queue = new Deque<RequestRegistration>();
1923

2024
private static readonly ConcurrencyLease SuccessfulLease = new ConcurrencyLease(true, null, 0);
2125
private static readonly ConcurrencyLease FailedLease = new ConcurrencyLease(false, null, 0);
26+
private static readonly ConcurrencyLease QueueLimitLease = new ConcurrencyLease(false, null, 0, "Queue limit reached");
2227

28+
// Use the queue as the lock field so we don't need to allocate another object for a lock and have another field in the object
29+
private object Lock => _queue;
30+
31+
/// <summary>
32+
/// Initializes the <see cref="ConcurrencyLimiter"/>.
33+
/// </summary>
34+
/// <param name="options">Options to specify the behavior of the <see cref="ConcurrencyLimiter"/>.</param>
2335
public ConcurrencyLimiter(ConcurrencyLimiterOptions options)
2436
{
2537
_options = options;
2638
_permitCount = _options.PermitLimit;
2739
}
2840

41+
/// <inheritdoc/>
2942
public override int GetAvailablePermits() => _permitCount;
3043

44+
/// <inheritdoc/>
3145
protected override RateLimitLease AcquireCore(int permitCount)
3246
{
3347
// These amounts of resources can never be acquired
3448
if (permitCount > _options.PermitLimit)
3549
{
36-
throw new InvalidOperationException($"{permitCount} permits exceeds the permit limit of {_options.PermitLimit}.");
50+
throw new ArgumentOutOfRangeException(nameof(permitCount), $"{permitCount} permits exceeds the permit limit of {_options.PermitLimit}.");
3751
}
3852

39-
// Return SuccessfulAcquisition or FailedAcquisition depending to indicate limiter state
53+
// Return SuccessfulLease or FailedLease to indicate limiter state
4054
if (permitCount == 0)
4155
{
42-
return GetAvailablePermits() > 0 ? SuccessfulLease : FailedLease;
56+
return _permitCount > 0 ? SuccessfulLease : FailedLease;
4357
}
4458

4559
// Perf: Check SemaphoreSlim implementation instead of locking
46-
if (GetAvailablePermits() >= permitCount)
60+
if (_permitCount >= permitCount)
4761
{
48-
lock (_lock)
62+
lock (Lock)
4963
{
50-
if (GetAvailablePermits() >= permitCount)
64+
if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease))
5165
{
52-
_permitCount -= permitCount;
53-
return new ConcurrencyLease(true, this, permitCount);
66+
return lease;
5467
}
5568
}
5669
}
5770

5871
return FailedLease;
5972
}
6073

74+
/// <inheritdoc/>
6175
protected override ValueTask<RateLimitLease> WaitAsyncCore(int permitCount, CancellationToken cancellationToken = default)
6276
{
77+
cancellationToken.ThrowIfCancellationRequested();
78+
6379
// These amounts of resources can never be acquired
64-
if (permitCount < 0 || permitCount > _options.PermitLimit)
80+
if (permitCount > _options.PermitLimit)
6581
{
66-
throw new ArgumentOutOfRangeException();
82+
throw new ArgumentOutOfRangeException(nameof(permitCount), $"{permitCount} permits exceeds the permit limit of {_options.PermitLimit}.");
6783
}
6884

6985
// Return SuccessfulAcquisition if requestedCount is 0 and resources are available
70-
if (permitCount == 0 && GetAvailablePermits() > 0)
86+
if (permitCount == 0 && _permitCount > 0)
7187
{
72-
// Perf: static failed/successful value tasks?
7388
return new ValueTask<RateLimitLease>(SuccessfulLease);
7489
}
7590

7691
// Perf: Check SemaphoreSlim implementation instead of locking
77-
lock (_lock) // Check lock check
92+
lock (Lock)
7893
{
79-
if (GetAvailablePermits() >= permitCount)
94+
if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease))
8095
{
81-
_permitCount -= permitCount;
82-
return new ValueTask<RateLimitLease>(new ConcurrencyLease(true, this, permitCount));
96+
return new ValueTask<RateLimitLease>(lease);
8397
}
8498

8599
// Don't queue if queue limit reached
86100
if (_queueCount + permitCount > _options.QueueLimit)
87101
{
88102
// Perf: static failed/successful value tasks?
89-
return new ValueTask<RateLimitLease>(FailedLease);
103+
return new ValueTask<RateLimitLease>(QueueLimitLease);
104+
}
105+
106+
TaskCompletionSource<RateLimitLease> tcs = new TaskCompletionSource<RateLimitLease>(TaskCreationOptions.RunContinuationsAsynchronously);
107+
CancellationTokenRegistration ctr;
108+
if (cancellationToken.CanBeCanceled)
109+
{
110+
ctr = cancellationToken.Register(obj =>
111+
{
112+
((TaskCompletionSource<RateLimitLease>)obj).TrySetException(new OperationCanceledException(cancellationToken));
113+
}, tcs);
90114
}
91115

92-
var request = new RequestRegistration(permitCount);
116+
RequestRegistration request = new RequestRegistration(permitCount, tcs, ctr);
93117
_queue.EnqueueTail(request);
94118
_queueCount += permitCount;
119+
Debug.Assert(_queueCount <= _options.QueueLimit);
95120

96-
// TODO: handle cancellation
97-
return new ValueTask<RateLimitLease>(request.TCS.Task);
121+
return new ValueTask<RateLimitLease>(request.Tcs.Task);
98122
}
99123
}
100124

125+
private bool TryLeaseUnsynchronized(int permitCount, [NotNullWhen(true)] out RateLimitLease? lease)
126+
{
127+
// if permitCount is 0 we want to queue it if there are no available permits
128+
if (_permitCount >= permitCount && _permitCount != 0)
129+
{
130+
if (permitCount == 0)
131+
{
132+
// Edge case where the check before the lock showed 0 available permits but when we got the lock some permits were now available
133+
lease = SuccessfulLease;
134+
return true;
135+
}
136+
137+
// a. if there are no items queued we can lease
138+
// b. if there are items queued but the processing order is newest first, then we can lease the incoming request since it is the newest
139+
if (_queueCount == 0 || (_queueCount > 0 && _options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst))
140+
{
141+
_permitCount -= permitCount;
142+
Debug.Assert(_permitCount >= 0);
143+
lease = new ConcurrencyLease(true, this, permitCount);
144+
return true;
145+
}
146+
}
147+
148+
lease = null;
149+
return false;
150+
}
151+
101152
private void Release(int releaseCount)
102153
{
103-
lock (_lock) // Check lock check
154+
lock (Lock)
104155
{
105156
_permitCount += releaseCount;
157+
Debug.Assert(_permitCount <= _options.PermitLimit);
106158

107159
while (_queue.Count > 0)
108160
{
109-
var nextPendingRequest =
161+
RequestRegistration nextPendingRequest =
110162
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
111163
? _queue.PeekHead()
112164
: _queue.PeekTail();
113165

114-
if (GetAvailablePermits() >= nextPendingRequest.Count)
166+
if (_permitCount >= nextPendingRequest.Count)
115167
{
116-
var request =
168+
nextPendingRequest =
117169
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
118170
? _queue.DequeueHead()
119171
: _queue.DequeueTail();
120172

121-
_permitCount -= request.Count;
122-
_queueCount -= request.Count;
123-
124-
// requestToFulfill == request
125-
request.TCS.SetResult(new ConcurrencyLease(true, this, request.Count));
173+
_permitCount -= nextPendingRequest.Count;
174+
_queueCount -= nextPendingRequest.Count;
175+
Debug.Assert(_queueCount >= 0);
176+
Debug.Assert(_permitCount >= 0);
177+
178+
ConcurrencyLease lease = nextPendingRequest.Count == 0 ? SuccessfulLease : new ConcurrencyLease(true, this, nextPendingRequest.Count);
179+
// Check if request was canceled
180+
if (!nextPendingRequest.Tcs.TrySetResult(lease))
181+
{
182+
// Queued item was canceled so add count back
183+
_permitCount += nextPendingRequest.Count;
184+
}
185+
nextPendingRequest.CancellationTokenRegistration.Dispose();
126186
}
127187
else
128188
{
@@ -134,25 +194,41 @@ private void Release(int releaseCount)
134194

135195
private class ConcurrencyLease : RateLimitLease
136196
{
137-
private static readonly IEnumerable<string> Empty = new string[0];
138-
139197
private bool _disposed;
140198
private readonly ConcurrencyLimiter? _limiter;
141199
private readonly int _count;
200+
private readonly string? _reason;
142201

143-
public ConcurrencyLease(bool isAcquired, ConcurrencyLimiter? limiter, int count)
202+
public ConcurrencyLease(bool isAcquired, ConcurrencyLimiter? limiter, int count, string? reason = null)
144203
{
145204
IsAcquired = isAcquired;
146205
_limiter = limiter;
147206
_count = count;
207+
_reason = reason;
208+
209+
// No need to set the limiter if count is 0, Dispose will noop
210+
Debug.Assert(count == 0 ? limiter is null : true);
148211
}
149212

150213
public override bool IsAcquired { get; }
151214

152-
public override IEnumerable<string> MetadataNames => Empty;
215+
public override IEnumerable<string> MetadataNames => Enumerable();
216+
217+
private IEnumerable<string> Enumerable()
218+
{
219+
if (_reason is not null)
220+
{
221+
yield return MetadataName.ReasonPhrase.Name;
222+
}
223+
}
153224

154225
public override bool TryGetMetadata(string metadataName, out object? metadata)
155226
{
227+
if (_reason is not null && metadataName == MetadataName.ReasonPhrase.Name)
228+
{
229+
metadata = _reason;
230+
return true;
231+
}
156232
metadata = default;
157233
return false;
158234
}
@@ -170,18 +246,22 @@ protected override void Dispose(bool disposing)
170246
}
171247
}
172248

173-
private struct RequestRegistration
249+
private readonly struct RequestRegistration
174250
{
175-
public RequestRegistration(int requestedCount)
251+
public RequestRegistration(int requestedCount, TaskCompletionSource<RateLimitLease> tcs,
252+
CancellationTokenRegistration cancellationTokenRegistration)
176253
{
177254
Count = requestedCount;
178255
// Perf: Use AsyncOperation<TResult> instead
179-
TCS = new TaskCompletionSource<RateLimitLease>();
256+
Tcs = tcs;
257+
CancellationTokenRegistration = cancellationTokenRegistration;
180258
}
181259

182260
public int Count { get; }
183261

184-
public TaskCompletionSource<RateLimitLease> TCS { get; }
262+
public TaskCompletionSource<RateLimitLease> Tcs { get; }
263+
264+
public CancellationTokenRegistration CancellationTokenRegistration { get; }
185265
}
186266
}
187267
}

src/RateLimiting/src/ConcurrencyLimiterOptions.cs

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,49 @@
55

66
namespace System.Threading.RateLimiting
77
{
8-
sealed public class ConcurrencyLimiterOptions
8+
/// <summary>
9+
/// Options to specify the behavior of a <see cref="ConcurrencyLimiter"/>.
10+
/// </summary>
11+
public sealed class ConcurrencyLimiterOptions
912
{
13+
/// <summary>
14+
/// Initializes the <see cref="ConcurrencyLimiterOptions"/>.
15+
/// </summary>
16+
/// <param name="permitLimit">Maximum number of permits that can be leased concurrently.</param>
17+
/// <param name="queueProcessingOrder">Determines the behaviour of <see cref="RateLimiter.WaitAsync"/> when not enough resources can be leased.</param>
18+
/// <param name="queueLimit">Maximum number of permits that can be queued concurrently.</param>
19+
/// <exception cref="ArgumentOutOfRangeException">When <paramref name="permitLimit"/> or <paramref name="queueLimit"/> are less than 0.</exception>
1020
public ConcurrencyLimiterOptions(int permitLimit, QueueProcessingOrder queueProcessingOrder, int queueLimit)
1121
{
22+
if (permitLimit < 0)
23+
{
24+
throw new ArgumentOutOfRangeException(nameof(permitLimit));
25+
}
26+
if (queueLimit < 0)
27+
{
28+
throw new ArgumentOutOfRangeException(nameof(queueLimit));
29+
}
1230
PermitLimit = permitLimit;
1331
QueueProcessingOrder = queueProcessingOrder;
1432
QueueLimit = queueLimit;
1533
}
1634

17-
// Maximum number of permits allowed to be leased
35+
/// <summary>
36+
/// Maximum number of permits that can be leased concurrently.
37+
/// </summary>
1838
public int PermitLimit { get; }
1939

20-
// Behaviour of WaitAsync when not enough resources can be leased
21-
public QueueProcessingOrder QueueProcessingOrder { get; }
40+
/// <summary>
41+
/// Determines the behaviour of <see cref="RateLimiter.WaitAsync"/> when not enough resources can be leased.
42+
/// </summary>
43+
/// <value>
44+
/// <see cref="QueueProcessingOrder.OldestFirst"/> by default.
45+
/// </value>
46+
public QueueProcessingOrder QueueProcessingOrder { get; } = QueueProcessingOrder.OldestFirst;
2247

23-
// Maximum cumulative permit count of queued acquisition requests
48+
/// <summary>
49+
/// Maximum number of permits that can be queued concurrently.
50+
/// </summary>
2451
public int QueueLimit { get; }
2552
}
2653
}

0 commit comments

Comments
 (0)