From f03425b8a66068d980eaf1b1b9c2e83e46c96cf0 Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Wed, 26 Jun 2024 19:57:41 +0900 Subject: [PATCH] feat(aeva): add ros wrapper --- nebula_ros/CMakeLists.txt | 24 ++ .../nebula_ros/aeva/aeva_ros_wrapper.hpp | 76 ++++++ .../nebula_ros/aeva/hw_monitor_wrapper.hpp | 90 +++++++ .../common/nebula_packet_stream.hpp | 58 +++++ nebula_ros/src/aeva/aeva_ros_wrapper.cpp | 246 ++++++++++++++++++ nebula_ros/src/aeva/hw_monitor_wrapper.cpp | 176 +++++++++++++ 6 files changed, 670 insertions(+) create mode 100644 nebula_ros/include/nebula_ros/aeva/aeva_ros_wrapper.hpp create mode 100644 nebula_ros/include/nebula_ros/aeva/hw_monitor_wrapper.hpp create mode 100644 nebula_ros/include/nebula_ros/common/nebula_packet_stream.hpp create mode 100644 nebula_ros/src/aeva/aeva_ros_wrapper.cpp create mode 100644 nebula_ros/src/aeva/hw_monitor_wrapper.cpp diff --git a/nebula_ros/CMakeLists.txt b/nebula_ros/CMakeLists.txt index 709de2025..19c4b085c 100644 --- a/nebula_ros/CMakeLists.txt +++ b/nebula_ros/CMakeLists.txt @@ -215,11 +215,34 @@ rclcpp_components_register_node(continental_srr520_ros_wrapper EXECUTABLE continental_srr520_ros_wrapper_node ) +## Aeva +add_library(aeva_ros_wrapper SHARED + src/aeva/aeva_ros_wrapper.cpp + src/aeva/hw_monitor_wrapper.cpp + src/common/parameter_descriptors.cpp +) + +target_include_directories(aeva_ros_wrapper PUBLIC + ${nebula_decoders_INCLUDE_DIRS} + ${nebula_hw_interfaces_INCLUDE_DIRS} +) + +target_link_libraries(aeva_ros_wrapper PUBLIC + nebula_decoders::nebula_decoders_aeva + nebula_hw_interfaces::nebula_hw_interfaces_aeva +) + +rclcpp_components_register_node(aeva_ros_wrapper + PLUGIN "AevaRosWrapper" + EXECUTABLE aeva_ros_wrapper_node +) + install(TARGETS hesai_ros_wrapper EXPORT export_hesai_ros_wrapper) install(TARGETS velodyne_ros_wrapper EXPORT export_velodyne_ros_wrapper) install(TARGETS robosense_ros_wrapper EXPORT export_robosense_ros_wrapper) install(TARGETS continental_ars548_ros_wrapper EXPORT export_continental_ars548_ros_wrapper) install(TARGETS continental_srr520_ros_wrapper EXPORT export_continental_srr520_ros_wrapper) +install(TARGETS aeva_ros_wrapper EXPORT export_aeva_ros_wrapper) install(DIRECTORY include/ DESTINATION include/${PROJECT_NAME}) if(BUILD_TESTING) @@ -232,6 +255,7 @@ ament_export_targets(export_hesai_ros_wrapper) ament_export_targets(export_velodyne_ros_wrapper) ament_export_targets(export_robosense_ros_wrapper) ament_export_targets(export_continental_ars548_ros_wrapper) +ament_export_targets(export_aeva_ros_wrapper) install( DIRECTORY config launch diff --git a/nebula_ros/include/nebula_ros/aeva/aeva_ros_wrapper.hpp b/nebula_ros/include/nebula_ros/aeva/aeva_ros_wrapper.hpp new file mode 100644 index 000000000..e8932970f --- /dev/null +++ b/nebula_ros/include/nebula_ros/aeva/aeva_ros_wrapper.hpp @@ -0,0 +1,76 @@ +// Copyright 2024 TIER IV, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "nebula_ros/aeva/hw_monitor_wrapper.hpp" +#include "nebula_ros/common/watchdog_timer.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace nebula::ros +{ + +/// @brief Ros wrapper of hesai driver +class AevaRosWrapper final : public rclcpp::Node +{ +public: + explicit AevaRosWrapper(const rclcpp::NodeOptions & options); + +private: + Status declareAndGetSensorConfigParams(); + Status validateAndSetConfig(std::shared_ptr & new_config); + + void recordRawPacket(const std::vector & bytes); + + rclcpp::Publisher::SharedPtr packets_pub_{}; + std::mutex mtx_current_scan_msg_; + nebula_msgs::msg::NebulaPackets::UniquePtr current_scan_msg_{}; + + rclcpp::Publisher::SharedPtr cloud_pub_{}; + std::shared_ptr cloud_watchdog_; + + rclcpp::Subscription::SharedPtr packets_sub_{}; + + std::shared_ptr sensor_cfg_ptr_; + + std::optional hw_interface_; + std::optional hw_monitor_; + drivers::AevaAries2Decoder decoder_{}; +}; + +} // namespace nebula::ros diff --git a/nebula_ros/include/nebula_ros/aeva/hw_monitor_wrapper.hpp b/nebula_ros/include/nebula_ros/aeva/hw_monitor_wrapper.hpp new file mode 100644 index 000000000..1cd9bf9d5 --- /dev/null +++ b/nebula_ros/include/nebula_ros/aeva/hw_monitor_wrapper.hpp @@ -0,0 +1,90 @@ +// Copyright 2024 TIER IV, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace nebula::ros +{ + +using nlohmann::json; + +class AevaHwMonitorWrapper +{ +public: + AevaHwMonitorWrapper( + rclcpp::Node * const parent_node, std::shared_ptr config); + + void onTelemetryFragment(const json & diff); + + void onHealthCodes(std::vector health_codes); + +private: + struct NodeTelemetry + { + json values; + rclcpp::Time last_update; + }; + + struct TelemetryState + { + std::map entries; + }; + + struct HealthState + { + std::vector codes; + rclcpp::Time last_update; + }; + + void publishDiagnostics(); + + rclcpp::Logger logger_; + rclcpp::Node * const parent_node_; + std::shared_ptr config_; + + rclcpp::Publisher::SharedPtr diagnostics_pub_{}; + rclcpp::TimerBase::SharedPtr diagnostics_pub_timer_{}; + uint16_t diag_span_; + + std::mutex mtx_hardware_id_; + std::optional hardware_id_; + + std::mutex mtx_telemetry_; + TelemetryState telemetry_; + + std::mutex mtx_health_; + std::optional health_; +}; +} // namespace nebula::ros diff --git a/nebula_ros/include/nebula_ros/common/nebula_packet_stream.hpp b/nebula_ros/include/nebula_ros/common/nebula_packet_stream.hpp new file mode 100644 index 000000000..ff9b6a53b --- /dev/null +++ b/nebula_ros/include/nebula_ros/common/nebula_packet_stream.hpp @@ -0,0 +1,58 @@ +// Copyright 2024 TIER IV, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "nebula_hw_interfaces/nebula_hw_interfaces_common/connections/byte_stream.hpp" + +#include +#include + +#include "nebula_msgs/msg/detail/nebula_packets__struct.hpp" + +#include +#include +#include +#include + +namespace nebula::ros +{ + +class NebulaPacketStream : public drivers::connections::ObservableByteStream +{ +public: + NebulaPacketStream() = default; + + void registerBytesCallback(callback_t callback) override + { + std::lock_guard lock(mtx_callback_); + callback_ = std::move(callback); + } + + void onNebulaPackets(std::unique_ptr packets) + { + std::lock_guard lock(mtx_callback_); + if (!callback_) return; + + for (auto & packet : packets->packets) { + callback_(packet.data); + } + } + +private: + std::mutex mtx_callback_{}; + callback_t callback_{}; +}; + +} // namespace nebula::ros diff --git a/nebula_ros/src/aeva/aeva_ros_wrapper.cpp b/nebula_ros/src/aeva/aeva_ros_wrapper.cpp new file mode 100644 index 000000000..c16001d60 --- /dev/null +++ b/nebula_ros/src/aeva/aeva_ros_wrapper.cpp @@ -0,0 +1,246 @@ +// Copyright 2024 TIER IV, Inc. + +#include "nebula_ros/aeva/aeva_ros_wrapper.hpp" + +#include "nebula_hw_interfaces/nebula_hw_interfaces_aeva/connections/pointcloud.hpp" +#include "nebula_ros/aeva/hw_monitor_wrapper.hpp" +#include "nebula_ros/common/nebula_packet_stream.hpp" +#include "nebula_ros/common/parameter_descriptors.hpp" +#include "nebula_ros/common/rclcpp_logger.hpp" +#include "nebula_ros/common/watchdog_timer.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include + +#pragma clang diagnostic ignored "-Wbitwise-instead-of-logical" + +namespace nebula::ros +{ +using drivers::AevaAries2Decoder; +using drivers::aeva::Aeries2Config; +using drivers::connections::PointcloudParser; +using nlohmann::json; +using namespace std::chrono_literals; // NOLINT +using AevaPointCloudUniquePtr = AevaAries2Decoder::AevaPointCloudUniquePtr; + +AevaRosWrapper::AevaRosWrapper(const rclcpp::NodeOptions & options) +: rclcpp::Node("aeva_ros_wrapper", rclcpp::NodeOptions(options).use_intra_process_comms(true)) +{ + auto status = declareAndGetSensorConfigParams(); + + if (status != Status::OK) { + throw std::runtime_error( + (std::stringstream{} << "Sensor configuration invalid: " << status).str()); + } + + RCLCPP_INFO_STREAM(get_logger(), "SensorConfig: " << *sensor_cfg_ptr_); + + auto qos_profile = rmw_qos_profile_sensor_data; + auto pointcloud_qos = + rclcpp::QoS(rclcpp::QoSInitialization(qos_profile.history, 10), qos_profile); + + auto launch_hw = declare_parameter("launch_hw", param_read_only()); + auto setup_sensor = declare_parameter("setup_sensor", param_read_only()); + auto hw_interface_logger = drivers::loggers::RclcppLogger(get_logger()).child("HwInterface"); + + if (!launch_hw && setup_sensor) { + setup_sensor = false; + RCLCPP_WARN(get_logger(), "Ignoring setup_sensor:=true in offline mode."); + } + + if (launch_hw) { + // //////////////////////////////////////// + // If HW is connected, also publish packets + // //////////////////////////////////////// + hw_interface_.emplace(hw_interface_logger, setup_sensor, sensor_cfg_ptr_); + hw_monitor_.emplace(this, sensor_cfg_ptr_); + + packets_pub_ = + create_publisher("nebula_packets", pointcloud_qos); + + drivers::connections::ObservableByteStream::callback_t raw_packet_cb = [&](const auto & bytes) { + this->recordRawPacket(std::move(bytes)); + }; + + hw_interface_->registerRawCloudPacketCallback(std::move(raw_packet_cb)); + + hw_interface_->registerTelemetryCallback( + [&](const auto & msg) { hw_monitor_->onTelemetryFragment(msg); }); + hw_interface_->registerHealthCallback([&](auto codes) { hw_monitor_->onHealthCodes(codes); }); + } else { + // //////////////////////////////////////// + // If HW is disconnected, subscribe to + // packets topic + // //////////////////////////////////////// + auto packet_stream = std::make_shared(); + auto packet_buffer = + std::make_shared(packet_stream, 100, [&]() { + RCLCPP_ERROR_THROTTLE( + get_logger(), *get_clock(), 1000, + "Packet stream buffer overflowed, packet loss occurred."); + }); + + packets_sub_ = create_subscription( + "nebula_packets", rclcpp::SensorDataQoS(), + [=](std::unique_ptr packets) { + packet_stream->onNebulaPackets(std::move(packets)); + }); + + auto pointcloud_parser = std::make_shared(packet_buffer); + hw_interface_.emplace( + hw_interface_logger, setup_sensor, sensor_cfg_ptr_, pointcloud_parser, nullptr, nullptr, + nullptr); + + RCLCPP_INFO_STREAM( + get_logger(), + "Hardware connection disabled, listening for packets on " << packets_sub_->get_topic_name()); + } + + RCLCPP_DEBUG(get_logger(), "Starting stream"); + + PointcloudParser::callback_t pointcloud_message_cb = [this](const auto & message) { + decoder_.processPointcloudMessage(message); + }; + + hw_interface_->registerCloudPacketCallback(std::move(pointcloud_message_cb)); + + cloud_pub_ = create_publisher("/nebula_points", pointcloud_qos); + + cloud_watchdog_ = std::make_shared(*this, 100'000us, [&](bool ok) { + if (ok) return; + RCLCPP_WARN_THROTTLE(get_logger(), *get_clock(), 5000, "Missed pointcloud output deadline"); + }); + + { + AevaAries2Decoder::callback_t pointcloud_cb = + [&](AevaPointCloudUniquePtr cloud_ptr, auto timestamp) { + auto now = this->now(); + cloud_watchdog_->update(); + + if ( + cloud_pub_->get_subscription_count() > 0 || + cloud_pub_->get_intra_process_subscription_count() > 0) { + auto ros_pc_msg_ptr = std::make_unique(); + pcl::toROSMsg(*cloud_ptr, *ros_pc_msg_ptr); + ros_pc_msg_ptr->header.frame_id = sensor_cfg_ptr_->frame_id; + ros_pc_msg_ptr->header.stamp = rclcpp::Time(static_cast(timestamp)); + cloud_pub_->publish(std::move(ros_pc_msg_ptr)); + } + + std::lock_guard lock(mtx_current_scan_msg_); + if (current_scan_msg_ && packets_pub_) { + packets_pub_->publish(std::move(current_scan_msg_)); + current_scan_msg_ = {}; + } + }; + + decoder_.registerPointCloudCallback(std::move(pointcloud_cb)); + } +} + +Status AevaRosWrapper::declareAndGetSensorConfigParams() +{ + Aeries2Config config; + + std::string raw_sensor_model = declare_parameter("sensor_model", param_read_only()); + config.sensor_model = drivers::SensorModelFromString(raw_sensor_model); + config.sensor_ip = declare_parameter("sensor_ip", param_read_only()); + config.frame_id = declare_parameter("frame_id", param_read_only()); + config.dithering_enable_ego_speed = + declare_parameter("dithering_enable_ego_speed", param_read_only()); + config.dithering_pattern_option = + declare_parameter("dithering_pattern_option", param_read_only()); + config.ele_offset_rad = declare_parameter("ele_offset_rad", param_read_only()); + config.elevation_auto_adjustment = + declare_parameter("elevation_auto_adjustment", param_read_only()); + config.enable_frame_dithering = + declare_parameter("enable_frame_dithering", param_read_only()); + config.enable_frame_sync = declare_parameter("enable_frame_sync", param_read_only()); + config.flip_pattern_vertically = + declare_parameter("flip_pattern_vertically", param_read_only()); + config.frame_sync_offset_in_ms = + declare_parameter("frame_sync_offset_in_ms", param_read_only()); + config.frame_sync_type = declare_parameter("frame_sync_type", param_read_only()); + config.frame_synchronization_on_rising_edge = + declare_parameter("frame_synchronization_on_rising_edge", param_read_only()); + config.hfov_adjustment_deg = declare_parameter("hfov_adjustment_deg", param_read_only()); + config.hfov_rotation_deg = declare_parameter("hfov_rotation_deg", param_read_only()); + config.highlight_ROI = declare_parameter("highlight_ROI", param_read_only()); + config.horizontal_fov_degrees = + declare_parameter("horizontal_fov_degrees", param_read_only()); + config.roi_az_offset_rad = declare_parameter("roi_az_offset_rad", param_read_only()); + config.vertical_pattern = declare_parameter("vertical_pattern", param_read_only()); + + auto new_cfg_ptr = std::make_shared(config); + return validateAndSetConfig(new_cfg_ptr); +} + +Status AevaRosWrapper::validateAndSetConfig(std::shared_ptr & new_config) +{ + if (!new_config) { + return Status::SENSOR_CONFIG_ERROR; + } + + if (new_config->sensor_model == nebula::drivers::SensorModel::UNKNOWN) { + return Status::INVALID_SENSOR_MODEL; + } + + if (new_config->frame_id.empty()) { + return Status::SENSOR_CONFIG_ERROR; + } + + if (hw_interface_) { + hw_interface_->onConfigChange(new_config); + } + + sensor_cfg_ptr_ = new_config; + return Status::OK; +} + +void AevaRosWrapper::recordRawPacket(const std::vector & vector) +{ + std::lock_guard lock(mtx_current_scan_msg_); + + if ( + !packets_pub_ || (packets_pub_->get_subscription_count() == 0 && + packets_pub_->get_intra_process_subscription_count() == 0)) { + return; + } + + auto packet_stamp = now(); + + if (!current_scan_msg_) { + current_scan_msg_ = std::make_unique(); + auto & header = current_scan_msg_->header; + header.frame_id = sensor_cfg_ptr_->frame_id; + header.stamp = packet_stamp; + } + + auto & packet = current_scan_msg_->packets.emplace_back(); + packet.stamp = packet_stamp; + packet.data = vector; +} + +RCLCPP_COMPONENTS_REGISTER_NODE(AevaRosWrapper) +} // namespace nebula::ros diff --git a/nebula_ros/src/aeva/hw_monitor_wrapper.cpp b/nebula_ros/src/aeva/hw_monitor_wrapper.cpp new file mode 100644 index 000000000..90682ffe3 --- /dev/null +++ b/nebula_ros/src/aeva/hw_monitor_wrapper.cpp @@ -0,0 +1,176 @@ +// Copyright 2024 TIER IV, Inc. + +#include "nebula_ros/aeva/hw_monitor_wrapper.hpp" + +#include "nebula_ros/common/parameter_descriptors.hpp" + +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace nebula::ros +{ + +using diagnostic_msgs::msg::DiagnosticStatus; +using nebula::util::get_if_exists; + +AevaHwMonitorWrapper::AevaHwMonitorWrapper( + rclcpp::Node * const parent_node, std::shared_ptr config) +: logger_(parent_node->get_logger().get_child("HwMonitor")), + parent_node_(parent_node), + config_(std::move(config)) +{ + diag_span_ = parent_node->declare_parameter("diag_span", 500, param_read_only()); + + diagnostics_pub_ = + parent_node->create_publisher("/diagnostics", 10); + + diagnostics_pub_timer_ = parent_node->create_wall_timer( + std::chrono::milliseconds(diag_span_), [&]() { publishDiagnostics(); }); +} + +void AevaHwMonitorWrapper::onTelemetryFragment(const json & diff) +{ + std::scoped_lock lock(mtx_telemetry_, mtx_hardware_id_); + + // //////////////////////////////////////// + // Update aggregated telemetry tree + // //////////////////////////////////////// + + auto now = parent_node_->now(); + bool any_changed = false; + for (const auto & node : diff.items()) { + bool new_node = telemetry_.entries.count(node.key()) == 0; + bool node_changed = !new_node && telemetry_.entries.at(node.key()).values != diff[node.key()]; + + if (new_node) { + telemetry_.entries[node.key()] = {json::object(), now}; + } + + if (new_node || node_changed) { + any_changed = true; + auto & entry = telemetry_.entries[node.key()]; + entry.values.update(diff[node.key()]); + entry.last_update = now; + } + } + + if (!any_changed) { + return; + } + + // //////////////////////////////////////// + // Initialize hardware ID if unset + // //////////////////////////////////////// + + if (!hardware_id_) { + auto serial = get_if_exists(diff, {"sysinfo_main", "serial_number"}); + auto model = get_if_exists(diff, {"sysinfo_main", "platform"}); + + if (!serial || !model) return; + + hardware_id_.emplace(*model + ":" + *serial); + } +} + +void AevaHwMonitorWrapper::onHealthCodes(std::vector health_codes) +{ + std::scoped_lock lock(mtx_health_, mtx_hardware_id_); + + if (!hardware_id_) { + return; + } + + auto now = parent_node_->now(); + health_ = {std::move(health_codes), now}; +} + +void AevaHwMonitorWrapper::publishDiagnostics() +{ + std::scoped_lock lock(mtx_health_, mtx_telemetry_, mtx_hardware_id_); + + if (!hardware_id_) { + return; + } + + auto now = parent_node_->now(); + + diagnostic_msgs::msg::DiagnosticArray diagnostic_array_msg; + diagnostic_array_msg.header.stamp.sec = static_cast(now.seconds()); + diagnostic_array_msg.header.stamp.nanosec = now.nanoseconds() % 1'000'000'000; + diagnostic_array_msg.header.frame_id = config_->frame_id; + + auto diag_entry = [](const std::string & key, const std::string & value) { + diagnostic_msgs::msg::KeyValue entry{}; + entry.key = key; + entry.value = value; + return entry; + }; + + // //////////////////////////////////////// + // Add telemetry to diagnostics + // //////////////////////////////////////// + + for (const auto & entry : telemetry_.entries) { + auto & status = diagnostic_array_msg.status.emplace_back(); + status.level = DiagnosticStatus::OK; + status.hardware_id = *hardware_id_; + status.name = "aeva.telemetry." + entry.first; + + for (const auto & item : entry.second.values.items()) { + auto value = item.value(); + std::string stringified = + value.is_string() ? value.template get() : value.dump(); + status.values.emplace_back(diag_entry(item.key(), stringified)); + } + } + + // //////////////////////////////////////// + // Add health codes to diagnostics + // //////////////////////////////////////// + + auto & status = diagnostic_array_msg.status.emplace_back(); + status.level = DiagnosticStatus::OK; + status.hardware_id = *hardware_id_; + status.name = "aeva.health"; + + if (health_) { + auto & codes = health_->codes; + + json warning_codes = json::array(); + json error_codes = json::array(); + + std::copy_if(codes.cbegin(), codes.cend(), std::back_inserter(warning_codes), [](auto code) { + return !drivers::aeva::is_error_code(code); + }); + + std::copy_if( + codes.cbegin(), codes.cend(), std::back_inserter(error_codes), drivers::aeva::is_error_code); + + if (!error_codes.empty()) { + status.level = DiagnosticStatus::ERROR; + } else if (!warning_codes.empty()) { + status.level = DiagnosticStatus::WARN; + } + + status.values.emplace_back(diag_entry("warning_codes", warning_codes.dump())); + status.values.emplace_back(diag_entry("error_codes", error_codes.dump())); + } + + diagnostics_pub_->publish(diagnostic_array_msg); +} + +} // namespace nebula::ros