From 39eb75b211e0431fd256c693295996e15d3229c7 Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Thu, 22 Aug 2024 17:17:09 +0200 Subject: [PATCH 1/8] [WIP] Add a validator to check if an instance adheres to the MMSF sequence of operations --- libmuscle/python/libmuscle/mmsf_validator.py | 163 ++++++++++++++++++ .../libmuscle/test/test_mmsf_validator.py | 125 ++++++++++++++ 2 files changed, 288 insertions(+) create mode 100644 libmuscle/python/libmuscle/mmsf_validator.py create mode 100644 libmuscle/python/libmuscle/test/test_mmsf_validator.py diff --git a/libmuscle/python/libmuscle/mmsf_validator.py b/libmuscle/python/libmuscle/mmsf_validator.py new file mode 100644 index 00000000..d907d12f --- /dev/null +++ b/libmuscle/python/libmuscle/mmsf_validator.py @@ -0,0 +1,163 @@ +import logging +import sys +import types +from typing import List, Optional + +from libmuscle.port_manager import PortManager +from ymmsl import Operator + + +_logger = logging.getLogger(__name__) + + +class MMSFValidator: + def __init__(self, port_manager: PortManager) -> None: + self._port_manager = port_manager + + port_names = port_manager.list_ports() + port_objects = { + operator: [port_manager.get_port(name) for name in names] + for operator, names in port_names.items()} + self._connected_ports = { + operator: [str(port.name) for port in ports if port.is_connected()] + for operator, ports in port_objects.items()} + self._port_operators = { + port: operator + for operator, ports in port_names.items() + for port in ports} + + # Allowed operator transitions, the following are unconditionally allowed: + self._allowed_transitions = { + (Operator.NONE, Operator.NONE), + (Operator.NONE, Operator.F_INIT), + (Operator.F_INIT, Operator.O_I), + (Operator.F_INIT, Operator.O_F), + (Operator.O_I, Operator.S), + (Operator.S, Operator.O_I), + (Operator.S, Operator.O_F), + (Operator.O_F, Operator.NONE), + } + # If there are operators without connected ports, we can skip over those + for operator in [Operator.F_INIT, Operator.O_I, Operator.S, Operator.O_F]: + if not self._connected_ports.get(operator, []): + # Find all transitions A -> operator -> B and allow transition A -> B: + skip_from = [] + skip_to = [] + for from_op, to_op in self._allowed_transitions: + if from_op is operator: + skip_to.append(to_op) + if to_op is operator: + skip_from.append(from_op) + for from_op in skip_from: + for to_op in skip_to: + self._allowed_transitions.add((from_op, to_op)) + + # Disable this validator when the instance uses vector ports to keep this class + # simpler. Support for vector ports may be added in the future. + self._enabled = not any( + port.is_vector() for ports in port_objects.values() for port in ports) + _logger.debug( + "MMSF Validator is %s", "enabled" if self._enabled else "disabled") + + # State tracking + self._current_ports_used: List[str] = [] + self._current_operator: Operator = Operator.NONE + + def check_send(self, port_name: str, slot: Optional[int]) -> None: + self._check_send_receive(port_name, slot) + + def check_receive(self, port_name: str, slot: Optional[int]) -> None: + self._check_send_receive(port_name, slot) + + def reuse_instance(self) -> None: + if not self._enabled: + return + self._check_transition(Operator.NONE) + + def _check_send_receive( + self, port_name: str, slot: Optional[int]) -> None: + if not self._enabled: + return + + operator = self._port_operators[port_name] + if self._current_operator != operator: + # Operator changed, check that all ports were used in the previous operator + self._check_transition(operator, port_name) + + if port_name in self._current_ports_used: + # We're using the same port for a second time, this is fine when we're + # allowed to do this operator immediately again: + self._check_transition(operator, port_name) + + self._current_ports_used.append(port_name) + + def _check_transition(self, operator: Operator, port_name: str = "") -> None: + connected_ports = self._connected_ports.get(self._current_operator, []) + expected: str = "" + + unused_ports = [ + port for port in connected_ports + if port not in self._current_ports_used] + if unused_ports: + # We didn't complete the current phase + if operator in (Operator.F_INIT, Operator.S): + expected = "a receive" + else: + expected = "a send" + expected += " on any of these ports: " + ", ".join(unused_ports) + + elif (self._current_operator, operator) not in self._allowed_transitions: + # Transition to the operator is not allowed, now figure out what we were + # actually expecting. + # First find the allowed transitions from self._current_operator, that are + # also 'valid' (i.e. have connected ports): + allowed = [ + to_op for from_op, to_op in self._allowed_transitions + if from_op is self._current_operator and + (to_op in self._connected_ports or to_op is Operator.NONE)] + # Build the message we want to display to users: + expected_lst = [] + for to_op in sorted(allowed, key=lambda op: op.value): + ports = ', '.join(map(repr, self._connected_ports.get(to_op, []))) + if to_op is Operator.NONE: + expected_lst.append("a call to reuse_instance()") + elif to_op in (Operator.F_INIT, Operator.S): + expected_lst.append(f"a receive on an {to_op.name} port ({ports})") + else: + expected_lst.append(f"a send on an {to_op.name} port ({ports})") + assert expected_lst + expected = " or ".join(expected_lst) + + if expected: + # We expected something else, log a warning: + if operator is Operator.NONE: + action = "reuse_instance()" + elif operator in (Operator.F_INIT, Operator.S): + action = f"Receive on port '{port_name}'" + else: + action = f"Send on port '{port_name}'" + file_and_line = "" + try: + # Try to find the file:line where the user called + # Instance.send/receive/reuse_instance + frame: Optional[types.FrameType] = sys._getframe() + while frame and frame.f_code.co_qualname.startswith("MMSFValidator."): + frame = frame.f_back + while (frame + and frame.f_code.co_filename.endswith("libmuscle/instance.py")): + frame = frame.f_back + if frame: + code = frame.f_code + file_and_line = f" ({code.co_filename}:{code.co_firstlineno})" + except Exception: + pass + _logger.warning( + "%s%s does not adhere to the MMSF: was expecting %s. " + "Not adhering to the Multiscale Modelling and Simulation Framework " + "may lead to deadlocks. You can disable this warning by " + "setting the flag InstanceFlags.SKIP_MMSF_SEQUENCE_CHECKS " + "when creating the libmuscle.Instance.", + action, file_and_line, expected) + + self._current_operator = operator + self._current_ports_used = [] diff --git a/libmuscle/python/libmuscle/test/test_mmsf_validator.py b/libmuscle/python/libmuscle/test/test_mmsf_validator.py new file mode 100644 index 00000000..0951e896 --- /dev/null +++ b/libmuscle/python/libmuscle/test/test_mmsf_validator.py @@ -0,0 +1,125 @@ +from typing import Any +from unittest.mock import Mock + +import pytest +from ymmsl import Operator, Reference + +from libmuscle.port_manager import PortManager +from libmuscle.mmsf_validator import MMSFValidator + + +# For testing purposes we monkeypatch _logger.warning so it raises the following +# exception: ot is easier to verify that an exception is raised than checking that a +# warning message is logged. +class TestMMSFValidatorException(Exception): + pass + + +@pytest.fixture(autouse=True) +def patch_logger_to_raise_error(monkeypatch): + def raise_on_log(msg: str, *args: Any) -> None: + raise TestMMSFValidatorException(msg % args) + monkeypatch.setattr("libmuscle.mmsf_validator._logger.warning", raise_on_log) + + +@pytest.fixture +def mock_peer_info() -> Mock: + # Create a mock PeerInfo indicating that all ports are connected + peer_info = Mock() + peer_info.is_connected.return_value = True + peer_info.get_peer_ports.return_value = [Reference("test")] + peer_info.get_peer_dims.return_value = [] + return peer_info + + +@pytest.fixture +def validator_simple(mock_peer_info) -> MMSFValidator: + port_manager = PortManager([], { + Operator.F_INIT: ["f_i"], + Operator.O_I: ["o_i"], + Operator.S: ["s"], + Operator.O_F: ["o_f"]}) + port_manager.connect_ports(mock_peer_info) + return MMSFValidator(port_manager) + + +@pytest.mark.parametrize("num_iterations", [0, 1, 2]) +@pytest.mark.parametrize("num_reuse", [1, 5]) +def test_simple_correct(num_iterations, num_reuse, validator_simple): + for _ in range(num_reuse): + validator_simple.reuse_instance() + validator_simple.check_receive("f_i", None) + for _ in range(num_iterations): + validator_simple.check_send("o_i", None) + validator_simple.check_receive("s", None) + validator_simple.check_send("o_f", None) + # Final reuse_instance() + validator_simple.reuse_instance() + + +def test_simple_skip_f_init(validator_simple): + validator_simple.reuse_instance() + with pytest.raises(TestMMSFValidatorException): + validator_simple.check_send("o_i", None) + + +def test_simple_skip_o_i(validator_simple): + validator_simple.reuse_instance() + validator_simple.check_receive("f_i", None) + with pytest.raises(TestMMSFValidatorException): + validator_simple.check_receive("f_i", None) + with pytest.raises(TestMMSFValidatorException): + validator_simple.check_receive("s", None) + with pytest.raises(TestMMSFValidatorException): + validator_simple.reuse_instance() + + +def test_simple_skip_s(validator_simple): + validator_simple.reuse_instance() + validator_simple.check_receive("f_i", None) + validator_simple.check_send("o_i", None) + with pytest.raises(TestMMSFValidatorException): + validator_simple.check_send("o_i", None) + with pytest.raises(TestMMSFValidatorException): + validator_simple.check_send("o_f", None) + + +def test_simple_skip_o_f(validator_simple): + validator_simple.reuse_instance() + validator_simple.check_receive("f_i", None) + validator_simple.check_send("o_i", None) + validator_simple.check_receive("s", None) + with pytest.raises(TestMMSFValidatorException): + validator_simple.reuse_instance() + + +def test_simple_skip_reuse_instance(validator_simple): + validator_simple.reuse_instance() + validator_simple.check_receive("f_i", None) + validator_simple.check_receive("o_f", None) + with pytest.raises(TestMMSFValidatorException): + validator_simple.check_receive("f_i", None) + + +def test_only_o_f(mock_peer_info): + port_manager = PortManager([], {Operator.O_F: ["o_f"]}) + port_manager.connect_ports(mock_peer_info) + validator = MMSFValidator(port_manager) + + for _ in range(5): + validator.reuse_instance() + validator.check_send("o_f", None) + with pytest.raises(TestMMSFValidatorException): + validator.check_send("o_f", None) + + +def test_only_f_i(mock_peer_info): + port_manager = PortManager([], {Operator.F_INIT: ["f_i"]}) + port_manager.connect_ports(mock_peer_info) + validator = MMSFValidator(port_manager) + + for _ in range(5): + validator.reuse_instance() + validator.check_receive("f_i", None) + with pytest.raises(TestMMSFValidatorException): + validator.check_receive("f_i", None) From 06ddd33f0b20d04171815c95f9e3ad4f17c20da5 Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Fri, 23 Aug 2024 10:55:25 +0200 Subject: [PATCH 2/8] Finalize Python implementation of MMSFValidator --- .../examples/python/interact_coupling.py | 8 +- .../test_snapshot_complex_coupling.py | 5 +- libmuscle/python/libmuscle/instance.py | 22 ++++++ libmuscle/python/libmuscle/mmsf_validator.py | 74 +++++++++++++++---- .../python/libmuscle/test/test_instance.py | 2 + .../libmuscle/test/test_mmsf_validator.py | 29 ++++++++ 6 files changed, 119 insertions(+), 21 deletions(-) diff --git a/docs/source/examples/python/interact_coupling.py b/docs/source/examples/python/interact_coupling.py index 7f8ebd36..e110d651 100644 --- a/docs/source/examples/python/interact_coupling.py +++ b/docs/source/examples/python/interact_coupling.py @@ -1,7 +1,7 @@ import logging from typing import Any, Optional, Tuple, Dict -from libmuscle import Instance, Message, USES_CHECKPOINT_API +from libmuscle import Instance, InstanceFlags, Message from libmuscle.runner import run_simulation from ymmsl import ( Component, Conduit, Configuration, Model, Operator, Ports, Settings) @@ -241,7 +241,8 @@ def temporal_coupler() -> None: """ instance = Instance({ Operator.O_I: ['a_out', 'b_out'], - Operator.S: ['a_in', 'b_in']}) + Operator.S: ['a_in', 'b_in']}, + InstanceFlags.SKIP_MMSF_SEQUENCE_CHECKS) while instance.reuse_instance(): # Receive initial messages and initialise state @@ -275,7 +276,8 @@ def checkpointing_temporal_coupler() -> None: """ instance = Instance({ Operator.O_I: ['a_out', 'b_out'], - Operator.S: ['a_in', 'b_in']}, USES_CHECKPOINT_API) + Operator.S: ['a_in', 'b_in']}, + InstanceFlags.USES_CHECKPOINT_API | InstanceFlags.SKIP_MMSF_SEQUENCE_CHECKS) while instance.reuse_instance(): if instance.resuming(): diff --git a/integration_test/test_snapshot_complex_coupling.py b/integration_test/test_snapshot_complex_coupling.py index e693ea00..cef92dcc 100644 --- a/integration_test/test_snapshot_complex_coupling.py +++ b/integration_test/test_snapshot_complex_coupling.py @@ -5,7 +5,8 @@ from ymmsl import Operator, load, dump from libmuscle import ( - Instance, Message, KEEPS_NO_STATE_FOR_NEXT_USE, USES_CHECKPOINT_API) + Instance, Message, KEEPS_NO_STATE_FOR_NEXT_USE, USES_CHECKPOINT_API, + SKIP_MMSF_SEQUENCE_CHECKS) from libmuscle.manager.run_dir import RunDir from .conftest import run_manager_with_actors, ls_snapshots @@ -58,7 +59,7 @@ def cache_component(max_channels=2): def echo_component(max_channels=2): ports = {Operator.F_INIT: [f'in{i+1}' for i in range(max_channels)], Operator.O_F: [f'out{i+1}' for i in range(max_channels)]} - instance = Instance(ports, KEEPS_NO_STATE_FOR_NEXT_USE) + instance = Instance(ports, KEEPS_NO_STATE_FOR_NEXT_USE | SKIP_MMSF_SEQUENCE_CHECKS) while instance.reuse_instance(): for p_in, p_out in zip(ports[Operator.F_INIT], ports[Operator.O_F]): diff --git a/libmuscle/python/libmuscle/instance.py b/libmuscle/python/libmuscle/instance.py index bfca4d06..c3558fb5 100644 --- a/libmuscle/python/libmuscle/instance.py +++ b/libmuscle/python/libmuscle/instance.py @@ -18,6 +18,7 @@ from libmuscle.logging_handler import MuscleManagerHandler from libmuscle.mpp_message import ClosePort from libmuscle.mmp_client import MMPClient +from libmuscle.mmsf_validator import MMSFValidator from libmuscle.peer_info import PeerInfo from libmuscle.port_manager import PortManager from libmuscle.profiler import Profiler @@ -89,6 +90,14 @@ class InstanceFlags(Flag): :external:py:attr:`ymmsl.KeepsStateForNextUse.NECESSARY`). """ + SKIP_MMSF_SEQUENCE_CHECKS = auto() + """Disable the checks whether the MMSF is strictly followed when sending/receiving + messages. + + See :class:`~libmuscle.mmsf_validator.MMSFValidator` for a detailed description of + the checks. + """ + _CHECKPOINT_SUPPORT_MASK = ( InstanceFlags.USES_CHECKPOINT_API | @@ -186,6 +195,10 @@ def __init__( self._set_local_log_level() self._set_remote_log_level() self._setup_profiling() + # MMSFValidator needs a connected port manager, and does some logging + self._mmsf_validator = ( + None if InstanceFlags.SKIP_MMSF_SEQUENCE_CHECKS in self._flags + else MMSFValidator(self._port_manager)) def reuse_instance(self) -> bool: """Decide whether to run this instance again. @@ -222,6 +235,8 @@ def reuse_instance(self) -> bool: :meth:`save_final_snapshot`, or the checkpointing tutorial. """ self._api_guard.verify_reuse_instance() + if self._mmsf_validator: + self._mmsf_validator.reuse_instance() if self._do_reuse is not None: # thank you, should_save_final_snapshot, for running this already @@ -230,6 +245,9 @@ def reuse_instance(self) -> bool: else: do_reuse = self._decide_reuse_instance() + if self._do_resume and not self._do_init and self._mmsf_validator: + self._mmsf_validator.skip_f_init() + # now _first_run, _do_resume and _do_init are also set correctly do_implicit_checkpoint = ( @@ -428,6 +446,8 @@ def send(self, port_name: str, message: Message, slot: The slot to send the message on, if any. """ self.__check_port(port_name, slot) + if self._mmsf_validator: + self._mmsf_validator.check_send(port_name, slot) if message.settings is None: message = copy(message) message.settings = self._settings_manager.overlay @@ -883,6 +903,8 @@ def __receive_message( description of those. """ self.__check_port(port_name, slot, True) + if self._mmsf_validator: + self._mmsf_validator.check_receive(port_name, slot) port = self._port_manager.get_port(port_name) if port.operator == Operator.F_INIT: diff --git a/libmuscle/python/libmuscle/mmsf_validator.py b/libmuscle/python/libmuscle/mmsf_validator.py index d907d12f..d0468e0d 100644 --- a/libmuscle/python/libmuscle/mmsf_validator.py +++ b/libmuscle/python/libmuscle/mmsf_validator.py @@ -11,6 +11,30 @@ class MMSFValidator: + """The MMSF Validator checks whether Instances are following the Multiscale + Modelling and Simulation Framework when sending and receiving messages. + + In particular it checks that in order: + + 1. reuse_instance() is called + 2. Messages are received on all F_INIT ports + 3. The following sub-items happen in order, 0 or more times: + + a. Messages are sent on all O_I ports + b. Messages are received on all S ports + + 4. Messages are sent on all O_F ports + + If any message is sent or received out of order, a warning is logged to indicate + that the instance is not following the MMSF pattern. In some cases (for example the + time bridge in ``examples/python/interact_coupling.py``) this is expected and the + warnings can be disabled by setting the + :attr:`~libmuscle.instance.InstanceFlags.SKIP_MMSF_SEQUENCE_CHECKS` flag. + + Note: + Checks on vector ports are not implemented. When the instance uses vector ports, + the MMSF Validator will be disabled. + """ def __init__(self, port_manager: PortManager) -> None: self._port_manager = port_manager @@ -64,18 +88,28 @@ def __init__(self, port_manager: PortManager) -> None: self._current_operator: Operator = Operator.NONE def check_send(self, port_name: str, slot: Optional[int]) -> None: + """Check that sending on the provided port adheres to the MMSF.""" self._check_send_receive(port_name, slot) def check_receive(self, port_name: str, slot: Optional[int]) -> None: + """Check that receiving on the provided port adheres to the MMSF.""" self._check_send_receive(port_name, slot) def reuse_instance(self) -> None: + """Check that a reuse_instance() adheres to the MMSF.""" if not self._enabled: return self._check_transition(Operator.NONE) + def skip_f_init(self) -> None: + """Call when resuming from an intermediate snapshot: F_INIT is skipped.""" + # Pretend we're now in F_INIT and we have already received on all F_INIT ports: + self._current_operator = Operator.F_INIT + self._current_ports_used = self._connected_ports.get(Operator.F_INIT, []) + def _check_send_receive( self, port_name: str, slot: Optional[int]) -> None: + """Actual implementation of check_send/check_receive.""" if not self._enabled: return @@ -92,6 +126,15 @@ def _check_send_receive( self._current_ports_used.append(port_name) def _check_transition(self, operator: Operator, port_name: str = "") -> None: + """Check that a transition to the provided operator is allowed. + + Log a warning when the transition does not adhere to the MMSF. + + Args: + operator: Operator to transition to. + port_name: The name of the port that was sent/receveived on. This is only + used for constructing the warning message. + """ connected_ports = self._connected_ports.get(self._current_operator, []) expected: str = "" @@ -100,11 +143,13 @@ def _check_transition(self, operator: Operator, port_name: str = "") -> None: if port not in self._current_ports_used] if unused_ports: # We didn't complete the current phase - if operator in (Operator.F_INIT, Operator.S): + if self._current_operator in (Operator.F_INIT, Operator.S): expected = "a receive" else: expected = "a send" - expected += " on any of these ports: " + ", ".join(unused_ports) + expected += ( + f" on any of these {self._current_operator.name} ports: " + + ", ".join(unused_ports)) elif (self._current_operator, operator) not in self._allowed_transitions: # Transition to the operator is not allowed, now figure out what we were @@ -136,28 +181,25 @@ def _check_transition(self, operator: Operator, port_name: str = "") -> None: action = f"Receive on port '{port_name}'" else: action = f"Send on port '{port_name}'" - file_and_line = "" + + # Find the file:line where the user called send/receive/reuse_instance try: - # Try to find the file:line where the user called - # Instance.send/receive/reuse_instance frame: Optional[types.FrameType] = sys._getframe() - while frame and frame.f_code.co_qualname.startswith("MMSFValidator."): - frame = frame.f_back - while (frame - and frame.f_code.co_filename.endswith("libmuscle/instance.py")): - frame = frame.f_back - if frame: - code = frame.f_code - file_and_line = f" ({code.co_filename}:{code.co_firstlineno})" except Exception: - pass + frame = None # sys._getframe() is not guaranteed available + while (frame + and frame.f_globals.get("__name__", "").startswith("libmuscle.")): + # This frame is still part of a libmuscle module, step up: + frame = frame.f_back + loc = f" ({frame.f_code.co_filename}:{frame.f_lineno})" if frame else "" + _logger.warning( - "%s%s does not adhere to the MMSF: was expecting %s. " + "%s%s does not adhere to the MMSF: was expecting %s.\n" "Not adhering to the Multiscale Modelling and Simulation Framework " "may lead to deadlocks. You can disable this warning by " "setting the flag InstanceFlags.SKIP_MMSF_SEQUENCE_CHECKS " "when creating the libmuscle.Instance.", - action, file_and_line, expected) + action, loc, expected) self._current_operator = operator self._current_ports_used = [] diff --git a/libmuscle/python/libmuscle/test/test_instance.py b/libmuscle/python/libmuscle/test/test_instance.py index 9dc52c52..e9612522 100644 --- a/libmuscle/python/libmuscle/test/test_instance.py +++ b/libmuscle/python/libmuscle/test/test_instance.py @@ -304,6 +304,8 @@ def test_get_setting(instance, settings_manager): def test_list_ports(instance, port_manager): + port_manager.list_ports.assert_called_once_with() + port_manager.list_ports.reset_mock() instance.list_ports() port_manager.list_ports.assert_called_once_with() diff --git a/libmuscle/python/libmuscle/test/test_mmsf_validator.py b/libmuscle/python/libmuscle/test/test_mmsf_validator.py index 0951e896..4efd4e92 100644 --- a/libmuscle/python/libmuscle/test/test_mmsf_validator.py +++ b/libmuscle/python/libmuscle/test/test_mmsf_validator.py @@ -123,3 +123,32 @@ def test_only_f_i(mock_peer_info): validator.check_receive("f_i", None) with pytest.raises(TestMMSFValidatorException): validator.check_receive("f_i", None) + + +def test_micro(mock_peer_info): + port_manager = PortManager([], {Operator.F_INIT: ["f_i"], Operator.O_F: ["o_f"]}) + port_manager.connect_ports(mock_peer_info) + validator = MMSFValidator(port_manager) + + for _ in range(5): + validator.reuse_instance() + validator.check_receive("f_i", None) + validator.check_receive("o_f", None) + validator.reuse_instance() + validator.check_receive("f_i", None) + with pytest.raises(TestMMSFValidatorException): + validator.reuse_instance() + with pytest.raises(TestMMSFValidatorException): + validator.check_receive("f_i", None) + + +def test_not_all_ports_used(mock_peer_info): + port_manager = PortManager([], { + Operator.F_INIT: ["f_i1", "f_i2"], Operator.O_F: ["o_f"]}) + port_manager.connect_ports(mock_peer_info) + validator = MMSFValidator(port_manager) + + validator.reuse_instance() + validator.check_receive("f_i1", None) + with pytest.raises(TestMMSFValidatorException): + validator.check_send("o_f", None) From 78fc8246e791ee2120175bd42fb5d6e90ec3430a Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Tue, 27 Aug 2024 09:58:48 +0200 Subject: [PATCH 3/8] Add checks to send() and receive() that the operator belonging to the port allows sending/receiving --- libmuscle/python/libmuscle/instance.py | 19 +++++++++++++++---- .../python/libmuscle/test/test_instance.py | 10 ++++++++++ 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/libmuscle/python/libmuscle/instance.py b/libmuscle/python/libmuscle/instance.py index c3558fb5..a21de5ae 100644 --- a/libmuscle/python/libmuscle/instance.py +++ b/libmuscle/python/libmuscle/instance.py @@ -445,7 +445,7 @@ def send(self, port_name: str, message: Message, message: The message to be sent. slot: The slot to send the message on, if any. """ - self.__check_port(port_name, slot) + self.__check_port(port_name, slot, True) if self._mmsf_validator: self._mmsf_validator.check_send(port_name, slot) if message.settings is None: @@ -902,7 +902,7 @@ def __receive_message( This implements receive and receive_with_settings, see the description of those. """ - self.__check_port(port_name, slot, True) + self.__check_port(port_name, slot, False, True) if self._mmsf_validator: self._mmsf_validator.check_receive(port_name, slot) @@ -1025,7 +1025,7 @@ def __list_declared_ports(self) -> List[Port]: return result def __check_port( - self, port_name: str, slot: Optional[int] = None, + self, port_name: str, slot: Optional[int], is_send: bool, allow_slot_out_of_range: bool = False) -> None: if not self._port_manager.port_exists(port_name): err_msg = (('Port "{}" does not exist on "{}". Please check' @@ -1034,8 +1034,19 @@ def __check_port( self.__shutdown(err_msg) raise RuntimeError(err_msg) + port = self._port_manager.get_port(port_name) + if is_send: + if not port.operator.allows_sending(): + err_msg = (f'Port "{port_name}" does not allow sending messages.') + self.__shutdown(err_msg) + raise RuntimeError(err_msg) + else: + if not port.operator.allows_receiving(): + err_msg = (f'Port "{port_name}" does not allow receiving messages.') + self.__shutdown(err_msg) + raise RuntimeError(err_msg) + if slot is not None: - port = self._port_manager.get_port(port_name) if not port.is_vector(): err_msg = ( f'Port "{port_name}" is not a vector port, but a slot was' diff --git a/libmuscle/python/libmuscle/test/test_instance.py b/libmuscle/python/libmuscle/test/test_instance.py index e9612522..4f32831f 100644 --- a/libmuscle/python/libmuscle/test/test_instance.py +++ b/libmuscle/python/libmuscle/test/test_instance.py @@ -436,11 +436,21 @@ def test_send_after_resize(instance, message): instance.send('out_r', message, 13) +def test_send_on_receiving_port(instance, message): + with pytest.raises(RuntimeError): + instance.send("in_v", message, 3) + + def test_receive_on_invalid_port(instance): with pytest.raises(RuntimeError): instance.receive('does_not_exist') +def test_receive_on_sending_port(instance): + with pytest.raises(RuntimeError): + instance.receive("out_v", 3) + + def test_receive_f_init(instance, port_manager, communicator): mock_msg = MagicMock() mock_msg.data = Settings() From 73b9f3560deff249753eec83e093532e4c566f79 Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Tue, 27 Aug 2024 09:59:16 +0200 Subject: [PATCH 4/8] Refactor MMSFValidator._allowed_transitions and process review feedback --- libmuscle/python/libmuscle/mmsf_validator.py | 60 +++++++++----------- 1 file changed, 28 insertions(+), 32 deletions(-) diff --git a/libmuscle/python/libmuscle/mmsf_validator.py b/libmuscle/python/libmuscle/mmsf_validator.py index d0468e0d..d2c6ac43 100644 --- a/libmuscle/python/libmuscle/mmsf_validator.py +++ b/libmuscle/python/libmuscle/mmsf_validator.py @@ -52,36 +52,37 @@ def __init__(self, port_manager: PortManager) -> None: # Allowed operator transitions, the following are unconditionally allowed: self._allowed_transitions = { - (Operator.NONE, Operator.NONE), - (Operator.NONE, Operator.F_INIT), - (Operator.F_INIT, Operator.O_I), - (Operator.F_INIT, Operator.O_F), - (Operator.O_I, Operator.S), - (Operator.S, Operator.O_I), - (Operator.S, Operator.O_F), - (Operator.O_F, Operator.NONE), - } + Operator.NONE: [Operator.NONE, Operator.F_INIT], + Operator.F_INIT: [Operator.O_I, Operator.O_F], + Operator.O_I: [Operator.S], + Operator.S: [Operator.O_I, Operator.O_F], + Operator.O_F: [Operator.NONE]} # If there are operators without connected ports, we can skip over those for operator in [Operator.F_INIT, Operator.O_I, Operator.S, Operator.O_F]: if not self._connected_ports.get(operator, []): # Find all transitions A -> operator -> B and allow transition A -> B: - skip_from = [] - skip_to = [] - for from_op, to_op in self._allowed_transitions: + for from_op, allowed in self._allowed_transitions.items(): if from_op is operator: - skip_to.append(to_op) - if to_op is operator: - skip_from.append(from_op) - for from_op in skip_from: - for to_op in skip_to: - self._allowed_transitions.add((from_op, to_op)) + continue + if operator not in allowed: + continue + for to_op in self._allowed_transitions[operator]: + if to_op not in allowed: + allowed.append(to_op) + # Sort allowed transitions for more logical log messages + for allowed in self._allowed_transitions.values(): + allowed.sort(key=lambda op: op.value) # Disable this validator when the instance uses vector ports to keep this class # simpler. Support for vector ports may be added in the future. self._enabled = not any( port.is_vector() for ports in port_objects.values() for port in ports) - _logger.debug( - "MMSF Validator is %s", "enabled" if self._enabled else "disabled") + if self._enabled: + _logger.debug("MMSF Validator is enabled") + else: + _logger.debug( + "MMSF Validator is disabled: this instance uses vector ports, " + "which are not supported by the MMSF Validator.") # State tracking self._current_ports_used: List[str] = [] @@ -143,7 +144,7 @@ def _check_transition(self, operator: Operator, port_name: str = "") -> None: if port not in self._current_ports_used] if unused_ports: # We didn't complete the current phase - if self._current_operator in (Operator.F_INIT, Operator.S): + if self._current_operator.allows_receiving(): expected = "a receive" else: expected = "a send" @@ -151,22 +152,17 @@ def _check_transition(self, operator: Operator, port_name: str = "") -> None: f" on any of these {self._current_operator.name} ports: " + ", ".join(unused_ports)) - elif (self._current_operator, operator) not in self._allowed_transitions: - # Transition to the operator is not allowed, now figure out what we were - # actually expecting. - # First find the allowed transitions from self._current_operator, that are - # also 'valid' (i.e. have connected ports): - allowed = [ - to_op for from_op, to_op in self._allowed_transitions - if from_op is self._current_operator and - (to_op in self._connected_ports or to_op is Operator.NONE)] + elif operator not in self._allowed_transitions[self._current_operator]: + # Transition to the operator is not allowed. # Build the message we want to display to users: expected_lst = [] - for to_op in sorted(allowed, key=lambda op: op.value): + for to_op in self._allowed_transitions[self._current_operator]: ports = ', '.join(map(repr, self._connected_ports.get(to_op, []))) if to_op is Operator.NONE: expected_lst.append("a call to reuse_instance()") - elif to_op in (Operator.F_INIT, Operator.S): + elif not ports: + continue + elif to_op.allows_receiving(): expected_lst.append(f"a receive on an {to_op.name} port ({ports})") else: expected_lst.append(f"a send on an {to_op.name} port ({ports})") From 938670973ff786da870d09da7e2c499e2dababf6 Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Tue, 27 Aug 2024 10:28:14 +0200 Subject: [PATCH 5/8] Use caplog in test_mmsf_validator --- .../libmuscle/test/test_mmsf_validator.py | 121 +++++++++++------- 1 file changed, 75 insertions(+), 46 deletions(-) diff --git a/libmuscle/python/libmuscle/test/test_mmsf_validator.py b/libmuscle/python/libmuscle/test/test_mmsf_validator.py index 4efd4e92..4a6bd69b 100644 --- a/libmuscle/python/libmuscle/test/test_mmsf_validator.py +++ b/libmuscle/python/libmuscle/test/test_mmsf_validator.py @@ -1,3 +1,4 @@ +from logging import WARNING from typing import Any from unittest.mock import Mock @@ -8,18 +9,16 @@ from libmuscle.mmsf_validator import MMSFValidator -# For testing purposes we monkeypatch _logger.warning so it raises the following -# exception: ot is easier to verify that an exception is raised than checking that a -# warning message is logged. -class TestMMSFValidatorException(Exception): - pass +class Contains: + """Helper class to simplify tests using caplog.record_tuples""" + def __init__(self, value: Any) -> None: + self.value = value + def __eq__(self, other: Any) -> bool: + return self.value in other -@pytest.fixture(autouse=True) -def patch_logger_to_raise_error(monkeypatch): - def raise_on_log(msg: str, *args: Any) -> None: - raise TestMMSFValidatorException(msg % args) - monkeypatch.setattr("libmuscle.mmsf_validator._logger.warning", raise_on_log) + def __repr__(self) -> str: + return f"Contains({self.value!r})" @pytest.fixture @@ -45,7 +44,7 @@ def validator_simple(mock_peer_info) -> MMSFValidator: @pytest.mark.parametrize("num_iterations", [0, 1, 2]) @pytest.mark.parametrize("num_reuse", [1, 5]) -def test_simple_correct(num_iterations, num_reuse, validator_simple): +def test_simple_correct(num_iterations, num_reuse, validator_simple, caplog): for _ in range(num_reuse): validator_simple.reuse_instance() validator_simple.check_receive("f_i", None) @@ -55,53 +54,72 @@ def test_simple_correct(num_iterations, num_reuse, validator_simple): validator_simple.check_send("o_f", None) # Final reuse_instance() validator_simple.reuse_instance() + assert caplog.record_tuples == [] -def test_simple_skip_f_init(validator_simple): +def test_simple_skip_f_init(validator_simple, caplog): validator_simple.reuse_instance() - with pytest.raises(TestMMSFValidatorException): - validator_simple.check_send("o_i", None) + validator_simple.check_send("o_i", None) + assert caplog.record_tuples == [ + ("libmuscle.mmsf_validator", WARNING, Contains("Send on port 'o_i'"))] -def test_simple_skip_o_i(validator_simple): +def test_simple_skip_o_i(validator_simple, caplog): validator_simple.reuse_instance() validator_simple.check_receive("f_i", None) - with pytest.raises(TestMMSFValidatorException): - validator_simple.check_receive("f_i", None) - with pytest.raises(TestMMSFValidatorException): - validator_simple.check_receive("s", None) - with pytest.raises(TestMMSFValidatorException): - validator_simple.reuse_instance() + + validator_simple.check_receive("f_i", None) + assert caplog.record_tuples == [ + ("libmuscle.mmsf_validator", WARNING, Contains("Receive on port 'f_i'"))] + + caplog.clear() + validator_simple.check_receive("s", None) + assert caplog.record_tuples == [ + ("libmuscle.mmsf_validator", WARNING, Contains("Receive on port 's'"))] + + caplog.clear() + validator_simple.reuse_instance() + assert caplog.record_tuples == [ + ("libmuscle.mmsf_validator", WARNING, Contains("reuse_instance()"))] -def test_simple_skip_s(validator_simple): +def test_simple_skip_s(validator_simple, caplog): validator_simple.reuse_instance() validator_simple.check_receive("f_i", None) validator_simple.check_send("o_i", None) - with pytest.raises(TestMMSFValidatorException): - validator_simple.check_send("o_i", None) - with pytest.raises(TestMMSFValidatorException): - validator_simple.check_send("o_f", None) + validator_simple.check_send("o_i", None) + assert caplog.record_tuples == [ + ("libmuscle.mmsf_validator", WARNING, Contains("Send on port 'o_i'"))] + + caplog.clear() + validator_simple.check_send("o_f", None) + assert caplog.record_tuples == [ + ("libmuscle.mmsf_validator", WARNING, Contains("Send on port 'o_f'"))] -def test_simple_skip_o_f(validator_simple): + +def test_simple_skip_o_f(validator_simple, caplog): validator_simple.reuse_instance() validator_simple.check_receive("f_i", None) validator_simple.check_send("o_i", None) validator_simple.check_receive("s", None) - with pytest.raises(TestMMSFValidatorException): - validator_simple.reuse_instance() + + validator_simple.reuse_instance() + assert caplog.record_tuples == [ + ("libmuscle.mmsf_validator", WARNING, Contains("reuse_instance()"))] -def test_simple_skip_reuse_instance(validator_simple): +def test_simple_skip_reuse_instance(validator_simple, caplog): validator_simple.reuse_instance() validator_simple.check_receive("f_i", None) validator_simple.check_receive("o_f", None) - with pytest.raises(TestMMSFValidatorException): - validator_simple.check_receive("f_i", None) + + validator_simple.check_receive("f_i", None) + assert caplog.record_tuples == [ + ("libmuscle.mmsf_validator", WARNING, Contains("Receive on port 'f_i'"))] -def test_only_o_f(mock_peer_info): +def test_only_o_f(mock_peer_info, caplog): port_manager = PortManager([], {Operator.O_F: ["o_f"]}) port_manager.connect_ports(mock_peer_info) validator = MMSFValidator(port_manager) @@ -109,11 +127,13 @@ def test_only_o_f(mock_peer_info): for _ in range(5): validator.reuse_instance() validator.check_send("o_f", None) - with pytest.raises(TestMMSFValidatorException): - validator.check_send("o_f", None) + validator.check_send("o_f", None) + assert caplog.record_tuples == [ + ("libmuscle.mmsf_validator", WARNING, Contains("Send on port 'o_f'"))] -def test_only_f_i(mock_peer_info): + +def test_only_f_i(mock_peer_info, caplog): port_manager = PortManager([], {Operator.F_INIT: ["f_i"]}) port_manager.connect_ports(mock_peer_info) validator = MMSFValidator(port_manager) @@ -121,11 +141,13 @@ def test_only_f_i(mock_peer_info): for _ in range(5): validator.reuse_instance() validator.check_receive("f_i", None) - with pytest.raises(TestMMSFValidatorException): - validator.check_receive("f_i", None) + + validator.check_receive("f_i", None) + assert caplog.record_tuples == [ + ("libmuscle.mmsf_validator", WARNING, Contains("Receive on port 'f_i'"))] -def test_micro(mock_peer_info): +def test_micro(mock_peer_info, caplog): port_manager = PortManager([], {Operator.F_INIT: ["f_i"], Operator.O_F: ["o_f"]}) port_manager.connect_ports(mock_peer_info) validator = MMSFValidator(port_manager) @@ -136,13 +158,18 @@ def test_micro(mock_peer_info): validator.check_receive("o_f", None) validator.reuse_instance() validator.check_receive("f_i", None) - with pytest.raises(TestMMSFValidatorException): - validator.reuse_instance() - with pytest.raises(TestMMSFValidatorException): - validator.check_receive("f_i", None) + + validator.check_receive("f_i", None) + assert caplog.record_tuples == [ + ("libmuscle.mmsf_validator", WARNING, Contains("Receive on port 'f_i'"))] + + caplog.clear() + validator.reuse_instance() + assert caplog.record_tuples == [ + ("libmuscle.mmsf_validator", WARNING, Contains("reuse_instance()"))] -def test_not_all_ports_used(mock_peer_info): +def test_not_all_ports_used(mock_peer_info, caplog): port_manager = PortManager([], { Operator.F_INIT: ["f_i1", "f_i2"], Operator.O_F: ["o_f"]}) port_manager.connect_ports(mock_peer_info) @@ -150,5 +177,7 @@ def test_not_all_ports_used(mock_peer_info): validator.reuse_instance() validator.check_receive("f_i1", None) - with pytest.raises(TestMMSFValidatorException): - validator.check_send("o_f", None) + + validator.check_send("o_f", None) + assert caplog.record_tuples == [ + ("libmuscle.mmsf_validator", WARNING, Contains("Send on port 'o_f'"))] From 73f1b4167b1494e855cec0dfd206b46bbb91f76a Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Tue, 27 Aug 2024 15:15:32 +0200 Subject: [PATCH 6/8] Implement C++ MMSFValidator and tests Also fix a python test and comment --- .../cpp/src/libmuscle/mmsf_validator.cpp | 207 ++++++++++++++ .../cpp/src/libmuscle/mmsf_validator.hpp | 76 +++++ .../libmuscle/tests/test_mmsf_validator.cpp | 268 ++++++++++++++++++ libmuscle/cpp/src/ymmsl/component.cpp | 18 ++ libmuscle/cpp/src/ymmsl/component.hpp | 7 + libmuscle/cpp/src/ymmsl/ymmsl.hpp | 1 + libmuscle/python/libmuscle/mmsf_validator.py | 6 +- .../libmuscle/test/test_mmsf_validator.py | 2 +- 8 files changed, 582 insertions(+), 3 deletions(-) create mode 100644 libmuscle/cpp/src/libmuscle/mmsf_validator.cpp create mode 100644 libmuscle/cpp/src/libmuscle/mmsf_validator.hpp create mode 100644 libmuscle/cpp/src/libmuscle/tests/test_mmsf_validator.cpp diff --git a/libmuscle/cpp/src/libmuscle/mmsf_validator.cpp b/libmuscle/cpp/src/libmuscle/mmsf_validator.cpp new file mode 100644 index 00000000..3c34e2cc --- /dev/null +++ b/libmuscle/cpp/src/libmuscle/mmsf_validator.cpp @@ -0,0 +1,207 @@ +#include "mmsf_validator.hpp" + +#include + +namespace { + +template +inline bool contains(Container const & container, T const & value) { + return std::find(container.begin(), container.end(), value) != container.end(); +} + +} + +namespace libmuscle { namespace _MUSCLE_IMPL_NS { + +using ::ymmsl::Operator; + +MMSFValidator::MMSFValidator(PortManager const& port_manager, Logger & logger) + : port_manager_(port_manager) + , logger_(logger) + , enabled_(true) + , current_operator_(Operator::NONE) +{ + auto port_names = port_manager.list_ports(); + + connected_ports_[Operator::NONE] = {}; + connected_ports_[Operator::F_INIT] = {}; + connected_ports_[Operator::O_I] = {}; + connected_ports_[Operator::S] = {}; + connected_ports_[Operator::O_F] = {}; + + for (auto const & value : port_names) { + auto const & op = value.first; + std::vector connected_ports; + for (auto const & port_name : value.second) { + auto const & port_obj = port_manager.get_port(port_name); + if (port_obj.is_connected()) { + connected_ports.push_back(port_name); + } + if (port_obj.is_vector()) { + enabled_ = false; // We don't support vector ports (yet) + } + port_operators_[port_name] = op; + } + connected_ports_[op] = connected_ports; + } + + // Allowed operator transitions, the following are unconditionally allowed + allowed_transitions_[Operator::NONE] = {Operator::NONE, Operator::F_INIT}; + allowed_transitions_[Operator::F_INIT] = {Operator::O_I, Operator::O_F}; + allowed_transitions_[Operator::O_I] = {Operator::S}; + allowed_transitions_[Operator::S] = {Operator::O_I, Operator::O_F}; + allowed_transitions_[Operator::O_F] = {Operator::NONE}; + // If there are operators without connected ports, we can skip over those + for (auto const op : {Operator::F_INIT, Operator::O_I, Operator::S, Operator::O_F}) { + if (connected_ports_[op].empty()) { + // Find all transitions A -> op -> B and allow transition A -> B: + for (auto & item : allowed_transitions_) { + if (item.first == op) continue; + auto & allowed = item.second; + if (!contains(allowed, op)) + continue; // op is not in the allowed list + for (auto const & to_op : allowed_transitions_[op]) { + // add to_op to allowed, if it is not already in the list: + if (std::find(allowed.begin(), allowed.end(), to_op) == allowed.end()) + allowed.push_back(to_op); + } + } + } + } + // Sort allowed transitions for more logical log messages + for (auto & item : allowed_transitions_) { + std::sort(item.second.begin(), item.second.end()); + } + + if (enabled_) { + logger_.debug("MMSF Validator is enabled"); + } else { + logger_.debug( + "MMSF Validator is disabled: this instance uses vector ports, " + "which are not supported by the MMSF Validator."); + } +} + +void MMSFValidator::check_send( + std::string const& port_name, Optional slot) +{ + check_send_receive(port_name, slot); +} + +void MMSFValidator::check_receive( + std::string const& port_name, Optional slot) +{ + check_send_receive(port_name, slot); +} + +void MMSFValidator::reuse_instance() { + if (enabled_) { + check_transition(Operator::NONE, ""); + } +} + +void MMSFValidator::skip_f_init() { + // Pretend we're now in F_INIT and we have already received on all F_INIT ports + current_operator_ = Operator::F_INIT; + current_ports_used_ = connected_ports_[Operator::F_INIT]; +} + +void MMSFValidator::check_send_receive( + std::string const& port_name, Optional slot) +{ + if (!enabled_) return; + + auto op = port_operators_[port_name]; + if (current_operator_ != op) { + // Operator changed, check that all ports were used in the previous operator + check_transition(op, port_name); + } + + if (std::find( + current_ports_used_.begin(), + current_ports_used_.end(), + port_name) != current_ports_used_.end()) { + // We're using the same port for a second time, this is fine if: + // 1. We're allowed to do this operator immediately again, and + // 2. All ports of the current operator have been used + // Both are checked by check_transition_: + check_transition(op, port_name); + } + + current_ports_used_.push_back(port_name); +} + +void MMSFValidator::check_transition( + ::ymmsl::Operator op, std::string const& port_name) +{ + std::ostringstream expected_oss; + + std::vector unused_ports; + for (auto const & port : connected_ports_[current_operator_]) { + if (!contains(current_ports_used_, port)) { + unused_ports.push_back(port); + } + } + if (!unused_ports.empty()) { + // We didn't complete the current phase + if (::ymmsl::allows_receiving(current_operator_)) { + expected_oss << "a receive"; + } else { + expected_oss << "a send"; + } + expected_oss << " on any of these " << ::ymmsl::operator_name(current_operator_) << " ports: "; + for (std::size_t i = 0; i < unused_ports.size(); ++i) { + if (i > 0) expected_oss << ", "; + expected_oss << unused_ports[i]; + } + } else if (!contains(allowed_transitions_[current_operator_], op)) { + // Transition to the operator is not allowed + std::size_t i = 0; + for (auto const & to_op : allowed_transitions_[current_operator_]) { + if (i++ > 0) expected_oss << " or "; + if (to_op == Operator::NONE) { + expected_oss << "a call to reuse_instance()"; + } else if (!connected_ports_[to_op].empty()) { + if (::ymmsl::allows_receiving(to_op)) { + expected_oss << "a receive"; + } else { + expected_oss << "a send"; + } + expected_oss << " on an " << ::ymmsl::operator_name(to_op); + expected_oss << " port ("; + std::size_t j = 0; + for (auto const & port : connected_ports_[to_op]) { + if (j++ > 0) expected_oss << ", "; + expected_oss << port; + } + expected_oss << ")"; + } + } + } + + std::string expected = expected_oss.str(); + if (!expected.empty()) { + // We expected something else, log a warning + std::string action; + if (op == Operator::NONE) { + action = "reuse_instance()"; + } else if (op == Operator::F_INIT || op == Operator::S) { + action = "Receive on port '" + port_name + "'"; + } else { + action = "Send on port '" + port_name + "'"; + } + + logger_.warning( + action, " does not adhere to the MMSF: was expecting ", expected, + ".\n" + "Not adhering to the Multiscale Modelling and Simulation Framework " + "may lead to deadlocks. You can disable this warning by " + "setting the flag InstanceFlags.SKIP_MMSF_SEQUENCE_CHECKS " + "when creating the libmuscle.Instance."); + } + + current_operator_ = op; + current_ports_used_.clear(); +} + +} } diff --git a/libmuscle/cpp/src/libmuscle/mmsf_validator.hpp b/libmuscle/cpp/src/libmuscle/mmsf_validator.hpp new file mode 100644 index 00000000..9a7720f3 --- /dev/null +++ b/libmuscle/cpp/src/libmuscle/mmsf_validator.hpp @@ -0,0 +1,76 @@ +#pragma once + +#include +#include +#include + +#include +#include +#include + + +namespace libmuscle { namespace _MUSCLE_IMPL_NS { + +/** The MMSF Validator checks whether Instances are following the Multiscale + * Modelling and Simulation Framework when sending and receiving messages. + * + * In particular it checks that in order: + * + * 1. reuse_instance() is called + * 2. Messages are received on all F_INIT ports + * 3. The following sub-items happen in order, 0 or more times: + * + * a. Messages are sent on all O_I ports + * b. Messages are received on all S ports + * + * 4. Messages are sent on all O_F ports + * + * If any message is sent or received out of order, a warning is logged to indicate + * that the instance is not following the MMSF pattern. In some cases (for example the + * time bridge in ``examples/python/interact_coupling.py``) this is expected and the + * warnings can be disabled by setting the SKIP_MMSF_SEQUENCE_CHECKS flag. + * + * Note: + * Checks on vector ports are not implemented. When the instance uses vector ports, + * the MMSF Validator will be disabled. + */ +class MMSFValidator { + public: + MMSFValidator(PortManager const & port_manager, Logger & logger); + ~MMSFValidator() = default; + + /** Check that sending on the provided port adheres to the MMSF. */ + void check_send(std::string const & port_name, Optional slot); + /** Check that receiving on the provided port adheres to the MMSF. */ + void check_receive(std::string const & port_name, Optional slot); + /** Check that a reuse_instance() adheres to the MMSF. */ + void reuse_instance(); + /** Call when resuming from an intermediate snapshot: F_INIT is skipped. */ + void skip_f_init(); + + private: + /** Actual implementation of check_send/check_receive. */ + void check_send_receive(std::string const & port_name, Optional slot); + /** Check that a transition to the provided operator is allowed. + * + * Log a warning when the transition does not adhere to the MMSF. + * + * @param op Operator to transition to. + * @param port_name The name of the port that was sent/receveived on. This is only + * used for constructing the warning message. + */ + void check_transition(::ymmsl::Operator op, std::string const & port_name); + + PortManager const & port_manager_; + Logger & logger_; + std::unordered_map<::ymmsl::Operator, std::vector> connected_ports_; + std::unordered_map port_operators_; + std::unordered_map<::ymmsl::Operator, std::vector<::ymmsl::Operator>> allowed_transitions_; + bool enabled_; + + // state tracking + std::vector current_ports_used_; + ::ymmsl::Operator current_operator_; +}; + +} } diff --git a/libmuscle/cpp/src/libmuscle/tests/test_mmsf_validator.cpp b/libmuscle/cpp/src/libmuscle/tests/test_mmsf_validator.cpp new file mode 100644 index 00000000..dfc04edf --- /dev/null +++ b/libmuscle/cpp/src/libmuscle/tests/test_mmsf_validator.cpp @@ -0,0 +1,268 @@ +// Inject mocks +#define LIBMUSCLE_MOCK_LOGGER + +// into the real implementation to test. +#include + +#include +#include + +// Test code dependencies +#include +#include + +#include +#include + +#include +#include + + +using libmuscle::_MUSCLE_IMPL_NS::LogLevel; +using libmuscle::_MUSCLE_IMPL_NS::MMSFValidator; +using libmuscle::_MUSCLE_IMPL_NS::MockLogger; +using libmuscle::_MUSCLE_IMPL_NS::PeerDims; +using libmuscle::_MUSCLE_IMPL_NS::PeerInfo; +using libmuscle::_MUSCLE_IMPL_NS::PeerLocations; +using libmuscle::_MUSCLE_IMPL_NS::PortManager; +using libmuscle::_MUSCLE_IMPL_NS::PortsDescription; + +using ymmsl::Conduit; +using ymmsl::Operator; +using ymmsl::Reference; + +using testing::HasSubstr; + + +int main(int argc, char *argv[]) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + + +struct libmuscle_mmsf_validator : ::testing::Test { + std::unique_ptr port_manager_; + std::unique_ptr validator_; + std::unique_ptr logger_; + + void create_validator(PortsDescription const & declared_ports) { + logger_ = std::make_unique(); + port_manager_ = std::make_unique(std::vector(), declared_ports); + + // Build peer info for port_manager.connect_ports + Reference component_id("other"); + std::vector conduits; + for (auto const & item : declared_ports) { + for (auto const & port_name : item.second) { + if (::ymmsl::allows_receiving(item.first)) { + conduits.emplace_back("component."+port_name, "other."+port_name); + } else { + conduits.emplace_back("other."+port_name, "component."+port_name); + } + } + } + PeerDims peer_dims({ {"component", {}}}); + PeerLocations peer_locations({{"component", {"direct:test"}}}); + PeerInfo peer_info(component_id, {}, conduits, peer_dims, peer_locations); + + port_manager_->connect_ports(peer_info); + validator_ = std::make_unique(*port_manager_, *logger_); + // Discard the debug log statement in the MMSFValidator initializer: + logger_->caplog.call_args_list.clear(); + } +}; + + +struct libmuscle_simple_validator : libmuscle_mmsf_validator { + libmuscle_simple_validator() { + create_validator(PortsDescription{ + {Operator::F_INIT, {"f_i"}}, + {Operator::O_I, {"o_i"}}, + {Operator::S, {"s"}}, + {Operator::O_F, {"o_f"}}}); + } +}; + + +TEST_F(libmuscle_simple_validator, test_simple_correct_0it) { + for (std::size_t i = 0; i < 5; ++i) { + validator_->reuse_instance(); + validator_->check_receive("f_i", {}); + validator_->check_send("o_f", {}); + } + validator_->reuse_instance(); + ASSERT_FALSE(logger_->caplog.called()); +} + + +TEST_F(libmuscle_simple_validator, test_simple_correct_1it) { + for (std::size_t i = 0; i < 5; ++i) { + validator_->reuse_instance(); + validator_->check_receive("f_i", {}); + validator_->check_send("o_i", {}); + validator_->check_receive("s", {}); + validator_->check_send("o_f", {}); + } + validator_->reuse_instance(); + ASSERT_FALSE(logger_->caplog.called()); +} + + +TEST_F(libmuscle_simple_validator, test_simple_correct_2it) { + for (std::size_t i = 0; i < 5; ++i) { + validator_->reuse_instance(); + validator_->check_receive("f_i", {}); + validator_->check_send("o_i", {}); + validator_->check_receive("s", {}); + validator_->check_send("o_i", {}); + validator_->check_receive("s", {}); + validator_->check_send("o_f", {}); + } + validator_->reuse_instance(); + ASSERT_FALSE(logger_->caplog.called()); +} + + +TEST_F(libmuscle_simple_validator, test_simple_skip_f_init) { + validator_->reuse_instance(); + validator_->check_send("o_i", {}); + ASSERT_TRUE(logger_->caplog.called_once()); + ASSERT_EQ(logger_->caplog.call_arg<0>(0), LogLevel::WARNING); + ASSERT_THAT(logger_->caplog.call_arg<1>(0), HasSubstr("Send on port 'o_i'")); +} + + +TEST_F(libmuscle_simple_validator, test_simple_skip_o_i) { + validator_->reuse_instance(); + validator_->check_receive("f_i", {}); + + validator_->check_receive("f_i", {}); + ASSERT_TRUE(logger_->caplog.called_once()); + ASSERT_EQ(logger_->caplog.call_arg<0>(0), LogLevel::WARNING); + ASSERT_THAT(logger_->caplog.call_arg<1>(0), HasSubstr("Receive on port 'f_i'")); + + logger_->caplog.call_args_list.clear(); + validator_->check_receive("s", {}); + ASSERT_TRUE(logger_->caplog.called_once()); + ASSERT_EQ(logger_->caplog.call_arg<0>(0), LogLevel::WARNING); + ASSERT_THAT(logger_->caplog.call_arg<1>(0), HasSubstr("Receive on port 's'")); + + logger_->caplog.call_args_list.clear(); + validator_->reuse_instance(); + ASSERT_TRUE(logger_->caplog.called_once()); + ASSERT_EQ(logger_->caplog.call_arg<0>(0), LogLevel::WARNING); + ASSERT_THAT(logger_->caplog.call_arg<1>(0), HasSubstr("reuse_instance()")); +} + + +TEST_F(libmuscle_simple_validator, test_simple_skip_s) { + validator_->reuse_instance(); + validator_->check_receive("f_i", {}); + validator_->check_send("o_i", {}); + + validator_->check_send("o_i", {}); + ASSERT_TRUE(logger_->caplog.called_once()); + ASSERT_EQ(logger_->caplog.call_arg<0>(0), LogLevel::WARNING); + ASSERT_THAT(logger_->caplog.call_arg<1>(0), HasSubstr("Send on port 'o_i'")); + + logger_->caplog.call_args_list.clear(); + validator_->check_send("o_f", {}); + ASSERT_TRUE(logger_->caplog.called_once()); + ASSERT_EQ(logger_->caplog.call_arg<0>(0), LogLevel::WARNING); + ASSERT_THAT(logger_->caplog.call_arg<1>(0), HasSubstr("Send on port 'o_f'")); +} + + +TEST_F(libmuscle_simple_validator, test_simple_skip_o_f) { + validator_->reuse_instance(); + validator_->check_receive("f_i", {}); + validator_->check_send("o_i", {}); + validator_->check_receive("s", {}); + + validator_->reuse_instance(); + ASSERT_TRUE(logger_->caplog.called_once()); + ASSERT_EQ(logger_->caplog.call_arg<0>(0), LogLevel::WARNING); + ASSERT_THAT(logger_->caplog.call_arg<1>(0), HasSubstr("reuse_instance()")); +} + +TEST_F(libmuscle_simple_validator, test_simple_skip_reuse_instance) { + validator_->reuse_instance(); + validator_->check_receive("f_i", {}); + validator_->check_send("o_f", {}); + + validator_->check_receive("f_i", {}); + ASSERT_TRUE(logger_->caplog.called_once()); + ASSERT_EQ(logger_->caplog.call_arg<0>(0), LogLevel::WARNING); + ASSERT_THAT(logger_->caplog.call_arg<1>(0), HasSubstr("Receive on port 'f_i'")); +} + + +TEST_F(libmuscle_mmsf_validator, test_only_o_f) { + create_validator(PortsDescription{{Operator::O_F, {"o_f"}}}); + + for (std::size_t i = 0; i < 5; ++i) { + validator_->reuse_instance(); + validator_->check_send("o_f", {}); + } + + validator_->check_send("o_f", {}); + ASSERT_TRUE(logger_->caplog.called_once()); + ASSERT_EQ(logger_->caplog.call_arg<0>(0), LogLevel::WARNING); + ASSERT_THAT(logger_->caplog.call_arg<1>(0), HasSubstr("Send on port 'o_f'")); +} + + +TEST_F(libmuscle_mmsf_validator, test_only_f_i) { + create_validator(PortsDescription{{Operator::F_INIT, {"f_i"}}}); + + for (std::size_t i = 0; i < 5; ++i) { + validator_->reuse_instance(); + validator_->check_receive("f_i", {}); + } + + validator_->check_receive("f_i", {}); + ASSERT_TRUE(logger_->caplog.called_once()); + ASSERT_EQ(logger_->caplog.call_arg<0>(0), LogLevel::WARNING); + ASSERT_THAT(logger_->caplog.call_arg<1>(0), HasSubstr("Receive on port 'f_i'")); +} + + +TEST_F(libmuscle_mmsf_validator, test_micro) { + create_validator(PortsDescription{ + {Operator::F_INIT, {"f_i"}}, + {Operator::O_F, {"o_f"}}}); + + for (std::size_t i = 0; i < 5; ++i) { + validator_->reuse_instance(); + validator_->check_receive("f_i", {}); + validator_->check_send("o_f", {}); + } + validator_->reuse_instance(); + validator_->check_receive("f_i", {}); + + validator_->check_receive("f_i", {}); + ASSERT_TRUE(logger_->caplog.called_once()); + ASSERT_EQ(logger_->caplog.call_arg<0>(0), LogLevel::WARNING); + ASSERT_THAT(logger_->caplog.call_arg<1>(0), HasSubstr("Receive on port 'f_i'")); + + logger_->caplog.call_args_list.clear(); + validator_->reuse_instance(); + ASSERT_TRUE(logger_->caplog.called_once()); + ASSERT_EQ(logger_->caplog.call_arg<0>(0), LogLevel::WARNING); + ASSERT_THAT(logger_->caplog.call_arg<1>(0), HasSubstr("reuse_instance()")); +} + + +TEST_F(libmuscle_mmsf_validator, test_not_all_ports_used) { + create_validator(PortsDescription{ + {Operator::F_INIT, {"f_i1", "f_i2"}}, + {Operator::O_F, {"o_f"}}}); + + validator_->reuse_instance(); + validator_->check_receive("f_i1", {}); + + validator_->check_send("o_f", {}); + ASSERT_TRUE(logger_->caplog.called_once()); + ASSERT_EQ(logger_->caplog.call_arg<0>(0), LogLevel::WARNING); + ASSERT_THAT(logger_->caplog.call_arg<1>(0), HasSubstr("Send on port 'o_f'")); +} diff --git a/libmuscle/cpp/src/ymmsl/component.cpp b/libmuscle/cpp/src/ymmsl/component.cpp index d0b6014f..3b821bb8 100644 --- a/libmuscle/cpp/src/ymmsl/component.cpp +++ b/libmuscle/cpp/src/ymmsl/component.cpp @@ -1,5 +1,6 @@ #include +#include namespace ymmsl { namespace impl { @@ -15,6 +16,23 @@ bool allows_receiving(Operator op) { (op == Operator::S); } +std::string operator_name(Operator op) { + switch (op) + { + case Operator::NONE: + return "NONE"; + case Operator::F_INIT: + return "F_INIT"; + case Operator::O_I: + return "O_I"; + case Operator::S: + return "S"; + case Operator::O_F: + return "O_F"; + } + throw std::logic_error("Unreachable code reached"); +} + Port::Port(Identifier const & name, Operator oper) : name(name) , oper(oper) diff --git a/libmuscle/cpp/src/ymmsl/component.hpp b/libmuscle/cpp/src/ymmsl/component.hpp index 893210a8..31747af8 100644 --- a/libmuscle/cpp/src/ymmsl/component.hpp +++ b/libmuscle/cpp/src/ymmsl/component.hpp @@ -38,6 +38,13 @@ bool allows_sending(Operator op); */ bool allows_receiving(Operator op); +/** Return the name of the given operator + * + * @param op The operator + * @return The name: "NONE", "F_INIT", "O_I", "S" or "O_F" + */ +std::string operator_name(Operator op); + /** A port on a component. * * Ports are used by components to send or receive messages on. They are diff --git a/libmuscle/cpp/src/ymmsl/ymmsl.hpp b/libmuscle/cpp/src/ymmsl/ymmsl.hpp index ce10ced6..06c4f509 100644 --- a/libmuscle/cpp/src/ymmsl/ymmsl.hpp +++ b/libmuscle/cpp/src/ymmsl/ymmsl.hpp @@ -10,6 +10,7 @@ namespace ymmsl { using impl::allows_sending; using impl::allows_receiving; + using impl::operator_name; using impl::Conduit; using impl::Identifier; using impl::operator<<; diff --git a/libmuscle/python/libmuscle/mmsf_validator.py b/libmuscle/python/libmuscle/mmsf_validator.py index d2c6ac43..1f85aeb7 100644 --- a/libmuscle/python/libmuscle/mmsf_validator.py +++ b/libmuscle/python/libmuscle/mmsf_validator.py @@ -120,8 +120,10 @@ def _check_send_receive( self._check_transition(operator, port_name) if port_name in self._current_ports_used: - # We're using the same port for a second time, this is fine when we're - # allowed to do this operator immediately again: + # We're using the same port for a second time, this is fine if: + # 1. We're allowed to do this operator immediately again, and + # 2. All ports of the current operator have been used + # Both are checked by _check_transition: self._check_transition(operator, port_name) self._current_ports_used.append(port_name) diff --git a/libmuscle/python/libmuscle/test/test_mmsf_validator.py b/libmuscle/python/libmuscle/test/test_mmsf_validator.py index 4a6bd69b..673fcc82 100644 --- a/libmuscle/python/libmuscle/test/test_mmsf_validator.py +++ b/libmuscle/python/libmuscle/test/test_mmsf_validator.py @@ -155,7 +155,7 @@ def test_micro(mock_peer_info, caplog): for _ in range(5): validator.reuse_instance() validator.check_receive("f_i", None) - validator.check_receive("o_f", None) + validator.check_send("o_f", None) validator.reuse_instance() validator.check_receive("f_i", None) From 2ac553364b778aa48b3670044d6bb62a85586dca Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Tue, 27 Aug 2024 15:59:15 +0200 Subject: [PATCH 7/8] Use MMSFValidator in Instance And implement checks in send() and receive() to check if the port's operator allows sending/receiving. --- libmuscle/cpp/src/libmuscle/instance.cpp | 41 +++++++++++++++---- libmuscle/cpp/src/libmuscle/instance.hpp | 7 ++++ .../cpp/src/libmuscle/tests/test_instance.cpp | 12 ++++++ .../libmuscle/tests/test_snapshot_manager.cpp | 1 + 4 files changed, 54 insertions(+), 7 deletions(-) diff --git a/libmuscle/cpp/src/libmuscle/instance.cpp b/libmuscle/cpp/src/libmuscle/instance.cpp index 8edc11c1..f8f83e78 100644 --- a/libmuscle/cpp/src/libmuscle/instance.cpp +++ b/libmuscle/cpp/src/libmuscle/instance.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -130,6 +131,7 @@ class Instance::Impl { SettingsManager settings_manager_; std::unique_ptr snapshot_manager_; std::unique_ptr trigger_manager_; + std::unique_ptr mmsf_validator_; Optional first_run_; Optional do_reuse_; bool do_resume_; @@ -151,8 +153,8 @@ class Instance::Impl { std::vector<::ymmsl::Port> list_declared_ports_() const; void check_port_( - std::string const & port_name, Optional slot = {}, - bool allow_slot_out_of_range = false); + std::string const & port_name, Optional slot, + bool is_send, bool allow_slot_out_of_range = false); bool receive_settings_(); bool have_f_init_connections_(); @@ -239,6 +241,10 @@ Instance::Impl::Impl( set_local_log_level_(); set_remote_log_level_(); setup_profiling_(); + // MMSFValidator needs a connected port manager, and does some logging + if (! (InstanceFlags::SKIP_MMSF_SEQUENCE_CHECKS & flags_)) { + mmsf_validator_.reset(new MMSFValidator(*port_manager_, *logger_)); + } #ifdef MUSCLE_ENABLE_MPI auto sbase_data = Data(settings_manager_.base); msgpack::sbuffer sbuf; @@ -268,6 +274,7 @@ Instance::Impl::~Impl() { bool Instance::Impl::reuse_instance() { api_guard_->verify_reuse_instance(); + if (mmsf_validator_) mmsf_validator_->reuse_instance(); bool do_reuse; if (do_reuse_.is_set()) { @@ -278,6 +285,9 @@ bool Instance::Impl::reuse_instance() { do_reuse = decide_reuse_instance_(); } + if (do_resume_ && !do_init_ && mmsf_validator_) + mmsf_validator_->skip_f_init(); + // now first_run_, do_resume_ and do_init_ are also set correctly #ifdef MUSCLE_ENABLE_MPI if (mpi_barrier_.is_root()) { @@ -411,7 +421,8 @@ void Instance::Impl::send(std::string const & port_name, Message const & message if (mpi_barrier_.is_root()) { #endif - check_port_(port_name); + check_port_(port_name, {}, true); + if (mmsf_validator_) mmsf_validator_->check_send(port_name, {}); if (!message.has_settings()) { Message msg(message); msg.set_settings(settings_manager_.overlay); @@ -434,7 +445,8 @@ void Instance::Impl::send( try { #endif - check_port_(port_name, slot); + check_port_(port_name, slot, true); + if (mmsf_validator_) mmsf_validator_->check_send(port_name, slot); if (!message.has_settings()) { Message msg(message); msg.set_settings(settings_manager_.overlay); @@ -570,7 +582,8 @@ Message Instance::Impl::receive_message( try { #endif - check_port_(port_name, slot, true); + check_port_(port_name, slot, false, true); + if (mmsf_validator_) mmsf_validator_->check_receive(port_name, slot); Reference port_ref(port_name); auto const & port = port_manager_->get_port(port_name); @@ -842,7 +855,7 @@ std::vector<::ymmsl::Port> Instance::Impl::list_declared_ports_() const { */ void Instance::Impl::check_port_( std::string const & port_name, Optional slot, - bool allow_slot_out_of_range) + bool is_send, bool allow_slot_out_of_range) { if (!port_manager_->port_exists(port_name)) { std::ostringstream oss; @@ -852,8 +865,22 @@ void Instance::Impl::check_port_( throw std::logic_error(oss.str()); } + auto & port = port_manager_->get_port(port_name); + if (is_send) { + if (!::ymmsl::allows_sending(port.oper)) { + std::ostringstream oss; + oss << " Port " << port_name << " does not allow sending messages."; + throw std::logic_error(oss.str()); + } + } else { + if (!::ymmsl::allows_receiving(port.oper)) { + std::ostringstream oss; + oss << " Port " << port_name << " does not allow receiving messages."; + throw std::logic_error(oss.str()); + } + } + if (slot.is_set()) { - auto & port = port_manager_->get_port(port_name); if (!port.is_vector()) { std::ostringstream oss; oss << "Port \"" << port_name << "\" is not a vector port, but a slot was"; diff --git a/libmuscle/cpp/src/libmuscle/instance.hpp b/libmuscle/cpp/src/libmuscle/instance.hpp index 27c658bc..8b3b44c0 100644 --- a/libmuscle/cpp/src/libmuscle/instance.hpp +++ b/libmuscle/cpp/src/libmuscle/instance.hpp @@ -69,6 +69,13 @@ enum class InstanceFlags : int { * `ymmsl.KeepsStateForNextUse.NECESSARY`). */ STATE_NOT_REQUIRED_FOR_NEXT_USE = 8, + + /** Disable the checks whether the MMSF is strictly followed when + * sending/receiving messages. + * + * See MMSFValidator for a detailed description of the checks. + */ + SKIP_MMSF_SEQUENCE_CHECKS = 16, }; inline InstanceFlags operator|(InstanceFlags a, InstanceFlags b) { diff --git a/libmuscle/cpp/src/libmuscle/tests/test_instance.cpp b/libmuscle/cpp/src/libmuscle/tests/test_instance.cpp index 7ff08ef0..68031a83 100644 --- a/libmuscle/cpp/src/libmuscle/tests/test_instance.cpp +++ b/libmuscle/cpp/src/libmuscle/tests/test_instance.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -426,6 +427,8 @@ TEST_F(libmuscle_instance, get_setting) { } TEST_F(libmuscle_instance, list_ports) { + ASSERT_TRUE(port_manager_.list_ports.called_once_with()); + port_manager_.list_ports.call_args_list.clear(); instance_.list_ports(); ASSERT_TRUE(port_manager_.list_ports.called_once_with()); } @@ -554,10 +557,19 @@ TEST_F(libmuscle_instance, send_after_resize) { instance_.send("out_r", mock_msg, 13); } +TEST_F(libmuscle_instance, send_on_receiving_port) { + Message mock_msg(0.0); + ASSERT_THROW((instance_.send("in_v", mock_msg, 3)), std::logic_error); +} + TEST_F(libmuscle_instance, receive_on_invalid_port) { ASSERT_THROW(instance_.receive("does_not_exist"), std::logic_error); } +TEST_F(libmuscle_instance, receive_on_sending_port) { + ASSERT_THROW(instance_.receive("out_v", 3), std::logic_error); +} + TEST_F(libmuscle_instance, receive_f_init) { Message mock_msg(0.0, Settings()); communicator_.receive_message.return_value = std::make_tuple(mock_msg, 0.0); diff --git a/libmuscle/cpp/src/libmuscle/tests/test_snapshot_manager.cpp b/libmuscle/cpp/src/libmuscle/tests/test_snapshot_manager.cpp index 7ad4218a..f12ad1b5 100644 --- a/libmuscle/cpp/src/libmuscle/tests/test_snapshot_manager.cpp +++ b/libmuscle/cpp/src/libmuscle/tests/test_snapshot_manager.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include From 7e9fd46fca8cbb7a9bb13154842def38b7d3dd73 Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Wed, 28 Aug 2024 09:53:48 +0200 Subject: [PATCH 8/8] Add warning when Operator.NONE is used for a connected port And small changes based on PR feedback --- .../cpp/src/libmuscle/mmsf_validator.cpp | 36 +++++++++++-------- .../cpp/src/libmuscle/mmsf_validator.hpp | 4 +-- libmuscle/python/libmuscle/mmsf_validator.py | 10 ++++++ 3 files changed, 33 insertions(+), 17 deletions(-) diff --git a/libmuscle/cpp/src/libmuscle/mmsf_validator.cpp b/libmuscle/cpp/src/libmuscle/mmsf_validator.cpp index 3c34e2cc..037c3c46 100644 --- a/libmuscle/cpp/src/libmuscle/mmsf_validator.cpp +++ b/libmuscle/cpp/src/libmuscle/mmsf_validator.cpp @@ -45,6 +45,14 @@ MMSFValidator::MMSFValidator(PortManager const& port_manager, Logger & logger) connected_ports_[op] = connected_ports; } + if (!connected_ports_[Operator::NONE].empty()) + logger_.warning( + "This instance is using ports with Operator.NONE. This does not " + "adhere to the Multiscale Modelling and Simulation Framework " + "and may lead to deadlocks. You can disable this warning by " + "setting the flag InstanceFlags::SKIP_MMSF_SEQUENCE_CHECKS " + "when creating the libmuscle::Instance."); + // Allowed operator transitions, the following are unconditionally allowed allowed_transitions_[Operator::NONE] = {Operator::NONE, Operator::F_INIT}; allowed_transitions_[Operator::F_INIT] = {Operator::O_I, Operator::O_F}; @@ -52,6 +60,8 @@ MMSFValidator::MMSFValidator(PortManager const& port_manager, Logger & logger) allowed_transitions_[Operator::S] = {Operator::O_I, Operator::O_F}; allowed_transitions_[Operator::O_F] = {Operator::NONE}; // If there are operators without connected ports, we can skip over those + // This logic is transitive, i.e. when there are no connected ports for both + // F_INIT and O_I, we will also add NONE -> S to self._allowed_transition: for (auto const op : {Operator::F_INIT, Operator::O_I, Operator::S, Operator::O_F}) { if (connected_ports_[op].empty()) { // Find all transitions A -> op -> B and allow transition A -> B: @@ -61,8 +71,7 @@ MMSFValidator::MMSFValidator(PortManager const& port_manager, Logger & logger) if (!contains(allowed, op)) continue; // op is not in the allowed list for (auto const & to_op : allowed_transitions_[op]) { - // add to_op to allowed, if it is not already in the list: - if (std::find(allowed.begin(), allowed.end(), to_op) == allowed.end()) + if (!contains(allowed, to_op)) allowed.push_back(to_op); } } @@ -85,18 +94,18 @@ MMSFValidator::MMSFValidator(PortManager const& port_manager, Logger & logger) void MMSFValidator::check_send( std::string const& port_name, Optional slot) { - check_send_receive(port_name, slot); + check_send_receive_(port_name, slot); } void MMSFValidator::check_receive( std::string const& port_name, Optional slot) { - check_send_receive(port_name, slot); + check_send_receive_(port_name, slot); } void MMSFValidator::reuse_instance() { if (enabled_) { - check_transition(Operator::NONE, ""); + check_transition_(Operator::NONE, ""); } } @@ -106,7 +115,7 @@ void MMSFValidator::skip_f_init() { current_ports_used_ = connected_ports_[Operator::F_INIT]; } -void MMSFValidator::check_send_receive( +void MMSFValidator::check_send_receive_( std::string const& port_name, Optional slot) { if (!enabled_) return; @@ -114,24 +123,21 @@ void MMSFValidator::check_send_receive( auto op = port_operators_[port_name]; if (current_operator_ != op) { // Operator changed, check that all ports were used in the previous operator - check_transition(op, port_name); + check_transition_(op, port_name); } - if (std::find( - current_ports_used_.begin(), - current_ports_used_.end(), - port_name) != current_ports_used_.end()) { + if (contains(current_ports_used_, port_name)) { // We're using the same port for a second time, this is fine if: // 1. We're allowed to do this operator immediately again, and // 2. All ports of the current operator have been used // Both are checked by check_transition_: - check_transition(op, port_name); + check_transition_(op, port_name); } current_ports_used_.push_back(port_name); } -void MMSFValidator::check_transition( +void MMSFValidator::check_transition_( ::ymmsl::Operator op, std::string const& port_name) { std::ostringstream expected_oss; @@ -196,8 +202,8 @@ void MMSFValidator::check_transition( ".\n" "Not adhering to the Multiscale Modelling and Simulation Framework " "may lead to deadlocks. You can disable this warning by " - "setting the flag InstanceFlags.SKIP_MMSF_SEQUENCE_CHECKS " - "when creating the libmuscle.Instance."); + "setting the flag InstanceFlags::SKIP_MMSF_SEQUENCE_CHECKS " + "when creating the libmuscle::Instance."); } current_operator_ = op; diff --git a/libmuscle/cpp/src/libmuscle/mmsf_validator.hpp b/libmuscle/cpp/src/libmuscle/mmsf_validator.hpp index 9a7720f3..5c15fb54 100644 --- a/libmuscle/cpp/src/libmuscle/mmsf_validator.hpp +++ b/libmuscle/cpp/src/libmuscle/mmsf_validator.hpp @@ -50,7 +50,7 @@ class MMSFValidator { private: /** Actual implementation of check_send/check_receive. */ - void check_send_receive(std::string const & port_name, Optional slot); + void check_send_receive_(std::string const & port_name, Optional slot); /** Check that a transition to the provided operator is allowed. * * Log a warning when the transition does not adhere to the MMSF. @@ -59,7 +59,7 @@ class MMSFValidator { * @param port_name The name of the port that was sent/receveived on. This is only * used for constructing the warning message. */ - void check_transition(::ymmsl::Operator op, std::string const & port_name); + void check_transition_(::ymmsl::Operator op, std::string const & port_name); PortManager const & port_manager_; Logger & logger_; diff --git a/libmuscle/python/libmuscle/mmsf_validator.py b/libmuscle/python/libmuscle/mmsf_validator.py index 1f85aeb7..ab69aa71 100644 --- a/libmuscle/python/libmuscle/mmsf_validator.py +++ b/libmuscle/python/libmuscle/mmsf_validator.py @@ -50,6 +50,14 @@ def __init__(self, port_manager: PortManager) -> None: for operator, ports in port_names.items() for port in ports} + if self._connected_ports.get(Operator.NONE, []): + _logger.warning( + "This instance is using ports with Operator.NONE. This does not " + "adhere to the Multiscale Modelling and Simulation Framework " + "and may lead to deadlocks. You can disable this warning by " + "setting the flag InstanceFlags.SKIP_MMSF_SEQUENCE_CHECKS " + "when creating the libmuscle.Instance.") + # Allowed operator transitions, the following are unconditionally allowed: self._allowed_transitions = { Operator.NONE: [Operator.NONE, Operator.F_INIT], @@ -58,6 +66,8 @@ def __init__(self, port_manager: PortManager) -> None: Operator.S: [Operator.O_I, Operator.O_F], Operator.O_F: [Operator.NONE]} # If there are operators without connected ports, we can skip over those + # This logic is transitive, i.e. when there are no connected ports for both + # F_INIT and O_I, we will also add NONE -> S to self._allowed_transition: for operator in [Operator.F_INIT, Operator.O_I, Operator.S, Operator.O_F]: if not self._connected_ports.get(operator, []): # Find all transitions A -> operator -> B and allow transition A -> B: