Skip to content

Commit

Permalink
AIP-84: Migrate Dag Parsing endpoint to FastApi (apache#44416)
Browse files Browse the repository at this point in the history
* AIP-84: Migrate Dag Parsing endpoint to FastApi

* Address PR comments

* Change the test class name

* Address PR comments and fix tests

* Address PR comment

* remove database isolation option to fix failing check

* Address PR comment

---------

Co-authored-by: Sneha Prabhu <[email protected]>
  • Loading branch information
prabhusneha and Sneha Prabhu authored Nov 28, 2024
1 parent 69cd237 commit 3e427c9
Show file tree
Hide file tree
Showing 9 changed files with 303 additions and 0 deletions.
2 changes: 2 additions & 0 deletions airflow/api_connexion/endpoints/dag_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from airflow.auth.managers.models.resource_details import DagDetails
from airflow.models.dag import DagModel
from airflow.models.dagbag import DagPriorityParsingRequest
from airflow.utils.api_migration import mark_fastapi_migration_done
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.www.extensions.init_auth_manager import get_auth_manager

Expand All @@ -38,6 +39,7 @@
from airflow.auth.managers.models.batch_apis import IsAuthorizedDagRequest


@mark_fastapi_migration_done
@security.requires_access_dag("PUT")
@provide_session
def reparse_dag_file(*, file_token: str, session: Session = NEW_SESSION) -> Response:
Expand Down
46 changes: 46 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5612,6 +5612,52 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/parseDagFile/{file_token}:
put:
tags:
- DAG Parsing
summary: Reparse Dag File
description: Request re-parsing a DAG file.
operationId: reparse_dag_file
parameters:
- name: file_token
in: path
required: true
schema:
type: string
title: File Token
responses:
'201':
description: Successful Response
content:
application/json:
schema:
type: 'null'
title: Response Reparse Dag File
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/monitor/health:
get:
tags:
Expand Down
2 changes: 2 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from airflow.api_fastapi.core_api.routes.public.backfills import backfills_router
from airflow.api_fastapi.core_api.routes.public.config import config_router
from airflow.api_fastapi.core_api.routes.public.connections import connections_router
from airflow.api_fastapi.core_api.routes.public.dag_parsing import dag_parsing_router
from airflow.api_fastapi.core_api.routes.public.dag_run import dag_run_router
from airflow.api_fastapi.core_api.routes.public.dag_sources import dag_sources_router
from airflow.api_fastapi.core_api.routes.public.dag_stats import dag_stats_router
Expand Down Expand Up @@ -73,6 +74,7 @@
authenticated_router.include_router(tasks_router)
authenticated_router.include_router(variables_router)
authenticated_router.include_router(task_instances_log_router)
authenticated_router.include_router(dag_parsing_router)


# Include authenticated router in public router
Expand Down
66 changes: 66 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/dag_parsing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from collections.abc import Sequence
from typing import TYPE_CHECKING, Annotated

from fastapi import Depends, HTTPException, Request, status
from itsdangerous import BadSignature, URLSafeSerializer
from sqlalchemy import select
from sqlalchemy.orm import Session

from airflow.api_fastapi.common.db.common import get_session
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.auth.managers.models.resource_details import DagDetails
from airflow.models.dag import DagModel
from airflow.models.dagbag import DagPriorityParsingRequest

if TYPE_CHECKING:
from airflow.auth.managers.models.batch_apis import IsAuthorizedDagRequest

dag_parsing_router = AirflowRouter(tags=["DAG Parsing"], prefix="/parseDagFile/{file_token}")


@dag_parsing_router.put(
"",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
status_code=status.HTTP_201_CREATED,
)
def reparse_dag_file(
file_token: str,
session: Annotated[Session, Depends(get_session)],
request: Request,
) -> None:
"""Request re-parsing a DAG file."""
secret_key = request.app.state.secret_key
auth_s = URLSafeSerializer(secret_key)
try:
path = auth_s.loads(file_token)
except BadSignature:
raise HTTPException(status.HTTP_404_NOT_FOUND, "File not found")

requests: Sequence[IsAuthorizedDagRequest] = [
{"method": "PUT", "details": DagDetails(id=dag_id)}
for dag_id in session.scalars(select(DagModel.dag_id).where(DagModel.fileloc == path))
]
if not requests:
raise HTTPException(status.HTTP_404_NOT_FOUND, "File not found")

parsing_request = DagPriorityParsingRequest(fileloc=path)
session.add(parsing_request)
4 changes: 4 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
BackfillService,
ConfigService,
ConnectionService,
DagParsingService,
DagRunService,
DagService,
DagSourceService,
Expand Down Expand Up @@ -1629,6 +1630,9 @@ export type BackfillServiceUnpauseBackfillMutationResult = Awaited<
export type BackfillServiceCancelBackfillMutationResult = Awaited<
ReturnType<typeof BackfillService.cancelBackfill>
>;
export type DagParsingServiceReparseDagFileMutationResult = Awaited<
ReturnType<typeof DagParsingService.reparseDagFile>
>;
export type ConnectionServicePatchConnectionMutationResult = Awaited<
ReturnType<typeof ConnectionService.patchConnection>
>;
Expand Down
40 changes: 40 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
BackfillService,
ConfigService,
ConnectionService,
DagParsingService,
DagRunService,
DagService,
DagSourceService,
Expand Down Expand Up @@ -3175,6 +3176,45 @@ export const useBackfillServiceCancelBackfill = <
}) as unknown as Promise<TData>,
...options,
});
/**
* Reparse Dag File
* Request re-parsing a DAG file.
* @param data The data for the request.
* @param data.fileToken
* @returns null Successful Response
* @throws ApiError
*/
export const useDagParsingServiceReparseDagFile = <
TData = Common.DagParsingServiceReparseDagFileMutationResult,
TError = unknown,
TContext = unknown,
>(
options?: Omit<
UseMutationOptions<
TData,
TError,
{
fileToken: string;
},
TContext
>,
"mutationFn"
>,
) =>
useMutation<
TData,
TError,
{
fileToken: string;
},
TContext
>({
mutationFn: ({ fileToken }) =>
DagParsingService.reparseDagFile({
fileToken,
}) as unknown as Promise<TData>,
...options,
});
/**
* Patch Connection
* Update a connection entry.
Expand Down
30 changes: 30 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ import type {
GetVariablesResponse,
PostVariableData,
PostVariableResponse,
ReparseDagFileData,
ReparseDagFileResponse,
GetHealthResponse,
GetVersionResponse,
} from "./types.gen";
Expand Down Expand Up @@ -2964,6 +2966,34 @@ export class VariableService {
}
}

export class DagParsingService {
/**
* Reparse Dag File
* Request re-parsing a DAG file.
* @param data The data for the request.
* @param data.fileToken
* @returns null Successful Response
* @throws ApiError
*/
public static reparseDagFile(
data: ReparseDagFileData,
): CancelablePromise<ReparseDagFileResponse> {
return __request(OpenAPI, {
method: "PUT",
url: "/public/parseDagFile/{file_token}",
path: {
file_token: data.fileToken,
},
errors: {
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
422: "Validation Error",
},
});
}
}

export class MonitorService {
/**
* Get Health
Expand Down
33 changes: 33 additions & 0 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2034,6 +2034,12 @@ export type PostVariableData = {

export type PostVariableResponse = VariableResponse;

export type ReparseDagFileData = {
fileToken: string;
};

export type ReparseDagFileResponse = null;

export type GetHealthResponse = HealthInfoSchema;

export type GetVersionResponse = VersionInfo;
Expand Down Expand Up @@ -4315,6 +4321,33 @@ export type $OpenApiTs = {
};
};
};
"/public/parseDagFile/{file_token}": {
put: {
req: ReparseDagFileData;
res: {
/**
* Successful Response
*/
201: null;
/**
* Unauthorized
*/
401: HTTPExceptionResponse;
/**
* Forbidden
*/
403: HTTPExceptionResponse;
/**
* Not Found
*/
404: HTTPExceptionResponse;
/**
* Validation Error
*/
422: HTTPValidationError;
};
};
};
"/public/monitor/health": {
get: {
res: {
Expand Down
80 changes: 80 additions & 0 deletions tests/api_fastapi/core_api/routes/public/test_dag_parsing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import os
from typing import TYPE_CHECKING

import pytest
from sqlalchemy import select

from airflow.models import DagBag
from airflow.models.dagbag import DagPriorityParsingRequest
from airflow.utils.session import provide_session

from tests_common.test_utils.db import clear_db_dag_parsing_requests

pytestmark = pytest.mark.db_test

if TYPE_CHECKING:
from airflow.models.dag import DAG


class TestDagParsingEndpoint:
ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir))
EXAMPLE_DAG_FILE = os.path.join("airflow", "example_dags", "example_bash_operator.py")
EXAMPLE_DAG_ID = "example_bash_operator"
TEST_DAG_ID = "latest_only"
NOT_READABLE_DAG_ID = "latest_only_with_trigger"
TEST_MULTIPLE_DAGS_ID = "asset_produces_1"

@staticmethod
def clear_db():
clear_db_dag_parsing_requests()

@provide_session
@pytest.fixture(autouse=True)
def setup(self, session=None) -> None:
self.clear_db()

def teardown_method(self) -> None:
self.clear_db()

def test_201_and_400_requests(self, url_safe_serializer, session, test_client):
dagbag = DagBag(dag_folder=self.EXAMPLE_DAG_FILE)
dagbag.sync_to_db()
test_dag: DAG = dagbag.dags[self.TEST_DAG_ID]

url = f"/public/parseDagFile/{url_safe_serializer.dumps(test_dag.fileloc)}"
response = test_client.put(url, headers={"Accept": "application/json"})
assert response.status_code == 201
parsing_requests = session.scalars(select(DagPriorityParsingRequest)).all()
assert parsing_requests[0].fileloc == test_dag.fileloc

# Duplicate file parsing request
response = test_client.put(url, headers={"Accept": "application/json"})
assert response.status_code == 409
parsing_requests = session.scalars(select(DagPriorityParsingRequest)).all()
assert parsing_requests[0].fileloc == test_dag.fileloc

def test_bad_file_request(self, url_safe_serializer, session, test_client):
url = f"/public/parseDagFile/{url_safe_serializer.dumps('/some/random/file.py')}"
response = test_client.put(url, headers={"Accept": "application/json"})
assert response.status_code == 404

parsing_requests = session.scalars(select(DagPriorityParsingRequest)).all()
assert parsing_requests == []

0 comments on commit 3e427c9

Please sign in to comment.