Skip to content

Commit

Permalink
feature: implement depends_on parameter to spawn processes only when …
Browse files Browse the repository at this point in the history
…all dependees are in RUNNING state
  • Loading branch information
Michael Hammann committed Apr 13, 2022
1 parent 16f069c commit d524285
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 13 deletions.
10 changes: 8 additions & 2 deletions supervisor/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,8 @@ def get(section, opt, *args, **kwargs):
serverurl = get(section, 'serverurl', None)
if serverurl and serverurl.strip().upper() == 'AUTO':
serverurl = None
depends_on = get(section, 'depends_on', None)
spawn_timeout = int(get(section, 'spawn_timeout', 60))

# find uid from "user" option
user = get(section, 'user', None)
Expand Down Expand Up @@ -1055,7 +1057,10 @@ def get(section, opt, *args, **kwargs):
exitcodes=exitcodes,
redirect_stderr=redirect_stderr,
environment=environment,
serverurl=serverurl)
serverurl=serverurl,
depends_on=depends_on,
spawn_timeout=spawn_timeout,
)

programs.append(pconfig)

Expand Down Expand Up @@ -1873,7 +1878,8 @@ class ProcessConfig(Config):
'stderr_events_enabled', 'stderr_syslog',
'stopsignal', 'stopwaitsecs', 'stopasgroup', 'killasgroup',
'exitcodes', 'redirect_stderr' ]
optional_param_names = [ 'environment', 'serverurl' ]
optional_param_names = [ 'environment', 'serverurl',
'depends_on', 'spawn_timeout' ]

def __init__(self, options, **params):
self.options = options
Expand Down
36 changes: 27 additions & 9 deletions supervisor/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,29 @@ def record_spawnerr(self, msg):
self.spawnerr = msg
self.config.options.logger.info("spawnerr: %s" % msg)

def spawn(self):
def queue_all_dependee_processes(self, supervisor):
    """Queue this process and, recursively, the dependees it waits on.

    A process is stored in ``supervisor.process_spawn_dict`` under its
    config name unless it is already tracked there or in
    ``supervisor.process_started_dict``.  Dependees that are already
    STARTING or RUNNING are skipped.

    :param supervisor: object exposing the ``process_spawn_dict`` and
        ``process_started_dict`` dictionaries.
    """
    def is_tracked(name):
        # A process counts as tracked once it sits in either dictionary.
        return (name in supervisor.process_spawn_dict or
                name in supervisor.process_started_dict)

    if not is_tracked(self.config.name):
        supervisor.process_spawn_dict[self.config.name] = self

    if self.config.depends_on is None:
        return

    for dependee in self.config.depends_on.values():
        # States are compared by value (as the other state checks in this
        # file do) rather than by object identity.
        if dependee.state in (ProcessStates.RUNNING, ProcessStates.STARTING):
            continue
        if not is_tracked(dependee.config.name):
            supervisor.process_spawn_dict[dependee.config.name] = dependee
            # Descend only into dependees that were newly queued.
            dependee.queue_all_dependee_processes(supervisor)

def spawn(self, supervisor=None):
"""Start the subprocess. It must not be running already.
Return the process id. If the fork() call fails, return None.
"""
if self.config.depends_on is not None:
if any([dependee.state is not ProcessStates.RUNNING for dependee in
self.config.depends_on.values()]):
self.queue_all_dependee_processes(supervisor)
return

options = self.config.options
processname = as_string(self.config.name)

Expand Down Expand Up @@ -647,7 +665,7 @@ def __repr__(self):
def get_state(self):
return self.state

def transition(self):
def transition(self, supervisor=None):
now = time.time()
state = self.state

Expand All @@ -659,22 +677,22 @@ def transition(self):
# dont start any processes if supervisor is shutting down
if state == ProcessStates.EXITED:
if self.config.autorestart:
# STOPPED -> STARTING
if self.config.autorestart is RestartUnconditionally:
# EXITED -> STARTING
self.spawn()
self.spawn(supervisor)
else: # autorestart is RestartWhenExitUnexpected
if self.exitstatus not in self.config.exitcodes:
# EXITED -> STARTING
self.spawn()
self.spawn(supervisor)
elif state == ProcessStates.STOPPED and not self.laststart:
if self.config.autostart:
# STOPPED -> STARTING
self.spawn()
self.spawn(supervisor)
elif state == ProcessStates.BACKOFF:
if self.backoff <= self.config.startretries:
if now > self.delay:
# BACKOFF -> STARTING
self.spawn()
self.spawn(supervisor)

processname = as_string(self.config.name)
if state == ProcessStates.STARTING:
Expand Down Expand Up @@ -836,9 +854,9 @@ def before_remove(self):
pass

class ProcessGroup(ProcessGroupBase):
def transition(self):
def transition(self, supervisor=None):
for proc in self.processes.values():
proc.transition()
proc.transition(supervisor)

class FastCGIProcessGroup(ProcessGroup):

Expand Down
11 changes: 10 additions & 1 deletion supervisor/rpcinterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,9 @@ def startProcess(self, name, wait=True):
@return boolean result Always true unless error
"""
## check if the process is dependent upon any other process and if so make sure that one is in the RUNNING state
group, process = self._getGroupAndProcess(name)

self._update('startProcess')
group, process = self._getGroupAndProcess(name)
if process is None:
Expand All @@ -303,7 +306,11 @@ def startProcess(self, name, wait=True):
raise RPCError(Faults.FAILED,
"%s is in an unknown process state" % name)

process.spawn()
process.spawn(self.supervisord)
# if process has dependees, return successful start -
# errors will be handled in main loop and inside process.spawn()
if process.config.depends_on is not None:
return True

# We call reap() in order to more quickly obtain the side effects of
# process.finish(), which reap() eventually ends up calling. This
Expand Down Expand Up @@ -592,6 +599,8 @@ def getAllConfigInfo(self):
'stderr_logfile_backups': pconfig.stderr_logfile_backups,
'stderr_logfile_maxbytes': pconfig.stderr_logfile_maxbytes,
'stderr_syslog': pconfig.stderr_syslog,
'depends_on': pconfig.depends_on,
'spawn_timeout': pconfig.spawn_timeout,
}
# no support for these types in xml-rpc
d.update((k, 'auto') for k, v in d.items() if v is Automatic)
Expand Down
115 changes: 114 additions & 1 deletion supervisor/supervisord.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
from supervisor import events
from supervisor.states import SupervisorStates
from supervisor.states import getProcessStateDescription
from supervisor.graphutils import Graph

from supervisor.states import ProcessStates

class Supervisor:
stopping = False # set after we detect that we are handling a stop request
Expand All @@ -55,6 +58,8 @@ def __init__(self, options):
self.options = options
self.process_groups = {}
self.ticks = {}
self.process_spawn_dict = dict()
self.process_started_dict = dict()

def main(self):
if not self.options.first:
Expand Down Expand Up @@ -84,6 +89,29 @@ def run(self):
try:
for config in self.options.process_group_configs:
self.add_process_group(config)
# add processes to directed graph, to check for dependency cycles
g = Graph(len(self.options.process_group_configs))
# replace depends_on string with actual process object
for config in (self.options.process_group_configs):
# check dependencies for all programs in group:
for conf in enumerate(config.process_configs):
if config.process_configs[conf[0]].depends_on is not None:
process_dict=dict({})
# split to get all processes in case there are multiple dependencies
dependent_processes = (config.process_configs[conf[0]].depends_on).split()
for process in dependent_processes:
# this can be of form group:process or simply process
try:
dependent_group, dependent_process=process.split(":")
except:
dependent_group=dependent_process=process
g.addEdge(config.process_configs[conf[0]].name, dependent_process)
process_dict[dependent_process] = self.process_groups[dependent_group].processes[dependent_process]
config.process_configs[conf[0]].depends_on = process_dict
# check for cyclical process dependencies
if g.cyclic() == 1:
raise AttributeError('Process config contains dependeny cycle(s)! Check config files again!')

self.options.openhttpservers(self)
self.options.setsignals()
if (not self.options.nodaemon) and self.options.first:
Expand Down Expand Up @@ -239,7 +267,10 @@ def runforever(self):
combined_map[fd].handle_error()

for group in pgroups:
group.transition()
group.transition(self)

self._spawn_dependee_queue()
self._handle_spawn_timeout()

self.reap()
self.handle_signal()
Expand Down Expand Up @@ -316,6 +347,88 @@ def handle_signal(self):
def get_state(self):
return self.options.mood

def _spawn_dependee_queue(self):
    """
    Iterate over processes that are not started but added to
    process_spawn_dict. Spawn all processes which are ready
    (all dependees RUNNING, or process without dependees).

    If any dependee of a queued process is FATAL, every queued process
    is set to FATAL and the queue is emptied.
    """
    # Work on a snapshot: spawning removes entries from process_spawn_dict.
    for process_name, process_object in list(self.process_spawn_dict.items()):
        depends_on = process_object.config.depends_on
        if depends_on is None:
            # Nothing to wait for.
            self._spawn_process_from_process_dict(process_name, process_object)
            continue

        dependee_states = [dependee.state for dependee in depends_on.values()]
        # States are compared by value rather than by object identity.
        if any(state == ProcessStates.FATAL for state in dependee_states):
            self._set_fatal_state_and_empty_queue()
            break
        if all(state == ProcessStates.RUNNING for state in dependee_states):
            self._spawn_process_from_process_dict(process_name, process_object)

def _spawn_process_from_process_dict(self, process_name, process_object):
    """Move a process from the spawn queue to the started dict and spawn it.

    :param process_name: key of the process in ``process_spawn_dict``.
    :param process_object: the process stored under that key.
    :raises KeyError: if ``process_name`` is not in ``process_spawn_dict``.
    """
    self.process_started_dict[process_name] = process_object
    del self.process_spawn_dict[process_name]
    # Arm the "still waiting" timer for every process that enters
    # process_started_dict: _handle_spawn_timeout reads notify_timer for
    # each tracked process that is not RUNNING, including one whose spawn
    # is skipped below because it is already STARTING.
    process_object.notify_timer = 5
    # only spawn if the process is not running yet (could be started in the meanwhile)
    if process_object.state not in (ProcessStates.STARTING,
                                    ProcessStates.RUNNING):
        process_object.spawn(self)

def _set_fatal_state_and_empty_queue(self):
    """Set every queued process to FATAL and empty the spawn queue.

    For each entry of ``process_spawn_dict`` a spawn error is recorded and
    the process state is changed to FATAL; afterwards the dictionary is
    replaced by an empty one.
    """
    for process_name, process_object in self.process_spawn_dict.items():
        process_object.record_spawnerr(
            'Dependee process did not start - set FATAL state for {}'
            .format(process_name))
        process_object.change_state(ProcessStates.FATAL)
    # The queue lives in process_spawn_dict only; the previous extra
    # assignment to an otherwise unused process_spawn_set was dropped.
    self.process_spawn_dict = dict()

def _handle_spawn_timeout(self):
    """
    Log an info message every 5 seconds while a started process has not
    reached RUNNING yet, and time out a process that needs longer than
    its spawn_timeout (default=60 seconds) to reach RUNNING.

    Processes that have reached RUNNING are removed from
    process_started_dict.
    """
    # Iterate over a snapshot because entries are removed along the way.
    for process_name, process_object in list(self.process_started_dict.items()):
        if process_object.state == ProcessStates.RUNNING:
            del self.process_started_dict[process_name]
            continue

        waited = time.time() - process_object.laststart
        if waited >= process_object.config.spawn_timeout:
            self._timeout_process(process_name, process_object)
            # _timeout_process replaces both tracking dicts with empty
            # ones, so the remaining snapshot entries are stale; carrying
            # on could try to delete a key that no longer exists.
            break
        if waited >= process_object.notify_timer:
            # notify user about waiting
            self._notfiy_user_about_waiting(process_name, process_object)

def _timeout_process(self, process_name, process_object):
    """Fail a process that exceeded its spawn_timeout, plus all queued ones.

    Logs a warning, records a spawn error on ``process_object`` and sets it
    to FATAL.  Every process still waiting in ``process_spawn_dict`` gets a
    spawn error and the FATAL state as well.  Finally both
    ``process_spawn_dict`` and ``process_started_dict`` are reset to empty
    dictionaries.

    :param process_name: name of the process that timed out.
    :param process_object: the process that timed out.
    """
    timeout = process_object.config.spawn_timeout
    state_description = getProcessStateDescription(process_object.state)
    queued_names = list(self.process_spawn_dict)

    msg = ("timeout: dependee process {} in {} did not reach RUNNING within {} seconds, dependees {} are not spawned"
           .format(process_name, state_description, timeout, queued_names))
    process_object.config.options.logger.warn(msg)

    process_object.record_spawnerr(
        'timeout: Process {} did not reach RUNNING state within {} seconds'
        .format(process_name, timeout))
    process_object.change_state(ProcessStates.FATAL)

    # Everything still queued can no longer be started either.
    for queued_name, queued_process in self.process_spawn_dict.items():
        queued_process.record_spawnerr(
            'Dependee process did not start - set FATAL state for {}'
            .format(queued_name))
        queued_process.change_state(ProcessStates.FATAL)

    self.process_spawn_dict = dict()
    self.process_started_dict = dict()

def _notfiy_user_about_waiting(self, process_name, process_object):
    """Log that a process is still being waited for.

    Pushes the process's ``notify_timer`` forward by 5 and writes an info
    message containing the process name and a description of its state.

    :param process_name: name used in the log message.
    :param process_object: process whose timer is advanced and whose
        logger receives the message.
    """
    process_object.notify_timer = process_object.notify_timer + 5
    state_description = getProcessStateDescription(process_object.state)
    logger = process_object.config.options.logger
    logger.info(
        "waiting for dependee process {} in {} state to be RUNNING"
        .format(process_name, state_description))

def timeslice(period, when):
    """Return ``when`` rounded down to a multiple of ``period``, as an int."""
    into_period = when % period
    return int(when - into_period)

Expand Down

0 comments on commit d524285

Please sign in to comment.