diff --git a/skills/src/lasr_skills/describe_people.py b/skills/src/lasr_skills/describe_people.py index 9ac1b9036..762e705c7 100644 --- a/skills/src/lasr_skills/describe_people.py +++ b/skills/src/lasr_skills/describe_people.py @@ -1,18 +1,11 @@ #!/usr/bin/env python3 -## -# WARNING: IM STILL CLEANING UP THIS CODE -# - paul -## - import cv2 import rospy import smach import cv2_img import numpy as np -from sensor_msgs.msg import Image -from PIL import Image as PillowImage from colour_estimation import closest_colours, RGB_COLOURS from lasr_vision_msgs.msg import BodyPixMaskRequest, ColourPrediction, FeatureWithColour from lasr_vision_msgs.srv import YoloDetection, BodyPixDetection, TorchFaceFeatureDetection @@ -44,16 +37,17 @@ def __init__(self): smach.Concurrence.add('SEGMENT_BODYPIX', self.SegmentBodypix()) smach.StateMachine.add('SEGMENT', sm_con, transitions={ - 'succeeded': 'FILTER'}) - # smach.StateMachine.add('FILTER', self.Filter()) - # Uncomment for segment face: - # smach.StateMachine.add('FILTER', self.Filter(), transitions={'succeeded': 'SEGMENT_FACE'}) - smach.StateMachine.add('FILTER', self.Filter(), transitions={ + 'succeeded': 'FEATURE_EXTRACTION'}) + smach.StateMachine.add('FEATURE_EXTRACTION', self.FeatureExtraction(), transitions={ 'succeeded': 'succeeded'}) - # smach.StateMachine.add('RESIZE_TEST', self.ResizeTest(), transitions={'succeeded': 'SEGMENT_FACE'}) - # smach.StateMachine.add('SEGMENT_FACE', self.SegmentFace()) class SegmentYolo(smach.State): + ''' + Segment using YOLO + + This should be turned into / merged with generic states + ''' + def __init__(self): smach.State.__init__(self, outcomes=['succeeded', 'failed'], input_keys=[ 'img_msg'], output_keys=['people_detections']) @@ -71,6 +65,12 @@ def execute(self, userdata): return 'failed' class SegmentBodypix(smach.State): + ''' + Segment using Bodypix + + This should be turned into / merged with generic states + ''' + def __init__(self): smach.State.__init__(self, outcomes=['succeeded', 'failed'], input_keys=[ 'img_msg'], output_keys=['bodypix_masks']) @@ -94,7 +94,13 @@ def execute(self, userdata): rospy.logwarn(f"Unable to perform inference. ({str(e)})") return 'failed' - class Filter(smach.State): + class FeatureExtraction(smach.State): + ''' + Perform feature extraction + + This could be split into more states in theory, but that might just be unnecessary work + ''' + def __init__(self): smach.State.__init__(self, outcomes=['succeeded', 'failed'], input_keys=[ 'img', 'people_detections', 'bodypix_masks'], output_keys=['people']) @@ -103,24 +109,24 @@ def __init__(self): def execute(self, userdata): if len(userdata.people_detections) == 0: - print("Couldn't find anyone!") + rospy.logerr("Couldn't find anyone!") return 'failed' elif len(userdata.people_detections) == 1: - print("There is one person.") + rospy.logdebug("There is one person.") else: - print(f"There are {len(userdata.people_detections)} people.") + rospy.logdebug( + f"There are {len(userdata.people_detections)} people.") - # decode the image - rospy.loginfo('Decoding') img = userdata.img height, width, _ = img.shape people = [] for person in userdata.people_detections: - print(f"\n\nFound person with confidence {person.confidence}!") + rospy.logdebug( + f"\n\nFound person with confidence {person.confidence}!") - # mask + # mask for this person mask_image = np.zeros((height, width), np.uint8) contours = np.array(person.xyseg).reshape(-1, 2) cv2.fillPoly(mask_image, pts=np.int32( @@ -139,19 +145,17 @@ def execute(self, userdata): try: part_mask[mask_bin == 0] = 0 except Exception: - print('|> Failed to check {part} is visible') + rospy.logdebug('|> Failed to check {part} is visible') continue if part_mask.any(): - print(f'|> Person has {part} visible') + rospy.logdebug(f'|> Person has {part} visible') else: - print(f'|> Person does not have {part} visible') + rospy.logdebug( + f'|> Person does not have {part} visible') continue - # TODO: construct userdata and not the stuff below! - # do colour processing on the torso - # TODO: this should actually be a separate state if part == 'torso': try: features.append(FeatureWithColour("torso", [ @@ -159,41 +163,37 @@ def execute(self, userdata): for colour, distance in closest_colours(np.median(img[part_mask == 1], axis=0), RGB_COLOURS) ])) - except Exception: - print("Failed to process colour") + except Exception as e: + rospy.logerr(f"Failed to process colour: {e}") - # TODO: additional processing through Ben's model + # do feature extraction on the head if part == 'head': - # use img[part_mask == 1] - pass - - # THIS IS FROM SegmentFace state but I am lazy for now because i dont have neough time to clean this up properly right now ok - mask_image = np.zeros((height, width), np.uint8) - contours = np.array(person.xyseg).reshape(-1, 2) - cv2.fillPoly(mask_image, pts=np.int32( - [contours]), color=(255, 255, 255)) - # mask_bin = mask_image > 128 + try: + # crop out face + face_mask = np.array(userdata.bodypix_masks[1].mask).reshape( + userdata.bodypix_masks[1].shape[0], userdata.bodypix_masks[1].shape[1]) - # crop out face - face_mask = np.array(userdata.bodypix_masks[1].mask).reshape( - userdata.bodypix_masks[1].shape[0], userdata.bodypix_masks[1].shape[1]) - mask_image[face_mask == 0] = 0 + mask_image_only_face = mask_image.copy() + mask_image_only_face[face_mask == 0] = 0 - a = cv2_img.extract_mask_region(img, mask_image) - if a is None: - print("Failed to detect face features") - continue + face_region = cv2_img.extract_mask_region( + img, mask_image_only_face) + if face_region is None: + raise Exception( + "Failed to extract mask region") - msg = cv2_img.cv2_img_to_msg(a) - features.extend(self.torch_face_features( - msg).detected_features) + msg = cv2_img.cv2_img_to_msg(face_region) + features.extend(self.torch_face_features( + msg).detected_features) + except Exception as e: + rospy.logerr(f"Failed to process extraction: {e}") people.append({ 'detection': person, 'features': features }) - # TODO: construct userdata like: + # Userdata: # - people # - - detection (YOLO) # - parts