Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restaurant 01 #276

Merged
merged 11 commits into from
Nov 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,28 +1,23 @@
#!/usr/bin/env python3
from typing import List, Tuple
import numpy as np

import cv2
import numpy as np
import rospy
from shapely.validation import explain_validity
from sensor_msgs.msg import Image, PointCloud2
from geometry_msgs.msg import Point, PoseWithCovarianceStamped, Polygon
from shapely.geometry.polygon import Polygon as ShapelyPolygon
from shapely.geometry.point import Point as ShapelyPoint

from lasr_vision_msgs.msg import (
Detection,
Detection3D,
CDRequest,
CDResponse,
)
from cv2_img import cv2_img_to_msg, msg_to_cv2_img
from cv2_pcl import pcl_to_cv2
from geometry_msgs.msg import Point, Polygon, PoseWithCovarianceStamped
from lasr_vision_msgs.msg import CDRequest, CDResponse, Detection, Detection3D
from lasr_vision_msgs.srv import (
YoloDetection,
YoloDetection3D,
CroppedDetectionRequest,
CroppedDetectionResponse,
YoloDetection,
YoloDetection3D,
)
from cv2_img import cv2_img_to_msg, msg_to_cv2_img
from cv2_pcl import pcl_to_cv2
from sensor_msgs.msg import Image, PointCloud2
from shapely.geometry.point import Point as ShapelyPoint
from shapely.geometry.polygon import Polygon as ShapelyPolygon
from shapely.validation import explain_validity


def _2d_bbox_crop(
Expand Down Expand Up @@ -460,7 +455,7 @@ def process_detection_requests(
depth_image_topic: str = "/xtion/depth_registered/points",
yolo_2d_service_name: str = "/yolov8/detect",
yolo_3d_service_name: str = "/yolov8/detect3d",
robot_pose_topic: str = "/amcl_pose",
robot_pose_topic: str = "/robot_pose",
debug_topic: str = "/lasr_vision/cropped_detection/debug",
) -> CroppedDetectionResponse:
"""Processes a list of detection requests.
Expand Down
2 changes: 1 addition & 1 deletion skills/src/lasr_skills/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@
from .find_person_and_tell import FindPersonAndTell
from .count_people import CountPeople
from .json_qa import JsonQuestionAnswer

from .rotate import Rotate
273 changes: 148 additions & 125 deletions skills/src/lasr_skills/go_to_location.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,24 @@
import smach_ros
import smach
import os
from typing import Union

import rosparam
import rospkg
import rospy
import rosservice
import smach
import smach_ros
from geometry_msgs.msg import Point, Pose, PoseStamped, Quaternion
from lasr_skills import PlayMotion
from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal
from geometry_msgs.msg import Pose, PoseStamped, Point, Quaternion
from std_msgs.msg import Header

from typing import Union

from lasr_skills import PlayMotion

import rospkg
import rosparam

import os

PUBLIC_CONTAINER = False

try:
from pal_startup_msgs.srv import (
StartupStart,
StartupStop,
StartupStartRequest,
StartupStop,
StartupStopRequest,
)
except ModuleNotFoundError:
Expand All @@ -33,6 +30,7 @@ def __init__(
self,
location: Union[Pose, None] = None,
location_param: Union[str, None] = None,
safe_navigation: bool = True,
):
if location is not None or location_param is not None:
super(GoToLocation, self).__init__(outcomes=["succeeded", "failed"])
Expand All @@ -41,100 +39,159 @@ def __init__(
outcomes=["succeeded", "failed"], input_keys=["location"]
)

r = rospkg.RosPack()
els = rosparam.load_file(
os.path.join(r.get_path("lasr_skills"), "config", "motions.yaml")
)
for param, ns in els:
rosparam.upload_params(ns, param)
if safe_navigation:
r = rospkg.RosPack()
els = rosparam.load_file(
os.path.join(r.get_path("lasr_skills"), "config", "motions.yaml")
)
for param, ns in els:
rosparam.upload_params(ns, param)

IS_SIMULATION = (
"/pal_startup_control/start" not in rosservice.get_service_list()
)

with self:
smach.StateMachine.add(
"LOWER_BASE",
PlayMotion("pre_navigation"),
transitions={
"succeeded": (
"ENABLE_HEAD_MANAGER" if not IS_SIMULATION else "GO_TO_LOCATION"
),
"aborted": "failed",
"preempted": "failed",
},
)

if not IS_SIMULATION:
if PUBLIC_CONTAINER:
rospy.logwarn(
"You are using a public container. The head manager will not be stopped during navigation."
)
else:
smach.StateMachine.add(
"ENABLE_HEAD_MANAGER",
smach_ros.ServiceState(
"/pal_startup_control/start",
StartupStart,
request=StartupStartRequest("head_manager", ""),
),
transitions={
"succeeded": "GO_TO_LOCATION",
"preempted": "failed",
"aborted": "failed",
},
)

if location is not None:
if safe_navigation:
smach.StateMachine.add(
"GO_TO_LOCATION",
smach_ros.SimpleActionState(
"move_base",
MoveBaseAction,
goal=MoveBaseGoal(
target_pose=PoseStamped(
pose=location, header=Header(frame_id="map")
)
),
),
"LOWER_BASE",
PlayMotion("pre_navigation"),
transitions={
"succeeded": (
"DISABLE_HEAD_MANAGER"
"ENABLE_HEAD_MANAGER"
if not IS_SIMULATION
else "RAISE_BASE"
else "GO_TO_LOCATION"
),
"aborted": "failed",
"preempted": "failed",
},
)
elif location_param is not None:
smach.StateMachine.add(
"GO_TO_LOCATION",
smach_ros.SimpleActionState(
"move_base",
MoveBaseAction,
goal=MoveBaseGoal(
target_pose=PoseStamped(
pose=Pose(
position=Point(
**rospy.get_param(f"{location_param}/position")
),
orientation=Quaternion(
**rospy.get_param(
f"{location_param}/orientation"
)

if not IS_SIMULATION:
if PUBLIC_CONTAINER:
rospy.logwarn(
"You are using a public container. The head manager will not be stopped during navigation."
)
else:
smach.StateMachine.add(
"ENABLE_HEAD_MANAGER",
smach_ros.ServiceState(
"/pal_startup_control/start",
StartupStart,
request=StartupStartRequest("head_manager", ""),
),
transitions={
"succeeded": "GO_TO_LOCATION",
"preempted": "failed",
"aborted": "failed",
},
)

if location is not None:
smach.StateMachine.add(
"GO_TO_LOCATION",
smach_ros.SimpleActionState(
"move_base",
MoveBaseAction,
goal=MoveBaseGoal(
target_pose=PoseStamped(
pose=location, header=Header(frame_id="map")
)
),
),
transitions={
"succeeded": (
"DISABLE_HEAD_MANAGER"
if not IS_SIMULATION
else "RAISE_BASE"
),
"aborted": "failed",
"preempted": "failed",
},
)
elif location_param is not None:
smach.StateMachine.add(
"GO_TO_LOCATION",
smach_ros.SimpleActionState(
"move_base",
MoveBaseAction,
goal=MoveBaseGoal(
target_pose=PoseStamped(
pose=Pose(
position=Point(
**rospy.get_param(
f"{location_param}/position"
)
),
orientation=Quaternion(
**rospy.get_param(
f"{location_param}/orientation"
)
),
),
),
header=Header(frame_id="map"),
)
header=Header(frame_id="map"),
)
),
),
),
transitions={
"succeeded": (
"DISABLE_HEAD_MANAGER"
if not IS_SIMULATION
else "RAISE_BASE"
transitions={
"succeeded": (
"DISABLE_HEAD_MANAGER"
if not IS_SIMULATION
else "RAISE_BASE"
),
"aborted": "failed",
"preempted": "failed",
},
)
else:
smach.StateMachine.add(
"GO_TO_LOCATION",
smach_ros.SimpleActionState(
"move_base",
MoveBaseAction,
goal_cb=lambda ud, _: MoveBaseGoal(
target_pose=PoseStamped(
pose=ud.location, header=Header(frame_id="map")
)
),
input_keys=["location"],
),
transitions={
"succeeded": (
"DISABLE_HEAD_MANAGER"
if not IS_SIMULATION
else "RAISE_BASE"
),
"aborted": "succeeded",
"preempted": "failed",
},
)

if not IS_SIMULATION:
if PUBLIC_CONTAINER:
rospy.logwarn(
"You are using a public container. The head manager will not be start following navigation."
)
else:
smach.StateMachine.add(
"DISABLE_HEAD_MANAGER",
smach_ros.ServiceState(
"/pal_startup_control/stop",
StartupStop,
request=StartupStopRequest("head_manager"),
),
transitions={
"succeeded": "RAISE_BASE",
"aborted": "failed",
"preempted": "failed",
},
)

smach.StateMachine.add(
"RAISE_BASE",
PlayMotion("post_navigation"),
transitions={
"succeeded": "succeeded",
"aborted": "failed",
"preempted": "failed",
},
Expand All @@ -153,42 +210,8 @@ def __init__(
input_keys=["location"],
),
transitions={
"succeeded": (
"DISABLE_HEAD_MANAGER"
if not IS_SIMULATION
else "RAISE_BASE"
),
"aborted": "succeeded",
"succeeded": "succeeded",
"aborted": "failed",
"preempted": "failed",
},
)

if not IS_SIMULATION:
if PUBLIC_CONTAINER:
rospy.logwarn(
"You are using a public container. The head manager will not be start following navigation."
)
else:
smach.StateMachine.add(
"DISABLE_HEAD_MANAGER",
smach_ros.ServiceState(
"/pal_startup_control/stop",
StartupStop,
request=StartupStopRequest("head_manager"),
),
transitions={
"succeeded": "RAISE_BASE",
"aborted": "failed",
"preempted": "failed",
},
)

smach.StateMachine.add(
"RAISE_BASE",
PlayMotion("post_navigation"),
transitions={
"succeeded": "succeeded",
"aborted": "failed",
"preempted": "failed",
},
)
Loading