Skip to content

Commit

Permalink
In filter.deduplicate store history on disk on restart
Browse files Browse the repository at this point in the history
  • Loading branch information
user committed Oct 29, 2024
1 parent c44192c commit 37afe19
Showing 1 changed file with 51 additions and 2 deletions.
53 changes: 51 additions & 2 deletions avtdl/plugins/filters/filters.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import asyncio
import datetime
import json
from collections import OrderedDict
from pathlib import Path
from typing import List, Optional, Sequence

import dateutil.tz
from pydantic import Field, field_serializer, field_validator

from avtdl.core import utils
from avtdl.core.config import Plugins
from avtdl.core.interfaces import ActorConfig, Event, Filter, FilterEntity, Record, TextRecord
from avtdl.core.utils import Fmt, find_matching_field
Expand All @@ -21,7 +24,6 @@
@Plugins.register('filter.json', Plugins.kind.ACTOR_CONFIG)
@Plugins.register('filter.format', Plugins.kind.ACTOR_CONFIG)
@Plugins.register('filter.format.event', Plugins.kind.ACTOR_CONFIG)
@Plugins.register('filter.deduplicate', Plugins.kind.ACTOR_CONFIG)
class EmptyFilterConfig(ActorConfig):
pass

Expand Down Expand Up @@ -346,6 +348,23 @@ def match(self, entity: FormatEventFilterEntity, record: Record) -> Event:
return Event(event_type=event_type, text=text, record=record)


@Plugins.register('filter.deduplicate', Plugins.kind.ACTOR_CONFIG)
class DeduplicateFilterConfig(ActorConfig):
history_dir: Optional[Path] = Field(default='cache/deduplicate/', validate_default=True)
"""directory to store entities history between restarts"""

@field_validator('history_dir')
@classmethod
def check_dir(cls, path: Optional[Path]):
if path is None:
return path
ok = utils.check_dir(path, create=True)
if ok:
return path
else:
raise ValueError(f'check path "{path}" exists and is a writeable directory')


@Plugins.register('filter.deduplicate', Plugins.kind.ACTOR_ENTITY)
class DeduplicateFilterEntity(FilterEntity):
field: str = 'hash'
Expand Down Expand Up @@ -378,7 +397,7 @@ class DeduplicateFilter(Filter):
restarts.
"""

def __init__(self, config: EmptyFilterConfig, entities: Sequence[DeduplicateFilterEntity]):
def __init__(self, config: DeduplicateFilterConfig, entities: Sequence[DeduplicateFilterEntity]):
super().__init__(config, entities)

def match(self, entity: DeduplicateFilterEntity, record: Record) -> Optional[Record]:
Expand Down Expand Up @@ -408,3 +427,33 @@ def match(self, entity: DeduplicateFilterEntity, record: Record) -> Optional[Rec
entity.history.move_to_end(value)
self.logger.debug(f'[{entity.name}] record with {entity.field}={value} has not yet been seen, letting through')
return record

async def run(self) -> None:
if self.conf.history_dir is None:
return

for entity in self.entities.values():
filename = self.conf.history_dir / Path(f'{self.conf.name}-{entity.name}-history.json')
try:
if filename.exists() and filename.is_file():
with open(filename, 'rt', encoding='utf8') as fp:
history = json.load(fp)
entity.history.update(history)
self.logger.info(f'[{entity.name}] history ({len(entity.history)} items) successfully loaded from {filename}')
except Exception as e:
self.logger.info(f'[{entity.name}] failed to load history from "{filename}": {e}')

try:
await asyncio.Future()
except (asyncio.CancelledError, KeyboardInterrupt):
for entity in self.entities.values():
if not entity.history:
continue
filename = self.conf.history_dir / Path(f'{self.conf.name}-{entity.name}-history.json')
try:
with open(filename, 'wt', encoding='utf8') as fp:
json.dump(entity.history, fp, ensure_ascii=False, indent=4)
self.logger.info(f'[{entity.name}] history ({len(entity.history)} items) successfully stored at {filename}')
except Exception as e:
self.logger.info(f'[{entity.name}] failed to store history to "{filename}": {e}')
raise

0 comments on commit 37afe19

Please sign in to comment.