# Plugin-level settings of the `filter.deduplicate` filter.
@Plugins.register('filter.deduplicate', Plugins.kind.ACTOR_CONFIG)
class DeduplicateFilterConfig(ActorConfig):
    history_dir: Optional[Path] = Field(default='cache/deduplicate/', validate_default=True)
    """directory to store entities history between restarts"""

    @field_validator('history_dir')
    @classmethod
    def check_dir(cls, path: Optional[Path]) -> Optional[Path]:
        # `None` is passed through unchecked. Any other value must be accepted
        # by `utils.check_dir`, which is called with `create=True` (presumably
        # creating the directory when it is missing - confirm against the helper).
        if path is not None and not utils.check_dir(path, create=True):
            raise ValueError(f'check path "{path}" exists and is a writeable directory')
        return path
""" - def __init__(self, config: EmptyFilterConfig, entities: Sequence[DeduplicateFilterEntity]): + def __init__(self, config: DeduplicateFilterConfig, entities: Sequence[DeduplicateFilterEntity]): super().__init__(config, entities) def match(self, entity: DeduplicateFilterEntity, record: Record) -> Optional[Record]: @@ -408,3 +427,33 @@ def match(self, entity: DeduplicateFilterEntity, record: Record) -> Optional[Rec entity.history.move_to_end(value) self.logger.debug(f'[{entity.name}] record with {entity.field}={value} has not yet been seen, letting through') return record + + async def run(self) -> None: + if self.conf.history_dir is None: + return + + for entity in self.entities.values(): + filename = self.conf.history_dir / Path(f'{self.conf.name}-{entity.name}-history.json') + try: + if filename.exists() and filename.is_file(): + with open(filename, 'rt', encoding='utf8') as fp: + history = json.load(fp) + entity.history.update(history) + self.logger.info(f'[{entity.name}] history ({len(entity.history)} items) successfully loaded from {filename}') + except Exception as e: + self.logger.info(f'[{entity.name}] failed to load history from "{filename}": {e}') + + try: + await asyncio.Future() + except (asyncio.CancelledError, KeyboardInterrupt): + for entity in self.entities.values(): + if not entity.history: + continue + filename = self.conf.history_dir / Path(f'{self.conf.name}-{entity.name}-history.json') + try: + with open(filename, 'wt', encoding='utf8') as fp: + json.dump(entity.history, fp, ensure_ascii=False, indent=4) + self.logger.info(f'[{entity.name}] history ({len(entity.history)} items) successfully stored at {filename}') + except Exception as e: + self.logger.info(f'[{entity.name}] failed to store history to "{filename}": {e}') + raise