Skip to content

Commit

Permalink
More robust logger with file logging as an option (#186)
Browse files Browse the repository at this point in the history
* More robust logger with file logging as an option

* Clearer worker logging template
  • Loading branch information
Xierumeng authored Jul 8, 2024
1 parent 8f5f0fc commit 18ffc37
Show file tree
Hide file tree
Showing 14 changed files with 353 additions and 123 deletions.
43 changes: 41 additions & 2 deletions documentation/main_multiprocess_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,23 @@
```
"""

import inspect
import multiprocessing as mp
import pathlib
import time

from documentation.multiprocess_example.add_random import add_random_worker
from documentation.multiprocess_example.concatenator import concatenator_worker
from documentation.multiprocess_example.countup import countup_worker
from modules.logger import logger_setup_main
from utilities import yaml
from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller
from utilities.workers import worker_manager


CONFIG_FILE_PATH = pathlib.Path("config.yaml")

# Play with these numbers to see queue bottlenecks
COUNTUP_TO_ADD_RANDOM_QUEUE_MAX_SIZE = 5
ADD_RANDOM_TO_CONCATENATOR_QUEUE_MAX_SIZE = 5
Expand All @@ -26,6 +32,22 @@ def main() -> int:
"""
Main function.
"""
# Configuration settings
result, config = yaml.open_config(CONFIG_FILE_PATH)
if not result:
print("ERROR: Failed to load configuration file")
return -1

assert config is not None

# Setup main logger
result, main_logger = logger_setup_main.setup_main_logger(config)
if not result:
print("ERROR: Failed to create main logger")
return -1

assert main_logger is not None

# Main is managing all worker processes and is responsible
# for creating supporting interprocess communication
controller = worker_controller.WorkerController()
Expand Down Expand Up @@ -125,27 +147,44 @@ def main() -> int:
add_random_manager.start_workers()
concatenator_manager.start_workers()

frame = inspect.currentframe()
main_logger.info("Started", frame)

# Run for some time and then pause
time.sleep(2)
controller.request_pause()
print("Paused")

frame = inspect.currentframe()
main_logger.info("Paused", frame)

time.sleep(4)
print("Resumed")
controller.request_resume()
frame = inspect.currentframe()
main_logger.info("Resumed", frame)

time.sleep(2)

# Stop the processes
controller.request_exit()

frame = inspect.currentframe()
main_logger.info("Requested exit", frame)

# Fill and drain queues from END TO START
countup_to_add_random_queue.fill_and_drain_queue()
add_random_to_concatenator_queue.fill_and_drain_queue()

frame = inspect.currentframe()
main_logger.info("Queues cleared", frame)

# Clean up worker processes
countup_manager.join_workers()
add_random_manager.join_workers()
concatenator_manager.join_workers()

frame = inspect.currentframe()
main_logger.info("Stopped", frame)

# We can reset controller in case we want to reuse it
# Alternatively, create a new WorkerController instance
controller.clear_exit()
Expand Down
14 changes: 12 additions & 2 deletions documentation/multiprocess_example/add_random/add_random.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,24 @@
Contains the AddRandom class.
"""

import inspect
import time
import random

from modules.logger import logger
from .. import intermediate_struct


class AddRandom:
"""
Adds a random number to the input.
A new random number is generated every `__ADD_SWITCH_COUNT` times.
A new random number is generated every `__add_change_count` times.
"""

def __init__(self, seed: int, max_random_term: int, add_change_count: int) -> None:
def __init__(
self, seed: int, max_random_term: int, add_change_count: int, local_logger: logger.Logger
) -> None:
"""
Constructor seeds the RNG and sets the max add and
number of adds before a new random number is chosen.
Expand All @@ -30,6 +34,8 @@ def __init__(self, seed: int, max_random_term: int, add_change_count: int) -> No
self.__current_random_term = self.__generate_random_number(0, self.__max_random_term)
self.__add_count = 0

self.__logger = local_logger

@staticmethod
def __generate_random_number(min_value: int, max_value: int) -> int:
"""
Expand All @@ -41,6 +47,10 @@ def run_add_random(self, term: int) -> "tuple[bool, intermediate_struct.Intermed
"""
Adds a random number to the input and returns the sum.
"""
# Log
frame = inspect.currentframe()
self.__logger.debug("Run", frame)

add_sum = term + self.__current_random_term

# Change the random term if the add count has been reached
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
Intermediate worker that adds a random number to the input.
"""

import inspect
import os
import pathlib

from modules.logger import logger
from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller
from . import add_random
Expand All @@ -22,8 +27,24 @@ def add_random_worker(
input_queue and output_queue are the data queues.
controller is how the main process communicates to this worker process.
"""
# Instantiate logger
worker_name = pathlib.Path(__file__).stem
process_id = os.getpid()
result, local_logger = logger.Logger.create(f"{worker_name}_{process_id}", True)
if not result:
print("ERROR: Worker failed to create logger")
return

# Get Pylance to stop complaining
assert local_logger is not None

frame = inspect.currentframe()
local_logger.info("Logger initialized", frame)

# Instantiate class object
add_random_instance = add_random.AddRandom(seed, max_random_term, add_change_count)
add_random_instance = add_random.AddRandom(
seed, max_random_term, add_change_count, local_logger
)

# Loop forever until exit has been requested or sentinel value (consumer)
while not controller.is_exit_requested():
Expand Down
12 changes: 10 additions & 2 deletions documentation/multiprocess_example/concatenator/concatenator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
Contains the Concatenator class.
"""

import inspect
import time

from modules.logger import logger
from .. import intermediate_struct


Expand All @@ -12,20 +14,26 @@ class Concatenator:
Concatenates a prefix and suffix to the object.
"""

def __init__(self, prefix: str, suffix: str) -> None:
def __init__(self, prefix: str, suffix: str, local_logger: logger.Logger) -> None:
    """
    Stores the strings to wrap around each input, plus the logger this
    instance reports through.
    """
    # Keep the logger first so later setup steps could log if ever added
    self.__logger = local_logger
    self.__prefix = prefix
    self.__suffix = suffix

# The working function
def run_concatenation(
self, middle: intermediate_struct.IntermediateStruct
) -> "tuple[bool, str]":
"""
Concatenate the prefix and suffix to the input.
"""
# Log
frame = inspect.currentframe()
self.__logger.debug("Run", frame)

# The class is responsible for unpacking the intermediate type
# Validate input
input_number = middle.number
Expand All @@ -34,7 +42,7 @@ def run_concatenation(
# Function returns result and the output
return False, ""

# Print string
# String to be printed
concatenated_string = self.__prefix + str(input_number) + self.__suffix

# Pretending this is hard at work
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
Ending worker that concatenates a prefix and suffix and then prints the result.
"""

import inspect
import os
import pathlib

from modules.logger import logger
from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller
from . import concatenator
Expand All @@ -20,8 +25,22 @@ def concatenator_worker(
input_queue is the data queue.
controller is how the main process communicates to this worker process.
"""
# Instantiate logger
worker_name = pathlib.Path(__file__).stem
process_id = os.getpid()
result, local_logger = logger.Logger.create(f"{worker_name}_{process_id}", True)
if not result:
print("ERROR: Worker failed to create logger")
return

# Get Pylance to stop complaining
assert local_logger is not None

frame = inspect.currentframe()
local_logger.info("Logger initialized", frame)

# Instantiate class object
concatenator_instance = concatenator.Concatenator(prefix, suffix)
concatenator_instance = concatenator.Concatenator(prefix, suffix, local_logger)

# Loop forever until exit has been requested or sentinel value (consumer)
while not controller.is_exit_requested():
Expand All @@ -46,5 +65,5 @@ def concatenator_worker(
if not result:
continue

# Print the string
print(value)
# Print just the string
local_logger.info(str(value), None)
13 changes: 12 additions & 1 deletion documentation/multiprocess_example/countup/countup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,37 @@
Contains the Countup class.
"""

import inspect
import time

from modules.logger import logger


class Countup:
"""
Increments its internal counter and outputs current counter.
"""

def __init__(self, start_thousands: int, max_iterations: int) -> None:
def __init__(
    self, start_thousands: int, max_iterations: int, local_logger: logger.Logger
) -> None:
    """
    Sets up the counting range (start, maximum, current) and stores the
    logger this instance reports through.
    """
    # start_thousands is given in units of 1000 counts
    initial_count = start_thousands * 1000

    self.__start_count = initial_count
    self.__max_count = initial_count + max_iterations
    self.__current_count = initial_count

    self.__logger = local_logger

def run_countup(self) -> "tuple[bool, int]":
"""
Counts upward.
"""
# Log
frame = inspect.currentframe()
self.__logger.debug("Run", frame)

# Increment counter
self.__current_count += 1
if self.__current_count > self.__max_count:
Expand Down
21 changes: 20 additions & 1 deletion documentation/multiprocess_example/countup/countup_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
Beginning worker that counts up from a starting value.
"""

import inspect
import os
import pathlib

from modules.logger import logger
from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller
from . import countup
Expand All @@ -22,8 +27,22 @@ def countup_worker(
output_queue is the data queue.
worker_manager is how the main process communicates to this worker process.
"""
# Instantiate logger
worker_name = pathlib.Path(__file__).stem
process_id = os.getpid()
result, local_logger = logger.Logger.create(f"{worker_name}_{process_id}", True)
if not result:
print("ERROR: Worker failed to create logger")
return

# Get Pylance to stop complaining
assert local_logger is not None

frame = inspect.currentframe()
local_logger.info("Logger initialized", frame)

# Instantiate class object
countup_instance = countup.Countup(start_thousands, max_iterations)
countup_instance = countup.Countup(start_thousands, max_iterations, local_logger)

# Loop forever until exit has been requested (producer)
while not controller.is_exit_requested():
Expand Down
Loading

0 comments on commit 18ffc37

Please sign in to comment.