Feature/deadlock detection #299

Open: wants to merge 19 commits into base branch develop.
54 changes: 54 additions & 0 deletions docs/source/tips.rst
@@ -2,6 +2,60 @@
Tips & tricks
=============

Deadlock detection
==================

.. versionadded:: 0.8

MUSCLE3 has a deadlock detection mechanism, which can detect when the simulation
is deadlocked because some or all of its components are waiting for a message
from each other. This can happen, for example, due to a bug in one of the
components, or because the components are not wired together correctly.

The simplest deadlock consists of two components, where the first component is
waiting to receive a message from the second component and vice versa. Because
both components are waiting for each other, the simulation is stuck and will
make no further progress. MUSCLE3 will abort the simulation run and provide an
error message indicating that the simulation was deadlocked:

.. code-block:: output
:caption: Example output of a deadlocked simulation

muscle_manager 2024-08-20 13:57:58,544 CRITICAL libmuscle.manager.deadlock_detector: Potential deadlock detected:
The following 2 instances are deadlocked:
1. Instance 'micro' is waiting on instance 'macro' in a receive on port 'initial_state'.
2. Instance 'macro' is waiting on instance 'micro' in a receive on port 'state_in'.


.. note::
MUSCLE3 can only detect deadlocks that are the result of components waiting
for messages to receive. "Internal" deadlocks in simulation components (for
example due to bugs in MPI logic) cannot be detected by MUSCLE3.
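Conceptually, the manager detects this situation by looking for a cycle in the
"is waiting on" relation between instances. The following standalone Python
sketch illustrates that idea only; it is not MUSCLE3's actual implementation,
and the ``find_deadlock`` helper and its input format are hypothetical:

```python
# Illustration of cycle detection in a "waiting on" relation.
# This is NOT MUSCLE3's implementation, just the underlying idea.
def find_deadlock(waiting_on):
    """Return a list of instances that are all waiting on each other, or None.

    waiting_on maps an instance name to the peer it is currently blocked on;
    instances that are not waiting are absent from the mapping.
    """
    for start in waiting_on:
        seen = []
        current = start
        # Follow the chain of waits until it ends or revisits an instance
        while current in waiting_on and current not in seen:
            seen.append(current)
            current = waiting_on[current]
        if current in seen:
            # The chain closed on itself: everyone from that point on
            # is part of the deadlock cycle
            return seen[seen.index(current):]
    return None

# Two instances waiting on each other, like the example above:
print(find_deadlock({"micro": "macro", "macro": "micro"}))  # ['micro', 'macro']
# A chain ending in a non-waiting instance is not a deadlock:
print(find_deadlock({"micro1": "micro2"}))  # None
```

A chain of waits is harmless as long as it ends at an instance that is still
running; only a closed cycle means no one can ever proceed.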


Configuring the deadlock detection
----------------------------------

With the default settings, MUSCLE3 will detect a deadlock 10 seconds after it
occurs. The simulation is halted after another 15 seconds have passed.
These default settings are chosen to limit the runtime impact of the deadlock
detection. It may be useful to detect deadlocks faster during development of the
simulation. This can be achieved with the special setting
``muscle_deadlock_receive_timeout``:

.. code-block:: yaml
:caption: Example configuration setting ``muscle_deadlock_receive_timeout``

ymmsl_version: v0.1
settings:
muscle_deadlock_receive_timeout: 1.0

The value provided to this setting is the initial timeout (in seconds) before
MUSCLE3 detects a deadlock. The simulation is halted after 1.5 times that
duration. Deadlock detection is disabled when a negative value is used.
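For example, with the setting of ``1.0`` shown above, a deadlock is reported
after roughly 1 second and the simulation is halted after roughly 1.5 seconds.
A hypothetical fragment disabling detection altogether, following the rule that
any negative value turns it off:

```yaml
ymmsl_version: v0.1
settings:
  # Any negative value disables deadlock detection
  muscle_deadlock_receive_timeout: -1.0
```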


Running simulation components interactively
===========================================

15 changes: 12 additions & 3 deletions integration_test/conftest.py
@@ -72,10 +72,12 @@ def _python_wrapper(instance_name, muscle_manager, callable):
callable()


def run_manager_with_actors(ymmsl_text, tmpdir, actors):
def run_manager_with_actors(ymmsl_text, tmpdir, actors, expect_success=True):
"""Start muscle_manager along with C++ and python actors.

Args:
ymmsl_text: YMMSL configuration for the simulation
tmpdir: Temporary folder to use as run path
actors: a dictionary of lists containing details for each actor:
``{"instance_name": ("language", "details", ...)}``.

@@ -155,10 +157,17 @@ def run_manager_with_actors(ymmsl_text, tmpdir, actors):
# check results
for proc in native_processes:
proc.wait()
assert proc.returncode == 0
if expect_success:
assert proc.returncode == 0
for proc in python_processes:
proc.join()
assert proc.exitcode == 0
if expect_success:
assert proc.exitcode == 0
if not expect_success:
# Check that at least one process has failed
assert (
any(proc.returncode != 0 for proc in native_processes) or
any(proc.exitcode != 0 for proc in python_processes))


@pytest.fixture
2 changes: 1 addition & 1 deletion integration_test/test_cpp_tcp_server.py
@@ -37,7 +37,7 @@ def test_cpp_tcp_server(log_file_in_tmpdir):
assert TcpTransportClient.can_connect_to(location)

client = MPPClient([location])
msg_bytes, _ = client.receive(Reference('test_receiver.port'))
msg_bytes, _ = client.receive(Reference('test_receiver.port'), None)
msg = MPPMessage.from_bytes(msg_bytes)
client.close()

184 changes: 184 additions & 0 deletions integration_test/test_deadlock_detection.py
@@ -0,0 +1,184 @@
import functools
import sys

from ymmsl import Operator

from libmuscle import Instance, Message

from .conftest import skip_if_python_only, run_manager_with_actors


def suppress_deadlock_exception_output(func):
@functools.wraps(func)
def wrapper():
try:
func()
except RuntimeError as exc:
exc_str = str(exc).lower()
if "deadlock" not in exc_str and "did the peer crash?" not in exc_str:
raise
sys.exit(1)
return wrapper


@suppress_deadlock_exception_output
def deadlocking_micro():
instance = Instance({Operator.F_INIT: ["in"], Operator.O_F: ["out"]})

counter = 5 # Deadlock after 5 iterations
while instance.reuse_instance():
message = instance.receive("in")
counter -= 1
if counter > 0:
instance.send("out", message)


@suppress_deadlock_exception_output
def micro():
instance = Instance({Operator.F_INIT: ["in"], Operator.O_F: ["out"]})

while instance.reuse_instance():
message = instance.receive("in")
instance.send("out", message)


@suppress_deadlock_exception_output
def deadlocking_macro():
instance = Instance({Operator.O_I: ["out"], Operator.S: ["in"]})

while instance.reuse_instance():
for i in range(10):
message = Message(float(i), data="testing")
instance.send("out", message)
instance.receive("in")
# Deadlock:
instance.receive("in")


@suppress_deadlock_exception_output
def macro():
instance = Instance({Operator.O_I: ["out"], Operator.S: ["in"]})

while instance.reuse_instance():
for i in range(10):
message = Message(float(i), data="testing")
instance.send("out", message)
instance.receive("in")


MACRO_MICRO_CONFIG = """
ymmsl_version: v0.1
model:
name: test_model
components:
macro:
implementation: macro
micro:
implementation: micro
conduits:
macro.out: micro.in
micro.out: macro.in
settings:
muscle_deadlock_receive_timeout: 0.1
"""
MACRO_MICRO_WITH_DISPATCH_CONFIG = """
ymmsl_version: v0.1
model:
name: test_model
components:
macro:
implementation: macro
micro1:
implementation: micro
micro2:
implementation: micro
micro3:
implementation: micro
conduits:
macro.out: micro1.in
micro1.out: micro2.in
micro2.out: micro3.in
micro3.out: macro.in
settings:
muscle_deadlock_receive_timeout: 0.1
"""


def test_no_deadlock(tmp_path):
run_manager_with_actors(
MACRO_MICRO_CONFIG, tmp_path,
{"macro": ("python", macro),
"micro": ("python", micro)})


def test_deadlock1(tmp_path):
run_manager_with_actors(
MACRO_MICRO_CONFIG, tmp_path,
{"macro": ("python", macro),
"micro": ("python", deadlocking_micro)},
expect_success=False)


def test_deadlock2(tmp_path):
run_manager_with_actors(
MACRO_MICRO_CONFIG, tmp_path,
{"macro": ("python", deadlocking_macro),
"micro": ("python", micro)},
expect_success=False)


def test_no_deadlock_with_dispatch(tmp_path):
run_manager_with_actors(
MACRO_MICRO_WITH_DISPATCH_CONFIG, tmp_path,
{"macro": ("python", macro),
"micro1": ("python", micro),
"micro2": ("python", micro),
"micro3": ("python", micro)})


def test_deadlock1_with_dispatch(tmp_path):
run_manager_with_actors(
MACRO_MICRO_WITH_DISPATCH_CONFIG, tmp_path,
{"macro": ("python", macro),
"micro1": ("python", micro),
"micro2": ("python", deadlocking_micro),
"micro3": ("python", micro)},
expect_success=False)


def test_deadlock2_with_dispatch(tmp_path):
run_manager_with_actors(
MACRO_MICRO_WITH_DISPATCH_CONFIG, tmp_path,
{"macro": ("python", deadlocking_macro),
"micro1": ("python", micro),
"micro2": ("python", micro),
"micro3": ("python", micro)},
expect_success=False)


@skip_if_python_only
def test_no_deadlock_cpp(tmp_path):
run_manager_with_actors(
MACRO_MICRO_CONFIG, tmp_path,
{"macro": ("cpp", "component_test"),
"micro": ("python", micro)})


@skip_if_python_only
def test_deadlock1_cpp(tmp_path):
run_manager_with_actors(
MACRO_MICRO_CONFIG, tmp_path,
{"macro": ("cpp", "component_test"),
"micro": ("python", deadlocking_micro)},
expect_success=False)


@skip_if_python_only
def test_deadlock2_cpp(tmp_path):
run_manager_with_actors(
MACRO_MICRO_WITH_DISPATCH_CONFIG, tmp_path,
{"macro": ("cpp", "component_test"),
"micro1": ("python", micro),
"micro2": ("python", deadlocking_micro),
"micro3": ("cpp", "component_test")},
expect_success=False)
50 changes: 25 additions & 25 deletions libmuscle/cpp/src/libmuscle/communicator.cpp
@@ -34,14 +34,17 @@ Communicator::Communicator(
ymmsl::Reference const & kernel,
std::vector<int> const & index,
PortManager & port_manager,
Logger & logger, Profiler & profiler)
Logger & logger, Profiler & profiler,
MMPClient & manager)
: kernel_(kernel)
, index_(index)
, port_manager_(port_manager)
, logger_(logger)
, profiler_(profiler)
, manager_(manager)
, server_()
, clients_()
, receive_timeout_(10.0) // Notify manager, by default, after 10 seconds waiting in receive_message()
{}

std::vector<std::string> Communicator::get_locations() const {
@@ -119,10 +122,10 @@ std::tuple<Message, double> Communicator::receive_message(
Port & port = (port_name == "muscle_settings_in") ?
port_manager_.muscle_settings_in() : port_manager_.get_port(port_name);

std::string port_and_slot = port_name;
if (slot.is_set())
logger_.debug("Waiting for message on ", port_name, "[", slot.get(), "]");
else
logger_.debug("Waiting for message on ", port_name);
port_and_slot = port_name + "[" + std::to_string(slot.get()) + "]";
logger_.debug("Waiting for message on ", port_and_slot);
std::vector<int> slot_list;
if (slot.is_set())
slot_list.emplace_back(slot.get());
@@ -138,8 +141,12 @@
Endpoint snd_endpoint = peer_info_.get().get_peer_endpoints(
recv_endpoint.port, slot_list).at(0);
MPPClient & client = get_client_(snd_endpoint.instance());
std::string peer_instance = static_cast<std::string>(snd_endpoint.instance());
ReceiveTimeoutHandler handler(
manager_, peer_instance, port_name, slot, receive_timeout_);
ReceiveTimeoutHandler *timeout_handler = receive_timeout_ < 0 ? nullptr : &handler;
auto msg_and_profile = try_receive_(
client, recv_endpoint.ref(), snd_endpoint.kernel);
client, recv_endpoint.ref(), snd_endpoint.kernel, port_and_slot, timeout_handler);
auto & msg = std::get<0>(msg_and_profile);

ProfileEvent recv_decode_event(
@@ -203,37 +210,24 @@
if (expected_message_number != mpp_message.message_number) {
if (expected_message_number - 1 == mpp_message.message_number and
port.is_resuming(slot)) {
if (slot.is_set())
logger_.debug("Discarding received message on ", port_name,
"[", slot.get(), "]: resuming from weakly",
" consistent snapshot");
else
logger_.debug("Discarding received message on ", port_name,
": resuming from weakly consistent snapshot");
logger_.debug("Discarding received message on ", port_and_slot,
": resuming from weakly consistent snapshot");
port.set_resumed(slot);
return receive_message(port_name, slot, default_msg);
}
std::ostringstream oss;
oss << "Received message on " << port_name;
if (slot.is_set())
oss << "[" << slot.get() << "]";
oss << "Received message on " << port_and_slot;
oss << " with unexpected message number " << mpp_message.message_number;
oss << ". Was expecting " << expected_message_number;
oss << ". Are you resuming from an inconsistent snapshot?";
throw std::runtime_error(oss.str());
}
port.increment_num_messages(slot);

if (slot.is_set())
logger_.debug("Received message on ", port_name, "[", slot.get(), "]");
else
logger_.debug("Received message on ", port_name);
logger_.debug("Received message on ", port_and_slot);

if (is_close_port(message.data())) {
if (slot.is_set())
logger_.debug("Port ", port_name, "[", slot.get(), "] is now closed");
else
logger_.debug("Port ", port_name, " is now closed");
logger_.debug("Port ", port_and_slot, " is now closed");
}
return std::make_tuple(message, mpp_message.saved_until);
}
@@ -289,9 +283,15 @@ Endpoint Communicator::get_endpoint_(
}

std::tuple<std::vector<char>, mcp::ProfileData> Communicator::try_receive_(
MPPClient & client, Reference const & receiver, Reference const & peer) {
MPPClient & client, Reference const & receiver, Reference const & peer,
std::string const & port_and_slot, ReceiveTimeoutHandler *timeout_handler) {
try {
return client.receive(receiver);
return client.receive(receiver, timeout_handler);
} catch(Deadlock const & err) {
throw std::runtime_error(
"Deadlock detected when receiving a message on '" +
port_and_slot +
"'. See manager logs for more detail.");
} catch(std::runtime_error const & err) {
throw std::runtime_error(
"Error while receiving a message: connection with peer '" +