Skip to content

Commit

Permalink
feat: GPSR state machine factory 01 (#164)
Browse files Browse the repository at this point in the history
* fix: incorrect folder for command similarity states

* feat: add command parser state machine

* feat: working command parser sm

* feat: initial state machine factory
  • Loading branch information
m-barker authored Apr 23, 2024
1 parent 45da894 commit 41e47ff
Show file tree
Hide file tree
Showing 8 changed files with 213 additions and 8 deletions.
1 change: 1 addition & 0 deletions tasks/gpsr/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ include_directories(
## in contrast to setup.py, you can choose the destination
catkin_install_python(PROGRAMS
scripts/parse_gpsr_xmls.py
scripts/main.py
nodes/commands/question_answer
nodes/command_parser
DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION}
Expand Down
2 changes: 1 addition & 1 deletion tasks/gpsr/data/mock_data/names.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"angel",
"axel",
"charlie",
"janes",
"jane",
"jules",
"morgan",
"paris",
Expand Down
44 changes: 44 additions & 0 deletions tasks/gpsr/scripts/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#!/usr/bin/env python3
import smach
import rospy
import sys
from typing import Dict
from gpsr.load_known_data import GPSRDataLoader
from gpsr.state_machine_factory import build_state_machine
from gpsr.regex_command_parser import Configuration
from gpsr.states import CommandParserStateMachine


def load_gpsr_configuration() -> Configuration:
gpsr_data_dir = sys.argv[1]
"""Loads the configuration for the GPSR command parser"""
data_loader = GPSRDataLoader(data_dir=gpsr_data_dir)
gpsr_known_data: Dict = data_loader.load_data()
config = Configuration(
{
"person_names": gpsr_known_data["names"],
"location_names": gpsr_known_data["non_placeable_locations"],
"placement_location_names": gpsr_known_data["placeable_locations"],
"room_names": gpsr_known_data["rooms"],
"object_names": gpsr_known_data["objects"],
"object_categories_plural": gpsr_known_data["categories_plural"],
"object_categories_singular": gpsr_known_data["categories_singular"],
}
)
return config


def main():
config = load_gpsr_configuration()
command_parser_sm = CommandParserStateMachine(data_config=config)
command_parser_sm.execute()
parsed_command: Dict = command_parser_sm.userdata.parsed_command
rospy.loginfo(f"Parsed command: {parsed_command}")
sm = build_state_machine(parsed_command)
sm.execute()


if __name__ == "__main__":
rospy.init_node("gpsr_main")
main()
rospy.spin()
76 changes: 76 additions & 0 deletions tasks/gpsr/src/gpsr/state_machine_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#!/usr/bin/env python3
import rospy
import smach
from smach_ros import ServiceState
from typing import Dict, List
from lasr_skills import GoToLocation, FindNamedPerson
from gpsr.states import Talk

STATE_COUNT = 0


def increment_state_count() -> int:
global STATE_COUNT
STATE_COUNT += 1
return STATE_COUNT


def build_state_machine(parsed_command: Dict) -> smach.StateMachine:
"""Constructs the parameterized state machine for the GPSR task,
given the parsed command.
Args:
parsed_command (Dict): parsed command.
Returns:
smach.StateMachine: paramaterized state machine ready to be executed.
"""
command_verbs: List[str] = parsed_command["commands"]
command_params: List[Dict] = parsed_command["params"]
sm = smach.StateMachine(outcomes=["succeeded", "failed"])
with sm:
for command_verb, command_param in zip(command_verbs, command_params):
if command_verb == "greet":
if "name" in command_param:
location_param = (
f"/gpsr/arena/rooms/{command_param['location']}/pose"
)
sm.add(
f"STATE_{increment_state_count()}",
GoToLocation(location_param=location_param),
transitions={
"succeeded": f"STATE_{STATE_COUNT + 1}",
"failed": "failed",
},
)
sm.add(
f"STATE_{increment_state_count()}",
FindNamedPerson(
name=command_param["name"], location_param=location_param
),
transitions={
"succeeded": f"STATE_{STATE_COUNT + 1}",
"failed": "failed",
},
)
elif "clothes" in command_param:
pass
else:
raise ValueError(
"Greet command received with no name or clothes in command parameters"
)
elif command_verb == "talk":
if "gesture" in command_param:
pass
elif "talk" in command_param:
sm.add(
f"STATE_{increment_state_count()}",
Talk(command_param["talk"]),
transitions={"succeeded": "succeded", "failed": "failed"},
)
else:
raise ValueError(
"Talk command received with no gesture or talk in command parameters"
)

return sm
2 changes: 2 additions & 0 deletions tasks/gpsr/src/gpsr/states/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
from .talk import Talk
from .command_similarity_matcher import CommandSimilarityMatcher
from .command_parser import ParseCommand, CommandParserStateMachine
82 changes: 82 additions & 0 deletions tasks/gpsr/src/gpsr/states/command_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#!/usr/bin/env python3
import smach
import rospy

from gpsr.regex_command_parser import Configuration, gpsr_compile_and_parse
from gpsr.states import CommandSimilarityMatcher
from lasr_skills import AskAndListen


class ParseCommand(smach.State):
def __init__(self, data_config: Configuration):
"""Takes in a string containing the command and runs the command parser
that outputs a dictionary of parameters for the command.
Args:
data_config (Configuration): Configuration object containing the regex patterns
"""
smach.State.__init__(
self,
outcomes=["succeeded", "failed"],
input_keys=["raw_command"],
output_keys=["parsed_command"],
)
self.data_config = data_config

def execute(self, userdata):
rospy.loginfo(f"Received command : {userdata.raw_command.lower()}")
try:
userdata.parsed_command = gpsr_compile_and_parse(
self.data_config, userdata.raw_command.lower()
)
except Exception as e:
rospy.logerr(e)
return "failed"
return "succeeded"


class CommandParserStateMachine(smach.StateMachine):
def __init__(
self,
data_config: Configuration,
n_vecs_per_txt_file: int = 1177943,
total_txt_files: int = 10,
):
"""State machine that takes in a command, matches it to a known command, and
outputs the parsed command.
Args:
data_config (Configuration): Configuration object containing the regex patterns
n_vecs_per_txt_file (int, optional): number of vectors in each gpsr txt
file. Defaults to 100.
total_txt_files (int, optional): total number of gpsr txt files. Defaults to 10.
"""
smach.StateMachine.__init__(self, outcomes=["succeeded", "failed"])

with self:
smach.StateMachine.add(
"ASK_FOR_COMMAND",
AskAndListen(tts_phrase="Hello, please tell me your command."),
transitions={"succeeded": "PARSE_COMMAND", "failed": "failed"},
remapping={"transcribed_speech": "raw_command"},
)

smach.StateMachine.add(
"PARSE_COMMAND",
ParseCommand(data_config),
transitions={
"succeeded": "succeeded",
"failed": "COMMAND_SIMILARITY_MATCHER",
},
remapping={"parsed_command": "parsed_command"},
)

smach.StateMachine.add(
"COMMAND_SIMILARITY_MATCHER",
CommandSimilarityMatcher([n_vecs_per_txt_file] * total_txt_files),
transitions={"succeeded": "PARSE_COMMAND", "failed": "failed"},
remapping={
"command": "raw_command",
"matched_command": "raw_command",
},
)
14 changes: 7 additions & 7 deletions tasks/gpsr/src/gpsr/states/talk.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
# In future we might want to add looking at person talking to the state machine.
class Talk(smach.StateMachine):
class GenerateResponse(smach.State):
def __init__(self):
def __init__(self, talk_phrase: str):
smach.State.__init__(
self,
outcomes=["succeeded", "failed"],
input_keys=["talk_phrase"],
output_keys=["response"],
)
self._talk_phrase = talk_phrase

def _create_responses(self) -> Dict[str, str]:
response = {}
Expand Down Expand Up @@ -43,23 +43,23 @@ def _create_responses(self) -> Dict[str, str]:

def execute(self, userdata):
try:
userdata.response = self._create_responses()[userdata.talk_phrase]
userdata.response = self._create_responses()[self._talk_phrase]
except KeyError:
rospy.loginfo(
f"Failed to generate response for {userdata.talk_phrase} as it is not in the list of possible questions."
f"Failed to generate response for {self._talk_phrase} as it is not in the list of possible questions."
)
return "failed"
return "succeeded"

def __init__(self):
def __init__(self, talk_phrase: str):
smach.StateMachine.__init__(self, outcomes=["succeeded", "failed"])

with self:
smach.StateMachine.add(
"GENERATE_RESPONSE",
self.GenerateResponse(),
self.GenerateResponse(talk_phrase),
transitions={"succeeded": "SAY_RESPONSE", "failed": "failed"},
remapping={"talk_phrase": "talk_phrase", "response": "response"},
remapping={"response": "response"},
)

smach.StateMachine.add(
Expand Down

0 comments on commit 41e47ff

Please sign in to comment.