-
Notifications
You must be signed in to change notification settings - Fork 16
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add state machine for showcasing command parsing
- Loading branch information
Showing
5 changed files
with
188 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,172 @@ | ||
#!/usr/bin/env python3 | ||
import argparse | ||
import smach | ||
import rospy | ||
from typing import Dict | ||
|
||
from gpsr.load_known_data import GPSRDataLoader | ||
from gpsr.regex_command_parser import Configuration, gpsr_compile_and_parse | ||
from lasr_skills import AskAndListen, Say | ||
|
||
ENGISH_MAPPING = { | ||
"goToLoc": "Go to location", | ||
"findPrsInRoom": "Find people in room", | ||
"meetPrsAtBeac": "Meet person at beacon", | ||
"countPrsInRoom": "Count people in room", | ||
"tellPrsInfoInLoc": "Tell person information in location", | ||
"talkInfoToGestPrsInRoom": "Talk information to guest in room", | ||
"answerToGestPrsInRoom": "Answer to guest in room", | ||
"followNameFromBeacToRoom": "Follow name from beacon to room", | ||
"guideNameFromBeacToBeac": "Guide name from beacon to beacon", | ||
"guidePrsFromBeacToBeac": "Guide person from beacon to beacon", | ||
"guideClothPrsFromBeacToBeac": "Guide clothed person from beacon to beacon", | ||
"greetClothDscInRm": "Greet clothed description in room", | ||
"greetNameInRm": "Greet name in room", | ||
"meetNameAtLocThenFindInRm": "Meet name at location then find in room", | ||
"countClothPrsInRoom": "Count clothed people in room", | ||
"tellPrsInfoAtLocToPrsAtLoc": "Tell person information at location to person at location", | ||
"followPrsAtLoc": "Follow person at location", | ||
"takeObjFromPlcmt": "Take object from placement", | ||
"findObjInRoom": "Find object in room", | ||
"countObjOnPlcmt": "Count object on placement", | ||
"tellObjPropOnPlcmt": "Tell object properties on placement", | ||
"bringMeObjFromPlcmt": "Bring me object from placement", | ||
"tellCatPropOnPlcmt": "Tell category properties on placement", | ||
} | ||
|
||
|
||
def parse_args() -> Dict: | ||
parser = argparse.ArgumentParser(description="GPSR Command Parser") | ||
parser.add_argument( | ||
"--data-dir", | ||
type=str, | ||
default="../data/mock_data/", | ||
help="Path to the directory that contains the data json files.", | ||
) | ||
known, unknown = parser.parse_known_args() | ||
return vars(known) | ||
|
||
|
||
class ParseCommand(smach.State): | ||
def __init__(self, data_config: Configuration): | ||
smach.State.__init__( | ||
self, | ||
outcomes=["succeeded", "failed"], | ||
input_keys=["transcribed_speech"], | ||
output_keys=["command"], | ||
) | ||
self.data_config = data_config | ||
|
||
def execute(self, userdata): | ||
try: | ||
userdata.command = gpsr_compile_and_parse( | ||
self.data_config, userdata.transcribed_speech.lower() | ||
) | ||
except Exception as e: | ||
rospy.logerr(e) | ||
return "failed" | ||
return "succeeded" | ||
|
||
|
||
class OutputParsedCommand(smach.State): | ||
def __init__(self): | ||
smach.State.__init__( | ||
self, | ||
outcomes=["succeeded", "failed"], | ||
input_keys=["command"], | ||
output_keys=["command_string"], | ||
) | ||
|
||
def execute(self, userdata): | ||
try: | ||
command = userdata.command | ||
english_command = ENGISH_MAPPING[command["command"]] | ||
command_parameters = command[command["command"]] | ||
rospy.loginfo(f"Command: {english_command}") | ||
rospy.loginfo(f"Parameters: {command_parameters}") | ||
tts_phrase = f"I parsed the command as you want me to: {english_command}, with the following parameters:" | ||
for key, value in command_parameters.items(): | ||
if isinstance(value, list): | ||
value = " and ".join(value) | ||
tts_phrase += f" {key}: {value}," | ||
except Exception as e: | ||
rospy.logerr(e) | ||
return "failed" | ||
rospy.loginfo(tts_phrase) | ||
userdata.command_string = tts_phrase | ||
return "succeeded" | ||
|
||
|
||
class CommandParserStateMachine(smach.StateMachine): | ||
def __init__(self, config: Configuration): | ||
smach.StateMachine.__init__( | ||
self, | ||
outcomes=["succeeded", "failed"], | ||
input_keys=["tts_phrase", "command_string"], | ||
output_keys=["command"], | ||
) | ||
self.config = config | ||
with self: | ||
smach.StateMachine.add( | ||
"GET_COMMAND", | ||
AskAndListen(), | ||
transitions={"succeeded": "PARSE_COMMAND", "failed": "GET_COMMAND"}, | ||
remapping={ | ||
"tts_phrase": "tts_phrase", | ||
"transcribed_speech": "transcribed_speech", | ||
}, | ||
) | ||
smach.StateMachine.add( | ||
"PARSE_COMMAND", | ||
ParseCommand(data_config=self.config), | ||
transitions={ | ||
"succeeded": "OUTPUT_PARSED_COMMAND", | ||
"failed": "GET_COMMAND", | ||
}, | ||
remapping={ | ||
"transcribed_speech": "transcribed_speech", | ||
"command": "command", | ||
}, | ||
) | ||
smach.StateMachine.add( | ||
"OUTPUT_PARSED_COMMAND", | ||
OutputParsedCommand(), | ||
transitions={ | ||
"succeeded": "SAY_PARSED_COMMAND", | ||
"failed": "GET_COMMAND", | ||
}, | ||
remapping={"command": "command", "tts_phrase": "tts_phrase"}, | ||
) | ||
smach.StateMachine.add( | ||
"SAY_PARSED_COMMAND", | ||
Say(), | ||
transitions={ | ||
"succeeded": "GET_COMMAND", | ||
"aborted": "GET_COMMAND", | ||
"preempted": "GET_COMMAND", | ||
}, | ||
remapping={"text": "command_string"}, | ||
) | ||
|
||
|
||
if __name__ == "__main__": | ||
rospy.init_node("gpsr_command_parser") | ||
args = parse_args() | ||
data_loader = GPSRDataLoader(data_dir=args["data_dir"]) | ||
gpsr_known_data: Dict = data_loader.load_data() | ||
config = Configuration( | ||
{ | ||
"person_names": gpsr_known_data["names"], | ||
"location_names": gpsr_known_data["non_placeable_locations"], | ||
"placement_location_names": gpsr_known_data["placeable_locations"], | ||
"room_names": gpsr_known_data["rooms"], | ||
"object_names": gpsr_known_data["objects"], | ||
"object_categories_plural": gpsr_known_data["categories_plural"], | ||
"object_categories_singular": gpsr_known_data["categories_singular"], | ||
} | ||
) | ||
rospy.loginfo("GPSR Command Parser: Initialized") | ||
sm = CommandParserStateMachine(config) | ||
sm.userdata.tts_phrase = "I am ready to receive a command; ask away!" | ||
result = sm.execute() | ||
rospy.spin() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters