Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/mmsf validator #301

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
8 changes: 5 additions & 3 deletions docs/source/examples/python/interact_coupling.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from typing import Any, Optional, Tuple, Dict

from libmuscle import Instance, Message, USES_CHECKPOINT_API
from libmuscle import Instance, InstanceFlags, Message
from libmuscle.runner import run_simulation
from ymmsl import (
Component, Conduit, Configuration, Model, Operator, Ports, Settings)
Expand Down Expand Up @@ -241,7 +241,8 @@ def temporal_coupler() -> None:
"""
instance = Instance({
Operator.O_I: ['a_out', 'b_out'],
Operator.S: ['a_in', 'b_in']})
Operator.S: ['a_in', 'b_in']},
InstanceFlags.SKIP_MMSF_SEQUENCE_CHECKS)

while instance.reuse_instance():
# Receive initial messages and initialise state
Expand Down Expand Up @@ -275,7 +276,8 @@ def checkpointing_temporal_coupler() -> None:
"""
instance = Instance({
Operator.O_I: ['a_out', 'b_out'],
Operator.S: ['a_in', 'b_in']}, USES_CHECKPOINT_API)
Operator.S: ['a_in', 'b_in']},
InstanceFlags.USES_CHECKPOINT_API | InstanceFlags.SKIP_MMSF_SEQUENCE_CHECKS)

while instance.reuse_instance():
if instance.resuming():
Expand Down
5 changes: 3 additions & 2 deletions integration_test/test_snapshot_complex_coupling.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from ymmsl import Operator, load, dump

from libmuscle import (
Instance, Message, KEEPS_NO_STATE_FOR_NEXT_USE, USES_CHECKPOINT_API)
Instance, Message, KEEPS_NO_STATE_FOR_NEXT_USE, USES_CHECKPOINT_API,
SKIP_MMSF_SEQUENCE_CHECKS)
from libmuscle.manager.run_dir import RunDir

from .conftest import run_manager_with_actors, ls_snapshots
Expand Down Expand Up @@ -58,7 +59,7 @@ def cache_component(max_channels=2):
def echo_component(max_channels=2):
ports = {Operator.F_INIT: [f'in{i+1}' for i in range(max_channels)],
Operator.O_F: [f'out{i+1}' for i in range(max_channels)]}
instance = Instance(ports, KEEPS_NO_STATE_FOR_NEXT_USE)
instance = Instance(ports, KEEPS_NO_STATE_FOR_NEXT_USE | SKIP_MMSF_SEQUENCE_CHECKS)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. Maybe it would be nicer to modify this so that it receives all messages into a list, and then sends them all out? Then it would be MMSF-compatible, and we could run with checks enabled. Of course it's a test, so it's fine also because we know what we're doing.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left it as is: I think it's also nice to have an integration test for this flag.


while instance.reuse_instance():
for p_in, p_out in zip(ports[Operator.F_INIT], ports[Operator.O_F]):
Expand Down
41 changes: 34 additions & 7 deletions libmuscle/cpp/src/libmuscle/instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <libmuscle/data.hpp>
#include <libmuscle/mcp/data_pack.hpp>
#include <libmuscle/logger.hpp>
#include <libmuscle/mmsf_validator.hpp>
#include <libmuscle/mmp_client.hpp>
#include <libmuscle/peer_info.hpp>
#include <libmuscle/port_manager.hpp>
Expand Down Expand Up @@ -130,6 +131,7 @@ class Instance::Impl {
SettingsManager settings_manager_;
std::unique_ptr<SnapshotManager> snapshot_manager_;
std::unique_ptr<TriggerManager> trigger_manager_;
std::unique_ptr<MMSFValidator> mmsf_validator_;
Optional<bool> first_run_;
Optional<bool> do_reuse_;
bool do_resume_;
Expand All @@ -151,8 +153,8 @@ class Instance::Impl {

std::vector<::ymmsl::Port> list_declared_ports_() const;
void check_port_(
std::string const & port_name, Optional<int> slot = {},
bool allow_slot_out_of_range = false);
std::string const & port_name, Optional<int> slot,
bool is_send, bool allow_slot_out_of_range = false);

bool receive_settings_();
bool have_f_init_connections_();
Expand Down Expand Up @@ -239,6 +241,10 @@ Instance::Impl::Impl(
set_local_log_level_();
set_remote_log_level_();
setup_profiling_();
// MMSFValidator needs a connected port manager, and does some logging
if (! (InstanceFlags::SKIP_MMSF_SEQUENCE_CHECKS & flags_)) {
mmsf_validator_.reset(new MMSFValidator(*port_manager_, *logger_));
}
#ifdef MUSCLE_ENABLE_MPI
auto sbase_data = Data(settings_manager_.base);
msgpack::sbuffer sbuf;
Expand Down Expand Up @@ -268,6 +274,7 @@ Instance::Impl::~Impl() {

bool Instance::Impl::reuse_instance() {
api_guard_->verify_reuse_instance();
if (mmsf_validator_) mmsf_validator_->reuse_instance();

bool do_reuse;
if (do_reuse_.is_set()) {
Expand All @@ -278,6 +285,9 @@ bool Instance::Impl::reuse_instance() {
do_reuse = decide_reuse_instance_();
}

if (do_resume_ && !do_init_ && mmsf_validator_)
mmsf_validator_->skip_f_init();

// now first_run_, do_resume_ and do_init_ are also set correctly
#ifdef MUSCLE_ENABLE_MPI
if (mpi_barrier_.is_root()) {
Expand Down Expand Up @@ -411,7 +421,8 @@ void Instance::Impl::send(std::string const & port_name, Message const & message
if (mpi_barrier_.is_root()) {
#endif

check_port_(port_name);
check_port_(port_name, {}, true);
if (mmsf_validator_) mmsf_validator_->check_send(port_name, {});
if (!message.has_settings()) {
Message msg(message);
msg.set_settings(settings_manager_.overlay);
Expand All @@ -434,7 +445,8 @@ void Instance::Impl::send(
try {
#endif

check_port_(port_name, slot);
check_port_(port_name, slot, true);
if (mmsf_validator_) mmsf_validator_->check_send(port_name, slot);
if (!message.has_settings()) {
Message msg(message);
msg.set_settings(settings_manager_.overlay);
Expand Down Expand Up @@ -570,7 +582,8 @@ Message Instance::Impl::receive_message(
try {
#endif

check_port_(port_name, slot, true);
check_port_(port_name, slot, false, true);
if (mmsf_validator_) mmsf_validator_->check_receive(port_name, slot);

Reference port_ref(port_name);
auto const & port = port_manager_->get_port(port_name);
Expand Down Expand Up @@ -842,7 +855,7 @@ std::vector<::ymmsl::Port> Instance::Impl::list_declared_ports_() const {
*/
void Instance::Impl::check_port_(
std::string const & port_name, Optional<int> slot,
bool allow_slot_out_of_range)
bool is_send, bool allow_slot_out_of_range)
{
if (!port_manager_->port_exists(port_name)) {
std::ostringstream oss;
Expand All @@ -852,8 +865,22 @@ void Instance::Impl::check_port_(
throw std::logic_error(oss.str());
}

auto & port = port_manager_->get_port(port_name);
if (is_send) {
if (!::ymmsl::allows_sending(port.oper)) {
std::ostringstream oss;
oss << " Port " << port_name << " does not allow sending messages.";
throw std::logic_error(oss.str());
}
} else {
if (!::ymmsl::allows_receiving(port.oper)) {
std::ostringstream oss;
oss << " Port " << port_name << " does not allow receiving messages.";
throw std::logic_error(oss.str());
}
}

if (slot.is_set()) {
auto & port = port_manager_->get_port(port_name);
if (!port.is_vector()) {
std::ostringstream oss;
oss << "Port \"" << port_name << "\" is not a vector port, but a slot was";
Expand Down
7 changes: 7 additions & 0 deletions libmuscle/cpp/src/libmuscle/instance.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ enum class InstanceFlags : int {
* `ymmsl.KeepsStateForNextUse.NECESSARY`).
*/
STATE_NOT_REQUIRED_FOR_NEXT_USE = 8,

/** Disable the checks whether the MMSF is strictly followed when
* sending/receiving messages.
*
* See MMSFValidator for a detailed description of the checks.
*/
SKIP_MMSF_SEQUENCE_CHECKS = 16,
};

inline InstanceFlags operator|(InstanceFlags a, InstanceFlags b) {
Expand Down
207 changes: 207 additions & 0 deletions libmuscle/cpp/src/libmuscle/mmsf_validator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
#include "mmsf_validator.hpp"

#include <sstream>

namespace {

template <typename Container, typename T>
inline bool contains(Container const & container, T const & value) {
return std::find(container.begin(), container.end(), value) != container.end();
}

}

namespace libmuscle { namespace _MUSCLE_IMPL_NS {

using ::ymmsl::Operator;

MMSFValidator::MMSFValidator(PortManager const& port_manager, Logger & logger)
: port_manager_(port_manager)
, logger_(logger)
, enabled_(true)
, current_operator_(Operator::NONE)
{
auto port_names = port_manager.list_ports();

connected_ports_[Operator::NONE] = {};
connected_ports_[Operator::F_INIT] = {};
connected_ports_[Operator::O_I] = {};
connected_ports_[Operator::S] = {};
connected_ports_[Operator::O_F] = {};

for (auto const & value : port_names) {
auto const & op = value.first;
std::vector<std::string> connected_ports;
for (auto const & port_name : value.second) {
auto const & port_obj = port_manager.get_port(port_name);
if (port_obj.is_connected()) {
connected_ports.push_back(port_name);
}
if (port_obj.is_vector()) {
enabled_ = false; // We don't support vector ports (yet)
}
port_operators_[port_name] = op;
}
connected_ports_[op] = connected_ports;
}

// Allowed operator transitions, the following are unconditionally allowed
allowed_transitions_[Operator::NONE] = {Operator::NONE, Operator::F_INIT};
allowed_transitions_[Operator::F_INIT] = {Operator::O_I, Operator::O_F};
allowed_transitions_[Operator::O_I] = {Operator::S};
allowed_transitions_[Operator::S] = {Operator::O_I, Operator::O_F};
allowed_transitions_[Operator::O_F] = {Operator::NONE};
// If there are operators without connected ports, we can skip over those
for (auto const op : {Operator::F_INIT, Operator::O_I, Operator::S, Operator::O_F}) {
if (connected_ports_[op].empty()) {
// Find all transitions A -> op -> B and allow transition A -> B:
for (auto & item : allowed_transitions_) {
if (item.first == op) continue;
auto & allowed = item.second;
if (!contains(allowed, op))
continue; // op is not in the allowed list
for (auto const & to_op : allowed_transitions_[op]) {
// add to_op to allowed, if it is not already in the list:
if (std::find(allowed.begin(), allowed.end(), to_op) == allowed.end())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't you write a helper function for this std::find?

allowed.push_back(to_op);
}
}
}
}
// Sort allowed transitions for more logical log messages
for (auto & item : allowed_transitions_) {
std::sort(item.second.begin(), item.second.end());
}

if (enabled_) {
logger_.debug("MMSF Validator is enabled");
} else {
logger_.debug(
"MMSF Validator is disabled: this instance uses vector ports, "
"which are not supported by the MMSF Validator.");
}
}

void MMSFValidator::check_send(
std::string const& port_name, Optional<int> slot)
{
check_send_receive(port_name, slot);
}

void MMSFValidator::check_receive(
std::string const& port_name, Optional<int> slot)
{
check_send_receive(port_name, slot);
}

void MMSFValidator::reuse_instance() {
if (enabled_) {
check_transition(Operator::NONE, "");
}
}

void MMSFValidator::skip_f_init() {
// Pretend we're now in F_INIT and we have already received on all F_INIT ports
current_operator_ = Operator::F_INIT;
current_ports_used_ = connected_ports_[Operator::F_INIT];
}

void MMSFValidator::check_send_receive(
std::string const& port_name, Optional<int> slot)
{
if (!enabled_) return;

auto op = port_operators_[port_name];
if (current_operator_ != op) {
// Operator changed, check that all ports were used in the previous operator
check_transition(op, port_name);
}

if (std::find(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the helper helps here too?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I did, after I had to repeat this too often. Apparently I wasn't sharp enough to replace all instances of std::find I'd written up to that point 😅

current_ports_used_.begin(),
current_ports_used_.end(),
port_name) != current_ports_used_.end()) {
// We're using the same port for a second time, this is fine if:
// 1. We're allowed to do this operator immediately again, and
// 2. All ports of the current operator have been used
// Both are checked by check_transition_:
check_transition(op, port_name);
}

current_ports_used_.push_back(port_name);
}

void MMSFValidator::check_transition(
::ymmsl::Operator op, std::string const& port_name)
{
std::ostringstream expected_oss;

std::vector<std::string> unused_ports;
for (auto const & port : connected_ports_[current_operator_]) {
if (!contains(current_ports_used_, port)) {
unused_ports.push_back(port);
}
}
if (!unused_ports.empty()) {
// We didn't complete the current phase
if (::ymmsl::allows_receiving(current_operator_)) {
expected_oss << "a receive";
} else {
expected_oss << "a send";
}
expected_oss << " on any of these " << ::ymmsl::operator_name(current_operator_) << " ports: ";
for (std::size_t i = 0; i < unused_ports.size(); ++i) {
if (i > 0) expected_oss << ", ";
expected_oss << unused_ports[i];
}
} else if (!contains(allowed_transitions_[current_operator_], op)) {
// Transition to the operator is not allowed
std::size_t i = 0;
for (auto const & to_op : allowed_transitions_[current_operator_]) {
if (i++ > 0) expected_oss << " or ";
if (to_op == Operator::NONE) {
expected_oss << "a call to reuse_instance()";
} else if (!connected_ports_[to_op].empty()) {
if (::ymmsl::allows_receiving(to_op)) {
expected_oss << "a receive";
} else {
expected_oss << "a send";
}
expected_oss << " on an " << ::ymmsl::operator_name(to_op);
expected_oss << " port (";
std::size_t j = 0;
for (auto const & port : connected_ports_[to_op]) {
if (j++ > 0) expected_oss << ", ";
expected_oss << port;
}
expected_oss << ")";
}
}
}

std::string expected = expected_oss.str();
if (!expected.empty()) {
// We expected something else, log a warning
std::string action;
if (op == Operator::NONE) {
action = "reuse_instance()";
} else if (op == Operator::F_INIT || op == Operator::S) {
action = "Receive on port '" + port_name + "'";
} else {
action = "Send on port '" + port_name + "'";
}

logger_.warning(
action, " does not adhere to the MMSF: was expecting ", expected,
".\n"
"Not adhering to the Multiscale Modelling and Simulation Framework "
"may lead to deadlocks. You can disable this warning by "
"setting the flag InstanceFlags.SKIP_MMSF_SEQUENCE_CHECKS "
"when creating the libmuscle.Instance.");
}

current_operator_ = op;
current_ports_used_.clear();
}

} }
Loading
Loading