Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple action clients send requests to an action server, continuously constructing, sending requests, waiting for results, and then destructing. The action server returns error code RETCODE_OUT_OF_RESOURCES during writing. #5359

Open
1 task done
Eternity1987 opened this issue Oct 28, 2024 · 5 comments
Labels
need more info Issue that requires more info from contributor

Comments

@Eternity1987
Copy link

Eternity1987 commented Oct 28, 2024

Is there an already existing issue for this?

  • I have searched the existing issues

Expected behavior

The action server receives requests normally and returns results.

Current behavior

The action server returns error code RETCODE_OUT_OF_RESOURCES during writing.

Steps to reproduce

Multiple action clients send requests to an action server, continuously constructing, sending requests, waiting for results, and then destructing

Fast DDS version/commit

version: 2.14.3

Platform/Architecture

Ubuntu Focal 20.04 arm64

Transport layer

Shared Memory Transport (SHM)

Additional context

XML configuration file

No response

Relevant log output

No response

Network traffic capture

No response

@Eternity1987 Eternity1987 added the triage Issue pending classification label Oct 28, 2024
@Eternity1987 Eternity1987 changed the title Multiple action clients send requests to an action server, continuously constructing, sending requests, waiting for results, and then destructing. The action server returns error code during writing. Multiple action clients send requests to an action server, continuously constructing, sending requests, waiting for results, and then destructing. The action server returns error code RETCODE_OUT_OF_RESOURCES during writing. Oct 28, 2024
@Eternity1987
Copy link
Author

image

@JesusPoderoso
Copy link
Contributor

Hi @Eternity1987, thanks for using Fast DDS.
Can you provide a reproducer as a docker or source code, so we can further investigate?

@JesusPoderoso JesusPoderoso added need more info Issue that requires more info from contributor and removed triage Issue pending classification labels Oct 28, 2024
@Eternity1987
Copy link
Author

action client

#include <memory>
#include <thread>
#include <atomic>
#include <mutex>
#include <condition_variable>

#include "rclcpp/rclcpp.hpp"
#include "rclcpp_action/rclcpp_action.hpp"

#include "example_interfaces/action/fibonacci.hpp"

using namespace std::chrono_literals;

enum class Progress{
    STEP_UINIT,
    STEP_FAIL,
    STEP_SEND,
    STEP_SUCCESS
};

// using Fibonacci = example_interfaces::action::Fibonacci;

// template <typename ActionT>
class ActionClient : public rclcpp::Node
{
 public:
  using ActionT = example_interfaces::action::Fibonacci;

  using GoalT = typename ActionT::Goal;
  using FeedbackT = typename ActionT::Feedback;

  using GoalHandleActionT = rclcpp_action::ClientGoalHandle<ActionT>;
  using ResultT = typename rclcpp_action::ClientGoalHandle<ActionT>::WrappedResult;
  using OptionsT = typename rclcpp_action::Client<ActionT>::SendGoalOptions;
public:
  explicit ActionClient(std::string node_name, const rclcpp::NodeOptions &node_options = rclcpp::NodeOptions())
      : Node(node_name, node_options),
        current_step_(Progress::STEP_UINIT),
        bChange_(false),nloop_(0)
  {
    // this->get_logger().set_level(rclcpp::Logger::Level::Debug);
    this->client_ = rclcpp_action::create_client<ActionT>(
        this,
    //   this->get_node_base_interface(),
    //   this->get_node_graph_interface(),
    //   this->get_node_logging_interface(),
    //   this->get_node_waitables_interface(),
      "fibonacci");
    progress_thread_ = std::thread(&ActionClient::running, this);
  }

  ~ActionClient(){
    if(progress_thread_.joinable()){
        progress_thread_.join();
    }
  }

  void running (){
    while(rclcpp::ok()){
        std::unique_lock<std::mutex> lock(mtx_);
        cv_.wait(lock, [this]{ return current_step_ != Progress::STEP_UINIT && bChange_ == true ; });
        if(++nloop_ > 5000){
          RCLCPP_INFO(this->get_logger(), "sum nloop_: {}",  nloop_);
          return;
        }
        bChange_ = false;
        RCLCPP_INFO(this->get_logger(), "nloop_: %d", nloop_);
        switch(current_step_){
            case Progress::STEP_FAIL:
                {
                    RCLCPP_INFO(this->get_logger(), "current Progress::STEP_FAIL");
                    reset();
                }
                // break;
            case Progress::STEP_SEND:
                {
                    RCLCPP_INFO(this->get_logger(), "current Progress::STEP_SEND");
                    client_->wait_for_action_server(std::chrono::seconds(20));
                    if (client_->action_server_is_ready())
                    {
                        send();
                    }
                    else
                    {
                        RCLCPP_INFO(this->get_logger(), "waiting action server");
                    }
                }
                break;
            case Progress::STEP_SUCCESS:
                {
                    RCLCPP_INFO(this->get_logger(), "current Progress::STEP_SUCCESS");
                }
                break;
            default:
                RCLCPP_ERROR(this->get_logger(), "error step");
        }
    }
  }

  void reset(){
    RCLCPP_INFO(this->get_logger(), "reset 1");
    client_.reset();
    RCLCPP_INFO(this->get_logger(), "reset 2");
    this->client_ = rclcpp_action::create_client<ActionT>(
        this,
    //   this->get_node_base_interface(),
    //   this->get_node_graph_interface(),
    //   this->get_node_logging_interface(),
    //   this->get_node_waitables_interface(),
      "fibonacci");
    current_step_ = Progress::STEP_SEND;
    RCLCPP_INFO(this->get_logger(), "reset ==> send");

  }
  void send(){
    OptionsT options = OptionsT();

    options.goal_response_callback =
        [&](const GoalHandleActionT::SharedPtr & goal_handle) {
        //   std::this_thread::sleep_for(10ms);
          auto status_g = future_.wait_for(1s);
          if (status_g == std::future_status::timeout)
          {
            RCLCPP_ERROR(this->get_logger(), "Goal response timeout.");
          }
          else if (status_g == std::future_status::ready)
          {
            if (!future_.get())
            {
              RCLCPP_INFO(this->get_logger(), "Goal was rejected!");
              // std::unique_lock<std::mutex> lock(mtx_);
              current_step_ = Progress::STEP_FAIL;
              bChange_ = true;
              cv_.notify_one();
            }
            else
            {
              RCLCPP_INFO(this->get_logger(), "Goal accepted.");
            }
          }
        };

    options.feedback_callback =
        [&](GoalHandleActionT::SharedPtr,
            const std::shared_ptr<const FeedbackT> feedback) {
            RCLCPP_INFO(this->get_logger(), "Received feedback: %d", feedback->sequence.back());
        };

    options.result_callback = [&](const ResultT &result) {
    //   std::this_thread::sleep_for(10ms);
      if (result.code == rclcpp_action::ResultCode::CANCELED)
      {
        RCLCPP_ERROR(this->get_logger(), "Goal canceled!");
        // std::unique_lock<std::mutex> lock(mtx_);
        current_step_ = Progress::STEP_FAIL;
        bChange_ = true;
        cv_.notify_one();
      }
      else if (result.code == rclcpp_action::ResultCode::UNKNOWN)
      {
        RCLCPP_ERROR(this->get_logger(), "Goal failed! ResultCode: UNKNOWN");
        // std::unique_lock<std::mutex> lock(mtx_);
        current_step_ = Progress::STEP_FAIL;
        bChange_ = true;
        cv_.notify_one();
      }
      else if (result.code == rclcpp_action::ResultCode::ABORTED)
      {
        RCLCPP_ERROR(this->get_logger(), "Goal aborted!");
        // std::unique_lock<std::mutex> lock(mtx_);
        current_step_ = Progress::STEP_FAIL;
        bChange_ = true;
        cv_.notify_one();
      }
      else if (result.code == rclcpp_action::ResultCode::SUCCEEDED)
      {
        RCLCPP_INFO(this->get_logger(), "Received result.");
        // std::unique_lock<std::mutex> lock(mtx_);
        current_step_ = Progress::STEP_FAIL;
        bChange_ = true;
        cv_.notify_one();
      }
    };

    client_->wait_for_action_server(std::chrono::seconds(20));
    auto goal_msg = GoalT();
    goal_msg.order = 4;
    RCLCPP_INFO(this->get_logger(), "client_->async_send_goal 1");
    future_ = client_->async_send_goal(goal_msg, options);
    RCLCPP_INFO(this->get_logger(), "client_->async_send_goal 2");
  }

private:
  rclcpp_action::Client<ActionT>::SharedPtr client_;
  std::shared_future<GoalHandleActionT::SharedPtr> future_;
  Progress current_step_;
  std::mutex mtx_;
  std::condition_variable cv_;
  std::thread progress_thread_;
  std::atomic_bool bChange_;
  int nloop_;
};

int main(int argc, char** argv){
  rclcpp::init(argc, argv);
  auto action_client1 = std::make_shared<ActionClient>("simple_action_client_1");
  auto action_client2 = std::make_shared<ActionClient>("simple_action_client_2");


  rclcpp::executors::MultiThreadedExecutor MultiThreadScheduler;
  MultiThreadScheduler.add_node(action_client1);
  MultiThreadScheduler.add_node(action_client2);
  std::thread t([&] { MultiThreadScheduler.spin(); });
  std::this_thread::sleep_for(100ms);
  action_client1->send();
  action_client2->send();
  t.join();
  rclcpp::shutdown();
}

action server

// Copyright 2018 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <memory>
#include <thread>

#include "example_interfaces/action/fibonacci.hpp"
#include "rclcpp/rclcpp.hpp"
// TODO(jacobperron): Remove this once it is included as part of 'rclcpp.hpp'
#include "rclcpp_action/rclcpp_action.hpp"

using Fibonacci = example_interfaces::action::Fibonacci;
using GoalHandleFibonacci = rclcpp_action::ServerGoalHandle<Fibonacci>;

rclcpp_action::GoalResponse handle_goal(
  const rclcpp_action::GoalUUID & uuid, std::shared_ptr<const Fibonacci::Goal> goal)
{
  RCLCPP_INFO(rclcpp::get_logger("server"), "Got goal request with order %d", goal->order);
  (void)uuid;
  // Let's reject sequences that are over 9000
  if (goal->order > 9000) {
    return rclcpp_action::GoalResponse::REJECT;
  }
  return rclcpp_action::GoalResponse::ACCEPT_AND_EXECUTE;
}

rclcpp_action::CancelResponse handle_cancel(
  const std::shared_ptr<GoalHandleFibonacci> goal_handle)
{
  RCLCPP_INFO(rclcpp::get_logger("server"), "Got request to cancel goal");
  (void)goal_handle;
  return rclcpp_action::CancelResponse::ACCEPT;
}

void execute(
  const std::shared_ptr<GoalHandleFibonacci> goal_handle)
{
  RCLCPP_INFO(rclcpp::get_logger("server"), "Executing goal");
  rclcpp::Rate loop_rate(1000);
  const auto goal = goal_handle->get_goal();
  auto feedback = std::make_shared<Fibonacci::Feedback>();
  auto & sequence = feedback->sequence;
  sequence.push_back(0);
  sequence.push_back(1);
  auto result = std::make_shared<Fibonacci::Result>();

  for (int i = 1; (i < goal->order) && rclcpp::ok(); ++i) {
    // Check if there is a cancel request
    if (goal_handle->is_canceling()) {
      result->sequence = sequence;
      goal_handle->canceled(result);
      RCLCPP_INFO(rclcpp::get_logger("server"), "Goal Canceled");
      return;
    }
    // Update sequence
    sequence.push_back(sequence[i] + sequence[i - 1]);
    // Publish feedback
    goal_handle->publish_feedback(feedback);
    RCLCPP_INFO(rclcpp::get_logger("server"), "Publish Feedback");

    loop_rate.sleep();
  }

  // Check if goal is done
  if (rclcpp::ok()) {
    result->sequence = sequence;
    goal_handle->succeed(result);
    RCLCPP_INFO(rclcpp::get_logger("server"), "Goal Succeeded");
  }
}

void handle_accepted(const std::shared_ptr<GoalHandleFibonacci> goal_handle)
{
  // this needs to return quickly to avoid blocking the executor, so spin up a new thread
  std::thread{execute, goal_handle}.detach();
}

int main(int argc, char ** argv)
{
  rclcpp::init(argc, argv);
  auto node = rclcpp::Node::make_shared("minimal_action_server");
  node->get_logger().set_level(rclcpp::Logger::Level::Debug);

  // Create an action server with three callbacks
  //   'handle_goal' and 'handle_cancel' are called by the Executor (rclcpp::spin)
  //   'execute' is called whenever 'handle_goal' returns by accepting a goal
  //    Calls to 'execute' are made in an available thread from a pool of four.
  auto action_server = rclcpp_action::create_server<Fibonacci>(
    node,
    "fibonacci",
    handle_goal,
    handle_cancel,
    handle_accepted);

  rclcpp::spin(node);

  rclcpp::shutdown();
  return 0;
}

@Eternity1987
Copy link
Author

Or start multiple client processes.

#include <chrono>

#include "example_interfaces/action/fibonacci.hpp"
#include "rclcpp/rclcpp.hpp"
#include "rclcpp_action/rclcpp_action.hpp"
#include <cinttypes>

using namespace std::literals;

using Fibonacci = example_interfaces::action::Fibonacci;

rclcpp::Node::SharedPtr g_node = nullptr;

void feedback_callback(
  rclcpp_action::ClientGoalHandle<Fibonacci>::SharedPtr,
  const std::shared_ptr<const Fibonacci::Feedback> feedback)
{
  RCLCPP_INFO(
    g_node->get_logger(),
    "Next number in sequence received: %" PRId32,
    feedback->sequence.back());
}

void ResultCallback(
    const rclcpp_action::ClientGoalHandle<Fibonacci>::WrappedResult& result)
{
    RCLCPP_INFO(
    g_node->get_logger(),
    "Next number in sequence received: %" PRId32,
    (int)result.code);
}

int main(int argc, char ** argv)
{
  rclcpp::init(argc, argv);
  g_node = rclcpp::Node::make_shared("minimal_action_client");
  // g_node->get_logger().set_level(rclcpp::Logger::Level::Debug);
  int success_cnt = 0;
  int nloop = 0;
  do{
    auto action_client = rclcpp_action::create_client<Fibonacci>(g_node, "fibonacci");

    if (!action_client->wait_for_action_server(std::chrono::seconds(20))) {
      RCLCPP_ERROR(g_node->get_logger(), "Action server not available after waiting");
      return 1;
    }

    // Populate a goal
    auto goal_msg = Fibonacci::Goal();
    goal_msg.order = 4;

    RCLCPP_INFO(g_node->get_logger(), "Sending goal");
    // Ask server to achieve some goal and wait until it's accepted
    auto send_goal_options = rclcpp_action::Client<Fibonacci>::SendGoalOptions();
    send_goal_options.feedback_callback = feedback_callback;
    send_goal_options.result_callback = ResultCallback;
    auto goal_handle_future = action_client->async_send_goal(goal_msg, send_goal_options);

    if (rclcpp::spin_until_future_complete(g_node, goal_handle_future) !=
      rclcpp::FutureReturnCode::SUCCESS)
    {
      RCLCPP_ERROR(g_node->get_logger(), "send goal call failed :(");
      return 1;
    }

    rclcpp_action::ClientGoalHandle<Fibonacci>::SharedPtr goal_handle = goal_handle_future.get();
    if (!goal_handle) {
      RCLCPP_ERROR(g_node->get_logger(), "Goal was rejected by server");
      return 1;
    }

    // Wait for the server to be done with the goal
    auto result_future = action_client->async_get_result(goal_handle);

    RCLCPP_INFO(g_node->get_logger(), "Waiting for result");
    if (rclcpp::spin_until_future_complete(g_node, result_future, 5s) !=
      rclcpp::FutureReturnCode::SUCCESS)
    {
      RCLCPP_ERROR(g_node->get_logger(), "get result call failed :(");
      return 1;
    }

    rclcpp_action::ClientGoalHandle<Fibonacci>::WrappedResult wrapped_result = result_future.get();

    switch (wrapped_result.code) {
      case rclcpp_action::ResultCode::SUCCEEDED:
        success_cnt++;
        break;
      case rclcpp_action::ResultCode::ABORTED:
        RCLCPP_ERROR(g_node->get_logger(), "Goal was aborted");
        return 1;
      case rclcpp_action::ResultCode::CANCELED:
        RCLCPP_ERROR(g_node->get_logger(), "Goal was canceled");
        return 1;
      default:
        RCLCPP_ERROR(g_node->get_logger(), "Unknown result code");
        return 1;
    }

    RCLCPP_INFO(g_node->get_logger(), "result received");
    for (auto number : wrapped_result.result->sequence) {
      RCLCPP_INFO(g_node->get_logger(), "%" PRId32, number);
    }
    action_client.reset();
    RCLCPP_INFO(g_node->get_logger(), "nloop: %d", ++nloop);
  }while(nloop<5000);

  RCLCPP_INFO(g_node->get_logger(), "success_cnt: %d", success_cnt);
  g_node.reset();
  rclcpp::shutdown();
  return 0;
}

@Eternity1987
Copy link
Author

This issue only occurs when enabling data sharing; regular shared memory does not exhibit this behavior.
writer_qos.data_sharing().automatic(); and writer_qos.data_sharing().off();

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
need more info Issue that requires more info from contributor
Projects
None yet
Development

No branches or pull requests

2 participants