diff --git a/docs/source/tips.rst b/docs/source/tips.rst
index a68b4983..658c963f 100644
--- a/docs/source/tips.rst
+++ b/docs/source/tips.rst
@@ -2,6 +2,60 @@
 Tips & tricks
 =============
 
+Deadlock detection
+==================
+
+.. versionadded:: 0.8
+
+MUSCLE3 has a deadlock detection mechanism, which can detect when the simulation
+is deadlocked because some or all of its components are waiting for a message
+from each other. This could happen, for example, due to a bug in one of the
+components, or because the components are not wired together correctly.
+
+The simplest deadlock consists of two components, where the first component is
+waiting to receive a message from the second component and vice versa. Because
+both components are waiting for each other, the simulation is stuck and will
+make no further progress. MUSCLE3 will abort the simulation run and provide an
+error message indicating that the simulation was deadlocked:
+
+.. code-block:: output
+    :caption: Example output of a deadlocked simulation
+
+    muscle_manager 2024-08-20 13:57:58,544 CRITICAL libmuscle.manager.deadlock_detector: Potential deadlock detected:
+      The following 2 instances are deadlocked:
+      1. Instance 'micro' is waiting on instance 'macro' in a receive on port 'initial_state'.
+      2. Instance 'macro' is waiting on instance 'micro' in a receive on port 'state_in'.
+
+
+.. note::
+    MUSCLE3 can only detect deadlocks that are the result of components waiting
+    to receive messages. "Internal" deadlocks in simulation components (for
+    example due to bugs in MPI logic) cannot be detected by MUSCLE3.
+
+
+Configuring the deadlock detection
+----------------------------------
+
+With the default settings, MUSCLE3 will detect a deadlock 10 seconds after it
+occurs. The simulation is halted after another 15 seconds have passed.
+These default settings are chosen to limit the runtime impact of the deadlock
+detection. During development of a simulation it may be useful to detect
+deadlocks faster. This can be achieved with the special setting
+``muscle_deadlock_receive_timeout``:
+
+.. code-block:: yaml
+    :caption: Example configuration setting ``muscle_deadlock_receive_timeout``
+
+    ymmsl_version: v0.1
+    settings:
+        muscle_deadlock_receive_timeout: 1.0
+
+The value provided to this setting is the initial timeout (in seconds) before
+MUSCLE3 detects a deadlock. The simulation is halted after 1.5 times that
+duration. Deadlock detection is disabled when a negative value is used.
+
+
 Running simulation components interactively
 ===========================================
 
diff --git a/integration_test/conftest.py b/integration_test/conftest.py
index 18ab5ce4..4cf8701c 100644
--- a/integration_test/conftest.py
+++ b/integration_test/conftest.py
@@ -72,10 +72,12 @@ def _python_wrapper(instance_name, muscle_manager, callable):
         callable()
 
 
-def run_manager_with_actors(ymmsl_text, tmpdir, actors):
+def run_manager_with_actors(ymmsl_text, tmpdir, actors, expect_success=True):
     """Start muscle_manager along with C++ and python actors.
 
     Args:
+        ymmsl_text: YMMSL configuration for the simulation
+        tmpdir: Temporary folder to use as runpath
         actors: a dictionary of lists containing details for each actor:
             ``{"instance_name": ("language", "details", ...)}``.
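+        expect_success: whether the actors are expected to exit successfully;
+            when False, at least one actor is expected to exit with a
+            non-zero exit code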
@@ -155,10 +157,17 @@ def run_manager_with_actors(ymmsl_text, tmpdir, actors): # check results for proc in native_processes: proc.wait() - assert proc.returncode == 0 + if expect_success: + assert proc.returncode == 0 for proc in python_processes: proc.join() - assert proc.exitcode == 0 + if expect_success: + assert proc.exitcode == 0 + if not expect_success: + # Check that at least one process has failed + assert ( + any(proc.returncode != 0 for proc in native_processes) or + any(proc.exitcode != 0 for proc in python_processes)) @pytest.fixture diff --git a/integration_test/test_cpp_tcp_server.py b/integration_test/test_cpp_tcp_server.py index 912b838d..50a60c16 100644 --- a/integration_test/test_cpp_tcp_server.py +++ b/integration_test/test_cpp_tcp_server.py @@ -37,7 +37,7 @@ def test_cpp_tcp_server(log_file_in_tmpdir): assert TcpTransportClient.can_connect_to(location) client = MPPClient([location]) - msg_bytes, _ = client.receive(Reference('test_receiver.port')) + msg_bytes, _ = client.receive(Reference('test_receiver.port'), None) msg = MPPMessage.from_bytes(msg_bytes) client.close() diff --git a/integration_test/test_deadlock_detection.py b/integration_test/test_deadlock_detection.py new file mode 100644 index 00000000..c6d5d55e --- /dev/null +++ b/integration_test/test_deadlock_detection.py @@ -0,0 +1,184 @@ +import functools +import sys + +from ymmsl import Operator + +from libmuscle import Instance, Message + +from .conftest import skip_if_python_only, run_manager_with_actors + + +def suppress_deadlock_exception_output(func): + @functools.wraps(func) + def wrapper(): + try: + func() + except RuntimeError as exc: + exc_str = str(exc).lower() + if "deadlock" not in exc_str and "did the peer crash?" not in exc_str: + raise + sys.exit(1) + return wrapper + + +@suppress_deadlock_exception_output +def deadlocking_micro(): + instance = Instance({Operator.F_INIT: ["in"], Operator.O_F: ["out"]}) + + counter = 5 # Deadlock after 5 iterations + while instance.reuse_instance(): + message = instance.receive("in") + counter -= 1 + if counter > 0: + instance.send("out", message) + + +@suppress_deadlock_exception_output +def micro(): + instance = Instance({Operator.F_INIT: ["in"], Operator.O_F: ["out"]}) + + while instance.reuse_instance(): + message = instance.receive("in") + instance.send("out", message) + + +@suppress_deadlock_exception_output +def deadlocking_macro(): + instance = Instance({Operator.O_I: ["out"], Operator.S: ["in"]}) + + while instance.reuse_instance(): + for i in range(10): + message = Message(float(i), data="testing") + instance.send("out", message) + instance.receive("in") + # Deadlock: + instance.receive("in") + + +@suppress_deadlock_exception_output +def macro(): + instance = Instance({Operator.O_I: ["out"], Operator.S: ["in"]}) + + while instance.reuse_instance(): + for i in range(10): + message = Message(float(i), data="testing") + instance.send("out", message) + instance.receive("in") + + +MACRO_MICRO_CONFIG = """ +ymmsl_version: v0.1 +model: + name: test_model + components: + macro: + implementation: macro + micro: + implementation: micro + conduits: + macro.out: micro.in + micro.out: macro.in +settings: + muscle_deadlock_receive_timeout: 0.1 +""" +MACRO_MICRO_WITH_DISPATCH_CONFIG = """ +ymmsl_version: v0.1 +model: + name: test_model + components: + macro: + implementation: macro + micro1: + implementation: micro + micro2: + implementation: micro + micro3: + implementation: micro + conduits: + macro.out: micro1.in + micro1.out: micro2.in + micro2.out: micro3.in + 
micro3.out: macro.in +settings: + muscle_deadlock_receive_timeout: 0.1 +""" + + +def test_no_deadlock(tmp_path): + run_manager_with_actors( + MACRO_MICRO_CONFIG, tmp_path, + {"macro": ("python", macro), + "micro": ("python", micro)}) + + +def test_deadlock1(tmp_path): + run_manager_with_actors( + MACRO_MICRO_CONFIG, tmp_path, + {"macro": ("python", macro), + "micro": ("python", deadlocking_micro)}, + expect_success=False) + + +def test_deadlock2(tmp_path): + run_manager_with_actors( + MACRO_MICRO_CONFIG, tmp_path, + {"macro": ("python", deadlocking_macro), + "micro": ("python", micro)}, + expect_success=False) + + +def test_no_deadlock_with_dispatch(tmp_path): + run_manager_with_actors( + MACRO_MICRO_WITH_DISPATCH_CONFIG, tmp_path, + {"macro": ("python", macro), + "micro1": ("python", micro), + "micro2": ("python", micro), + "micro3": ("python", micro)}) + + +def test_deadlock1_with_dispatch(tmp_path): + run_manager_with_actors( + MACRO_MICRO_WITH_DISPATCH_CONFIG, tmp_path, + {"macro": ("python", macro), + "micro1": ("python", micro), + "micro2": ("python", deadlocking_micro), + "micro3": ("python", micro)}, + expect_success=False) + + +def test_deadlock2_with_dispatch(tmp_path): + run_manager_with_actors( + MACRO_MICRO_WITH_DISPATCH_CONFIG, tmp_path, + {"macro": ("python", deadlocking_macro), + "micro1": ("python", micro), + "micro2": ("python", micro), + "micro3": ("python", micro)}, + expect_success=False) + + +@skip_if_python_only +def test_no_deadlock_cpp(tmp_path): + run_manager_with_actors( + MACRO_MICRO_CONFIG, tmp_path, + {"macro": ("cpp", "component_test"), + "micro": ("python", micro)}) + + +@skip_if_python_only +def test_deadlock1_cpp(tmp_path): + run_manager_with_actors( + MACRO_MICRO_CONFIG, tmp_path, + {"macro": ("cpp", "component_test"), + "micro": ("python", deadlocking_micro)}, + expect_success=False) + + +@skip_if_python_only +def test_deadlock2_cpp(tmp_path): + run_manager_with_actors( + MACRO_MICRO_WITH_DISPATCH_CONFIG, tmp_path, + {"macro": ("cpp", "component_test"), + "micro1": ("python", micro), + "micro2": ("python", deadlocking_micro), + "micro3": ("cpp", "component_test")}, + expect_success=False) diff --git a/libmuscle/cpp/src/libmuscle/communicator.cpp b/libmuscle/cpp/src/libmuscle/communicator.cpp index 29cd4293..33dce3c4 100644 --- a/libmuscle/cpp/src/libmuscle/communicator.cpp +++ b/libmuscle/cpp/src/libmuscle/communicator.cpp @@ -34,14 +34,17 @@ Communicator::Communicator( ymmsl::Reference const & kernel, std::vector const & index, PortManager & port_manager, - Logger & logger, Profiler & profiler) + Logger & logger, Profiler & profiler, + MMPClient & manager) : kernel_(kernel) , index_(index) , port_manager_(port_manager) , logger_(logger) , profiler_(profiler) + , manager_(manager) , server_() , clients_() + , receive_timeout_(10.0) // Notify manager, by default, after 10 seconds waiting in receive_message() {} std::vector Communicator::get_locations() const { @@ -119,10 +122,10 @@ std::tuple Communicator::receive_message( Port & port = (port_name == "muscle_settings_in") ? 
port_manager_.muscle_settings_in() : port_manager_.get_port(port_name); + std::string port_and_slot = port_name; if (slot.is_set()) - logger_.debug("Waiting for message on ", port_name, "[", slot.get(), "]"); - else - logger_.debug("Waiting for message on ", port_name); + port_and_slot = port_name + "[" + std::to_string(slot.get()) + "]"; + logger_.debug("Waiting for message on ", port_and_slot); std::vector slot_list; if (slot.is_set()) slot_list.emplace_back(slot.get()); @@ -138,8 +141,11 @@ std::tuple Communicator::receive_message( Endpoint snd_endpoint = peer_info_.get().get_peer_endpoints( recv_endpoint.port, slot_list).at(0); MPPClient & client = get_client_(snd_endpoint.instance()); + ReceiveTimeoutHandler handler( + manager_, snd_endpoint.instance(), port_name, slot, receive_timeout_); + ReceiveTimeoutHandler *timeout_handler = receive_timeout_ < 0 ? nullptr : &handler; auto msg_and_profile = try_receive_( - client, recv_endpoint.ref(), snd_endpoint.kernel); + client, recv_endpoint.ref(), snd_endpoint.kernel, port_and_slot, timeout_handler); auto & msg = std::get<0>(msg_and_profile); ProfileEvent recv_decode_event( @@ -203,20 +209,13 @@ std::tuple Communicator::receive_message( if (expected_message_number != mpp_message.message_number) { if (expected_message_number - 1 == mpp_message.message_number and port.is_resuming(slot)) { - if (slot.is_set()) - logger_.debug("Discarding received message on ", port_name, - "[", slot.get(), "]: resuming from weakly", - " consistent snapshot"); - else - logger_.debug("Discarding received message on ", port_name, - ": resuming from weakly consistent snapshot"); + logger_.debug("Discarding received message on ", port_and_slot, + ": resuming from weakly consistent snapshot"); port.set_resumed(slot); return receive_message(port_name, slot, default_msg); } std::ostringstream oss; - oss << "Received message on " << port_name; - if (slot.is_set()) - oss << "[" << slot.get() << "]"; + oss << "Received message on " << port_and_slot; oss << " with unexpected message number " << mpp_message.message_number; oss << ". Was expecting " << expected_message_number; oss << ". Are you resuming from an inconsistent snapshot?"; @@ -224,16 +223,10 @@ std::tuple Communicator::receive_message( } port.increment_num_messages(slot); - if (slot.is_set()) - logger_.debug("Received message on ", port_name, "[", slot.get(), "]"); - else - logger_.debug("Received message on ", port_name); + logger_.debug("Received message on ", port_and_slot); if (is_close_port(message.data())) { - if (slot.is_set()) - logger_.debug("Port ", port_name, "[", slot.get(), "] is now closed"); - else - logger_.debug("Port ", port_name, " is now closed"); + logger_.debug("Port ", port_and_slot, " is now closed"); } return std::make_tuple(message, mpp_message.saved_until); } @@ -289,9 +282,15 @@ Endpoint Communicator::get_endpoint_( } std::tuple, mcp::ProfileData> Communicator::try_receive_( - MPPClient & client, Reference const & receiver, Reference const & peer) { + MPPClient & client, Reference const & receiver, Reference const & peer, + std::string const & port_and_slot, ReceiveTimeoutHandler *timeout_handler) { try { - return client.receive(receiver); + return client.receive(receiver, timeout_handler); + } catch(Deadlock const & err) { + throw std::runtime_error( + "Deadlock detected when receiving a message on '" + + port_and_slot + + "'. 
See manager logs for more detail."); } catch(std::runtime_error const & err) { throw std::runtime_error( "Error while receiving a message: connection with peer '" + diff --git a/libmuscle/cpp/src/libmuscle/communicator.hpp b/libmuscle/cpp/src/libmuscle/communicator.hpp index 535c988f..7f472d79 100644 --- a/libmuscle/cpp/src/libmuscle/communicator.hpp +++ b/libmuscle/cpp/src/libmuscle/communicator.hpp @@ -6,6 +6,7 @@ #include #include +#include #include #include #include @@ -14,6 +15,7 @@ #include #include #include +#include #include #include @@ -53,7 +55,8 @@ class Communicator { ymmsl::Reference const & kernel, std::vector const & index, PortManager & port_manager, - Logger & logger, Profiler & profiler); + Logger & logger, Profiler & profiler, + MMPClient & manager); /** Returns a list of locations that we can be reached at. * @@ -127,6 +130,19 @@ class Communicator { */ void shutdown(); + /** Update the timeout after which the manager is notified that we are + * waiting for a message. + * + * @param receive_timeout Timeout (seconds). A negative number disables + * the deadlock notification mechanism. + */ + void set_receive_timeout(double receive_timeout) { receive_timeout_ = receive_timeout; } + + /** Get the timeout after which the manager is notified that we are + * waiting for a message. + */ + double get_receive_timeout() const { return receive_timeout_; } + PRIVATE: using Ports_ = std::unordered_map; @@ -140,7 +156,8 @@ class Communicator { std::tuple, mcp::ProfileData> try_receive_( MPPClient & client, ymmsl::Reference const & receiver, - ymmsl::Reference const & peer); + ymmsl::Reference const & peer, std::string const & port_and_slot, + ReceiveTimeoutHandler *handler); void close_port_(std::string const & port_name, Optional slot = {}); @@ -186,9 +203,11 @@ class Communicator { PortManager & port_manager_; Logger & logger_; Profiler & profiler_; + MMPClient & manager_; MPPServer server_; std::unordered_map> clients_; Optional peer_info_; + double receive_timeout_; }; } } diff --git a/libmuscle/cpp/src/libmuscle/instance.cpp b/libmuscle/cpp/src/libmuscle/instance.cpp index 8edc11c1..027f2ca8 100644 --- a/libmuscle/cpp/src/libmuscle/instance.cpp +++ b/libmuscle/cpp/src/libmuscle/instance.cpp @@ -143,6 +143,7 @@ class Instance::Impl { void deregister_(); void setup_checkpointing_(); void setup_profiling_(); + void setup_receive_timeout_(); ::ymmsl::Reference make_full_name_(int argc, char const * const argv[]) const; std::string extract_manager_location_(int argc, char const * const argv[]) const; @@ -226,7 +227,7 @@ Instance::Impl::Impl( port_manager_.reset(new PortManager(index_(), ports)); communicator_.reset( new Communicator( - name_(), index_(), *port_manager_, *logger_, *profiler_)); + name_(), index_(), *port_manager_, *logger_, *profiler_, *manager_)); snapshot_manager_.reset(new SnapshotManager( instance_name_, *manager_, *port_manager_, *logger_)); trigger_manager_.reset(new TriggerManager()); @@ -239,6 +240,7 @@ Instance::Impl::Impl( set_local_log_level_(); set_remote_log_level_(); setup_profiling_(); + setup_receive_timeout_(); #ifdef MUSCLE_ENABLE_MPI auto sbase_data = Data(settings_manager_.base); msgpack::sbuffer sbuf; @@ -556,6 +558,30 @@ void Instance::Impl::setup_profiling_() { profiler_->set_level(profile_level_str); } +void Instance::Impl::setup_receive_timeout_() { + double timeout; + try { + timeout = settings_manager_.get_setting( + instance_name_, "muscle_deadlock_receive_timeout").as(); + if (timeout >= 0 && timeout < 0.1) { + logger_->info( + "Provided 
muscle_deadlock_receive_timeout (", timeout, + ") was less than the minimum of 0.1 seconds, setting it to 0.1."); + timeout = 0.1; + } + communicator_->set_receive_timeout(timeout); + } + catch (std::runtime_error const & e) { + logger_->error(e.what() + std::string(" in muscle_deadlock_receive_timeout")); + } + catch (std::out_of_range const &) { + // muscle_deadlock_receive_timeout not set, do nothing and keep the default + } + logger_->debug( + "Timeout on receiving messages set to ", + communicator_->get_receive_timeout()); +} + Message Instance::Impl::receive_message( std::string const & port_name, Optional slot, diff --git a/libmuscle/cpp/src/libmuscle/mcp/protocol.hpp b/libmuscle/cpp/src/libmuscle/mcp/protocol.hpp index c45df271..3f7b5aa9 100644 --- a/libmuscle/cpp/src/libmuscle/mcp/protocol.hpp +++ b/libmuscle/cpp/src/libmuscle/mcp/protocol.hpp @@ -19,6 +19,10 @@ enum class RequestType { submit_profile_events = 6, submit_snapshot = 7, get_checkpoint_info = 8, + // Connection deadlock detection + waiting_for_receive = 9, + waiting_for_receive_done = 10, + is_deadlocked = 11, // MUSCLE Peer Protocol get_next_message = 21 diff --git a/libmuscle/cpp/src/libmuscle/mcp/tcp_transport_client.cpp b/libmuscle/cpp/src/libmuscle/mcp/tcp_transport_client.cpp index c2551a3b..821f6d8b 100644 --- a/libmuscle/cpp/src/libmuscle/mcp/tcp_transport_client.cpp +++ b/libmuscle/cpp/src/libmuscle/mcp/tcp_transport_client.cpp @@ -4,6 +4,7 @@ #include #include +#include #include #include #include @@ -13,6 +14,7 @@ #include #include #include +#include namespace { @@ -81,6 +83,30 @@ int connect(std::string const & address) { + port); } +/** Poll until timeout (in seconds) is reached. Retry when interrupted with EINTR. */ +inline int poll_retry_eintr(pollfd *fds, nfds_t nfds, double timeout) { + using std::chrono::duration; + using std::chrono::steady_clock; + using std::chrono::milliseconds; + using std::chrono::duration_cast; + + const auto timeout_duration = duration(timeout); + const auto deadline = steady_clock::now() + timeout_duration; + while (true) { + int timeout_ms = duration_cast(deadline - steady_clock::now()).count(); + int poll_result = poll(fds, nfds, timeout_ms); + + if (poll_result >= 0) + return poll_result; + + if (errno != EINTR) + throw std::runtime_error( + "Unexpected error during poll(): " + + std::string(std::strerror(errno))); + // poll() was interrupted by a signal: retry with re-calculated timeout + } +} + } @@ -128,12 +154,32 @@ TcpTransportClient::~TcpTransportClient() { } std::tuple, ProfileData> TcpTransportClient::call( - char const * req_buf, std::size_t req_len + char const * req_buf, std::size_t req_len, + TimeoutHandler* timeout_handler ) const { ProfileTimestamp start_wait; send_frame(socket_fd_, req_buf, req_len); - int64_t length = recv_int64(socket_fd_); + int64_t length; + if (timeout_handler == nullptr) { + length = recv_int64(socket_fd_); + } else { + bool did_timeout = false; + pollfd socket_poll_fd; + socket_poll_fd.fd = socket_fd_; + socket_poll_fd.events = POLLIN; + while (poll_retry_eintr(&socket_poll_fd, 1, timeout_handler->get_timeout()) == 0) { + timeout_handler->on_timeout(); + did_timeout = true; + } + // socket is ready for a receive, this call shouldn't block: + length = recv_int64(socket_fd_); + + if (did_timeout) { + timeout_handler->on_receive(); + } + } + ProfileTimestamp start_transfer; std::vector result(length); recv_all(socket_fd_, result.data(), result.size()); diff --git a/libmuscle/cpp/src/libmuscle/mcp/tcp_transport_client.hpp 
b/libmuscle/cpp/src/libmuscle/mcp/tcp_transport_client.hpp index 9da32e4b..4f33f4f7 100644 --- a/libmuscle/cpp/src/libmuscle/mcp/tcp_transport_client.hpp +++ b/libmuscle/cpp/src/libmuscle/mcp/tcp_transport_client.hpp @@ -46,7 +46,8 @@ class TcpTransportClient : public TransportClient { * @return A byte array with the received data. */ virtual std::tuple, ProfileData> call( - char const * req_buf, std::size_t req_len) const override; + char const * req_buf, std::size_t req_len, + TimeoutHandler* timeout_handler=nullptr) const override; /** Closes this client. * diff --git a/libmuscle/cpp/src/libmuscle/mcp/transport_client.hpp b/libmuscle/cpp/src/libmuscle/mcp/transport_client.hpp index 556f74f2..bfe03ee3 100644 --- a/libmuscle/cpp/src/libmuscle/mcp/transport_client.hpp +++ b/libmuscle/cpp/src/libmuscle/mcp/transport_client.hpp @@ -18,6 +18,21 @@ namespace libmuscle { namespace _MUSCLE_IMPL_NS { namespace mcp { using ProfileData = std::tuple< ProfileTimestamp, ProfileTimestamp, ProfileTimestamp>; +class TimeoutHandler { + public: + virtual ~TimeoutHandler() = default; + + /** Timeout (in seconds) after which on_timeout is called. */ + virtual double get_timeout() = 0; + /** Callback when getTimeout seconds have passed without a response from * the peer. + */ + virtual void on_timeout() = 0; + /** Callback when receiving a response from the peer. + * + * Note: this method is only called when the request has timed out. + */ + virtual void on_receive() = 0; +}; /** A client that connects to an MCP transport server. * @@ -73,7 +88,8 @@ class TransportClient { * received data, and the timestamps. */ virtual std::tuple, ProfileData> call( - char const * req_buf, std::size_t req_len) const = 0; + char const * req_buf, std::size_t req_len, + TimeoutHandler* timeout_handler=nullptr) const = 0; /** Closes this client. * diff --git a/libmuscle/cpp/src/libmuscle/mmp_client.cpp b/libmuscle/cpp/src/libmuscle/mmp_client.cpp index ecacc2c2..eb56bcf2 100644 --- a/libmuscle/cpp/src/libmuscle/mmp_client.cpp +++ b/libmuscle/cpp/src/libmuscle/mmp_client.cpp @@ -323,6 +323,39 @@ void MMPClient::deregister_instance() { } } +void MMPClient::waiting_for_receive( + ::ymmsl::Reference const & peer_instance_id, std::string const & port_name, + Optional slot) +{ + auto request = Data::list( + static_cast(RequestType::waiting_for_receive), + static_cast(instance_id_), + static_cast(peer_instance_id), port_name, encode_optional(slot)); + + auto response = call_manager_(request); +} + +void MMPClient::waiting_for_receive_done( + ::ymmsl::Reference const & peer_instance_id, std::string const & port_name, + Optional slot) +{ + auto request = Data::list( + static_cast(RequestType::waiting_for_receive_done), + static_cast(instance_id_), + static_cast(peer_instance_id), port_name, encode_optional(slot)); + + auto response = call_manager_(request); +} + +bool MMPClient::is_deadlocked() { + auto request = Data::list( + static_cast(RequestType::is_deadlocked), + static_cast(instance_id_)); + + auto response = call_manager_(request); + return response[1].as(); +} + DataConstRef MMPClient::call_manager_(DataConstRef const & request) { std::lock_guard lock(mutex_); diff --git a/libmuscle/cpp/src/libmuscle/mmp_client.hpp b/libmuscle/cpp/src/libmuscle/mmp_client.hpp index c082809b..86f653fc 100644 --- a/libmuscle/cpp/src/libmuscle/mmp_client.hpp +++ b/libmuscle/cpp/src/libmuscle/mmp_client.hpp @@ -123,6 +123,19 @@ class MMPClient { void deregister_instance(); + /** Notify the manager that we're waiting to receive a message. 
*/ + void waiting_for_receive( + ::ymmsl::Reference const & peer_instance_id, std::string const & port_name, + Optional slot); + + /** Notify the manager that we're done waiting to receive a message. */ + void waiting_for_receive_done( + ::ymmsl::Reference const & peer_instance_id, std::string const & port_name, + Optional slot); + + /** Ask the manager if we're part of a deadlock. */ + bool is_deadlocked(); + private: ymmsl::Reference instance_id_; mcp::TcpTransportClient transport_client_; diff --git a/libmuscle/cpp/src/libmuscle/mpp_client.cpp b/libmuscle/cpp/src/libmuscle/mpp_client.cpp index feba677f..973fcf7c 100644 --- a/libmuscle/cpp/src/libmuscle/mpp_client.cpp +++ b/libmuscle/cpp/src/libmuscle/mpp_client.cpp @@ -26,7 +26,7 @@ MPPClient::MPPClient(std::vector const & locations) { } std::tuple, ProfileData> MPPClient::receive( - Reference const & receiver) + Reference const & receiver, mcp::TimeoutHandler *timeout_handler) { auto request = Data::list( static_cast(RequestType::get_next_message), @@ -37,7 +37,7 @@ std::tuple, ProfileData> MPPClient::receive( // can then overwrite after encoding with the length? msgpack::pack(sbuf, request); - return transport_client_->call(sbuf.data(), sbuf.size()); + return transport_client_->call(sbuf.data(), sbuf.size(), timeout_handler); } void MPPClient::close() { diff --git a/libmuscle/cpp/src/libmuscle/mpp_client.hpp b/libmuscle/cpp/src/libmuscle/mpp_client.hpp index 8cc07a5c..3b5c58cb 100644 --- a/libmuscle/cpp/src/libmuscle/mpp_client.hpp +++ b/libmuscle/cpp/src/libmuscle/mpp_client.hpp @@ -59,7 +59,8 @@ class MPPClient { * @return The received message. */ std::tuple, mcp::ProfileData> receive( - ::ymmsl::Reference const & receiver); + ::ymmsl::Reference const & receiver, + mcp::TimeoutHandler *timeout_handler=nullptr); /** Closes this client. * diff --git a/libmuscle/cpp/src/libmuscle/receive_timeout_handler.cpp b/libmuscle/cpp/src/libmuscle/receive_timeout_handler.cpp new file mode 100644 index 00000000..1bb2bd10 --- /dev/null +++ b/libmuscle/cpp/src/libmuscle/receive_timeout_handler.cpp @@ -0,0 +1,38 @@ +#include "receive_timeout_handler.hpp" + +#include + +namespace libmuscle { namespace _MUSCLE_IMPL_NS { + +ReceiveTimeoutHandler::ReceiveTimeoutHandler( + MMPClient& manager, ::ymmsl::Reference const peer_instance, + std::string const& port_name, Optional slot, double timeout) + : manager_(manager) + , peer_instance_(peer_instance) + , port_name_(port_name) + , slot_(slot) + , timeout_(timeout) + , num_timeout_(0) {} + +double ReceiveTimeoutHandler::get_timeout() +{ + // Increase timeout by a factor 1.5 with every timeout we hit: + return timeout_ * std::pow(1.5, num_timeout_); +} + +void ReceiveTimeoutHandler::on_timeout() +{ + if (num_timeout_ == 0) + manager_.waiting_for_receive(peer_instance_, port_name_, slot_); + else + if (manager_.is_deadlocked()) + throw Deadlock(); + num_timeout_ ++; +} + +void ReceiveTimeoutHandler::on_receive() +{ + manager_.waiting_for_receive_done(peer_instance_, port_name_, slot_); +} + +} } diff --git a/libmuscle/cpp/src/libmuscle/receive_timeout_handler.hpp b/libmuscle/cpp/src/libmuscle/receive_timeout_handler.hpp new file mode 100644 index 00000000..1b70456d --- /dev/null +++ b/libmuscle/cpp/src/libmuscle/receive_timeout_handler.hpp @@ -0,0 +1,49 @@ +#pragma once + +#include +#include +#include + +namespace libmuscle { namespace _MUSCLE_IMPL_NS { + +/** Error thrown when a deadlock is detected. 
*/ +class Deadlock : public std::runtime_error { + public: + Deadlock() : std::runtime_error("Deadlock detected") {}; + virtual ~Deadlock() = default; +}; + + +/** Timeout handler when receiving messages from peers. + * + * This handler sends a message to the Muscle Manager when the receive times out (and + * another message when the message does arrive). + * + * This is used by the manager to detect if the simulation is in a deadlock, where a + * cycle of instances is waiting on each other. + */ +class ReceiveTimeoutHandler : public mcp::TimeoutHandler { + public: + ReceiveTimeoutHandler( + MMPClient & manager, + ::ymmsl::Reference const peer_instance, + std::string const & port_name, + Optional slot, + double timeout); + + virtual ~ReceiveTimeoutHandler() = default; + + double get_timeout() override; + void on_timeout() override; + void on_receive() override; + + private: + MMPClient & manager_; + ::ymmsl::Reference const peer_instance_; + std::string const & port_name_; + Optional slot_; + double timeout_; + int num_timeout_; +}; + +} } diff --git a/libmuscle/cpp/src/libmuscle/tests/mocks/mock_communicator.hpp b/libmuscle/cpp/src/libmuscle/tests/mocks/mock_communicator.hpp index acfc24c9..55eb654e 100644 --- a/libmuscle/cpp/src/libmuscle/tests/mocks/mock_communicator.hpp +++ b/libmuscle/cpp/src/libmuscle/tests/mocks/mock_communicator.hpp @@ -102,6 +102,8 @@ class MockCommunicator : public MockClass { NAME_MOCK_MEM_FUN(MockCommunicator, send_message); NAME_MOCK_MEM_FUN(MockCommunicator, receive_message); NAME_MOCK_MEM_FUN(MockCommunicator, shutdown); + NAME_MOCK_MEM_FUN(MockCommunicator, set_receive_timeout); + NAME_MOCK_MEM_FUN(MockCommunicator, get_receive_timeout); } MockCommunicator() { @@ -112,15 +114,16 @@ class MockCommunicator : public MockClass { ymmsl::Reference const & kernel, std::vector const & index, PortManager & port_manager, - Logger & logger, Profiler & profiler) + Logger & logger, Profiler & profiler, + MMPClient & manager) { init_from_return_value(); - constructor(kernel, index, port_manager, logger, profiler); + constructor(kernel, index, port_manager, logger, profiler, manager); } MockFun< Void, Val, Val const &>, - Obj, Obj, Obj> + Obj, Obj, Obj, Obj> constructor; MockFun>> get_locations; @@ -136,6 +139,10 @@ class MockCommunicator : public MockClass { ::mock_communicator::CommunicatorReceiveMessageMock receive_message; MockFun shutdown; + + MockFun> set_receive_timeout; + + MockFun> get_receive_timeout; }; using Communicator = MockCommunicator; diff --git a/libmuscle/cpp/src/libmuscle/tests/mocks/mock_mmp_client.hpp b/libmuscle/cpp/src/libmuscle/tests/mocks/mock_mmp_client.hpp index c520b696..d00fde23 100644 --- a/libmuscle/cpp/src/libmuscle/tests/mocks/mock_mmp_client.hpp +++ b/libmuscle/cpp/src/libmuscle/tests/mocks/mock_mmp_client.hpp @@ -32,6 +32,9 @@ class MockMMPClient : public MockClass { NAME_MOCK_MEM_FUN(MockMMPClient, register_instance); NAME_MOCK_MEM_FUN(MockMMPClient, request_peers); NAME_MOCK_MEM_FUN(MockMMPClient, deregister_instance); + NAME_MOCK_MEM_FUN(MockMMPClient, waiting_for_receive); + NAME_MOCK_MEM_FUN(MockMMPClient, waiting_for_receive_done); + NAME_MOCK_MEM_FUN(MockMMPClient, is_deadlocked); // Create some empty return objects for return values with a complex // structure, to make it easier to set them in the tests or fixtures. 
@@ -95,6 +98,16 @@ class MockMMPClient : public MockClass { >>> request_peers; MockFun deregister_instance; + + MockFun, Val, Val> + > waiting_for_receive; + + MockFun, Val, Val> + > waiting_for_receive_done; + + MockFun> is_deadlocked; }; using MMPClient = MockMMPClient; diff --git a/libmuscle/cpp/src/libmuscle/tests/mocks/mock_mpp_client.hpp b/libmuscle/cpp/src/libmuscle/tests/mocks/mock_mpp_client.hpp index 12a3daa1..6b6ec7f3 100644 --- a/libmuscle/cpp/src/libmuscle/tests/mocks/mock_mpp_client.hpp +++ b/libmuscle/cpp/src/libmuscle/tests/mocks/mock_mpp_client.hpp @@ -31,10 +31,19 @@ class MockMPPClient : public MockClass { MockFun const &>> constructor; - MockFun< + // Use option 1 of the "Functions with default arguments" section + // (from mock_support.hpp:192) + using BaseMockFun = MockFun< Val, mcp::ProfileData>>, - Val<::ymmsl::Reference const &> - > receive; + Val<::ymmsl::Reference const &>, Obj>; + struct MockOverloadedFun : BaseMockFun { + std::tuple, mcp::ProfileData> operator()( + ::ymmsl::Reference const & receiver, + mcp::TimeoutHandler *timeout_handler=nullptr) { + return BaseMockFun::operator()(receiver, timeout_handler); + } + } receive; + MockFun close; }; diff --git a/libmuscle/cpp/src/libmuscle/tests/test_communicator.cpp b/libmuscle/cpp/src/libmuscle/tests/test_communicator.cpp index ccd0db88..012916a7 100644 --- a/libmuscle/cpp/src/libmuscle/tests/test_communicator.cpp +++ b/libmuscle/cpp/src/libmuscle/tests/test_communicator.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include // Test code dependencies @@ -62,6 +63,7 @@ using libmuscle::_MUSCLE_IMPL_NS::PeerInfo; using libmuscle::_MUSCLE_IMPL_NS::PeerLocations; using libmuscle::_MUSCLE_IMPL_NS::PortsDescription; using libmuscle::_MUSCLE_IMPL_NS::mcp::ProfileData; +using libmuscle::_MUSCLE_IMPL_NS::mcp::TimeoutHandler; using libmuscle::_MUSCLE_IMPL_NS::ProfileTimestamp; using ymmsl::Conduit; @@ -79,11 +81,12 @@ struct libmuscle_communicator MockLogger logger_; MockProfiler profiler_; MockPortManager port_manager_; + MockMMPClient manager_; Communicator communicator_; libmuscle_communicator() - : communicator_("component", {}, connected_port_manager_, logger_, profiler_) + : communicator_("component", {}, connected_port_manager_, logger_, profiler_, manager_) { port_manager_.settings_in_connected.return_value = false; } @@ -120,6 +123,9 @@ struct libmuscle_communicator2 : libmuscle_communicator { PeerInfo peer_info("component", {}, conduits, peer_dims, peer_locations); connected_communicator_.set_peer_info(peer_info); + // disable receive timeouts for these tests, so we can check call + // signatures: mpp_client->receive.called_with(..., nullptr) + connected_communicator_.set_receive_timeout(-1.0); } }; @@ -169,7 +175,7 @@ TEST_F(libmuscle_communicator2, receive_message) { double saved_until = std::get<1>(recv_msg_saved_until); auto & mpp_client = connected_communicator_.clients_.at("peer"); - ASSERT_TRUE(mpp_client->receive.called_with("component.in")); + ASSERT_TRUE(mpp_client->receive.called_with("component.in", nullptr)); ASSERT_EQ(recv_msg.timestamp(), 2.0); ASSERT_EQ(recv_msg.next_timestamp(), 3.0); @@ -192,7 +198,7 @@ TEST_F(libmuscle_communicator2, receive_message_vector) { double saved_until = std::get<1>(recv_msg_saved_until); auto mpp_client = connected_communicator_.clients_.at("peer2[5]").get(); - ASSERT_TRUE(mpp_client->receive.called_with("component.in_v[5]")); + ASSERT_TRUE(mpp_client->receive.called_with("component.in_v[5]", nullptr)); ASSERT_EQ(recv_msg.timestamp(), 4.0); 
ASSERT_EQ(recv_msg.next_timestamp(), 6.0); @@ -292,7 +298,7 @@ TEST_F(libmuscle_communicator2, port_discard_success_on_resume) { int count = 0; - MockMPPClient::return_value.receive.side_effect = [&](Reference const &) { + MockMPPClient::return_value.receive.side_effect = [&](Reference const &, TimeoutHandler *) { return std::make_tuple( side_effect.at(count++).encoded(), ProfileData()); }; @@ -355,7 +361,7 @@ TEST_F(libmuscle_communicator2, test_shutdown) { } } - MockMPPClient::return_value.receive.side_effect = [&](Reference const & receiver) { + MockMPPClient::return_value.receive.side_effect = [&](Reference const & receiver, TimeoutHandler*) { return std::make_tuple(messages.at(receiver)->encoded(), ProfileData()); }; diff --git a/libmuscle/cpp/src/libmuscle/tests/test_instance.cpp b/libmuscle/cpp/src/libmuscle/tests/test_instance.cpp index 7ff08ef0..6f8e7aab 100644 --- a/libmuscle/cpp/src/libmuscle/tests/test_instance.cpp +++ b/libmuscle/cpp/src/libmuscle/tests/test_instance.cpp @@ -179,6 +179,7 @@ struct libmuscle_instance_base : ::testing::Test, ConnectedPortManagerFixture { auto & mock_comm = MockCommunicator::return_value; mock_comm.get_locations.return_value = std::vector( {"tcp:test1,test2", "tcp:test3"}); + mock_comm.get_receive_timeout.return_value = 10.0; auto & mock_port_manager = MockPortManager::return_value; mock_port_manager.settings_in_connected.return_value = false; diff --git a/libmuscle/cpp/src/libmuscle/tests/test_snapshot_manager.cpp b/libmuscle/cpp/src/libmuscle/tests/test_snapshot_manager.cpp index 7ad4218a..3c17736e 100644 --- a/libmuscle/cpp/src/libmuscle/tests/test_snapshot_manager.cpp +++ b/libmuscle/cpp/src/libmuscle/tests/test_snapshot_manager.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include diff --git a/libmuscle/python/libmuscle/communicator.py b/libmuscle/python/libmuscle/communicator.py index 19b8e9c8..b4125aaf 100644 --- a/libmuscle/python/libmuscle/communicator.py +++ b/libmuscle/python/libmuscle/communicator.py @@ -3,6 +3,7 @@ from ymmsl import Identifier, Reference, Settings from libmuscle.endpoint import Endpoint +from libmuscle.mmp_client import MMPClient from libmuscle.mpp_message import ClosePort, MPPMessage from libmuscle.mpp_client import MPPClient from libmuscle.mpp_server import MPPServer @@ -12,6 +13,7 @@ from libmuscle.profiler import Profiler from libmuscle.profiling import ( ProfileEvent, ProfileEventType, ProfileTimestamp) +from libmuscle.receive_timeout_handler import Deadlock, ReceiveTimeoutHandler _logger = logging.getLogger(__name__) @@ -70,7 +72,8 @@ class Communicator: """ def __init__( self, kernel: Reference, index: List[int], - port_manager: PortManager, profiler: Profiler) -> None: + port_manager: PortManager, profiler: Profiler, + manager: MMPClient) -> None: """Create a Communicator. The instance reference must start with one or more Identifiers, @@ -88,6 +91,9 @@ def __init__( self._index = index self._port_manager = port_manager self._profiler = profiler + self._manager = manager + # Notify manager, by default, after 10 seconds waiting in receive_message() + self._receive_timeout = 10.0 self._server = MPPServer() @@ -117,6 +123,16 @@ def set_peer_info(self, peer_info: PeerInfo) -> None: """ self._peer_info = peer_info + def set_receive_timeout(self, receive_timeout: float) -> None: + """Update the timeout after which the manager is notified that we are waiting + for a message. + + Args: + receive_timeout: Timeout (seconds). 
A negative number disables the deadlock
+                notification mechanism.
+        """
+        self._receive_timeout = receive_timeout
+
     def send_message(
             self, port_name: str, message: Message,
             slot: Optional[int] = None,
@@ -228,13 +244,26 @@ def receive_message(
             snd_endpoint = self._peer_info.get_peer_endpoints(
                     recv_endpoint.port, slot_list)[0]
             client = self.__get_client(snd_endpoint.instance())
+            timeout_handler = None
+            if self._receive_timeout >= 0:
+                timeout_handler = ReceiveTimeoutHandler(
+                        self._manager, snd_endpoint.instance(),
+                        port_name, slot, self._receive_timeout)
             try:
-                mpp_message_bytes, profile = client.receive(recv_endpoint.ref())
+                mpp_message_bytes, profile = client.receive(
+                        recv_endpoint.ref(), timeout_handler)
             except (ConnectionError, SocketClosed) as exc:
                 raise RuntimeError(
                     "Error while receiving a message: connection with peer"
                     f" '{snd_endpoint.kernel}' was lost. Did the peer crash?"
                     ) from exc
+            except Deadlock:
+                # Profiler messages may be used for debugging the deadlock
+                self._profiler.shutdown()
+                raise RuntimeError(
+                    "Deadlock detected when receiving a message on "
+                    f"port '{port_and_slot}'. See manager logs for more detail."
+                    ) from None
 
             recv_decode_event = ProfileEvent(
                     ProfileEventType.RECEIVE_DECODE, ProfileTimestamp(), None,
diff --git a/libmuscle/python/libmuscle/instance.py b/libmuscle/python/libmuscle/instance.py
index bfca4d06..c9498dca 100644
--- a/libmuscle/python/libmuscle/instance.py
+++ b/libmuscle/python/libmuscle/instance.py
@@ -142,7 +142,8 @@ def __init__(
         """PortManager for this instance."""
 
         self._communicator = Communicator(
-                self._name, self._index, self._port_manager, self._profiler)
+                self._name, self._index, self._port_manager, self._profiler,
+                self.__manager)
         """Communicator for this instance."""
 
         self._declared_ports = ports
@@ -186,6 +187,7 @@ def __init__(
         self._set_local_log_level()
         self._set_remote_log_level()
         self._setup_profiling()
+        self._setup_receive_timeout()
 
     def reuse_instance(self) -> bool:
         """Decide whether to run this instance again.
@@ -809,6 +811,23 @@ def _setup_profiling(self) -> None:
 
         self._profiler.set_level(profile_level_str)
 
+    def _setup_receive_timeout(self) -> None:
+        """Configure the receive timeout based on the settings.
+        """
+        try:
+            timeout = self.get_setting('muscle_deadlock_receive_timeout', 'float')
+            if 0 <= timeout < 0.1:
+                _logger.info(
+                    "Provided muscle_deadlock_receive_timeout (%f) was less than "
+                    "the minimum of 0.1 seconds, setting it to 0.1.", timeout)
+                timeout = 0.1
+            self._communicator.set_receive_timeout(timeout)
+        except KeyError:
+            pass  # do nothing and keep the default
+        _logger.debug(
+            "Timeout on receiving messages set to %f",
+            self._communicator._receive_timeout)
+
     def _decide_reuse_instance(self) -> bool:
         """Decide whether and how to reuse the instance.
 
diff --git a/libmuscle/python/libmuscle/manager/deadlock_detector.py b/libmuscle/python/libmuscle/manager/deadlock_detector.py
new file mode 100644
index 00000000..677bfaad
--- /dev/null
+++ b/libmuscle/python/libmuscle/manager/deadlock_detector.py
@@ -0,0 +1,140 @@
+import logging
+from threading import Lock
+from typing import Dict, List, Optional, Tuple
+
+
+_logger = logging.getLogger(__name__)
+
+
+class DeadlockDetector:
+    """The DeadlockDetector attempts to detect when multiple instances are stuck waiting
+    for each other.
+
+    This class is responsible for handling WAITING_FOR_RECEIVE, IS_DEADLOCKED and
+    WAITING_FOR_RECEIVE_DONE MMP messages, which are submitted by the MMPServer.
+ + When a deadlock is detected, the cycle of instances that is waiting on each other is + logged with FATAL severity. + """ + + def __init__(self) -> None: + """Construct a new DeadlockDetector.""" + self._mutex = Lock() + """Mutex that should be locked before accessing instance variables.""" + self._waiting_instances: Dict[str, str] = {} + """Maps instance IDs to the peer instance IDs they are waiting for.""" + self._waiting_instance_ports: Dict[str, Tuple[str, Optional[int]]] = {} + """Maps instance IDs to the port/slot they are waiting on..""" + self._detected_deadlocks: List[List[str]] = [] + """List of deadlocked instance cycles. Set by _handle_potential_deadlock.""" + + def waiting_for_receive( + self, instance_id: str, peer_instance_id: str, + port_name: str, slot: Optional[int] + ) -> None: + """Process a WAITING_FOR_RECEIVE message from an instance. + + This method can be called from any thread. + + Args: + instance_id: ID of instance that is waiting to receive a message. + peer_instance_id: ID of the peer that the instance is waiting on. + port_name: Name of the input port. + slot: Optional slot number of the input port. + """ + with self._mutex: + # Sanity checks, triggering this is a bug in the instance or the manager + assert instance_id not in self._waiting_instances + + # Register that the instance is waiting + self._waiting_instances[instance_id] = peer_instance_id + self._waiting_instance_ports[instance_id] = (port_name, slot) + self._check_for_deadlock(instance_id) + + def waiting_for_receive_done( + self, instance_id: str, peer_instance_id: str, + port_name: str, slot: Optional[int] + ) -> None: + """Process a WAITING_FOR_RECEIVE_DONE message from an instance. + + This method can be called from any thread. + + Args: + instance_id: ID of instance that is waiting to receive a message. + peer_instance_id: ID of the peer that the instance is waiting on. + port_name: Name of the input port. + slot: Optional slot number of the input port. + """ + with self._mutex: + # Sanity checks, triggering these is a bug in the instance or the manager + assert instance_id in self._waiting_instances + assert self._waiting_instances[instance_id] == peer_instance_id + assert self._waiting_instance_ports[instance_id] == (port_name, slot) + + # We're not waiting anymore + del self._waiting_instances[instance_id] + del self._waiting_instance_ports[instance_id] + + # Check if we were part of a deadlock + for i, instance_list in enumerate(self._detected_deadlocks): + if instance_id in instance_list: + del self._detected_deadlocks[i] + break + + def is_deadlocked(self, instance_id: str) -> bool: + """Check if the provided instance is part of a detected deadlock. + + This method can be called from any thread. + """ + with self._mutex: + for deadlock_instances in self._detected_deadlocks: + if instance_id in deadlock_instances: + _logger.fatal( + "Deadlock detected, simulation is aborting!\n%s", + self._format_deadlock(deadlock_instances)) + return True + return False + + def _check_for_deadlock(self, instance_id: str) -> None: + """Check if there is a cycle of waiting instances that involves this instance. + + Make sure to lock self._mutex before calling this. 
+        """
+        deadlock_instances = [instance_id]
+        cur_instance = instance_id
+        while cur_instance in self._waiting_instances:
+            cur_instance = self._waiting_instances[cur_instance]
+            if cur_instance == instance_id:
+                self._handle_potential_deadlock(deadlock_instances)
+                return
+            deadlock_instances.append(cur_instance)
+        _logger.debug("No deadlock detected")
+
+    def _handle_potential_deadlock(self, deadlock_instances: List[str]) -> None:
+        """Handle a potential deadlock.
+
+        Make sure to lock self._mutex before calling this.
+
+        Args:
+            deadlock_instances: list of instances waiting on each other
+        """
+        self._detected_deadlocks.append(deadlock_instances)
+
+    def _format_deadlock(self, deadlock_instances: List[str]) -> str:
+        """Create and return formatted deadlock debug info.
+
+        Args:
+            deadlock_instances: list of instances waiting on each other
+        """
+        num_instances = str(len(deadlock_instances))
+        lines = [f"The following {num_instances} instances are deadlocked:"]
+        for i, instance in enumerate(deadlock_instances):
+            num = str(i+1).rjust(len(num_instances))
+            peer_instance = self._waiting_instances[instance]
+            port, slot = self._waiting_instance_ports[instance]
+            slot_txt = "" if slot is None else f"[{slot}]"
+            lines.append(
+                f"{num}. Instance '{instance}' is waiting on instance '{peer_instance}'"
+                f" in a receive on port '{port}{slot_txt}'."
+            )
+        return "\n".join(lines)
diff --git a/libmuscle/python/libmuscle/manager/instance_manager.py b/libmuscle/python/libmuscle/manager/instance_manager.py
index 8d06c45e..58582cda 100644
--- a/libmuscle/python/libmuscle/manager/instance_manager.py
+++ b/libmuscle/python/libmuscle/manager/instance_manager.py
@@ -190,6 +190,9 @@ def cancel_all() -> None:
                 _logger.info(
                         f'Instance {result.instance} was shut down by'
                         f' MUSCLE3 because an error occurred elsewhere')
+                # Ensure we don't see this as a successful run when shutdown() is called
+                # by another thread:
+                all_seemingly_okay = False
             else:
                 stderr_file = (
                         self._run_dir.instance_dir(result.instance) /
@@ -260,6 +263,9 @@ def cancel_all() -> None:
                     'More output may be found in'
                     f' {self._run_dir.instance_dir(result.instance)}\n'
                     )
+        elif not all_seemingly_okay:
+            # shutdown() was called by another thread (e.g.
the DeadlockDetector): + _logger.error('The simulation was aborted.') else: _logger.info('The simulation finished without error.') diff --git a/libmuscle/python/libmuscle/manager/manager.py b/libmuscle/python/libmuscle/manager/manager.py index 044a28d7..f54bf4d5 100644 --- a/libmuscle/python/libmuscle/manager/manager.py +++ b/libmuscle/python/libmuscle/manager/manager.py @@ -14,6 +14,7 @@ from libmuscle.manager.run_dir import RunDir from libmuscle.manager.snapshot_registry import SnapshotRegistry from libmuscle.manager.topology_store import TopologyStore +from libmuscle.manager.deadlock_detector import DeadlockDetector _logger = logging.getLogger(__name__) @@ -45,6 +46,7 @@ def __init__( self._profile_store = ProfileStore(log_dir) self._topology_store = TopologyStore(configuration) self._instance_registry = InstanceRegistry() + self._deadlock_detector = DeadlockDetector() if run_dir is not None: snapshot_dir = run_dir.snapshot_dir() else: @@ -83,7 +85,7 @@ def __init__( self._server = MMPServer( self._logger, self._profile_store, self._configuration, self._instance_registry, self._topology_store, - self._snapshot_registry, run_dir) + self._snapshot_registry, self._deadlock_detector, run_dir) if self._instance_manager: self._instance_manager.set_manager_location( diff --git a/libmuscle/python/libmuscle/manager/mmp_server.py b/libmuscle/python/libmuscle/manager/mmp_server.py index 94847fd9..1d4266e3 100644 --- a/libmuscle/python/libmuscle/manager/mmp_server.py +++ b/libmuscle/python/libmuscle/manager/mmp_server.py @@ -20,6 +20,7 @@ from libmuscle.mcp.tcp_transport_server import TcpTransportServer from libmuscle.mcp.transport_server import RequestHandler from libmuscle.manager.profile_store import ProfileStore +from libmuscle.manager.deadlock_detector import DeadlockDetector from libmuscle.profiling import ( ProfileEvent, ProfileEventType, ProfileTimestamp) from libmuscle.snapshot import SnapshotMetadata @@ -77,6 +78,7 @@ def __init__( instance_registry: InstanceRegistry, topology_store: TopologyStore, snapshot_registry: SnapshotRegistry, + deadlock_detector: DeadlockDetector, run_dir: Optional[RunDir] ) -> None: """Create an MMPRequestHandler. @@ -93,6 +95,7 @@ def __init__( self._instance_registry = instance_registry self._topology_store = topology_store self._snapshot_registry = snapshot_registry + self._deadlock_detector = deadlock_detector self._run_dir = run_dir self._reference_time = time.monotonic() @@ -124,6 +127,12 @@ def handle_request(self, request: bytes) -> bytes: response = self._submit_snapshot(*req_args) elif req_type == RequestType.GET_CHECKPOINT_INFO.value: response = self._get_checkpoint_info(*req_args) + elif req_type == RequestType.WAITING_FOR_RECEIVE.value: + response = self._waiting_for_receive(*req_args) + elif req_type == RequestType.WAITING_FOR_RECEIVE_DONE.value: + response = self._waiting_for_receive_done(*req_args) + elif req_type == RequestType.IS_DEADLOCKED.value: + response = self._is_deadlocked(*req_args) return cast(bytes, msgpack.packb(response, use_bin_type=True)) @@ -357,6 +366,40 @@ def _get_checkpoint_info(self, instance_id: str) -> Any: resume, snapshot_directory] + def _waiting_for_receive( + self, instance_id: str, peer_instance_id: str, + port_name: str, slot: Optional[int]) -> Any: + """Indicate that the instance is waiting to receive a message. 
+ + Args: + instance_id: The instance that is waiting + port_name: Port name that the instance is waiting on + slot: Slot that the instance is waiting on + """ + self._deadlock_detector.waiting_for_receive( + instance_id, peer_instance_id, port_name, slot) + return [ResponseType.SUCCESS.value] + + def _waiting_for_receive_done( + self, instance_id: str, peer_instance_id: str, + port_name: str, slot: Optional[int]) -> Any: + """Indicate that the instance is done waiting to receive a message. + + Args: + instance_id: The instance that is waiting + port_name: Port name that the instance is waiting on + slot: Slot that the instance is waiting on + """ + self._deadlock_detector.waiting_for_receive_done( + instance_id, peer_instance_id, port_name, slot) + return [ResponseType.SUCCESS.value] + + def _is_deadlocked(self, instance_id: str) -> Any: + """Check if the provided instance is part of a detected deadlock. + """ + result = self._deadlock_detector.is_deadlocked(instance_id) + return [ResponseType.SUCCESS.value, result] + class MMPServer: """The MUSCLE Manager Protocol server. @@ -373,6 +416,7 @@ def __init__( instance_registry: InstanceRegistry, topology_store: TopologyStore, snapshot_registry: SnapshotRegistry, + deadlock_detector: DeadlockDetector, run_dir: Optional[RunDir] ) -> None: """Create an MMPServer. @@ -395,7 +439,7 @@ def __init__( """ self._handler = MMPRequestHandler( logger, profile_store, configuration, instance_registry, - topology_store, snapshot_registry, run_dir) + topology_store, snapshot_registry, deadlock_detector, run_dir) try: self._server = TcpTransportServer(self._handler, 9000) except OSError as e: diff --git a/libmuscle/python/libmuscle/manager/test/conftest.py b/libmuscle/python/libmuscle/manager/test/conftest.py index 78a50487..53528b6c 100644 --- a/libmuscle/python/libmuscle/manager/test/conftest.py +++ b/libmuscle/python/libmuscle/manager/test/conftest.py @@ -8,6 +8,7 @@ from libmuscle.manager.snapshot_registry import SnapshotRegistry from libmuscle.manager.topology_store import TopologyStore from libmuscle.manager.profile_store import ProfileStore +from libmuscle.manager.deadlock_detector import DeadlockDetector @pytest.fixture @@ -54,13 +55,18 @@ def snapshot_registry(mmp_configuration, topology_store) -> SnapshotRegistry: return SnapshotRegistry(mmp_configuration, None, topology_store) +@pytest.fixture +def deadlock_detector() -> DeadlockDetector: + return DeadlockDetector() + + @pytest.fixture def mmp_request_handler( logger, profile_store, mmp_configuration, instance_registry, - topology_store, snapshot_registry): + topology_store, snapshot_registry, deadlock_detector): return MMPRequestHandler( logger, profile_store, mmp_configuration, instance_registry, - topology_store, snapshot_registry, None) + topology_store, snapshot_registry, deadlock_detector, None) @pytest.fixture @@ -77,10 +83,10 @@ def loaded_instance_registry(instance_registry): @pytest.fixture def registered_mmp_request_handler( logger, profile_store, mmp_configuration, loaded_instance_registry, - topology_store, snapshot_registry): + topology_store, snapshot_registry, deadlock_detector): return MMPRequestHandler( logger, profile_store, mmp_configuration, loaded_instance_registry, - topology_store, snapshot_registry, None) + topology_store, snapshot_registry, deadlock_detector, None) @pytest.fixture @@ -133,8 +139,8 @@ def loaded_instance_registry2(): @pytest.fixture def registered_mmp_request_handler2( logger, profile_store, mmp_configuration, loaded_instance_registry2, - topology_store2, 
snapshot_registry2, tmp_path): + topology_store2, snapshot_registry2, deadlock_detector, tmp_path): return MMPRequestHandler( logger, profile_store, mmp_configuration, loaded_instance_registry2, topology_store2, snapshot_registry2, - RunDir(tmp_path)) + deadlock_detector, RunDir(tmp_path)) diff --git a/libmuscle/python/libmuscle/manager/test/test_deadlock_detector.py b/libmuscle/python/libmuscle/manager/test/test_deadlock_detector.py new file mode 100644 index 00000000..671ba22b --- /dev/null +++ b/libmuscle/python/libmuscle/manager/test/test_deadlock_detector.py @@ -0,0 +1,69 @@ +import pytest + +from libmuscle.manager.deadlock_detector import DeadlockDetector + + +@pytest.fixture +def detector() -> DeadlockDetector: + return DeadlockDetector() + + +def test_no_deadlock(detector: DeadlockDetector) -> None: + detector.waiting_for_receive("macro", "micro", "s", None) + assert not detector.is_deadlocked("macro") + assert not detector.is_deadlocked("micro") + detector.waiting_for_receive_done("macro", "micro", "s", None) + assert not detector.is_deadlocked("macro") + assert not detector.is_deadlocked("micro") + + +def test_double_waiting_log_error(detector: DeadlockDetector) -> None: + detector.waiting_for_receive("macro", "micro", "s", 0) + with pytest.raises(AssertionError): + detector.waiting_for_receive("macro", "micro", "s", 1) + + +def test_not_waiting_log_error(detector: DeadlockDetector) -> None: + with pytest.raises(AssertionError): + detector.waiting_for_receive_done("macro", "micro", "s", 0) + + +def test_waiting_for_different_instance_log_error(detector: DeadlockDetector) -> None: + detector.waiting_for_receive("macro", "micro", "s", 0) + with pytest.raises(AssertionError): + detector.waiting_for_receive_done("macro", "meso", "s", 0) + + +def test_waiting_for_different_port_log_error(detector: DeadlockDetector) -> None: + detector.waiting_for_receive("macro", "micro", "s", 0) + with pytest.raises(AssertionError): + detector.waiting_for_receive_done("macro", "micro", "f_init", 0) + + +def test_deadlock(detector: DeadlockDetector) -> None: + detector.waiting_for_receive("macro", "micro", "s", None) + assert not detector.is_deadlocked("macro") + assert not detector.is_deadlocked("micro") + detector.waiting_for_receive("micro", "macro", "f_init", None) + assert detector.is_deadlocked("macro") + assert detector.is_deadlocked("micro") + + +def test_deadlock_cancelled(detector: DeadlockDetector) -> None: + detector.waiting_for_receive("macro", "micro", "s", None) + detector.waiting_for_receive("micro", "macro", "f_init", None) + detector.waiting_for_receive_done("macro", "micro", "s", None) + assert not detector.is_deadlocked("macro") + assert not detector.is_deadlocked("micro") + + +def test_double_deadlock(detector: DeadlockDetector) -> None: + detector.waiting_for_receive("macro", "micro", "s", None) + detector.waiting_for_receive("micro", "macro", "f_init", None) + detector.waiting_for_receive("cycle2", "peer2", "s", None) + detector.waiting_for_receive("peer2", "cycle2", "f_init", None) + detector.waiting_for_receive_done("macro", "micro", "s", None) + assert not detector.is_deadlocked("macro") + assert not detector.is_deadlocked("micro") + assert detector.is_deadlocked("cycle2") + assert detector.is_deadlocked("peer2") diff --git a/libmuscle/python/libmuscle/manager/test/test_mmp_request_handler.py b/libmuscle/python/libmuscle/manager/test/test_mmp_request_handler.py index 768d2160..0286649c 100644 --- a/libmuscle/python/libmuscle/manager/test/test_mmp_request_handler.py +++ 
b/libmuscle/python/libmuscle/manager/test/test_mmp_request_handler.py @@ -15,10 +15,10 @@ def test_create_servicer( logger, profile_store, mmp_configuration, instance_registry, - topology_store, snapshot_registry): + topology_store, snapshot_registry, deadlock_detector): MMPRequestHandler( logger, profile_store, mmp_configuration, instance_registry, - topology_store, snapshot_registry, None) + topology_store, snapshot_registry, deadlock_detector, None) def test_log_message(mmp_request_handler, caplog): diff --git a/libmuscle/python/libmuscle/mcp/protocol.py b/libmuscle/python/libmuscle/mcp/protocol.py index 5d1217ed..d02f95cc 100644 --- a/libmuscle/python/libmuscle/mcp/protocol.py +++ b/libmuscle/python/libmuscle/mcp/protocol.py @@ -22,6 +22,10 @@ class RequestType(Enum): SUBMIT_PROFILE_EVENTS = 6 SUBMIT_SNAPSHOT = 7 GET_CHECKPOINT_INFO = 8 + # Connection deadlock detection + WAITING_FOR_RECEIVE = 9 + WAITING_FOR_RECEIVE_DONE = 10 + IS_DEADLOCKED = 11 # MUSCLE Peer Protocol GET_NEXT_MESSAGE = 21 diff --git a/libmuscle/python/libmuscle/mcp/tcp_transport_client.py b/libmuscle/python/libmuscle/mcp/tcp_transport_client.py index ed9536d2..78c399b3 100644 --- a/libmuscle/python/libmuscle/mcp/tcp_transport_client.py +++ b/libmuscle/python/libmuscle/mcp/tcp_transport_client.py @@ -1,8 +1,9 @@ from errno import ENOTCONN +import select import socket from typing import Optional, Tuple -from libmuscle.mcp.transport_client import ProfileData, TransportClient +from libmuscle.mcp.transport_client import ProfileData, TransportClient, TimeoutHandler from libmuscle.mcp.tcp_util import recv_all, recv_int64, send_int64 from libmuscle.profiling import ProfileTimestamp @@ -51,13 +52,22 @@ def __init__(self, location: str) -> None: sock.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1) self._socket = sock - def call(self, request: bytes) -> Tuple[bytes, ProfileData]: + if hasattr(select, "poll"): + self._poll_obj: Optional[select.poll] = select.poll() + self._poll_obj.register(self._socket, select.POLLIN) + else: + self._poll_obj = None # On platforms that don't support select.poll + + def call(self, request: bytes, timeout_handler: Optional[TimeoutHandler] = None + ) -> Tuple[bytes, ProfileData]: """Send a request to the server and receive the response. This is a blocking call. Args: request: The request to send + timeout_handler: Optional timeout handler. This is used for communication + deadlock detection. Returns: The received response @@ -66,13 +76,41 @@ def call(self, request: bytes) -> Tuple[bytes, ProfileData]: send_int64(self._socket, len(request)) self._socket.sendall(request) + did_timeout = False + if timeout_handler is not None: + while not self._poll(timeout_handler.timeout): + did_timeout = True + timeout_handler.on_timeout() + length = recv_int64(self._socket) + if did_timeout: + assert timeout_handler is not None # mypy + timeout_handler.on_receive() start_transfer = ProfileTimestamp() response = recv_all(self._socket, length) stop_transfer = ProfileTimestamp() return response, (start_wait, start_transfer, stop_transfer) + def _poll(self, timeout: float) -> bool: + """Poll the socket and return whether it's ready for receiving. + + This method blocks until the socket is ready for receiving, or ``timeout`` + seconds have passed (whichever is earlier). + + Args: + timeout: timeout in seconds + + Returns: + True if the socket is ready for receiving data, False otherwise. 
+ """ + if self._poll_obj is not None: + ready = self._poll_obj.poll(timeout * 1000) # poll timeout is in ms + else: + # Fallback to select() + ready, _, _ = select.select([self._socket], (), (), timeout) + return bool(ready) + def close(self) -> None: """Closes this client. diff --git a/libmuscle/python/libmuscle/mcp/transport_client.py b/libmuscle/python/libmuscle/mcp/transport_client.py index 55942dc9..9b1c0ab5 100644 --- a/libmuscle/python/libmuscle/mcp/transport_client.py +++ b/libmuscle/python/libmuscle/mcp/transport_client.py @@ -1,4 +1,4 @@ -from typing import Tuple +from typing import Optional, Tuple from libmuscle.profiling import ProfileTimestamp @@ -6,6 +6,28 @@ ProfileData = Tuple[ProfileTimestamp, ProfileTimestamp, ProfileTimestamp] +class TimeoutHandler: + """Object handling timeouts during :meth:`TransportClient.call`.""" + + @property + def timeout(self) -> float: + """Timeout (in seconds) after which :meth:`on_timeout` is called.""" + raise NotImplementedError() # pragma: no cover + + def on_timeout(self) -> None: + """Callback when :attr:`timeout` seconds have passed without a response from the + peer. + """ + raise NotImplementedError() # pragma: no cover + + def on_receive(self) -> None: + """Callback when receiving a response from the peer. + + Note: this method is only called when the request has timed out. + """ + raise NotImplementedError() # pragma: no cover + + class TransportClient: """A client that connects to an MCP server. @@ -25,7 +47,8 @@ def can_connect_to(location: str) -> bool: """ raise NotImplementedError() # pragma: no cover - def call(self, request: bytes) -> Tuple[bytes, ProfileData]: + def call(self, request: bytes, timeout_handler: Optional[TimeoutHandler] = None + ) -> Tuple[bytes, ProfileData]: """Send a request to the server and receive the response. This is a blocking call. Besides the result, this function @@ -36,6 +59,8 @@ def call(self, request: bytes) -> Tuple[bytes, ProfileData]: Args: request: The request to send + timeout_handler: Optional timeout handler. This is used for communication + deadlock detection. Returns: The received response, and the timestamps diff --git a/libmuscle/python/libmuscle/mmp_client.py b/libmuscle/python/libmuscle/mmp_client.py index 34238a87..d8d1a8e1 100644 --- a/libmuscle/python/libmuscle/mmp_client.py +++ b/libmuscle/python/libmuscle/mmp_client.py @@ -275,6 +275,33 @@ def deregister_instance(self) -> None: raise RuntimeError('Error deregistering instance: {}'.format( response[1])) + def waiting_for_receive( + self, peer_instance_id: Reference, port_name: str, slot: Optional[int] + ) -> None: + """Notify the manager that we're waiting to receive a message.""" + request = [ + RequestType.WAITING_FOR_RECEIVE.value, + str(self._instance_id), + str(peer_instance_id), port_name, slot] + self._call_manager(request) + + def waiting_for_receive_done( + self, peer_instance_id: Reference, port_name: str, slot: Optional[int] + ) -> None: + """Notify the manager that we're done waiting to receive a message.""" + request = [ + RequestType.WAITING_FOR_RECEIVE_DONE.value, + str(self._instance_id), + str(peer_instance_id), port_name, slot] + self._call_manager(request) + + def is_deadlocked(self) -> bool: + """Ask the manager if we're part of a deadlock.""" + request = [ + RequestType.IS_DEADLOCKED.value, str(self._instance_id)] + response = self._call_manager(request) + return bool(response[1]) + def _call_manager(self, request: Any) -> Any: """Call the manager and do en/decoding. 
diff --git a/libmuscle/python/libmuscle/mpp_client.py b/libmuscle/python/libmuscle/mpp_client.py index 852ec938..5eedbf97 100644 --- a/libmuscle/python/libmuscle/mpp_client.py +++ b/libmuscle/python/libmuscle/mpp_client.py @@ -4,7 +4,7 @@ from ymmsl import Reference from libmuscle.mcp.protocol import RequestType -from libmuscle.mcp.transport_client import ProfileData, TransportClient +from libmuscle.mcp.transport_client import ProfileData, TransportClient, TimeoutHandler from libmuscle.mcp.type_registry import transport_client_types @@ -40,7 +40,8 @@ def __init__(self, locations: List[str]) -> None: self._transport_client = client - def receive(self, receiver: Reference) -> Tuple[bytes, ProfileData]: + def receive(self, receiver: Reference, timeout_handler: Optional[TimeoutHandler] + ) -> Tuple[bytes, ProfileData]: """Receive a message from a port this client connects to. Args: @@ -51,7 +52,7 @@ def receive(self, receiver: Reference) -> Tuple[bytes, ProfileData]: """ request = [RequestType.GET_NEXT_MESSAGE.value, str(receiver)] encoded_request = msgpack.packb(request, use_bin_type=True) - return self._transport_client.call(encoded_request) + return self._transport_client.call(encoded_request, timeout_handler) def close(self) -> None: """Closes this client. diff --git a/libmuscle/python/libmuscle/receive_timeout_handler.py b/libmuscle/python/libmuscle/receive_timeout_handler.py new file mode 100644 index 00000000..a8ca0cbd --- /dev/null +++ b/libmuscle/python/libmuscle/receive_timeout_handler.py @@ -0,0 +1,64 @@ +from typing import Optional + +from ymmsl import Reference + +from libmuscle.mcp.transport_client import TimeoutHandler +from libmuscle.mmp_client import MMPClient + + +class Deadlock(Exception): + """Exception that is raised when the simulation has deadlocked.""" + + +class ReceiveTimeoutHandler(TimeoutHandler): + """Timeout handler when receiving messages from peers. + + This handler sends a message to the Muscle Manager when the receive times out (and + another message when the message does arrive). + + This is used by the manager to detect if the simulation is in a deadlock, where a + cycle of instances is waiting on each other. + """ + + def __init__( + self, manager: MMPClient, + peer_instance: Reference, port_name: str, slot: Optional[int], + timeout: float + ) -> None: + """Initialize a new timeout handler. + + Args: + manager: Connection to the muscle manager. + peer_instance: the peer instance we try to receive from. + port_name: the name of the port we try to receive on. + slot: the slot we try to receive on. + timeout: Timeout in seconds. 
+ """ + self._manager = manager + self._peer_instance = peer_instance + self._port_name = port_name + self._slot = slot + self._timeout = timeout + # Counter to keep track of the number of timeouts + self._num_timeouts = 0 + + @property + def timeout(self) -> float: + # Increase timeout by a factor 1.5 with every timeout we hit: + factor = 1.5 ** self._num_timeouts + return self._timeout * factor + + def on_timeout(self) -> None: + if self._num_timeouts == 0: + # Notify the manager that we're waiting for a receive + self._manager.waiting_for_receive( + self._peer_instance, self._port_name, self._slot) + else: + # Ask the manager if we're part of a detected deadlock + if self._manager.is_deadlocked(): + raise Deadlock() + self._num_timeouts += 1 + + def on_receive(self) -> None: + self._manager.waiting_for_receive_done( + self._peer_instance, self._port_name, self._slot) diff --git a/libmuscle/python/libmuscle/test/test_communicator.py b/libmuscle/python/libmuscle/test/test_communicator.py index ff1e08dd..739777fa 100644 --- a/libmuscle/python/libmuscle/test/test_communicator.py +++ b/libmuscle/python/libmuscle/test/test_communicator.py @@ -1,5 +1,5 @@ import logging -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock, Mock, patch import pytest @@ -46,7 +46,7 @@ def mpp_client(MPPClient): @pytest.fixture def communicator(connected_port_manager, profiler): - return Communicator(Ref('component'), [], connected_port_manager, profiler) + return Communicator(Ref('component'), [], connected_port_manager, profiler, Mock()) @pytest.fixture @@ -115,9 +115,10 @@ def test_receive_message(connected_communicator, mpp_client): mpp_client.receive.return_value = msg.encoded(), MagicMock() + connected_communicator.set_receive_timeout(-1) recv_msg, saved_until = connected_communicator.receive_message('in') - mpp_client.receive.assert_called_with(Ref('component.in')) + mpp_client.receive.assert_called_with(Ref('component.in'), None) assert recv_msg.timestamp == 2.0 assert recv_msg.next_timestamp == 3.0 @@ -135,9 +136,10 @@ def test_receive_message_vector(connected_communicator, mpp_client): mpp_client.receive.return_value = msg.encoded(), MagicMock() + connected_communicator.set_receive_timeout(-1) recv_msg, saved_until = connected_communicator.receive_message('in_v', 5) - mpp_client.receive.assert_called_with(Ref('component.in_v[5]')) + mpp_client.receive.assert_called_with(Ref('component.in_v[5]'), None) assert recv_msg.timestamp == 4.0 assert recv_msg.next_timestamp == 6.0 @@ -272,7 +274,7 @@ def test_shutdown( sender, receiver, slot, float('inf'), None, Settings(), 0, 3.5, ClosePort()) - def receive(receiver): + def receive(receiver, timeout_handler): return messages[receiver].encoded(), MagicMock() mpp_client.receive = receive diff --git a/libmuscle/python/libmuscle/test/test_instance.py b/libmuscle/python/libmuscle/test/test_instance.py index 9dc52c52..26e80f17 100644 --- a/libmuscle/python/libmuscle/test/test_instance.py +++ b/libmuscle/python/libmuscle/test/test_instance.py @@ -62,7 +62,10 @@ def communicator(): @pytest.fixture def settings_manager(): with patch('libmuscle.instance.SettingsManager') as SettingsManager: - yield SettingsManager.return_value + settings_manager = SettingsManager.return_value + # Emulate no settings available + settings_manager.get_setting.side_effect = KeyError() + yield settings_manager @pytest.fixture(autouse=True) @@ -298,6 +301,7 @@ def test_list_settings(instance, settings_manager): def test_get_setting(instance, settings_manager): + 
settings_manager.get_setting.side_effect = None # don't raise KeyError instance.get_setting('test', 'int') settings_manager.get_setting.assert_called_with( Ref('component'), Ref('test'), 'int') diff --git a/setup.cfg b/setup.cfg index 7f90b47c..f56bea66 100644 --- a/setup.cfg +++ b/setup.cfg @@ -2,6 +2,10 @@ testpaths = libmuscle/python integration_test addopts = --cov --cov-report xml --cov-report term-missing -s # -vv --log-cli-level=DEBUG +filterwarnings = + # raise an error when there are unhandled exceptions in a worker thread + error::pytest.PytestUnhandledThreadExceptionWarning + [mypy] files = libmuscle/python/**/*.py, scripts/*.py, muscle3/*.py
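Putting the pieces together, the receive path works roughly as follows: a ``ReceiveTimeoutHandler`` is handed to ``MPPClient.receive``; on the first expiry of the timeout it reports the blocked receive to the manager via ``MMPClient.waiting_for_receive``, and on every later expiry (the wait grows by a factor 1.5 each time) it asks the manager via ``is_deadlocked`` whether this instance is part of a detected cycle, raising ``Deadlock`` if so. The patch wires this up inside the ``Communicator`` (its constructor and ``test_communicator.py`` change accordingly); the standalone helper below is only a sketch of the call sequence, with the function name and the pre-connected ``manager`` and ``client`` objects assumed for illustration.

.. code-block:: python
   :caption: Sketch of how the new classes cooperate (illustration, not part of this patch)

   from typing import Optional

   from ymmsl import Reference

   from libmuscle.mmp_client import MMPClient
   from libmuscle.mpp_client import MPPClient
   from libmuscle.receive_timeout_handler import ReceiveTimeoutHandler


   def receive_with_deadlock_detection(
           manager: MMPClient, client: MPPClient, receiver: Reference,
           peer_instance: Reference, port_name: str, slot: Optional[int],
           timeout: float) -> bytes:
       # First expiry: report the blocked receive to the manager.
       # Later expiries (after 1.5x, 2.25x, ... the initial timeout): ask the
       # manager whether we are part of a deadlock; Deadlock is raised if so.
       handler = ReceiveTimeoutHandler(manager, peer_instance, port_name, slot, timeout)
       msg_bytes, _profile = client.receive(receiver, handler)
       # A successful receive after a timeout notifies the manager via on_receive().
       return msg_bytes

With a negative receive timeout the ``Communicator`` passes ``None`` instead of a handler, as the updated ``test_communicator.py`` expects, which disables the detection for that receive.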