diff --git a/lib/qless/worker/base.rb b/lib/qless/worker/base.rb index 300f0ac2..5d442ddd 100644 --- a/lib/qless/worker/base.rb +++ b/lib/qless/worker/base.rb @@ -84,23 +84,29 @@ def register_signal_handlers def jobs return Enumerator.new do |enum| loop do - begin - job = reserver.reserve - rescue Exception => error - # We want workers to durably stay up, so we don't want errors - # during job reserving (e.g. network timeouts, etc) to kill the - # worker. - log(:error, - "Error reserving job: #{error.class}: #{error.message}") - end - - # If we ended up getting a job, yield it. Otherwise, we wait - if job.nil? - no_job_available + # So long as we're paused, we should wait + if paused + log(:debug, 'Paused...') + sleep interval else - self.current_job = job - enum.yield(job) - self.current_job = nil + begin + job = reserver.reserve + rescue Exception => error + # We want workers to durably stay up, so we don't want errors + # during job reserving (e.g. network timeouts, etc) to kill the + # worker. + log(:error, + "Error reserving job: #{error.class}: #{error.message}") + end + + # If we ended up getting a job, yield it. Otherwise, we wait + if job.nil? + no_job_available + else + self.current_job = job + enum.yield(job) + self.current_job = nil + end end break if @shutdown diff --git a/lib/qless/worker/serial.rb b/lib/qless/worker/serial.rb index 4807d20d..bd23b05b 100644 --- a/lib/qless/worker/serial.rb +++ b/lib/qless/worker/serial.rb @@ -27,12 +27,6 @@ def run log(:debug, "Starting job #{job.klass_name} (#{job.jid} from #{job.queue_name})") perform(job) log(:debug, "Finished job #{job.klass_name} (#{job.jid} from #{job.queue_name})") - - # So long as we're paused, we should wait - while paused - log(:debug, 'Paused...') - sleep interval - end end end end diff --git a/spec/integration/workers/serial_spec.rb b/spec/integration/workers/serial_spec.rb index 220bcb57..0fcb305a 100644 --- a/spec/integration/workers/serial_spec.rb +++ b/spec/integration/workers/serial_spec.rb @@ -83,6 +83,27 @@ def self.perform(job) expect { |b| worker.send(:with_current_job, &b) }.to yield_with_args(nil) end + it 'can pause a worker and then unpause it' do + job_class = Class.new do + def self.perform(job) + Redis.connect(url: job['redis']).rpush(job['key'], job['word']) + end + end + stub_const('JobClass', job_class) + + worker.pause + + queue.put('JobClass', { redis: redis.client.id, key: key, word: 'hello' }) + + run_worker_concurrently_with(worker) do + expect(redis.brpop(key, timeout: 1)).to eq(nil) + + worker.unpause + + expect(redis.brpop(key, timeout: 1)).to eq([key.to_s, 'hello']) + end + end + context 'when a job times out', :uses_threads do it 'invokes the given callback when the current job is the one that timed out' do callback_invoked = false