diff --git a/lib/good_job/adapter.rb b/lib/good_job/adapter.rb index 1d7e96000..bc6af7ec4 100644 --- a/lib/good_job/adapter.rb +++ b/lib/good_job/adapter.rb @@ -76,10 +76,15 @@ def enqueue_at(active_job, timestamp) ensure good_job.advisory_unlock end - end + else + job_state = { + queue_name: good_job.queue_name, + scheduled_at: good_job.scheduled_at, + } - executed_locally = execute_async? && @scheduler.create_thread(queue_name: good_job.queue_name) - Notifier.notify(queue_name: good_job.queue_name) unless executed_locally + executed_locally = execute_async? && @scheduler.create_thread(job_state) + Notifier.notify(job_state) unless executed_locally + end good_job end diff --git a/lib/good_job/custom_concurrent/scheduled_task.rb b/lib/good_job/custom_concurrent/scheduled_task.rb new file mode 100644 index 000000000..696da9152 --- /dev/null +++ b/lib/good_job/custom_concurrent/scheduled_task.rb @@ -0,0 +1,14 @@ +module GoodJob + module CustomConcurrent + class ScheduledTask < Concurrent::ScheduledTask + attr_reader :scheduled_at + + def initialize(timestamp, **args, &block) + @scheduled_at = timestamp + + delay = [(timestamp - Time.current).to_f, 0].max + super(delay, **args, &block) + end + end + end +end diff --git a/lib/good_job/custom_concurrent/thread_pool_executor.rb b/lib/good_job/custom_concurrent/thread_pool_executor.rb new file mode 100644 index 000000000..d2b5e6c68 --- /dev/null +++ b/lib/good_job/custom_concurrent/thread_pool_executor.rb @@ -0,0 +1,21 @@ +require "concurrent/executor/thread_pool_executor" + +module GoodJob + module CustomConcurrent + # Custom sub-class of +Concurrent::ThreadPoolExecutor+ to add additional worker status. + # @private + class ThreadPoolExecutor < Concurrent::ThreadPoolExecutor + # Number of inactive threads available to execute tasks. + # https://github.com/ruby-concurrency/concurrent-ruby/issues/684#issuecomment-427594437 + # @return [Integer] + def ready_worker_count + synchronize do + workers_still_to_be_created = @max_length - @pool.length + workers_created_but_waiting = @ready.length + + workers_still_to_be_created + workers_created_but_waiting + end + end + end + end +end diff --git a/lib/good_job/job.rb b/lib/good_job/job.rb index 7a1264571..800590961 100644 --- a/lib/good_job/job.rb +++ b/lib/good_job/job.rb @@ -72,6 +72,12 @@ def self.queue_parser(string) # @return [ActiveRecord::Relation] scope :priority_ordered, -> { order('priority DESC NULLS LAST') } + # Order jobs by scheduled (unscheduled or soonest first). + # @!method schedule_ordered + # @!scope class + # @return [ActiveRecord::Relation] + scope :schedule_ordered, -> { order('scheduled_at ASC NULLS FIRST, created_at ASC') } + # Get Jobs were completed before the given timestamp. If no timestamp is # provided, get all jobs that have been completed. By default, GoodJob # deletes jobs after they are completed and this will find no jobs. @@ -147,6 +153,16 @@ def self.perform_with_advisory_lock [good_job, result, error] if good_job end + # Fetches the scheduled execution time of the next eligible Job(s). + # @return [Array<(DateTime)>] + def self.next_scheduled_at(count = 1, after: nil) + query = advisory_unlocked.unfinished.schedule_ordered.limit(count) + if after + query = query.where('scheduled_at > ?', after).or query.where(scheduled_at: nil).where('created_at > ?', after) + end + query.pluck(:created_at, :scheduled_at).map { |timestamps| timestamps.compact.max } + end + # Places an ActiveJob job on a queue by creating a new {Job} record. # @param active_job [ActiveJob::Base] # The job to enqueue. diff --git a/lib/good_job/performer.rb b/lib/good_job/performer.rb index a24d87026..72b8aa2d8 100644 --- a/lib/good_job/performer.rb +++ b/lib/good_job/performer.rb @@ -28,11 +28,15 @@ class Performer # Used to determine whether the performer should be used in GoodJob's # current state. GoodJob state is a +Hash+ that will be passed as the # first argument to +filter+ and includes info like the current queue. - def initialize(target, method_name, name: nil, filter: nil) + # @param next_at_method [Symbol] + # The name of a method on +target+ that returns timestamps of when next + # tasks may be available. + def initialize(target, method_name, name: nil, filter: nil, next_at_method: nil) @target = target @method_name = method_name @name = name @filter = filter + @next_at_method = next_at_method end # Find and perform any eligible jobs. @@ -56,5 +60,15 @@ def next?(state = {}) @filter.call(state) end + + # The Returns timestamps of when next tasks may be available. + # @param count [Integer] number of timestamps to return + # @param count [DateTime, Time, nil] jobs scheduled after this time + # @return [Array<(Time, Timestamp)>, nil] + def next_at(count = 1, after: nil) + return unless @next_at_method_name + + @target.public_send(@next_at_method_name, count, after: after) + end end end diff --git a/lib/good_job/scheduled_task_queue.rb b/lib/good_job/scheduled_task_queue.rb new file mode 100644 index 000000000..e00942761 --- /dev/null +++ b/lib/good_job/scheduled_task_queue.rb @@ -0,0 +1,43 @@ +module GoodJob + class ScheduledTaskQueue + DEFAULT_MAX_SIZE = 5 + + attr_reader :max_size + + def initialize(max_size: nil) + @max_size = max_size || DEFAULT_MAX_SIZE + @queue = Concurrent::Array.new + @mutex = Mutex.new + end + + def push(scheduled_task) + @mutex.synchronize do + queue.select!(&:pending?) + + if max_size.size == 0 || queue.size == max_size && scheduled_task.scheduled_at > queue.last.scheduled_at + scheduled_task.cancel + return false + end + + queue.unshift(scheduled_task) + queue.sort_by!(&:scheduled_at) + + removed_items = queue.slice!(max_size..-1) + removed_items&.each(&:cancel) + + true + end + end + + def size + @mutex.synchronize do + queue.select!(&:pending?) + queue.size + end + end + + private + + attr_reader :queue + end +end diff --git a/lib/good_job/scheduler.rb b/lib/good_job/scheduler.rb index 1e2907057..4c6152ec4 100644 --- a/lib/good_job/scheduler.rb +++ b/lib/good_job/scheduler.rb @@ -1,7 +1,3 @@ -require "concurrent/executor/thread_pool_executor" -require "concurrent/timer_task" -require "concurrent/utility/processor_counter" - module GoodJob # :nodoc: # # Schedulers are generic thread pools that are responsible for @@ -22,7 +18,7 @@ class Scheduler max_threads: Configuration::DEFAULT_MAX_THREADS, auto_terminate: true, idletime: 60, - max_queue: -1, + max_queue: 0, fallback_policy: :discard, }.freeze @@ -76,6 +72,8 @@ def initialize(performer, max_threads: nil) @pool_options[:max_threads] = max_threads if max_threads.present? @pool_options[:name] = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@pool_options[:max_threads]})" + @scheduled_task_queue = ScheduledTaskQueue.new(max_size: @pool_options[:max_threads]) + create_pool end @@ -121,18 +119,46 @@ def restart(wait: true) # Returns +true+ if the performer started executing work. # Returns +false+ if the performer decides not to attempt to execute a task based on the +state+ that is passed to it. def create_thread(state = nil) - return nil unless @pool.running? && @pool.ready_worker_count.positive? - return false if state && !@performer.next?(state) + return nil unless @pool.running? + + if state + return false unless @performer.next?(state) - future = Concurrent::Future.new(args: [@performer], executor: @pool) do |performer| - output = nil - Rails.application.executor.wrap { output = performer.next } - output + if state[:scheduled_at] + scheduled_at = if state[:scheduled_at].is_a? String + Time.zone.parse state[:scheduled_at] + else + state[:scheduled_at] + end + end end - future.add_observer(self, :task_observer) - future.execute - true + if scheduled_at && scheduled_at > Time.current + task = CustomConcurrent::ScheduledTask.new(scheduled_at, args: [@performer], executor: @pool) do |performer| + output = nil + Rails.application.executor.wrap { output = performer.next } + output + end + + if @scheduled_task_queue.push(task) + task.add_observer(self, :task_observer) + task.execute + end + + true + elsif @pool.ready_worker_count.positive? + task = Concurrent::Future.new(args: [@performer], executor: @pool) do |performer| + output = nil + Rails.application.executor.wrap { output = performer.next } + output + end + task.add_observer(self, :task_observer) + task.execute + + true + else + nil + end end # Invoked on completion of ThreadPoolExecutor task @@ -141,14 +167,21 @@ def create_thread(state = nil) def task_observer(time, output, thread_error) GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call) instrument("finished_job_task", { result: output, error: thread_error, time: time }) - create_thread if output + + if output + create_thread + elsif @performer.respond_to?(:next_at) + @scheduled_task_queue.max_size - @scheduled_task_queue.size + Array(@performer.next_at( + @timer_wake.push(next_at) if next_at + end end private def create_pool instrument("scheduler_create_pool", { performer_name: @performer.name, max_threads: @pool_options[:max_threads] }) do - @pool = ThreadPoolExecutor.new(@pool_options) + @pool = CustomConcurrent::ThreadPoolExecutor.new(@pool_options) end end @@ -162,20 +195,4 @@ def instrument(name, payload = {}, &block) ActiveSupport::Notifications.instrument("#{name}.good_job", payload, &block) end end - - # Custom sub-class of +Concurrent::ThreadPoolExecutor+ to add additional worker status. - # @private - class ThreadPoolExecutor < Concurrent::ThreadPoolExecutor - # Number of inactive threads available to execute tasks. - # https://github.com/ruby-concurrency/concurrent-ruby/issues/684#issuecomment-427594437 - # @return [Integer] - def ready_worker_count - synchronize do - workers_still_to_be_created = @max_length - @pool.length - workers_created_but_waiting = @ready.length - - workers_still_to_be_created + workers_created_but_waiting - end - end - end end diff --git a/spec/lib/good_job/adapter_spec.rb b/spec/lib/good_job/adapter_spec.rb index 0d281f5ff..3d6dfee27 100644 --- a/spec/lib/good_job/adapter_spec.rb +++ b/spec/lib/good_job/adapter_spec.rb @@ -4,7 +4,7 @@ RSpec.describe GoodJob::Adapter do let(:adapter) { described_class.new } let(:active_job) { instance_double(ApplicationJob) } - let(:good_job) { instance_double(GoodJob::Job, queue_name: 'default') } + let(:good_job) { instance_double(GoodJob::Job, queue_name: 'default', scheduled_at: nil) } describe '#initialize' do it 'guards against improper execution modes' do diff --git a/spec/lib/good_job/scheduler_spec.rb b/spec/lib/good_job/scheduler_spec.rb index cd91c5b7a..acf68a920 100644 --- a/spec/lib/good_job/scheduler_spec.rb +++ b/spec/lib/good_job/scheduler_spec.rb @@ -86,6 +86,12 @@ expect(scheduler.create_thread(queue_name: 'elephant')).to eq false end + + it 'will schedule in the future' do + configuration = GoodJob::Configuration.new({ queues: 'mice:2' }) + scheduler = described_class.from_configuration(configuration) + expect(scheduler.create_thread(queue_name: 'mice', scheduled_at: 1.second.from_now)).to eq true + end end describe '.from_configuration' do