Skip to content

Commit

Permalink
Add eyelid data hadling and example on aperture
Browse files Browse the repository at this point in the history
  • Loading branch information
mikelgg93 committed Feb 3, 2025
1 parent 5b63298 commit 2a8b3c2
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 32 deletions.
110 changes: 110 additions & 0 deletions examples/eye_lid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import sys

import cv2
import numpy as np
from tqdm import tqdm

import pupil_labs.neon_recording as nr
from pupil_labs.neon_recording.stream.av_stream.video_stream import GrayFrame


def overlay_image(img, img_overlay, x, y):
"""Overlay `img_overlay` onto `img` at (x, y)."""
# Image ranges
y1, y2 = max(0, y), min(img.shape[0], y + img_overlay.shape[0])
x1, x2 = max(0, x), min(img.shape[1], x + img_overlay.shape[1])

# Overlay ranges
y1o, y2o = max(0, -y), min(img_overlay.shape[0], img.shape[0] - y)
x1o, x2o = max(0, -x), min(img_overlay.shape[1], img.shape[1] - x)

if y1 >= y2 or x1 >= x2 or y1o >= y2o or x1o >= x2o:
return

img_crop = img[y1:y2, x1:x2]
img_overlay_crop = img_overlay[y1o:y2o, x1o:x2o]
img_crop[:] = img_overlay_crop


def plot(img, data, value_range, x_width, color, line_width=2):
for idx in range(1, len(data)):
x_values = [int(idx2 * x_width) for idx2 in [idx - 1, idx]]

y_norms = [
(data[idx2] - value_range[0]) / (value_range[1] - value_range[0])
for idx2 in [idx - 1, idx]
]
y_values = [int((1 - y_norm) * img.shape[0]) for y_norm in y_norms]

points = [[*v] for v in zip(x_values, y_values, strict=False)]

cv2.line(img, points[0], points[1], color, line_width)


def make_eyelid_video(recording_dir, output_video_path):
recording = nr.load(recording_dir)

fps = 200

video_writer = cv2.VideoWriter(
str(output_video_path),
cv2.VideoWriter_fourcc(*"MJPG"),
fps,
(recording.eye.width, recording.eye.height),
)

output_timestamps = np.arange(recording.eye.ts[0], recording.eye.ts[-1], 1 / fps)

eye_video_sampled = recording.eye.sample(output_timestamps)
eyelid_sampled = recording.eyelid.sample(output_timestamps)
combined_data = zip(
output_timestamps,
eye_video_sampled,
eyelid_sampled,
strict=False,
)

plot_metas = {
"eyelid_aperture_left": {"color": [0, 0, 255]},
"eyelid_aperture_right": {"color": [0, 255, 0]},
}

for plot_name, plot_meta in plot_metas.items():
plot_meta["range"] = (
np.min(recording.eyelid.data[plot_name]),
np.max(recording.eyelid.data[plot_name]),
)

plot_duration = 0.5
plot_point_count = plot_duration * fps
plot_x_width = recording.eye.width / plot_point_count

for ts, eye_frame, eyelid in tqdm(combined_data, total=len(output_timestamps)):
if abs(eye_frame.ts - ts) < 2 / fps:
eye_pixels = cv2.cvtColor(eye_frame.gray, cv2.COLOR_GRAY2BGR)
else:
eye_pixels = GrayFrame(eye_frame.width, eye_frame.height).bgr

for plot_name, plot_meta in plot_metas.items():
min_ts = ts - plot_duration
time_frame = (min_ts < eyelid_sampled.data.ts) & (
eyelid_sampled.data.ts <= ts
)
plot_data = eyelid_sampled.data[time_frame][plot_name]
plot(
eye_pixels,
plot_data,
plot_meta["range"],
plot_x_width,
plot_meta["color"],
)

video_writer.write(eye_pixels)
cv2.imshow("Frame", eye_pixels)
cv2.pollKey()

video_writer.release()


if __name__ == "__main__":
make_eyelid_video(sys.argv[1], "eye-lid-output-video.avi")
63 changes: 33 additions & 30 deletions src/pupil_labs/neon_recording/neon_recording.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
import json
import pathlib
from typing import Union

from . import structlog
from .calib import Calibration
from .stream.gaze_stream import GazeStream
from .stream.av_stream.audio_stream import AudioStream
from .stream.av_stream.video_stream import VideoStream
from .stream.event_stream import EventStream
from .stream.imu import IMUStream
from .stream.eye_state_stream import EyeStateStream
from .stream.av_stream.video_stream import VideoStream
from .stream.av_stream.audio_stream import AudioStream
from .stream.eyelid_stream import EyeLidStream
from .stream.gaze_stream import GazeStream
from .stream.imu import IMUStream

log = structlog.get_logger(__name__)


class NeonRecording:
"""
Class to handle the Neon Recording data
"""Class to handle the Neon Recording data
Attributes:
* `info` (dict): Information loaded from info.json
Expand All @@ -28,20 +27,20 @@ class NeonRecording:
* `streams` (dict): data streams of the recording
"""

def __init__(self, rec_dir_in: Union[pathlib.Path, str]):
"""
Initialize the NeonRecording object
def __init__(self, rec_dir_in: pathlib.Path | str):
"""Initialize the NeonRecording object
Args:
rec_dir_in: Path to the recording directory.
Raises:
FileNotFoundError: If the directory does not exist or is not valid.
"""

self._rec_dir = pathlib.Path(rec_dir_in).resolve()
if not self._rec_dir.exists() or not self._rec_dir.is_dir():
raise FileNotFoundError(f"Directory not found or not valid: {self._rec_dir}")
raise FileNotFoundError(
f"Directory not found or not valid: {self._rec_dir}"
)

log.debug(f"NeonRecording: Loading recording from {rec_dir_in}")

Expand Down Expand Up @@ -69,15 +68,15 @@ def __init__(self, rec_dir_in: Union[pathlib.Path, str]):
"events": None,
"eye": None,
"eye_state": None,
"eyelid": None,
"gaze": None,
"imu": None,
"scene": None,
}

@property
def gaze(self) -> GazeStream:
"""
2D gaze data in scene-camera space
"""2D gaze data in scene-camera space
Returns:
GazeStream: Each record contains
Expand All @@ -92,8 +91,7 @@ def gaze(self) -> GazeStream:

@property
def imu(self) -> IMUStream:
"""
Motion and orientation data
"""Motion and orientation data
Returns:
IMUStream:
Expand All @@ -105,8 +103,7 @@ def imu(self) -> IMUStream:

@property
def eye_state(self) -> EyeStateStream:
"""
Eye state data
"""Eye state data
Returns:
EyeStateStream
Expand All @@ -117,9 +114,20 @@ def eye_state(self) -> EyeStateStream:
return self.streams["eye_state"]

@property
def scene(self) -> VideoStream:
def eyelid(self) -> EyeLidStream:
"""Eyelid's data
Returns:
EyeLidStream
"""
Frames of video from the scene camera
if self.streams["eyelid"] is None:
self.streams["eyelid"] = EyeLidStream(self)

return self.streams["eyelid"]

@property
def scene(self) -> VideoStream:
"""Frames of video from the scene camera
Returns:
VideoStream
Expand All @@ -131,8 +139,7 @@ def scene(self) -> VideoStream:

@property
def eye(self) -> VideoStream:
"""
Frames of video from the eye cameras
"""Frames of video from the eye cameras
Returns:
VideoStream
Expand All @@ -144,8 +151,7 @@ def eye(self) -> VideoStream:

@property
def events(self) -> EventStream:
"""
Event annotations
"""Event annotations
Returns:
EventStream
Expand All @@ -157,8 +163,7 @@ def events(self) -> EventStream:

@property
def audio(self) -> AudioStream:
"""
Audio from the scene video
"""Audio from the scene video
Returns:
AudioStream
Expand All @@ -169,8 +174,6 @@ def audio(self) -> AudioStream:
return self.streams["audio"]


def load(rec_dir_in: Union[pathlib.Path, str]) -> NeonRecording:
"""
Load a :class:`.NeonRecording`
"""
def load(rec_dir_in: pathlib.Path | str) -> NeonRecording:
"""Load a :class:`.NeonRecording`"""
return NeonRecording(rec_dir_in)
9 changes: 7 additions & 2 deletions src/pupil_labs/neon_recording/stream/eye_state_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,13 @@ def __init__(self, recording):
eye_state_files = find_sorted_multipart_files(recording._rec_dir, "eye_state")
eye_state_data, time_data = load_multipart_data_time_pairs(eye_state_files, "<f4", 2)

data = eye_state_data.reshape(-1, 14)
data = np.vstack([time_data, data.T])
if eye_state_data.size % 20 == 0:
data = eye_state_data.reshape(-1, 20)
elif eye_state_data.size % 14 == 0:
data = eye_state_data.reshape(-1, 14)
else:
raise ValueError("Unexpected eye state data size")
data = np.vstack([time_data, data.T[:14]])
data = np.rec.fromarrays(
data,
names=[
Expand Down
46 changes: 46 additions & 0 deletions src/pupil_labs/neon_recording/stream/eyelid_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import numpy as np

from .. import structlog
from ..utils import find_sorted_multipart_files, load_multipart_data_time_pairs
from .stream import Stream

log = structlog.get_logger(__name__)


class EyeLidStream(Stream):
"""EyeLid state data
Each record contains:
* `ts`: The moment these data were recorded
* `eyelid_angle_top_left`: The angle of the top left eyelid in degrees
* `eyelid_angle_bottom_left`: The angle of the bottom left eyelid in degrees
* `eyelid_aperture_left`: The aperture of the eyelid in mm
"""

def __init__(self, recording):
log.debug("NeonRecording: Loading eye state data")

eye_state_files = find_sorted_multipart_files(recording._rec_dir, "eye_state")
eye_state_data, time_data = load_multipart_data_time_pairs(
eye_state_files, "<f4", 2
)

if eye_state_data.size % 20 == 0:
data = eye_state_data.reshape(-1, 20)
else:
raise ValueError("This recording does not contain eyelid data.")
data = np.vstack([time_data, data.T[14:]])
data = np.rec.fromarrays(
data,
names=[
"ts",
"eyelid_angle_top_left",
"eyelid_angle_bottom_left",
"eyelid_aperture_left",
"eyelid_angle_top_right",
"eyelid_angle_bottom_right",
"eyelid_aperture_right",
],
)

super().__init__("eyelid", recording, data)

0 comments on commit 2a8b3c2

Please sign in to comment.