diff --git a/supervisor/options.py b/supervisor/options.py
index 255c5bfbd..9a275ce0a 100644
--- a/supervisor/options.py
+++ b/supervisor/options.py
@@ -930,6 +930,8 @@ def get(section, opt, *args, **kwargs):
         serverurl = get(section, 'serverurl', None)
         if serverurl and serverurl.strip().upper() == 'AUTO':
             serverurl = None
+        depends_on = get(section, 'depends_on', None)
+        spawn_timeout = int(get(section, 'spawn_timeout', 60))

         # find uid from "user" option
         user = get(section, 'user', None)
@@ -1055,7 +1057,10 @@ def get(section, opt, *args, **kwargs):
                 exitcodes=exitcodes,
                 redirect_stderr=redirect_stderr,
                 environment=environment,
-                serverurl=serverurl)
+                serverurl=serverurl,
+                depends_on=depends_on,
+                spawn_timeout=spawn_timeout,
+                )

             programs.append(pconfig)

@@ -1873,7 +1878,8 @@ class ProcessConfig(Config):
         'stderr_events_enabled', 'stderr_syslog',
         'stopsignal', 'stopwaitsecs', 'stopasgroup', 'killasgroup',
         'exitcodes', 'redirect_stderr' ]
-    optional_param_names = [ 'environment', 'serverurl' ]
+    optional_param_names = [ 'environment', 'serverurl',
+                             'depends_on', 'spawn_timeout' ]

     def __init__(self, options, **params):
         self.options = options
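As a reading aid (not part of the patch), here is a minimal supervisord.conf sketch of how the two options parsed above could be used; the program names and commands are hypothetical, and depends_on uses the whitespace-separated process / group:process syntax that supervisord.py resolves further down:

[program:db]
command=/usr/local/bin/run-db

[program:cache]
command=/usr/local/bin/run-cache

[program:app]
command=/usr/local/bin/run-app
; do not spawn until db and cache are RUNNING; entries may also use the group:process form
depends_on=db cache
; seconds this process may take to reach RUNNING once spawned via the dependency queue (default 60)
spawn_timeout=90
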
""" + if self.config.depends_on is not None: + if any([dependee.state is not ProcessStates.RUNNING for dependee in + self.config.depends_on.values()]): + self.queue_all_dependee_processes(supervisor) + return + options = self.config.options processname = as_string(self.config.name) @@ -647,7 +665,7 @@ def __repr__(self): def get_state(self): return self.state - def transition(self): + def transition(self, supervisor=None): now = time.time() state = self.state @@ -659,22 +677,22 @@ def transition(self): # dont start any processes if supervisor is shutting down if state == ProcessStates.EXITED: if self.config.autorestart: + # STOPPED -> STARTING if self.config.autorestart is RestartUnconditionally: # EXITED -> STARTING - self.spawn() + self.spawn(supervisor) else: # autorestart is RestartWhenExitUnexpected if self.exitstatus not in self.config.exitcodes: # EXITED -> STARTING - self.spawn() + self.spawn(supervisor) elif state == ProcessStates.STOPPED and not self.laststart: if self.config.autostart: - # STOPPED -> STARTING - self.spawn() + self.spawn(supervisor) elif state == ProcessStates.BACKOFF: if self.backoff <= self.config.startretries: if now > self.delay: # BACKOFF -> STARTING - self.spawn() + self.spawn(supervisor) processname = as_string(self.config.name) if state == ProcessStates.STARTING: @@ -836,9 +854,9 @@ def before_remove(self): pass class ProcessGroup(ProcessGroupBase): - def transition(self): + def transition(self, supervisor=None): for proc in self.processes.values(): - proc.transition() + proc.transition(supervisor) class FastCGIProcessGroup(ProcessGroup): diff --git a/supervisor/rpcinterface.py b/supervisor/rpcinterface.py index 854b7285f..b6be7270f 100644 --- a/supervisor/rpcinterface.py +++ b/supervisor/rpcinterface.py @@ -281,6 +281,9 @@ def startProcess(self, name, wait=True): @return boolean result Always true unless error """ + ## check if the process is dependent upon any other process and if so make sure that one is in the RUNNING state + group, process = self._getGroupAndProcess(name) + self._update('startProcess') group, process = self._getGroupAndProcess(name) if process is None: @@ -303,7 +306,11 @@ def startProcess(self, name, wait=True): raise RPCError(Faults.FAILED, "%s is in an unknown process state" % name) - process.spawn() + process.spawn(self.supervisord) + # if process has dependees, return succesfull start - + # errors will be handled in main loop and inside process.spawn() + if process.config.depends_on is not None: + return True # We call reap() in order to more quickly obtain the side effects of # process.finish(), which reap() eventually ends up calling. 
diff --git a/supervisor/rpcinterface.py b/supervisor/rpcinterface.py
index 854b7285f..b6be7270f 100644
--- a/supervisor/rpcinterface.py
+++ b/supervisor/rpcinterface.py
@@ -281,6 +281,9 @@ def startProcess(self, name, wait=True):
         @param boolean wait    Wait for process to be fully started
         @return boolean result     Always true unless error
         """
+        ## check if the process is dependent upon any other process and if so make sure that one is in the RUNNING state
+        group, process = self._getGroupAndProcess(name)
+
         self._update('startProcess')
         group, process = self._getGroupAndProcess(name)
         if process is None:
@@ -303,7 +306,11 @@ def startProcess(self, name, wait=True):
             raise RPCError(Faults.FAILED,
                            "%s is in an unknown process state" % name)

-        process.spawn()
+        process.spawn(self.supervisord)
+        # if the process has dependees, return a successful start here -
+        # errors will be handled in the main loop and inside process.spawn()
+        if process.config.depends_on is not None:
+            return True

         # We call reap() in order to more quickly obtain the side effects of
         # process.finish(), which reap() eventually ends up calling.  This
@@ -592,6 +599,8 @@ def getAllConfigInfo(self):
                     'stderr_logfile_backups': pconfig.stderr_logfile_backups,
                     'stderr_logfile_maxbytes': pconfig.stderr_logfile_maxbytes,
                     'stderr_syslog': pconfig.stderr_syslog,
+                    'depends_on': pconfig.depends_on,
+                    'spawn_timeout': pconfig.spawn_timeout,
                     }
                 # no support for these types in xml-rpc
                 d.update((k, 'auto') for k, v in d.items() if v is Automatic)
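A hedged usage sketch for the RPC change above, assuming a standard inet_http_server on 127.0.0.1:9001 and the hypothetical 'app' program from the config sketch earlier; with this patch startProcess() reports success immediately for a process that has depends_on configured, and the real spawning is finished later in supervisord's main loop:

from xmlrpc.client import ServerProxy

proxy = ServerProxy('http://127.0.0.1:9001/RPC2')

# With depends_on configured for 'app' this returns True right away, even if
# the dependees are still being brought up; errors surface in the main loop.
print(proxy.supervisor.startProcess('app'))

Note that getAllConfigInfo() now also carries 'depends_on' and 'spawn_timeout'; once run() has replaced the depends_on strings with process objects, those values may not marshal cleanly over XML-RPC as-is, which is worth double-checking.
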
diff --git a/supervisor/supervisord.py b/supervisor/supervisord.py
index 138732dd4..749f532bd 100755
--- a/supervisor/supervisord.py
+++ b/supervisor/supervisord.py
@@ -44,6 +44,9 @@
 from supervisor import events
 from supervisor.states import SupervisorStates
 from supervisor.states import getProcessStateDescription
+from supervisor.graphutils import Graph
+
+from supervisor.states import ProcessStates

 class Supervisor:
     stopping = False # set after we detect that we are handling a stop request
@@ -55,6 +58,8 @@ def __init__(self, options):
         self.options = options
         self.process_groups = {}
         self.ticks = {}
+        self.process_spawn_dict = dict()
+        self.process_started_dict = dict()

     def main(self):
         if not self.options.first:
@@ -84,6 +89,29 @@ def run(self):
         try:
             for config in self.options.process_group_configs:
                 self.add_process_group(config)
+            # add processes to a directed graph, to check for dependency cycles
+            g = Graph(len(self.options.process_group_configs))
+            # replace each depends_on string with the actual process objects
+            for config in self.options.process_group_configs:
+                # check dependencies for all programs in the group:
+                for pconfig in config.process_configs:
+                    if pconfig.depends_on is not None:
+                        process_dict = {}
+                        # split to get all processes in case there are multiple dependencies
+                        dependent_processes = pconfig.depends_on.split()
+                        for process in dependent_processes:
+                            # each entry can be of the form group:process or simply process
+                            try:
+                                dependent_group, dependent_process = process.split(":")
+                            except ValueError:
+                                dependent_group = dependent_process = process
+                            g.addEdge(pconfig.name, dependent_process)
+                            process_dict[dependent_process] = self.process_groups[dependent_group].processes[dependent_process]
+                        pconfig.depends_on = process_dict
+            # check for cyclical process dependencies
+            if g.cyclic() == 1:
+                raise AttributeError('Process config contains dependency cycle(s)! Check config files again!')
+
             self.options.openhttpservers(self)
             self.options.setsignals()
             if (not self.options.nodaemon) and self.options.first:
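The cycle check above relies on supervisor.graphutils.Graph, which is not included in this diff. A rough sketch of the interface the code assumes (a constructor taking a size hint, addEdge(), and cyclic() returning 1 when a cycle exists), here implemented with a DFS recursion-stack check, might look like the following; names and details are assumptions, not the actual module:

# Hypothetical sketch of supervisor/graphutils.py - the real module is not part
# of this diff, so everything here is inferred from how run() uses it.
from collections import defaultdict

class Graph:
    def __init__(self, vertices):
        self.V = vertices                  # size hint; edges may introduce further nodes
        self.graph = defaultdict(list)     # adjacency list keyed by process name

    def addEdge(self, u, v):
        # edge u -> v means "u depends on v"
        self.graph[u].append(v)

    def _cyclic_util(self, node, visited, on_stack):
        visited.add(node)
        on_stack.add(node)
        for neighbour in self.graph[node]:
            if neighbour not in visited:
                if self._cyclic_util(neighbour, visited, on_stack):
                    return True
            elif neighbour in on_stack:
                return True
        on_stack.discard(node)
        return False

    def cyclic(self):
        # returns 1 if the directed graph contains a cycle, else 0
        visited, on_stack = set(), set()
        for node in list(self.graph):
            if node not in visited:
                if self._cyclic_util(node, visited, on_stack):
                    return 1
        return 0
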
@@ -239,7 +267,10 @@ def runforever(self):
                         combined_map[fd].handle_error()

             for group in pgroups:
-                group.transition()
+                group.transition(self)
+
+            self._spawn_dependee_queue()
+            self._handle_spawn_timeout()

             self.reap()
             self.handle_signal()
@@ -316,6 +347,88 @@ def handle_signal(self):

     def get_state(self):
         return self.options.mood

+    def _spawn_dependee_queue(self):
+        """
+        Iterate over processes that are not yet started but have been added to
+        process_spawn_dict.  Spawn every process that is ready (all of its
+        dependees RUNNING, or no dependees at all).
+        """
+        if self.process_spawn_dict:
+            for process_name, process_object in list(self.process_spawn_dict.items()):
+                if process_object.config.depends_on is not None:
+                    if any([dependee.state is ProcessStates.FATAL for dependee in
+                            process_object.config.depends_on.values()]):
+                        self._set_fatal_state_and_empty_queue()
+                        break
+                    if all([dependee.state is ProcessStates.RUNNING for dependee in
+                            process_object.config.depends_on.values()]):
+                        self._spawn_process_from_process_dict(process_name, process_object)
+                else:
+                    self._spawn_process_from_process_dict(process_name, process_object)
+
+    def _spawn_process_from_process_dict(self, process_name, process_object):
+        self.process_started_dict[process_name] = process_object
+        del self.process_spawn_dict[process_name]
+        # only spawn if the process is not running yet (it could have been started in the meantime)
+        if (process_object.state is not ProcessStates.STARTING and
+                process_object.state is not ProcessStates.RUNNING):
+            process_object.spawn(self)
+        process_object.notify_timer = 5
+
+    def _set_fatal_state_and_empty_queue(self):
+        for process_name, process_object in self.process_spawn_dict.items():
+            process_object.record_spawnerr(
+                'Dependee process did not start - set FATAL state for {}'
+                .format(process_name))
+            process_object.change_state(ProcessStates.FATAL)
+        self.process_spawn_dict = dict()
+
+    def _handle_spawn_timeout(self):
+        """
+        Log an info message every 5 seconds while a process is waiting on a
+        dependee.  Time out if a process needs longer than spawn_timeout
+        (default 60 seconds) to reach RUNNING.
+        """
+        # check whether any of the started processes did not make it, and drop RUNNING ones
+        if self.process_started_dict:
+            for process_name, process_object in list(self.process_started_dict.items()):
+                if process_object.state is ProcessStates.RUNNING:
+                    del self.process_started_dict[process_name]
+                # handle timeout error
+                elif (time.time() - process_object.laststart) >= process_object.config.spawn_timeout:
+                    self._timeout_process(process_name, process_object)
+                # notify user about waiting
+                elif (time.time() - process_object.laststart) >= process_object.notify_timer:
+                    self._notify_user_about_waiting(process_name, process_object)
+
+    def _timeout_process(self, process_name, process_object):
+        msg = ("timeout: dependee process {} in {} did not reach RUNNING within {} seconds, "
+               "dependent processes {} are not spawned"
+               .format(process_name,
+                       getProcessStateDescription(process_object.state),
+                       process_object.config.spawn_timeout,
+                       list(self.process_spawn_dict.keys())))
+        process_object.config.options.logger.warn(msg)
+        process_object.record_spawnerr(
+            'timeout: Process {} did not reach RUNNING state within {} seconds'
+            .format(process_name,
+                    process_object.config.spawn_timeout))
+        process_object.change_state(ProcessStates.FATAL)
+        for process_name, process_object in self.process_spawn_dict.items():
+            process_object.record_spawnerr(
+                'Dependee process did not start - set FATAL state for {}'
+                .format(process_name))
+            process_object.change_state(ProcessStates.FATAL)
+        self.process_spawn_dict = dict()
+        self.process_started_dict = dict()
+
+    def _notify_user_about_waiting(self, process_name, process_object):
+        process_object.notify_timer += 5
+        msg = ("waiting for dependee process {} in {} state to be RUNNING"
+               .format(process_name,
+                       getProcessStateDescription(process_object.state)))
+        process_object.config.options.logger.info(msg)
+
 def timeslice(period, when):
     return int(when - (when % period))
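Putting the two new main-loop hooks together, the expected timeline for a process spawned through the dependency queue (with the default spawn_timeout of 60 seconds) is roughly as follows; this is a summary of the code above, not additional behaviour:

# t = 0        _spawn_dependee_queue() spawns the process once all of its
#              dependees are RUNNING and sets notify_timer = 5
# t ~ 5, 10..  while it is still not RUNNING, _handle_spawn_timeout() logs the
#              "waiting for dependee process ..." message and bumps notify_timer by 5
# t >= 60      _timeout_process() logs a warning, records a spawnerr, moves the
#              process to FATAL, marks everything still waiting in
#              process_spawn_dict FATAL as well, and clears both queues
# any time     once the process reaches RUNNING it is simply dropped from process_started_dict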