Skip to content

Reimplement the concurrency limiter middleware to use the new abstractions & implementations #39040

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jan 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections.Concurrent;
using System.Threading.RateLimiting;
using Microsoft.Extensions.Options;
using Limiter = System.Threading.RateLimiting.ConcurrencyLimiter;
using LimiterOptions = System.Threading.RateLimiting.ConcurrencyLimiterOptions;

namespace Microsoft.AspNetCore.ConcurrencyLimiter;

internal class BasePolicy : IQueuePolicy, IDisposable
{
private readonly Limiter _limiter;
private readonly ConcurrentQueue<RateLimitLease> _leases = new ConcurrentQueue<RateLimitLease>();

public int TotalRequests => _leases.Count;

public BasePolicy(IOptions<QueuePolicyOptions> options, QueueProcessingOrder order)
{
var queuePolicyOptions = options.Value;

var maxConcurrentRequests = queuePolicyOptions.MaxConcurrentRequests;
if (maxConcurrentRequests <= 0)
{
throw new ArgumentException("MaxConcurrentRequests must be a positive integer.", nameof(options));
}

var requestQueueLimit = queuePolicyOptions.RequestQueueLimit;
if (requestQueueLimit < 0)
{
throw new ArgumentException("The RequestQueueLimit cannot be a negative number.", nameof(options));
}

_limiter = new Limiter(new LimiterOptions(permitLimit: maxConcurrentRequests, order, queueLimit: requestQueueLimit));
}

public ValueTask<bool> TryEnterAsync()
{
// a return value of 'false' indicates that the request is rejected
// a return value of 'true' indicates that the request may proceed

var lease = _limiter.Acquire();
if (lease.IsAcquired)
{
_leases.Enqueue(lease);
return ValueTask.FromResult(true);
}

var task = _limiter.WaitAsync();
if (task.IsCompletedSuccessfully)
{
lease = task.Result;
if (lease.IsAcquired)
{
_leases.Enqueue(lease);
return ValueTask.FromResult(true);
}

return ValueTask.FromResult(false);
}

return Awaited(task);
}

public void OnExit()
{
if (!_leases.TryDequeue(out var lease))
{
throw new InvalidOperationException("No outstanding leases.");
}

lease.Dispose();
}

public void Dispose()
{
_limiter.Dispose();
}

private async ValueTask<bool> Awaited(ValueTask<RateLimitLease> task)
{
var lease = await task;

if (lease.IsAcquired)
{
_leases.Enqueue(lease);
return true;
}

return false;
}
}
Original file line number Diff line number Diff line change
@@ -1,79 +1,15 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Threading.RateLimiting;
using Microsoft.Extensions.Options;

namespace Microsoft.AspNetCore.ConcurrencyLimiter;

internal class QueuePolicy : IQueuePolicy, IDisposable
internal class QueuePolicy : BasePolicy
{
private readonly int _maxTotalRequest;
private readonly SemaphoreSlim _serverSemaphore;

private int _totalRequests;

public int TotalRequests => _totalRequests;

public QueuePolicy(IOptions<QueuePolicyOptions> options)
: base(options, QueueProcessingOrder.OldestFirst)
{
var queuePolicyOptions = options.Value;

var maxConcurrentRequests = queuePolicyOptions.MaxConcurrentRequests;
if (maxConcurrentRequests <= 0)
{
throw new ArgumentException("MaxConcurrentRequests must be a positive integer.", nameof(options));
}

var requestQueueLimit = queuePolicyOptions.RequestQueueLimit;
if (requestQueueLimit < 0)
{
throw new ArgumentException("The RequestQueueLimit cannot be a negative number.", nameof(options));
}

_serverSemaphore = new SemaphoreSlim(maxConcurrentRequests);

_maxTotalRequest = maxConcurrentRequests + requestQueueLimit;
}

public ValueTask<bool> TryEnterAsync()
{
// a return value of 'false' indicates that the request is rejected
// a return value of 'true' indicates that the request may proceed
// _serverSemaphore.Release is *not* called in this method, it is called externally when requests leave the server

int totalRequests = Interlocked.Increment(ref _totalRequests);

if (totalRequests > _maxTotalRequest)
{
Interlocked.Decrement(ref _totalRequests);
return new ValueTask<bool>(false);
}

Task task = _serverSemaphore.WaitAsync();
if (task.IsCompletedSuccessfully)
{
return new ValueTask<bool>(true);
}

return SemaphoreAwaited(task);
}

public void OnExit()
{
_serverSemaphore.Release();

Interlocked.Decrement(ref _totalRequests);
}

public void Dispose()
{
_serverSemaphore.Dispose();
}

private static async ValueTask<bool> SemaphoreAwaited(Task task)
{
await task;

return true;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,104 +1,15 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Threading.RateLimiting;
using Microsoft.Extensions.Options;

namespace Microsoft.AspNetCore.ConcurrencyLimiter;

internal class StackPolicy : IQueuePolicy
internal class StackPolicy : BasePolicy
{
private readonly List<ResettableBooleanCompletionSource> _buffer;
public ResettableBooleanCompletionSource? _cachedResettableTCS;

private readonly int _maxQueueCapacity;
private readonly int _maxConcurrentRequests;
private bool _hasReachedCapacity;
private int _head;
private int _queueLength;

private readonly object _bufferLock = new Object();

private int _freeServerSpots;

public StackPolicy(IOptions<QueuePolicyOptions> options)
: base(options, QueueProcessingOrder.NewestFirst)
{
_buffer = new List<ResettableBooleanCompletionSource>();
_maxQueueCapacity = options.Value.RequestQueueLimit;
_maxConcurrentRequests = options.Value.MaxConcurrentRequests;
_freeServerSpots = options.Value.MaxConcurrentRequests;
}

public ValueTask<bool> TryEnterAsync()
{
lock (_bufferLock)
{
if (_freeServerSpots > 0)
{
_freeServerSpots--;
return new ValueTask<bool>(true);
}

// if queue is full, cancel oldest request
if (_queueLength == _maxQueueCapacity)
{
_hasReachedCapacity = true;
_buffer[_head].Complete(false);
_queueLength--;
}

var tcs = _cachedResettableTCS ??= new ResettableBooleanCompletionSource(this);
_cachedResettableTCS = null;

if (_hasReachedCapacity || _queueLength < _buffer.Count)
{
_buffer[_head] = tcs;
}
else
{
_buffer.Add(tcs);
}
_queueLength++;

// increment _head for next time
_head++;
if (_head == _maxQueueCapacity)
{
_head = 0;
}

return tcs.GetValueTask();
}
}

public void OnExit()
{
lock (_bufferLock)
{
if (_queueLength == 0)
{
_freeServerSpots++;

if (_freeServerSpots > _maxConcurrentRequests)
{
_freeServerSpots--;
throw new InvalidOperationException("OnExit must only be called once per successful call to TryEnterAsync");
}

return;
}

// step backwards and launch a new task
if (_head == 0)
{
_head = _maxQueueCapacity - 1;
}
else
{
_head--;
}

_buffer[_head].Complete(true);
_queueLength--;
}
}
}
Loading