Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added multiprocessing logging module #169

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions main_2024.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""

import argparse
import inspect
import multiprocessing as mp
import pathlib
import queue
Expand All @@ -17,6 +18,8 @@
from modules.flight_interface import flight_interface_worker
from modules.video_input import video_input_worker
from modules.data_merge import data_merge_worker
from modules.multiprocess_logging import multiprocess_logging_worker
from modules.multiprocess_logging import multiprocess_logging
from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller
from utilities.workers import worker_manager
Expand Down Expand Up @@ -92,6 +95,9 @@ def main() -> int:
controller = worker_controller.WorkerController()

mp_manager = mp.Manager()

logging_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager, QUEUE_MAX_SIZE)

video_input_to_detect_target_queue = queue_proxy_wrapper.QueueProxyWrapper(
mp_manager,
QUEUE_MAX_SIZE,
Expand All @@ -109,6 +115,16 @@ def main() -> int:
QUEUE_MAX_SIZE,
)

logging_manager = worker_manager.WorkerManager()
logging_manager.create_workers(
1,
multiprocess_logging_worker.multiprocess_logging_worker,
(
logging_queue,
controller,
),
)

video_input_manager = worker_manager.WorkerManager()
video_input_manager.create_workers(
1,
Expand All @@ -118,6 +134,7 @@ def main() -> int:
VIDEO_INPUT_WORKER_PERIOD,
VIDEO_INPUT_SAVE_PREFIX,
video_input_to_detect_target_queue,
logging_queue,
controller,
),
)
Expand Down Expand Up @@ -165,12 +182,24 @@ def main() -> int:
)

# Run
logging_manager.start_workers()
video_input_manager.start_workers()
detect_target_manager.start_workers()
flight_interface_manager.start_workers()
data_merge_manager.start_workers()

try:
frame = inspect.currentframe()
multiprocess_logging.log_message(
"workers started", multiprocess_logging.DEBUG, frame, logging_queue
)
# logging_queue.queue.put((multiprocess_logging.message_and_metadata('workers started', frame), logging.INFO), block=False)
except queue.Full:
pass

count = 0
while True:
count += 1
try:
merged_data = data_merge_to_main_queue.queue.get_nowait()
except queue.Empty:
Expand Down Expand Up @@ -200,11 +229,13 @@ def main() -> int:
# Teardown
controller.request_exit()

logging_queue.fill_and_drain_queue()
video_input_to_detect_target_queue.fill_and_drain_queue()
detect_target_to_data_merge_queue.fill_and_drain_queue()
flight_interface_to_data_merge_queue.fill_and_drain_queue()
data_merge_to_main_queue.fill_and_drain_queue()

logging_manager.join_workers()
video_input_manager.join_workers()
detect_target_manager.join_workers()
flight_interface_manager.join_workers()
Expand Down
2 changes: 1 addition & 1 deletion modules/common
Submodule common updated 46 files
+0 −44 .github/workflows/run-tests.yml
+33 −0 .gitignore
+3 −6 camera/test_camera.py
+51 −0 comms/TelemMessages/GroundStation.lcm
+24 −0 comms/TelemMessages/common.lcm
+25 −0 comms/TelemMessages/jetson.lcm
+60 −0 comms/TelemMessages/messages.json
+31 −0 comms/Tools/build_messages.sh
+10 −0 comms/Tools/install_lcm.sh
+12 −0 comms/Tools/list_usb.sh
+15 −0 comms/Tools/message_include.sh
+187 −0 comms/Tools/py_gen_helpers.py
+86 −0 comms/modules/TelemMessages/GroundStationData.py
+74 −0 comms/modules/TelemMessages/GroundStationDisarm.py
+80 −0 comms/modules/TelemMessages/GroundStationPIDSetResponse.py
+81 −0 comms/modules/TelemMessages/GroundStationPIDValues.py
+83 −0 comms/modules/TelemMessages/GroundStationWaypoints.py
+70 −0 comms/modules/TelemMessages/Header.py
+74 −0 comms/modules/TelemMessages/JetsonLandingInitiationCommand.py
+74 −0 comms/modules/TelemMessages/JetsonMovementRequest.py
+77 −0 comms/modules/TelemMessages/JetsonOdometryData.py
+78 −0 comms/modules/TelemMessages/JetsonRelativeMovementCommand.py
+71 −0 comms/modules/TelemMessages/PIDController.py
+66 −0 comms/modules/TelemMessages/PIDValues.py
+77 −0 comms/modules/TelemMessages/SensorData.py
+67 −0 comms/modules/TelemMessages/Waypoint.py
+19 −0 comms/modules/TelemMessages/__init__.py
+0 −0 comms/modules/__init__.py
+89 −0 comms/modules/generic_comms_device.py
+61 −0 comms/modules/helper.py
+21 −0 comms/receive.py
+28 −0 comms/test_tx.py
+19 −0 comms/transmit.py
+0 −0 kml/__init__.py
+3 −3 kml/expected_document.kml
+0 −50 kml/modules/ground_locations_to_kml.py
+0 −54 kml/modules/location_ground.py
+45 −0 kml/modules/waypoints_to_kml.py
+0 −55 kml/test_ground_locations_to_kml.py
+45 −0 kml/test_waypoints_to_kml.py
+0 −0 mavlink/__init__.py
+0 −96 mavlink/modules/flight_controller.py
+1 −10 mavlink/test_flight_controller.py
+0 −140 mavlink/test_mission_ended.py
+0 −0 qr/__init__.py
+8 −11 qr/test_qr.py
Empty file.
20 changes: 20 additions & 0 deletions modules/multiprocess_logging/multiprocess_logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import inspect
import logging

DEBUG = logging.DEBUG
INFO = logging.INFO
WARNING = logging.WARNING
ERROR = logging.ERROR
CRITICAL = logging.CRITICAL
Comment on lines +4 to +8
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this so so that modules trying to log don't have to import the logging module just for the logging levels

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Can we create a separate file and store these values as an enum?



def message_and_metadata(message, frame):
function_name = frame.f_code.co_name
filename = frame.f_code.co_filename
line_number = inspect.getframeinfo(frame).lineno

return f"[{filename} | {function_name} | {line_number}] {message}"


def log_message(message, level, frame, queue):
queue.queue.put((message_and_metadata(message, frame), level), block=False)
49 changes: 49 additions & 0 deletions modules/multiprocess_logging/multiprocess_logging_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import datetime
import logging
import queue

from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller


def multiprocess_logging_worker(
input_queue: queue_proxy_wrapper.QueueProxyWrapper,
controller: worker_controller.WorkerController,
):
logger = logging.getLogger("airside_logger")

filename = f"logs/{datetime.datetime.now().strftime('%Y-%m-%d_%H:%M:%S')}.log"
file_handler = logging.FileHandler(filename=filename, mode="w") # Handles logging to file
stream_handler = logging.StreamHandler() # Handles logging to terminal

formatter = logging.Formatter(
fmt="%(asctime)s: [%(levelname)s] %(message)s",
datefmt="%I:%M:%S",
)

file_handler.setFormatter(formatter)
stream_handler.setFormatter(formatter)

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(file_handler)
logger.addHandler(stream_handler)

while not controller.is_exit_requested():
controller.check_pause()

logging_message, level = input_queue.queue.get()

if logging_message is None:
break

if level == logging.DEBUG:
logger.debug(f"{logging_message}")
elif level == logging.INFO:
logger.info(f"{logging_message}")
elif level == logging.WARNING:
logger.warning(f"{logging_message}")
elif level == logging.ERROR:
logger.error(f"{logging_message}")
elif level == logging.CRITICAL:
logger.critical(f"{logging_message}")
23 changes: 22 additions & 1 deletion modules/video_input/video_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
Combines image and timestamp together.
"""

import inspect
import queue

from utilities.workers import queue_proxy_wrapper
from ..common.camera.modules import camera_device
from ..multiprocess_logging import multiprocess_logging
from .. import image_and_time


Expand All @@ -11,14 +16,30 @@ class VideoInput:
Combines image and timestamp together.
"""

def __init__(self, camera_name: "int | str", save_name: str = "") -> None:
def __init__(
self,
logging_queue: queue_proxy_wrapper.QueueProxyWrapper,
camera_name: "int | str",
save_name: str = "",
) -> None:
self.device = camera_device.CameraDevice(camera_name, 1, save_name)
self.logging_queue = logging_queue

def run(self) -> "tuple[bool, image_and_time.ImageAndTime | None]":
"""
Returns a possible ImageAndTime with current timestamp.
"""
result, image = self.device.get_image()

try:
frame = inspect.currentframe()
multiprocess_logging.log_message(
f"image size {image.shape}", multiprocess_logging.DEBUG, frame, self.logging_queue
)
# self.logging_queue.queue.put((multiprocess_logging.message_and_metadata(f'image size {image.shape}', frame), logging.DEBUG), block=False)
except queue.Full:
pass

if not result:
return False, None

Expand Down
15 changes: 14 additions & 1 deletion modules/video_input/video_input_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@
Gets images from the camera.
"""

import inspect
import queue
import time

from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller
from ..multiprocess_logging import multiprocess_logging
from . import video_input


Expand All @@ -14,6 +17,7 @@ def video_input_worker(
period: float,
save_name: str,
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
logging_queue: queue_proxy_wrapper.QueueProxyWrapper,
controller: worker_controller.WorkerController,
) -> None:
"""
Expand All @@ -25,7 +29,16 @@ def video_input_worker(
output_queue is the data queue.
controller is how the main process communicates to this worker process.
"""
input_device = video_input.VideoInput(camera_name, save_name)
input_device = video_input.VideoInput(logging_queue, camera_name, save_name)

try:
frame = inspect.currentframe()
multiprocess_logging.log_message(
"video_input started", multiprocess_logging.DEBUG, frame, logging_queue
)
# logging_queue.queue.put((multiprocess_logging.message_and_metadata('video_input started', frame), logging.DEBUG), block=False)
except queue.Full:
pass

while not controller.is_exit_requested():
controller.check_pause()
Expand Down
Loading