Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a ScheduledTaskQueue to manage future scheduled tasks stored in memory #163

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions lib/good_job/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,15 @@ def enqueue_at(active_job, timestamp)
ensure
good_job.advisory_unlock
end
end
else
job_state = {
queue_name: good_job.queue_name,
scheduled_at: good_job.scheduled_at,
}

executed_locally = execute_async? && @scheduler.create_thread(queue_name: good_job.queue_name)
Notifier.notify(queue_name: good_job.queue_name) unless executed_locally
executed_locally = execute_async? && @scheduler.create_thread(job_state)
Notifier.notify(job_state) unless executed_locally
end

good_job
end
Expand Down
14 changes: 14 additions & 0 deletions lib/good_job/custom_concurrent/scheduled_task.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
module GoodJob
module CustomConcurrent
class ScheduledTask < Concurrent::ScheduledTask
attr_reader :scheduled_at

def initialize(timestamp, **args, &block)
@scheduled_at = timestamp

delay = [(timestamp - Time.current).to_f, 0].max
super(delay, **args, &block)
end
end
end
end
21 changes: 21 additions & 0 deletions lib/good_job/custom_concurrent/thread_pool_executor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
require "concurrent/executor/thread_pool_executor"

module GoodJob
module CustomConcurrent
# Custom sub-class of +Concurrent::ThreadPoolExecutor+ to add additional worker status.
# @private
class ThreadPoolExecutor < Concurrent::ThreadPoolExecutor
# Number of inactive threads available to execute tasks.
# https://github.com/ruby-concurrency/concurrent-ruby/issues/684#issuecomment-427594437
# @return [Integer]
def ready_worker_count
synchronize do
workers_still_to_be_created = @max_length - @pool.length
workers_created_but_waiting = @ready.length

workers_still_to_be_created + workers_created_but_waiting
end
end
end
end
end
16 changes: 16 additions & 0 deletions lib/good_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ def self.queue_parser(string)
# @return [ActiveRecord::Relation]
scope :priority_ordered, -> { order('priority DESC NULLS LAST') }

# Order jobs by scheduled (unscheduled or soonest first).
# @!method schedule_ordered
# @!scope class
# @return [ActiveRecord::Relation]
scope :schedule_ordered, -> { order('scheduled_at ASC NULLS FIRST, created_at ASC') }

# Get Jobs were completed before the given timestamp. If no timestamp is
# provided, get all jobs that have been completed. By default, GoodJob
# deletes jobs after they are completed and this will find no jobs.
Expand Down Expand Up @@ -147,6 +153,16 @@ def self.perform_with_advisory_lock
[good_job, result, error] if good_job
end

# Fetches the scheduled execution time of the next eligible Job(s).
# @return [Array<(DateTime)>]
def self.next_scheduled_at(count = 1, after: nil)
query = advisory_unlocked.unfinished.schedule_ordered.limit(count)
if after
query = query.where('scheduled_at > ?', after).or query.where(scheduled_at: nil).where('created_at > ?', after)
end
query.pluck(:created_at, :scheduled_at).map { |timestamps| timestamps.compact.max }
end

# Places an ActiveJob job on a queue by creating a new {Job} record.
# @param active_job [ActiveJob::Base]
# The job to enqueue.
Expand Down
16 changes: 15 additions & 1 deletion lib/good_job/performer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,15 @@ class Performer
# Used to determine whether the performer should be used in GoodJob's
# current state. GoodJob state is a +Hash+ that will be passed as the
# first argument to +filter+ and includes info like the current queue.
def initialize(target, method_name, name: nil, filter: nil)
# @param next_at_method [Symbol]
# The name of a method on +target+ that returns timestamps of when next
# tasks may be available.
def initialize(target, method_name, name: nil, filter: nil, next_at_method: nil)
@target = target
@method_name = method_name
@name = name
@filter = filter
@next_at_method = next_at_method
end

# Find and perform any eligible jobs.
Expand All @@ -56,5 +60,15 @@ def next?(state = {})

@filter.call(state)
end

# The Returns timestamps of when next tasks may be available.
# @param count [Integer] number of timestamps to return
# @param count [DateTime, Time, nil] jobs scheduled after this time
# @return [Array<(Time, Timestamp)>, nil]
def next_at(count = 1, after: nil)
return unless @next_at_method_name

@target.public_send(@next_at_method_name, count, after: after)
end
end
end
43 changes: 43 additions & 0 deletions lib/good_job/scheduled_task_queue.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
module GoodJob
class ScheduledTaskQueue
DEFAULT_MAX_SIZE = 5

attr_reader :max_size

def initialize(max_size: nil)
@max_size = max_size || DEFAULT_MAX_SIZE
@queue = Concurrent::Array.new
@mutex = Mutex.new
end

def push(scheduled_task)
@mutex.synchronize do
queue.select!(&:pending?)

if max_size.size == 0 || queue.size == max_size && scheduled_task.scheduled_at > queue.last.scheduled_at
scheduled_task.cancel
return false
end

queue.unshift(scheduled_task)
queue.sort_by!(&:scheduled_at)

removed_items = queue.slice!(max_size..-1)
removed_items&.each(&:cancel)

true
end
end

def size
@mutex.synchronize do
queue.select!(&:pending?)
queue.size
end
end

private

attr_reader :queue
end
end
81 changes: 49 additions & 32 deletions lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
require "concurrent/executor/thread_pool_executor"
require "concurrent/timer_task"
require "concurrent/utility/processor_counter"

module GoodJob # :nodoc:
#
# Schedulers are generic thread pools that are responsible for
Expand All @@ -22,7 +18,7 @@ class Scheduler
max_threads: Configuration::DEFAULT_MAX_THREADS,
auto_terminate: true,
idletime: 60,
max_queue: -1,
max_queue: 0,
fallback_policy: :discard,
}.freeze

Expand Down Expand Up @@ -76,6 +72,8 @@ def initialize(performer, max_threads: nil)
@pool_options[:max_threads] = max_threads if max_threads.present?
@pool_options[:name] = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@pool_options[:max_threads]})"

@scheduled_task_queue = ScheduledTaskQueue.new(max_size: @pool_options[:max_threads])

create_pool
end

Expand Down Expand Up @@ -121,18 +119,46 @@ def restart(wait: true)
# Returns +true+ if the performer started executing work.
# Returns +false+ if the performer decides not to attempt to execute a task based on the +state+ that is passed to it.
def create_thread(state = nil)
return nil unless @pool.running? && @pool.ready_worker_count.positive?
return false if state && [email protected]?(state)
return nil unless @pool.running?

if state
return false unless @performer.next?(state)

future = Concurrent::Future.new(args: [@performer], executor: @pool) do |performer|
output = nil
Rails.application.executor.wrap { output = performer.next }
output
if state[:scheduled_at]
scheduled_at = if state[:scheduled_at].is_a? String
Time.zone.parse state[:scheduled_at]
else
state[:scheduled_at]
end
end
end
future.add_observer(self, :task_observer)
future.execute

true
if scheduled_at && scheduled_at > Time.current
task = CustomConcurrent::ScheduledTask.new(scheduled_at, args: [@performer], executor: @pool) do |performer|
output = nil
Rails.application.executor.wrap { output = performer.next }
output
end

if @scheduled_task_queue.push(task)
task.add_observer(self, :task_observer)
task.execute
end

true
elsif @pool.ready_worker_count.positive?
task = Concurrent::Future.new(args: [@performer], executor: @pool) do |performer|
output = nil
Rails.application.executor.wrap { output = performer.next }
output
end
task.add_observer(self, :task_observer)
task.execute

true
else
nil
end
end

# Invoked on completion of ThreadPoolExecutor task
Expand All @@ -141,14 +167,21 @@ def create_thread(state = nil)
def task_observer(time, output, thread_error)
GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call)
instrument("finished_job_task", { result: output, error: thread_error, time: time })
create_thread if output

if output
create_thread
elsif @performer.respond_to?(:next_at)
@scheduled_task_queue.max_size - @scheduled_task_queue.size
Array(@performer.next_at(
@timer_wake.push(next_at) if next_at
end
end

private

def create_pool
instrument("scheduler_create_pool", { performer_name: @performer.name, max_threads: @pool_options[:max_threads] }) do
@pool = ThreadPoolExecutor.new(@pool_options)
@pool = CustomConcurrent::ThreadPoolExecutor.new(@pool_options)
end
end

Expand All @@ -162,20 +195,4 @@ def instrument(name, payload = {}, &block)
ActiveSupport::Notifications.instrument("#{name}.good_job", payload, &block)
end
end

# Custom sub-class of +Concurrent::ThreadPoolExecutor+ to add additional worker status.
# @private
class ThreadPoolExecutor < Concurrent::ThreadPoolExecutor
# Number of inactive threads available to execute tasks.
# https://github.com/ruby-concurrency/concurrent-ruby/issues/684#issuecomment-427594437
# @return [Integer]
def ready_worker_count
synchronize do
workers_still_to_be_created = @max_length - @pool.length
workers_created_but_waiting = @ready.length

workers_still_to_be_created + workers_created_but_waiting
end
end
end
end
2 changes: 1 addition & 1 deletion spec/lib/good_job/adapter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
RSpec.describe GoodJob::Adapter do
let(:adapter) { described_class.new }
let(:active_job) { instance_double(ApplicationJob) }
let(:good_job) { instance_double(GoodJob::Job, queue_name: 'default') }
let(:good_job) { instance_double(GoodJob::Job, queue_name: 'default', scheduled_at: nil) }

describe '#initialize' do
it 'guards against improper execution modes' do
Expand Down
6 changes: 6 additions & 0 deletions spec/lib/good_job/scheduler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@

expect(scheduler.create_thread(queue_name: 'elephant')).to eq false
end

it 'will schedule in the future' do
configuration = GoodJob::Configuration.new({ queues: 'mice:2' })
scheduler = described_class.from_configuration(configuration)
expect(scheduler.create_thread(queue_name: 'mice', scheduled_at: 1.second.from_now)).to eq true
end
end

describe '.from_configuration' do
Expand Down