Skip to content

Commit

Permalink
AIP-84: Migrating GET one queued asset events for DAG to fastAPI (apa…
Browse files Browse the repository at this point in the history
…che#44128)

* AIP-84: Migrating GET queued asset events for DAG to fastAPI

* fixing tests and server code

* fixing parameters

* fixing parameters

* AIP-84: Migrating GET one queued asset events for DAG to fastAPI

* adding annotation

* Update airflow/api_fastapi/core_api/routes/public/assets.py

Co-authored-by: Wei Lee <[email protected]>

* Update tests/api_fastapi/core_api/routes/public/test_assets.py

Co-authored-by: Wei Lee <[email protected]>

* Update airflow/api_fastapi/core_api/routes/public/assets.py

Co-authored-by: Pierre Jeambrun <[email protected]>

---------

Co-authored-by: Wei Lee <[email protected]>
Co-authored-by: Pierre Jeambrun <[email protected]>
  • Loading branch information
3 people authored Nov 19, 2024
1 parent 1bcd7ec commit af958bb
Show file tree
Hide file tree
Showing 10 changed files with 306 additions and 73 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/asset_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ def _generate_queued_event_where_clause(
return where_clause


@mark_fastapi_migration_done
@security.requires_access_asset("GET")
@security.requires_access_dag("GET")
@provide_session
Expand Down
59 changes: 59 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,65 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/assets/queuedEvent/{uri}:
get:
tags:
- Asset
summary: Get Dag Asset Queued Event
description: Get a queued asset event for a DAG.
operationId: get_dag_asset_queued_event
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: uri
in: path
required: true
schema:
type: string
title: Uri
- name: before
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Before
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/QueuedEventResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/assets/queuedEvent/{uri}:
delete:
tags:
Expand Down
31 changes: 31 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,37 @@ def get_dag_asset_queued_events(
)


@assets_router.get(
"/dags/{dag_id}/assets/queuedEvent/{uri:path}",
responses=create_openapi_http_exception_doc(
[
status.HTTP_404_NOT_FOUND,
]
),
)
def get_dag_asset_queued_event(
dag_id: str,
uri: str,
session: Annotated[Session, Depends(get_session)],
before: OptionalDateTimeQuery = None,
) -> QueuedEventResponse:
"""Get a queued asset event for a DAG."""
where_clause = _generate_queued_event_where_clause(dag_id=dag_id, uri=uri, before=before)
query = (
select(AssetDagRunQueue)
.join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id)
.where(*where_clause)
)
adrq = session.scalar(query)
if not adrq:
raise HTTPException(
status.HTTP_404_NOT_FOUND,
f"Queued event with dag_id: `{dag_id}` and asset uri: `{uri}` was not found",
)

return QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri)


@assets_router.delete(
"/assets/queuedEvent/{uri:path}",
status_code=status.HTTP_204_NO_CONTENT,
Expand Down
24 changes: 24 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,30 @@ export const UseAssetServiceGetDagAssetQueuedEventsKeyFn = (
useAssetServiceGetDagAssetQueuedEventsKey,
...(queryKey ?? [{ before, dagId }]),
];
export type AssetServiceGetDagAssetQueuedEventDefaultResponse = Awaited<
ReturnType<typeof AssetService.getDagAssetQueuedEvent>
>;
export type AssetServiceGetDagAssetQueuedEventQueryResult<
TData = AssetServiceGetDagAssetQueuedEventDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const useAssetServiceGetDagAssetQueuedEventKey =
"AssetServiceGetDagAssetQueuedEvent";
export const UseAssetServiceGetDagAssetQueuedEventKeyFn = (
{
before,
dagId,
uri,
}: {
before?: string;
dagId: string;
uri: string;
},
queryKey?: Array<unknown>,
) => [
useAssetServiceGetDagAssetQueuedEventKey,
...(queryKey ?? [{ before, dagId, uri }]),
];
export type DashboardServiceHistoricalMetricsDefaultResponse = Awaited<
ReturnType<typeof DashboardService.historicalMetrics>
>;
Expand Down
30 changes: 30 additions & 0 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,36 @@ export const prefetchUseAssetServiceGetDagAssetQueuedEvents = (
}),
queryFn: () => AssetService.getDagAssetQueuedEvents({ before, dagId }),
});
/**
* Get Dag Asset Queued Event
* Get a queued asset event for a DAG.
* @param data The data for the request.
* @param data.dagId
* @param data.uri
* @param data.before
* @returns QueuedEventResponse Successful Response
* @throws ApiError
*/
export const prefetchUseAssetServiceGetDagAssetQueuedEvent = (
queryClient: QueryClient,
{
before,
dagId,
uri,
}: {
before?: string;
dagId: string;
uri: string;
},
) =>
queryClient.prefetchQuery({
queryKey: Common.UseAssetServiceGetDagAssetQueuedEventKeyFn({
before,
dagId,
uri,
}),
queryFn: () => AssetService.getDagAssetQueuedEvent({ before, dagId, uri }),
});
/**
* Historical Metrics
* Return cluster activity historical metrics.
Expand Down
36 changes: 36 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,42 @@ export const useAssetServiceGetDagAssetQueuedEvents = <
AssetService.getDagAssetQueuedEvents({ before, dagId }) as TData,
...options,
});
/**
* Get Dag Asset Queued Event
* Get a queued asset event for a DAG.
* @param data The data for the request.
* @param data.dagId
* @param data.uri
* @param data.before
* @returns QueuedEventResponse Successful Response
* @throws ApiError
*/
export const useAssetServiceGetDagAssetQueuedEvent = <
TData = Common.AssetServiceGetDagAssetQueuedEventDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
before,
dagId,
uri,
}: {
before?: string;
dagId: string;
uri: string;
},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useQuery<TData, TError>({
queryKey: Common.UseAssetServiceGetDagAssetQueuedEventKeyFn(
{ before, dagId, uri },
queryKey,
),
queryFn: () =>
AssetService.getDagAssetQueuedEvent({ before, dagId, uri }) as TData,
...options,
});
/**
* Historical Metrics
* Return cluster activity historical metrics.
Expand Down
36 changes: 36 additions & 0 deletions airflow/ui/openapi-gen/queries/suspense.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,42 @@ export const useAssetServiceGetDagAssetQueuedEventsSuspense = <
AssetService.getDagAssetQueuedEvents({ before, dagId }) as TData,
...options,
});
/**
* Get Dag Asset Queued Event
* Get a queued asset event for a DAG.
* @param data The data for the request.
* @param data.dagId
* @param data.uri
* @param data.before
* @returns QueuedEventResponse Successful Response
* @throws ApiError
*/
export const useAssetServiceGetDagAssetQueuedEventSuspense = <
TData = Common.AssetServiceGetDagAssetQueuedEventDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
before,
dagId,
uri,
}: {
before?: string;
dagId: string;
uri: string;
},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useSuspenseQuery<TData, TError>({
queryKey: Common.UseAssetServiceGetDagAssetQueuedEventKeyFn(
{ before, dagId, uri },
queryKey,
),
queryFn: () =>
AssetService.getDagAssetQueuedEvent({ before, dagId, uri }) as TData,
...options,
});
/**
* Historical Metrics
* Return cluster activity historical metrics.
Expand Down
34 changes: 34 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import type {
GetDagAssetQueuedEventsResponse,
DeleteDagAssetQueuedEventsData,
DeleteDagAssetQueuedEventsResponse,
GetDagAssetQueuedEventData,
GetDagAssetQueuedEventResponse,
DeleteAssetQueuedEventsData,
DeleteAssetQueuedEventsResponse,
HistoricalMetricsData,
Expand Down Expand Up @@ -341,6 +343,38 @@ export class AssetService {
});
}

/**
* Get Dag Asset Queued Event
* Get a queued asset event for a DAG.
* @param data The data for the request.
* @param data.dagId
* @param data.uri
* @param data.before
* @returns QueuedEventResponse Successful Response
* @throws ApiError
*/
public static getDagAssetQueuedEvent(
data: GetDagAssetQueuedEventData,
): CancelablePromise<GetDagAssetQueuedEventResponse> {
return __request(OpenAPI, {
method: "GET",
url: "/public/dags/{dag_id}/assets/queuedEvent/{uri}",
path: {
dag_id: data.dagId,
uri: data.uri,
},
query: {
before: data.before,
},
errors: {
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
422: "Validation Error",
},
});
}

/**
* Delete Asset Queued Events
* Delete queued asset events for an asset.
Expand Down
35 changes: 35 additions & 0 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1076,6 +1076,14 @@ export type DeleteDagAssetQueuedEventsData = {

export type DeleteDagAssetQueuedEventsResponse = void;

export type GetDagAssetQueuedEventData = {
before?: string | null;
dagId: string;
uri: string;
};

export type GetDagAssetQueuedEventResponse = QueuedEventResponse;

export type DeleteAssetQueuedEventsData = {
before?: string | null;
uri: string;
Expand Down Expand Up @@ -1735,6 +1743,33 @@ export type $OpenApiTs = {
};
};
};
"/public/dags/{dag_id}/assets/queuedEvent/{uri}": {
get: {
req: GetDagAssetQueuedEventData;
res: {
/**
* Successful Response
*/
200: QueuedEventResponse;
/**
* Unauthorized
*/
401: HTTPExceptionResponse;
/**
* Forbidden
*/
403: HTTPExceptionResponse;
/**
* Not Found
*/
404: HTTPExceptionResponse;
/**
* Validation Error
*/
422: HTTPValidationError;
};
};
};
"/public/assets/queuedEvent/{uri}": {
delete: {
req: DeleteAssetQueuedEventsData;
Expand Down
Loading

0 comments on commit af958bb

Please sign in to comment.