Skip to content

Commit

Permalink
removed A* grid implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Helena Jäger committed Nov 20, 2024
1 parent 7197465 commit 3496a93
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 191 deletions.
109 changes: 1 addition & 108 deletions bitbots_navigation/bitbots_path_planning/bitbots_path_planning/map.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
import cv2
import numpy as np
import shapely
import soccer_vision_3d_msgs.msg as sv3dm
import tf2_ros as tf2
from bitbots_utils.utils import get_parameters_from_other_node
from geometry_msgs.msg import Point
from nav_msgs.msg import OccupancyGrid
from rclpy.node import Node
from ros2_numpy import msgify, numpify
from ros2_numpy import numpify
from shapely import Geometry
from tf2_geometry_msgs import PointStamped, PoseWithCovarianceStamped

Expand Down Expand Up @@ -67,20 +64,6 @@ def set_ball(self, ball: PoseWithCovarianceStamped) -> None:
self.node.get_logger().warn(str(e))
self._update_geometries()

def _render_balls(self, map: np.ndarray) -> None:
"""
Draws the ball buffer onto the costmap
"""
ball: sv3dm.Ball
for ball in self.ball_buffer:
cv2.circle(
map,
self.to_map_space(ball.x, ball.y)[::-1],
round(self.config_ball_diameter / 2 * self.node.config.map.resolution),
self.config_obstacle_value,
-1,
)

def set_robots(self, robots: sv3dm.RobotArray):
"""
Adds a given robot array to the robot buffer
Expand Down Expand Up @@ -116,82 +99,6 @@ def _update_geometries(self):
self._obstacles.append(geometry)
self._obstacles_union = shapely.union_all(self._obstacles)

def _render_robots(self, map: np.ndarray) -> None:
"""
Draws the robot buffer onto the costmap
"""
robot: sv3dm.Robot
for robot in self.robot_buffer:
cv2.circle(
map,
self.to_map_space(robot.bb.center.position.x, robot.bb.center.position.y)[::-1],
round(max(numpify(robot.bb.size)[:2]) * self.node.config.map.resolution),
self.config_obstacle_value,
-1,
)

def to_map_space(self, x: float, y: float) -> tuple[int, int]:
"""
Maps a point (x, y in meters) to corresponding pixel on the costmap
"""
return (
max(
0,
min(
round((x - self.get_origin()[0]) * self.node.config.map.resolution),
self.size[0] * self.node.config.map.resolution - 1,
),
),
max(
0,
min(
round((y - self.get_origin()[1]) * self.node.config.map.resolution),
self.size[1] * self.node.config.map.resolution - 1,
),
),
)

def from_map_space_np(self, points: np.ndarray) -> np.ndarray:
"""
Maps an array of pixel coordinates from the costmap to world coordinates (meters)
"""
return points / self.node.config.map.resolution + self.get_origin()

def get_origin(self) -> np.ndarray:
"""
Origin of the costmap in meters
"""
return np.array(
[
-(self.size[0] * self.node.config.map.resolution) / self.node.config.map.resolution / 2,
-(self.size[1] * self.node.config.map.resolution) / self.node.config.map.resolution / 2,
]
)

@property
def costmap(self) -> np.ndarray:
"""
Generate the complete cost map
"""
map: np.ndarray = np.ones((np.array(self.size) * self.node.config.map.resolution).astype(int), dtype=np.int8)
if self.ball_obstacle_active:
self._render_balls(map)
self._render_robots(map)
return self.inflate(map)

def inflate(self, map: np.ndarray) -> np.ndarray:
"""
Applies inflation to all occupied areas of the costmap
"""
idx = map == 1
nmap = cv2.dilate(
map.astype(np.uint8),
np.ones((self.config_inflation_dialation, self.config_inflation_dialation), np.uint8),
iterations=2,
)
map[idx] = cv2.blur(nmap, (self.config_inflation_blur, self.config_inflation_blur)).astype(np.int8)[idx]
return map

def avoid_ball(self, state: bool) -> None:
"""
Activates or deactivates the ball obstacle
Expand All @@ -203,17 +110,3 @@ def get_frame(self) -> str:
Returns the frame of reference of the map
"""
return self.frame

def to_msg(self) -> OccupancyGrid:
"""
Returns the costmap as an OccupancyGrid message
"""
map = self.costmap
msg: OccupancyGrid = msgify(OccupancyGrid, map)
msg.header.frame_id = self.frame
msg.info.width = map.shape[0]
msg.info.height = map.shape[1]
msg.info.resolution = 1 / self.node.config.map.resolution
msg.info.origin.position.x = self.get_origin()[0]
msg.info.origin.position.y = self.get_origin()[1]
return msg
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import soccer_vision_3d_msgs.msg as sv3dm
from bitbots_tf_buffer import Buffer
from geometry_msgs.msg import PointStamped, PoseStamped, PoseWithCovarianceStamped, Twist
from nav_msgs.msg import OccupancyGrid, Path
from nav_msgs.msg import Path
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
from rclpy.duration import Duration
from rclpy.executors import MultiThreadedExecutor
Expand Down Expand Up @@ -68,7 +68,6 @@ def __init__(self) -> None:

# Publisher
self.cmd_vel_pub = self.create_publisher(Twist, "cmd_vel", 1)
self.costmap_pub = self.create_publisher(OccupancyGrid, "costmap", 1)
self.path_pub = self.create_publisher(Path, "path", 1)
self.carrot_pub = self.create_publisher(PointStamped, "carrot", 1)

Expand All @@ -88,9 +87,6 @@ def step(self) -> None:
self.param_listener.refresh_dynamic_parameters()
self.config = self.param_listener.get_params()
try:
# Publish the costmap for visualization
self.costmap_pub.publish(self.map.to_msg())

if self.planner.active():
# Calculate the path to the goal pose considering the current map
path = self.planner.step()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from abc import ABC, abstractmethod

import numpy as np
import pyastar2d
import tf2_ros as tf2
from geometry_msgs.msg import PoseStamped
from heapdict import heapdict
Expand Down Expand Up @@ -126,79 +124,3 @@ def step(self) -> Path:
successor.cost = tentative_g
f = tentative_g + distance(node.point, goal) # calculate new f-value (costs + heuristic)
open_list[successor] = f


class GridPlanner(Planner):
"""
A simple grid based A* interface
"""

def __init__(self, node: Node, buffer: tf2.Buffer, map: Map) -> None:
self.node = node
self.buffer = buffer
self.map = map
self.goal: PoseStamped | None = None
self.base_footprint_frame: str = self.node.config.base_footprint_frame

def set_goal(self, pose: PoseStamped) -> None:
"""
Updates the goal pose
"""
pose.header.stamp = Time(clock_type=self.node.get_clock().clock_type).to_msg()
self.goal = pose

def cancel_goal(self) -> None:
"""
Removes the current goal
"""
self.goal = None
self.path = None

def active(self) -> bool:
"""
Determine if we have an active goal
"""
return self.goal is not None

def step(self) -> Path:
"""
Generates a new A* path to the goal pose with respect to the costmap
"""
goal = self.goal

# Get current costmap
navigation_grid = self.map.costmap

# Get my pose and position on the map
my_position = self.buffer.lookup_transform(
self.map.frame, self.base_footprint_frame, Time(), Duration(seconds=0.2)
).transform.translation

# Transform goal pose to map frame if needed
if goal.header.frame_id != self.map.frame:
goal = self.buffer.transform(goal, self.map.frame, timeout=Duration(seconds=0.2))

# Run A* from our current position to the goal position
path = pyastar2d.astar_path(
navigation_grid.astype(np.float32),
self.map.to_map_space(my_position.x, my_position.y),
self.map.to_map_space(goal.pose.position.x, goal.pose.position.y),
allow_diagonal=False,
)

# Convert the pixel coordinates to world coordinates
path = self.map.from_map_space_np(path)

# Build path message
def to_pose_msg(element):
pose = PoseStamped()
pose.pose.position.x = element[0]
pose.pose.position.y = element[1]
return pose

poses = list(map(to_pose_msg, path))

poses.append(goal)
return Path(
header=Header(frame_id=self.map.get_frame(), stamp=self.node.get_clock().now().to_msg()), poses=poses
)

0 comments on commit 3496a93

Please sign in to comment.