Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multithreaded topics sometimes don't respond #368

Open
kaola-zero opened this issue Dec 15, 2023 · 5 comments
Open

Multithreaded topics sometimes don't respond #368

kaola-zero opened this issue Dec 15, 2023 · 5 comments
Labels
more-information-needed Further information is required

Comments

@kaola-zero
Copy link

Bug report

Required Info:

  • Operating System:
    Ubuntu 22.04 humble
  • Installation type:
    binaries
  • Version or commit hash:
    Output of git rev-parse HEAD, release version, or repos file
  • DDS implementation:
    rmw_implementation used Fast-RTPS
  • Client library (if applicable):
    rclcpp

Steps to reproduce issue

I change https://github.com/ros2/examples/blob/humble/rclcpp/executors/multithreaded_executor/multithreaded_executor.cpp this demo , stop the timer and build another node to publish , but sometime the Subscription do not work . I do not sure the Subscription not received or some other error

Subscription

#include <chrono>
#include <functional>
#include <memory>
#include <string>
#include <thread>

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"

using namespace std::chrono_literals;

/**
 * A small convenience function for converting a thread ID to a string
 **/
std::string string_thread_id()
{
  auto hashed = std::hash<std::thread::id>()(std::this_thread::get_id());
  return std::to_string(hashed);
}

class DualThreadedNode : public rclcpp::Node
{
public:
  DualThreadedNode()
  : Node("DualThreadedNode")
  {
    /* These define the callback groups
     * They don't really do much on their own, but they have to exist in order to
     * assign callbacks to them. They're also what the executor looks for when trying to run multiple threads
     */
    callback_group_subscriber1_ = this->create_callback_group(
      rclcpp::CallbackGroupType::MutuallyExclusive);
    callback_group_subscriber2_ = this->create_callback_group(
      rclcpp::CallbackGroupType::MutuallyExclusive);

    // Each of these callback groups is basically a thread
    // Everything assigned to one of them gets bundled into the same thread
    auto sub1_opt = rclcpp::SubscriptionOptions();
    sub1_opt.callback_group = callback_group_subscriber1_;
    auto sub2_opt = rclcpp::SubscriptionOptions();
    sub2_opt.callback_group = callback_group_subscriber2_;

    subscription1_ = this->create_subscription<std_msgs::msg::String>(
      "topic",
      rclcpp::QoS(10),
      // std::bind is sort of C++'s way of passing a function
      // If you're used to function-passing, skip these comments
      std::bind(
        &DualThreadedNode::subscriber1_cb,  // First parameter is a reference to the function
        this,                               // What the function should be bound to
        std::placeholders::_1),             // At this point we're not positive of all the
                                            // parameters being passed
                                            // So we just put a generic placeholder
                                            // into the binder
                                            // (since we know we need ONE parameter)
      sub1_opt);                  // This is where we set the callback group.
                                  // This subscription will run with callback group subscriber1

    subscription2_ = this->create_subscription<std_msgs::msg::String>(
      "topic",
      rclcpp::QoS(10),
      std::bind(
        &DualThreadedNode::subscriber2_cb,
        this,
        std::placeholders::_1),
      sub2_opt);
  }

private:
  /**
   * Simple function for generating a timestamp
   * Used for somewhat ineffectually demonstrating that the multithreading doesn't cripple performace
   */
  std::string timing_string()
  {
    rclcpp::Time time = this->now();
    return std::to_string(time.nanoseconds());
  }

  /**
   * Every time the Publisher publishes something, all subscribers to the topic get poked
   * This function gets called when Subscriber1 is poked (due to the std::bind we used when defining it)
   */
  void subscriber1_cb(const std_msgs::msg::String::ConstSharedPtr msg)
  {
    auto message_received_at = timing_string();

    // Extract current thread
    RCLCPP_INFO(
      this->get_logger(), "THREAD %s => Heard '%s' at %s",
      string_thread_id().c_str(), msg->data.c_str(), message_received_at.c_str());
  }

  /**
   * This function gets called when Subscriber2 is poked
   * Since it's running on a separate thread than Subscriber 1, it will run at (more-or-less) the same time!
   */
  void subscriber2_cb(const std_msgs::msg::String::ConstSharedPtr msg)
  {
    auto message_received_at = timing_string();

    // Prep display message
    RCLCPP_INFO(
      this->get_logger(), "THREAD %s => Heard '%s' at %s",
      string_thread_id().c_str(), msg->data.c_str(), message_received_at.c_str());
  }

  rclcpp::CallbackGroup::SharedPtr callback_group_subscriber1_;
  rclcpp::CallbackGroup::SharedPtr callback_group_subscriber2_;
  rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription1_;
  rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription2_;
};

int main(int argc, char * argv[])
{
  rclcpp::init(argc, argv);

  // You MUST use the MultiThreadedExecutor to use, well, multiple threads
  rclcpp::executors::MultiThreadedExecutor executor;
  auto subnode = std::make_shared<DualThreadedNode>();  // This contains BOTH subscriber callbacks.
                                                        // They will still run on different threads
                                                        // One Node. Two callbacks. Two Threads
  executor.add_node(subnode);
  executor.spin();
  rclcpp::shutdown();
  return 0;
}

Publisher

// Created by ubuntu on 23-11-28.
//
#include <chrono>
#include <functional>
#include <memory>

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
#include "std_msgs/msg/bool.hpp"
#include <std_msgs/msg/string.hpp>

using namespace std::chrono_literals;
using std::placeholders::_1;

class ChangeWorkFrame: public rclcpp::Node
{
  public:
    ChangeWorkFrame();                                                                   
    void change_work_frame_programe();                                                 
  
  private:

    rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;                 

    size_t count_;
};


void ChangeWorkFrame::change_work_frame_programe()
{
    std_msgs::msg::String change_work_frame;
    change_work_frame.data = "sss" + std::to_string(this->count_++);                          
    RCLCPP_INFO(this->get_logger(), " Publishing '%s'", change_work_frame.data.c_str());
    this->publisher_->publish(change_work_frame);
}
/***********************************************end**************************************************/


ChangeWorkFrame::ChangeWorkFrame():rclcpp::Node("changeframe")
{
  publisher_ = this->create_publisher<std_msgs::msg::String>("topic", 10);
  sleep(2);
  change_work_frame_programe();
}
/***********************************************end**************************************************/

int main(int argc, char** argv)
{
  rclcpp::init(argc, argv);
  rclcpp::spin(std::make_shared<ChangeWorkFrame>());
  rclcpp::shutdown();
  return 0;
}

Expected behavior

When I run the Publisher node the Subscription node will response.

Actual behavior

Sometimes Subscription not response.

Additional information

I want to use the multithreaded_executor in real scene like this response mechanism ,but when I try it have this problem .

Sincere thanks

Kaola

@fujitatomoya
Copy link
Collaborator

with your SSCCE, i can not find the problem. publisher and subscription works fine.

root@tomoyafujita:~/ros2_ws/colcon_ws# ros2 run prover_rclcpp examples_368_sub
[INFO] [1702660421.069453964] [DualThreadedNode]: THREAD 13945239655426654374 => Heard 'sss0' at 1702660421069439658
[INFO] [1702660421.069553711] [DualThreadedNode]: THREAD 810583239088216020 => Heard 'sss0' at 1702660421069544503
[INFO] [1702660428.103174897] [DualThreadedNode]: THREAD 4072990762974177855 => Heard 'sss0' at 1702660428103162530
[INFO] [1702660428.103292905] [DualThreadedNode]: THREAD 11150870177299451571 => Heard 'sss0' at 1702660428103283488
[INFO] [1702660432.737718155] [DualThreadedNode]: THREAD 810583239088216020 => Heard 'sss0' at 1702660432737707146
[INFO] [1702660432.737988652] [DualThreadedNode]: THREAD 2573894213746311180 => Heard 'sss0' at 1702660432737979286
[INFO] [1702660437.898976822] [DualThreadedNode]: THREAD 10157345792866725780 => Heard 'sss0' at 1702660437898965164
[INFO] [1702660437.899021263] [DualThreadedNode]: THREAD 2590413702080541566 => Heard 'sss0' at 1702660437899009670

root@tomoyafujita:~/ros2_ws/colcon_ws# ros2 run prover_rclcpp examples_368_pub
[INFO] [1702660329.630537607] [changeframe]:  Publishing 'sss0'
^C[INFO] [1702660417.937769061] [rclcpp]: signal_handler(signum=2)
root@tomoyafujita:~/ros2_ws/colcon_ws# ros2 run prover_rclcpp examples_368_pub
[INFO] [1702660421.068601807] [changeframe]:  Publishing 'sss0'
^C[INFO] [1702660425.233031352] [rclcpp]: signal_handler(signum=2)
root@tomoyafujita:~/ros2_ws/colcon_ws# ros2 run prover_rclcpp examples_368_pub
[INFO] [1702660428.102292844] [changeframe]:  Publishing 'sss0'
^C[INFO] [1702660429.654872417] [rclcpp]: signal_handler(signum=2)
root@tomoyafujita:~/ros2_ws/colcon_ws# ros2 run prover_rclcpp examples_368_pub
[INFO] [1702660432.736855154] [changeframe]:  Publishing 'sss0'
^C[INFO] [1702660434.793063563] [rclcpp]: signal_handler(signum=2)

could you provide more details especially the procedure step by step?

@fujitatomoya fujitatomoya added the more-information-needed Further information is required label Dec 15, 2023
fujitatomoya added a commit to fujitatomoya/ros2_test_prover that referenced this issue Dec 15, 2023
@kaola-zero
Copy link
Author

Hello
Thinks for your answer, I noticed something that when I use Humble "ros2 topic pub -- once" command, the terminal will prompt 'wait matched', so I think this question should be publish without matching the topic, sub in not receiving it. How can I ensure that the other party matches before publishing the data.

@fujitatomoya
Copy link
Collaborator

so I think this question should be publish without matching the topic, sub in not receiving it.
How can I ensure that the other party matches before publishing the data.

can you rephrase your question ? I am not sure about your question, sorry. to match the endpoints, endpoints need to either publish or subscribe the topic.

@kaola-zero
Copy link
Author

image
Hello!
I noticed that ROS2 uses FastDDS as its underlying layer. When I publish, sometimes I receive a prompt saying 'Waiting for at least 1 matching subscription', which means that data is only published after a connection is established between the publish and subscribe nodes. When I write to other publish nodes, sometimes there is no response, which may be the reason for not establishing a connection. So, is there any good solution to this problem.

@fujitatomoya
Copy link
Collaborator

@kaola-zero

which means that data is only published after a connection is established between the publish and subscribe nodes. When I write to other publish nodes, sometimes there is no response, which may be the reason for not establishing a connection. So, is there any good solution to this problem.

I still do not understand what exactly the problem here. without connection, where publisher calls send to the system? could you elaborate a bit?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
more-information-needed Further information is required
Projects
None yet
Development

No branches or pull requests

2 participants