From 360005e4b8b9e6b0b031f9c1c1d3d7a8df8c5504 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Sun, 24 Sep 2023 21:42:36 -0700 Subject: [PATCH] Store serialized metadata in MCAP file (#1423) * Store serialized metadata in MCAP file when calling update_metadata() Signed-off-by: Michael Orlov * Remove ROS_DISTRO key-value from mcap::Metadata - ROS_DISTRO now stored inside serialized metadata Signed-off-by: Michael Orlov * Fix for test failure on Windows - Use `UnorderedElementsAreArray` for checking `metadata.topics_with_message_count`. - The list of the metadata.topics_with_message_count forming from the mcap_reader_->channels() which is unordered_map. The order of the elements in the unordered_map is unpredictable and can vary on different platforms. Signed-off-by: Michael Orlov --------- Signed-off-by: Michael Orlov (cherry picked from commit a029ef9a435d5e118f9c4b500e8271d3998f684d) # Conflicts: # rosbag2_storage_mcap/src/mcap_storage.cpp # rosbag2_storage_mcap/test/rosbag2_storage_mcap/test_mcap_storage.cpp --- rosbag2_storage_mcap/src/mcap_storage.cpp | 62 ++++++- .../test_mcap_storage.cpp | 153 +++++++++++++++++- 2 files changed, 210 insertions(+), 5 deletions(-) diff --git a/rosbag2_storage_mcap/src/mcap_storage.cpp b/rosbag2_storage_mcap/src/mcap_storage.cpp index 8492002a29..faecf00070 100644 --- a/rosbag2_storage_mcap/src/mcap_storage.cpp +++ b/rosbag2_storage_mcap/src/mcap_storage.cpp @@ -217,6 +217,7 @@ class MCAPStorage : public rosbag2_storage::storage_interfaces::ReadWriteInterfa #endif private: + void read_metadata(); void open_impl(const std::string & uri, const std::string & preset_profile, rosbag2_storage::storage_interfaces::IOFlag io_flag, const std::string & storage_config_uri); @@ -364,12 +365,43 @@ void MCAPStorage::open_impl(const std::string & uri, const std::string & preset_ metadata_.relative_file_paths = {get_relative_file_path()}; } -/** BaseInfoInterface **/ -rosbag2_storage::BagMetadata MCAPStorage::get_metadata() +void MCAPStorage::read_metadata() { ensure_summary_read(); + const auto & mcap_metadatas = mcap_reader_->metadataIndexes(); + auto range = mcap_metadatas.equal_range("rosbag2"); + mcap::Status status{}; + mcap::Record mcap_record{}; + mcap::Metadata mcap_metadata{}; + for (auto i = range.first; i != range.second; ++i) { + status = mcap::McapReader::ReadRecord(*data_source_, i->second.offset, &mcap_record); + if (!status.ok()) { + OnProblem(status); + continue; + } + status = mcap::McapReader::ParseMetadata(mcap_record, &mcap_metadata); + if (!status.ok()) { + OnProblem(status); + continue; + } + std::string serialized_metadata; + try { + serialized_metadata = mcap_metadata.metadata.at("serialized_metadata"); + } catch (const std::out_of_range & /* err */) { + RCUTILS_LOG_WARN_NAMED( + LOG_NAME, "Metadata record with name 'rosbag2' did not contain key 'serialized_metadata'."); + } + if (!serialized_metadata.empty()) { + YAML::Node metadata_node = YAML::Load(serialized_metadata); + YAML::convert::decode(metadata_node, metadata_); + } + try { + metadata_.ros_distro = mcap_metadata.metadata.at("ROS_DISTRO"); + } catch (const std::out_of_range & /* err */) { + // Ignor this error. In new versions ROS_DISTRO stored inside `serialized_metadata`. + } + } - metadata_.version = 2; metadata_.storage_identifier = get_storage_identifier(); metadata_.bag_size = get_bagfile_size(); metadata_.relative_file_paths = {get_relative_file_path()}; @@ -410,10 +442,19 @@ rosbag2_storage::BagMetadata MCAPStorage::get_metadata() } else { topic_info.message_count = 0; } - metadata_.topics_with_message_count.push_back(topic_info); } +} +<<<<<<< HEAD +======= +/** BaseInfoInterface **/ +rosbag2_storage::BagMetadata MCAPStorage::get_metadata() +{ + if (opened_as_ == rosbag2_storage::storage_interfaces::IOFlag::READ_ONLY) { + read_metadata(); + } +>>>>>>> a029ef9 (Store serialized metadata in MCAP file (#1423)) return metadata_; } @@ -779,7 +820,20 @@ void MCAPStorage::update_metadata(const rosbag2_storage::BagMetadata & bag_metad "MCAP storage plugin does not support message compression, " "consider using chunk compression by setting `compression: 'Zstd'` in storage config"); } +<<<<<<< HEAD ensure_rosdistro_metadata_added(); +======= + + mcap::Metadata metadata; + metadata.name = "rosbag2"; + YAML::Node metadata_node = YAML::convert::encode(bag_metadata); + std::string serialized_metadata = YAML::Dump(metadata_node); + metadata.metadata = {{"serialized_metadata", serialized_metadata}}; + mcap::Status status = mcap_writer_->write(metadata); + if (!status.ok()) { + OnProblem(status); + } +>>>>>>> a029ef9 (Store serialized metadata in MCAP file (#1423)) } #endif diff --git a/rosbag2_storage_mcap/test/rosbag2_storage_mcap/test_mcap_storage.cpp b/rosbag2_storage_mcap/test/rosbag2_storage_mcap/test_mcap_storage.cpp index 02430f74f2..dfbe8f0e91 100644 --- a/rosbag2_storage_mcap/test/rosbag2_storage_mcap/test_mcap_storage.cpp +++ b/rosbag2_storage_mcap/test/rosbag2_storage_mcap/test_mcap_storage.cpp @@ -14,8 +14,8 @@ #include "rclcpp/serialization.hpp" #include "rclcpp/serialized_message.hpp" -#include "rcpputils/env.hpp" #include "rcpputils/filesystem_helper.hpp" +#include "rcutils/logging_macros.h" #include "rosbag2_storage/storage_factory.hpp" #ifdef ROSBAG2_STORAGE_MCAP_HAS_STORAGE_OPTIONS #include "rosbag2_storage/storage_options.hpp" @@ -33,6 +33,137 @@ using namespace ::testing; // NOLINT using TemporaryDirectoryFixture = rosbag2_test_common::TemporaryDirectoryFixture; +<<<<<<< HEAD +======= +class McapStorageTestFixture : public rosbag2_test_common::TemporaryDirectoryFixture +{ +public: + McapStorageTestFixture() = default; + + std::shared_ptr make_serialized_message(const std::string & message) + { + rclcpp::Serialization serialization; + + std_msgs::msg::String std_string_msg; + std_string_msg.data = message; + auto serialized_msg = std::make_shared(); + serialization.serialize_message(&std_string_msg, serialized_msg.get()); + + auto ret = std::make_shared(); + *ret = serialized_msg->release_rcl_serialized_message(); + return ret; + } + + std::shared_ptr write_messages_to_mcap( + std::vector> & messages, + std::shared_ptr rw_storage = nullptr) + { + if (nullptr == rw_storage) { + rosbag2_storage::StorageFactory factory; + rosbag2_storage::StorageOptions options; + auto uri = rcpputils::fs::path(temporary_dir_path_) / "bag"; + options.uri = uri.string(); + options.storage_id = "mcap"; + rw_storage = factory.open_read_write(options); + } + + for (auto msg : messages) { + const rosbag2_storage::TopicMetadata & topic_metadata = std::get<2>(msg); + rw_storage->create_topic(topic_metadata, std::get<3>(msg)); + auto bag_message = std::make_shared(); + bag_message->serialized_data = make_serialized_message(std::get<0>(msg)); + bag_message->time_stamp = std::get<1>(msg); + bag_message->topic_name = topic_metadata.name; + rw_storage->write(bag_message); + } + return rw_storage; + } +}; + +namespace rosbag2_storage +{ +bool operator==(const TopicInformation & lhs, const TopicInformation & rhs) +{ + return lhs.topic_metadata == rhs.topic_metadata && lhs.message_count == rhs.message_count; +} +} // namespace rosbag2_storage + +TEST_F(McapStorageTestFixture, can_store_and_read_metadata_correctly) +{ + const std::string storage_id = "mcap"; + auto uri = (rcpputils::fs::path(temporary_dir_path_) / "rosbag").string(); + auto expected_bag = rcpputils::fs::path(temporary_dir_path_) / "rosbag.mcap"; + const rosbag2_storage::MessageDefinition definition = {"std_msgs/msg/String", "ros2msg", + "string data", ""}; + + std::vector string_messages = {"first message", "second message", "third message"}; + std::vector topics = {"topic1", "topic2"}; + + rosbag2_storage::TopicMetadata topic_metadata_1 = {topics[0], "std_msgs/msg/String", "cdr", + "qos_profile1", "type_hash1"}; + rosbag2_storage::TopicMetadata topic_metadata_2 = {topics[1], "std_msgs/msg/String", "cdr", + "qos_profile2", "type_hash2"}; + + std::vector> + messages = { + std::make_tuple(string_messages[0], static_cast(1e9), topic_metadata_1, definition), + std::make_tuple(string_messages[1], static_cast(2e9), topic_metadata_1, definition), + std::make_tuple(string_messages[2], static_cast(3e9), topic_metadata_2, definition), + }; + + rosbag2_storage::StorageFactory factory; + rosbag2_storage::StorageOptions options; + options.uri = uri; + options.storage_id = storage_id; + + { + auto writer = factory.open_read_write(options); + writer->create_topic({"topic1", "type1", "rmw1", "qos_profile1", "type_hash1"}, {}); + writer->create_topic({"topic2", "type2", "rmw2", "qos_profile2", "type_hash2"}, {}); + (void)write_messages_to_mcap(messages, writer); + auto metadata = writer->get_metadata(); + metadata.ros_distro = "rolling"; + metadata.custom_data["key1"] = "value1"; + writer->update_metadata(metadata); + } + + options.uri = expected_bag.string(); + options.storage_id = storage_id; + auto reader = factory.open_read_only(options); + const auto metadata = reader->get_metadata(); + + EXPECT_THAT(metadata.storage_identifier, Eq("mcap")); + EXPECT_THAT(metadata.relative_file_paths, ElementsAreArray({expected_bag.string()})); + + EXPECT_THAT( + metadata.topics_with_message_count, + UnorderedElementsAreArray({ + rosbag2_storage::TopicInformation{ + rosbag2_storage::TopicMetadata{"topic2", "type2", "rmw2", "qos_profile2", "type_hash2"}, + 1u}, + rosbag2_storage::TopicInformation{ + rosbag2_storage::TopicMetadata{"topic1", "type1", "rmw1", "qos_profile1", "type_hash1"}, + 2u}, + })); + EXPECT_THAT(metadata.message_count, Eq(3u)); + + const auto current_distro = "rolling"; + EXPECT_EQ(metadata.ros_distro, current_distro); + + EXPECT_THAT( + metadata.starting_time, + Eq(std::chrono::time_point(std::chrono::seconds(1)))); + EXPECT_THAT(metadata.duration, Eq(std::chrono::seconds(2))) + << "metadata.duration=" << metadata.duration.count(); + + EXPECT_EQ(metadata.custom_data.size(), 1); + EXPECT_THAT(metadata.custom_data, + UnorderedElementsAreArray({std::pair("key1", "value1")})); +} + +>>>>>>> a029ef9 (Store serialized metadata in MCAP file (#1423)) TEST_F(TemporaryDirectoryFixture, can_write_and_read_basic_mcap_file) { auto uri = rcpputils::fs::path(temporary_dir_path_) / "bag"; @@ -91,7 +222,27 @@ TEST_F(TemporaryDirectoryFixture, can_write_and_read_basic_mcap_file) #else auto reader = factory.open_read_only(expected_bag.string(), storage_id); #endif +<<<<<<< HEAD reader->open(options); +======= + auto topics_and_types = reader->get_all_topics_and_types(); + + EXPECT_THAT(topics_and_types, + ElementsAreArray({rosbag2_storage::TopicMetadata{ + topic_name, "std_msgs/msg/String", "cdr", "qos_profile1", "type_hash1"}})); + + const auto metadata = reader->get_metadata(); + + EXPECT_THAT(metadata.storage_identifier, Eq("mcap")); + EXPECT_THAT(metadata.relative_file_paths, ElementsAreArray({expected_bag.string()})); + EXPECT_THAT(metadata.topics_with_message_count, + ElementsAreArray({rosbag2_storage::TopicInformation{ + rosbag2_storage::TopicMetadata{topic_name, "std_msgs/msg/String", "cdr", + "qos_profile1", "type_hash1"}, + 1u}})); + EXPECT_THAT(metadata.message_count, Eq(1u)); + +>>>>>>> a029ef9 (Store serialized metadata in MCAP file (#1423)) EXPECT_TRUE(reader->has_next()); std_msgs::msg::String msg;