From a2e503a4526abbe7e3bf6a7853b11dccc3bf92b1 Mon Sep 17 00:00:00 2001 From: Femme Phile Date: Wed, 27 Sep 2023 13:43:35 -0400 Subject: [PATCH 01/12] Adding submodule videolab (#1) --- src/cleanvision/videolab.py | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 src/cleanvision/videolab.py diff --git a/src/cleanvision/videolab.py b/src/cleanvision/videolab.py new file mode 100644 index 00000000..b9589168 --- /dev/null +++ b/src/cleanvision/videolab.py @@ -0,0 +1,6 @@ +"""Videolab is an extension of Imagelab for finding issues in a video dataset.""" +from cleanvision.imagelab import Imagelab + + +class Videolab(Imagelab): + pass From 40ebe0f16f58b9f47f4db13f6b979da981968c73 Mon Sep 17 00:00:00 2001 From: Femme Phile Date: Wed, 27 Sep 2023 16:11:27 -0400 Subject: [PATCH 02/12] Adding video sampler code (#2) --- pyproject.toml | 1 + src/cleanvision/videolab.py | 133 ++++++++++++++++++++++++++++++++++++ 2 files changed, 134 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index af24286c..31245018 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ classifiers = [ ] dependencies = [ "Pillow>=9.3", + "av>=10.0.0", "numpy>=1.20.0", "pandas>=1.1.5", "tabulate>=0.8.3", # pandas optional dependency for .to_markdown() diff --git a/src/cleanvision/videolab.py b/src/cleanvision/videolab.py index b9589168..b71b9507 100644 --- a/src/cleanvision/videolab.py +++ b/src/cleanvision/videolab.py @@ -1,6 +1,139 @@ """Videolab is an extension of Imagelab for finding issues in a video dataset.""" +import os +import time +from collections import Counter, OrderedDict +from dataclasses import dataclass +from queue import Queue +from threading import Thread +from typing import Any, Dict, Iterable, Optional, Tuple, Union + +import av # type: ignore +from imagehash import phash +from PIL import Image + from cleanvision.imagelab import Imagelab +# custom type +SampleFrame = Tuple[Union[Image.Image, None], Dict[str, bool]] + + +@dataclass +class SamplerConfig: 
+ min_frame_interval_sec: float = 1 + keyframes_only: bool = True + buffer_size: int = 10 + hash_size: int = 4 + queue_wait: float = 0.1 + debug: bool = False + + +class HashBuffer: + def __init__(self, size: int) -> None: + self.ordered_buffer: OrderedDict[ + str, Tuple[Image, Dict[str, Any]] + ] = OrderedDict() + self.max_size = size + + def add( + self, item: Image, hash_: str, metadata: Dict[str, Any] = {} + ) -> Optional[Any]: + if not self.__check_duplicate(hash_): + return self.__add(item, hash_, metadata) + return None + + def __add( + self, item: Image, hash_: str, metadata: Dict[str, Any] = {} + ) -> Optional[Any]: + self.ordered_buffer[hash_] = (item, metadata) + if len(self.ordered_buffer) >= self.max_size: + return self.ordered_buffer.popitem(last=False)[1] + return None + + def __check_duplicate(self, hash_: str) -> bool: + if hash_ in self.ordered_buffer: + self.ordered_buffer.move_to_end(hash_) + return True + return False + + +class VideoSampler: + def __init__(self, cfg: SamplerConfig) -> None: + self.cfg = cfg + self.hash_buf = HashBuffer(cfg.buffer_size) + self.stats: Counter[str] = Counter() + + def compute_hash(self, frame_img: Image) -> str: + return str(phash(frame_img, hash_size=self.cfg.hash_size)) + + def sample(self, video_path: str) -> Iterable[SampleFrame]: + """Generate sample frames from a video""" + with av.open(video_path) as container: + stream = container.streams.video[0] + if self.cfg.keyframes_only: + stream.codec_context.skip_frame = "NONKEY" + prev_time = -10 + for frame_indx, frame in enumerate(container.decode(stream)): + # skip frames if keyframes_only is True + time_diff = frame.time - prev_time + self.stats["total"] += 1 + if time_diff < self.cfg.min_frame_interval_sec: + continue + prev_time = frame.time + + frame_pil: Image = frame.to_image() + frame_hash = self.compute_hash(frame_pil) + + res = self.hash_buf.add( + frame_pil, + frame_hash, + metadata={"frame_time": frame.time, "frame_indx": frame_indx}, + ) + 
self.stats["decoded"] += 1 + if res: + self.stats["produced"] += 1 + yield res + + # flush buffer + for _, item in self.hash_buf.ordered_buffer.items(): + if item: + self.stats["produced"] += 1 + yield item + yield None, {"end": True} + + def write_queue(self, video_path: str, q: Queue[SampleFrame]) -> None: + for item in self.sample(video_path=video_path): + q.put(item) + + +class Worker: + def __init__(self, cfg: SamplerConfig) -> None: + self.cfg = cfg + self.processor = VideoSampler(cfg=cfg) + self.q: Queue[SampleFrame] = Queue() + + def launch(self, video_path: str, output_path: str) -> None: + os.makedirs(output_path, exist_ok=True) + proc_thread = Thread( + target=self.processor.write_queue, args=(video_path, self.q) + ) + proc_thread.start() + self.queue_reader(output_path, read_interval=self.cfg.queue_wait) + proc_thread.join() + + def queue_reader(self, output_path: str, read_interval: float = 0.1) -> None: + while True: + if not self.q.empty(): + item = self.q.get() + frame, metadata = item + if frame is not None: + if isinstance(frame, Image.Image): + frame.save( + os.path.join(output_path, f"{metadata['frame_time']}.jpg") + ) + if metadata.get("end", False): + break + time.sleep(read_interval) + class Videolab(Imagelab): pass From ecbd253f4cbb867b88f8cd8e00b7d9e6cc015e67 Mon Sep 17 00:00:00 2001 From: Femme Phile Date: Wed, 11 Oct 2023 18:41:00 -0400 Subject: [PATCH 03/12] Refactoring to minimal frame sampler (#2) --- src/cleanvision/videolab.py | 156 +++++++++--------------------------- 1 file changed, 37 insertions(+), 119 deletions(-) diff --git a/src/cleanvision/videolab.py b/src/cleanvision/videolab.py index b71b9507..e0594ccf 100644 --- a/src/cleanvision/videolab.py +++ b/src/cleanvision/videolab.py @@ -1,138 +1,56 @@ """Videolab is an extension of Imagelab for finding issues in a video dataset.""" -import os -import time -from collections import Counter, OrderedDict -from dataclasses import dataclass -from queue import Queue -from threading 
import Thread -from typing import Any, Dict, Iterable, Optional, Tuple, Union - -import av # type: ignore -from imagehash import phash -from PIL import Image +from pathlib import Path +from typing import Any +from typing import Dict +from typing import Generator +from typing import Optional +import av from cleanvision.imagelab import Imagelab +from PIL.Image import Image -# custom type -SampleFrame = Tuple[Union[Image.Image, None], Dict[str, bool]] - - -@dataclass -class SamplerConfig: - min_frame_interval_sec: float = 1 - keyframes_only: bool = True - buffer_size: int = 10 - hash_size: int = 4 - queue_wait: float = 0.1 - debug: bool = False +VIDEO_FILE_EXTENSIONS = ["*.mp4", "*.avi", "*.mkv", "*.mov", "*.webm"] -class HashBuffer: - def __init__(self, size: int) -> None: - self.ordered_buffer: OrderedDict[ - str, Tuple[Image, Dict[str, Any]] - ] = OrderedDict() - self.max_size = size - def add( - self, item: Image, hash_: str, metadata: Dict[str, Any] = {} - ) -> Optional[Any]: - if not self.__check_duplicate(hash_): - return self.__add(item, hash_, metadata) - return None +class FrameSampler: + """Simplest frame sampling strategy.""" - def __add( - self, item: Image, hash_: str, metadata: Dict[str, Any] = {} - ) -> Optional[Any]: - self.ordered_buffer[hash_] = (item, metadata) - if len(self.ordered_buffer) >= self.max_size: - return self.ordered_buffer.popitem(last=False)[1] - return None + def __init__(self, k: int) -> None: + """Store frame sample interval k.""" + self.k = k - def __check_duplicate(self, hash_: str) -> bool: - if hash_ in self.ordered_buffer: - self.ordered_buffer.move_to_end(hash_) - return True - return False + def _create_frame_sample_sub_dir(self, video_file: Path, output_dir: Path) -> Path: + """Create a unique sub direcotry for storing frame samples from a video file.""" + # create new sub directory from video file name + sub_dir = output_dir / video_file.name + sub_dir.mkdir(parents=True) + # return path to new sub dir + return sub_dir 
-class VideoSampler: - def __init__(self, cfg: SamplerConfig) -> None: - self.cfg = cfg - self.hash_buf = HashBuffer(cfg.buffer_size) - self.stats: Counter[str] = Counter() + def sample(self, video_file: Path, output_dir: Path) -> None: + """Loop through frames and store every k-th frame.""" + # create frame samples sub directory + sample_sub_dir = self._create_frame_sample_sub_dir(video_file, output_dir) - def compute_hash(self, frame_img: Image) -> str: - return str(phash(frame_img, hash_size=self.cfg.hash_size)) - - def sample(self, video_path: str) -> Iterable[SampleFrame]: - """Generate sample frames from a video""" - with av.open(video_path) as container: + # open video file for streaming + with av.open(str(video_file)) as container: + # get video stream stream = container.streams.video[0] - if self.cfg.keyframes_only: - stream.codec_context.skip_frame = "NONKEY" - prev_time = -10 - for frame_indx, frame in enumerate(container.decode(stream)): - # skip frames if keyframes_only is True - time_diff = frame.time - prev_time - self.stats["total"] += 1 - if time_diff < self.cfg.min_frame_interval_sec: - continue - prev_time = frame.time - - frame_pil: Image = frame.to_image() - frame_hash = self.compute_hash(frame_pil) - res = self.hash_buf.add( - frame_pil, - frame_hash, - metadata={"frame_time": frame.time, "frame_indx": frame_indx}, - ) - self.stats["decoded"] += 1 - if res: - self.stats["produced"] += 1 - yield res - - # flush buffer - for _, item in self.hash_buf.ordered_buffer.items(): - if item: - self.stats["produced"] += 1 - yield item - yield None, {"end": True} - - def write_queue(self, video_path: str, q: Queue[SampleFrame]) -> None: - for item in self.sample(video_path=video_path): - q.put(item) - - -class Worker: - def __init__(self, cfg: SamplerConfig) -> None: - self.cfg = cfg - self.processor = VideoSampler(cfg=cfg) - self.q: Queue[SampleFrame] = Queue() + # iterate frames + for frame_indx, frame in enumerate(container.decode(stream)): + # check 
for k-th frame + if not frame_indx % self.k: + # get PIL image + frame_pil: Image = frame.to_image() - def launch(self, video_path: str, output_path: str) -> None: - os.makedirs(output_path, exist_ok=True) - proc_thread = Thread( - target=self.processor.write_queue, args=(video_path, self.q) - ) - proc_thread.start() - self.queue_reader(output_path, read_interval=self.cfg.queue_wait) - proc_thread.join() + # use frame timestamp as image file name + image_file_name = str(frame.time) + ".jpg" - def queue_reader(self, output_path: str, read_interval: float = 0.1) -> None: - while True: - if not self.q.empty(): - item = self.q.get() - frame, metadata = item - if frame is not None: - if isinstance(frame, Image.Image): - frame.save( - os.path.join(output_path, f"{metadata['frame_time']}.jpg") - ) - if metadata.get("end", False): - break - time.sleep(read_interval) + # save to output dir + frame_pil.save(sample_sub_dir / image_file_name) class Videolab(Imagelab): From 6f59e021586f0e0b2a0f8a3c00189bfabdca8508 Mon Sep 17 00:00:00 2001 From: Femme Phile Date: Wed, 11 Oct 2023 19:07:19 -0400 Subject: [PATCH 04/12] Implementing minimum viable Videolab class (#2) --- src/cleanvision/videolab.py | 75 ++++++++++++++++++++++++++++++++++++- 1 file changed, 73 insertions(+), 2 deletions(-) diff --git a/src/cleanvision/videolab.py b/src/cleanvision/videolab.py index e0594ccf..99de9307 100644 --- a/src/cleanvision/videolab.py +++ b/src/cleanvision/videolab.py @@ -53,5 +53,76 @@ def sample(self, video_file: Path, output_dir: Path) -> None: frame_pil.save(sample_sub_dir / image_file_name) -class Videolab(Imagelab): - pass +class Videolab: + """A single class to find all types of issues in video datasets.""" + + def __init__( + self, + video_dir: str, + ) -> None: + """Create Path object from video directory string.""" + self.video_dir: Path = Path(video_dir) + self.imagelab: Optional[Imagelab] = None + + def _find_videos(self) -> Generator[Path, None, None]: + """Iterate over video 
files in video directory.""" + # iterate over video file extensions + for ext in VIDEO_FILE_EXTENSIONS: + # loop through video paths matching ext + yield from self.video_dir.glob(f"**/{ext}") + + def _sample_frames(self, samples_dir: Path, sample_interval: int) -> None: + """Get sample frames.""" + # setup frame sampler + frame_sampler = FrameSampler(sample_interval) + + # iterate over video files in video data directory + for video_file in self._find_videos(): + # sample frames from target video data directory + frame_sampler.sample(video_file, samples_dir) + + def find_issues( + self, + frame_samples_dir: str, + frame_samples_interval: int, + issue_types: Optional[Dict[str, Any]] = None, + n_jobs: Optional[int] = None, + verbose: bool = True, + ) -> None: + """Sampe frames before call Imagelab.find_issues.""" + # create sample frames + self._sample_frames(Path(frame_samples_dir), frame_samples_interval) + + # create Imagelab instance + self.imagelab = Imagelab(frame_samples_dir) + + # call Imagelab to find issues in sampled frames + self.imagelab.find_issues(issue_types, n_jobs, verbose) + + def report( + self, + issue_types: Optional[List[str]] = None, + max_prevalence: Optional[float] = None, + num_images: Optional[int] = None, + verbosity: int = 1, + print_summary: bool = True, + show_id: bool = False, + ) -> None: + """Prints summary of the issues found in your dataset.""" + # check if imagelab instance exists + if self.imagelab is None: + print( + "Please specify some issue_types to" + "check for in videolab.find_issues()." + ) + + else: + # report on video frame samples + self.imagelab.report( + issue_types, + max_prevalence, + num_images, + verbosity, + print_summary, + show_id, + ) From 0f96d503877a2325c5b35c1c3fdeaf4026d50392 Mon Sep 17 00:00:00 2001 From: Femme Phile Date: Wed, 11 Oct 2023 19:22:35 -0400 Subject: [PATCH 05/12] Importing List type. 
--- src/cleanvision/videolab.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cleanvision/videolab.py b/src/cleanvision/videolab.py index 99de9307..3bba12b1 100644 --- a/src/cleanvision/videolab.py +++ b/src/cleanvision/videolab.py @@ -3,6 +3,7 @@ from typing import Any from typing import Dict from typing import Generator +from typing import List from typing import Optional import av From 43584bcb5554d52236d657d1fb9e7f1664eaf101 Mon Sep 17 00:00:00 2001 From: Femme Phile Date: Tue, 17 Oct 2023 16:20:50 -0400 Subject: [PATCH 06/12] Adding aggregation feature (#2) --- src/cleanvision/videolab.py | 185 ++++++++++++++++++++++++++++++------ 1 file changed, 158 insertions(+), 27 deletions(-) diff --git a/src/cleanvision/videolab.py b/src/cleanvision/videolab.py index 3bba12b1..9d38b63d 100644 --- a/src/cleanvision/videolab.py +++ b/src/cleanvision/videolab.py @@ -1,15 +1,13 @@ """Videolab is an extension of Imagelab for finding issues in a video dataset.""" from pathlib import Path -from typing import Any -from typing import Dict -from typing import Generator -from typing import List -from typing import Optional +from typing import Any, Dict, Generator, List, Optional import av -from cleanvision.imagelab import Imagelab +import pandas as pd from PIL.Image import Image +from cleanvision.imagelab import Imagelab +from cleanvision.utils.utils import get_is_issue_colname VIDEO_FILE_EXTENSIONS = ["*.mp4", "*.avi", "*.mkv", "*.mov", "*.webm"] @@ -54,7 +52,7 @@ def sample(self, video_file: Path, output_dir: Path) -> None: frame_pil.save(sample_sub_dir / image_file_name) -class Videolab: +class Videolab(Imagelab): """A single class to find all types of issues in video datasets.""" def __init__( @@ -62,8 +60,8 @@ def __init__( video_dir: str, ) -> None: """Create Path object from video directory string.""" + # store video directory path self.video_dir: Path = Path(video_dir) - self.imagelab: Optional[Imagelab] = None def _find_videos(self) -> Generator[Path, None, None]: 
"""Iterate over video files in video directory.""" @@ -82,6 +80,85 @@ def _sample_frames(self, samples_dir: Path, sample_interval: int) -> None: # sample frames from target video data directory frame_sampler.sample(video_file, samples_dir) + def _parent_dir_frame_samples_dict(self) -> Dict[str, List[str]]: + """Creates dictionary of parent directory and frame samples.""" + # set dict + cluster_frame_samples: Dict[str, List[str]] = {} + + # looper over index + for img_path in self.issues.index: + # get frame sample parent + sample_dir = Path(img_path).parents[0] + + # get key + key = str(sample_dir) + + # check if key exists + if key in cluster_frame_samples: + # update + cluster_frame_samples[key].append(img_path) + + else: + # create new entry + cluster_frame_samples[key] = [img_path] + + # get cluster dict + return cluster_frame_samples + + def _aggregate_issues(self) -> pd.DataFrame: + """Aggregate Imagelab issues into a single frame for each video.""" + # convert booleans to floats + pure_float_issues = self.issues * 1 + + # store new aggregate_issues + aggregate_issues = [] + + # loop over clusters + for _, indexes in self._parent_dir_frame_samples_dict().items(): + # get all frame issues for sample_dir subset + frame_issues = pure_float_issues.loc[indexes] + + # calculate new index + new_index = indexes[int(len(indexes) / 2)] + + # create aggregated scores df + aggregate_issues.append( + pd.DataFrame(frame_issues.mean().to_dict(), index=[new_index]) + ) + + # finally create a new DataFrame of all aggregate results + agg_df = pd.concat(aggregate_issues) + + # create lists of columns + issue_columns = [get_is_issue_colname(issue) for issue in self._issue_types] + + # convert float represent average booleans back to booleans + agg_df[issue_columns] = agg_df[issue_columns].astype(bool) + + # return the aggregated dataframe + return agg_df + + def _aggregate_summary(self) -> pd.DataFrame: + """Create issues summary for aggregate issues.""" + # setup issue summary 
storage + summary_dict = {} + + # loop over issue type + for issue_type in self._issue_types: + # add individual type summaries + summary_dict[issue_type] = { + "num_images": self.agg_issues[get_is_issue_colname(issue_type)].sum() + } + + # reshape summary dataframe + agg_summary = pd.DataFrame.from_dict(summary_dict, orient="index") + agg_summary = agg_summary.reset_index() + agg_summary = agg_summary.rename(columns={"index": "issue_type"}) + agg_summary = agg_summary.astype({"num_images": int, "issue_type": str}) + + # return aggregate summary + return agg_summary + def find_issues( self, frame_samples_dir: str, @@ -94,13 +171,17 @@ def find_issues( # create sample frames self._sample_frames(Path(frame_samples_dir), frame_samples_interval) - # create Imagelab instance - self.imagelab = Imagelab(frame_samples_dir) + # call parent constructor + super().__init__(frame_samples_dir) - # call Imagelab to find issues in sampled frames - self.imagelab.find_issues(issue_types, n_jobs, verbose) + # call parent find_issues on sampled frames + super().find_issues(issue_types, n_jobs, verbose) - def report( + # update aggregate issues/summary + self.agg_issues = self._aggregate_issues() + self.agg_summary = self._aggregate_summary() + + def _aggregate_report( self, issue_types: Optional[List[str]] = None, max_prevalence: Optional[float] = None, @@ -109,21 +190,71 @@ def report( print_summary: bool = True, show_id: bool = False, ) -> None: - """Prints summary of the issues found in your dataset.""" - # check if imagelab instance exists - if self.imagelab is None: + """Create report visualization for aggregate issues.""" + assert isinstance(verbosity, int) and 0 <= verbosity < 5 + + user_supplied_args = locals() + report_args = self._get_report_args(user_supplied_args) + + issue_types_to_report = ( + issue_types if issue_types else self.agg_summary["issue_type"].tolist() + ) + + # filter issues based on max_prevalence in the dataset + filtered_issue_types = self._filter_report( 
+ issue_types_to_report, report_args["max_prevalence"] + ) + + issue_summary = self.agg_summary[ + self.agg_summary["issue_type"].isin(filtered_issue_types) + ] + if len(issue_summary) > 0: + if verbosity: + print("Issues found in videos in order of severity in the dataset\n") + if print_summary: + self._pprint_issue_summary(issue_summary) + for issue_type in filtered_issue_types: + if ( + self.agg_summary.query(f"issue_type == {issue_type!r}")[ + "num_images" + ].values[0] + == 0 + ): + continue + print(f"{' ' + issue_type + ' frames ':-^60}\n") + print( + f"Number of examples with this issue: " + f"{self.agg_issues[get_is_issue_colname(issue_type)].sum()}\n" + f"Examples representing most severe instances of this issue:\n" + ) + self._visualize( + issue_type, + report_args["num_images"], + report_args["cell_size"], + show_id, + ) + else: print( - "Please specify some issue_types to" + "Please specify some issue_types to " "check for in videolab.find_issues()." ) - else: - # report on video frame samples - self.imagelab.report( - issue_types, - max_prevalence, - num_images, - verbosity, - print_summary, - show_id, - ) + def report( + self, + issue_types: Optional[List[str]] = None, + max_prevalence: Optional[float] = None, + num_images: Optional[int] = None, + verbosity: int = 1, + print_summary: bool = True, + show_id: bool = False, + ) -> None: + """Prints summary of the issues found in your dataset.""" + # report on video frame samples + self._aggregate_report( + issue_types, + max_prevalence, + num_images, + verbosity, + print_summary, + show_id, + ) From 4fa1ca7529c5a7ab4fd95eed19ba7e49131eb0b4 Mon Sep 17 00:00:00 2001 From: Femme Phile Date: Sun, 12 Nov 2023 08:14:55 -0500 Subject: [PATCH 07/12] Refactoring to composition --- src/cleanvision/videolab.py | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/src/cleanvision/videolab.py b/src/cleanvision/videolab.py index 9d38b63d..a1e8321f 100644 --- 
a/src/cleanvision/videolab.py +++ b/src/cleanvision/videolab.py @@ -52,7 +52,7 @@ def sample(self, video_file: Path, output_dir: Path) -> None: frame_pil.save(sample_sub_dir / image_file_name) -class Videolab(Imagelab): +class Videolab: """A single class to find all types of issues in video datasets.""" def __init__( @@ -86,7 +86,7 @@ def _parent_dir_frame_samples_dict(self) -> Dict[str, List[str]]: cluster_frame_samples: Dict[str, List[str]] = {} # looper over index - for img_path in self.issues.index: + for img_path in self.imagelab.issues.index: # get frame sample parent sample_dir = Path(img_path).parents[0] @@ -108,7 +108,7 @@ def _parent_dir_frame_samples_dict(self) -> Dict[str, List[str]]: def _aggregate_issues(self) -> pd.DataFrame: """Aggregate Imagelab issues into a single frame for each video.""" # convert booleans to floats - pure_float_issues = self.issues * 1 + pure_float_issues = self.imagelab.issues * 1 # store new aggregate_issues aggregate_issues = [] @@ -130,7 +130,9 @@ def _aggregate_issues(self) -> pd.DataFrame: agg_df = pd.concat(aggregate_issues) # create lists of columns - issue_columns = [get_is_issue_colname(issue) for issue in self._issue_types] + issue_columns = [ + get_is_issue_colname(issue) for issue in self.imagelab._issue_types + ] # convert float represent average booleans back to booleans agg_df[issue_columns] = agg_df[issue_columns].astype(bool) @@ -144,7 +146,7 @@ def _aggregate_summary(self) -> pd.DataFrame: summary_dict = {} # loop over issue type - for issue_type in self._issue_types: + for issue_type in self.imagelab._issue_types: # add individual type summaries summary_dict[issue_type] = { "num_images": self.agg_issues[get_is_issue_colname(issue_type)].sum() @@ -167,15 +169,15 @@ def find_issues( n_jobs: Optional[int] = None, verbose: bool = True, ) -> None: - """Sampe frames before call Imagelab.find_issues.""" + """Sample frames before calling find_issues and aggregating.""" # create sample frames 
self._sample_frames(Path(frame_samples_dir), frame_samples_interval) - # call parent constructor - super().__init__(frame_samples_dir) + # get imagelab instance + self.imagelab = Imagelab(frame_samples_dir) - # call parent find_issues on sampled frames - super().find_issues(issue_types, n_jobs, verbose) + # use imagelab to find issues in frames + self.imagelab.find_issues(issue_types, n_jobs, verbose) # update aggregate issues/summary self.agg_issues = self._aggregate_issues() @@ -194,14 +196,14 @@ def _aggregate_report( assert isinstance(verbosity, int) and 0 <= verbosity < 5 user_supplied_args = locals() - report_args = self._get_report_args(user_supplied_args) + report_args = self.imagelab._get_report_args(user_supplied_args) issue_types_to_report = ( issue_types if issue_types else self.agg_summary["issue_type"].tolist() ) # filter issues based on max_prevalence in the dataset - filtered_issue_types = self._filter_report( + filtered_issue_types = self.imagelab._filter_report( issue_types_to_report, report_args["max_prevalence"] ) @@ -212,7 +214,7 @@ def _aggregate_report( if verbosity: print("Issues found in videos in order of severity in the dataset\n") if print_summary: - self._pprint_issue_summary(issue_summary) + self.imagelab._pprint_issue_summary(issue_summary) for issue_type in filtered_issue_types: if ( self.agg_summary.query(f"issue_type == {issue_type!r}")[ @@ -227,7 +229,7 @@ def _aggregate_report( f"{self.agg_issues[get_is_issue_colname(issue_type)].sum()}\n" f"Examples representing most severe instances of this issue:\n" ) - self._visualize( + self.imagelab._visualize( issue_type, report_args["num_images"], report_args["cell_size"], @@ -248,7 +250,7 @@ def report( print_summary: bool = True, show_id: bool = False, ) -> None: - """Prints summary of the issues found in your dataset.""" + """Prints summary of the aggregate issues found in your dataset.""" # report on video frame samples self._aggregate_report( issue_types, From 
392462009272f333600b6496334cbca56f5408ef Mon Sep 17 00:00:00 2001 From: Femme Phile Date: Mon, 13 Nov 2023 19:23:01 -0500 Subject: [PATCH 08/12] Refactoring to PyAV lazy import strategy --- pyproject.toml | 5 ++--- src/cleanvision/videolab.py | 16 +++++++++++++--- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 31245018..519e482a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,7 +31,6 @@ classifiers = [ ] dependencies = [ "Pillow>=9.3", - "av>=10.0.0", "numpy>=1.20.0", "pandas>=1.1.5", "tabulate>=0.8.3", # pandas optional dependency for .to_markdown() @@ -47,11 +46,11 @@ pytorch = ["torchvision>=0.12.0"] azure = ["adlfs>=2022.2.0"] # latest compatible with Python 3.7 gcs = ["gcsfs>=2022.1.0"] # latest compatible with Python 3.7 s3 = ["s3fs>=2023.1.0"] # latest compatible with Python 3.7 +video = ["av>=10.0.0"] -all = ["cleanvision[huggingface,pytorch,azure,gcs,s3]"] +all = ["cleanvision[huggingface,pytorch,azure,gcs,s3,video]"] [project.urls] "Source" = "https://github.com/cleanlab/cleanvision" "Bug Tracker" = "https://github.com/cleanlab/cleanvision/issues" "Documentation" = "https://cleanvision.readthedocs.io/" - diff --git a/src/cleanvision/videolab.py b/src/cleanvision/videolab.py index a1e8321f..9693c362 100644 --- a/src/cleanvision/videolab.py +++ b/src/cleanvision/videolab.py @@ -1,8 +1,8 @@ """Videolab is an extension of Imagelab for finding issues in a video dataset.""" +from importlib import import_module from pathlib import Path from typing import Any, Dict, Generator, List, Optional -import av import pandas as pd from PIL.Image import Image @@ -16,9 +16,19 @@ class FrameSampler: """Simplest frame sampling strategy.""" def __init__(self, k: int) -> None: - """Store frame sample interval k.""" + """Store frame sample interval k and import PyAV.""" + # storing frame sampling interval self.k = k + # attempting to import PyAV + try: + self.av = import_module("av") + except ImportError as error: + 
raise ImportError( + "Cannot import package `av`. " + "Please install it via `pip install av` and then try again." + ) from error + def _create_frame_sample_sub_dir(self, video_file: Path, output_dir: Path) -> Path: """Create a unique sub direcotry for storing frame samples from a video file.""" # create new sub directory from video file name @@ -34,7 +44,7 @@ def sample(self, video_file: Path, output_dir: Path) -> None: sample_sub_dir = self._create_frame_sample_sub_dir(video_file, output_dir) # open video file for streaming - with av.open(str(video_file)) as container: + with self.av.open(str(video_file)) as container: # get video stream stream = container.streams.video[0] From cfb4981e040c7ffb17c369bc615845f4991f96bb Mon Sep 17 00:00:00 2001 From: Femme Phile Date: Tue, 14 Nov 2023 20:24:32 -0500 Subject: [PATCH 09/12] Implementing VideoDataset class --- src/cleanvision/videolab.py | 116 +++++++++++++++++++++++++----------- 1 file changed, 81 insertions(+), 35 deletions(-) diff --git a/src/cleanvision/videolab.py b/src/cleanvision/videolab.py index 9693c362..34645f01 100644 --- a/src/cleanvision/videolab.py +++ b/src/cleanvision/videolab.py @@ -1,17 +1,66 @@ """Videolab is an extension of Imagelab for finding issues in a video dataset.""" from importlib import import_module from pathlib import Path -from typing import Any, Dict, Generator, List, Optional +from typing import Any, Dict, Generator, Iterator, List, Optional, Union import pandas as pd from PIL.Image import Image +from tqdm.auto import tqdm +from cleanvision.dataset.base_dataset import Dataset from cleanvision.imagelab import Imagelab from cleanvision.utils.utils import get_is_issue_colname VIDEO_FILE_EXTENSIONS = ["*.mp4", "*.avi", "*.mkv", "*.mov", "*.webm"] +class VideoDataset(Dataset): + """Wrapper class to handle video datasets.""" + + def __init__( + self, + data_folder: Optional[str] = None, + filepaths: Optional[List[str]] = None, + ) -> None: + """Determine video dataset source and populate 
index.""" + # check if data folder is given + if data_folder: + # get filepaths from video dataset directory + self._filepaths = [ + str(path) for path in self.__get_filepaths(Path(data_folder)) + ] + + else: + # store user supplied video file paths + assert filepaths is not None + self._filepaths = filepaths + + # create index + self._set_index() + + def __len__(self) -> int: + """Get video dataset file count.""" + return len(self.index) + + def __iter__(self) -> Iterator[Union[int, str]]: + """Defining the iteration behavior.""" + return iter(self.index) + + def _set_index(self) -> None: + """Create internal storage for filepaths.""" + self.index = [path for path in self._filepaths] + + def __get_filepaths(self, dataset_path: Path) -> Generator[Path, None, None]: + """Scan file system for video files and grab their file paths.""" + # notify user + print(f"Reading videos from {dataset_path}") + + # iterate over video file extensions + for ext in VIDEO_FILE_EXTENSIONS: + # loop through video paths matching ext + yield from dataset_path.glob(f"**/{ext}") + + class FrameSampler: """Simplest frame sampling strategy.""" @@ -29,37 +78,42 @@ def __init__(self, k: int) -> None: "Please install it via `pip install av` and then try again." 
) from error - def _create_frame_sample_sub_dir(self, video_file: Path, output_dir: Path) -> Path: + def _create_frame_sample_sub_dir(self, output_dir: Path, idx: int) -> Path: """Create a unique sub direcotry for storing frame samples from a video file.""" - # create new sub directory from video file name - sub_dir = output_dir / video_file.name + # create new sub directory from video_dataset index + sub_dir = output_dir / str(idx) sub_dir.mkdir(parents=True) # return path to new sub dir return sub_dir - def sample(self, video_file: Path, output_dir: Path) -> None: + def sample(self, video_dataset: VideoDataset, output_dir: Path) -> None: """Loop through frames and store every k-th frame.""" - # create frame samples sub directory - sample_sub_dir = self._create_frame_sample_sub_dir(video_file, output_dir) + # notify of sampling + print(f"Sampling frames at every {self.k} frames ...") + + # iterate over video files in video data directory + for idx, video_file in enumerate(tqdm(video_dataset)): + # create frame samples sub directory + sample_sub_dir = self._create_frame_sample_sub_dir(output_dir, idx) - # open video file for streaming - with self.av.open(str(video_file)) as container: - # get video stream - stream = container.streams.video[0] + # open video file for streaming + with self.av.open(str(video_file)) as container: + # get video stream + stream = container.streams.video[0] - # iterate frames - for frame_indx, frame in enumerate(container.decode(stream)): - # check for k-th frame - if not frame_indx % self.k: - # get PIL image - frame_pil: Image = frame.to_image() + # iterate frames + for frame_indx, frame in enumerate(container.decode(stream)): + # check for k-th frame + if not frame_indx % self.k: + # get PIL image + frame_pil: Image = frame.to_image() - # use frame timestamp as image file name - image_file_name = str(frame.time) + ".jpg" + # use frame timestamp as image file name + image_file_name = str(frame.time) + ".jpg" - # save to output dir - 
frame_pil.save(sample_sub_dir / image_file_name) + # save to output dir + frame_pil.save(sample_sub_dir / image_file_name) class Videolab: @@ -67,28 +121,20 @@ class Videolab: def __init__( self, - video_dir: str, + video_dir: Optional[str] = None, + video_filepaths: Optional[List[str]] = None, ) -> None: """Create Path object from video directory string.""" - # store video directory path - self.video_dir: Path = Path(video_dir) - - def _find_videos(self) -> Generator[Path, None, None]: - """Iterate over video files in video directory.""" - # iterate over video file extensions - for ext in VIDEO_FILE_EXTENSIONS: - # loop through video paths matching ext - yield from self.video_dir.glob(f"**/{ext}") + # store video dataset + self.video_dataset: VideoDataset = VideoDataset(video_dir, video_filepaths) def _sample_frames(self, samples_dir: Path, sample_interval: int) -> None: """Get sample frames.""" # setup frame sampler frame_sampler = FrameSampler(sample_interval) - # iterate over video files in video data directory - for video_file in self._find_videos(): - # sample frames from target video data directory - frame_sampler.sample(video_file, samples_dir) + # sample frames from target video data directory + frame_sampler.sample(self.video_dataset, samples_dir) def _parent_dir_frame_samples_dict(self) -> Dict[str, List[str]]: """Creates dictionary of parent directory and frame samples.""" From f8e8c2129c123d5da28bc2246da898f4a46c0d44 Mon Sep 17 00:00:00 2001 From: Femme Phile Date: Wed, 15 Nov 2023 08:02:46 -0500 Subject: [PATCH 10/12] Adding default video issue types --- src/cleanvision/videolab.py | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/src/cleanvision/videolab.py b/src/cleanvision/videolab.py index 34645f01..258a0101 100644 --- a/src/cleanvision/videolab.py +++ b/src/cleanvision/videolab.py @@ -217,6 +217,39 @@ def _aggregate_summary(self) -> pd.DataFrame: # return aggregate summary return agg_summary + @staticmethod + 
def list_default_issue_types() -> List[str]: + """Returns list of the default issue types.""" + return [ + issue + for issue in Imagelab.list_default_issue_types() + if "duplicates" not in issue + ] + + @staticmethod + def list_possible_issue_types() -> List[str]: + """Returns list of all possible issue types including custom issues.""" + return Imagelab.list_possible_issue_types() + + def _get_issues_to_compute( + self, issue_types_with_params: Optional[Dict[str, Any]] + ) -> Dict[str, Any]: + """Configure default issue types if needed.""" + # use defaults if no issue types supplied + if not issue_types_with_params: + to_compute_issues_with_params: Dict[str, Any] = { + issue_type: {} for issue_type in self.list_default_issue_types() + } + + # ... just get user supplied issue types + else: + to_compute_issues_with_params = { + issue_type_str: params + for issue_type_str, params in issue_types_with_params.items() + } + + return to_compute_issues_with_params + def find_issues( self, frame_samples_dir: str, @@ -232,6 +265,9 @@ def find_issues( # get imagelab instance self.imagelab = Imagelab(frame_samples_dir) + # get default issues if user supplied none + issue_types = self._get_issues_to_compute(issue_types) + # use imagelab to find issues in frames self.imagelab.find_issues(issue_types, n_jobs, verbose) From 861fac0ca4808761f5fe4bad011f1c08646d216e Mon Sep 17 00:00:00 2001 From: Femme Phile Date: Sat, 18 Nov 2023 08:29:14 -0500 Subject: [PATCH 11/12] Adding save, load, get_stats methods --- src/cleanvision/videolab.py | 202 ++++++++++++++++++++---------------- 1 file changed, 112 insertions(+), 90 deletions(-) diff --git a/src/cleanvision/videolab.py b/src/cleanvision/videolab.py index 258a0101..2d7178b6 100644 --- a/src/cleanvision/videolab.py +++ b/src/cleanvision/videolab.py @@ -1,7 +1,11 @@ """Videolab is an extension of Imagelab for finding issues in a video dataset.""" +from __future__ import annotations + +import pickle +from copy import deepcopy from 
importlib import import_module from pathlib import Path -from typing import Any, Dict, Generator, Iterator, List, Optional, Union +from typing import Any, Dict, Generator, Iterator, List, Optional, Type, TypeVar, Union import pandas as pd from PIL.Image import Image @@ -11,8 +15,14 @@ from cleanvision.imagelab import Imagelab from cleanvision.utils.utils import get_is_issue_colname +OBJECT_FILENAME = "videolab.pkl" +ISSUES_FILENAME = "frame_issues.csv" +ISSUE_SUMMARY_FILENAME = "frame_issue_summary.csv" VIDEO_FILE_EXTENSIONS = ["*.mp4", "*.avi", "*.mkv", "*.mov", "*.webm"] +__all__ = ["Videolab"] +TVideolab = TypeVar("TVideolab", bound="Videolab") + class VideoDataset(Dataset): """Wrapper class to handle video datasets.""" @@ -136,13 +146,16 @@ def _sample_frames(self, samples_dir: Path, sample_interval: int) -> None: # sample frames from target video data directory frame_sampler.sample(self.video_dataset, samples_dir) - def _parent_dir_frame_samples_dict(self) -> Dict[str, List[str]]: + @staticmethod + def _parent_dir_frame_samples_dict( + frame_issues: pd.Dataframe, + ) -> Dict[str, List[str]]: """Creates dictionary of parent directory and frame samples.""" # set dict cluster_frame_samples: Dict[str, List[str]] = {} # looper over index - for img_path in self.imagelab.issues.index: + for img_path in frame_issues.index: # get frame sample parent sample_dir = Path(img_path).parents[0] @@ -161,16 +174,16 @@ def _parent_dir_frame_samples_dict(self) -> Dict[str, List[str]]: # get cluster dict return cluster_frame_samples - def _aggregate_issues(self) -> pd.DataFrame: + def _aggregate_issues(self, frame_issues: pd.Dataframe) -> pd.DataFrame: """Aggregate Imagelab issues into a single frame for each video.""" # convert booleans to floats - pure_float_issues = self.imagelab.issues * 1 + pure_float_issues = frame_issues * 1 # store new aggregate_issues aggregate_issues = [] # loop over clusters - for _, indexes in self._parent_dir_frame_samples_dict().items(): + for _, 
indexes in self._parent_dir_frame_samples_dict(frame_issues).items(): # get all frame issues for sample_dir subset frame_issues = pure_float_issues.loc[indexes] @@ -196,7 +209,7 @@ def _aggregate_issues(self) -> pd.DataFrame: # return the aggregated dataframe return agg_df - def _aggregate_summary(self) -> pd.DataFrame: + def _aggregate_summary(self, aggregate_issues: pd.Dataframe) -> pd.DataFrame: """Create issues summary for aggregate issues.""" # setup issue summary storage summary_dict = {} @@ -205,7 +218,7 @@ def _aggregate_summary(self) -> pd.DataFrame: for issue_type in self.imagelab._issue_types: # add individual type summaries summary_dict[issue_type] = { - "num_images": self.agg_issues[get_is_issue_colname(issue_type)].sum() + "num_images": aggregate_issues[get_is_issue_colname(issue_type)].sum() } # reshape summary dataframe @@ -231,25 +244,6 @@ def list_possible_issue_types() -> List[str]: """Returns list of all possible issue types including custom issues.""" return Imagelab.list_possible_issue_types() - def _get_issues_to_compute( - self, issue_types_with_params: Optional[Dict[str, Any]] - ) -> Dict[str, Any]: - """Configure default issue types if needed.""" - # use defaults if no issue types supplied - if not issue_types_with_params: - to_compute_issues_with_params: Dict[str, Any] = { - issue_type: {} for issue_type in self.list_default_issue_types() - } - - # ... 
just get user supplied issue types - else: - to_compute_issues_with_params = { - issue_type_str: params - for issue_type_str, params in issue_types_with_params.items() - } - - return to_compute_issues_with_params - def find_issues( self, frame_samples_dir: str, @@ -265,73 +259,21 @@ def find_issues( # get imagelab instance self.imagelab = Imagelab(frame_samples_dir) - # get default issues if user supplied none - issue_types = self._get_issues_to_compute(issue_types) + # set default issue types + setattr( + self.imagelab, "list_default_issue_types", self.list_default_issue_types + ) # use imagelab to find issues in frames self.imagelab.find_issues(issue_types, n_jobs, verbose) - # update aggregate issues/summary - self.agg_issues = self._aggregate_issues() - self.agg_summary = self._aggregate_summary() - - def _aggregate_report( - self, - issue_types: Optional[List[str]] = None, - max_prevalence: Optional[float] = None, - num_images: Optional[int] = None, - verbosity: int = 1, - print_summary: bool = True, - show_id: bool = False, - ) -> None: - """Create report visualization for aggregate issues.""" - assert isinstance(verbosity, int) and 0 <= verbosity < 5 - - user_supplied_args = locals() - report_args = self.imagelab._get_report_args(user_supplied_args) + # get frame issues + self.frame_issues = self.imagelab.issues + self.frame_issue_summary = self.imagelab.issue_summary - issue_types_to_report = ( - issue_types if issue_types else self.agg_summary["issue_type"].tolist() - ) - - # filter issues based on max_prevalence in the dataset - filtered_issue_types = self.imagelab._filter_report( - issue_types_to_report, report_args["max_prevalence"] - ) - - issue_summary = self.agg_summary[ - self.agg_summary["issue_type"].isin(filtered_issue_types) - ] - if len(issue_summary) > 0: - if verbosity: - print("Issues found in videos in order of severity in the dataset\n") - if print_summary: - self.imagelab._pprint_issue_summary(issue_summary) - for issue_type in 
filtered_issue_types: - if ( - self.agg_summary.query(f"issue_type == {issue_type!r}")[ - "num_images" - ].values[0] - == 0 - ): - continue - print(f"{' ' + issue_type + ' frames ':-^60}\n") - print( - f"Number of examples with this issue: " - f"{self.agg_issues[get_is_issue_colname(issue_type)].sum()}\n" - f"Examples representing most severe instances of this issue:\n" - ) - self.imagelab._visualize( - issue_type, - report_args["num_images"], - report_args["cell_size"], - show_id, - ) - else: - print( - "Please specify some issue_types to " - "check for in videolab.find_issues()." - ) + # update aggregate issues/summary + self.imagelab.issues = self._aggregate_issues(self.frame_issues) + self.imagelab.issue_summary = self._aggregate_summary(self.imagelab.issues) def report( self, @@ -344,7 +286,7 @@ def report( ) -> None: """Prints summary of the aggregate issues found in your dataset.""" # report on video frame samples - self._aggregate_report( + self.imagelab.report( issue_types, max_prevalence, num_images, @@ -352,3 +294,83 @@ def report( print_summary, show_id, ) + + def get_stats(self) -> Any: + """Returns dict of statistics computed from video frames.""" + return self.imagelab.info["statistics"] + + def save(self, path: str, force: bool = False) -> None: + """Saves this Videolab instance.""" + # get pathlib Path object + root_save_path = Path(path) + + # check if videolab root save path exists + if not root_save_path.exists(): + # create otherwise + root_save_path.mkdir(parents=True, exist_ok=True) + else: + if not force: + raise FileExistsError("Please specify a new path or set force=True") + print( + "WARNING: Existing files will be overwritten " + f"by newly saved files at: {root_save_path}" + ) + + # create specific imagelab sub directory + imagelab_sub_dir = str(root_save_path / "imagelab") + + # now save imagelab to subdir + self.imagelab.save(imagelab_sub_dir, force) + + # save aggregate dataframes + self.frame_issues.to_csv(root_save_path / 
ISSUES_FILENAME) + self.frame_issue_summary.to_csv(root_save_path / ISSUE_SUMMARY_FILENAME) + + # copy videolab object + videolab_copy = deepcopy(self) + + # clear out dataframes + videolab_copy.frame_issues = None + videolab_copy.frame_issue_summary = None + + # Save the Videolab object to disk. + with open(root_save_path / OBJECT_FILENAME, "wb") as f: + pickle.dump(videolab_copy, f) + + print(f"Saved Videolab to folder: {root_save_path}") + print( + "The data path and dataset must not be changed to maintain consistent " + "state when loading this Videolab" + ) + + @classmethod + def load(cls: Type[TVideolab], path: str) -> Videolab: + """Loads Videolab from given path.""" + # get pathlib Path object + root_save_path = Path(path) + + # check if path exists + if not root_save_path.exists(): + raise ValueError(f"No folder found at specified path: {path}") + + with open(root_save_path / OBJECT_FILENAME, "rb") as f: + videolab: Videolab = pickle.load(f) + + # Load the issues from disk. + videolab.frame_issues = pd.read_csv( + root_save_path / ISSUES_FILENAME, index_col=0 + ) + videolab.frame_issue_summary = pd.read_csv( + root_save_path / ISSUE_SUMMARY_FILENAME, index_col=0 + ) + + # create specific imagelab sub directory + imagelab_sub_dir = str(root_save_path / "imagelab") + + # store imagelab object + videolab.imagelab = Imagelab.load(imagelab_sub_dir) + + # notify user + print("Successfully loaded Videolab") + + return videolab From 9434049250a7bcf8676e2f4e3434033b1a71e9d6 Mon Sep 17 00:00:00 2001 From: Femme Phile Date: Sat, 18 Nov 2023 08:53:58 -0500 Subject: [PATCH 12/12] Fixing DataFrame type annotation capitalization --- src/cleanvision/videolab.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/cleanvision/videolab.py b/src/cleanvision/videolab.py index 2d7178b6..806e9526 100644 --- a/src/cleanvision/videolab.py +++ b/src/cleanvision/videolab.py @@ -148,7 +148,7 @@ def _sample_frames(self, samples_dir: Path, sample_interval: 
int) -> None: @staticmethod def _parent_dir_frame_samples_dict( - frame_issues: pd.Dataframe, + frame_issues: pd.DataFrame, ) -> Dict[str, List[str]]: """Creates dictionary of parent directory and frame samples.""" # set dict @@ -174,7 +174,7 @@ def _parent_dir_frame_samples_dict( # get cluster dict return cluster_frame_samples - def _aggregate_issues(self, frame_issues: pd.Dataframe) -> pd.DataFrame: + def _aggregate_issues(self, frame_issues: pd.DataFrame) -> pd.DataFrame: """Aggregate Imagelab issues into a single frame for each video.""" # convert booleans to floats pure_float_issues = frame_issues * 1 @@ -209,7 +209,7 @@ def _aggregate_issues(self, frame_issues: pd.Dataframe) -> pd.DataFrame: # return the aggregated dataframe return agg_df - def _aggregate_summary(self, aggregate_issues: pd.Dataframe) -> pd.DataFrame: + def _aggregate_summary(self, aggregate_issues: pd.DataFrame) -> pd.DataFrame: """Create issues summary for aggregate issues.""" # setup issue summary storage summary_dict = {}