Skip to content

Commit

Permalink
AIP-84: Migrating DELETE a queued asset events for DAG to fastAPI (ap…
Browse files Browse the repository at this point in the history
…ache#44130)

* AIP-84: Migrating GET queued asset events for DAG to fastAPI

* fixing tests and server code

* fixing parameters

* fixing parameters

* AIP-84: Migrating DELETE a queued asset events for DAG to fastAPI

* Update airflow/api_fastapi/core_api/routes/public/assets.py

Co-authored-by: Wei Lee <[email protected]>

* Update tests/api_fastapi/core_api/routes/public/test_assets.py

Co-authored-by: Wei Lee <[email protected]>

---------

Co-authored-by: Wei Lee <[email protected]>
  • Loading branch information
amoghrajesh and Lee-W authored Nov 19, 2024
1 parent af958bb commit 8f9631e
Show file tree
Hide file tree
Showing 8 changed files with 247 additions and 0 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/asset_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ def get_dag_asset_queued_event(
return queued_event_schema.dump(queued_event)


@mark_fastapi_migration_done
@security.requires_access_asset("DELETE")
@security.requires_access_dag("GET")
@provide_session
Expand Down
60 changes: 60 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,66 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
delete:
tags:
- Asset
summary: Delete Dag Asset Queued Event
description: Delete a queued asset event for a DAG.
operationId: delete_dag_asset_queued_event
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: uri
in: path
required: true
schema:
type: string
title: Uri
- name: before
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Before
responses:
'204':
description: Successful Response
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/assets/queuedEvent/{uri}:
delete:
tags:
Expand Down
29 changes: 29 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,3 +329,32 @@ def delete_dag_asset_queued_events(

if result.rowcount == 0:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with dag_id: `{dag_id}` was not found")


@assets_router.delete(
"/dags/{dag_id}/assets/queuedEvent/{uri:path}",
status_code=status.HTTP_204_NO_CONTENT,
responses=create_openapi_http_exception_doc(
[
status.HTTP_400_BAD_REQUEST,
status.HTTP_404_NOT_FOUND,
]
),
)
def delete_dag_asset_queued_event(
dag_id: str,
uri: str,
session: Annotated[Session, Depends(get_session)],
before: OptionalDateTimeQuery = None,
):
"""Delete a queued asset event for a DAG."""
where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before, uri=uri)
delete_statement = (
delete(AssetDagRunQueue).where(*where_clause).execution_options(synchronize_session="fetch")
)
result = session.execute(delete_statement)
if result.rowcount == 0:
raise HTTPException(
status.HTTP_404_NOT_FOUND,
detail=f"Queued event with dag_id: `{dag_id}` and asset uri: `{uri}` was not found",
)
3 changes: 3 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1185,6 +1185,9 @@ export type VariableServicePatchVariableMutationResult = Awaited<
export type AssetServiceDeleteDagAssetQueuedEventsMutationResult = Awaited<
ReturnType<typeof AssetService.deleteDagAssetQueuedEvents>
>;
export type AssetServiceDeleteDagAssetQueuedEventMutationResult = Awaited<
ReturnType<typeof AssetService.deleteDagAssetQueuedEvent>
>;
export type AssetServiceDeleteAssetQueuedEventsMutationResult = Awaited<
ReturnType<typeof AssetService.deleteAssetQueuedEvents>
>;
Expand Down
47 changes: 47 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2629,6 +2629,53 @@ export const useAssetServiceDeleteDagAssetQueuedEvents = <
}) as unknown as Promise<TData>,
...options,
});
/**
* Delete Dag Asset Queued Event
* Delete a queued asset event for a DAG.
* @param data The data for the request.
* @param data.dagId
* @param data.uri
* @param data.before
* @returns void Successful Response
* @throws ApiError
*/
export const useAssetServiceDeleteDagAssetQueuedEvent = <
TData = Common.AssetServiceDeleteDagAssetQueuedEventMutationResult,
TError = unknown,
TContext = unknown,
>(
options?: Omit<
UseMutationOptions<
TData,
TError,
{
before?: string;
dagId: string;
uri: string;
},
TContext
>,
"mutationFn"
>,
) =>
useMutation<
TData,
TError,
{
before?: string;
dagId: string;
uri: string;
},
TContext
>({
mutationFn: ({ before, dagId, uri }) =>
AssetService.deleteDagAssetQueuedEvent({
before,
dagId,
uri,
}) as unknown as Promise<TData>,
...options,
});
/**
* Delete Asset Queued Events
* Delete queued asset events for an asset.
Expand Down
35 changes: 35 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import type {
DeleteDagAssetQueuedEventsResponse,
GetDagAssetQueuedEventData,
GetDagAssetQueuedEventResponse,
DeleteDagAssetQueuedEventData,
DeleteDagAssetQueuedEventResponse,
DeleteAssetQueuedEventsData,
DeleteAssetQueuedEventsResponse,
HistoricalMetricsData,
Expand Down Expand Up @@ -375,6 +377,39 @@ export class AssetService {
});
}

/**
* Delete Dag Asset Queued Event
* Delete a queued asset event for a DAG.
* @param data The data for the request.
* @param data.dagId
* @param data.uri
* @param data.before
* @returns void Successful Response
* @throws ApiError
*/
public static deleteDagAssetQueuedEvent(
data: DeleteDagAssetQueuedEventData,
): CancelablePromise<DeleteDagAssetQueuedEventResponse> {
return __request(OpenAPI, {
method: "DELETE",
url: "/public/dags/{dag_id}/assets/queuedEvent/{uri}",
path: {
dag_id: data.dagId,
uri: data.uri,
},
query: {
before: data.before,
},
errors: {
400: "Bad Request",
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
422: "Validation Error",
},
});
}

/**
* Delete Asset Queued Events
* Delete queued asset events for an asset.
Expand Down
37 changes: 37 additions & 0 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1084,6 +1084,14 @@ export type GetDagAssetQueuedEventData = {

export type GetDagAssetQueuedEventResponse = QueuedEventResponse;

export type DeleteDagAssetQueuedEventData = {
before?: string | null;
dagId: string;
uri: string;
};

export type DeleteDagAssetQueuedEventResponse = void;

export type DeleteAssetQueuedEventsData = {
before?: string | null;
uri: string;
Expand Down Expand Up @@ -1769,6 +1777,35 @@ export type $OpenApiTs = {
422: HTTPValidationError;
};
};
delete: {
req: DeleteDagAssetQueuedEventData;
res: {
/**
* Successful Response
*/
204: void;
/**
* Bad Request
*/
400: HTTPExceptionResponse;
/**
* Unauthorized
*/
401: HTTPExceptionResponse;
/**
* Forbidden
*/
403: HTTPExceptionResponse;
/**
* Not Found
*/
404: HTTPExceptionResponse;
/**
* Validation Error
*/
422: HTTPValidationError;
};
};
};
"/public/assets/queuedEvent/{uri}": {
delete: {
Expand Down
35 changes: 35 additions & 0 deletions tests/api_fastapi/core_api/routes/public/test_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -765,3 +765,38 @@ def test_should_respond_404(self, test_client):

assert response.status_code == 404
assert response.json()["detail"] == "Queue event with uri: `not_exists` was not found"


class TestDeleteDagAssetQueuedEvent(TestQueuedEventEndpoint):
def test_delete_should_respond_204(self, test_client, session, create_dummy_dag):
dag, _ = create_dummy_dag()
dag_id = dag.dag_id
asset_uri = "s3://bucket/key/1"
self.create_assets(session=session, num=1)
asset_id = 1

self._create_asset_dag_run_queues(dag_id, asset_id, session)
adrq = session.query(AssetDagRunQueue).all()
assert len(adrq) == 1

response = test_client.delete(
f"/public/dags/{dag_id}/assets/queuedEvent/{asset_uri}",
)

assert response.status_code == 204
adrq = session.query(AssetDagRunQueue).all()
assert len(adrq) == 0

def test_should_respond_404(self, test_client):
dag_id = "not_exists"
asset_uri = "not_exists"

response = test_client.delete(
f"/public/dags/{dag_id}/assets/queuedEvent/{asset_uri}",
)

assert response.status_code == 404
assert (
response.json()["detail"]
== "Queued event with dag_id: `not_exists` and asset uri: `not_exists` was not found"
)

0 comments on commit 8f9631e

Please sign in to comment.