From e5fbe917c09c8f26affcdb158db22cdc355ef218 Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Fri, 21 Apr 2023 19:31:01 -0700 Subject: [PATCH] Create "discrete" `good_job_executions` table to separate Job records from Execution records and have a 1-to-1 correspondence between `good_jobs` records and Active Job jobs (#928) * Create "discrete" `good_job_executions` table to separate Job records from Execution records * Change time precision in test * Add benchmark script * For discrete executions, id = active_job_id, scheduled_at = created_at if unscheduled * Allow wider time assertions in specs * Delete discrete executions before executions for better resiliency * Rename `perform_expected_at` to simpler `scheduled_at` * Add job_class and queue_name as columns because they will be queried in dashboard --- app/models/good_job/base_execution.rb | 15 +- app/models/good_job/discrete_execution.rb | 52 ++++++ app/models/good_job/execution.rb | 133 ++++++++++++++-- app/models/good_job/job.rb | 12 +- app/views/good_job/jobs/show.html.erb | 34 +++- .../migrations/create_good_jobs.rb.erb | 18 +++ .../05_create_good_job_executions.rb.erb | 32 ++++ lib/good_job.rb | 20 ++- lib/good_job/adapter.rb | 14 +- scripts/benchmark_discrete_jobs.rb | 38 +++++ spec/app/filters/good_job/jobs_filter_spec.rb | 4 +- spec/app/jobs/example_job_spec.rb | 23 ++- spec/app/models/good_job/execution_spec.rb | 104 ++++++++++++ spec/app/models/good_job/job_spec.rb | 148 +++++++++++++----- spec/integration/adapter_spec.rb | 4 +- spec/integration/batch_spec.rb | 14 +- .../active_job_extensions/concurrency_spec.rb | 48 ++++-- .../interrupt_errors_spec.rb | 56 ++++++- .../notify_options_spec.rb | 6 +- spec/lib/good_job/adapter_spec.rb | 2 +- spec/lib/good_job_spec.rb | 15 +- .../requests/good_job/jobs_controller_spec.rb | 13 +- spec/support/example_app_helper.rb | 1 + spec/support/postgres_notices.rb | 4 +- spec/support/rspec_not_change.rb | 2 + spec/system/jobs_spec.rb | 2 +- spec/test_app/app/jobs/example_job.rb | 2 +- ...230412144442_create_good_job_executions.rb | 32 ++++ spec/test_app/db/schema.rb | 31 +++- 29 files changed, 742 insertions(+), 137 deletions(-) create mode 100644 app/models/good_job/discrete_execution.rb create mode 100644 lib/generators/good_job/templates/update/migrations/05_create_good_job_executions.rb.erb create mode 100644 scripts/benchmark_discrete_jobs.rb create mode 100644 spec/support/rspec_not_change.rb create mode 100644 spec/test_app/db/migrate/20230412144442_create_good_job_executions.rb diff --git a/app/models/good_job/base_execution.rb b/app/models/good_job/base_execution.rb index 0cc113c37..15f56221f 100644 --- a/app/models/good_job/base_execution.rb +++ b/app/models/good_job/base_execution.rb @@ -33,12 +33,25 @@ def params_execution_count def coalesce_scheduled_at_created_at arel_table.coalesce(arel_table['scheduled_at'], arel_table['created_at']) end + + def discrete_support? + if connection.table_exists?('good_job_executions') + true + else + migration_pending_warning! + false + end + end end # The ActiveJob job class, as a string # @return [String] def job_class - serialized_params['job_class'] + discrete? ? attributes['job_class'] : serialized_params['job_class'] + end + + def discrete? + self.class.discrete_support? && is_discrete? end end end diff --git a/app/models/good_job/discrete_execution.rb b/app/models/good_job/discrete_execution.rb new file mode 100644 index 000000000..08d0e7e7a --- /dev/null +++ b/app/models/good_job/discrete_execution.rb @@ -0,0 +1,52 @@ +# frozen_string_literal: true + +module GoodJob # :nodoc: + class DiscreteExecution < BaseRecord + self.table_name = 'good_job_executions' + + belongs_to :execution, class_name: 'GoodJob::Execution', foreign_key: 'active_job_id', primary_key: 'active_job_id', inverse_of: :discrete_executions, optional: true + belongs_to :job, class_name: 'GoodJob::Job', foreign_key: 'active_job_id', primary_key: 'active_job_id', inverse_of: :discrete_executions, optional: true + + scope :finished, -> { where.not(finished_at: nil) } + + alias_attribute :performed_at, :created_at + + def number + serialized_params.fetch('executions', 0) + 1 + end + + # Time between when this job was expected to run and when it started running + def queue_latency + created_at - scheduled_at + end + + # Time between when this job started and finished + def runtime_latency + (finished_at || Time.current) - performed_at if performed_at + end + + def last_status_at + finished_at || created_at + end + + def status + if finished_at.present? + if error.present? + :retried + elsif error.present? && job.finished_at.present? + :discarded + else + :succeeded + end + else + :running + end + end + + def display_serialized_params + serialized_params.merge({ + _good_job_execution: attributes.except('serialized_params'), + }) + end + end +end diff --git a/app/models/good_job/execution.rb b/app/models/good_job/execution.rb index b845aeabf..46e6caf27 100644 --- a/app/models/good_job/execution.rb +++ b/app/models/good_job/execution.rb @@ -70,9 +70,13 @@ def self.queue_parser(string) end belongs_to :batch, class_name: 'GoodJob::BatchRecord', optional: true, inverse_of: :executions - belongs_to :job, class_name: 'GoodJob::Job', foreign_key: 'active_job_id', primary_key: 'active_job_id', optional: true, inverse_of: :executions - after_destroy -> { self.class.active_job_id(active_job_id).delete_all }, if: -> { @_destroy_job } + has_many :discrete_executions, class_name: 'GoodJob::DiscreteExecution', foreign_key: 'active_job_id', primary_key: 'active_job_id', inverse_of: :execution # rubocop:disable Rails/HasManyOrHasOneDependent + + after_destroy lambda { + GoodJob::DiscreteExecution.where(active_job_id: active_job_id).delete_all if discrete? # TODO: move into association `dependent: :delete_all` after v4 + self.class.active_job_id(active_job_id).delete_all + }, if: -> { @_destroy_job } # Get executions with given ActiveJob ID # @!method active_job_id @@ -201,8 +205,12 @@ def self.queue_parser(string) end end) - # Construct a GoodJob::Execution from an ActiveJob instance. def self.build_for_enqueue(active_job, overrides = {}) + new(**enqueue_args(active_job, overrides)) + end + + # Construct arguments for GoodJob::Execution from an ActiveJob instance. + def self.enqueue_args(active_job, overrides = {}) if active_job.priority && GoodJob.configuration.smaller_number_is_higher_priority.nil? ActiveSupport::Deprecation.warn(<<~DEPRECATION) The next major version of GoodJob (v4.0) will change job `priority` to give smaller numbers higher priority (default: `0`), in accordance with Active Job's definition of priority. @@ -218,6 +226,7 @@ def self.build_for_enqueue(active_job, overrides = {}) serialized_params: active_job.serialize, scheduled_at: active_job.scheduled_at, } + execution_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key) reenqueued_current_execution = CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id @@ -238,7 +247,7 @@ def self.build_for_enqueue(active_job, overrides = {}) execution_args[:cron_at] = CurrentThread.cron_at end - new(**execution_args.merge(overrides)) + execution_args.merge(overrides) end # Finds the next eligible Execution, acquire an advisory lock related to it, and @@ -298,19 +307,47 @@ def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil) # The new {Execution} instance representing the queued ActiveJob job. def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false) ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload| - execution = build_for_enqueue(active_job, { scheduled_at: scheduled_at }) + current_execution = CurrentThread.execution + + retried = current_execution && current_execution.active_job_id == active_job.job_id + if retried + if current_execution.discrete? + execution = current_execution + execution.assign_attributes(enqueue_args(active_job, { scheduled_at: scheduled_at })) + execution.scheduled_at ||= Time.current + execution.performed_at = nil + execution.finished_at = nil + else + execution = build_for_enqueue(active_job, { scheduled_at: scheduled_at }) + end + else + execution = build_for_enqueue(active_job, { scheduled_at: scheduled_at }) + execution.make_discrete if discrete_support? + end - execution.create_with_advisory_lock = create_with_advisory_lock - instrument_payload[:execution] = execution + if create_with_advisory_lock + if execution.persisted? + execution.advisory_lock + else + execution.create_with_advisory_lock = true + end + end + instrument_payload[:execution] = execution execution.save! - active_job.provider_job_id = execution.id - CurrentThread.execution.retried_good_job_id = execution.id if CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id + CurrentThread.execution.retried_good_job_id = execution.id if retried && !CurrentThread.execution.discrete? + active_job.provider_job_id = execution.id execution end end + def self.format_error(error) + raise ArgumentError unless error.is_a?(Exception) + + [error.class.to_s, ERROR_MESSAGE_SEPARATOR, error.message].join + end + # Execute the ActiveJob job this {Execution} represents. # @return [ExecutionResult] # An array of the return value of the job's +#perform+ method and the @@ -320,12 +357,39 @@ def perform run_callbacks(:perform) do raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at + discrete_execution = nil result = GoodJob::CurrentThread.within do |current_thread| current_thread.reset current_thread.execution = self - current_thread.execution_interrupted = performed_at if performed_at - update!(performed_at: Time.current) + if performed_at + current_thread.execution_interrupted = performed_at + + if discrete? + interrupt_error_string = self.class.format_error(GoodJob::InterruptError.new("Interrupted after starting perform at '#{performed_at}'")) + self.error = interrupt_error_string + discrete_executions.where(finished_at: nil).where.not(performed_at: nil).update_all( # rubocop:disable Rails/SkipsModelValidations + error: interrupt_error_string, + finished_at: Time.current + ) + end + end + + if discrete? + transaction do + now = Time.current + discrete_execution = discrete_executions.create!( + job_class: job_class, + queue_name: queue_name, + serialized_params: serialized_params, + scheduled_at: (scheduled_at || created_at), + created_at: now + ) + update!(performed_at: now, executions_count: ((executions_count || 0) + 1)) + end + else + update!(performed_at: Time.current) + end ActiveSupport::Notifications.instrument("perform_job.good_job", { execution: self, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do |instrument_payload| value = ActiveJob::Base.execute(active_job_data) @@ -349,14 +413,42 @@ def perform end job_error = result.handled_error || result.unhandled_error - self.error = [job_error.class, ERROR_MESSAGE_SEPARATOR, job_error.message].join if job_error + + if job_error + error_string = self.class.format_error(job_error) + self.error = error_string + discrete_execution.error = error_string if discrete_execution + else + self.error = nil + end reenqueued = result.retried? || retried_good_job_id.present? if result.unhandled_error && GoodJob.retry_on_unhandled_error - save! + if discrete_execution + transaction do + discrete_execution.update!(finished_at: Time.current) + update!(performed_at: nil, finished_at: nil, retried_good_job_id: nil) + end + else + save! + end elsif GoodJob.preserve_job_records == true || reenqueued || (result.unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error) || cron_key.present? - self.finished_at = Time.current - save! + now = Time.current + if discrete_execution + if reenqueued + self.performed_at = nil + else + self.finished_at = now + end + discrete_execution.finished_at = now + transaction do + discrete_execution.save! + save! + end + else + self.finished_at = now + save! + end else destroy_job end @@ -371,6 +463,17 @@ def executable? self.class.unscoped.unfinished.owns_advisory_locked.exists?(id: id) end + def make_discrete + self.is_discrete = true + self.id = active_job_id + self.job_class = serialized_params['job_class'] + self.executions_count ||= 0 + + current_time = Time.current + self.created_at ||= current_time + self.scheduled_at ||= current_time + end + # Build an ActiveJob instance and deserialize the arguments, using `#active_job_data`. # # @param ignore_deserialization_errors [Boolean] diff --git a/app/models/good_job/job.rb b/app/models/good_job/job.rb index 0761db07a..3fa26c19b 100644 --- a/app/models/good_job/job.rb +++ b/app/models/good_job/job.rb @@ -30,6 +30,11 @@ def table_name=(_value) belongs_to :batch, class_name: 'GoodJob::BatchRecord', inverse_of: :jobs, optional: true has_many :executions, -> { order(created_at: :asc) }, class_name: 'GoodJob::Execution', foreign_key: 'active_job_id', inverse_of: :job # rubocop:disable Rails/HasManyOrHasOneDependent + has_many :discrete_executions, -> { order(created_at: :asc) }, class_name: 'GoodJob::DiscreteExecution', foreign_key: 'active_job_id', primary_key: :active_job_id, inverse_of: :job # rubocop:disable Rails/HasManyOrHasOneDependent + + after_destroy lambda { + GoodJob::DiscreteExecution.where(active_job_id: active_job_id).delete_all if discrete? # TODO: move into association `dependent: :delete_all` after v4 + } # Only the most-recent unretried execution represents a "Job" default_scope { where(retried_good_job_id: nil) } @@ -56,6 +61,8 @@ def table_name=(_value) # Errored but will not be retried scope :discarded, -> { finished.where.not(error: nil) } + scope :unfinished_undiscrete, -> { where(finished_at: nil, retried_good_job_id: nil, is_discrete: [nil, false]) } + # The job's ActiveJob UUID # @return [String] def id @@ -191,9 +198,10 @@ def retry_job execution.class.transaction(joinable: false, requires_new: true) do new_active_job = active_job.retry_job(wait: 0, error: execution.error) - execution.save + execution.save! end end + new_active_job end end @@ -213,7 +221,7 @@ def discard_job(message) update_execution = proc do execution.update( finished_at: Time.current, - error: [job_error.class, GoodJob::Execution::ERROR_MESSAGE_SEPARATOR, job_error.message].join + error: GoodJob::Execution.format_error(job_error) ) end diff --git a/app/views/good_job/jobs/show.html.erb b/app/views/good_job/jobs/show.html.erb index 01e20e306..2e59f45db 100644 --- a/app/views/good_job/jobs/show.html.erb +++ b/app/views/good_job/jobs/show.html.erb @@ -3,7 +3,12 @@
@@ -21,6 +26,10 @@
<%= tag.strong @job.priority %>
+
+ <%= tag.span relative_time(@job.last_status_at), class: "small" %> + <%= status_badge @job.status %> +
<% if @job.status.in? [:scheduled, :retried, :queued] %> <%= button_to reschedule_job_path(@job.id), method: :put, class: "btn btn-sm btn-outline-primary", @@ -59,8 +68,25 @@
-
<%= t "good_job.models.job.arguments" %>
- <%= tag.pre @job.serialized_params["arguments"].map(&:inspect).join(', '), class: 'text-wrap text-break' %> +
+ <%= t "good_job.models.job.arguments" %> + <%= tag.button type: "button", class: "btn btn-sm text-muted", role: "button", + title: t("good_job.actions.inspect"), + data: { bs_toggle: "collapse", bs_target: "##{dom_id(@job, 'params')}" }, + aria: { expanded: false, controls: dom_id(@job, "params") } do %> + <%= render_icon "info" %> + <%= t "good_job.actions.inspect" %> + <% end %> +
+<%= tag.pre @job.serialized_params["arguments"].map(&:inspect).join(', '), class: 'text-wrap text-break' %> + +<%= tag.div id: dom_id(@job, "params"), class: "list-group-item collapse small bg-dark text-light" do %> + <%= tag.pre JSON.pretty_generate(@job.display_serialized_params) %> +<% end %> -<%= render 'executions', executions: @job.executions.includes_advisory_locks.reverse %> +<% if @job.discrete? %> + <%= render 'executions', executions: @job.discrete_executions.reverse %> +<% else %> + <%= render 'executions', executions: @job.executions.includes_advisory_locks.reverse %> +<% end %> diff --git a/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb b/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb index 73cc1dba3..27d23ab85 100644 --- a/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb +++ b/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb @@ -22,6 +22,10 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %> t.uuid :batch_id t.uuid :batch_callback_id + + t.boolean :is_discrete + t.integer :executions_count + t.text :job_class end create_table :good_job_batches, id: :uuid do |t| @@ -38,6 +42,18 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %> t.datetime :finished_at end + create_table :good_job_executions, id: :uuid do |t| + t.timestamps + + t.uuid :active_job_id, null: false + t.text :job_class + t.text :queue_name + t.jsonb :serialized_params + t.datetime :scheduled_at + t.datetime :finished_at + t.text :error + end + create_table :good_job_processes, id: :uuid do |t| t.timestamps t.jsonb :state @@ -62,5 +78,7 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %> where: "finished_at IS NULL", name: :index_good_jobs_jobs_on_priority_created_at_when_unfinished add_index :good_jobs, [:batch_id], where: "batch_id IS NOT NULL" add_index :good_jobs, [:batch_callback_id], where: "batch_callback_id IS NOT NULL" + + add_index :good_job_executions, [:active_job_id, :created_at], name: :index_good_job_executions_on_active_job_id_and_created_at end end diff --git a/lib/generators/good_job/templates/update/migrations/05_create_good_job_executions.rb.erb b/lib/generators/good_job/templates/update/migrations/05_create_good_job_executions.rb.erb new file mode 100644 index 000000000..eb2fd1392 --- /dev/null +++ b/lib/generators/good_job/templates/update/migrations/05_create_good_job_executions.rb.erb @@ -0,0 +1,32 @@ +# frozen_string_literal: true +class CreateGoodJobExecutions < ActiveRecord::Migration<%= migration_version %> + def change + reversible do |dir| + dir.up do + # Ensure this incremental update migration is idempotent + # with monolithic install migration. + return if connection.table_exists?(:good_job_executions) + end + end + + create_table :good_job_executions, id: :uuid do |t| + t.timestamps + + t.uuid :active_job_id, null: false + t.text :job_class + t.text :queue_name + t.jsonb :serialized_params + t.datetime :scheduled_at + t.datetime :finished_at + t.text :error + + t.index [:active_job_id, :created_at], name: :index_good_job_executions_on_active_job_id_and_created_at + end + + change_table :good_jobs do |t| + t.boolean :is_discrete + t.integer :executions_count + t.text :job_class + end + end +end diff --git a/lib/good_job.rb b/lib/good_job.rb index 14be87d2c..85f034e91 100644 --- a/lib/good_job.rb +++ b/lib/good_job.rb @@ -170,14 +170,19 @@ def self.cleanup_preserved_jobs(older_than: nil, in_batches_of: 1_000) ActiveSupport::Notifications.instrument("cleanup_preserved_jobs.good_job", { older_than: older_than, timestamp: timestamp }) do |payload| deleted_executions_count = 0 deleted_batches_count = 0 + deleted_discrete_executions_count = 0 jobs_query = GoodJob::Job.where('finished_at <= ?', timestamp).order(finished_at: :asc).limit(in_batches_of) jobs_query = jobs_query.succeeded unless include_discarded loop do - deleted = GoodJob::Execution.where(job: jobs_query).delete_all - break if deleted.zero? + active_job_ids = jobs_query.pluck(:active_job_id) + break if active_job_ids.empty? - deleted_executions_count += deleted + deleted_discrete_executions = GoodJob::DiscreteExecution.where(active_job_id: active_job_ids).delete_all + deleted_discrete_executions_count += deleted_discrete_executions + + deleted_executions = GoodJob::Execution.where(active_job_id: active_job_ids).delete_all + deleted_executions_count += deleted_executions end if GoodJob::BatchRecord.migrated? @@ -191,9 +196,14 @@ def self.cleanup_preserved_jobs(older_than: nil, in_batches_of: 1_000) end end - payload[:destroyed_executions_count] = deleted_executions_count payload[:destroyed_batches_count] = deleted_batches_count - payload[:destroyed_records_count] = deleted_executions_count + deleted_batches_count + payload[:destroyed_discrete_executions_count] = deleted_discrete_executions_count + payload[:destroyed_executions_count] = deleted_executions_count + + destroyed_records_count = deleted_batches_count + deleted_discrete_executions_count + deleted_executions_count + payload[:destroyed_records_count] = destroyed_records_count + + destroyed_records_count end end diff --git a/lib/good_job/adapter.rb b/lib/good_job/adapter.rb index 17712437d..82a26e054 100644 --- a/lib/good_job/adapter.rb +++ b/lib/good_job/adapter.rb @@ -50,11 +50,15 @@ def enqueue_all(active_jobs) current_time = Time.current executions = active_jobs.map do |active_job| - GoodJob::Execution.build_for_enqueue(active_job, { - id: SecureRandom.uuid, - created_at: current_time, - updated_at: current_time, - }) + GoodJob::Execution.build_for_enqueue(active_job).tap do |execution| + if GoodJob::Execution.discrete_support? + execution.make_discrete + execution.scheduled_at = current_time if execution.scheduled_at == execution.created_at + end + + execution.created_at = current_time + execution.updated_at = current_time + end end inline_executions = [] diff --git a/scripts/benchmark_discrete_jobs.rb b/scripts/benchmark_discrete_jobs.rb new file mode 100644 index 000000000..1e9b47a76 --- /dev/null +++ b/scripts/benchmark_discrete_jobs.rb @@ -0,0 +1,38 @@ +ENV['GOOD_JOB_EXECUTION_MODE'] = 'external' + +require_relative '../spec/test_app/config/environment' +require_relative '../lib/good_job' +require 'benchmark/ips' + +performer = GoodJob::JobPerformer.new("*") +Benchmark.ips do |x| + GoodJob::Execution.delete_all + ActiveJob::Base.queue_adapter.enqueue_all 10_000.times.map { ExampleJob.new } + GoodJob::Execution.update_all(is_discrete: true) + x.report("discrete jobs and no errors") do + performer.next + end + + GoodJob::Execution.delete_all + ActiveJob::Base.queue_adapter.enqueue_all 10_000.times.map { ExampleJob.new } + GoodJob::Execution.update_all(is_discrete: false) + x.report("undiscrete jobs and no errors") do + performer.next + end + + GoodJob::Execution.delete_all + ActiveJob::Base.queue_adapter.enqueue_all 10_000.times.map { ExampleJob.new(ExampleJob::ERROR_FIVE_TIMES_TYPE) } + GoodJob::Execution.update_all(is_discrete: true) + x.report("discrete jobs and 5 errors") do + performer.next + end + + GoodJob::Execution.delete_all + ActiveJob::Base.queue_adapter.enqueue_all 10_000.times.map { ExampleJob.new(ExampleJob::ERROR_FIVE_TIMES_TYPE) } + GoodJob::Execution.update_all(is_discrete: false) + x.report("undiscrete jobs and 5 errors") do + performer.next + end + + x.compare! +end diff --git a/spec/app/filters/good_job/jobs_filter_spec.rb b/spec/app/filters/good_job/jobs_filter_spec.rb index c24a62915..1a961a6dd 100644 --- a/spec/app/filters/good_job/jobs_filter_spec.rb +++ b/spec/app/filters/good_job/jobs_filter_spec.rb @@ -62,9 +62,9 @@ expect(filter.states).to eq({ "scheduled" => 1, "retried" => 0, - "queued" => 0, + "queued" => 1, "running" => 1, - "succeeded" => 2, + "succeeded" => 1, "discarded" => 1, }) end diff --git a/spec/app/jobs/example_job_spec.rb b/spec/app/jobs/example_job_spec.rb index e0dea7faf..327b531e4 100644 --- a/spec/app/jobs/example_job_spec.rb +++ b/spec/app/jobs/example_job_spec.rb @@ -25,9 +25,9 @@ end travel_back - executions = GoodJob::Execution.where(active_job_id: active_job.job_id).order(created_at: :asc) - expect(executions.size).to eq 2 - expect(executions.last.error).to be_nil + good_job = GoodJob::Job.find_by(active_job_id: active_job.job_id) + expect(good_job.discrete_executions.count).to eq 2 + expect(good_job.discrete_executions.last.error).to be_nil end end @@ -40,26 +40,25 @@ end travel_back - executions = GoodJob::Execution.where(active_job_id: active_job.job_id).order(created_at: :asc) - expect(executions.size).to eq 6 - expect(executions.last.error).to be_nil + good_job = GoodJob::Job.find_by(active_job_id: active_job.job_id) + + expect(good_job.discrete_executions.count).to eq 6 + expect(good_job.discrete_executions.last.error).to be_nil end end describe "DEAD_TYPE" do it 'errors but does not retry' do - described_class.perform_later(described_class::DEAD_TYPE) + active_job = described_class.perform_later(described_class::DEAD_TYPE) 10.times do GoodJob.perform_inline travel(5.minutes) end travel_back - active_job_id = GoodJob::Execution.last.active_job_id - - executions = GoodJob::Execution.where(active_job_id: active_job_id).order(created_at: :asc) - expect(executions.size).to eq 3 - expect(executions.last.error).to be_present + good_job = GoodJob::Job.find_by(active_job_id: active_job.job_id) + expect(good_job.discrete_executions.count).to eq 3 + expect(good_job.discrete_executions.last.error).to be_present end end diff --git a/spec/app/models/good_job/execution_spec.rb b/spec/app/models/good_job/execution_spec.rb index 258b4544f..0282153ad 100644 --- a/spec/app/models/good_job/execution_spec.rb +++ b/spec/app/models/good_job/execution_spec.rb @@ -3,6 +3,8 @@ RSpec.describe GoodJob::Execution do before do + allow(described_class).to receive(:discrete_support?).and_return(false) + stub_const "RUN_JOBS", Concurrent::Array.new stub_const 'TestJob', (Class.new(ActiveJob::Base) do self.queue_name = 'test' @@ -21,6 +23,41 @@ def perform(result_value = nil, raise_error: false) describe '.enqueue' do let(:active_job) { TestJob.new } + context 'when discrete' do + before do + allow(described_class).to receive(:discrete_support?).and_return(true) + end + + it 'assigns is discrete, id, scheduled_at' do + expect { described_class.enqueue(active_job) }.to change(described_class, :count).by(1) + + execution = described_class.last + expect(execution).to have_attributes( + is_discrete: true, + id: active_job.job_id, + active_job_id: active_job.job_id, + created_at: execution.scheduled_at, + scheduled_at: execution.created_at + ) + end + end + + context 'when NOT discrete' do + before { allow(described_class).to receive(:discrete_support?).and_return(false) } + + it 'does not assign id, scheduled_at' do + expect { described_class.enqueue(active_job) }.to change(described_class, :count).by(1) + + execution = described_class.last + expect(execution.id).not_to eq(active_job.job_id) + expect(execution).to have_attributes( + is_discrete: nil, + active_job_id: active_job.job_id, + scheduled_at: nil + ) + end + end + it 'creates a new GoodJob record' do execution = nil @@ -621,6 +658,73 @@ def job_params end end end + + context 'when Discrete' do + before do + ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :inline) + allow(described_class).to receive(:discrete_support?).and_return(true) + good_job.update!(is_discrete: true) + end + + it 'updates the Execution record and creates a DiscreteExecution record' do + good_job.perform + + expect(good_job.reload).to have_attributes( + executions_count: 1, + finished_at: within(1.second).of(Time.current) + ) + + dexecution = good_job.discrete_executions.first + expect(dexecution).to be_present + expect(dexecution).to have_attributes( + active_job_id: good_job.active_job_id, + job_class: good_job.job_class, + queue_name: good_job.queue_name, + created_at: within(0.001).of(good_job.performed_at), + scheduled_at: within(0.001).of(good_job.created_at), + finished_at: within(1.second).of(Time.current), + error: nil, + serialized_params: good_job.serialized_params + ) + end + + context 'when ActiveJob rescues an error' do + let(:active_job) { TestJob.new("a string", raise_error: true) } + let!(:good_job) { described_class.enqueue(active_job) } + + before do + allow(described_class).to receive(:discrete_support?).and_return(true) + allow(GoodJob).to receive(:preserve_job_records).and_return(true) + TestJob.retry_on(StandardError, wait: 1.hour, attempts: 2) { nil } + good_job.update!(is_discrete: true) + end + + it 'updates the existing Execution/Job record instead of creating a new one' do + expect { good_job.perform } + .to not_change(described_class, :count) + .and change { good_job.reload.serialized_params["executions"] }.by(1) + .and not_change { good_job.reload.id } + .and not_change { described_class.count } + + expect(good_job.reload).to have_attributes( + error: "TestJob::ExpectedError: Raised expected error", + created_at: within(1.second).of(Time.current), + performed_at: nil, + finished_at: nil, + scheduled_at: within(10.minutes).of(1.hour.from_now) # interval because of retry jitter + ) + expect(GoodJob::DiscreteExecution.count).to eq(1) + discrete_execution = good_job.discrete_executions.first + expect(discrete_execution).to have_attributes( + active_job_id: good_job.active_job_id, + error: "TestJob::ExpectedError: Raised expected error", + created_at: within(1.second).of(Time.current), + scheduled_at: within(1.second).of(Time.current), + finished_at: within(1.second).of(Time.current) + ) + end + end + end end describe '#destroy_job' do diff --git a/spec/app/models/good_job/job_spec.rb b/spec/app/models/good_job/job_spec.rb index 18a47eb1d..01bcd1f99 100644 --- a/spec/app/models/good_job/job_spec.rb +++ b/spec/app/models/good_job/job_spec.rb @@ -2,23 +2,38 @@ require 'rails_helper' RSpec.describe GoodJob::Job do - subject(:job) { described_class.find(head_execution.active_job_id) } + let(:active_job_id) { SecureRandom.uuid } - before do - allow(GoodJob).to receive(:preserve_job_records).and_return(true) - ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :external) - - stub_const 'TestJob', (Class.new(ActiveJob::Base) do - def perform(feline = nil, canine: nil) - end - end) - stub_const 'TestJob::Error', Class.new(StandardError) + let(:job) do + described_class.create!( + is_discrete: true, + active_job_id: active_job_id, + scheduled_at: 10.minutes.from_now, + queue_name: 'mice', + priority: 10, + serialized_params: { + 'job_id' => active_job_id, + 'job_class' => 'TestJob', + 'executions' => 1, + 'exception_executions' => { 'TestJob::Error' => 1 }, + 'queue_name' => 'mice', + 'priority' => 10, + 'arguments' => ['cat', { 'canine' => 'dog' }], + } + ).tap do |job| + job.discrete_executions.create!( + scheduled_at: 1.minute.ago, + created_at: 1.minute.ago, + finished_at: 1.minute.ago, + error: "TestJob::Error: TestJob::Error" + ) + end end + let(:undiscrete_job) { described_class.find(head_execution.active_job_id) } - let!(:tail_execution) do - active_job_id = SecureRandom.uuid + let(:tail_execution) do GoodJob::Execution.create!( - active_job_id: SecureRandom.uuid, + active_job_id: active_job_id, created_at: 1.minute.ago, queue_name: 'mice', priority: 10, @@ -33,7 +48,7 @@ def perform(feline = nil, canine: nil) ) end - let!(:head_execution) do + let(:head_execution) do GoodJob::Execution.create!( active_job_id: tail_execution.active_job_id, scheduled_at: 10.minutes.from_now, @@ -57,6 +72,18 @@ def perform(feline = nil, canine: nil) end end + before do + allow(GoodJob).to receive(:preserve_job_records).and_return(true) + ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :external) + + stub_const 'TestJob', (Class.new(ActiveJob::Base) do + def perform + raise "Didn't expect to perform this job" + end + end) + stub_const 'TestJob::Error', Class.new(StandardError) + end + describe '.find' do it 'returns a record that is the same as the head execution' do job = described_class.find(head_execution.active_job_id) @@ -66,7 +93,7 @@ def perform(feline = nil, canine: nil) describe '#id' do it 'is the ActiveJob ID' do - expect(job.id).to eq head_execution.active_job_id + expect(job.id).to eq job.active_job_id end end @@ -78,6 +105,7 @@ def perform(feline = nil, canine: nil) describe '#head_execution' do it 'is the head execution (which should be the same record)' do + job = undiscrete_job expect(job.head_execution).to eq head_execution expect(job._execution_id).to eq head_execution.id end @@ -85,6 +113,7 @@ def perform(feline = nil, canine: nil) describe '#tail_execution' do it 'is the tail execution' do + job = undiscrete_job expect(job.tail_execution).to eq tail_execution end end @@ -160,29 +189,50 @@ def perform(feline = nil, canine: nil) describe '#retry_job' do context 'when job is retried' do before do - head_execution.update!( + job.update!( finished_at: Time.current, error: "TestJob::Error: TestJob::Error" ) end - it 'enqueues another execution and updates the original job' do - original_head_execution = job.head_execution + it 'updates the original job' do + expect(job).to be_discrete expect do job.retry_job - end.to change { job.executions.reload.size }.by(1) - - new_head_execution = job.head_execution(reload: true) - expect(new_head_execution.serialized_params).to include( - "executions" => 2, - "queue_name" => "mice", - "priority" => 10, - "arguments" => ['cat', hash_including('canine' => 'dog')] - ) + end.to change { job.reload.finished? }.from(true).to(false) + expect(job.executions.count).to eq 1 + end + + context 'when job is not discrete' do + let(:job) { undiscrete_job } - original_head_execution.reload - expect(original_head_execution.retried_good_job_id).to eq new_head_execution.id + before do + head_execution.update!( + finished_at: Time.current, + error: "TestJob::Error: TestJob::Error" + ) + end + + it 'enqueues another execution and updates the original job' do + original_head_execution = undiscrete_job.head_execution + + expect do + job.retry_job + end.to change { job.executions.reload.size }.by(1) + expect(job.reload).not_to be_finished + + new_head_execution = job.head_execution(reload: true) + expect(new_head_execution.serialized_params).to include( + "executions" => 2, + "queue_name" => "mice", + "priority" => 10, + "arguments" => ['cat', hash_including('canine' => 'dog')] + ) + + original_head_execution.reload + expect(original_head_execution.retried_good_job_id).to eq new_head_execution.id + end end end @@ -279,18 +329,12 @@ def perform(feline = nil, canine: nil) end describe '#destroy_job' do - context 'when a job is finished' do - before do - job.head_execution.update! finished_at: Time.current - end - - it 'destroys all the job executions' do - job.destroy_job + it 'destroys job and executions' do + job.update! finished_at: Time.current + job.destroy_job - expect { head_execution.reload }.to raise_error ActiveRecord::RecordNotFound - expect { tail_execution.reload }.to raise_error ActiveRecord::RecordNotFound - expect { job.reload }.to raise_error ActiveRecord::RecordNotFound - end + expect { job.reload }.to raise_error ActiveRecord::RecordNotFound + expect(GoodJob::DiscreteExecution.count).to eq 0 end context 'when a job is not finished' do @@ -298,5 +342,29 @@ def perform(feline = nil, canine: nil) expect { job.destroy_job }.to raise_error GoodJob::Job::ActionForStateMismatchError end end + + context "when undiscrete job" do + let(:job) { undiscrete_job } + + context 'when a job is finished' do + before do + job.head_execution.update! finished_at: Time.current + end + + it 'destroys all the job executions' do + job.destroy_job + + expect { head_execution.reload }.to raise_error ActiveRecord::RecordNotFound + expect { tail_execution.reload }.to raise_error ActiveRecord::RecordNotFound + expect { job.reload }.to raise_error ActiveRecord::RecordNotFound + end + end + + context 'when a job is not finished' do + it 'raises an ActionForStateMismatchError' do + expect { job.destroy_job }.to raise_error GoodJob::Job::ActionForStateMismatchError + end + end + end end end diff --git a/spec/integration/adapter_spec.rb b/spec/integration/adapter_spec.rb index 9445c3e39..978ff4d75 100644 --- a/spec/integration/adapter_spec.rb +++ b/spec/integration/adapter_spec.rb @@ -37,7 +37,7 @@ def perform(*_args, **_kwargs) end it 'assigns successfully_enqueued' do - ok_job = TestJob.perform_later + ok_job = TestJob.new expect { ok_job.enqueue }.not_to raise_error expect(ok_job.successfully_enqueued?).to be true if ok_job.respond_to?(:successfully_enqueued?) @@ -58,7 +58,7 @@ def perform(*_args, **_kwargs) expect(execution).to have_attributes( queue_name: 'test', priority: 50, - scheduled_at: nil + scheduled_at: within(1).of(Time.current) ) end diff --git a/spec/integration/batch_spec.rb b/spec/integration/batch_spec.rb index 9a00f042b..bb4396fc7 100644 --- a/spec/integration/batch_spec.rb +++ b/spec/integration/batch_spec.rb @@ -215,9 +215,10 @@ def perform(*_args, **_kwargs) GoodJob.perform_inline expect(GoodJob::Job.count).to eq 3 - expect(GoodJob::Execution.count).to eq 5 - expect(GoodJob::Execution.where(batch_id: batch.id).count).to eq 1 - expect(GoodJob::Execution.where(batch_callback_id: batch.id).count).to eq 4 + expect(GoodJob::Execution.count).to eq 3 + expect(GoodJob::DiscreteExecution.count).to eq 5 + expect(GoodJob::Job.where(batch_id: batch.id).count).to eq 1 + expect(GoodJob::Job.where(batch_callback_id: batch.id).count).to eq 2 callback_arguments = GoodJob::Job.where(batch_callback_id: batch.id).map(&:head_execution).map(&:active_job).map(&:arguments).map(&:second) expect(callback_arguments).to contain_exactly({ event: :discard }, { event: :finish }) @@ -230,9 +231,10 @@ def perform(*_args, **_kwargs) GoodJob.perform_inline expect(GoodJob::Job.count).to eq 3 - expect(GoodJob::Execution.count).to eq 5 - expect(GoodJob::Execution.where(batch_id: batch.id).count).to eq 1 - expect(GoodJob::Execution.where(batch_callback_id: batch.id).count).to eq 4 + expect(GoodJob::Execution.count).to eq 3 + expect(GoodJob::DiscreteExecution.count).to eq 5 + expect(GoodJob::Job.where(batch_id: batch.id).count).to eq 1 + expect(GoodJob::Job.where(batch_callback_id: batch.id).count).to eq 2 callback_arguments = GoodJob::Job.where(batch_callback_id: batch.id).map(&:head_execution).map(&:active_job).map(&:arguments).map(&:second) expect(callback_arguments).to contain_exactly({ event: :success }, { event: :finish }) diff --git a/spec/lib/good_job/active_job_extensions/concurrency_spec.rb b/spec/lib/good_job/active_job_extensions/concurrency_spec.rb index 3526c7b27..f580bd147 100644 --- a/spec/lib/good_job/active_job_extensions/concurrency_spec.rb +++ b/spec/lib/good_job/active_job_extensions/concurrency_spec.rb @@ -113,19 +113,21 @@ def perform(name:) end it "will error and retry jobs if concurrency is exceeded" do - TestJob.perform_later(name: "Alice") + active_job = TestJob.perform_later(name: "Alice") performer = GoodJob::JobPerformer.new('*') scheduler = GoodJob::Scheduler.new(performer, max_threads: 5) 5.times { scheduler.create_thread } sleep_until(max: 10, increments_of: 0.5) do - GoodJob::Execution.where(concurrency_key: "Alice").finished.count >= 1 + GoodJob::DiscreteExecution.where(active_job_id: active_job.job_id).finished.count >= 1 end scheduler.shutdown - expect(GoodJob::Execution.count).to be >= 1 - expect(GoodJob::Execution.where("error LIKE '%GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError%'")).to be_present + expect(GoodJob::Job.find_by(active_job_id: active_job.job_id).concurrency_key).to eq "Alice" + + expect(GoodJob::DiscreteExecution.count).to be >= 1 + expect(GoodJob::DiscreteExecution.where("error LIKE '%GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError%'")).to be_present end it 'is ignored with the job is executed via perform_now' do @@ -153,17 +155,37 @@ def perform end) end - it 'preserves the key value across retries' do - TestJob.set(wait_until: 5.minutes.ago).perform_later(name: "Alice") - begin - GoodJob.perform_inline - rescue StandardError - nil + describe 'retries' do + it 'preserves the value' do + TestJob.set(wait_until: 5.minutes.ago).perform_later(name: "Alice") + + begin + GoodJob.perform_inline + rescue StandardError + nil + end + + expect(GoodJob::Execution.count).to eq 1 + expect(GoodJob::Execution.first.concurrency_key).to be_present + expect(GoodJob::Job.first).not_to be_finished end - expect(GoodJob::Execution.count).to eq 2 - first_execution, retried_execution = GoodJob::Execution.order(created_at: :asc).to_a - expect(retried_execution.concurrency_key).to eq first_execution.concurrency_key + context 'when not discrete' do + it 'preserves the key value across retries' do + TestJob.set(wait_until: 5.minutes.ago).perform_later(name: "Alice") + GoodJob::Job.first.update!(is_discrete: false) + + begin + GoodJob.perform_inline + rescue StandardError + nil + end + + expect(GoodJob::Execution.count).to eq 2 + first_execution, retried_execution = GoodJob::Execution.order(created_at: :asc).to_a + expect(retried_execution.concurrency_key).to eq first_execution.concurrency_key + end + end end end diff --git a/spec/lib/good_job/active_job_extensions/interrupt_errors_spec.rb b/spec/lib/good_job/active_job_extensions/interrupt_errors_spec.rb index 35da102f3..9454e042d 100644 --- a/spec/lib/good_job/active_job_extensions/interrupt_errors_spec.rb +++ b/spec/lib/good_job/active_job_extensions/interrupt_errors_spec.rb @@ -16,21 +16,63 @@ def perform context 'when a dequeued job has a performed_at but no finished_at' do before do active_job = TestJob.perform_later - GoodJob::Execution.find_by(active_job_id: active_job.job_id).update!(performed_at: Time.current) + good_job = GoodJob::Job.find_by(active_job_id: active_job.job_id) + good_job.update!(performed_at: Time.current, finished_at: nil) + good_job.discrete_executions.create!(performed_at: Time.current, finished_at: nil) end it 'raises a GoodJob::InterruptError' do expect { GoodJob.perform_inline }.to raise_error(GoodJob::InterruptError) end - it 'is rescuable' do - TestJob.retry_on GoodJob::InterruptError + context 'when discrete execution is NOT enabled' do + before do + allow(GoodJob::Execution).to receive(:discrete_support?).and_return(false) + end - expect { GoodJob.perform_inline }.not_to raise_error - expect(GoodJob::Execution.count).to eq(2) + it 'is rescuable' do + TestJob.retry_on GoodJob::InterruptError - job = GoodJob::Job.first - expect(job.executions.first.error).to start_with 'GoodJob::InterruptError: Interrupted after starting perform at' + expect { GoodJob.perform_inline }.not_to raise_error + expect(GoodJob::Execution.count).to eq(2) + + job = GoodJob::Job.first + expect(job.executions.first.error).to start_with 'GoodJob::InterruptError: Interrupted after starting perform at' + end + end + + context 'when discrete execution is enabled' do + before do + allow(GoodJob::Execution).to receive(:discrete_support?).and_return(true) + end + + it 'does not create a new execution' do + TestJob.retry_on GoodJob::InterruptError + + expect { GoodJob.perform_inline }.not_to raise_error + expect(GoodJob::Job.count).to eq(1) + expect(GoodJob::Execution.count).to eq(1) + expect(GoodJob::DiscreteExecution.count).to eq(2) + + job = GoodJob::Job.first + expect(job.executions.count).to eq(1) + expect(job.finished?).to be false + expect(job.error).to start_with 'GoodJob::InterruptError: Interrupted after starting perform at' + + initial_discrete_execution = job.discrete_executions.first + expect(initial_discrete_execution).to have_attributes( + performed_at: be_present, + finished_at: be_present, + error: start_with('GoodJob::InterruptError: Interrupted after starting perform at') + ) + + retried_discrete_execution = job.discrete_executions.last + expect(retried_discrete_execution).to have_attributes( + performed_at: be_present, + finished_at: be_present, + error: start_with('GoodJob::InterruptError: Interrupted after starting perform at') + ) + end end end diff --git a/spec/lib/good_job/active_job_extensions/notify_options_spec.rb b/spec/lib/good_job/active_job_extensions/notify_options_spec.rb index 4394e0b39..8847c3540 100644 --- a/spec/lib/good_job/active_job_extensions/notify_options_spec.rb +++ b/spec/lib/good_job/active_job_extensions/notify_options_spec.rb @@ -30,10 +30,10 @@ def perform it 'can be overridden by set(good_job_notify: true)' do TestJob.good_job_notify = false TestJob.set(good_job_notify: true).perform_later - expect(GoodJob::Notifier).to have_received(:notify).with({ queue_name: 'default' }) + expect(GoodJob::Notifier).to have_received(:notify).with({ queue_name: 'default', scheduled_at: within(1).of(Time.current) }) GoodJob::Bulk.enqueue { TestJob.set(good_job_notify: true).perform_later } - expect(GoodJob::Notifier).to have_received(:notify).with({ queue_name: 'default', count: 1 }) + expect(GoodJob::Notifier).to have_received(:notify).with({ queue_name: 'default', count: 1, scheduled_at: within(1).of(Time.current) }) end it 'works for bulk enqueuing' do @@ -85,7 +85,7 @@ def perform scheduler = GoodJob::Scheduler.new(performer, max_threads: 5) scheduler.create_thread - sleep_until(max: 5, increments_of: 0.5) { GoodJob::Execution.count >= 2 } + sleep_until(max: 5, increments_of: 0.5) { GoodJob::DiscreteExecution.count >= 2 } scheduler.shutdown expect(GoodJob::Notifier).not_to have_received(:notify) diff --git a/spec/lib/good_job/adapter_spec.rb b/spec/lib/good_job/adapter_spec.rb index ffc60bf76..20bc8a604 100644 --- a/spec/lib/good_job/adapter_spec.rb +++ b/spec/lib/good_job/adapter_spec.rb @@ -129,7 +129,7 @@ def perform(succeed: true) expect(result).to eq 1 expect(active_jobs.map(&:provider_job_id)).to eq [active_jobs.first.provider_job_id, nil] - expect(GoodJob::Notifier).to have_received(:notify).with({ queue_name: 'default', count: 1 }) + expect(GoodJob::Notifier).to have_received(:notify).with({ queue_name: 'default', count: 1, scheduled_at: within(0.1).of(Time.current) }) end it 'sets successfully_enqueued, if Rails supports it' do diff --git a/spec/lib/good_job_spec.rb b/spec/lib/good_job_spec.rb index d423acaea..b6679711a 100644 --- a/spec/lib/good_job_spec.rb +++ b/spec/lib/good_job_spec.rb @@ -42,17 +42,20 @@ let!(:old_unfinished_job) { GoodJob::Execution.create!(active_job_id: SecureRandom.uuid, scheduled_at: 15.days.ago, finished_at: nil) } let!(:old_finished_job) { GoodJob::Execution.create!(active_job_id: SecureRandom.uuid, finished_at: 15.days.ago) } let!(:old_finished_job_execution) { GoodJob::Execution.create!(active_job_id: old_finished_job.active_job_id, retried_good_job_id: old_finished_job.id, finished_at: 16.days.ago) } + let!(:old_finished_job_discrete_execution) { GoodJob::DiscreteExecution.create!(active_job_id: old_finished_job.active_job_id, finished_at: 16.days.ago) } let!(:old_discarded_job) { GoodJob::Execution.create!(active_job_id: SecureRandom.uuid, finished_at: 15.days.ago, error: "Error") } let!(:old_batch) { GoodJob::BatchRecord.create!(finished_at: 15.days.ago) } it 'deletes finished jobs' do destroyed_records_count = described_class.cleanup_preserved_jobs(in_batches_of: 1) - expect(destroyed_records_count).to eq 4 + expect(destroyed_records_count).to eq 5 expect { recent_job.reload }.not_to raise_error expect { old_unfinished_job.reload }.not_to raise_error expect { old_finished_job.reload }.to raise_error ActiveRecord::RecordNotFound + expect { old_finished_job_execution.reload }.to raise_error ActiveRecord::RecordNotFound + expect { old_finished_job_discrete_execution.reload }.to raise_error ActiveRecord::RecordNotFound expect { old_discarded_job.reload }.to raise_error ActiveRecord::RecordNotFound expect { old_batch.reload }.to raise_error ActiveRecord::RecordNotFound end @@ -60,12 +63,15 @@ it 'takes arguments' do destroyed_records_count = described_class.cleanup_preserved_jobs(older_than: 10.seconds) - expect(destroyed_records_count).to eq 5 + expect(destroyed_records_count).to eq 6 expect { recent_job.reload }.to raise_error ActiveRecord::RecordNotFound expect { old_unfinished_job.reload }.not_to raise_error expect { old_finished_job.reload }.to raise_error ActiveRecord::RecordNotFound + expect { old_finished_job_execution.reload }.to raise_error ActiveRecord::RecordNotFound + expect { old_finished_job_discrete_execution.reload }.to raise_error ActiveRecord::RecordNotFound expect { old_discarded_job.reload }.to raise_error ActiveRecord::RecordNotFound + expect { old_batch.reload }.to raise_error ActiveRecord::RecordNotFound end it 'is instrumented' do @@ -83,12 +89,15 @@ allow(described_class.configuration).to receive(:env).and_return ENV.to_hash.merge({ 'GOOD_JOB_CLEANUP_DISCARDED_JOBS' => 'false' }) destroyed_records_count = described_class.cleanup_preserved_jobs - expect(destroyed_records_count).to eq 3 + expect(destroyed_records_count).to eq 4 expect { recent_job.reload }.not_to raise_error expect { old_unfinished_job.reload }.not_to raise_error expect { old_finished_job.reload }.to raise_error ActiveRecord::RecordNotFound + expect { old_finished_job_execution.reload }.to raise_error ActiveRecord::RecordNotFound + expect { old_finished_job_discrete_execution.reload }.to raise_error ActiveRecord::RecordNotFound expect { old_discarded_job.reload }.not_to raise_error + expect { old_batch.reload }.to raise_error ActiveRecord::RecordNotFound end end diff --git a/spec/requests/good_job/jobs_controller_spec.rb b/spec/requests/good_job/jobs_controller_spec.rb index ac57b23ce..8e247fa71 100644 --- a/spec/requests/good_job/jobs_controller_spec.rb +++ b/spec/requests/good_job/jobs_controller_spec.rb @@ -92,19 +92,22 @@ describe 'mass_action=retry' do before do job.update(error: "Error message") + job.discrete_executions.first.update(error: "Error message") end it 'retries the job' do - put good_job.mass_update_jobs_path, params: { - mass_action: 'retry', - job_ids: [job.id], - } + expect do + put good_job.mass_update_jobs_path, params: { + mass_action: 'retry', + job_ids: [job.id], + } + end.to change { job.reload.status }.from(:discarded).to(:succeeded) expect(response).to have_http_status(:found) expect(flash[:notice]).to eq('Successfully retried 1 job') job.reload - expect(job.executions.count).to eq 2 + expect(job.discrete_executions.count).to eq 2 end end diff --git a/spec/support/example_app_helper.rb b/spec/support/example_app_helper.rb index 30b46317c..70ab9af75 100644 --- a/spec/support/example_app_helper.rb +++ b/spec/support/example_app_helper.rb @@ -48,6 +48,7 @@ def within_example_app tables = %i[ good_jobs good_job_batches + good_job_executions good_job_processes good_job_settings ] diff --git a/spec/support/postgres_notices.rb b/spec/support/postgres_notices.rb index 8b0dd4be4..9257f51fa 100644 --- a/spec/support/postgres_notices.rb +++ b/spec/support/postgres_notices.rb @@ -16,7 +16,9 @@ end RSpec.configure do |config| - config.after do + config.around do |example| + POSTGRES_NOTICES.clear + example.run expect(POSTGRES_NOTICES).to be_empty end end diff --git a/spec/support/rspec_not_change.rb b/spec/support/rspec_not_change.rb new file mode 100644 index 000000000..8f0ffae77 --- /dev/null +++ b/spec/support/rspec_not_change.rb @@ -0,0 +1,2 @@ +# frozen_string_literal: true +RSpec::Matchers.define_negated_matcher :not_change, :change diff --git a/spec/system/jobs_spec.rb b/spec/system/jobs_spec.rb index 4aa2f2177..d1f14370f 100644 --- a/spec/system/jobs_spec.rb +++ b/spec/system/jobs_spec.rb @@ -123,7 +123,7 @@ accept_confirm { click_on 'Retry job' } end expect(page).to have_content "Job has been retried" - end.to change { discarded_job.executions.reload.size }.by(1) + end.to change { discarded_job.reload.status }.from(:discarded).to(:queued) end it 'can discard jobs' do diff --git a/spec/test_app/app/jobs/example_job.rb b/spec/test_app/app/jobs/example_job.rb index 491eff8bf..27842b80f 100644 --- a/spec/test_app/app/jobs/example_job.rb +++ b/spec/test_app/app/jobs/example_job.rb @@ -4,7 +4,7 @@ class ExampleJob < ApplicationJob TYPES = [ SUCCESS_TYPE = 'success', - ERROR_ONCE_TYPE = 'error_once', + ERROR_ONCE_TYPE = 'error_once', ERROR_FIVE_TIMES_TYPE = 'error_five_times', DEAD_TYPE = 'dead', SLOW_TYPE = 'slow', diff --git a/spec/test_app/db/migrate/20230412144442_create_good_job_executions.rb b/spec/test_app/db/migrate/20230412144442_create_good_job_executions.rb new file mode 100644 index 000000000..a3eaf68b9 --- /dev/null +++ b/spec/test_app/db/migrate/20230412144442_create_good_job_executions.rb @@ -0,0 +1,32 @@ +# frozen_string_literal: true +class CreateGoodJobExecutions < ActiveRecord::Migration[7.0] + def change + reversible do |dir| + dir.up do + # Ensure this incremental update migration is idempotent + # with monolithic install migration. + return if connection.table_exists?(:good_job_executions) + end + end + + create_table :good_job_executions, id: :uuid do |t| + t.timestamps + + t.uuid :active_job_id, null: false + t.text :job_class + t.text :queue_name + t.jsonb :serialized_params + t.datetime :scheduled_at + t.datetime :finished_at + t.text :error + + t.index [:active_job_id, :created_at], name: :index_good_job_executions_on_active_job_id_and_created_at + end + + change_table :good_jobs do |t| + t.boolean :is_discrete + t.integer :executions_count + t.text :job_class + end + end +end diff --git a/spec/test_app/db/schema.rb b/spec/test_app/db/schema.rb index 9b68e84ab..dc2a1cb37 100644 --- a/spec/test_app/db/schema.rb +++ b/spec/test_app/db/schema.rb @@ -10,15 +10,14 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema.define(version: 2023_01_31_214927) do - +ActiveRecord::Schema.define(version: 2023_04_12_144442) do # These are extensions that must be enabled in order to support this database enable_extension "pgcrypto" enable_extension "plpgsql" create_table "good_job_batches", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| - t.datetime "created_at", precision: 6, null: false - t.datetime "updated_at", precision: 6, null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false t.text "description" t.jsonb "serialized_properties" t.text "on_finish" @@ -31,15 +30,28 @@ t.datetime "finished_at" end + create_table "good_job_executions", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.uuid "active_job_id", null: false + t.text "job_class" + t.text "queue_name" + t.jsonb "serialized_params" + t.datetime "scheduled_at" + t.datetime "finished_at" + t.text "error" + t.index ["active_job_id", "created_at"], name: "index_good_job_executions_on_active_job_id_and_created_at" + end + create_table "good_job_processes", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| - t.datetime "created_at", precision: 6, null: false - t.datetime "updated_at", precision: 6, null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false t.jsonb "state" end create_table "good_job_settings", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| - t.datetime "created_at", precision: 6, null: false - t.datetime "updated_at", precision: 6, null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false t.text "key" t.jsonb "value" t.index ["key"], name: "index_good_job_settings_on_key", unique: true @@ -62,6 +74,9 @@ t.datetime "cron_at" t.uuid "batch_id" t.uuid "batch_callback_id" + t.boolean "is_discrete" + t.integer "executions_count" + t.text "job_class" t.index ["active_job_id", "created_at"], name: "index_good_jobs_on_active_job_id_and_created_at" t.index ["active_job_id"], name: "index_good_jobs_on_active_job_id" t.index ["batch_callback_id"], name: "index_good_jobs_on_batch_callback_id", where: "(batch_callback_id IS NOT NULL)"