diff --git a/libmuscle/python/libmuscle/communicator.py b/libmuscle/python/libmuscle/communicator.py index 153c194c..a1c886d3 100644 --- a/libmuscle/python/libmuscle/communicator.py +++ b/libmuscle/python/libmuscle/communicator.py @@ -63,11 +63,28 @@ def __init__(self, timestamp: float, next_timestamp: Optional[float] = None, class RecvTimeoutHandler(TimeoutHandler): + """Timeout handler when receiving messages from peers. + + This handler sends a message to the Muscle Manager when the receive times out (and + another message when the message does arrive). + + This is used by the manager to detect if the simulation is in a deadlock, where a + cycle of instances is waiting on each other. + """ + def __init__( self, manager: MMPClient, peer_instance: str, port_name: str, slot: Optional[int], timeout: float ) -> None: + """Initialize a new timeout handler. + + Args: + manager: Connection to the muscle manager. + peer_instance: the peer instance we try to receive from. + port_name: the name of the port we try to receive on. + slot: the slot we try to receive on. + """ self._manager = manager self._peer_instance = peer_instance self._port_name = port_name @@ -84,7 +101,7 @@ def on_timeout(self) -> None: def on_receive(self) -> None: self._manager.waiting_for_receive_done( - self._peer_instance, self._port_name, self._slot) + self._peer_instance, self._port_name, self._slot) class Communicator: @@ -97,7 +114,8 @@ class Communicator: """ def __init__( self, kernel: Reference, index: List[int], - port_manager: PortManager, profiler: Profiler) -> None: + port_manager: PortManager, profiler: Profiler, + manager: MMPClient) -> None: """Create a Communicator. The instance reference must start with one or more Identifiers, @@ -115,8 +133,7 @@ def __init__( self._index = index self._port_manager = port_manager self._profiler = profiler - # TODO: make this a proper argument of __init__() - self._manager = profiler._manager + self._manager = manager # Notify manager, by default, after 10 seconds waiting in receive_message() self._receive_timeout = 10.0 diff --git a/libmuscle/python/libmuscle/instance.py b/libmuscle/python/libmuscle/instance.py index c38f6aa3..d98777f5 100644 --- a/libmuscle/python/libmuscle/instance.py +++ b/libmuscle/python/libmuscle/instance.py @@ -142,7 +142,8 @@ def __init__( """PortManager for this instance.""" self._communicator = Communicator( - self._name, self._index, self._port_manager, self._profiler) + self._name, self._index, self._port_manager, self._profiler, + self.__manager) """Communicator for this instance.""" self._declared_ports = ports diff --git a/libmuscle/python/libmuscle/manager/instance_manager.py b/libmuscle/python/libmuscle/manager/instance_manager.py index 8d06c45e..58582cda 100644 --- a/libmuscle/python/libmuscle/manager/instance_manager.py +++ b/libmuscle/python/libmuscle/manager/instance_manager.py @@ -190,6 +190,9 @@ def cancel_all() -> None: _logger.info( f'Instance {result.instance} was shut down by' f' MUSCLE3 because an error occurred elsewhere') + # Ensure we don't see this as a succesful run when shutdown() is called + # by another thread: + all_seemingly_okay = False else: stderr_file = ( self._run_dir.instance_dir(result.instance) / @@ -260,6 +263,9 @@ def cancel_all() -> None: 'More output may be found in' f' {self._run_dir.instance_dir(result.instance)}\n' ) + elif not all_seemingly_okay: + # shutdown() was called by another thread (e.g. the DeadlockDetector): + _logger.error('The simulation was aborted.') else: _logger.info('The simulation finished without error.') diff --git a/libmuscle/python/libmuscle/test/test_communicator.py b/libmuscle/python/libmuscle/test/test_communicator.py index 165e3353..739777fa 100644 --- a/libmuscle/python/libmuscle/test/test_communicator.py +++ b/libmuscle/python/libmuscle/test/test_communicator.py @@ -1,5 +1,5 @@ import logging -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock, Mock, patch import pytest @@ -46,7 +46,7 @@ def mpp_client(MPPClient): @pytest.fixture def communicator(connected_port_manager, profiler): - return Communicator(Ref('component'), [], connected_port_manager, profiler) + return Communicator(Ref('component'), [], connected_port_manager, profiler, Mock()) @pytest.fixture