Skip to content

Commit

Permalink
Ensure Process records always exist when executing jobs and are refre…
Browse files Browse the repository at this point in the history
…shed in background
  • Loading branch information
bensheldon committed Sep 15, 2023
1 parent 3c45787 commit e98c176
Show file tree
Hide file tree
Showing 38 changed files with 921 additions and 272 deletions.
2 changes: 1 addition & 1 deletion app/controllers/good_job/processes_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
module GoodJob
class ProcessesController < GoodJob::ApplicationController
def index
@processes = GoodJob::Process.active.order(created_at: :desc)
@processes = GoodJob::CapsuleRecord.active.order(created_at: :desc)
end
end
end
2 changes: 1 addition & 1 deletion app/models/concerns/good_job/advisory_lockable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ module AdvisoryLockable
# @return [Boolean]
attr_accessor :create_with_advisory_lock

after_create -> { advisory_lock }, if: :create_with_advisory_lock
after_create -> { advisory_lock || (raise ActiveRecord::RecordInvalid) }, if: :create_with_advisory_lock
end

class_methods do
Expand Down
7 changes: 7 additions & 0 deletions app/models/good_job/base_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ def error_event_migrated?
migration_pending_warning!
false
end

def process_lock_migrated?
return true if columns_hash["locked_by_id"].present?

migration_pending_warning!
false
end
end

# The ActiveJob job class, as a string
Expand Down
4 changes: 2 additions & 2 deletions app/models/good_job/base_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ def self.migrated?

# Runs the block with self.logger silenced.
# If self.logger is nil, simply runs the block.
def self.with_logger_silenced(&block)
def self.with_logger_silenced(silent: true, &block)
# Assign to a local variable, just in case it's modified in another thread concurrently
logger = self.logger
if logger.respond_to? :silence
if silent && logger.respond_to?(:silence)
logger.silence(&block)
else
yield
Expand Down
153 changes: 153 additions & 0 deletions app/models/good_job/capsule_record.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
# frozen_string_literal: true

require 'socket'

module GoodJob # :nodoc:
# Active Record model that represents a GoodJob capsule/process (either async or CLI).
class CapsuleRecord < BaseRecord
include AdvisoryLockable
include AssignableConnection

# Interval until the process record being updated
STALE_INTERVAL = 30.seconds
# Interval until the process record is treated as expired
EXPIRED_INTERVAL = 5.minutes

LOCK_TYPES = [
LOCK_TYPE_ADVISORY = 'advisory',
].freeze

LOCK_TYPE_ENUMS = {
LOCK_TYPE_ADVISORY => 1,
}.freeze

self.table_name = 'good_job_processes'

# Processes that are active and locked.
# @!method active
# @!scope class
# @return [ActiveRecord::Relation]
scope :active, (lambda do
if lock_type_migrated?
query = joins_advisory_locks
query.where(lock_type: LOCK_TYPE_ENUMS[LOCK_TYPE_ADVISORY]).advisory_locked
.or(query.where(lock_type: nil).where(arel_table[:updated_at].gt(EXPIRED_INTERVAL.ago)))
else
advisory_locked
end
end)

# Processes that are inactive and unlocked (e.g. SIGKILLed)
# @!method active
# @!scope class
# @return [ActiveRecord::Relation]
scope :inactive, (lambda do
if lock_type_migrated?
query = joins_advisory_locks
query.where(lock_type: LOCK_TYPE_ENUMS[LOCK_TYPE_ADVISORY]).advisory_unlocked
.or(query.where(lock_type: nil).where(arel_table[:updated_at].lt(EXPIRED_INTERVAL.ago)))
else
advisory_unlocked
end
end)

# Deletes all inactive process records.
def self.cleanup
inactive.find_each(&:cleanup)
end

# @return [Boolean]
def self.lock_type_migrated?
columns_hash["lock_type"].present?
end

def self.create_record(id:, with_advisory_lock: false)
attributes = {
id: id,
state: process_state,
}
if with_advisory_lock
attributes[:create_with_advisory_lock] = true
attributes[:lock_type] = LOCK_TYPE_ADVISORY if lock_type_migrated?
end
create!(attributes)
end

def self.process_state
{
hostname: Socket.gethostname,
pid: ::Process.pid,
proctitle: $PROGRAM_NAME,
preserve_job_records: GoodJob.preserve_job_records,
retry_on_unhandled_error: GoodJob.retry_on_unhandled_error,
schedulers: GoodJob::Scheduler.instances.map(&:stats),
cron_enabled: GoodJob.configuration.enable_cron?,
total_succeeded_executions_count: GoodJob::Scheduler.instances.sum { |scheduler| scheduler.stats.fetch(:succeeded_executions_count) },
total_errored_executions_count: GoodJob::Scheduler.instances.sum { |scheduler| scheduler.stats.fetch(:errored_executions_count) },
database_connection_pool: {
size: connection_pool.size,
active: connection_pool.connections.count(&:in_use?),
},
}
end

def refresh
self.state = self.class.process_state
reload.update(state: state, updated_at: Time.current)
rescue ActiveRecord::RecordNotFound
@new_record = true
self.created_at = self.updated_at = nil
state_will_change!
save
end

def refresh_if_stale(cleanup: false)
return unless stale?

result = refresh
self.class.cleanup if cleanup
result
end

def cleanup
GoodJob::Job.where(locked_by_id: id).update_all(locked_by_id: nil, locked_at: nil) if GoodJob::Job.process_lock_migrated? # rubocop:disable Rails/SkipsModelValidations
delete
end

def state
super || {}
end

def stale?
updated_at < STALE_INTERVAL.ago
end

def expired?
updated_at < EXPIRED_INTERVAL.ago
end

def basename
File.basename(state.fetch("proctitle", ""))
end

def schedulers
state.fetch("schedulers", [])
end

def lock_type
return unless self.class.columns_hash['lock_type']

enum = super
LOCK_TYPE_ENUMS.key(enum) if enum
end

def lock_type=(value)
return unless self.class.columns_hash['lock_type']

enum = LOCK_TYPE_ENUMS[value]
raise(ArgumentError, "Invalid error_event: #{value}") if value && !enum

super(enum)
end
end
end
30 changes: 21 additions & 9 deletions app/models/good_job/execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ def self.enqueue_args(active_job, overrides = {})
# return value for the job's +#perform+ method, and the exception the job
# raised, if any (if the job raised, then the second array entry will be
# +nil+). If there were no jobs to execute, returns +nil+.
def self.perform_with_advisory_lock(parsed_queues: nil, queue_select_limit: nil)
def self.perform_with_advisory_lock(parsed_queues: nil, queue_select_limit: nil, capsule: GoodJob.capsule)
execution = nil
result = nil
unfinished.dequeueing_ordered(parsed_queues).only_scheduled.limit(1).with_advisory_lock(unlock_session: true, select_limit: queue_select_limit) do |executions|
Expand All @@ -266,7 +266,9 @@ def self.perform_with_advisory_lock(parsed_queues: nil, queue_select_limit: nil)
end

yield(execution) if block_given?
result = execution.perform
capsule.tracker.register do
result = execution.perform(id_for_lock: capsule.tracker.id_for_lock)
end
end
execution&.run_callbacks(:perform_unlocked)

Expand Down Expand Up @@ -356,7 +358,7 @@ def self.format_error(error)
# An array of the return value of the job's +#perform+ method and the
# exception raised by the job, if any. If the job completed successfully,
# the second array entry (the exception) will be +nil+ and vice versa.
def perform
def perform(id_for_lock: nil)
run_callbacks(:perform) do
raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at

Expand Down Expand Up @@ -385,17 +387,23 @@ def perform
if discrete?
transaction do
now = Time.current
discrete_execution = discrete_executions.create!(
discrete_execution = discrete_executions.create!({
job_class: job_class,
queue_name: queue_name,
serialized_params: serialized_params,
scheduled_at: (scheduled_at || created_at),
created_at: now
)
created_at: now,
}.tap do |args|
args[:process_id] = id_for_lock if id_for_lock && self.class.process_lock_migrated?
end)

assign_attributes(locked_by_id: id_for_lock, locked_at: now) if id_for_lock && self.class.process_lock_migrated?
update!(performed_at: now, executions_count: ((executions_count || 0) + 1))
end
else
update!(performed_at: Time.current)
now = Time.current
assign_attributes(locked_by_id: id_for_lock, locked_at: now) if id_for_lock && self.class.process_lock_migrated?
update!(performed_at: now)
end

ActiveSupport::Notifications.instrument("perform_job.good_job", { execution: self, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do |instrument_payload|
Expand Down Expand Up @@ -439,7 +447,6 @@ def perform
end

job_error = result.handled_error || result.unhandled_error

if job_error
error_string = self.class.format_error(job_error)
self.error = error_string
Expand All @@ -452,8 +459,13 @@ def perform
self.error = nil
self.error_event = nil if self.class.error_event_migrated?
end

reenqueued = result.retried? || retried_good_job_id.present?

if self.class.process_lock_migrated?
self.locked_by_id = nil
self.locked_at = nil
end

if result.unhandled_error && GoodJob.retry_on_unhandled_error
if discrete_execution
transaction do
Expand Down
Loading

0 comments on commit e98c176

Please sign in to comment.