Skip to content

Commit

Permalink
fuse -> ROS 2 fuse_models: Port fuse_models (#304)
Browse files Browse the repository at this point in the history
* Port messages

Signed-off-by: methylDragon <[email protected]>

* Port fuse_models

Signed-off-by: methylDragon <[email protected]>

* Fix alloc error and some bugs

Signed-off-by: methylDragon <[email protected]>

* Wait on result

Signed-off-by: methylDragon <[email protected]>

Signed-off-by: methylDragon <[email protected]>
  • Loading branch information
methylDragon authored Jan 14, 2023
1 parent 3860ca3 commit 5c46265
Show file tree
Hide file tree
Showing 74 changed files with 1,767 additions and 1,216 deletions.
2 changes: 1 addition & 1 deletion doc/Variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ void print(std::ostream& stream = std::cout) const override
{
stream << type() << ":\n"
<< " uuid: " << uuid_ << "\n"
<< " stamp: " << stamp_ << "\n"
<< " stamp: " << stamp_.nanoseconds() << "\n"
<< " device_id: " << device_id_ << "\n"
<< " size: " << data_.size() << "\n"
<< " data:\n"
Expand Down
12 changes: 8 additions & 4 deletions fuse_core/include/fuse_core/async_motion_model.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ class AsyncMotionModel : public MotionModel
* @param[in] name A unique name to give this plugin instance
* @throws runtime_error if already initialized
*/
void initialize(const std::string & name) override;
void initialize(
node_interfaces::NodeInterfaces<ALL_FUSE_CORE_NODE_INTERFACES> interfaces,
const std::string & name) override;

/**
* @brief Get the unique name of this motion model
Expand Down Expand Up @@ -191,13 +193,15 @@ class AsyncMotionModel : public MotionModel
std::shared_ptr<fuse_core::CallbackAdapter> callback_queue_;

std::string name_; //!< The unique name for this motion model instance
rclcpp::Node::SharedPtr node_; //!< The node for this motion model

//! The node interfaces
node_interfaces::NodeInterfaces<node_interfaces::Base, node_interfaces::Waitables> interfaces_;
rclcpp::CallbackGroup::SharedPtr cb_group_; //!< Internal re-entrant callback group

//! A single/multi-threaded executor assigned to the local callback queue
rclcpp::executors::MultiThreadedExecutor::SharedPtr executor_;
rclcpp::Executor::SharedPtr executor_;

size_t executor_thread_count_;
size_t executor_thread_count_{1};
std::thread spinner_; //!< Internal thread for spinning the executor
std::atomic<bool> initialized_ = false; //!< True if instance has been fully initialized

Expand Down
10 changes: 7 additions & 3 deletions fuse_core/include/fuse_core/async_sensor_model.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

#include <fuse_core/callback_wrapper.hpp>
#include <fuse_core/graph.hpp>
#include <fuse_core/node_interfaces/node_interfaces.hpp>
#include <fuse_core/sensor_model.hpp>
#include <fuse_core/transaction.hpp>

Expand Down Expand Up @@ -127,6 +128,7 @@ class AsyncSensorModel : public SensorModel
* @throws runtime_error if already initialized
*/
void initialize(
node_interfaces::NodeInterfaces<ALL_FUSE_CORE_NODE_INTERFACES> interfaces,
const std::string & name,
TransactionCallback transaction_callback) override;

Expand Down Expand Up @@ -193,15 +195,17 @@ class AsyncSensorModel : public SensorModel
std::shared_ptr<fuse_core::CallbackAdapter> callback_queue_;

std::string name_; //!< The unique name for this sensor model instance
rclcpp::Node::SharedPtr node_; //!< The node for this sensor model

//! The node interfaces
node_interfaces::NodeInterfaces<node_interfaces::Base, node_interfaces::Waitables> interfaces_;
rclcpp::CallbackGroup::SharedPtr cb_group_; //!< Internal re-entrant callback group

//! A single/multi-threaded executor assigned to the local callback queue
rclcpp::executors::MultiThreadedExecutor::SharedPtr executor_;
rclcpp::Executor::SharedPtr executor_;

//! The function to be executed every time a Transaction is "published"
TransactionCallback transaction_callback_;
size_t executor_thread_count_;
size_t executor_thread_count_{1};
std::thread spinner_; //!< Internal thread for spinning the executor
std::atomic<bool> initialized_ = false; //!< True if instance has been fully initialized

Expand Down
7 changes: 4 additions & 3 deletions fuse_core/include/fuse_core/console.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ class DelayedThrottleFilter
*/
bool isEnabled()
{
const auto now = time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now());
const auto now = std::chrono::time_point_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now());

if (last_hit_.time_since_epoch().count() < 0.0) {
last_hit_ = now;
Expand All @@ -102,8 +103,8 @@ class DelayedThrottleFilter
*/
void reset()
{
last_hit_ =
time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::from_time_t(-1));
last_hit_ = std::chrono::time_point_cast<std::chrono::milliseconds>(
std::chrono::system_clock::from_time_t(-1));
}

private:
Expand Down
4 changes: 3 additions & 1 deletion fuse_core/include/fuse_core/motion_model.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ class MotionModel
*
* @param[in] name A unique name to give this plugin instance
*/
virtual void initialize(const std::string & name) = 0;
virtual void initialize(
node_interfaces::NodeInterfaces<ALL_FUSE_CORE_NODE_INTERFACES> interfaces,
const std::string & name) = 0;

/**
* @brief Get the unique name of this motion model
Expand Down
1 change: 1 addition & 0 deletions fuse_core/include/fuse_core/sensor_model.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class SensorModel
* @param[in] transaction_callback The function to call every time a transaction is published
*/
virtual void initialize(
node_interfaces::NodeInterfaces<ALL_FUSE_CORE_NODE_INTERFACES> interfaces,
const std::string & name,
TransactionCallback transaction_callback) = 0;

Expand Down
43 changes: 21 additions & 22 deletions fuse_core/src/async_motion_model.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,44 +75,43 @@ bool AsyncMotionModel::apply(Transaction & transaction)
return result.get();
}

void AsyncMotionModel::initialize(const std::string & name)
void AsyncMotionModel::initialize(
node_interfaces::NodeInterfaces<ALL_FUSE_CORE_NODE_INTERFACES> interfaces,
const std::string & name)
{
if (initialized_) {
throw std::runtime_error("Calling initialize on an already initialized AsyncMotionModel!");
}

// Initialize internal state
name_ = name;
std::string node_namespace = "";

// TODO(CH3): Pass in the context or a node to get the context from
rclcpp::Context::SharedPtr ros_context = rclcpp::contexts::get_global_default_context();
auto node_options = rclcpp::NodeOptions();
node_options.context(ros_context); // set a context to generate the node in

// TODO(CH3): Potentially pass in the optimizer node instead of spinning a new one
node_ = rclcpp::Node::make_shared(name_, node_namespace, node_options);
interfaces_ = interfaces;

auto context = interfaces_.get_node_base_interface()->get_context();
auto executor_options = rclcpp::ExecutorOptions();
executor_options.context = ros_context;
executor_ = rclcpp::executors::MultiThreadedExecutor::make_shared(
executor_options,
executor_thread_count_
);
executor_options.context = context;

if (executor_thread_count_ == 1) {
executor_ = rclcpp::executors::SingleThreadedExecutor::make_shared(executor_options);
} else {
executor_ = rclcpp::executors::MultiThreadedExecutor::make_shared(
executor_options, executor_thread_count_);
}

callback_queue_ = std::make_shared<CallbackAdapter>(ros_context);
callback_queue_ = std::make_shared<CallbackAdapter>(context);

// This callback group MUST be re-entrant in order to support parallelization
cb_group_ = node_->create_callback_group(rclcpp::CallbackGroupType::Reentrant);
node_->get_node_waitables_interface()->add_waitable(
callback_queue_, cb_group_);
cb_group_ = interfaces_.get_node_base_interface()->create_callback_group(
rclcpp::CallbackGroupType::Reentrant, false);
interfaces_.get_node_waitables_interface()->add_waitable(callback_queue_, cb_group_);

// Call the derived onInit() function to perform implementation-specific initialization
onInit();

// Make sure the executor will service the given node
// TODO(sloretz) add just the callback group here when using Optimizer's Node
executor_->add_node(node_);
// We can add this without any guards because the callback group was set to not get automatically
// added to executors
executor_->add_callback_group(cb_group_, interfaces_.get_node_base_interface());

// Start the executor
spinner_ = std::thread(
Expand Down Expand Up @@ -155,7 +154,7 @@ void AsyncMotionModel::stop()
{
// Prefer to call onStop in executor's thread so downstream users don't have
// to worry about threads in ROS callbacks when there's only 1 thread.
if (node_->get_node_base_interface()->get_context()->is_valid()) {
if (interfaces_.get_node_base_interface()->get_context()->is_valid()) {
auto callback = std::make_shared<CallbackWrapper<void>>(
std::bind(&AsyncMotionModel::onStop, this)
);
Expand Down
6 changes: 2 additions & 4 deletions fuse_core/src/async_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,13 @@ void AsyncPublisher::initialize(
node_interfaces::NodeInterfaces<ALL_FUSE_CORE_NODE_INTERFACES> interfaces,
const std::string & name)
{
interfaces_ = interfaces;

if (initialized_) {
throw std::runtime_error("Calling initialize on an already initialized AsyncPublisher!");
}

// Initialize internal state
name_ = name; // NOTE(methylDragon): Used in derived classes
interfaces_ = interfaces;

auto context = interfaces_.get_node_base_interface()->get_context();
auto executor_options = rclcpp::ExecutorOptions();
Expand All @@ -72,8 +71,7 @@ void AsyncPublisher::initialize(
executor_options, executor_thread_count_);
}

callback_queue_ =
std::make_shared<CallbackAdapter>(context);
callback_queue_ = std::make_shared<CallbackAdapter>(context);

// This callback group MUST be re-entrant in order to support parallelization
cb_group_ = interfaces_.get_node_base_interface()->create_callback_group(
Expand Down
39 changes: 19 additions & 20 deletions fuse_core/src/async_sensor_model.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ AsyncSensorModel::~AsyncSensorModel()
}

void AsyncSensorModel::initialize(
node_interfaces::NodeInterfaces<ALL_FUSE_CORE_NODE_INTERFACES> interfaces,
const std::string & name,
TransactionCallback transaction_callback)
{
Expand All @@ -66,37 +67,35 @@ void AsyncSensorModel::initialize(

// Initialize internal state
name_ = name;
std::string node_namespace = "";

// TODO(CH3): Pass in the context or a node to get the context from
rclcpp::Context::SharedPtr ros_context = rclcpp::contexts::get_global_default_context();
auto node_options = rclcpp::NodeOptions();
node_options.context(ros_context); // set a context to generate the node in

// TODO(CH3): Potentially pass in the optimizer node instead of spinning a new one
node_ = rclcpp::Node::make_shared(name_, node_namespace, node_options);
interfaces_ = interfaces;

auto context = interfaces_.get_node_base_interface()->get_context();
auto executor_options = rclcpp::ExecutorOptions();
executor_options.context = ros_context;
executor_ = rclcpp::executors::MultiThreadedExecutor::make_shared(
executor_options,
executor_thread_count_);
executor_options.context = context;

if (executor_thread_count_ == 1) {
executor_ = rclcpp::executors::SingleThreadedExecutor::make_shared(executor_options);
} else {
executor_ = rclcpp::executors::MultiThreadedExecutor::make_shared(
executor_options, executor_thread_count_);
}

callback_queue_ = std::make_shared<CallbackAdapter>(ros_context);
callback_queue_ = std::make_shared<CallbackAdapter>(context);

// This callback group MUST be re-entrant in order to support parallelization
cb_group_ = node_->create_callback_group(rclcpp::CallbackGroupType::Reentrant);
node_->get_node_waitables_interface()->add_waitable(
callback_queue_, cb_group_);
cb_group_ = interfaces_.get_node_base_interface()->create_callback_group(
rclcpp::CallbackGroupType::Reentrant, false);
interfaces_.get_node_waitables_interface()->add_waitable(callback_queue_, cb_group_);

transaction_callback_ = transaction_callback;

// Call the derived onInit() function to perform implementation-specific initialization
onInit();

// Make sure the executor will service the given node
// TODO(sloretz) add just the callback group here when using Optimizer's Node
executor_->add_node(node_);
// We can add this without any guards because the callback group was set to not get automatically
// added to executors
executor_->add_callback_group(cb_group_, interfaces_.get_node_base_interface());

// Start the executor
spinner_ = std::thread(
Expand Down Expand Up @@ -144,7 +143,7 @@ void AsyncSensorModel::stop()
{
// Prefer to call onStop in executor's thread so downstream users don't have
// to worry about threads in ROS callbacks when there's only 1 thread.
if (node_->get_node_base_interface()->get_context()->is_valid()) {
if (interfaces_.get_node_base_interface()->get_context()->is_valid()) {
auto callback = std::make_shared<CallbackWrapper<void>>(
std::bind(&AsyncSensorModel::onStop, this)
);
Expand Down
8 changes: 4 additions & 4 deletions fuse_core/src/ceres_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ namespace fuse_core
{

// Helper function to get a namespace string with a '.' suffix, but only if not empty
static std::string get_well_formatted_namespace_string(std::string ns)
static std::string get_well_formatted_param_namespace_string(std::string ns)
{
return ns.empty() || ns.back() == '.' ? ns : ns + ".";
}
Expand All @@ -70,7 +70,7 @@ void loadCovarianceOptionsFromROS(
{
rcl_interfaces::msg::ParameterDescriptor tmp_descr;

std::string ns = get_well_formatted_namespace_string(namespace_string);
std::string ns = get_well_formatted_param_namespace_string(namespace_string);

// The sparse_linear_algebra_library_type field was added to ceres::Covariance::Options in version
// 1.13.0, see https://github.com/ceres-solver/ceres-
Expand Down Expand Up @@ -127,7 +127,7 @@ void loadProblemOptionsFromROS(
{
rcl_interfaces::msg::ParameterDescriptor tmp_descr;

std::string ns = get_well_formatted_namespace_string(namespace_string);
std::string ns = get_well_formatted_param_namespace_string(namespace_string);

tmp_descr.description = "trades memory for faster Problem::RemoveResidualBlock()";
problem_options.enable_fast_removal = fuse_core::getParam(
Expand Down Expand Up @@ -162,7 +162,7 @@ void loadSolverOptionsFromROS(
{
rcl_interfaces::msg::ParameterDescriptor tmp_descr;

std::string ns = get_well_formatted_namespace_string(namespace_string);
std::string ns = get_well_formatted_param_namespace_string(namespace_string);

// Minimizer options
solver_options.minimizer_type =
Expand Down
8 changes: 4 additions & 4 deletions fuse_core/src/fuse_echo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ class FuseEcho : public rclcpp::Node
: Node("fuse_echo", options)
{
// Subscribe to the constraint topic
graph_subscriber_ = this->create_subscription<fuse_msgs::msg::SerializedGraph>(
graph_sub_ = this->create_subscription<fuse_msgs::msg::SerializedGraph>(
"graph",
rclcpp::QoS(100),
std::bind(&FuseEcho::graphCallback, this, std::placeholders::_1)
);
transaction_subscriber_ = this->create_subscription<fuse_msgs::msg::SerializedTransaction>(
transaction_sub_ = this->create_subscription<fuse_msgs::msg::SerializedTransaction>(
"transaction",
rclcpp::QoS(100),
std::bind(&FuseEcho::transactionCallback, this, std::placeholders::_1)
Expand All @@ -69,8 +69,8 @@ class FuseEcho : public rclcpp::Node
private:
fuse_core::GraphDeserializer graph_deserializer_;
fuse_core::TransactionDeserializer transaction_deserializer_;
rclcpp::Subscription<fuse_msgs::msg::SerializedGraph>::SharedPtr graph_subscriber_;
rclcpp::Subscription<fuse_msgs::msg::SerializedTransaction>::SharedPtr transaction_subscriber_;
rclcpp::Subscription<fuse_msgs::msg::SerializedGraph>::SharedPtr graph_sub_;
rclcpp::Subscription<fuse_msgs::msg::SerializedTransaction>::SharedPtr transaction_sub_;

void graphCallback(const fuse_msgs::msg::SerializedGraph & msg)
{
Expand Down
16 changes: 10 additions & 6 deletions fuse_core/test/test_async_motion_model.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class MyMotionModel : public fuse_core::AsyncMotionModel
{
public:
MyMotionModel()
: fuse_core::AsyncMotionModel(0),
: fuse_core::AsyncMotionModel(1),
initialized(false)
{
}
Expand Down Expand Up @@ -91,23 +91,26 @@ TEST_F(TestAsyncMotionModel, OnInit)
{
for (int i = 0; i < 50; i++) {
MyMotionModel motion_model;
motion_model.initialize("my_motion_model_" + std::to_string(i));
auto node = rclcpp::Node::make_shared("test_async_motion_model_node");
motion_model.initialize(node, "my_motion_model_" + std::to_string(i));
EXPECT_TRUE(motion_model.initialized);
}
}

TEST_F(TestAsyncMotionModel, DoubleInit)
{
MyMotionModel motion_model;
motion_model.initialize("my_motion_model");
auto node = rclcpp::Node::make_shared("test_async_motion_model_node");
motion_model.initialize(node, "my_motion_model");
EXPECT_TRUE(motion_model.initialized);
EXPECT_THROW(motion_model.initialize("test"), std::runtime_error);
EXPECT_THROW(motion_model.initialize(node, "test"), std::runtime_error);
}

TEST_F(TestAsyncMotionModel, OnGraphUpdate)
{
MyMotionModel motion_model;
motion_model.initialize("my_motion_model");
auto node = rclcpp::Node::make_shared("test_async_motion_model_node");
motion_model.initialize(node, "my_motion_model");

// Execute the graph callback in this thread. This should push a call to
// MyMotionModel::onGraphUpdate() into MyMotionModel's callback queue, which will get executed by
Expand All @@ -133,7 +136,8 @@ TEST_F(TestAsyncMotionModel, OnGraphUpdate)
TEST_F(TestAsyncMotionModel, ApplyCallback)
{
MyMotionModel motion_model;
motion_model.initialize("my_motion_model");
auto node = rclcpp::Node::make_shared("test_async_motion_model_node");
motion_model.initialize(node, "my_motion_model");

// Call the motion model base class "apply()" method to send a transaction to the derived model.
// The AsyncMotionModel will then inject a call to applyCallback() into the motion model's
Expand Down
Loading

0 comments on commit 5c46265

Please sign in to comment.