diff --git a/lib/qless/worker/forking.rb b/lib/qless/worker/forking.rb index c97082b5..f8a3355e 100644 --- a/lib/qless/worker/forking.rb +++ b/lib/qless/worker/forking.rb @@ -32,6 +32,8 @@ def initialize(reserver, options = {}) @modules = [] @sandbox_mutex = Mutex.new + # A list of [signal_name, exit_parent] + @signal_queue = [] end # Because we spawn a new worker, we need to apply all the modules that @@ -52,27 +54,46 @@ def spawn worker end + # If @sandbox_mutex is free, handles a signal now, shutting down workers + # and possibly the main thread. If @sandbox_mutex is locked, adds the signal + # to the signal queue, to be handled as soon as it can (see + # process_signal_queue as called by the main loop) + def handle_signal(signal_name, exit_parent) + if @sandbox_mutex.try_lock + stop!(signal_name, exit_parent) + @sandbox_mutex.unlock + else + @signal_queue << [signal_name, exit_parent] + end + end + + # Process any signals (such as TERM) that could not be processed + # immediately due to @sandbox_mutex being in use + def process_signal_queue + until @signal_queue.empty? + signal_name, exit_parent = @signal_queue.shift + stop!(signal_name, exit_parent) + end + end + # Register our handling of signals def register_signal_handlers # If we're the parent process, we mostly want to forward the signals on # to the child processes. It's just that sometimes we want to wait for # them and then exit trap('TERM') do - stop!('TERM') - exit + handle_signal('TERM', true) end trap('INT') do - stop!('TERM') - exit + handle_signal('TERM', true) end begin trap('QUIT') do - stop!('QUIT') - exit + handle_signal('QUIT', true) end - trap('USR1') { stop!('KILL') } + trap('USR1') { handle_signal('KILL', false) } trap('USR2') { stop('USR2') } trap('CONT') { stop('CONT') } rescue ArgumentError @@ -87,6 +108,8 @@ def run # Now keep an eye on our child processes, spawn replacements as needed loop do begin + process_signal_queue + # Don't wait on any processes if we're already in shutdown mode. break if @shutdown @@ -125,10 +148,12 @@ def stop(signal = 'QUIT') end end - # Signal all the children and wait for them to exit - def stop!(signal = 'QUIT') + # Signal all the children and wait for them to exit. Optionally have this process (parent) die, too. + # Should only be called when we have the lock on @sandbox_mutex + def stop!(signal = 'QUIT', exit_parent = false) shutdown shutdown_sandboxes(signal) + exit if exit_parent end private @@ -155,33 +180,32 @@ def startup_sandboxes end end + # Should only be called when we have a lock on @sandbox_mutex def shutdown_sandboxes(signal) - @sandbox_mutex.synchronize do - # First, send the signal - stop(signal) - - # Wait for each of our children - log(:warn, 'Waiting for child processes') - - until @sandboxes.empty? - begin - pid, _ = Process.wait2 - log(:warn, "Child #{pid} stopped") - @sandboxes.delete(pid) - rescue SystemCallError - break - end - end + # First, send the signal + stop(signal) - log(:warn, 'All children have stopped') + # Wait for each of our children + log(:warn, 'Waiting for child processes') - # If there were any children processes we couldn't wait for, log it - @sandboxes.keys.each do |cpid| - log(:warn, "Could not wait for child #{cpid}") + until @sandboxes.empty? + begin + pid, _ = Process.wait2 + log(:warn, "Child #{pid} stopped") + @sandboxes.delete(pid) + rescue SystemCallError + break end + end + + log(:warn, 'All children have stopped') - @sandboxes.clear + # If there were any children processes we couldn't wait for, log it + @sandboxes.keys.each do |cpid| + log(:warn, "Could not wait for child #{cpid}") end + + @sandboxes.clear end private