Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor WorkQueue CRUD methods in client #16568

Closed
wants to merge 14 commits into from
328 changes: 7 additions & 321 deletions src/prefect/client/orchestration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import certifi
import httpcore
import httpx
import pendulum
import pydantic
from asgi_lifespan import LifespanManager
from packaging import version
Expand All @@ -26,6 +25,11 @@
ArtifactCollectionAsyncClient,
)

from prefect.client.orchestration._work_queues.client import (
WorkQueueClient,
WorkQueueAsyncClient,
)

from prefect.client.orchestration._concurrency_limits.client import (
ConcurrencyLimitAsyncClient,
ConcurrencyLimitClient,
Expand Down Expand Up @@ -62,15 +66,10 @@
FlowRunNotificationPolicyCreate,
FlowRunNotificationPolicyUpdate,
FlowRunUpdate,
LogCreate,
GlobalConcurrencyLimitCreate,
GlobalConcurrencyLimitUpdate,
TaskRunCreate,
TaskRunUpdate,
WorkPoolCreate,
WorkPoolUpdate,
WorkQueueCreate,
WorkQueueUpdate,
)
from prefect.client.schemas.filters import (
DeploymentFilter,
Expand All @@ -81,7 +80,6 @@
WorkerFilter,
WorkPoolFilter,
WorkQueueFilter,
WorkQueueFilterName,
)
from prefect.client.schemas.objects import (
BlockDocument,
Expand All @@ -100,8 +98,6 @@
Worker,
WorkerMetadata,
WorkPool,
WorkQueue,
WorkQueueStatusDetail,
)
from prefect.client.schemas.responses import (
DeploymentResponse,
Expand Down Expand Up @@ -255,6 +251,7 @@ class PrefectClient(
LogAsyncClient,
VariableAsyncClient,
ConcurrencyLimitAsyncClient,
WorkQueueAsyncClient,
):
"""
An asynchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/).
Expand Down Expand Up @@ -822,273 +819,6 @@ async def delete_flow_run(
else:
raise

async def create_work_queue(
self,
name: str,
description: Optional[str] = None,
is_paused: Optional[bool] = None,
concurrency_limit: Optional[int] = None,
priority: Optional[int] = None,
work_pool_name: Optional[str] = None,
) -> WorkQueue:
"""
Create a work queue.

Args:
name: a unique name for the work queue
description: An optional description for the work queue.
is_paused: Whether or not the work queue is paused.
concurrency_limit: An optional concurrency limit for the work queue.
priority: The queue's priority. Lower values are higher priority (1 is the highest).
work_pool_name: The name of the work pool to use for this queue.

Raises:
prefect.exceptions.ObjectAlreadyExists: If request returns 409
httpx.RequestError: If request fails

Returns:
The created work queue
"""
create_model = WorkQueueCreate(name=name, filter=None)
if description is not None:
create_model.description = description
if is_paused is not None:
create_model.is_paused = is_paused
if concurrency_limit is not None:
create_model.concurrency_limit = concurrency_limit
if priority is not None:
create_model.priority = priority

data = create_model.model_dump(mode="json")
try:
if work_pool_name is not None:
response = await self._client.post(
f"/work_pools/{work_pool_name}/queues", json=data
)
else:
response = await self._client.post("/work_queues/", json=data)
except httpx.HTTPStatusError as e:
if e.response.status_code == status.HTTP_409_CONFLICT:
raise prefect.exceptions.ObjectAlreadyExists(http_exc=e) from e
elif e.response.status_code == status.HTTP_404_NOT_FOUND:
raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
else:
raise
return WorkQueue.model_validate(response.json())

async def read_work_queue_by_name(
self,
name: str,
work_pool_name: Optional[str] = None,
) -> WorkQueue:
"""
Read a work queue by name.

Args:
name (str): a unique name for the work queue
work_pool_name (str, optional): the name of the work pool
the queue belongs to.

Raises:
prefect.exceptions.ObjectNotFound: if no work queue is found
httpx.HTTPStatusError: other status errors

Returns:
WorkQueue: a work queue API object
"""
try:
if work_pool_name is not None:
response = await self._client.get(
f"/work_pools/{work_pool_name}/queues/{name}"
)
else:
response = await self._client.get(f"/work_queues/name/{name}")
except httpx.HTTPStatusError as e:
if e.response.status_code == status.HTTP_404_NOT_FOUND:
raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
else:
raise

return WorkQueue.model_validate(response.json())

async def update_work_queue(self, id: UUID, **kwargs: Any) -> None:
"""
Update properties of a work queue.

Args:
id: the ID of the work queue to update
**kwargs: the fields to update

Raises:
ValueError: if no kwargs are provided
prefect.exceptions.ObjectNotFound: if request returns 404
httpx.RequestError: if the request fails

"""
if not kwargs:
raise ValueError("No fields provided to update.")

data = WorkQueueUpdate(**kwargs).model_dump(mode="json", exclude_unset=True)
try:
await self._client.patch(f"/work_queues/{id}", json=data)
except httpx.HTTPStatusError as e:
if e.response.status_code == status.HTTP_404_NOT_FOUND:
raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
else:
raise

async def get_runs_in_work_queue(
self,
id: UUID,
limit: int = 10,
scheduled_before: Optional[datetime.datetime] = None,
) -> list[FlowRun]:
"""
Read flow runs off a work queue.

Args:
id: the id of the work queue to read from
limit: a limit on the number of runs to return
scheduled_before: a timestamp; only runs scheduled before this time will be returned.
Defaults to now.

Raises:
prefect.exceptions.ObjectNotFound: If request returns 404
httpx.RequestError: If request fails

Returns:
List[FlowRun]: a list of FlowRun objects read from the queue
"""
if scheduled_before is None:
scheduled_before = pendulum.now("UTC")

try:
response = await self._client.post(
f"/work_queues/{id}/get_runs",
json={
"limit": limit,
"scheduled_before": scheduled_before.isoformat(),
},
)
except httpx.HTTPStatusError as e:
if e.response.status_code == status.HTTP_404_NOT_FOUND:
raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
else:
raise
return pydantic.TypeAdapter(list[FlowRun]).validate_python(response.json())

async def read_work_queue(
self,
id: UUID,
) -> WorkQueue:
"""
Read a work queue.

Args:
id: the id of the work queue to load

Raises:
prefect.exceptions.ObjectNotFound: If request returns 404
httpx.RequestError: If request fails

Returns:
WorkQueue: an instantiated WorkQueue object
"""
try:
response = await self._client.get(f"/work_queues/{id}")
except httpx.HTTPStatusError as e:
if e.response.status_code == status.HTTP_404_NOT_FOUND:
raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
else:
raise
return WorkQueue.model_validate(response.json())

async def read_work_queue_status(
self,
id: UUID,
) -> WorkQueueStatusDetail:
"""
Read a work queue status.

Args:
id: the id of the work queue to load

Raises:
prefect.exceptions.ObjectNotFound: If request returns 404
httpx.RequestError: If request fails

Returns:
WorkQueueStatus: an instantiated WorkQueueStatus object
"""
try:
response = await self._client.get(f"/work_queues/{id}/status")
except httpx.HTTPStatusError as e:
if e.response.status_code == status.HTTP_404_NOT_FOUND:
raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
else:
raise
return WorkQueueStatusDetail.model_validate(response.json())

async def match_work_queues(
self,
prefixes: list[str],
work_pool_name: Optional[str] = None,
) -> list[WorkQueue]:
"""
Query the Prefect API for work queues with names with a specific prefix.

Args:
prefixes: a list of strings used to match work queue name prefixes
work_pool_name: an optional work pool name to scope the query to

Returns:
a list of WorkQueue model representations
of the work queues
"""
page_length = 100
current_page = 0
work_queues: list[WorkQueue] = []

while True:
new_queues = await self.read_work_queues(
work_pool_name=work_pool_name,
offset=current_page * page_length,
limit=page_length,
work_queue_filter=WorkQueueFilter(
name=WorkQueueFilterName(startswith_=prefixes)
),
)
if not new_queues:
break
work_queues += new_queues
current_page += 1

return work_queues

async def delete_work_queue_by_id(
self,
id: UUID,
) -> None:
"""
Delete a work queue by its ID.

Args:
id: the id of the work queue to delete

Raises:
prefect.exceptions.ObjectNotFound: If request returns 404
httpx.RequestError: If requests fails
"""
try:
await self._client.delete(
f"/work_queues/{id}",
)
except httpx.HTTPStatusError as e:
if e.response.status_code == status.HTTP_404_NOT_FOUND:
raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
else:
raise

async def create_block_type(self, block_type: BlockTypeCreate) -> BlockType:
"""
Create a block type in the Prefect API.
Expand Down Expand Up @@ -2621,51 +2351,6 @@ async def delete_work_pool(
else:
raise

async def read_work_queues(
self,
work_pool_name: Optional[str] = None,
work_queue_filter: Optional[WorkQueueFilter] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
) -> list[WorkQueue]:
"""
Retrieves queues for a work pool.

Args:
work_pool_name: Name of the work pool for which to get queues.
work_queue_filter: Criteria by which to filter queues.
limit: Limit for the queue query.
offset: Limit for the queue query.

Returns:
List of queues for the specified work pool.
"""
json: dict[str, Any] = {
"work_queues": (
work_queue_filter.model_dump(mode="json", exclude_unset=True)
if work_queue_filter
else None
),
"limit": limit,
"offset": offset,
}

if work_pool_name:
try:
response = await self._client.post(
f"/work_pools/{work_pool_name}/queues/filter",
json=json,
)
except httpx.HTTPStatusError as e:
if e.response.status_code == status.HTTP_404_NOT_FOUND:
raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
else:
raise
else:
response = await self._client.post("/work_queues/filter", json=json)

return pydantic.TypeAdapter(list[WorkQueue]).validate_python(response.json())

async def get_scheduled_flow_runs_for_deployments(
self,
deployment_ids: list[UUID],
Expand Down Expand Up @@ -3026,6 +2711,7 @@ class SyncPrefectClient(
LogClient,
VariableClient,
ConcurrencyLimitClient,
WorkQueueClient,
):
"""
A synchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/).
Expand Down
Loading
Loading