diff --git a/src/prefect/deployments/flow_runs.py b/src/prefect/deployments/flow_runs.py index 1927933ea040..4f5ccd513a33 100644 --- a/src/prefect/deployments/flow_runs.py +++ b/src/prefect/deployments/flow_runs.py @@ -3,7 +3,6 @@ from uuid import UUID import anyio -import pendulum import prefect from prefect._result_records import ResultRecordMetadata @@ -16,6 +15,7 @@ from prefect.telemetry.run_telemetry import ( LABELS_TRACEPARENT_KEY, ) +from prefect.types._datetime import now from prefect.utilities.asyncutils import sync_compatible from prefect.utilities.slugify import slugify @@ -96,7 +96,7 @@ async def run_deployment( raise ValueError("`timeout` cannot be negative") if scheduled_time is None: - scheduled_time = pendulum.now("UTC") + scheduled_time = now("UTC") parameters = parameters or {} diff --git a/src/prefect/docker/docker_image.py b/src/prefect/docker/docker_image.py index a2de9980d52a..add3981cc590 100644 --- a/src/prefect/docker/docker_image.py +++ b/src/prefect/docker/docker_image.py @@ -1,11 +1,10 @@ from pathlib import Path from typing import Any, Optional -from pendulum import now as pendulum_now - from prefect.settings import ( PREFECT_DEFAULT_DOCKER_BUILD_NAMESPACE, ) +from prefect.types._datetime import now from prefect.utilities.dockerutils import ( PushError, build_image, @@ -54,7 +53,7 @@ def __init__( # join the namespace and repository to create the full image name # ignore namespace if it is None self.name: str = "/".join(filter(None, [namespace, repository])) - self.tag: str = tag or image_tag or slugify(pendulum_now("utc").isoformat()) + self.tag: str = tag or image_tag or slugify(now("UTC").isoformat()) self.dockerfile: str = dockerfile self.build_kwargs: dict[str, Any] = build_kwargs diff --git a/src/prefect/events/clients.py b/src/prefect/events/clients.py index 31277214c599..cfcb4be615c2 100644 --- a/src/prefect/events/clients.py +++ b/src/prefect/events/clients.py @@ -2,6 +2,7 @@ import asyncio import os import ssl +from datetime import timedelta from types import TracebackType from typing import ( TYPE_CHECKING, @@ -21,7 +22,6 @@ from uuid import UUID import orjson -import pendulum from cachetools import TTLCache from prometheus_client import Counter from python_socks.async_.asyncio import Proxy @@ -44,6 +44,7 @@ PREFECT_DEBUG_MODE, PREFECT_SERVER_ALLOW_EPHEMERAL_MODE, ) +from prefect.types._datetime import add_years, now if TYPE_CHECKING: from prefect.events.filters import EventFilter @@ -653,8 +654,8 @@ async def _reconnect(self) -> None: from prefect.events.filters import EventOccurredFilter self._filter.occurred = EventOccurredFilter( - since=pendulum.now("UTC").subtract(minutes=1), - until=pendulum.now("UTC").add(years=1), + since=now("UTC") - timedelta(minutes=1), + until=add_years(now("UTC"), 1), ) logger.debug(" filtering events since %s...", self._filter.occurred.since) diff --git a/src/prefect/events/related.py b/src/prefect/events/related.py index ee36db860352..eb2d2f55ad65 100644 --- a/src/prefect/events/related.py +++ b/src/prefect/events/related.py @@ -14,9 +14,7 @@ ) from uuid import UUID -import pendulum - -from prefect.types import DateTime +from prefect.types._datetime import DateTime, now from .schemas.events import RelatedResource @@ -79,7 +77,7 @@ async def related_resources_from_run_context( related_objects: List[ResourceCacheEntry] = [] - async def dummy_read(): + async def dummy_read() -> ResourceCacheEntry: return {} if flow_run_context: @@ -207,7 +205,7 @@ async def _get_and_cache_related_object( "object": obj_, } - cache[cache_key] = (entry, pendulum.now("UTC")) + cache[cache_key] = (entry, now("UTC")) # In the case of a worker or agent this cache could be long-lived. To keep # from running out of memory only keep `MAX_CACHE_SIZE` entries in the diff --git a/src/prefect/locking/filesystem.py b/src/prefect/locking/filesystem.py index 239ca798c5b8..89db40a76870 100644 --- a/src/prefect/locking/filesystem.py +++ b/src/prefect/locking/filesystem.py @@ -1,15 +1,15 @@ import time +from datetime import timedelta from logging import Logger from pathlib import Path from typing import Optional import anyio -import pendulum import pydantic_core from typing_extensions import TypedDict from prefect.logging.loggers import get_logger -from prefect.types._datetime import DateTime, PendulumDuration +from prefect.types._datetime import DateTime, now, parse_datetime from .protocol import LockManager @@ -27,7 +27,7 @@ class _LockInfo(TypedDict): """ holder: str - expiration: Optional[pendulum.DateTime] + expiration: Optional[DateTime] path: Path @@ -64,7 +64,7 @@ def _get_lock_info(self, key: str, use_cache: bool = True) -> Optional[_LockInfo lock_info["path"] = lock_path expiration = lock_info.get("expiration") lock_info["expiration"] = ( - pendulum.parse(expiration) if expiration is not None else None + parse_datetime(expiration) if expiration is not None else None ) self._locks[key] = lock_info return lock_info @@ -86,7 +86,7 @@ async def _aget_lock_info( lock_info["path"] = lock_path expiration = lock_info.get("expiration") lock_info["expiration"] = ( - pendulum.parse(expiration) if expiration is not None else None + parse_datetime(expiration) if expiration is not None else None ) self._locks[key] = lock_info return lock_info @@ -117,7 +117,7 @@ def acquire_lock( ) return self.acquire_lock(key, holder, acquire_timeout, hold_timeout) expiration = ( - DateTime.now("utc") + PendulumDuration(seconds=hold_timeout) + now("UTC") + timedelta(seconds=hold_timeout) if hold_timeout is not None else None ) @@ -166,7 +166,7 @@ async def aacquire_lock( ) return self.acquire_lock(key, holder, acquire_timeout, hold_timeout) expiration = ( - DateTime.now("utc") + PendulumDuration(seconds=hold_timeout) + now("UTC") + timedelta(seconds=hold_timeout) if hold_timeout is not None else None ) @@ -208,7 +208,7 @@ def is_locked(self, key: str, use_cache: bool = False) -> bool: if (expiration := lock_info.get("expiration")) is None: return True - expired = expiration < DateTime.now("utc") + expired = expiration < now("UTC") if expired: Path(lock_info["path"]).unlink() self._locks.pop(key, None) diff --git a/src/prefect/logging/handlers.py b/src/prefect/logging/handlers.py index 4b76bef55688..469cb02fbdeb 100644 --- a/src/prefect/logging/handlers.py +++ b/src/prefect/logging/handlers.py @@ -8,9 +8,8 @@ import uuid import warnings from contextlib import asynccontextmanager -from typing import TYPE_CHECKING, Any, Dict, List, TextIO, Type +from typing import TYPE_CHECKING, Any, Dict, TextIO, Type -import pendulum from rich.console import Console from rich.highlighter import Highlighter, NullHighlighter from rich.theme import Theme @@ -35,6 +34,7 @@ PREFECT_LOGGING_TO_API_MAX_LOG_SIZE, PREFECT_LOGGING_TO_API_WHEN_MISSING_FLOW, ) +from prefect.types._datetime import from_timestamp if sys.version_info >= (3, 12): StreamHandler = logging.StreamHandler[TextIO] @@ -47,7 +47,7 @@ class APILogWorker(BatchedQueueService[Dict[str, Any]]): @property - def _max_batch_size(self): + def _max_batch_size(self) -> int: return max( PREFECT_LOGGING_TO_API_BATCH_SIZE.value() - PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value(), @@ -55,10 +55,10 @@ def _max_batch_size(self): ) @property - def _min_interval(self): + def _min_interval(self) -> float | None: return PREFECT_LOGGING_TO_API_BATCH_INTERVAL.value() - async def _handle_batch(self, items: List): + async def _handle_batch(self, items: list[dict[str, Any]]): try: await self._client.create_logs(items) except Exception as e: @@ -229,9 +229,7 @@ def prepare(self, record: logging.LogRecord) -> Dict[str, Any]: worker_id=worker_id, name=record.name, level=record.levelno, - timestamp=pendulum.from_timestamp( - getattr(record, "created", None) or time.time() - ), + timestamp=from_timestamp(getattr(record, "created", None) or time.time()), message=self.format(record), ).model_dump(mode="json") @@ -272,9 +270,7 @@ def prepare(self, record: logging.LogRecord) -> Dict[str, Any]: worker_id=worker_id, name=record.name, level=record.levelno, - timestamp=pendulum.from_timestamp( - getattr(record, "created", None) or time.time() - ), + timestamp=from_timestamp(getattr(record, "created", None) or time.time()), message=self.format(record), ).model_dump(mode="json") diff --git a/src/prefect/runtime/flow_run.py b/src/prefect/runtime/flow_run.py index 0c4709e7aa9c..fd0704f1b651 100644 --- a/src/prefect/runtime/flow_run.py +++ b/src/prefect/runtime/flow_run.py @@ -23,14 +23,14 @@ from __future__ import annotations import os +from datetime import datetime from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional -import pendulum - from prefect._internal.concurrency.api import create_call, from_sync from prefect.client.orchestration import get_client from prefect.context import FlowRunContext, TaskRunContext from prefect.settings import PREFECT_API_URL, PREFECT_UI_URL +from prefect.types._datetime import DateTime, Timezone, now, parse_datetime if TYPE_CHECKING: from prefect.client.schemas.objects import Flow, FlowRun, TaskRun @@ -53,28 +53,21 @@ ] -def _pendulum_parse(dt: str) -> pendulum.DateTime: - """ - Use pendulum to cast different format date strings to pendulum.DateTime -- - tzinfo is ignored (UTC forced) - """ - return pendulum.parse(dt, tz=None, strict=False).set(tz="UTC") +def _parse_datetime_UTC(dt: str) -> DateTime: + pendulum_dt = parse_datetime(dt, tz=Timezone("UTC"), strict=False) + assert isinstance(pendulum_dt, datetime) + return DateTime.instance(pendulum_dt) type_cast: dict[ - type[bool] - | type[int] - | type[float] - | type[str] - | type[None] - | type[pendulum.DateTime], + type[bool] | type[int] | type[float] | type[str] | type[None] | type[DateTime], Callable[[Any], Any], ] = { bool: lambda x: x.lower() == "true", int: int, float: float, str: str, - pendulum.DateTime: _pendulum_parse, + DateTime: _parse_datetime_UTC, # for optional defined attributes, when real value is NoneType, use str type(None): str, } @@ -221,11 +214,11 @@ def get_flow_version() -> Optional[str]: return flow_run_ctx.flow.version -def get_scheduled_start_time() -> pendulum.DateTime: +def get_scheduled_start_time() -> DateTime: flow_run_ctx = FlowRunContext.get() run_id = get_id() if flow_run_ctx is None and run_id is None: - return pendulum.now("utc") + return now("UTC") elif flow_run_ctx is None: flow_run = from_sync.call_soon_in_loop_thread( create_call(_get_flow_run, run_id) diff --git a/src/prefect/types/_datetime.py b/src/prefect/types/_datetime.py index 24c23221f928..2969c61d61c7 100644 --- a/src/prefect/types/_datetime.py +++ b/src/prefect/types/_datetime.py @@ -10,14 +10,21 @@ from pendulum.duration import Duration as PendulumDuration from pendulum.time import Time as PendulumTime from pendulum.tz.timezone import FixedTimezone, Timezone -from pydantic_extra_types.pendulum_dt import Date as PydanticDate -from pydantic_extra_types.pendulum_dt import DateTime as PydanticDateTime -from pydantic_extra_types.pendulum_dt import Duration as PydanticDuration +from pydantic_extra_types.pendulum_dt import ( + Date as PydanticDate, +) +from pydantic_extra_types.pendulum_dt import ( + DateTime as PydanticDateTime, +) +from pydantic_extra_types.pendulum_dt import ( + Duration as PydanticDuration, +) from typing_extensions import TypeAlias DateTime: TypeAlias = PydanticDateTime Date: TypeAlias = PydanticDate Duration: TypeAlias = PydanticDuration +UTC: pendulum.tz.Timezone = pendulum.tz.UTC def parse_datetime( @@ -51,12 +58,16 @@ def create_datetime_instance(v: datetime.datetime) -> DateTime: def from_format( value: str, fmt: str, - tz: str | Timezone = pendulum.tz.UTC, + tz: str | Timezone = UTC, locale: str | None = None, ) -> DateTime: return DateTime.instance(pendulum.from_format(value, fmt, tz, locale)) +def from_timestamp(ts: float, tz: str | pendulum.tz.Timezone = UTC) -> DateTime: + return DateTime.instance(pendulum.from_timestamp(ts, tz)) + + def human_friendly_diff(dt: DateTime | datetime.datetime) -> str: if isinstance(dt, DateTime): return dt.diff_for_humans() @@ -64,5 +75,9 @@ def human_friendly_diff(dt: DateTime | datetime.datetime) -> str: return DateTime.instance(dt).diff_for_humans() -def now(tz: str | Timezone = pendulum.tz.UTC) -> DateTime: +def now(tz: str | Timezone = UTC) -> DateTime: return DateTime.now(tz) + + +def add_years(dt: DateTime, years: int) -> DateTime: + return dt.add(years=years) diff --git a/src/prefect/utilities/dockerutils.py b/src/prefect/utilities/dockerutils.py index 57e6ae06f326..5f6bd798c71c 100644 --- a/src/prefect/utilities/dockerutils.py +++ b/src/prefect/utilities/dockerutils.py @@ -428,7 +428,7 @@ def push_image( """ if not tag: - tag = slugify(now("utc").isoformat()) + tag = slugify(now("UTC").isoformat()) _, registry, _, _, _ = urlsplit(registry_url) repository = f"{registry}/{name}" diff --git a/tests/runtime/test_flow_run.py b/tests/runtime/test_flow_run.py index 0f91a486f2af..6d862eaabb1b 100644 --- a/tests/runtime/test_flow_run.py +++ b/tests/runtime/test_flow_run.py @@ -2,7 +2,6 @@ import datetime from typing import Any -import pendulum import pytest from prefect import flow, states, tags, task @@ -12,6 +11,7 @@ from prefect.flows import Flow from prefect.runtime import flow_run from prefect.settings import PREFECT_API_URL, PREFECT_UI_URL +from prefect.types._datetime import DateTime, Timezone, now class TestAttributeAccessPatterns: @@ -41,9 +41,9 @@ async def test_new_attribute_via_env_var(self, monkeypatch: pytest.MonkeyPatch): ("str_attribute", "foo", "bar", "bar"), ( "datetime_attribute", - pendulum.DateTime(2022, 1, 1, 0, tzinfo=pendulum.UTC), + DateTime(2022, 1, 1, 0, tzinfo=Timezone("UTC")), "2023-05-13 20:00:00", - pendulum.DateTime(2023, 5, 13, 20, tzinfo=pendulum.UTC), + DateTime(2023, 5, 13, 20, tzinfo=Timezone("UTC")), ), ], ) @@ -205,7 +205,7 @@ async def test_scheduled_start_time_is_timestamp_when_not_set(self): async def test_scheduled_start_time_pulls_from_api_when_needed( self, monkeypatch: pytest.MonkeyPatch, prefect_client: PrefectClient ): - TIMESTAMP = pendulum.now("utc").add(days=7) + TIMESTAMP = now("utc").add(days=7) run = await prefect_client.create_flow_run( flow=flow(lambda: None, name="test"), state=states.Scheduled(scheduled_time=TIMESTAMP),