diff --git a/lib/good_job/notifier.rb b/lib/good_job/notifier.rb index c3d6404f2..4a35ff34a 100644 --- a/lib/good_job/notifier.rb +++ b/lib/good_job/notifier.rb @@ -24,6 +24,9 @@ class Notifier WAIT_INTERVAL = 1 # Seconds to wait if database cannot be connected to RECONNECT_INTERVAL = 5 + # Number of consecutive connection errors before reporting an error + CONNECTION_ERRORS_REPORTING_THRESHOLD = 6 + # Connection errors that will wait {RECONNECT_INTERVAL} before reconnecting CONNECTION_ERRORS = %w[ ActiveRecord::ConnectionNotEstablished @@ -31,7 +34,6 @@ class Notifier PG::UnableToSend PG::Error ].freeze - CONNECTION_ERRORS_REPORTING_THRESHOLD = 3 # @!attribute [r] instances # @!scope class @@ -69,8 +71,8 @@ def initialize(*recipients, enable_listening: true, executor: Concurrent.global_ @mutex = Mutex.new @shutdown_event = Concurrent::Event.new.tap(&:set) @running = Concurrent::AtomicBoolean.new(false) - @connected = Concurrent::AtomicBoolean.new(false) - @listening = Concurrent::AtomicBoolean.new(false) + @connected = Concurrent::Event.new + @listening = Concurrent::Event.new @connection_errors_count = Concurrent::AtomicFixnum.new(0) @connection_errors_reported = Concurrent::AtomicBoolean.new(false) @enable_listening = enable_listening @@ -85,15 +87,25 @@ def running? end # Tests whether the notifier is active and has acquired a dedicated database connection. + # @param timeout [Numeric, nil] Seconds to wait for condition to be true, -1 is forever # @return [true, false, nil] - def connected? - @connected.true? + def connected?(timeout: nil) + if timeout.nil? + @connected.set? + else + @connected.wait(timeout == -1 ? nil : timeout) + end end # Tests whether the notifier is listening for new messages. + # @param timeout [Numeric, nil] Seconds to wait for condition to be true, -1 is forever # @return [true, false, nil] - def listening? - @listening.true? + def listening?(timeout: nil) + if timeout.nil? + @listening.set? + else + @listening.wait(timeout == -1 ? nil : timeout) + end end def shutdown? @@ -114,11 +126,12 @@ def shutdown(timeout: -1) if @executor.shutdown? || @task&.complete? # clean up in the even the executor is killed - @connected.make_false - @listening.make_false + @connected.reset + @listening.reset @shutdown_event.set else @shutdown_event.wait(timeout == -1 ? nil : timeout) unless timeout.nil? + @connected.reset if @shutdown_event.set? end @shutdown_event.set? end @@ -152,6 +165,7 @@ def listen_observer(_time, _result, thread_error) if connection_error @connection_errors_count.increment if @connection_errors_reported.false? && @connection_errors_count.value >= CONNECTION_ERRORS_REPORTING_THRESHOLD + @connected.reset GoodJob._on_thread_error(thread_error) @connection_errors_reported.make_true end @@ -180,15 +194,17 @@ def start end def create_listen_task(delay: 0) - @task = Concurrent::ScheduledTask.new(delay, args: [@recipients, @running, @executor, @enable_listening, @listening], executor: @executor) do |thr_recipients, thr_running, thr_executor, thr_enable_listening, thr_listening| + @task = Concurrent::ScheduledTask.new(delay, args: [@recipients, @running, @executor, @enable_listening, @connected, @listening], executor: @executor) do |thr_recipients, thr_running, thr_executor, thr_enable_listening, thr_connected, thr_listening| with_connection do + thr_connected.set + begin Rails.application.executor.wrap do run_callbacks :listen do if thr_enable_listening ActiveSupport::Notifications.instrument("notifier_listen.good_job") do connection.execute("LISTEN #{CHANNEL}") - thr_listening.make_true + thr_listening.set end end end @@ -216,7 +232,7 @@ def create_listen_task(delay: 0) run_callbacks :unlisten do if thr_enable_listening ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do - thr_listening.make_false + thr_listening.reset connection.execute("UNLISTEN *") end end @@ -237,11 +253,9 @@ def with_connection end end connection.execute("SET application_name = #{connection.quote(self.class.name)}") - @connected.make_true yield ensure - @connected.make_false connection&.disconnect! self.connection = nil end diff --git a/lib/good_job/probe_server.rb b/lib/good_job/probe_server.rb index 197cf0bbf..0e1553636 100644 --- a/lib/good_job/probe_server.rb +++ b/lib/good_job/probe_server.rb @@ -37,7 +37,7 @@ def call(env) started ? [200, {}, ["Started"]] : [503, {}, ["Not started"]] when '/status/connected' connected = GoodJob::Scheduler.instances.any? && GoodJob::Scheduler.instances.all?(&:running?) && - GoodJob::Notifier.instances.any? && GoodJob::Notifier.instances.all?(&:listening?) + GoodJob::Notifier.instances.any? && GoodJob::Notifier.instances.all?(&:connected?) connected ? [200, {}, ["Connected"]] : [503, {}, ["Not connected"]] else [404, {}, ["Not found"]] diff --git a/spec/lib/good_job/notifier_spec.rb b/spec/lib/good_job/notifier_spec.rb index ae3811ebf..2f2bef2e7 100644 --- a/spec/lib/good_job/notifier_spec.rb +++ b/spec/lib/good_job/notifier_spec.rb @@ -25,56 +25,69 @@ describe '#connected?' do it 'becomes true when the notifier is connected' do notifier = described_class.new(enable_listening: true) - sleep_until(max: 5) { notifier.connected? } + expect(notifier.connected?(timeout: 5)).to be true expect do notifier.shutdown end.to change(notifier, :connected?).from(true).to(false) end + + it 'remains true through multiple connection errors until CONNECTION_ERRORS_REPORTING_THRESHOLD is reached' do + error_event = Concurrent::Event.new + allow(GoodJob).to receive(:_on_thread_error) { error_event.set } + + stub_const('GoodJob::Notifier::WAIT_INTERVAL', 0.1) + stub_const('GoodJob::Notifier::RECONNECT_INTERVAL', 0.1) + stub_const('GoodJob::Notifier::CONNECTION_ERRORS_REPORTING_THRESHOLD', 3) + + notifier = described_class.new(enable_listening: true) + expect(notifier.connected?(timeout: 5)).to be true + allow(notifier).to receive(:wait_for_notify).and_raise(ActiveRecord::ConnectionTimeoutError) + error_event.wait(5) + expect(notifier).not_to be_connected + end end describe '#listen' do it 'loops until it receives a command' do - stub_const 'RECEIVED_MESSAGE', Concurrent::AtomicBoolean.new(false) - - recipient = proc { |_payload| RECEIVED_MESSAGE.make_true } + event = Concurrent::Event.new + recipient = proc { |_payload| event.set } notifier = described_class.new(recipient, enable_listening: true) - sleep_until(max: 5) { notifier.listening? } + notifier.listening?(timeout: 5) + described_class.notify(true) - sleep_until(max: 5) { RECEIVED_MESSAGE.true? } - notifier.shutdown + expect(event.wait(5)).to be true - expect(RECEIVED_MESSAGE.true?).to be true + notifier.shutdown end it 'loops but does not receive a command if listening is not enabled' do - stub_const 'RECEIVED_MESSAGE', Concurrent::AtomicBoolean.new(false) - - recipient = proc { |_payload| RECEIVED_MESSAGE.make_true } + latch = Concurrent::CountDownLatch.new(1) + recipient = proc { |_payload| latch.count_down } notifier = described_class.new(recipient, enable_listening: false) - expect(notifier.listening?).to be false - described_class.notify(true) - sleep_until(max: 1) { RECEIVED_MESSAGE.false? } + + expect(notifier.connected?(timeout: 5)).to be true + expect(notifier.listening?(timeout: 1)).to be false + sleep 1 notifier.shutdown - expect(RECEIVED_MESSAGE.false?).to be true + expect(latch.count).to eq 1 end shared_examples 'calls refresh_if_stale on every tick' do specify do - stub_const 'REFRESH_IF_STALE_CALLED', Concurrent::AtomicFixnum.new(0) - - allow_any_instance_of(GoodJob::Process).to receive(:refresh_if_stale) { REFRESH_IF_STALE_CALLED.increment } + refreshes = Concurrent::AtomicFixnum.new(0) + allow_any_instance_of(GoodJob::Process).to receive(:refresh_if_stale) { refreshes.increment } recipient = proc {} notifier = described_class.new(recipient, enable_listening: true) - sleep_until(max: 5) { notifier.listening? } + expect(notifier).to be_listening(timeout: 2) described_class.notify(true) - sleep_until(max: 5) { REFRESH_IF_STALE_CALLED.value > 0 } - notifier.shutdown - expect(REFRESH_IF_STALE_CALLED.value).to be > 0 + wait_until(max: 5) { expect(refreshes.value).to be > 0 } + + notifier.shutdown end end @@ -98,7 +111,7 @@ allow(JSON).to receive(:parse).and_raise ExpectedError notifier = described_class.new(enable_listening: true) - sleep_until { notifier.listening? } + expect(notifier).to be_listening(timeout: 2) described_class.notify(true) wait_until { expect(on_thread_error).to have_received(:call).at_least(:once).with instance_of(ExpectedError) } @@ -114,7 +127,7 @@ allow(JSON).to receive(:parse).and_raise ExpectedError notifier = described_class.new(enable_listening: true) - sleep_until { notifier.listening? } + expect(notifier).to be_listening(timeout: 2) described_class.notify(true) wait_until { expect(on_thread_error).to have_received(:call).at_least(:once).with instance_of(ExpectedError) } diff --git a/spec/lib/good_job/probe_server_spec.rb b/spec/lib/good_job/probe_server_spec.rb index b4687481a..9faf5adb8 100644 --- a/spec/lib/good_job/probe_server_spec.rb +++ b/spec/lib/good_job/probe_server_spec.rb @@ -82,12 +82,25 @@ end end - context 'when there are running schedulers and listening notifiers' do + context 'when there are running schedulers but disconnected notifiers' do it 'returns 200' do scheduler = instance_double(GoodJob::Scheduler, running?: true, shutdown: true, shutdown?: true) GoodJob::Scheduler.instances << scheduler - notifier = instance_double(GoodJob::Notifier, listening?: true, shutdown: true, shutdown?: true) + notifier = instance_double(GoodJob::Notifier, connected?: false, shutdown: true, shutdown?: true) + GoodJob::Notifier.instances << notifier + + response = probe_server.call(env) + expect(response[0]).to eq(503) + end + end + + context 'when there are running schedulers and connected notifiers' do + it 'returns 200' do + scheduler = instance_double(GoodJob::Scheduler, running?: true, shutdown: true, shutdown?: true) + GoodJob::Scheduler.instances << scheduler + + notifier = instance_double(GoodJob::Notifier, connected?: true, shutdown: true, shutdown?: true) GoodJob::Notifier.instances << notifier response = probe_server.call(env)