diff --git a/sleap_io/io/main.py b/sleap_io/io/main.py index 32432ac4..49eebd6b 100644 --- a/sleap_io/io/main.py +++ b/sleap_io/io/main.py @@ -123,7 +123,10 @@ def load_video(filename: str, **kwargs) -> Video: """Load a video file. Args: - filename: Path to a video file. + filename: The filename(s) of the video. Supported extensions: "mp4", "avi", + "mov", "mj2", "mkv", "h5", "hdf5", "slp", "png", "jpg", "jpeg", "tif", + "tiff", "bmp". If the filename is a list, a list of image filenames are + expected. If filename is a folder, it will be searched for images. Returns: A `Video` object. diff --git a/sleap_io/model/instance.py b/sleap_io/model/instance.py index 958a0f99..25dcd0aa 100644 --- a/sleap_io/model/instance.py +++ b/sleap_io/model/instance.py @@ -265,6 +265,13 @@ def __len__(self) -> int: """Return the number of points in the instance.""" return len(self.points) + def __repr__(self) -> str: + """Return a readable representation of the instance.""" + pts = self.numpy().tolist() + track = f'"{self.track.name}"' if self.track is not None else self.track + + return f"Instance(points={pts}, track={track})" + @property def n_visible(self) -> int: """Return the number of visible points in the instance.""" @@ -327,6 +334,22 @@ class PredictedInstance(Instance): score: float = 0.0 tracking_score: Optional[float] = 0 + def __repr__(self) -> str: + """Return a readable representation of the instance.""" + pts = self.numpy().tolist() + track = f'"{self.track.name}"' if self.track is not None else self.track + + score = str(self.score) if self.score is None else f"{self.score:.2f}" + tracking_score = ( + str(self.tracking_score) + if self.tracking_score is None + else f"{self.tracking_score:.2f}" + ) + return ( + f"PredictedInstance(points={pts}, track={track}, " + f"score={score}, tracking_score={tracking_score})" + ) + @classmethod def from_numpy( # type: ignore[override] cls, diff --git a/sleap_io/model/labels.py b/sleap_io/model/labels.py index 841c272a..6e6ae3e4 100644 --- a/sleap_io/model/labels.py +++ b/sleap_io/model/labels.py @@ -23,7 +23,7 @@ from attrs import define, field from typing import Union, Optional, Any import numpy as np - +from pathlib import Path from sleap_io.model.skeleton import Skeleton @@ -57,6 +57,14 @@ class Labels: def __attrs_post_init__(self): """Append videos, skeletons, and tracks seen in `labeled_frames` to `Labels`.""" + self.update() + + def update(self): + """Update data structures based on contents. + + This function will update the list of skeletons, videos and tracks from the + labeled frames, instances and suggestions. + """ for lf in self.labeled_frames: if lf.video not in self.videos: self.videos.append(lf.video) @@ -68,7 +76,13 @@ def __attrs_post_init__(self): if inst.track is not None and inst.track not in self.tracks: self.tracks.append(inst.track) - def __getitem__(self, key: int) -> list[LabeledFrame] | LabeledFrame: + for sf in self.suggestions: + if sf.video not in self.videos: + self.videos.append(sf.video) + + def __getitem__( + self, key: int | slice | list[int] | np.ndarray | tuple[Video, int] + ) -> list[LabeledFrame] | LabeledFrame: """Return one or more labeled frames based on indexing criteria.""" if type(key) == int: return self.labeled_frames[key] @@ -111,7 +125,8 @@ def __repr__(self) -> str: f"labeled_frames={len(self.labeled_frames)}, " f"videos={len(self.videos)}, " f"skeletons={len(self.skeletons)}, " - f"tracks={len(self.tracks)}" + f"tracks={len(self.tracks)}, " + f"suggestions={len(self.suggestions)}" ")" ) @@ -119,6 +134,49 @@ def __str__(self) -> str: """Return a readable representation of the labels.""" return self.__repr__() + def append(self, lf: LabeledFrame, update: bool = True): + """Append a labeled frame to the labels. + + Args: + lf: A labeled frame to add to the labels. + update: If `True` (the default), update list of videos, tracks and + skeletons from the contents. + """ + self.labeled_frames.append(lf) + + if update: + if lf.video not in self.videos: + self.videos.append(lf.video) + + for inst in lf: + if inst.skeleton not in self.skeletons: + self.skeletons.append(inst.skeleton) + + if inst.track is not None and inst.track not in self.tracks: + self.tracks.append(inst.track) + + def extend(self, lfs: list[LabeledFrame], update: bool = True): + """Append a labeled frame to the labels. + + Args: + lfs: A list of labeled frames to add to the labels. + update: If `True` (the default), update list of videos, tracks and + skeletons from the contents. + """ + self.labeled_frames.extend(lfs) + + if update: + for lf in lfs: + if lf.video not in self.videos: + self.videos.append(lf.video) + + for inst in lf: + if inst.skeleton not in self.skeletons: + self.skeletons.append(inst.skeleton) + + if inst.track is not None and inst.track not in self.tracks: + self.tracks.append(inst.track) + def numpy( self, video: Optional[Union[Video, int]] = None, @@ -417,3 +475,79 @@ def replace_videos( for sf in self.suggestions: if sf.video in video_map: sf.video = video_map[sf.video] + + def replace_filenames( + self, + new_filenames: list[str | Path] | None = None, + filename_map: dict[str | Path, str | Path] | None = None, + prefix_map: dict[str | Path, str | Path] | None = None, + ): + """Replace video filenames. + + Args: + new_filenames: List of new filenames. Must have the same length as the + number of videos in the labels. + filename_map: Dictionary mapping old filenames (keys) to new filenames + (values). + prefix_map: Dictonary mapping old prefixes (keys) to new prefixes (values). + + Notes: + Only one of the argument types can be provided. + """ + n = 0 + if new_filenames is not None: + n += 1 + if filename_map is not None: + n += 1 + if prefix_map is not None: + n += 1 + if n != 1: + raise ValueError( + "Exactly one input method must be provided to replace filenames." + ) + + if new_filenames is not None: + if len(self.videos) != len(new_filenames): + raise ValueError( + f"Number of new filenames ({len(new_filenames)}) does not match " + f"the number of videos ({len(self.videos)})." + ) + + for video, new_filename in zip(self.videos, new_filenames): + video.replace_filename(new_filename) + + elif filename_map is not None: + for video in self.videos: + for old_fn, new_fn in filename_map.items(): + if type(video.filename) == list: + new_fns = [] + for fn in video.filename: + if Path(fn) == Path(old_fn): + new_fns.append(new_fn) + else: + new_fns.append(fn) + video.replace_filename(new_fns) + else: + if Path(video.filename) == Path(old_fn): + video.replace_filename(new_fn) + + elif prefix_map is not None: + for video in self.videos: + for old_prefix, new_prefix in prefix_map.items(): + old_prefix, new_prefix = Path(old_prefix), Path(new_prefix) + + if type(video.filename) == list: + new_fns = [] + for fn in video.filename: + fn = Path(fn) + if fn.as_posix().startswith(old_prefix.as_posix()): + new_fns.append(new_prefix / fn.relative_to(old_prefix)) + else: + new_fns.append(fn) + video.replace_filename(new_fns) + else: + fn = Path(video.filename) + if fn.as_posix().startswith(old_prefix.as_posix()): + video.replace_filename( + new_prefix / fn.relative_to(old_prefix) + ) diff --git a/sleap_io/model/skeleton.py b/sleap_io/model/skeleton.py index 458cca42..17194b5a 100644 --- a/sleap_io/model/skeleton.py +++ b/sleap_io/model/skeleton.py @@ -165,6 +165,11 @@ def edge_inds(self) -> list[Tuple[int, int]]: for edge in self.edges ] + @property + def edge_names(self) -> list[str, str]: + """Edge names as a list of 2-tuples with string node names.""" + return [(edge.source.name, edge.destination.name) for edge in self.edges] + @property def flipped_node_inds(self) -> list[int]: """Returns node indices that should be switched when horizontally flipping.""" @@ -183,6 +188,11 @@ def __len__(self) -> int: """Return the number of nodes in the skeleton.""" return len(self.nodes) + def __repr__(self) -> str: + """Return a readable representation of the skeleton.""" + nodes = ", ".join([f'"{node}"' for node in self.node_names]) + return "Skeleton(" f"nodes=[{nodes}], " f"edges={self.edge_inds}" ")" + def index(self, node: Node | str) -> int: """Return the index of a node specified as a `Node` or string name.""" if type(node) == str: diff --git a/sleap_io/model/video.py b/sleap_io/model/video.py index 2c547804..a2644cd6 100644 --- a/sleap_io/model/video.py +++ b/sleap_io/model/video.py @@ -8,7 +8,7 @@ import attrs from typing import Tuple, Optional, Optional import numpy as np -from sleap_io.io.video import VideoBackend, MediaVideo, HDF5Video +from sleap_io.io.video import VideoBackend, MediaVideo, HDF5Video, ImageVideo from pathlib import Path @@ -23,7 +23,10 @@ class Video: backend appropriately. Attributes: - filename: The filename(s) of the video. + filename: The filename(s) of the video. Supported extensions: "mp4", "avi", + "mov", "mj2", "mkv", "h5", "hdf5", "slp", "png", "jpg", "jpeg", "tif", + "tiff", "bmp". If the filename is a list, a list of image filenames are + expected. If filename is a folder, it will be searched for images. backend: An object that implements the basic methods for reading and manipulating frames of a specific video type. backend_metadata: A dictionary of metadata specific to the backend. This is @@ -45,7 +48,12 @@ class Video: backend_metadata: dict[str, any] = attrs.field(factory=dict) source_video: Optional[Video] = None - EXTS = MediaVideo.EXTS + HDF5Video.EXTS + EXTS = MediaVideo.EXTS + HDF5Video.EXTS + ImageVideo.EXTS + + def __attrs_post_init__(self): + """Post init syntactic sugar.""" + if self.backend is None and self.exists(): + self.open() def __attrs_post_init__(self): """Post init syntactic sugar.""" @@ -65,7 +73,10 @@ def from_filename( """Create a Video from a filename. Args: - filename: Path to video file(s). + filename: The filename(s) of the video. Supported extensions: "mp4", "avi", + "mov", "mj2", "mkv", "h5", "hdf5", "slp", "png", "jpg", "jpeg", "tif", + "tiff", "bmp". If the filename is a list, a list of image filenames are + expected. If filename is a folder, it will be searched for images. dataset: Name of dataset in HDF5 file. grayscale: Whether to force grayscale. If None, autodetect on first frame load. diff --git a/tests/model/test_instance.py b/tests/model/test_instance.py index 1381bad1..e3181ec3 100644 --- a/tests/model/test_instance.py +++ b/tests/model/test_instance.py @@ -89,6 +89,10 @@ def test_instance(): inst = Instance({"A": [0, 1], "B": [2, 3]}, skeleton=Skeleton(["A", "B"])) assert_equal(inst.numpy(), [[0, 1], [2, 3]]) assert type(inst["A"]) == Point + assert str(inst) == "Instance(points=[[0.0, 1.0], [2.0, 3.0]], track=None)" + + inst.track = Track("trk") + assert str(inst) == 'Instance(points=[[0.0, 1.0], [2.0, 3.0]], track="trk")' inst = Instance({"A": [0, 1]}, skeleton=Skeleton(["A", "B"])) assert_equal(inst.numpy(), [[0, 1], [np.nan, np.nan]]) @@ -155,3 +159,8 @@ def test_predicted_instance(): assert inst[0].score == 0.4 assert inst[1].score == 0.5 assert inst.score == 0.6 + + assert ( + str(inst) == "PredictedInstance(points=[[0.0, 1.0], [2.0, 3.0]], track=None, " + "score=0.60, tracking_score=None)" + ) diff --git a/tests/model/test_labels.py b/tests/model/test_labels.py index 52ced7c6..cc783a0d 100644 --- a/tests/model/test_labels.py +++ b/tests/model/test_labels.py @@ -9,11 +9,13 @@ PredictedInstance, LabeledFrame, Track, + SuggestionFrame, load_slp, load_video, ) from sleap_io.model.labels import Labels import numpy as np +from pathlib import Path def test_labels(): @@ -42,7 +44,98 @@ def test_labels(): for lf_idx, lf in enumerate(labels): assert lf == labels[lf_idx] - assert str(labels) == "Labels(labeled_frames=1, videos=1, skeletons=1, tracks=0)" + assert ( + str(labels) + == "Labels(labeled_frames=1, videos=1, skeletons=1, tracks=0, suggestions=0)" + ) + + +def test_update(slp_real_data): + base_labels = load_slp(slp_real_data) + + labels = Labels(base_labels.labeled_frames) + assert len(labels.videos) == len(base_labels.videos) == 1 + assert len(labels.tracks) == len(base_labels.tracks) == 0 + assert len(labels.skeletons) == len(base_labels.skeletons) == 1 + + new_video = Video.from_filename("fake.mp4") + labels.suggestions.append(SuggestionFrame(video=new_video, frame_idx=0)) + + new_track = Track("new_track") + labels[0][0].track = new_track + + new_skel = Skeleton(["A", "B"]) + new_video2 = Video.from_filename("fake2.mp4") + labels.append( + LabeledFrame( + video=new_video2, + frame_idx=0, + instances=[ + Instance.from_numpy(np.array([[0, 1], [2, 3]]), skeleton=new_skel) + ], + ), + update=False, + ) + + labels.update() + assert new_video in labels.videos + assert new_video2 in labels.videos + assert new_track in labels.tracks + assert new_skel in labels.skeletons + + +def test_append_extend(): + labels = Labels() + + new_skel = Skeleton(["A", "B"]) + new_video = Video.from_filename("fake.mp4") + new_track = Track("new_track") + labels.append( + LabeledFrame( + video=new_video, + frame_idx=0, + instances=[ + Instance.from_numpy( + np.array([[0, 1], [2, 3]]), skeleton=new_skel, track=new_track + ) + ], + ), + update=True, + ) + assert labels.videos == [new_video] + assert labels.skeletons == [new_skel] + assert labels.tracks == [new_track] + + new_video2 = Video.from_filename("fake.mp4") + new_skel2 = Skeleton(["A", "B", "C"]) + new_track2 = Track("new_track2") + labels.extend( + [ + LabeledFrame( + video=new_video, + frame_idx=1, + instances=[ + Instance.from_numpy( + np.array([[0, 1], [2, 3]]), skeleton=new_skel, track=new_track2 + ) + ], + ), + LabeledFrame( + video=new_video2, + frame_idx=0, + instances=[ + Instance.from_numpy( + np.array([[0, 1], [2, 3], [4, 5]]), skeleton=new_skel2 + ) + ], + ), + ], + update=True, + ) + + assert labels.videos == [new_video, new_video2] + assert labels.skeletons == [new_skel, new_skel2] + assert labels.tracks == [new_track, new_track2] def test_labels_numpy(labels_predictions: Labels): @@ -272,3 +365,54 @@ def test_replace_videos(slp_real_data): for sf in labels.suggestions: assert sf.video.filename == "fake.mp4" + + +def test_replace_filenames(): + labels = Labels(videos=[Video.from_filename("a.mp4"), Video.from_filename("b.mp4")]) + + with pytest.raises(ValueError): + labels.replace_filenames() + + with pytest.raises(ValueError): + labels.replace_filenames(new_filenames=[], filename_map={}) + + with pytest.raises(ValueError): + labels.replace_filenames(new_filenames=[], prefix_map={}) + + with pytest.raises(ValueError): + labels.replace_filenames(filename_map={}, prefix_map={}) + + with pytest.raises(ValueError): + labels.replace_filenames(new_filenames=[], filename_map={}, prefix_map={}) + + labels.replace_filenames(new_filenames=["c.mp4", "d.mp4"]) + assert [v.filename for v in labels.videos] == ["c.mp4", "d.mp4"] + + with pytest.raises(ValueError): + labels.replace_filenames(["f.mp4"]) + + labels.replace_filenames( + filename_map={"c.mp4": "/a/b/c.mp4", "d.mp4": "/a/b/d.mp4"} + ) + assert [Path(v.filename).as_posix() for v in labels.videos] == [ + "/a/b/c.mp4", + "/a/b/d.mp4", + ] + + labels.replace_filenames(prefix_map={"/a/b/": "/A/B"}) + assert [Path(v.filename).as_posix() for v in labels.videos] == [ + "/A/B/c.mp4", + "/A/B/d.mp4", + ] + + labels = Labels(videos=[Video.from_filename(["imgs/img0.png", "imgs/img1.png"])]) + labels.replace_filenames( + filename_map={ + "imgs/img0.png": "train/imgs/img0.png", + "imgs/img1.png": "train/imgs/img1.png", + } + ) + assert labels.video.filename == ["train/imgs/img0.png", "train/imgs/img1.png"] + + labels.replace_filenames(prefix_map={"train/": "test/"}) + assert labels.video.filename == ["test/imgs/img0.png", "test/imgs/img1.png"] diff --git a/tests/model/test_skeleton.py b/tests/model/test_skeleton.py index 49d44d96..0d861dd0 100644 --- a/tests/model/test_skeleton.py +++ b/tests/model/test_skeleton.py @@ -40,6 +40,8 @@ def test_skeleton(): assert skel.edges[0] == Edge(skel.nodes[0], skel.nodes[1]) assert skel.edge_inds == [(0, 1)] + assert str(skel) == 'Skeleton(nodes=["A", "B"], edges=[(0, 1)])' + with pytest.raises(IndexError): skel[None] @@ -119,6 +121,7 @@ def test_add_edge(): skel = Skeleton(["A", "B"]) skel.add_edge("A", "B") assert skel.edge_inds == [(0, 1)] + assert skel.edge_names == [("A", "B")] skel.add_edge("B", "A") assert skel.edge_inds == [(0, 1), (1, 0)]