diff --git a/astrobee/survey_manager/CMakeLists.txt b/astrobee/survey_manager/CMakeLists.txt index 181faab7..8e588496 100644 --- a/astrobee/survey_manager/CMakeLists.txt +++ b/astrobee/survey_manager/CMakeLists.txt @@ -58,4 +58,4 @@ include_directories( ############# catkin_install_python(PROGRAMS scripts/command_astrobee scripts/monitor_astrobee - DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION}) \ No newline at end of file + DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION}) diff --git a/astrobee/survey_manager/scripts/command_astrobee b/astrobee/survey_manager/scripts/command_astrobee index b1224590..bc51fe72 100755 --- a/astrobee/survey_manager/scripts/command_astrobee +++ b/astrobee/survey_manager/scripts/command_astrobee @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # # Copyright (c) 2021, United States Government, as represented by the # Administrator of the National Aeronautics and Space Administration. @@ -25,29 +25,62 @@ import socket import subprocess import sys import threading - - -def get_position(bay): - - if bay == "jem_bay1": - return "'11, -4, 4.8'" - if bay == "jem_bay2": - return "'11, -5, 4.8'" - if bay == "jem_bay3": - return "'11, -6, 4.8'" - if bay == "jem_bay4": - return "'11, -7, 4.8'" - if bay == "jem_bay5": - return "'11, -8, 4.8'" - if bay == "jem_bay6": - return "'11, -9, 4.8'" - if bay == "jem_bay7": - return "'11, -9.7, 4.8'" - -class SurveyExecutor: +import yaml +import pathlib + +import rospy + +from std_msgs.msg import String +from ff_msgs.msg import CommandArg, CommandStamped +from ff_msgs.msg import CommandConstants +from ff_msgs.msg import AckStamped, AckCompletedStatus + +from typing import Any, Dict, List + +# Type alias +YamlMapping = Dict[str, Any] + +# Constants +MAX_COUNTER = 10 + +def exposure_change(bay_origin, bay_destination, value): + # Going to JEM + if bay_origin == "nod2_hatch_to_jem" and bay_destination == "jem_hatch_from_nod2": + value = 100 + return True + # Going to NOD2 + if (bay_origin == "jem_hatch_to_nod2" and bay_destination == "nod2_hatch_from_jem" + or bay_origin == "usl_hatch_to_nod2" and bay_destination == "nod2_hatch_from_usl"): + value = 300 + return True + # Going to USL + if bay_origin == "nod2_hatch_to_usl" and bay_destination == "usl_hatch_from_nod2": + value = 300 + return True + return False + +def map_change(bay_origin, bay_destination, map_name): + # Going to JEM + if bay_origin == "nod2_hatch_to_jem" and bay_destination == "jem_hatch_from_nod2": + map_name = "iss.map" + return True + # Going to NOD2 + if (bay_origin == "jem_hatch_to_nod2" and bay_destination == "nod2_hatch_from_jem" + or bay_origin == "usl_hatch_to_nod2" and bay_destination == "nod2_hatch_from_usl"): + map_name = "20220331_isaac5.map" + return True + # Going to USL + if bay_origin == "nod2_hatch_to_usl" and bay_destination == "usl_hatch_from_nod2": + map_name = "20220617_Isaac10_USLonly.map" + return True + + +# This class starts a new process and lets you monitor the input and output +# Mostly used for actions where user inteference might be required +class ProcessExecutor: def __init__(self, robot_name, goal, bay, run): - self.robot = robot_name + self.input_path = '/tmp/input_' + robot_name self.output_path = '/tmp/output_' + robot_name @@ -62,30 +95,18 @@ class SurveyExecutor: self.sock_input.settimeout(1) # Set a timeout for socket operations self.sock_input.bind(self.input_path) self.sock_input.listen(1) # Listen for one connection - # self.sock_input_connected = False - # self.sock_input.accept(connect_input_callback) # Declare socket for process output self.sock_output = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self.sock_output.settimeout(1) # Set a timeout for socket operations self.sock_output.bind(self.output_path) self.sock_output.listen(1) # Listen for one connection - # self.sock_output_connected = False - # self.sock_output.accept(connect_output_callback) - - # Open the input FIFO file descriptor with non-blocking flag - # self.input_fifo_fd = os.open(self.input_fifo_path, os.O_RDONLY | os.O_NONBLOCK) - # self.input_fifo = os.fdopen(self.input_fifo_fd) - - # Open the output FIFO file descriptor with non-blocking flag - # self.output_fifo_fd = os.open(self.output_fifo_path, os.O_WRONLY) - # self.output_fifo = os.fdopen(self.output_fifo_fd, 'w') - + # Declare event that will stop input thread self._stop_event = threading.Event() - self.survey_manager_executor(goal, bay, run) + def __del__(self): self.sock_input.close() self.sock_output.close() @@ -102,7 +123,7 @@ class SurveyExecutor: def thread_write_output(self, process): print("starting thread_write_output...") - # Store commulative output + # Store cumulative output output_total = "" connected = False try: @@ -115,30 +136,27 @@ class SurveyExecutor: output_total += output # If socket is not connected try to connect - try: - if not connected: + if not connected: + try: print("trying to connect") conn, addr = self.sock_output.accept() conn.setblocking(False) connected = True conn.send(output_total.encode("utf-8")[:1024]) - except socket.timeout: - continue - except (socket.error, BrokenPipeError): - print("Error sending data. Receiver may have disconnected.") - connected = False + except socket.timeout: + continue + except (socket.error, BrokenPipeError): + print("Error sending data. Receiver may have disconnected.") + connected = False # If socket is already connected, send output - if connected: + elif connected: try: conn.send(output.encode("utf-8")[:1024]) except (socket.error, BrokenPipeError): print("Error sending data. Receiver may have disconnected.") connected = False - - # print(output.strip()) - # self.sock_output.send(output.encode("utf-8")[:1024]) except Exception as e: print("exit output:") print(e) @@ -176,6 +194,7 @@ class SurveyExecutor: if self._stop_event.is_set(): break + # If broken pipe connect again if not request: break print("got: " + request) @@ -188,8 +207,7 @@ class SurveyExecutor: print(e) - - def send_command_with_input(self, command): + def send_command(self, command): print(command) return_code = -1 @@ -217,23 +235,17 @@ class SurveyExecutor: print("Killing input thread...") self._stop_event.set() input_thread.join() + output_thread.join() # In normal scenarios this should have already happened # Get the final exit code return return_code - def send_command(self, command): - print(command) - process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) - - # Get the output and error (if any) - return process.wait() - - def repeat_inspection(self): + def recursive_command(self, command): while True: user_input = input("Do you want to continue? (y/n): ").lower().strip() if user_input == 'y': - if send_command_with_input("rosrun inspection inspection_tool -resume") != 0: + if send_command_with_input(command) != 0: repeat_inspection() # Continue recursively break elif user_input == 'n': @@ -242,42 +254,202 @@ class SurveyExecutor: else: print("Invalid input. Please enter 'y' or 'n'.") - - def survey_manager_executor(self, goal, bay, run): - - if goal == "dock": - self.send_command("rosrun executive teleop -dock") - - elif goal == "dock": - self.send_command("rosrun executive teleop -undock") - - elif goal == "move": - self.send_command("rosrun executive teleop -move -pos " + bay) - - # Change exposure if needed - - # Change map if needed - - elif goal == "panorama": - - self.send_command("python gds_helper_batch.py -i cmd -- bagger -start pano_" + bay + "_" + run) - if self.send_command_with_input("rosrun inspection inspection_tool -panorama -panorama_mode '5_mapper_and_hugin' -pos " + str(get_position(bay))) != 0: - self.repeat_inspection() - self.send_command("python gds_helper_batch.py -i cmd -- bagger -stop") - - - - elif goal == "stereo": - self.send_command("python gds_helper_batch.py -i cmd -- bagger -start stereo_" + bay + "_" + run) - self.send_command("python gds_helper_batch.py -i cmd -- plan -load plans/ISAAC/" + bay + "_stereo_mapping.fplan") - self.send_command("python gds_helper_batch.py -i cmd -- plan -run") - self.send_command("python gds_helper_batch.py -i cmd -- bagger -stop") + def send_command_recursive(self, command): + if send_command(command): + recursive_command(command) + + +# This class sends a command to the executor and waits to get a response +# Mostly used for short actions that should be immediate and require no feedback +# This method is needed on actions that run remotely and are not controlled by topics +class CommandExecutor: + + + def __init__(self, ns): + self.ns = ns + # Start ROS node + rospy.init_node("survey_namager_cmd_" + robot_name) + # Declare guest science command publisher + self.pub_guest_sci = rospy.Publisher(self.ns + "command", CommandStamped, queue_size=5) + + self.unique_cmd_id = "" + + def __del__(self): + rospy.shutdown() + + def start_recording(): + + # Arg is bagfile name description + arg1 = CommandArg() + arg1.data_type = CommandArg.DATA_TYPE_STRING + arg1.s = "Front" + + # Create CommandStamped message + cmd_args = [arg1] + + cmd = CommandStamped() + cmd.header = Header(stamp=rospy.Time.now()) + cmd.cmd_name = CommandConstants.CMD_NAME_START_RECORDING + cmd.cmd_id = "survey_manager" + str(rospy.Time.now().to_sec()) + self.unique_cmd_id = cmd.cmd_id + cmd.cmd_src = "isaac fsw" + cmd.cmd_origin = "isaac fsw" + + # Publish the CommandStamped message + result = publish_and_wait_response(cmd) + + def stop_recording(bag_description): + cmd = CommandStamped() + cmd.header = Header(stamp=rospy.Time.now()) + cmd.cmd_name = CommandConstants.CMD_NAME_STOP_RECORDING + cmd.cmd_id = "survey_manager" + str(rospy.Time.now().to_sec()) + self.unique_cmd_id = cmd.cmd_id + cmd.cmd_src = "isaac fsw" + cmd.cmd_origin = "isaac fsw" + + # Publish the CommandStamped message + result = publish_and_wait_response(cmd) + + def change_exposure(val): + #TBD + rospy.loginfo("Change exposure to " + val) + + def change_map(map_name): + #TBD + rospy.loginfo("Change map to " + map_name) + + def set_plan(): + # TODO Add call to plan_pub.cc + + cmd = CommandStamped() + cmd.header = Header(stamp=rospy.Time.now()) + cmd.cmd_name = CommandConstants.CMD_NAME_SET_PLAN + cmd.cmd_id = "survey_manager" + str(rospy.Time.now().to_sec()) + self.unique_cmd_id = cmd.cmd_id + cmd.cmd_src = "isaac fsw" + cmd.cmd_origin = "isaac fsw" + cmd.args = cmd_args + + # Publish the CommandStamped message + result = publish_and_wait_response(cmd) + + def run_plan(): + cmd = CommandStamped() + cmd.header = Header(stamp=rospy.Time.now()) + cmd.cmd_name = CommandConstants.CMD_NAME_RUN_PLAN + cmd.cmd_id = "survey_manager" + str(rospy.Time.now().to_sec()) + self.unique_cmd_id = cmd.cmd_id + cmd.cmd_src = "isaac fsw" + cmd.cmd_origin = "isaac fsw" + + # Publish the CommandStamped message + result = publish_and_wait_response(cmd) + + def publish_and_wait_response(cmd): + + # Publish the CommandStamped message + self.pub_guest_sci.publish(cmd) + + # Wait for ack + counter = 0 + while counter < MAX_COUNTER: + try: + msg = rospy.wait_for_message(self.ns + "gt/ack", AckStamped, timeout = 5) + if msg.cmd_id == self.unique_cmd_id: + if msg.completed_status.status == AckCompletedStatus.NOT: + rospy.loginfo("Command is being executed and has not completed.") + elif msg.completed_status.status == AckCompletedStatus.OK: + rospy.loginfo("Command completed successfully!") + return 0 + else: + rospy.loginfo("Command failed! Message: " + Ack.message) + return 1 + except: + continue + counter += 1 + + +def load_yaml(yaml_path: pathlib.Path) -> YamlMapping: + """ + Return the YAML parse result for the file at `yaml_path`. + """ + with yaml_path.open(encoding="utf-8") as yaml_stream: + return yaml.safe_load(yaml_stream) + +def get_stereo_traj(from_bay, to_bay): + # Get stereo trajectory + traj_matches = [ + traj + for traj in static_config["stereo"].values() + if traj["base_location"] == from_bay and traj["bound_location"] == to_bay + ] + assert ( + len(traj_matches) == 1 + ), f"Expected exactly 1 matching stereo trajectory with base {base} and bound {bound}, got {len(traj_matches)}" + fplan = traj_matches[0]["fplan"] + return fplan + +def survey_manager_executor(robot_name, goal, to_bay, from_bay, run, config_static_path: pathlib.Path): + + # Read the static configs that convert constants to values + config_static = load_yaml(config_static_path) + + sim = False + # Figure out robot name and whether we are in simulation or hardware + current_robot = os.environ.get('ROBOTNAME') + if not current_robot: + rospy.loginfo("We're in simulation. Let's get the robotname using the topic") + # This is a latching messge so it shouldn't take long + data = rospy.wait_for_message('/robot_name', String, timeout=2) + current_robot = data.data.lower() + sim = True + + ns = "" + # If we're commanding a robot remotely + if current_robot != robot_name: + rospy.loginfo("We're commanding a robot remotely!") + ns = " -remote -ns " + robot_name + # Command executor will add namespace for bridge forwarding + command_executor = CommandExecutor(robot_name + "/") + else: + command_executor = CommandExecutor("") + process_executor = ProcessExecutor() + + # Initialize exit code + exit_code = 0 + + if goal == "dock": + exit_code += process_executor.send_command_recursive("rosrun executive teleop -dock" + ns) + + elif goal == "undock": + exit_code += process_executor.send_command_recursive("rosrun executive teleop -undock" + ns) + + elif goal == "move": + exit_code += process_executor.send_command_recursive("rosrun executive teleop -move " + bay + ns) + # Change exposure if needed + if exposure_change(from_bay, to_bay, exposure_value): + exit_code += command_executor.change_exposure(exposure_value) + # Change map if needed + if map_change(from_bay, to_bay, map_name): + exit_code += command_executor.change_map(map_name) + + elif goal == "panorama": + exit_code += command_executor.start_recording("pano_" + bay + "_" + run) + exit_code += process_executor.send_command_recursive("rosrun inspection inspection_tool -panorama -panorama_mode '5_mapper_and_hugin' " + static_config["bays"][to_bay] + ns) + exit_code += command_executor.stop_recording() + + elif goal == "stereo": + exit_code += command_executor.start_recording("stereo_" + bay + "_" + run) + exit_code += command_executor.set_plan("plans/ISAAC/" + get_stereo_traj(from_bay, to_bay)) + exit_code += command_executor.run_plan() + exit_code += command_executor.stop_recording() + + return exit_code class CustomFormatter(argparse.ArgumentDefaultsHelpFormatter): pass - if __name__ == "__main__": parser = argparse.ArgumentParser( description=__doc__, formatter_class=CustomFormatter @@ -293,16 +465,26 @@ if __name__ == "__main__": help="Prefix for bagfiles to merge. Bags should all be in the current working directory.", ) parser.add_argument( - "bay", + "to_bay", + default="", + help="Target bay destination.", + ) + parser.add_argument( + "from_bay", default="", help="Target bay start.", ) - parser.add_argument( "run", default="", help="Run number, increases as we add attempts.", ) + parser.add_argument( + "config_static", + help="Path to input static problem config YAML (module geometry, available stereo surveys, etc.)", + type=pathlib.Path, + default="survey_constants.yaml", + ) args = parser.parse_args() - e = SurveyExecutor(args.robot_name, args.command_name, args.bay, args.run) + exit_code = survey_manager_executor(args.robot_name, args.command_name, args.to_bay, args.from_bay, args.run, args.config_static) diff --git a/astrobee/survey_manager/scripts/monitor_astrobee b/astrobee/survey_manager/scripts/monitor_astrobee index 2a9d691c..82f53a4e 100755 --- a/astrobee/survey_manager/scripts/monitor_astrobee +++ b/astrobee/survey_manager/scripts/monitor_astrobee @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # # Copyright (c) 2021, United States Government, as represented by the # Administrator of the National Aeronautics and Space Administration.