From d44dbcf36f5761db54da8ed56f154afa5ee11d5b Mon Sep 17 00:00:00 2001 From: Adam Azzam <33043305+AAAZZAM@users.noreply.github.com> Date: Wed, 1 Jan 2025 15:20:34 -0600 Subject: [PATCH 01/11] init --- src/prefect/client/orchestration/__init__.py | 228 +-------- .../_concurrency_limits/__init__.py | 0 .../_concurrency_limits/client.py | 454 ++++++++++++++++++ 3 files changed, 467 insertions(+), 215 deletions(-) create mode 100644 src/prefect/client/orchestration/_concurrency_limits/__init__.py create mode 100644 src/prefect/client/orchestration/_concurrency_limits/client.py diff --git a/src/prefect/client/orchestration/__init__.py b/src/prefect/client/orchestration/__init__.py index 484944de1185..b4c305e76b0f 100644 --- a/src/prefect/client/orchestration/__init__.py +++ b/src/prefect/client/orchestration/__init__.py @@ -26,6 +26,11 @@ ArtifactCollectionAsyncClient, ) +from prefect.client.orchestration._concurrency_limits.client import ( + ConcurrencyLimitAsyncClient, + ConcurrencyLimitClient, +) + import prefect import prefect.exceptions import prefect.settings @@ -38,7 +43,6 @@ BlockSchemaCreate, BlockTypeCreate, BlockTypeUpdate, - ConcurrencyLimitCreate, DeploymentCreate, DeploymentFlowRunCreate, DeploymentScheduleCreate, @@ -77,7 +81,6 @@ BlockDocument, BlockSchema, BlockType, - ConcurrencyLimit, ConcurrencyOptions, Constant, DeploymentSchedule, @@ -152,15 +155,13 @@ def get_client( *, httpx_settings: Optional[dict[str, Any]] = ..., sync_client: Literal[False] = False, -) -> "PrefectClient": - ... +) -> "PrefectClient": ... @overload def get_client( *, httpx_settings: Optional[dict[str, Any]] = ..., sync_client: Literal[True] = ... -) -> "SyncPrefectClient": - ... +) -> "SyncPrefectClient": ... def get_client( @@ -244,7 +245,9 @@ def get_client( ) -class PrefectClient(ArtifactAsyncClient, ArtifactCollectionAsyncClient): +class PrefectClient( + ArtifactAsyncClient, ArtifactCollectionAsyncClient, ConcurrencyLimitAsyncClient +): """ An asynchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/). @@ -811,213 +814,6 @@ async def delete_flow_run( else: raise - async def create_concurrency_limit( - self, - tag: str, - concurrency_limit: int, - ) -> UUID: - """ - Create a tag concurrency limit in the Prefect API. These limits govern concurrently - running tasks. - - Args: - tag: a tag the concurrency limit is applied to - concurrency_limit: the maximum number of concurrent task runs for a given tag - - Raises: - httpx.RequestError: if the concurrency limit was not created for any reason - - Returns: - the ID of the concurrency limit in the backend - """ - - concurrency_limit_create = ConcurrencyLimitCreate( - tag=tag, - concurrency_limit=concurrency_limit, - ) - response = await self._client.post( - "/concurrency_limits/", - json=concurrency_limit_create.model_dump(mode="json"), - ) - - concurrency_limit_id = response.json().get("id") - - if not concurrency_limit_id: - raise httpx.RequestError(f"Malformed response: {response}") - - return UUID(concurrency_limit_id) - - async def read_concurrency_limit_by_tag( - self, - tag: str, - ) -> ConcurrencyLimit: - """ - Read the concurrency limit set on a specific tag. - - Args: - tag: a tag the concurrency limit is applied to - - Raises: - prefect.exceptions.ObjectNotFound: If request returns 404 - httpx.RequestError: if the concurrency limit was not created for any reason - - Returns: - the concurrency limit set on a specific tag - """ - try: - response = await self._client.get( - f"/concurrency_limits/tag/{tag}", - ) - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - - concurrency_limit_id = response.json().get("id") - - if not concurrency_limit_id: - raise httpx.RequestError(f"Malformed response: {response}") - - concurrency_limit = ConcurrencyLimit.model_validate(response.json()) - return concurrency_limit - - async def read_concurrency_limits( - self, - limit: int, - offset: int, - ) -> list[ConcurrencyLimit]: - """ - Lists concurrency limits set on task run tags. - - Args: - limit: the maximum number of concurrency limits returned - offset: the concurrency limit query offset - - Returns: - a list of concurrency limits - """ - - body = { - "limit": limit, - "offset": offset, - } - - response = await self._client.post("/concurrency_limits/filter", json=body) - return pydantic.TypeAdapter(list[ConcurrencyLimit]).validate_python( - response.json() - ) - - async def reset_concurrency_limit_by_tag( - self, - tag: str, - slot_override: Optional[list[Union[UUID, str]]] = None, - ) -> None: - """ - Resets the concurrency limit slots set on a specific tag. - - Args: - tag: a tag the concurrency limit is applied to - slot_override: a list of task run IDs that are currently using a - concurrency slot, please check that any task run IDs included in - `slot_override` are currently running, otherwise those concurrency - slots will never be released. - - Raises: - prefect.exceptions.ObjectNotFound: If request returns 404 - httpx.RequestError: If request fails - - """ - if slot_override is not None: - slot_override = [str(slot) for slot in slot_override] - - try: - await self._client.post( - f"/concurrency_limits/tag/{tag}/reset", - json=dict(slot_override=slot_override), - ) - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - - async def delete_concurrency_limit_by_tag( - self, - tag: str, - ) -> None: - """ - Delete the concurrency limit set on a specific tag. - - Args: - tag: a tag the concurrency limit is applied to - - Raises: - prefect.exceptions.ObjectNotFound: If request returns 404 - httpx.RequestError: If request fails - - """ - try: - await self._client.delete( - f"/concurrency_limits/tag/{tag}", - ) - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - - async def increment_v1_concurrency_slots( - self, - names: list[str], - task_run_id: UUID, - ) -> httpx.Response: - """ - Increment concurrency limit slots for the specified limits. - - Args: - names (List[str]): A list of limit names for which to increment limits. - task_run_id (UUID): The task run ID incrementing the limits. - """ - data: dict[str, Any] = { - "names": names, - "task_run_id": str(task_run_id), - } - - return await self._client.post( - "/concurrency_limits/increment", - json=data, - ) - - async def decrement_v1_concurrency_slots( - self, - names: list[str], - task_run_id: UUID, - occupancy_seconds: float, - ) -> httpx.Response: - """ - Decrement concurrency limit slots for the specified limits. - - Args: - names (List[str]): A list of limit names to decrement. - task_run_id (UUID): The task run ID that incremented the limits. - occupancy_seconds (float): The duration in seconds that the limits - were held. - - Returns: - httpx.Response: The HTTP response from the server. - """ - data: dict[str, Any] = { - "names": names, - "task_run_id": str(task_run_id), - "occupancy_seconds": occupancy_seconds, - } - - return await self._client.post( - "/concurrency_limits/decrement", - json=data, - ) - async def create_work_queue( self, name: str, @@ -3436,7 +3232,9 @@ def __exit__(self, *_: object) -> NoReturn: assert False, "This should never be called but must be defined for __enter__" -class SyncPrefectClient(ArtifactClient, ArtifactCollectionClient): +class SyncPrefectClient( + ArtifactClient, ArtifactCollectionClient, ConcurrencyLimitClient +): """ A synchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/). diff --git a/src/prefect/client/orchestration/_concurrency_limits/__init__.py b/src/prefect/client/orchestration/_concurrency_limits/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/src/prefect/client/orchestration/_concurrency_limits/client.py b/src/prefect/client/orchestration/_concurrency_limits/client.py new file mode 100644 index 000000000000..847dcb321075 --- /dev/null +++ b/src/prefect/client/orchestration/_concurrency_limits/client.py @@ -0,0 +1,454 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from httpx import HTTPStatusError, RequestError + +from prefect.client.orchestration.base import BaseAsyncClient, BaseClient +from prefect.exceptions import ObjectNotFound + +if TYPE_CHECKING: + from uuid import UUID + + from httpx import Response + + from prefect.client.schemas.actions import ConcurrencyLimitCreate + from prefect.client.schemas.objects import ConcurrencyLimit + + +class ConcurrencyLimitClient(BaseClient): + def create_concurrency_limit( + self, + tag: str, + concurrency_limit: int, + ) -> "UUID": + """ + Create a tag concurrency limit in the Prefect API. These limits govern concurrently + running tasks. + + Args: + tag: a tag the concurrency limit is applied to + concurrency_limit: the maximum number of concurrent task runs for a given tag + + Raises: + httpx.RequestError: if the concurrency limit was not created for any reason + + Returns: + the ID of the concurrency limit in the backend + """ + + concurrency_limit_create = ConcurrencyLimitCreate( + tag=tag, + concurrency_limit=concurrency_limit, + ) + response = self.request( + "POST", + "/concurrency_limits/", + json=concurrency_limit_create.model_dump(mode="json"), + ) + + concurrency_limit_id = response.json().get("id") + + if not concurrency_limit_id: + raise RequestError(f"Malformed response: {response}") + from uuid import UUID + + return UUID(concurrency_limit_id) + + def read_concurrency_limit_by_tag( + self, + tag: str, + ) -> "ConcurrencyLimit": + """ + Read the concurrency limit set on a specific tag. + + Args: + tag: a tag the concurrency limit is applied to + + Raises: + prefect.exceptions.ObjectNotFound: If request returns 404 + httpx.RequestError: if the concurrency limit was not created for any reason + + Returns: + the concurrency limit set on a specific tag + """ + try: + response = self.request( + "GET", + "/concurrency_limits/tag/{tag}", + path_params={"tag": tag}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + concurrency_limit_id = response.json().get("id") + + if not concurrency_limit_id: + raise RequestError(f"Malformed response: {response}") + from prefect.client.schemas.objects import ConcurrencyLimit + + return ConcurrencyLimit.model_validate(response.json()) + + def read_concurrency_limits( + self, + limit: int, + offset: int, + ) -> list["ConcurrencyLimit"]: + """ + Lists concurrency limits set on task run tags. + + Args: + limit: the maximum number of concurrency limits returned + offset: the concurrency limit query offset + + Returns: + a list of concurrency limits + """ + + body = { + "limit": limit, + "offset": offset, + } + + response = self.request("POST", "/concurrency_limits/filter", json=body) + from prefect.client.schemas.objects import ConcurrencyLimit + + return ConcurrencyLimit.model_validate_list(response.json()) + + def reset_concurrency_limit_by_tag( + self, + tag: str, + slot_override: list["UUID | str"] | None = None, + ) -> None: + """ + Resets the concurrency limit slots set on a specific tag. + + Args: + tag: a tag the concurrency limit is applied to + slot_override: a list of task run IDs that are currently using a + concurrency slot, please check that any task run IDs included in + `slot_override` are currently running, otherwise those concurrency + slots will never be released. + + Raises: + prefect.exceptions.ObjectNotFound: If request returns 404 + httpx.RequestError: If request fails + + """ + if slot_override is not None: + slot_override = [str(slot) for slot in slot_override] + + try: + self.request( + "POST", + "/concurrency_limits/tag/{tag}/reset", + path_params={"tag": tag}, + json=dict(slot_override=slot_override), + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + def delete_concurrency_limit_by_tag( + self, + tag: str, + ) -> None: + """ + Delete the concurrency limit set on a specific tag. + + Args: + tag: a tag the concurrency limit is applied to + + Raises: + prefect.exceptions.ObjectNotFound: If request returns 404 + httpx.RequestError: If request fails + + """ + try: + self.request( + "DELETE", + "/concurrency_limits/tag/{tag}", + path_params={"tag": tag}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + def increment_v1_concurrency_slots( + self, + names: list[str], + task_run_id: "UUID", + ) -> "Response": + """ + Increment concurrency limit slots for the specified limits. + + Args: + names (List[str]): A list of limit names for which to increment limits. + task_run_id (UUID): The task run ID incrementing the limits. + """ + data: dict[str, Any] = { + "names": names, + "task_run_id": str(task_run_id), + } + + return self.request( + "POST", + "/concurrency_limits/increment", + json=data, + ) + + def decrement_v1_concurrency_slots( + self, + names: list[str], + task_run_id: "UUID", + occupancy_seconds: float, + ) -> "Response": + """ + Decrement concurrency limit slots for the specified limits. + + Args: + names (List[str]): A list of limit names to decrement. + task_run_id (UUID): The task run ID that incremented the limits. + occupancy_seconds (float): The duration in seconds that the limits + were held. + + Returns: + httpx.Response: The HTTP response from the server. + """ + data: dict[str, Any] = { + "names": names, + "task_run_id": str(task_run_id), + "occupancy_seconds": occupancy_seconds, + } + + return self.request( + "POST", + "/concurrency_limits/decrement", + json=data, + ) + + +class ConcurrencyLimitAsyncClient(BaseAsyncClient): + async def create_concurrency_limit( + self, + tag: str, + concurrency_limit: int, + ) -> "UUID": + """ + Create a tag concurrency limit in the Prefect API. These limits govern concurrently + running tasks. + + Args: + tag: a tag the concurrency limit is applied to + concurrency_limit: the maximum number of concurrent task runs for a given tag + + Raises: + httpx.RequestError: if the concurrency limit was not created for any reason + + Returns: + the ID of the concurrency limit in the backend + """ + + concurrency_limit_create = ConcurrencyLimitCreate( + tag=tag, + concurrency_limit=concurrency_limit, + ) + response = await self.request( + "POST", + "/concurrency_limits/", + json=concurrency_limit_create.model_dump(mode="json"), + ) + + concurrency_limit_id = response.json().get("id") + + if not concurrency_limit_id: + raise RequestError(f"Malformed response: {response}") + from uuid import UUID + + return UUID(concurrency_limit_id) + + async def read_concurrency_limit_by_tag( + self, + tag: str, + ) -> "ConcurrencyLimit": + """ + Read the concurrency limit set on a specific tag. + + Args: + tag: a tag the concurrency limit is applied to + + Raises: + prefect.exceptions.ObjectNotFound: If request returns 404 + httpx.RequestError: if the concurrency limit was not created for any reason + + Returns: + the concurrency limit set on a specific tag + """ + try: + response = await self.request( + "GET", + "/concurrency_limits/tag/{tag}", + path_params={"tag": tag}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + concurrency_limit_id = response.json().get("id") + + if not concurrency_limit_id: + raise RequestError(f"Malformed response: {response}") + from prefect.client.schemas.objects import ConcurrencyLimit + + return ConcurrencyLimit.model_validate(response.json()) + + async def read_concurrency_limits( + self, + limit: int, + offset: int, + ) -> list["ConcurrencyLimit"]: + """ + Lists concurrency limits set on task run tags. + + Args: + limit: the maximum number of concurrency limits returned + offset: the concurrency limit query offset + + Returns: + a list of concurrency limits + """ + + body = { + "limit": limit, + "offset": offset, + } + + response = await self.request("POST", "/concurrency_limits/filter", json=body) + from prefect.client.schemas.objects import ConcurrencyLimit + + return ConcurrencyLimit.model_validate_list(response.json()) + + async def reset_concurrency_limit_by_tag( + self, + tag: str, + slot_override: list["UUID | str"] | None = None, + ) -> None: + """ + Resets the concurrency limit slots set on a specific tag. + + Args: + tag: a tag the concurrency limit is applied to + slot_override: a list of task run IDs that are currently using a + concurrency slot, please check that any task run IDs included in + `slot_override` are currently running, otherwise those concurrency + slots will never be released. + + Raises: + prefect.exceptions.ObjectNotFound: If request returns 404 + httpx.RequestError: If request fails + + """ + if slot_override is not None: + slot_override = [str(slot) for slot in slot_override] + + try: + await self.request( + "POST", + "/concurrency_limits/tag/{tag}/reset", + path_params={"tag": tag}, + json=dict(slot_override=slot_override), + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + async def delete_concurrency_limit_by_tag( + self, + tag: str, + ) -> None: + """ + Delete the concurrency limit set on a specific tag. + + Args: + tag: a tag the concurrency limit is applied to + + Raises: + prefect.exceptions.ObjectNotFound: If request returns 404 + httpx.RequestError: If request fails + + """ + try: + await self.request( + "DELETE", + "/concurrency_limits/tag/{tag}", + path_params={"tag": tag}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + async def increment_v1_concurrency_slots( + self, + names: list[str], + task_run_id: "UUID", + ) -> "Response": + """ + Increment concurrency limit slots for the specified limits. + + Args: + names (List[str]): A list of limit names for which to increment limits. + task_run_id (UUID): The task run ID incrementing the limits. + """ + data: dict[str, Any] = { + "names": names, + "task_run_id": str(task_run_id), + } + + return await self.request( + "POST", + "/concurrency_limits/increment", + json=data, + ) + + async def decrement_v1_concurrency_slots( + self, + names: list[str], + task_run_id: "UUID", + occupancy_seconds: float, + ) -> "Response": + """ + Decrement concurrency limit slots for the specified limits. + + Args: + names (List[str]): A list of limit names to decrement. + task_run_id (UUID): The task run ID that incremented the limits. + occupancy_seconds (float): The duration in seconds that the limits + were held. + + Returns: + httpx.Response: The HTTP response from the server. + """ + data: dict[str, Any] = { + "names": names, + "task_run_id": str(task_run_id), + "occupancy_seconds": occupancy_seconds, + } + + return await self.request( + "POST", + "/concurrency_limits/decrement", + json=data, + ) From 4e13b7237ffd22d7a91ef9e7634df80d5cae8823 Mon Sep 17 00:00:00 2001 From: Adam Azzam <33043305+AAAZZAM@users.noreply.github.com> Date: Wed, 1 Jan 2025 15:37:34 -0600 Subject: [PATCH 02/11] also tackle gcls --- src/prefect/client/orchestration/__init__.py | 181 ---------- .../_concurrency_limits/client.py | 319 +++++++++++++++++- 2 files changed, 310 insertions(+), 190 deletions(-) diff --git a/src/prefect/client/orchestration/__init__.py b/src/prefect/client/orchestration/__init__.py index b4c305e76b0f..3935de981e68 100644 --- a/src/prefect/client/orchestration/__init__.py +++ b/src/prefect/client/orchestration/__init__.py @@ -53,8 +53,6 @@ FlowRunNotificationPolicyCreate, FlowRunNotificationPolicyUpdate, FlowRunUpdate, - GlobalConcurrencyLimitCreate, - GlobalConcurrencyLimitUpdate, LogCreate, TaskRunCreate, TaskRunUpdate, @@ -102,7 +100,6 @@ from prefect.client.schemas.responses import ( DeploymentResponse, FlowRunResponse, - GlobalConcurrencyLimitResponse, WorkerFlowRunResponse, ) from prefect.client.schemas.schedules import SCHEDULE_TYPES @@ -2810,136 +2807,6 @@ async def read_worker_metadata(self) -> dict[str, Any]: response.raise_for_status() return response.json() - async def increment_concurrency_slots( - self, - names: list[str], - slots: int, - mode: str, - create_if_missing: Optional[bool] = None, - ) -> httpx.Response: - return await self._client.post( - "/v2/concurrency_limits/increment", - json={ - "names": names, - "slots": slots, - "mode": mode, - "create_if_missing": create_if_missing if create_if_missing else False, - }, - ) - - async def release_concurrency_slots( - self, names: list[str], slots: int, occupancy_seconds: float - ) -> httpx.Response: - """ - Release concurrency slots for the specified limits. - - Args: - names (List[str]): A list of limit names for which to release slots. - slots (int): The number of concurrency slots to release. - occupancy_seconds (float): The duration in seconds that the slots - were occupied. - - Returns: - httpx.Response: The HTTP response from the server. - """ - - return await self._client.post( - "/v2/concurrency_limits/decrement", - json={ - "names": names, - "slots": slots, - "occupancy_seconds": occupancy_seconds, - }, - ) - - async def create_global_concurrency_limit( - self, concurrency_limit: GlobalConcurrencyLimitCreate - ) -> UUID: - response = await self._client.post( - "/v2/concurrency_limits/", - json=concurrency_limit.model_dump(mode="json", exclude_unset=True), - ) - return UUID(response.json()["id"]) - - async def update_global_concurrency_limit( - self, name: str, concurrency_limit: GlobalConcurrencyLimitUpdate - ) -> httpx.Response: - try: - response = await self._client.patch( - f"/v2/concurrency_limits/{name}", - json=concurrency_limit.model_dump(mode="json", exclude_unset=True), - ) - return response - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - - async def delete_global_concurrency_limit_by_name( - self, name: str - ) -> httpx.Response: - try: - response = await self._client.delete(f"/v2/concurrency_limits/{name}") - return response - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - - async def read_global_concurrency_limit_by_name( - self, name: str - ) -> GlobalConcurrencyLimitResponse: - try: - response = await self._client.get(f"/v2/concurrency_limits/{name}") - return GlobalConcurrencyLimitResponse.model_validate(response.json()) - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - - async def upsert_global_concurrency_limit_by_name( - self, name: str, limit: int - ) -> None: - """Creates a global concurrency limit with the given name and limit if one does not already exist. - - If one does already exist matching the name then update it's limit if it is different. - - Note: This is not done atomically. - """ - try: - existing_limit = await self.read_global_concurrency_limit_by_name(name) - except prefect.exceptions.ObjectNotFound: - existing_limit = None - - if not existing_limit: - await self.create_global_concurrency_limit( - GlobalConcurrencyLimitCreate( - name=name, - limit=limit, - ) - ) - elif existing_limit.limit != limit: - await self.update_global_concurrency_limit( - name, GlobalConcurrencyLimitUpdate(limit=limit) - ) - - async def read_global_concurrency_limits( - self, limit: int = 10, offset: int = 0 - ) -> list[GlobalConcurrencyLimitResponse]: - response = await self._client.post( - "/v2/concurrency_limits/filter", - json={ - "limit": limit, - "offset": offset, - }, - ) - return pydantic.TypeAdapter( - list[GlobalConcurrencyLimitResponse] - ).validate_python(response.json()) - async def create_flow_run_input( self, flow_run_id: UUID, key: str, value: str, sender: Optional[str] = None ) -> None: @@ -4017,54 +3884,6 @@ def read_deployment_by_name( return DeploymentResponse.model_validate(response.json()) - def release_concurrency_slots( - self, names: list[str], slots: int, occupancy_seconds: float - ) -> httpx.Response: - """ - Release concurrency slots for the specified limits. - - Args: - names (List[str]): A list of limit names for which to release slots. - slots (int): The number of concurrency slots to release. - occupancy_seconds (float): The duration in seconds that the slots - were occupied. - - Returns: - httpx.Response: The HTTP response from the server. - """ - return self._client.post( - "/v2/concurrency_limits/decrement", - json={ - "names": names, - "slots": slots, - "occupancy_seconds": occupancy_seconds, - }, - ) - - def decrement_v1_concurrency_slots( - self, names: list[str], occupancy_seconds: float, task_run_id: UUID - ) -> httpx.Response: - """ - Release the specified concurrency limits. - - Args: - names (List[str]): A list of limit names to decrement. - occupancy_seconds (float): The duration in seconds that the slots - were held. - task_run_id (UUID): The task run ID that incremented the limits. - - Returns: - httpx.Response: The HTTP response from the server. - """ - return self._client.post( - "/concurrency_limits/decrement", - json={ - "names": names, - "occupancy_seconds": occupancy_seconds, - "task_run_id": str(task_run_id), - }, - ) - def update_flow_run_labels( self, flow_run_id: UUID, labels: KeyValueLabelsField ) -> None: diff --git a/src/prefect/client/orchestration/_concurrency_limits/client.py b/src/prefect/client/orchestration/_concurrency_limits/client.py index 847dcb321075..bddbe691bc44 100644 --- a/src/prefect/client/orchestration/_concurrency_limits/client.py +++ b/src/prefect/client/orchestration/_concurrency_limits/client.py @@ -12,8 +12,13 @@ from httpx import Response - from prefect.client.schemas.actions import ConcurrencyLimitCreate + from prefect.client.schemas.actions import ( + ConcurrencyLimitCreate, + GlobalConcurrencyLimitCreate, + GlobalConcurrencyLimitUpdate, + ) from prefect.client.schemas.objects import ConcurrencyLimit + from prefect.client.schemas.responses import GlobalConcurrencyLimitResponse class ConcurrencyLimitClient(BaseClient): @@ -66,7 +71,7 @@ def read_concurrency_limit_by_tag( tag: a tag the concurrency limit is applied to Raises: - prefect.exceptions.ObjectNotFound: If request returns 404 + ObjectNotFound: If request returns 404 httpx.RequestError: if the concurrency limit was not created for any reason Returns: @@ -134,7 +139,7 @@ def reset_concurrency_limit_by_tag( slots will never be released. Raises: - prefect.exceptions.ObjectNotFound: If request returns 404 + ObjectNotFound: If request returns 404 httpx.RequestError: If request fails """ @@ -165,7 +170,7 @@ def delete_concurrency_limit_by_tag( tag: a tag the concurrency limit is applied to Raises: - prefect.exceptions.ObjectNotFound: If request returns 404 + ObjectNotFound: If request returns 404 httpx.RequestError: If request fails """ @@ -220,7 +225,7 @@ def decrement_v1_concurrency_slots( were held. Returns: - httpx.Response: The HTTP response from the server. + "Response": The HTTP response from the server. """ data: dict[str, Any] = { "names": names, @@ -234,6 +239,153 @@ def decrement_v1_concurrency_slots( json=data, ) + def increment_concurrency_slots( + self, + names: list[str], + slots: int, + mode: str, + create_if_missing: bool | None = None, + ) -> "Response": + return self.request( + "POST", + "/v2/concurrency_limits/increment", + json={ + "names": names, + "slots": slots, + "mode": mode, + "create_if_missing": create_if_missing if create_if_missing else False, + }, + ) + + def release_concurrency_slots( + self, names: list[str], slots: int, occupancy_seconds: float + ) -> "Response": + """ + Release concurrency slots for the specified limits. + + Args: + names (List[str]): A list of limit names for which to release slots. + slots (int): The number of concurrency slots to release. + occupancy_seconds (float): The duration in seconds that the slots + were occupied. + + Returns: + "Response": The HTTP response from the server. + """ + + return self.request( + "POST", + "/v2/concurrency_limits/decrement", + json={ + "names": names, + "slots": slots, + "occupancy_seconds": occupancy_seconds, + }, + ) + + def create_global_concurrency_limit( + self, concurrency_limit: "GlobalConcurrencyLimitCreate" + ) -> "UUID": + response = self.request( + "POST", + "/v2/concurrency_limits/", + json=concurrency_limit.model_dump(mode="json", exclude_unset=True), + ) + from uuid import UUID + + return UUID(response.json()["id"]) + + def update_global_concurrency_limit( + self, name: str, concurrency_limit: "GlobalConcurrencyLimitUpdate" + ) -> "Response": + try: + response = self.request( + "PATCH", + "/v2/concurrency_limits/{id_or_name}", + path_params={"id_or_name": name}, + json=concurrency_limit.model_dump(mode="json", exclude_unset=True), + ) + return response + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + def delete_global_concurrency_limit_by_name(self, name: str) -> "Response": + try: + response = self.request( + "DELETE", + "/v2/concurrency_limits/{id_or_name}", + path_params={"id_or_name": name}, + ) + return response + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + def read_global_concurrency_limit_by_name( + self, name: str + ) -> "GlobalConcurrencyLimitResponse": + try: + response = self.request( + "GET", + "/v2/concurrency_limits/{id_or_name}", + path_params={"id_or_name": name}, + ) + from prefect.client.schemas.responses import GlobalConcurrencyLimitResponse + + return GlobalConcurrencyLimitResponse.model_validate(response.json()) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + def upsert_global_concurrency_limit_by_name(self, name: str, limit: int) -> None: + """Creates a global concurrency limit with the given name and limit if one does not already exist. + + If one does already exist matching the name then update it's limit if it is different. + + Note: This is not done atomically. + """ + from prefect.client.schemas.actions import GlobalConcurrencyLimitCreate + + try: + existing_limit = self.read_global_concurrency_limit_by_name(name) + except ObjectNotFound: + existing_limit = None + + if not existing_limit: + self.create_global_concurrency_limit( + GlobalConcurrencyLimitCreate( + name=name, + limit=limit, + ) + ) + elif existing_limit.limit != limit: + self.update_global_concurrency_limit( + name, GlobalConcurrencyLimitUpdate(limit=limit) + ) + + def read_global_concurrency_limits( + self, limit: int = 10, offset: int = 0 + ) -> list["GlobalConcurrencyLimitResponse"]: + response = self.request( + "POST", + "/v2/concurrency_limits/filter", + json={ + "limit": limit, + "offset": offset, + }, + ) + + from prefect.client.schemas.responses import GlobalConcurrencyLimitResponse + + return GlobalConcurrencyLimitResponse.model_validate_list(response.json()) + class ConcurrencyLimitAsyncClient(BaseAsyncClient): async def create_concurrency_limit( @@ -285,7 +437,7 @@ async def read_concurrency_limit_by_tag( tag: a tag the concurrency limit is applied to Raises: - prefect.exceptions.ObjectNotFound: If request returns 404 + ObjectNotFound: If request returns 404 httpx.RequestError: if the concurrency limit was not created for any reason Returns: @@ -353,7 +505,7 @@ async def reset_concurrency_limit_by_tag( slots will never be released. Raises: - prefect.exceptions.ObjectNotFound: If request returns 404 + ObjectNotFound: If request returns 404 httpx.RequestError: If request fails """ @@ -384,7 +536,7 @@ async def delete_concurrency_limit_by_tag( tag: a tag the concurrency limit is applied to Raises: - prefect.exceptions.ObjectNotFound: If request returns 404 + ObjectNotFound: If request returns 404 httpx.RequestError: If request fails """ @@ -439,7 +591,7 @@ async def decrement_v1_concurrency_slots( were held. Returns: - httpx.Response: The HTTP response from the server. + "Response": The HTTP response from the server. """ data: dict[str, Any] = { "names": names, @@ -452,3 +604,152 @@ async def decrement_v1_concurrency_slots( "/concurrency_limits/decrement", json=data, ) + + async def increment_concurrency_slots( + self, + names: list[str], + slots: int, + mode: str, + create_if_missing: bool | None = None, + ) -> "Response": + return await self.request( + "POST", + "/v2/concurrency_limits/increment", + json={ + "names": names, + "slots": slots, + "mode": mode, + "create_if_missing": create_if_missing if create_if_missing else False, + }, + ) + + async def release_concurrency_slots( + self, names: list[str], slots: int, occupancy_seconds: float + ) -> "Response": + """ + Release concurrency slots for the specified limits. + + Args: + names (List[str]): A list of limit names for which to release slots. + slots (int): The number of concurrency slots to release. + occupancy_seconds (float): The duration in seconds that the slots + were occupied. + + Returns: + "Response": The HTTP response from the server. + """ + + return await self.request( + "POST", + "/v2/concurrency_limits/decrement", + json={ + "names": names, + "slots": slots, + "occupancy_seconds": occupancy_seconds, + }, + ) + + async def create_global_concurrency_limit( + self, concurrency_limit: "GlobalConcurrencyLimitCreate" + ) -> "UUID": + response = await self.request( + "POST", + "/v2/concurrency_limits/", + json=concurrency_limit.model_dump(mode="json", exclude_unset=True), + ) + from uuid import UUID + + return UUID(response.json()["id"]) + + async def update_global_concurrency_limit( + self, name: str, concurrency_limit: "GlobalConcurrencyLimitUpdate" + ) -> "Response": + try: + response = await self.request( + "PATCH", + "/v2/concurrency_limits/{id_or_name}", + path_params={"id_or_name": name}, + json=concurrency_limit.model_dump(mode="json", exclude_unset=True), + ) + return response + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + async def delete_global_concurrency_limit_by_name(self, name: str) -> "Response": + try: + response = await self.request( + "DELETE", + "/v2/concurrency_limits/{id_or_name}", + path_params={"id_or_name": name}, + ) + return response + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + async def read_global_concurrency_limit_by_name( + self, name: str + ) -> "GlobalConcurrencyLimitResponse": + try: + response = await self.request( + "GET", + "/v2/concurrency_limits/{id_or_name}", + path_params={"id_or_name": name}, + ) + from prefect.client.schemas.responses import GlobalConcurrencyLimitResponse + + return GlobalConcurrencyLimitResponse.model_validate(response.json()) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + async def upsert_global_concurrency_limit_by_name( + self, name: str, limit: int + ) -> None: + """Creates a global concurrency limit with the given name and limit if one does not already exist. + + If one does already exist matching the name then update it's limit if it is different. + + Note: This is not done atomically. + """ + from prefect.client.schemas.actions import GlobalConcurrencyLimitCreate + + try: + existing_limit = await self.read_global_concurrency_limit_by_name(name) + except ObjectNotFound: + existing_limit = None + + if not existing_limit: + await self.create_global_concurrency_limit( + GlobalConcurrencyLimitCreate( + name=name, + limit=limit, + ) + ) + elif existing_limit.limit != limit: + await self.update_global_concurrency_limit( + name, GlobalConcurrencyLimitUpdate(limit=limit) + ) + + async def read_global_concurrency_limits( + self, limit: int = 10, offset: int = 0 + ) -> list["GlobalConcurrencyLimitResponse"]: + response = await self.request( + "POST", + "/v2/concurrency_limits/filter", + json={ + "limit": limit, + "offset": offset, + }, + ) + + from prefect.client.schemas.responses import GlobalConcurrencyLimitResponse + + return GlobalConcurrencyLimitResponse.model_validate_list(response.json()) From 95225c3f3b3cf2d1479e55529743390a5ef23486 Mon Sep 17 00:00:00 2001 From: Adam Azzam <33043305+AAAZZAM@users.noreply.github.com> Date: Wed, 1 Jan 2025 15:37:49 -0600 Subject: [PATCH 03/11] lint --- src/prefect/client/orchestration/__init__.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/prefect/client/orchestration/__init__.py b/src/prefect/client/orchestration/__init__.py index 3935de981e68..f31706910013 100644 --- a/src/prefect/client/orchestration/__init__.py +++ b/src/prefect/client/orchestration/__init__.py @@ -152,13 +152,15 @@ def get_client( *, httpx_settings: Optional[dict[str, Any]] = ..., sync_client: Literal[False] = False, -) -> "PrefectClient": ... +) -> "PrefectClient": + ... @overload def get_client( *, httpx_settings: Optional[dict[str, Any]] = ..., sync_client: Literal[True] = ... -) -> "SyncPrefectClient": ... +) -> "SyncPrefectClient": + ... def get_client( From 0f510f67a198b0d498c95ab317a875aed92f5bb6 Mon Sep 17 00:00:00 2001 From: Adam Azzam <33043305+AAAZZAM@users.noreply.github.com> Date: Wed, 1 Jan 2025 16:10:15 -0600 Subject: [PATCH 04/11] init --- src/prefect/client/orchestration/__init__.py | 329 +------- .../orchestration/_work_queues/__init__.py | 0 .../orchestration/_work_queues/client.py | 736 ++++++++++++++++++ 3 files changed, 743 insertions(+), 322 deletions(-) create mode 100644 src/prefect/client/orchestration/_work_queues/__init__.py create mode 100644 src/prefect/client/orchestration/_work_queues/client.py diff --git a/src/prefect/client/orchestration/__init__.py b/src/prefect/client/orchestration/__init__.py index 484944de1185..4f628981cf93 100644 --- a/src/prefect/client/orchestration/__init__.py +++ b/src/prefect/client/orchestration/__init__.py @@ -12,7 +12,6 @@ import certifi import httpcore import httpx -import pendulum import pydantic from asgi_lifespan import LifespanManager from packaging import version @@ -26,6 +25,11 @@ ArtifactCollectionAsyncClient, ) +from prefect.client.orchestration._work_queues import ( + WorkQueueClient, + WorkQueueAsyncClient, +) + import prefect import prefect.exceptions import prefect.settings @@ -58,8 +62,6 @@ VariableUpdate, WorkPoolCreate, WorkPoolUpdate, - WorkQueueCreate, - WorkQueueUpdate, ) from prefect.client.schemas.filters import ( DeploymentFilter, @@ -71,7 +73,6 @@ WorkerFilter, WorkPoolFilter, WorkQueueFilter, - WorkQueueFilterName, ) from prefect.client.schemas.objects import ( BlockDocument, @@ -93,8 +94,6 @@ Worker, WorkerMetadata, WorkPool, - WorkQueue, - WorkQueueStatusDetail, ) from prefect.client.schemas.responses import ( DeploymentResponse, @@ -152,15 +151,13 @@ def get_client( *, httpx_settings: Optional[dict[str, Any]] = ..., sync_client: Literal[False] = False, -) -> "PrefectClient": - ... +) -> "PrefectClient": ... @overload def get_client( *, httpx_settings: Optional[dict[str, Any]] = ..., sync_client: Literal[True] = ... -) -> "SyncPrefectClient": - ... +) -> "SyncPrefectClient": ... def get_client( @@ -1018,273 +1015,6 @@ async def decrement_v1_concurrency_slots( json=data, ) - async def create_work_queue( - self, - name: str, - description: Optional[str] = None, - is_paused: Optional[bool] = None, - concurrency_limit: Optional[int] = None, - priority: Optional[int] = None, - work_pool_name: Optional[str] = None, - ) -> WorkQueue: - """ - Create a work queue. - - Args: - name: a unique name for the work queue - description: An optional description for the work queue. - is_paused: Whether or not the work queue is paused. - concurrency_limit: An optional concurrency limit for the work queue. - priority: The queue's priority. Lower values are higher priority (1 is the highest). - work_pool_name: The name of the work pool to use for this queue. - - Raises: - prefect.exceptions.ObjectAlreadyExists: If request returns 409 - httpx.RequestError: If request fails - - Returns: - The created work queue - """ - create_model = WorkQueueCreate(name=name, filter=None) - if description is not None: - create_model.description = description - if is_paused is not None: - create_model.is_paused = is_paused - if concurrency_limit is not None: - create_model.concurrency_limit = concurrency_limit - if priority is not None: - create_model.priority = priority - - data = create_model.model_dump(mode="json") - try: - if work_pool_name is not None: - response = await self._client.post( - f"/work_pools/{work_pool_name}/queues", json=data - ) - else: - response = await self._client.post("/work_queues/", json=data) - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_409_CONFLICT: - raise prefect.exceptions.ObjectAlreadyExists(http_exc=e) from e - elif e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - return WorkQueue.model_validate(response.json()) - - async def read_work_queue_by_name( - self, - name: str, - work_pool_name: Optional[str] = None, - ) -> WorkQueue: - """ - Read a work queue by name. - - Args: - name (str): a unique name for the work queue - work_pool_name (str, optional): the name of the work pool - the queue belongs to. - - Raises: - prefect.exceptions.ObjectNotFound: if no work queue is found - httpx.HTTPStatusError: other status errors - - Returns: - WorkQueue: a work queue API object - """ - try: - if work_pool_name is not None: - response = await self._client.get( - f"/work_pools/{work_pool_name}/queues/{name}" - ) - else: - response = await self._client.get(f"/work_queues/name/{name}") - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - - return WorkQueue.model_validate(response.json()) - - async def update_work_queue(self, id: UUID, **kwargs: Any) -> None: - """ - Update properties of a work queue. - - Args: - id: the ID of the work queue to update - **kwargs: the fields to update - - Raises: - ValueError: if no kwargs are provided - prefect.exceptions.ObjectNotFound: if request returns 404 - httpx.RequestError: if the request fails - - """ - if not kwargs: - raise ValueError("No fields provided to update.") - - data = WorkQueueUpdate(**kwargs).model_dump(mode="json", exclude_unset=True) - try: - await self._client.patch(f"/work_queues/{id}", json=data) - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - - async def get_runs_in_work_queue( - self, - id: UUID, - limit: int = 10, - scheduled_before: Optional[datetime.datetime] = None, - ) -> list[FlowRun]: - """ - Read flow runs off a work queue. - - Args: - id: the id of the work queue to read from - limit: a limit on the number of runs to return - scheduled_before: a timestamp; only runs scheduled before this time will be returned. - Defaults to now. - - Raises: - prefect.exceptions.ObjectNotFound: If request returns 404 - httpx.RequestError: If request fails - - Returns: - List[FlowRun]: a list of FlowRun objects read from the queue - """ - if scheduled_before is None: - scheduled_before = pendulum.now("UTC") - - try: - response = await self._client.post( - f"/work_queues/{id}/get_runs", - json={ - "limit": limit, - "scheduled_before": scheduled_before.isoformat(), - }, - ) - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - return pydantic.TypeAdapter(list[FlowRun]).validate_python(response.json()) - - async def read_work_queue( - self, - id: UUID, - ) -> WorkQueue: - """ - Read a work queue. - - Args: - id: the id of the work queue to load - - Raises: - prefect.exceptions.ObjectNotFound: If request returns 404 - httpx.RequestError: If request fails - - Returns: - WorkQueue: an instantiated WorkQueue object - """ - try: - response = await self._client.get(f"/work_queues/{id}") - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - return WorkQueue.model_validate(response.json()) - - async def read_work_queue_status( - self, - id: UUID, - ) -> WorkQueueStatusDetail: - """ - Read a work queue status. - - Args: - id: the id of the work queue to load - - Raises: - prefect.exceptions.ObjectNotFound: If request returns 404 - httpx.RequestError: If request fails - - Returns: - WorkQueueStatus: an instantiated WorkQueueStatus object - """ - try: - response = await self._client.get(f"/work_queues/{id}/status") - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - return WorkQueueStatusDetail.model_validate(response.json()) - - async def match_work_queues( - self, - prefixes: list[str], - work_pool_name: Optional[str] = None, - ) -> list[WorkQueue]: - """ - Query the Prefect API for work queues with names with a specific prefix. - - Args: - prefixes: a list of strings used to match work queue name prefixes - work_pool_name: an optional work pool name to scope the query to - - Returns: - a list of WorkQueue model representations - of the work queues - """ - page_length = 100 - current_page = 0 - work_queues: list[WorkQueue] = [] - - while True: - new_queues = await self.read_work_queues( - work_pool_name=work_pool_name, - offset=current_page * page_length, - limit=page_length, - work_queue_filter=WorkQueueFilter( - name=WorkQueueFilterName(startswith_=prefixes) - ), - ) - if not new_queues: - break - work_queues += new_queues - current_page += 1 - - return work_queues - - async def delete_work_queue_by_id( - self, - id: UUID, - ) -> None: - """ - Delete a work queue by its ID. - - Args: - id: the id of the work queue to delete - - Raises: - prefect.exceptions.ObjectNotFound: If request returns 404 - httpx.RequestError: If requests fails - """ - try: - await self._client.delete( - f"/work_queues/{id}", - ) - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - async def create_block_type(self, block_type: BlockTypeCreate) -> BlockType: """ Create a block type in the Prefect API. @@ -2852,51 +2582,6 @@ async def delete_work_pool( else: raise - async def read_work_queues( - self, - work_pool_name: Optional[str] = None, - work_queue_filter: Optional[WorkQueueFilter] = None, - limit: Optional[int] = None, - offset: Optional[int] = None, - ) -> list[WorkQueue]: - """ - Retrieves queues for a work pool. - - Args: - work_pool_name: Name of the work pool for which to get queues. - work_queue_filter: Criteria by which to filter queues. - limit: Limit for the queue query. - offset: Limit for the queue query. - - Returns: - List of queues for the specified work pool. - """ - json: dict[str, Any] = { - "work_queues": ( - work_queue_filter.model_dump(mode="json", exclude_unset=True) - if work_queue_filter - else None - ), - "limit": limit, - "offset": offset, - } - - if work_pool_name: - try: - response = await self._client.post( - f"/work_pools/{work_pool_name}/queues/filter", - json=json, - ) - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - else: - response = await self._client.post("/work_queues/filter", json=json) - - return pydantic.TypeAdapter(list[WorkQueue]).validate_python(response.json()) - async def get_scheduled_flow_runs_for_deployments( self, deployment_ids: list[UUID], diff --git a/src/prefect/client/orchestration/_work_queues/__init__.py b/src/prefect/client/orchestration/_work_queues/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/src/prefect/client/orchestration/_work_queues/client.py b/src/prefect/client/orchestration/_work_queues/client.py new file mode 100644 index 000000000000..f4dcac63a4df --- /dev/null +++ b/src/prefect/client/orchestration/_work_queues/client.py @@ -0,0 +1,736 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from httpx import HTTPStatusError + +from prefect.client.orchestration.base import BaseAsyncClient, BaseClient +from prefect.exceptions import ObjectAlreadyExists, ObjectNotFound + +if TYPE_CHECKING: + import datetime + from uuid import UUID + + from prefect.client.schemas import FlowRun + from prefect.client.schemas.filters import WorkQueueFilter + from prefect.client.schemas.objects import WorkQueue, WorkQueueStatusDetail + + +class WorkQueueClient(BaseClient): + def create_work_queue( + self, + name: str, + description: str | None = None, + is_paused: bool | None = None, + concurrency_limit: int | None = None, + priority: int | None = None, + work_pool_name: str | None = None, + ) -> WorkQueue: + """ + Create a work queue. + + Args: + name: a unique name for the work queue + description: An optional description for the work queue. + is_paused: Whether or not the work queue is paused. + concurrency_limit: An optional concurrency limit for the work queue. + priority: The queue's priority. Lower values are higher priority (1 is the highest). + work_pool_name: The name of the work pool to use for this queue. + + Raises: + ObjectAlreadyExists: If request returns 409 + httpx.RequestError: If request fails + + Returns: + The created work queue + """ + from prefect.client.schemas.actions import WorkQueueCreate + from prefect.client.schemas.objects import WorkQueue + + create_model = WorkQueueCreate(name=name, filter=None) + if description is not None: + create_model.description = description + if is_paused is not None: + create_model.is_paused = is_paused + if concurrency_limit is not None: + create_model.concurrency_limit = concurrency_limit + if priority is not None: + create_model.priority = priority + + data = create_model.model_dump(mode="json") + try: + if work_pool_name is not None: + response = self.request( + "POST", + "/work_pools/{work_pool_name}/queues", + json=data, + path_params={"work_pool_name": work_pool_name}, + ) + else: + response = self.request("POST", "/work_queues/", json=data) + except HTTPStatusError as e: + if e.response.status_code == 409: + raise ObjectAlreadyExists(http_exc=e) from e + elif e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + return WorkQueue.model_validate(response.json()) + + def read_work_queue_by_name( + self, + name: str, + work_pool_name: str | None = None, + ) -> WorkQueue: + """ + Read a work queue by name. + + Args: + name (str): a unique name for the work queue + work_pool_name (str, optional): the name of the work pool + the queue belongs to. + + Raises: + ObjectNotFound: if no work queue is found + HTTPStatusError: other status errors + + Returns: + WorkQueue: a work queue API object + """ + from prefect.client.schemas.objects import WorkQueue + + try: + if work_pool_name is not None: + response = self.request( + "GET", + "/work_pools/{work_pool_name}/queues/{name}", + path_params={"work_pool_name": work_pool_name, "name": name}, + ) + else: + response = self.request( + "GET", + "/work_queues/name/{name}", + path_params={"name": name}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + return WorkQueue.model_validate(response.json()) + + def update_work_queue(self, id: "UUID", **kwargs: Any) -> None: + """ + Update properties of a work queue. + + Args: + id: the ID of the work queue to update + **kwargs: the fields to update + + Raises: + ValueError: if no kwargs are provided + ObjectNotFound: if request returns 404 + httpx.RequestError: if the request fails + + """ + if not kwargs: + raise ValueError("No fields provided to update.") + from prefect.client.schemas.actions import WorkQueueUpdate + + data = WorkQueueUpdate(**kwargs).model_dump(mode="json", exclude_unset=True) + try: + self.request( + "PATCH", + "/work_queues/{id}", + json=data, + path_params={"id": id}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + def get_runs_in_work_queue( + self, + id: "UUID", + limit: int = 10, + scheduled_before: "datetime.datetime | None" = None, + ) -> list["FlowRun"]: + """ + Read flow runs off a work queue. + + Args: + id: the id of the work queue to read from + limit: a limit on the number of runs to return + scheduled_before: a timestamp; only runs scheduled before this time will be returned. + Defaults to now. + + Raises: + ObjectNotFound: If request returns 404 + httpx.RequestError: If request fails + + Returns: + List[FlowRun]: a list of FlowRun objects read from the queue + """ + if scheduled_before is None: + import pendulum + + scheduled_before = pendulum.now("UTC") + + try: + response = self.request( + "POST", + "/work_queues/{id}/get_runs", + path_params={"id": id}, + json={ + "limit": limit, + "scheduled_before": scheduled_before.isoformat(), + }, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + from prefect.client.schemas.objects import FlowRun + + return FlowRun.model_validate_list(response.json()) + + def read_work_queue( + self, + id: "UUID", + ) -> "WorkQueue": + """ + Read a work queue. + + Args: + id: the id of the work queue to load + + Raises: + ObjectNotFound: If request returns 404 + httpx.RequestError: If request fails + + Returns: + WorkQueue: an instantiated WorkQueue object + """ + from prefect.client.schemas.objects import WorkQueue + + try: + response = self.request( + "GET", + "/work_queues/{id}", + path_params={"id": id}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + return WorkQueue.model_validate(response.json()) + + def read_work_queue_status( + self, + id: UUID, + ) -> WorkQueueStatusDetail: + """ + Read a work queue status. + + Args: + id: the id of the work queue to load + + Raises: + ObjectNotFound: If request returns 404 + httpx.RequestError: If request fails + + Returns: + WorkQueueStatus: an instantiated WorkQueueStatus object + """ + from prefect.client.schemas.objects import WorkQueueStatusDetail + + try: + response = self.request( + "GET", + "/work_queues/{id}/status", + path_params={"id": id}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + return WorkQueueStatusDetail.model_validate(response.json()) + + def delete_work_queue_by_id( + self, + id: "UUID", + ) -> None: + """ + Delete a work queue by its ID. + + Args: + id: the id of the work queue to delete + + Raises: + ObjectNotFound: If request returns 404 + httpx.RequestError: If requests fails + """ + try: + self.request( + "DELETE", + "/work_queues/{id}", + path_params={"id": id}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + def match_work_queues( + self, + prefixes: list[str], + work_pool_name: str | None = None, + ) -> list["WorkQueue"]: + """ + Query the Prefect API for work queues with names with a specific prefix. + + Args: + prefixes: a list of strings used to match work queue name prefixes + work_pool_name: an optional work pool name to scope the query to + + Returns: + a list of WorkQueue model representations + of the work queues + """ + from prefect.client.schemas.filters import WorkQueueFilter, WorkQueueFilterName + + page_length = 100 + current_page = 0 + work_queues: list[WorkQueue] = [] + + while True: + new_queues = self.read_work_queues( + work_pool_name=work_pool_name, + offset=current_page * page_length, + limit=page_length, + work_queue_filter=WorkQueueFilter( + name=WorkQueueFilterName(startswith_=prefixes) + ), + ) + if not new_queues: + break + work_queues += new_queues + current_page += 1 + + return work_queues + + def read_work_queues( + self, + work_pool_name: str | None = None, + work_queue_filter: "WorkQueueFilter | None" = None, + limit: int | None = None, + offset: int | None = None, + ) -> list["WorkQueue"]: + """ + Retrieves queues for a work pool. + + Args: + work_pool_name: Name of the work pool for which to get queues. + work_queue_filter: Criteria by which to filter queues. + limit: Limit for the queue query. + offset: Limit for the queue query. + + Returns: + List of queues for the specified work pool. + """ + json: dict[str, Any] = { + "work_queues": ( + work_queue_filter.model_dump(mode="json", exclude_unset=True) + if work_queue_filter + else None + ), + "limit": limit, + "offset": offset, + } + + if work_pool_name: + try: + response = self.request( + "POST", + "/work_pools/{work_pool_name}/queues/filter", + path_params={"work_pool_name": work_pool_name}, + json=json, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + else: + response = self.request("POST", "/work_queues/filter", json=json) + from prefect.client.schemas.objects import WorkQueue + + return WorkQueue.model_validate_list(response.json()) + + +class WorkQueueAsyncClient(BaseAsyncClient): + async def create_work_queue( + self, + name: str, + description: str | None = None, + is_paused: bool | None = None, + concurrency_limit: int | None = None, + priority: int | None = None, + work_pool_name: str | None = None, + ) -> WorkQueue: + """ + Create a work queue. + + Args: + name: a unique name for the work queue + description: An optional description for the work queue. + is_paused: Whether or not the work queue is paused. + concurrency_limit: An optional concurrency limit for the work queue. + priority: The queue's priority. Lower values are higher priority (1 is the highest). + work_pool_name: The name of the work pool to use for this queue. + + Raises: + ObjectAlreadyExists: If request returns 409 + httpx.RequestError: If request fails + + Returns: + The created work queue + """ + from prefect.client.schemas.actions import WorkQueueCreate + from prefect.client.schemas.objects import WorkQueue + + create_model = WorkQueueCreate(name=name, filter=None) + if description is not None: + create_model.description = description + if is_paused is not None: + create_model.is_paused = is_paused + if concurrency_limit is not None: + create_model.concurrency_limit = concurrency_limit + if priority is not None: + create_model.priority = priority + + data = create_model.model_dump(mode="json") + try: + if work_pool_name is not None: + response = await self.request( + "POST", + "/work_pools/{work_pool_name}/queues", + json=data, + path_params={"work_pool_name": work_pool_name}, + ) + else: + response = await self.request("POST", "/work_queues/", json=data) + except HTTPStatusError as e: + if e.response.status_code == 409: + raise ObjectAlreadyExists(http_exc=e) from e + elif e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + return WorkQueue.model_validate(response.json()) + + async def read_work_queue_by_name( + self, + name: str, + work_pool_name: str | None = None, + ) -> WorkQueue: + """ + Read a work queue by name. + + Args: + name (str): a unique name for the work queue + work_pool_name (str, optional): the name of the work pool + the queue belongs to. + + Raises: + ObjectNotFound: if no work queue is found + HTTPStatusError: other status errors + + Returns: + WorkQueue: a work queue API object + """ + from prefect.client.schemas.objects import WorkQueue + + try: + if work_pool_name is not None: + response = await self.request( + "GET", + "/work_pools/{work_pool_name}/queues/{name}", + path_params={"work_pool_name": work_pool_name, "name": name}, + ) + else: + response = await self.request( + "GET", + "/work_queues/name/{name}", + path_params={"name": name}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + return WorkQueue.model_validate(response.json()) + + async def update_work_queue(self, id: "UUID", **kwargs: Any) -> None: + """ + Update properties of a work queue. + + Args: + id: the ID of the work queue to update + **kwargs: the fields to update + + Raises: + ValueError: if no kwargs are provided + ObjectNotFound: if request returns 404 + httpx.RequestError: if the request fails + + """ + if not kwargs: + raise ValueError("No fields provided to update.") + from prefect.client.schemas.actions import WorkQueueUpdate + + data = WorkQueueUpdate(**kwargs).model_dump(mode="json", exclude_unset=True) + try: + await self.request( + "PATCH", + "/work_queues/{id}", + json=data, + path_params={"id": id}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + async def get_runs_in_work_queue( + self, + id: "UUID", + limit: int = 10, + scheduled_before: "datetime.datetime | None" = None, + ) -> list["FlowRun"]: + """ + Read flow runs off a work queue. + + Args: + id: the id of the work queue to read from + limit: a limit on the number of runs to return + scheduled_before: a timestamp; only runs scheduled before this time will be returned. + Defaults to now. + + Raises: + ObjectNotFound: If request returns 404 + httpx.RequestError: If request fails + + Returns: + List[FlowRun]: a list of FlowRun objects read from the queue + """ + if scheduled_before is None: + import pendulum + + scheduled_before = pendulum.now("UTC") + + try: + response = await self.request( + "POST", + "/work_queues/{id}/get_runs", + path_params={"id": id}, + json={ + "limit": limit, + "scheduled_before": scheduled_before.isoformat(), + }, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + from prefect.client.schemas.objects import FlowRun + + return FlowRun.model_validate_list(response.json()) + + async def read_work_queue( + self, + id: "UUID", + ) -> "WorkQueue": + """ + Read a work queue. + + Args: + id: the id of the work queue to load + + Raises: + ObjectNotFound: If request returns 404 + httpx.RequestError: If request fails + + Returns: + WorkQueue: an instantiated WorkQueue object + """ + from prefect.client.schemas.objects import WorkQueue + + try: + response = await self.request( + "GET", + "/work_queues/{id}", + path_params={"id": id}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + return WorkQueue.model_validate(response.json()) + + async def read_work_queue_status( + self, + id: UUID, + ) -> WorkQueueStatusDetail: + """ + Read a work queue status. + + Args: + id: the id of the work queue to load + + Raises: + ObjectNotFound: If request returns 404 + httpx.RequestError: If request fails + + Returns: + WorkQueueStatus: an instantiated WorkQueueStatus object + """ + from prefect.client.schemas.objects import WorkQueueStatusDetail + + try: + response = await self.request( + "GET", + "/work_queues/{id}/status", + path_params={"id": id}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + return WorkQueueStatusDetail.model_validate(response.json()) + + async def delete_work_queue_by_id( + self, + id: "UUID", + ) -> None: + """ + Delete a work queue by its ID. + + Args: + id: the id of the work queue to delete + + Raises: + ObjectNotFound: If request returns 404 + httpx.RequestError: If requests fails + """ + try: + await self.request( + "DELETE", + "/work_queues/{id}", + path_params={"id": id}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + async def match_work_queues( + self, + prefixes: list[str], + work_pool_name: str | None = None, + ) -> list["WorkQueue"]: + """ + Query the Prefect API for work queues with names with a specific prefix. + + Args: + prefixes: a list of strings used to match work queue name prefixes + work_pool_name: an optional work pool name to scope the query to + + Returns: + a list of WorkQueue model representations + of the work queues + """ + from prefect.client.schemas.filters import WorkQueueFilter, WorkQueueFilterName + + page_length = 100 + current_page = 0 + work_queues: list[WorkQueue] = [] + + while True: + new_queues = await self.read_work_queues( + work_pool_name=work_pool_name, + offset=current_page * page_length, + limit=page_length, + work_queue_filter=WorkQueueFilter( + name=WorkQueueFilterName(startswith_=prefixes) + ), + ) + if not new_queues: + break + work_queues += new_queues + current_page += 1 + + return work_queues + + async def read_work_queues( + self, + work_pool_name: str | None = None, + work_queue_filter: "WorkQueueFilter | None" = None, + limit: int | None = None, + offset: int | None = None, + ) -> list["WorkQueue"]: + """ + Retrieves queues for a work pool. + + Args: + work_pool_name: Name of the work pool for which to get queues. + work_queue_filter: Criteria by which to filter queues. + limit: Limit for the queue query. + offset: Limit for the queue query. + + Returns: + List of queues for the specified work pool. + """ + json: dict[str, Any] = { + "work_queues": ( + work_queue_filter.model_dump(mode="json", exclude_unset=True) + if work_queue_filter + else None + ), + "limit": limit, + "offset": offset, + } + + if work_pool_name: + try: + response = await self.request( + "POST", + "/work_pools/{work_pool_name}/queues/filter", + path_params={"work_pool_name": work_pool_name}, + json=json, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + else: + response = await self.request("POST", "/work_queues/filter", json=json) + from prefect.client.schemas.objects import WorkQueue + + return WorkQueue.model_validate_list(response.json()) From 2a082012ff3350330b4ac4dd5386acc5042015f2 Mon Sep 17 00:00:00 2001 From: Adam Azzam <33043305+AAAZZAM@users.noreply.github.com> Date: Wed, 1 Jan 2025 16:23:05 -0600 Subject: [PATCH 05/11] Update client.py --- .../client/orchestration/_concurrency_limits/client.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/prefect/client/orchestration/_concurrency_limits/client.py b/src/prefect/client/orchestration/_concurrency_limits/client.py index bddbe691bc44..21d494d8dd9d 100644 --- a/src/prefect/client/orchestration/_concurrency_limits/client.py +++ b/src/prefect/client/orchestration/_concurrency_limits/client.py @@ -41,6 +41,7 @@ def create_concurrency_limit( Returns: the ID of the concurrency limit in the backend """ + from prefect.client.schemas.actions import ConcurrencyLimitCreate concurrency_limit_create = ConcurrencyLimitCreate( tag=tag, @@ -351,7 +352,10 @@ def upsert_global_concurrency_limit_by_name(self, name: str, limit: int) -> None Note: This is not done atomically. """ - from prefect.client.schemas.actions import GlobalConcurrencyLimitCreate + from prefect.client.schemas.actions import ( + GlobalConcurrencyLimitCreate, + GlobalConcurrencyLimitUpdate, + ) try: existing_limit = self.read_global_concurrency_limit_by_name(name) From d3e27ad8de8594089b80eedc6cad8a56304dbd18 Mon Sep 17 00:00:00 2001 From: Adam Azzam <33043305+AAAZZAM@users.noreply.github.com> Date: Wed, 1 Jan 2025 16:25:09 -0600 Subject: [PATCH 06/11] Update __init__.py --- src/prefect/client/orchestration/__init__.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/prefect/client/orchestration/__init__.py b/src/prefect/client/orchestration/__init__.py index 4f628981cf93..fef49ae12c81 100644 --- a/src/prefect/client/orchestration/__init__.py +++ b/src/prefect/client/orchestration/__init__.py @@ -25,7 +25,7 @@ ArtifactCollectionAsyncClient, ) -from prefect.client.orchestration._work_queues import ( +from prefect.client.orchestration._work_queues.client import ( WorkQueueClient, WorkQueueAsyncClient, ) @@ -241,7 +241,9 @@ def get_client( ) -class PrefectClient(ArtifactAsyncClient, ArtifactCollectionAsyncClient): +class PrefectClient( + ArtifactAsyncClient, ArtifactCollectionAsyncClient, WorkQueueAsyncClient +): """ An asynchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/). @@ -3121,7 +3123,7 @@ def __exit__(self, *_: object) -> NoReturn: assert False, "This should never be called but must be defined for __enter__" -class SyncPrefectClient(ArtifactClient, ArtifactCollectionClient): +class SyncPrefectClient(ArtifactClient, ArtifactCollectionClient, WorkQueueClient): """ A synchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/). From 09814839e5f9360ed587ae4b857ce999c8ea00b0 Mon Sep 17 00:00:00 2001 From: Adam Azzam <33043305+AAAZZAM@users.noreply.github.com> Date: Wed, 1 Jan 2025 16:29:36 -0600 Subject: [PATCH 07/11] ugh lint --- src/prefect/client/orchestration/__init__.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/prefect/client/orchestration/__init__.py b/src/prefect/client/orchestration/__init__.py index fef49ae12c81..67008e330b90 100644 --- a/src/prefect/client/orchestration/__init__.py +++ b/src/prefect/client/orchestration/__init__.py @@ -151,13 +151,15 @@ def get_client( *, httpx_settings: Optional[dict[str, Any]] = ..., sync_client: Literal[False] = False, -) -> "PrefectClient": ... +) -> "PrefectClient": + ... @overload def get_client( *, httpx_settings: Optional[dict[str, Any]] = ..., sync_client: Literal[True] = ... -) -> "SyncPrefectClient": ... +) -> "SyncPrefectClient": + ... def get_client( From 9d74d9c5f91e9db85bf06597c058fce8d167fa9d Mon Sep 17 00:00:00 2001 From: Adam Azzam <33043305+AAAZZAM@users.noreply.github.com> Date: Wed, 1 Jan 2025 16:38:23 -0600 Subject: [PATCH 08/11] Update client.py --- .../client/orchestration/_concurrency_limits/client.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/prefect/client/orchestration/_concurrency_limits/client.py b/src/prefect/client/orchestration/_concurrency_limits/client.py index 21d494d8dd9d..46093387f8c8 100644 --- a/src/prefect/client/orchestration/_concurrency_limits/client.py +++ b/src/prefect/client/orchestration/_concurrency_limits/client.py @@ -13,7 +13,6 @@ from httpx import Response from prefect.client.schemas.actions import ( - ConcurrencyLimitCreate, GlobalConcurrencyLimitCreate, GlobalConcurrencyLimitUpdate, ) @@ -411,6 +410,7 @@ async def create_concurrency_limit( Returns: the ID of the concurrency limit in the backend """ + from prefect.client.schemas.actions import ConcurrencyLimitCreate concurrency_limit_create = ConcurrencyLimitCreate( tag=tag, @@ -723,7 +723,10 @@ async def upsert_global_concurrency_limit_by_name( Note: This is not done atomically. """ - from prefect.client.schemas.actions import GlobalConcurrencyLimitCreate + from prefect.client.schemas.actions import ( + GlobalConcurrencyLimitCreate, + GlobalConcurrencyLimitUpdate, + ) try: existing_limit = await self.read_global_concurrency_limit_by_name(name) From 7d416c3791e1cf9c79601b225adb57ffe566261f Mon Sep 17 00:00:00 2001 From: Adam Azzam <33043305+AAAZZAM@users.noreply.github.com> Date: Wed, 1 Jan 2025 17:20:39 -0600 Subject: [PATCH 09/11] lint --- src/prefect/client/orchestration/__init__.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/prefect/client/orchestration/__init__.py b/src/prefect/client/orchestration/__init__.py index 955245ea424c..9103d12d5b5d 100644 --- a/src/prefect/client/orchestration/__init__.py +++ b/src/prefect/client/orchestration/__init__.py @@ -29,6 +29,8 @@ from prefect.client.orchestration._concurrency_limits.client import ( ConcurrencyLimitAsyncClient, ConcurrencyLimitClient, +) + from prefect.client.orchestration._logs.client import ( LogClient, LogAsyncClient, @@ -252,7 +254,7 @@ class PrefectClient( ArtifactCollectionAsyncClient, LogAsyncClient, VariableAsyncClient, - ConcurrencyLimitAsyncClient + ConcurrencyLimitAsyncClient, ): """ An asynchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/). @@ -3023,7 +3025,7 @@ class SyncPrefectClient( ArtifactCollectionClient, LogClient, VariableClient, - ConcurrencyLimitClient + ConcurrencyLimitClient, ): """ A synchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/). From d4efe4043cce0bce4ea269d8e630cf74e6b342c8 Mon Sep 17 00:00:00 2001 From: Adam Azzam <33043305+AAAZZAM@users.noreply.github.com> Date: Wed, 1 Jan 2025 17:22:59 -0600 Subject: [PATCH 10/11] Update __init__.py --- src/prefect/client/orchestration/__init__.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/prefect/client/orchestration/__init__.py b/src/prefect/client/orchestration/__init__.py index 5e812bd4a517..7d4907f1c695 100644 --- a/src/prefect/client/orchestration/__init__.py +++ b/src/prefect/client/orchestration/__init__.py @@ -153,13 +153,15 @@ def get_client( *, httpx_settings: Optional[dict[str, Any]] = ..., sync_client: Literal[False] = False, -) -> "PrefectClient": ... +) -> "PrefectClient": + ... @overload def get_client( *, httpx_settings: Optional[dict[str, Any]] = ..., sync_client: Literal[True] = ... -) -> "SyncPrefectClient": ... +) -> "SyncPrefectClient": + ... def get_client( From 05497e8aa0129772094ad1b6ecfe1025db4b99a4 Mon Sep 17 00:00:00 2001 From: Adam Azzam <33043305+AAAZZAM@users.noreply.github.com> Date: Thu, 2 Jan 2025 11:30:17 -0600 Subject: [PATCH 11/11] jfc lint --- src/prefect/client/orchestration/__init__.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/prefect/client/orchestration/__init__.py b/src/prefect/client/orchestration/__init__.py index 5e812bd4a517..7d4907f1c695 100644 --- a/src/prefect/client/orchestration/__init__.py +++ b/src/prefect/client/orchestration/__init__.py @@ -153,13 +153,15 @@ def get_client( *, httpx_settings: Optional[dict[str, Any]] = ..., sync_client: Literal[False] = False, -) -> "PrefectClient": ... +) -> "PrefectClient": + ... @overload def get_client( *, httpx_settings: Optional[dict[str, Any]] = ..., sync_client: Literal[True] = ... -) -> "SyncPrefectClient": ... +) -> "SyncPrefectClient": + ... def get_client(