Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store serialized metadata in MCAP file #1423

Merged
merged 3 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 44 additions & 29 deletions rosbag2_storage_mcap/src/mcap_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ class ROSBAG2_STORAGE_MCAP_PUBLIC MCAPStorage
#endif

private:
void read_metadata();
void open_impl(const std::string & uri, const std::string & preset_profile,
rosbag2_storage::storage_interfaces::IOFlag io_flag,
const std::string & storage_config_uri);
Expand Down Expand Up @@ -370,12 +371,43 @@ void MCAPStorage::open_impl(const std::string & uri, const std::string & preset_
metadata_.relative_file_paths = {get_relative_file_path()};
}

/** BaseInfoInterface **/
rosbag2_storage::BagMetadata MCAPStorage::get_metadata()
void MCAPStorage::read_metadata()
{
ensure_summary_read();
const auto & mcap_metadatas = mcap_reader_->metadataIndexes();
auto range = mcap_metadatas.equal_range("rosbag2");
mcap::Status status{};
mcap::Record mcap_record{};
mcap::Metadata mcap_metadata{};
for (auto i = range.first; i != range.second; ++i) {
status = mcap::McapReader::ReadRecord(*data_source_, i->second.offset, &mcap_record);
if (!status.ok()) {
OnProblem(status);
continue;
}
status = mcap::McapReader::ParseMetadata(mcap_record, &mcap_metadata);
if (!status.ok()) {
OnProblem(status);
continue;
}
std::string serialized_metadata;
try {
serialized_metadata = mcap_metadata.metadata.at("serialized_metadata");
} catch (const std::out_of_range & /* err */) {
RCUTILS_LOG_WARN_NAMED(
LOG_NAME, "Metadata record with name 'rosbag2' did not contain key 'serialized_metadata'.");
}
if (!serialized_metadata.empty()) {
YAML::Node metadata_node = YAML::Load(serialized_metadata);
YAML::convert<rosbag2_storage::BagMetadata>::decode(metadata_node, metadata_);
}
try {
metadata_.ros_distro = mcap_metadata.metadata.at("ROS_DISTRO");
} catch (const std::out_of_range & /* err */) {
// Ignor this error. In new versions ROS_DISTRO stored inside `serialized_metadata`.
}
}

metadata_.version = 2;
metadata_.storage_identifier = get_storage_identifier();
metadata_.bag_size = get_bagfile_size();
metadata_.relative_file_paths = {get_relative_file_path()};
Expand Down Expand Up @@ -420,35 +452,16 @@ rosbag2_storage::BagMetadata MCAPStorage::get_metadata()
} else {
topic_info.message_count = 0;
}

metadata_.topics_with_message_count.push_back(topic_info);
}
}

const auto & mcap_metadatas = mcap_reader_->metadataIndexes();
auto range = mcap_metadatas.equal_range("rosbag2");
mcap::Status status{};
mcap::Record mcap_record{};
mcap::Metadata mcap_metadata{};
for (auto i = range.first; i != range.second; ++i) {
status = mcap::McapReader::ReadRecord(*data_source_, i->second.offset, &mcap_record);
if (!status.ok()) {
OnProblem(status);
continue;
}
status = mcap::McapReader::ParseMetadata(mcap_record, &mcap_metadata);
if (!status.ok()) {
OnProblem(status);
continue;
}
try {
metadata_.ros_distro = mcap_metadata.metadata.at("ROS_DISTRO");
} catch (const std::out_of_range & /* err */) {
RCUTILS_LOG_ERROR_NAMED(
LOG_NAME, "Metadata record with name 'rosbag2' did not contain key 'ROS_DISTRO'.");
}
break;
/** BaseInfoInterface **/
rosbag2_storage::BagMetadata MCAPStorage::get_metadata()
{
if (opened_as_ == rosbag2_storage::storage_interfaces::IOFlag::READ_ONLY) {
read_metadata();
}

return metadata_;
}

Expand Down Expand Up @@ -831,7 +844,9 @@ void MCAPStorage::update_metadata(const rosbag2_storage::BagMetadata & bag_metad

mcap::Metadata metadata;
metadata.name = "rosbag2";
metadata.metadata = {{"ROS_DISTRO", bag_metadata.ros_distro}};
YAML::Node metadata_node = YAML::convert<rosbag2_storage::BagMetadata>::encode(bag_metadata);
std::string serialized_metadata = YAML::Dump(metadata_node);
metadata.metadata = {{"serialized_metadata", serialized_metadata}};
mcap::Status status = mcap_writer_->write(metadata);
if (!status.ok()) {
OnProblem(status);
Expand Down
125 changes: 121 additions & 4 deletions rosbag2_storage_mcap/test/rosbag2_storage_mcap/test_mcap_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

#include "rclcpp/serialization.hpp"
#include "rclcpp/serialized_message.hpp"
#include "rcpputils/env.hpp"
#include "rcpputils/filesystem_helper.hpp"
#include "rcutils/logging_macros.h"
#include "rosbag2_storage/storage_factory.hpp"
#ifdef ROSBAG2_STORAGE_MCAP_HAS_STORAGE_OPTIONS
#include "rosbag2_storage/storage_options.hpp"
Expand All @@ -31,6 +31,52 @@
using namespace ::testing; // NOLINT
using TemporaryDirectoryFixture = rosbag2_test_common::TemporaryDirectoryFixture;

class McapStorageTestFixture : public rosbag2_test_common::TemporaryDirectoryFixture
{
public:
McapStorageTestFixture() = default;

std::shared_ptr<rcutils_uint8_array_t> make_serialized_message(const std::string & message)
{
rclcpp::Serialization<std_msgs::msg::String> serialization;

std_msgs::msg::String std_string_msg;
std_string_msg.data = message;
auto serialized_msg = std::make_shared<rclcpp::SerializedMessage>();
serialization.serialize_message(&std_string_msg, serialized_msg.get());

auto ret = std::make_shared<rcutils_uint8_array_t>();
*ret = serialized_msg->release_rcl_serialized_message();
return ret;
}

std::shared_ptr<rosbag2_storage::storage_interfaces::ReadWriteInterface> write_messages_to_mcap(
std::vector<std::tuple<std::string, int64_t, rosbag2_storage::TopicMetadata,
rosbag2_storage::MessageDefinition>> & messages,
std::shared_ptr<rosbag2_storage::storage_interfaces::ReadWriteInterface> rw_storage = nullptr)
{
if (nullptr == rw_storage) {
rosbag2_storage::StorageFactory factory;
rosbag2_storage::StorageOptions options;
auto uri = rcpputils::fs::path(temporary_dir_path_) / "bag";
options.uri = uri.string();
options.storage_id = "mcap";
rw_storage = factory.open_read_write(options);
}

for (auto msg : messages) {
const rosbag2_storage::TopicMetadata & topic_metadata = std::get<2>(msg);
rw_storage->create_topic(topic_metadata, std::get<3>(msg));
auto bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
bag_message->serialized_data = make_serialized_message(std::get<0>(msg));
bag_message->time_stamp = std::get<1>(msg);
bag_message->topic_name = topic_metadata.name;
rw_storage->write(bag_message);
}
return rw_storage;
}
};

namespace rosbag2_storage
{
bool operator==(const TopicInformation & lhs, const TopicInformation & rhs)
Expand All @@ -39,6 +85,80 @@ bool operator==(const TopicInformation & lhs, const TopicInformation & rhs)
}
} // namespace rosbag2_storage

TEST_F(McapStorageTestFixture, can_store_and_read_metadata_correctly)
{
const std::string storage_id = "mcap";
auto uri = (rcpputils::fs::path(temporary_dir_path_) / "rosbag").string();
auto expected_bag = rcpputils::fs::path(temporary_dir_path_) / "rosbag.mcap";
const rosbag2_storage::MessageDefinition definition = {"std_msgs/msg/String", "ros2msg",
"string data", ""};

std::vector<std::string> string_messages = {"first message", "second message", "third message"};
std::vector<std::string> topics = {"topic1", "topic2"};

rosbag2_storage::TopicMetadata topic_metadata_1 = {topics[0], "std_msgs/msg/String", "cdr",
"qos_profile1", "type_hash1"};
rosbag2_storage::TopicMetadata topic_metadata_2 = {topics[1], "std_msgs/msg/String", "cdr",
"qos_profile2", "type_hash2"};

std::vector<std::tuple<std::string, int64_t, rosbag2_storage::TopicMetadata,
rosbag2_storage::MessageDefinition>>
messages = {
std::make_tuple(string_messages[0], static_cast<int64_t>(1e9), topic_metadata_1, definition),
std::make_tuple(string_messages[1], static_cast<int64_t>(2e9), topic_metadata_1, definition),
std::make_tuple(string_messages[2], static_cast<int64_t>(3e9), topic_metadata_2, definition),
};

rosbag2_storage::StorageFactory factory;
rosbag2_storage::StorageOptions options;
options.uri = uri;
options.storage_id = storage_id;

{
auto writer = factory.open_read_write(options);
writer->create_topic({"topic1", "type1", "rmw1", "qos_profile1", "type_hash1"}, {});
writer->create_topic({"topic2", "type2", "rmw2", "qos_profile2", "type_hash2"}, {});
(void)write_messages_to_mcap(messages, writer);
auto metadata = writer->get_metadata();
metadata.ros_distro = "rolling";
metadata.custom_data["key1"] = "value1";
writer->update_metadata(metadata);
}

options.uri = expected_bag.string();
options.storage_id = storage_id;
auto reader = factory.open_read_only(options);
const auto metadata = reader->get_metadata();

EXPECT_THAT(metadata.storage_identifier, Eq("mcap"));
EXPECT_THAT(metadata.relative_file_paths, ElementsAreArray({expected_bag.string()}));

EXPECT_THAT(
metadata.topics_with_message_count,
UnorderedElementsAreArray({
rosbag2_storage::TopicInformation{
rosbag2_storage::TopicMetadata{"topic2", "type2", "rmw2", "qos_profile2", "type_hash2"},
1u},
rosbag2_storage::TopicInformation{
rosbag2_storage::TopicMetadata{"topic1", "type1", "rmw1", "qos_profile1", "type_hash1"},
2u},
}));
EXPECT_THAT(metadata.message_count, Eq(3u));

const auto current_distro = "rolling";
EXPECT_EQ(metadata.ros_distro, current_distro);

EXPECT_THAT(
metadata.starting_time,
Eq(std::chrono::time_point<std::chrono::high_resolution_clock>(std::chrono::seconds(1))));
EXPECT_THAT(metadata.duration, Eq(std::chrono::seconds(2)))
<< "metadata.duration=" << metadata.duration.count();

EXPECT_EQ(metadata.custom_data.size(), 1);
EXPECT_THAT(metadata.custom_data,
UnorderedElementsAreArray({std::pair<std::string, std::string>("key1", "value1")}));
}

TEST_F(TemporaryDirectoryFixture, can_write_and_read_basic_mcap_file)
{
auto uri = rcpputils::fs::path(temporary_dir_path_) / "bag";
Expand Down Expand Up @@ -118,9 +238,6 @@ TEST_F(TemporaryDirectoryFixture, can_write_and_read_basic_mcap_file)
1u}}));
EXPECT_THAT(metadata.message_count, Eq(1u));

const auto current_distro = rcpputils::get_env_var("ROS_DISTRO");
EXPECT_EQ(metadata.ros_distro, current_distro);

EXPECT_TRUE(reader->has_next());

std_msgs::msg::String msg;
Expand Down