From 303e2d9c4f0b6ba54f1cb22a0d58f8f66fb0fadf Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Mon, 12 Aug 2024 16:27:25 +0200 Subject: [PATCH] [WIP] Implement communication deadlock detection mechanism --- integration_test/test_cpp_tcp_server.py | 2 +- libmuscle/python/libmuscle/communicator.py | 46 ++++++- libmuscle/python/libmuscle/instance.py | 24 +++- .../libmuscle/manager/deadlock_detector.py | 126 ++++++++++++++++++ libmuscle/python/libmuscle/manager/manager.py | 9 +- .../python/libmuscle/manager/mmp_server.py | 38 +++++- libmuscle/python/libmuscle/mcp/protocol.py | 3 + .../libmuscle/mcp/tcp_transport_client.py | 41 +++++- .../python/libmuscle/mcp/transport_client.py | 29 +++- libmuscle/python/libmuscle/mmp_client.py | 20 +++ libmuscle/python/libmuscle/mpp_client.py | 7 +- .../libmuscle/test/test_communicator.py | 22 +-- .../python/libmuscle/test/test_instance.py | 5 +- 13 files changed, 341 insertions(+), 31 deletions(-) create mode 100644 libmuscle/python/libmuscle/manager/deadlock_detector.py diff --git a/integration_test/test_cpp_tcp_server.py b/integration_test/test_cpp_tcp_server.py index 912b838d..50a60c16 100644 --- a/integration_test/test_cpp_tcp_server.py +++ b/integration_test/test_cpp_tcp_server.py @@ -37,7 +37,7 @@ def test_cpp_tcp_server(log_file_in_tmpdir): assert TcpTransportClient.can_connect_to(location) client = MPPClient([location]) - msg_bytes, _ = client.receive(Reference('test_receiver.port')) + msg_bytes, _ = client.receive(Reference('test_receiver.port'), None) msg = MPPMessage.from_bytes(msg_bytes) client.close() diff --git a/libmuscle/python/libmuscle/communicator.py b/libmuscle/python/libmuscle/communicator.py index 19b8e9c8..74e2a2e2 100644 --- a/libmuscle/python/libmuscle/communicator.py +++ b/libmuscle/python/libmuscle/communicator.py @@ -3,10 +3,12 @@ from ymmsl import Identifier, Reference, Settings from libmuscle.endpoint import Endpoint +from libmuscle.mmp_client import MMPClient from libmuscle.mpp_message import 
ClosePort, MPPMessage from libmuscle.mpp_client import MPPClient from libmuscle.mpp_server import MPPServer from libmuscle.mcp.tcp_util import SocketClosed +from libmuscle.mcp.transport_client import TimeoutHandler from libmuscle.peer_info import PeerInfo from libmuscle.port_manager import PortManager from libmuscle.profiler import Profiler @@ -60,6 +62,31 @@ def __init__(self, timestamp: float, next_timestamp: Optional[float] = None, self.settings = settings +class RecvTimeoutHandler(TimeoutHandler): + def __init__( + self, manager: MMPClient, + peer_instance: str, port_name: str, slot: Optional[int], + timeout: float + ) -> None: + self._manager = manager + self._peer_instance = peer_instance + self._port_name = port_name + self._slot = slot + self._timeout = timeout + + @property + def timeout(self) -> float: + return self._timeout + + def on_timeout(self) -> None: + self._manager.waiting_for_receive( + self._peer_instance, self._port_name, self._slot) + + def on_receive(self) -> None: + self._manager.waiting_for_receive_done( + self._peer_instance, self._port_name, self._slot) + + class Communicator: """Communication engine for MUSCLE3. @@ -88,6 +115,8 @@ def __init__( self._index = index self._port_manager = port_manager self._profiler = profiler + # TODO: make this a proper argument of __init__() + self._manager = profiler._manager self._server = MPPServer() @@ -178,7 +207,8 @@ def send_message( self._profiler.record_event(profile_event) def receive_message( - self, port_name: str, slot: Optional[int] = None) -> Tuple[Message, float]: + self, port_name: str, slot: Optional[int], timeout: float + ) -> Tuple[Message, float]: """Receive a message and attached settings overlay. Receiving is a blocking operation. 
This function will contact @@ -228,8 +258,14 @@ def receive_message( snd_endpoint = self._peer_info.get_peer_endpoints( recv_endpoint.port, slot_list)[0] client = self.__get_client(snd_endpoint.instance()) + timeout_handler = None + if timeout >= 0: + timeout_handler = RecvTimeoutHandler( + self._manager, str(snd_endpoint.instance()), + port_name, slot, timeout) try: - mpp_message_bytes, profile = client.receive(recv_endpoint.ref()) + mpp_message_bytes, profile = client.receive( + recv_endpoint.ref(), timeout_handler) except (ConnectionError, SocketClosed) as exc: raise RuntimeError( "Error while receiving a message: connection with peer" @@ -288,7 +324,7 @@ def receive_message( _logger.debug(f'Discarding received message on {port_and_slot}' ': resuming from weakly consistent snapshot') port.set_resumed(slot) - return self.receive_message(port_name, slot) + return self.receive_message(port_name, slot, timeout) raise RuntimeError(f'Received message on {port_and_slot} with' ' unexpected message number' f' {mpp_message.message_number}. Was expecting' @@ -398,7 +434,7 @@ def _drain_incoming_port(self, port_name: str) -> None: port = self._port_manager.get_port(port_name) while port.is_open(): # TODO: log warning if not a ClosePort - self.receive_message(port_name) + self.receive_message(port_name, None, 10.0) # FIXME: timeout def _drain_incoming_vector_port(self, port_name: str) -> None: """Receives messages until a ClosePort is received. @@ -413,7 +449,7 @@ def _drain_incoming_vector_port(self, port_name: str) -> None: for slot in range(port.get_length())]): for slot in range(port.get_length()): if port.is_open(slot): - self.receive_message(port_name, slot) + self.receive_message(port_name, slot, 10.0) # FIXME: timeout def _close_incoming_ports(self) -> None: """Closes incoming ports. 
diff --git a/libmuscle/python/libmuscle/instance.py b/libmuscle/python/libmuscle/instance.py index bfca4d06..f6899d27 100644 --- a/libmuscle/python/libmuscle/instance.py +++ b/libmuscle/python/libmuscle/instance.py @@ -145,6 +145,10 @@ def __init__( self._name, self._index, self._port_manager, self._profiler) """Communicator for this instance.""" + self._receive_timeout = 10.0 + """Timeout in seconds on message receives after which the manager is notified + that we are waiting for a message. Used to detect communication deadlocks.""" + self._declared_ports = ports """Declared ports for this instance.""" @@ -186,6 +190,7 @@ def __init__( self._set_local_log_level() self._set_remote_log_level() self._setup_profiling() + self._setup_receive_timeout() def reuse_instance(self) -> bool: """Decide whether to run this instance again. @@ -809,6 +814,16 @@ def _setup_profiling(self) -> None: self._profiler.set_level(profile_level_str) + def _setup_receive_timeout(self) -> None: + """Configures receive timeout with settings from settings. + """ + try: + self._receive_timeout = self.get_setting( + 'muscle_deadlock_receive_timeout', 'float') + except KeyError: + pass # do nothing and keep the default + _logger.debug("Timeout on receiving messages set to %f", self._receive_timeout) + def _decide_reuse_instance(self) -> bool: """Decide whether and how to reuse the instance. @@ -933,7 +948,8 @@ def __receive_message( return default else: - msg, saved_until = self._communicator.receive_message(port_name, slot) + msg, saved_until = self._communicator.receive_message( + port_name, slot, self._receive_timeout) if not port.is_open(slot): err_msg = (('Port {} was closed while trying to' ' receive on it, did the peer crash?' @@ -1077,7 +1093,8 @@ def __receive_settings(self) -> bool: Returns: False iff the port is connnected and ClosePort was received. 
""" - message, saved_until = self._communicator.receive_message('muscle_settings_in') + message, saved_until = self._communicator.receive_message( + 'muscle_settings_in', None, self._receive_timeout) if isinstance(message.data, ClosePort): return False @@ -1107,7 +1124,8 @@ def __pre_receive_f_init(self) -> None: apply_overlay = InstanceFlags.DONT_APPLY_OVERLAY not in self._flags def pre_receive(port_name: str, slot: Optional[int]) -> None: - msg, saved_until = self._communicator.receive_message(port_name, slot) + msg, saved_until = self._communicator.receive_message( + port_name, slot, self._receive_timeout) self._f_init_cache[(port_name, slot)] = msg if apply_overlay: self.__apply_overlay(msg) diff --git a/libmuscle/python/libmuscle/manager/deadlock_detector.py b/libmuscle/python/libmuscle/manager/deadlock_detector.py new file mode 100644 index 00000000..b1cf0597 --- /dev/null +++ b/libmuscle/python/libmuscle/manager/deadlock_detector.py @@ -0,0 +1,126 @@ +import logging +from threading import Thread +from typing import Callable, Dict, List, Optional, Tuple +from queue import Queue + + +_logger = logging.getLogger(__name__) +_QueueItem = Tuple[bool, str, str, str, Optional[int]] + + +class DeadlockDetector(Thread): + """TODO""" + + def __init__( + self, shutdown_callback: Callable[[], None], wait_before_shutdown: float + ) -> None: + super().__init__(name="DeadlockDetector") + + self._shutdown_callback = shutdown_callback + self._wait_before_shutdown = wait_before_shutdown + + self._queue: Queue[Optional[_QueueItem]] = Queue() + self._waiting_instances: Dict[str, str] = {} + self._waiting_instance_ports: Dict[str, Tuple[str, int]] = {} + + self._detected_deadlocks: List[str] = [] + + def run(self) -> None: + """Logic that is executed in the thread.""" + while True: + item = self._queue.get() + if item is None: # Shutdown sentinel + return + # Handle item + self._process_queue_item(item) + + def shutdown(self) -> None: + """Stop the deadlock detector thread.""" +
self._queue.put(None) + + def put_waiting( + self, instance_id: str, peer_instance_id: str, + port_name: str, slot: Optional[int] + ) -> None: + """Process a WAITING_FOR_RECEIVE message from an instance.""" + self._queue.put((True, instance_id, peer_instance_id, port_name, slot)) + + def put_waiting_done( + self, instance_id: str, peer_instance_id: str, + port_name: str, slot: Optional[int] + ) -> None: + """Process a WAITING_FOR_RECEIVE_DONE message from an instance.""" + self._queue.put((False, instance_id, peer_instance_id, port_name, slot)) + + def _process_queue_item(self, item: _QueueItem) -> None: + _logger.info("Processing queue item: %s", item) + is_waiting, instance_id, peer_instance_id, port_name, slot = item + if is_waiting: + # Sanity checks, triggering this is a bug in the instance or the manager + if instance_id in self._waiting_instances: + _logger.error( + "Instance %s was already waiting on a receive call. " + "Did we miss a WAITING DONE event?", + instance_id) + # Register that the instance is waiting + self._waiting_instances[instance_id] = peer_instance_id + self._waiting_instance_ports[instance_id] = (port_name, slot) + self._check_for_deadlock(instance_id) + + else: + # Sanity checks, triggering these is a bug in the instance or the manager + if instance_id not in self._waiting_instances: + _logger.error( + "Instance %s is not waiting on a receive call.", instance_id) + elif self._waiting_instances[instance_id] != peer_instance_id: + _logger.error( + "Instance %s was waiting for %s, not for %s.", + instance_id, + self._waiting_instances[instance_id], + peer_instance_id) + elif self._waiting_instance_ports[instance_id] != (port_name, slot): + _logger.error( + "Instance %s was waiting on port[slot] %s[%s], not on %s[%s]", + instance_id, + *self._waiting_instance_ports[instance_id], + port_name, slot) + else: + del self._waiting_instances[instance_id] + del self._waiting_instance_ports[instance_id] + + def _check_for_deadlock(self, instance_id:
str) -> None: + """Check if there is a cycle of waiting instances that involves this instance. + """ + deadlock_instances = [instance_id] + cur_instance = instance_id + while cur_instance in self._waiting_instances: + cur_instance = self._waiting_instances[cur_instance] + if cur_instance == instance_id: + break # Found a deadlocked cycle + deadlock_instances.append(cur_instance) + else: # No cycle detected + _logger.info("No deadlock detected") + return + + _logger.warning( + "Potential deadlock detected, aborting run in %d seconds.\n%s", + self._wait_before_shutdown, + self._format_deadlock(deadlock_instances), + ) + # TODO: wait and abort + self._shutdown_callback() + + def _format_deadlock(self, instances: List[str]) -> str: + """Create and return formatted deadlock debug info.""" + num_instances = str(len(instances)) + lines = [f"The following {num_instances} instances are dead-locked:"] + for i, instance in enumerate(instances): + num = str(i+1).rjust(len(num_instances)) + peer_instance = self._waiting_instances[instance] + port, slot = self._waiting_instance_ports[instance] + slot_txt = "" if slot is None else f"[{slot}]" + lines.append( + f"{num}. Instance '{instance}' is waiting on instance '{peer_instance}'" + f" in a receive on port '{port}{slot_txt}'." 
+ ) + return "\n".join(lines) diff --git a/libmuscle/python/libmuscle/manager/manager.py b/libmuscle/python/libmuscle/manager/manager.py index 044a28d7..df4cd0d8 100644 --- a/libmuscle/python/libmuscle/manager/manager.py +++ b/libmuscle/python/libmuscle/manager/manager.py @@ -14,6 +14,7 @@ from libmuscle.manager.run_dir import RunDir from libmuscle.manager.snapshot_registry import SnapshotRegistry from libmuscle.manager.topology_store import TopologyStore +from libmuscle.manager.deadlock_detector import DeadlockDetector _logger = logging.getLogger(__name__) @@ -79,11 +80,14 @@ def __init__( self._snapshot_registry = SnapshotRegistry( configuration, snapshot_dir, self._topology_store) self._snapshot_registry.start() + # FIXME configure timeout: + self._deadlock_detector = DeadlockDetector(self.stop, 5.0) + self._deadlock_detector.start() self._server = MMPServer( self._logger, self._profile_store, self._configuration, self._instance_registry, self._topology_store, - self._snapshot_registry, run_dir) + self._snapshot_registry, self._deadlock_detector, run_dir) if self._instance_manager: self._instance_manager.set_manager_location( @@ -121,6 +125,9 @@ def stop(self) -> None: """Shuts down the manager.""" if self._instance_manager: self._instance_manager.shutdown() + self._deadlock_detector.shutdown() + # Note: don't join() deadlock detector, as this method may be called from the + # DeadlockDetector thread. 
join() would (ironically) deadlock the shutdown :) self._server.stop() self._snapshot_registry.shutdown() self._snapshot_registry.join() diff --git a/libmuscle/python/libmuscle/manager/mmp_server.py b/libmuscle/python/libmuscle/manager/mmp_server.py index 94847fd9..6eca3131 100644 --- a/libmuscle/python/libmuscle/manager/mmp_server.py +++ b/libmuscle/python/libmuscle/manager/mmp_server.py @@ -20,6 +20,7 @@ from libmuscle.mcp.tcp_transport_server import TcpTransportServer from libmuscle.mcp.transport_server import RequestHandler from libmuscle.manager.profile_store import ProfileStore +from libmuscle.manager.deadlock_detector import DeadlockDetector from libmuscle.profiling import ( ProfileEvent, ProfileEventType, ProfileTimestamp) from libmuscle.snapshot import SnapshotMetadata @@ -77,6 +78,7 @@ def __init__( instance_registry: InstanceRegistry, topology_store: TopologyStore, snapshot_registry: SnapshotRegistry, + deadlock_detector: DeadlockDetector, run_dir: Optional[RunDir] ) -> None: """Create an MMPRequestHandler. 
@@ -93,6 +95,7 @@ def __init__( self._instance_registry = instance_registry self._topology_store = topology_store self._snapshot_registry = snapshot_registry + self._deadlock_detector = deadlock_detector self._run_dir = run_dir self._reference_time = time.monotonic() @@ -124,6 +127,10 @@ def handle_request(self, request: bytes) -> bytes: response = self._submit_snapshot(*req_args) elif req_type == RequestType.GET_CHECKPOINT_INFO.value: response = self._get_checkpoint_info(*req_args) + elif req_type == RequestType.WAITING_FOR_RECEIVE.value: + response = self._waiting_for_receive(*req_args) + elif req_type == RequestType.WAITING_FOR_RECEIVE_DONE.value: + response = self._waiting_for_receive_done(*req_args) return cast(bytes, msgpack.packb(response, use_bin_type=True)) @@ -357,6 +364,34 @@ def _get_checkpoint_info(self, instance_id: str) -> Any: resume, snapshot_directory] + def _waiting_for_receive( + self, instance_id: str, peer_instance_id: str, + port_name: str, slot: Optional[int]) -> Any: + """Indicate that the instance is waiting to receive a message. + + Args: + instance_id: The instance that is waiting + port_name: Port name that the instance is waiting on + slot: Slot that the instance is waiting on + """ + self._deadlock_detector.put_waiting( + instance_id, peer_instance_id, port_name, slot) + return [ResponseType.SUCCESS.value] + + def _waiting_for_receive_done( + self, instance_id: str, peer_instance_id: str, + port_name: str, slot: Optional[int]) -> Any: + """Indicate that the instance is done waiting to receive a message. + + Args: + instance_id: The instance that is waiting + port_name: Port name that the instance is waiting on + slot: Slot that the instance is waiting on + """ + self._deadlock_detector.put_waiting_done( + instance_id, peer_instance_id, port_name, slot) + return [ResponseType.SUCCESS.value] + class MMPServer: """The MUSCLE Manager Protocol server. 
@@ -373,6 +408,7 @@ def __init__( instance_registry: InstanceRegistry, topology_store: TopologyStore, snapshot_registry: SnapshotRegistry, + deadlock_detector: DeadlockDetector, run_dir: Optional[RunDir] ) -> None: """Create an MMPServer. @@ -395,7 +431,7 @@ def __init__( """ self._handler = MMPRequestHandler( logger, profile_store, configuration, instance_registry, - topology_store, snapshot_registry, run_dir) + topology_store, snapshot_registry, deadlock_detector, run_dir) try: self._server = TcpTransportServer(self._handler, 9000) except OSError as e: diff --git a/libmuscle/python/libmuscle/mcp/protocol.py b/libmuscle/python/libmuscle/mcp/protocol.py index 5d1217ed..da23b221 100644 --- a/libmuscle/python/libmuscle/mcp/protocol.py +++ b/libmuscle/python/libmuscle/mcp/protocol.py @@ -22,6 +22,9 @@ class RequestType(Enum): SUBMIT_PROFILE_EVENTS = 6 SUBMIT_SNAPSHOT = 7 GET_CHECKPOINT_INFO = 8 + # Connection deadlock detection + WAITING_FOR_RECEIVE = 9 + WAITING_FOR_RECEIVE_DONE = 10 # MUSCLE Peer Protocol GET_NEXT_MESSAGE = 21 diff --git a/libmuscle/python/libmuscle/mcp/tcp_transport_client.py b/libmuscle/python/libmuscle/mcp/tcp_transport_client.py index ed9536d2..bf565e5e 100644 --- a/libmuscle/python/libmuscle/mcp/tcp_transport_client.py +++ b/libmuscle/python/libmuscle/mcp/tcp_transport_client.py @@ -1,8 +1,9 @@ from errno import ENOTCONN +import select import socket from typing import Optional, Tuple -from libmuscle.mcp.transport_client import ProfileData, TransportClient +from libmuscle.mcp.transport_client import ProfileData, TransportClient, TimeoutHandler from libmuscle.mcp.tcp_util import recv_all, recv_int64, send_int64 from libmuscle.profiling import ProfileTimestamp @@ -51,13 +52,22 @@ def __init__(self, location: str) -> None: sock.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1) self._socket = sock - def call(self, request: bytes) -> Tuple[bytes, ProfileData]: + if hasattr(select, "poll"): + self._poll_obj = select.poll() + 
self._poll_obj.register(self._socket, select.POLLIN) + else: + self._poll_obj = None # On platforms that don't support select.poll + + def call(self, request: bytes, timeout_handler: Optional[TimeoutHandler] = None + ) -> Tuple[bytes, ProfileData]: """Send a request to the server and receive the response. This is a blocking call. Args: request: The request to send + timeout_handler: Optional timeout handler. This is used for communication + deadlock detection. Returns: The received response @@ -66,13 +76,40 @@ def call(self, request: bytes) -> Tuple[bytes, ProfileData]: send_int64(self._socket, len(request)) self._socket.sendall(request) + did_timeout = False + if timeout_handler is not None: + if not self._poll(timeout_handler.timeout): + did_timeout = True + timeout_handler.on_timeout() + length = recv_int64(self._socket) + if did_timeout: + timeout_handler.on_receive() start_transfer = ProfileTimestamp() response = recv_all(self._socket, length) stop_transfer = ProfileTimestamp() return response, (start_wait, start_transfer, stop_transfer) + def _poll(self, timeout: float) -> bool: + """Poll the socket and return whether it's ready for receiving. + + This method blocks until the socket is ready for receiving, or :param:`timeout` + seconds have passed (whichever is earlier). + + Args: + timeout: timeout in seconds + + Returns: + True if the socket is ready for receiving data, False otherwise. + """ + if self._poll_obj is not None: + ready = self._poll_obj.poll(timeout * 1000) # poll timeout is in ms + else: + # Fallback to select() + ready, _, _ = select.select([self._socket], (), (), timeout) + return bool(ready) + def close(self) -> None: """Closes this client.
diff --git a/libmuscle/python/libmuscle/mcp/transport_client.py b/libmuscle/python/libmuscle/mcp/transport_client.py index 55942dc9..9b1c0ab5 100644 --- a/libmuscle/python/libmuscle/mcp/transport_client.py +++ b/libmuscle/python/libmuscle/mcp/transport_client.py @@ -1,4 +1,4 @@ -from typing import Tuple +from typing import Optional, Tuple from libmuscle.profiling import ProfileTimestamp @@ -6,6 +6,28 @@ ProfileData = Tuple[ProfileTimestamp, ProfileTimestamp, ProfileTimestamp] +class TimeoutHandler: + """Object handling timeouts during :meth:`TransportClient.call`.""" + + @property + def timeout(self) -> float: + """Timeout (in seconds) after which :meth:`on_timeout` is called.""" + raise NotImplementedError() # pragma: no cover + + def on_timeout(self) -> None: + """Callback when :attr:`timeout` seconds have passed without a response from the + peer. + """ + raise NotImplementedError() # pragma: no cover + + def on_receive(self) -> None: + """Callback when receiving a response from the peer. + + Note: this method is only called when the request has timed out. + """ + raise NotImplementedError() # pragma: no cover + + class TransportClient: """A client that connects to an MCP server. @@ -25,7 +47,8 @@ def can_connect_to(location: str) -> bool: """ raise NotImplementedError() # pragma: no cover - def call(self, request: bytes) -> Tuple[bytes, ProfileData]: + def call(self, request: bytes, timeout_handler: Optional[TimeoutHandler] = None + ) -> Tuple[bytes, ProfileData]: """Send a request to the server and receive the response. This is a blocking call. Besides the result, this function @@ -36,6 +59,8 @@ def call(self, request: bytes) -> Tuple[bytes, ProfileData]: Args: request: The request to send + timeout_handler: Optional timeout handler. This is used for communication + deadlock detection. 
Returns: The received response, and the timestamps diff --git a/libmuscle/python/libmuscle/mmp_client.py b/libmuscle/python/libmuscle/mmp_client.py index 34238a87..00a2a20c 100644 --- a/libmuscle/python/libmuscle/mmp_client.py +++ b/libmuscle/python/libmuscle/mmp_client.py @@ -275,6 +275,26 @@ def deregister_instance(self) -> None: raise RuntimeError('Error deregistering instance: {}'.format( response[1])) + def waiting_for_receive( + self, peer_instance_id: str, port_name: str, slot: Optional[int] + ) -> None: + """Notify the manager that we're waiting to receive a message.""" + request = [ + RequestType.WAITING_FOR_RECEIVE.value, + str(self._instance_id), + peer_instance_id, port_name, slot] + self._call_manager(request) + + def waiting_for_receive_done( + self, peer_instance_id: str, port_name: str, slot: Optional[int] + ) -> None: + """Notify the manager that we're done waiting to receive a message.""" + request = [ + RequestType.WAITING_FOR_RECEIVE_DONE.value, + str(self._instance_id), + peer_instance_id, port_name, slot] + self._call_manager(request) + def _call_manager(self, request: Any) -> Any: """Call the manager and do en/decoding. 
diff --git a/libmuscle/python/libmuscle/mpp_client.py b/libmuscle/python/libmuscle/mpp_client.py index 852ec938..5eedbf97 100644 --- a/libmuscle/python/libmuscle/mpp_client.py +++ b/libmuscle/python/libmuscle/mpp_client.py @@ -4,7 +4,7 @@ from ymmsl import Reference from libmuscle.mcp.protocol import RequestType -from libmuscle.mcp.transport_client import ProfileData, TransportClient +from libmuscle.mcp.transport_client import ProfileData, TransportClient, TimeoutHandler from libmuscle.mcp.type_registry import transport_client_types @@ -40,7 +40,8 @@ def __init__(self, locations: List[str]) -> None: self._transport_client = client - def receive(self, receiver: Reference) -> Tuple[bytes, ProfileData]: + def receive(self, receiver: Reference, timeout_handler: Optional[TimeoutHandler] + ) -> Tuple[bytes, ProfileData]: """Receive a message from a port this client connects to. Args: @@ -51,7 +52,7 @@ def receive(self, receiver: Reference) -> Tuple[bytes, ProfileData]: """ request = [RequestType.GET_NEXT_MESSAGE.value, str(receiver)] encoded_request = msgpack.packb(request, use_bin_type=True) - return self._transport_client.call(encoded_request) + return self._transport_client.call(encoded_request, timeout_handler) def close(self) -> None: """Closes this client. 
diff --git a/libmuscle/python/libmuscle/test/test_communicator.py b/libmuscle/python/libmuscle/test/test_communicator.py index ff1e08dd..5397a14e 100644 --- a/libmuscle/python/libmuscle/test/test_communicator.py +++ b/libmuscle/python/libmuscle/test/test_communicator.py @@ -115,9 +115,9 @@ def test_receive_message(connected_communicator, mpp_client): mpp_client.receive.return_value = msg.encoded(), MagicMock() - recv_msg, saved_until = connected_communicator.receive_message('in') + recv_msg, saved_until = connected_communicator.receive_message('in', None, -1) - mpp_client.receive.assert_called_with(Ref('component.in')) + mpp_client.receive.assert_called_with(Ref('component.in'), None) assert recv_msg.timestamp == 2.0 assert recv_msg.next_timestamp == 3.0 @@ -135,9 +135,9 @@ def test_receive_message_vector(connected_communicator, mpp_client): mpp_client.receive.return_value = msg.encoded(), MagicMock() - recv_msg, saved_until = connected_communicator.receive_message('in_v', 5) + recv_msg, saved_until = connected_communicator.receive_message('in_v', 5, -1) - mpp_client.receive.assert_called_with(Ref('component.in_v[5]')) + mpp_client.receive.assert_called_with(Ref('component.in_v[5]'), None) assert recv_msg.timestamp == 4.0 assert recv_msg.next_timestamp == 6.0 @@ -155,7 +155,7 @@ def test_receive_close_port(connected_communicator, mpp_client, port_manager): mpp_client.receive.return_value = msg.encoded(), MagicMock() - recv_msg, saved_until = connected_communicator.receive_message('in') + recv_msg, saved_until = connected_communicator.receive_message('in', None, -1) assert port_manager.get_port('in').is_open() is False @@ -167,7 +167,7 @@ def test_receive_close_port_vector(connected_communicator, mpp_client, port_mana mpp_client.receive.return_value = msg.encoded(), MagicMock() - recv_msg, saved_until = connected_communicator.receive_message('in_v', 5) + recv_msg, saved_until = connected_communicator.receive_message('in_v', 5, -1) assert 
port_manager.get_port('in_v').is_open(5) is False @@ -182,12 +182,12 @@ def test_port_count_validation( mpp_client.receive.return_value = msg.encoded(), MagicMock() - connected_communicator.receive_message('in') + connected_communicator.receive_message('in', None, -1) assert connected_port_manager.get_port('in').get_message_counts() == [1] with pytest.raises(RuntimeError): # the message received has message_number = 0 again - connected_communicator.receive_message('in') + connected_communicator.receive_message('in', None, -1) def test_port_discard_error_on_resume( @@ -212,7 +212,7 @@ def test_port_discard_error_on_resume( # message_number=1 with caplog.at_level(logging.DEBUG, 'libmuscle.communicator'): with pytest.raises(RuntimeError): - connected_communicator.receive_message('in') + connected_communicator.receive_message('in', None, -1) assert any([ 'Discarding received message' in rec.message @@ -238,7 +238,7 @@ def test_port_discard_success_on_resume( assert connected_port_manager.get_port(port).is_resuming(None) with caplog.at_level(logging.DEBUG, 'libmuscle.communicator'): - msg, _ = connected_communicator.receive_message('in') + msg, _ = connected_communicator.receive_message('in', None, -1) assert any([ 'Discarding received message' in rec.message for rec in caplog.records]) @@ -272,7 +272,7 @@ def test_shutdown( sender, receiver, slot, float('inf'), None, Settings(), 0, 3.5, ClosePort()) - def receive(receiver): + def receive(receiver, timeout_handler): return messages[receiver].encoded(), MagicMock() mpp_client.receive = receive diff --git a/libmuscle/python/libmuscle/test/test_instance.py b/libmuscle/python/libmuscle/test/test_instance.py index 9dc52c52..264a0b9b 100644 --- a/libmuscle/python/libmuscle/test/test_instance.py +++ b/libmuscle/python/libmuscle/test/test_instance.py @@ -355,6 +355,7 @@ def test_reuse_set_overlay( instance, port_manager, mock_ports, communicator, settings_manager): port_manager.settings_in_connected.return_value = True 
mock_ports['in']._is_connected = False + instance._receive_timeout = -1 mock_msg = MagicMock() mock_msg.data = Settings({'s1': 1, 's2': 2}) @@ -363,7 +364,7 @@ def test_reuse_set_overlay( instance.reuse_instance() - communicator.receive_message.assert_called_with('muscle_settings_in') + communicator.receive_message.assert_called_with('muscle_settings_in', None, -1) assert settings_manager.overlay['s0'] == 0 assert settings_manager.overlay['s1'] == 1 assert settings_manager.overlay['s2'] == 2 @@ -478,7 +479,7 @@ def test_receive_no_default(instance): def test_receive_inconsistent_settings( instance, settings_manager, port_manager, communicator): - def receive_message(port, slot=None): + def receive_message(port, slot, timeout): mock_msg = MagicMock() if port == 'muscle_settings_in': mock_msg.data = Settings({'s1': 1})