From 1f6de948185c07c790f2b9d701cf469dc89274d2 Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Tue, 13 Aug 2024 15:49:25 +0200 Subject: [PATCH] Implement timeout for shutdown after a deadlock Improve documentation of the DeadlockDetector and add unit tests --- .../libmuscle/manager/deadlock_detector.py | 146 ++++++++++++++---- .../manager/test/test_deadlock_detector.py | 106 +++++++++++++ 2 files changed, 226 insertions(+), 26 deletions(-) create mode 100644 libmuscle/python/libmuscle/manager/test/test_deadlock_detector.py diff --git a/libmuscle/python/libmuscle/manager/deadlock_detector.py b/libmuscle/python/libmuscle/manager/deadlock_detector.py index e7f820dc..dc3c5111 100644 --- a/libmuscle/python/libmuscle/manager/deadlock_detector.py +++ b/libmuscle/python/libmuscle/manager/deadlock_detector.py @@ -1,7 +1,8 @@ import logging from threading import Thread +import time from typing import Callable, Dict, List, Optional, Tuple -from queue import Queue +from queue import Empty, Queue _logger = logging.getLogger(__name__) @@ -9,30 +10,77 @@ class DeadlockDetector(Thread): - """TODO""" + """The DeadlockDetector attempts to detect when multiple instances are stuck waiting + for each other. + + This class is responsible for handling WAITING_FOR_RECEIVE and + WAITING_FOR_RECEIVE_DONE MMP messages, which are submitted by the MMPServer. + + When a deadlock is detected, the cycle of instances that is waiting on each other is + logged with FATAL severity. If this deadlock does not get resoled in + ``wait_before_shutdown`` seconds, the simulation is shut down. + """ def __init__( self, shutdown_callback: Callable[[], None], wait_before_shutdown: float ) -> None: + """Construct a new DeadlockDetector thread. + + Args: + shutdown_callback: function to execute when a deadlock is detected. This + callback (which is executed in this thread!) is responsible for stopping + the simulation when a deadlock is detected. + wait_before_shutdown: Number of seconds to wait before executing + :param:`shutdown_callback` after a deadlock is detected. If the deadlock + is resolved (although this is unlikely), the simulation will not shut + down. + """ super().__init__(name="DeadlockDetector") self._shutdown_callback = shutdown_callback self._wait_before_shutdown = wait_before_shutdown self._queue: Queue[Optional[_QueueItem]] = Queue() + """Queue of incoming messages. Incoming messages can come in any communication + thread and will be consumed and processed in this worker thread. + """ self._waiting_instances: Dict[str, str] = {} + """Maps instance IDs to the peer instance IDs they are waiting for.""" self._waiting_instance_ports: Dict[str, Tuple[str, Optional[int]]] = {} + """Maps instance IDs to the port/slot they are waiting on..""" - self._detected_deadlocks: List[str] = [] + self._detected_deadlocks: List[List[str]] = [] + """List of deadlocked instance cycles. Set by _handle_potential_deadlock. + """ + self._shutdown_time: Optional[float] = None + """Future time when we confirm the potential deadlock and abort the simulation. + """ def run(self) -> None: """Logic that is executed in the thread.""" while True: - item = self._queue.get() - if item is None: # Shutdown sentinal + # Set a timeout when a deadlock was detected + timeout = None + if self._shutdown_time is not None: + timeout = max(0, self._shutdown_time - time.monotonic()) + + # Grab a new item from the queue, this raises Empty when timeout expires: + try: + item = self._queue.get(timeout=timeout) + if item is None: # On shutdown, None is pushed to the queue + return # exit thread + self._process_queue_item(item) + + except Empty: + # timeout expired and queue is empty, call shutdown callback + formatted_deadlocks = "\n\n".join( + self._format_deadlock(instances) + for instances in self._detected_deadlocks) + _logger.fatal( + "Aborting simulation: deadlock detected.\n%s", + formatted_deadlocks) + self._shutdown_callback() return - # Handle item - self._process_queue_item(item) def shutdown(self) -> None: """Stop the deadlock detector thread.""" @@ -42,18 +90,40 @@ def put_waiting( self, instance_id: str, peer_instance_id: str, port_name: str, slot: Optional[int] ) -> None: - """Process a WATING_FOR_RECEIVE message from an instance.""" + """Queue a WAITING_FOR_RECEIVE message from an instance for processing. + + This method can be called from any thread. + + Args: + instance_id: ID of instance that is waiting to receive a message. + peer_instance_id: ID of the peer that the instance is waiting on. + port_name: Name of the input port. + slot: Optional slot number of the input port. + """ self._queue.put((True, instance_id, peer_instance_id, port_name, slot)) def put_waiting_done( self, instance_id: str, peer_instance_id: str, port_name: str, slot: Optional[int] ) -> None: - """Process a WATING_FOR_RECEIVE_DONE message from an instance.""" + """Queue a WAITING_FOR_RECEIVE_DONE message from an instance for processing. + + This method can be called from any thread. + + Args: + instance_id: ID of instance that is waiting to receive a message. + peer_instance_id: ID of the peer that the instance is waiting on. + port_name: Name of the input port. + slot: Optional slot number of the input port. + """ self._queue.put((False, instance_id, peer_instance_id, port_name, slot)) def _process_queue_item(self, item: _QueueItem) -> None: - _logger.info("Processing queue item: %s", item) + """Actually process a WAITING_FOR_RECEIVE[_DONE] request. + + This method should be called inside the worker thread. + """ + _logger.debug("Processing queue item: %s", item) is_waiting, instance_id, peer_instance_id, port_name, slot = item if is_waiting: # Sanity checks, triggering this is a bug in the instance or the manager @@ -88,6 +158,15 @@ def _process_queue_item(self, item: _QueueItem) -> None: del self._waiting_instances[instance_id] del self._waiting_instance_ports[instance_id] + # Check if we were part of a deadlock + for i, instance_list in enumerate(self._detected_deadlocks): + if instance_id in instance_list: + del self._detected_deadlocks[i] + break + if not self._detected_deadlocks: + # There are no deadlocks anymore: cancel shutdown + self._shutdown_time = None + def _check_for_deadlock(self, instance_id: str) -> None: """Check if there is a cycle of waiting instances that involves this instance. """ @@ -96,25 +175,40 @@ def _check_for_deadlock(self, instance_id: str) -> None: while cur_instance in self._waiting_instances: cur_instance = self._waiting_instances[cur_instance] if cur_instance == instance_id: - break # Found a deadlocked cycle + self._handle_potential_deadlock(deadlock_instances) + return deadlock_instances.append(cur_instance) - else: # No cycle detected - _logger.info("No deadlock detected") - return - - _logger.warning( - "Potential deadlock detected, aborting run in %d seconds.\n%s", - self._wait_before_shutdown, - self._format_deadlock(deadlock_instances), - ) - # TODO: wait and abort - self._shutdown_callback() + _logger.debug("No deadlock detected") + + def _handle_potential_deadlock(self, deadlock_instances: List[str]) -> None: + """Handle a potential deadlock. - def _format_deadlock(self, instances: List[str]) -> str: - """Create and return formatted deadlock debug info.""" - num_instances = str(len(instances)) + Args: + deadlock_instances: list of instances waiting on eachother + """ + shutdown_delay = self._wait_before_shutdown + if self._shutdown_time is not None: + # Get time until shutdown + shutdown_delay = self._shutdown_time - time.monotonic() + _logger.fatal( + "Potential deadlock detected, aborting run in %d seconds.\n%s", + shutdown_delay, + self._format_deadlock(deadlock_instances), + ) + + self._detected_deadlocks.append(deadlock_instances) + if self._shutdown_time is None: + self._shutdown_time = time.monotonic() + self._wait_before_shutdown + + def _format_deadlock(self, deadlock_instances: List[str]) -> str: + """Create and return formatted deadlock debug info. + + Args: + deadlock_instances: list of instances waiting on eachother + """ + num_instances = str(len(deadlock_instances)) lines = [f"The following {num_instances} instances are dead-locked:"] - for i, instance in enumerate(instances): + for i, instance in enumerate(deadlock_instances): num = str(i+1).rjust(len(num_instances)) peer_instance = self._waiting_instances[instance] port, slot = self._waiting_instance_ports[instance] diff --git a/libmuscle/python/libmuscle/manager/test/test_deadlock_detector.py b/libmuscle/python/libmuscle/manager/test/test_deadlock_detector.py new file mode 100644 index 00000000..718a6569 --- /dev/null +++ b/libmuscle/python/libmuscle/manager/test/test_deadlock_detector.py @@ -0,0 +1,106 @@ +import logging +import time +from typing import Iterator +from unittest.mock import Mock + +import pytest + +from libmuscle.manager.deadlock_detector import DeadlockDetector + + +@pytest.fixture +def shutdown_callback() -> Mock: + return Mock() + + +@pytest.fixture +def detector(shutdown_callback) -> Iterator[DeadlockDetector]: + # Using a very short delay (10ms) to speed up unit testing + detector = DeadlockDetector(shutdown_callback, 0.01) + detector.start() + yield detector + if detector.is_alive(): + detector.shutdown() + detector.join() + + +def test_no_deadlock(shutdown_callback: Mock, detector: DeadlockDetector) -> None: + detector.put_waiting("macro", "micro", "s", None) + detector.put_waiting_done("macro", "micro", "s", None) + time.sleep(0.05) + detector.shutdown() + detector.join() + shutdown_callback.assert_not_called() + + +def test_double_waiting_log_error( + caplog: pytest.LogCaptureFixture, detector: DeadlockDetector) -> None: + detector.put_waiting("macro", "micro", "s", 0) + detector.put_waiting("macro", "micro", "s", 1) + detector.shutdown() + detector.join() + assert len(caplog.record_tuples) == 1 + assert caplog.record_tuples[0][:2] == ( + "libmuscle.manager.deadlock_detector", logging.ERROR) + + +def test_not_waiting_log_error( + caplog: pytest.LogCaptureFixture, detector: DeadlockDetector) -> None: + detector.put_waiting_done("macro", "micro", "s", 0) + detector.shutdown() + detector.join() + assert len(caplog.record_tuples) == 1 + assert caplog.record_tuples[0][:2] == ( + "libmuscle.manager.deadlock_detector", logging.ERROR) + + +def test_waiting_for_different_instance_log_error( + caplog: pytest.LogCaptureFixture, detector: DeadlockDetector) -> None: + detector.put_waiting("macro", "micro", "s", 0) + detector.put_waiting_done("macro", "meso", "s", 0) + detector.shutdown() + detector.join() + assert len(caplog.record_tuples) == 1 + assert caplog.record_tuples[0][:2] == ( + "libmuscle.manager.deadlock_detector", logging.ERROR) + + +def test_waiting_for_different_port_log_error( + caplog: pytest.LogCaptureFixture, detector: DeadlockDetector) -> None: + detector.put_waiting("macro", "micro", "s", 0) + detector.put_waiting_done("macro", "micro", "f_init", 0) + detector.shutdown() + detector.join() + assert len(caplog.record_tuples) == 1 + assert caplog.record_tuples[0][:2] == ( + "libmuscle.manager.deadlock_detector", logging.ERROR) + + +def test_deadlock(shutdown_callback: Mock, detector: DeadlockDetector) -> None: + detector.put_waiting("macro", "micro", "s", None) + detector.put_waiting("micro", "macro", "f_init", None) + time.sleep(0.05) + assert not detector.is_alive() + shutdown_callback.assert_called_once() + + +def test_deadlock_cancelled( + shutdown_callback: Mock, detector: DeadlockDetector) -> None: + detector.put_waiting("macro", "micro", "s", None) + detector.put_waiting("micro", "macro", "f_init", None) + detector.put_waiting_done("macro", "micro", "s", None) + time.sleep(0.05) + detector.shutdown() + detector.join() + shutdown_callback.assert_not_called() + + +def test_double_deadlock(shutdown_callback: Mock, detector: DeadlockDetector) -> None: + detector.put_waiting("macro", "micro", "s", None) + detector.put_waiting("micro", "macro", "f_init", None) + detector.put_waiting("cycle2", "peer2", "s", None) + detector.put_waiting("peer2", "cycle2", "f_init", None) + detector.put_waiting_done("macro", "micro", "s", None) + time.sleep(0.05) + assert not detector.is_alive() + shutdown_callback.assert_called()