Skip to content

Commit

Permalink
added logging process and queue in main
Browse files Browse the repository at this point in the history
  • Loading branch information
TongguangZhang committed Mar 24, 2024
1 parent ef430c1 commit b8a9b9a
Showing 1 changed file with 32 additions and 0 deletions.
32 changes: 32 additions & 0 deletions main_2024.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""

import argparse
import inspect
import multiprocessing as mp
import pathlib
import queue
Expand All @@ -17,6 +18,8 @@
from modules.flight_interface import flight_interface_worker
from modules.video_input import video_input_worker
from modules.data_merge import data_merge_worker
from modules.multiprocess_logging import multiprocess_logging_worker
from modules.multiprocess_logging import multiprocess_logging
from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller
from utilities.workers import worker_manager
Expand Down Expand Up @@ -92,6 +95,12 @@ def main() -> int:
controller = worker_controller.WorkerController()

mp_manager = mp.Manager()

logging_queue = queue_proxy_wrapper.QueueProxyWrapper(
mp_manager,
QUEUE_MAX_SIZE
)

video_input_to_detect_target_queue = queue_proxy_wrapper.QueueProxyWrapper(
mp_manager,
QUEUE_MAX_SIZE,
Expand All @@ -109,6 +118,16 @@ def main() -> int:
QUEUE_MAX_SIZE,
)

logging_manager = worker_manager.WorkerManager()
logging_manager.create_workers(
1,
multiprocess_logging_worker.multiprocess_logging_worker,
(
logging_queue,
controller,
)
)

video_input_manager = worker_manager.WorkerManager()
video_input_manager.create_workers(
1,
Expand All @@ -118,6 +137,7 @@ def main() -> int:
VIDEO_INPUT_WORKER_PERIOD,
VIDEO_INPUT_SAVE_PREFIX,
video_input_to_detect_target_queue,
logging_queue,
controller,
),
)
Expand Down Expand Up @@ -165,12 +185,22 @@ def main() -> int:
)

# Run
logging_manager.start_workers()
video_input_manager.start_workers()
detect_target_manager.start_workers()
flight_interface_manager.start_workers()
data_merge_manager.start_workers()

try:
frame = inspect.currentframe()
multiprocess_logging.log_message('workers started', multiprocess_logging.DEBUG, frame, logging_queue)
# logging_queue.queue.put((multiprocess_logging.message_and_metadata('workers started', frame), logging.INFO), block=False)
except queue.Full:
pass

count = 0
while True:
count += 1
try:
merged_data = data_merge_to_main_queue.queue.get_nowait()
except queue.Empty:
Expand Down Expand Up @@ -200,11 +230,13 @@ def main() -> int:
# Teardown
controller.request_exit()

logging_queue.fill_and_drain_queue()
video_input_to_detect_target_queue.fill_and_drain_queue()
detect_target_to_data_merge_queue.fill_and_drain_queue()
flight_interface_to_data_merge_queue.fill_and_drain_queue()
data_merge_to_main_queue.fill_and_drain_queue()

logging_manager.join_workers()
video_input_manager.join_workers()
detect_target_manager.join_workers()
flight_interface_manager.join_workers()
Expand Down

0 comments on commit b8a9b9a

Please sign in to comment.