Skip to content

Commit

Permalink
Allow Probe Server's /connect to handle a certain number of reconne…
Browse files Browse the repository at this point in the history
…cts before statusing (#1075)
  • Loading branch information
bensheldon authored Sep 16, 2023
1 parent 3c45787 commit 284d671
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 41 deletions.
42 changes: 28 additions & 14 deletions lib/good_job/notifier.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@ class Notifier
WAIT_INTERVAL = 1
# Seconds to wait if database cannot be connected to
RECONNECT_INTERVAL = 5
# Number of consecutive connection errors before reporting an error
CONNECTION_ERRORS_REPORTING_THRESHOLD = 6

# Connection errors that will wait {RECONNECT_INTERVAL} before reconnecting
CONNECTION_ERRORS = %w[
ActiveRecord::ConnectionNotEstablished
ActiveRecord::StatementInvalid
PG::UnableToSend
PG::Error
].freeze
CONNECTION_ERRORS_REPORTING_THRESHOLD = 3

# @!attribute [r] instances
# @!scope class
Expand Down Expand Up @@ -69,8 +71,8 @@ def initialize(*recipients, enable_listening: true, executor: Concurrent.global_
@mutex = Mutex.new
@shutdown_event = Concurrent::Event.new.tap(&:set)
@running = Concurrent::AtomicBoolean.new(false)
@connected = Concurrent::AtomicBoolean.new(false)
@listening = Concurrent::AtomicBoolean.new(false)
@connected = Concurrent::Event.new
@listening = Concurrent::Event.new
@connection_errors_count = Concurrent::AtomicFixnum.new(0)
@connection_errors_reported = Concurrent::AtomicBoolean.new(false)
@enable_listening = enable_listening
Expand All @@ -85,15 +87,25 @@ def running?
end

# Tests whether the notifier is active and has acquired a dedicated database connection.
# @param timeout [Numeric, nil] Seconds to wait for condition to be true, -1 is forever
# @return [true, false, nil]
def connected?
@connected.true?
def connected?(timeout: nil)
if timeout.nil?
@connected.set?
else
@connected.wait(timeout == -1 ? nil : timeout)
end
end

# Tests whether the notifier is listening for new messages.
# @param timeout [Numeric, nil] Seconds to wait for condition to be true, -1 is forever
# @return [true, false, nil]
def listening?
@listening.true?
def listening?(timeout: nil)
if timeout.nil?
@listening.set?
else
@listening.wait(timeout == -1 ? nil : timeout)
end
end

def shutdown?
Expand All @@ -114,11 +126,12 @@ def shutdown(timeout: -1)

if @executor.shutdown? || @task&.complete?
# clean up in the even the executor is killed
@connected.make_false
@listening.make_false
@connected.reset
@listening.reset
@shutdown_event.set
else
@shutdown_event.wait(timeout == -1 ? nil : timeout) unless timeout.nil?
@connected.reset if @shutdown_event.set?
end
@shutdown_event.set?
end
Expand Down Expand Up @@ -152,6 +165,7 @@ def listen_observer(_time, _result, thread_error)
if connection_error
@connection_errors_count.increment
if @connection_errors_reported.false? && @connection_errors_count.value >= CONNECTION_ERRORS_REPORTING_THRESHOLD
@connected.reset
GoodJob._on_thread_error(thread_error)
@connection_errors_reported.make_true
end
Expand Down Expand Up @@ -180,15 +194,17 @@ def start
end

def create_listen_task(delay: 0)
@task = Concurrent::ScheduledTask.new(delay, args: [@recipients, @running, @executor, @enable_listening, @listening], executor: @executor) do |thr_recipients, thr_running, thr_executor, thr_enable_listening, thr_listening|
@task = Concurrent::ScheduledTask.new(delay, args: [@recipients, @running, @executor, @enable_listening, @connected, @listening], executor: @executor) do |thr_recipients, thr_running, thr_executor, thr_enable_listening, thr_connected, thr_listening|
with_connection do
thr_connected.set

begin
Rails.application.executor.wrap do
run_callbacks :listen do
if thr_enable_listening
ActiveSupport::Notifications.instrument("notifier_listen.good_job") do
connection.execute("LISTEN #{CHANNEL}")
thr_listening.make_true
thr_listening.set
end
end
end
Expand Down Expand Up @@ -216,7 +232,7 @@ def create_listen_task(delay: 0)
run_callbacks :unlisten do
if thr_enable_listening
ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do
thr_listening.make_false
thr_listening.reset
connection.execute("UNLISTEN *")
end
end
Expand All @@ -237,11 +253,9 @@ def with_connection
end
end
connection.execute("SET application_name = #{connection.quote(self.class.name)}")
@connected.make_true

yield
ensure
@connected.make_false
connection&.disconnect!
self.connection = nil
end
Expand Down
2 changes: 1 addition & 1 deletion lib/good_job/probe_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def call(env)
started ? [200, {}, ["Started"]] : [503, {}, ["Not started"]]
when '/status/connected'
connected = GoodJob::Scheduler.instances.any? && GoodJob::Scheduler.instances.all?(&:running?) &&
GoodJob::Notifier.instances.any? && GoodJob::Notifier.instances.all?(&:listening?)
GoodJob::Notifier.instances.any? && GoodJob::Notifier.instances.all?(&:connected?)
connected ? [200, {}, ["Connected"]] : [503, {}, ["Not connected"]]
else
[404, {}, ["Not found"]]
Expand Down
61 changes: 37 additions & 24 deletions spec/lib/good_job/notifier_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,56 +25,69 @@
describe '#connected?' do
it 'becomes true when the notifier is connected' do
notifier = described_class.new(enable_listening: true)
sleep_until(max: 5) { notifier.connected? }
expect(notifier.connected?(timeout: 5)).to be true

expect do
notifier.shutdown
end.to change(notifier, :connected?).from(true).to(false)
end

it 'remains true through multiple connection errors until CONNECTION_ERRORS_REPORTING_THRESHOLD is reached' do
error_event = Concurrent::Event.new
allow(GoodJob).to receive(:_on_thread_error) { error_event.set }

stub_const('GoodJob::Notifier::WAIT_INTERVAL', 0.1)
stub_const('GoodJob::Notifier::RECONNECT_INTERVAL', 0.1)
stub_const('GoodJob::Notifier::CONNECTION_ERRORS_REPORTING_THRESHOLD', 3)

notifier = described_class.new(enable_listening: true)
expect(notifier.connected?(timeout: 5)).to be true
allow(notifier).to receive(:wait_for_notify).and_raise(ActiveRecord::ConnectionTimeoutError)
error_event.wait(5)
expect(notifier).not_to be_connected
end
end

describe '#listen' do
it 'loops until it receives a command' do
stub_const 'RECEIVED_MESSAGE', Concurrent::AtomicBoolean.new(false)

recipient = proc { |_payload| RECEIVED_MESSAGE.make_true }
event = Concurrent::Event.new
recipient = proc { |_payload| event.set }

notifier = described_class.new(recipient, enable_listening: true)
sleep_until(max: 5) { notifier.listening? }
notifier.listening?(timeout: 5)

described_class.notify(true)
sleep_until(max: 5) { RECEIVED_MESSAGE.true? }
notifier.shutdown
expect(event.wait(5)).to be true

expect(RECEIVED_MESSAGE.true?).to be true
notifier.shutdown
end

it 'loops but does not receive a command if listening is not enabled' do
stub_const 'RECEIVED_MESSAGE', Concurrent::AtomicBoolean.new(false)

recipient = proc { |_payload| RECEIVED_MESSAGE.make_true }
latch = Concurrent::CountDownLatch.new(1)
recipient = proc { |_payload| latch.count_down }
notifier = described_class.new(recipient, enable_listening: false)
expect(notifier.listening?).to be false
described_class.notify(true)
sleep_until(max: 1) { RECEIVED_MESSAGE.false? }

expect(notifier.connected?(timeout: 5)).to be true
expect(notifier.listening?(timeout: 1)).to be false
sleep 1
notifier.shutdown

expect(RECEIVED_MESSAGE.false?).to be true
expect(latch.count).to eq 1
end

shared_examples 'calls refresh_if_stale on every tick' do
specify do
stub_const 'REFRESH_IF_STALE_CALLED', Concurrent::AtomicFixnum.new(0)

allow_any_instance_of(GoodJob::Process).to receive(:refresh_if_stale) { REFRESH_IF_STALE_CALLED.increment }
refreshes = Concurrent::AtomicFixnum.new(0)
allow_any_instance_of(GoodJob::Process).to receive(:refresh_if_stale) { refreshes.increment }

recipient = proc {}
notifier = described_class.new(recipient, enable_listening: true)
sleep_until(max: 5) { notifier.listening? }
expect(notifier).to be_listening(timeout: 2)
described_class.notify(true)
sleep_until(max: 5) { REFRESH_IF_STALE_CALLED.value > 0 }
notifier.shutdown

expect(REFRESH_IF_STALE_CALLED.value).to be > 0
wait_until(max: 5) { expect(refreshes.value).to be > 0 }

notifier.shutdown
end
end

Expand All @@ -98,7 +111,7 @@
allow(JSON).to receive(:parse).and_raise ExpectedError

notifier = described_class.new(enable_listening: true)
sleep_until { notifier.listening? }
expect(notifier).to be_listening(timeout: 2)

described_class.notify(true)
wait_until { expect(on_thread_error).to have_received(:call).at_least(:once).with instance_of(ExpectedError) }
Expand All @@ -114,7 +127,7 @@
allow(JSON).to receive(:parse).and_raise ExpectedError

notifier = described_class.new(enable_listening: true)
sleep_until { notifier.listening? }
expect(notifier).to be_listening(timeout: 2)

described_class.notify(true)
wait_until { expect(on_thread_error).to have_received(:call).at_least(:once).with instance_of(ExpectedError) }
Expand Down
17 changes: 15 additions & 2 deletions spec/lib/good_job/probe_server_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,25 @@
end
end

context 'when there are running schedulers and listening notifiers' do
context 'when there are running schedulers but disconnected notifiers' do
it 'returns 200' do
scheduler = instance_double(GoodJob::Scheduler, running?: true, shutdown: true, shutdown?: true)
GoodJob::Scheduler.instances << scheduler

notifier = instance_double(GoodJob::Notifier, listening?: true, shutdown: true, shutdown?: true)
notifier = instance_double(GoodJob::Notifier, connected?: false, shutdown: true, shutdown?: true)
GoodJob::Notifier.instances << notifier

response = probe_server.call(env)
expect(response[0]).to eq(503)
end
end

context 'when there are running schedulers and connected notifiers' do
it 'returns 200' do
scheduler = instance_double(GoodJob::Scheduler, running?: true, shutdown: true, shutdown?: true)
GoodJob::Scheduler.instances << scheduler

notifier = instance_double(GoodJob::Notifier, connected?: true, shutdown: true, shutdown?: true)
GoodJob::Notifier.instances << notifier

response = probe_server.call(env)
Expand Down

0 comments on commit 284d671

Please sign in to comment.