Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Geolocation Logging #198

Merged
merged 6 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions modules/detection_in_world.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,9 @@ def __init__(
self.centre = centre
self.label = label
self.confidence = confidence

def __str__(self) -> str:
"""
To string.
"""
return f"{self.__class__}, vertices: {self.vertices.tolist()}, centre: {self.centre}, label: {self.label}, confidence: {self.confidence}"
41 changes: 36 additions & 5 deletions modules/geolocation/geolocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from .. import detection_in_world
from .. import detections_and_time
from .. import merged_odometry_detections
from ..common.logger.modules import logger


class Geolocation:
Expand All @@ -25,6 +26,7 @@ def create(
cls,
camera_intrinsics: camera_properties.CameraIntrinsics,
camera_drone_extrinsics: camera_properties.CameraDroneExtrinsics,
local_logger: logger.Logger,
) -> "tuple[bool, Geolocation | None]":
"""
camera_intrinsics: Camera information without any outside space.
Expand All @@ -45,6 +47,9 @@ def create(
# Image space to camera space
result, value = camera_intrinsics.camera_space_from_image_space(source[0], source[1])
if not result:
local_logger.error(
f"Rotated source vector could not be created for source: {source}"
)
return False, None

# Get Pylance to stop complaining
Expand All @@ -59,6 +64,7 @@ def create(
camera_drone_extrinsics,
perspective_transform_sources,
rotated_source_vectors,
local_logger,
)

def __init__(
Expand All @@ -67,6 +73,7 @@ def __init__(
camera_drone_extrinsics: camera_properties.CameraDroneExtrinsics,
perspective_transform_sources: "list[list[float]]",
rotated_source_vectors: "list[np.ndarray]",
local_logger: logger.Logger,
) -> None:
"""
Private constructor, use create() method.
Expand All @@ -76,35 +83,45 @@ def __init__(
self.__camera_drone_extrinsics = camera_drone_extrinsics
self.__perspective_transform_sources = perspective_transform_sources
self.__rotated_source_vectors = rotated_source_vectors
self.__logger = local_logger

@staticmethod
def __ground_intersection_from_vector(
vec_camera_in_world_position: np.ndarray, vec_down: np.ndarray
vec_camera_in_world_position: np.ndarray,
vec_down: np.ndarray,
local_logger: logger.Logger,
) -> "tuple[bool, np.ndarray | None]":
"""
Get 2D coordinates of where the downwards pointing vector intersects the ground.
"""
if not camera_properties.is_vector_r3(vec_camera_in_world_position):
local_logger.error("Camera position in world space is not a vector in R3")
return False, None

if not camera_properties.is_vector_r3(vec_down):
local_logger.error("Rotated source vector in world space is not a vector in R3")
return False, None

# Check camera above ground
if vec_camera_in_world_position[2] > 0.0:
local_logger.error("Camera is underground")
return False, None

# Ensure vector is pointing down by checking angle
# cos(angle) = a dot b / (||a|| * ||b||)
vec_z = np.array([0.0, 0.0, 1.0], dtype=np.float32)
cos_angle = np.dot(vec_down, vec_z) / np.linalg.norm(vec_down)
if cos_angle < Geolocation.__MIN_DOWN_COS_ANGLE:
local_logger.error(
f"Rotated source vector in world space is not pointing down, cos(angle) = {cos_angle}"
)
return False, None

# Find scalar multiple for the vector to touch the ground (z/3rd component is 0)
# Solve for s: o3 + s * d3 = 0
scaling = -vec_camera_in_world_position[2] / vec_down[2]
if scaling < 0.0:
local_logger.error(f"Scaling value is negative, scaling = {scaling}")
return False, None

vec_ground = vec_camera_in_world_position + scaling * vec_down
Expand All @@ -118,9 +135,11 @@ def __get_perspective_transform_matrix(
Calculates the destination points, then uses OpenCV to get the matrix.
"""
if not camera_properties.is_matrix_r3x3(drone_rotation_matrix):
self.__logger.error("Drone rotation matrix is not a 3 x 3 matrix")
return False, None

if not camera_properties.is_vector_r3(drone_position_ned):
self.__logger.error("Drone position in local space is not a vector in R3")
return False, None

# Get the vectors in world space
Expand All @@ -141,6 +160,7 @@ def __get_perspective_transform_matrix(
result, ground_point = self.__ground_intersection_from_vector(
vec_camera_position,
vec_down,
self.__logger,
)
if not result:
return False, None
Expand All @@ -156,25 +176,31 @@ def __get_perspective_transform_matrix(
dst,
)
# All exceptions must be caught and logged as early as possible
# pylint: disable-next=bare-except
except:
# TODO: Logging
# pylint: disable-next=broad-exception-caught
except Exception as e:
self.__logger.error(f"Could not get perspective transform matrix: {e}")
Xierumeng marked this conversation as resolved.
Show resolved Hide resolved
return False, None

return True, matrix

@staticmethod
def __convert_detection_to_world_from_image(
detection: detections_and_time.Detection, perspective_transform_matrix: np.ndarray
detection: detections_and_time.Detection,
perspective_transform_matrix: np.ndarray,
local_logger: logger.Logger,
) -> "tuple[bool, detection_in_world.DetectionInWorld | None]":
"""
Applies the transform matrix to the detection.
perspective_transform_matrix: Element in last row and column must be 1 .
"""
if not camera_properties.is_matrix_r3x3(perspective_transform_matrix):
local_logger.error("Perspective transform matrix is not a 3 x 3 matrix")
return False, None

if not np.allclose(perspective_transform_matrix[2][2], 1.0):
local_logger.error(
"Perspective transform matrix bottom right element is not close to 1.0"
)
return False, None

centre = detection.get_centre()
Expand Down Expand Up @@ -218,6 +244,7 @@ def __convert_detection_to_world_from_image(
# https://www.w3resource.com/python-exercises/numpy/python-numpy-exercise-96.php
output_normalized = output_vertices / vec_last_element[:, None]
if not np.isfinite(output_normalized).all():
local_logger.error("Normalized output is infinite")
return False, None

# Slice to remove the last element of each row
Expand All @@ -243,6 +270,7 @@ def run(
# Camera position in world (NED system)
# Cannot be underground
if detections.odometry_local.position.down >= 0.0:
self.__logger.error("Drone is underground")
return False, None

drone_position_ned = np.array(
Expand All @@ -262,6 +290,7 @@ def run(
detections.odometry_local.orientation.orientation.roll,
)
if not result:
self.__logger.error("Drone rotation matrix could not be created")
return False, None

# Get Pylance to stop complaining
Expand All @@ -282,10 +311,12 @@ def run(
result, detection_world = self.__convert_detection_to_world_from_image(
detection,
perspective_transform_matrix,
self.__logger,
)
# Partial data not allowed
if not result:
return False, None
detections_in_world.append(detection_world)
self.__logger.info(detection_world)

return True, detections_in_world
19 changes: 17 additions & 2 deletions modules/geolocation/geolocation_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@
Convert bounding box data into ground data.
"""

import os
import pathlib

from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller
from . import camera_properties
from . import geolocation
from ..common.logger.modules import logger


def geolocation_worker(
Expand All @@ -21,15 +25,26 @@ def geolocation_worker(
input_queue and output_queue are data queues.
controller is how the main process communicates to this worker process.
"""
# TODO: Logging?
# TODO: Handle errors better

worker_name = pathlib.Path(__file__).stem
process_id = os.getpid()
result, local_logger = logger.Logger.create(f"{worker_name}_{process_id}", True)
if not result:
print("ERROR: Worker failed to create logger")
return

assert local_logger is not None

local_logger.info("Logger initialized")

result, locator = geolocation.Geolocation.create(
camera_intrinsics,
camera_drone_extrinsics,
local_logger,
)
if not result:
print("Worker failed to create class object")
local_logger.error("Worker failed to create class object")
return

# Get Pylance to stop complaining
Expand Down
Loading