diff --git a/common/navigation/lasr_person_following/src/lasr_person_following/person_following.py b/common/navigation/lasr_person_following/src/lasr_person_following/person_following.py index 06f3a278..208ee368 100644 --- a/common/navigation/lasr_person_following/src/lasr_person_following/person_following.py +++ b/common/navigation/lasr_person_following/src/lasr_person_following/person_following.py @@ -350,7 +350,11 @@ def _check_finished(self) -> bool: ) transcription = self._transcribe_speech_client.get_result().sequence - return "yes" in transcription.lower() or "arrived" in transcription.lower() or "arrive" in transcription.lower() + return ( + "yes" in transcription.lower() + or "arrived" in transcription.lower() + or "arrive" in transcription.lower() + ) return True def _get_pose_on_path( diff --git a/tasks/carry_my_luggage/src/carry_my_luggage/state_machine.py b/tasks/carry_my_luggage/src/carry_my_luggage/state_machine.py index a04048e5..8669c160 100644 --- a/tasks/carry_my_luggage/src/carry_my_luggage/state_machine.py +++ b/tasks/carry_my_luggage/src/carry_my_luggage/state_machine.py @@ -193,11 +193,19 @@ def wait_cb(ud, msg): def start_pose_cb(ud): try: - ud.start_pose = rospy.wait_for_message("/robot_pose", PoseWithCovarianceStamped, timeout=rospy.Duration(5.0)).pose.pose + ud.start_pose = rospy.wait_for_message( + "/robot_pose", + PoseWithCovarianceStamped, + timeout=rospy.Duration(5.0), + ).pose.pose except rospy.ROSException: rospy.logerr("Failed to get robot pose") - ud.start_pose = Pose(position=Point(0., 0., 0.0), orientation=Quaternion(0., 0., 0., 1.)) + ud.start_pose = Pose( + position=Point(0.0, 0.0, 0.0), + orientation=Quaternion(0.0, 0.0, 0.0, 1.0), + ) return "succeeded" + smach.StateMachine.add( "GET_START_LOCATION", smach.CBState( @@ -205,7 +213,7 @@ def start_pose_cb(ud): outcomes=["succeeded"], output_keys=["start_pose"], ), - transitions={"succeeded": "SAY_STEP"} + transitions={"succeeded": "SAY_STEP"}, ) smach.StateMachine.add( @@ -261,8 +269,8 @@ def start_pose_cb(ud): ) smach.StateMachine.add( - "GO_TO_START", # todo: instead, get the start position within the state machine, and return to it at the end + "GO_TO_START", # todo: instead, get the start position within the state machine, and return to it at the end GoToLocation(), - remapping={"location" : "start_pose"}, + remapping={"location": "start_pose"}, transitions={"succeeded": "succeeded", "failed": "succeeded"}, ) diff --git a/tasks/gpsr/src/gpsr/states/object_comparison.py b/tasks/gpsr/src/gpsr/states/object_comparison.py index 4b82afe1..e5fdc376 100755 --- a/tasks/gpsr/src/gpsr/states/object_comparison.py +++ b/tasks/gpsr/src/gpsr/states/object_comparison.py @@ -19,16 +19,133 @@ class ObjectComparison(smach.StateMachine): # TODO: fill this in - _smallest_list: List[str] = ["dishwasher_tab", "candle", "strawberry", "tictac", "plum", "lemon", "pear", "peach", "orange", "apple", "sponges", "fork", "knife", "spoon", "banana", "dubbelfris", "cola", "ice_tea", "fanta", "milk", "pea_soup", "sausages", "cup", "stroopwafel", "soap", "curry", "water", "crisps", "liquorice", "candy", "hagelslag", "pancake_mix", "mayonaise", "bowl", "plate", "washcloth", "pringles", "big_coke" ] + _smallest_list: List[str] = [ + "dishwasher_tab", + "candle", + "strawberry", + "tictac", + "plum", + "lemon", + "pear", + "peach", + "orange", + "apple", + "sponges", + "fork", + "knife", + "spoon", + "banana", + "dubbelfris", + "cola", + "ice_tea", + "fanta", + "milk", + "pea_soup", + "sausages", + "cup", + "stroopwafel", + "soap", + "curry", + "water", + "crisps", + "liquorice", + "candy", + "hagelslag", + "pancake_mix", + "mayonaise", + "bowl", + "plate", + "washcloth", + "pringles", + "big_coke", + ] _biggest_list: List[str] = reversed(_smallest_list) _largest_list: List[str] = _biggest_list - _heaviest_list: List[str] = ["big_coke", "pea_soup", "milk", "mayonaise", "cola", "ice_tea", "fanta", "dubbelfris", "water", "pancake_mix", "curry", "soap", "sausages", "apple", "orange", "peach", "pear", "lemon", "plum", "banana", "stroopwafel", "cup", "bowl", "plate", "washcloth", "sponges", "pringles", "crisps", "hagelslag", "liquorice", "candy", "knife", "fork", "spoon", "candle", "strawberry", "tictac", "dishwasher_tab"] + _heaviest_list: List[str] = [ + "big_coke", + "pea_soup", + "milk", + "mayonaise", + "cola", + "ice_tea", + "fanta", + "dubbelfris", + "water", + "pancake_mix", + "curry", + "soap", + "sausages", + "apple", + "orange", + "peach", + "pear", + "lemon", + "plum", + "banana", + "stroopwafel", + "cup", + "bowl", + "plate", + "washcloth", + "sponges", + "pringles", + "crisps", + "hagelslag", + "liquorice", + "candy", + "knife", + "fork", + "spoon", + "candle", + "strawberry", + "tictac", + "dishwasher_tab", + ] _lightest_list: List[str] = reversed(_heaviest_list) - _thinnest_list: List[str] = ["knife", "fork", "spoon", "candle", "strawberry", "tictac", "dishwasher_tab", "liquorice", "candy", "crisps", "stroopwafel", "soap", "sponges", "washcloth", "plate", "bowl", "cup", "pancake_mix", "hagelslag", "curry", "mayonaise", "pea_soup", "sausages", "milk", "water", "dubbelfris", "cola", "ice_tea", "fanta", "big_coke", "apple", "orange", "peach", "pear", "lemon", "plum", "banana", "pringles"] + _thinnest_list: List[str] = [ + "knife", + "fork", + "spoon", + "candle", + "strawberry", + "tictac", + "dishwasher_tab", + "liquorice", + "candy", + "crisps", + "stroopwafel", + "soap", + "sponges", + "washcloth", + "plate", + "bowl", + "cup", + "pancake_mix", + "hagelslag", + "curry", + "mayonaise", + "pea_soup", + "sausages", + "milk", + "water", + "dubbelfris", + "cola", + "ice_tea", + "fanta", + "big_coke", + "apple", + "orange", + "peach", + "pear", + "lemon", + "plum", + "banana", + "pringles", + ] _query: Literal[ "biggest", "largest", "smallest", "heaviest", "lightest", "thinnest" diff --git a/tasks/receptionist/src/receptionist/states/get_name_and_drink.py b/tasks/receptionist/src/receptionist/states/get_name_and_drink.py index 1808dc03..b20bd862 100644 --- a/tasks/receptionist/src/receptionist/states/get_name_and_drink.py +++ b/tasks/receptionist/src/receptionist/states/get_name_and_drink.py @@ -78,7 +78,6 @@ def execute(self, userdata: UserData) -> str: return outcome - class PostRecoveryDecision(smach.State): def __init__( self, @@ -96,7 +95,6 @@ def __init__( self._possible_names = [name.lower() for name in prior_data["names"]] self._possible_drinks = [drink.lower() for drink in prior_data["drinks"]] - def execute(self, userdata: UserData) -> str: if not self._recovery_name_and_drink_required(userdata): if userdata.guest_data[self._guest_id]["name"] == "unknown": @@ -107,7 +105,6 @@ def execute(self, userdata: UserData) -> str: outcome = "failed" return outcome - def _recovery_name_and_drink_required(self, userdata: UserData) -> bool: """Determine whether both the name and drink requires recovery. @@ -126,11 +123,6 @@ def __init__( last_resort: bool, param_key: str = "/receptionist/priors", ): - self, - guest_id: str, - last_resort: bool, - param_key: str = "/receptionist/priors", - ): self._guest_id = guest_id self._param_key = param_key @@ -170,43 +162,3 @@ def __init__( "failed_drink": "failed_drink", }, ) - - def __init__( - self, - guest_id: str, - last_resort: bool, - param_key: str = "/receptionist/priors", - ): - - self._guest_id = guest_id - self._param_key = param_key - self._last_resort = last_resort - - smach.StateMachine.__init__( - self, - outcomes=["succeeded", "failed", "failed_name", "failed_drink"], - input_keys=["guest_transcription", "guest_data"], - output_keys=["guest_data", "guest_transcription"], - - ) - with self: - - smach.StateMachine.add( - "PARSE_NAME_AND_DRINK", - self.ParseNameAndDrink(guest_id=self._guest_id, param_key=self._param_key), - transitions={"succeeded": "succeeded", "failed": "SPEECH_RECOVERY"}, - ) - smach.StateMachine.add( - "SPEECH_RECOVERY", - SpeechRecovery(self._guest_id, self._last_resort), - transitions={"succeeded": "succeeded", "failed": "POST_RECOVERY_DECISION"}, - ) - smach.StateMachine.add( - "POST_RECOVERY_DECISION", - self.PostRecoveryDecision(guest_id=self._guest_id, param_key=self._param_key), - transitions={ - "failed": "failed", - "failed_name": "failed_name", - "failed_drink": "failed_drink", - }, - ) \ No newline at end of file diff --git a/tasks/receptionist/src/receptionist/states/local_speech_recognition.py b/tasks/receptionist/src/receptionist/states/local_speech_recognition.py deleted file mode 100644 index 5f79f4d8..00000000 --- a/tasks/receptionist/src/receptionist/states/local_speech_recognition.py +++ /dev/null @@ -1,216 +0,0 @@ -import jellyfish as jf - - -available_names = [ - "adel", - "angel", - "axel", - "charlie", - "jane", - "jules", - "morgan", - "paris", - "robin", - "simone", -] -available_single_drinks = [ - "cola", - "milk", -] -available_double_drinks = [ - "iced", - "tea", - "pack", - "juice", - "orange", - "red", - "wine", - "tropical", -] -double_drinks_dict = { - "iced": "iced tea", - "tea": "iced tea", - "pack": "juice pack", - "orange": "orange juice", - "red": "red wine", - "wine": "red wine", - "tropical": "tropical juice", - # "juice": ["orange juice", "tropical juice", "juice pack"], -} -available_drinks = list( - set(available_single_drinks).union(set(available_double_drinks)) -) -excluded_words = [ - "my", - "name", - "is", - "and", - "favourite", - "drink", - "you", - "can", - "call", - "me", -] - - -def speech_recovery(sentence): - sentence_split = sentence.split() - sentence_list = list(set(sentence_split) - set(excluded_words)) - print(f"final name: {handle_name(sentence_list, True)}") - print(f"final drink: {handle_drink(sentence_list, True)}") - - -def handle_name(sentence_list, last_resort): - result = handle_similar_spelt(sentence_list, available_names, 1) - if result != "unknown": - print(f"name (spelt): {result}") - return result - else: - result = handle_similar_sound(sentence_list, available_names, 0) - print(f"name (sound): {result}") - if not last_resort or result != "unknown": - return result - else: - print("Last resort name") - return handle_closest_spelt(sentence_list, available_names) - - -def handle_drink(sentence_list, last_resort): - result = infer_second_drink(sentence_list) - if result != "unknown": - return result - result = handle_similar_spelt(sentence_list, available_drinks, 1) - if result != "unknown": - print(f"drink (spelt): {result}") - else: - result = handle_similar_sound(sentence_list, available_drinks, 0) - print(f"drink (sound): {result}") - if result != "unknown": - if result in available_single_drinks: - print(f"final attempt drink: {result}") - return result - else: - sentence_list.append(result) - return infer_second_drink(sentence_list) - else: - if not last_resort: - return "unknown" - else: - print("Last resort drink") - closest_spelt = handle_closest_spelt(sentence_list, available_drinks) - if closest_spelt in available_single_drinks: - print(f"final attempt during last resort drink: {closest_spelt}") - return closest_spelt - else: - sentence_list.append(closest_spelt) - return infer_second_drink(closest_spelt) - - -def handle_similar_spelt(sentence_list, available_words, distance_threshold): - for input_word in sentence_list: - for available_word in available_words: - distance = get_damerau_levenshtein_distance(input_word, available_word) - if distance <= distance_threshold: - return available_word - return "unknown" - - -def handle_similar_sound(sentence_list, available_words, distance_threshold): - for input_word in sentence_list: - for available_word in available_words: - distance = get_levenshtein_soundex_distance(input_word, available_word) - if distance <= distance_threshold: - print(input_word, available_word) - return available_word - return "unknown" - - -def infer_second_drink(sentence_list): - for input_word in sentence_list: - if input_word == "juice": - choices = ["pack", "orange", "tropical"] - closest_word = handle_closest_spelt(sentence_list, choices) - if closest_word == "pack": - return "juice pack" - elif closest_word == "orange": - return "orange juice" - else: - return "tropical juice" - for available_word in available_double_drinks: - if input_word == available_word: - return double_drinks_dict[input_word] - return "unknown" - - -def handle_closest_spelt(sentence_list, choices): - closest_distance = float("inf") - closest_word = None - for input_word in sentence_list: - for available_word in choices: - distance = get_damerau_levenshtein_distance(input_word, available_word) - if distance < closest_distance: - closest_distance = distance - closest_word = available_word - return closest_word - - -def get_damerau_levenshtein_distance(word_1, word_2): - return jf.damerau_levenshtein_distance(word_1, word_2) - - -def get_levenshtein_soundex_distance(word_1, word_2): - soundex_word_1 = jf.soundex(word_1) - soundex_word_2 = jf.soundex(word_2) - return jf.levenshtein_distance(soundex_word_1, soundex_word_2) - - -# print(get_damerau_levenshtein_distance("juice", "shoes")) -# print(get_levenshtein_soundex_distance("juice", "shoes")) - - -# print(jf.levenshtein_distance(jf.metaphone("my"), jf.metaphone("tea"))) - -# print(jf.soundex("juice"), jf.soundex("shoes")) - -# print(jf.levenshtein_distance(jf.soundex("jane"), jf.soundex("axasel"))) - -# available_names = ["adel", "angel", "axel", "charlie", "jane", "jules", "morgan", "paris", "robin", "simone"] -# available_single_drinks = ["cola", "milk",] -# available_double_drinks = ["iced", "tea", "juice", "pack", "orange", "red", "wine", "tropical"] -# double_drinks_dict = {"iced" : "iced tea", -# "tea": "iced tea", -# "pack" : "juice pack", -# "orange" : "orange juice", -# "red" : "red wine", -# "wine" : "red wine", -# "tropical" : "tropical juice", -# "juice": ["orange juice", "tropical juice", "juice pack"], -# } - -# available_names_and_drinks = list(set(available_names).union(set(available_single_drinks)).union(set(available_double_drinks))) - -# sentence = "my name is axl and my favourite drink is orange shoes" - -# dataList = set(sentence.split()).union(set(available_names_and_drinks)) - - -if __name__ == "__main__": - sentence = "my name is jay and my favourite drink is mill" - speech_recovery(sentence) - print("======") - sentence = "my name is jayne and my favourite drink is oras juice" - speech_recovery(sentence) - print("======") - sentence = "my name is axl and my favourite drink is tropical ef" - speech_recovery(sentence) - print("======") - sentence = "my name is axl and my favourite drink is p jews" - speech_recovery(sentence) - print("======") - sentence = "my name is axasel and my favourite drink is orange juice juice" - speech_recovery(sentence) - print("======") - sentence = "my name is morgen and my favourite drink is mll" - speech_recovery(sentence) - print("======") diff --git a/tasks/receptionist/src/receptionist/states/speech_recovery.py b/tasks/receptionist/src/receptionist/states/speech_recovery.py index 42748dcc..3fdf91cd 100644 --- a/tasks/receptionist/src/receptionist/states/speech_recovery.py +++ b/tasks/receptionist/src/receptionist/states/speech_recovery.py @@ -1,3 +1,8 @@ +""" +State for recovering the speech transcribed via whisper (name and drink) by using +the spelling and pronounciation of a word. +""" + import rospy import smach import string @@ -13,6 +18,15 @@ def __init__( last_resort: bool, input_type: str = "", ): + """Recover the correct name and / or drink by parsing the transcription. + + Args: + guest_id (str): ID of the guest (identifying the guest) + last_resort (bool): Whether the program must recover a name or drink + input_type (str, optional): The type of information to try and extract useful information + (drink or name) + """ + smach.State.__init__( self, outcomes=["succeeded", "failed"], @@ -95,6 +109,17 @@ def __init__( ] def execute(self, userdata: UserData) -> str: + """Optimise the transcription, then attempt to recover the drink or / and name. + + Args: + userdata (UserData): State machine userdata assumed to contain a key + called "guest transcription" with the transcription of the guest's name or + favourite drink or both. + + Returns: + str: state outcome. Updates the userdata with the parsed information (drink or name), under + the parameter "guest_data". + """ filtered_sentence = userdata.guest_transcription.lower().translate( str.maketrans("", "", string.punctuation) ) @@ -136,7 +161,17 @@ def execute(self, userdata: UserData) -> str: else: return "succeeded" - def _handle_name(self, sentence_list, last_resort): + def _handle_name(self, sentence_list: List[str], last_resort: bool) -> str: + """Attempt to recover the name in the transcription. First recover via spelling, then pronounciation. + Enter last resort if necessary and recover the closest spelt name. + + Args: + sentence_list (List[str]): Transcription split up as a list of strings. + last_resort (bool): Whether the program must recover a name + + Returns: + str: Recovered name. 'unknown' is returned if no name is recovered. + """ result = self._handle_similar_spelt(sentence_list, self._available_names, 1) if result != "unknown": print(f"name (spelt): {result}") @@ -150,7 +185,18 @@ def _handle_name(self, sentence_list, last_resort): print("Last resort name") return self._handle_closest_spelt(sentence_list, self._available_names) - def _handle_drink(self, sentence_list, last_resort): + def _handle_drink(self, sentence_list: List[str], last_resort: bool) -> str: + """Attempt to recover the drink in the transcription. For phrases containing two words, try to infer the + second word in the phrase. If this fails, attempt to recover the drink via spelling, then pronounciation. + Enter last resort if necessary and recover the closest spelt drink. + + Args: + sentence_list (List[str]): Transcription split up as a list of strings. + last_resort (bool): Whether the program must recover a drink + + Returns: + str: Recovered drink. 'unknown' is returned if no drink is recovered. + """ result = self._infer_second_drink(sentence_list) if result != "unknown": return result @@ -187,7 +233,23 @@ def _handle_drink(self, sentence_list, last_resort): sentence_list.append(closest_spelt) return self._infer_second_drink(sentence_list) - def _handle_similar_spelt(self, sentence_list, available_words, distance_threshold): + def _handle_similar_spelt( + self, + sentence_list: List[str], + available_words: List[str], + distance_threshold: int, + ) -> str: + """Recover any word by spelling that has a similarity lower than the specified threshold + when comparing each word in the sentence list to the list of available words. + + Args: + sentence_list (List[str]): Transcription split up as a list of strings. + available_words (List[str]): List of available words to compare to (drinks or names) + distance_threshold (int): Similarity in terms of spelling distance required for a word to be recovered + + Returns: + str: Recovered word. 'unknown' is returned if no word is recovered. + """ for input_word in sentence_list: for available_word in available_words: distance = self._get_damerau_levenshtein_distance( @@ -197,7 +259,23 @@ def _handle_similar_spelt(self, sentence_list, available_words, distance_thresho return available_word return "unknown" - def _handle_similar_sound(self, sentence_list, available_words, distance_threshold): + def _handle_similar_sound( + self, + sentence_list: List[str], + available_words: List[str], + distance_threshold: int, + ) -> str: + """Recover any word by pronounciation that has a similarity lower than the specified threshold + when comparing each word in the sentence list to the list of available words. + + Args: + sentence_list (List[str]): Transcription split up as a list of strings. + available_words (List[str]): List of available words to compare to (drinks or names) + distance_threshold (int): Similarity in terms of pronounciation distance required for a word to be recovered + + Returns: + str: Recovered word or phrase. 'unknown' is returned if no word is recovered. + """ for input_word in sentence_list: for available_word in available_words: distance = self._get_levenshtein_soundex_distance( @@ -208,14 +286,35 @@ def _handle_similar_sound(self, sentence_list, available_words, distance_thresho return available_word return "unknown" - def _infer_second_drink(self, sentence_list): + def _infer_second_drink(self, sentence_list: List[str]) -> str: + """Infer the second word of a two-worded drink phrase and hence the entire phrase, if + a word contained in any of the two-worded drink phrases is detected. + + Args: + sentence_list (List[str]): Transcription split up as a list of strings. + + Returns: + str: Recovered drink phrase. 'unknown' is returned if no drink phrase is recovered. + """ for input_word in sentence_list: for available_word in self._available_double_drinks: if input_word == available_word: return self._double_drinks_dict[input_word] return "unknown" - def _handle_closest_spelt(self, sentence_list, choices): + def _handle_closest_spelt( + self, sentence_list: List[str], choices: List[str] + ) -> str: + """Get the closest spelt word from the list of choices (drinks or names) + in the sentence list (transcription). + + Args: + sentence_list (List[str]): Transcription split up as a list of strings. + choices (List[str]): List of choices to compare to (drinks or names) + + Returns: + str: Recovered closest spelt word. + """ closest_distance = float("inf") closest_word = None for input_word in sentence_list: @@ -228,7 +327,16 @@ def _handle_closest_spelt(self, sentence_list, choices): closest_word = available_word return closest_word - def _recover_dubbelfris(self, sentence_list): + def _recover_dubbelfris(self, sentence_list: List[str]) -> bool: + """Recover the drink dubbelfris if any of the words in the sentence list (transcription) is + similar enough (lower than the threshold) in terms of pronounciation to the word. + + Args: + sentence_list (List[str]): Transcription split up as a list of strings. + + Returns: + bool: Whether dubbelfris has been recovered + """ for word in sentence_list: if self._get_levenshtein_soundex_distance("dubbelfris", word) < 3: print(word) @@ -236,10 +344,29 @@ def _recover_dubbelfris(self, sentence_list): return True return False - def _get_damerau_levenshtein_distance(self, word_1, word_2): + def _get_damerau_levenshtein_distance(self, word_1: str, word_2: str) -> int: + """Get the damerau-levenshtein distance between two words for the similarity in spelling. + + Args: + word_1 (str): First word + word_2 (str): Second word + + Returns: + int: Damerau-levenshtein distance between the two words. + """ return jf.damerau_levenshtein_distance(word_1, word_2) - def _get_levenshtein_soundex_distance(self, word_1, word_2): + def _get_levenshtein_soundex_distance(self, word_1: str, word_2: str) -> int: + """Get the levenshtein distance between the soundex encoding of two words for the similarity + in pronounciation. + + Args: + word_1 (str): First word + word_2 (str): Second word + + Returns: + int: Levenshtein distance between the soundex encoding of the two words. + """ soundex_word_1 = jf.soundex(word_1) soundex_word_2 = jf.soundex(word_2) return jf.levenshtein_distance(soundex_word_1, soundex_word_2)