From a030bd839ee087aeae975e1a66377cd36b3549c8 Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Sun, 17 Dec 2023 12:39:56 -0800 Subject: [PATCH] Add Active Job extension for Labels --- README.md | 25 +++++ app/models/concerns/good_job/filterable.rb | 2 +- app/models/good_job/base_execution.rb | 15 +++ app/models/good_job/execution.rb | 7 ++ .../20231216183914_create_good_job_labels.rb | 15 +++ ...1216183915_create_good_job_labels_index.rb | 22 ++++ demo/db/schema.rb | 4 +- .../migrations/create_good_jobs.rb.erb | 2 + .../08_create_good_job_labels.rb.erb | 15 +++ .../09_create_good_job_labels_index.rb.erb | 22 ++++ lib/good_job.rb | 5 +- lib/good_job/active_job_extensions/labels.rb | 32 ++++++ .../concerns/good_job/filterable_spec.rb | 32 ++++-- .../active_job_extensions/labels_spec.rb | 105 ++++++++++++++++++ spec/support/reset_good_job.rb | 2 +- 15 files changed, 293 insertions(+), 12 deletions(-) create mode 100644 demo/db/migrate/20231216183914_create_good_job_labels.rb create mode 100644 demo/db/migrate/20231216183915_create_good_job_labels_index.rb create mode 100644 lib/generators/good_job/templates/update/migrations/08_create_good_job_labels.rb.erb create mode 100644 lib/generators/good_job/templates/update/migrations/09_create_good_job_labels_index.rb.erb create mode 100644 lib/good_job/active_job_extensions/labels.rb create mode 100644 spec/lib/good_job/active_job_extensions/labels_spec.rb diff --git a/README.md b/README.md index 69bbb5bf5..09d32e8b4 100644 --- a/README.md +++ b/README.md @@ -438,6 +438,31 @@ The Dashboard can be set to automatically refresh by checking "Live Poll" in the Higher priority numbers run first in all versions of GoodJob v3.x and below. GoodJob v4.x will change job `priority` to give smaller numbers higher priority (default: `0`), in accordance with Active Job's definition of priority (see #524). To opt-in to this behavior now, set `config.good_job.smaller_number_is_higher_priority = true` in your GoodJob initializer or `application.rb`. +### Labelled jobs + +Labels are the recommended way to add context or metadata to specific jobs. For example, all jobs that have a dependency on an email service could be labeled `email`. Using labels requires adding the Active Job extension `GoodJob::ActiveJobExtensions::Labels` to your job class. + +```ruby +class ApplicationRecord < ActiveJob::Base + include GoodJob::ActiveJobExtensions::Labels +end + +# Add a default label to every job within the class +class WelcomeJob < ApplicationRecord + self.good_job_labels = ["email"] + + def perform + # Labels can be inspected from within the job + puts good_job_labels # => ["email"] + end +end + +# Or add to individual jobs when enqueued +WelcomeJob.set(good_job_labels: ["email"]).perform_later +``` + +Labels can be used to search jobs in the Dashboard. For example, to find all jobs labeled `email`, search for `email`. + ### Concurrency controls GoodJob can extend ActiveJob to provide limits on concurrently running jobs, either at time of _enqueue_ or at _perform_. Limiting concurrency can help prevent duplicate, double or unnecessary jobs from being enqueued, or race conditions when performing, for example when interacting with 3rd-party APIs. diff --git a/app/models/concerns/good_job/filterable.rb b/app/models/concerns/good_job/filterable.rb index 91f1eff4a..539a293da 100644 --- a/app/models/concerns/good_job/filterable.rb +++ b/app/models/concerns/good_job/filterable.rb @@ -34,7 +34,7 @@ module Filterable query = query.to_s.strip next if query.blank? - tsvector = "(to_tsvector('english', serialized_params) || to_tsvector('english', id::text) || to_tsvector('english', COALESCE(error, '')::text))" + tsvector = "(to_tsvector('english', id::text) || to_tsvector('english', COALESCE(active_job_id::text, '')) || to_tsvector('english', serialized_params) || to_tsvector('english', COALESCE(error, ''))#{" || to_tsvector('english', COALESCE(array_to_string(labels, ' '), ''))" if labels_migrated?})" to_tsquery_function = database_supports_websearch_to_tsquery? ? 'websearch_to_tsquery' : 'plainto_tsquery' where("#{tsvector} @@ #{to_tsquery_function}(?)", query) .order(sanitize_sql_for_order([Arel.sql("ts_rank(#{tsvector}, #{to_tsquery_function}(?))"), query]) => 'DESC') diff --git a/app/models/good_job/base_execution.rb b/app/models/good_job/base_execution.rb index 01075b235..1e2c783a1 100644 --- a/app/models/good_job/base_execution.rb +++ b/app/models/good_job/base_execution.rb @@ -56,6 +56,20 @@ def cron_indices_migrated? migration_pending_warning! false end + + def labels_migrated? + return true if columns_hash["labels"].present? + + migration_pending_warning! + false + end + + def labels_indices_migrated? + return true if connection.index_name_exists?(:good_jobs, :index_good_jobs_on_labels) + + migration_pending_warning! + false + end end # The ActiveJob job class, as a string @@ -88,6 +102,7 @@ def active_job_data .tap do |job_data| job_data["provider_job_id"] = id job_data["good_job_concurrency_key"] = concurrency_key if concurrency_key + job_data["good_job_labels"] = Array(labels) if self.class.labels_migrated? && labels.present? end end end diff --git a/app/models/good_job/execution.rb b/app/models/good_job/execution.rb index cf268744b..895fb9863 100644 --- a/app/models/good_job/execution.rb +++ b/app/models/good_job/execution.rb @@ -225,6 +225,13 @@ def self.enqueue_args(active_job, overrides = {}) execution_args[:scheduled_at] = Time.zone.at(active_job.scheduled_at) if active_job.scheduled_at execution_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key) + if active_job.respond_to?(:good_job_labels) && active_job.good_job_labels.any? && labels_migrated? + labels = active_job.good_job_labels.dup + labels.map! { |label| label.to_s.strip.presence } + labels.tap(&:compact!).tap(&:uniq!) + execution_args[:labels] = labels + end + reenqueued_current_execution = CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id current_execution = CurrentThread.execution diff --git a/demo/db/migrate/20231216183914_create_good_job_labels.rb b/demo/db/migrate/20231216183914_create_good_job_labels.rb new file mode 100644 index 000000000..1ba514e8b --- /dev/null +++ b/demo/db/migrate/20231216183914_create_good_job_labels.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +class CreateGoodJobLabels < ActiveRecord::Migration[7.1] + def change + reversible do |dir| + dir.up do + # Ensure this incremental update migration is idempotent + # with monolithic install migration. + return if connection.column_exists?(:good_jobs, :labels) + end + end + + add_column :good_jobs, :labels, :text, array: true + end +end diff --git a/demo/db/migrate/20231216183915_create_good_job_labels_index.rb b/demo/db/migrate/20231216183915_create_good_job_labels_index.rb new file mode 100644 index 000000000..48c4e59c0 --- /dev/null +++ b/demo/db/migrate/20231216183915_create_good_job_labels_index.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +class CreateGoodJobLabelsIndex < ActiveRecord::Migration[7.1] + disable_ddl_transaction! + + def change + reversible do |dir| + dir.up do + unless connection.index_name_exists?(:good_jobs, :index_good_jobs_on_labels) + add_index :good_jobs, :labels, where: "(labels IS NOT NULL)", + using: :gin, name: :index_good_jobs_on_labels, algorithm: :concurrently + end + end + + dir.down do + if connection.index_name_exists?(:good_jobs, :index_good_jobs_on_labels) + remove_index :good_jobs, name: :index_good_jobs_on_labels + end + end + end + end +end diff --git a/demo/db/schema.rb b/demo/db/schema.rb index 4c88b8089..6fc2fca3c 100644 --- a/demo/db/schema.rb +++ b/demo/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema.define(version: 2023_11_28_075428) do +ActiveRecord::Schema.define(version: 2023_12_16_183915) do # These are extensions that must be enabled in order to support this database enable_extension "pgcrypto" enable_extension "plpgsql" @@ -79,6 +79,7 @@ t.integer "executions_count" t.text "job_class" t.integer "error_event", limit: 2 + t.text "labels", array: true t.index ["active_job_id", "created_at"], name: "index_good_jobs_on_active_job_id_and_created_at" t.index ["active_job_id"], name: "index_good_jobs_on_active_job_id" t.index ["batch_callback_id"], name: "index_good_jobs_on_batch_callback_id", where: "(batch_callback_id IS NOT NULL)" @@ -87,6 +88,7 @@ t.index ["cron_key", "created_at"], name: "index_good_jobs_on_cron_key_and_created_at_cond", where: "(cron_key IS NOT NULL)" t.index ["cron_key", "cron_at"], name: "index_good_jobs_on_cron_key_and_cron_at_cond", unique: true, where: "(cron_key IS NOT NULL)" t.index ["finished_at"], name: "index_good_jobs_jobs_on_finished_at", where: "((retried_good_job_id IS NULL) AND (finished_at IS NOT NULL))" + t.index ["labels"], name: "index_good_jobs_on_labels", where: "(labels IS NOT NULL)", using: :gin t.index ["priority", "created_at"], name: "index_good_jobs_jobs_on_priority_created_at_when_unfinished", order: { priority: "DESC NULLS LAST" }, where: "(finished_at IS NULL)" t.index ["queue_name", "scheduled_at"], name: "index_good_jobs_on_queue_name_and_scheduled_at", where: "(finished_at IS NULL)" t.index ["scheduled_at"], name: "index_good_jobs_on_scheduled_at", where: "(finished_at IS NULL)" diff --git a/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb b/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb index 9d82e9806..72059d01f 100644 --- a/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb +++ b/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb @@ -29,6 +29,7 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %> t.integer :executions_count t.text :job_class t.integer :error_event, limit: 2 + t.text :labels, array: true end create_table :good_job_batches, id: :uuid do |t| @@ -82,6 +83,7 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %> where: "finished_at IS NULL", name: :index_good_jobs_jobs_on_priority_created_at_when_unfinished add_index :good_jobs, [:batch_id], where: "batch_id IS NOT NULL" add_index :good_jobs, [:batch_callback_id], where: "batch_callback_id IS NOT NULL" + add_index :good_jobs, :labels, using: :gin, where: "(labels IS NOT NULL)", name: :index_good_jobs_on_labels add_index :good_job_executions, [:active_job_id, :created_at], name: :index_good_job_executions_on_active_job_id_and_created_at end diff --git a/lib/generators/good_job/templates/update/migrations/08_create_good_job_labels.rb.erb b/lib/generators/good_job/templates/update/migrations/08_create_good_job_labels.rb.erb new file mode 100644 index 000000000..0bc3ec97e --- /dev/null +++ b/lib/generators/good_job/templates/update/migrations/08_create_good_job_labels.rb.erb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +class CreateGoodJobLabels < ActiveRecord::Migration<%= migration_version %> + def change + reversible do |dir| + dir.up do + # Ensure this incremental update migration is idempotent + # with monolithic install migration. + return if connection.column_exists?(:good_jobs, :labels) + end + end + + add_column :good_jobs, :labels, :text, array: true + end +end diff --git a/lib/generators/good_job/templates/update/migrations/09_create_good_job_labels_index.rb.erb b/lib/generators/good_job/templates/update/migrations/09_create_good_job_labels_index.rb.erb new file mode 100644 index 000000000..e069bd9c5 --- /dev/null +++ b/lib/generators/good_job/templates/update/migrations/09_create_good_job_labels_index.rb.erb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +class CreateGoodJobLabelsIndex < ActiveRecord::Migration<%= migration_version %> + disable_ddl_transaction! + + def change + reversible do |dir| + dir.up do + unless connection.index_name_exists?(:good_jobs, :index_good_jobs_on_labels) + add_index :good_jobs, :labels, using: :gin, where: "(labels IS NOT NULL)", + name: :index_good_jobs_on_labels, algorithm: :concurrently + end + end + + dir.down do + if connection.index_name_exists?(:good_jobs, :index_good_jobs_on_labels) + remove_index :good_jobs, name: :index_good_jobs_on_labels + end + end + end + end +end diff --git a/lib/good_job.rb b/lib/good_job.rb index 95987ad56..e1f3b7075 100644 --- a/lib/good_job.rb +++ b/lib/good_job.rb @@ -12,6 +12,7 @@ require "good_job/active_job_extensions/concurrency" require "good_job/interrupt_error" require "good_job/active_job_extensions/interrupt_errors" +require "good_job/active_job_extensions/labels" require "good_job/active_job_extensions/notify_options" require "good_job/assignable_connection" @@ -22,7 +23,7 @@ require "good_job/cli" require "good_job/configuration" require "good_job/cron_manager" -require 'good_job/current_thread' +require "good_job/current_thread" require "good_job/daemon" require "good_job/dependencies" require "good_job/job_performer" @@ -272,7 +273,7 @@ def self.deprecator def self.migrated? # Always update with the most recent migration check GoodJob::Execution.reset_column_information - GoodJob::Execution.cron_indices_migrated? + GoodJob::Execution.labels_indices_migrated? end ActiveSupport.run_load_hooks(:good_job, self) diff --git a/lib/good_job/active_job_extensions/labels.rb b/lib/good_job/active_job_extensions/labels.rb new file mode 100644 index 000000000..3f7365344 --- /dev/null +++ b/lib/good_job/active_job_extensions/labels.rb @@ -0,0 +1,32 @@ +# frozen_string_literal: true + +module GoodJob + module ActiveJobExtensions + module Labels + extend ActiveSupport::Concern + + module Prepends + def initialize(*arguments) + super + self.good_job_labels = Array(self.class.good_job_labels) + end + + def enqueue(options = {}) + self.good_job_labels = Array(options[:good_job_labels]) if options.key?(:good_job_labels) + super + end + + def deserialize(job_data) + super + self.good_job_labels = job_data.delete("good_job_labels")&.dup || [] + end + end + + included do + prepend Prepends + class_attribute :good_job_labels, instance_accessor: false, instance_predicate: false, default: [] + attr_accessor :good_job_labels + end + end + end +end diff --git a/spec/app/models/concerns/good_job/filterable_spec.rb b/spec/app/models/concerns/good_job/filterable_spec.rb index df3a891b7..5c6cbfa44 100644 --- a/spec/app/models/concerns/good_job/filterable_spec.rb +++ b/spec/app/models/concerns/good_job/filterable_spec.rb @@ -3,24 +3,42 @@ require 'rails_helper' RSpec.describe GoodJob::Filterable do - let(:model_class) { GoodJob::Execution } - let!(:execution) { model_class.create(active_job_id: SecureRandom.uuid, queue_name: "default", serialized_params: { example_key: 'example_value' }, error: "ExampleJob::ExampleError: a message") } + let(:model_class) { GoodJob::Job } + let!(:job) do + model_class.create( + active_job_id: SecureRandom.uuid, + queue_name: "default", + serialized_params: { example_key: 'example_value' }, + labels: %w[buffalo gopher], + error: "ExampleJob::ExampleError: a message" + ) + end describe '.search_test' do it 'searches serialized params' do - expect(model_class.search_text('example_value')).to include(execution) + expect(model_class.search_text('example_value')).to include(job) end it 'searches record id' do - expect(model_class.search_text(execution.id)).to include(execution) + expect(model_class.search_text(job.id)).to include(job) + end + + it 'searches active_job_id' do + expect(model_class.search_text(job.active_job_id)).to include(job) + end + + it 'searches labels' do + expect(model_class.search_text('buffalo')).to include(job) + expect(model_class.search_text('gopher')).to include(job) + expect(model_class.search_text('hippo')).not_to include(job) end it 'searches errors' do - expect(model_class.search_text('ExampleError')).to include(execution) + expect(model_class.search_text('ExampleError')).to include(job) end it 'searches strings with colons' do - expect(model_class.search_text('ExampleJob::ExampleError')).to include(execution) + expect(model_class.search_text('ExampleJob::ExampleError')).to include(job) end it 'filters out non-matching records' do @@ -28,7 +46,7 @@ end it 'is chainable and reversible' do - expect(model_class.where.not(id: nil).search_text('example_value').reverse).to include(execution) + expect(model_class.where.not(id: nil).search_text('example_value').reverse).to include(job) end end end diff --git a/spec/lib/good_job/active_job_extensions/labels_spec.rb b/spec/lib/good_job/active_job_extensions/labels_spec.rb new file mode 100644 index 000000000..6eea20350 --- /dev/null +++ b/spec/lib/good_job/active_job_extensions/labels_spec.rb @@ -0,0 +1,105 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe GoodJob::ActiveJobExtensions::Labels do + before do + ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :external) + + stub_const 'TestJob', (Class.new(ActiveJob::Base) do + include GoodJob::ActiveJobExtensions::Labels + + def perform + end + end) + end + + it "is an empty array by default to Active Job, but null on the record" do + expect(TestJob.good_job_labels).to eq [] + TestJob.perform_later + + job = GoodJob::Job.last + expect(job.labels).to eq nil + expect(job.active_job.good_job_labels).to eq [] + end + + it "is serialized and deserialized" do + TestJob.good_job_labels = %w[buffalo gopher] + + expect(TestJob.good_job_labels).to eq %w[buffalo gopher] + TestJob.perform_later + + job = GoodJob::Job.last + expect(job.labels).to eq %w[buffalo gopher] + expect(job.active_job.good_job_labels).to eq %w[buffalo gopher] + end + + it "doesn't leak into the serialized params" do + TestJob.good_job_labels = %w[buffalo gopher] + TestJob.perform_later + + expect(GoodJob::Job.last.serialized_params).not_to have_key("good_job_labels") + end + + it 'appropriately deserializes a nil value even when the class value is set' do + TestJob.good_job_labels = ["buffalo"] + + TestJob.set(good_job_labels: []).perform_later + + job = GoodJob::Job.last + expect(job.labels).to eq nil + + active_job = job.active_job + expect(active_job.good_job_labels).to eq [] + end + + it 'is unique' do + TestJob.good_job_labels = %w[buffalo gopher gopher] + TestJob.perform_later + + job = GoodJob::Job.last + expect(job.labels).to eq %w[buffalo gopher] + + active_job = job.active_job + expect(active_job.good_job_labels).to eq %w[buffalo gopher] + end + + it 'strips values' do + TestJob.good_job_labels = ["buffalo", " gopher ", "gopher"] + TestJob.perform_later + + active_job = GoodJob::Job.last.active_job + expect(active_job.good_job_labels).to eq %w[buffalo gopher] + end + + it "can contain non-string values" do + TestJob.good_job_labels = ["buffalo", "key:value", 1, true, nil] + TestJob.perform_later + + active_job = GoodJob::Job.last.active_job + expect(active_job.good_job_labels).to eq ["buffalo", "key:value", "1", "true"] + end + + context 'when a job is retried' do + before do + stub_const 'ExpectedError', Class.new(StandardError) + stub_const 'TestJob', (Class.new(ActiveJob::Base) do + include GoodJob::ActiveJobExtensions::Labels + retry_on ExpectedError, wait: 0, attempts: 3 + + def perform + good_job_labels << "gopher" + raise ExpectedError if executions < 3 + end + end) + end + + it 'retains the label when retried' do + TestJob.set(good_job_labels: ["buffalo"]).perform_later + GoodJob.perform_inline + + expect(GoodJob::DiscreteExecution.count).to eq 3 + expect(GoodJob::Job.first).to have_attributes(labels: %w[buffalo gopher]) + end + end +end diff --git a/spec/support/reset_good_job.rb b/spec/support/reset_good_job.rb index 67409d990..300b929fd 100644 --- a/spec/support/reset_good_job.rb +++ b/spec/support/reset_good_job.rb @@ -5,7 +5,7 @@ ActiveSupport.on_load :active_record do ActiveRecord::ConnectionAdapters::AbstractAdapter.set_callback :checkout, :before, lambda { |conn| thread_name = Thread.current.name || Thread.current.object_id - conn.exec_query("SET application_name = '#{thread_name}'", "Set application name") + conn.exec_query("SET application_name = #{conn.quote(thread_name)}", "Set application name") } end