From ba789e6ff117fc66edc6fe216658aa2ebb30f421 Mon Sep 17 00:00:00 2001 From: Wiktor Bajor Date: Sun, 14 Jul 2024 16:06:10 +0200 Subject: [PATCH] Add missing implementation --- .../human_detector/human_detector.py | 107 ++++++++++++++---- 1 file changed, 88 insertions(+), 19 deletions(-) diff --git a/human_detector/human_detector/human_detector.py b/human_detector/human_detector/human_detector.py index 9422725..d547f96 100644 --- a/human_detector/human_detector/human_detector.py +++ b/human_detector/human_detector/human_detector.py @@ -1,13 +1,17 @@ from human_detector.human_detector_parameters import human_detector_parameters from rclpy.lifecycle.node import LifecycleState, TransitionCallbackReturn -from rclpy.lifecycle import LifecycleNode +from tf2_ros.transform_broadcaster import TransformBroadcaster +from geometry_msgs.msg import TransformStamped from sensor_msgs.msg import Image, CameraInfo from image_geometry import PinholeCameraModel -from cv_bridge.core import CvBridge from rclpy.lifecycle import LifecycleNode +from cv_bridge.core import CvBridge +from typing import NamedTuple import mediapipe as mp +import cv2 - +def mm_to_m(mm): + return mm / 1000 class HumanDetector(LifecycleNode): def __init__(self): @@ -18,14 +22,11 @@ def __init__(self): self.depth_image: Image = None self.image: Image = None self.detected_landmarks = None - self.detected_x: int = 0 - self.detected_y: int = 0 - self.detected_human_pose_world_x: float = 0.0 - self.detected_human_pose_world_y: float = 0.0 - self.detected_human_pose_world_z: float = 0.0 + self.detected_human_position_world = [] self.camera_info_is_stored: bool = False self.cv_bridge = CvBridge() self.model = PinholeCameraModel() + self.tf_broadcaster = TransformBroadcaster(self) self.person_pose_estimator = mp.solutions.pose.Pose(min_detection_confidence=0.8, min_tracking_confidence=0.5) def on_configure(self, previous_state: LifecycleState): @@ -43,7 +44,7 @@ def on_configure(self, previous_state: LifecycleState): self.image_with_detected_human_pub = self.create_publisher( Image, "/camera/third_person_camera/color/person_selected", 10 ) - self.timer = self.create_timer(1/self.parameters.detected_human_transform_frequency, self.publish_transform) + self.timer = self.create_timer(1/self.parameters.detected_human_transform_frequency, self.timer_callback) self.timer.cancel() return TransitionCallbackReturn.SUCCESS @@ -84,17 +85,85 @@ def log_parameters(self): def is_publishing_image_with_detected_human_needed(self): return self.parameters.publish_image_with_detected_human_topic is not None - def store_image(self): - pass - - def store_depth_image(self): - pass - - def store_camera_info(self): - pass + def store_image(self, image_msg: Image): + self.image = cv2.cvtColor(self.cv_bridge.imgmsg_to_cv2(image_msg), cv2.COLOR_BGR2RGB) + + def store_depth_image(self, depth_image_msg: Image): + self.depth_image = self.cv_bridge.imgmsg_to_cv2(depth_image_msg, desired_encoding="16UC1") + + def store_camera_info(self, info: CameraInfo): + self.model.fromCameraInfo(info) + if not self.camera_info_is_stored: + self.camera_info_is_stored = True + + def store_human_pose(self): + if not self.camera_info_is_stored or self.image is None or self.depth_image is None: + self.get_logger().error("No camera info or image or depth image are not stored") + return + + hight, width, _ = self.image.shape + self.detected_landmarks = self.person_pose_estimator.process(self.image).pose_landmarks + x_pos_of_detected_person, y_pos_of_detected_person = \ + self.get_position_of_human_in_the_image(self.detected_landmarks, width, hight) + + if x_pos_of_detected_person <= 0 or y_pos_of_detected_person <= 0: + return + + depth_of_given_pixel = self.depth_image[y_pos_of_detected_person, x_pos_of_detected_person] + ray = self.model.projectPixelTo3dRay((x_pos_of_detected_person, y_pos_of_detected_person)) + ray_3d = [ray_element / ray[2] for ray_element in ray] + point_xyz = [ray_element * depth_of_given_pixel for ray_element in ray_3d] + self.detected_human_position_world = [mm_to_m(point_xyz[2]), -mm_to_m(point_xyz[0]), mm_to_m(point_xyz[1])] + + def get_position_of_human_in_the_image(self, detected_landmarks: NamedTuple, width: int, height: int): + x, y = 0, 0 + if detected_landmarks: + landmarks = mp.solutions.pose.PoseLandmark + left_hip_landmark = detected_landmarks.landmark[landmarks.LEFT_HIP] + right_hip_landmark = detected_landmarks.landmark[landmarks.RIGHT_HIP] + x = int(min((left_hip_landmark.x * width + right_hip_landmark.x * width) / 2, width - 1)) + y = int(min((left_hip_landmark.y * height + right_hip_landmark.y * height) / 2, height - 1)) + + return x, y + + def timer_callback(self): + self.broadcast_timer_callback() + self.publish_image_with_detected_human() + + def broadcast_timer_callback(self): + t = TransformStamped() + t.header.stamp = self.get_clock().now().to_msg() + t.header.frame_id = 'camera_link' + t.child_frame_id = self.parameters.detected_human_frame_id + t.transform.translation.x = self.detected_human_pose_world[0] + t.transform.translation.y = self.detected_human_pose_world[1] + t.transform.translation.z = self.detected_human_pose_world[2] + t.transform.rotation.x = 0.0 + t.transform.rotation.y = 0.0 + t.transform.rotation.z = 0.0 + t.transform.rotation.w = 1.0 + self.tf_broadcaster.sendTransform(t) + + def draw_person_pose(self, image, landmarks): + if landmarks is not None: + mp_drawing = mp.solutions.drawing_utils + mp_drawing.draw_landmarks( + image, + landmarks, + mp.solutions.pose.POSE_CONNECTIONS, + mp.solutions.drawing_styles.get_default_pose_landmarks_style() + ) - def publish_transform(self): - pass + def publish_image_with_detected_human(self): + if not self.is_publishing_image_with_detected_human_needed(): + return + if self.detected_landmarks is not None: + self.draw_person_pose(self.image, self.detected_landmarks) + modified_image_msg = self.cv_bridge.cv2_to_imgmsg(cv2.cvtColor(self.image, cv2.COLOR_RGB2BGR)) + self.image_with_detected_human_pub.publish(modified_image_msg) + elif self.image is not None: + image_msg = self.cv_bridge.cv2_to_imgmsg(cv2.cvtColor(self.image, cv2.COLOR_RGB2BGR)) + self.image_with_detected_human_pub.publish(image_msg) def main():