Add an ActiveJob extension for throttling #315

Open
bensheldon opened this issue Jul 29, 2021 · 12 comments
Labels
enhancement (New feature or request), help wanted (Extra attention is needed)

Comments

@bensheldon
Owner

Suggested on Reddit.

I think this could be done as a before_perform hook in which GoodJob counts executions in the previous time period, and if the limit is met/exceeded, retries the job with an incremental backoff, similar to how the Concurrency extension does it:

perform_concurrency = GoodJob::Job.unscoped.where(concurrency_key: key).advisory_locked.count
# The current job has already been locked and will appear in the previous query
raise GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError if perform_concurrency > limit
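A rough sketch of what the analogous throttle check might look like (illustrative only: the performed_at column usage and the ThrottleExceededError class are assumptions, not existing GoodJob API):

throttle_count = GoodJob::Job.unscoped
                             .where(concurrency_key: key)
                             .where("performed_at > ?", period.ago)
                             .count
# Reschedule with incremental backoff (e.g. via retry_on) when the window is full
raise GoodJob::ActiveJobExtensions::Concurrency::ThrottleExceededError if throttle_count >= limit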

I think this should maybe be part of the Concurrency extension so that they would reuse the same key. So maybe it would look like:

class MyJob < ApplicationJob
  include GoodJob::ActiveJobExtensions::Concurrency

  good_job_control_concurrency_with(
    # New stuff....
    enqueue_throttle: { limit: 1, period: 1.min },
    perform_throttle: { limit: 100, period: 1.hour },

    # Existing stuff...
    # Maximum number of jobs with the concurrency key to be concurrently enqueued
    enqueue_limit: 2,

    # Maximum number of jobs with the concurrency key to be concurrently performed
    perform_limit: 1,

    # A unique key to be globally locked against.
    # Can be String or Lambda/Proc that is invoked in the context of the job.
    # Note: Arguments passed to #perform_later must be accessed through `arguments` method.
    key: -> { "Unique-#{arguments.first}" } #  MyJob.perform_later("Alice") => "Unique-Alice"
  )

  def perform(first_name)
    # do work
  end
end
@34code

34code commented Sep 20, 2021

Why not use activejob-traffic_control?

https://github.com/nickelser/activejob-traffic_control

I guess it adds a Redis/Memcached dependency.

@bensheldon
Owner Author

@34code thanks for suggesting activejob-traffic_control! I wasn't aware of that gem, and it's good to know there is an existing option until the feature exists in GoodJob.

I'd still like to build it into GoodJob, because throttling seems like a useful part of concurrency control and it could be powered by Postgres.

@aried3r
Contributor

aried3r commented Oct 4, 2021

Maybe not directly related, but in case you want to look into your own throttling algorithm, schneems wrote a nice article about it, although it isn't about job scheduling.

Overview:
https://schneems.com/2020/06/25/rate-limiting-rate-throttling-and-how-they-work-together/

Experiments and measurements:
https://www.schneems.com/2020/07/08/a-fast-car-needs-good-brakes-how-we-added-client-rate-throttling-to-the-platform-api-gem/

@bensheldon
Owner Author

bensheldon commented Oct 4, 2021

@aried3r thank you for sharing that! It's helpful to have the strategies enumerated.

The strategy I'm thinking of is a moving window, because I think that would be easiest to implement in Postgres.

  • period would be the size of the window
  • limit would be the number of allowed requests in the window

And then the query would be like:

if GoodJob::Execution.where("created_at > ?", period.ago).count < limit
  enqueue_job # or perform it
end
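A minimal sketch of the rest of the moving-window bookkeeping (column names assumed, not existing code): when the window is full, the oldest record in it tells us when a slot frees up again.

window = GoodJob::Execution.where("created_at > ?", period.ago)
if window.count >= limit
  # The oldest execution in the window marks when the next slot opens up.
  oldest_created_at = window.minimum(:created_at)
  retry_wait = (oldest_created_at + period) - Time.current # seconds until it leaves the window
end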

I'm still stuck on how to do a better job of managing perform-concurrency. I'm not happy with the current strategy of raising an error and catching it with a rescue_from, because that enqueues another Execution record. I'm thinking I need to separate the concept of the time an ActiveJob job is enqueued from the time that GoodJob will try to execute/re-execute the job, and then allow GoodJob to do the incremental backoff specifically for concurrency control (rather than pushing it down entirely into ActiveJob).

@bensheldon moved this to Inbox in GoodJob Backlog v2 on Aug 9, 2022
@bensheldon moved this from Inbox to Prioritized Backlog in GoodJob Backlog v2 on Aug 9, 2022
@julik
Contributor

julik commented Nov 14, 2022

We already have a leaky-bucket rate limiter which is Postgres-based, and it works quite well. If we were to open-source it, could good_job take it on as a dep?

@bensheldon
Owner Author

@julik I have a high bar for pulling in additional dependencies, but I'm curious about your queries/algorithm.

@julik
Contributor

julik commented Nov 15, 2022

It is described here http://live.julik.nl/2022/08/the-unreasonable-effectiveness-of-leaky-buckets and roughly does this:

def fillup(n_tokens)
  conn = ActiveRecord::Base.connection

  # Take double the time it takes the bucket to empty under normal circumstances
  # until the bucket may be deleted.
  may_be_deleted_after_seconds = (@capacity.to_f / @leak_rate.to_f) * 2.0

  # Create the leaky bucket if it does not exist; if it does exist, update it
  # to the new level, taking the leak rate into account.
  query_params = {
    name: @key,
    capa: @capacity.to_f,
    delete_after_s: may_be_deleted_after_seconds,
    leak_rate: @leak_rate.to_f,
    fillup: n_tokens.to_f
  }
  sql = ActiveRecord::Base.sanitize_sql_array([<<~SQL, query_params])
    INSERT INTO leaky_buckets AS t
      (name, atime, expires_at, level)
    VALUES
      (
        :name,
        clock_timestamp(),
        clock_timestamp() + ':delete_after_s second'::interval,
        LEAST(:capa, :fillup)
      )
    ON CONFLICT (name) DO UPDATE SET
      atime = EXCLUDED.atime,
      expires_at = EXCLUDED.expires_at,
      level = GREATEST(
          0.0, LEAST(
            :capa,
            t.level + :fillup - (EXTRACT(EPOCH FROM (EXCLUDED.atime - t.atime)) * :leak_rate)
          )
        )
    RETURNING level
  SQL

  # Note the use of .uncached here. The AR query cache will actually see our
  # query as a repeat (since we use "select_value" for the RETURNING bit) and will not call into Postgres
  # correctly, thus the clock_timestamp() value would be frozen between calls. We don't want that here.
  # See https://stackoverflow.com/questions/73184531/why-would-postgres-clock-timestamp-freeze-inside-a-rails-unit-test
  level_after_fillup = conn.uncached { conn.select_value(sql) }

  State.new(level_after_fillup, (@capacity - level_after_fillup).abs < 0.01).tap do
    # Prune buckets which are no longer used. No "uncached" needed here since we are using "execute"
    conn.execute("DELETE FROM leaky_buckets WHERE expires_at < clock_timestamp()")
  end
end

We are not using it in JOINs, but I believe it could be done with CTEs if needed.
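A usage sketch under the same assumptions (the fillup method above plus a State value object that exposes the "bucket full" flag, e.g. as full?): inside a job's perform, the job could top up the bucket and back off when it comes back full.

state = bucket.fillup(1) # bucket: an instance wrapping the key/capacity/leak rate above
if state.full?
  # Back off instead of performing now; the delay here is arbitrary for illustration.
  self.class.set(wait: 30.seconds).perform_later(*arguments)
else
  perform_the_work # placeholder for the job's actual work
end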

@andreas-it-dev

Just wanted to add a tiny little bit of input regarding the throttling:

I often find myself in a situation where there might be, say, 10 different APIs with 10 different rate limits for historical data, and I need to call all of them to download the data. I tend to do this via a "meta job" that loops through everything needed and schedules a "sub job" that actually does the data download. This sub job therefore calls the different APIs (with the different rate limits).

In Sidekiq I ended up using multiple queues, but even that did not really fly, and I had to use one Sidekiq process per queue to fully isolate everything. Only then did it really honor the limits imposed.

I could use different jobs for the different limits, but that somehow feels wrong and is a slap in the face of the DRY principle.

So it would really be cool if the throttling could be per queue instead of per job, I think?

Thank you guys for considering this.
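If the throttle options proposed at the top of this issue landed in the Concurrency extension, a per-queue limit could be approximated by deriving the key from the queue name (illustrative only; perform_throttle is the proposed option, not a released one):

class ApiDownloadJob < ApplicationJob
  include GoodJob::ActiveJobExtensions::Concurrency

  good_job_control_concurrency_with(
    perform_throttle: { limit: 10, period: 1.minute }, # proposed option from above
    key: -> { "throttle-#{queue_name}" }               # one throttle bucket per queue
  )
end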

@doits
Contributor

doits commented Oct 24, 2023

I'm not happy with the current strategy of raising an error, and catching it with a rescue_from because that enqueues another Execution record

AFAIK this happens with https://github.com/nickelser/activejob-traffic_control, too. I hope another solution can be found, because otherwise the db could get really polluted, for example in such a scenario:

  • I want to throttle execution to 10 jobs per second (might be an external mail server rate limit)
  • I enqueue 10,000 jobs at once (I want to send out a newsletter to 10,000 recipients)

This would create a massive number of Executions if every try creates a new record.

Maybe you have some ideas on how to overcome this (probably first for the concurrency feature)?

Maybe the schedule_at could simply be updated for the existing Execution in such a case without creating a new record?
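As a sketch of that last idea (illustrative only, not GoodJob's current behavior; column names are assumed), given the throttled ActiveJob instance job, the existing row's scheduled time could simply be pushed back:

# Push the existing good_jobs row into the future rather than enqueueing a new execution.
# `period` stands in for however long the throttle window needs to clear.
GoodJob::Job.where(active_job_id: job.job_id)
            .update_all(scheduled_at: period.from_now)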

@bensheldon added the "help wanted" label on Nov 27, 2023
@bensheldon
Owner Author

To cross-link efforts: there is some good code cooking over in #1198

@julik
Contributor

julik commented Feb 29, 2024

For what it's worth, maybe this concern can be external to ActiveJob or to GoodJob. We have released our rate limiter as https://github.com/cheddar-me/pecorino and it is working very well. You could easily do something like this with Pecorino:

class AmazingJob < ApplicationJob
  around_perform do |job, block|
    t = Pecorino::Throttle.new(key: "#{job.class}_job_throttle", over_time: 1.minute, capacity: 1)
    t.request!
    block.call
  rescue Pecorino::Throttled => e
    job.set(wait: e.retry_after).perform_later
  end

  # ...
end

/cc @Marcovecchio

@bjeanes

bjeanes commented Dec 17, 2024

Given #1270 can this be closed?
