Skip to content

Commit

Permalink
Removed code that removes item upon TaskCanceledException.
Browse files Browse the repository at this point in the history
It is not necessary as the request will be simply dequeued when TryGrantNextPendingRequest gets called shortly after.
  • Loading branch information
clement911 committed Nov 27, 2024
1 parent a762d76 commit 78760ff
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,36 +26,4 @@ public void Enqueue(T i)
public T Peek() => _ForegroundQueue.Count > 0 ? _ForegroundQueue.Peek() : _BackgroundQueue.Peek();

public T Dequeue() => _ForegroundQueue.Count > 0 ? _ForegroundQueue.Dequeue() : _BackgroundQueue.Dequeue();

/// Removes the item and updates the queue.
public void RemoveAndUpdateQueue(T itemToRemove)
{
if (_ForegroundQueue.Contains(itemToRemove))
{
_ForegroundQueue = CopyQueueExcludingItem(_ForegroundQueue, itemToRemove);
return;
}

if (_BackgroundQueue.Contains(itemToRemove))
{
_BackgroundQueue = CopyQueueExcludingItem(_BackgroundQueue, itemToRemove);
}
}

/// Copies the items in <paramref name="existingQueue"/> to a new queue, excluding <paramref name="itemToExclude"/>.
private static Queue<T> CopyQueueExcludingItem(Queue<T> existingQueue, T itemToExclude)
{
var newQueue = new Queue<T>();

while (existingQueue.Count > 0)
{
var itemToAdd = existingQueue.Dequeue();
if (!itemToAdd.Equals(itemToExclude))
{
newQueue.Enqueue(itemToAdd);
}
}

return newQueue;
}
}
28 changes: 9 additions & 19 deletions ShopifySharp/Infrastructure/Policies/LeakyBucket/LeakyBucket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,30 +87,22 @@ public async Task WaitForAvailableAsync(int requestCost, CancellationToken cance
{
if (ComputedCurrentlyAvailable >= requestCost && _waitingRequests.Count == 0)
{
//there is enough capacity to proceed immediately
ConsumeAvailable(r);
return;
}

//otherwise, we queue the request for further processing
_waitingRequests.Enqueue(r);

//if it's the very first request, we schedule it to be released in the future once enough capacity is available
if (_waitingRequests.Count == 1)
ScheduleTryGrantNextPendingRequest(r);
}

try
{
await r.Semaphore.WaitAsync(cancellationToken);
}
catch (OperationCanceledException)
{
// TODO: log here once ShopifySharp supports logging
lock (_lock)
{
_waitingRequests.RemoveAndUpdateQueue(r);
}

throw;
}
//TaskCanceledException can bubble up
//The request will be dequeued when the semaphore is released
await r.WaitAsync(cancellationToken);
}

private void ScheduleTryGrantNextPendingRequest(LeakyBucketRequest r)
Expand All @@ -134,15 +126,13 @@ private void TryGrantNextPendingRequest()
if (nextRequest.CancellationToken.IsCancellationRequested)
{
_waitingRequests.Dequeue();
// TODO: dispose the request here? Is the HttpRequestMessage still sitting in memory?
continue;
nextRequest.Release();
}

if (ComputedCurrentlyAvailable >= nextRequest.Cost)
else if (ComputedCurrentlyAvailable >= nextRequest.Cost)
{
// Proceed with current request
_waitingRequests.Dequeue();
nextRequest.Semaphore.Release();
nextRequest.Release();
ConsumeAvailable(nextRequest);
}
else
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,36 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace ShopifySharp.Infrastructure.Policies.LeakyBucket;

internal sealed class LeakyBucketRequest(int cost, CancellationToken cancellationToken) : IDisposable
{
public readonly int Cost = cost;
public readonly SemaphoreSlim Semaphore = new(0, 1);

public readonly CancellationToken CancellationToken = cancellationToken;

public bool IsDisposed { get; private set; }

private readonly SemaphoreSlim Semaphore = new(0, 1);

public void Dispose()
{
Semaphore?.Dispose();
if (IsDisposed)
return;

Semaphore.Dispose();
IsDisposed = true;
}

public void Release()
{
if (!IsDisposed)
Semaphore.Release();
}

public Task WaitAsync(CancellationToken t)
{
return Semaphore.WaitAsync(t);
}
}

0 comments on commit 78760ff

Please sign in to comment.