From 8f9631ece1c21a56fc3d66d9f2f7f68d6688592e Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Tue, 19 Nov 2024 12:42:05 +0530 Subject: [PATCH] AIP-84: Migrating DELETE a queued asset events for DAG to fastAPI (#44130) * AIP-84: Migrating GET queued asset events for DAG to fastAPI * fixing tests and server code * fixing parameters * fixing parameters * AIP-84: Migrating DELETE a queued asset events for DAG to fastAPI * Update airflow/api_fastapi/core_api/routes/public/assets.py Co-authored-by: Wei Lee * Update tests/api_fastapi/core_api/routes/public/test_assets.py Co-authored-by: Wei Lee --------- Co-authored-by: Wei Lee --- .../api_connexion/endpoints/asset_endpoint.py | 1 + .../core_api/openapi/v1-generated.yaml | 60 +++++++++++++++++++ .../core_api/routes/public/assets.py | 29 +++++++++ airflow/ui/openapi-gen/queries/common.ts | 3 + airflow/ui/openapi-gen/queries/queries.ts | 47 +++++++++++++++ .../ui/openapi-gen/requests/services.gen.ts | 35 +++++++++++ airflow/ui/openapi-gen/requests/types.gen.ts | 37 ++++++++++++ .../core_api/routes/public/test_assets.py | 35 +++++++++++ 8 files changed, 247 insertions(+) diff --git a/airflow/api_connexion/endpoints/asset_endpoint.py b/airflow/api_connexion/endpoints/asset_endpoint.py index e6ff30bcac029..423e02d1b9c9d 100644 --- a/airflow/api_connexion/endpoints/asset_endpoint.py +++ b/airflow/api_connexion/endpoints/asset_endpoint.py @@ -204,6 +204,7 @@ def get_dag_asset_queued_event( return queued_event_schema.dump(queued_event) +@mark_fastapi_migration_done @security.requires_access_asset("DELETE") @security.requires_access_dag("GET") @provide_session diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 74703488fa31f..e23f83634c496 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -605,6 +605,66 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + delete: + tags: + - Asset + summary: Delete Dag Asset Queued Event + description: Delete a queued asset event for a DAG. + operationId: delete_dag_asset_queued_event + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: uri + in: path + required: true + schema: + type: string + title: Uri + - name: before + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + title: Before + responses: + '204': + description: Successful Response + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '400': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Bad Request + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/assets/queuedEvent/{uri}: delete: tags: diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py index 85f9efb690c44..b94b825f76294 100644 --- a/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow/api_fastapi/core_api/routes/public/assets.py @@ -329,3 +329,32 @@ def delete_dag_asset_queued_events( if result.rowcount == 0: raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with dag_id: `{dag_id}` was not found") + + +@assets_router.delete( + "/dags/{dag_id}/assets/queuedEvent/{uri:path}", + status_code=status.HTTP_204_NO_CONTENT, + responses=create_openapi_http_exception_doc( + [ + status.HTTP_400_BAD_REQUEST, + status.HTTP_404_NOT_FOUND, + ] + ), +) +def delete_dag_asset_queued_event( + dag_id: str, + uri: str, + session: Annotated[Session, Depends(get_session)], + before: OptionalDateTimeQuery = None, +): + """Delete a queued asset event for a DAG.""" + where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before, uri=uri) + delete_statement = ( + delete(AssetDagRunQueue).where(*where_clause).execution_options(synchronize_session="fetch") + ) + result = session.execute(delete_statement) + if result.rowcount == 0: + raise HTTPException( + status.HTTP_404_NOT_FOUND, + detail=f"Queued event with dag_id: `{dag_id}` and asset uri: `{uri}` was not found", + ) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 1d954df4cc015..8b9af7821bfd4 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -1185,6 +1185,9 @@ export type VariableServicePatchVariableMutationResult = Awaited< export type AssetServiceDeleteDagAssetQueuedEventsMutationResult = Awaited< ReturnType >; +export type AssetServiceDeleteDagAssetQueuedEventMutationResult = Awaited< + ReturnType +>; export type AssetServiceDeleteAssetQueuedEventsMutationResult = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 6b3e350c27cec..b5beed39cb56b 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -2629,6 +2629,53 @@ export const useAssetServiceDeleteDagAssetQueuedEvents = < }) as unknown as Promise, ...options, }); +/** + * Delete Dag Asset Queued Event + * Delete a queued asset event for a DAG. + * @param data The data for the request. + * @param data.dagId + * @param data.uri + * @param data.before + * @returns void Successful Response + * @throws ApiError + */ +export const useAssetServiceDeleteDagAssetQueuedEvent = < + TData = Common.AssetServiceDeleteDagAssetQueuedEventMutationResult, + TError = unknown, + TContext = unknown, +>( + options?: Omit< + UseMutationOptions< + TData, + TError, + { + before?: string; + dagId: string; + uri: string; + }, + TContext + >, + "mutationFn" + >, +) => + useMutation< + TData, + TError, + { + before?: string; + dagId: string; + uri: string; + }, + TContext + >({ + mutationFn: ({ before, dagId, uri }) => + AssetService.deleteDagAssetQueuedEvent({ + before, + dagId, + uri, + }) as unknown as Promise, + ...options, + }); /** * Delete Asset Queued Events * Delete queued asset events for an asset. diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index ed606e960cc4d..c36bdb8c92b6d 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -19,6 +19,8 @@ import type { DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, + DeleteDagAssetQueuedEventData, + DeleteDagAssetQueuedEventResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, HistoricalMetricsData, @@ -375,6 +377,39 @@ export class AssetService { }); } + /** + * Delete Dag Asset Queued Event + * Delete a queued asset event for a DAG. + * @param data The data for the request. + * @param data.dagId + * @param data.uri + * @param data.before + * @returns void Successful Response + * @throws ApiError + */ + public static deleteDagAssetQueuedEvent( + data: DeleteDagAssetQueuedEventData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "DELETE", + url: "/public/dags/{dag_id}/assets/queuedEvent/{uri}", + path: { + dag_id: data.dagId, + uri: data.uri, + }, + query: { + before: data.before, + }, + errors: { + 400: "Bad Request", + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } + /** * Delete Asset Queued Events * Delete queued asset events for an asset. diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index e5960fc82f9a0..3611df15458c4 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1084,6 +1084,14 @@ export type GetDagAssetQueuedEventData = { export type GetDagAssetQueuedEventResponse = QueuedEventResponse; +export type DeleteDagAssetQueuedEventData = { + before?: string | null; + dagId: string; + uri: string; +}; + +export type DeleteDagAssetQueuedEventResponse = void; + export type DeleteAssetQueuedEventsData = { before?: string | null; uri: string; @@ -1769,6 +1777,35 @@ export type $OpenApiTs = { 422: HTTPValidationError; }; }; + delete: { + req: DeleteDagAssetQueuedEventData; + res: { + /** + * Successful Response + */ + 204: void; + /** + * Bad Request + */ + 400: HTTPExceptionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; }; "/public/assets/queuedEvent/{uri}": { delete: { diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py index 5d753134ac758..3970d7e46700a 100644 --- a/tests/api_fastapi/core_api/routes/public/test_assets.py +++ b/tests/api_fastapi/core_api/routes/public/test_assets.py @@ -765,3 +765,38 @@ def test_should_respond_404(self, test_client): assert response.status_code == 404 assert response.json()["detail"] == "Queue event with uri: `not_exists` was not found" + + +class TestDeleteDagAssetQueuedEvent(TestQueuedEventEndpoint): + def test_delete_should_respond_204(self, test_client, session, create_dummy_dag): + dag, _ = create_dummy_dag() + dag_id = dag.dag_id + asset_uri = "s3://bucket/key/1" + self.create_assets(session=session, num=1) + asset_id = 1 + + self._create_asset_dag_run_queues(dag_id, asset_id, session) + adrq = session.query(AssetDagRunQueue).all() + assert len(adrq) == 1 + + response = test_client.delete( + f"/public/dags/{dag_id}/assets/queuedEvent/{asset_uri}", + ) + + assert response.status_code == 204 + adrq = session.query(AssetDagRunQueue).all() + assert len(adrq) == 0 + + def test_should_respond_404(self, test_client): + dag_id = "not_exists" + asset_uri = "not_exists" + + response = test_client.delete( + f"/public/dags/{dag_id}/assets/queuedEvent/{asset_uri}", + ) + + assert response.status_code == 404 + assert ( + response.json()["detail"] + == "Queued event with dag_id: `not_exists` and asset uri: `not_exists` was not found" + )