Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address thread starvation bug on joint trajectory server preemption #150

Merged
merged 7 commits into from
Aug 7, 2024

Conversation

hello-amal
Copy link
Contributor

@hello-amal hello-amal commented Jul 26, 2024

Description

There is a thread starvation race condition in stretch_driver / joint_trajectory_server. Consider the following scenario, where steps 2 and 3 happen in quick succession:

  1. stretch_driver receives a joint trajectory server goal.
  2. stretch_driver receives another joint trajectory server goal.
  3. stretch_driver receives a change mode request.

The expected behavior here is that the second goal will preempt the first, then the second goal will execute, and then the change mode request will be processed.

In actuality, the following can happen. Note that the executor is only allocated two threads to process all callbacks.

  1. The first goal's execution callback locks robot_mode_rwlock, and occupies all the resources of one thread (it never handles control back to the executor, because it uses time.sleep() instead of rate.sleep()). Thus, the executor is only left with one thread until the first goal's execution completes.
  2. The executor receives and accepts the second goal, and invokes an asynchronous coroutine to execute it. Thus, control of that thread is handed back to the executor, with two items of work on its wait set: the change mode service callback, and the action execution coroutine.
  3. Sometimes, the switch mode service's callback gets invoked first. This waits for the robot_mode_rwlock to be released, which only happens after execution of the first goal finishes. This uses all the resources of the second thread. Thus, there are no threads left to process the new goal.
  4. Because the latest_goal_id is only incremented after the goal starts exeucting, and goal execution uses the latest_goal_id to determine whether to preempt, the first goal never knows whether to preempt.
  5. As a result of this, both threads are occupied until the first goal finishes executing, which could take up to 10s due to hardcoded timeout.

This PR implements multiple fixes to address the above bug and cleanup the joint trajectory server. Each fix directly corresponds to one of the commits.

  1. Makes the default goal callback a parameter, which defaults to 10 and can be dynamically changed by the user over the runtime of stretch_driver. That way, if their application commands short trajectories, they can reduce the timeout.
  2. Remove the unnecessary addition of the joint trajectory server as a Node. It registers all its callbacks under the stretch_driver node, so it doesn't need to be a node. This may reduce unnecessary executor computation.
  3. Remove the unused server.action_server_rate, which invokes callbacks at a rate of 10 Hz and unnecessarily takes up executor compute.
  4. Adds a missing release of the read-write lock, which could block execution.
  5. Increment the latest_goal_id when the goal is accepted, as opposed to when the goal starts executing, so that previous goals know to cancel as soon as a next goal is accepted.
  6. For good measure, this PR also increases the number of executor threads to 5. 2 is far too few for the callbacks in the executor.
    1. The node's default MutuallyExclusiveCallbackGroup has all services. That needs 1 thread.
    2. The two subscriptions, cmd_vel and gamepad_joy, have a dedicated MutuallyExclusiveCallbackGroup. That needs 1 thread.
    3. There is a timer that calls command_mobile_base_velocity_and_publish_state at a rate of 30 Hz, which is what moves the base (and stops it if the motion command is stale). So that timer should have 1 thread available to handle its callbacks.
    4. The action server has a ReentrantCallbackGroup for all its callbacks (goal request, cancellation request, execution). We should account for at least 2 actions being executed in parallel (e.g., one action starts executing while the next one is wrapping up and preempting -- this happens quite often). Thus, we should have 2 threads for this.

Testing

The below script is a minimal example of the problem. Testing was done as detailed below. I did the testing with the tablet on the end-effector, which is helpful because of #141 (where trajectories go till timeout when the wrist has additional force on it).

  • Recreate the issue:
    • Pull on humble, copy the below script into a ROS package and add it to the CMakeLists.txt / setup.py file.
    • Re-build your workspace.
    • ros2 launch stretch_core stretch_driver.launch.py
    • ros2 run <package_with_the_text_script> stretch_driver_thread_starvation_test.py
    • Verify that Goal 1 runs to termination before Goal 2 is processed.
  • Verify the fix:
    • Pull on this branch.
    • Re-build your workspace.
    • ros2 launch stretch_core stretch_driver.launch.py
    • ros2 run <package_with_the_text_script> stretch_driver_thread_starvation_test.py
    • Verify that Goal 1 gets pre-empted and Goal 2 executes.

stretch_driver_thread_starvation_test.py

(To more reliably re-create the actual race condition, I swap the nav mode callback with the second goal callback, compared with what I wrote above. However, the reality is that even if we follow the steps I wrote above from the web app, it is possible for the requests to get swapped over roslibjs and/or rosbridge.)

#! /usr/bin/env python3
import time
import threading
from typing import Dict, Optional

import rclpy
from action_msgs.msg import GoalStatus
from control_msgs.action import FollowJointTrajectory
from rclpy.action import ActionClient
from rclpy.duration import Duration
from rclpy.executors import SingleThreadedExecutor
from rclpy.node import Node
from rclpy.task import Future
from std_srvs.srv import Trigger
from trajectory_msgs.msg import JointTrajectoryPoint

# Declare the global variables
node: Optional[Node] = None
trajectory_client: Optional[ActionClient] = None


def move_to(joint_positions: Dict[str, float]) -> Future:
    """
    Move the arm to the given joint positions.

    Parameters
    ----------
    joint_positions : Dict[str, float]
        A dictionary of joint names and their corresponding positions.

    Returns
    -------
    Future
        A future object representing the result of the action.
    """
    global node, trajectory_client
    joint_names = list(joint_positions.keys())

    point1 = JointTrajectoryPoint()
    trajectory_goal = FollowJointTrajectory.Goal()
    trajectory_goal.goal_time_tolerance = Duration(seconds=1.0).to_msg()
    trajectory_goal.trajectory.joint_names = joint_names

    point1.time_from_start = Duration(seconds=10).to_msg()
    point1.positions = [joint_positions[joint_name] for joint_name in joint_names]
    trajectory_goal.trajectory.points = [point1]
    return trajectory_client.send_goal_async(trajectory_goal)


def wait_until_future_finished(future: Future, rate_hz: float = 5.0) -> None:
    """
    Wait until the future is finished.

    Parameters
    ----------
    future : Future
        The future object to wait for.
    rate_hz : float, optional
        The rate at which to check the future, by default 5.0
    """
    global node
    rate = node.create_rate(rate_hz)
    while not future.done():
        rate.sleep()


def get_action_result(
    send_goal_future: Future,
) -> Optional[FollowJointTrajectory.Result]:
    """
    Get the result of the action.

    Parameters
    ----------
    send_goal_future : Future
        The future object representing the result of the action.

    Returns
    -------
    Optional[FollowJointTrajectory.Result]
        The result of the action, if available.
    """
    global node
    # First, wait until the goal is accepted or rejected
    wait_until_future_finished(send_goal_future)

    # Check if the goal was accepted
    goal_handle = send_goal_future.result()
    if not goal_handle.accepted:
        node.get_logger().error("Goal rejected")
        return None

    # Wait until the goal is completed
    get_result_future = goal_handle.get_result_async()
    wait_until_future_finished(get_result_future)

    # Return the result
    goal_status = get_result_future.result()
    if goal_status.status != GoalStatus.STATUS_SUCCEEDED:
        node.get_logger().error("Goal failed")
        return None
    return goal_status.result


def main():
    global node, trajectory_client
    rclpy.init()
    node = rclpy.create_node("move_to_same_pose")
    trajectory_client = ActionClient(
        node, FollowJointTrajectory, "/stretch_controller/follow_joint_trajectory"
    )
    node.get_logger().info("Waiting for the follow joint trajectory controller...")
    _ = trajectory_client.wait_for_server(timeout_sec=60.0)
    switch_to_nav_mode_srv = node.create_client(
        Trigger, '/switch_to_navigation_mode'
    )
    node.get_logger().info("Waiting for the switch to navigation mode service...")
    switch_to_nav_mode_srv.wait_for_service(timeout_sec=60.0)
    switch_to_pos_mode = node.create_client(
        Trigger, '/switch_to_position_mode'
    )
    node.get_logger().info("Waiting for the switch to position mode service...")
    switch_to_pos_mode.wait_for_service(timeout_sec=60.0)


    # Spin the node in the background
    executor = SingleThreadedExecutor()
    spin_thread = threading.Thread(
        target=rclpy.spin,
        args=(node,),
        kwargs={"executor": executor},
        daemon=True,
    )
    spin_thread.start()

    # Create the goal
    goal_0 = {
        "joint_head_tilt": 0.0,
        "joint_head_pan": 0.0,
        "joint_wrist_roll": 0.0,
        "joint_wrist_pitch": 0.0,
        "joint_wrist_yaw": 0.0,
        "joint_lift": 1.1,
        "wrist_extension": 0.0,
    }
    goal_1 = {
        "joint_head_tilt": -0.02766916597432903,
        "joint_head_pan": 0.050683455439404626,
        "joint_wrist_roll": -0.006135923151542565,
        "joint_wrist_pitch": -0.15339807878856412,
        "joint_wrist_yaw": 0.0012783173232380344,
        "joint_lift": 0.5305833379477396,
        "wrist_extension": 0.45,
    }
    goal_2 = {
        "joint_head_tilt": -0.03766916597432903,
        "joint_head_pan": 0.060683455439404626,
        "joint_wrist_roll": -0.016135923151542565,
        "joint_wrist_pitch": -0.16339807878856412,
        "joint_wrist_yaw": 0.0112783173232380344,
        "joint_lift": 0.6305833379477396,
        "wrist_extension": 0.5145,
    }

    # Switch to position mode
    node.get_logger().info("Switching to position mode...")
    switch_to_pos_mode_future = switch_to_pos_mode.call_async(Trigger.Request())
    wait_until_future_finished(switch_to_pos_mode_future)

    # Move to Goal 0
    node.get_logger().info("Moving to goal 0...")
    send_goal_future_0 = move_to(goal_0)
    get_action_result(send_goal_future_0)

    # Move to Goal 1
    node.get_logger().info("Moving to goal 1...")
    send_goal_future_1 = move_to(goal_1)

    # Switch to navigation mode
    node.get_logger().info("Switching to navigation mode...")
    switch_to_nav_mode_future = switch_to_nav_mode_srv.call_async(Trigger.Request())

    # Move to Goal 2
    time.sleep(0.25)
    node.get_logger().info("Moving to goal 2...")
    send_goal_future_2 = move_to(goal_2)

    # Wait for all remaining futures
    node.get_logger().info("Waiting for Goal 1 to finish...")
    get_action_result(send_goal_future_1)
    node.get_logger().info("Waiting for the switch to navigation mode service...")
    wait_until_future_finished(switch_to_nav_mode_future)
    node.get_logger().info("Waiting for Goal 2 to finish...")
    get_action_result(send_goal_future_2)
    node.get_logger().info("Done!")

    # Spin in the foreground
    spin_thread.join()

    # Clean up
    node.destroy_node()
    rclpy.shutdown()


if __name__ == "__main__":
    main()

@hello-amal hello-amal requested a review from hello-binit July 26, 2024 20:12
@hello-amal hello-amal changed the base branch from bugfix/cancel_goals to humble July 26, 2024 20:13
@hello-amal hello-amal force-pushed the bugfix/stretch_driver_thread_starvation branch from 168db27 to 979abfa Compare July 26, 2024 20:14
@hello-amal hello-amal changed the title Address thread starvation bug on joint trajectory server cancellation Address thread starvation bug on joint trajectory server preemption Jul 26, 2024
@hello-binit hello-binit force-pushed the bugfix/stretch_driver_thread_starvation branch from 979abfa to 725efc4 Compare July 30, 2024 15:44
@hello-amal
Copy link
Contributor Author

Done with requested fixes. @hello-binit I don't have access to a Stretch right now, so could you just run this code on your Stretch and ensure it still works with the typing change (it should)?

Also, I saw that you've been adding some of these edge cases to the regression tests. What do you think of adding the above test case to the regression tests? I can create an issue for it.

@hello-amal hello-amal requested a review from hello-binit July 30, 2024 22:31
@hello-amal
Copy link
Contributor Author

I tested this on 3053 and it works. Ready for re-review.

@hello-binit
Copy link
Contributor

Verified on 3004.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants