Commit

Add missing implementation
Wiktor-99 committed Jul 14, 2024
1 parent bc0b8f0 commit ba789e6
Showing 1 changed file with 88 additions and 19 deletions.
107 changes: 88 additions & 19 deletions human_detector/human_detector/human_detector.py
@@ -1,13 +1,17 @@
from human_detector.human_detector_parameters import human_detector_parameters
from rclpy.lifecycle.node import LifecycleState, TransitionCallbackReturn
from rclpy.lifecycle import LifecycleNode
from tf2_ros.transform_broadcaster import TransformBroadcaster
from geometry_msgs.msg import TransformStamped
from sensor_msgs.msg import Image, CameraInfo
from image_geometry import PinholeCameraModel
from cv_bridge.core import CvBridge
from typing import NamedTuple
import mediapipe as mp
import cv2


def mm_to_m(mm):
return mm / 1000

class HumanDetector(LifecycleNode):
def __init__(self):
@@ -18,14 +22,11 @@ def __init__(self):
self.depth_image: Image = None
self.image: Image = None
self.detected_landmarks = None
self.detected_human_position_world = []
self.camera_info_is_stored: bool = False
self.cv_bridge = CvBridge()
self.model = PinholeCameraModel()
self.tf_broadcaster = TransformBroadcaster(self)
self.person_pose_estimator = mp.solutions.pose.Pose(min_detection_confidence=0.8, min_tracking_confidence=0.5)

def on_configure(self, previous_state: LifecycleState):
Expand All @@ -43,7 +44,7 @@ def on_configure(self, previous_state: LifecycleState):
self.image_with_detected_human_pub = self.create_publisher(
Image, "/camera/third_person_camera/color/person_selected", 10
)
self.timer = self.create_timer(1/self.parameters.detected_human_transform_frequency, self.timer_callback)
self.timer.cancel()

return TransitionCallbackReturn.SUCCESS
@@ -84,17 +85,85 @@ def log_parameters(self):
def is_publishing_image_with_detected_human_needed(self):
return self.parameters.publish_image_with_detected_human_topic is not None

def store_image(self, image_msg: Image):
self.image = cv2.cvtColor(self.cv_bridge.imgmsg_to_cv2(image_msg), cv2.COLOR_BGR2RGB)

def store_depth_image(self, depth_image_msg: Image):
self.depth_image = self.cv_bridge.imgmsg_to_cv2(depth_image_msg, desired_encoding="16UC1")

def store_camera_info(self, info: CameraInfo):
self.model.fromCameraInfo(info)
if not self.camera_info_is_stored:
self.camera_info_is_stored = True

def store_human_pose(self):
if not self.camera_info_is_stored or self.image is None or self.depth_image is None:
            self.get_logger().error("Camera info, image, or depth image is not stored")
return

        height, width, _ = self.image.shape
        self.detected_landmarks = self.person_pose_estimator.process(self.image).pose_landmarks
        x_pos_of_detected_person, y_pos_of_detected_person = \
            self.get_position_of_human_in_the_image(self.detected_landmarks, width, height)

if x_pos_of_detected_person <= 0 or y_pos_of_detected_person <= 0:
return

        # Scale the unit ray through the detected pixel by the measured depth (mm),
        # convert to metres, and remap the optical-frame axes for the broadcast transform.
        depth_of_given_pixel = self.depth_image[y_pos_of_detected_person, x_pos_of_detected_person]
        ray = self.model.projectPixelTo3dRay((x_pos_of_detected_person, y_pos_of_detected_person))
        ray_3d = [ray_element / ray[2] for ray_element in ray]
        point_xyz = [ray_element * depth_of_given_pixel for ray_element in ray_3d]
        self.detected_human_position_world = [mm_to_m(point_xyz[2]), -mm_to_m(point_xyz[0]), mm_to_m(point_xyz[1])]

def get_position_of_human_in_the_image(self, detected_landmarks: NamedTuple, width: int, height: int):
x, y = 0, 0
if detected_landmarks:
landmarks = mp.solutions.pose.PoseLandmark
left_hip_landmark = detected_landmarks.landmark[landmarks.LEFT_HIP]
right_hip_landmark = detected_landmarks.landmark[landmarks.RIGHT_HIP]
x = int(min((left_hip_landmark.x * width + right_hip_landmark.x * width) / 2, width - 1))
y = int(min((left_hip_landmark.y * height + right_hip_landmark.y * height) / 2, height - 1))

return x, y

def timer_callback(self):
self.broadcast_timer_callback()
self.publish_image_with_detected_human()

    def broadcast_timer_callback(self):
        # Nothing to broadcast until a human position has been computed.
        if not self.detected_human_position_world:
            return
        t = TransformStamped()
        t.header.stamp = self.get_clock().now().to_msg()
        t.header.frame_id = 'camera_link'
        t.child_frame_id = self.parameters.detected_human_frame_id
        t.transform.translation.x = self.detected_human_position_world[0]
        t.transform.translation.y = self.detected_human_position_world[1]
        t.transform.translation.z = self.detected_human_position_world[2]
t.transform.rotation.x = 0.0
t.transform.rotation.y = 0.0
t.transform.rotation.z = 0.0
t.transform.rotation.w = 1.0
self.tf_broadcaster.sendTransform(t)

def draw_person_pose(self, image, landmarks):
if landmarks is not None:
mp_drawing = mp.solutions.drawing_utils
mp_drawing.draw_landmarks(
image,
landmarks,
mp.solutions.pose.POSE_CONNECTIONS,
mp.solutions.drawing_styles.get_default_pose_landmarks_style()
)

def publish_image_with_detected_human(self):
if not self.is_publishing_image_with_detected_human_needed():
return
if self.detected_landmarks is not None:
self.draw_person_pose(self.image, self.detected_landmarks)
modified_image_msg = self.cv_bridge.cv2_to_imgmsg(cv2.cvtColor(self.image, cv2.COLOR_RGB2BGR))
self.image_with_detected_human_pub.publish(modified_image_msg)
elif self.image is not None:
image_msg = self.cv_bridge.cv2_to_imgmsg(cv2.cvtColor(self.image, cv2.COLOR_RGB2BGR))
self.image_with_detected_human_pub.publish(image_msg)


def main():
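Side note on the new depth handling: store_human_pose takes the hip-midpoint pixel, back-projects it through the camera model, rescales the ray by the depth reading (in millimetres), and converts to metres. The snippet below is not part of this commit; it is a minimal standalone sketch of that conversion using only image_geometry and sensor_msgs, with made-up intrinsics and a made-up pixel/depth sample.

# Minimal sketch (not part of this commit): pixel + depth (mm) -> 3D point in metres,
# mirroring the projectPixelTo3dRay / depth-scaling steps in store_human_pose.
# The intrinsics, pixel, and depth below are made-up example values.
from image_geometry import PinholeCameraModel
from sensor_msgs.msg import CameraInfo


def pixel_to_point_m(model, u, v, depth_mm):
    ray = model.projectPixelTo3dRay((u, v))              # unit ray in the camera optical frame
    ray_z1 = [component / ray[2] for component in ray]   # rescale so the z component equals 1
    return [component * depth_mm / 1000.0 for component in ray_z1]


info = CameraInfo()
info.width, info.height = 640, 480
info.k = [525.0, 0.0, 319.5, 0.0, 525.0, 239.5, 0.0, 0.0, 1.0]
info.p = [525.0, 0.0, 319.5, 0.0, 0.0, 525.0, 239.5, 0.0, 0.0, 0.0, 1.0, 0.0]

model = PinholeCameraModel()
model.fromCameraInfo(info)
print(pixel_to_point_m(model, 400, 240, 2000.0))  # roughly [0.31, 0.0, 2.0] in the optical frame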

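Similarly, the MediaPipe side of the change (store_image plus get_position_of_human_in_the_image) can be tried on a single image. The sketch below is not part of this commit; "person.jpg" is a placeholder path, and it sets static_image_mode=True for a one-off image, whereas the node keeps MediaPipe's streaming defaults. It reproduces the hip-midpoint pixel that the node feeds into the depth lookup.

# Minimal sketch (not part of this commit): detect a person in one image and compute
# the hip-midpoint pixel at which the node samples the depth image.
# "person.jpg" is a placeholder path, not a file from this repository.
import cv2
import mediapipe as mp

bgr = cv2.imread("person.jpg")
rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)  # MediaPipe expects RGB, as in store_image
height, width, _ = rgb.shape

with mp.solutions.pose.Pose(static_image_mode=True, min_detection_confidence=0.8) as pose:
    landmarks = pose.process(rgb).pose_landmarks

if landmarks:
    hips = mp.solutions.pose.PoseLandmark
    left = landmarks.landmark[hips.LEFT_HIP]
    right = landmarks.landmark[hips.RIGHT_HIP]
    u = int(min((left.x * width + right.x * width) / 2, width - 1))
    v = int(min((left.y * height + right.y * height) / 2, height - 1))
    print(f"hip midpoint pixel: ({u}, {v})")
else:
    print("no person detected")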