Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ImageVideo backend #88

Merged
merged 6 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 68 additions & 10 deletions sleap_io/io/video.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Backends for reading and writing videos."""

from __future__ import annotations
from pathlib import Path

import simplejson as json
import sys
Expand Down Expand Up @@ -42,15 +43,15 @@ class VideoBackend:
constructor to create a backend instance.

Attributes:
filename: Path to video file.
filename: Path to video file(s).
grayscale: Whether to force grayscale. If None, autodetect on first frame load.
keep_open: Whether to keep the video reader open between calls to read frames.
If False, will close the reader after each call. If True (the default), it
will keep the reader open and cache it for subsequent calls which may
enhance the performance of reading multiple frames.
"""

filename: str
filename: str | Path | list[str] | list[Path]
grayscale: Optional[bool] = None
keep_open: bool = True
_cached_shape: Optional[Tuple[int, int, int, int]] = None
Expand All @@ -59,7 +60,7 @@ class VideoBackend:
@classmethod
def from_filename(
cls,
filename: str,
filename: str | list[str],
dataset: Optional[str] = None,
grayscale: Optional[bool] = None,
keep_open: bool = True,
Expand All @@ -68,7 +69,7 @@ def from_filename(
"""Create a VideoBackend from a filename.

Args:
filename: Path to video file.
filename: Path to video file(s).
dataset: Name of dataset in HDF5 file.
grayscale: Whether to force grayscale. If None, autodetect on first frame
load.
Expand All @@ -80,10 +81,22 @@ def from_filename(
Returns:
VideoBackend subclass instance.
"""
if type(filename) != str:
filename = str(filename)
if isinstance(filename, Path):
filename = filename.as_posix()

if type(filename) == str and Path(filename).is_dir():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use isinstance() instead of type comparison for checking if filename is a string.

-        if type(filename) == str and Path(filename).is_dir():
+        if isinstance(filename, str) and Path(filename).is_dir():

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
if type(filename) == str and Path(filename).is_dir():
if isinstance(filename, str) and Path(filename).is_dir():

filename = ImageVideo.find_images(filename)

if filename.endswith(MediaVideo.EXTS):
if type(filename) == list:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use isinstance() for type checking to ensure consistency and readability.

-        if type(filename) == list:
+        if isinstance(filename, list):

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
if type(filename) == list:
if isinstance(filename, list):

filename = [Path(f).as_posix() for f in filename]
return ImageVideo(
filename, grayscale=grayscale, **_get_valid_kwargs(ImageVideo, kwargs)
)
elif filename.endswith(ImageVideo.EXTS):
return ImageVideo(
[filename], grayscale=grayscale, **_get_valid_kwargs(ImageVideo, kwargs)
)
elif filename.endswith(MediaVideo.EXTS):
return MediaVideo(
filename,
grayscale=grayscale,
Expand All @@ -106,8 +119,8 @@ def _read_frame(self, frame_idx: int) -> np.ndarray:
raise NotImplementedError

def _read_frames(self, frame_inds: list) -> np.ndarray:
"""Read a list of frames from the video. Must be implemented in subclasses."""
return np.stack([self._read_frame(i) for i in frame_inds], axis=0)
"""Read a list of frames from the video."""
return np.stack([self.get_frame(i) for i in frame_inds], axis=0)

def read_test_frame(self) -> np.ndarray:
"""Read a single frame from the video to test for grayscale.
Expand Down Expand Up @@ -146,7 +159,7 @@ def num_frames(self) -> int:

@property
def img_shape(self) -> Tuple[int, int, int]:
"""Shape of a single frame in the video. Must be implemented in subclasses."""
"""Shape of a single frame in the video."""
return self.get_frame(0).shape

@property
Expand Down Expand Up @@ -668,3 +681,48 @@ def _read_frames(self, frame_inds: list) -> np.ndarray:
f.close()

return imgs


@attrs.define
class ImageVideo(VideoBackend):
"""Video backend for reading videos stored as image files.

This backend supports reading videos stored as a list of images.

Attributes:
filename: Path to video files.
grayscale: Whether to force grayscale. If None, autodetect on first frame load.
"""

EXTS = ("png", "jpg", "jpeg", "tif", "tiff", "bmp")

@staticmethod
def find_images(folder: str) -> list[str]:
"""Find images in a folder and return a list of filenames."""
folder = Path(folder)
return sorted(
[f.as_posix() for f in folder.glob("*") if f.suffix[1:] in ImageVideo.EXTS]
)

@property
def num_frames(self) -> int:
"""Number of frames in the video."""
return len(self.filename)

def _read_frame(self, frame_idx: int) -> np.ndarray:
"""Read a single frame from the video.

Args:
frame_idx: Index of frame to read.

Returns:
The frame as a numpy array of shape `(height, width, channels)`.

Notes:
This does not apply grayscale conversion. It is recommended to use the
`get_frame` method of the `VideoBackend` class instead.
"""
img = iio.imread(self.filename[frame_idx])
if img.ndim == 2:
img = np.expand_dims(img, axis=-1)
return img
36 changes: 28 additions & 8 deletions sleap_io/model/video.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,31 +23,31 @@ class Video:
backend appropriately.

Attributes:
filename: The filename of the video.
filename: The filename(s) of the video.
backend: An object that implements the basic methods for reading and
manipulating frames of a specific video type.

See also: VideoBackend
"""

filename: str
filename: str | list[str]
backend: Optional[VideoBackend] = None

EXTS = MediaVideo.EXTS + HDF5Video.EXTS

@classmethod
def from_filename(
cls,
filename: str,
filename: str | list[str],
dataset: Optional[str] = None,
grayscale: Optional[str] = None,
grayscale: Optional[bool] = None,
keep_open: bool = True,
**kwargs,
) -> VideoBackend:
"""Create a Video from a filename.

Args:
filename: Path to video file.
filename: Path to video file(s).
dataset: Name of dataset in HDF5 file.
grayscale: Whether to force grayscale. If None, autodetect on first frame
load.
Expand Down Expand Up @@ -132,8 +132,21 @@ def __getitem__(self, inds: int | list[int] | slice) -> np.ndarray:
self.open()
return self.backend[inds]

def exists(self) -> bool:
"""Check if the video file exists."""
def exists(self, check_all: bool = False) -> bool:
"""Check if the video file exists.

Args:
check_all: If `True`, check that all filenames in a list exist. If `False`
(the default), check that the first filename exists.
"""
if isinstance(self.filename, list):
if check_all:
for f in self.filename:
if not Path(f).exists():
return False
return True
else:
return Path(self.filename[0]).exists()
return Path(self.filename).exists()

@property
Expand Down Expand Up @@ -193,7 +206,9 @@ def close(self):
del self.backend
self.backend = None

def replace_filename(self, new_filename: str | Path, open: bool = True):
def replace_filename(
self, new_filename: str | Path | list[str] | list[Path], open: bool = True
):
"""Update the filename of the video, optionally opening the backend.

Args:
Expand All @@ -204,6 +219,11 @@ def replace_filename(self, new_filename: str | Path, open: bool = True):
if isinstance(new_filename, Path):
new_filename = str(new_filename)

if isinstance(new_filename, list):
new_filename = [
p.as_posix() if isinstance(p, Path) else p for p in new_filename
]

self.filename = new_filename

if open:
Expand Down
Binary file added tests/data/videos/imgs/img.00.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added tests/data/videos/imgs/img.01.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added tests/data/videos/imgs/img.02.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
10 changes: 10 additions & 0 deletions tests/fixtures/videos.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,13 @@ def centered_pair_low_quality_path():
def centered_pair_low_quality_video(centered_pair_low_quality_path):
"""Video with two flies in the center."""
return sleap_io.Video.from_filename(centered_pair_low_quality_path)


@pytest.fixture
def centered_pair_frame_paths():
"""Paths to three frames of a video with two flies in the center."""
return [
"tests/data/videos/imgs/img.00.jpg",
"tests/data/videos/imgs/img.01.jpg",
"tests/data/videos/imgs/img.02.jpg",
]
23 changes: 22 additions & 1 deletion tests/io/test_video_backends.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
"""Tests for methods in the sleap_io.io.video file."""

from sleap_io.io.video import VideoBackend, MediaVideo, HDF5Video
from sleap_io.io.video import VideoBackend, MediaVideo, HDF5Video, ImageVideo
import numpy as np
from numpy.testing import assert_equal
import h5py
import pytest
from pathlib import Path


def test_video_backend_from_filename(centered_pair_low_quality_path, slp_minimal_pkg):
Expand Down Expand Up @@ -140,3 +141,23 @@ def test_hdf5video_embedded(slp_minimal_pkg):
== "tests/data/json_format_v1/centered_pair_low_quality.mp4"
)
assert backend.has_embedded_images


def test_imagevideo(centered_pair_frame_paths):
backend = VideoBackend.from_filename(centered_pair_frame_paths)
assert type(backend) == ImageVideo
assert backend.shape == (3, 384, 384, 1)
assert backend[0].shape == (384, 384, 1)
assert backend[:3].shape == (3, 384, 384, 1)

img_folder = Path(centered_pair_frame_paths[0]).parent
imgs = ImageVideo.find_images(img_folder)
assert imgs == centered_pair_frame_paths

backend = VideoBackend.from_filename(img_folder)
assert type(backend) == ImageVideo
assert backend.shape == (3, 384, 384, 1)

backend = VideoBackend.from_filename(centered_pair_frame_paths[0])
assert type(backend) == ImageVideo
assert backend.shape == (1, 384, 384, 1)
26 changes: 23 additions & 3 deletions tests/model/test_video.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Tests for methods in the sleap_io.model.video file."""

from sleap_io import Video
from sleap_io.io.video import MediaVideo
from sleap_io.io.video import MediaVideo, ImageVideo
import numpy as np
import pytest
from pathlib import Path
Expand Down Expand Up @@ -36,12 +36,24 @@ def test_video_repr(centered_pair_low_quality_video):
)


def test_video_exists(centered_pair_low_quality_video):
def test_video_exists(centered_pair_low_quality_video, centered_pair_frame_paths):
video = Video("test.mp4")
assert video.exists() is False

assert centered_pair_low_quality_video.exists() is True

video = Video(centered_pair_frame_paths)
assert video.exists() is True
assert video.exists(check_all=True) is True

video = Video([centered_pair_frame_paths[0], "fake.jpg"])
assert video.exists() is True
assert video.exists(check_all=True) is False

video = Video(["fake.jpg", centered_pair_frame_paths[0]])
assert video.exists() is False
assert video.exists(check_all=True) is False


def test_video_open_close(centered_pair_low_quality_path):
video = Video(centered_pair_low_quality_path)
Expand Down Expand Up @@ -79,7 +91,9 @@ def test_video_open_close(centered_pair_low_quality_path):
assert video.shape == (1100, 384, 384, 1)


def test_video_replace_filename(centered_pair_low_quality_path):
def test_video_replace_filename(
centered_pair_low_quality_path, centered_pair_frame_paths
):
video = Video.from_filename("test.mp4")
assert video.exists() is False

Expand All @@ -101,3 +115,9 @@ def test_video_replace_filename(centered_pair_low_quality_path):
assert video.exists() is True
assert video.is_open is False
assert video.backend is None

video = Video.from_filename(["fake.jpg", "fake2.jpg", "fake3.jpg"])
assert type(video.backend) == ImageVideo
video.replace_filename(centered_pair_frame_paths)
assert type(video.backend) == ImageVideo
assert video.exists(check_all=True) is True
Loading