Skip to content

Commit

Permalink
AIP-84 Migrate POST list Dag Runs(batch) endpoint to FastAPI (apache#…
Browse files Browse the repository at this point in the history
…44170)

* init list dag runs batch

* finish dag runs batch

* remove all() for scalars

* working tests

* fix

* fix

* update tests to use dag_run_id instead of run_id

* fix test

* add tests for reverse order

* refactor

* refactor
  • Loading branch information
rawwar authored Nov 26, 2024
1 parent 6d075cb commit d8c91aa
Show file tree
Hide file tree
Showing 10 changed files with 873 additions and 14 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ def get_dag_runs(
raise BadRequest("DAGRunCollectionSchema error", detail=str(e))


@mark_fastapi_migration_done
@security.requires_access_dag("GET", DagAccessEntity.RUN)
@provide_session
def get_dag_runs_batch(*, session: Session = NEW_SESSION) -> APIResponse:
Expand Down
18 changes: 17 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from datetime import datetime
from enum import Enum

from pydantic import Field
from pydantic import AwareDatetime, Field, NonNegativeInt

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.utils.state import DagRunState
Expand Down Expand Up @@ -73,3 +73,19 @@ class DAGRunCollectionResponse(BaseModel):

dag_runs: list[DAGRunResponse]
total_entries: int


class DAGRunsBatchBody(BaseModel):
"""List DAG Runs body for batch endpoint."""

order_by: str | None = None
page_offset: NonNegativeInt = 0
page_limit: NonNegativeInt = 100
dag_ids: list[str] | None = None
states: list[DagRunState | None] | None = None
logical_date_gte: AwareDatetime | None = None
logical_date_lte: AwareDatetime | None = None
start_date_gte: AwareDatetime | None = None
start_date_lte: AwareDatetime | None = None
end_date_gte: AwareDatetime | None = None
end_date_lte: AwareDatetime | None = None
124 changes: 124 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1828,6 +1828,58 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/list:
post:
tags:
- DagRun
summary: Get List Dag Runs Batch
description: Get a list of DAG Runs.
operationId: get_list_dag_runs_batch
parameters:
- name: dag_id
in: path
required: true
schema:
const: '~'
type: string
title: Dag Id
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/DAGRunsBatchBody'
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/DAGRunCollectionResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dagSources/{dag_id}:
get:
tags:
Expand Down Expand Up @@ -6342,6 +6394,78 @@ components:
- asset_triggered
title: DAGRunTypes
description: DAG Run Types for responses.
DAGRunsBatchBody:
properties:
order_by:
anyOf:
- type: string
- type: 'null'
title: Order By
page_offset:
type: integer
minimum: 0.0
title: Page Offset
default: 0
page_limit:
type: integer
minimum: 0.0
title: Page Limit
default: 100
dag_ids:
anyOf:
- items:
type: string
type: array
- type: 'null'
title: Dag Ids
states:
anyOf:
- items:
anyOf:
- $ref: '#/components/schemas/DagRunState'
- type: 'null'
type: array
- type: 'null'
title: States
logical_date_gte:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Logical Date Gte
logical_date_lte:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Logical Date Lte
start_date_gte:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Start Date Gte
start_date_lte:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Start Date Lte
end_date_gte:
anyOf:
- type: string
format: date-time
- type: 'null'
title: End Date Gte
end_date_lte:
anyOf:
- type: string
format: date-time
- type: 'null'
title: End Date Lte
type: object
title: DAGRunsBatchBody
description: List DAG Runs body for batch endpoint.
DAGSourceResponse:
properties:
content:
Expand Down
65 changes: 64 additions & 1 deletion airflow/api_fastapi/core_api/routes/public/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from __future__ import annotations

from typing import Annotated, cast
from typing import Annotated, Literal, cast

from fastapi import Depends, HTTPException, Query, Request, status
from sqlalchemy import select
Expand All @@ -30,9 +30,13 @@
)
from airflow.api_fastapi.common.db.common import get_session, paginated_select
from airflow.api_fastapi.common.parameters import (
DagIdsFilter,
LimitFilter,
OffsetFilter,
QueryDagRunStateFilter,
QueryLimit,
QueryOffset,
Range,
RangeFilter,
SortParam,
datetime_range_filter_factory,
Expand All @@ -45,6 +49,7 @@
DAGRunPatchBody,
DAGRunPatchStates,
DAGRunResponse,
DAGRunsBatchBody,
)
from airflow.api_fastapi.core_api.datamodels.task_instances import (
TaskInstanceCollectionResponse,
Expand Down Expand Up @@ -296,3 +301,61 @@ def get_dag_runs(
dag_runs=dag_runs,
total_entries=total_entries,
)


@dag_run_router.post("/list", responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]))
def get_list_dag_runs_batch(
dag_id: Literal["~"], body: DAGRunsBatchBody, session: Annotated[Session, Depends(get_session)]
) -> DAGRunCollectionResponse:
"""Get a list of DAG Runs."""
dag_ids = DagIdsFilter(DagRun, body.dag_ids)
logical_date = RangeFilter(
Range(lower_bound=body.logical_date_gte, upper_bound=body.logical_date_lte),
attribute=DagRun.logical_date,
)
start_date = RangeFilter(
Range(lower_bound=body.start_date_gte, upper_bound=body.start_date_lte),
attribute=DagRun.start_date,
)
end_date = RangeFilter(
Range(lower_bound=body.end_date_gte, upper_bound=body.end_date_lte),
attribute=DagRun.end_date,
)

state = QueryDagRunStateFilter(body.states)

offset = OffsetFilter(body.page_offset)
limit = LimitFilter(body.page_limit)

order_by = SortParam(
[
"id",
"state",
"dag_id",
"logical_date",
"dag_run_id",
"start_date",
"end_date",
"updated_at",
"external_trigger",
"conf",
],
DagRun,
).set_value(body.order_by)

base_query = select(DagRun)
dag_runs_select, total_entries = paginated_select(
statement=base_query,
filters=[dag_ids, logical_date, start_date, end_date, state],
order_by=order_by,
offset=offset,
limit=limit,
session=session,
)

dag_runs = session.scalars(dag_runs_select)

return DAGRunCollectionResponse(
dag_runs=dag_runs,
total_entries=total_entries,
)
3 changes: 3 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1513,6 +1513,9 @@ export type ConnectionServiceTestConnectionMutationResult = Awaited<
export type DagRunServiceClearDagRunMutationResult = Awaited<
ReturnType<typeof DagRunService.clearDagRun>
>;
export type DagRunServiceGetListDagRunsBatchMutationResult = Awaited<
ReturnType<typeof DagRunService.getListDagRunsBatch>
>;
export type TaskInstanceServiceGetTaskInstancesBatchMutationResult = Awaited<
ReturnType<typeof TaskInstanceService.getTaskInstancesBatch>
>;
Expand Down
44 changes: 44 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import {
DAGPatchBody,
DAGRunClearBody,
DAGRunPatchBody,
DAGRunsBatchBody,
DagRunState,
DagWarningType,
PoolPatchBody,
Expand Down Expand Up @@ -2619,6 +2620,49 @@ export const useDagRunServiceClearDagRun = <
}) as unknown as Promise<TData>,
...options,
});
/**
* Get List Dag Runs Batch
* Get a list of DAG Runs.
* @param data The data for the request.
* @param data.dagId
* @param data.requestBody
* @returns DAGRunCollectionResponse Successful Response
* @throws ApiError
*/
export const useDagRunServiceGetListDagRunsBatch = <
TData = Common.DagRunServiceGetListDagRunsBatchMutationResult,
TError = unknown,
TContext = unknown,
>(
options?: Omit<
UseMutationOptions<
TData,
TError,
{
dagId: "~";
requestBody: DAGRunsBatchBody;
},
TContext
>,
"mutationFn"
>,
) =>
useMutation<
TData,
TError,
{
dagId: "~";
requestBody: DAGRunsBatchBody;
},
TContext
>({
mutationFn: ({ dagId, requestBody }) =>
DagRunService.getListDagRunsBatch({
dagId,
requestBody,
}) as unknown as Promise<TData>,
...options,
});
/**
* Get Task Instances Batch
* Get list of task instances.
Expand Down
Loading

0 comments on commit d8c91aa

Please sign in to comment.