Skip to content

Commit

Permalink
Wrap Adapter enqueue methods and Batch callbacks with Rails Executor;…
Browse files Browse the repository at this point in the history
… verify in tests that no Advisory locks remain at database connection check-in
  • Loading branch information
bensheldon committed Oct 23, 2023
1 parent c0fcdb4 commit 31ff3de
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 89 deletions.
10 changes: 7 additions & 3 deletions app/models/good_job/batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,13 @@ def enqueue(active_jobs = [], **properties, &block)

active_jobs = add(active_jobs, &block)

record.with_advisory_lock(function: "pg_advisory_lock") do
record.update!(enqueued_at: Time.current)
record._continue_discard_or_finish(lock: false)
Rails.application.reloader.wrap do
record.with_advisory_lock(function: "pg_advisory_lock") do
record.update!(enqueued_at: Time.current)

# During inline execution, this could enqueue and execute further jobs
record._continue_discard_or_finish(lock: false)
end
end

active_jobs
Expand Down
2 changes: 1 addition & 1 deletion lib/good_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ def self.cleanup_preserved_jobs(older_than: nil, in_batches_of: 1_000)
def self.perform_inline(queue_string = "*")
job_performer = JobPerformer.new(queue_string)
loop do
result = job_performer.next
result = Rails.application.reloader.wrap { job_performer.next }
break unless result
raise result.unhandled_error if result.unhandled_error
end
Expand Down
156 changes: 80 additions & 76 deletions lib/good_job/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,74 +49,76 @@ def enqueue_all(active_jobs)
active_jobs = Array(active_jobs)
return 0 if active_jobs.empty?

current_time = Time.current
executions = active_jobs.map do |active_job|
GoodJob::Execution.build_for_enqueue(active_job).tap do |execution|
if GoodJob::Execution.discrete_support?
execution.make_discrete
execution.scheduled_at = current_time if execution.scheduled_at == execution.created_at
end
Rails.application.reloader.wrap do
current_time = Time.current
executions = active_jobs.map do |active_job|
GoodJob::Execution.build_for_enqueue(active_job).tap do |execution|
if GoodJob::Execution.discrete_support?
execution.make_discrete
execution.scheduled_at = current_time if execution.scheduled_at == execution.created_at
end

execution.created_at = current_time
execution.updated_at = current_time
execution.created_at = current_time
execution.updated_at = current_time
end
end
end

inline_executions = []
GoodJob::Execution.transaction(requires_new: true, joinable: false) do
execution_attributes = executions.map do |execution|
if GoodJob::Execution.error_event_migrated?
execution.attributes
else
execution.attributes.except('error_event')
inline_executions = []
GoodJob::Execution.transaction(requires_new: true, joinable: false) do
execution_attributes = executions.map do |execution|
if GoodJob::Execution.error_event_migrated?
execution.attributes
else
execution.attributes.except('error_event')
end
end
end

results = GoodJob::Execution.insert_all(execution_attributes, returning: %w[id active_job_id]) # rubocop:disable Rails/SkipsModelValidations
results = GoodJob::Execution.insert_all(execution_attributes, returning: %w[id active_job_id]) # rubocop:disable Rails/SkipsModelValidations

job_id_to_provider_job_id = results.each_with_object({}) { |result, hash| hash[result['active_job_id']] = result['id'] }
active_jobs.each do |active_job|
active_job.provider_job_id = job_id_to_provider_job_id[active_job.job_id]
active_job.successfully_enqueued = active_job.provider_job_id.present? if active_job.respond_to?(:successfully_enqueued=)
end
executions.each do |execution|
execution.instance_variable_set(:@new_record, false) if job_id_to_provider_job_id[execution.active_job_id]
end
executions = executions.select(&:persisted?) # prune unpersisted executions
job_id_to_provider_job_id = results.each_with_object({}) { |result, hash| hash[result['active_job_id']] = result['id'] }
active_jobs.each do |active_job|
active_job.provider_job_id = job_id_to_provider_job_id[active_job.job_id]
active_job.successfully_enqueued = active_job.provider_job_id.present? if active_job.respond_to?(:successfully_enqueued=)
end
executions.each do |execution|
execution.instance_variable_set(:@new_record, false) if job_id_to_provider_job_id[execution.active_job_id]
end
executions = executions.select(&:persisted?) # prune unpersisted executions

if execute_inline?
inline_executions = executions.select { |execution| (execution.scheduled_at.nil? || execution.scheduled_at <= Time.current) }
inline_executions.each(&:advisory_lock!)
if execute_inline?
inline_executions = executions.select { |execution| (execution.scheduled_at.nil? || execution.scheduled_at <= Time.current) }
inline_executions.each(&:advisory_lock!)
end
end
end

begin
until inline_executions.empty?
begin
inline_execution = inline_executions.shift
inline_result = inline_execution.perform
ensure
inline_execution.advisory_unlock
inline_execution.run_callbacks(:perform_unlocked)
begin
until inline_executions.empty?
begin
inline_execution = inline_executions.shift
inline_result = inline_execution.perform
ensure
inline_execution.advisory_unlock
inline_execution.run_callbacks(:perform_unlocked)
end
raise inline_result.unhandled_error if inline_result.unhandled_error
end
raise inline_result.unhandled_error if inline_result.unhandled_error
ensure
inline_executions.each(&:advisory_unlock)
end
ensure
inline_executions.each(&:advisory_unlock)
end

non_inline_executions = executions.reject(&:finished_at)
if non_inline_executions.any?
job_id_to_active_jobs = active_jobs.index_by(&:job_id)
non_inline_executions.group_by(&:queue_name).each do |queue_name, executions_by_queue|
executions_by_queue.group_by(&:scheduled_at).each do |scheduled_at, executions_by_queue_and_scheduled_at|
state = { queue_name: queue_name, count: executions_by_queue_and_scheduled_at.size }
state[:scheduled_at] = scheduled_at if scheduled_at

executed_locally = execute_async? && @capsule&.create_thread(state)
unless executed_locally
state[:count] = job_id_to_active_jobs.values_at(*executions_by_queue_and_scheduled_at.map(&:active_job_id)).count { |active_job| send_notify?(active_job) }
Notifier.notify(state) unless state[:count].zero?
non_inline_executions = executions.reject(&:finished_at)
if non_inline_executions.any?
job_id_to_active_jobs = active_jobs.index_by(&:job_id)
non_inline_executions.group_by(&:queue_name).each do |queue_name, executions_by_queue|
executions_by_queue.group_by(&:scheduled_at).each do |scheduled_at, executions_by_queue_and_scheduled_at|
state = { queue_name: queue_name, count: executions_by_queue_and_scheduled_at.size }
state[:scheduled_at] = scheduled_at if scheduled_at

executed_locally = execute_async? && @capsule&.create_thread(state)
unless executed_locally
state[:count] = job_id_to_active_jobs.values_at(*executions_by_queue_and_scheduled_at.map(&:active_job_id)).count { |active_job| send_notify?(active_job) }
Notifier.notify(state) unless state[:count].zero?
end
end
end
end
Expand All @@ -137,30 +139,32 @@ def enqueue_at(active_job, timestamp)
# job there to be enqueued using enqueue_all
return if GoodJob::Bulk.capture(active_job, queue_adapter: self)

will_execute_inline = execute_inline? && (scheduled_at.nil? || scheduled_at <= Time.current)
execution = GoodJob::Execution.enqueue(
active_job,
scheduled_at: scheduled_at,
create_with_advisory_lock: will_execute_inline
)
Rails.application.reloader.wrap do
will_execute_inline = execute_inline? && (scheduled_at.nil? || scheduled_at <= Time.current)
execution = GoodJob::Execution.enqueue(
active_job,
scheduled_at: scheduled_at,
create_with_advisory_lock: will_execute_inline
)

if will_execute_inline
begin
result = execution.perform
ensure
execution.advisory_unlock
execution.run_callbacks(:perform_unlocked)
if will_execute_inline
begin
result = execution.perform
ensure
execution.advisory_unlock
execution.run_callbacks(:perform_unlocked)
end
raise result.unhandled_error if result.unhandled_error
else
job_state = { queue_name: execution.queue_name }
job_state[:scheduled_at] = execution.scheduled_at if execution.scheduled_at

executed_locally = execute_async? && @capsule&.create_thread(job_state)
Notifier.notify(job_state) if !executed_locally && send_notify?(active_job)
end
raise result.unhandled_error if result.unhandled_error
else
job_state = { queue_name: execution.queue_name }
job_state[:scheduled_at] = execution.scheduled_at if execution.scheduled_at

executed_locally = execute_async? && @capsule&.create_thread(job_state)
Notifier.notify(job_state) if !executed_locally && send_notify?(active_job)
execution
end

execution
end

# Shut down the thread pool executors.
Expand Down
4 changes: 4 additions & 0 deletions spec/app/models/good_job/execution_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
require 'rails_helper'

RSpec.describe GoodJob::Execution do
around do |example|
Rails.application.executor.wrap { example.run }
end

before do
allow(described_class).to receive(:discrete_support?).and_return(false)

Expand Down
8 changes: 5 additions & 3 deletions spec/lib/good_job/active_job_extensions/batches_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ def perform

describe 'batch accessors' do
it 'access batch' do
batch = GoodJob::Batch.enqueue(some_property: "Apple") do
TestJob.perform_later
TestJob.perform_later
batch = Rails.application.executor.wrap do
GoodJob::Batch.enqueue(some_property: "Apple") do
TestJob.perform_later
TestJob.perform_later
end
end

expect(batch).to be_a GoodJob::Batch
Expand Down
14 changes: 9 additions & 5 deletions spec/lib/good_job/active_job_extensions/concurrency_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@ def perform(name:)
it "is inclusive of both performing and enqueued jobs" do
expect(TestJob.perform_later(name: "Alice")).to be_present

GoodJob::Execution.all.with_advisory_lock do
expect(TestJob.perform_later(name: "Alice")).to be false
Rails.application.executor.wrap do
GoodJob::Execution.all.with_advisory_lock do
expect(TestJob.perform_later(name: "Alice")).to be false
end
end
end
end
Expand Down Expand Up @@ -96,9 +98,11 @@ def perform(name:)
expect(TestJob.perform_later(name: "Alice")).to be_present

# Lock one of the jobs
GoodJob::Execution.first.with_advisory_lock do
# Third usage does enqueue
expect(TestJob.perform_later(name: "Alice")).to be_present
Rails.application.executor.wrap do
GoodJob::Execution.first.with_advisory_lock do
# Third usage does enqueue
expect(TestJob.perform_later(name: "Alice")).to be_present
end
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion spec/requests/good_job/jobs_controller_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
around do |example|
orig_value = ActionController::Base.allow_forgery_protection
ActionController::Base.allow_forgery_protection = false
example.call
example.run
ActionController::Base.allow_forgery_protection = orig_value
end

Expand Down
9 changes: 9 additions & 0 deletions spec/support/postgres_notices.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@
POSTGRES_NOTICES << result.error_message
end
}

ActiveRecord::ConnectionAdapters::AbstractAdapter.set_callback :checkin, :before, lambda { |conn|
warning = PgLock.debug_own_locks(conn)
next if warning.blank?

$stdout.puts warning
Rails.logger.warn(warning)
POSTGRES_NOTICES << warning
}
end

RSpec.configure do |config|
Expand Down
44 changes: 44 additions & 0 deletions spec/support/reset_good_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,15 @@ def initialize_type_map(map = type_map)
end

class PgStatActivity < ActiveRecord::Base
include GoodJob::AssignableConnection

self.table_name = 'pg_stat_activity'
self.primary_key = 'datid'
end

class PgLock < ActiveRecord::Base
include GoodJob::AssignableConnection

self.table_name = 'pg_locks'
self.primary_key = 'objid'

Expand All @@ -142,6 +146,46 @@ class PgLock < ActiveRecord::Base
scope :owns, -> { where('pid = pg_backend_pid()') }
scope :others, -> { where('pid != pg_backend_pid()') }

def self.debug_own_locks(connection = ActiveRecord::Base.connection)
count = PgLock.with_connection(connection) do
PgLock.current_database.advisory_lock.owns.count
end
return false if count.zero?

output = []
output << "There are #{count} advisory locks still open by the current database connection."
GoodJob::Execution.include(GoodJob::AssignableConnection)
GoodJob::Execution.with_connection(connection) do
GoodJob::Execution.owns_advisory_locked.each.with_index do |execution, index|
output << "\nAdvisory locked GoodJob::Execution:" if index.zero?
output << " - Execution ID: #{execution.id} / Active Job ID: #{execution.active_job_id}"
end
end

GoodJob::BatchRecord.include(GoodJob::AssignableConnection)
GoodJob::BatchRecord.with_connection(connection) do
GoodJob::BatchRecord.owns_advisory_locked.each.with_index do |batch, index|
output << "\nAdvisory locked GoodJob::Batch:" if index.zero?
output << " - BatchRecord ID: #{batch.id}"
end
end

GoodJob::Process.include(GoodJob::AssignableConnection)
GoodJob::Process.with_connection(connection) do
GoodJob::Process.owns_advisory_locked.each.with_index do |process, index|
output << "\nAdvisory locked GoodJob::Process:" if index.zero?
output << " - Process ID: #{process.id}"
end
end

output << "\nAdvisory Locks:"
PgLock.current_database.advisory_lock.owns.includes(:pg_stat_activity).all.each do |pg_lock| # rubocop:disable Rails/FindEach
output << " - #{pg_lock.pid}: #{pg_lock.pg_stat_activity&.application_name}"
end

output.join("\n")
end

def unlock
query = <<~SQL.squish
SELECT pg_advisory_unlock(($1::bigint << 32) + $2::bigint) AS unlocked
Expand Down

0 comments on commit 31ff3de

Please sign in to comment.