diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py index 6a38eb27ff45c..0c0a0322e6283 100644 --- a/airflow/api_connexion/endpoints/dag_run_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py @@ -403,6 +403,7 @@ def update_dag_run_state(*, dag_id: str, dag_run_id: str, session: Session = NEW return dagrun_schema.dump(dag_run) +@mark_fastapi_migration_done @security.requires_access_dag("PUT", DagAccessEntity.RUN) @action_logging @provide_session diff --git a/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow/api_fastapi/core_api/datamodels/dag_run.py index 102567f699763..8241885aff2fe 100644 --- a/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -41,6 +41,12 @@ class DAGRunPatchBody(BaseModel): note: str | None = Field(None, max_length=1000) +class DAGRunClearBody(BaseModel): + """DAG Run serializer for clear endpoint body.""" + + dry_run: bool = True + + class DAGRunResponse(BaseModel): """DAG Run serializer for responses.""" diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 890fb6b6c8d0a..56e40a7899a5e 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -1164,6 +1164,65 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/dags/{dag_id}/dagRuns/{dag_run_id}/clear: + post: + tags: + - DagRun + summary: Clear Dag Run + operationId: clear_dag_run + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: dag_run_id + in: path + required: true + schema: + type: string + title: Dag Run Id + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/DAGRunClearBody' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + anyOf: + - $ref: '#/components/schemas/TaskInstanceCollectionResponse' + - $ref: '#/components/schemas/DAGRunResponse' + title: Response Clear Dag Run + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/dagSources/{file_token}: get: tags: @@ -4383,6 +4442,15 @@ components: - file_token title: DAGResponse description: DAG serializer for responses. + DAGRunClearBody: + properties: + dry_run: + type: boolean + title: Dry Run + default: true + type: object + title: DAGRunClearBody + description: DAG Run serializer for clear endpoint body. DAGRunPatchBody: properties: state: diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 810896806eeae..d95cf76f69aec 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -31,10 +31,15 @@ from airflow.api_fastapi.common.db.common import get_session from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.datamodels.dag_run import ( + DAGRunClearBody, DAGRunPatchBody, DAGRunPatchStates, DAGRunResponse, ) +from airflow.api_fastapi.core_api.datamodels.task_instances import ( + TaskInstanceCollectionResponse, + TaskInstanceResponse, +) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.models import DAG, DagRun @@ -142,3 +147,51 @@ def patch_dag_run( dag_run = session.get(DagRun, dag_run.id) return DAGRunResponse.model_validate(dag_run, from_attributes=True) + + +@dag_run_router.post( + "/{dag_run_id}/clear", responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]) +) +def clear_dag_run( + dag_id: str, + dag_run_id: str, + body: DAGRunClearBody, + request: Request, + session: Annotated[Session, Depends(get_session)], +) -> TaskInstanceCollectionResponse | DAGRunResponse: + dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id)) + if dag_run is None: + raise HTTPException( + 404, f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found" + ) + + dag: DAG = request.app.state.dag_bag.get_dag(dag_id) + start_date = dag_run.logical_date + end_date = dag_run.logical_date + + if body.dry_run: + task_instances = dag.clear( + start_date=start_date, + end_date=end_date, + task_ids=None, + only_failed=False, + dry_run=True, + session=session, + ) + + return TaskInstanceCollectionResponse( + task_instances=[ + TaskInstanceResponse.model_validate(ti, from_attributes=True) for ti in task_instances + ], + total_entries=len(task_instances), + ) + else: + dag.clear( + start_date=dag_run.start_date, + end_date=dag_run.end_date, + task_ids=None, + only_failed=False, + session=session, + ) + dag_run_cleared = session.scalar(select(DagRun).where(DagRun.id == dag_run.id)) + return DAGRunResponse.model_validate(dag_run_cleared, from_attributes=True) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index e48ec0a9a9c57..8ca80d019e024 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -1404,6 +1404,40 @@ def set_task_group_state( return altered + @overload + def clear( + self, + *, + dry_run: Literal[True], + task_ids: Collection[str | tuple[str, int]] | None = None, + start_date: datetime | None = None, + end_date: datetime | None = None, + only_failed: bool = False, + only_running: bool = False, + confirm_prompt: bool = False, + dag_run_state: DagRunState = DagRunState.QUEUED, + session: Session = NEW_SESSION, + dag_bag: DagBag | None = None, + exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), + ) -> list[TaskInstance]: ... # pragma: no cover + + @overload + def clear( + self, + *, + task_ids: Collection[str | tuple[str, int]] | None = None, + start_date: datetime | None = None, + end_date: datetime | None = None, + only_failed: bool = False, + only_running: bool = False, + confirm_prompt: bool = False, + dag_run_state: DagRunState = DagRunState.QUEUED, + dry_run: Literal[False] = False, + session: Session = NEW_SESSION, + dag_bag: DagBag | None = None, + exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), + ) -> int: ... # pragma: no cover + @provide_session def clear( self, @@ -1418,7 +1452,7 @@ def clear( session: Session = NEW_SESSION, dag_bag: DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), - ) -> int | Iterable[TaskInstance]: + ) -> int | list[TaskInstance]: """ Clear a set of task instances associated with the current dag for a specified date range. diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 7758a94a410ea..d1bb56614347c 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -1054,6 +1054,9 @@ export type ConnectionServicePostConnectionMutationResult = Awaited< export type ConnectionServiceTestConnectionMutationResult = Awaited< ReturnType >; +export type DagRunServiceClearDagRunMutationResult = Awaited< + ReturnType +>; export type PoolServicePostPoolMutationResult = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 3b57095183abd..dc00175ba4dcb 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -33,6 +33,7 @@ import { BackfillPostBody, ConnectionBody, DAGPatchBody, + DAGRunClearBody, DAGRunPatchBody, DagRunState, DagWarningType, @@ -1814,6 +1815,52 @@ export const useConnectionServiceTestConnection = < }) as unknown as Promise, ...options, }); +/** + * Clear Dag Run + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.requestBody + * @returns unknown Successful Response + * @throws ApiError + */ +export const useDagRunServiceClearDagRun = < + TData = Common.DagRunServiceClearDagRunMutationResult, + TError = unknown, + TContext = unknown, +>( + options?: Omit< + UseMutationOptions< + TData, + TError, + { + dagId: string; + dagRunId: string; + requestBody: DAGRunClearBody; + }, + TContext + >, + "mutationFn" + >, +) => + useMutation< + TData, + TError, + { + dagId: string; + dagRunId: string; + requestBody: DAGRunClearBody; + }, + TContext + >({ + mutationFn: ({ dagId, dagRunId, requestBody }) => + DagRunService.clearDagRun({ + dagId, + dagRunId, + requestBody, + }) as unknown as Promise, + ...options, + }); /** * Post Pool * Create a Pool. diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 8a8a50bb7437a..2011934848b87 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -1303,6 +1303,19 @@ export const $DAGResponse = { description: "DAG serializer for responses.", } as const; +export const $DAGRunClearBody = { + properties: { + dry_run: { + type: "boolean", + title: "Dry Run", + default: true, + }, + }, + type: "object", + title: "DAGRunClearBody", + description: "DAG Run serializer for clear endpoint body.", +} as const; + export const $DAGRunPatchBody = { properties: { state: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 2cd106a8e0869..d5efbd4ab2f4d 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -45,6 +45,8 @@ import type { DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, + ClearDagRunData, + ClearDagRunResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, @@ -735,6 +737,36 @@ export class DagRunService { }, }); } + + /** + * Clear Dag Run + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.requestBody + * @returns unknown Successful Response + * @throws ApiError + */ + public static clearDagRun( + data: ClearDagRunData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "POST", + url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}/clear", + path: { + dag_id: data.dagId, + dag_run_id: data.dagRunId, + }, + body: data.requestBody, + mediaType: "application/json", + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } } export class DagSourceService { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 35848f3a88f97..a1ee89237a804 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -273,6 +273,13 @@ export type DAGResponse = { readonly file_token: string; }; +/** + * DAG Run serializer for clear endpoint body. + */ +export type DAGRunClearBody = { + dry_run?: boolean; +}; + /** * DAG Run Serializer for PATCH requests. */ @@ -1114,6 +1121,16 @@ export type PatchDagRunData = { export type PatchDagRunResponse = DAGRunResponse; +export type ClearDagRunData = { + dagId: string; + dagRunId: string; + requestBody: DAGRunClearBody; +}; + +export type ClearDagRunResponse = + | TaskInstanceCollectionResponse + | DAGRunResponse; + export type GetDagSourceData = { accept?: string; fileToken: string; @@ -1978,6 +1995,33 @@ export type $OpenApiTs = { }; }; }; + "/public/dags/{dag_id}/dagRuns/{dag_run_id}/clear": { + post: { + req: ClearDagRunData; + res: { + /** + * Successful Response + */ + 200: TaskInstanceCollectionResponse | DAGRunResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; "/public/dagSources/{file_token}": { get: { req: GetDagSourceData; diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index 64c3512e88b77..eec6955b788c1 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -20,10 +20,12 @@ from datetime import datetime, timezone import pytest +from sqlalchemy import select +from airflow.models import DagRun from airflow.operators.empty import EmptyOperator from airflow.utils.session import provide_session -from airflow.utils.state import DagRunState +from airflow.utils.state import DagRunState, State from airflow.utils.types import DagRunTriggeredByType, DagRunType from tests_common.test_utils.db import clear_db_dags, clear_db_runs, clear_db_serialized_dags @@ -65,15 +67,20 @@ def setup(dag_maker, session=None): schedule="@daily", start_date=START_DATE, ): - EmptyOperator(task_id="task_1") + task1 = EmptyOperator(task_id="task_1") dag_run1 = dag_maker.create_dagrun( run_id=DAG1_RUN1_ID, state=DAG1_RUN1_STATE, run_type=DAG1_RUN1_RUN_TYPE, triggered_by=DAG1_RUN1_TRIGGERED_BY, ) + dag_run1.note = (DAG1_RUN1_NOTE, 1) + ti1 = dag_run1.get_task_instance(task_id="task_1") + ti1.task = task1 + ti1.state = State.SUCCESS + dag_maker.create_dagrun( run_id=DAG1_RUN2_ID, state=DAG1_RUN2_STATE, @@ -106,6 +113,7 @@ def setup(dag_maker, session=None): dag_maker.dagbag.sync_to_db() dag_maker.dag_model dag_maker.dag_model.has_task_concurrency_limits = True + session.merge(ti1) session.merge(dag_maker.dag_model) session.commit() @@ -254,3 +262,42 @@ def test_delete_dag_run_not_found(self, test_client): assert response.status_code == 404 body = response.json() assert body["detail"] == "The DagRun with dag_id: `test_dag1` and run_id: `invalid` was not found" + + +class TestClearDagRun: + def test_clear_dag_run(self, test_client): + response = test_client.post( + f"/public/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear", json={"dry_run": False} + ) + assert response.status_code == 200 + body = response.json() + assert body["dag_id"] == DAG1_ID + assert body["run_id"] == DAG1_RUN1_ID + assert body["state"] == "queued" + + @pytest.mark.parametrize( + "body", + [{"dry_run": True}, {}], + ) + def test_clear_dag_run_dry_run(self, test_client, session, body): + response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear", json=body) + assert response.status_code == 200 + body = response.json() + assert body["total_entries"] == 1 + for each in body["task_instances"]: + assert each["state"] == "success" + dag_run = session.scalar(select(DagRun).filter_by(dag_id=DAG1_ID, run_id=DAG1_RUN1_ID)) + assert dag_run.state == DAG1_RUN1_STATE + + def test_clear_dag_run_not_found(self, test_client): + response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns/invalid/clear", json={"dry_run": False}) + assert response.status_code == 404 + body = response.json() + assert body["detail"] == "The DagRun with dag_id: `test_dag1` and run_id: `invalid` was not found" + + def test_clear_dag_run_unprocessable_entity(self, test_client): + response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear") + assert response.status_code == 422 + body = response.json() + assert body["detail"][0]["msg"] == "Field required" + assert body["detail"][0]["loc"][0] == "body"