Skip to content

Commit

Permalink
Update documentation and logging
Browse files Browse the repository at this point in the history
Also pass the manager as a direct argument to the Communicator
  • Loading branch information
maarten-ic committed Aug 13, 2024
1 parent d37b663 commit c1d1d57
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 7 deletions.
25 changes: 21 additions & 4 deletions libmuscle/python/libmuscle/communicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,28 @@ def __init__(self, timestamp: float, next_timestamp: Optional[float] = None,


class RecvTimeoutHandler(TimeoutHandler):
"""Timeout handler when receiving messages from peers.
This handler sends a message to the Muscle Manager when the receive times out (and
another message when the message does arrive).
This is used by the manager to detect if the simulation is in a deadlock, where a
cycle of instances is waiting on each other.
"""

def __init__(
self, manager: MMPClient,
peer_instance: str, port_name: str, slot: Optional[int],
timeout: float
) -> None:
"""Initialize a new timeout handler.
Args:
manager: Connection to the muscle manager.
peer_instance: the peer instance we try to receive from.
port_name: the name of the port we try to receive on.
slot: the slot we try to receive on.
"""
self._manager = manager
self._peer_instance = peer_instance
self._port_name = port_name
Expand All @@ -84,7 +101,7 @@ def on_timeout(self) -> None:

def on_receive(self) -> None:
self._manager.waiting_for_receive_done(
self._peer_instance, self._port_name, self._slot)
self._peer_instance, self._port_name, self._slot)


class Communicator:
Expand All @@ -97,7 +114,8 @@ class Communicator:
"""
def __init__(
self, kernel: Reference, index: List[int],
port_manager: PortManager, profiler: Profiler) -> None:
port_manager: PortManager, profiler: Profiler,
manager: MMPClient) -> None:
"""Create a Communicator.
The instance reference must start with one or more Identifiers,
Expand All @@ -115,8 +133,7 @@ def __init__(
self._index = index
self._port_manager = port_manager
self._profiler = profiler
# TODO: make this a proper argument of __init__()
self._manager = profiler._manager
self._manager = manager
# Notify manager, by default, after 10 seconds waiting in receive_message()
self._receive_timeout = 10.0

Expand Down
3 changes: 2 additions & 1 deletion libmuscle/python/libmuscle/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ def __init__(
"""PortManager for this instance."""

self._communicator = Communicator(
self._name, self._index, self._port_manager, self._profiler)
self._name, self._index, self._port_manager, self._profiler,
self.__manager)
"""Communicator for this instance."""

self._declared_ports = ports
Expand Down
6 changes: 6 additions & 0 deletions libmuscle/python/libmuscle/manager/instance_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ def cancel_all() -> None:
_logger.info(
f'Instance {result.instance} was shut down by'
f' MUSCLE3 because an error occurred elsewhere')
# Ensure we don't see this as a succesful run when shutdown() is called
# by another thread:
all_seemingly_okay = False
else:
stderr_file = (
self._run_dir.instance_dir(result.instance) /
Expand Down Expand Up @@ -260,6 +263,9 @@ def cancel_all() -> None:
'More output may be found in'
f' {self._run_dir.instance_dir(result.instance)}\n'
)
elif not all_seemingly_okay:
# shutdown() was called by another thread (e.g. the DeadlockDetector):
_logger.error('The simulation was aborted.')
else:
_logger.info('The simulation finished without error.')

Expand Down
4 changes: 2 additions & 2 deletions libmuscle/python/libmuscle/test/test_communicator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock, Mock, patch

import pytest

Expand Down Expand Up @@ -46,7 +46,7 @@ def mpp_client(MPPClient):

@pytest.fixture
def communicator(connected_port_manager, profiler):
return Communicator(Ref('component'), [], connected_port_manager, profiler)
return Communicator(Ref('component'), [], connected_port_manager, profiler, Mock())


@pytest.fixture
Expand Down

0 comments on commit c1d1d57

Please sign in to comment.