[Feature request]: Determine policy which caused rejection #2345

Closed
lonix1 opened this issue Oct 14, 2024 · 12 comments

@lonix1

lonix1 commented Oct 14, 2024

Is your feature request related to a specific problem? Or an existing feature?

I've spent some time learning how Polly works, and specifically how to combine policies (which is probably a common need in a non-trivial production system).

I've found a pain point in how Polly handles rejections (excuse the pun).

Suppose one combines rate limiting and concurrency control, without queuing (or after reaching the queue limit), and there is a rejection:

  • if rate limited: read delay from RateLimiterRejectedException.RetryAfter then retry
  • if concurrency limited: return to caller with error

To do that one needs to know the source of the rejection.

Describe the solution you'd like

There are two workarounds, both bad:

  • If RateLimiterRejectedException.RetryAfter is null, one can infer that the rejection came from the concurrency limiter (rather than a fixed window, sliding window or token bucket limiter). But this is a dangerous assumption, as the library could add more policies in the future or change its internals. (See the sketch below.)
  • One could use a variation of the roundabout approach shown here (though I'm not even certain it works... I couldn't get it to work). Assuming it works, it's very messy and overly complicated. I wouldn't want to maintain such code if I only ever touch Polly code once or twice a year.
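
For illustration, a rough sketch of the first workaround (assuming a built ResiliencePipeline called pipeline and a hypothetical CallExternalApiAsync(CancellationToken) helper that returns ValueTask), which also shows why the inference is fragile:

try {
  // 'pipeline' combines a concurrency limiter and rate limiter(s), as described above
  await pipeline.ExecuteAsync(ct => CallExternalApiAsync(ct), cancellationToken);   // CallExternalApiAsync is hypothetical
}
catch (RateLimiterRejectedException e) when (e.RetryAfter is TimeSpan retryAfter) {
  // a rate limiter rejected the call: wait, then retry
  await Task.Delay(retryAfter, cancellationToken);
}
catch (RateLimiterRejectedException) {
  // RetryAfter is null, so *presumably* the concurrency limiter rejected the call,
  // but nothing in the API actually guarantees that
  throw new InvalidOperationException("Rejected: too many concurrent attempts.");
}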

The ideal solution, mentioned in that thread, is to add a new property to the exception that identifies the source of the rejection.

Perhaps RateLimiterRejectedException.Source could be a string, equal to a configurable Name property for the policy, or if unset then equal to policy.GetType().Name.

Additional context

Thank you for considering it!

@peter-csala
Contributor

Just to clarify: which Polly version are we talking about? The RateLimiterRejectedException was introduced in the V8 API. The V7 API uses policies, whereas V8 uses strategies. My educated guess is that you are using the V8 API; can you please confirm?

@lonix1
Author

lonix1 commented Oct 14, 2024

Sorry. I've read docs, blog posts and SO threads for both v7 and v8 - so I guess I've been using those terms interchangeably.

I'm using Polly v8.

@peter-csala
Contributor

Could you please describe the desired behavior in a bit more detail (how many strategies are chained, in what order, how they are configured, etc.)? Or you could also share an ideal pipeline setup.

@lonix1
Author

lonix1 commented Oct 14, 2024

Suppose I'm making requests to an external API.

Pipeline example, in order

  • concurrency limiter: no queuing, one thread ("permit")
  • chained rate limiter (fixed window per-day and fixed window per-second)

Init:

private const int LIMIT_THREADS    = 1;
private const int LIMIT_PER_SECOND = 10;
private const int LIMIT_PER_DAY    = 1_000;


// not readonly: these fields are assigned in InitPolly() rather than in the constructor
private PartitionedRateLimiter<ResilienceContext> _rateLimiterConcurrent;
private PartitionedRateLimiter<ResilienceContext> _rateLimiterPerSecond;
private PartitionedRateLimiter<ResilienceContext> _rateLimiterPerDay;
private PartitionedRateLimiter<ResilienceContext> _rateLimiterChained;
private ResiliencePipeline _resiliencePipeline;


public void Dispose() {
  _rateLimiterConcurrent.Dispose();
  _rateLimiterPerSecond.Dispose();
  _rateLimiterPerDay.Dispose();
  _rateLimiterChained.Dispose();
}


public void InitPolly() {

  var partitionKey = "aws-ses";

  _rateLimiterConcurrent = PartitionedRateLimiter.Create<ResilienceContext, string>(context =>
    RateLimitPartition.GetConcurrencyLimiter(
      partitionKey,
      key => new ConcurrencyLimiterOptions {    // factory parameter renamed so it does not shadow the outer partitionKey local
        PermitLimit = LIMIT_THREADS,
        QueueLimit  = 0,
      })
  );

  _rateLimiterPerSecond = PartitionedRateLimiter.Create<ResilienceContext, string>(context =>
    RateLimitPartition.GetFixedWindowLimiter(
      partitionKey,
      key => new FixedWindowRateLimiterOptions {
        PermitLimit = LIMIT_PER_SECOND,
        QueueLimit  = 0,
        Window      = TimeSpan.FromSeconds(1),
      })
  );
  _rateLimiterPerDay = PartitionedRateLimiter.Create<ResilienceContext, string>(context =>
    RateLimitPartition.GetFixedWindowLimiter(
      partitionKey,
      key => new FixedWindowRateLimiterOptions {
        PermitLimit = LIMIT_PER_DAY,
        QueueLimit  = 0,
        Window      = TimeSpan.FromDays(1),
      })
  );
  _rateLimiterChained = PartitionedRateLimiter.CreateChained(_rateLimiterPerSecond, _rateLimiterPerDay);

  _resiliencePipeline = new ResiliencePipelineBuilder()
    // outer strategy: limit threads
    .AddRateLimiter(new RateLimiterStrategyOptions {
      RateLimiter = args => _rateLimiterConcurrent.AcquireAsync(args.Context, permitCount:1, args.Context.CancellationToken),
    })
    // inner strategy: limit requests (per second and per day)
    .AddRateLimiter(new RateLimiterStrategyOptions {
      RateLimiter = args => _rateLimiterChained.AcquireAsync(args.Context, permitCount:1, args.Context.CancellationToken),
    })
    .Build();
}

Execution:

public async Task ScheduleRequests(IEnumerable<Request> requests, CancellationToken cancellationToken) {

  var pendingRequests = requests.ToList();

  while (pendingRequests.Any()) {

    try {
      var request = pendingRequests.First();
      await _resiliencePipeline.ExecuteAsync(
        // wrapped in a ValueTask because ExecuteAsync expects a ValueTask-returning callback
        cancellationTokenInner => new ValueTask(PerformRequest(request, cancellationTokenInner)),
        cancellationToken);
      pendingRequests.Remove(request);
    }

    catch (RateLimiterRejectedException e) {
      // rejected by rate limiter
      if (e.RetryAfter is TimeSpan retryAfter) {
        Console.WriteLine($"Throttled; retry in {retryAfter}...");
        await Task.Delay(retryAfter);
      }
      // rejected by concurrency limiter
      else if (e.Source == "ConcurrencyLimiter") {              // <----------
        throw new InvalidOperationException("Rejected: too many concurrent attempts.");
      }
      // other rejection (unsure if even possible?)
      else {
        throw new InvalidOperationException("Rejected.");
      }
    }

  }
}


public async Task PerformRequest(Request request, CancellationToken cancellationToken) {
  // call external API...
}

I don't know if that's ideal; it's just a quick example. The idea is that in the catch block there is some way to know which limiter rejected the request.

@peter-csala
Contributor

peter-csala commented Oct 14, 2024

Well, as far as I know there is only a single place where Polly throws RateLimiterRejectedException:

var exception = retryAfter.HasValue ? new RateLimiterRejectedException(retryAfter.Value) : new RateLimiterRejectedException();

I think we could extend the RLRE with a Source property and populate it similarly to the telemetry event's Source:

Resilience event occurred. EventName: 'OnRateLimiterRejected', Source: '(null)/(null)/RateLimiter', Operation Key: '', Result: ''
Resilience event occurred. EventName: 'OnRateLimiterRejected', Source: 'MyPipeline/MyPipelineInstance/MyRateLimiterStrategy', Operation Key: 'MyRateLimitedOperation', Result: ''

public static partial void ResilienceEvent(
    this ILogger logger,
    LogLevel logLevel,
    string eventName,
    string pipelineName,
    string pipelineInstance,
    string? strategyName,
    string? operationKey,
    object? result,
    Exception? exception);

_logger.ResilienceEvent(
    level,
    args.Event.EventName,
    args.Source.PipelineName.GetValueOrPlaceholder(),
    args.Source.PipelineInstanceName.GetValueOrPlaceholder(),
    args.Source.StrategyName.GetValueOrPlaceholder(),
    args.Context.OperationKey,
    result,
    args.Outcome?.Exception);

That would require only a minimal change to your pipeline setup:

_resiliencePipeline = new ResiliencePipelineBuilder()
    // outer strategy: limit threads
    .AddRateLimiter(new RateLimiterStrategyOptions {
      Name = "ThreadLimiter"
      RateLimiter = args => _rateLimiterConcurrent.AcquireAsync(args.Context, permitCount:1, args.Context.CancellationToken),
    })
    // inner strategy: limit requests (per second and per day)
    .AddRateLimiter(new RateLimiterStrategyOptions {
      Name = "RequestLimiter"
      RateLimiter = args => _rateLimiterChained.AcquireAsync(args.Context, permitCount:1, args.Context.CancellationToken),
    })
    .Build();

With this in your hand, the RateLimiterRejectedException catch block could branch based on the Source with Contains/EndsWith.
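
For example, reusing the execution example above, the catch handling could branch roughly like this (hypothetical, since the proposed Source property does not exist yet):

try
{
    await _resiliencePipeline.ExecuteAsync(
        cancellationTokenInner => new ValueTask(PerformRequest(request, cancellationTokenInner)),
        cancellationToken);
}
catch (RateLimiterRejectedException e) when (e.Source?.EndsWith("ThreadLimiter") == true)
{
    // rejected by the concurrency ("thread") limiter: fail fast
    throw new InvalidOperationException("Rejected: too many concurrent attempts.");
}
catch (RateLimiterRejectedException e) when (e.RetryAfter is TimeSpan retryAfter)
{
    // rejected by the per-second or per-day rate limiter: wait, then retry
    await Task.Delay(retryAfter, cancellationToken);
}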

Does that sound good to you?

@lonix1
Author

lonix1 commented Oct 14, 2024

To be honest, I haven't used the telemetry stuff yet, so I can't comment on that. I hope someone with more advanced Polly experience can provide feedback on that.

But regarding your final code block: if I understand correctly, one can name each limiter, and then detect that name later in the catch block... PERFECT.

Unrelated side issue: should the concurrency limiter or the rate limiter come first?

@peter-csala
Contributor

@lonix1 Do you want to give it a try and file a PR?

Unrelated side issue: should the concurrency limiter or the rate limiter come first?

The ordering should not matter:

  • rate limiter controls inbound load
  • concurrency limiter controls outbound load

They are both proactive strategies. If they were reactive, the outer strategy's ShouldHandle could be adjusted to also handle the exception thrown by the inner one (roughly as sketched below). But that's not the case here.
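
For completeness, a rough illustration of the reactive case (not what is needed here), assuming an outer retry strategy that reacts to the inner rate limiter's rejection:

var pipeline = new ResiliencePipelineBuilder()
    // outer, reactive: retry when the inner rate limiter throws RateLimiterRejectedException
    .AddRetry(new RetryStrategyOptions
    {
        ShouldHandle = new PredicateBuilder().Handle<RateLimiterRejectedException>(),
        Delay = TimeSpan.FromSeconds(1),
        MaxRetryAttempts = 3,
    })
    // inner, proactive: the rate limiter itself (default options shown for brevity)
    .AddRateLimiter(new RateLimiterStrategyOptions())
    .Build();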

@martintmk
Contributor

The proposal looks good to me. The only thing I suggest is to actually use the following class as the source property:

https://github.com/App-vNext/Polly/blob/main/src/Polly.Core/Telemetry/ResilienceTelemetrySource.cs

Basically, to uniquely identify a strategy you also need the pipeline name and the pipeline instance name.
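
For reference, the shape of that class is roughly the following (simplified; see the linked file for the actual definition):

public sealed class ResilienceTelemetrySource
{
    public string? PipelineName { get; }
    public string? PipelineInstanceName { get; }
    public string? StrategyName { get; }
}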

@lonix1
Author

lonix1 commented Oct 15, 2024

PR: I don't know... I'm still new to Polly and there's many things I don't yet grok.

Ordering: I hope I'm not about to derail this thread... Actually I am using both for outgoing load, for requests to an external API. I assumed the order matters in that case (excluding the scenario of unhandled exceptions).

@peter-csala
Contributor

PR: I don't know... I'm still new to Polly and there's many things I don't yet grok.

Sure, no problem. @martincostello, could you please assign this issue to me?

@peter-csala
Contributor

Hi @lonix1, we have made the required changes and they will be available as part of the 8.5.0 release. The new property is named TelemetrySource (Source is already defined by the Exception type...). It is not a string but rather a ResilienceTelemetrySource. This class includes not just the strategy name but also information about the pipeline.

So, the usage will look like this:

try 
{
    await pipeline.ExecuteAsync(...);
}
catch (RateLimiterRejectedException ex) when (ex.TelemetrySource.StrategyName?.Contains("ThreadLimiter") == true)
{
    // Your concurrency limiter related logic
}

@lonix1
Author

lonix1 commented Oct 25, 2024

Wow you guys have been busy... so much work! Thanks and sorry for triggering all that effort 😄

I assume you had to change many things in order to support the original issue above.
