diff --git a/airflow/api_fastapi/core_api/datamodels/assets.py b/airflow/api_fastapi/core_api/datamodels/assets.py index 047f3f557bc25..9721157998564 100644 --- a/airflow/api_fastapi/core_api/datamodels/assets.py +++ b/airflow/api_fastapi/core_api/datamodels/assets.py @@ -102,7 +102,6 @@ class AssetEventResponse(BaseModel): id: int asset_id: int - uri: str extra: dict | None = None source_task_id: str | None = None source_dag_id: str | None = None @@ -127,8 +126,8 @@ class AssetEventCollectionResponse(BaseModel): class QueuedEventResponse(BaseModel): """Queued Event serializer for responses..""" - uri: str dag_id: str + asset_id: int created_at: datetime @@ -142,7 +141,7 @@ class QueuedEventCollectionResponse(BaseModel): class CreateAssetEventsBody(BaseModel): """Create asset events request.""" - uri: str + asset_id: int extra: dict = Field(default_factory=dict) @field_validator("extra", mode="after") diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 3b6d55c93137c..00f1c5aa5106f 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -594,7 +594,7 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /public/assets/queuedEvents/{uri}: + /public/assets/{asset_id}/queuedEvents: get: tags: - Asset @@ -602,12 +602,12 @@ paths: description: Get queued asset events for an asset. operationId: get_asset_queued_events parameters: - - name: uri + - name: asset_id in: path required: true schema: - type: string - title: Uri + type: integer + title: Asset Id - name: before in: query required: false @@ -654,12 +654,12 @@ paths: description: Delete queued asset events for an asset. operationId: delete_asset_queued_events parameters: - - name: uri + - name: asset_id in: path required: true schema: - type: string - title: Uri + type: integer + title: Asset Id - name: before in: query required: false @@ -695,7 +695,7 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /public/assets/{uri}: + /public/assets/{asset_id}: get: tags: - Asset @@ -703,12 +703,12 @@ paths: description: Get an asset. operationId: get_asset parameters: - - name: uri + - name: asset_id in: path required: true schema: - type: string - title: Uri + type: integer + title: Asset Id responses: '200': description: Successful Response @@ -846,7 +846,7 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /public/dags/{dag_id}/assets/queuedEvents/{uri}: + /public/dags/{dag_id}/assets/{asset_id}/queuedEvents: get: tags: - Asset @@ -860,12 +860,12 @@ paths: schema: type: string title: Dag Id - - name: uri + - name: asset_id in: path required: true schema: - type: string - title: Uri + type: integer + title: Asset Id - name: before in: query required: false @@ -918,12 +918,12 @@ paths: schema: type: string title: Dag Id - - name: uri + - name: asset_id in: path required: true schema: - type: string - title: Uri + type: integer + title: Asset Id - name: before in: query required: false @@ -5982,9 +5982,6 @@ components: asset_id: type: integer title: Asset Id - uri: - type: string - title: Uri extra: anyOf: - type: object @@ -6021,7 +6018,6 @@ components: required: - id - asset_id - - uri - source_map_index - created_dagruns - timestamp @@ -6548,16 +6544,16 @@ components: description: Connection Test serializer for responses. CreateAssetEventsBody: properties: - uri: - type: string - title: Uri + asset_id: + type: integer + title: Asset Id extra: type: object title: Extra additionalProperties: false type: object required: - - uri + - asset_id title: CreateAssetEventsBody description: Create asset events request. DAGCollectionResponse: @@ -8273,20 +8269,20 @@ components: description: Queued Event Collection serializer for responses. QueuedEventResponse: properties: - uri: - type: string - title: Uri dag_id: type: string title: Dag Id + asset_id: + type: integer + title: Asset Id created_at: type: string format: date-time title: Created At type: object required: - - uri - dag_id + - asset_id - created_at title: QueuedEventResponse description: Queued Event serializer for responses.. diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py index 779c32f6a4c1b..c8cc9fb0f7695 100644 --- a/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow/api_fastapi/core_api/routes/public/assets.py @@ -59,20 +59,16 @@ def _generate_queued_event_where_clause( *, + asset_id: int | None = None, dag_id: str | None = None, - uri: str | None = None, before: datetime | str | None = None, ) -> list: """Get AssetDagRunQueue where clause.""" where_clause = [] if dag_id is not None: where_clause.append(AssetDagRunQueue.target_dag_id == dag_id) - if uri is not None: - where_clause.append( - AssetDagRunQueue.asset_id.in_( - select(AssetModel.id).where(AssetModel.uri == uri), - ), - ) + if asset_id is not None: + where_clause.append(AssetDagRunQueue.asset_id == asset_id) if before is not None: where_clause.append(AssetDagRunQueue.created_at < before) return where_clause @@ -227,9 +223,9 @@ def create_asset_event( session: SessionDep, ) -> AssetEventResponse: """Create asset events.""" - asset_model = session.scalar(select(AssetModel).where(AssetModel.uri == body.uri).limit(1)) + asset_model = session.scalar(select(AssetModel).where(AssetModel.id == body.asset_id).limit(1)) if not asset_model: - raise HTTPException(status.HTTP_404_NOT_FOUND, f"Asset with uri: `{body.uri}` was not found") + raise HTTPException(status.HTTP_404_NOT_FOUND, f"Asset with ID: `{body.asset_id}` was not found") timestamp = timezone.utcnow() assets_event = asset_manager.register_asset_change( @@ -240,41 +236,35 @@ def create_asset_event( ) if not assets_event: - raise HTTPException(status.HTTP_404_NOT_FOUND, f"Asset with uri: `{body.uri}` was not found") - return assets_event + raise HTTPException(status.HTTP_404_NOT_FOUND, f"Asset with ID: `{body.asset_id}` was not found") + return AssetEventResponse.model_validate(assets_event) @assets_router.get( - "/assets/queuedEvents/{uri:path}", - responses=create_openapi_http_exception_doc( - [ - status.HTTP_404_NOT_FOUND, - ] - ), + "/assets/{asset_id}/queuedEvents", + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), ) def get_asset_queued_events( - uri: str, + asset_id: int, session: SessionDep, before: OptionalDateTimeQuery = None, ) -> QueuedEventCollectionResponse: """Get queued asset events for an asset.""" - print(f"uri: {uri}") - where_clause = _generate_queued_event_where_clause(uri=uri, before=before) - query = ( - select(AssetDagRunQueue, AssetModel.uri) - .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id) - .where(*where_clause) - ) + where_clause = _generate_queued_event_where_clause(asset_id=asset_id, before=before) + query = select(AssetDagRunQueue).where(*where_clause) dag_asset_queued_events_select, total_entries = paginated_select(statement=query) - adrqs = session.execute(dag_asset_queued_events_select).all() + adrqs = session.scalars(dag_asset_queued_events_select).all() if not adrqs: - raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with uri: `{uri}` was not found") + raise HTTPException( + status.HTTP_404_NOT_FOUND, + f"Queue event with asset_id: `{asset_id}` was not found", + ) queued_events = [ - QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri) - for adrq, uri in adrqs + QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, asset_id=adrq.asset_id) + for adrq in adrqs ] return QueuedEventCollectionResponse( @@ -284,33 +274,29 @@ def get_asset_queued_events( @assets_router.get( - "/assets/{uri:path}", + "/assets/{asset_id}", responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), ) def get_asset( - uri: str, + asset_id: int, session: SessionDep, ) -> AssetResponse: """Get an asset.""" asset = session.scalar( select(AssetModel) - .where(AssetModel.uri == uri) + .where(AssetModel.id == asset_id) .options(joinedload(AssetModel.consuming_dags), joinedload(AssetModel.producing_tasks)) ) if asset is None: - raise HTTPException(status.HTTP_404_NOT_FOUND, f"The Asset with uri: `{uri}` was not found") + raise HTTPException(status.HTTP_404_NOT_FOUND, f"The Asset with ID: `{asset_id}` was not found") return AssetResponse.model_validate(asset) @assets_router.get( "/dags/{dag_id}/assets/queuedEvents", - responses=create_openapi_http_exception_doc( - [ - status.HTTP_404_NOT_FOUND, - ] - ), + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), ) def get_dag_asset_queued_events( dag_id: str, @@ -319,20 +305,16 @@ def get_dag_asset_queued_events( ) -> QueuedEventCollectionResponse: """Get queued asset events for a DAG.""" where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before) - query = ( - select(AssetDagRunQueue, AssetModel.uri) - .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id) - .where(*where_clause) - ) + query = select(AssetDagRunQueue).where(*where_clause) dag_asset_queued_events_select, total_entries = paginated_select(statement=query) - adrqs = session.execute(dag_asset_queued_events_select).all() + adrqs = session.scalars(dag_asset_queued_events_select).all() if not adrqs: raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with dag_id: `{dag_id}` was not found") queued_events = [ - QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri) - for adrq, uri in adrqs + QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, asset_id=adrq.asset_id) + for adrq in adrqs ] return QueuedEventCollectionResponse( @@ -342,56 +324,47 @@ def get_dag_asset_queued_events( @assets_router.get( - "/dags/{dag_id}/assets/queuedEvents/{uri:path}", - responses=create_openapi_http_exception_doc( - [ - status.HTTP_404_NOT_FOUND, - ] - ), + "/dags/{dag_id}/assets/{asset_id}/queuedEvents", + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), ) def get_dag_asset_queued_event( dag_id: str, - uri: str, + asset_id: int, session: SessionDep, before: OptionalDateTimeQuery = None, ) -> QueuedEventResponse: """Get a queued asset event for a DAG.""" - where_clause = _generate_queued_event_where_clause(dag_id=dag_id, uri=uri, before=before) - query = ( - select(AssetDagRunQueue) - .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id) - .where(*where_clause) - ) + where_clause = _generate_queued_event_where_clause(dag_id=dag_id, asset_id=asset_id, before=before) + query = select(AssetDagRunQueue).where(*where_clause) adrq = session.scalar(query) if not adrq: raise HTTPException( status.HTTP_404_NOT_FOUND, - f"Queued event with dag_id: `{dag_id}` and asset uri: `{uri}` was not found", + f"Queued event with dag_id: `{dag_id}` and asset_id: `{asset_id}` was not found", ) - return QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri) + return QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, asset_id=asset_id) @assets_router.delete( - "/assets/queuedEvents/{uri:path}", + "/assets/{asset_id}/queuedEvents", status_code=status.HTTP_204_NO_CONTENT, - responses=create_openapi_http_exception_doc( - [ - status.HTTP_404_NOT_FOUND, - ] - ), + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), ) def delete_asset_queued_events( - uri: str, + asset_id: int, session: SessionDep, before: OptionalDateTimeQuery = None, ): """Delete queued asset events for an asset.""" - where_clause = _generate_queued_event_where_clause(uri=uri, before=before) + where_clause = _generate_queued_event_where_clause(asset_id=asset_id, before=before) delete_stmt = delete(AssetDagRunQueue).where(*where_clause).execution_options(synchronize_session="fetch") result = session.execute(delete_stmt) if result.rowcount == 0: - raise HTTPException(status.HTTP_404_NOT_FOUND, detail=f"Queue event with uri: `{uri}` was not found") + raise HTTPException( + status.HTTP_404_NOT_FOUND, + detail=f"Queue event with asset_id: `{asset_id}` was not found", + ) @assets_router.delete( @@ -419,7 +392,7 @@ def delete_dag_asset_queued_events( @assets_router.delete( - "/dags/{dag_id}/assets/queuedEvents/{uri:path}", + "/dags/{dag_id}/assets/{asset_id}/queuedEvents", status_code=status.HTTP_204_NO_CONTENT, responses=create_openapi_http_exception_doc( [ @@ -430,12 +403,12 @@ def delete_dag_asset_queued_events( ) def delete_dag_asset_queued_event( dag_id: str, - uri: str, + asset_id: int, session: SessionDep, before: OptionalDateTimeQuery = None, ): """Delete a queued asset event for a DAG.""" - where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before, uri=uri) + where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before, asset_id=asset_id) delete_statement = ( delete(AssetDagRunQueue).where(*where_clause).execution_options(synchronize_session="fetch") ) @@ -443,5 +416,5 @@ def delete_dag_asset_queued_event( if result.rowcount == 0: raise HTTPException( status.HTTP_404_NOT_FOUND, - detail=f"Queued event with dag_id: `{dag_id}` and asset uri: `{uri}` was not found", + detail=f"Queued event with dag_id: `{dag_id}` and asset_id: `{asset_id}` was not found", ) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index b759c0f2f23a1..ed1d6066bdcd6 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -174,16 +174,16 @@ export const useAssetServiceGetAssetQueuedEventsKey = "AssetServiceGetAssetQueuedEvents"; export const UseAssetServiceGetAssetQueuedEventsKeyFn = ( { + assetId, before, - uri, }: { + assetId: number; before?: string; - uri: string; }, queryKey?: Array, ) => [ useAssetServiceGetAssetQueuedEventsKey, - ...(queryKey ?? [{ before, uri }]), + ...(queryKey ?? [{ assetId, before }]), ]; export type AssetServiceGetAssetDefaultResponse = Awaited< ReturnType @@ -195,12 +195,12 @@ export type AssetServiceGetAssetQueryResult< export const useAssetServiceGetAssetKey = "AssetServiceGetAsset"; export const UseAssetServiceGetAssetKeyFn = ( { - uri, + assetId, }: { - uri: string; + assetId: number; }, queryKey?: Array, -) => [useAssetServiceGetAssetKey, ...(queryKey ?? [{ uri }])]; +) => [useAssetServiceGetAssetKey, ...(queryKey ?? [{ assetId }])]; export type AssetServiceGetDagAssetQueuedEventsDefaultResponse = Awaited< ReturnType >; @@ -234,18 +234,18 @@ export const useAssetServiceGetDagAssetQueuedEventKey = "AssetServiceGetDagAssetQueuedEvent"; export const UseAssetServiceGetDagAssetQueuedEventKeyFn = ( { + assetId, before, dagId, - uri, }: { + assetId: number; before?: string; dagId: string; - uri: string; }, queryKey?: Array, ) => [ useAssetServiceGetDagAssetQueuedEventKey, - ...(queryKey ?? [{ before, dagId, uri }]), + ...(queryKey ?? [{ assetId, before, dagId }]), ]; export type ConfigServiceGetConfigsDefaultResponse = Awaited< ReturnType diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index eab8ea6b3567d..4bb01a7feaad8 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -219,7 +219,7 @@ export const prefetchUseAssetServiceGetAssetEvents = ( * Get Asset Queued Events * Get queued asset events for an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventCollectionResponse Successful Response * @throws ApiError @@ -227,36 +227,39 @@ export const prefetchUseAssetServiceGetAssetEvents = ( export const prefetchUseAssetServiceGetAssetQueuedEvents = ( queryClient: QueryClient, { + assetId, before, - uri, }: { + assetId: number; before?: string; - uri: string; }, ) => queryClient.prefetchQuery({ - queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn({ before, uri }), - queryFn: () => AssetService.getAssetQueuedEvents({ before, uri }), + queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn({ + assetId, + before, + }), + queryFn: () => AssetService.getAssetQueuedEvents({ assetId, before }), }); /** * Get Asset * Get an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @returns AssetResponse Successful Response * @throws ApiError */ export const prefetchUseAssetServiceGetAsset = ( queryClient: QueryClient, { - uri, + assetId, }: { - uri: string; + assetId: number; }, ) => queryClient.prefetchQuery({ - queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }), - queryFn: () => AssetService.getAsset({ uri }), + queryKey: Common.UseAssetServiceGetAssetKeyFn({ assetId }), + queryFn: () => AssetService.getAsset({ assetId }), }); /** * Get Dag Asset Queued Events @@ -289,7 +292,7 @@ export const prefetchUseAssetServiceGetDagAssetQueuedEvents = ( * Get a queued asset event for a DAG. * @param data The data for the request. * @param data.dagId - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventResponse Successful Response * @throws ApiError @@ -297,22 +300,23 @@ export const prefetchUseAssetServiceGetDagAssetQueuedEvents = ( export const prefetchUseAssetServiceGetDagAssetQueuedEvent = ( queryClient: QueryClient, { + assetId, before, dagId, - uri, }: { + assetId: number; before?: string; dagId: string; - uri: string; }, ) => queryClient.prefetchQuery({ queryKey: Common.UseAssetServiceGetDagAssetQueuedEventKeyFn({ + assetId, before, dagId, - uri, }), - queryFn: () => AssetService.getDagAssetQueuedEvent({ before, dagId, uri }), + queryFn: () => + AssetService.getDagAssetQueuedEvent({ assetId, before, dagId }), }); /** * Get Configs diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index e90ae58e6b0e3..f8c8bcd7a58b3 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -279,7 +279,7 @@ export const useAssetServiceGetAssetEvents = < * Get Asset Queued Events * Get queued asset events for an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventCollectionResponse Successful Response * @throws ApiError @@ -290,28 +290,29 @@ export const useAssetServiceGetAssetQueuedEvents = < TQueryKey extends Array = unknown[], >( { + assetId, before, - uri, }: { + assetId: number; before?: string; - uri: string; }, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => useQuery({ queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn( - { before, uri }, + { assetId, before }, queryKey, ), - queryFn: () => AssetService.getAssetQueuedEvents({ before, uri }) as TData, + queryFn: () => + AssetService.getAssetQueuedEvents({ assetId, before }) as TData, ...options, }); /** * Get Asset * Get an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @returns AssetResponse Successful Response * @throws ApiError */ @@ -321,16 +322,16 @@ export const useAssetServiceGetAsset = < TQueryKey extends Array = unknown[], >( { - uri, + assetId, }: { - uri: string; + assetId: number; }, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => useQuery({ - queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }, queryKey), - queryFn: () => AssetService.getAsset({ uri }) as TData, + queryKey: Common.UseAssetServiceGetAssetKeyFn({ assetId }, queryKey), + queryFn: () => AssetService.getAsset({ assetId }) as TData, ...options, }); /** @@ -371,7 +372,7 @@ export const useAssetServiceGetDagAssetQueuedEvents = < * Get a queued asset event for a DAG. * @param data The data for the request. * @param data.dagId - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventResponse Successful Response * @throws ApiError @@ -382,24 +383,24 @@ export const useAssetServiceGetDagAssetQueuedEvent = < TQueryKey extends Array = unknown[], >( { + assetId, before, dagId, - uri, }: { + assetId: number; before?: string; dagId: string; - uri: string; }, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => useQuery({ queryKey: Common.UseAssetServiceGetDagAssetQueuedEventKeyFn( - { before, dagId, uri }, + { assetId, before, dagId }, queryKey, ), queryFn: () => - AssetService.getDagAssetQueuedEvent({ before, dagId, uri }) as TData, + AssetService.getDagAssetQueuedEvent({ assetId, before, dagId }) as TData, ...options, }); /** @@ -3801,7 +3802,7 @@ export const useVariableServicePatchVariable = < * Delete Asset Queued Events * Delete queued asset events for an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @param data.before * @returns void Successful Response * @throws ApiError @@ -3816,8 +3817,8 @@ export const useAssetServiceDeleteAssetQueuedEvents = < TData, TError, { + assetId: number; before?: string; - uri: string; }, TContext >, @@ -3828,15 +3829,15 @@ export const useAssetServiceDeleteAssetQueuedEvents = < TData, TError, { + assetId: number; before?: string; - uri: string; }, TContext >({ - mutationFn: ({ before, uri }) => + mutationFn: ({ assetId, before }) => AssetService.deleteAssetQueuedEvents({ + assetId, before, - uri, }) as unknown as Promise, ...options, }); @@ -3887,7 +3888,7 @@ export const useAssetServiceDeleteDagAssetQueuedEvents = < * Delete a queued asset event for a DAG. * @param data The data for the request. * @param data.dagId - * @param data.uri + * @param data.assetId * @param data.before * @returns void Successful Response * @throws ApiError @@ -3902,9 +3903,9 @@ export const useAssetServiceDeleteDagAssetQueuedEvent = < TData, TError, { + assetId: number; before?: string; dagId: string; - uri: string; }, TContext >, @@ -3915,17 +3916,17 @@ export const useAssetServiceDeleteDagAssetQueuedEvent = < TData, TError, { + assetId: number; before?: string; dagId: string; - uri: string; }, TContext >({ - mutationFn: ({ before, dagId, uri }) => + mutationFn: ({ assetId, before, dagId }) => AssetService.deleteDagAssetQueuedEvent({ + assetId, before, dagId, - uri, }) as unknown as Promise, ...options, }); diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index e7b213387607e..cd6d7c5a7983f 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -254,7 +254,7 @@ export const useAssetServiceGetAssetEventsSuspense = < * Get Asset Queued Events * Get queued asset events for an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventCollectionResponse Successful Response * @throws ApiError @@ -265,28 +265,29 @@ export const useAssetServiceGetAssetQueuedEventsSuspense = < TQueryKey extends Array = unknown[], >( { + assetId, before, - uri, }: { + assetId: number; before?: string; - uri: string; }, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => useSuspenseQuery({ queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn( - { before, uri }, + { assetId, before }, queryKey, ), - queryFn: () => AssetService.getAssetQueuedEvents({ before, uri }) as TData, + queryFn: () => + AssetService.getAssetQueuedEvents({ assetId, before }) as TData, ...options, }); /** * Get Asset * Get an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @returns AssetResponse Successful Response * @throws ApiError */ @@ -296,16 +297,16 @@ export const useAssetServiceGetAssetSuspense = < TQueryKey extends Array = unknown[], >( { - uri, + assetId, }: { - uri: string; + assetId: number; }, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => useSuspenseQuery({ - queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }, queryKey), - queryFn: () => AssetService.getAsset({ uri }) as TData, + queryKey: Common.UseAssetServiceGetAssetKeyFn({ assetId }, queryKey), + queryFn: () => AssetService.getAsset({ assetId }) as TData, ...options, }); /** @@ -346,7 +347,7 @@ export const useAssetServiceGetDagAssetQueuedEventsSuspense = < * Get a queued asset event for a DAG. * @param data The data for the request. * @param data.dagId - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventResponse Successful Response * @throws ApiError @@ -357,24 +358,24 @@ export const useAssetServiceGetDagAssetQueuedEventSuspense = < TQueryKey extends Array = unknown[], >( { + assetId, before, dagId, - uri, }: { + assetId: number; before?: string; dagId: string; - uri: string; }, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => useSuspenseQuery({ queryKey: Common.UseAssetServiceGetDagAssetQueuedEventKeyFn( - { before, dagId, uri }, + { assetId, before, dagId }, queryKey, ), queryFn: () => - AssetService.getDagAssetQueuedEvent({ before, dagId, uri }) as TData, + AssetService.getDagAssetQueuedEvent({ assetId, before, dagId }) as TData, ...options, }); /** diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 622ee94d697fa..965355b0c1937 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -180,10 +180,6 @@ export const $AssetEventResponse = { type: "integer", title: "Asset Id", }, - uri: { - type: "string", - title: "Uri", - }, extra: { anyOf: [ { @@ -249,7 +245,6 @@ export const $AssetEventResponse = { required: [ "id", "asset_id", - "uri", "source_map_index", "created_dagruns", "timestamp", @@ -1025,9 +1020,9 @@ export const $ConnectionTestResponse = { export const $CreateAssetEventsBody = { properties: { - uri: { - type: "string", - title: "Uri", + asset_id: { + type: "integer", + title: "Asset Id", }, extra: { type: "object", @@ -1036,7 +1031,7 @@ export const $CreateAssetEventsBody = { }, additionalProperties: false, type: "object", - required: ["uri"], + required: ["asset_id"], title: "CreateAssetEventsBody", description: "Create asset events request.", } as const; @@ -3640,14 +3635,14 @@ export const $QueuedEventCollectionResponse = { export const $QueuedEventResponse = { properties: { - uri: { - type: "string", - title: "Uri", - }, dag_id: { type: "string", title: "Dag Id", }, + asset_id: { + type: "integer", + title: "Asset Id", + }, created_at: { type: "string", format: "date-time", @@ -3655,7 +3650,7 @@ export const $QueuedEventResponse = { }, }, type: "object", - required: ["uri", "dag_id", "created_at"], + required: ["dag_id", "asset_id", "created_at"], title: "QueuedEventResponse", description: "Queued Event serializer for responses..", } as const; diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index c855ca580924e..f0446ae0fd858 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -370,7 +370,7 @@ export class AssetService { * Get Asset Queued Events * Get queued asset events for an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventCollectionResponse Successful Response * @throws ApiError @@ -380,9 +380,9 @@ export class AssetService { ): CancelablePromise { return __request(OpenAPI, { method: "GET", - url: "/public/assets/queuedEvents/{uri}", + url: "/public/assets/{asset_id}/queuedEvents", path: { - uri: data.uri, + asset_id: data.assetId, }, query: { before: data.before, @@ -400,7 +400,7 @@ export class AssetService { * Delete Asset Queued Events * Delete queued asset events for an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @param data.before * @returns void Successful Response * @throws ApiError @@ -410,9 +410,9 @@ export class AssetService { ): CancelablePromise { return __request(OpenAPI, { method: "DELETE", - url: "/public/assets/queuedEvents/{uri}", + url: "/public/assets/{asset_id}/queuedEvents", path: { - uri: data.uri, + asset_id: data.assetId, }, query: { before: data.before, @@ -430,7 +430,7 @@ export class AssetService { * Get Asset * Get an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @returns AssetResponse Successful Response * @throws ApiError */ @@ -439,9 +439,9 @@ export class AssetService { ): CancelablePromise { return __request(OpenAPI, { method: "GET", - url: "/public/assets/{uri}", + url: "/public/assets/{asset_id}", path: { - uri: data.uri, + asset_id: data.assetId, }, errors: { 401: "Unauthorized", @@ -517,7 +517,7 @@ export class AssetService { * Get a queued asset event for a DAG. * @param data The data for the request. * @param data.dagId - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventResponse Successful Response * @throws ApiError @@ -527,10 +527,10 @@ export class AssetService { ): CancelablePromise { return __request(OpenAPI, { method: "GET", - url: "/public/dags/{dag_id}/assets/queuedEvents/{uri}", + url: "/public/dags/{dag_id}/assets/{asset_id}/queuedEvents", path: { dag_id: data.dagId, - uri: data.uri, + asset_id: data.assetId, }, query: { before: data.before, @@ -549,7 +549,7 @@ export class AssetService { * Delete a queued asset event for a DAG. * @param data The data for the request. * @param data.dagId - * @param data.uri + * @param data.assetId * @param data.before * @returns void Successful Response * @throws ApiError @@ -559,10 +559,10 @@ export class AssetService { ): CancelablePromise { return __request(OpenAPI, { method: "DELETE", - url: "/public/dags/{dag_id}/assets/queuedEvents/{uri}", + url: "/public/dags/{dag_id}/assets/{asset_id}/queuedEvents", path: { dag_id: data.dagId, - uri: data.uri, + asset_id: data.assetId, }, query: { before: data.before, diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 96a159bad073d..88f6a2fc763c1 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -60,7 +60,6 @@ export type AssetEventCollectionResponse = { export type AssetEventResponse = { id: number; asset_id: number; - uri: string; extra?: { [key: string]: unknown; } | null; @@ -265,7 +264,7 @@ export type ConnectionTestResponse = { * Create asset events request. */ export type CreateAssetEventsBody = { - uri: string; + asset_id: number; extra?: { [key: string]: unknown; }; @@ -899,8 +898,8 @@ export type QueuedEventCollectionResponse = { * Queued Event serializer for responses.. */ export type QueuedEventResponse = { - uri: string; dag_id: string; + asset_id: number; created_at: string; }; @@ -1353,21 +1352,21 @@ export type CreateAssetEventData = { export type CreateAssetEventResponse = AssetEventResponse; export type GetAssetQueuedEventsData = { + assetId: number; before?: string | null; - uri: string; }; export type GetAssetQueuedEventsResponse = QueuedEventCollectionResponse; export type DeleteAssetQueuedEventsData = { + assetId: number; before?: string | null; - uri: string; }; export type DeleteAssetQueuedEventsResponse = void; export type GetAssetData = { - uri: string; + assetId: number; }; export type GetAssetResponse = AssetResponse; @@ -1387,17 +1386,17 @@ export type DeleteDagAssetQueuedEventsData = { export type DeleteDagAssetQueuedEventsResponse = void; export type GetDagAssetQueuedEventData = { + assetId: number; before?: string | null; dagId: string; - uri: string; }; export type GetDagAssetQueuedEventResponse = QueuedEventResponse; export type DeleteDagAssetQueuedEventData = { + assetId: number; before?: string | null; dagId: string; - uri: string; }; export type DeleteDagAssetQueuedEventResponse = void; @@ -2226,7 +2225,7 @@ export type $OpenApiTs = { }; }; }; - "/public/assets/queuedEvents/{uri}": { + "/public/assets/{asset_id}/queuedEvents": { get: { req: GetAssetQueuedEventsData; res: { @@ -2278,7 +2277,7 @@ export type $OpenApiTs = { }; }; }; - "/public/assets/{uri}": { + "/public/assets/{asset_id}": { get: { req: GetAssetData; res: { @@ -2361,7 +2360,7 @@ export type $OpenApiTs = { }; }; }; - "/public/dags/{dag_id}/assets/queuedEvents/{uri}": { + "/public/dags/{dag_id}/assets/{asset_id}/queuedEvents": { get: { req: GetDagAssetQueuedEventData; res: { diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py index c7c2c2405d2f6..46c769640a8f3 100644 --- a/tests/api_fastapi/core_api/routes/public/test_assets.py +++ b/tests/api_fastapi/core_api/routes/public/test_assets.py @@ -16,7 +16,6 @@ # under the License. from __future__ import annotations -import urllib from collections.abc import Generator from datetime import datetime from unittest import mock @@ -543,7 +542,6 @@ def test_should_respond_200(self, test_client, session): { "id": 1, "asset_id": 1, - "uri": "s3://bucket/key/1", "extra": {"foo": "bar"}, "source_task_id": "source_task_id", "source_dag_id": "source_dag_id", @@ -566,7 +564,6 @@ def test_should_respond_200(self, test_client, session): { "id": 2, "asset_id": 2, - "uri": "s3://bucket/key/2", "extra": {"foo": "bar"}, "source_task_id": "source_task_id", "source_dag_id": "source_dag_id", @@ -618,17 +615,17 @@ def test_order_by_raises_400_for_invalid_attr(self, test_client, session): assert response.json()["detail"] == msg @pytest.mark.parametrize( - "params, expected_asset_uris", + "params, expected_asset_ids", [ # Limit test data - ({"limit": "1"}, ["s3://bucket/key/1"]), - ({"limit": "100"}, [f"s3://bucket/key/{i}" for i in range(1, 101)]), + ({"limit": "1"}, [1]), + ({"limit": "100"}, list(range(1, 101))), # Offset test data - ({"offset": "1"}, [f"s3://bucket/key/{i}" for i in range(2, 102)]), - ({"offset": "3"}, [f"s3://bucket/key/{i}" for i in range(4, 104)]), + ({"offset": "1"}, list(range(2, 102))), + ({"offset": "3"}, list(range(4, 104))), ], ) - def test_limit_and_offset(self, test_client, params, expected_asset_uris): + def test_limit_and_offset(self, test_client, params, expected_asset_ids): self.create_assets(num=110) self.create_assets_events(num=110) self.create_dag_run(num=110) @@ -637,8 +634,8 @@ def test_limit_and_offset(self, test_client, params, expected_asset_uris): response = test_client.get("/public/assets/events", params=params) assert response.status_code == 200 - asset_uris = [asset["uri"] for asset in response.json()["asset_events"]] - assert asset_uris == expected_asset_uris + asset_ids = [asset["id"] for asset in response.json()["asset_events"]] + assert asset_ids == expected_asset_ids @pytest.mark.usefixtures("time_freezer") @pytest.mark.enable_redact @@ -655,7 +652,6 @@ def test_should_mask_sensitive_extra(self, test_client, session): { "id": 1, "asset_id": 1, - "uri": "s3://bucket/key/1", "extra": {"password": "***"}, "source_task_id": "source_task_id", "source_dag_id": "source_dag_id", @@ -678,7 +674,6 @@ def test_should_mask_sensitive_extra(self, test_client, session): { "id": 2, "asset_id": 2, - "uri": "s3://bucket/key/2", "extra": {"password": "***"}, "source_task_id": "source_task_id", "source_dag_id": "source_dag_id", @@ -704,24 +699,13 @@ def test_should_mask_sensitive_extra(self, test_client, session): class TestGetAssetEndpoint(TestAssets): - @pytest.mark.parametrize( - "url", - [ - urllib.parse.quote( - "s3://bucket/key/1", safe="" - ), # api should cover raw as well as unquoted case like legacy - "s3://bucket/key/1", - ], - ) @provide_session - def test_should_respond_200(self, test_client, url, session): + def test_should_respond_200(self, test_client, session): self.create_assets(num=1) assert session.query(AssetModel).count() == 1 tz_datetime_format = from_datetime_to_zulu_without_ms(DEFAULT_DATE) with assert_queries_count(6): - response = test_client.get( - f"/public/assets/{url}", - ) + response = test_client.get("/public/assets/1") assert response.status_code == 200 assert response.json() == { "id": 1, @@ -737,21 +721,16 @@ def test_should_respond_200(self, test_client, url, session): } def test_should_respond_404(self, test_client): - response = test_client.get( - f"/public/assets/{urllib.parse.quote('s3://bucket/key', safe='')}", - ) + response = test_client.get("/public/assets/1") assert response.status_code == 404 - assert response.json()["detail"] == "The Asset with uri: `s3://bucket/key` was not found" + assert response.json()["detail"] == "The Asset with ID: `1` was not found" @pytest.mark.usefixtures("time_freezer") @pytest.mark.enable_redact def test_should_mask_sensitive_extra(self, test_client, session): self.create_assets_with_sensitive_extra() tz_datetime_format = from_datetime_to_zulu_without_ms(DEFAULT_DATE) - uri = "s3://bucket/key/1" - response = test_client.get( - f"/public/assets/{uri}", - ) + response = test_client.get("/public/assets/1") assert response.status_code == 200 assert response.json() == { "id": 1, @@ -808,9 +787,9 @@ def test_should_respond_200(self, test_client, session, create_dummy_dag): assert response.json() == { "queued_events": [ { - "created_at": from_datetime_to_zulu_without_ms(DEFAULT_DATE), - "uri": "s3://bucket/key/1", + "asset_id": 1, "dag_id": "dag", + "created_at": from_datetime_to_zulu_without_ms(DEFAULT_DATE), } ], "total_entries": 1, @@ -875,13 +854,12 @@ class TestPostAssetEvents(TestAssets): @pytest.mark.usefixtures("time_freezer") def test_should_respond_200(self, test_client, session): self.create_assets() - event_payload = {"uri": "s3://bucket/key/1", "extra": {"foo": "bar"}} + event_payload = {"asset_id": 1, "extra": {"foo": "bar"}} response = test_client.post("/public/assets/events", json=event_payload) assert response.status_code == 200 assert response.json() == { "id": mock.ANY, "asset_id": 1, - "uri": "s3://bucket/key/1", "extra": {"foo": "bar", "from_rest_api": True}, "source_task_id": None, "source_dag_id": None, @@ -902,13 +880,12 @@ def test_invalid_attr_not_allowed(self, test_client, session): @pytest.mark.enable_redact def test_should_mask_sensitive_extra(self, test_client, session): self.create_assets(session) - event_payload = {"uri": "s3://bucket/key/1", "extra": {"password": "bar"}} + event_payload = {"asset_id": 1, "extra": {"password": "bar"}} response = test_client.post("/public/assets/events", json=event_payload) assert response.status_code == 200 assert response.json() == { "id": mock.ANY, "asset_id": 1, - "uri": "s3://bucket/key/1", "extra": {"password": "***", "from_rest_api": True}, "source_task_id": None, "source_dag_id": None, @@ -925,34 +902,26 @@ def test_should_respond_200(self, test_client, session, create_dummy_dag): dag, _ = create_dummy_dag() dag_id = dag.dag_id self.create_assets(session=session, num=1) - uri = "s3://bucket/key/1" asset_id = 1 self._create_asset_dag_run_queues(dag_id, asset_id, session) - response = test_client.get( - f"/public/assets/queuedEvents/{uri}", - ) + response = test_client.get(f"/public/assets/{asset_id}/queuedEvents/") assert response.status_code == 200 assert response.json() == { "queued_events": [ { - "created_at": from_datetime_to_zulu_without_ms(DEFAULT_DATE), - "uri": "s3://bucket/key/1", + "asset_id": asset_id, "dag_id": "dag", + "created_at": from_datetime_to_zulu_without_ms(DEFAULT_DATE), } ], "total_entries": 1, } def test_should_respond_404(self, test_client): - uri = "not_exists" - - response = test_client.get( - f"/public/assets/queuedEvents/{uri}", - ) - + response = test_client.get("/public/assets/1/queuedEvents") assert response.status_code == 404 - assert response.json()["detail"] == "Queue event with uri: `not_exists` was not found" + assert response.json()["detail"] == "Queue event with asset_id: `1` was not found" class TestDeleteAssetQueuedEvents(TestQueuedEventEndpoint): @@ -960,33 +929,25 @@ class TestDeleteAssetQueuedEvents(TestQueuedEventEndpoint): def test_should_respond_204(self, test_client, session, create_dummy_dag): dag, _ = create_dummy_dag() dag_id = dag.dag_id - uri = "s3://bucket/key/1" self.create_assets(session=session, num=1) asset_id = 1 self._create_asset_dag_run_queues(dag_id, asset_id, session) - response = test_client.delete( - f"/public/assets/queuedEvents/{uri}", - ) + assert session.get(AssetDagRunQueue, (asset_id, dag_id)) is not None + response = test_client.delete(f"/public/assets/{asset_id}/queuedEvents") assert response.status_code == 204 - assert session.query(AssetDagRunQueue).filter_by(asset_id=1).first() is None + assert session.get(AssetDagRunQueue, (asset_id, dag_id)) is None def test_should_respond_404(self, test_client): - uri = "not_exists" - - response = test_client.delete( - f"/public/assets/queuedEvents/{uri}", - ) - + response = test_client.delete("/public/assets/1/queuedEvents") assert response.status_code == 404 - assert response.json()["detail"] == "Queue event with uri: `not_exists` was not found" + assert response.json()["detail"] == "Queue event with asset_id: `1` was not found" class TestDeleteDagAssetQueuedEvent(TestQueuedEventEndpoint): def test_delete_should_respond_204(self, test_client, session, create_dummy_dag): dag, _ = create_dummy_dag() dag_id = dag.dag_id - asset_uri = "s3://bucket/key/1" self.create_assets(session=session, num=1) asset_id = 1 @@ -995,7 +956,7 @@ def test_delete_should_respond_204(self, test_client, session, create_dummy_dag) assert len(adrq) == 1 response = test_client.delete( - f"/public/dags/{dag_id}/assets/queuedEvents/{asset_uri}", + f"/public/dags/{dag_id}/assets/{asset_id}/queuedEvents", ) assert response.status_code == 204 @@ -1004,14 +965,14 @@ def test_delete_should_respond_204(self, test_client, session, create_dummy_dag) def test_should_respond_404(self, test_client): dag_id = "not_exists" - asset_uri = "not_exists" + asset_id = 1 response = test_client.delete( - f"/public/dags/{dag_id}/assets/queuedEvents/{asset_uri}", + f"/public/dags/{dag_id}/assets/{asset_id}/queuedEvents/", ) assert response.status_code == 404 assert ( response.json()["detail"] - == "Queued event with dag_id: `not_exists` and asset uri: `not_exists` was not found" + == "Queued event with dag_id: `not_exists` and asset_id: `1` was not found" ) diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index 995b98a61dda8..ee57b77c48926 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -941,7 +941,7 @@ def test_delete_dag_run_not_found(self, test_client): class TestGetDagRunAssetTriggerEvents: def test_should_respond_200(self, test_client, dag_maker, session): - asset1 = Asset(uri="ds1") + asset1 = Asset(name="ds1", uri="file:///da1") with dag_maker(dag_id="source_dag", start_date=START_DATE1, session=session): EmptyOperator(task_id="task", outlets=[asset1]) @@ -975,7 +975,6 @@ def test_should_respond_200(self, test_client, dag_maker, session): { "timestamp": from_datetime_to_zulu(event.timestamp), "asset_id": asset1_id, - "uri": asset1.uri, "extra": {}, "id": event.id, "source_dag_id": ti.dag_id,