diff --git a/ShopifySharp/Infrastructure/Policies/LeakyBucket/ContextAwareQueue.cs b/ShopifySharp/Infrastructure/Policies/LeakyBucket/ContextAwareQueue.cs index 76ec2c04..b8ef5212 100644 --- a/ShopifySharp/Infrastructure/Policies/LeakyBucket/ContextAwareQueue.cs +++ b/ShopifySharp/Infrastructure/Policies/LeakyBucket/ContextAwareQueue.cs @@ -26,36 +26,4 @@ public void Enqueue(T i) public T Peek() => _ForegroundQueue.Count > 0 ? _ForegroundQueue.Peek() : _BackgroundQueue.Peek(); public T Dequeue() => _ForegroundQueue.Count > 0 ? _ForegroundQueue.Dequeue() : _BackgroundQueue.Dequeue(); - - /// Removes the item and updates the queue. - public void RemoveAndUpdateQueue(T itemToRemove) - { - if (_ForegroundQueue.Contains(itemToRemove)) - { - _ForegroundQueue = CopyQueueExcludingItem(_ForegroundQueue, itemToRemove); - return; - } - - if (_BackgroundQueue.Contains(itemToRemove)) - { - _BackgroundQueue = CopyQueueExcludingItem(_BackgroundQueue, itemToRemove); - } - } - - /// Copies the items in to a new queue, excluding . - private static Queue CopyQueueExcludingItem(Queue existingQueue, T itemToExclude) - { - var newQueue = new Queue(); - - while (existingQueue.Count > 0) - { - var itemToAdd = existingQueue.Dequeue(); - if (!itemToAdd.Equals(itemToExclude)) - { - newQueue.Enqueue(itemToAdd); - } - } - - return newQueue; - } } diff --git a/ShopifySharp/Infrastructure/Policies/LeakyBucket/LeakyBucket.cs b/ShopifySharp/Infrastructure/Policies/LeakyBucket/LeakyBucket.cs index c065d9b3..119c0929 100644 --- a/ShopifySharp/Infrastructure/Policies/LeakyBucket/LeakyBucket.cs +++ b/ShopifySharp/Infrastructure/Policies/LeakyBucket/LeakyBucket.cs @@ -87,30 +87,22 @@ public async Task WaitForAvailableAsync(int requestCost, CancellationToken cance { if (ComputedCurrentlyAvailable >= requestCost && _waitingRequests.Count == 0) { + //there is enough capacity to proceed immediately ConsumeAvailable(r); return; } + //otherwise, we queue the request for further processing _waitingRequests.Enqueue(r); + //if it's the very first request, we schedule it to be released in the future once enough capacity is available if (_waitingRequests.Count == 1) ScheduleTryGrantNextPendingRequest(r); } - try - { - await r.Semaphore.WaitAsync(cancellationToken); - } - catch (OperationCanceledException) - { - // TODO: log here once ShopifySharp supports logging - lock (_lock) - { - _waitingRequests.RemoveAndUpdateQueue(r); - } - - throw; - } + //TaskCanceledException can bubble up + //The request will be dequeued when the semaphore is released + await r.WaitAsync(cancellationToken); } private void ScheduleTryGrantNextPendingRequest(LeakyBucketRequest r) @@ -134,15 +126,13 @@ private void TryGrantNextPendingRequest() if (nextRequest.CancellationToken.IsCancellationRequested) { _waitingRequests.Dequeue(); - // TODO: dispose the request here? Is the HttpRequestMessage still sitting in memory? - continue; + nextRequest.Release(); } - - if (ComputedCurrentlyAvailable >= nextRequest.Cost) + else if (ComputedCurrentlyAvailable >= nextRequest.Cost) { // Proceed with current request _waitingRequests.Dequeue(); - nextRequest.Semaphore.Release(); + nextRequest.Release(); ConsumeAvailable(nextRequest); } else diff --git a/ShopifySharp/Infrastructure/Policies/LeakyBucket/LeakyBucketRequest.cs b/ShopifySharp/Infrastructure/Policies/LeakyBucket/LeakyBucketRequest.cs index cbbeec1d..a0a4f226 100644 --- a/ShopifySharp/Infrastructure/Policies/LeakyBucket/LeakyBucketRequest.cs +++ b/ShopifySharp/Infrastructure/Policies/LeakyBucket/LeakyBucketRequest.cs @@ -1,16 +1,36 @@ using System; using System.Threading; +using System.Threading.Tasks; namespace ShopifySharp.Infrastructure.Policies.LeakyBucket; internal sealed class LeakyBucketRequest(int cost, CancellationToken cancellationToken) : IDisposable { public readonly int Cost = cost; - public readonly SemaphoreSlim Semaphore = new(0, 1); + public readonly CancellationToken CancellationToken = cancellationToken; + public bool IsDisposed { get; private set; } + + private readonly SemaphoreSlim Semaphore = new(0, 1); + public void Dispose() { - Semaphore?.Dispose(); + if (IsDisposed) + return; + + Semaphore.Dispose(); + IsDisposed = true; + } + + public void Release() + { + if (!IsDisposed) + Semaphore.Release(); + } + + public Task WaitAsync(CancellationToken t) + { + return Semaphore.WaitAsync(t); } }