From f0547be075c3bbff6081ea3da6bcbddc61d30de1 Mon Sep 17 00:00:00 2001 From: TongguangZhang Date: Thu, 21 Mar 2024 22:19:22 -0400 Subject: [PATCH 1/8] created custom logging module --- modules/logging/init.py | 0 modules/logging/logging_worker.py | 46 +++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+) create mode 100644 modules/logging/init.py create mode 100644 modules/logging/logging_worker.py diff --git a/modules/logging/init.py b/modules/logging/init.py new file mode 100644 index 00000000..e69de29b diff --git a/modules/logging/logging_worker.py b/modules/logging/logging_worker.py new file mode 100644 index 00000000..5150941b --- /dev/null +++ b/modules/logging/logging_worker.py @@ -0,0 +1,46 @@ +import datetime +import logging +import queue + +from utilities.workers import queue_proxy_wrapper +from utilities.workers import worker_controller + +def logging_worker(input_queue: queue_proxy_wrapper.QueueProxyWrapper, + controller: worker_controller.WorkerController): + logger = logging.getLogger('airside_logger') + + filename = f"logs/{datetime.datetime.now().strftime('%Y-%m-%d_%H:%M:%S')}.log" + file_handler = logging.FileHandler(filename=filename, mode="w") # Handles logging to file + stream_handler = logging.StreamHandler() # Handles logging to terminal + + formatter = logging.Formatter( + fmt='%(asctime)s: [%(levelname)s] %(message)s', + datefmt='%I:%M:%S', + ) + + file_handler.setFormatter(formatter) + stream_handler.setFormatter(formatter) + + logger = logging.getLogger() + logger.setLevel(logging.DEBUG) + logger.addHandler(file_handler) + logger.addHandler(stream_handler) + + while not controller.is_exit_requested(): + controller.check_pause() + + logging_message, level = input_queue.queue.get() + + if logging_message is None: + break + + if level == logging.DEBUG: + logger.debug(f"{logging_message}") + elif level == logging.INFO: + logger.info(f"{logging_message}") + elif level == logging.WARNING: + logger.warning(f"{logging_message}") + elif level == logging.ERROR: + logger.error(f"{logging_message}") + elif level == logging.CRITICAL: + logger.critical(f"{logging_message}") From 9725d9e2b7174def5aa46084772c443078eaa4dd Mon Sep 17 00:00:00 2001 From: TongguangZhang Date: Thu, 21 Mar 2024 23:01:28 -0400 Subject: [PATCH 2/8] renamed to avoid conflict with python logging module --- modules/{logging => multiprocess_logging}/init.py | 0 .../multiprocess_logging_worker.py} | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename modules/{logging => multiprocess_logging}/init.py (100%) rename modules/{logging/logging_worker.py => multiprocess_logging/multiprocess_logging_worker.py} (94%) diff --git a/modules/logging/init.py b/modules/multiprocess_logging/init.py similarity index 100% rename from modules/logging/init.py rename to modules/multiprocess_logging/init.py diff --git a/modules/logging/logging_worker.py b/modules/multiprocess_logging/multiprocess_logging_worker.py similarity index 94% rename from modules/logging/logging_worker.py rename to modules/multiprocess_logging/multiprocess_logging_worker.py index 5150941b..daabf59d 100644 --- a/modules/logging/logging_worker.py +++ b/modules/multiprocess_logging/multiprocess_logging_worker.py @@ -5,7 +5,7 @@ from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller -def logging_worker(input_queue: queue_proxy_wrapper.QueueProxyWrapper, +def multiprocess_logging_worker(input_queue: queue_proxy_wrapper.QueueProxyWrapper, controller: worker_controller.WorkerController): logger = logging.getLogger('airside_logger') From 89dce041e1584443063f639f86e86dc2d75654db Mon Sep 17 00:00:00 2001 From: TongguangZhang Date: Thu, 21 Mar 2024 23:02:23 -0400 Subject: [PATCH 3/8] frame info extracting helper function --- modules/multiprocess_logging/multiprocess_logging.py | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 modules/multiprocess_logging/multiprocess_logging.py diff --git a/modules/multiprocess_logging/multiprocess_logging.py b/modules/multiprocess_logging/multiprocess_logging.py new file mode 100644 index 00000000..5e486474 --- /dev/null +++ b/modules/multiprocess_logging/multiprocess_logging.py @@ -0,0 +1,9 @@ +import inspect +import linecache + +def message_and_metadata(message, frame): + function_name = frame.f_code.co_name + filename = frame.f_code.co_filename + line_number = inspect.getframeinfo(frame).lineno + + return f'[{filename} | {function_name} | {line_number}] {message}' \ No newline at end of file From ef430c1c449d0a268acdabd0bbf9b2cd9f315b9c Mon Sep 17 00:00:00 2001 From: TongguangZhang Date: Sat, 23 Mar 2024 22:40:16 -0400 Subject: [PATCH 4/8] logging helper functions --- .../multiprocess_logging.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/modules/multiprocess_logging/multiprocess_logging.py b/modules/multiprocess_logging/multiprocess_logging.py index 5e486474..a7c040e3 100644 --- a/modules/multiprocess_logging/multiprocess_logging.py +++ b/modules/multiprocess_logging/multiprocess_logging.py @@ -1,9 +1,18 @@ import inspect -import linecache +import logging + +DEBUG = logging.DEBUG +INFO = logging.INFO +WARNING = logging.WARNING +ERROR = logging.ERROR +CRITICAL = logging.CRITICAL def message_and_metadata(message, frame): - function_name = frame.f_code.co_name - filename = frame.f_code.co_filename - line_number = inspect.getframeinfo(frame).lineno + function_name = frame.f_code.co_name + filename = frame.f_code.co_filename + line_number = inspect.getframeinfo(frame).lineno + + return f'[{filename} | {function_name} | {line_number}] {message}' - return f'[{filename} | {function_name} | {line_number}] {message}' \ No newline at end of file +def log_message(message, level, frame, queue): + queue.queue.put((message_and_metadata(message, frame), level), block=False) From b8a9b9a3ff8a0b1c5a61f143accafb022efe93ec Mon Sep 17 00:00:00 2001 From: TongguangZhang Date: Sat, 23 Mar 2024 22:40:48 -0400 Subject: [PATCH 5/8] added logging process and queue in main --- main_2024.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/main_2024.py b/main_2024.py index a17c6e12..4d3f5eb8 100644 --- a/main_2024.py +++ b/main_2024.py @@ -3,6 +3,7 @@ """ import argparse +import inspect import multiprocessing as mp import pathlib import queue @@ -17,6 +18,8 @@ from modules.flight_interface import flight_interface_worker from modules.video_input import video_input_worker from modules.data_merge import data_merge_worker +from modules.multiprocess_logging import multiprocess_logging_worker +from modules.multiprocess_logging import multiprocess_logging from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller from utilities.workers import worker_manager @@ -92,6 +95,12 @@ def main() -> int: controller = worker_controller.WorkerController() mp_manager = mp.Manager() + + logging_queue = queue_proxy_wrapper.QueueProxyWrapper( + mp_manager, + QUEUE_MAX_SIZE + ) + video_input_to_detect_target_queue = queue_proxy_wrapper.QueueProxyWrapper( mp_manager, QUEUE_MAX_SIZE, @@ -109,6 +118,16 @@ def main() -> int: QUEUE_MAX_SIZE, ) + logging_manager = worker_manager.WorkerManager() + logging_manager.create_workers( + 1, + multiprocess_logging_worker.multiprocess_logging_worker, + ( + logging_queue, + controller, + ) + ) + video_input_manager = worker_manager.WorkerManager() video_input_manager.create_workers( 1, @@ -118,6 +137,7 @@ def main() -> int: VIDEO_INPUT_WORKER_PERIOD, VIDEO_INPUT_SAVE_PREFIX, video_input_to_detect_target_queue, + logging_queue, controller, ), ) @@ -165,12 +185,22 @@ def main() -> int: ) # Run + logging_manager.start_workers() video_input_manager.start_workers() detect_target_manager.start_workers() flight_interface_manager.start_workers() data_merge_manager.start_workers() + try: + frame = inspect.currentframe() + multiprocess_logging.log_message('workers started', multiprocess_logging.DEBUG, frame, logging_queue) + # logging_queue.queue.put((multiprocess_logging.message_and_metadata('workers started', frame), logging.INFO), block=False) + except queue.Full: + pass + + count = 0 while True: + count += 1 try: merged_data = data_merge_to_main_queue.queue.get_nowait() except queue.Empty: @@ -200,11 +230,13 @@ def main() -> int: # Teardown controller.request_exit() + logging_queue.fill_and_drain_queue() video_input_to_detect_target_queue.fill_and_drain_queue() detect_target_to_data_merge_queue.fill_and_drain_queue() flight_interface_to_data_merge_queue.fill_and_drain_queue() data_merge_to_main_queue.fill_and_drain_queue() + logging_manager.join_workers() video_input_manager.join_workers() detect_target_manager.join_workers() flight_interface_manager.join_workers() From 63d49f9b282d8b8dc5a2e8ee1ade3b1b8ccf5b8c Mon Sep 17 00:00:00 2001 From: TongguangZhang Date: Sun, 24 Mar 2024 15:24:10 -0400 Subject: [PATCH 6/8] merge conflict in video input --- modules/common | 2 +- modules/video_input/video_input.py | 20 +++++++++++++++++++- modules/video_input/video_input_worker.py | 16 +++++++++++++--- 3 files changed, 33 insertions(+), 5 deletions(-) diff --git a/modules/common b/modules/common index a886a389..82f573ff 160000 --- a/modules/common +++ b/modules/common @@ -1 +1 @@ -Subproject commit a886a389be700cd3ada97c1f58187a5bf127bb1c +Subproject commit 82f573fff15d9a1c05818e334a6dfb3d60c889b7 diff --git a/modules/video_input/video_input.py b/modules/video_input/video_input.py index 28ebe114..8df757bd 100644 --- a/modules/video_input/video_input.py +++ b/modules/video_input/video_input.py @@ -1,8 +1,12 @@ """ Combines image and timestamp together. """ +import inspect +import queue +from utilities.workers import queue_proxy_wrapper from ..common.camera.modules import camera_device +from ..multiprocess_logging import multiprocess_logging from .. import image_and_time @@ -11,14 +15,28 @@ class VideoInput: Combines image and timestamp together. """ - def __init__(self, camera_name: "int | str", save_name: str = "") -> None: + def __init__( + self, + logging_queue: queue_proxy_wrapper.QueueProxyWrapper, + camera_name: "int | str", + save_name: str = "" + ) -> None: self.device = camera_device.CameraDevice(camera_name, 1, save_name) + self.logging_queue = logging_queue def run(self) -> "tuple[bool, image_and_time.ImageAndTime | None]": """ Returns a possible ImageAndTime with current timestamp. """ result, image = self.device.get_image() + + try: + frame = inspect.currentframe() + multiprocess_logging.log_message(f'image size {image.shape}', multiprocess_logging.DEBUG, frame, self.logging_queue) + # self.logging_queue.queue.put((multiprocess_logging.message_and_metadata(f'image size {image.shape}', frame), logging.DEBUG), block=False) + except queue.Full: + pass + if not result: return False, None diff --git a/modules/video_input/video_input_worker.py b/modules/video_input/video_input_worker.py index aa7ea171..16447cb8 100644 --- a/modules/video_input/video_input_worker.py +++ b/modules/video_input/video_input_worker.py @@ -1,11 +1,13 @@ """ Gets images from the camera. """ - +import inspect +import queue import time from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller +from ..multiprocess_logging import multiprocess_logging from . import video_input @@ -14,6 +16,7 @@ def video_input_worker( period: float, save_name: str, output_queue: queue_proxy_wrapper.QueueProxyWrapper, + logging_queue: queue_proxy_wrapper.QueueProxyWrapper, controller: worker_controller.WorkerController, ) -> None: """ @@ -25,8 +28,15 @@ def video_input_worker( output_queue is the data queue. controller is how the main process communicates to this worker process. """ - input_device = video_input.VideoInput(camera_name, save_name) - + input_device = video_input.VideoInput(logging_queue, camera_name, save_name) + + try: + frame = inspect.currentframe() + multiprocess_logging.log_message('video_input started', multiprocess_logging.DEBUG, frame, logging_queue) + # logging_queue.queue.put((multiprocess_logging.message_and_metadata('video_input started', frame), logging.DEBUG), block=False) + except queue.Full: + pass + while not controller.is_exit_requested(): controller.check_pause() From 864410c5b8852b459733fc9d117d3a9845d34152 Mon Sep 17 00:00:00 2001 From: TongguangZhang Date: Sun, 24 Mar 2024 15:26:39 -0400 Subject: [PATCH 7/8] resolve merge conflict in video input --- modules/video_input/video_input.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/modules/video_input/video_input.py b/modules/video_input/video_input.py index 8df757bd..a68a4d37 100644 --- a/modules/video_input/video_input.py +++ b/modules/video_input/video_input.py @@ -14,12 +14,11 @@ class VideoInput: """ Combines image and timestamp together. """ - def __init__( self, logging_queue: queue_proxy_wrapper.QueueProxyWrapper, camera_name: "int | str", - save_name: str = "" + save_name: str = "", ) -> None: self.device = camera_device.CameraDevice(camera_name, 1, save_name) self.logging_queue = logging_queue From 577add426737d49a7ed717b5633ef0d6a3f22f26 Mon Sep 17 00:00:00 2001 From: TongguangZhang Date: Sun, 24 Mar 2024 15:40:32 -0400 Subject: [PATCH 8/8] black formatting changes --- main_2024.py | 11 +++++------ .../multiprocess_logging.py | 12 +++++++----- .../multiprocess_logging_worker.py | 19 +++++++++++-------- modules/video_input/video_input.py | 6 +++++- modules/video_input/video_input_worker.py | 7 +++++-- 5 files changed, 33 insertions(+), 22 deletions(-) diff --git a/main_2024.py b/main_2024.py index 4d3f5eb8..f846bed2 100644 --- a/main_2024.py +++ b/main_2024.py @@ -96,10 +96,7 @@ def main() -> int: mp_manager = mp.Manager() - logging_queue = queue_proxy_wrapper.QueueProxyWrapper( - mp_manager, - QUEUE_MAX_SIZE - ) + logging_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager, QUEUE_MAX_SIZE) video_input_to_detect_target_queue = queue_proxy_wrapper.QueueProxyWrapper( mp_manager, @@ -125,7 +122,7 @@ def main() -> int: ( logging_queue, controller, - ) + ), ) video_input_manager = worker_manager.WorkerManager() @@ -193,7 +190,9 @@ def main() -> int: try: frame = inspect.currentframe() - multiprocess_logging.log_message('workers started', multiprocess_logging.DEBUG, frame, logging_queue) + multiprocess_logging.log_message( + "workers started", multiprocess_logging.DEBUG, frame, logging_queue + ) # logging_queue.queue.put((multiprocess_logging.message_and_metadata('workers started', frame), logging.INFO), block=False) except queue.Full: pass diff --git a/modules/multiprocess_logging/multiprocess_logging.py b/modules/multiprocess_logging/multiprocess_logging.py index a7c040e3..18f278f1 100644 --- a/modules/multiprocess_logging/multiprocess_logging.py +++ b/modules/multiprocess_logging/multiprocess_logging.py @@ -7,12 +7,14 @@ ERROR = logging.ERROR CRITICAL = logging.CRITICAL + def message_and_metadata(message, frame): - function_name = frame.f_code.co_name - filename = frame.f_code.co_filename - line_number = inspect.getframeinfo(frame).lineno + function_name = frame.f_code.co_name + filename = frame.f_code.co_filename + line_number = inspect.getframeinfo(frame).lineno + + return f"[{filename} | {function_name} | {line_number}] {message}" - return f'[{filename} | {function_name} | {line_number}] {message}' def log_message(message, level, frame, queue): - queue.queue.put((message_and_metadata(message, frame), level), block=False) + queue.queue.put((message_and_metadata(message, frame), level), block=False) diff --git a/modules/multiprocess_logging/multiprocess_logging_worker.py b/modules/multiprocess_logging/multiprocess_logging_worker.py index daabf59d..578e6892 100644 --- a/modules/multiprocess_logging/multiprocess_logging_worker.py +++ b/modules/multiprocess_logging/multiprocess_logging_worker.py @@ -5,17 +5,20 @@ from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller -def multiprocess_logging_worker(input_queue: queue_proxy_wrapper.QueueProxyWrapper, - controller: worker_controller.WorkerController): - logger = logging.getLogger('airside_logger') + +def multiprocess_logging_worker( + input_queue: queue_proxy_wrapper.QueueProxyWrapper, + controller: worker_controller.WorkerController, +): + logger = logging.getLogger("airside_logger") filename = f"logs/{datetime.datetime.now().strftime('%Y-%m-%d_%H:%M:%S')}.log" - file_handler = logging.FileHandler(filename=filename, mode="w") # Handles logging to file - stream_handler = logging.StreamHandler() # Handles logging to terminal + file_handler = logging.FileHandler(filename=filename, mode="w") # Handles logging to file + stream_handler = logging.StreamHandler() # Handles logging to terminal formatter = logging.Formatter( - fmt='%(asctime)s: [%(levelname)s] %(message)s', - datefmt='%I:%M:%S', + fmt="%(asctime)s: [%(levelname)s] %(message)s", + datefmt="%I:%M:%S", ) file_handler.setFormatter(formatter) @@ -28,7 +31,7 @@ def multiprocess_logging_worker(input_queue: queue_proxy_wrapper.QueueProxyWrapp while not controller.is_exit_requested(): controller.check_pause() - + logging_message, level = input_queue.queue.get() if logging_message is None: diff --git a/modules/video_input/video_input.py b/modules/video_input/video_input.py index a68a4d37..9b8908a0 100644 --- a/modules/video_input/video_input.py +++ b/modules/video_input/video_input.py @@ -1,6 +1,7 @@ """ Combines image and timestamp together. """ + import inspect import queue @@ -14,6 +15,7 @@ class VideoInput: """ Combines image and timestamp together. """ + def __init__( self, logging_queue: queue_proxy_wrapper.QueueProxyWrapper, @@ -31,7 +33,9 @@ def run(self) -> "tuple[bool, image_and_time.ImageAndTime | None]": try: frame = inspect.currentframe() - multiprocess_logging.log_message(f'image size {image.shape}', multiprocess_logging.DEBUG, frame, self.logging_queue) + multiprocess_logging.log_message( + f"image size {image.shape}", multiprocess_logging.DEBUG, frame, self.logging_queue + ) # self.logging_queue.queue.put((multiprocess_logging.message_and_metadata(f'image size {image.shape}', frame), logging.DEBUG), block=False) except queue.Full: pass diff --git a/modules/video_input/video_input_worker.py b/modules/video_input/video_input_worker.py index 16447cb8..3ad0cf22 100644 --- a/modules/video_input/video_input_worker.py +++ b/modules/video_input/video_input_worker.py @@ -1,6 +1,7 @@ """ Gets images from the camera. """ + import inspect import queue import time @@ -32,11 +33,13 @@ def video_input_worker( try: frame = inspect.currentframe() - multiprocess_logging.log_message('video_input started', multiprocess_logging.DEBUG, frame, logging_queue) + multiprocess_logging.log_message( + "video_input started", multiprocess_logging.DEBUG, frame, logging_queue + ) # logging_queue.queue.put((multiprocess_logging.message_and_metadata('video_input started', frame), logging.DEBUG), block=False) except queue.Full: pass - + while not controller.is_exit_requested(): controller.check_pause()