diff --git a/src/prefect/client/orchestration/__init__.py b/src/prefect/client/orchestration/__init__.py index 9103d12d5b5d..7d4907f1c695 100644 --- a/src/prefect/client/orchestration/__init__.py +++ b/src/prefect/client/orchestration/__init__.py @@ -12,7 +12,6 @@ import certifi import httpcore import httpx -import pendulum import pydantic from asgi_lifespan import LifespanManager from packaging import version @@ -26,6 +25,11 @@ ArtifactCollectionAsyncClient, ) +from prefect.client.orchestration._work_queues.client import ( + WorkQueueClient, + WorkQueueAsyncClient, +) + from prefect.client.orchestration._concurrency_limits.client import ( ConcurrencyLimitAsyncClient, ConcurrencyLimitClient, @@ -62,15 +66,10 @@ FlowRunNotificationPolicyCreate, FlowRunNotificationPolicyUpdate, FlowRunUpdate, - LogCreate, - GlobalConcurrencyLimitCreate, - GlobalConcurrencyLimitUpdate, TaskRunCreate, TaskRunUpdate, WorkPoolCreate, WorkPoolUpdate, - WorkQueueCreate, - WorkQueueUpdate, ) from prefect.client.schemas.filters import ( DeploymentFilter, @@ -81,7 +80,6 @@ WorkerFilter, WorkPoolFilter, WorkQueueFilter, - WorkQueueFilterName, ) from prefect.client.schemas.objects import ( BlockDocument, @@ -100,8 +98,6 @@ Worker, WorkerMetadata, WorkPool, - WorkQueue, - WorkQueueStatusDetail, ) from prefect.client.schemas.responses import ( DeploymentResponse, @@ -255,6 +251,7 @@ class PrefectClient( LogAsyncClient, VariableAsyncClient, ConcurrencyLimitAsyncClient, + WorkQueueAsyncClient, ): """ An asynchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/). @@ -822,273 +819,6 @@ async def delete_flow_run( else: raise - async def create_work_queue( - self, - name: str, - description: Optional[str] = None, - is_paused: Optional[bool] = None, - concurrency_limit: Optional[int] = None, - priority: Optional[int] = None, - work_pool_name: Optional[str] = None, - ) -> WorkQueue: - """ - Create a work queue. - - Args: - name: a unique name for the work queue - description: An optional description for the work queue. - is_paused: Whether or not the work queue is paused. - concurrency_limit: An optional concurrency limit for the work queue. - priority: The queue's priority. Lower values are higher priority (1 is the highest). - work_pool_name: The name of the work pool to use for this queue. - - Raises: - prefect.exceptions.ObjectAlreadyExists: If request returns 409 - httpx.RequestError: If request fails - - Returns: - The created work queue - """ - create_model = WorkQueueCreate(name=name, filter=None) - if description is not None: - create_model.description = description - if is_paused is not None: - create_model.is_paused = is_paused - if concurrency_limit is not None: - create_model.concurrency_limit = concurrency_limit - if priority is not None: - create_model.priority = priority - - data = create_model.model_dump(mode="json") - try: - if work_pool_name is not None: - response = await self._client.post( - f"/work_pools/{work_pool_name}/queues", json=data - ) - else: - response = await self._client.post("/work_queues/", json=data) - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_409_CONFLICT: - raise prefect.exceptions.ObjectAlreadyExists(http_exc=e) from e - elif e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - return WorkQueue.model_validate(response.json()) - - async def read_work_queue_by_name( - self, - name: str, - work_pool_name: Optional[str] = None, - ) -> WorkQueue: - """ - Read a work queue by name. - - Args: - name (str): a unique name for the work queue - work_pool_name (str, optional): the name of the work pool - the queue belongs to. - - Raises: - prefect.exceptions.ObjectNotFound: if no work queue is found - httpx.HTTPStatusError: other status errors - - Returns: - WorkQueue: a work queue API object - """ - try: - if work_pool_name is not None: - response = await self._client.get( - f"/work_pools/{work_pool_name}/queues/{name}" - ) - else: - response = await self._client.get(f"/work_queues/name/{name}") - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - - return WorkQueue.model_validate(response.json()) - - async def update_work_queue(self, id: UUID, **kwargs: Any) -> None: - """ - Update properties of a work queue. - - Args: - id: the ID of the work queue to update - **kwargs: the fields to update - - Raises: - ValueError: if no kwargs are provided - prefect.exceptions.ObjectNotFound: if request returns 404 - httpx.RequestError: if the request fails - - """ - if not kwargs: - raise ValueError("No fields provided to update.") - - data = WorkQueueUpdate(**kwargs).model_dump(mode="json", exclude_unset=True) - try: - await self._client.patch(f"/work_queues/{id}", json=data) - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - - async def get_runs_in_work_queue( - self, - id: UUID, - limit: int = 10, - scheduled_before: Optional[datetime.datetime] = None, - ) -> list[FlowRun]: - """ - Read flow runs off a work queue. - - Args: - id: the id of the work queue to read from - limit: a limit on the number of runs to return - scheduled_before: a timestamp; only runs scheduled before this time will be returned. - Defaults to now. - - Raises: - prefect.exceptions.ObjectNotFound: If request returns 404 - httpx.RequestError: If request fails - - Returns: - List[FlowRun]: a list of FlowRun objects read from the queue - """ - if scheduled_before is None: - scheduled_before = pendulum.now("UTC") - - try: - response = await self._client.post( - f"/work_queues/{id}/get_runs", - json={ - "limit": limit, - "scheduled_before": scheduled_before.isoformat(), - }, - ) - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - return pydantic.TypeAdapter(list[FlowRun]).validate_python(response.json()) - - async def read_work_queue( - self, - id: UUID, - ) -> WorkQueue: - """ - Read a work queue. - - Args: - id: the id of the work queue to load - - Raises: - prefect.exceptions.ObjectNotFound: If request returns 404 - httpx.RequestError: If request fails - - Returns: - WorkQueue: an instantiated WorkQueue object - """ - try: - response = await self._client.get(f"/work_queues/{id}") - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - return WorkQueue.model_validate(response.json()) - - async def read_work_queue_status( - self, - id: UUID, - ) -> WorkQueueStatusDetail: - """ - Read a work queue status. - - Args: - id: the id of the work queue to load - - Raises: - prefect.exceptions.ObjectNotFound: If request returns 404 - httpx.RequestError: If request fails - - Returns: - WorkQueueStatus: an instantiated WorkQueueStatus object - """ - try: - response = await self._client.get(f"/work_queues/{id}/status") - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - return WorkQueueStatusDetail.model_validate(response.json()) - - async def match_work_queues( - self, - prefixes: list[str], - work_pool_name: Optional[str] = None, - ) -> list[WorkQueue]: - """ - Query the Prefect API for work queues with names with a specific prefix. - - Args: - prefixes: a list of strings used to match work queue name prefixes - work_pool_name: an optional work pool name to scope the query to - - Returns: - a list of WorkQueue model representations - of the work queues - """ - page_length = 100 - current_page = 0 - work_queues: list[WorkQueue] = [] - - while True: - new_queues = await self.read_work_queues( - work_pool_name=work_pool_name, - offset=current_page * page_length, - limit=page_length, - work_queue_filter=WorkQueueFilter( - name=WorkQueueFilterName(startswith_=prefixes) - ), - ) - if not new_queues: - break - work_queues += new_queues - current_page += 1 - - return work_queues - - async def delete_work_queue_by_id( - self, - id: UUID, - ) -> None: - """ - Delete a work queue by its ID. - - Args: - id: the id of the work queue to delete - - Raises: - prefect.exceptions.ObjectNotFound: If request returns 404 - httpx.RequestError: If requests fails - """ - try: - await self._client.delete( - f"/work_queues/{id}", - ) - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - async def create_block_type(self, block_type: BlockTypeCreate) -> BlockType: """ Create a block type in the Prefect API. @@ -2621,51 +2351,6 @@ async def delete_work_pool( else: raise - async def read_work_queues( - self, - work_pool_name: Optional[str] = None, - work_queue_filter: Optional[WorkQueueFilter] = None, - limit: Optional[int] = None, - offset: Optional[int] = None, - ) -> list[WorkQueue]: - """ - Retrieves queues for a work pool. - - Args: - work_pool_name: Name of the work pool for which to get queues. - work_queue_filter: Criteria by which to filter queues. - limit: Limit for the queue query. - offset: Limit for the queue query. - - Returns: - List of queues for the specified work pool. - """ - json: dict[str, Any] = { - "work_queues": ( - work_queue_filter.model_dump(mode="json", exclude_unset=True) - if work_queue_filter - else None - ), - "limit": limit, - "offset": offset, - } - - if work_pool_name: - try: - response = await self._client.post( - f"/work_pools/{work_pool_name}/queues/filter", - json=json, - ) - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - else: - response = await self._client.post("/work_queues/filter", json=json) - - return pydantic.TypeAdapter(list[WorkQueue]).validate_python(response.json()) - async def get_scheduled_flow_runs_for_deployments( self, deployment_ids: list[UUID], @@ -3026,6 +2711,7 @@ class SyncPrefectClient( LogClient, VariableClient, ConcurrencyLimitClient, + WorkQueueClient, ): """ A synchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/). diff --git a/src/prefect/client/orchestration/_concurrency_limits/client.py b/src/prefect/client/orchestration/_concurrency_limits/client.py index 5872fceae766..26f50809472f 100644 --- a/src/prefect/client/orchestration/_concurrency_limits/client.py +++ b/src/prefect/client/orchestration/_concurrency_limits/client.py @@ -562,11 +562,16 @@ async def increment_v1_concurrency_slots( task_run_id: "UUID", ) -> "Response": """ - Increment concurrency limit slots for the specified limits. - - Args: - names: A list of limit names for which to increment limits. - task_run_id: The task run ID incrementing the limits. + Increment concurrency limit slots for the specified limits. + + Args: + <<<<<<< HEAD + names (List[str]): A list of limit names for which to increment limits. + task_run_id (UUID): The task run ID incrementing the limits. + ======= + names: A list of limit names for which to increment limits. + task_run_id: The task run ID incrementing the limits. + >>>>>>> main """ data: dict[str, Any] = { "names": names, diff --git a/src/prefect/client/orchestration/_work_queues/__init__.py b/src/prefect/client/orchestration/_work_queues/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/src/prefect/client/orchestration/_work_queues/client.py b/src/prefect/client/orchestration/_work_queues/client.py new file mode 100644 index 000000000000..f4dcac63a4df --- /dev/null +++ b/src/prefect/client/orchestration/_work_queues/client.py @@ -0,0 +1,736 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from httpx import HTTPStatusError + +from prefect.client.orchestration.base import BaseAsyncClient, BaseClient +from prefect.exceptions import ObjectAlreadyExists, ObjectNotFound + +if TYPE_CHECKING: + import datetime + from uuid import UUID + + from prefect.client.schemas import FlowRun + from prefect.client.schemas.filters import WorkQueueFilter + from prefect.client.schemas.objects import WorkQueue, WorkQueueStatusDetail + + +class WorkQueueClient(BaseClient): + def create_work_queue( + self, + name: str, + description: str | None = None, + is_paused: bool | None = None, + concurrency_limit: int | None = None, + priority: int | None = None, + work_pool_name: str | None = None, + ) -> WorkQueue: + """ + Create a work queue. + + Args: + name: a unique name for the work queue + description: An optional description for the work queue. + is_paused: Whether or not the work queue is paused. + concurrency_limit: An optional concurrency limit for the work queue. + priority: The queue's priority. Lower values are higher priority (1 is the highest). + work_pool_name: The name of the work pool to use for this queue. + + Raises: + ObjectAlreadyExists: If request returns 409 + httpx.RequestError: If request fails + + Returns: + The created work queue + """ + from prefect.client.schemas.actions import WorkQueueCreate + from prefect.client.schemas.objects import WorkQueue + + create_model = WorkQueueCreate(name=name, filter=None) + if description is not None: + create_model.description = description + if is_paused is not None: + create_model.is_paused = is_paused + if concurrency_limit is not None: + create_model.concurrency_limit = concurrency_limit + if priority is not None: + create_model.priority = priority + + data = create_model.model_dump(mode="json") + try: + if work_pool_name is not None: + response = self.request( + "POST", + "/work_pools/{work_pool_name}/queues", + json=data, + path_params={"work_pool_name": work_pool_name}, + ) + else: + response = self.request("POST", "/work_queues/", json=data) + except HTTPStatusError as e: + if e.response.status_code == 409: + raise ObjectAlreadyExists(http_exc=e) from e + elif e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + return WorkQueue.model_validate(response.json()) + + def read_work_queue_by_name( + self, + name: str, + work_pool_name: str | None = None, + ) -> WorkQueue: + """ + Read a work queue by name. + + Args: + name (str): a unique name for the work queue + work_pool_name (str, optional): the name of the work pool + the queue belongs to. + + Raises: + ObjectNotFound: if no work queue is found + HTTPStatusError: other status errors + + Returns: + WorkQueue: a work queue API object + """ + from prefect.client.schemas.objects import WorkQueue + + try: + if work_pool_name is not None: + response = self.request( + "GET", + "/work_pools/{work_pool_name}/queues/{name}", + path_params={"work_pool_name": work_pool_name, "name": name}, + ) + else: + response = self.request( + "GET", + "/work_queues/name/{name}", + path_params={"name": name}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + return WorkQueue.model_validate(response.json()) + + def update_work_queue(self, id: "UUID", **kwargs: Any) -> None: + """ + Update properties of a work queue. + + Args: + id: the ID of the work queue to update + **kwargs: the fields to update + + Raises: + ValueError: if no kwargs are provided + ObjectNotFound: if request returns 404 + httpx.RequestError: if the request fails + + """ + if not kwargs: + raise ValueError("No fields provided to update.") + from prefect.client.schemas.actions import WorkQueueUpdate + + data = WorkQueueUpdate(**kwargs).model_dump(mode="json", exclude_unset=True) + try: + self.request( + "PATCH", + "/work_queues/{id}", + json=data, + path_params={"id": id}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + def get_runs_in_work_queue( + self, + id: "UUID", + limit: int = 10, + scheduled_before: "datetime.datetime | None" = None, + ) -> list["FlowRun"]: + """ + Read flow runs off a work queue. + + Args: + id: the id of the work queue to read from + limit: a limit on the number of runs to return + scheduled_before: a timestamp; only runs scheduled before this time will be returned. + Defaults to now. + + Raises: + ObjectNotFound: If request returns 404 + httpx.RequestError: If request fails + + Returns: + List[FlowRun]: a list of FlowRun objects read from the queue + """ + if scheduled_before is None: + import pendulum + + scheduled_before = pendulum.now("UTC") + + try: + response = self.request( + "POST", + "/work_queues/{id}/get_runs", + path_params={"id": id}, + json={ + "limit": limit, + "scheduled_before": scheduled_before.isoformat(), + }, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + from prefect.client.schemas.objects import FlowRun + + return FlowRun.model_validate_list(response.json()) + + def read_work_queue( + self, + id: "UUID", + ) -> "WorkQueue": + """ + Read a work queue. + + Args: + id: the id of the work queue to load + + Raises: + ObjectNotFound: If request returns 404 + httpx.RequestError: If request fails + + Returns: + WorkQueue: an instantiated WorkQueue object + """ + from prefect.client.schemas.objects import WorkQueue + + try: + response = self.request( + "GET", + "/work_queues/{id}", + path_params={"id": id}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + return WorkQueue.model_validate(response.json()) + + def read_work_queue_status( + self, + id: UUID, + ) -> WorkQueueStatusDetail: + """ + Read a work queue status. + + Args: + id: the id of the work queue to load + + Raises: + ObjectNotFound: If request returns 404 + httpx.RequestError: If request fails + + Returns: + WorkQueueStatus: an instantiated WorkQueueStatus object + """ + from prefect.client.schemas.objects import WorkQueueStatusDetail + + try: + response = self.request( + "GET", + "/work_queues/{id}/status", + path_params={"id": id}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + return WorkQueueStatusDetail.model_validate(response.json()) + + def delete_work_queue_by_id( + self, + id: "UUID", + ) -> None: + """ + Delete a work queue by its ID. + + Args: + id: the id of the work queue to delete + + Raises: + ObjectNotFound: If request returns 404 + httpx.RequestError: If requests fails + """ + try: + self.request( + "DELETE", + "/work_queues/{id}", + path_params={"id": id}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + def match_work_queues( + self, + prefixes: list[str], + work_pool_name: str | None = None, + ) -> list["WorkQueue"]: + """ + Query the Prefect API for work queues with names with a specific prefix. + + Args: + prefixes: a list of strings used to match work queue name prefixes + work_pool_name: an optional work pool name to scope the query to + + Returns: + a list of WorkQueue model representations + of the work queues + """ + from prefect.client.schemas.filters import WorkQueueFilter, WorkQueueFilterName + + page_length = 100 + current_page = 0 + work_queues: list[WorkQueue] = [] + + while True: + new_queues = self.read_work_queues( + work_pool_name=work_pool_name, + offset=current_page * page_length, + limit=page_length, + work_queue_filter=WorkQueueFilter( + name=WorkQueueFilterName(startswith_=prefixes) + ), + ) + if not new_queues: + break + work_queues += new_queues + current_page += 1 + + return work_queues + + def read_work_queues( + self, + work_pool_name: str | None = None, + work_queue_filter: "WorkQueueFilter | None" = None, + limit: int | None = None, + offset: int | None = None, + ) -> list["WorkQueue"]: + """ + Retrieves queues for a work pool. + + Args: + work_pool_name: Name of the work pool for which to get queues. + work_queue_filter: Criteria by which to filter queues. + limit: Limit for the queue query. + offset: Limit for the queue query. + + Returns: + List of queues for the specified work pool. + """ + json: dict[str, Any] = { + "work_queues": ( + work_queue_filter.model_dump(mode="json", exclude_unset=True) + if work_queue_filter + else None + ), + "limit": limit, + "offset": offset, + } + + if work_pool_name: + try: + response = self.request( + "POST", + "/work_pools/{work_pool_name}/queues/filter", + path_params={"work_pool_name": work_pool_name}, + json=json, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + else: + response = self.request("POST", "/work_queues/filter", json=json) + from prefect.client.schemas.objects import WorkQueue + + return WorkQueue.model_validate_list(response.json()) + + +class WorkQueueAsyncClient(BaseAsyncClient): + async def create_work_queue( + self, + name: str, + description: str | None = None, + is_paused: bool | None = None, + concurrency_limit: int | None = None, + priority: int | None = None, + work_pool_name: str | None = None, + ) -> WorkQueue: + """ + Create a work queue. + + Args: + name: a unique name for the work queue + description: An optional description for the work queue. + is_paused: Whether or not the work queue is paused. + concurrency_limit: An optional concurrency limit for the work queue. + priority: The queue's priority. Lower values are higher priority (1 is the highest). + work_pool_name: The name of the work pool to use for this queue. + + Raises: + ObjectAlreadyExists: If request returns 409 + httpx.RequestError: If request fails + + Returns: + The created work queue + """ + from prefect.client.schemas.actions import WorkQueueCreate + from prefect.client.schemas.objects import WorkQueue + + create_model = WorkQueueCreate(name=name, filter=None) + if description is not None: + create_model.description = description + if is_paused is not None: + create_model.is_paused = is_paused + if concurrency_limit is not None: + create_model.concurrency_limit = concurrency_limit + if priority is not None: + create_model.priority = priority + + data = create_model.model_dump(mode="json") + try: + if work_pool_name is not None: + response = await self.request( + "POST", + "/work_pools/{work_pool_name}/queues", + json=data, + path_params={"work_pool_name": work_pool_name}, + ) + else: + response = await self.request("POST", "/work_queues/", json=data) + except HTTPStatusError as e: + if e.response.status_code == 409: + raise ObjectAlreadyExists(http_exc=e) from e + elif e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + return WorkQueue.model_validate(response.json()) + + async def read_work_queue_by_name( + self, + name: str, + work_pool_name: str | None = None, + ) -> WorkQueue: + """ + Read a work queue by name. + + Args: + name (str): a unique name for the work queue + work_pool_name (str, optional): the name of the work pool + the queue belongs to. + + Raises: + ObjectNotFound: if no work queue is found + HTTPStatusError: other status errors + + Returns: + WorkQueue: a work queue API object + """ + from prefect.client.schemas.objects import WorkQueue + + try: + if work_pool_name is not None: + response = await self.request( + "GET", + "/work_pools/{work_pool_name}/queues/{name}", + path_params={"work_pool_name": work_pool_name, "name": name}, + ) + else: + response = await self.request( + "GET", + "/work_queues/name/{name}", + path_params={"name": name}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + return WorkQueue.model_validate(response.json()) + + async def update_work_queue(self, id: "UUID", **kwargs: Any) -> None: + """ + Update properties of a work queue. + + Args: + id: the ID of the work queue to update + **kwargs: the fields to update + + Raises: + ValueError: if no kwargs are provided + ObjectNotFound: if request returns 404 + httpx.RequestError: if the request fails + + """ + if not kwargs: + raise ValueError("No fields provided to update.") + from prefect.client.schemas.actions import WorkQueueUpdate + + data = WorkQueueUpdate(**kwargs).model_dump(mode="json", exclude_unset=True) + try: + await self.request( + "PATCH", + "/work_queues/{id}", + json=data, + path_params={"id": id}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + async def get_runs_in_work_queue( + self, + id: "UUID", + limit: int = 10, + scheduled_before: "datetime.datetime | None" = None, + ) -> list["FlowRun"]: + """ + Read flow runs off a work queue. + + Args: + id: the id of the work queue to read from + limit: a limit on the number of runs to return + scheduled_before: a timestamp; only runs scheduled before this time will be returned. + Defaults to now. + + Raises: + ObjectNotFound: If request returns 404 + httpx.RequestError: If request fails + + Returns: + List[FlowRun]: a list of FlowRun objects read from the queue + """ + if scheduled_before is None: + import pendulum + + scheduled_before = pendulum.now("UTC") + + try: + response = await self.request( + "POST", + "/work_queues/{id}/get_runs", + path_params={"id": id}, + json={ + "limit": limit, + "scheduled_before": scheduled_before.isoformat(), + }, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + from prefect.client.schemas.objects import FlowRun + + return FlowRun.model_validate_list(response.json()) + + async def read_work_queue( + self, + id: "UUID", + ) -> "WorkQueue": + """ + Read a work queue. + + Args: + id: the id of the work queue to load + + Raises: + ObjectNotFound: If request returns 404 + httpx.RequestError: If request fails + + Returns: + WorkQueue: an instantiated WorkQueue object + """ + from prefect.client.schemas.objects import WorkQueue + + try: + response = await self.request( + "GET", + "/work_queues/{id}", + path_params={"id": id}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + return WorkQueue.model_validate(response.json()) + + async def read_work_queue_status( + self, + id: UUID, + ) -> WorkQueueStatusDetail: + """ + Read a work queue status. + + Args: + id: the id of the work queue to load + + Raises: + ObjectNotFound: If request returns 404 + httpx.RequestError: If request fails + + Returns: + WorkQueueStatus: an instantiated WorkQueueStatus object + """ + from prefect.client.schemas.objects import WorkQueueStatusDetail + + try: + response = await self.request( + "GET", + "/work_queues/{id}/status", + path_params={"id": id}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + return WorkQueueStatusDetail.model_validate(response.json()) + + async def delete_work_queue_by_id( + self, + id: "UUID", + ) -> None: + """ + Delete a work queue by its ID. + + Args: + id: the id of the work queue to delete + + Raises: + ObjectNotFound: If request returns 404 + httpx.RequestError: If requests fails + """ + try: + await self.request( + "DELETE", + "/work_queues/{id}", + path_params={"id": id}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + async def match_work_queues( + self, + prefixes: list[str], + work_pool_name: str | None = None, + ) -> list["WorkQueue"]: + """ + Query the Prefect API for work queues with names with a specific prefix. + + Args: + prefixes: a list of strings used to match work queue name prefixes + work_pool_name: an optional work pool name to scope the query to + + Returns: + a list of WorkQueue model representations + of the work queues + """ + from prefect.client.schemas.filters import WorkQueueFilter, WorkQueueFilterName + + page_length = 100 + current_page = 0 + work_queues: list[WorkQueue] = [] + + while True: + new_queues = await self.read_work_queues( + work_pool_name=work_pool_name, + offset=current_page * page_length, + limit=page_length, + work_queue_filter=WorkQueueFilter( + name=WorkQueueFilterName(startswith_=prefixes) + ), + ) + if not new_queues: + break + work_queues += new_queues + current_page += 1 + + return work_queues + + async def read_work_queues( + self, + work_pool_name: str | None = None, + work_queue_filter: "WorkQueueFilter | None" = None, + limit: int | None = None, + offset: int | None = None, + ) -> list["WorkQueue"]: + """ + Retrieves queues for a work pool. + + Args: + work_pool_name: Name of the work pool for which to get queues. + work_queue_filter: Criteria by which to filter queues. + limit: Limit for the queue query. + offset: Limit for the queue query. + + Returns: + List of queues for the specified work pool. + """ + json: dict[str, Any] = { + "work_queues": ( + work_queue_filter.model_dump(mode="json", exclude_unset=True) + if work_queue_filter + else None + ), + "limit": limit, + "offset": offset, + } + + if work_pool_name: + try: + response = await self.request( + "POST", + "/work_pools/{work_pool_name}/queues/filter", + path_params={"work_pool_name": work_pool_name}, + json=json, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + else: + response = await self.request("POST", "/work_queues/filter", json=json) + from prefect.client.schemas.objects import WorkQueue + + return WorkQueue.model_validate_list(response.json())