forked from bdaiinstitute/spot_ros2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
simple_spot_commander.py
72 lines (60 loc) · 2.3 KB
/
simple_spot_commander.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# Copyright [2023] Boston Dynamics AI Institute, Inc.
import argparse
import logging
from typing import Any, Dict, Optional
import bdai_ros2_wrappers.process as ros_process
import bdai_ros2_wrappers.scope as ros_scope
import rclpy
from bdai_ros2_wrappers.utilities import fqn, namespace_with
from rclpy.client import Client
from rclpy.node import Node
from std_srvs.srv import Trigger
TRIGGER_SERVICES = [
"claim",
"release",
"stop",
"self_right",
"sit",
"stand",
"power_on",
"power_off",
"estop/hard",
"estop/gentle",
"estop/release",
]
class SimpleSpotCommander:
def __init__(self, robot_name: Optional[str] = None, node: Optional[Node] = None) -> None:
self._logger = logging.getLogger(fqn(self.__class__))
node = node or ros_scope.node()
if node is None:
raise ValueError("no ROS 2 node available (did you use bdai_ros2_wrapper.process.main?)")
self._command_map: Dict[str, Client] = {}
for service_basename in TRIGGER_SERVICES:
service_name = namespace_with(robot_name, service_basename)
self._command_map[service_basename] = node.create_client(Trigger, service_name)
self._logger.info(f"Waiting for service {service_basename}")
self._command_map[service_basename].wait_for_service()
self._logger.info(f"Found service {service_basename}")
def command(self, command: str) -> Any:
try:
return self._command_map[command].call(Trigger.Request())
except KeyError:
err = f"No command {command}"
self._logger.error(err)
return Trigger.Response(success=False, message=err)
def cli() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser()
parser.add_argument("robot", help="Name of the robot if the ROS driver is inside that namespace")
return parser
@ros_process.main(cli())
def main(args: argparse.Namespace) -> None:
commander = SimpleSpotCommander(args.robot)
while rclpy.ok():
cmd = input("Please enter a command:\n" + " ".join(TRIGGER_SERVICES) + "\n> ")
result = commander.command(cmd)
if not result.success:
print("Error was", result.message)
else:
print("Successfully executed command")
if __name__ == "__main__":
main()