diff --git a/app/models/good_job/execution.rb b/app/models/good_job/execution.rb index 3664e6a49..3589814ac 100644 --- a/app/models/good_job/execution.rb +++ b/app/models/good_job/execution.rb @@ -262,7 +262,11 @@ def self.perform_with_advisory_lock(parsed_queues: nil, queue_select_limit: nil) unfinished.dequeueing_ordered(parsed_queues).only_scheduled.limit(1).with_advisory_lock(unlock_session: true, select_limit: queue_select_limit) do |executions| execution = executions.first break if execution.blank? - break :unlocked unless execution&.executable? + + unless execution.executable? + result = ExecutionResult.new(value: nil, unexecutable: true) + break + end yield(execution) if block_given? result = execution.perform diff --git a/app/models/good_job/execution_result.rb b/app/models/good_job/execution_result.rb index ff6ea9991..ac7a0ecaa 100644 --- a/app/models/good_job/execution_result.rb +++ b/app/models/good_job/execution_result.rb @@ -9,17 +9,26 @@ class ExecutionResult # @return [Exception, nil] attr_reader :unhandled_error # @return [Exception, nil] + attr_reader :unexecutable + # @return [Exception, nil] attr_reader :retried alias retried? retried # @param value [Object, nil] # @param handled_error [Exception, nil] # @param unhandled_error [Exception, nil] - def initialize(value:, handled_error: nil, unhandled_error: nil, retried: false) + # @param executable [Boolean, nil] + # @param retried [Boolean, nil] + def initialize(value:, handled_error: nil, unhandled_error: nil, unexecutable: nil, retried: false) @value = value @handled_error = handled_error @unhandled_error = unhandled_error + @unexecutable = unexecutable @retried = retried end + + def succeeded? + !(handled_error || unhandled_error || unexecutable || retried) + end end end diff --git a/app/models/good_job/process.rb b/app/models/good_job/process.rb index de3e9465c..68b4cabf7 100644 --- a/app/models/good_job/process.rb +++ b/app/models/good_job/process.rb @@ -58,8 +58,10 @@ def self.ns_current_state proctitle: $PROGRAM_NAME, preserve_job_records: GoodJob.preserve_job_records, retry_on_unhandled_error: GoodJob.retry_on_unhandled_error, - schedulers: GoodJob::Scheduler.instances.map(&:name), + schedulers: GoodJob::Scheduler.instances.map(&:stats), cron_enabled: GoodJob.configuration.enable_cron?, + total_succeeded_executions_count: GoodJob::Scheduler.instances.sum { |scheduler| scheduler.stats.fetch(:succeeded_executions_count) }, + total_errored_executions_count: GoodJob::Scheduler.instances.sum { |scheduler| scheduler.stats.fetch(:errored_executions_count) }, } end @@ -98,8 +100,16 @@ def deregister end end + def state + super || {} + end + def basename - File.basename(state["proctitle"]) + File.basename(state.fetch("proctitle", "")) + end + + def schedulers + state.fetch("schedulers", []) end def refresh_if_stale(cleanup: false) diff --git a/app/views/good_job/processes/index.html.erb b/app/views/good_job/processes/index.html.erb index 8b47847a2..1112ac31f 100644 --- a/app/views/good_job/processes/index.html.erb +++ b/app/views/good_job/processes/index.html.erb @@ -40,8 +40,8 @@
- <% process.state["schedulers"].each do |scheduler| %> -
<%= scheduler %>
+ <% process.schedulers.each do |scheduler| %> +
<%= scheduler.is_a?(Hash) ? scheduler['name'] : scheduler %>
<% end %>
<%= relative_time(process.created_at) %>
diff --git a/lib/good_job/adapter.rb b/lib/good_job/adapter.rb index 82a26e054..9e2c0950d 100644 --- a/lib/good_job/adapter.rb +++ b/lib/good_job/adapter.rb @@ -29,8 +29,8 @@ def initialize(execution_mode: nil, _capsule: GoodJob.capsule) # rubocop:disable GoodJob::Configuration.validate_execution_mode(@_execution_mode_override) if @_execution_mode_override @capsule = _capsule - self.class.instances << self start_async if GoodJob.async_ready? + self.class.instances << self end # Enqueues the ActiveJob job to be performed. diff --git a/lib/good_job/capsule.rb b/lib/good_job/capsule.rb index ac715e062..ef058037c 100644 --- a/lib/good_job/capsule.rb +++ b/lib/good_job/capsule.rb @@ -12,12 +12,12 @@ class Capsule # @param configuration [GoodJob::Configuration] Configuration to use for this capsule. def initialize(configuration: GoodJob.configuration) - self.class.instances << self @configuration = configuration - @startable = true @running = false @mutex = Mutex.new + + self.class.instances << self end # Start the capsule once. After a shutdown, {#restart} must be used to start again. diff --git a/lib/good_job/cleanup_tracker.rb b/lib/good_job/cleanup_tracker.rb index a0c34cbd1..9822f4e41 100644 --- a/lib/good_job/cleanup_tracker.rb +++ b/lib/good_job/cleanup_tracker.rb @@ -17,7 +17,7 @@ def initialize(cleanup_interval_seconds: false, cleanup_interval_jobs: false) end # Increments job count. - # @return [void] + # @return [Integer] def increment self.job_count += 1 end diff --git a/lib/good_job/cron_manager.rb b/lib/good_job/cron_manager.rb index 0de6c9adc..2724d379f 100644 --- a/lib/good_job/cron_manager.rb +++ b/lib/good_job/cron_manager.rb @@ -35,9 +35,8 @@ def initialize(cron_entries = [], start_on_initialize: false) @cron_entries = cron_entries @tasks = Concurrent::Hash.new - self.class.instances << self - start if start_on_initialize + self.class.instances << self end # Schedule tasks that will enqueue jobs based on their schedule diff --git a/lib/good_job/metrics.rb b/lib/good_job/metrics.rb new file mode 100644 index 000000000..add168ee3 --- /dev/null +++ b/lib/good_job/metrics.rb @@ -0,0 +1,56 @@ +# frozen_string_literal: true +module GoodJob # :nodoc: + # Metrics for the scheduler. + class Metrics + def initialize + @empty_executions = Concurrent::AtomicFixnum.new + @errored_executions = Concurrent::AtomicFixnum.new + @succeeded_executions = Concurrent::AtomicFixnum.new + @unexecutable_executions = Concurrent::AtomicFixnum.new + end + + # Increments number of empty queried executions. + # @return [Integer] + def increment_empty_executions + @empty_executions.increment + end + + # Increments number of failed executions. + # @return [Integer] + def increment_errored_executions + @errored_executions.increment + end + + # Increments number of succeeded executions. + # @return [Integer] + def increment_succeeded_executions + @succeeded_executions.increment + end + + # Increments number of unlocked executions. + # @return [Integer] + def increment_unexecutable_executions + @unexecutable_executions.increment + end + + def to_h + { + empty_executions_count: @empty_executions.value, + errored_executions_count: @errored_executions.value, + succeeded_executions_count: @succeeded_executions.value, + unexecutable_executions_count: @unexecutable_executions.value, + }.tap do |values| + values[:total_executions_count] = values.values.sum + end + end + + # Reset counters. + # @return [void] + def reset + @empty_executions.value = 0 + @errored_executions.value = 0 + @succeeded_executions.value = 0 + @unexecutable_executions.value = 0 + end + end +end diff --git a/lib/good_job/notifier.rb b/lib/good_job/notifier.rb index 7e6d17691..731ece2b8 100644 --- a/lib/good_job/notifier.rb +++ b/lib/good_job/notifier.rb @@ -77,10 +77,9 @@ def initialize(*recipients, enable_listening: true) @connection_errors_reported = Concurrent::AtomicBoolean.new(false) @enable_listening = enable_listening - self.class.instances << self - create_executor listen + self.class.instances << self end # Tests whether the notifier is active and has acquired a dedicated database connection. diff --git a/lib/good_job/poller.rb b/lib/good_job/poller.rb index 8ed87bf35..e86cb5ef3 100644 --- a/lib/good_job/poller.rb +++ b/lib/good_job/poller.rb @@ -38,9 +38,8 @@ def initialize(*recipients, poll_interval: nil) @timer_options = DEFAULT_TIMER_OPTIONS.dup @timer_options[:execution_interval] = poll_interval if poll_interval.present? - self.class.instances << self - create_timer + self.class.instances << self end # Tests whether the timer is running. diff --git a/lib/good_job/scheduler.rb b/lib/good_job/scheduler.rb index 500f2d150..7bceed077 100644 --- a/lib/good_job/scheduler.rb +++ b/lib/good_job/scheduler.rb @@ -3,6 +3,7 @@ require "concurrent/executor/timer_set" require "concurrent/scheduled_task" require "concurrent/utility/processor_counter" +require 'good_job/metrics' module GoodJob # :nodoc: # @@ -74,8 +75,6 @@ def self.from_configuration(configuration, warm_cache_on_initialize: false) def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false, cleanup_interval_seconds: nil, cleanup_interval_jobs: nil) raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next) - self.class.instances << self - @performer = performer @max_cache = max_cache || 0 @@ -88,8 +87,12 @@ def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initia @executor_options[:name] = name @cleanup_tracker = CleanupTracker.new(cleanup_interval_seconds: cleanup_interval_seconds, cleanup_interval_jobs: cleanup_interval_jobs) + @metrics = ::GoodJob::Metrics.new + @executor_options[:name] = name + create_executor warm_cache if warm_cache_on_initialize + self.class.instances << self end # Tests whether the scheduler is running. @@ -139,6 +142,7 @@ def restart(timeout: -1) instrument("scheduler_restart_pools") do shutdown(timeout: timeout) + @metrics.reset create_executor warm_cache end @@ -197,8 +201,20 @@ def create_thread(state = nil) # @!visibility private # @return [void] def task_observer(time, output, thread_error) - error = thread_error || (output.is_a?(GoodJob::ExecutionResult) ? output.unhandled_error : nil) - GoodJob._on_thread_error(error) if error + result = output.is_a?(GoodJob::ExecutionResult) ? output : nil + + unhandled_error = thread_error || result&.unhandled_error + GoodJob._on_thread_error(unhandled_error) if unhandled_error + + if unhandled_error || result&.handled_error + @metrics.increment_errored_executions + elsif result&.unexecutable + @metrics.increment_unexecutable_executions + elsif result + @metrics.increment_succeeded_executions + else + @metrics.increment_empty_executions + end instrument("finished_job_task", { result: output, error: thread_error, time: time }) return unless output @@ -214,15 +230,17 @@ def task_observer(time, output, thread_error) # Information about the Scheduler # @return [Hash] def stats + available_threads = executor.ready_worker_count { - name: performer.name, + name: name, + queues: performer.name, max_threads: @executor_options[:max_threads], - active_threads: @executor_options[:max_threads] - executor.ready_worker_count, - available_threads: executor.ready_worker_count, + active_threads: @executor_options[:max_threads] - available_threads, + available_threads: available_threads, max_cache: @max_cache, active_cache: cache_count, available_cache: remaining_cache_count, - } + }.merge!(@metrics.to_h) end # Preload existing runnable and future-scheduled jobs diff --git a/spec/lib/good_job/scheduler_spec.rb b/spec/lib/good_job/scheduler_spec.rb index 8bdec7b7c..93ce983a5 100644 --- a/spec/lib/good_job/scheduler_spec.rb +++ b/spec/lib/good_job/scheduler_spec.rb @@ -8,7 +8,7 @@ described_class.instances.each(&:shutdown) end - describe 'name' do + describe '#name' do it 'is human readable and contains configuration values' do scheduler = described_class.new(performer) expect(scheduler.name).to eq('GoodJob::Scheduler(queues= max_threads=5)') @@ -76,6 +76,38 @@ end end + describe '#task_observer' do + it 'increases metric counters' do + allow(GoodJob).to receive(:on_thread_error) + + failed_job_count = 0 + succeeded_job_count = 0 + + allow(performer).to receive(:next) do + if failed_job_count < 7 + failed_job_count += 1 + GoodJob::ExecutionResult.new(value: nil, unhandled_error: StandardError.new("oopsy")) + elsif succeeded_job_count < 9 + succeeded_job_count += 1 + GoodJob::ExecutionResult.new(value: 'success') + end + end + + scheduler = described_class.new(performer) + scheduler.create_thread + sleep_until { scheduler.stats[:total_executions_count] == 17 } + scheduler.shutdown + + expect(scheduler.stats).to include( + empty_executions_count: 1, + errored_executions_count: 7, + succeeded_executions_count: 9, + unexecutable_executions_count: 0, + total_executions_count: 17 + ) + end + end + describe '#shutdown' do it 'shuts down the theadpools' do scheduler = described_class.new(performer) @@ -115,6 +147,25 @@ .to change(scheduler, :running?).from(false).to(true) end + it 'resets metrics' do + allow(performer).to receive(:next).and_return GoodJob::ExecutionResult.new(value: 'hello'), nil + + scheduler = described_class.new(performer) + scheduler.create_thread + + sleep_until do + scheduler.stats.fetch(:succeeded_executions_count) == 1 + end + + scheduler.shutdown + expect(scheduler.stats.fetch(:succeeded_executions_count)).to eq 1 + + expect { scheduler.restart } + .to change(scheduler, :running?).from(false).to(true) + + expect(scheduler.stats.fetch(:succeeded_executions_count)).to eq 0 + end + it 'can be called multiple times' do scheduler = described_class.new(performer) scheduler.shutdown @@ -170,13 +221,19 @@ scheduler = described_class.new(performer, max_threads: max_threads, max_cache: max_cache) expect(scheduler.stats).to eq({ - name: performer.name, + name: scheduler.name, + queues: performer.name, max_threads: max_threads, active_threads: 0, available_threads: max_threads, max_cache: max_cache, active_cache: 0, available_cache: max_cache, + empty_executions_count: 0, + errored_executions_count: 0, + succeeded_executions_count: 0, + unexecutable_executions_count: 0, + total_executions_count: 0, }) end end @@ -222,17 +279,17 @@ all_scheduler, rodents_scheduler, elephants_scheduler = multi_scheduler.schedulers expect(all_scheduler.stats).to include( - name: '*', + queues: '*', max_threads: 1 ) expect(rodents_scheduler.stats).to include( - name: 'mice,ferrets', + queues: 'mice,ferrets', max_threads: 2 ) expect(elephants_scheduler.stats).to include( - name: 'elephant', + queues: 'elephant', max_threads: 4 ) end