Skip to content

Commit

Permalink
pendulum consolidation in prefect.concurrency (#17102)
Browse files Browse the repository at this point in the history
  • Loading branch information
zzstoatzz authored Feb 11, 2025
1 parent 949e865 commit bf451fb
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 12 deletions.
7 changes: 4 additions & 3 deletions src/prefect/concurrency/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from typing import Optional, Union

import anyio
import pendulum

from prefect.types._datetime import now

from ._asyncio import (
AcquireConcurrencySlotTimeoutError as AcquireConcurrencySlotTimeoutError,
Expand Down Expand Up @@ -69,13 +70,13 @@ async def main():
max_retries=max_retries,
strict=strict,
)
acquisition_time = pendulum.now("UTC")
acquisition_time = now("UTC")
emitted_events = emit_concurrency_acquisition_events(limits, occupy)

try:
yield
finally:
occupancy_period = pendulum.now("UTC") - acquisition_time
occupancy_period = now("UTC") - acquisition_time
try:
await arelease_concurrency_slots(
names, occupy, occupancy_period.total_seconds()
Expand Down
6 changes: 3 additions & 3 deletions src/prefect/concurrency/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
from contextlib import contextmanager
from typing import Optional, TypeVar, Union

import pendulum
from typing_extensions import Literal

from prefect.client.schemas.responses import MinimalConcurrencyLimitResponse
from prefect.types._datetime import now
from prefect.utilities.asyncutils import run_coro_as_sync

from ._asyncio import (
Expand Down Expand Up @@ -98,13 +98,13 @@ def main():
strict=strict,
max_retries=max_retries,
)
acquisition_time = pendulum.now("UTC")
acquisition_time = now("UTC")
emitted_events = emit_concurrency_acquisition_events(limits, occupy)

try:
yield
finally:
occupancy_period = pendulum.now("UTC") - acquisition_time
occupancy_period = now("UTC") - acquisition_time
_release_concurrency_slots(names, occupy, occupancy_period.total_seconds())
emit_concurrency_release_events(limits, occupy, emitted_events)

Expand Down
6 changes: 3 additions & 3 deletions src/prefect/concurrency/v1/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from uuid import UUID

import anyio
import pendulum

from prefect.concurrency.v1._asyncio import (
acquire_concurrency_slots,
Expand All @@ -15,6 +14,7 @@
emit_concurrency_release_events,
)
from prefect.concurrency.v1.context import ConcurrencyContext
from prefect.types._datetime import now

from ._asyncio import (
AcquireConcurrencySlotTimeoutError as AcquireConcurrencySlotTimeoutError,
Expand Down Expand Up @@ -67,13 +67,13 @@ async def main():
if TYPE_CHECKING:
assert not isinstance(acquire_slots, list)
limits = await acquire_slots
acquisition_time = pendulum.now("UTC")
acquisition_time = now("UTC")
emitted_events = emit_concurrency_acquisition_events(limits, task_run_id)

try:
yield
finally:
occupancy_period = pendulum.now("UTC") - acquisition_time
occupancy_period = now("UTC") - acquisition_time
try:
release_slots = release_concurrency_slots(
names_normalized, task_run_id, occupancy_period.total_seconds()
Expand Down
6 changes: 3 additions & 3 deletions src/prefect/concurrency/v1/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import Optional, TypeVar, Union
from uuid import UUID

import pendulum
from prefect.types._datetime import now

from ._asyncio import acquire_concurrency_slots, release_concurrency_slots
from ._events import (
Expand Down Expand Up @@ -59,13 +59,13 @@ def main():
)
assert not asyncio.iscoroutine(result)
limits = result
acquisition_time = pendulum.now("UTC")
acquisition_time = now("UTC")
emitted_events = emit_concurrency_acquisition_events(limits, task_run_id)

try:
yield
finally:
occupancy_period = pendulum.now("UTC") - acquisition_time
occupancy_period = now("UTC") - acquisition_time
release_concurrency_slots(
names, task_run_id, occupancy_period.total_seconds(), **force
)
Expand Down

0 comments on commit bf451fb

Please sign in to comment.