From 85f4b96d7a2324e41033e617cdbb79ff66922765 Mon Sep 17 00:00:00 2001 From: Loic Pottier Date: Thu, 27 Feb 2025 13:32:25 -0800 Subject: [PATCH] Added Caliper calls to monitor I/O for all backends (CSV, HDF5 and RMQ) Signed-off-by: Loic Pottier --- src/AMSlib/wf/basedb.hpp | 11 ++++++++--- src/AMSlib/wf/hdf5db.cpp | 2 ++ src/AMSlib/wf/rmqdb.cpp | 4 ++++ 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/src/AMSlib/wf/basedb.hpp b/src/AMSlib/wf/basedb.hpp index 343cebe2..951e75d5 100644 --- a/src/AMSlib/wf/basedb.hpp +++ b/src/AMSlib/wf/basedb.hpp @@ -8,9 +8,6 @@ #ifndef __AMS_BASE_DB__ #define __AMS_BASE_DB__ - -#include - #include #include #include @@ -38,6 +35,7 @@ namespace fs = std::experimental::filesystem; #endif #ifdef __ENABLE_HDF5__ +#include #include #define HDF5_ERROR(Eid) \ if (Eid < 0) { \ @@ -47,6 +45,9 @@ namespace fs = std::experimental::filesystem; } #endif +#ifdef __AMS_ENABLE_CALIPER__ +#include +#endif #ifdef __ENABLE_RMQ__ #include @@ -229,6 +230,7 @@ class csvDB final : public FileDB inputs.size(), outputs.size()) + CALIPER(CALI_MARK_BEGIN("STORE_CSV");) const size_t num_in = inputs.size(); const size_t num_out = outputs.size(); @@ -251,6 +253,7 @@ class csvDB final : public FileDB } fd << outputs[num_out - 1][i] << "\n"; } + CALIPER(CALI_MARK_END("STORE_CSV");) } @@ -1660,6 +1663,7 @@ class RMQInterface inputs.size(), outputs.size()) + CALIPER(CALI_MARK_BEGIN("STORE_RMQ");) AMSMessage msg(_msg_tag, _rId, domain_name, num_elements, inputs, outputs); if (!_publisher->connectionValid()) { @@ -1676,6 +1680,7 @@ class RMQInterface } _publisher->publish(std::move(msg)); _msg_tag++; + CALIPER(CALI_MARK_END("STORE_RMQ");) } /** diff --git a/src/AMSlib/wf/hdf5db.cpp b/src/AMSlib/wf/hdf5db.cpp index 0d3692f1..f59e78b7 100644 --- a/src/AMSlib/wf/hdf5db.cpp +++ b/src/AMSlib/wf/hdf5db.cpp @@ -139,6 +139,7 @@ void hdf5DB::_store(size_t num_elements, std::vector& outputs, bool* predicate) { + CALIPER(CALI_MARK_BEGIN("STORE_HDF5");) if (isDouble::default_value()) HDType = H5T_NATIVE_DOUBLE; else @@ -180,6 +181,7 @@ void hdf5DB::_store(size_t num_elements, } totalElements += num_elements; + CALIPER(CALI_MARK_END("STORE_HDF5");) } diff --git a/src/AMSlib/wf/rmqdb.cpp b/src/AMSlib/wf/rmqdb.cpp index 8141d9a6..2fc2e819 100644 --- a/src/AMSlib/wf/rmqdb.cpp +++ b/src/AMSlib/wf/rmqdb.cpp @@ -668,6 +668,7 @@ unsigned RMQPublisherHandler::unacknowledged() const void RMQPublisherHandler::publish(AMSMessage&& msg) { + CALIPER(CALI_MARK_BEGIN("RMQ_PUBLISH");) { const std::lock_guard lock(_mutex); _messages.push_back(msg); @@ -724,6 +725,7 @@ void RMQPublisherHandler::publish(AMSMessage&& msg) msg.id()) } _nb_msg++; + CALIPER(CALI_MARK_END("RMQ_PUBLISH");) } void RMQPublisherHandler::onReady(AMQP::TcpConnection* connection) @@ -982,6 +984,7 @@ bool RMQInterface::connect(std::string rmq_name, void RMQInterface::restartPublisher() { + CALIPER(CALI_MARK_BEGIN("RMQ_RESTART_PUBLISHER");) std::vector messages = _publisher->getMsgBuffer(); AMSMessage& msg_min = @@ -1008,6 +1011,7 @@ void RMQInterface::restartPublisher() _rId, *_address, _cacert, _queue_sender, std::move(messages)); _publisher_thread = std::thread([&]() { _publisher->start(); }); connected = true; + CALIPER(CALI_MARK_END("RMQ_RESTART_PUBLISHER");) } void RMQInterface::close()