Skip to content

Commit

Permalink
Create "discrete" good_job_executions table to separate Job records…
Browse files Browse the repository at this point in the history
… from Execution records and have a 1-to-1 correspondence between `good_jobs` records and Active Job jobs (#928)

* Create "discrete" `good_job_executions` table to separate Job records from Execution records

* Change time precision in test

* Add benchmark script

* For discrete executions, id = active_job_id, scheduled_at = created_at if unscheduled

* Allow wider time assertions in specs

* Delete discrete executions before executions for better resiliency

* Rename `perform_expected_at` to simpler `scheduled_at`

* Add job_class and queue_name as columns because they will be queried in dashboard
  • Loading branch information
bensheldon authored Apr 22, 2023
1 parent 1b5a16e commit e5fbe91
Show file tree
Hide file tree
Showing 29 changed files with 742 additions and 137 deletions.
15 changes: 14 additions & 1 deletion app/models/good_job/base_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,25 @@ def params_execution_count
def coalesce_scheduled_at_created_at
arel_table.coalesce(arel_table['scheduled_at'], arel_table['created_at'])
end

def discrete_support?
if connection.table_exists?('good_job_executions')
true
else
migration_pending_warning!
false
end
end
end

# The ActiveJob job class, as a string
# @return [String]
def job_class
serialized_params['job_class']
discrete? ? attributes['job_class'] : serialized_params['job_class']
end

def discrete?
self.class.discrete_support? && is_discrete?
end
end
end
52 changes: 52 additions & 0 deletions app/models/good_job/discrete_execution.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# frozen_string_literal: true

module GoodJob # :nodoc:
class DiscreteExecution < BaseRecord
self.table_name = 'good_job_executions'

belongs_to :execution, class_name: 'GoodJob::Execution', foreign_key: 'active_job_id', primary_key: 'active_job_id', inverse_of: :discrete_executions, optional: true
belongs_to :job, class_name: 'GoodJob::Job', foreign_key: 'active_job_id', primary_key: 'active_job_id', inverse_of: :discrete_executions, optional: true

scope :finished, -> { where.not(finished_at: nil) }

alias_attribute :performed_at, :created_at

def number
serialized_params.fetch('executions', 0) + 1
end

# Time between when this job was expected to run and when it started running
def queue_latency
created_at - scheduled_at
end

# Time between when this job started and finished
def runtime_latency
(finished_at || Time.current) - performed_at if performed_at
end

def last_status_at
finished_at || created_at
end

def status
if finished_at.present?
if error.present?
:retried
elsif error.present? && job.finished_at.present?
:discarded
else
:succeeded
end
else
:running
end
end

def display_serialized_params
serialized_params.merge({
_good_job_execution: attributes.except('serialized_params'),
})
end
end
end
133 changes: 118 additions & 15 deletions app/models/good_job/execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,13 @@ def self.queue_parser(string)
end

belongs_to :batch, class_name: 'GoodJob::BatchRecord', optional: true, inverse_of: :executions

belongs_to :job, class_name: 'GoodJob::Job', foreign_key: 'active_job_id', primary_key: 'active_job_id', optional: true, inverse_of: :executions
after_destroy -> { self.class.active_job_id(active_job_id).delete_all }, if: -> { @_destroy_job }
has_many :discrete_executions, class_name: 'GoodJob::DiscreteExecution', foreign_key: 'active_job_id', primary_key: 'active_job_id', inverse_of: :execution # rubocop:disable Rails/HasManyOrHasOneDependent

after_destroy lambda {
GoodJob::DiscreteExecution.where(active_job_id: active_job_id).delete_all if discrete? # TODO: move into association `dependent: :delete_all` after v4
self.class.active_job_id(active_job_id).delete_all
}, if: -> { @_destroy_job }

# Get executions with given ActiveJob ID
# @!method active_job_id
Expand Down Expand Up @@ -201,8 +205,12 @@ def self.queue_parser(string)
end
end)

# Construct a GoodJob::Execution from an ActiveJob instance.
def self.build_for_enqueue(active_job, overrides = {})
new(**enqueue_args(active_job, overrides))
end

# Construct arguments for GoodJob::Execution from an ActiveJob instance.
def self.enqueue_args(active_job, overrides = {})
if active_job.priority && GoodJob.configuration.smaller_number_is_higher_priority.nil?
ActiveSupport::Deprecation.warn(<<~DEPRECATION)
The next major version of GoodJob (v4.0) will change job `priority` to give smaller numbers higher priority (default: `0`), in accordance with Active Job's definition of priority.
Expand All @@ -218,6 +226,7 @@ def self.build_for_enqueue(active_job, overrides = {})
serialized_params: active_job.serialize,
scheduled_at: active_job.scheduled_at,
}

execution_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key)

reenqueued_current_execution = CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id
Expand All @@ -238,7 +247,7 @@ def self.build_for_enqueue(active_job, overrides = {})
execution_args[:cron_at] = CurrentThread.cron_at
end

new(**execution_args.merge(overrides))
execution_args.merge(overrides)
end

# Finds the next eligible Execution, acquire an advisory lock related to it, and
Expand Down Expand Up @@ -298,19 +307,47 @@ def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil)
# The new {Execution} instance representing the queued ActiveJob job.
def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false)
ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload|
execution = build_for_enqueue(active_job, { scheduled_at: scheduled_at })
current_execution = CurrentThread.execution

retried = current_execution && current_execution.active_job_id == active_job.job_id
if retried
if current_execution.discrete?
execution = current_execution
execution.assign_attributes(enqueue_args(active_job, { scheduled_at: scheduled_at }))
execution.scheduled_at ||= Time.current
execution.performed_at = nil
execution.finished_at = nil
else
execution = build_for_enqueue(active_job, { scheduled_at: scheduled_at })
end
else
execution = build_for_enqueue(active_job, { scheduled_at: scheduled_at })
execution.make_discrete if discrete_support?
end

execution.create_with_advisory_lock = create_with_advisory_lock
instrument_payload[:execution] = execution
if create_with_advisory_lock
if execution.persisted?
execution.advisory_lock
else
execution.create_with_advisory_lock = true
end
end

instrument_payload[:execution] = execution
execution.save!
active_job.provider_job_id = execution.id
CurrentThread.execution.retried_good_job_id = execution.id if CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id

CurrentThread.execution.retried_good_job_id = execution.id if retried && !CurrentThread.execution.discrete?
active_job.provider_job_id = execution.id
execution
end
end

def self.format_error(error)
raise ArgumentError unless error.is_a?(Exception)

[error.class.to_s, ERROR_MESSAGE_SEPARATOR, error.message].join
end

# Execute the ActiveJob job this {Execution} represents.
# @return [ExecutionResult]
# An array of the return value of the job's +#perform+ method and the
Expand All @@ -320,12 +357,39 @@ def perform
run_callbacks(:perform) do
raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at

discrete_execution = nil
result = GoodJob::CurrentThread.within do |current_thread|
current_thread.reset
current_thread.execution = self

current_thread.execution_interrupted = performed_at if performed_at
update!(performed_at: Time.current)
if performed_at
current_thread.execution_interrupted = performed_at

if discrete?
interrupt_error_string = self.class.format_error(GoodJob::InterruptError.new("Interrupted after starting perform at '#{performed_at}'"))
self.error = interrupt_error_string
discrete_executions.where(finished_at: nil).where.not(performed_at: nil).update_all( # rubocop:disable Rails/SkipsModelValidations
error: interrupt_error_string,
finished_at: Time.current
)
end
end

if discrete?
transaction do
now = Time.current
discrete_execution = discrete_executions.create!(
job_class: job_class,
queue_name: queue_name,
serialized_params: serialized_params,
scheduled_at: (scheduled_at || created_at),
created_at: now
)
update!(performed_at: now, executions_count: ((executions_count || 0) + 1))
end
else
update!(performed_at: Time.current)
end

ActiveSupport::Notifications.instrument("perform_job.good_job", { execution: self, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do |instrument_payload|
value = ActiveJob::Base.execute(active_job_data)
Expand All @@ -349,14 +413,42 @@ def perform
end

job_error = result.handled_error || result.unhandled_error
self.error = [job_error.class, ERROR_MESSAGE_SEPARATOR, job_error.message].join if job_error

if job_error
error_string = self.class.format_error(job_error)
self.error = error_string
discrete_execution.error = error_string if discrete_execution
else
self.error = nil
end

reenqueued = result.retried? || retried_good_job_id.present?
if result.unhandled_error && GoodJob.retry_on_unhandled_error
save!
if discrete_execution
transaction do
discrete_execution.update!(finished_at: Time.current)
update!(performed_at: nil, finished_at: nil, retried_good_job_id: nil)
end
else
save!
end
elsif GoodJob.preserve_job_records == true || reenqueued || (result.unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error) || cron_key.present?
self.finished_at = Time.current
save!
now = Time.current
if discrete_execution
if reenqueued
self.performed_at = nil
else
self.finished_at = now
end
discrete_execution.finished_at = now
transaction do
discrete_execution.save!
save!
end
else
self.finished_at = now
save!
end
else
destroy_job
end
Expand All @@ -371,6 +463,17 @@ def executable?
self.class.unscoped.unfinished.owns_advisory_locked.exists?(id: id)
end

def make_discrete
self.is_discrete = true
self.id = active_job_id
self.job_class = serialized_params['job_class']
self.executions_count ||= 0

current_time = Time.current
self.created_at ||= current_time
self.scheduled_at ||= current_time
end

# Build an ActiveJob instance and deserialize the arguments, using `#active_job_data`.
#
# @param ignore_deserialization_errors [Boolean]
Expand Down
12 changes: 10 additions & 2 deletions app/models/good_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ def table_name=(_value)

belongs_to :batch, class_name: 'GoodJob::BatchRecord', inverse_of: :jobs, optional: true
has_many :executions, -> { order(created_at: :asc) }, class_name: 'GoodJob::Execution', foreign_key: 'active_job_id', inverse_of: :job # rubocop:disable Rails/HasManyOrHasOneDependent
has_many :discrete_executions, -> { order(created_at: :asc) }, class_name: 'GoodJob::DiscreteExecution', foreign_key: 'active_job_id', primary_key: :active_job_id, inverse_of: :job # rubocop:disable Rails/HasManyOrHasOneDependent

after_destroy lambda {
GoodJob::DiscreteExecution.where(active_job_id: active_job_id).delete_all if discrete? # TODO: move into association `dependent: :delete_all` after v4
}

# Only the most-recent unretried execution represents a "Job"
default_scope { where(retried_good_job_id: nil) }
Expand All @@ -56,6 +61,8 @@ def table_name=(_value)
# Errored but will not be retried
scope :discarded, -> { finished.where.not(error: nil) }

scope :unfinished_undiscrete, -> { where(finished_at: nil, retried_good_job_id: nil, is_discrete: [nil, false]) }

# The job's ActiveJob UUID
# @return [String]
def id
Expand Down Expand Up @@ -191,9 +198,10 @@ def retry_job

execution.class.transaction(joinable: false, requires_new: true) do
new_active_job = active_job.retry_job(wait: 0, error: execution.error)
execution.save
execution.save!
end
end

new_active_job
end
end
Expand All @@ -213,7 +221,7 @@ def discard_job(message)
update_execution = proc do
execution.update(
finished_at: Time.current,
error: [job_error.class, GoodJob::Execution::ERROR_MESSAGE_SEPARATOR, job_error.message].join
error: GoodJob::Execution.format_error(job_error)
)
end

Expand Down
34 changes: 30 additions & 4 deletions app/views/good_job/jobs/show.html.erb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
<nav aria-label="breadcrumb">
<ol class="breadcrumb small mb-0">
<li class="breadcrumb-item"><%= link_to t(".jobs"), jobs_path %></li>
<li class="breadcrumb-item active" aria-current="page"><%= tag.code @job.id, class: "text-muted" %></li>
<li class="breadcrumb-item active" aria-current="page">
<%= tag.code @job.id, class: "text-muted" %>
<% if @job.discrete? %>
<span class="badge bg-info text-dark">Discrete</span>
<% end %>
</li>
</ol>
</nav>
<div class="row align-items-center">
Expand All @@ -21,6 +26,10 @@
<div class="font-monospace fw-bold small my-2"><%= tag.strong @job.priority %></div>
</div>
<div class="col text-end">
<div class="mb-2">
<%= tag.span relative_time(@job.last_status_at), class: "small" %>
<%= status_badge @job.status %>
</div>
<% if @job.status.in? [:scheduled, :retried, :queued] %>
<%= button_to reschedule_job_path(@job.id), method: :put,
class: "btn btn-sm btn-outline-primary",
Expand Down Expand Up @@ -59,8 +68,25 @@
</div>

<div class="my-4">
<h5><%= t "good_job.models.job.arguments" %></h5>
<%= tag.pre @job.serialized_params["arguments"].map(&:inspect).join(', '), class: 'text-wrap text-break' %>
<h5>
<%= t "good_job.models.job.arguments" %>
<%= tag.button type: "button", class: "btn btn-sm text-muted", role: "button",
title: t("good_job.actions.inspect"),
data: { bs_toggle: "collapse", bs_target: "##{dom_id(@job, 'params')}" },
aria: { expanded: false, controls: dom_id(@job, "params") } do %>
<%= render_icon "info" %>
<span class="visually-hidden"><%= t "good_job.actions.inspect" %></span>
<% end %>
</h5>
</div>
<%= tag.pre @job.serialized_params["arguments"].map(&:inspect).join(', '), class: 'text-wrap text-break' %>

<%= tag.div id: dom_id(@job, "params"), class: "list-group-item collapse small bg-dark text-light" do %>
<%= tag.pre JSON.pretty_generate(@job.display_serialized_params) %>
<% end %>

<%= render 'executions', executions: @job.executions.includes_advisory_locks.reverse %>
<% if @job.discrete? %>
<%= render 'executions', executions: @job.discrete_executions.reverse %>
<% else %>
<%= render 'executions', executions: @job.executions.includes_advisory_locks.reverse %>
<% end %>
Loading

0 comments on commit e5fbe91

Please sign in to comment.