From f4efc7de4b77a5be1da2e0100d94bdd94bef2965 Mon Sep 17 00:00:00 2001 From: Jared Swift Date: Sat, 6 Jul 2024 14:19:24 +0100 Subject: [PATCH] Revert "Quick update for torso adjustment. (#238)" This reverts commit 66cd8b2e5099d6fc95167db08fcf4372e665f3bc. --- skills/config/motions.yaml | 12 +- skills/scripts/unit_test_adjust_camera.py | 6 +- skills/src/lasr_skills/adjust_camera.py | 150 +++++++++++------- .../src/receptionist/state_machine.py | 2 +- .../receptionist/states/get_name_and_drink.py | 41 +++-- .../receptionist/states/get_name_or_drink.py | 37 ++--- .../src/receptionist/states/handle_guest.py | 3 +- .../states/local_speech_recognition.py | 88 +++------- .../receptionist/states/speech_recovery.py | 114 ++++--------- 9 files changed, 189 insertions(+), 264 deletions(-) diff --git a/skills/config/motions.yaml b/skills/config/motions.yaml index 97afe2c75..9409d156b 100644 --- a/skills/config/motions.yaml +++ b/skills/config/motions.yaml @@ -84,33 +84,33 @@ play_motion: u3l: joints: [torso_lift_joint, head_1_joint, head_2_joint] points: - - positions: [0.4, 0.35, 0.05] + - positions: [0.4, 0.35, 0.15] time_from_start: 0.0 u3m: joints: [torso_lift_joint, head_1_joint, head_2_joint] points: - - positions: [0.4, 0.0, 0.05] + - positions: [0.4, 0.0, 0.15] time_from_start: 0.0 u3r: joints: [torso_lift_joint, head_1_joint, head_2_joint] points: - - positions: [0.4, -0.35, 0.05] + - positions: [0.4, -0.35, 0.15] time_from_start: 0.0 u2l: joints: [torso_lift_joint, head_1_joint, head_2_joint] points: - - positions: [0.35, 0.35, 0.05] + - positions: [0.4, 0.35, 0.05] time_from_start: 0.0 u2m: joints: [torso_lift_joint, head_1_joint, head_2_joint] points: - - positions: [0.35, 0.0, 0.05] + - positions: [0.4, 0.0, 0.05] time_from_start: 0.0 u2r: joints: [torso_lift_joint, head_1_joint, head_2_joint] points: - - positions: [0.35, -0.35, 0.05] + - positions: [0.4, -0.35, 0.05] time_from_start: 0.0 u1l: joints: [torso_lift_joint, head_1_joint, head_2_joint] diff --git a/skills/scripts/unit_test_adjust_camera.py b/skills/scripts/unit_test_adjust_camera.py index 8270b5d59..51fe74251 100644 --- a/skills/scripts/unit_test_adjust_camera.py +++ b/skills/scripts/unit_test_adjust_camera.py @@ -11,11 +11,7 @@ with sm: sm.add( "AdjustCamera", - AdjustCamera( - max_attempts=1000, - debug=True, - init_state="u1m", - ), + AdjustCamera(debug=True), transitions={"finished": "end", "failed": "end", "truncated": "end"}, ) sm.execute() diff --git a/skills/src/lasr_skills/adjust_camera.py b/skills/src/lasr_skills/adjust_camera.py index c57fef38b..c885398ae 100644 --- a/skills/src/lasr_skills/adjust_camera.py +++ b/skills/src/lasr_skills/adjust_camera.py @@ -77,17 +77,14 @@ (0, 1): "mr", } -inverse_position_dict = {value: key for key, value in position_dict.items()} - class AdjustCamera(smach.StateMachine): def __init__( self, bodypix_model: str = "resnet50", bodypix_confidence: float = 0.7, - max_attempts=5, + max_attempts=1000, debug=False, - init_state="u1m", ): smach.StateMachine.__init__( self, @@ -108,8 +105,8 @@ def __init__( with self: smach.StateMachine.add( - "init", - PlayMotion(motion_name=init_state), + "init_u2m", + PlayMotion(motion_name="u2m"), transitions={ "succeeded": "GET_IMAGE", "aborted": "GET_IMAGE", @@ -125,7 +122,7 @@ def __init__( else: _transitions = { "succeeded": "DECIDE_ADJUST_CAMERA", - "failed": "GET_IMAGE", + "failed": "failed", } smach.StateMachine.add( "GET_IMAGE", @@ -134,7 +131,10 @@ def __init__( method="closest", use_mask=True, ), - transitions=_transitions, + transitions={ + "succeeded": "DECIDE_ADJUST_CAMERA", + "failed": "failed", + }, ) if debug: @@ -158,7 +158,6 @@ def __init__( bodypix_model=bodypix_model, bodypix_confidence=bodypix_confidence, max_attempts=max_attempts, - init_state=init_state, ), transitions=_transitions, ) @@ -177,10 +176,10 @@ def __init__( class DecideAdjustCamera(smach.State): def __init__( self, + # keypoints_to_detect: List[str] = ALL_KEYS, bodypix_model: str = "resnet50", bodypix_confidence: float = 0.7, max_attempts=1000, - init_state="u1m", ): smach.State.__init__( self, @@ -196,16 +195,18 @@ def __init__( output_keys=[], ) self.max_attempts = max_attempts + # self._keypoints_to_detect = keypoints_to_detect self._bodypix_model = bodypix_model self._bodypix_confidence = bodypix_confidence self._bodypix_client = rospy.ServiceProxy( "/bodypix/keypoint_detection", BodyPixKeypointDetection ) - self.position = [i for i in inverse_position_dict[init_state]] + self.position = [2, 0] self.counter = 0 def execute(self, userdata): + req = BodyPixKeypointDetectionRequest() req.image_raw = userdata.img_msg req.dataset = self._bodypix_model @@ -242,15 +243,10 @@ def execute(self, userdata): rospy.logwarn( f"missing shoulders: {missing_keypoints.intersection(MIDDLE)}, missing eyes: {missing_keypoints.intersection(HEAD)}" ) + # has_torso = len(missing_keypoints.intersection(TORSO)) <= 1 if not has_more_than_one_shoulder and not has_more_than_one_one_eye: - # This is the case that not any centre points can be used, - # In this case most likely it is the guest standing either too close or not in the camera at all. - # However we may still try to get this person back into the frame if some part of them are detected. - # Otherwise we say something like "Please stand in front of me but keep a bit distance.". - rospy.logwarn( - "The person might not actually be in the frame, trying to recover." - ) + # 'Try recovery behaviour or give up, need a bit polish miss_head = len(missing_keypoints.intersection(HEAD)) >= 2 miss_middle = len(missing_keypoints.intersection(MIDDLE)) >= 2 miss_torso = len(missing_keypoints.intersection(TORSO)) >= 4 @@ -267,7 +263,14 @@ def execute(self, userdata): f"Needs to move up: {needs_to_move_up}, down: {needs_to_move_down}, left: {needs_to_move_left}, right: {needs_to_move_right}." ) + # if counter > maxmum, check if head is in, if not, move up to get head, otherwise return finished. + if self.counter > self.max_attempts: + if not miss_head or self.counter > self.max_attempts + 2: + return "truncated" + + # self.counter += 1 if not (needs_to_move_left and needs_to_move_right): + # return "failed" if needs_to_move_left: self.position = ( self.position[0], @@ -277,7 +280,8 @@ def execute(self, userdata): else self.position[1] ), ) - elif needs_to_move_right: + return position_dict[self.position] + if needs_to_move_right: self.position = ( self.position[0], ( @@ -286,26 +290,30 @@ def execute(self, userdata): else self.position[1] ), ) - if not needs_to_move_up and needs_to_move_down: - if needs_to_move_up: - self.position = ( - ( - self.position[0] + 1 - if self.position[0] < 3 - else self.position[0] - ), - self.position[1], - ) - elif needs_to_move_down: - self.position = ( - ( - self.position[0] - 1 - if self.position[0] > 0 - else self.position[0] - ), - self.position[1], - ) - + return position_dict[self.position] + if needs_to_move_up and needs_to_move_down: + return "failed" + if needs_to_move_up: + self.position = ( + ( + self.position[0] + 1 + if self.position[0] < 3 + else self.position[0] + ), + self.position[1], + ) + return position_dict[userdata.position] + if needs_to_move_down: + self.position = ( + ( + self.position[0] - 1 + if self.position[0] > 0 + else self.position[0] + ), + self.position[1], + ) + return position_dict[userdata.position] + return "finished" elif has_both_eyes and not has_both_shoulders: # in this case try to make eyes into the upper 1/3 of the frame, eyes_middle = ( @@ -321,14 +329,13 @@ def execute(self, userdata): # if y at upper 1/3: wonder why no shoulders but never mind in this case else: pass - # if x at left 2/7 or left shoulder dissappear, move left 1 step - if eyes_middle[0] <= 2 / 7: + # if x at left 1/3 or left shoulder dissappear, move left 1 step + if eyes_middle[0] <= 1 / 3: self.position[1] -= 1 - # if x at right 2/7 or right shoulder dissappear, move right 1 step - elif eyes_middle[0] >= 5 / 7: + # if x at right 1/3 or right shoulder dissappear, move right 1 step + elif eyes_middle[0] >= 2 / 3: self.position[1] += 1 pass - elif not has_both_eyes and has_both_shoulders: shoulders_middle = ( ( @@ -344,14 +351,13 @@ def execute(self, userdata): # if y at upper 1/4: up move 1 step elif shoulders_middle[1] <= 1 / 4: self.position[0] += 1 - # if x at left 2/7, move left 1 step - if shoulders_middle[0] <= 2 / 7: + # if x at left 1/3, move left 1 step + if shoulders_middle[0] <= 1 / 3: self.position[1] -= 1 - # if x at right 2/7, move right 1 step - elif shoulders_middle[0] >= 5 / 7: + # if x at right 1/3, move right 1 step + elif shoulders_middle[0] >= 2 / 3: self.position[1] += 1 pass - elif has_both_eyes and has_both_shoulders: eyes_middle = ( (keypoint_info["leftEye"][0] + keypoint_info["rightEye"][0]) / 2, @@ -389,17 +395,44 @@ def execute(self, userdata): elif very_middle[1] <= 1 / 4: self.position[0] += 1 print("if y at upper 1/3: up move 1 step.") - # if x at left 2/7, move left 1 step - if very_middle[0] <= 2 / 7: + # if x at left 1/3, move left 1 step + if very_middle[0] <= 1 / 3: self.position[1] -= 1 - print("if x at left 2/7, move left 1 step.") - # if x at right 2/7, move right 1 step - elif very_middle[0] >= 5 / 7: + print("if x at left 1/3, move left 1 step.") + # if x at right 1/3, move right 1 step + elif very_middle[0] >= 2 / 3: self.position[1] += 1 - print("if x at right 2/7, move right 1 step.") + print("if x at right 1/3, move right 1 step.") + pass + elif has_more_than_one_shoulder: # but not both + # shoulders_middle = ((keypoint_info["leftShoulder"][0] + keypoint_info["rightShoulder"][0]) / 2, (keypoint_info["leftEye"][1] + keypoint_info["rightEye"][1]) / 2) + # # move one step opposite left or right + # # if x at left 1/3, move left 1 step + # if shoulders_middle[0] <= 1/3: + # position[1] -= 1 + # # if x at right 1/3, move right 1 step + # elif shoulders_middle[0] >= 2/3: + # position[1] += 1 + # pass + # if not has_more_than_one_one_eye: + # # move up! + # position[0] += 1 + # pass + pass + else: # has_more_than_one_one_eye: + # eyes_middle = ((keypoint_info["leftEye"][0] + keypoint_info["rightEye"][0]) / 2, (keypoint_info["leftEye"][1] + keypoint_info["rightEye"][1]) / 2) + # # move one step opposite, + # # if x at left 1/3, move left 1 step + # if eyes_middle[0] <= 1/3: + # position[1] += 1 + # # if x at right 1/3, move right 1 step + # elif eyes_middle[0] >= 2/3: + # position[1] -= 1 + # # probably move down + # position[0] -= 1 + # pass pass - # keep the position in the range. if self.position[0] < 0: self.position[0] = 0 elif self.position[0] > 3: @@ -409,9 +442,4 @@ def execute(self, userdata): elif self.position[1] > 1: self.position[1] = 1 - # if counter > maxmum. - if self.counter > self.max_attempts: - return "truncated" - self.counter += 1 - return position_dict[(self.position[0], self.position[1])] diff --git a/tasks/receptionist/src/receptionist/state_machine.py b/tasks/receptionist/src/receptionist/state_machine.py index 78ed090ba..5ffc64b75 100755 --- a/tasks/receptionist/src/receptionist/state_machine.py +++ b/tasks/receptionist/src/receptionist/state_machine.py @@ -279,4 +279,4 @@ def _guide_guest(self, guest_id: int) -> None: "preempted": "failed", "aborted": "failed", }, - ) + ) \ No newline at end of file diff --git a/tasks/receptionist/src/receptionist/states/get_name_and_drink.py b/tasks/receptionist/src/receptionist/states/get_name_and_drink.py index 4afb63975..11171f088 100644 --- a/tasks/receptionist/src/receptionist/states/get_name_and_drink.py +++ b/tasks/receptionist/src/receptionist/states/get_name_and_drink.py @@ -7,8 +7,9 @@ import smach from smach import UserData from typing import List, Dict, Any -from receptionist.states import SpeechRecovery - +from receptionist.states import ( + SpeechRecovery +) class GetNameAndDrink(smach.StateMachine): class ParseNameAndDrink(smach.State): @@ -77,25 +78,25 @@ def execute(self, userdata: UserData) -> str: outcome = "failed" return outcome - + class PostRecoveryDecision(smach.State): def __init__( self, guest_id: str, param_key: str = "/receptionist/priors", ): - + smach.State.__init__( self, outcomes=["succeeded", "failed", "failed_name", "failed_drink"], input_keys=["guest_transcription", "guest_data"], output_keys=["guest_data", "guest_transcription"], - ) + ) self._guest_id = guest_id prior_data: Dict[str, List[str]] = rospy.get_param(param_key) self._possible_names = [name.lower() for name in prior_data["names"]] self._possible_drinks = [drink.lower() for drink in prior_data["drinks"]] - + def execute(self, userdata: UserData) -> str: if not self._recovery_name_and_drink_required(userdata): if userdata.guest_data[self._guest_id]["name"] == "unknown": @@ -105,7 +106,7 @@ def execute(self, userdata: UserData) -> str: else: outcome = "failed" return outcome - + def _recovery_name_and_drink_required(self, userdata: UserData) -> bool: """Determine whether both the name and drink requires recovery. @@ -119,11 +120,11 @@ def _recovery_name_and_drink_required(self, userdata: UserData) -> bool: return False def __init__( - self, - guest_id: str, - last_resort: bool, - param_key: str = "/receptionist/priors", - ): + self, + guest_id: str, + last_resort: bool, + param_key: str = "/receptionist/priors", + ): self._guest_id = guest_id self._param_key = param_key @@ -134,32 +135,26 @@ def __init__( outcomes=["succeeded", "failed", "failed_name", "failed_drink"], input_keys=["guest_transcription", "guest_data"], output_keys=["guest_data", "guest_transcription"], + ) with self: smach.StateMachine.add( "PARSE_NAME_AND_DRINK", - self.ParseNameAndDrink( - guest_id=self._guest_id, param_key=self._param_key - ), + self.ParseNameAndDrink(guest_id=self._guest_id, param_key=self._param_key), transitions={"succeeded": "succeeded", "failed": "SPEECH_RECOVERY"}, ) smach.StateMachine.add( "SPEECH_RECOVERY", SpeechRecovery(self._guest_id, self._last_resort), - transitions={ - "succeeded": "succeeded", - "failed": "POST_RECOVERY_DECISION", - }, + transitions={"succeeded": "succeeded", "failed": "POST_RECOVERY_DECISION"}, ) smach.StateMachine.add( "POST_RECOVERY_DECISION", - self.PostRecoveryDecision( - guest_id=self._guest_id, param_key=self._param_key - ), + self.PostRecoveryDecision(guest_id=self._guest_id, param_key=self._param_key), transitions={ "failed": "failed", "failed_name": "failed_name", "failed_drink": "failed_drink", }, - ) + ) \ No newline at end of file diff --git a/tasks/receptionist/src/receptionist/states/get_name_or_drink.py b/tasks/receptionist/src/receptionist/states/get_name_or_drink.py index 67e0b7409..0ca946bbe 100644 --- a/tasks/receptionist/src/receptionist/states/get_name_or_drink.py +++ b/tasks/receptionist/src/receptionist/states/get_name_or_drink.py @@ -7,8 +7,9 @@ import smach from smach import UserData from typing import List, Dict, Any -from receptionist.states import SpeechRecovery - +from receptionist.states import ( + SpeechRecovery +) class GetNameOrDrink(smach.StateMachine): class ParseTranscribedInfo(smach.State): @@ -38,10 +39,9 @@ def __init__( prior_data: Dict[str, List[str]] = rospy.get_param(param_key) possible_drinks = [drink.lower() for drink in prior_data["drinks"]] possible_names = [name.lower() for name in prior_data["names"]] - self._possible_information = { - "drink": possible_drinks, - "name": possible_names, - }[self._type] + self._possible_information = {"drink": possible_drinks, "name": possible_names}[ + self._type + ] def execute(self, userdata: UserData) -> str: """Parse the guest's information. @@ -64,9 +64,7 @@ def execute(self, userdata: UserData) -> str: for key_phrase in self._possible_information: if key_phrase in transcription: userdata.guest_data[self._guest_id][self._type] = key_phrase - rospy.loginfo( - f"Guest/Drink {self._type} identified as: {key_phrase}" - ) + rospy.loginfo(f"Guest/Drink {self._type} identified as: {key_phrase}") information_found = True break if not information_found: @@ -75,14 +73,14 @@ def execute(self, userdata: UserData) -> str: outcome = "failed" return outcome - + def __init__( - self, - guest_id: str, - last_resort: bool, - info_type: str, - param_key: str = "/receptionist/priors", - ): + self, + guest_id: str, + last_resort: bool, + info_type: str, + param_key: str = "/receptionist/priors", + ): self._last_resort = last_resort self._guest_id = guest_id @@ -94,16 +92,13 @@ def __init__( outcomes=["succeeded", "failed"], input_keys=["guest_transcription", "guest_data"], output_keys=["guest_data", "guest_transcription"], + ) with self: smach.StateMachine.add( "PARSE_NAME_OR_DRINK", - self.ParseTranscribedInfo( - guest_id=self._guest_id, - info_type=self._info_type, - param_key=self._param_key, - ), + self.ParseTranscribedInfo(guest_id=self._guest_id, info_type=self._info_type, param_key=self._param_key), transitions={"succeeded": "succeeded", "failed": "SPEECH_RECOVERY"}, ) smach.StateMachine.add( diff --git a/tasks/receptionist/src/receptionist/states/handle_guest.py b/tasks/receptionist/src/receptionist/states/handle_guest.py index b48bea021..8f9c39947 100644 --- a/tasks/receptionist/src/receptionist/states/handle_guest.py +++ b/tasks/receptionist/src/receptionist/states/handle_guest.py @@ -201,9 +201,8 @@ def __init__(self, guest_id: str, learn_face: bool): smach.StateMachine.add( "AdjustCamera", AdjustCamera( - max_attempts=5, + max_attempts=3, debug=False, - init_state="u1m", ), transitions={ "finished": "HANDLE_GUEST", diff --git a/tasks/receptionist/src/receptionist/states/local_speech_recognition.py b/tasks/receptionist/src/receptionist/states/local_speech_recognition.py index 867dba845..565111d07 100644 --- a/tasks/receptionist/src/receptionist/states/local_speech_recognition.py +++ b/tasks/receptionist/src/receptionist/states/local_speech_recognition.py @@ -1,57 +1,21 @@ import jellyfish as jf -available_names = [ - "adel", - "angel", - "axel", - "charlie", - "jane", - "jules", - "morgan", - "paris", - "robin", - "simone", -] -available_single_drinks = [ - "cola", - "milk", -] -available_double_drinks = [ - "iced", - "tea", - "pack", - "juice", - "orange", - "red", - "wine", - "tropical", -] -double_drinks_dict = { - "iced": "iced tea", - "tea": "iced tea", - "pack": "juice pack", - "orange": "orange juice", - "red": "red wine", - "wine": "red wine", - "tropical": "tropical juice", - # "juice": ["orange juice", "tropical juice", "juice pack"], -} -available_drinks = list( - set(available_single_drinks).union(set(available_double_drinks)) -) -excluded_words = [ - "my", - "name", - "is", - "and", - "favourite", - "drink", - "you", - "can", - "call", - "me", -] + +available_names = ["adel", "angel", "axel", "charlie", "jane", "jules", "morgan", "paris", "robin", "simone"] +available_single_drinks = ["cola", "milk",] +available_double_drinks = ["iced", "tea", "pack", "juice", "orange", "red", "wine", "tropical"] +double_drinks_dict = {"iced" : "iced tea", + "tea": "iced tea", + "pack" : "juice pack", + "orange" : "orange juice", + "red" : "red wine", + "wine" : "red wine", + "tropical" : "tropical juice", + # "juice": ["orange juice", "tropical juice", "juice pack"], + } +available_drinks = list(set(available_single_drinks).union(set(available_double_drinks))) +excluded_words = ["my", "name", "is", "and", "favourite","drink", "you", "can", "call", "me"] def speech_recovery(sentence): @@ -59,7 +23,7 @@ def speech_recovery(sentence): sentence_list = list(set(sentence_split) - set(excluded_words)) print(f"final name: {handle_name(sentence_list, True)}") print(f"final drink: {handle_drink(sentence_list, True)}") - + def handle_name(sentence_list, last_resort): result = handle_similar_spelt(sentence_list, available_names, 1) @@ -74,7 +38,7 @@ def handle_name(sentence_list, last_resort): else: print("Last resort name") return handle_closest_spelt(sentence_list, available_names) - + def handle_drink(sentence_list, last_resort): result = infer_second_drink(sentence_list) @@ -86,7 +50,7 @@ def handle_drink(sentence_list, last_resort): else: result = handle_similar_sound(sentence_list, available_drinks, 0) print(f"drink (sound): {result}") - + if result != "unknown": if result in available_single_drinks: print(f"final attempt drink: {result}") @@ -124,14 +88,13 @@ def handle_similar_sound(sentence_list, available_words, distance_threshold): if distance <= distance_threshold: print(input_word, available_word) return available_word - return "unknown" - + return "unknown" def infer_second_drink(sentence_list): for input_word in sentence_list: if input_word == "juice": choices = ["pack", "orange", "tropical"] - closest_word = handle_closest_spelt(sentence_list, choices) + closest_word = handle_closest_spelt(sentence_list, choices) if closest_word == "pack": return "juice pack" elif closest_word == "orange": @@ -143,9 +106,8 @@ def infer_second_drink(sentence_list): return double_drinks_dict[input_word] return "unknown" - def handle_closest_spelt(sentence_list, choices): - closest_distance = float("inf") + closest_distance = float('inf') closest_word = None for input_word in sentence_list: for available_word in choices: @@ -159,13 +121,11 @@ def handle_closest_spelt(sentence_list, choices): def get_damerau_levenshtein_distance(word_1, word_2): return jf.damerau_levenshtein_distance(word_1, word_2) - def get_levenshtein_soundex_distance(word_1, word_2): soundex_word_1 = jf.soundex(word_1) soundex_word_2 = jf.soundex(word_2) return jf.levenshtein_distance(soundex_word_1, soundex_word_2) - # print(get_damerau_levenshtein_distance("juice", "shoes")) # print(get_levenshtein_soundex_distance("juice", "shoes")) @@ -179,8 +139,8 @@ def get_levenshtein_soundex_distance(word_1, word_2): # available_names = ["adel", "angel", "axel", "charlie", "jane", "jules", "morgan", "paris", "robin", "simone"] # available_single_drinks = ["cola", "milk",] # available_double_drinks = ["iced", "tea", "juice", "pack", "orange", "red", "wine", "tropical"] -# double_drinks_dict = {"iced" : "iced tea", -# "tea": "iced tea", +# double_drinks_dict = {"iced" : "iced tea", +# "tea": "iced tea", # "pack" : "juice pack", # "orange" : "orange juice", # "red" : "red wine", @@ -214,4 +174,4 @@ def get_levenshtein_soundex_distance(word_1, word_2): print("======") sentence = "my name is morgen and my favourite drink is mll" speech_recovery(sentence) - print("======") + print("======") \ No newline at end of file diff --git a/tasks/receptionist/src/receptionist/states/speech_recovery.py b/tasks/receptionist/src/receptionist/states/speech_recovery.py index d44d45e69..e2113bf8d 100644 --- a/tasks/receptionist/src/receptionist/states/speech_recovery.py +++ b/tasks/receptionist/src/receptionist/states/speech_recovery.py @@ -5,7 +5,6 @@ from smach import UserData from typing import List, Dict, Any - class SpeechRecovery(smach.State): def __init__( self, @@ -21,64 +20,25 @@ def __init__( ) self._guest_id = guest_id - self._last_resort = last_resort + self._last_resort = last_resort self._input_type = input_type - self._available_names = [ - "adel", - "angel", - "axel", - "charlie", - "jane", - "jules", - "morgan", - "paris", - "robin", - "simone", - ] - self._available_single_drinks = [ - "cola", - "milk", - ] - self._available_double_drinks = [ - "iced", - "tea", - "pack", - "juice", - "orange", - "red", - "wine", - "tropical", - ] - self._double_drinks_dict = { - "iced": "iced tea", - "tea": "iced tea", - "pack": "juice pack", - "orange": "orange juice", - "red": "red wine", - "wine": "red wine", - "tropical": "tropical juice", - # "juice": ["orange juice", "tropical juice", "juice pack"], - } - self._available_drinks = list( - set(self._available_single_drinks).union(set(self._available_double_drinks)) - ) - self._excluded_words = [ - "my", - "name", - "is", - "and", - "favourite", - "drink", - "you", - "can", - "call", - "me", - ] + self._available_names = ["adel", "angel", "axel", "charlie", "jane", "jules", "morgan", "paris", "robin", "simone"] + self._available_single_drinks = ["cola", "milk",] + self._available_double_drinks = ["iced", "tea", "pack", "juice", "orange", "red", "wine", "tropical"] + self._double_drinks_dict = {"iced" : "iced tea", + "tea": "iced tea", + "pack" : "juice pack", + "orange" : "orange juice", + "red" : "red wine", + "wine" : "red wine", + "tropical" : "tropical juice", + # "juice": ["orange juice", "tropical juice", "juice pack"], + } + self._available_drinks = list(set(self._available_single_drinks).union(set(self._available_double_drinks))) + self._excluded_words = ["my", "name", "is", "and", "favourite","drink", "you", "can", "call", "me"] def execute(self, userdata: UserData) -> str: - filtered_sentence = userdata.guest_transcription.lower().translate( - str.maketrans("", "", string.punctuation) - ) + filtered_sentence = userdata.guest_transcription.lower().translate(str.maketrans('', '', string.punctuation)) sentence_split = filtered_sentence.split() if sentence_split is None: return "failed" @@ -105,17 +65,15 @@ def execute(self, userdata: UserData) -> str: final_name = self._handle_name(sentence_list, self._last_resort) userdata.guest_data[self._guest_id]["name"] = final_name print(f"Recovered name: {final_name} ") - if userdata.guest_data[self._guest_id]["drink"] == "unknown": + if userdata.guest_data[self._guest_id]["drink"] == "unknown": final_drink = self._handle_drink(sentence_list, self._last_resort) userdata.guest_data[self._guest_id]["drink"] = final_drink print(f"Recovered drink: {final_drink} ") - if ( - userdata.guest_data[self._guest_id]["name"] == "unknown" - or userdata.guest_data[self._guest_id]["drink"] == "unknown" - ): + if userdata.guest_data[self._guest_id]["name"] == "unknown" or userdata.guest_data[self._guest_id]["drink"] == "unknown": return "failed" else: return "succeeded" + def _handle_name(self, sentence_list, last_resort): result = self._handle_similar_spelt(sentence_list, self._available_names, 1) @@ -130,6 +88,7 @@ def _handle_name(self, sentence_list, last_resort): else: print("Last resort name") return self._handle_closest_spelt(sentence_list, self._available_names) + def _handle_drink(self, sentence_list, last_resort): result = self._infer_second_drink(sentence_list) @@ -139,11 +98,9 @@ def _handle_drink(self, sentence_list, last_resort): if result != "unknown": print(f"drink (spelt): {result}") else: - result = self._handle_similar_sound( - sentence_list, self._available_drinks, 0 - ) + result = self._handle_similar_sound(sentence_list, self._available_drinks, 0) print(f"drink (sound): {result}") - + if result != "unknown": if result in self._available_single_drinks: print(f"final attempt drink: {result}") @@ -156,9 +113,7 @@ def _handle_drink(self, sentence_list, last_resort): return "unknown" else: print("Last resort drink") - closest_spelt = self._handle_closest_spelt( - sentence_list, self._available_drinks - ) + closest_spelt = self._handle_closest_spelt(sentence_list, self._available_drinks) if closest_spelt in self._available_single_drinks: print(f"final attempt during last resort drink: {closest_spelt}") return closest_spelt @@ -166,32 +121,30 @@ def _handle_drink(self, sentence_list, last_resort): sentence_list.append(closest_spelt) return self._infer_second_drink(closest_spelt) + def _handle_similar_spelt(self, sentence_list, available_words, distance_threshold): for input_word in sentence_list: for available_word in available_words: - distance = self._get_damerau_levenshtein_distance( - input_word, available_word - ) + distance = self._get_damerau_levenshtein_distance(input_word, available_word) if distance <= distance_threshold: return available_word return "unknown" + def _handle_similar_sound(self, sentence_list, available_words, distance_threshold): for input_word in sentence_list: for available_word in available_words: - distance = self._get_levenshtein_soundex_distance( - input_word, available_word - ) + distance = self._get_levenshtein_soundex_distance(input_word, available_word) if distance <= distance_threshold: print(input_word, available_word) return available_word - return "unknown" + return "unknown" def _infer_second_drink(self, sentence_list): for input_word in sentence_list: if input_word == "juice": choices = ["pack", "orange", "tropical"] - closest_word = self._handle_closest_spelt(sentence_list, choices) + closest_word = self._handle_closest_spelt(sentence_list, choices) if closest_word == "pack": return "juice pack" elif closest_word == "orange": @@ -204,22 +157,21 @@ def _infer_second_drink(self, sentence_list): return "unknown" def _handle_closest_spelt(self, sentence_list, choices): - closest_distance = float("inf") + closest_distance = float('inf') closest_word = None for input_word in sentence_list: for available_word in choices: - distance = self._get_damerau_levenshtein_distance( - input_word, available_word - ) + distance = self._get_damerau_levenshtein_distance(input_word, available_word) if distance < closest_distance: closest_distance = distance closest_word = available_word return closest_word + def _get_damerau_levenshtein_distance(self, word_1, word_2): return jf.damerau_levenshtein_distance(word_1, word_2) def _get_levenshtein_soundex_distance(self, word_1, word_2): soundex_word_1 = jf.soundex(word_1) soundex_word_2 = jf.soundex(word_2) - return jf.levenshtein_distance(soundex_word_1, soundex_word_2) + return jf.levenshtein_distance(soundex_word_1, soundex_word_2) \ No newline at end of file