Skip to content

Commit

Permalink
Add cron-like support for recurring/repeating jobs (#297)
Browse files Browse the repository at this point in the history
  • Loading branch information
bensheldon authored Jul 27, 2021
1 parent e769a52 commit 811fb26
Show file tree
Hide file tree
Showing 18 changed files with 361 additions and 13 deletions.
8 changes: 7 additions & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ PATH
activejob (>= 5.2.0)
activerecord (>= 5.2.0)
concurrent-ruby (>= 1.0.2)
fugit (>= 1.1)
railties (>= 5.2.0)
thor (>= 0.14.1)
zeitwerk (>= 2.0)
Expand Down Expand Up @@ -154,6 +155,8 @@ GEM
rubocop
smart_properties
erubi (1.10.0)
et-orbi (1.2.4)
tzinfo
faraday (1.5.1)
faraday-em_http (~> 1.0)
faraday-em_synchrony (~> 1.0)
Expand All @@ -177,6 +180,9 @@ GEM
ffi (1.15.3-java)
fiber-local (1.0.0)
foreman (0.87.2)
fugit (1.5.0)
et-orbi (~> 1.1, >= 1.1.8)
raabro (~> 1.4)
gem-release (2.2.2)
github_changelog_generator (1.16.4)
activesupport
Expand Down Expand Up @@ -220,7 +226,6 @@ GEM
mixlib-shellout (3.2.5)
chef-utils
msgpack (1.4.2)
msgpack (1.4.2-java)
multi_json (1.15.0)
multipart-post (2.1.1)
nio4r (2.5.7)
Expand Down Expand Up @@ -262,6 +267,7 @@ GEM
nio4r (~> 2.0)
puma (5.3.2-java)
nio4r (~> 2.0)
raabro (1.4.0)
racc (1.5.2)
racc (1.5.2-java)
rack (2.2.3)
Expand Down
50 changes: 47 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ For more of the story of GoodJob, read the [introductory blog post](https://isla
- [Configuration options](#configuration-options)
- [Global options](#global-options)
- [Dashboard](#dashboard)
- [ActiveJob Concurrency](#activejob-concurrency)
- [ActiveJob concurrency](#activejob-concurrency)
- [Cron-style repeating/recurring jobs](#cron-style-repeatingrecurring-jobs)
- [Updating](#updating)
- [Go deeper](#go-deeper)
- [Exceptions, retries, and reliability](#exceptions-retries-and-reliability)
Expand Down Expand Up @@ -156,6 +157,7 @@ Options:
[--poll-interval=SECONDS] # Interval between polls for available jobs in seconds (env var: GOOD_JOB_POLL_INTERVAL, default: 1)
[--max-cache=COUNT] # Maximum number of scheduled jobs to cache in memory (env var: GOOD_JOB_MAX_CACHE, default: 10000)
[--shutdown-timeout=SECONDS] # Number of seconds to wait for jobs to finish when shutting down before stopping the thread. (env var: GOOD_JOB_SHUTDOWN_TIMEOUT, default: -1 (forever))
[--enable-cron] # Whether to run cron process (default: false)
[--daemonize] # Run as a background daemon (default: false)
[--pidfile=PIDFILE] # Path to write daemonized Process ID (env var: GOOD_JOB_PIDFILE, default: tmp/pids/good_job.pid)
Expand Down Expand Up @@ -212,14 +214,22 @@ config.good_job.execution_mode = :async_server
config.good_job.max_threads = 5
config.good_job.poll_interval = 30 # seconds
config.good_job.shutdown_timeout = 25 # seconds
config.good_job.enable_cron = true
config.good_job.cron = { example: { cron: '0 * * * *', class: 'ExampleJob' } }
# ...or all at once.
config.good_job = {
execution_mode: :async_server,
max_threads: 5,
poll_interval: 30,
shutdown_timeout: 25,
enable_cron: true,
cron: {
example: {
cron: '0 * * * *',
class: 'ExampleJob'
},
},
}
```

Expand All @@ -235,6 +245,8 @@ Available configuration options are:
- `poll_interval` (integer) sets the number of seconds between polls for jobs when `execution_mode` is set to `:async` or `:async_server`. You can also set this with the environment variable `GOOD_JOB_POLL_INTERVAL`.
- `max_cache` (integer) sets the maximum number of scheduled jobs that will be stored in memory to reduce execution latency when also polling for scheduled jobs. Caching 10,000 scheduled jobs uses approximately 20MB of memory. You can also set this with the environment variable `GOOD_JOB_MAX_CACHE`.
- `shutdown_timeout` (float) number of seconds to wait for jobs to finish when shutting down before stopping the thread. Defaults to forever: `-1`. You can also set this with the environment variable `GOOD_JOB_SHUTDOWN_TIMEOUT`.
- `enable_cron` (boolean) whether to run cron process. Defaults to `false`. You can also set this with the environment variable `GOOD_JOB_ENABLE_CRON`.
- `cron` (hash) cron configuration. Defaults to `{}`. You can also set this as a JSON string with the environment variable `GOOD_JOB_CRON`

By default, GoodJob configures the following execution modes per environment:

Expand Down Expand Up @@ -320,7 +332,7 @@ GoodJob includes a Dashboard as a mountable `Rails::Engine`.
end
```
### ActiveJob Concurrency
### ActiveJob concurrency
GoodJob can extend ActiveJob to provide limits on concurrently running jobs, either at time of _enqueue_ or at _perform_.
Expand Down Expand Up @@ -349,6 +361,38 @@ class MyJob < ApplicationJob
end
```
### Cron-style repeating/recurring jobs
GoodJob can enqueue jobs on a recurring basis that can be used as a replacement for cron.
Cron-style jobs are run on every GoodJob process (e.g. CLI or `async` execution mode) when `config.good_job.enable_cron = true`; use GoodJob's [ActiveJob concurrency](#activejob-concurrency) extension to limit the number of jobs that are enqueued.

Cron-format is parsed by the [`fugit`](https://github.com/floraison/fugit) gem, which has support for seconds-level resolution (e.g. `* * * * * *`).

```ruby
# config/environments/application.rb or a specific environment e.g. production.rb
# Enable cron in this process; e.g. only run on the first Heroku worker process
config.good_job.enable_cron = ENV['DYNO'] == 'worker.1' # or `true` or via $GOOD_JOB_ENABLE_CRON
# Configure cron with a hash that has a unique key for each recurring job
config.good_job.cron = {
# Every 15 minutes, enqueue `ExampleJob.set(priority: -10).perform_later(52, name: "Alice")`
frequent_task: { # each recurring job must have a unique key
cron: "*/15 * * * *", # cron-style scheduling format by fugit gem
class: "ExampleJob", # reference the Job class with a string
args: [42, { name: "Alice" }], # arguments to pass; can also be a proc e.g. `-> { { when: Time.now } }`
set: { priority: -10 }, # additional ActiveJob properties; can also be a lambda/proc e.g. `-> { { priority: [1,2].sample } }`
description: "Something helpful", # optional description that appears in Dashboard (coming soon!)
},
another_task: {
cron: "0 0,12 * * *",
class: "AnotherJob",
},
# etc.
}
```

### Updating

GoodJob follows semantic versioning, though updates may be encouraged through deprecation warnings in minor versions.
Expand Down
1 change: 1 addition & 0 deletions good_job.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ Gem::Specification.new do |spec|
spec.add_dependency "activejob", ">= 5.2.0"
spec.add_dependency "activerecord", ">= 5.2.0"
spec.add_dependency "concurrent-ruby", ">= 1.0.2"
spec.add_dependency "fugit", ">= 1.1"
spec.add_dependency "railties", ">= 5.2.0"
spec.add_dependency "thor", ">= 0.14.1"
spec.add_dependency "zeitwerk", ">= 2.0"
Expand Down
19 changes: 12 additions & 7 deletions lib/good_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -106,16 +106,13 @@ def self.shutdown(timeout: -1, wait: nil)
wait ? -1 : nil
end

executables = Array(Notifier.instances) + Array(Poller.instances) + Array(Scheduler.instances)
_shutdown_all(executables, timeout: timeout)
_shutdown_all(_executables, timeout: timeout)
end

# Tests whether jobs have stopped executing.
# @return [Boolean] whether background threads are shut down
def self.shutdown?
Notifier.instances.all?(&:shutdown?) &&
Poller.instances.all?(&:shutdown?) &&
Scheduler.instances.all?(&:shutdown?)
_executables.all?(&:shutdown?)
end

# Stops and restarts executing jobs.
Expand All @@ -126,8 +123,7 @@ def self.shutdown?
# @param timeout [Numeric, nil] Seconds to wait for active threads to finish.
# @return [void]
def self.restart(timeout: -1)
executables = Array(Notifier.instances) + Array(Poller.instances) + Array(Scheduler.instances)
_shutdown_all(executables, :restart, timeout: timeout)
_shutdown_all(_executables, :restart, timeout: timeout)
end

# Sends +#shutdown+ or +#restart+ to executable objects ({GoodJob::Notifier}, {GoodJob::Poller}, {GoodJob::Scheduler})
Expand All @@ -146,5 +142,14 @@ def self._shutdown_all(executables, method_name = :shutdown, timeout: -1)
end
end

def self._executables
[].concat(
CronManager.instances,
Notifier.instances,
Poller.instances,
Scheduler.instances
)
end

ActiveSupport.run_load_hooks(:good_job, self)
end
2 changes: 2 additions & 0 deletions lib/good_job/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ def initialize(execution_mode: nil, queues: nil, max_threads: nil, poll_interval
@scheduler = GoodJob::Scheduler.from_configuration(@configuration, warm_cache_on_initialize: Rails.application.initialized?)
@notifier.recipients << [@scheduler, :create_thread]
@poller.recipients << [@scheduler, :create_thread]

@cron_manager = GoodJob::CronManager.new(@configuration.cron, start_on_initialize: Rails.application.initialized?) if @configuration.enable_cron?
end
end

Expand Down
9 changes: 7 additions & 2 deletions lib/good_job/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,16 @@ def exit_on_failure?
type: :numeric,
banner: 'SECONDS',
desc: "Number of seconds to wait for jobs to finish when shutting down before stopping the thread. (env var: GOOD_JOB_SHUTDOWN_TIMEOUT, default: -1 (forever))"
method_option :enable_cron,
type: :boolean,
desc: "Whether to run cron process (default: false)"
method_option :daemonize,
type: :boolean,
desc: "Run as a background daemon (default: false)"
method_option :pidfile,
type: :string,
desc: "Path to write daemonized Process ID (env var: GOOD_JOB_PIDFILE, default: tmp/pids/good_job.pid)"

def start
set_up_application!
configuration = GoodJob::Configuration.new(options)
Expand All @@ -87,7 +91,7 @@ def start
scheduler = GoodJob::Scheduler.from_configuration(configuration, warm_cache_on_initialize: true)
notifier.recipients << [scheduler, :create_thread]
poller.recipients << [scheduler, :create_thread]

cron_manager = GoodJob::CronManager.new(configuration.cron, start_on_initialize: true) if configuration.enable_cron?
@stop_good_job_executable = false
%w[INT TERM].each do |signal|
trap(signal) { @stop_good_job_executable = true }
Expand All @@ -98,7 +102,7 @@ def start
break if @stop_good_job_executable || scheduler.shutdown? || notifier.shutdown?
end

executors = [notifier, poller, scheduler]
executors = [notifier, poller, cron_manager, scheduler].compact
GoodJob._shutdown_all(executors, timeout: configuration.shutdown_timeout)
end

Expand All @@ -124,6 +128,7 @@ def start
type: :numeric,
banner: 'SECONDS',
desc: "Delete records finished more than this many seconds ago (env var: GOOD_JOB_CLEANUP_PRESERVED_JOBS_BEFORE_SECONDS_AGO, default: 86400)"

def cleanup_preserved_jobs
set_up_application!

Expand Down
24 changes: 24 additions & 0 deletions lib/good_job/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ class Configuration
DEFAULT_CLEANUP_PRESERVED_JOBS_BEFORE_SECONDS_AGO = 24 * 60 * 60
# Default to always wait for jobs to finish for {Adapter#shutdown}
DEFAULT_SHUTDOWN_TIMEOUT = -1
# Default to not running cron
DEFAULT_ENABLE_CRON = false

# The options that were explicitly set when initializing +Configuration+.
# @return [Hash]
Expand Down Expand Up @@ -129,6 +131,28 @@ def shutdown_timeout
).to_f
end

# Whether to run cron
# @return [Boolean]
def enable_cron
value = ActiveModel::Type::Boolean.new.cast(
options[:enable_cron] ||
rails_config[:enable_cron] ||
env['GOOD_JOB_ENABLE_CRON'] ||
false
)
value && cron.size.positive?
end
alias enable_cron? enable_cron

def cron
env_cron = JSON.parse(ENV['GOOD_JOB_CRON']) if ENV['GOOD_JOB_CRON'].present?

options[:cron] ||
rails_config[:cron] ||
env_cron ||
{}
end

# Number of seconds to preserve jobs when using the +good_job cleanup_preserved_jobs+ CLI command.
# This configuration is only used when {GoodJob.preserve_job_records} is +true+.
# @return [Integer]
Expand Down
115 changes: 115 additions & 0 deletions lib/good_job/cron_manager.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# frozen_string_literal: true
require "concurrent/hash"
require "concurrent/scheduled_task"
require "fugit"

module GoodJob # :nodoc:
#
# CronManagers enqueue jobs on a repeating schedule.
#
class CronManager
# @!attribute [r] instances
# @!scope class
# List of all instantiated CronManagers in the current process.
# @return [Array<GoodJob::CronManagers>, nil]
cattr_reader :instances, default: [], instance_reader: false

# Task observer for cron task
# @param time [Time]
# @param output [Object]
# @param thread_error [Exception]
def self.task_observer(time, output, thread_error) # rubocop:disable Lint/UnusedMethodArgument
return if thread_error.is_a? Concurrent::CancelledOperationError

GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call)
end

# Job configuration to be scheduled
# @return [Hash]
attr_reader :schedules

# @param schedules [Hash]
# @param start_on_initialize [Boolean]
def initialize(schedules = {}, start_on_initialize: false)
@running = false
@schedules = schedules
@tasks = Concurrent::Hash.new

self.class.instances << self

start if start_on_initialize
end

# Schedule tasks that will enqueue jobs based on their schedule
def start
ActiveSupport::Notifications.instrument("cron_manager_start.good_job", cron_jobs: @schedules) do
@running = true
schedules.each_key { |cron_key| create_task(cron_key) }
end
end

# Stop/cancel any scheduled tasks
# @param timeout [Numeric, nil] Unused but retained for compatibility
def shutdown(timeout: nil) # rubocop:disable Lint/UnusedMethodArgument
@running = false
@tasks.each do |_cron_key, task|
task.cancel
end
@tasks.clear
end

# Stop and restart
# @param timeout [Numeric, nil] Unused but retained for compatibility
def restart(timeout: nil) # rubocop:disable Lint/UnusedMethodArgument
shutdown
start
end

# Tests whether the manager is running.
# @return [Boolean, nil]
def running?
@running
end

# Tests whether the manager is shutdown.
# @return [Boolean, nil]
def shutdown?
!running?
end

# Enqueues a scheduled task
# @param cron_key [Symbol, String] the key within the schedule to use
def create_task(cron_key)
schedule = @schedules[cron_key]
return false if schedule.blank?

fugit = Fugit::Cron.parse(schedule.fetch(:cron))
delay = [(fugit.next_time - Time.current).to_f, 0].max

future = Concurrent::ScheduledTask.new(delay, args: [self, cron_key]) do |thr_scheduler, thr_cron_key|
# Re-schedule the next cron task before executing the current task
thr_scheduler.create_task(thr_cron_key)

CurrentExecution.reset
CurrentExecution.cron_key = thr_cron_key

Rails.application.executor.wrap do
schedule = thr_scheduler.schedules.fetch(thr_cron_key).with_indifferent_access
job_class = schedule.fetch(:class).constantize

job_set_value = schedule.fetch(:set, {})
job_set = job_set_value.respond_to?(:call) ? job_set_value.call : job_set_value

job_args_value = schedule.fetch(:args, [])
job_args = job_args_value.respond_to?(:call) ? job_args_value.call : job_args_value

job_class.set(job_set).perform_later(*job_args)
end
end

@tasks[cron_key] = future
future.add_observer(self.class, :task_observer)
future.execute
end
end
end
Loading

0 comments on commit 811fb26

Please sign in to comment.