diff --git a/main_2024.py b/main_2024.py index b34c522a..d68adaa0 100644 --- a/main_2024.py +++ b/main_2024.py @@ -344,6 +344,13 @@ def main() -> int: manager.start_workers() while True: + for manager in worker_managers: + result = manager.check_and_restart_dead_workers() + if not result: + frame = inspect.currentframe() + main_logger.error("Failed to restart workers", frame) + return -1 + try: geolocation_data = geolocation_to_main_queue.queue.get_nowait() except queue.Empty: diff --git a/modules/common b/modules/common index cc6384ec..b55dfd3b 160000 --- a/modules/common +++ b/modules/common @@ -1 +1 @@ -Subproject commit cc6384ecbcf1ff02edc610dcc5accf0d732c1c66 +Subproject commit b55dfd3b26309f78d1fb0012060fa49ffc736f8d diff --git a/utilities/workers/worker_manager.py b/utilities/workers/worker_manager.py index 37d79282..c4edf505 100644 --- a/utilities/workers/worker_manager.py +++ b/utilities/workers/worker_manager.py @@ -106,6 +106,18 @@ def get_worker_target(self) -> "(...) -> object": # type: ignore """ return self.__target + def get_input_queues(self) -> "list[queue_proxy_wrapper.QueueProxyWrapper]": + """ + Returns the input queues. + """ + return self.__input_queues + + def get_target_name(self) -> str: + """ + Returns the name of the target. + """ + return self.__target.__name__ + class WorkerManager: """ @@ -146,12 +158,16 @@ def create( return True, WorkerManager( cls.__create_key, workers, + worker_properties, + local_logger, ) def __init__( self, class_private_create_key: object, workers: "list[mp.Process]", + worker_properties: WorkerProperties, + local_logger: logger.Logger, ) -> None: """ Private constructor, use create() method. @@ -159,6 +175,8 @@ def __init__( assert class_private_create_key is WorkerManager.__create_key, "Use create() method" self.__workers = workers + self.__worker_properties = worker_properties + self.__local_logger = local_logger @staticmethod def __create_single_worker(target: "(...) 
def check_and_restart_dead_workers(self) -> bool:
    """
    Check for dead workers and replace each one with a newly started worker.

    The input queues are drained only when at least one worker had to be replaced,
    so a call that finds every worker alive has no side effects.

    Returns whether the dead workers were able to be restarted
    (also True when no worker had died).
    """
    new_workers = []
    restarted_workers = []
    for worker in self.__workers:
        if worker.is_alive():
            new_workers.append(worker)
            continue

        # Log dead worker
        frame = inspect.currentframe()
        target_and_worker_name = f"{self.__worker_properties.get_target_name()} {worker.name}"
        self.__local_logger.warning(
            f"Worker died, restarting {target_and_worker_name}",
            frame,
        )

        # Create a new worker
        result, new_worker = WorkerManager.__create_single_worker(
            self.__worker_properties.get_worker_target(),
            self.__worker_properties.get_worker_arguments(),
            self.__local_logger,
        )
        if not result:
            frame = inspect.currentframe()
            self.__local_logger.error(f"Failed to restart {target_and_worker_name}", frame)
            # The worker list is left untouched so the caller still sees the dead worker
            return False

        # Append the new worker
        new_workers.append(new_worker)
        restarted_workers.append(new_worker)

    # Nothing died: do not touch the queues, otherwise every health check
    # would throw away data that healthy workers have not consumed yet
    if not restarted_workers:
        return True

    self.__workers = new_workers

    # Draining the preceding queue ensures that the preceding queue data wasn't what
    # caused the worker to fail. Draining the succeeding queues is not needed
    # because a worker that died would not have put bad data into the queue.
    # This is done before the replacements start so they never see the old data.
    for input_queue in self.__worker_properties.get_input_queues():
        input_queue.drain_queue()

    # A replacement that is never started stays "not alive" and would be
    # recreated on every subsequent check.
    # NOTE(review): assumes __create_single_worker() returns an unstarted process,
    # as the separate start_workers() step suggests - confirm.
    for new_worker in restarted_workers:
        new_worker.start()

    return True