Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restart worker patch #229

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions utilities/workers/queue_proxy_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import multiprocessing.managers
import queue
import time
from typing import Any


class QueueProxyWrapper:
Expand Down Expand Up @@ -49,6 +50,23 @@ def drain_queue(self, timeout: float = 0.0) -> None:
except queue.Empty:
return

def get_queue_items(self, timeout: float = 0.0) -> "list[Any] | None":
"""
Gets items from the queue and returns them as a list.

timeout: Time waiting in seconds before giving up; must be greater than 0.
"""
items = []

for item in range(0, self.queue.qsize()):
items.append(item)

if not items:
return None
else:
return items


def fill_and_drain_queue(self) -> None:
"""
Fill with sentinel and then drain.
Expand Down
27 changes: 26 additions & 1 deletion utilities/workers/worker_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,18 @@

import multiprocessing as mp

from modules import detection_in_world
from modules import geolocation
from modules.common.modules.logger import logger
from utilities.workers import worker_controller
from utilities.workers import queue_proxy_wrapper
from modules.detect_target import detect_target_worker
from modules.detections_and_time import DetectionsAndTime
from modules.flight_interface import flight_interface_worker
from modules.video_input import video_input_worker
from modules.data_merge import data_merge_worker
from modules.geolocation import geolocation_worker
from modules.detect_target.detect_target_ultralytics import DetectTargetUltralytics


class WorkerProperties:
Expand Down Expand Up @@ -248,7 +257,23 @@ def check_and_restart_dead_workers(self) -> bool:
# caused the worker to fail. Draining the succeeding queues is not needed
# because a worker that died would not have put bad data into the queue.
input_queues = self.__worker_properties.get_input_queues()

type = None
match self.__worker_properties.get_worker_target():
case detect_target_worker.detect_target_worker:
type = DetectionsAndTime
case geolocation_worker.geolocation_worker:
type = detection_in_world.DetectionInWorld
case _:
type = None

for queue in input_queues:
queue.drain_queue()
# TODO: Implement the check for type of worker and check queue type for all queue types
items = queue.get_queue_items()

if items is not None and type is not None:
for item in items:
if isinstance(item, type):
queue.queue.put(item)

return True
Loading