diff --git a/documentation/main_multiprocess_example.py b/documentation/main_multiprocess_example.py index 0bbd46c4..dab157a4 100644 --- a/documentation/main_multiprocess_example.py +++ b/documentation/main_multiprocess_example.py @@ -5,17 +5,23 @@ ``` """ +import inspect import multiprocessing as mp +import pathlib import time from documentation.multiprocess_example.add_random import add_random_worker from documentation.multiprocess_example.concatenator import concatenator_worker from documentation.multiprocess_example.countup import countup_worker +from modules.logger import logger_setup_main +from utilities import yaml from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller from utilities.workers import worker_manager +CONFIG_FILE_PATH = pathlib.Path("config.yaml") + # Play with these numbers to see queue bottlenecks COUNTUP_TO_ADD_RANDOM_QUEUE_MAX_SIZE = 5 ADD_RANDOM_TO_CONCATENATOR_QUEUE_MAX_SIZE = 5 @@ -26,6 +32,22 @@ def main() -> int: """ Main function. """ + # Configuration settings + result, config = yaml.open_config(CONFIG_FILE_PATH) + if not result: + print("ERROR: Failed to load configuration file") + return -1 + + assert config is not None + + # Setup main logger + result, main_logger = logger_setup_main.setup_main_logger(config) + if not result: + print("ERROR: Failed to create main logger") + return -1 + + assert main_logger is not None + # Main is managing all worker processes and is responsible # for creating supporting interprocess communication controller = worker_controller.WorkerController() @@ -125,27 +147,44 @@ def main() -> int: add_random_manager.start_workers() concatenator_manager.start_workers() + frame = inspect.currentframe() + main_logger.info("Started", frame) + # Run for some time and then pause time.sleep(2) controller.request_pause() - print("Paused") + + frame = inspect.currentframe() + main_logger.info("Paused", frame) + time.sleep(4) - print("Resumed") controller.request_resume() + frame = inspect.currentframe() + main_logger.info("Resumed", frame) + time.sleep(2) # Stop the processes controller.request_exit() + frame = inspect.currentframe() + main_logger.info("Requested exit", frame) + # Fill and drain queues from END TO START countup_to_add_random_queue.fill_and_drain_queue() add_random_to_concatenator_queue.fill_and_drain_queue() + frame = inspect.currentframe() + main_logger.info("Queues cleared", frame) + # Clean up worker processes countup_manager.join_workers() add_random_manager.join_workers() concatenator_manager.join_workers() + frame = inspect.currentframe() + main_logger.info("Stopped", frame) + # We can reset controller in case we want to reuse it # Alternatively, create a new WorkerController instance controller.clear_exit() diff --git a/documentation/multiprocess_example/add_random/add_random.py b/documentation/multiprocess_example/add_random/add_random.py index b90e00cf..0a1a24c4 100644 --- a/documentation/multiprocess_example/add_random/add_random.py +++ b/documentation/multiprocess_example/add_random/add_random.py @@ -2,9 +2,11 @@ Contains the AddRandom class. """ +import inspect import time import random +from modules.logger import logger from .. import intermediate_struct @@ -12,10 +14,12 @@ class AddRandom: """ Adds a random number to the input. - A new random number is generated every `__ADD_SWITCH_COUNT` times. + A new random number is generated every `__add_change_count` times. 
""" - def __init__(self, seed: int, max_random_term: int, add_change_count: int) -> None: + def __init__( + self, seed: int, max_random_term: int, add_change_count: int, local_logger: logger.Logger + ) -> None: """ Constructor seeds the RNG and sets the max add and number of adds before a new random number is chosen. @@ -30,6 +34,8 @@ def __init__(self, seed: int, max_random_term: int, add_change_count: int) -> No self.__current_random_term = self.__generate_random_number(0, self.__max_random_term) self.__add_count = 0 + self.__logger = local_logger + @staticmethod def __generate_random_number(min_value: int, max_value: int) -> int: """ @@ -41,6 +47,10 @@ def run_add_random(self, term: int) -> "tuple[bool, intermediate_struct.Intermed """ Adds a random number to the input and returns the sum. """ + # Log + frame = inspect.currentframe() + self.__logger.debug("Run", frame) + add_sum = term + self.__current_random_term # Change the random term if the add count has been reached diff --git a/documentation/multiprocess_example/add_random/add_random_worker.py b/documentation/multiprocess_example/add_random/add_random_worker.py index 2700714f..29630491 100644 --- a/documentation/multiprocess_example/add_random/add_random_worker.py +++ b/documentation/multiprocess_example/add_random/add_random_worker.py @@ -2,6 +2,11 @@ Intermediate worker that adds a random number to the input. """ +import inspect +import os +import pathlib + +from modules.logger import logger from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller from . import add_random @@ -22,8 +27,24 @@ def add_random_worker( input_queue and output_queue are the data queues. controller is how the main process communicates to this worker process. """ + # Instantiate logger + worker_name = pathlib.Path(__file__).stem + process_id = os.getpid() + result, local_logger = logger.Logger.create(f"{worker_name}_{process_id}", True) + if not result: + print("ERROR: Worker failed to create logger") + return + + # Get Pylance to stop complaining + assert local_logger is not None + + frame = inspect.currentframe() + local_logger.info("Logger initialized", frame) + # Instantiate class object - add_random_instance = add_random.AddRandom(seed, max_random_term, add_change_count) + add_random_instance = add_random.AddRandom( + seed, max_random_term, add_change_count, local_logger + ) # Loop forever until exit has been requested or sentinel value (consumer) while not controller.is_exit_requested(): diff --git a/documentation/multiprocess_example/concatenator/concatenator.py b/documentation/multiprocess_example/concatenator/concatenator.py index fd188073..009a2f1a 100644 --- a/documentation/multiprocess_example/concatenator/concatenator.py +++ b/documentation/multiprocess_example/concatenator/concatenator.py @@ -2,8 +2,10 @@ Contains the Concatenator class. """ +import inspect import time +from modules.logger import logger from .. import intermediate_struct @@ -12,13 +14,15 @@ class Concatenator: Concatenates a prefix and suffix to the object. """ - def __init__(self, prefix: str, suffix: str) -> None: + def __init__(self, prefix: str, suffix: str, local_logger: logger.Logger) -> None: """ Constructor sets the prefix and suffix. """ self.__prefix = prefix self.__suffix = suffix + self.__logger = local_logger + # The working function def run_concatenation( self, middle: intermediate_struct.IntermediateStruct @@ -26,6 +30,10 @@ def run_concatenation( """ Concatenate the prefix and suffix to the input. 
""" + # Log + frame = inspect.currentframe() + self.__logger.debug("Run", frame) + # The class is responsible for unpacking the intermediate type # Validate input input_number = middle.number @@ -34,7 +42,7 @@ def run_concatenation( # Function returns result and the output return False, "" - # Print string + # String to be printed concatenated_string = self.__prefix + str(input_number) + self.__suffix # Pretending this is hard at work diff --git a/documentation/multiprocess_example/concatenator/concatenator_worker.py b/documentation/multiprocess_example/concatenator/concatenator_worker.py index 8bf2d27d..4b5acf56 100644 --- a/documentation/multiprocess_example/concatenator/concatenator_worker.py +++ b/documentation/multiprocess_example/concatenator/concatenator_worker.py @@ -2,6 +2,11 @@ Ending worker that concatenates a prefix and suffix and then prints the result. """ +import inspect +import os +import pathlib + +from modules.logger import logger from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller from . import concatenator @@ -20,8 +25,22 @@ def concatenator_worker( input_queue is the data queue. controller is how the main process communicates to this worker process. """ + # Instantiate logger + worker_name = pathlib.Path(__file__).stem + process_id = os.getpid() + result, local_logger = logger.Logger.create(f"{worker_name}_{process_id}", True) + if not result: + print("ERROR: Worker failed to create logger") + return + + # Get Pylance to stop complaining + assert local_logger is not None + + frame = inspect.currentframe() + local_logger.info("Logger initialized", frame) + # Instantiate class object - concatenator_instance = concatenator.Concatenator(prefix, suffix) + concatenator_instance = concatenator.Concatenator(prefix, suffix, local_logger) # Loop forever until exit has been requested or sentinel value (consumer) while not controller.is_exit_requested(): @@ -46,5 +65,5 @@ def concatenator_worker( if not result: continue - # Print the string - print(value) + # Print just the string + local_logger.info(str(value), None) diff --git a/documentation/multiprocess_example/countup/countup.py b/documentation/multiprocess_example/countup/countup.py index 029cef97..e6bc62b5 100644 --- a/documentation/multiprocess_example/countup/countup.py +++ b/documentation/multiprocess_example/countup/countup.py @@ -2,15 +2,20 @@ Contains the Countup class. """ +import inspect import time +from modules.logger import logger + class Countup: """ Increments its internal counter and outputs current counter. """ - def __init__(self, start_thousands: int, max_iterations: int) -> None: + def __init__( + self, start_thousands: int, max_iterations: int, local_logger: logger.Logger + ) -> None: """ Constructor initializes the start and max points. """ @@ -18,10 +23,16 @@ def __init__(self, start_thousands: int, max_iterations: int) -> None: self.__max_count = self.__start_count + max_iterations self.__current_count = self.__start_count + self.__logger = local_logger + def run_countup(self) -> "tuple[bool, int]": """ Counts upward. 
""" + # Log + frame = inspect.currentframe() + self.__logger.debug("Run", frame) + # Increment counter self.__current_count += 1 if self.__current_count > self.__max_count: diff --git a/documentation/multiprocess_example/countup/countup_worker.py b/documentation/multiprocess_example/countup/countup_worker.py index c9e7c0c0..54497a36 100644 --- a/documentation/multiprocess_example/countup/countup_worker.py +++ b/documentation/multiprocess_example/countup/countup_worker.py @@ -2,6 +2,11 @@ Beginning worker that counts up from a starting value. """ +import inspect +import os +import pathlib + +from modules.logger import logger from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller from . import countup @@ -22,8 +27,22 @@ def countup_worker( output_queue is the data queue. worker_manager is how the main process communicates to this worker process. """ + # Instantiate logger + worker_name = pathlib.Path(__file__).stem + process_id = os.getpid() + result, local_logger = logger.Logger.create(f"{worker_name}_{process_id}", True) + if not result: + print("ERROR: Worker failed to create logger") + return + + # Get Pylance to stop complaining + assert local_logger is not None + + frame = inspect.currentframe() + local_logger.info("Logger initialized", frame) + # Instantiate class object - countup_instance = countup.Countup(start_thousands, max_iterations) + countup_instance = countup.Countup(start_thousands, max_iterations, local_logger) # Loop forever until exit has been requested (producer) while not controller.is_exit_requested(): diff --git a/main_2024.py b/main_2024.py index 3c2b6873..991e9593 100644 --- a/main_2024.py +++ b/main_2024.py @@ -21,7 +21,8 @@ from modules.data_merge import data_merge_worker from modules.geolocation import geolocation_worker from modules.geolocation import camera_properties -from modules.logger import logger +from modules.logger import logger_setup_main +from utilities import yaml from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller from utilities.workers import worker_manager @@ -34,21 +35,6 @@ def main() -> int: """ Main function. 
""" - # Open config file - try: - with CONFIG_FILE_PATH.open("r", encoding="utf8") as file: - try: - config = yaml.safe_load(file) - except yaml.YAMLError as exc: - print(f"Error parsing YAML file: {exc}") - return -1 - except FileNotFoundError: - print(f"File not found: {CONFIG_FILE_PATH}") - return -1 - except IOError as exc: - print(f"Error when opening file: {exc}") - return -1 - # Parse whether or not to force cpu from command line parser = argparse.ArgumentParser() parser.add_argument("--cpu", action="store_true", help="option to force cpu") @@ -60,20 +46,37 @@ def main() -> int: ) args = parser.parse_args() - # Set constants + # Configuration settings + result, config = yaml.open_config(CONFIG_FILE_PATH) + if not result: + print("ERROR: Failed to load configuration file") + return -1 + + assert config is not None + + # Setup main logger + result, main_logger = logger_setup_main.setup_main_logger(config) + if not result: + print("ERROR: Failed to create main logger") + return -1 + + assert main_logger is not None + + # Get settings try: # Local constants # pylint: disable=invalid-name QUEUE_MAX_SIZE = config["queue_max_size"] - LOG_DIRECTORY_PATH = config["logger"]["directory_path"] - start_time = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + log_directory_path = config["logger"]["directory_path"] + log_path_format = config["logger"]["file_datetime_format"] + start_time = datetime.datetime.now().strftime(log_path_format) VIDEO_INPUT_CAMERA_NAME = config["video_input"]["camera_name"] VIDEO_INPUT_WORKER_PERIOD = config["video_input"]["worker_period"] VIDEO_INPUT_SAVE_NAME_PREFIX = config["video_input"]["save_prefix"] VIDEO_INPUT_SAVE_PREFIX = ( - f"{LOG_DIRECTORY_PATH}/{start_time}/{VIDEO_INPUT_SAVE_NAME_PREFIX}" + f"{log_directory_path}/{start_time}/{VIDEO_INPUT_SAVE_NAME_PREFIX}" ) DETECT_TARGET_WORKER_COUNT = config["detect_target"]["worker_count"] @@ -82,7 +85,7 @@ def main() -> int: DETECT_TARGET_OVERRIDE_FULL_PRECISION = args.full DETECT_TARGET_SAVE_NAME_PREFIX = config["detect_target"]["save_prefix"] DETECT_TARGET_SAVE_PREFIX = ( - f"{LOG_DIRECTORY_PATH}/{start_time}/{DETECT_TARGET_SAVE_NAME_PREFIX}" + f"{log_directory_path}/{start_time}/{DETECT_TARGET_SAVE_NAME_PREFIX}" ) DETECT_TARGET_SHOW_ANNOTATED = args.show_annotated @@ -104,17 +107,10 @@ def main() -> int: GEOLOCATION_CAMERA_ORIENTATION_PITCH = config["geolocation"]["camera_orientation_pitch"] GEOLOCATION_CAMERA_ORIENTATION_ROLL = config["geolocation"]["camera_orientation_roll"] # pylint: enable=invalid-name - except KeyError: - print("Config key(s) not found") - return -1 - - pathlib.Path(LOG_DIRECTORY_PATH).mkdir(exist_ok=True) - pathlib.Path(f"{LOG_DIRECTORY_PATH}/{start_time}").mkdir() - - result, main_logger = logger.Logger.create("main") - if result: + except KeyError as exception: frame = inspect.currentframe() - main_logger.info("main logger initialized", frame) + main_logger.error(f"ERROR: Config key(s) not found: {exception}", frame) + return -1 # Setup controller = worker_controller.WorkerController() @@ -204,7 +200,8 @@ def main() -> int: GEOLOCATION_FOV_Y, ) if not result: - print("Error creating camera intrinsics") + frame = inspect.currentframe() + main_logger.error("Error creating camera intrinsics", frame) return -1 result, camera_extrinsics = camera_properties.CameraDroneExtrinsics.create( @@ -220,7 +217,8 @@ def main() -> int: ), ) if not result: - print("Error creating camera extrinsics") + frame = inspect.currentframe() + main_logger.error("Error creating camera extrinsics", frame) return -1 
geolocation_manager = worker_manager.WorkerManager() @@ -251,14 +249,22 @@ def main() -> int: if geolocation_data is not None: for detection_world in geolocation_data: - print("geolocation vertices: " + str(detection_world.vertices.tolist())) - print("geolocation centre: " + str(detection_world.centre.tolist())) - print("geolocation label: " + str(detection_world.label)) - print("geolocation confidence: " + str(detection_world.confidence)) - print("") + frame = inspect.currentframe() + main_logger.debug("Detection in world:", frame) + main_logger.debug( + "geolocation vertices: " + str(detection_world.vertices.tolist()), frame + ) + main_logger.debug( + "geolocation centre: " + str(detection_world.centre.tolist()), frame + ) + main_logger.debug("geolocation label: " + str(detection_world.label), frame) + main_logger.debug( + "geolocation confidence: " + str(detection_world.confidence), frame + ) if cv2.waitKey(1) == ord("q"): # type: ignore - print("Exiting main loop") + frame = inspect.currentframe() + main_logger.info("Exiting main loop", frame) break # Teardown diff --git a/modules/flight_interface/flight_interface.py b/modules/flight_interface/flight_interface.py index 8e63b3a0..253ff7dc 100644 --- a/modules/flight_interface/flight_interface.py +++ b/modules/flight_interface/flight_interface.py @@ -20,27 +20,17 @@ class FlightInterface: @classmethod def create( - cls, address: str, timeout_home: float, baud_rate: int + cls, address: str, timeout_home: float, baud_rate: int, local_logger: logger.Logger ) -> "tuple[bool, FlightInterface | None]": """ address: TCP address or port. timeout_home: Timeout for home location in seconds. baud_rate: Baud rate for the connection. """ - result, flight_interface_logger = logger.Logger.create("flight_interface") - if not result: - return False, None - - # Get Pylance to stop complaining - assert flight_interface_logger is not None - - frame = inspect.currentframe() - flight_interface_logger.info("flight interface logger initialized", frame) - result, controller = flight_controller.FlightController.create(address, baud_rate) if not result: frame = inspect.currentframe() - flight_interface_logger.error("controller could not be created", frame) + local_logger.error("controller could not be created", frame) return False, None # Get Pylance to stop complaining @@ -49,22 +39,20 @@ def create( result, home_location = controller.get_home_location(timeout_home) if not result: frame = inspect.currentframe() - flight_interface_logger.error("home_location could not be created", frame) + local_logger.error("home_location could not be created", frame) return False, None # Get Pylance to stop complaining assert home_location is not None - return True, FlightInterface( - cls.__create_key, controller, home_location, flight_interface_logger - ) + return True, FlightInterface(cls.__create_key, controller, home_location, local_logger) def __init__( self, class_private_create_key: object, controller: flight_controller.FlightController, home_location: drone_odometry.DronePosition, - flight_interface_logger: logger.Logger, + local_logger: logger.Logger, ) -> None: """ Private constructor, use create() method. 
@@ -73,10 +61,10 @@ def __init__( self.controller = controller self.__home_location = home_location - self.__logger = flight_interface_logger + self.__logger = local_logger frame = inspect.currentframe() - self.__logger.info(self.__home_location, frame) + self.__logger.info(str(self.__home_location), frame) def run(self) -> "tuple[bool, odometry_and_time.OdometryAndTime | None]": """ diff --git a/modules/flight_interface/flight_interface_worker.py b/modules/flight_interface/flight_interface_worker.py index 69af414d..a4175dd7 100644 --- a/modules/flight_interface/flight_interface_worker.py +++ b/modules/flight_interface/flight_interface_worker.py @@ -2,11 +2,15 @@ Gets odometry information from drone. """ +import inspect +import os +import pathlib import time from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller from . import flight_interface +from ..logger import logger def flight_interface_worker( @@ -26,11 +30,26 @@ def flight_interface_worker( controller is how the main process communicates to this worker process. """ # TODO: Error handling - # TODO: Logging - result, interface = flight_interface.FlightInterface.create(address, timeout, baud_rate) + worker_name = pathlib.Path(__file__).stem + process_id = os.getpid() + result, local_logger = logger.Logger.create(f"{worker_name}_{process_id}", True) if not result: - print("ERROR: Worker failed to create class object") + print("ERROR: Worker failed to create logger") + return + + # Get Pylance to stop complaining + assert local_logger is not None + + frame = inspect.currentframe() + local_logger.info("Logger initialized", frame) + + result, interface = flight_interface.FlightInterface.create( + address, timeout, baud_rate, local_logger + ) + if not result: + frame = inspect.currentframe() + local_logger.error("Worker failed to create class object", frame) return # Get Pylance to stop complaining diff --git a/modules/logger/logger.py b/modules/logger/logger.py index 3cc484b1..6a27e375 100644 --- a/modules/logger/logger.py +++ b/modules/logger/logger.py @@ -7,9 +7,13 @@ import logging import pathlib import os + +# Used in type annotation of logger parameters +# pylint: disable-next=unused-import import types -import typing -import yaml + +from utilities import yaml + CONFIG_FILE_PATH = pathlib.Path("config.yaml") @@ -22,69 +26,68 @@ class Logger: __create_key = object() @classmethod - def create(cls, name: str) -> "tuple[bool, Logger | None]": + def create(cls, name: str, enable_log_to_file: bool) -> "tuple[bool, Logger | None]": """ Create and configure a logger. """ - - # Open config file. 
- try: - with CONFIG_FILE_PATH.open("r", encoding="utf8") as file: - try: - config = yaml.safe_load(file) - except yaml.YAMLError as exc: - print(f"Error parsing YAML file: {exc}") - return -1 - except FileNotFoundError: - print(f"File not found: {CONFIG_FILE_PATH}") - return False, None - except IOError as exc: - print(f"Error when opening file: {exc}") + # Configuration settings + result, config = yaml.open_config(CONFIG_FILE_PATH) + if not result: + print("ERROR: Failed to load configuration file") return False, None + assert config is not None + try: log_directory_path = config["logger"]["directory_path"] file_datetime_format = config["logger"]["file_datetime_format"] logger_format = config["logger"]["format"] logger_datetime_format = config["logger"]["datetime_format"] - except KeyError: - print("Config key(s) not found") + except KeyError as exception: + print(f"Config key(s) not found: {exception}") return False, None - # Get the path to the logs directory. - entries = os.listdir(log_directory_path) - log_names = [ - entry for entry in entries if os.path.isdir(os.path.join(log_directory_path, entry)) - ] - - # Find the log directory for the current run, which is the most recent timestamp. - log_path = max( - log_names, - key=lambda datetime_string: datetime.datetime.strptime( - datetime_string, file_datetime_format - ), - ) - - filename = f"{log_directory_path}/{log_path}/{name}.log" - - # Formatting configurations for the logger. - file_handler = logging.FileHandler(filename=filename, mode="w") # Handles logging to file. - stream_handler = logging.StreamHandler() # Handles logging to terminal. + # Create a unique logger instance + logger = logging.getLogger(name) + logger.setLevel(logging.DEBUG) formatter = logging.Formatter( fmt=logger_format, datefmt=logger_datetime_format, ) - file_handler.setFormatter(formatter) + # Handles logging to terminal + stream_handler = logging.StreamHandler() stream_handler.setFormatter(formatter) - - # Create a unique logger instance and configure it. - logger = logging.getLogger(name) - logger.setLevel(logging.DEBUG) - logger.addHandler(file_handler) logger.addHandler(stream_handler) + # Handles logging to file + if enable_log_to_file: + # Get the path to the logs directory. + entries = os.listdir(log_directory_path) + + if len(entries) == 0: + print("ERROR: Must create a new log directory for this run before starting logger") + return False, None + + log_names = [ + entry for entry in entries if os.path.isdir(os.path.join(log_directory_path, entry)) + ] + + # Find the log directory for the current run, which is the most recent timestamp. + log_path = max( + log_names, + key=lambda datetime_string: datetime.datetime.strptime( + datetime_string, file_datetime_format + ), + ) + + filepath = pathlib.Path(log_directory_path, log_path, f"{name}.log") + + file_handler = logging.FileHandler(filename=filepath, mode="w") + file_handler.setFormatter(formatter) + logger.addHandler(file_handler) + return True, Logger(cls.__create_key, logger) def __init__(self, class_create_private_key: object, logger: logging.Logger) -> None: @@ -96,45 +99,50 @@ def __init__(self, class_create_private_key: object, logger: logging.Logger) -> self.logger = logger @staticmethod - def message_and_metadata(message: str, frame: typing.Optional[types.FrameType]) -> str: + def message_and_metadata(message: str, frame: "types.FrameType | None") -> str: """ Extracts metadata from frame and appends it to the message. 
""" + if frame is None: + return message + + assert frame is not None + function_name = frame.f_code.co_name filename = frame.f_code.co_filename line_number = inspect.getframeinfo(frame).lineno return f"[{filename} | {function_name} | {line_number}] {message}" - def debug(self, message: str, frame: typing.Optional[types.FrameType]) -> None: + def debug(self, message: str, frame: "types.FrameType | None") -> None: """ Logs a debug level message. """ message = self.message_and_metadata(message, frame) self.logger.debug(message) - def info(self, message: str, frame: typing.Optional[types.FrameType]) -> None: + def info(self, message: str, frame: "types.FrameType | None") -> None: """ Logs an info level message. """ message = self.message_and_metadata(message, frame) self.logger.info(message) - def warning(self, message: str, frame: typing.Optional[types.FrameType]) -> None: + def warning(self, message: str, frame: "types.FrameType | None") -> None: """ Logs a warning level message. """ message = self.message_and_metadata(message, frame) self.logger.warning(message) - def error(self, message: str, frame: typing.Optional[types.FrameType]) -> None: + def error(self, message: str, frame: "types.FrameType | None") -> None: """ Logs an error level message. """ message = self.message_and_metadata(message, frame) self.logger.error(message) - def critical(self, message: str, frame: typing.Optional[types.FrameType]) -> None: + def critical(self, message: str, frame: "types.FrameType | None") -> None: """ Logs a critical level message. """ diff --git a/modules/logger/logger_setup_main.py b/modules/logger/logger_setup_main.py new file mode 100644 index 00000000..cbcfd211 --- /dev/null +++ b/modules/logger/logger_setup_main.py @@ -0,0 +1,47 @@ +""" +Logger setup for `main()` . +""" + +import datetime +import inspect +import pathlib + +from . import logger + + +MAIN_LOGGER_NAME = "main" + + +def setup_main_logger(config: "dict") -> "tuple[bool, logger.Logger | None]": + """ + Setup prerequisites for logging in `main()` . + + config: The configuration. + + Returns: Success, logger. + """ + # Get settings + try: + log_directory_path = config["logger"]["directory_path"] + log_path_format = config["logger"]["file_datetime_format"] + start_time = datetime.datetime.now().strftime(log_path_format) + except KeyError as exception: + print(f"ERROR: Config key(s) not found: {exception}") + return False, None + + # Create logging directory + pathlib.Path(log_directory_path).mkdir(exist_ok=True) + pathlib.Path(log_directory_path, start_time).mkdir() + + # Setup logger + result, main_logger = logger.Logger.create(MAIN_LOGGER_NAME, True) + if not result: + print("ERROR: Failed to create main logger") + return False, None + + assert main_logger is not None + + frame = inspect.currentframe() + main_logger.info(f"{MAIN_LOGGER_NAME} logger initialized", frame) + + return True, main_logger diff --git a/tests/integration/test_flight_interface_hardware.py b/tests/integration/test_flight_interface_hardware.py index 191007dd..b7c63640 100644 --- a/tests/integration/test_flight_interface_hardware.py +++ b/tests/integration/test_flight_interface_hardware.py @@ -2,7 +2,10 @@ Simple hardware test, requires drone connection. """ +import pathlib + from modules.flight_interface import flight_interface +from modules.logger import logger MAVLINK_CONNECTION_ADDRESS = "tcp:localhost:14550" @@ -14,11 +17,18 @@ def main() -> int: """ Main function. 
""" + # Logger + test_name = pathlib.Path(__file__).stem + result, local_logger = logger.Logger.create(test_name, False) + assert result + assert local_logger is not None + # Setup result, interface = flight_interface.FlightInterface.create( MAVLINK_CONNECTION_ADDRESS, FLIGHT_INTERFACE_TIMEOUT, FLIGHT_INTERFACE_BAUD_RATE, + local_logger, ) assert result assert interface is not None diff --git a/utilities/yaml.py b/utilities/yaml.py new file mode 100644 index 00000000..2377a5e1 --- /dev/null +++ b/utilities/yaml.py @@ -0,0 +1,25 @@ +""" +For YAML files. +""" + +import pathlib +import yaml + + +def open_config(file_path: pathlib.Path) -> "tuple[bool, dict | None]": + """ + Open and decode YAML file. + """ + try: + with file_path.open("r", encoding="utf8") as file: + try: + config = yaml.safe_load(file) + return True, config + except yaml.YAMLError as exception: + print(f"ERROR: Could not parse YAML file: {exception}") + except FileNotFoundError as exception: + print(f"ERROR: YAML file not found: {exception}") + except IOError as exception: + print(f"ERROR: Could not open file: {exception}") + + return False, None