From 94869da3eee3fa2432021b33083c20bd389aedf1 Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Fri, 16 Aug 2024 16:04:20 +0200 Subject: [PATCH] Implement C++ components of deadlock detection --- libmuscle/cpp/src/libmuscle/communicator.cpp | 16 +++++-- libmuscle/cpp/src/libmuscle/communicator.hpp | 22 ++++++++- libmuscle/cpp/src/libmuscle/instance.cpp | 22 ++++++++- libmuscle/cpp/src/libmuscle/mcp/protocol.hpp | 3 ++ .../libmuscle/mcp/tcp_transport_client.cpp | 45 ++++++++++++++++++- .../libmuscle/mcp/tcp_transport_client.hpp | 3 +- .../src/libmuscle/mcp/transport_client.hpp | 18 +++++++- libmuscle/cpp/src/libmuscle/mmp_client.cpp | 24 ++++++++++ libmuscle/cpp/src/libmuscle/mmp_client.hpp | 10 +++++ libmuscle/cpp/src/libmuscle/mpp_client.cpp | 4 +- libmuscle/cpp/src/libmuscle/mpp_client.hpp | 3 +- .../src/libmuscle/receive_timeout_handler.cpp | 29 ++++++++++++ .../src/libmuscle/receive_timeout_handler.hpp | 40 +++++++++++++++++ .../tests/mocks/mock_communicator.hpp | 13 ++++-- .../libmuscle/tests/mocks/mock_mmp_client.hpp | 10 +++++ .../libmuscle/tests/mocks/mock_mpp_client.hpp | 15 +++++-- .../src/libmuscle/tests/test_communicator.cpp | 16 ++++--- .../cpp/src/libmuscle/tests/test_instance.cpp | 1 + .../libmuscle/tests/test_snapshot_manager.cpp | 1 + 19 files changed, 270 insertions(+), 25 deletions(-) create mode 100644 libmuscle/cpp/src/libmuscle/receive_timeout_handler.cpp create mode 100644 libmuscle/cpp/src/libmuscle/receive_timeout_handler.hpp diff --git a/libmuscle/cpp/src/libmuscle/communicator.cpp b/libmuscle/cpp/src/libmuscle/communicator.cpp index 29cd4293..016eecb2 100644 --- a/libmuscle/cpp/src/libmuscle/communicator.cpp +++ b/libmuscle/cpp/src/libmuscle/communicator.cpp @@ -34,14 +34,17 @@ Communicator::Communicator( ymmsl::Reference const & kernel, std::vector const & index, PortManager & port_manager, - Logger & logger, Profiler & profiler) + Logger & logger, Profiler & profiler, + MMPClient & manager) : kernel_(kernel) , index_(index) , port_manager_(port_manager) , logger_(logger) , profiler_(profiler) + , manager_(manager) , server_() , clients_() + , receive_timeout_(10.0) // Notify manager, by default, after 10 seconds waiting in receive_message() {} std::vector Communicator::get_locations() const { @@ -138,8 +141,12 @@ std::tuple Communicator::receive_message( Endpoint snd_endpoint = peer_info_.get().get_peer_endpoints( recv_endpoint.port, slot_list).at(0); MPPClient & client = get_client_(snd_endpoint.instance()); + std::string peer_instance = static_cast(snd_endpoint.instance()); + ReceiveTimeoutHandler handler( + manager_, peer_instance, port_name, slot, receive_timeout_); + ReceiveTimeoutHandler *timeout_handler = receive_timeout_ < 0 ? nullptr : &handler; auto msg_and_profile = try_receive_( - client, recv_endpoint.ref(), snd_endpoint.kernel); + client, recv_endpoint.ref(), snd_endpoint.kernel, timeout_handler); auto & msg = std::get<0>(msg_and_profile); ProfileEvent recv_decode_event( @@ -289,9 +296,10 @@ Endpoint Communicator::get_endpoint_( } std::tuple, mcp::ProfileData> Communicator::try_receive_( - MPPClient & client, Reference const & receiver, Reference const & peer) { + MPPClient & client, Reference const & receiver, Reference const & peer, + ReceiveTimeoutHandler *timeout_handler) { try { - return client.receive(receiver); + return client.receive(receiver, timeout_handler); } catch(std::runtime_error const & err) { throw std::runtime_error( "Error while receiving a message: connection with peer '" + diff --git a/libmuscle/cpp/src/libmuscle/communicator.hpp b/libmuscle/cpp/src/libmuscle/communicator.hpp index 535c988f..90257c0a 100644 --- a/libmuscle/cpp/src/libmuscle/communicator.hpp +++ b/libmuscle/cpp/src/libmuscle/communicator.hpp @@ -6,6 +6,7 @@ #include #include +#include #include #include #include @@ -14,6 +15,7 @@ #include #include #include +#include #include #include @@ -53,7 +55,8 @@ class Communicator { ymmsl::Reference const & kernel, std::vector const & index, PortManager & port_manager, - Logger & logger, Profiler & profiler); + Logger & logger, Profiler & profiler, + MMPClient & manager); /** Returns a list of locations that we can be reached at. * @@ -127,6 +130,19 @@ class Communicator { */ void shutdown(); + /** Update the timeout after which the manager is notified that we are + * waiting for a message. + * + * @param receive_timeout Timeout (seconds). A negative number disables + * the deadlock notification mechanism. + */ + void set_receive_timeout(double receive_timeout) { receive_timeout_ = receive_timeout; } + + /** Get the timeout after which the manager is notified that we are + * waiting for a message. + */ + double get_receive_timeout() const { return receive_timeout_; } + PRIVATE: using Ports_ = std::unordered_map; @@ -140,7 +156,7 @@ class Communicator { std::tuple, mcp::ProfileData> try_receive_( MPPClient & client, ymmsl::Reference const & receiver, - ymmsl::Reference const & peer); + ymmsl::Reference const & peer, ReceiveTimeoutHandler *handler); void close_port_(std::string const & port_name, Optional slot = {}); @@ -186,9 +202,11 @@ class Communicator { PortManager & port_manager_; Logger & logger_; Profiler & profiler_; + MMPClient & manager_; MPPServer server_; std::unordered_map> clients_; Optional peer_info_; + double receive_timeout_; }; } } diff --git a/libmuscle/cpp/src/libmuscle/instance.cpp b/libmuscle/cpp/src/libmuscle/instance.cpp index 8edc11c1..53bbfb28 100644 --- a/libmuscle/cpp/src/libmuscle/instance.cpp +++ b/libmuscle/cpp/src/libmuscle/instance.cpp @@ -143,6 +143,7 @@ class Instance::Impl { void deregister_(); void setup_checkpointing_(); void setup_profiling_(); + void setup_receive_timeout_(); ::ymmsl::Reference make_full_name_(int argc, char const * const argv[]) const; std::string extract_manager_location_(int argc, char const * const argv[]) const; @@ -226,7 +227,7 @@ Instance::Impl::Impl( port_manager_.reset(new PortManager(index_(), ports)); communicator_.reset( new Communicator( - name_(), index_(), *port_manager_, *logger_, *profiler_)); + name_(), index_(), *port_manager_, *logger_, *profiler_, *manager_)); snapshot_manager_.reset(new SnapshotManager( instance_name_, *manager_, *port_manager_, *logger_)); trigger_manager_.reset(new TriggerManager()); @@ -239,6 +240,7 @@ Instance::Impl::Impl( set_local_log_level_(); set_remote_log_level_(); setup_profiling_(); + setup_receive_timeout_(); #ifdef MUSCLE_ENABLE_MPI auto sbase_data = Data(settings_manager_.base); msgpack::sbuffer sbuf; @@ -556,6 +558,24 @@ void Instance::Impl::setup_profiling_() { profiler_->set_level(profile_level_str); } +void Instance::Impl::setup_receive_timeout_() { + double timeout; + try { + timeout = settings_manager_.get_setting( + instance_name_, "muscle_deadlock_receive_timeout").as(); + communicator_->set_receive_timeout(timeout); + } + catch (std::runtime_error const & e) { + logger_->error(e.what() + std::string(" in muscle_deadlock_receive_timeout")); + } + catch (std::out_of_range const &) { + // muscle_deadlock_receive_timeout not set, do nothing and keep the default + } + logger_->debug( + "Timeout on receiving messages set to ", + communicator_->get_receive_timeout()); +} + Message Instance::Impl::receive_message( std::string const & port_name, Optional slot, diff --git a/libmuscle/cpp/src/libmuscle/mcp/protocol.hpp b/libmuscle/cpp/src/libmuscle/mcp/protocol.hpp index c45df271..25dd142b 100644 --- a/libmuscle/cpp/src/libmuscle/mcp/protocol.hpp +++ b/libmuscle/cpp/src/libmuscle/mcp/protocol.hpp @@ -19,6 +19,9 @@ enum class RequestType { submit_profile_events = 6, submit_snapshot = 7, get_checkpoint_info = 8, + // Connection deadlock detection + waiting_for_receive = 9, + waiting_for_receive_done = 10, // MUSCLE Peer Protocol get_next_message = 21 diff --git a/libmuscle/cpp/src/libmuscle/mcp/tcp_transport_client.cpp b/libmuscle/cpp/src/libmuscle/mcp/tcp_transport_client.cpp index c2551a3b..dd5b0f14 100644 --- a/libmuscle/cpp/src/libmuscle/mcp/tcp_transport_client.cpp +++ b/libmuscle/cpp/src/libmuscle/mcp/tcp_transport_client.cpp @@ -4,6 +4,7 @@ #include #include +#include #include #include #include @@ -13,6 +14,7 @@ #include #include #include +#include namespace { @@ -128,12 +130,51 @@ TcpTransportClient::~TcpTransportClient() { } std::tuple, ProfileData> TcpTransportClient::call( - char const * req_buf, std::size_t req_len + char const * req_buf, std::size_t req_len, + TimeoutHandler* timeout_handler ) const { ProfileTimestamp start_wait; send_frame(socket_fd_, req_buf, req_len); - int64_t length = recv_int64(socket_fd_); + int64_t length; + if (timeout_handler == nullptr) { + length = recv_int64(socket_fd_); + } else { + using std::chrono::duration; + using std::chrono::steady_clock; + using std::chrono::milliseconds; + using std::chrono::duration_cast; + + const auto timeout_duration = duration(timeout_handler->get_timeout()); + const auto deadline = steady_clock::now() + timeout_duration; + int poll_result; + pollfd socket_poll_fd; + socket_poll_fd.fd = socket_fd_; + socket_poll_fd.events = POLLIN; + do { + int timeout_ms = duration_cast(deadline - steady_clock::now()).count(); + poll_result = poll(&socket_poll_fd, 1, timeout_ms); + + if (poll_result >= 0) + break; + + if (errno != EINTR) + throw std::runtime_error("Unexpected error during poll(): "+std::to_string(errno)); + + // poll() was interrupted by a signal: retry with re-calculated timeout + } while (1); + + if (poll_result == 0) { + // time limit expired + timeout_handler->on_timeout(); + length = recv_int64(socket_fd_); + timeout_handler->on_receive(); + } else { + // socket is ready for a receive, this call shouldn't block: + length = recv_int64(socket_fd_); + } + } + ProfileTimestamp start_transfer; std::vector result(length); recv_all(socket_fd_, result.data(), result.size()); diff --git a/libmuscle/cpp/src/libmuscle/mcp/tcp_transport_client.hpp b/libmuscle/cpp/src/libmuscle/mcp/tcp_transport_client.hpp index 9da32e4b..4f33f4f7 100644 --- a/libmuscle/cpp/src/libmuscle/mcp/tcp_transport_client.hpp +++ b/libmuscle/cpp/src/libmuscle/mcp/tcp_transport_client.hpp @@ -46,7 +46,8 @@ class TcpTransportClient : public TransportClient { * @return A byte array with the received data. */ virtual std::tuple, ProfileData> call( - char const * req_buf, std::size_t req_len) const override; + char const * req_buf, std::size_t req_len, + TimeoutHandler* timeout_handler=nullptr) const override; /** Closes this client. * diff --git a/libmuscle/cpp/src/libmuscle/mcp/transport_client.hpp b/libmuscle/cpp/src/libmuscle/mcp/transport_client.hpp index 556f74f2..bfe03ee3 100644 --- a/libmuscle/cpp/src/libmuscle/mcp/transport_client.hpp +++ b/libmuscle/cpp/src/libmuscle/mcp/transport_client.hpp @@ -18,6 +18,21 @@ namespace libmuscle { namespace _MUSCLE_IMPL_NS { namespace mcp { using ProfileData = std::tuple< ProfileTimestamp, ProfileTimestamp, ProfileTimestamp>; +class TimeoutHandler { + public: + virtual ~TimeoutHandler() = default; + + /** Timeout (in seconds) after which on_timeout is called. */ + virtual double get_timeout() = 0; + /** Callback when getTimeout seconds have passed without a response from * the peer. + */ + virtual void on_timeout() = 0; + /** Callback when receiving a response from the peer. + * + * Note: this method is only called when the request has timed out. + */ + virtual void on_receive() = 0; +}; /** A client that connects to an MCP transport server. * @@ -73,7 +88,8 @@ class TransportClient { * received data, and the timestamps. */ virtual std::tuple, ProfileData> call( - char const * req_buf, std::size_t req_len) const = 0; + char const * req_buf, std::size_t req_len, + TimeoutHandler* timeout_handler=nullptr) const = 0; /** Closes this client. * diff --git a/libmuscle/cpp/src/libmuscle/mmp_client.cpp b/libmuscle/cpp/src/libmuscle/mmp_client.cpp index ecacc2c2..e5d528e8 100644 --- a/libmuscle/cpp/src/libmuscle/mmp_client.cpp +++ b/libmuscle/cpp/src/libmuscle/mmp_client.cpp @@ -323,6 +323,30 @@ void MMPClient::deregister_instance() { } } +void MMPClient::waiting_for_receive( + std::string const & peer_instance_id, std::string const & port_name, + Optional slot) +{ + auto request = Data::list( + static_cast(RequestType::waiting_for_receive), + static_cast(instance_id_), + peer_instance_id, port_name, encode_optional(slot)); + + auto response = call_manager_(request); +} + +void MMPClient::waiting_for_receive_done( + std::string const & peer_instance_id, std::string const & port_name, + Optional slot) +{ + auto request = Data::list( + static_cast(RequestType::waiting_for_receive_done), + static_cast(instance_id_), + peer_instance_id, port_name, encode_optional(slot)); + + auto response = call_manager_(request); +} + DataConstRef MMPClient::call_manager_(DataConstRef const & request) { std::lock_guard lock(mutex_); diff --git a/libmuscle/cpp/src/libmuscle/mmp_client.hpp b/libmuscle/cpp/src/libmuscle/mmp_client.hpp index c082809b..1de227b4 100644 --- a/libmuscle/cpp/src/libmuscle/mmp_client.hpp +++ b/libmuscle/cpp/src/libmuscle/mmp_client.hpp @@ -123,6 +123,16 @@ class MMPClient { void deregister_instance(); + /** Notify the manager that we're waiting to receive a message. */ + void waiting_for_receive( + std::string const & peer_instance_id, std::string const & port_name, + Optional slot); + + /** Notify the manager that we're done waiting to receive a message. */ + void waiting_for_receive_done( + std::string const & peer_instance_id, std::string const & port_name, + Optional slot); + private: ymmsl::Reference instance_id_; mcp::TcpTransportClient transport_client_; diff --git a/libmuscle/cpp/src/libmuscle/mpp_client.cpp b/libmuscle/cpp/src/libmuscle/mpp_client.cpp index feba677f..973fcf7c 100644 --- a/libmuscle/cpp/src/libmuscle/mpp_client.cpp +++ b/libmuscle/cpp/src/libmuscle/mpp_client.cpp @@ -26,7 +26,7 @@ MPPClient::MPPClient(std::vector const & locations) { } std::tuple, ProfileData> MPPClient::receive( - Reference const & receiver) + Reference const & receiver, mcp::TimeoutHandler *timeout_handler) { auto request = Data::list( static_cast(RequestType::get_next_message), @@ -37,7 +37,7 @@ std::tuple, ProfileData> MPPClient::receive( // can then overwrite after encoding with the length? msgpack::pack(sbuf, request); - return transport_client_->call(sbuf.data(), sbuf.size()); + return transport_client_->call(sbuf.data(), sbuf.size(), timeout_handler); } void MPPClient::close() { diff --git a/libmuscle/cpp/src/libmuscle/mpp_client.hpp b/libmuscle/cpp/src/libmuscle/mpp_client.hpp index 8cc07a5c..3b5c58cb 100644 --- a/libmuscle/cpp/src/libmuscle/mpp_client.hpp +++ b/libmuscle/cpp/src/libmuscle/mpp_client.hpp @@ -59,7 +59,8 @@ class MPPClient { * @return The received message. */ std::tuple, mcp::ProfileData> receive( - ::ymmsl::Reference const & receiver); + ::ymmsl::Reference const & receiver, + mcp::TimeoutHandler *timeout_handler=nullptr); /** Closes this client. * diff --git a/libmuscle/cpp/src/libmuscle/receive_timeout_handler.cpp b/libmuscle/cpp/src/libmuscle/receive_timeout_handler.cpp new file mode 100644 index 00000000..87ba2ea5 --- /dev/null +++ b/libmuscle/cpp/src/libmuscle/receive_timeout_handler.cpp @@ -0,0 +1,29 @@ +#include "receive_timeout_handler.hpp" + +namespace libmuscle { namespace _MUSCLE_IMPL_NS { + +ReceiveTimeoutHandler::ReceiveTimeoutHandler( + MMPClient& manager, std::string const& peer_instance, + std::string const& port_name, Optional slot, double timeout) + : manager_(manager) + , peer_instance_(peer_instance) + , port_name_(port_name) + , slot_(slot) + , timeout_(timeout) {} + +double ReceiveTimeoutHandler::get_timeout() +{ + return timeout_; +} + +void ReceiveTimeoutHandler::on_timeout() +{ + manager_.waiting_for_receive(peer_instance_, port_name_, slot_); +} + +void ReceiveTimeoutHandler::on_receive() +{ + manager_.waiting_for_receive_done(peer_instance_, port_name_, slot_); +} + +} } diff --git a/libmuscle/cpp/src/libmuscle/receive_timeout_handler.hpp b/libmuscle/cpp/src/libmuscle/receive_timeout_handler.hpp new file mode 100644 index 00000000..294d6127 --- /dev/null +++ b/libmuscle/cpp/src/libmuscle/receive_timeout_handler.hpp @@ -0,0 +1,40 @@ +#pragma once + +#include +#include + +namespace libmuscle { namespace _MUSCLE_IMPL_NS { + +/** Timeout handler when receiving messages from peers. + * + * This handler sends a message to the Muscle Manager when the receive times out (and + * another message when the message does arrive). + * + * This is used by the manager to detect if the simulation is in a deadlock, where a + * cycle of instances is waiting on each other. + */ +class ReceiveTimeoutHandler : public mcp::TimeoutHandler { + public: + ReceiveTimeoutHandler( + MMPClient & manager, + std::string const & peer_instance, + std::string const & port_name, + Optional slot, + double timeout); + + virtual ~ReceiveTimeoutHandler() = default; + + double get_timeout() override; + void on_timeout() override; + void on_receive() override; + + private: + MMPClient & manager_; + std::string const & peer_instance_; + std::string const & port_name_; + Optional slot_; + double timeout_; + +}; + +} } diff --git a/libmuscle/cpp/src/libmuscle/tests/mocks/mock_communicator.hpp b/libmuscle/cpp/src/libmuscle/tests/mocks/mock_communicator.hpp index acfc24c9..55eb654e 100644 --- a/libmuscle/cpp/src/libmuscle/tests/mocks/mock_communicator.hpp +++ b/libmuscle/cpp/src/libmuscle/tests/mocks/mock_communicator.hpp @@ -102,6 +102,8 @@ class MockCommunicator : public MockClass { NAME_MOCK_MEM_FUN(MockCommunicator, send_message); NAME_MOCK_MEM_FUN(MockCommunicator, receive_message); NAME_MOCK_MEM_FUN(MockCommunicator, shutdown); + NAME_MOCK_MEM_FUN(MockCommunicator, set_receive_timeout); + NAME_MOCK_MEM_FUN(MockCommunicator, get_receive_timeout); } MockCommunicator() { @@ -112,15 +114,16 @@ class MockCommunicator : public MockClass { ymmsl::Reference const & kernel, std::vector const & index, PortManager & port_manager, - Logger & logger, Profiler & profiler) + Logger & logger, Profiler & profiler, + MMPClient & manager) { init_from_return_value(); - constructor(kernel, index, port_manager, logger, profiler); + constructor(kernel, index, port_manager, logger, profiler, manager); } MockFun< Void, Val, Val const &>, - Obj, Obj, Obj> + Obj, Obj, Obj, Obj> constructor; MockFun>> get_locations; @@ -136,6 +139,10 @@ class MockCommunicator : public MockClass { ::mock_communicator::CommunicatorReceiveMessageMock receive_message; MockFun shutdown; + + MockFun> set_receive_timeout; + + MockFun> get_receive_timeout; }; using Communicator = MockCommunicator; diff --git a/libmuscle/cpp/src/libmuscle/tests/mocks/mock_mmp_client.hpp b/libmuscle/cpp/src/libmuscle/tests/mocks/mock_mmp_client.hpp index c520b696..26418040 100644 --- a/libmuscle/cpp/src/libmuscle/tests/mocks/mock_mmp_client.hpp +++ b/libmuscle/cpp/src/libmuscle/tests/mocks/mock_mmp_client.hpp @@ -32,6 +32,8 @@ class MockMMPClient : public MockClass { NAME_MOCK_MEM_FUN(MockMMPClient, register_instance); NAME_MOCK_MEM_FUN(MockMMPClient, request_peers); NAME_MOCK_MEM_FUN(MockMMPClient, deregister_instance); + NAME_MOCK_MEM_FUN(MockMMPClient, waiting_for_receive); + NAME_MOCK_MEM_FUN(MockMMPClient, waiting_for_receive_done); // Create some empty return objects for return values with a complex // structure, to make it easier to set them in the tests or fixtures. @@ -95,6 +97,14 @@ class MockMMPClient : public MockClass { >>> request_peers; MockFun deregister_instance; + + MockFun, Val, Val> + > waiting_for_receive; + + MockFun, Val, Val> + > waiting_for_receive_done; }; using MMPClient = MockMMPClient; diff --git a/libmuscle/cpp/src/libmuscle/tests/mocks/mock_mpp_client.hpp b/libmuscle/cpp/src/libmuscle/tests/mocks/mock_mpp_client.hpp index 12a3daa1..6b6ec7f3 100644 --- a/libmuscle/cpp/src/libmuscle/tests/mocks/mock_mpp_client.hpp +++ b/libmuscle/cpp/src/libmuscle/tests/mocks/mock_mpp_client.hpp @@ -31,10 +31,19 @@ class MockMPPClient : public MockClass { MockFun const &>> constructor; - MockFun< + // Use option 1 of the "Functions with default arguments" section + // (from mock_support.hpp:192) + using BaseMockFun = MockFun< Val, mcp::ProfileData>>, - Val<::ymmsl::Reference const &> - > receive; + Val<::ymmsl::Reference const &>, Obj>; + struct MockOverloadedFun : BaseMockFun { + std::tuple, mcp::ProfileData> operator()( + ::ymmsl::Reference const & receiver, + mcp::TimeoutHandler *timeout_handler=nullptr) { + return BaseMockFun::operator()(receiver, timeout_handler); + } + } receive; + MockFun close; }; diff --git a/libmuscle/cpp/src/libmuscle/tests/test_communicator.cpp b/libmuscle/cpp/src/libmuscle/tests/test_communicator.cpp index ccd0db88..012916a7 100644 --- a/libmuscle/cpp/src/libmuscle/tests/test_communicator.cpp +++ b/libmuscle/cpp/src/libmuscle/tests/test_communicator.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include // Test code dependencies @@ -62,6 +63,7 @@ using libmuscle::_MUSCLE_IMPL_NS::PeerInfo; using libmuscle::_MUSCLE_IMPL_NS::PeerLocations; using libmuscle::_MUSCLE_IMPL_NS::PortsDescription; using libmuscle::_MUSCLE_IMPL_NS::mcp::ProfileData; +using libmuscle::_MUSCLE_IMPL_NS::mcp::TimeoutHandler; using libmuscle::_MUSCLE_IMPL_NS::ProfileTimestamp; using ymmsl::Conduit; @@ -79,11 +81,12 @@ struct libmuscle_communicator MockLogger logger_; MockProfiler profiler_; MockPortManager port_manager_; + MockMMPClient manager_; Communicator communicator_; libmuscle_communicator() - : communicator_("component", {}, connected_port_manager_, logger_, profiler_) + : communicator_("component", {}, connected_port_manager_, logger_, profiler_, manager_) { port_manager_.settings_in_connected.return_value = false; } @@ -120,6 +123,9 @@ struct libmuscle_communicator2 : libmuscle_communicator { PeerInfo peer_info("component", {}, conduits, peer_dims, peer_locations); connected_communicator_.set_peer_info(peer_info); + // disable receive timeouts for these tests, so we can check call + // signatures: mpp_client->receive.called_with(..., nullptr) + connected_communicator_.set_receive_timeout(-1.0); } }; @@ -169,7 +175,7 @@ TEST_F(libmuscle_communicator2, receive_message) { double saved_until = std::get<1>(recv_msg_saved_until); auto & mpp_client = connected_communicator_.clients_.at("peer"); - ASSERT_TRUE(mpp_client->receive.called_with("component.in")); + ASSERT_TRUE(mpp_client->receive.called_with("component.in", nullptr)); ASSERT_EQ(recv_msg.timestamp(), 2.0); ASSERT_EQ(recv_msg.next_timestamp(), 3.0); @@ -192,7 +198,7 @@ TEST_F(libmuscle_communicator2, receive_message_vector) { double saved_until = std::get<1>(recv_msg_saved_until); auto mpp_client = connected_communicator_.clients_.at("peer2[5]").get(); - ASSERT_TRUE(mpp_client->receive.called_with("component.in_v[5]")); + ASSERT_TRUE(mpp_client->receive.called_with("component.in_v[5]", nullptr)); ASSERT_EQ(recv_msg.timestamp(), 4.0); ASSERT_EQ(recv_msg.next_timestamp(), 6.0); @@ -292,7 +298,7 @@ TEST_F(libmuscle_communicator2, port_discard_success_on_resume) { int count = 0; - MockMPPClient::return_value.receive.side_effect = [&](Reference const &) { + MockMPPClient::return_value.receive.side_effect = [&](Reference const &, TimeoutHandler *) { return std::make_tuple( side_effect.at(count++).encoded(), ProfileData()); }; @@ -355,7 +361,7 @@ TEST_F(libmuscle_communicator2, test_shutdown) { } } - MockMPPClient::return_value.receive.side_effect = [&](Reference const & receiver) { + MockMPPClient::return_value.receive.side_effect = [&](Reference const & receiver, TimeoutHandler*) { return std::make_tuple(messages.at(receiver)->encoded(), ProfileData()); }; diff --git a/libmuscle/cpp/src/libmuscle/tests/test_instance.cpp b/libmuscle/cpp/src/libmuscle/tests/test_instance.cpp index 7ff08ef0..6f8e7aab 100644 --- a/libmuscle/cpp/src/libmuscle/tests/test_instance.cpp +++ b/libmuscle/cpp/src/libmuscle/tests/test_instance.cpp @@ -179,6 +179,7 @@ struct libmuscle_instance_base : ::testing::Test, ConnectedPortManagerFixture { auto & mock_comm = MockCommunicator::return_value; mock_comm.get_locations.return_value = std::vector( {"tcp:test1,test2", "tcp:test3"}); + mock_comm.get_receive_timeout.return_value = 10.0; auto & mock_port_manager = MockPortManager::return_value; mock_port_manager.settings_in_connected.return_value = false; diff --git a/libmuscle/cpp/src/libmuscle/tests/test_snapshot_manager.cpp b/libmuscle/cpp/src/libmuscle/tests/test_snapshot_manager.cpp index 7ad4218a..3c17736e 100644 --- a/libmuscle/cpp/src/libmuscle/tests/test_snapshot_manager.cpp +++ b/libmuscle/cpp/src/libmuscle/tests/test_snapshot_manager.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include