From f98e5a328dc6b52a24da9ab005e515018e19a1e0 Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Fri, 16 Aug 2024 12:22:50 +0200 Subject: [PATCH] Refactor ReceiveTimeoutHandler to a separate file. --- libmuscle/python/libmuscle/communicator.py | 46 +----------------- .../libmuscle/receive_timeout_handler.py | 47 +++++++++++++++++++ 2 files changed, 49 insertions(+), 44 deletions(-) create mode 100644 libmuscle/python/libmuscle/receive_timeout_handler.py diff --git a/libmuscle/python/libmuscle/communicator.py b/libmuscle/python/libmuscle/communicator.py index a1c886d3..812397be 100644 --- a/libmuscle/python/libmuscle/communicator.py +++ b/libmuscle/python/libmuscle/communicator.py @@ -8,12 +8,12 @@ from libmuscle.mpp_client import MPPClient from libmuscle.mpp_server import MPPServer from libmuscle.mcp.tcp_util import SocketClosed -from libmuscle.mcp.transport_client import TimeoutHandler from libmuscle.peer_info import PeerInfo from libmuscle.port_manager import PortManager from libmuscle.profiler import Profiler from libmuscle.profiling import ( ProfileEvent, ProfileEventType, ProfileTimestamp) +from libmuscle.receive_timeout_handler import ReceiveTimeoutHandler _logger = logging.getLogger(__name__) @@ -62,48 +62,6 @@ def __init__(self, timestamp: float, next_timestamp: Optional[float] = None, self.settings = settings -class RecvTimeoutHandler(TimeoutHandler): - """Timeout handler when receiving messages from peers. - - This handler sends a message to the Muscle Manager when the receive times out (and - another message when the message does arrive). - - This is used by the manager to detect if the simulation is in a deadlock, where a - cycle of instances is waiting on each other. - """ - - def __init__( - self, manager: MMPClient, - peer_instance: str, port_name: str, slot: Optional[int], - timeout: float - ) -> None: - """Initialize a new timeout handler. - - Args: - manager: Connection to the muscle manager. - peer_instance: the peer instance we try to receive from. - port_name: the name of the port we try to receive on. - slot: the slot we try to receive on. - """ - self._manager = manager - self._peer_instance = peer_instance - self._port_name = port_name - self._slot = slot - self._timeout = timeout - - @property - def timeout(self) -> float: - return self._timeout - - def on_timeout(self) -> None: - self._manager.waiting_for_receive( - self._peer_instance, self._port_name, self._slot) - - def on_receive(self) -> None: - self._manager.waiting_for_receive_done( - self._peer_instance, self._port_name, self._slot) - - class Communicator: """Communication engine for MUSCLE3. @@ -288,7 +246,7 @@ def receive_message( client = self.__get_client(snd_endpoint.instance()) timeout_handler = None if self._receive_timeout >= 0: - timeout_handler = RecvTimeoutHandler( + timeout_handler = ReceiveTimeoutHandler( self._manager, str(snd_endpoint.instance()), port_name, slot, self._receive_timeout) try: diff --git a/libmuscle/python/libmuscle/receive_timeout_handler.py b/libmuscle/python/libmuscle/receive_timeout_handler.py new file mode 100644 index 00000000..20196f0f --- /dev/null +++ b/libmuscle/python/libmuscle/receive_timeout_handler.py @@ -0,0 +1,47 @@ +from typing import Optional + +from libmuscle.mcp.transport_client import TimeoutHandler +from libmuscle.mmp_client import MMPClient + + +class ReceiveTimeoutHandler(TimeoutHandler): + """Timeout handler when receiving messages from peers. + + This handler sends a message to the Muscle Manager when the receive times out (and + another message when the message does arrive). + + This is used by the manager to detect if the simulation is in a deadlock, where a + cycle of instances is waiting on each other. + """ + + def __init__( + self, manager: MMPClient, + peer_instance: str, port_name: str, slot: Optional[int], + timeout: float + ) -> None: + """Initialize a new timeout handler. + + Args: + manager: Connection to the muscle manager. + peer_instance: the peer instance we try to receive from. + port_name: the name of the port we try to receive on. + slot: the slot we try to receive on. + timeout: Timeout in seconds. + """ + self._manager = manager + self._peer_instance = peer_instance + self._port_name = port_name + self._slot = slot + self._timeout = timeout + + @property + def timeout(self) -> float: + return self._timeout + + def on_timeout(self) -> None: + self._manager.waiting_for_receive( + self._peer_instance, self._port_name, self._slot) + + def on_receive(self) -> None: + self._manager.waiting_for_receive_done( + self._peer_instance, self._port_name, self._slot)