diff --git a/rosbag2_snapshot/include/rosbag2_snapshot/snapshotter.hpp b/rosbag2_snapshot/include/rosbag2_snapshot/snapshotter.hpp index 1a587be..da6f6bb 100644 --- a/rosbag2_snapshot/include/rosbag2_snapshot/snapshotter.hpp +++ b/rosbag2_snapshot/include/rosbag2_snapshot/snapshotter.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2018-2021, Open Source Robotics Foundation, Inc., GAIA Platform, Inc., All rights reserved. // NOLINT +// Copyright (c) 2018-2022, Open Source Robotics Foundation, Inc., GAIA Platform, Inc., UPower Robotics USA, All rights reserved. // NOLINT // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are met: @@ -109,6 +109,7 @@ struct SnapshotterTopicOptions rclcpp::Duration duration_limit_; // Maximum memory usage of the buffer before older messages are removed int32_t memory_limit_; + u_int64_t system_wide_memory_limit_; SnapshotterTopicOptions( rclcpp::Duration duration_limit = INHERIT_DURATION_LIMIT, @@ -124,6 +125,8 @@ struct SnapshotterOptions rclcpp::Duration default_duration_limit_; // Memory limit to use for a topic's buffer if one is not specified int32_t default_memory_limit_; + // System-wide memory limit + u_int64_t system_wide_memory_limit_; // Flag if all topics should be recorded bool all_topics_; @@ -133,7 +136,8 @@ struct SnapshotterOptions SnapshotterOptions( rclcpp::Duration default_duration_limit = rclcpp::Duration(30s), - int32_t default_memory_limit = -1); + int32_t default_memory_limit = SnapshotterTopicOptions::NO_MEMORY_LIMIT, + u_int64_t system_wide_memory_limit = SnapshotterTopicOptions::NO_MEMORY_LIMIT); // Add a new topic to the configuration, returns false if the topic was already present bool addTopic( @@ -155,6 +159,30 @@ struct SnapshotMessage rclcpp::Time time; }; +class MessageQueue; + +class MessageQueueCollectionManager +{ +public: + MessageQueueCollectionManager(const SnapshotterTopicOptions & options, const rclcpp::Logger & logger); + void report_queue_creation(MessageQueue& queue); + void report_queue_size_change(u_int64_t old_size, u_int64_t new_size); + void report_queue_size_change(int64_t delta_size); + void report_queue_size_change(); + void report_queue_destruction(MessageQueue& queue); + u_int64_t get_total_queue_collection_size(); + static MessageQueueCollectionManager & Instance(const MessageQueue & msg_queue); + void free_oldest_messages(size_t free_bytes_required); + +protected: + std::vector p_queue_; + u_int64_t size_, size_limit_; + static std::shared_ptr instance_; + const SnapshotterTopicOptions & options_; + const rclcpp::Logger & logger_; + std::mutex lock_; +}; + /* Stores a queue of buffered messages for a single topic ensuring * that the duration and memory limits are respected by truncating * as needed on push() operations. @@ -162,6 +190,7 @@ struct SnapshotMessage class MessageQueue { friend Snapshotter; + friend MessageQueueCollectionManager; private: // Logger for outputting ROS logging messages @@ -179,6 +208,7 @@ class MessageQueue public: explicit MessageQueue(const SnapshotterTopicOptions & options, const rclcpp::Logger & logger); + ~MessageQueue(); // Add a new message to the internal queue if possible, truncating the front // of the queue as needed to enforce limits void push(const SnapshotMessage & msg); @@ -198,6 +228,9 @@ class MessageQueue // Return the total message size including the meta-information int64_t getMessageSize(SnapshotMessage const & msg) const; +protected: + rclcpp::Time get_oldest_message_time(); + private: // Internal push whitch does not obtain lock void _push(SnapshotMessage const & msg); diff --git a/rosbag2_snapshot/src/snapshot.cpp b/rosbag2_snapshot/src/snapshot.cpp index fe910f2..7275ff8 100644 --- a/rosbag2_snapshot/src/snapshot.cpp +++ b/rosbag2_snapshot/src/snapshot.cpp @@ -1,4 +1,4 @@ -// Copyright (c) 2018-2021, Open Source Robotics Foundation, Inc., GAIA Platform, Inc., All rights reserved. // NOLINT +// Copyright (c) 2018-2022, Open Source Robotics Foundation, Inc., GAIA Platform, Inc., UPower Robotics USA, All rights reserved. // NOLINT // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are met: @@ -150,6 +150,9 @@ void appendParamOptions(ros::NodeHandle & nh, SnapshotterOptions & opts) if (nh.getParam("default_memory_limit", tmp)) { opts.default_memory_limit_ = static_cast(MB_TO_BYTES * tmp); } + if (nh.getParam("system_wide_memory_limit", tmp)) { + opts.system_wide_memory_limit_ = static_cast(MB_TO_BYTES * tmp); + } if (nh.getParam("default_duration_limit", tmp)) { opts.default_duration_limit_ = ros::Duration(tmp); } diff --git a/rosbag2_snapshot/src/snapshotter.cpp b/rosbag2_snapshot/src/snapshotter.cpp index 8d0681f..1b2037f 100644 --- a/rosbag2_snapshot/src/snapshotter.cpp +++ b/rosbag2_snapshot/src/snapshotter.cpp @@ -1,4 +1,4 @@ -// Copyright (c) 2018-2021, Open Source Robotics Foundation, Inc., GAIA Platform, Inc., All rights reserved. // NOLINT +// Copyright (c) 2018-2022, Open Source Robotics Foundation, Inc., GAIA Platform, Inc., UPower Robotics USA, All rights reserved. // NOLINT // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are met: @@ -30,7 +30,13 @@ #include #include +#if __has_include() +#include +namespace fs = std::experimental::filesystem; +#else #include +namespace fs = std::filesystem; +#endif #include #include @@ -73,9 +79,11 @@ SnapshotterTopicOptions::SnapshotterTopicOptions( SnapshotterOptions::SnapshotterOptions( rclcpp::Duration default_duration_limit, - int32_t default_memory_limit) + int32_t default_memory_limit, + u_int64_t system_wide_memory_limit) : default_duration_limit_(default_duration_limit), default_memory_limit_(default_memory_limit), + system_wide_memory_limit_(system_wide_memory_limit), topics_() { } @@ -102,9 +110,159 @@ SnapshotMessage::SnapshotMessage( { } +std::shared_ptr + MessageQueueCollectionManager::instance_ = nullptr; + +MessageQueueCollectionManager::MessageQueueCollectionManager( + const SnapshotterTopicOptions & options, const rclcpp::Logger & logger +) +: options_(options), logger_(logger), p_queue_(), lock_() +{ + uint64_t mb_limit = options.system_wide_memory_limit_; + + this->size_ = 0; + + if(mb_limit > SnapshotterTopicOptions::NO_MEMORY_LIMIT) + this->size_limit_ = mb_limit * MB_TO_B; + else + this->size_limit_ = SnapshotterTopicOptions::NO_MEMORY_LIMIT; +} + +void MessageQueueCollectionManager::report_queue_creation(MessageQueue& queue){ + std::lock_guard l(this->lock_); + + this->p_queue_.push_back(&queue); + this->size_ += queue.size_; +} + +void MessageQueueCollectionManager::report_queue_size_change(){ + std::lock_guard l(this->lock_); + + int64_t new_total_size = 0; + auto it = this->p_queue_.begin(); + + while(it != this->p_queue_.end()) + { + auto & p_current_queue = * it; + new_total_size += p_current_queue->size_; + it++; + } + + this->size_ = new_total_size; +} + +void MessageQueueCollectionManager::report_queue_size_change(int64_t delta_size){ + std::lock_guard l(this->lock_); + + this->size_ += delta_size; +} + +void MessageQueueCollectionManager::report_queue_size_change(u_int64_t old_size, u_int64_t new_size){ + this->report_queue_size_change(new_size - old_size); +} + +void MessageQueueCollectionManager::report_queue_destruction(MessageQueue& queue) +{ + std::lock_guard l(this->lock_); + + auto it = this->p_queue_.begin(); + + while(it != this->p_queue_.end()) + { + auto p_current_queue = *it; + if(&queue == p_current_queue) + { + this->size_ -= queue.size_; + this->p_queue_.erase(it); + return; + } + + it++; + } +} + +u_int64_t MessageQueueCollectionManager::get_total_queue_collection_size(){ + return this->size_; +} + +MessageQueueCollectionManager & MessageQueueCollectionManager::Instance(const MessageQueue & msg_queue){ + if(MessageQueueCollectionManager::instance_ == nullptr){ + auto logger = msg_queue.logger_; + auto options = msg_queue.options_; + MessageQueueCollectionManager::instance_ = std::make_shared(options, logger); + } + + auto the_instance = MessageQueueCollectionManager::instance_.get(); + + return *the_instance; +} + +void MessageQueueCollectionManager::free_oldest_messages(size_t free_bytes_required) +{ + rclcpp::Time oldest_message_t; + MessageQueue * p_oldest_message_q; + + if(this->size_limit_ == SnapshotterTopicOptions::NO_MEMORY_LIMIT){ + return; + } + + std::lock_guard l(this->lock_); + + while(this->size_limit_ - this->size_ < free_bytes_required){ + p_oldest_message_q = NULL; + + auto it = this->p_queue_.begin(); + + while(it != this->p_queue_.end()) + { + auto p_current_queue = * it; + rclcpp::Time q_msg_t; + + try{ + q_msg_t = p_current_queue->get_oldest_message_time(); + } + catch(std::exception e){ + // the queue must be empty + continue; + } + + if(p_oldest_message_q == NULL){ + // first queue, nothing to compare with yet + p_oldest_message_q = p_current_queue; + oldest_message_t = q_msg_t; + } + else if(q_msg_t < oldest_message_t){ + // this queue has a message older than the + // previous oldest message + p_oldest_message_q = p_current_queue; + oldest_message_t = q_msg_t; + } + + it++; + } + + if(p_oldest_message_q == NULL){ + // There were no queues with any messages in storage + break; + } + + p_oldest_message_q->pop(); + } +} + MessageQueue::MessageQueue(const SnapshotterTopicOptions & options, const rclcpp::Logger & logger) : options_(options), logger_(logger), size_(0) { + MessageQueueCollectionManager::Instance(*this).report_queue_creation(*this); +} + +MessageQueue::~MessageQueue(){ + MessageQueueCollectionManager::Instance(*this).report_queue_destruction(*this); +} + +rclcpp::Time MessageQueue::get_oldest_message_time() +{ + return this->queue_.at(0).time; } void MessageQueue::setSubscriber(shared_ptr sub) @@ -121,6 +279,7 @@ void MessageQueue::clear() void MessageQueue::_clear() { queue_.clear(); + MessageQueueCollectionManager::Instance(*this).report_queue_size_change(size_, 0); size_ = 0; } @@ -141,16 +300,23 @@ bool MessageQueue::preparePush(int32_t size, rclcpp::Time const & time) _clear(); } - // The only case where message cannot be addded is if size is greater than limit - if (options_.memory_limit_ > SnapshotterTopicOptions::NO_MEMORY_LIMIT && - size > options_.memory_limit_) - { - return false; + if(options_.system_wide_memory_limit_ > SnapshotterTopicOptions::NO_MEMORY_LIMIT){ + if(size > options_.system_wide_memory_limit_){ + return false; + } + + MessageQueueCollectionManager::Instance(*this).free_oldest_messages(size); } - // If memory limit is enforced, remove elements from front of queue until limit - // would be met once message is added + // The only case where message cannot be addded is if size is greater than limit if (options_.memory_limit_ > SnapshotterTopicOptions::NO_MEMORY_LIMIT) { + if (size > options_.memory_limit_) + { + return false; + } + + // If memory limit is enforced, remove elements from front of queue until limit + // would be met once message is added while (queue_.size() != 0 && size_ + size > options_.memory_limit_) { _pop(); } @@ -205,6 +371,7 @@ void MessageQueue::_push(SnapshotMessage const & _out) } queue_.push_back(_out); // Add size of new message to running count to maintain correctness + MessageQueueCollectionManager::Instance(*this).report_queue_size_change(getMessageSize(_out)); size_ += getMessageSize(_out); } @@ -213,6 +380,7 @@ SnapshotMessage MessageQueue::_pop() SnapshotMessage tmp = queue_.front(); queue_.pop_front(); // Remove size of popped message to maintain correctness of size_ + MessageQueueCollectionManager::Instance(*this).report_queue_size_change(0 - getMessageSize(tmp)); size_ -= getMessageSize(tmp); return tmp; } @@ -304,6 +472,14 @@ void Snapshotter::parseOptionsFromParams() throw ex; } + try { + options_.system_wide_memory_limit_ = + declare_parameter("system_wide_memory_limit", -1.0); + } catch (const rclcpp::ParameterTypeException & ex) { + RCLCPP_ERROR(get_logger(), "system_wide_memory_limit is of incorrect type."); + throw ex; + } + // Convert memory limit in MB to B if (options_.default_memory_limit_ != -1.0) { options_.default_memory_limit_ *= MB_TO_B; @@ -725,7 +901,7 @@ SnapshotterClient::SnapshotterClient(const rclcpp::NodeOptions & options) } try { - opts.filename_ = declare_parameter("filename"); + opts.filename_ = declare_parameter(std::string("filename")); } catch (const rclcpp::ParameterTypeException & ex) { if (opts.action_ == SnapshotterClientOptions::TRIGGER_WRITE && std::string{ex.what()}.find("not set") == std::string::npos) @@ -736,7 +912,7 @@ SnapshotterClient::SnapshotterClient(const rclcpp::NodeOptions & options) } try { - opts.prefix_ = declare_parameter("prefix"); + opts.prefix_ = declare_parameter(std::string("prefix")); } catch (const rclcpp::ParameterTypeException & ex) { if (opts.action_ == SnapshotterClientOptions::TRIGGER_WRITE && std::string{ex.what()}.find("not set") == std::string::npos) @@ -791,7 +967,7 @@ void SnapshotterClient::setSnapshotterClientOptions(const SnapshotterClientOptio if (req->filename.empty()) { req->filename = "./"; } - std::filesystem::path p(std::filesystem::absolute(req->filename)); + fs::path p(fs::absolute(req->filename)); req->filename = p.string(); auto result_future = client->async_send_request(req);