Skip to content

Commit

Permalink
Added Caliper calls to monitor I/O for all backends (CSV, HDF5 and RMQ)
Browse files Browse the repository at this point in the history
Signed-off-by: Loic Pottier <[email protected]>
  • Loading branch information
lpottier committed Feb 27, 2025
1 parent 96933e8 commit 85f4b96
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 3 deletions.
11 changes: 8 additions & 3 deletions src/AMSlib/wf/basedb.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@
#ifndef __AMS_BASE_DB__
#define __AMS_BASE_DB__


#include <H5Ipublic.h>

#include <cstdint>
#include <experimental/filesystem>
#include <fstream>
Expand Down Expand Up @@ -38,6 +35,7 @@ namespace fs = std::experimental::filesystem;
#endif

#ifdef __ENABLE_HDF5__
#include <H5Ipublic.h>
#include <hdf5.h>
#define HDF5_ERROR(Eid) \
if (Eid < 0) { \
Expand All @@ -47,6 +45,9 @@ namespace fs = std::experimental::filesystem;
}
#endif

#ifdef __AMS_ENABLE_CALIPER__
#include <caliper/cali_macros.h>
#endif

#ifdef __ENABLE_RMQ__
#include <amqpcpp.h>
Expand Down Expand Up @@ -229,6 +230,7 @@ class csvDB final : public FileDB
inputs.size(),
outputs.size())

CALIPER(CALI_MARK_BEGIN("STORE_CSV");)
const size_t num_in = inputs.size();
const size_t num_out = outputs.size();

Expand All @@ -251,6 +253,7 @@ class csvDB final : public FileDB
}
fd << outputs[num_out - 1][i] << "\n";
}
CALIPER(CALI_MARK_END("STORE_CSV");)
}


Expand Down Expand Up @@ -1660,6 +1663,7 @@ class RMQInterface
inputs.size(),
outputs.size())

CALIPER(CALI_MARK_BEGIN("STORE_RMQ");)
AMSMessage msg(_msg_tag, _rId, domain_name, num_elements, inputs, outputs);

if (!_publisher->connectionValid()) {
Expand All @@ -1676,6 +1680,7 @@ class RMQInterface
}
_publisher->publish(std::move(msg));
_msg_tag++;
CALIPER(CALI_MARK_END("STORE_RMQ");)
}

/**
Expand Down
2 changes: 2 additions & 0 deletions src/AMSlib/wf/hdf5db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ void hdf5DB::_store(size_t num_elements,
std::vector<TypeValue*>& outputs,
bool* predicate)
{
CALIPER(CALI_MARK_BEGIN("STORE_HDF5");)
if (isDouble<TypeValue>::default_value())
HDType = H5T_NATIVE_DOUBLE;
else
Expand Down Expand Up @@ -180,6 +181,7 @@ void hdf5DB::_store(size_t num_elements,
}

totalElements += num_elements;
CALIPER(CALI_MARK_END("STORE_HDF5");)
}


Expand Down
4 changes: 4 additions & 0 deletions src/AMSlib/wf/rmqdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,7 @@ unsigned RMQPublisherHandler::unacknowledged() const

void RMQPublisherHandler::publish(AMSMessage&& msg)
{
CALIPER(CALI_MARK_BEGIN("RMQ_PUBLISH");)
{
const std::lock_guard<std::mutex> lock(_mutex);
_messages.push_back(msg);
Expand Down Expand Up @@ -724,6 +725,7 @@ void RMQPublisherHandler::publish(AMSMessage&& msg)
msg.id())
}
_nb_msg++;
CALIPER(CALI_MARK_END("RMQ_PUBLISH");)
}

void RMQPublisherHandler::onReady(AMQP::TcpConnection* connection)
Expand Down Expand Up @@ -982,6 +984,7 @@ bool RMQInterface::connect(std::string rmq_name,

void RMQInterface::restartPublisher()
{
CALIPER(CALI_MARK_BEGIN("RMQ_RESTART_PUBLISHER");)
std::vector<AMSMessage> messages = _publisher->getMsgBuffer();

AMSMessage& msg_min =
Expand All @@ -1008,6 +1011,7 @@ void RMQInterface::restartPublisher()
_rId, *_address, _cacert, _queue_sender, std::move(messages));
_publisher_thread = std::thread([&]() { _publisher->start(); });
connected = true;
CALIPER(CALI_MARK_END("RMQ_RESTART_PUBLISHER");)
}

void RMQInterface::close()
Expand Down

0 comments on commit 85f4b96

Please sign in to comment.