Skip to content

Commit

Permalink
AIP-84 Migrate Modify Dag Run endpoint to FastAPI (apache#42973)
Browse files Browse the repository at this point in the history
* add modify_dag_run

* add tests

* Update airflow/api_fastapi/views/public/dag_run.py

* fix

* Update airflow/api_fastapi/routes/public/dag_run.py

Co-authored-by: Pierre Jeambrun <[email protected]>

* Update airflow/api_fastapi/serializers/dag_run.py

Co-authored-by: Pierre Jeambrun <[email protected]>

* use dagbag

* replace patch with put

* refactor

* use put in tests

* modify to patch

* add update_mask

* refactor update to patch

---------

Co-authored-by: Pierre Jeambrun <[email protected]>
  • Loading branch information
rawwar and pierrejeambrun authored Oct 29, 2024
1 parent 755c10b commit 3e4b344
Show file tree
Hide file tree
Showing 10 changed files with 371 additions and 3 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
raise AlreadyExists(detail=f"DAGRun with DAG ID: '{dag_id}' and DAGRun ID: '{run_id}' already exists")


@mark_fastapi_migration_done
@security.requires_access_dag("PUT", DagAccessEntity.RUN)
@provide_session
@action_logging
Expand Down
89 changes: 89 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1156,6 +1156,78 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
patch:
tags:
- DagRun
summary: Patch Dag Run State
description: Modify a DAG Run.
operationId: patch_dag_run_state
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: dag_run_id
in: path
required: true
schema:
type: string
title: Dag Run Id
- name: update_mask
in: query
required: false
schema:
anyOf:
- type: array
items:
type: string
- type: 'null'
title: Update Mask
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/DAGRunPatchBody'
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/DAGRunResponse'
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/monitor/health:
get:
tags:
Expand Down Expand Up @@ -2079,6 +2151,23 @@ components:
- file_token
title: DAGResponse
description: DAG serializer for responses.
DAGRunPatchBody:
properties:
state:
$ref: '#/components/schemas/DAGRunPatchStates'
type: object
required:
- state
title: DAGRunPatchBody
description: DAG Run Serializer for PATCH requests.
DAGRunPatchStates:
type: string
enum:
- queued
- success
- failed
title: DAGRunPatchStates
description: Enum for DAG Run states when updating a DAG Run.
DAGRunResponse:
properties:
run_id:
Expand Down
57 changes: 54 additions & 3 deletions airflow/api_fastapi/core_api/routes/public/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,25 @@

from __future__ import annotations

from fastapi import Depends, HTTPException
from fastapi import Depends, HTTPException, Query, Request
from sqlalchemy import select
from sqlalchemy.orm import Session
from typing_extensions import Annotated

from airflow.api.common.mark_tasks import (
set_dag_run_state_to_failed,
set_dag_run_state_to_queued,
set_dag_run_state_to_success,
)
from airflow.api_fastapi.common.db.common import get_session
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.dag_run import DAGRunResponse
from airflow.models import DagRun
from airflow.api_fastapi.core_api.serializers.dag_run import (
DAGRunPatchBody,
DAGRunPatchStates,
DAGRunResponse,
)
from airflow.models import DAG, DagRun

dag_run_router = AirflowRouter(tags=["DagRun"], prefix="/dags/{dag_id}/dagRuns")

Expand Down Expand Up @@ -57,3 +66,45 @@ async def delete_dag_run(dag_id: str, dag_run_id: str, session: Annotated[Sessio
)

session.delete(dag_run)


@dag_run_router.patch("/{dag_run_id}", responses=create_openapi_http_exception_doc([400, 401, 403, 404]))
async def patch_dag_run_state(
dag_id: str,
dag_run_id: str,
patch_body: DAGRunPatchBody,
session: Annotated[Session, Depends(get_session)],
request: Request,
update_mask: list[str] | None = Query(None),
) -> DAGRunResponse:
"""Modify a DAG Run."""
dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id))
if dag_run is None:
raise HTTPException(
404, f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found"
)

dag: DAG = request.app.state.dag_bag.get_dag(dag_id)

if not dag:
raise HTTPException(404, f"Dag with id {dag_id} was not found")

if update_mask:
if update_mask != ["state"]:
raise HTTPException(400, "Only `state` field can be updated through the REST API")
else:
update_mask = ["state"]

for attr_name in update_mask:
if attr_name == "state":
state = getattr(patch_body, attr_name)
if state == DAGRunPatchStates.SUCCESS:
set_dag_run_state_to_success(dag=dag, run_id=dag_run.run_id, commit=True)
elif state == DAGRunPatchStates.QUEUED:
set_dag_run_state_to_queued(dag=dag, run_id=dag_run.run_id, commit=True)
else:
set_dag_run_state_to_failed(dag=dag, run_id=dag_run.run_id, commit=True)

dag_run = session.get(DagRun, dag_run.id)

return DAGRunResponse.model_validate(dag_run, from_attributes=True)
15 changes: 15 additions & 0 deletions airflow/api_fastapi/core_api/serializers/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,28 @@
from __future__ import annotations

from datetime import datetime
from enum import Enum

from pydantic import BaseModel, Field

from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType


class DAGRunPatchStates(str, Enum):
"""Enum for DAG Run states when updating a DAG Run."""

QUEUED = DagRunState.QUEUED
SUCCESS = DagRunState.SUCCESS
FAILED = DagRunState.FAILED


class DAGRunPatchBody(BaseModel):
"""DAG Run Serializer for PATCH requests."""

state: DAGRunPatchStates


class DAGRunResponse(BaseModel):
"""DAG Run serializer for responses."""

Expand Down
3 changes: 3 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,9 @@ export type DagServicePatchDagMutationResult = Awaited<
export type VariableServicePatchVariableMutationResult = Awaited<
ReturnType<typeof VariableService.patchVariable>
>;
export type DagRunServicePatchDagRunStateMutationResult = Awaited<
ReturnType<typeof DagRunService.patchDagRunState>
>;
export type PoolServicePatchPoolMutationResult = Awaited<
ReturnType<typeof PoolService.patchPool>
>;
Expand Down
52 changes: 52 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
} from "../requests/services.gen";
import {
DAGPatchBody,
DAGRunPatchBody,
DagRunState,
PoolPatchBody,
PoolPostBody,
Expand Down Expand Up @@ -948,6 +949,57 @@ export const useVariableServicePatchVariable = <
}) as unknown as Promise<TData>,
...options,
});
/**
* Patch Dag Run State
* Modify a DAG Run.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @param data.requestBody
* @param data.updateMask
* @returns DAGRunResponse Successful Response
* @throws ApiError
*/
export const useDagRunServicePatchDagRunState = <
TData = Common.DagRunServicePatchDagRunStateMutationResult,
TError = unknown,
TContext = unknown,
>(
options?: Omit<
UseMutationOptions<
TData,
TError,
{
dagId: string;
dagRunId: string;
requestBody: DAGRunPatchBody;
updateMask?: string[];
},
TContext
>,
"mutationFn"
>,
) =>
useMutation<
TData,
TError,
{
dagId: string;
dagRunId: string;
requestBody: DAGRunPatchBody;
updateMask?: string[];
},
TContext
>({
mutationFn: ({ dagId, dagRunId, requestBody, updateMask }) =>
DagRunService.patchDagRunState({
dagId,
dagRunId,
requestBody,
updateMask,
}) as unknown as Promise<TData>,
...options,
});
/**
* Patch Pool
* Update a Pool.
Expand Down
19 changes: 19 additions & 0 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,25 @@ export const $DAGResponse = {
description: "DAG serializer for responses.",
} as const;

export const $DAGRunPatchBody = {
properties: {
state: {
$ref: "#/components/schemas/DAGRunPatchStates",
},
},
type: "object",
required: ["state"],
title: "DAGRunPatchBody",
description: "DAG Run Serializer for PATCH requests.",
} as const;

export const $DAGRunPatchStates = {
type: "string",
enum: ["queued", "success", "failed"],
title: "DAGRunPatchStates",
description: "Enum for DAG Run states when updating a DAG Run.",
} as const;

export const $DAGRunResponse = {
properties: {
run_id: {
Expand Down
38 changes: 38 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ import type {
GetDagRunResponse,
DeleteDagRunData,
DeleteDagRunResponse,
PatchDagRunStateData,
PatchDagRunStateResponse,
GetHealthResponse,
DeletePoolData,
DeletePoolResponse,
Expand Down Expand Up @@ -672,6 +674,42 @@ export class DagRunService {
},
});
}

/**
* Patch Dag Run State
* Modify a DAG Run.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @param data.requestBody
* @param data.updateMask
* @returns DAGRunResponse Successful Response
* @throws ApiError
*/
public static patchDagRunState(
data: PatchDagRunStateData,
): CancelablePromise<PatchDagRunStateResponse> {
return __request(OpenAPI, {
method: "PATCH",
url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}",
path: {
dag_id: data.dagId,
dag_run_id: data.dagRunId,
},
query: {
update_mask: data.updateMask,
},
body: data.requestBody,
mediaType: "application/json",
errors: {
400: "Bad Request",
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
422: "Validation Error",
},
});
}
}

export class MonitorService {
Expand Down
Loading

0 comments on commit 3e4b344

Please sign in to comment.