Skip to content

Commit

Permalink
AIP-84 Migrate DagWarning public endpoint to FastAPI (apache#42749)
Browse files Browse the repository at this point in the history
* add dag warning endpoint

* refactor tests

* fix test

* Update tests/api_fastapi/core_api/routes/public/test_dag_warning.py

* Update tests/api_fastapi/core_api/routes/public/test_dag_warning.py

* Update tests/api_fastapi/core_api/routes/public/test_dag_warning.py
  • Loading branch information
rawwar authored Nov 4, 2024
1 parent 12950dd commit 20e6517
Show file tree
Hide file tree
Showing 14 changed files with 681 additions and 3 deletions.
2 changes: 2 additions & 0 deletions airflow/api_connexion/endpoints/dag_warning_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from airflow.api_connexion.security import get_readable_dags
from airflow.auth.managers.models.resource_details import DagAccessEntity
from airflow.models.dagwarning import DagWarning as DagWarningModel
from airflow.utils.api_migration import mark_fastapi_migration_done
from airflow.utils.db import get_query_count
from airflow.utils.session import NEW_SESSION, provide_session

Expand All @@ -38,6 +39,7 @@
from airflow.api_connexion.types import APIResponse


@mark_fastapi_migration_done
@security.requires_access_dag("GET", DagAccessEntity.WARNING)
@format_parameters({"limit": check_limit})
@provide_session
Expand Down
32 changes: 32 additions & 0 deletions airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from airflow.models import Base, Connection
from airflow.models.dag import DagModel, DagTag
from airflow.models.dagrun import DagRun
from airflow.models.dagwarning import DagWarning, DagWarningType
from airflow.utils import timezone
from airflow.utils.state import DagRunState

Expand Down Expand Up @@ -292,6 +293,34 @@ def _safe_parse_datetime(date_to_check: str) -> datetime:
)


class _WarningTypeFilter(BaseParam[str]):
"""Filter on warning type."""

def to_orm(self, select: Select) -> Select:
if self.value is None and self.skip_none:
return select
return select.where(DagWarning.warning_type == self.value)

def depends(self, warning_type: DagWarningType | None = None) -> _WarningTypeFilter:
return self.set_value(warning_type)


class _DagIdFilter(BaseParam[str]):
"""Filter on dag_id."""

def __init__(self, attribute: ColumnElement, skip_none: bool = True) -> None:
super().__init__(skip_none)
self.attribute = attribute

def to_orm(self, select: Select) -> Select:
if self.value is None and self.skip_none:
return select
return select.where(self.attribute == self.value)

def depends(self, dag_id: str | None = None) -> _DagIdFilter:
return self.set_value(dag_id)


# Common Safe DateTime
DateTimeQuery = Annotated[str, AfterValidator(_safe_parse_datetime)]
# DAG
Expand All @@ -310,5 +339,8 @@ def _safe_parse_datetime(date_to_check: str) -> datetime:
QueryOwnersFilter = Annotated[_OwnersFilter, Depends(_OwnersFilter().depends)]
# DagRun
QueryLastDagRunStateFilter = Annotated[_LastDagRunStateFilter, Depends(_LastDagRunStateFilter().depends)]
# DAGWarning
QueryDagIdInDagWarningFilter = Annotated[_DagIdFilter, Depends(_DagIdFilter(DagWarning.dag_id).depends)]
QueryWarningTypeFilter = Annotated[_WarningTypeFilter, Depends(_WarningTypeFilter().depends)]
# DAGTags
QueryDagTagPatternSearch = Annotated[_DagTagNamePatternSearch, Depends(_DagTagNamePatternSearch().depends)]
120 changes: 120 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1111,6 +1111,76 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HealthInfoSchema'
/public/dagWarnings:
get:
tags:
- DagWarning
summary: List Dag Warnings
description: Get a list of DAG warnings.
operationId: list_dag_warnings
parameters:
- name: dag_id
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Dag Id
- name: warning_type
in: query
required: false
schema:
anyOf:
- $ref: '#/components/schemas/DagWarningType'
- type: 'null'
title: Warning Type
- name: limit
in: query
required: false
schema:
type: integer
default: 100
title: Limit
- name: offset
in: query
required: false
schema:
type: integer
default: 0
title: Offset
- name: order_by
in: query
required: false
schema:
type: string
default: dag_id
title: Order By
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/DAGWarningCollectionResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/plugins/:
get:
tags:
Expand Down Expand Up @@ -2505,6 +2575,44 @@ components:
- total_entries
title: DAGTagCollectionResponse
description: DAG Tags Collection serializer for responses.
DAGWarningCollectionResponse:
properties:
dag_warnings:
items:
$ref: '#/components/schemas/DAGWarningResponse'
type: array
title: Dag Warnings
total_entries:
type: integer
title: Total Entries
type: object
required:
- dag_warnings
- total_entries
title: DAGWarningCollectionResponse
description: DAG warning collection serializer for responses.
DAGWarningResponse:
properties:
dag_id:
type: string
title: Dag Id
warning_type:
$ref: '#/components/schemas/DagWarningType'
message:
type: string
title: Message
timestamp:
type: string
format: date-time
title: Timestamp
type: object
required:
- dag_id
- warning_type
- message
- timestamp
title: DAGWarningResponse
description: DAG Warning serializer for responses.
DAGWithLatestDagRunsCollectionResponse:
properties:
total_entries:
Expand Down Expand Up @@ -2744,6 +2852,18 @@ components:
title: DagTagPydantic
description: Serializable representation of the DagTag ORM SqlAlchemyModel used
by internal API.
DagWarningType:
type: string
enum:
- asset conflict
- non-existent pool
title: DagWarningType
description: 'Enum for DAG warning types.
This is the set of allowable values for the ``warning_type`` field
in the DagWarning model.'
EventLogResponse:
properties:
event_log_id:
Expand Down
2 changes: 2 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from airflow.api_fastapi.core_api.routes.public.connections import connections_router
from airflow.api_fastapi.core_api.routes.public.dag_run import dag_run_router
from airflow.api_fastapi.core_api.routes.public.dag_sources import dag_sources_router
from airflow.api_fastapi.core_api.routes.public.dag_warning import dag_warning_router
from airflow.api_fastapi.core_api.routes.public.dags import dags_router
from airflow.api_fastapi.core_api.routes.public.event_logs import event_logs_router
from airflow.api_fastapi.core_api.routes.public.monitor import monitor_router
Expand All @@ -40,6 +41,7 @@
public_router.include_router(dags_router)
public_router.include_router(event_logs_router)
public_router.include_router(monitor_router)
public_router.include_router(dag_warning_router)
public_router.include_router(plugins_router)
public_router.include_router(pools_router)
public_router.include_router(providers_router)
Expand Down
72 changes: 72 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/dag_warning.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from fastapi import Depends
from sqlalchemy import select
from sqlalchemy.orm import Session
from typing_extensions import Annotated

from airflow.api_fastapi.common.db.common import (
get_session,
paginated_select,
)
from airflow.api_fastapi.common.parameters import (
QueryDagIdInDagWarningFilter,
QueryLimit,
QueryOffset,
QueryWarningTypeFilter,
SortParam,
)
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.dag_warning import (
DAGWarningCollectionResponse,
DAGWarningResponse,
)
from airflow.models import DagWarning

dag_warning_router = AirflowRouter(tags=["DagWarning"])


@dag_warning_router.get("/dagWarnings", responses=create_openapi_http_exception_doc([401, 403]))
async def list_dag_warnings(
dag_id: QueryDagIdInDagWarningFilter,
warning_type: QueryWarningTypeFilter,
limit: QueryLimit,
offset: QueryOffset,
order_by: Annotated[
SortParam,
Depends(SortParam(["dag_id", "warning_type", "message", "timestamp"], DagWarning).dynamic_depends()),
],
session: Annotated[Session, Depends(get_session)],
) -> DAGWarningCollectionResponse:
"""Get a list of DAG warnings."""
dag_warnings_select, total_entries = paginated_select(
select(DagWarning), [warning_type, dag_id], order_by, offset, limit, session
)

dag_warnings = session.scalars(dag_warnings_select).all()

return DAGWarningCollectionResponse(
dag_warnings=[
DAGWarningResponse.model_validate(dag_warning, from_attributes=True)
for dag_warning in dag_warnings
],
total_entries=total_entries,
)
40 changes: 40 additions & 0 deletions airflow/api_fastapi/core_api/serializers/dag_warning.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from datetime import datetime

from pydantic import BaseModel

from airflow.models.dagwarning import DagWarningType


class DAGWarningResponse(BaseModel):
"""DAG Warning serializer for responses."""

dag_id: str
warning_type: DagWarningType
message: str
timestamp: datetime


class DAGWarningCollectionResponse(BaseModel):
"""DAG warning collection serializer for responses."""

dag_warnings: list[DAGWarningResponse]
total_entries: int
31 changes: 30 additions & 1 deletion airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
DagRunService,
DagService,
DagSourceService,
DagWarningService,
DagsService,
DashboardService,
EventLogService,
Expand All @@ -18,7 +19,7 @@ import {
VariableService,
VersionService,
} from "../requests/services.gen";
import { DagRunState } from "../requests/types.gen";
import { DagRunState, DagWarningType } from "../requests/types.gen";

export type AssetServiceNextRunAssetsDefaultResponse = Awaited<
ReturnType<typeof AssetService.nextRunAssets>
Expand Down Expand Up @@ -327,6 +328,34 @@ export const UseMonitorServiceGetHealthKeyFn = (queryKey?: Array<unknown>) => [
useMonitorServiceGetHealthKey,
...(queryKey ?? []),
];
export type DagWarningServiceListDagWarningsDefaultResponse = Awaited<
ReturnType<typeof DagWarningService.listDagWarnings>
>;
export type DagWarningServiceListDagWarningsQueryResult<
TData = DagWarningServiceListDagWarningsDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const useDagWarningServiceListDagWarningsKey =
"DagWarningServiceListDagWarnings";
export const UseDagWarningServiceListDagWarningsKeyFn = (
{
dagId,
limit,
offset,
orderBy,
warningType,
}: {
dagId?: string;
limit?: number;
offset?: number;
orderBy?: string;
warningType?: DagWarningType;
} = {},
queryKey?: Array<unknown>,
) => [
useDagWarningServiceListDagWarningsKey,
...(queryKey ?? [{ dagId, limit, offset, orderBy, warningType }]),
];
export type PluginServiceGetPluginsDefaultResponse = Awaited<
ReturnType<typeof PluginService.getPlugins>
>;
Expand Down
Loading

0 comments on commit 20e6517

Please sign in to comment.