Skip to content

Commit

Permalink
Merge pull request #884 from roflcoopter/feature/shutdown-sequence-improvements
Browse files Browse the repository at this point in the history

improve shutdown sequence and make sure all threads are stopped
  • Loading branch information
roflcoopter authored Jan 10, 2025
2 parents 4736501 + 4fc96b0 commit 4020a2b
Show file tree
Hide file tree
Showing 16 changed files with 125 additions and 41 deletions.
1 change: 1 addition & 0 deletions requirements_test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ pyupgrade==3.15.2

# types
types-requests==2.31.0.6
types-psutil
types-python-slugify
pygobject-stubs
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class MockViseron(Viseron):
"""Protocol for mocking Viseron."""

def __init__(self) -> None:
super().__init__()
super().__init__(start_background_scheduler=False)
self.register_domain = Mock( # type: ignore[method-assign]
side_effect=self.register_domain,
)
Expand Down
8 changes: 4 additions & 4 deletions tests/test__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def test_setup_viseron_nvr_loaded(vis, caplog):
with patch("viseron.load_config") as mocked_load_config:
mocked_load_config.return_value = "Testing"
with patch("viseron.components.get_component"):
setup_viseron()
setup_viseron(start_background_scheduler=False)

mocked_setup_components.assert_called_once()
mocked_setup_domains.assert_called_once()
Expand Down Expand Up @@ -86,7 +86,7 @@ def test_setup_viseron_nvr_missing(vis, caplog):
with patch("viseron.load_config") as mocked_load_config:
mocked_load_config.return_value = "Testing"
with patch("viseron.components.get_component"):
setup_viseron()
setup_viseron(start_background_scheduler=False)

mocked_setup_components.assert_called_once()
mocked_setup_component.assert_called_once()
Expand Down Expand Up @@ -119,7 +119,7 @@ def test_setup_viseron_cameras_missing(caplog):
with patch("viseron.load_config") as mocked_load_config:
mocked_load_config.return_value = "Testing"
with patch("viseron.components.get_component"):
setup_viseron()
setup_viseron(start_background_scheduler=False)

mocked_setup_components.assert_called_once()
mocked_setup_component.assert_not_called()
Expand All @@ -144,7 +144,7 @@ def test_setup_viseron_cameras_missing_nvr_loaded(caplog):
with patch("viseron.load_config") as mocked_load_config:
mocked_load_config.return_value = "Testing"
with patch("viseron.components.get_component"):
setup_viseron()
setup_viseron(start_background_scheduler=False)

mocked_setup_components.assert_called_once()
mocked_setup_component.assert_not_called()
Expand Down
28 changes: 23 additions & 5 deletions viseron/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ class Viseron:
"""Viseron."""

def __init__(self, start_background_scheduler=True) -> None:
self.logger = LOGGER
self.states = States(self)

self.setup_threads: list[threading.Thread] = []
Expand All @@ -269,12 +270,16 @@ def __init__(self, start_background_scheduler=True) -> None:
self._domain_register_lock = threading.Lock()
self.data[REGISTERED_DOMAINS] = {}

self._thread_watchdog: ThreadWatchDog | None = None
self._subprocess_watchdog: SubprocessWatchDog | None = None
self._process_watchdog: ProcessWatchDog | None = None

self.background_scheduler = BackgroundScheduler(timezone="UTC", daemon=True)
if start_background_scheduler:
self.background_scheduler.start()
self._thread_watchdog = ThreadWatchDog(self)
self._subprocess_watchdog = SubprocessWatchDog(self)
self._process_watchdog = ProcessWatchDog(self)
self._thread_watchdog = ThreadWatchDog(self)
self._subprocess_watchdog = SubprocessWatchDog(self)
self._process_watchdog = ProcessWatchDog(self)

self.storage: Storage | None = None

Expand Down Expand Up @@ -317,7 +322,8 @@ def register_signal_handler(self, viseron_signal, callback):
)
return False

return self.data[DATA_STREAM_COMPONENT].subscribe_data(
data_stream: DataStream = self.data[DATA_STREAM_COMPONENT]
return data_stream.subscribe_data(
VISERON_SIGNALS[viseron_signal], callback, stage=viseron_signal
)

Expand Down Expand Up @@ -541,9 +547,18 @@ def shutdown(self) -> None:
if self.data.get(DATA_STREAM_COMPONENT, None):
data_stream: DataStream = self.data[DATA_STREAM_COMPONENT]

if (
self._thread_watchdog
and self._subprocess_watchdog
and self._process_watchdog
):
self._thread_watchdog.stop()
self._subprocess_watchdog.stop()
self._process_watchdog.stop()

try:
self.background_scheduler.shutdown(wait=False)
self.background_scheduler.remove_all_jobs()
self.background_scheduler.shutdown(wait=False)
except SchedulerNotRunningError as err:
LOGGER.warning(f"Failed to shutdown scheduler: {err}")

Expand All @@ -559,6 +574,9 @@ def shutdown(self) -> None:

if data_stream:
data_stream.remove_all_subscriptions()
data_stream.stop()
data_stream.join()

LOGGER.info("Shutdown complete in %.1f seconds", timer() - start)

def add_entity(self, component: str, entity: Entity):
Expand Down
10 changes: 7 additions & 3 deletions viseron/__main__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Start Viseron."""
from __future__ import annotations

import logging
import multiprocessing as mp
import os
import signal
Expand All @@ -10,6 +11,8 @@

from viseron import Viseron, setup_viseron

LOGGER = logging.getLogger("viseron.main")


def main():
"""Start Viseron."""
Expand All @@ -20,11 +23,12 @@ def signal_term(*_) -> None:
viseron.shutdown()

def shutdown_failed():
print("Shutdown failed. Exiting forcefully.")
print(f"Active threads: {threading.enumerate()}")
print(f"Active processes: {mp.active_children()}")
LOGGER.debug("Shutdown failed. Exiting forcefully.")
LOGGER.debug(f"Active threads: {threading.enumerate()}")
LOGGER.debug(f"Active processes: {mp.active_children()}")
os.kill(os.getpid(), signal.SIGKILL)

LOGGER.debug(f"Active threads: {threading.enumerate()}")
shutdown_timer = Timer(2, shutdown_failed, args=())
shutdown_timer.daemon = True
shutdown_timer.start()
Expand Down
24 changes: 19 additions & 5 deletions viseron/components/data_stream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import time
import uuid
from collections.abc import Callable
from queue import Queue
from queue import Empty, Queue
from typing import Any, TypedDict

from tornado.ioloop import IOLoop
Expand Down Expand Up @@ -72,10 +72,11 @@ def __init__(self, vis) -> None:
self._max_threads = self._get_max_threads()
LOGGER.debug(f"Max threads: {self._max_threads}")

data_consumer = RestartableThread(
self._kill_received = False
self._data_consumer = RestartableThread(
name="data_stream", target=self.consume_data, daemon=True, register=True
)
data_consumer.start()
self._data_consumer.start()

def _get_max_threads(self) -> int:
"""Get the maximum number of threads allowed."""
Expand Down Expand Up @@ -283,7 +284,20 @@ def wildcard_subscriptions(self, data_item: dict[str, Any]) -> None:

def consume_data(self) -> None:
"""Publish data to topics."""
while True:
data_item = self._data_queue.get()
while not self._kill_received:
try:
data_item = self._data_queue.get(timeout=0.1)
except Empty:
continue

self.static_subscriptions(data_item)
self.wildcard_subscriptions(data_item)
LOGGER.debug("Data stream stopped")

def join(self) -> None:
"""Join the data stream."""
self._data_consumer.join()

def stop(self) -> None:
"""Stop the data stream."""
self._kill_received = True
23 changes: 16 additions & 7 deletions viseron/components/ffmpeg/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,7 @@ def __init__(

self._pipe: sp.Popen | None = None
self._segment_process: RestartablePopen | None = None
self._log_pipe = LogPipe(
self._logger, FFMPEG_LOGLEVELS[config[CONFIG_FFMPEG_LOGLEVEL]]
)
self._log_pipe: LogPipe | None = None
self._ffprobe = FFprobe(config, camera_identifier)

self._mainstream = self.get_stream_information(config)
Expand Down Expand Up @@ -481,6 +479,12 @@ def build_command(self):

def pipe(self):
"""Return subprocess pipe for FFmpeg."""
if self._log_pipe:
self._log_pipe.close()
self._log_pipe = LogPipe(
self._logger, FFMPEG_LOGLEVELS[self._config[CONFIG_FFMPEG_LOGLEVEL]]
)

if self._config.get(CONFIG_SUBSTREAM, None):
self._segment_process = RestartablePopen(
self.build_segment_command(),
Expand Down Expand Up @@ -524,6 +528,8 @@ def close_pipe(self) -> None:
self._pipe.communicate()
except AttributeError as error:
self._logger.error("Failed to close pipe: %s", error)
if self._log_pipe:
self._log_pipe.close()

def poll(self):
"""Poll pipe."""
Expand Down Expand Up @@ -556,9 +562,6 @@ class FFprobe:
def __init__(self, config: dict[str, Any], camera_identifier: str) -> None:
self._logger = logging.getLogger(__name__ + "." + camera_identifier)
self._config = config
self._log_pipe = LogPipe(
self._logger, FFPROBE_LOGLEVELS[config[CONFIG_FFPROBE_LOGLEVEL]]
)
self._ffprobe_timeout = FFPROBE_TIMEOUT

def stream_information(
Expand Down Expand Up @@ -638,10 +641,14 @@ def run_ffprobe(
reraise=True,
):
with attempt:
log_pipe = LogPipe(
self._logger,
FFPROBE_LOGLEVELS[self._config[CONFIG_FFPROBE_LOGLEVEL]],
)
pipe = sp.Popen( # type: ignore[call-overload]
ffprobe_command,
stdout=sp.PIPE,
stderr=self._log_pipe,
stderr=log_pipe,
)
try:
stdout, _ = pipe.communicate(timeout=self._ffprobe_timeout)
Expand All @@ -652,6 +659,8 @@ def run_ffprobe(
ffprobe_timeout = self._ffprobe_timeout
self._ffprobe_timeout += FFPROBE_TIMEOUT
raise FFprobeTimeout(ffprobe_timeout) from error
finally:
log_pipe.close()
self._ffprobe_timeout = FFPROBE_TIMEOUT

try:
Expand Down
8 changes: 0 additions & 8 deletions viseron/components/nvr/nvr.py
Original file line number Diff line number Diff line change
Expand Up @@ -741,14 +741,6 @@ def stop(self) -> None:
if self._camera.is_recording:
self._camera.stop_recorder()

# Empty frame queue before exiting
while True:
try:
shared_frame = self._frame_queue.get(timeout=1)
except Empty:
break
self._camera.shared_frames.remove(shared_frame, self._camera)

for timer in self._removal_timers:
timer.cancel()

Expand Down
4 changes: 3 additions & 1 deletion viseron/domains/camera/fragmenter.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,8 @@ def _shutdown(self) -> None:
self._logger.debug("Camera stopped, running final fragmentation")
self._create_fragmented_mp4()
self._logger.debug("Fragment thread shutdown complete")
self._log_pipe.close()
self._log_pipe_ffmpeg.close()

def concatenate_fragments(
self, fragments: list[Fragment], media_sequence=0
Expand All @@ -332,7 +334,7 @@ def concatenate_fragments(
end=True,
file_directive=True,
)
self._logger.debug(f"HLS Playlist for contatenation: {playlist}")
self._logger.debug(f"HLS Playlist for concatenation: {playlist}")
ffmpeg_cmd = (
[
"ffmpeg",
Expand Down
18 changes: 15 additions & 3 deletions viseron/domains/post_processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from queue import Queue
from queue import Empty, Queue
from typing import TYPE_CHECKING, Any

import voluptuous as vol
Expand All @@ -13,6 +13,7 @@
from viseron.components.storage import Storage
from viseron.components.storage.const import COMPONENT as STORAGE_COMPONENT
from viseron.components.storage.models import PostProcessorResults
from viseron.const import VISERON_SIGNAL_SHUTDOWN
from viseron.domains.camera.const import DOMAIN as CAMERA_DOMAIN
from viseron.domains.object_detector.const import (
EVENT_OBJECTS_IN_FOV,
Expand Down Expand Up @@ -92,6 +93,7 @@ def __init__(self, vis: Viseron, config, camera_identifier) -> None:
else:
self._logger.debug(f"Post processor will run for labels: {self._labels}")

self._kill_received = False
self._post_processor_queue: Queue[Event[EventDetectedObjectsData]] = Queue(
maxsize=1
)
Expand All @@ -110,11 +112,16 @@ def __init__(self, vis: Viseron, config, camera_identifier) -> None:
),
self._post_processor_queue,
)
vis.register_signal_handler(VISERON_SIGNAL_SHUTDOWN, self.stop)

def post_process(self) -> None:
"""Post processor loop."""
while True:
event_data = self._post_processor_queue.get()
while not self._kill_received:
try:
event_data = self._post_processor_queue.get(timeout=1)
except Empty:
continue

detected_objects_data = event_data.data

if detected_objects_data.shared_frame is None:
Expand Down Expand Up @@ -147,6 +154,7 @@ def post_process(self) -> None:
zone=detected_objects_data.zone,
)
)
self._logger.debug(f"Post processor {self.__class__.__name__} stopped")

@abstractmethod
def process(self, post_processor_frame: PostProcessorFrame):
Expand All @@ -165,3 +173,7 @@ def _insert_result(
)
session.execute(stmt)
session.commit()

def stop(self) -> None:
"""Stop post processor."""
self._kill_received = True
7 changes: 6 additions & 1 deletion viseron/helpers/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ def __init__(
self._output_level_func = output_level_func
self._read_filedescriptor, self._write_filedescriptor = os.pipe()
self.pipe_reader = os.fdopen(self._read_filedescriptor)
self._kill_received = False
self.start()

def fileno(self):
Expand All @@ -176,7 +177,8 @@ def fileno(self):

def run(self) -> None:
"""Run the thread, logging everything."""
for line in iter(self.pipe_reader.readline, ""):
while not self._kill_received:
line = self.pipe_reader.readline()
log_str = line.strip().strip("\n")
if not log_str:
continue
Expand All @@ -199,10 +201,13 @@ def run(self) -> None:
else:
self._logger.log(logging.ERROR, log_str)

self._logger.debug("LogPipe thread ended")
self.pipe_reader.close()

def close(self) -> None:
"""Close the write end of the pipe."""
self._logger.debug("Closing LogPipe")
self._kill_received = True
os.close(self._write_filedescriptor)


Expand Down
1 change: 1 addition & 0 deletions viseron/helpers/subprocess_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ def stop(self) -> None:
)
self._process_frames_proc.kill()
self._process_frames_proc.communicate()
self._log_pipe.close()
LOGGER.debug(f"{self.subprocess_name} exited")


Expand Down
Loading

0 comments on commit 4020a2b

Please sign in to comment.