Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/systemwide mem limit #11

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions rosbag2_snapshot/include/rosbag2_snapshot/snapshotter.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2018-2021, Open Source Robotics Foundation, Inc., GAIA Platform, Inc., All rights reserved. // NOLINT
// Copyright (c) 2018-2022, Open Source Robotics Foundation, Inc., GAIA Platform, Inc., UPower Robotics USA, All rights reserved. // NOLINT
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
Expand Down Expand Up @@ -109,6 +109,7 @@ struct SnapshotterTopicOptions
rclcpp::Duration duration_limit_;
// Maximum memory usage of the buffer before older messages are removed
int32_t memory_limit_;
u_int64_t system_wide_memory_limit_;

SnapshotterTopicOptions(
rclcpp::Duration duration_limit = INHERIT_DURATION_LIMIT,
Expand All @@ -124,6 +125,8 @@ struct SnapshotterOptions
rclcpp::Duration default_duration_limit_;
// Memory limit to use for a topic's buffer if one is not specified
int32_t default_memory_limit_;
// System-wide memory limit
u_int64_t system_wide_memory_limit_;
// Flag if all topics should be recorded
bool all_topics_;

Expand All @@ -133,7 +136,8 @@ struct SnapshotterOptions

SnapshotterOptions(
rclcpp::Duration default_duration_limit = rclcpp::Duration(30s),
int32_t default_memory_limit = -1);
int32_t default_memory_limit = SnapshotterTopicOptions::NO_MEMORY_LIMIT,
u_int64_t system_wide_memory_limit = SnapshotterTopicOptions::NO_MEMORY_LIMIT);

// Add a new topic to the configuration, returns false if the topic was already present
bool addTopic(
Expand All @@ -155,13 +159,38 @@ struct SnapshotMessage
rclcpp::Time time;
};

class MessageQueue;

class MessageQueueCollectionManager
{
public:
MessageQueueCollectionManager(const SnapshotterTopicOptions & options, const rclcpp::Logger & logger);
void report_queue_creation(MessageQueue& queue);
void report_queue_size_change(u_int64_t old_size, u_int64_t new_size);
void report_queue_size_change(int64_t delta_size);
void report_queue_size_change();
void report_queue_destruction(MessageQueue& queue);
u_int64_t get_total_queue_collection_size();
static MessageQueueCollectionManager & Instance(const MessageQueue & msg_queue);
void free_oldest_messages(size_t free_bytes_required);

protected:
std::vector<MessageQueue *> p_queue_;
u_int64_t size_, size_limit_;
static std::shared_ptr<MessageQueueCollectionManager> instance_;
const SnapshotterTopicOptions & options_;
const rclcpp::Logger & logger_;
std::mutex lock_;
};

/* Stores a queue of buffered messages for a single topic ensuring
* that the duration and memory limits are respected by truncating
* as needed on push() operations.
*/
class MessageQueue
{
friend Snapshotter;
friend MessageQueueCollectionManager;

private:
// Logger for outputting ROS logging messages
Expand All @@ -179,6 +208,7 @@ class MessageQueue

public:
explicit MessageQueue(const SnapshotterTopicOptions & options, const rclcpp::Logger & logger);
~MessageQueue();
// Add a new message to the internal queue if possible, truncating the front
// of the queue as needed to enforce limits
void push(const SnapshotMessage & msg);
Expand All @@ -198,6 +228,9 @@ class MessageQueue
// Return the total message size including the meta-information
int64_t getMessageSize(SnapshotMessage const & msg) const;

protected:
rclcpp::Time get_oldest_message_time();

private:
// Internal push whitch does not obtain lock
void _push(SnapshotMessage const & msg);
Expand Down
5 changes: 4 additions & 1 deletion rosbag2_snapshot/src/snapshot.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2018-2021, Open Source Robotics Foundation, Inc., GAIA Platform, Inc., All rights reserved. // NOLINT
// Copyright (c) 2018-2022, Open Source Robotics Foundation, Inc., GAIA Platform, Inc., UPower Robotics USA, All rights reserved. // NOLINT
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
Expand Down Expand Up @@ -150,6 +150,9 @@ void appendParamOptions(ros::NodeHandle & nh, SnapshotterOptions & opts)
if (nh.getParam("default_memory_limit", tmp)) {
opts.default_memory_limit_ = static_cast<int>(MB_TO_BYTES * tmp);
}
if (nh.getParam("system_wide_memory_limit", tmp)) {
opts.system_wide_memory_limit_ = static_cast<int>(MB_TO_BYTES * tmp);
}
if (nh.getParam("default_duration_limit", tmp)) {
opts.default_duration_limit_ = ros::Duration(tmp);
}
Expand Down
200 changes: 188 additions & 12 deletions rosbag2_snapshot/src/snapshotter.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2018-2021, Open Source Robotics Foundation, Inc., GAIA Platform, Inc., All rights reserved. // NOLINT
// Copyright (c) 2018-2022, Open Source Robotics Foundation, Inc., GAIA Platform, Inc., UPower Robotics USA, All rights reserved. // NOLINT
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
Expand Down Expand Up @@ -30,7 +30,13 @@
#include <rclcpp/rclcpp.hpp>
#include <rosbag2_snapshot/snapshotter.hpp>

#if __has_include(<experimental/filesystem>)
#include <experimental/filesystem>
namespace fs = std::experimental::filesystem;
#else
#include <filesystem>
namespace fs = std::filesystem;
#endif

#include <cassert>
#include <chrono>
Expand Down Expand Up @@ -73,9 +79,11 @@ SnapshotterTopicOptions::SnapshotterTopicOptions(

SnapshotterOptions::SnapshotterOptions(
rclcpp::Duration default_duration_limit,
int32_t default_memory_limit)
int32_t default_memory_limit,
u_int64_t system_wide_memory_limit)
: default_duration_limit_(default_duration_limit),
default_memory_limit_(default_memory_limit),
system_wide_memory_limit_(system_wide_memory_limit),
topics_()
{
}
Expand All @@ -102,9 +110,159 @@ SnapshotMessage::SnapshotMessage(
{
}

std::shared_ptr<MessageQueueCollectionManager>
MessageQueueCollectionManager::instance_ = nullptr;

MessageQueueCollectionManager::MessageQueueCollectionManager(
const SnapshotterTopicOptions & options, const rclcpp::Logger & logger
)
: options_(options), logger_(logger), p_queue_(), lock_()
{
uint64_t mb_limit = options.system_wide_memory_limit_;

this->size_ = 0;

if(mb_limit > SnapshotterTopicOptions::NO_MEMORY_LIMIT)
this->size_limit_ = mb_limit * MB_TO_B;
else
this->size_limit_ = SnapshotterTopicOptions::NO_MEMORY_LIMIT;
}

void MessageQueueCollectionManager::report_queue_creation(MessageQueue& queue){
std::lock_guard<std::mutex> l(this->lock_);

this->p_queue_.push_back(&queue);
this->size_ += queue.size_;
}

void MessageQueueCollectionManager::report_queue_size_change(){
std::lock_guard<std::mutex> l(this->lock_);

int64_t new_total_size = 0;
auto it = this->p_queue_.begin();

while(it != this->p_queue_.end())
{
auto & p_current_queue = * it;
new_total_size += p_current_queue->size_;
it++;
}

this->size_ = new_total_size;
}

void MessageQueueCollectionManager::report_queue_size_change(int64_t delta_size){
std::lock_guard<std::mutex> l(this->lock_);

this->size_ += delta_size;
}

void MessageQueueCollectionManager::report_queue_size_change(u_int64_t old_size, u_int64_t new_size){
this->report_queue_size_change(new_size - old_size);
}

void MessageQueueCollectionManager::report_queue_destruction(MessageQueue& queue)
{
std::lock_guard<std::mutex> l(this->lock_);

auto it = this->p_queue_.begin();

while(it != this->p_queue_.end())
{
auto p_current_queue = *it;
if(&queue == p_current_queue)
{
this->size_ -= queue.size_;
this->p_queue_.erase(it);
return;
}

it++;
}
}

u_int64_t MessageQueueCollectionManager::get_total_queue_collection_size(){
return this->size_;
}

MessageQueueCollectionManager & MessageQueueCollectionManager::Instance(const MessageQueue & msg_queue){
if(MessageQueueCollectionManager::instance_ == nullptr){
auto logger = msg_queue.logger_;
auto options = msg_queue.options_;
MessageQueueCollectionManager::instance_ = std::make_shared<MessageQueueCollectionManager>(options, logger);
}

auto the_instance = MessageQueueCollectionManager::instance_.get();

return *the_instance;
}

void MessageQueueCollectionManager::free_oldest_messages(size_t free_bytes_required)
{
rclcpp::Time oldest_message_t;
MessageQueue * p_oldest_message_q;

if(this->size_limit_ == SnapshotterTopicOptions::NO_MEMORY_LIMIT){
return;
}

std::lock_guard<std::mutex> l(this->lock_);

while(this->size_limit_ - this->size_ < free_bytes_required){
p_oldest_message_q = NULL;

auto it = this->p_queue_.begin();

while(it != this->p_queue_.end())
{
auto p_current_queue = * it;
rclcpp::Time q_msg_t;

try{
q_msg_t = p_current_queue->get_oldest_message_time();
}
catch(std::exception e){
// the queue must be empty
continue;
}

if(p_oldest_message_q == NULL){
// first queue, nothing to compare with yet
p_oldest_message_q = p_current_queue;
oldest_message_t = q_msg_t;
}
else if(q_msg_t < oldest_message_t){
// this queue has a message older than the
// previous oldest message
p_oldest_message_q = p_current_queue;
oldest_message_t = q_msg_t;
}

it++;
}

if(p_oldest_message_q == NULL){
// There were no queues with any messages in storage
break;
}

p_oldest_message_q->pop();
}
}

MessageQueue::MessageQueue(const SnapshotterTopicOptions & options, const rclcpp::Logger & logger)
: options_(options), logger_(logger), size_(0)
{
MessageQueueCollectionManager::Instance(*this).report_queue_creation(*this);
}

MessageQueue::~MessageQueue(){
MessageQueueCollectionManager::Instance(*this).report_queue_destruction(*this);
}

rclcpp::Time MessageQueue::get_oldest_message_time()
{
return this->queue_.at(0).time;
}

void MessageQueue::setSubscriber(shared_ptr<rclcpp::GenericSubscription> sub)
Expand All @@ -121,6 +279,7 @@ void MessageQueue::clear()
void MessageQueue::_clear()
{
queue_.clear();
MessageQueueCollectionManager::Instance(*this).report_queue_size_change(size_, 0);
size_ = 0;
}

Expand All @@ -141,16 +300,23 @@ bool MessageQueue::preparePush(int32_t size, rclcpp::Time const & time)
_clear();
}

// The only case where message cannot be addded is if size is greater than limit
if (options_.memory_limit_ > SnapshotterTopicOptions::NO_MEMORY_LIMIT &&
size > options_.memory_limit_)
{
return false;
if(options_.system_wide_memory_limit_ > SnapshotterTopicOptions::NO_MEMORY_LIMIT){
if(size > options_.system_wide_memory_limit_){
return false;
}

MessageQueueCollectionManager::Instance(*this).free_oldest_messages(size);
}

// If memory limit is enforced, remove elements from front of queue until limit
// would be met once message is added
// The only case where message cannot be addded is if size is greater than limit
if (options_.memory_limit_ > SnapshotterTopicOptions::NO_MEMORY_LIMIT) {
if (size > options_.memory_limit_)
{
return false;
}

// If memory limit is enforced, remove elements from front of queue until limit
// would be met once message is added
while (queue_.size() != 0 && size_ + size > options_.memory_limit_) {
_pop();
}
Expand Down Expand Up @@ -205,6 +371,7 @@ void MessageQueue::_push(SnapshotMessage const & _out)
}
queue_.push_back(_out);
// Add size of new message to running count to maintain correctness
MessageQueueCollectionManager::Instance(*this).report_queue_size_change(getMessageSize(_out));
size_ += getMessageSize(_out);
}

Expand All @@ -213,6 +380,7 @@ SnapshotMessage MessageQueue::_pop()
SnapshotMessage tmp = queue_.front();
queue_.pop_front();
// Remove size of popped message to maintain correctness of size_
MessageQueueCollectionManager::Instance(*this).report_queue_size_change(0 - getMessageSize(tmp));
size_ -= getMessageSize(tmp);
return tmp;
}
Expand Down Expand Up @@ -304,6 +472,14 @@ void Snapshotter::parseOptionsFromParams()
throw ex;
}

try {
options_.system_wide_memory_limit_ =
declare_parameter<double>("system_wide_memory_limit", -1.0);
} catch (const rclcpp::ParameterTypeException & ex) {
RCLCPP_ERROR(get_logger(), "system_wide_memory_limit is of incorrect type.");
throw ex;
}

// Convert memory limit in MB to B
if (options_.default_memory_limit_ != -1.0) {
options_.default_memory_limit_ *= MB_TO_B;
Expand Down Expand Up @@ -725,7 +901,7 @@ SnapshotterClient::SnapshotterClient(const rclcpp::NodeOptions & options)
}

try {
opts.filename_ = declare_parameter<std::string>("filename");
opts.filename_ = declare_parameter<std::string>(std::string("filename"));
} catch (const rclcpp::ParameterTypeException & ex) {
if (opts.action_ == SnapshotterClientOptions::TRIGGER_WRITE &&
std::string{ex.what()}.find("not set") == std::string::npos)
Expand All @@ -736,7 +912,7 @@ SnapshotterClient::SnapshotterClient(const rclcpp::NodeOptions & options)
}

try {
opts.prefix_ = declare_parameter<std::string>("prefix");
opts.prefix_ = declare_parameter<std::string>(std::string("prefix"));
} catch (const rclcpp::ParameterTypeException & ex) {
if (opts.action_ == SnapshotterClientOptions::TRIGGER_WRITE &&
std::string{ex.what()}.find("not set") == std::string::npos)
Expand Down Expand Up @@ -791,7 +967,7 @@ void SnapshotterClient::setSnapshotterClientOptions(const SnapshotterClientOptio
if (req->filename.empty()) {
req->filename = "./";
}
std::filesystem::path p(std::filesystem::absolute(req->filename));
fs::path p(fs::absolute(req->filename));
req->filename = p.string();

auto result_future = client->async_send_request(req);
Expand Down