Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("System.Threading.RateLimiting.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001004b86c4cb78549b34bab61a3b1800e23bfeb5b3ec390074041536a7e3cbd97f5f04cf0f857155a8928eaa29ebfd11cfbbad3ba70efea7bda3226c6a8d370a4cd303f714486b6ebc225985a638471e6ef571cc92a4613c00b8fa65d61ccee0cbe5f36330c9a01f4183559f1bef24cc2917c6d913e3a541333a1d05d9bed22b38cb")]
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ protected override ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, C
return new ValueTask<RateLimitLease>(SuccessfulLease);
}

using var disposer = default(RequestRegistration.Disposer);

// Perf: Check SemaphoreSlim implementation instead of locking
lock (Lock)
{
Expand All @@ -150,18 +152,13 @@ protected override ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, C
do
{
RequestRegistration oldestRequest = _queue.DequeueHead();
_queueCount -= oldestRequest.Count;
Debug.Assert(_queueCount >= 0);
if (!oldestRequest.Tcs.TrySetResult(FailedLease))
{
// Updating queue count is handled by the cancellation code
_queueCount += oldestRequest.Count;
}
else
if (oldestRequest.TrySetResult(FailedLease))
{
// Updating queue count is handled by the cancellation/cleanup code
Interlocked.Increment(ref _failedLeasesCount);
}
oldestRequest.CancellationTokenRegistration.Dispose();
disposer.CleanupAndAdd(oldestRequest);
Debug.Assert(_queueCount >= 0);
}
while (_options.QueueLimit - _queueCount < permitCount);
}
Expand All @@ -173,22 +170,12 @@ protected override ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, C
}
}

CancelQueueState tcs = new CancelQueueState(permitCount, this, cancellationToken);
CancellationTokenRegistration ctr = default;
if (cancellationToken.CanBeCanceled)
{
ctr = cancellationToken.Register(static obj =>
{
((CancelQueueState)obj!).TrySetCanceled();
}, tcs);
}

RequestRegistration request = new RequestRegistration(permitCount, tcs, ctr);
var request = new RequestRegistration(permitCount, this, cancellationToken);
_queue.EnqueueTail(request);
_queueCount += permitCount;
Debug.Assert(_queueCount <= _options.QueueLimit);

return new ValueTask<RateLimitLease>(request.Tcs.Task);
return new ValueTask<RateLimitLease>(request.Task);
}
}

Expand Down Expand Up @@ -224,8 +211,15 @@ private bool TryLeaseUnsynchronized(int permitCount, [NotNullWhen(true)] out Rat
return false;
}

#if DEBUG
// for unit testing
internal event Action? ReleasePreHook;
internal event Action? ReleasePostHook;
#endif

private void Release(int releaseCount)
{
using var disposer = default(RequestRegistration.Disposer);
lock (Lock)
{
if (_disposed)
Expand All @@ -236,6 +230,10 @@ private void Release(int releaseCount)
_permitCount += releaseCount;
Debug.Assert(_permitCount <= _options.PermitLimit);

#if DEBUG
ReleasePreHook?.Invoke();
#endif

while (_queue.Count > 0)
{
RequestRegistration nextPendingRequest =
Expand All @@ -245,39 +243,43 @@ private void Release(int releaseCount)

// Request was handled already, either via cancellation or being kicked from the queue due to a newer request being queued.
// We just need to remove the item and let the next queued item be considered for completion.
if (nextPendingRequest.Tcs.Task.IsCompleted)
if (nextPendingRequest.Task.IsCompleted)
{
nextPendingRequest =
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? _queue.DequeueHead()
: _queue.DequeueTail();
nextPendingRequest.CancellationTokenRegistration.Dispose();
disposer.CleanupAndAdd(nextPendingRequest);
continue;
}
else if (_permitCount >= nextPendingRequest.Count)

#if DEBUG
ReleasePostHook?.Invoke();
#endif

if (_permitCount >= nextPendingRequest.Count)
{
nextPendingRequest =
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? _queue.DequeueHead()
: _queue.DequeueTail();

// Updating queue count is handled by the cancellation/cleanup code
_permitCount -= nextPendingRequest.Count;
_queueCount -= nextPendingRequest.Count;
Debug.Assert(_permitCount >= 0);

ConcurrencyLease lease = nextPendingRequest.Count == 0 ? SuccessfulLease : new ConcurrencyLease(true, this, nextPendingRequest.Count);
// Check if request was canceled
if (!nextPendingRequest.Tcs.TrySetResult(lease))
if (!nextPendingRequest.TrySetResult(lease))
{
// Queued item was canceled so add count back
_permitCount += nextPendingRequest.Count;
// Updating queue count is handled by the cancellation code
_queueCount += nextPendingRequest.Count;
}
else
{
Interlocked.Increment(ref _successfulLeasesCount);
}
nextPendingRequest.CancellationTokenRegistration.Dispose();
disposer.CleanupAndAdd(nextPendingRequest);
Debug.Assert(_queueCount >= 0);
}
else
Expand All @@ -302,6 +304,7 @@ protected override void Dispose(bool disposing)
return;
}

using var disposer = default(RequestRegistration.Disposer);
lock (Lock)
{
if (_disposed)
Expand All @@ -314,9 +317,10 @@ protected override void Dispose(bool disposing)
RequestRegistration next = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? _queue.DequeueHead()
: _queue.DequeueTail();
next.CancellationTokenRegistration.Dispose();
next.Tcs.TrySetResult(FailedLease);
disposer.CleanupAndAdd(next);
next.TrySetResult(FailedLease);
}
Debug.Assert(_queueCount == 0);
}
}

Expand Down Expand Up @@ -383,49 +387,78 @@ protected override void Dispose(bool disposing)
}
}

private readonly struct RequestRegistration
private sealed class RequestRegistration : TaskCompletionSource<RateLimitLease>
{
public RequestRegistration(int requestedCount, TaskCompletionSource<RateLimitLease> tcs,
CancellationTokenRegistration cancellationTokenRegistration)
{
Count = requestedCount;
// Perf: Use AsyncOperation<TResult> instead
Tcs = tcs;
CancellationTokenRegistration = cancellationTokenRegistration;
}
private readonly CancellationToken _cancellationToken;
private CancellationTokenRegistration _cancellationTokenRegistration;

public int Count { get; }
// this field is used only by the disposal mechanics and never shared between threads
private RequestRegistration? _next;

public TaskCompletionSource<RateLimitLease> Tcs { get; }
public RequestRegistration(int permitCount, ConcurrencyLimiter limiter, CancellationToken cancellationToken)
: base(limiter, TaskCreationOptions.RunContinuationsAsynchronously)
{
Count = permitCount;
_cancellationToken = cancellationToken;

public CancellationTokenRegistration CancellationTokenRegistration { get; }
}
// RequestRegistration objects are created while the limiter lock is held
// if cancellationToken fires before or while the lock is held, UnsafeRegister
// is going to invoke the callback synchronously, but this does not create
// a deadlock because lock are reentrant
if (cancellationToken.CanBeCanceled)
#if NETCOREAPP || NETSTANDARD2_1_OR_GREATER
_cancellationTokenRegistration = cancellationToken.UnsafeRegister(Cancel, this);
#else
_cancellationTokenRegistration = cancellationToken.Register(Cancel, this);
#endif
}

private sealed class CancelQueueState : TaskCompletionSource<RateLimitLease>
{
private readonly int _permitCount;
private readonly ConcurrencyLimiter _limiter;
private readonly CancellationToken _cancellationToken;
/// <remarks>
/// This property is only accessed under limiter lock.
/// </remarks>
public int Count { get; private set; }

public CancelQueueState(int permitCount, ConcurrencyLimiter limiter, CancellationToken cancellationToken)
: base(TaskCreationOptions.RunContinuationsAsynchronously)
private static void Cancel(object? state)
{
_permitCount = permitCount;
_limiter = limiter;
_cancellationToken = cancellationToken;
if (state is RequestRegistration registration && registration.TrySetCanceled(registration._cancellationToken))
{
registration.Cleanup();
}
}

private void Cleanup()
{
var limiter = (ConcurrencyLimiter)Task.AsyncState!;
lock (limiter.Lock)
{
limiter._queueCount -= Count;
Count = 0;
}
}

public new bool TrySetCanceled()
/// <summary>
/// Collects registrations to dispose outside the limiter lock to avoid deadlock.
/// </summary>
public struct Disposer : IDisposable
{
if (TrySetCanceled(_cancellationToken))
private RequestRegistration? _next;

public void CleanupAndAdd(RequestRegistration request)
{
request.Cleanup();
request._next = _next;
_next = request;
}

public void Dispose()
{
lock (_limiter.Lock)
for (var current = _next; current is not null; current = current._next)
{
_limiter._queueCount -= _permitCount;
current._cancellationTokenRegistration.Dispose();
}
return true;

_next = null;
}
return false;
}
}
}
Expand Down
Loading