diff --git a/airflow/api_connexion/endpoints/asset_endpoint.py b/airflow/api_connexion/endpoints/asset_endpoint.py index 986ad6bd61747..e6ff30bcac029 100644 --- a/airflow/api_connexion/endpoints/asset_endpoint.py +++ b/airflow/api_connexion/endpoints/asset_endpoint.py @@ -181,6 +181,7 @@ def _generate_queued_event_where_clause( return where_clause +@mark_fastapi_migration_done @security.requires_access_asset("GET") @security.requires_access_dag("GET") @provide_session diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 9b42d5c3a8145..74703488fa31f 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -546,6 +546,65 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/dags/{dag_id}/assets/queuedEvent/{uri}: + get: + tags: + - Asset + summary: Get Dag Asset Queued Event + description: Get a queued asset event for a DAG. + operationId: get_dag_asset_queued_event + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: uri + in: path + required: true + schema: + type: string + title: Uri + - name: before + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + title: Before + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/QueuedEventResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/assets/queuedEvent/{uri}: delete: tags: diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py index a8a1afea32a9b..85f9efb690c44 100644 --- a/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow/api_fastapi/core_api/routes/public/assets.py @@ -254,6 +254,37 @@ def get_dag_asset_queued_events( ) +@assets_router.get( + "/dags/{dag_id}/assets/queuedEvent/{uri:path}", + responses=create_openapi_http_exception_doc( + [ + status.HTTP_404_NOT_FOUND, + ] + ), +) +def get_dag_asset_queued_event( + dag_id: str, + uri: str, + session: Annotated[Session, Depends(get_session)], + before: OptionalDateTimeQuery = None, +) -> QueuedEventResponse: + """Get a queued asset event for a DAG.""" + where_clause = _generate_queued_event_where_clause(dag_id=dag_id, uri=uri, before=before) + query = ( + select(AssetDagRunQueue) + .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id) + .where(*where_clause) + ) + adrq = session.scalar(query) + if not adrq: + raise HTTPException( + status.HTTP_404_NOT_FOUND, + f"Queued event with dag_id: `{dag_id}` and asset uri: `{uri}` was not found", + ) + + return QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri) + + @assets_router.delete( "/assets/queuedEvent/{uri:path}", status_code=status.HTTP_204_NO_CONTENT, diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 14377a81a968c..1d954df4cc015 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -151,6 +151,30 @@ export const UseAssetServiceGetDagAssetQueuedEventsKeyFn = ( useAssetServiceGetDagAssetQueuedEventsKey, ...(queryKey ?? [{ before, dagId }]), ]; +export type AssetServiceGetDagAssetQueuedEventDefaultResponse = Awaited< + ReturnType +>; +export type AssetServiceGetDagAssetQueuedEventQueryResult< + TData = AssetServiceGetDagAssetQueuedEventDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useAssetServiceGetDagAssetQueuedEventKey = + "AssetServiceGetDagAssetQueuedEvent"; +export const UseAssetServiceGetDagAssetQueuedEventKeyFn = ( + { + before, + dagId, + uri, + }: { + before?: string; + dagId: string; + uri: string; + }, + queryKey?: Array, +) => [ + useAssetServiceGetDagAssetQueuedEventKey, + ...(queryKey ?? [{ before, dagId, uri }]), +]; export type DashboardServiceHistoricalMetricsDefaultResponse = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 0c522f36e4330..3fef2a26d11ba 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -191,6 +191,36 @@ export const prefetchUseAssetServiceGetDagAssetQueuedEvents = ( }), queryFn: () => AssetService.getDagAssetQueuedEvents({ before, dagId }), }); +/** + * Get Dag Asset Queued Event + * Get a queued asset event for a DAG. + * @param data The data for the request. + * @param data.dagId + * @param data.uri + * @param data.before + * @returns QueuedEventResponse Successful Response + * @throws ApiError + */ +export const prefetchUseAssetServiceGetDagAssetQueuedEvent = ( + queryClient: QueryClient, + { + before, + dagId, + uri, + }: { + before?: string; + dagId: string; + uri: string; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseAssetServiceGetDagAssetQueuedEventKeyFn({ + before, + dagId, + uri, + }), + queryFn: () => AssetService.getDagAssetQueuedEvent({ before, dagId, uri }), + }); /** * Historical Metrics * Return cluster activity historical metrics. diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index c461db254de2d..6b3e350c27cec 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -246,6 +246,42 @@ export const useAssetServiceGetDagAssetQueuedEvents = < AssetService.getDagAssetQueuedEvents({ before, dagId }) as TData, ...options, }); +/** + * Get Dag Asset Queued Event + * Get a queued asset event for a DAG. + * @param data The data for the request. + * @param data.dagId + * @param data.uri + * @param data.before + * @returns QueuedEventResponse Successful Response + * @throws ApiError + */ +export const useAssetServiceGetDagAssetQueuedEvent = < + TData = Common.AssetServiceGetDagAssetQueuedEventDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + before, + dagId, + uri, + }: { + before?: string; + dagId: string; + uri: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseAssetServiceGetDagAssetQueuedEventKeyFn( + { before, dagId, uri }, + queryKey, + ), + queryFn: () => + AssetService.getDagAssetQueuedEvent({ before, dagId, uri }) as TData, + ...options, + }); /** * Historical Metrics * Return cluster activity historical metrics. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index 1b81422281535..decdbdd375998 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -228,6 +228,42 @@ export const useAssetServiceGetDagAssetQueuedEventsSuspense = < AssetService.getDagAssetQueuedEvents({ before, dagId }) as TData, ...options, }); +/** + * Get Dag Asset Queued Event + * Get a queued asset event for a DAG. + * @param data The data for the request. + * @param data.dagId + * @param data.uri + * @param data.before + * @returns QueuedEventResponse Successful Response + * @throws ApiError + */ +export const useAssetServiceGetDagAssetQueuedEventSuspense = < + TData = Common.AssetServiceGetDagAssetQueuedEventDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + before, + dagId, + uri, + }: { + before?: string; + dagId: string; + uri: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseAssetServiceGetDagAssetQueuedEventKeyFn( + { before, dagId, uri }, + queryKey, + ), + queryFn: () => + AssetService.getDagAssetQueuedEvent({ before, dagId, uri }) as TData, + ...options, + }); /** * Historical Metrics * Return cluster activity historical metrics. diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 2d75d9811b127..ed606e960cc4d 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -17,6 +17,8 @@ import type { GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, + GetDagAssetQueuedEventData, + GetDagAssetQueuedEventResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, HistoricalMetricsData, @@ -341,6 +343,38 @@ export class AssetService { }); } + /** + * Get Dag Asset Queued Event + * Get a queued asset event for a DAG. + * @param data The data for the request. + * @param data.dagId + * @param data.uri + * @param data.before + * @returns QueuedEventResponse Successful Response + * @throws ApiError + */ + public static getDagAssetQueuedEvent( + data: GetDagAssetQueuedEventData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/dags/{dag_id}/assets/queuedEvent/{uri}", + path: { + dag_id: data.dagId, + uri: data.uri, + }, + query: { + before: data.before, + }, + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } + /** * Delete Asset Queued Events * Delete queued asset events for an asset. diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 85a8a7140874e..e5960fc82f9a0 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1076,6 +1076,14 @@ export type DeleteDagAssetQueuedEventsData = { export type DeleteDagAssetQueuedEventsResponse = void; +export type GetDagAssetQueuedEventData = { + before?: string | null; + dagId: string; + uri: string; +}; + +export type GetDagAssetQueuedEventResponse = QueuedEventResponse; + export type DeleteAssetQueuedEventsData = { before?: string | null; uri: string; @@ -1735,6 +1743,33 @@ export type $OpenApiTs = { }; }; }; + "/public/dags/{dag_id}/assets/queuedEvent/{uri}": { + get: { + req: GetDagAssetQueuedEventData; + res: { + /** + * Successful Response + */ + 200: QueuedEventResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; "/public/assets/queuedEvent/{uri}": { delete: { req: DeleteAssetQueuedEventsData; diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py index 64a1c8a06d6a7..5d753134ac758 100644 --- a/tests/api_fastapi/core_api/routes/public/test_assets.py +++ b/tests/api_fastapi/core_api/routes/public/test_assets.py @@ -367,59 +367,6 @@ def test_should_respect_page_size_limit_default(self, test_client): assert response.status_code == 200 assert len(response.json()["assets"]) == 100 - @pytest.mark.parametrize( - "query_params, expected_detail", - [ - ( - {"limit": 1, "offset": -1}, - [ - { - "type": "greater_than_equal", - "loc": ["query", "offset"], - "msg": "Input should be greater than or equal to 0", - "input": "-1", - "ctx": {"ge": 0}, - } - ], - ), - ( - {"limit": -1, "offset": 1}, - [ - { - "type": "greater_than_equal", - "loc": ["query", "limit"], - "msg": "Input should be greater than or equal to 0", - "input": "-1", - "ctx": {"ge": 0}, - } - ], - ), - ( - {"limit": -1, "offset": -1}, - [ - { - "type": "greater_than_equal", - "loc": ["query", "limit"], - "msg": "Input should be greater than or equal to 0", - "input": "-1", - "ctx": {"ge": 0}, - }, - { - "type": "greater_than_equal", - "loc": ["query", "offset"], - "msg": "Input should be greater than or equal to 0", - "input": "-1", - "ctx": {"ge": 0}, - }, - ], - ), - ], - ) - def test_bad_limit_and_offset(self, test_client, query_params, expected_detail): - response = test_client.get("/public/assets", params=query_params) - assert response.status_code == 422 - assert response.json()["detail"] == expected_detail - class TestGetAssetEvents(TestAssets): def test_should_respond_200(self, test_client, session): @@ -772,6 +719,26 @@ def test_invalid_attr_not_allowed(self, test_client, session): assert response.status_code == 422 + @pytest.mark.usefixtures("time_freezer") + @pytest.mark.enable_redact + def test_should_mask_sensitive_extra(self, test_client, session): + self.create_assets() + event_payload = {"uri": "s3://bucket/key/1", "extra": {"password": "bar"}} + response = test_client.post("/public/assets/events", json=event_payload) + assert response.status_code == 200 + assert response.json() == { + "id": mock.ANY, + "asset_id": 1, + "uri": "s3://bucket/key/1", + "extra": {"password": "***", "from_rest_api": True}, + "source_task_id": None, + "source_dag_id": None, + "source_run_id": None, + "source_map_index": -1, + "created_dagruns": [], + "timestamp": self.default_time.replace("+00:00", "Z"), + } + class TestDeleteAssetQueuedEvents(TestQueuedEventEndpoint): @pytest.mark.usefixtures("time_freezer") @@ -798,23 +765,3 @@ def test_should_respond_404(self, test_client): assert response.status_code == 404 assert response.json()["detail"] == "Queue event with uri: `not_exists` was not found" - - @pytest.mark.usefixtures("time_freezer") - @pytest.mark.enable_redact - def test_should_mask_sensitive_extra(self, test_client, session): - self.create_assets() - event_payload = {"uri": "s3://bucket/key/1", "extra": {"password": "bar"}} - response = test_client.post("/public/assets/events", json=event_payload) - assert response.status_code == 200 - assert response.json() == { - "id": mock.ANY, - "asset_id": 1, - "uri": "s3://bucket/key/1", - "extra": {"password": "***", "from_rest_api": True}, - "source_task_id": None, - "source_dag_id": None, - "source_run_id": None, - "source_map_index": -1, - "created_dagruns": [], - "timestamp": self.default_time.replace("+00:00", "Z"), - }