diff --git a/main_2024.py b/main_2024.py index a17c6e12..f846bed2 100644 --- a/main_2024.py +++ b/main_2024.py @@ -3,6 +3,7 @@ """ import argparse +import inspect import multiprocessing as mp import pathlib import queue @@ -17,6 +18,8 @@ from modules.flight_interface import flight_interface_worker from modules.video_input import video_input_worker from modules.data_merge import data_merge_worker +from modules.multiprocess_logging import multiprocess_logging_worker +from modules.multiprocess_logging import multiprocess_logging from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller from utilities.workers import worker_manager @@ -92,6 +95,9 @@ def main() -> int: controller = worker_controller.WorkerController() mp_manager = mp.Manager() + + logging_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager, QUEUE_MAX_SIZE) + video_input_to_detect_target_queue = queue_proxy_wrapper.QueueProxyWrapper( mp_manager, QUEUE_MAX_SIZE, @@ -109,6 +115,16 @@ def main() -> int: QUEUE_MAX_SIZE, ) + logging_manager = worker_manager.WorkerManager() + logging_manager.create_workers( + 1, + multiprocess_logging_worker.multiprocess_logging_worker, + ( + logging_queue, + controller, + ), + ) + video_input_manager = worker_manager.WorkerManager() video_input_manager.create_workers( 1, @@ -118,6 +134,7 @@ def main() -> int: VIDEO_INPUT_WORKER_PERIOD, VIDEO_INPUT_SAVE_PREFIX, video_input_to_detect_target_queue, + logging_queue, controller, ), ) @@ -165,12 +182,24 @@ def main() -> int: ) # Run + logging_manager.start_workers() video_input_manager.start_workers() detect_target_manager.start_workers() flight_interface_manager.start_workers() data_merge_manager.start_workers() + try: + frame = inspect.currentframe() + multiprocess_logging.log_message( + "workers started", multiprocess_logging.DEBUG, frame, logging_queue + ) + # logging_queue.queue.put((multiprocess_logging.message_and_metadata('workers started', frame), logging.INFO), block=False) + except queue.Full: + pass + + count = 0 while True: + count += 1 try: merged_data = data_merge_to_main_queue.queue.get_nowait() except queue.Empty: @@ -200,11 +229,13 @@ def main() -> int: # Teardown controller.request_exit() + logging_queue.fill_and_drain_queue() video_input_to_detect_target_queue.fill_and_drain_queue() detect_target_to_data_merge_queue.fill_and_drain_queue() flight_interface_to_data_merge_queue.fill_and_drain_queue() data_merge_to_main_queue.fill_and_drain_queue() + logging_manager.join_workers() video_input_manager.join_workers() detect_target_manager.join_workers() flight_interface_manager.join_workers() diff --git a/modules/common b/modules/common index a886a389..82f573ff 160000 --- a/modules/common +++ b/modules/common @@ -1 +1 @@ -Subproject commit a886a389be700cd3ada97c1f58187a5bf127bb1c +Subproject commit 82f573fff15d9a1c05818e334a6dfb3d60c889b7 diff --git a/modules/multiprocess_logging/init.py b/modules/multiprocess_logging/init.py new file mode 100644 index 00000000..e69de29b diff --git a/modules/multiprocess_logging/multiprocess_logging.py b/modules/multiprocess_logging/multiprocess_logging.py new file mode 100644 index 00000000..18f278f1 --- /dev/null +++ b/modules/multiprocess_logging/multiprocess_logging.py @@ -0,0 +1,20 @@ +import inspect +import logging + +DEBUG = logging.DEBUG +INFO = logging.INFO +WARNING = logging.WARNING +ERROR = logging.ERROR +CRITICAL = logging.CRITICAL + + +def message_and_metadata(message, frame): + function_name = frame.f_code.co_name + filename = frame.f_code.co_filename + line_number = inspect.getframeinfo(frame).lineno + + return f"[{filename} | {function_name} | {line_number}] {message}" + + +def log_message(message, level, frame, queue): + queue.queue.put((message_and_metadata(message, frame), level), block=False) diff --git a/modules/multiprocess_logging/multiprocess_logging_worker.py b/modules/multiprocess_logging/multiprocess_logging_worker.py new file mode 100644 index 00000000..578e6892 --- /dev/null +++ b/modules/multiprocess_logging/multiprocess_logging_worker.py @@ -0,0 +1,49 @@ +import datetime +import logging +import queue + +from utilities.workers import queue_proxy_wrapper +from utilities.workers import worker_controller + + +def multiprocess_logging_worker( + input_queue: queue_proxy_wrapper.QueueProxyWrapper, + controller: worker_controller.WorkerController, +): + logger = logging.getLogger("airside_logger") + + filename = f"logs/{datetime.datetime.now().strftime('%Y-%m-%d_%H:%M:%S')}.log" + file_handler = logging.FileHandler(filename=filename, mode="w") # Handles logging to file + stream_handler = logging.StreamHandler() # Handles logging to terminal + + formatter = logging.Formatter( + fmt="%(asctime)s: [%(levelname)s] %(message)s", + datefmt="%I:%M:%S", + ) + + file_handler.setFormatter(formatter) + stream_handler.setFormatter(formatter) + + logger = logging.getLogger() + logger.setLevel(logging.DEBUG) + logger.addHandler(file_handler) + logger.addHandler(stream_handler) + + while not controller.is_exit_requested(): + controller.check_pause() + + logging_message, level = input_queue.queue.get() + + if logging_message is None: + break + + if level == logging.DEBUG: + logger.debug(f"{logging_message}") + elif level == logging.INFO: + logger.info(f"{logging_message}") + elif level == logging.WARNING: + logger.warning(f"{logging_message}") + elif level == logging.ERROR: + logger.error(f"{logging_message}") + elif level == logging.CRITICAL: + logger.critical(f"{logging_message}") diff --git a/modules/video_input/video_input.py b/modules/video_input/video_input.py index 28ebe114..9b8908a0 100644 --- a/modules/video_input/video_input.py +++ b/modules/video_input/video_input.py @@ -2,7 +2,12 @@ Combines image and timestamp together. """ +import inspect +import queue + +from utilities.workers import queue_proxy_wrapper from ..common.camera.modules import camera_device +from ..multiprocess_logging import multiprocess_logging from .. import image_and_time @@ -11,14 +16,30 @@ class VideoInput: Combines image and timestamp together. """ - def __init__(self, camera_name: "int | str", save_name: str = "") -> None: + def __init__( + self, + logging_queue: queue_proxy_wrapper.QueueProxyWrapper, + camera_name: "int | str", + save_name: str = "", + ) -> None: self.device = camera_device.CameraDevice(camera_name, 1, save_name) + self.logging_queue = logging_queue def run(self) -> "tuple[bool, image_and_time.ImageAndTime | None]": """ Returns a possible ImageAndTime with current timestamp. """ result, image = self.device.get_image() + + try: + frame = inspect.currentframe() + multiprocess_logging.log_message( + f"image size {image.shape}", multiprocess_logging.DEBUG, frame, self.logging_queue + ) + # self.logging_queue.queue.put((multiprocess_logging.message_and_metadata(f'image size {image.shape}', frame), logging.DEBUG), block=False) + except queue.Full: + pass + if not result: return False, None diff --git a/modules/video_input/video_input_worker.py b/modules/video_input/video_input_worker.py index aa7ea171..3ad0cf22 100644 --- a/modules/video_input/video_input_worker.py +++ b/modules/video_input/video_input_worker.py @@ -2,10 +2,13 @@ Gets images from the camera. """ +import inspect +import queue import time from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller +from ..multiprocess_logging import multiprocess_logging from . import video_input @@ -14,6 +17,7 @@ def video_input_worker( period: float, save_name: str, output_queue: queue_proxy_wrapper.QueueProxyWrapper, + logging_queue: queue_proxy_wrapper.QueueProxyWrapper, controller: worker_controller.WorkerController, ) -> None: """ @@ -25,7 +29,16 @@ def video_input_worker( output_queue is the data queue. controller is how the main process communicates to this worker process. """ - input_device = video_input.VideoInput(camera_name, save_name) + input_device = video_input.VideoInput(logging_queue, camera_name, save_name) + + try: + frame = inspect.currentframe() + multiprocess_logging.log_message( + "video_input started", multiprocess_logging.DEBUG, frame, logging_queue + ) + # logging_queue.queue.put((multiprocess_logging.message_and_metadata('video_input started', frame), logging.DEBUG), block=False) + except queue.Full: + pass while not controller.is_exit_requested(): controller.check_pause()