From 55e622abafd82d21062fcad037c24b3975e372fe Mon Sep 17 00:00:00 2001 From: Ashish Agrahari <44419439+AshishA26@users.noreply.github.com> Date: Thu, 11 Jul 2024 10:02:54 -0400 Subject: [PATCH] Move worker management (#180) * Moved worker management code from main to worker manager. * Added worker properties class. --- documentation/main_multiprocess_example.py | 182 +++++++++------- main_2024.py | 241 +++++++++++++++------ utilities/workers/worker_manager.py | 175 +++++++++++++-- 3 files changed, 441 insertions(+), 157 deletions(-) diff --git a/documentation/main_multiprocess_example.py b/documentation/main_multiprocess_example.py index dab157a4..079b75a8 100644 --- a/documentation/main_multiprocess_example.py +++ b/documentation/main_multiprocess_example.py @@ -26,6 +26,11 @@ COUNTUP_TO_ADD_RANDOM_QUEUE_MAX_SIZE = 5 ADD_RANDOM_TO_CONCATENATOR_QUEUE_MAX_SIZE = 5 +# Play with these numbers to see process bottlenecks +COUNTUP_WORKER_COUNT = 2 +ADD_RANDOM_WORKER_COUNT = 2 +CONCATENATOR_WORKER_COUNT = 2 + # main() is required for early return def main() -> int: @@ -69,83 +74,111 @@ def main() -> int: ADD_RANDOM_TO_CONCATENATOR_QUEUE_MAX_SIZE, ) - # Prepare processes - # Data path: countup_worker to add_random_worker to concatenator_workers - # Play with these numbers to see process bottlenecks - countup_workers = [ - mp.Process( - target=countup_worker.countup_worker, - args=( - 3, - 100, - countup_to_add_random_queue, - controller, - ), - ), - mp.Process( - target=countup_worker.countup_worker, - args=( - 2, - 200, - countup_to_add_random_queue, - controller, - ), - ), - ] - countup_manager = worker_manager.WorkerManager(countup_workers) - - add_random_workers = [ - mp.Process( - target=add_random_worker.add_random_worker, - args=( - 252, - 10, - 5, - countup_to_add_random_queue, - add_random_to_concatenator_queue, - controller, - ), - ), - mp.Process( - target=add_random_worker.add_random_worker, - args=( - 350, - 4, - 1, - countup_to_add_random_queue, - add_random_to_concatenator_queue, - controller, - ), + # Worker properties + result, countup_worker_properties = worker_manager.WorkerProperties.create( + count=COUNTUP_WORKER_COUNT, + target=countup_worker.countup_worker, + work_arguments=( + 3, + 100, ), - ] - add_random_manager = worker_manager.WorkerManager(add_random_workers) - - concatenator_workers = [ - mp.Process( - target=concatenator_worker.concatenator_worker, - args=( - "Hello ", - " world!", - add_random_to_concatenator_queue, - controller, - ), + input_queues=[], + output_queues=[countup_to_add_random_queue], + controller=controller, + local_logger=main_logger, + ) + if not result: + print("Failed to create arguments for Countup") + return -1 + + # Get Pylance to stop complaining + assert countup_worker_properties is not None + + result, add_random_worker_properties = worker_manager.WorkerProperties.create( + count=ADD_RANDOM_WORKER_COUNT, + target=add_random_worker.add_random_worker, + work_arguments=( + 252, + 10, + 5, ), - mp.Process( - target=concatenator_worker.concatenator_worker, - args=( - "Example ", - " code!", - add_random_to_concatenator_queue, - controller, - ), + input_queues=[countup_to_add_random_queue], + output_queues=[add_random_to_concatenator_queue], + controller=controller, + local_logger=main_logger, + ) + if not result: + print("Failed to create arguments for Add Random") + return -1 + + # Get Pylance to stop complaining + assert add_random_worker_properties is not None + + result, concatenator_worker_properties = worker_manager.WorkerProperties.create( + count=CONCATENATOR_WORKER_COUNT, + target=concatenator_worker.concatenator_worker, + work_arguments=( + "Hello ", + " world!", ), - ] - concatenator_manager = worker_manager.WorkerManager(concatenator_workers) + input_queues=[add_random_to_concatenator_queue], + output_queues=[], + controller=controller, + local_logger=main_logger, + ) + if not result: + print("Failed to create arguments for Concatenator") + return -1 + + # Get Pylance to stop complaining + assert concatenator_worker_properties is not None + + # Prepare processes + # Data path: countup_worker to add_random_worker to concatenator_workers + worker_managers = [] + + result, countup_manager = worker_manager.WorkerManager.create( + worker_properties=countup_worker_properties, + local_logger=main_logger, + ) + if not result: + print("Failed to create manager for Countup") + return -1 + + # Get Pylance to stop complaining + assert countup_manager is not None + + worker_managers.append(countup_manager) + + result, add_random_manager = worker_manager.WorkerManager.create( + worker_properties=add_random_worker_properties, + local_logger=main_logger, + ) + if not result: + print("Failed to create manager for Add Random") + return -1 + + # Get Pylance to stop complaining + assert add_random_manager is not None + + worker_managers.append(add_random_manager) + + result, concatenator_manager = worker_manager.WorkerManager.create( + worker_properties=concatenator_worker_properties, + local_logger=main_logger, + ) + if not result: + print("Failed to create manager for Concatenator") + return -1 + + # Get Pylance to stop complaining + assert concatenator_manager is not None + + worker_managers.append(concatenator_manager) # Start worker processes - countup_manager.start_workers() - add_random_manager.start_workers() - concatenator_manager.start_workers() + for manager in worker_managers: + manager.start_workers() frame = inspect.currentframe() main_logger.info("Started", frame) @@ -178,9 +211,8 @@ def main() -> int: main_logger.info("Queues cleared", frame) # Clean up worker processes - countup_manager.join_workers() - add_random_manager.join_workers() - concatenator_manager.join_workers() + for manager in worker_managers: + manager.join_workers() frame = inspect.currentframe() main_logger.info("Stopped", frame) diff --git a/main_2024.py b/main_2024.py index 991e9593..cb36ad54 100644 --- a/main_2024.py +++ b/main_2024.py @@ -137,109 +137,217 @@ def main() -> int: QUEUE_MAX_SIZE, ) - video_input_manager = worker_manager.WorkerManager() - video_input_manager.create_workers( - 1, - video_input_worker.video_input_worker, + result, camera_intrinsics = camera_properties.CameraIntrinsics.create( + GEOLOCATION_RESOLUTION_X, + GEOLOCATION_RESOLUTION_Y, + GEOLOCATION_FOV_X, + GEOLOCATION_FOV_Y, + ) + if not result: + frame = inspect.currentframe() + main_logger.error("Error creating camera intrinsics", frame) + return -1 + + result, camera_extrinsics = camera_properties.CameraDroneExtrinsics.create( + ( + GEOLOCATION_CAMERA_POSITION_X, + GEOLOCATION_CAMERA_POSITION_Y, + GEOLOCATION_CAMERA_POSITION_Z, + ), ( + GEOLOCATION_CAMERA_ORIENTATION_YAW, + GEOLOCATION_CAMERA_ORIENTATION_PITCH, + GEOLOCATION_CAMERA_ORIENTATION_ROLL, + ), + ) + if not result: + frame = inspect.currentframe() + main_logger.error("Error creating camera extrinsics", frame) + return -1 + + # Worker properties + result, video_input_worker_properties = worker_manager.WorkerProperties.create( + count=1, + target=video_input_worker.video_input_worker, + work_arguments=( VIDEO_INPUT_CAMERA_NAME, VIDEO_INPUT_WORKER_PERIOD, VIDEO_INPUT_SAVE_PREFIX, - video_input_to_detect_target_queue, - controller, ), + input_queues=[], + output_queues=[video_input_to_detect_target_queue], + controller=controller, + local_logger=main_logger, ) + if not result: + frame = inspect.currentframe() + main_logger.error("Failed to create arguments for Video Input", frame) + return -1 - detect_target_manager = worker_manager.WorkerManager() - detect_target_manager.create_workers( - DETECT_TARGET_WORKER_COUNT, - detect_target_worker.detect_target_worker, - ( + # Get Pylance to stop complaining + assert video_input_worker_properties is not None + + result, detect_target_worker_properties = worker_manager.WorkerProperties.create( + count=DETECT_TARGET_WORKER_COUNT, + target=detect_target_worker.detect_target_worker, + work_arguments=( DETECT_TARGET_DEVICE, DETECT_TARGET_MODEL_PATH, DETECT_TARGET_OVERRIDE_FULL_PRECISION, DETECT_TARGET_SHOW_ANNOTATED, DETECT_TARGET_SAVE_PREFIX, - video_input_to_detect_target_queue, - detect_target_to_data_merge_queue, - controller, ), + input_queues=[video_input_to_detect_target_queue], + output_queues=[detect_target_to_data_merge_queue], + controller=controller, + local_logger=main_logger, ) + if not result: + frame = inspect.currentframe() + main_logger.error("Failed to create arguments for Detect Target", frame) + return -1 - flight_interface_manager = worker_manager.WorkerManager() - flight_interface_manager.create_workers( - 1, - flight_interface_worker.flight_interface_worker, - ( + # Get Pylance to stop complaining + assert detect_target_worker_properties is not None + + result, flight_interface_worker_properties = worker_manager.WorkerProperties.create( + count=1, + target=flight_interface_worker.flight_interface_worker, + work_arguments=( FLIGHT_INTERFACE_ADDRESS, FLIGHT_INTERFACE_TIMEOUT, FLIGHT_INTERFACE_BAUD_RATE, FLIGHT_INTERFACE_WORKER_PERIOD, - flight_interface_to_data_merge_queue, - controller, ), + input_queues=[], + output_queues=[flight_interface_to_data_merge_queue], + controller=controller, + local_logger=main_logger, ) + if not result: + frame = inspect.currentframe() + main_logger.error("Failed to create arguments for Flight Interface", frame) + return -1 - data_merge_manager = worker_manager.WorkerManager() - data_merge_manager.create_workers( - 1, - data_merge_worker.data_merge_worker, - ( - DATA_MERGE_TIMEOUT, + # Get Pylance to stop complaining + assert flight_interface_worker_properties is not None + + result, data_merge_worker_properties = worker_manager.WorkerProperties.create( + count=1, + target=data_merge_worker.data_merge_worker, + work_arguments=(DATA_MERGE_TIMEOUT,), + input_queues=[ detect_target_to_data_merge_queue, flight_interface_to_data_merge_queue, - data_merge_to_geolocation_queue, - controller, + ], + output_queues=[data_merge_to_geolocation_queue], + controller=controller, + local_logger=main_logger, + ) + if not result: + frame = inspect.currentframe() + main_logger.error("Failed to create arguments for Data Merge", frame) + return -1 + + # Get Pylance to stop complaining + assert data_merge_worker_properties is not None + + result, geolocation_worker_properties = worker_manager.WorkerProperties.create( + count=1, + target=geolocation_worker.geolocation_worker, + work_arguments=( + camera_intrinsics, + camera_extrinsics, ), + input_queues=[data_merge_to_geolocation_queue], + output_queues=[geolocation_to_main_queue], + controller=controller, + local_logger=main_logger, ) + if not result: + frame = inspect.currentframe() + main_logger.error("Failed to create arguments for Geolocation", frame) + return -1 - result, camera_intrinsics = camera_properties.CameraIntrinsics.create( - GEOLOCATION_RESOLUTION_X, - GEOLOCATION_RESOLUTION_Y, - GEOLOCATION_FOV_X, - GEOLOCATION_FOV_Y, + # Get Pylance to stop complaining + assert geolocation_worker_properties is not None + + # Create managers + worker_managers = [] + + result, video_input_manager = worker_manager.WorkerManager.create( + worker_properties=video_input_worker_properties, + local_logger=main_logger, ) if not result: frame = inspect.currentframe() - main_logger.error("Error creating camera intrinsics", frame) + main_logger.error("Failed to create manager for Video Input", frame) return -1 - result, camera_extrinsics = camera_properties.CameraDroneExtrinsics.create( - ( - GEOLOCATION_CAMERA_POSITION_X, - GEOLOCATION_CAMERA_POSITION_Y, - GEOLOCATION_CAMERA_POSITION_Z, - ), - ( - GEOLOCATION_CAMERA_ORIENTATION_YAW, - GEOLOCATION_CAMERA_ORIENTATION_PITCH, - GEOLOCATION_CAMERA_ORIENTATION_ROLL, - ), + # Get Pylance to stop complaining + assert video_input_manager is not None + + worker_managers.append(video_input_manager) + + result, detect_target_manager = worker_manager.WorkerManager.create( + worker_properties=detect_target_worker_properties, + local_logger=main_logger, ) if not result: frame = inspect.currentframe() - main_logger.error("Error creating camera extrinsics", frame) + main_logger.error("Failed to create manager for Detect Target", frame) return -1 - geolocation_manager = worker_manager.WorkerManager() - geolocation_manager.create_workers( - 1, - geolocation_worker.geolocation_worker, - ( - camera_intrinsics, - camera_extrinsics, - data_merge_to_geolocation_queue, - geolocation_to_main_queue, - controller, - ), + # Get Pylance to stop complaining + assert detect_target_manager is not None + + worker_managers.append(detect_target_manager) + + result, flight_interface_manager = worker_manager.WorkerManager.create( + worker_properties=flight_interface_worker_properties, + local_logger=main_logger, + ) + if not result: + frame = inspect.currentframe() + main_logger.error("Failed to create manager for Flight Interface", frame) + return -1 + + # Get Pylance to stop complaining + assert flight_interface_manager is not None + + worker_managers.append(flight_interface_manager) + + result, data_merge_manager = worker_manager.WorkerManager.create( + worker_properties=data_merge_worker_properties, + local_logger=main_logger, ) + if not result: + frame = inspect.currentframe() + main_logger.error("Failed to create manager for Data Merge", frame) + return -1 + + # Get Pylance to stop complaining + assert data_merge_manager is not None + + worker_managers.append(data_merge_manager) + + result, geolocation_manager = worker_manager.WorkerManager.create( + worker_properties=geolocation_worker_properties, + local_logger=main_logger, + ) + if not result: + frame = inspect.currentframe() + main_logger.error("Failed to create manager for Geolocation", frame) + return -1 + + # Get Pylance to stop complaining + assert geolocation_manager is not None + + worker_managers.append(geolocation_manager) # Run - video_input_manager.start_workers() - detect_target_manager.start_workers() - flight_interface_manager.start_workers() - data_merge_manager.start_workers() - geolocation_manager.start_workers() + for manager in worker_managers: + manager.start_workers() while True: try: @@ -276,11 +384,8 @@ def main() -> int: data_merge_to_geolocation_queue.fill_and_drain_queue() geolocation_to_main_queue.fill_and_drain_queue() - video_input_manager.join_workers() - detect_target_manager.join_workers() - flight_interface_manager.join_workers() - data_merge_manager.join_workers() - geolocation_manager.join_workers() + for manager in worker_managers: + manager.join_workers() cv2.destroyAllWindows() # type: ignore diff --git a/utilities/workers/worker_manager.py b/utilities/workers/worker_manager.py index 85ca8bbd..37d79282 100644 --- a/utilities/workers/worker_manager.py +++ b/utilities/workers/worker_manager.py @@ -2,8 +2,110 @@ For managing workers. """ +import inspect import multiprocessing as mp +from modules.logger import logger +from utilities.workers import worker_controller +from utilities.workers import queue_proxy_wrapper + + +class WorkerProperties: + """ + Worker Properties. + """ + + __create_key = object() + + @classmethod + def create( + cls, + count: int, + target: "(...) -> object", # type: ignore + work_arguments: "tuple", + input_queues: "list[queue_proxy_wrapper.QueueProxyWrapper]", + output_queues: "list[queue_proxy_wrapper.QueueProxyWrapper]", + controller: worker_controller.WorkerController, + local_logger: logger.Logger, + ) -> "tuple[bool, WorkerProperties | None]": + """ + Creates worker properties. + + count: Number of workers. + target: Function. + work_arguments: Arguments for worker internals. + input_queues: Input queues. + output_queues: Output queues. + controller: Worker controller. + local_logger: Existing logger from process. + + Returns the WorkerProperties object. + """ + if count <= 0: + frame = inspect.currentframe() + local_logger.error( + "Worker count requested is less than or equal to zero, no workers were created", + frame, + ) + return False, None + + return True, WorkerProperties( + cls.__create_key, + count, + target, + work_arguments, + input_queues, + output_queues, + controller, + ) + + def __init__( + self, + class_private_create_key: object, + count: int, + target: "(...) -> object", # type: ignore + work_arguments: "tuple", + input_queues: "list[queue_proxy_wrapper.QueueProxyWrapper]", + output_queues: "list[queue_proxy_wrapper.QueueProxyWrapper]", + controller: worker_controller.WorkerController, + ) -> None: + """ + Private constructor, use create() method. + """ + assert class_private_create_key is WorkerProperties.__create_key, "Use create() method" + + self.__count = count + self.__target = target + self.__work_arguments = work_arguments + self.__input_queues = input_queues + self.__output_queues = output_queues + self.__controller = controller + + def get_worker_arguments(self) -> "tuple": + """ + Concatenates the worker properties into a tuple. + + Returns the worker properties as a tuple. + """ + return ( + self.__work_arguments + + tuple(self.__input_queues) + + tuple(self.__output_queues) + + (self.__controller,) + ) + + def get_worker_count(self) -> int: + """ + Returns the worker count. + """ + return self.__count + + def get_worker_target(self) -> "(...) -> object": # type: ignore + """ + Returns the worker target. + """ + return self.__target + class WorkerManager: """ @@ -11,29 +113,74 @@ class WorkerManager: Contains exit and pause requests. """ - def __init__(self, workers: "list[mp.Process] | None" = None) -> None: + __create_key = object() + + @classmethod + def create( + cls, + worker_properties: WorkerProperties, + local_logger: logger.Logger, + ) -> "tuple[bool, WorkerManager | None]": + """ + Create identical workers and append them to a workers list. + + worker_properties: Worker properties. + local_logger: Existing logger from process. + + Returns whether the workers were able to be created and the Worker Manager. + """ + workers = [] + for _ in range(0, worker_properties.get_worker_count()): + result, worker = WorkerManager.__create_single_worker( + worker_properties.get_worker_target(), + worker_properties.get_worker_arguments(), + local_logger, + ) + if not result: + frame = inspect.currentframe() + local_logger.error("Failed to create worker", frame) + return False, None + + workers.append(worker) + + return True, WorkerManager( + cls.__create_key, + workers, + ) + + def __init__( + self, + class_private_create_key: object, + workers: "list[mp.Process]", + ) -> None: """ - Constructor creates internal queue and semaphore. + Private constructor, use create() method. """ - self.__workers = [] if workers is None else workers + assert class_private_create_key is WorkerManager.__create_key, "Use create() method" - def create_workers(self, count: int, target: "(...) -> object", args: tuple) -> None: # type: ignore + self.__workers = workers + + @staticmethod + def __create_single_worker(target: "(...) -> object", args: "tuple", local_logger: logger.Logger) -> "tuple[bool, mp.Process | None]": # type: ignore """ - Create identical workers. + Creates a single worker. - count: Number of workers. target: Function. - args: Arguments to function. + args: Target function arguments. + local_logger: Existing logger from process. + + Returns whether a worker was created and the worker. """ - for _ in range(0, count): + try: worker = mp.Process(target=target, args=args) - self.__workers.append(worker) + # Catching all exceptions for library call + # pylint: disable-next=broad-exception-caught + except Exception as e: + frame = inspect.currentframe() + local_logger.error(f"Exception raised while creating a worker: {e}", frame) + return False, None - def concatenate_workers(self, workers: "list[mp.Process]") -> None: - """ - Add workers. - """ - self.__workers += workers + return True, worker def start_workers(self) -> None: """