forked from jbsparrow/CyberDropDownloader
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Create additional custom type to simplify model validators - Split customs into `types`, `validators` and `converters`
- Loading branch information
1 parent
722b42a
commit 1c5b19b
Showing
8 changed files
with
294 additions
and
280 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
""" | ||
Functions to use with `AfterValidator`, `field_validator(mode="after")` or `model_validator(mode="after")` | ||
""" | ||
|
||
import re | ||
from datetime import timedelta | ||
from pathlib import Path | ||
|
||
from pydantic import AnyUrl, ByteSize, TypeAdapter | ||
from yarl import URL | ||
|
||
# Matches "<number> <unit>" pairs (e.g. "2 days", "1 hour"); the two capture
# groups are the count and the unit word. Case-insensitive via re.IGNORECASE.
DATE_PATTERN_REGEX = r"(\d+)\s*(second|seconds|minute|minutes|hour|hours|day|days|week|weeks|month|months|year|years)"
DATE_PATTERN = re.compile(DATE_PATTERN_REGEX, re.IGNORECASE)

# Shared adapter so ByteSize validation does not rebuild a TypeAdapter per call.
byte_size_adapter = TypeAdapter(ByteSize)
|
||
|
||
def convert_byte_size_to_str(value: ByteSize) -> str:
    """Format *value* as a human readable size string using decimal (SI) units."""
    if isinstance(value, ByteSize):
        size = value
    else:
        # Defensive coercion for callers that pass a plain int.
        size = ByteSize(value)
    return size.human_readable(decimal=True)
|
||
|
||
def convert_to_yarl(value: AnyUrl) -> URL:
    """Convert a pydantic ``AnyUrl`` into a ``yarl.URL`` via its string form."""
    raw = str(value)
    return URL(raw)
|
||
|
||
def change_path_suffix(value: Path, suffix: str) -> Path:
    """Return *value* with its final extension swapped for *suffix* (e.g. '.log')."""
    updated = value.with_suffix(suffix)
    return updated
|
||
|
||
def convert_to_byte_size(value: ByteSize | str | int) -> ByteSize:
    # Delegate to the shared module-level TypeAdapter so strings like "10MB",
    # plain ints (bytes), and existing ByteSize instances are all validated
    # through one code path. Raises pydantic's ValidationError on bad input.
    return byte_size_adapter.validate_python(value)
|
||
|
||
def convert_str_to_timedelta(input_date: str) -> timedelta:
    """Parse a human readable duration (e.g. ``"1 day 2 hours"``) into a timedelta.

    Months and years are approximated as 30 and 365 days respectively.

    Raises:
        ValueError: if no recognizable "<number> <unit>" pair is found, or the
            same unit (singular or plural) appears more than once.
    """
    # findall with two capture groups returns (value, unit) tuples, not strings.
    matches: list[tuple[str, str]] = DATE_PATTERN.findall(input_date.casefold())
    if not matches:
        msg = f"Unable to convert '{input_date}' to timedelta object"
        raise ValueError(msg)

    seen_units: set[str] = set()
    time_dict: dict[str, int] = {"days": 0}
    for value_str, unit in matches:
        value = int(value_str)
        # Normalize plural -> singular; the regex only yields ASCII unit words,
        # so casefold() above already lower-cased them.
        singular = unit.rstrip("s")
        if singular in seen_units:
            msg = f"Duplicate time unit detected: '{unit}' conflicts with another entry"
            raise ValueError(msg)
        seen_units.add(singular)

        if singular == "day":
            time_dict["days"] += value
        elif singular == "month":
            time_dict["days"] += value * 30  # approximation: 30-day month
        elif singular == "year":
            time_dict["days"] += value * 365  # approximation: 365-day year
        else:
            # timedelta accepts plural keywords: seconds, minutes, hours, weeks.
            time_dict[singular + "s"] = value

    return timedelta(**time_dict)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
from __future__ import annotations | ||
|
||
from functools import partial | ||
from pathlib import Path | ||
from typing import TYPE_CHECKING, Annotated | ||
|
||
from pydantic import ( | ||
AfterValidator, | ||
AnyUrl, | ||
BaseModel, | ||
BeforeValidator, | ||
ByteSize, | ||
ConfigDict, | ||
HttpUrl, | ||
NonNegativeInt, | ||
PlainSerializer, | ||
Secret, | ||
SerializationInfo, | ||
StringConstraints, | ||
model_serializer, | ||
model_validator, | ||
) | ||
|
||
from .converters import change_path_suffix, convert_byte_size_to_str, convert_to_yarl | ||
from .validators import parse_apprise_url, parse_falsy_as_none, parse_list | ||
|
||
if TYPE_CHECKING: | ||
from yarl import URL | ||
|
||
# Serializes a ByteSize back to a human readable string (e.g. "1.5MB") on dump.
ByteSizeSerilized = Annotated[ByteSize, PlainSerializer(convert_byte_size_to_str, return_type=str)]
# Correctly spelled alias; the misspelled name above is kept for backward compatibility.
ByteSizeSerialized = ByteSizeSerilized
# Validates as an http(s) URL, then converts the result to a yarl.URL.
HttpURL = Annotated[HttpUrl, AfterValidator(convert_to_yarl)]
ListNonNegativeInt = Annotated[list[NonNegativeInt], BeforeValidator(parse_list)]

NonEmptyStr = Annotated[str, StringConstraints(min_length=1, strip_whitespace=True)]
# Falsy inputs ("", 0, None, ...) are normalized to None before validation.
NonEmptyStrOrNone = Annotated[NonEmptyStr | None, BeforeValidator(parse_falsy_as_none)]
ListNonEmptyStr = Annotated[list[NonEmptyStr], BeforeValidator(parse_list)]

PathOrNone = Annotated[Path | None, BeforeValidator(parse_falsy_as_none)]
LogPath = Annotated[Path, AfterValidator(partial(change_path_suffix, suffix=".csv"))]
# AfterValidators run in order: the ".csv" suffix applied by LogPath is then
# replaced by ".log", so MainLogPath always ends up with a ".log" suffix.
MainLogPath = Annotated[LogPath, AfterValidator(partial(change_path_suffix, suffix=".log"))]
|
||
|
||
class AliasModel(BaseModel):
    """Base model whose fields may be populated by field name as well as by alias."""

    model_config = ConfigDict(populate_by_name=True)
|
||
|
||
class FrozenModel(BaseModel):
    """Base model whose instances are immutable (and therefore hashable)."""

    model_config = ConfigDict(frozen=True)
|
||
|
||
class AppriseURLModel(FrozenModel):
    """An apprise notification URL plus its tags, serialized as ``tag1,tag2=<url>``."""

    url: Secret[AnyUrl]
    tags: set[str]

    @model_serializer()
    def serialize(self, info: SerializationInfo):
        """Dump as ``<sorted,tags>=<url>``; the secret URL is only revealed for non-JSON dumps."""
        dump_secret = info.mode != "json"
        url = self.url.get_secret_value() if dump_secret else self.url
        # BUG FIX: the original `self.tags - set("no_logs")` subtracted the set of
        # *characters* {'n','o','_','l','g','s'}; use a set literal so only the
        # literal "no_logs" tag is excluded from the serialized form.
        tags = sorted(self.tags - {"no_logs"})
        return f"{','.join(tags)}{'=' if tags else ''}{url}"

    @model_validator(mode="before")
    @staticmethod
    def parse_input(value: URL | dict | str) -> dict:
        """Normalize a raw URL / string / dict into a field dict before validation."""
        return parse_apprise_url(value)
|
||
|
||
class HttpAppriseURL(AppriseURLModel):
    """AppriseURLModel whose URL must be http(s); stored as a ``yarl.URL`` after validation."""

    url: Secret[HttpURL]
Oops, something went wrong.