Skip to content

Commit

Permalink
Merge pull request #1111 from clement911/master
Browse files Browse the repository at this point in the history
Removed code that removes item upon TaskCanceledException.
  • Loading branch information
clement911 authored Nov 27, 2024
2 parents a762d76 + 78760ff commit e3297cd
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,36 +26,4 @@ public void Enqueue(T i)
public T Peek() => _ForegroundQueue.Count > 0 ? _ForegroundQueue.Peek() : _BackgroundQueue.Peek();

public T Dequeue() => _ForegroundQueue.Count > 0 ? _ForegroundQueue.Dequeue() : _BackgroundQueue.Dequeue();

/// Removes the item and updates the queue.
public void RemoveAndUpdateQueue(T itemToRemove)
{
if (_ForegroundQueue.Contains(itemToRemove))
{
_ForegroundQueue = CopyQueueExcludingItem(_ForegroundQueue, itemToRemove);
return;
}

if (_BackgroundQueue.Contains(itemToRemove))
{
_BackgroundQueue = CopyQueueExcludingItem(_BackgroundQueue, itemToRemove);
}
}

/// Copies the items in <paramref name="existingQueue"/> to a new queue, excluding <paramref name="itemToExclude"/>.
private static Queue<T> CopyQueueExcludingItem(Queue<T> existingQueue, T itemToExclude)
{
var newQueue = new Queue<T>();

while (existingQueue.Count > 0)
{
var itemToAdd = existingQueue.Dequeue();
if (!itemToAdd.Equals(itemToExclude))
{
newQueue.Enqueue(itemToAdd);
}
}

return newQueue;
}
}
28 changes: 9 additions & 19 deletions ShopifySharp/Infrastructure/Policies/LeakyBucket/LeakyBucket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,30 +87,22 @@ public async Task WaitForAvailableAsync(int requestCost, CancellationToken cance
{
if (ComputedCurrentlyAvailable >= requestCost && _waitingRequests.Count == 0)
{
//there is enough capacity to proceed immediately
ConsumeAvailable(r);
return;
}

//otherwise, we queue the request for further processing
_waitingRequests.Enqueue(r);

//if it's the very first request, we schedule it to be released in the future once enough capacity is available
if (_waitingRequests.Count == 1)
ScheduleTryGrantNextPendingRequest(r);
}

try
{
await r.Semaphore.WaitAsync(cancellationToken);
}
catch (OperationCanceledException)
{
// TODO: log here once ShopifySharp supports logging
lock (_lock)
{
_waitingRequests.RemoveAndUpdateQueue(r);
}

throw;
}
//TaskCanceledException can bubble up
//The request will be dequeued when the semaphore is released
await r.WaitAsync(cancellationToken);
}

private void ScheduleTryGrantNextPendingRequest(LeakyBucketRequest r)
Expand All @@ -134,15 +126,13 @@ private void TryGrantNextPendingRequest()
if (nextRequest.CancellationToken.IsCancellationRequested)
{
_waitingRequests.Dequeue();
// TODO: dispose the request here? Is the HttpRequestMessage still sitting in memory?
continue;
nextRequest.Release();
}

if (ComputedCurrentlyAvailable >= nextRequest.Cost)
else if (ComputedCurrentlyAvailable >= nextRequest.Cost)
{
// Proceed with current request
_waitingRequests.Dequeue();
nextRequest.Semaphore.Release();
nextRequest.Release();
ConsumeAvailable(nextRequest);
}
else
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,36 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace ShopifySharp.Infrastructure.Policies.LeakyBucket;

internal sealed class LeakyBucketRequest(int cost, CancellationToken cancellationToken) : IDisposable
{
public readonly int Cost = cost;
public readonly SemaphoreSlim Semaphore = new(0, 1);

public readonly CancellationToken CancellationToken = cancellationToken;

public bool IsDisposed { get; private set; }

private readonly SemaphoreSlim Semaphore = new(0, 1);

public void Dispose()
{
Semaphore?.Dispose();
if (IsDisposed)
return;

Semaphore.Dispose();
IsDisposed = true;
}

public void Release()
{
if (!IsDisposed)
Semaphore.Release();
}

public Task WaitAsync(CancellationToken t)
{
return Semaphore.WaitAsync(t);
}
}

0 comments on commit e3297cd

Please sign in to comment.