Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pendulum consolidation in several modules #17103

Merged
merged 4 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/prefect/deployments/flow_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from uuid import UUID

import anyio
import pendulum

import prefect
from prefect._result_records import ResultRecordMetadata
Expand All @@ -16,6 +15,7 @@
from prefect.telemetry.run_telemetry import (
LABELS_TRACEPARENT_KEY,
)
from prefect.types._datetime import now
from prefect.utilities.asyncutils import sync_compatible
from prefect.utilities.slugify import slugify

Expand Down Expand Up @@ -96,7 +96,7 @@ async def run_deployment(
raise ValueError("`timeout` cannot be negative")

if scheduled_time is None:
scheduled_time = pendulum.now("UTC")
scheduled_time = now("UTC")

parameters = parameters or {}

Expand Down
5 changes: 2 additions & 3 deletions src/prefect/docker/docker_image.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from pathlib import Path
from typing import Any, Optional

from pendulum import now as pendulum_now

from prefect.settings import (
PREFECT_DEFAULT_DOCKER_BUILD_NAMESPACE,
)
from prefect.types._datetime import now
from prefect.utilities.dockerutils import (
PushError,
build_image,
Expand Down Expand Up @@ -54,7 +53,7 @@ def __init__(
# join the namespace and repository to create the full image name
# ignore namespace if it is None
self.name: str = "/".join(filter(None, [namespace, repository]))
self.tag: str = tag or image_tag or slugify(pendulum_now("utc").isoformat())
self.tag: str = tag or image_tag or slugify(now("UTC").isoformat())
self.dockerfile: str = dockerfile
self.build_kwargs: dict[str, Any] = build_kwargs

Expand Down
7 changes: 4 additions & 3 deletions src/prefect/events/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import asyncio
import os
import ssl
from datetime import timedelta
from types import TracebackType
from typing import (
TYPE_CHECKING,
Expand All @@ -21,7 +22,6 @@
from uuid import UUID

import orjson
import pendulum
from cachetools import TTLCache
from prometheus_client import Counter
from python_socks.async_.asyncio import Proxy
Expand All @@ -44,6 +44,7 @@
PREFECT_DEBUG_MODE,
PREFECT_SERVER_ALLOW_EPHEMERAL_MODE,
)
from prefect.types._datetime import add_years, now

if TYPE_CHECKING:
from prefect.events.filters import EventFilter
Expand Down Expand Up @@ -653,8 +654,8 @@ async def _reconnect(self) -> None:
from prefect.events.filters import EventOccurredFilter

self._filter.occurred = EventOccurredFilter(
since=pendulum.now("UTC").subtract(minutes=1),
until=pendulum.now("UTC").add(years=1),
since=now("UTC") - timedelta(minutes=1),
until=add_years(now("UTC"), 1),
)

logger.debug(" filtering events since %s...", self._filter.occurred.since)
Expand Down
8 changes: 3 additions & 5 deletions src/prefect/events/related.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
)
from uuid import UUID

import pendulum

from prefect.types import DateTime
from prefect.types._datetime import DateTime, now

from .schemas.events import RelatedResource

Expand Down Expand Up @@ -79,7 +77,7 @@ async def related_resources_from_run_context(

related_objects: List[ResourceCacheEntry] = []

async def dummy_read():
async def dummy_read() -> ResourceCacheEntry:
return {}

if flow_run_context:
Expand Down Expand Up @@ -207,7 +205,7 @@ async def _get_and_cache_related_object(
"object": obj_,
}

cache[cache_key] = (entry, pendulum.now("UTC"))
cache[cache_key] = (entry, now("UTC"))

# In the case of a worker or agent this cache could be long-lived. To keep
# from running out of memory only keep `MAX_CACHE_SIZE` entries in the
Expand Down
16 changes: 8 additions & 8 deletions src/prefect/locking/filesystem.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import time
from datetime import timedelta
from logging import Logger
from pathlib import Path
from typing import Optional

import anyio
import pendulum
import pydantic_core
from typing_extensions import TypedDict

from prefect.logging.loggers import get_logger
from prefect.types._datetime import DateTime, PendulumDuration
from prefect.types._datetime import DateTime, now, parse_datetime

from .protocol import LockManager

Expand All @@ -27,7 +27,7 @@ class _LockInfo(TypedDict):
"""

holder: str
expiration: Optional[pendulum.DateTime]
expiration: Optional[DateTime]
path: Path


Expand Down Expand Up @@ -64,7 +64,7 @@ def _get_lock_info(self, key: str, use_cache: bool = True) -> Optional[_LockInfo
lock_info["path"] = lock_path
expiration = lock_info.get("expiration")
lock_info["expiration"] = (
pendulum.parse(expiration) if expiration is not None else None
parse_datetime(expiration) if expiration is not None else None
)
self._locks[key] = lock_info
return lock_info
Expand All @@ -86,7 +86,7 @@ async def _aget_lock_info(
lock_info["path"] = lock_path
expiration = lock_info.get("expiration")
lock_info["expiration"] = (
pendulum.parse(expiration) if expiration is not None else None
parse_datetime(expiration) if expiration is not None else None
)
self._locks[key] = lock_info
return lock_info
Expand Down Expand Up @@ -117,7 +117,7 @@ def acquire_lock(
)
return self.acquire_lock(key, holder, acquire_timeout, hold_timeout)
expiration = (
DateTime.now("utc") + PendulumDuration(seconds=hold_timeout)
now("UTC") + timedelta(seconds=hold_timeout)
if hold_timeout is not None
else None
)
Expand Down Expand Up @@ -166,7 +166,7 @@ async def aacquire_lock(
)
return self.acquire_lock(key, holder, acquire_timeout, hold_timeout)
expiration = (
DateTime.now("utc") + PendulumDuration(seconds=hold_timeout)
now("UTC") + timedelta(seconds=hold_timeout)
if hold_timeout is not None
else None
)
Expand Down Expand Up @@ -208,7 +208,7 @@ def is_locked(self, key: str, use_cache: bool = False) -> bool:
if (expiration := lock_info.get("expiration")) is None:
return True

expired = expiration < DateTime.now("utc")
expired = expiration < now("UTC")
if expired:
Path(lock_info["path"]).unlink()
self._locks.pop(key, None)
Expand Down
18 changes: 7 additions & 11 deletions src/prefect/logging/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@
import uuid
import warnings
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING, Any, Dict, List, TextIO, Type
from typing import TYPE_CHECKING, Any, Dict, TextIO, Type

import pendulum
from rich.console import Console
from rich.highlighter import Highlighter, NullHighlighter
from rich.theme import Theme
Expand All @@ -35,6 +34,7 @@
PREFECT_LOGGING_TO_API_MAX_LOG_SIZE,
PREFECT_LOGGING_TO_API_WHEN_MISSING_FLOW,
)
from prefect.types._datetime import from_timestamp

if sys.version_info >= (3, 12):
StreamHandler = logging.StreamHandler[TextIO]
Expand All @@ -47,18 +47,18 @@

class APILogWorker(BatchedQueueService[Dict[str, Any]]):
@property
def _max_batch_size(self):
def _max_batch_size(self) -> int:
return max(
PREFECT_LOGGING_TO_API_BATCH_SIZE.value()
- PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value(),
PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value(),
)

@property
def _min_interval(self):
def _min_interval(self) -> float | None:
return PREFECT_LOGGING_TO_API_BATCH_INTERVAL.value()

async def _handle_batch(self, items: List):
async def _handle_batch(self, items: list[dict[str, Any]]):
try:
await self._client.create_logs(items)
except Exception as e:
Expand Down Expand Up @@ -229,9 +229,7 @@ def prepare(self, record: logging.LogRecord) -> Dict[str, Any]:
worker_id=worker_id,
name=record.name,
level=record.levelno,
timestamp=pendulum.from_timestamp(
getattr(record, "created", None) or time.time()
),
timestamp=from_timestamp(getattr(record, "created", None) or time.time()),
message=self.format(record),
).model_dump(mode="json")

Expand Down Expand Up @@ -272,9 +270,7 @@ def prepare(self, record: logging.LogRecord) -> Dict[str, Any]:
worker_id=worker_id,
name=record.name,
level=record.levelno,
timestamp=pendulum.from_timestamp(
getattr(record, "created", None) or time.time()
),
timestamp=from_timestamp(getattr(record, "created", None) or time.time()),
message=self.format(record),
).model_dump(mode="json")

Expand Down
27 changes: 10 additions & 17 deletions src/prefect/runtime/flow_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@
from __future__ import annotations

import os
from datetime import datetime
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional

import pendulum

from prefect._internal.concurrency.api import create_call, from_sync
from prefect.client.orchestration import get_client
from prefect.context import FlowRunContext, TaskRunContext
from prefect.settings import PREFECT_API_URL, PREFECT_UI_URL
from prefect.types._datetime import DateTime, Timezone, now, parse_datetime

if TYPE_CHECKING:
from prefect.client.schemas.objects import Flow, FlowRun, TaskRun
Expand All @@ -53,28 +53,21 @@
]


def _pendulum_parse(dt: str) -> pendulum.DateTime:
"""
Use pendulum to cast different format date strings to pendulum.DateTime --
tzinfo is ignored (UTC forced)
"""
return pendulum.parse(dt, tz=None, strict=False).set(tz="UTC")
def _parse_datetime_UTC(dt: str) -> DateTime:
pendulum_dt = parse_datetime(dt, tz=Timezone("UTC"), strict=False)
assert isinstance(pendulum_dt, datetime)
return DateTime.instance(pendulum_dt)


type_cast: dict[
type[bool]
| type[int]
| type[float]
| type[str]
| type[None]
| type[pendulum.DateTime],
type[bool] | type[int] | type[float] | type[str] | type[None] | type[DateTime],
Callable[[Any], Any],
] = {
bool: lambda x: x.lower() == "true",
int: int,
float: float,
str: str,
pendulum.DateTime: _pendulum_parse,
DateTime: _parse_datetime_UTC,
# for optional defined attributes, when real value is NoneType, use str
type(None): str,
}
Expand Down Expand Up @@ -221,11 +214,11 @@ def get_flow_version() -> Optional[str]:
return flow_run_ctx.flow.version


def get_scheduled_start_time() -> pendulum.DateTime:
def get_scheduled_start_time() -> DateTime:
flow_run_ctx = FlowRunContext.get()
run_id = get_id()
if flow_run_ctx is None and run_id is None:
return pendulum.now("utc")
return now("UTC")
elif flow_run_ctx is None:
flow_run = from_sync.call_soon_in_loop_thread(
create_call(_get_flow_run, run_id)
Expand Down
25 changes: 20 additions & 5 deletions src/prefect/types/_datetime.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,21 @@
from pendulum.duration import Duration as PendulumDuration
from pendulum.time import Time as PendulumTime
from pendulum.tz.timezone import FixedTimezone, Timezone
from pydantic_extra_types.pendulum_dt import Date as PydanticDate
from pydantic_extra_types.pendulum_dt import DateTime as PydanticDateTime
from pydantic_extra_types.pendulum_dt import Duration as PydanticDuration
from pydantic_extra_types.pendulum_dt import (
Date as PydanticDate,
)
from pydantic_extra_types.pendulum_dt import (
DateTime as PydanticDateTime,
)
from pydantic_extra_types.pendulum_dt import (
Duration as PydanticDuration,
)
from typing_extensions import TypeAlias

DateTime: TypeAlias = PydanticDateTime
Date: TypeAlias = PydanticDate
Duration: TypeAlias = PydanticDuration
UTC: pendulum.tz.Timezone = pendulum.tz.UTC


def parse_datetime(
Expand Down Expand Up @@ -51,18 +58,26 @@ def create_datetime_instance(v: datetime.datetime) -> DateTime:
def from_format(
value: str,
fmt: str,
tz: str | Timezone = pendulum.tz.UTC,
tz: str | Timezone = UTC,
locale: str | None = None,
) -> DateTime:
return DateTime.instance(pendulum.from_format(value, fmt, tz, locale))


def from_timestamp(ts: float, tz: str | pendulum.tz.Timezone = UTC) -> DateTime:
return DateTime.instance(pendulum.from_timestamp(ts, tz))


def human_friendly_diff(dt: DateTime | datetime.datetime) -> str:
if isinstance(dt, DateTime):
return dt.diff_for_humans()
else:
return DateTime.instance(dt).diff_for_humans()


def now(tz: str | Timezone = pendulum.tz.UTC) -> DateTime:
def now(tz: str | Timezone = UTC) -> DateTime:
return DateTime.now(tz)


def add_years(dt: DateTime, years: int) -> DateTime:
return dt.add(years=years)
2 changes: 1 addition & 1 deletion src/prefect/utilities/dockerutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ def push_image(
"""

if not tag:
tag = slugify(now("utc").isoformat())
tag = slugify(now("UTC").isoformat())

_, registry, _, _, _ = urlsplit(registry_url)
repository = f"{registry}/{name}"
Expand Down
Loading