Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/mmsf validator #301

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
8 changes: 5 additions & 3 deletions docs/source/examples/python/interact_coupling.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from typing import Any, Optional, Tuple, Dict

from libmuscle import Instance, Message, USES_CHECKPOINT_API
from libmuscle import Instance, InstanceFlags, Message
from libmuscle.runner import run_simulation
from ymmsl import (
Component, Conduit, Configuration, Model, Operator, Ports, Settings)
Expand Down Expand Up @@ -241,7 +241,8 @@ def temporal_coupler() -> None:
"""
instance = Instance({
Operator.O_I: ['a_out', 'b_out'],
Operator.S: ['a_in', 'b_in']})
Operator.S: ['a_in', 'b_in']},
InstanceFlags.SKIP_MMSF_SEQUENCE_CHECKS)

while instance.reuse_instance():
# Receive initial messages and initialise state
Expand Down Expand Up @@ -275,7 +276,8 @@ def checkpointing_temporal_coupler() -> None:
"""
instance = Instance({
Operator.O_I: ['a_out', 'b_out'],
Operator.S: ['a_in', 'b_in']}, USES_CHECKPOINT_API)
Operator.S: ['a_in', 'b_in']},
InstanceFlags.USES_CHECKPOINT_API | InstanceFlags.SKIP_MMSF_SEQUENCE_CHECKS)

while instance.reuse_instance():
if instance.resuming():
Expand Down
5 changes: 3 additions & 2 deletions integration_test/test_snapshot_complex_coupling.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from ymmsl import Operator, load, dump

from libmuscle import (
Instance, Message, KEEPS_NO_STATE_FOR_NEXT_USE, USES_CHECKPOINT_API)
Instance, Message, KEEPS_NO_STATE_FOR_NEXT_USE, USES_CHECKPOINT_API,
SKIP_MMSF_SEQUENCE_CHECKS)
from libmuscle.manager.run_dir import RunDir

from .conftest import run_manager_with_actors, ls_snapshots
Expand Down Expand Up @@ -58,7 +59,7 @@ def cache_component(max_channels=2):
def echo_component(max_channels=2):
ports = {Operator.F_INIT: [f'in{i+1}' for i in range(max_channels)],
Operator.O_F: [f'out{i+1}' for i in range(max_channels)]}
instance = Instance(ports, KEEPS_NO_STATE_FOR_NEXT_USE)
instance = Instance(ports, KEEPS_NO_STATE_FOR_NEXT_USE | SKIP_MMSF_SEQUENCE_CHECKS)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. Maybe it would be nicer to modify this so that it receives all messages into a list, and then sends them all out? Then it would be MMSF-compatible, and we could run with checks enabled. Of course it's a test, so it's fine also because we know what we're doing.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left it as is: I think it's also nice to have an integration test for this flag.


while instance.reuse_instance():
for p_in, p_out in zip(ports[Operator.F_INIT], ports[Operator.O_F]):
Expand Down
22 changes: 22 additions & 0 deletions libmuscle/python/libmuscle/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from libmuscle.logging_handler import MuscleManagerHandler
from libmuscle.mpp_message import ClosePort
from libmuscle.mmp_client import MMPClient
from libmuscle.mmsf_validator import MMSFValidator
from libmuscle.peer_info import PeerInfo
from libmuscle.port_manager import PortManager
from libmuscle.profiler import Profiler
Expand Down Expand Up @@ -89,6 +90,14 @@ class InstanceFlags(Flag):
:external:py:attr:`ymmsl.KeepsStateForNextUse.NECESSARY`).
"""

SKIP_MMSF_SEQUENCE_CHECKS = auto()
"""Disable the checks whether the MMSF is strictly followed when sending/receiving
messages.

See :class:`~libmuscle.mmsf_validator.MMSFValidator` for a detailed description of
the checks.
"""


_CHECKPOINT_SUPPORT_MASK = (
InstanceFlags.USES_CHECKPOINT_API |
Expand Down Expand Up @@ -186,6 +195,10 @@ def __init__(
self._set_local_log_level()
self._set_remote_log_level()
self._setup_profiling()
# MMSFValidator needs a connected port manager, and does some logging
self._mmsf_validator = (
None if InstanceFlags.SKIP_MMSF_SEQUENCE_CHECKS in self._flags
else MMSFValidator(self._port_manager))

def reuse_instance(self) -> bool:
"""Decide whether to run this instance again.
Expand Down Expand Up @@ -222,6 +235,8 @@ def reuse_instance(self) -> bool:
:meth:`save_final_snapshot`, or the checkpointing tutorial.
"""
self._api_guard.verify_reuse_instance()
if self._mmsf_validator:
self._mmsf_validator.reuse_instance()

if self._do_reuse is not None:
# thank you, should_save_final_snapshot, for running this already
Expand All @@ -230,6 +245,9 @@ def reuse_instance(self) -> bool:
else:
do_reuse = self._decide_reuse_instance()

if self._do_resume and not self._do_init and self._mmsf_validator:
self._mmsf_validator.skip_f_init()

# now _first_run, _do_resume and _do_init are also set correctly

do_implicit_checkpoint = (
Expand Down Expand Up @@ -428,6 +446,8 @@ def send(self, port_name: str, message: Message,
slot: The slot to send the message on, if any.
"""
self.__check_port(port_name, slot)
if self._mmsf_validator:
self._mmsf_validator.check_send(port_name, slot)
if message.settings is None:
message = copy(message)
message.settings = self._settings_manager.overlay
Expand Down Expand Up @@ -883,6 +903,8 @@ def __receive_message(
description of those.
"""
self.__check_port(port_name, slot, True)
if self._mmsf_validator:
self._mmsf_validator.check_receive(port_name, slot)

port = self._port_manager.get_port(port_name)
if port.operator == Operator.F_INIT:
Expand Down
205 changes: 205 additions & 0 deletions libmuscle/python/libmuscle/mmsf_validator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
import logging
import sys
import types
from typing import List, Optional

from libmuscle.port_manager import PortManager
from ymmsl import Operator


_logger = logging.getLogger(__name__)


class MMSFValidator:
maarten-ic marked this conversation as resolved.
Show resolved Hide resolved
"""The MMSF Validator checks whether Instances are following the Multiscale
Modelling and Simulation Framework when sending and receiving messages.

In particular it checks that in order:

1. reuse_instance() is called
2. Messages are received on all F_INIT ports
3. The following sub-items happen in order, 0 or more times:

a. Messages are sent on all O_I ports
b. Messages are received on all S ports

4. Messages are sent on all O_F ports

If any message is sent or received out of order, a warning is logged to indicate
that the instance is not following the MMSF pattern. In some cases (for example the
time bridge in ``examples/python/interact_coupling.py``) this is expected and the
warnings can be disabled by setting the
:attr:`~libmuscle.instance.InstanceFlags.SKIP_MMSF_SEQUENCE_CHECKS` flag.

Note:
LourensVeen marked this conversation as resolved.
Show resolved Hide resolved
Checks on vector ports are not implemented. When the instance uses vector ports,
the MMSF Validator will be disabled.
"""
def __init__(self, port_manager: PortManager) -> None:
self._port_manager = port_manager

port_names = port_manager.list_ports()
port_objects = {
operator: [port_manager.get_port(name) for name in names]
for operator, names in port_names.items()}
self._connected_ports = {
operator: [str(port.name) for port in ports if port.is_connected()]
for operator, ports in port_objects.items()}
self._port_operators = {
port: operator
for operator, ports in port_names.items()
for port in ports}

# Allowed operator transitions, the following are unconditionally allowed:
self._allowed_transitions = {
(Operator.NONE, Operator.NONE),
(Operator.NONE, Operator.F_INIT),
(Operator.F_INIT, Operator.O_I),
(Operator.F_INIT, Operator.O_F),
(Operator.O_I, Operator.S),
(Operator.S, Operator.O_I),
(Operator.S, Operator.O_F),
(Operator.O_F, Operator.NONE),
}
# If there are operators without connected ports, we can skip over those
maarten-ic marked this conversation as resolved.
Show resolved Hide resolved
for operator in [Operator.F_INIT, Operator.O_I, Operator.S, Operator.O_F]:
if not self._connected_ports.get(operator, []):
# Find all transitions A -> operator -> B and allow transition A -> B:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't realise this before but this is actually a bit more involved, because we're editing _allowed_transitions on the fly. So this will also enable e.g. NONE -> S if F_INIT and O_I have no ports. That's what we want, but not immediately obvious. Maybe this comment could be extended to point out that it's transitive to avoid future confusion?

Also, in that case, will we end up with None -> O_I in _allowed_transitions, despite O_I not having connected ports? That's probably not a problem, since that transition cannot occur anyway?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this comment could be extended to point out that it's transitive to avoid future confusion?

Sure, will do

Also, in that case, will we end up with None -> O_I in _allowed_transitions, despite O_I not having connected ports? That's probably not a problem, since that transition cannot occur anyway?

Correct, and it is deliberately so. Users can still send on an O_I port when it is not connected (the Communicator will just discard the message). For the MMSFValidator this transition still happens and is valid :)

skip_from = []
LourensVeen marked this conversation as resolved.
Show resolved Hide resolved
skip_to = []
for from_op, to_op in self._allowed_transitions:
if from_op is operator:
skip_to.append(to_op)
if to_op is operator:
skip_from.append(from_op)
for from_op in skip_from:
for to_op in skip_to:
self._allowed_transitions.add((from_op, to_op))

# Disable this validator when the instance uses vector ports to keep this class
# simpler. Support for vector ports may be added in the future.
self._enabled = not any(
port.is_vector() for ports in port_objects.values() for port in ports)
_logger.debug(
LourensVeen marked this conversation as resolved.
Show resolved Hide resolved
"MMSF Validator is %s", "enabled" if self._enabled else "disabled")

# State tracking
self._current_ports_used: List[str] = []
self._current_operator: Operator = Operator.NONE

def check_send(self, port_name: str, slot: Optional[int]) -> None:
maarten-ic marked this conversation as resolved.
Show resolved Hide resolved
"""Check that sending on the provided port adheres to the MMSF."""
self._check_send_receive(port_name, slot)

def check_receive(self, port_name: str, slot: Optional[int]) -> None:
"""Check that receiving on the provided port adheres to the MMSF."""
self._check_send_receive(port_name, slot)

def reuse_instance(self) -> None:
"""Check that a reuse_instance() adheres to the MMSF."""
if not self._enabled:
return
self._check_transition(Operator.NONE)

def skip_f_init(self) -> None:
"""Call when resuming from an intermediate snapshot: F_INIT is skipped."""
# Pretend we're now in F_INIT and we have already received on all F_INIT ports:
self._current_operator = Operator.F_INIT
self._current_ports_used = self._connected_ports.get(Operator.F_INIT, [])

def _check_send_receive(
self, port_name: str, slot: Optional[int]) -> None:
"""Actual implementation of check_send/check_receive."""
if not self._enabled:
return

operator = self._port_operators[port_name]
if self._current_operator != operator:
# Operator changed, check that all ports were used in the previous operator
self._check_transition(operator, port_name)

if port_name in self._current_ports_used:
# We're using the same port for a second time, this is fine when we're
LourensVeen marked this conversation as resolved.
Show resolved Hide resolved
# allowed to do this operator immediately again:
self._check_transition(operator, port_name)

self._current_ports_used.append(port_name)

def _check_transition(self, operator: Operator, port_name: str = "") -> None:
"""Check that a transition to the provided operator is allowed.

Log a warning when the transition does not adhere to the MMSF.

Args:
operator: Operator to transition to.
port_name: The name of the port that was sent/receveived on. This is only
used for constructing the warning message.
"""
connected_ports = self._connected_ports.get(self._current_operator, [])
expected: str = ""

unused_ports = [
port for port in connected_ports
if port not in self._current_ports_used]
if unused_ports:
# We didn't complete the current phase
if self._current_operator in (Operator.F_INIT, Operator.S):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self._current_operator.allows_receiving() would be better I think, because it doesn't duplicate the information on which ports can send or receive.

I think Operator.NONE allows both actually, there should probably be a branch for that too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe Operator.NONE should be deprecated, now that I think of it. I think it came from MUSCLE2, the idea being to allow any kind of generic message exchange, but it seems like everything people want to do fits the MMSF, even if it's generic RPC. And we can disable the checks if needed. Let's add a branch and leave it like that for now.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this logic I'm using Operator.NONE to signal a reuse-instance. I'm assuming there are no ports with this operator.

Do you think it's logical to add a check in MMSFValidator.__init__ that there are no ports with Operator.NONE?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you would? Then people could still use it, but they'd have to disable the validator and that definitely makes sense.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Added a warning in MMSFValidator.__init__ (python) and constructor (C++)

expected = "a receive"
else:
expected = "a send"
expected += (
f" on any of these {self._current_operator.name} ports: "
+ ", ".join(unused_ports))

elif (self._current_operator, operator) not in self._allowed_transitions:
# Transition to the operator is not allowed, now figure out what we were
# actually expecting.
# First find the allowed transitions from self._current_operator, that are
# also 'valid' (i.e. have connected ports):
allowed = [
LourensVeen marked this conversation as resolved.
Show resolved Hide resolved
to_op for from_op, to_op in self._allowed_transitions
if from_op is self._current_operator and
(to_op in self._connected_ports or to_op is Operator.NONE)]
# Build the message we want to display to users:
expected_lst = []
for to_op in sorted(allowed, key=lambda op: op.value):
ports = ', '.join(map(repr, self._connected_ports.get(to_op, [])))
if to_op is Operator.NONE:
LourensVeen marked this conversation as resolved.
Show resolved Hide resolved
expected_lst.append("a call to reuse_instance()")
elif to_op in (Operator.F_INIT, Operator.S):
expected_lst.append(f"a receive on an {to_op.name} port ({ports})")
else:
expected_lst.append(f"a send on an {to_op.name} port ({ports})")
assert expected_lst
expected = " or ".join(expected_lst)

if expected:
# We expected something else, log a warning:
if operator is Operator.NONE:
action = "reuse_instance()"
elif operator in (Operator.F_INIT, Operator.S):
action = f"Receive on port '{port_name}'"
else:
action = f"Send on port '{port_name}'"

# Find the file:line where the user called send/receive/reuse_instance
try:
frame: Optional[types.FrameType] = sys._getframe()
except Exception:
frame = None # sys._getframe() is not guaranteed available
while (frame
and frame.f_globals.get("__name__", "").startswith("libmuscle.")):
# This frame is still part of a libmuscle module, step up:
frame = frame.f_back
loc = f" ({frame.f_code.co_filename}:{frame.f_lineno})" if frame else ""

_logger.warning(
"%s%s does not adhere to the MMSF: was expecting %s.\n"
"Not adhering to the Multiscale Modelling and Simulation Framework "
"may lead to deadlocks. You can disable this warning by "
"setting the flag InstanceFlags.SKIP_MMSF_SEQUENCE_CHECKS "
"when creating the libmuscle.Instance.",
action, loc, expected)

self._current_operator = operator
self._current_ports_used = []
2 changes: 2 additions & 0 deletions libmuscle/python/libmuscle/test/test_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ def test_get_setting(instance, settings_manager):


def test_list_ports(instance, port_manager):
port_manager.list_ports.assert_called_once_with()
port_manager.list_ports.reset_mock()
instance.list_ports()
port_manager.list_ports.assert_called_once_with()

Expand Down
Loading
Loading