From 20e6517ed334e5d76015bd3e170933b25f9337b5 Mon Sep 17 00:00:00 2001 From: Kalyan R Date: Mon, 4 Nov 2024 19:55:36 +0530 Subject: [PATCH] AIP-84 Migrate DagWarning public endpoint to FastAPI (#42749) * add dag warning endpoint * refactor tests * fix test * Update tests/api_fastapi/core_api/routes/public/test_dag_warning.py * Update tests/api_fastapi/core_api/routes/public/test_dag_warning.py * Update tests/api_fastapi/core_api/routes/public/test_dag_warning.py --- .../endpoints/dag_warning_endpoint.py | 2 + airflow/api_fastapi/common/parameters.py | 32 +++++ .../core_api/openapi/v1-generated.yaml | 120 ++++++++++++++++++ .../core_api/routes/public/__init__.py | 2 + .../core_api/routes/public/dag_warning.py | 72 +++++++++++ .../core_api/serializers/dag_warning.py | 40 ++++++ airflow/ui/openapi-gen/queries/common.ts | 31 ++++- airflow/ui/openapi-gen/queries/prefetch.ts | 48 ++++++- airflow/ui/openapi-gen/queries/queries.ts | 50 ++++++++ airflow/ui/openapi-gen/queries/suspense.ts | 51 +++++++- .../ui/openapi-gen/requests/schemas.gen.ts | 55 ++++++++ .../ui/openapi-gen/requests/services.gen.ts | 37 ++++++ airflow/ui/openapi-gen/requests/types.gen.ts | 59 +++++++++ .../routes/public/test_dag_warning.py | 85 +++++++++++++ 14 files changed, 681 insertions(+), 3 deletions(-) create mode 100644 airflow/api_fastapi/core_api/routes/public/dag_warning.py create mode 100644 airflow/api_fastapi/core_api/serializers/dag_warning.py create mode 100644 tests/api_fastapi/core_api/routes/public/test_dag_warning.py diff --git a/airflow/api_connexion/endpoints/dag_warning_endpoint.py b/airflow/api_connexion/endpoints/dag_warning_endpoint.py index 8a15a30cece8f..a158c3f443c87 100644 --- a/airflow/api_connexion/endpoints/dag_warning_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_warning_endpoint.py @@ -29,6 +29,7 @@ from airflow.api_connexion.security import get_readable_dags from airflow.auth.managers.models.resource_details import DagAccessEntity from airflow.models.dagwarning import DagWarning as DagWarningModel +from airflow.utils.api_migration import mark_fastapi_migration_done from airflow.utils.db import get_query_count from airflow.utils.session import NEW_SESSION, provide_session @@ -38,6 +39,7 @@ from airflow.api_connexion.types import APIResponse +@mark_fastapi_migration_done @security.requires_access_dag("GET", DagAccessEntity.WARNING) @format_parameters({"limit": check_limit}) @provide_session diff --git a/airflow/api_fastapi/common/parameters.py b/airflow/api_fastapi/common/parameters.py index 7137a0a124847..bd65017637227 100644 --- a/airflow/api_fastapi/common/parameters.py +++ b/airflow/api_fastapi/common/parameters.py @@ -31,6 +31,7 @@ from airflow.models import Base, Connection from airflow.models.dag import DagModel, DagTag from airflow.models.dagrun import DagRun +from airflow.models.dagwarning import DagWarning, DagWarningType from airflow.utils import timezone from airflow.utils.state import DagRunState @@ -292,6 +293,34 @@ def _safe_parse_datetime(date_to_check: str) -> datetime: ) +class _WarningTypeFilter(BaseParam[str]): + """Filter on warning type.""" + + def to_orm(self, select: Select) -> Select: + if self.value is None and self.skip_none: + return select + return select.where(DagWarning.warning_type == self.value) + + def depends(self, warning_type: DagWarningType | None = None) -> _WarningTypeFilter: + return self.set_value(warning_type) + + +class _DagIdFilter(BaseParam[str]): + """Filter on dag_id.""" + + def __init__(self, attribute: ColumnElement, skip_none: bool = True) -> None: + super().__init__(skip_none) + self.attribute = attribute + + def to_orm(self, select: Select) -> Select: + if self.value is None and self.skip_none: + return select + return select.where(self.attribute == self.value) + + def depends(self, dag_id: str | None = None) -> _DagIdFilter: + return self.set_value(dag_id) + + # Common Safe DateTime DateTimeQuery = Annotated[str, AfterValidator(_safe_parse_datetime)] # DAG @@ -310,5 +339,8 @@ def _safe_parse_datetime(date_to_check: str) -> datetime: QueryOwnersFilter = Annotated[_OwnersFilter, Depends(_OwnersFilter().depends)] # DagRun QueryLastDagRunStateFilter = Annotated[_LastDagRunStateFilter, Depends(_LastDagRunStateFilter().depends)] +# DAGWarning +QueryDagIdInDagWarningFilter = Annotated[_DagIdFilter, Depends(_DagIdFilter(DagWarning.dag_id).depends)] +QueryWarningTypeFilter = Annotated[_WarningTypeFilter, Depends(_WarningTypeFilter().depends)] # DAGTags QueryDagTagPatternSearch = Annotated[_DagTagNamePatternSearch, Depends(_DagTagNamePatternSearch().depends)] diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index b7e7f62693719..19a4875e704d6 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -1111,6 +1111,76 @@ paths: application/json: schema: $ref: '#/components/schemas/HealthInfoSchema' + /public/dagWarnings: + get: + tags: + - DagWarning + summary: List Dag Warnings + description: Get a list of DAG warnings. + operationId: list_dag_warnings + parameters: + - name: dag_id + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + title: Dag Id + - name: warning_type + in: query + required: false + schema: + anyOf: + - $ref: '#/components/schemas/DagWarningType' + - type: 'null' + title: Warning Type + - name: limit + in: query + required: false + schema: + type: integer + default: 100 + title: Limit + - name: offset + in: query + required: false + schema: + type: integer + default: 0 + title: Offset + - name: order_by + in: query + required: false + schema: + type: string + default: dag_id + title: Order By + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/DAGWarningCollectionResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/plugins/: get: tags: @@ -2505,6 +2575,44 @@ components: - total_entries title: DAGTagCollectionResponse description: DAG Tags Collection serializer for responses. + DAGWarningCollectionResponse: + properties: + dag_warnings: + items: + $ref: '#/components/schemas/DAGWarningResponse' + type: array + title: Dag Warnings + total_entries: + type: integer + title: Total Entries + type: object + required: + - dag_warnings + - total_entries + title: DAGWarningCollectionResponse + description: DAG warning collection serializer for responses. + DAGWarningResponse: + properties: + dag_id: + type: string + title: Dag Id + warning_type: + $ref: '#/components/schemas/DagWarningType' + message: + type: string + title: Message + timestamp: + type: string + format: date-time + title: Timestamp + type: object + required: + - dag_id + - warning_type + - message + - timestamp + title: DAGWarningResponse + description: DAG Warning serializer for responses. DAGWithLatestDagRunsCollectionResponse: properties: total_entries: @@ -2744,6 +2852,18 @@ components: title: DagTagPydantic description: Serializable representation of the DagTag ORM SqlAlchemyModel used by internal API. + DagWarningType: + type: string + enum: + - asset conflict + - non-existent pool + title: DagWarningType + description: 'Enum for DAG warning types. + + + This is the set of allowable values for the ``warning_type`` field + + in the DagWarning model.' EventLogResponse: properties: event_log_id: diff --git a/airflow/api_fastapi/core_api/routes/public/__init__.py b/airflow/api_fastapi/core_api/routes/public/__init__.py index cc9dd9c5e1ba1..a153952287bc1 100644 --- a/airflow/api_fastapi/core_api/routes/public/__init__.py +++ b/airflow/api_fastapi/core_api/routes/public/__init__.py @@ -21,6 +21,7 @@ from airflow.api_fastapi.core_api.routes.public.connections import connections_router from airflow.api_fastapi.core_api.routes.public.dag_run import dag_run_router from airflow.api_fastapi.core_api.routes.public.dag_sources import dag_sources_router +from airflow.api_fastapi.core_api.routes.public.dag_warning import dag_warning_router from airflow.api_fastapi.core_api.routes.public.dags import dags_router from airflow.api_fastapi.core_api.routes.public.event_logs import event_logs_router from airflow.api_fastapi.core_api.routes.public.monitor import monitor_router @@ -40,6 +41,7 @@ public_router.include_router(dags_router) public_router.include_router(event_logs_router) public_router.include_router(monitor_router) +public_router.include_router(dag_warning_router) public_router.include_router(plugins_router) public_router.include_router(pools_router) public_router.include_router(providers_router) diff --git a/airflow/api_fastapi/core_api/routes/public/dag_warning.py b/airflow/api_fastapi/core_api/routes/public/dag_warning.py new file mode 100644 index 0000000000000..a388fae13be18 --- /dev/null +++ b/airflow/api_fastapi/core_api/routes/public/dag_warning.py @@ -0,0 +1,72 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from fastapi import Depends +from sqlalchemy import select +from sqlalchemy.orm import Session +from typing_extensions import Annotated + +from airflow.api_fastapi.common.db.common import ( + get_session, + paginated_select, +) +from airflow.api_fastapi.common.parameters import ( + QueryDagIdInDagWarningFilter, + QueryLimit, + QueryOffset, + QueryWarningTypeFilter, + SortParam, +) +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.api_fastapi.core_api.serializers.dag_warning import ( + DAGWarningCollectionResponse, + DAGWarningResponse, +) +from airflow.models import DagWarning + +dag_warning_router = AirflowRouter(tags=["DagWarning"]) + + +@dag_warning_router.get("/dagWarnings", responses=create_openapi_http_exception_doc([401, 403])) +async def list_dag_warnings( + dag_id: QueryDagIdInDagWarningFilter, + warning_type: QueryWarningTypeFilter, + limit: QueryLimit, + offset: QueryOffset, + order_by: Annotated[ + SortParam, + Depends(SortParam(["dag_id", "warning_type", "message", "timestamp"], DagWarning).dynamic_depends()), + ], + session: Annotated[Session, Depends(get_session)], +) -> DAGWarningCollectionResponse: + """Get a list of DAG warnings.""" + dag_warnings_select, total_entries = paginated_select( + select(DagWarning), [warning_type, dag_id], order_by, offset, limit, session + ) + + dag_warnings = session.scalars(dag_warnings_select).all() + + return DAGWarningCollectionResponse( + dag_warnings=[ + DAGWarningResponse.model_validate(dag_warning, from_attributes=True) + for dag_warning in dag_warnings + ], + total_entries=total_entries, + ) diff --git a/airflow/api_fastapi/core_api/serializers/dag_warning.py b/airflow/api_fastapi/core_api/serializers/dag_warning.py new file mode 100644 index 0000000000000..f38a3a8d093f7 --- /dev/null +++ b/airflow/api_fastapi/core_api/serializers/dag_warning.py @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from datetime import datetime + +from pydantic import BaseModel + +from airflow.models.dagwarning import DagWarningType + + +class DAGWarningResponse(BaseModel): + """DAG Warning serializer for responses.""" + + dag_id: str + warning_type: DagWarningType + message: str + timestamp: datetime + + +class DAGWarningCollectionResponse(BaseModel): + """DAG warning collection serializer for responses.""" + + dag_warnings: list[DAGWarningResponse] + total_entries: int diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 07edb67a99304..f715b41c3f4ed 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -7,6 +7,7 @@ import { DagRunService, DagService, DagSourceService, + DagWarningService, DagsService, DashboardService, EventLogService, @@ -18,7 +19,7 @@ import { VariableService, VersionService, } from "../requests/services.gen"; -import { DagRunState } from "../requests/types.gen"; +import { DagRunState, DagWarningType } from "../requests/types.gen"; export type AssetServiceNextRunAssetsDefaultResponse = Awaited< ReturnType @@ -327,6 +328,34 @@ export const UseMonitorServiceGetHealthKeyFn = (queryKey?: Array) => [ useMonitorServiceGetHealthKey, ...(queryKey ?? []), ]; +export type DagWarningServiceListDagWarningsDefaultResponse = Awaited< + ReturnType +>; +export type DagWarningServiceListDagWarningsQueryResult< + TData = DagWarningServiceListDagWarningsDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useDagWarningServiceListDagWarningsKey = + "DagWarningServiceListDagWarnings"; +export const UseDagWarningServiceListDagWarningsKeyFn = ( + { + dagId, + limit, + offset, + orderBy, + warningType, + }: { + dagId?: string; + limit?: number; + offset?: number; + orderBy?: string; + warningType?: DagWarningType; + } = {}, + queryKey?: Array, +) => [ + useDagWarningServiceListDagWarningsKey, + ...(queryKey ?? [{ dagId, limit, offset, orderBy, warningType }]), +]; export type PluginServiceGetPluginsDefaultResponse = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index db61369e19ff7..a0f3a75eb22f2 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -7,6 +7,7 @@ import { DagRunService, DagService, DagSourceService, + DagWarningService, DagsService, DashboardService, EventLogService, @@ -18,7 +19,7 @@ import { VariableService, VersionService, } from "../requests/services.gen"; -import { DagRunState } from "../requests/types.gen"; +import { DagRunState, DagWarningType } from "../requests/types.gen"; import * as Common from "./common"; /** @@ -408,6 +409,51 @@ export const prefetchUseMonitorServiceGetHealth = (queryClient: QueryClient) => queryKey: Common.UseMonitorServiceGetHealthKeyFn(), queryFn: () => MonitorService.getHealth(), }); +/** + * List Dag Warnings + * Get a list of DAG warnings. + * @param data The data for the request. + * @param data.dagId + * @param data.warningType + * @param data.limit + * @param data.offset + * @param data.orderBy + * @returns DAGWarningCollectionResponse Successful Response + * @throws ApiError + */ +export const prefetchUseDagWarningServiceListDagWarnings = ( + queryClient: QueryClient, + { + dagId, + limit, + offset, + orderBy, + warningType, + }: { + dagId?: string; + limit?: number; + offset?: number; + orderBy?: string; + warningType?: DagWarningType; + } = {}, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseDagWarningServiceListDagWarningsKeyFn({ + dagId, + limit, + offset, + orderBy, + warningType, + }), + queryFn: () => + DagWarningService.listDagWarnings({ + dagId, + limit, + offset, + orderBy, + warningType, + }), + }); /** * Get Plugins * @param data The data for the request. diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 7820656799e5b..8ffcee3defb20 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -12,6 +12,7 @@ import { DagRunService, DagService, DagSourceService, + DagWarningService, DagsService, DashboardService, EventLogService, @@ -27,6 +28,7 @@ import { DAGPatchBody, DAGRunPatchBody, DagRunState, + DagWarningType, PoolPatchBody, PoolPostBody, VariableBody, @@ -520,6 +522,54 @@ export const useMonitorServiceGetHealth = < queryFn: () => MonitorService.getHealth() as TData, ...options, }); +/** + * List Dag Warnings + * Get a list of DAG warnings. + * @param data The data for the request. + * @param data.dagId + * @param data.warningType + * @param data.limit + * @param data.offset + * @param data.orderBy + * @returns DAGWarningCollectionResponse Successful Response + * @throws ApiError + */ +export const useDagWarningServiceListDagWarnings = < + TData = Common.DagWarningServiceListDagWarningsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + limit, + offset, + orderBy, + warningType, + }: { + dagId?: string; + limit?: number; + offset?: number; + orderBy?: string; + warningType?: DagWarningType; + } = {}, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseDagWarningServiceListDagWarningsKeyFn( + { dagId, limit, offset, orderBy, warningType }, + queryKey, + ), + queryFn: () => + DagWarningService.listDagWarnings({ + dagId, + limit, + offset, + orderBy, + warningType, + }) as TData, + ...options, + }); /** * Get Plugins * @param data The data for the request. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index 2cb0841d71f28..6ceed83349def 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -7,6 +7,7 @@ import { DagRunService, DagService, DagSourceService, + DagWarningService, DagsService, DashboardService, EventLogService, @@ -18,7 +19,7 @@ import { VariableService, VersionService, } from "../requests/services.gen"; -import { DagRunState } from "../requests/types.gen"; +import { DagRunState, DagWarningType } from "../requests/types.gen"; import * as Common from "./common"; /** @@ -508,6 +509,54 @@ export const useMonitorServiceGetHealthSuspense = < queryFn: () => MonitorService.getHealth() as TData, ...options, }); +/** + * List Dag Warnings + * Get a list of DAG warnings. + * @param data The data for the request. + * @param data.dagId + * @param data.warningType + * @param data.limit + * @param data.offset + * @param data.orderBy + * @returns DAGWarningCollectionResponse Successful Response + * @throws ApiError + */ +export const useDagWarningServiceListDagWarningsSuspense = < + TData = Common.DagWarningServiceListDagWarningsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + limit, + offset, + orderBy, + warningType, + }: { + dagId?: string; + limit?: number; + offset?: number; + orderBy?: string; + warningType?: DagWarningType; + } = {}, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseDagWarningServiceListDagWarningsKeyFn( + { dagId, limit, offset, orderBy, warningType }, + queryKey, + ), + queryFn: () => + DagWarningService.listDagWarnings({ + dagId, + limit, + offset, + orderBy, + warningType, + }) as TData, + ...options, + }); /** * Get Plugins * @param data The data for the request. diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 712cc8cae984e..3f74dc46a8a87 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -1130,6 +1130,51 @@ export const $DAGTagCollectionResponse = { description: "DAG Tags Collection serializer for responses.", } as const; +export const $DAGWarningCollectionResponse = { + properties: { + dag_warnings: { + items: { + $ref: "#/components/schemas/DAGWarningResponse", + }, + type: "array", + title: "Dag Warnings", + }, + total_entries: { + type: "integer", + title: "Total Entries", + }, + }, + type: "object", + required: ["dag_warnings", "total_entries"], + title: "DAGWarningCollectionResponse", + description: "DAG warning collection serializer for responses.", +} as const; + +export const $DAGWarningResponse = { + properties: { + dag_id: { + type: "string", + title: "Dag Id", + }, + warning_type: { + $ref: "#/components/schemas/DagWarningType", + }, + message: { + type: "string", + title: "Message", + }, + timestamp: { + type: "string", + format: "date-time", + title: "Timestamp", + }, + }, + type: "object", + required: ["dag_id", "warning_type", "message", "timestamp"], + title: "DAGWarningResponse", + description: "DAG Warning serializer for responses.", +} as const; + export const $DAGWithLatestDagRunsCollectionResponse = { properties: { total_entries: { @@ -1483,6 +1528,16 @@ export const $DagTagPydantic = { "Serializable representation of the DagTag ORM SqlAlchemyModel used by internal API.", } as const; +export const $DagWarningType = { + type: "string", + enum: ["asset conflict", "non-existent pool"], + title: "DagWarningType", + description: `Enum for DAG warning types. + +This is the set of allowable values for the \`\`warning_type\`\` field +in the DagWarning model.`, +} as const; + export const $EventLogResponse = { properties: { event_log_id: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 486e04b056f8a..6c42a500e974c 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -40,6 +40,8 @@ import type { GetEventLogData, GetEventLogResponse, GetHealthResponse, + ListDagWarningsData, + ListDagWarningsResponse, GetPluginsData, GetPluginsResponse, DeletePoolData, @@ -652,6 +654,41 @@ export class MonitorService { } } +export class DagWarningService { + /** + * List Dag Warnings + * Get a list of DAG warnings. + * @param data The data for the request. + * @param data.dagId + * @param data.warningType + * @param data.limit + * @param data.offset + * @param data.orderBy + * @returns DAGWarningCollectionResponse Successful Response + * @throws ApiError + */ + public static listDagWarnings( + data: ListDagWarningsData = {}, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/dagWarnings", + query: { + dag_id: data.dagId, + warning_type: data.warningType, + limit: data.limit, + offset: data.offset, + order_by: data.orderBy, + }, + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 422: "Validation Error", + }, + }); + } +} + export class PluginService { /** * Get Plugins diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 0580694ba78f0..4a06652e3802a 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -222,6 +222,24 @@ export type DAGTagCollectionResponse = { total_entries: number; }; +/** + * DAG warning collection serializer for responses. + */ +export type DAGWarningCollectionResponse = { + dag_warnings: Array; + total_entries: number; +}; + +/** + * DAG Warning serializer for responses. + */ +export type DAGWarningResponse = { + dag_id: string; + warning_type: DagWarningType; + message: string; + timestamp: string; +}; + /** * DAG with latest dag runs collection response serializer. */ @@ -312,6 +330,14 @@ export type DagTagPydantic = { dag_id: string; }; +/** + * Enum for DAG warning types. + * + * This is the set of allowable values for the ``warning_type`` field + * in the DagWarning model. + */ +export type DagWarningType = "asset conflict" | "non-existent pool"; + /** * Event Log Response. */ @@ -771,6 +797,16 @@ export type GetEventLogResponse = EventLogResponse; export type GetHealthResponse = HealthInfoSchema; +export type ListDagWarningsData = { + dagId?: string | null; + limit?: number; + offset?: number; + orderBy?: string; + warningType?: DagWarningType | null; +}; + +export type ListDagWarningsResponse = DAGWarningCollectionResponse; + export type GetPluginsData = { limit?: number; offset?: number; @@ -1347,6 +1383,29 @@ export type $OpenApiTs = { }; }; }; + "/public/dagWarnings": { + get: { + req: ListDagWarningsData; + res: { + /** + * Successful Response + */ + 200: DAGWarningCollectionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; "/public/plugins/": { get: { req: GetPluginsData; diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_warning.py b/tests/api_fastapi/core_api/routes/public/test_dag_warning.py new file mode 100644 index 0000000000000..61237bd10299a --- /dev/null +++ b/tests/api_fastapi/core_api/routes/public/test_dag_warning.py @@ -0,0 +1,85 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import pytest + +from airflow.models.dag import DagModel +from airflow.models.dagwarning import DagWarning +from airflow.utils.session import provide_session + +from tests_common.test_utils.db import clear_db_dag_warnings, clear_db_dags + +pytestmark = pytest.mark.db_test + +DAG1_ID = "test_dag1" +DAG1_MESSAGE = "test message 1" +DAG2_ID = "test_dag2" +DAG2_MESSAGE = "test message 2" +DAG3_ID = "test_dag3" +DAG3_MESSAGE = "test message 3" +DAG_WARNING_TYPE = "non-existent pool" + + +@pytest.fixture(autouse=True) +@provide_session +def setup(dag_maker, session=None) -> None: + clear_db_dags() + clear_db_dag_warnings() + + session.add(DagModel(dag_id=DAG1_ID)) + session.add(DagModel(dag_id=DAG2_ID)) + session.add(DagModel(dag_id=DAG3_ID)) + session.add(DagWarning(DAG1_ID, DAG_WARNING_TYPE, DAG1_MESSAGE)) + session.add(DagWarning(DAG2_ID, DAG_WARNING_TYPE, DAG2_MESSAGE)) + session.add(DagWarning(DAG3_ID, DAG_WARNING_TYPE, DAG3_MESSAGE)) + session.commit() + + +class TestGetDagWarnings: + @pytest.mark.parametrize( + "query_params, expected_total_entries, expected_messages", + [ + ({}, 3, [DAG1_MESSAGE, DAG2_MESSAGE, DAG3_MESSAGE]), + ({"dag_id": DAG1_ID}, 1, [DAG1_MESSAGE]), + ({"warning_type": DAG_WARNING_TYPE}, 3, [DAG1_MESSAGE, DAG2_MESSAGE, DAG3_MESSAGE]), + ({"limit": 1, "order_by": "message"}, 3, [DAG1_MESSAGE]), + ({"limit": 1, "offset": 1, "order_by": "message"}, 3, [DAG2_MESSAGE]), + ({"limit": 1, "offset": 2, "order_by": "dag_id"}, 3, [DAG3_MESSAGE]), + ({"limit": 1, "offset": 2, "order_by": "-dag_id"}, 3, [DAG1_MESSAGE]), + ({"limit": 1, "order_by": "timestamp"}, 3, [DAG1_MESSAGE]), + ({"limit": 1, "order_by": "-timestamp"}, 3, [DAG3_MESSAGE]), + ({"order_by": "timestamp"}, 3, [DAG1_MESSAGE, DAG2_MESSAGE, DAG3_MESSAGE]), + ({"order_by": "-timestamp"}, 3, [DAG3_MESSAGE, DAG2_MESSAGE, DAG1_MESSAGE]), + ({"order_by": "dag_id"}, 3, [DAG1_MESSAGE, DAG2_MESSAGE, DAG3_MESSAGE]), + ({"order_by": "-dag_id"}, 3, [DAG3_MESSAGE, DAG2_MESSAGE, DAG1_MESSAGE]), + ], + ) + def test_get_dag_warnings(self, test_client, query_params, expected_total_entries, expected_messages): + response = test_client.get("/public/dagWarnings", params=query_params) + assert response.status_code == 200 + response_json = response.json() + assert response_json["total_entries"] == expected_total_entries + assert len(response_json["dag_warnings"]) == len(expected_messages) + assert [dag_warning["message"] for dag_warning in response_json["dag_warnings"]] == expected_messages + + def test_get_dag_warnings_bad_request(self, test_client): + response = test_client.get("/public/dagWarnings", params={"warning_type": "invalid"}) + response_json = response.json() + assert response.status_code == 422 + assert response_json["detail"][0]["msg"] == "Input should be 'asset conflict' or 'non-existent pool'"