Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added metrics to Scheduler and track in Process state #984

Merged
merged 23 commits into from
Jul 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
b657d1f
added metrics
AndersGM Jun 13, 2023
3111a28
thread safe counters
AndersGM Jun 20, 2023
1f3ae5f
reset metrics after restart
AndersGM Jun 20, 2023
2960a86
specced both error and success in #task_observer
AndersGM Jun 20, 2023
30db43a
added doc
AndersGM Jun 20, 2023
78d8aba
include metrics in name
AndersGM Jun 20, 2023
042ed03
Merge branch 'main' into metrics
AndersGM Jun 20, 2023
17e5bd7
renamed and reverted
AndersGM Jun 25, 2023
02dae4a
Merge remote-tracking branch 'fork/metrics' into metrics
AndersGM Jun 25, 2023
afeae87
consistent naming
AndersGM Jun 25, 2023
2c3c21e
fixe broken current state
AndersGM Jun 25, 2023
6069cac
Merge branch 'main' into metrics
AndersGM Jun 25, 2023
822a0be
Merge remote-tracking branch 'origin/main' into metrics
bensheldon Jul 1, 2023
e9a26d2
Track Scheduler name and queues separately in stats and display
bensheldon Jul 1, 2023
28b99ad
Fix Scheduler tests
bensheldon Jul 1, 2023
6c9af51
Remove default value for state
bensheldon Jul 1, 2023
3ca81d4
Renamed `failed_` to `errored_`
bensheldon Jul 1, 2023
e59c8da
Track empty and unlocked executions too
bensheldon Jul 1, 2023
74ad726
Have stat totals add up consistently
bensheldon Jul 1, 2023
6fe747a
Ensure stats can be reported when shutdown
bensheldon Jul 1, 2023
169a23c
Don't add object to instances list until fully initialized
bensheldon Jul 1, 2023
0964768
Fix race condition in Scheduler#stats test
bensheldon Jul 1, 2023
b8f6f08
Rename "unlocked" to "unexecutable" and fix counting
bensheldon Jul 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion app/models/good_job/execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,11 @@ def self.perform_with_advisory_lock(parsed_queues: nil, queue_select_limit: nil)
unfinished.dequeueing_ordered(parsed_queues).only_scheduled.limit(1).with_advisory_lock(unlock_session: true, select_limit: queue_select_limit) do |executions|
execution = executions.first
break if execution.blank?
break :unlocked unless execution&.executable?

unless execution.executable?
result = ExecutionResult.new(value: nil, unexecutable: true)
break
end

yield(execution) if block_given?
result = execution.perform
Expand Down
11 changes: 10 additions & 1 deletion app/models/good_job/execution_result.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,26 @@ class ExecutionResult
# @return [Exception, nil]
attr_reader :unhandled_error
# @return [Exception, nil]
attr_reader :unexecutable
# @return [Exception, nil]
attr_reader :retried
alias retried? retried

# @param value [Object, nil]
# @param handled_error [Exception, nil]
# @param unhandled_error [Exception, nil]
def initialize(value:, handled_error: nil, unhandled_error: nil, retried: false)
# @param executable [Boolean, nil]
# @param retried [Boolean, nil]
def initialize(value:, handled_error: nil, unhandled_error: nil, unexecutable: nil, retried: false)
@value = value
@handled_error = handled_error
@unhandled_error = unhandled_error
@unexecutable = unexecutable
@retried = retried
end

def succeeded?
!(handled_error || unhandled_error || unexecutable || retried)
end
end
end
14 changes: 12 additions & 2 deletions app/models/good_job/process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ def self.ns_current_state
proctitle: $PROGRAM_NAME,
preserve_job_records: GoodJob.preserve_job_records,
retry_on_unhandled_error: GoodJob.retry_on_unhandled_error,
schedulers: GoodJob::Scheduler.instances.map(&:name),
schedulers: GoodJob::Scheduler.instances.map(&:stats),
bensheldon marked this conversation as resolved.
Show resolved Hide resolved
cron_enabled: GoodJob.configuration.enable_cron?,
total_succeeded_executions_count: GoodJob::Scheduler.instances.sum { |scheduler| scheduler.stats.fetch(:succeeded_executions_count) },
total_errored_executions_count: GoodJob::Scheduler.instances.sum { |scheduler| scheduler.stats.fetch(:errored_executions_count) },
}
end

Expand Down Expand Up @@ -98,8 +100,16 @@ def deregister
end
end

def state
super || {}
end

def basename
File.basename(state["proctitle"])
File.basename(state.fetch("proctitle", ""))
end

def schedulers
state.fetch("schedulers", [])
end

def refresh_if_stale(cleanup: false)
Expand Down
4 changes: 2 additions & 2 deletions app/views/good_job/processes/index.html.erb
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
</div>
</div>
<div class="col">
<% process.state["schedulers"].each do |scheduler| %>
<pre class="mb-0"><%= scheduler %></pre>
<% process.schedulers.each do |scheduler| %>
<pre class="mb-0"><%= scheduler.is_a?(Hash) ? scheduler['name'] : scheduler %></pre>
<% end %>
</div>
<div class="col-2 small"><%= relative_time(process.created_at) %></div>
Expand Down
2 changes: 1 addition & 1 deletion lib/good_job/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ def initialize(execution_mode: nil, _capsule: GoodJob.capsule) # rubocop:disable
GoodJob::Configuration.validate_execution_mode(@_execution_mode_override) if @_execution_mode_override
@capsule = _capsule

self.class.instances << self
start_async if GoodJob.async_ready?
self.class.instances << self
end

# Enqueues the ActiveJob job to be performed.
Expand Down
4 changes: 2 additions & 2 deletions lib/good_job/capsule.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ class Capsule

# @param configuration [GoodJob::Configuration] Configuration to use for this capsule.
def initialize(configuration: GoodJob.configuration)
self.class.instances << self
@configuration = configuration

@startable = true
@running = false
@mutex = Mutex.new

self.class.instances << self
end

# Start the capsule once. After a shutdown, {#restart} must be used to start again.
Expand Down
2 changes: 1 addition & 1 deletion lib/good_job/cleanup_tracker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def initialize(cleanup_interval_seconds: false, cleanup_interval_jobs: false)
end

# Increments job count.
# @return [void]
# @return [Integer]
def increment
self.job_count += 1
end
Expand Down
3 changes: 1 addition & 2 deletions lib/good_job/cron_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ def initialize(cron_entries = [], start_on_initialize: false)
@cron_entries = cron_entries
@tasks = Concurrent::Hash.new

self.class.instances << self

start if start_on_initialize
self.class.instances << self
end

# Schedule tasks that will enqueue jobs based on their schedule
Expand Down
56 changes: 56 additions & 0 deletions lib/good_job/metrics.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# frozen_string_literal: true
module GoodJob # :nodoc:
# Metrics for the scheduler.
class Metrics
def initialize
AndersGM marked this conversation as resolved.
Show resolved Hide resolved
@empty_executions = Concurrent::AtomicFixnum.new
@errored_executions = Concurrent::AtomicFixnum.new
@succeeded_executions = Concurrent::AtomicFixnum.new
@unexecutable_executions = Concurrent::AtomicFixnum.new
end

# Increments number of empty queried executions.
# @return [Integer]
def increment_empty_executions
@empty_executions.increment
end

# Increments number of failed executions.
# @return [Integer]
def increment_errored_executions
@errored_executions.increment
end

# Increments number of succeeded executions.
# @return [Integer]
def increment_succeeded_executions
@succeeded_executions.increment
end

# Increments number of unlocked executions.
# @return [Integer]
def increment_unexecutable_executions
@unexecutable_executions.increment
end

def to_h
{
empty_executions_count: @empty_executions.value,
errored_executions_count: @errored_executions.value,
succeeded_executions_count: @succeeded_executions.value,
unexecutable_executions_count: @unexecutable_executions.value,
}.tap do |values|
values[:total_executions_count] = values.values.sum
end
end

# Reset counters.
# @return [void]
def reset
@empty_executions.value = 0
@errored_executions.value = 0
@succeeded_executions.value = 0
@unexecutable_executions.value = 0
end
end
end
3 changes: 1 addition & 2 deletions lib/good_job/notifier.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,9 @@ def initialize(*recipients, enable_listening: true)
@connection_errors_reported = Concurrent::AtomicBoolean.new(false)
@enable_listening = enable_listening

self.class.instances << self

create_executor
listen
self.class.instances << self
end

# Tests whether the notifier is active and has acquired a dedicated database connection.
Expand Down
3 changes: 1 addition & 2 deletions lib/good_job/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ def initialize(*recipients, poll_interval: nil)
@timer_options = DEFAULT_TIMER_OPTIONS.dup
@timer_options[:execution_interval] = poll_interval if poll_interval.present?

self.class.instances << self

create_timer
self.class.instances << self
end

# Tests whether the timer is running.
Expand Down
34 changes: 26 additions & 8 deletions lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require "concurrent/executor/timer_set"
require "concurrent/scheduled_task"
require "concurrent/utility/processor_counter"
require 'good_job/metrics'

module GoodJob # :nodoc:
#
Expand Down Expand Up @@ -74,8 +75,6 @@ def self.from_configuration(configuration, warm_cache_on_initialize: false)
def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false, cleanup_interval_seconds: nil, cleanup_interval_jobs: nil)
raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)

self.class.instances << self

@performer = performer

@max_cache = max_cache || 0
Expand All @@ -88,8 +87,12 @@ def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initia
@executor_options[:name] = name

@cleanup_tracker = CleanupTracker.new(cleanup_interval_seconds: cleanup_interval_seconds, cleanup_interval_jobs: cleanup_interval_jobs)
@metrics = ::GoodJob::Metrics.new
@executor_options[:name] = name

create_executor
warm_cache if warm_cache_on_initialize
self.class.instances << self
end

# Tests whether the scheduler is running.
Expand Down Expand Up @@ -139,6 +142,7 @@ def restart(timeout: -1)

instrument("scheduler_restart_pools") do
shutdown(timeout: timeout)
@metrics.reset
create_executor
warm_cache
end
Expand Down Expand Up @@ -197,8 +201,20 @@ def create_thread(state = nil)
# @!visibility private
# @return [void]
def task_observer(time, output, thread_error)
error = thread_error || (output.is_a?(GoodJob::ExecutionResult) ? output.unhandled_error : nil)
GoodJob._on_thread_error(error) if error
result = output.is_a?(GoodJob::ExecutionResult) ? output : nil

unhandled_error = thread_error || result&.unhandled_error
GoodJob._on_thread_error(unhandled_error) if unhandled_error

if unhandled_error || result&.handled_error
@metrics.increment_errored_executions
elsif result&.unexecutable
@metrics.increment_unexecutable_executions
elsif result
@metrics.increment_succeeded_executions
else
@metrics.increment_empty_executions
end

instrument("finished_job_task", { result: output, error: thread_error, time: time })
return unless output
Expand All @@ -214,15 +230,17 @@ def task_observer(time, output, thread_error)
# Information about the Scheduler
# @return [Hash]
def stats
available_threads = executor.ready_worker_count
{
name: performer.name,
name: name,
queues: performer.name,
max_threads: @executor_options[:max_threads],
active_threads: @executor_options[:max_threads] - executor.ready_worker_count,
available_threads: executor.ready_worker_count,
active_threads: @executor_options[:max_threads] - available_threads,
available_threads: available_threads,
max_cache: @max_cache,
active_cache: cache_count,
available_cache: remaining_cache_count,
}
}.merge!(@metrics.to_h)
end

# Preload existing runnable and future-scheduled jobs
Expand Down
Loading