Skip to content

Commit

Permalink
AIP-84 De-nest Dag Tags endpoint (apache#44608)
Browse files Browse the repository at this point in the history
* AIP-84 De-nest Tag Tags endpoint

* Fix CI
  • Loading branch information
pierrejeambrun authored Dec 4, 2024
1 parent 9bd1c40 commit 5b5f8f4
Show file tree
Hide file tree
Showing 14 changed files with 629 additions and 434 deletions.
27 changes: 27 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/dag_tags.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from pydantic import BaseModel


class DAGTagCollectionResponse(BaseModel):
"""DAG Tags Collection serializer for responses."""

tags: list[str]
total_entries: int
7 changes: 0 additions & 7 deletions airflow/api_fastapi/core_api/datamodels/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,3 @@ def get_params(cls, params: abc.MutableMapping | None) -> dict | None:
def concurrency(self) -> int:
"""Return max_active_tasks as concurrency."""
return self.max_active_tasks


class DAGTagCollectionResponse(BaseModel):
"""DAG Tags Collection serializer for responses."""

tags: list[str]
total_entries: int
128 changes: 64 additions & 64 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2636,70 +2636,6 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/tags:
get:
tags:
- DAG
summary: Get Dag Tags
description: Get all DAG tags.
operationId: get_dag_tags
parameters:
- name: limit
in: query
required: false
schema:
type: integer
minimum: 0
default: 100
title: Limit
- name: offset
in: query
required: false
schema:
type: integer
minimum: 0
default: 0
title: Offset
- name: order_by
in: query
required: false
schema:
type: string
default: name
title: Order By
- name: tag_name_pattern
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Tag Name Pattern
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/DAGTagCollectionResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}:
get:
tags:
Expand Down Expand Up @@ -5702,6 +5638,70 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dagTags:
get:
tags:
- DAG
summary: Get Dag Tags
description: Get all DAG tags.
operationId: get_dag_tags
parameters:
- name: limit
in: query
required: false
schema:
type: integer
minimum: 0
default: 100
title: Limit
- name: offset
in: query
required: false
schema:
type: integer
minimum: 0
default: 0
title: Offset
- name: order_by
in: query
required: false
schema:
type: string
default: name
title: Order By
- name: tag_name_pattern
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Tag Name Pattern
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/DAGTagCollectionResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/monitor/health:
get:
tags:
Expand Down
2 changes: 2 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from airflow.api_fastapi.core_api.routes.public.dag_run import dag_run_router
from airflow.api_fastapi.core_api.routes.public.dag_sources import dag_sources_router
from airflow.api_fastapi.core_api.routes.public.dag_stats import dag_stats_router
from airflow.api_fastapi.core_api.routes.public.dag_tags import dag_tags_router
from airflow.api_fastapi.core_api.routes.public.dag_warning import dag_warning_router
from airflow.api_fastapi.core_api.routes.public.dags import dags_router
from airflow.api_fastapi.core_api.routes.public.event_logs import event_logs_router
Expand Down Expand Up @@ -75,6 +76,7 @@
authenticated_router.include_router(variables_router)
authenticated_router.include_router(task_instances_log_router)
authenticated_router.include_router(dag_parsing_router)
authenticated_router.include_router(dag_tags_router)


# Include authenticated router in public router
Expand Down
71 changes: 71 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/dag_tags.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from typing import Annotated

from fastapi import Depends
from sqlalchemy import select

from airflow.api_fastapi.common.db.common import (
SessionDep,
paginated_select,
)
from airflow.api_fastapi.common.parameters import (
QueryDagTagPatternSearch,
QueryLimit,
QueryOffset,
SortParam,
)
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.dag_tags import DAGTagCollectionResponse
from airflow.models.dag import DagTag

dag_tags_router = AirflowRouter(tags=["DAG"], prefix="/dagTags")


@dag_tags_router.get(
"",
)
def get_dag_tags(
limit: QueryLimit,
offset: QueryOffset,
order_by: Annotated[
SortParam,
Depends(
SortParam(
["name"],
DagTag,
).dynamic_depends()
),
],
tag_name_pattern: QueryDagTagPatternSearch,
session: SessionDep,
) -> DAGTagCollectionResponse:
"""Get all DAG tags."""
query = select(DagTag.name).group_by(DagTag.name)
dag_tags_select, total_entries = paginated_select(
statement=query,
filters=[tag_name_pattern],
order_by=order_by,
offset=offset,
limit=limit,
session=session,
)
dag_tags = session.execute(dag_tags_select).scalars().all()
return DAGTagCollectionResponse(tags=[x for x in dag_tags], total_entries=total_entries)
38 changes: 2 additions & 36 deletions airflow/api_fastapi/core_api/routes/public/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from typing import Annotated

from fastapi import Depends, HTTPException, Query, Request, Response, status
from sqlalchemy import select, update
from sqlalchemy import update

from airflow.api.common import delete_dag as delete_dag_module
from airflow.api_fastapi.common.db.common import (
Expand All @@ -32,7 +32,6 @@
QueryDagDisplayNamePatternSearch,
QueryDagIdPatternSearch,
QueryDagIdPatternSearchWithNone,
QueryDagTagPatternSearch,
QueryLastDagRunStateFilter,
QueryLimit,
QueryOffset,
Expand All @@ -48,11 +47,10 @@
DAGDetailsResponse,
DAGPatchBody,
DAGResponse,
DAGTagCollectionResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.exceptions import AirflowException, DagNotFound
from airflow.models import DAG, DagModel, DagTag
from airflow.models import DAG, DagModel
from airflow.models.dagrun import DagRun

dags_router = AirflowRouter(tags=["DAG"], prefix="/dags")
Expand Down Expand Up @@ -107,38 +105,6 @@ def get_dags(
)


@dags_router.get(
"/tags",
)
def get_dag_tags(
limit: QueryLimit,
offset: QueryOffset,
order_by: Annotated[
SortParam,
Depends(
SortParam(
["name"],
DagTag,
).dynamic_depends()
),
],
tag_name_pattern: QueryDagTagPatternSearch,
session: SessionDep,
) -> DAGTagCollectionResponse:
"""Get all DAG tags."""
query = select(DagTag.name).group_by(DagTag.name)
dag_tags_select, total_entries = paginated_select(
statement=query,
filters=[tag_name_pattern],
order_by=order_by,
offset=offset,
limit=limit,
session=session,
)
dag_tags = session.execute(dag_tags_select).scalars().all()
return DAGTagCollectionResponse(tags=[x for x in dag_tags], total_entries=total_entries)


@dags_router.get(
"/{dag_id}",
responses=create_openapi_http_exception_doc(
Expand Down
Loading

0 comments on commit 5b5f8f4

Please sign in to comment.