Skip to content

Commit

Permalink
fix: improve handling when spawn_timeout for dependent process is rea…
Browse files Browse the repository at this point in the history
…ched
  • Loading branch information
Michael Hammann committed Apr 13, 2022
1 parent f04970d commit d1a8866
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 28 deletions.
2 changes: 1 addition & 1 deletion supervisor/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def spawn(self, supervisor=None):
Return the process id. If the fork() call fails, return None.
"""
if self.config.depends_on is not None:
if self.config.depends_on is not None and not supervisor.abort_queing :
if any([dependee.state is not ProcessStates.RUNNING for dependee in
self.config.depends_on.values()]):
self.queue_all_dependee_processes(supervisor)
Expand Down
75 changes: 48 additions & 27 deletions supervisor/supervisord.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def __init__(self, options):
self.ticks = {}
self.process_spawn_dict = dict()
self.process_started_dict = dict()
self.abort_queing = None

def main(self):
if not self.options.first:
Expand Down Expand Up @@ -90,7 +91,7 @@ def run(self):
for config in self.options.process_group_configs:
self.add_process_group(config)
# add processes to directed graph, to check for dependency cycles
g = Graph(len(self.options.process_group_configs))
self.g = Graph(len(self.options.process_group_configs))
# replace depends_on string with actual process object
for config in (self.options.process_group_configs):
# check dependencies for all programs in group:
Expand All @@ -105,11 +106,11 @@ def run(self):
dependent_group, dependent_process=process.split(":")
except:
dependent_group=dependent_process=process
g.addEdge(config.process_configs[conf[0]].name, dependent_process)
self.g.addEdge(config.process_configs[conf[0]].name, dependent_process)
process_dict[dependent_process] = self.process_groups[dependent_group].processes[dependent_process]
config.process_configs[conf[0]].depends_on = process_dict
# check for cyclical process dependencies
if g.cyclic() == 1:
if self.g.cyclic() == 1:
raise AttributeError('Process config contains dependeny cycle(s)! Check config files again!')

self.options.openhttpservers(self)
Expand Down Expand Up @@ -356,16 +357,23 @@ def _spawn_dependee_queue(self):
if self.process_spawn_dict:
for process_name, process_object in list(self.process_spawn_dict.items()):
if process_object.config.depends_on is not None:
if any([dependee.state is ProcessStates.FATAL for dependee in
process_object.config.depends_on.values()]):
self._set_fatal_state_and_empty_queue()
if self._any_dependee_failed(process_object):
self._empty_queue()
break
if all([dependee.state is ProcessStates.RUNNING for dependee in
process_object.config.depends_on.values()]):
if self._all_dependees_running(process_object):
self._spawn_process_from_process_dict(process_name, process_object)
else:
self._spawn_process_from_process_dict(process_name, process_object)

def _any_dependee_failed(self, process_object):
return any([dependee.state is ProcessStates.BACKOFF or dependee.state
is ProcessStates.FATAL for dependee in
process_object.config.depends_on.values()])

def _all_dependees_running(self, process_object):
return all([dependee.state is ProcessStates.RUNNING for dependee in
process_object.config.depends_on.values()])

def _spawn_process_from_process_dict(self, process_name, process_object):
self.process_started_dict[process_name] = process_object
del self.process_spawn_dict[process_name]
Expand All @@ -375,12 +383,7 @@ def _spawn_process_from_process_dict(self, process_name, process_object):
process_object.spawn(self)
process_object.notify_timer = 5

def _set_fatal_state_and_empty_queue(self):
for process_name, process_object in self.process_spawn_dict.items():
process_object.record_spawnerr(
'Dependee process did not start - set FATAL state for {}'
.format(process_name))
process_object.change_state(ProcessStates.FATAL)
def _empty_queue(self):
self.process_spawn_set = set()
self.process_spawn_dict = dict()

Expand All @@ -390,37 +393,55 @@ def _handle_spawn_timeout(self):
Timeout if a process needs longer than spawn_timeout (default=60 seconds)
to reach RUNNING
"""
# check if any of the processes that was started did not make it and remove RUNNING ones.
# check if any of the processes which was started did not make it and
# remove RUNNING processes from the process_started_dict.
if self.abort_queing is not None:
self.abort_queing.change_state(ProcessStates.FATAL)
self.abort_queing = None
if self.process_started_dict:
for process_name, process_object in list(self.process_started_dict.items()):
if process_object.state is ProcessStates.RUNNING:
del self.process_started_dict[process_name]
# handle timeout error.
elif (time.time() - process_object.laststart) >= process_object.config.spawn_timeout:
self._timeout_process(process_name, process_object)
del self.process_started_dict[process_name]
break
# notify user about waiting
elif (time.time() - process_object.laststart) >= process_object.notify_timer:
self._notfiy_user_about_waiting(process_name, process_object)

def _timeout_process(self, process_name, process_object):
msg = ("timeout: dependee process {} in {} did not reach RUNNING within {} seconds, dependees {} are not spawned"
msg = ("timeout: dependee process {} in {} did not reach RUNNING within"
" {} seconds, checking now if dependees {} can be spawned"
.format(process_name,
getProcessStateDescription(process_object.state),
process_object.config.spawn_timeout,
[process for process in self.process_spawn_dict.keys()]))
process_object.config.options.logger.warn(msg)
process_object.record_spawnerr(
'timeout: Process {} did not reach RUNNING state within {} seconds'
.format(process_name,
process_object.config.spawn_timeout))
process_object.change_state(ProcessStates.FATAL)
process_object.stop()
# keep track of the process that timed out - will be set to FATAL in
# the next iteration
self.abort_queing = process_object
keys_to_remove = []
for process_name, process_object in self.process_spawn_dict.items():
process_object.record_spawnerr(
'Dependee process did not start - set FATAL state for {}'
.format(process_name))
process_object.change_state(ProcessStates.FATAL)
self.process_spawn_dict = dict()
self.process_started_dict = dict()
if self.g.connected(process_name, self.abort_queing.config.name):
keys_to_remove.append(process_name)
msg = ("{} will not be spawned, because {} did not "
"successfully start".format(process_name, self.abort_queing.config.name))
process_object.config.options.logger.warn(msg)
for key in keys_to_remove:
del self.process_spawn_dict[key]
keys_to_remove = []
for process_name, process_object in self.process_started_dict.items():
if self.g.connected(process_name, self.abort_queing.config.name):
keys_to_remove.append(process_name)
msg = ("stopping {}, because {} did not successfully "
"start".format(process_name, self.abort_queing.config.name))
process_object.config.options.logger.warn(msg)
process_object.stop()
for key in keys_to_remove:
del self.process_started_dict[key]

def _notfiy_user_about_waiting(self, process_name, process_object):
process_object.notify_timer += 5
Expand Down

0 comments on commit d1a8866

Please sign in to comment.