Skip to content

Commit

Permalink
AIP-84 Get Mapped Task Instance Tries (apache#44303)
Browse files Browse the repository at this point in the history
* adding get_task_instance_tries code

* rebase

* rebase

* adding get_mapped_task_instance_tries code

* adding mark_fastapi_migration_done

* rebasing test cases

* rebasing task_instances.py

* updating map_index default value

---------

Co-authored-by: kandharvishnuu <[email protected]>
  • Loading branch information
kandharvishnu and kandharvishnuu authored Nov 26, 2024
1 parent 624a942 commit a238d06
Show file tree
Hide file tree
Showing 10 changed files with 429 additions and 5 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/task_instance_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,7 @@ def _query(orm_object):
)


@mark_fastapi_migration_done
@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
@provide_session
def get_mapped_task_instance_tries(
Expand Down
69 changes: 69 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4146,6 +4146,75 @@ paths:
schema:
type: string
title: Task Id
- name: map_index
in: query
required: false
schema:
type: integer
default: -1
title: Map Index
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/TaskInstanceHistoryCollectionResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries:
get:
tags:
- Task Instance
summary: Get Mapped Task Instance Tries
operationId: get_mapped_task_instance_tries
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: dag_run_id
in: path
required: true
schema:
type: string
title: Dag Run Id
- name: task_id
in: path
required: true
schema:
type: string
title: Task Id
- name: map_index
in: path
required: true
schema:
type: integer
title: Map Index
responses:
'200':
description: Successful Response
Expand Down
22 changes: 21 additions & 1 deletion airflow/api_fastapi/core_api/routes/public/task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,9 @@ def get_task_instance_tries(
dag_run_id: str,
task_id: str,
session: Annotated[Session, Depends(get_session)],
map_index: int = -1,
) -> TaskInstanceHistoryCollectionResponse:
"""Get list of task instances history."""
map_index = -1

def _query(orm_object: Base) -> Select:
query = select(orm_object).where(
Expand Down Expand Up @@ -278,6 +278,26 @@ def _query(orm_object: Base) -> Select:
)


@task_instances_router.get(
task_instances_prefix + "/{task_id}/{map_index}/tries",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
)
def get_mapped_task_instance_tries(
dag_id: str,
dag_run_id: str,
task_id: str,
session: Annotated[Session, Depends(get_session)],
map_index: int,
) -> TaskInstanceHistoryCollectionResponse:
return get_task_instance_tries(
dag_id=dag_id,
dag_run_id=dag_run_id,
task_id=task_id,
map_index=map_index,
session=session,
)


@task_instances_router.get(
task_instances_prefix + "/{task_id}/{map_index}",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
Expand Down
29 changes: 28 additions & 1 deletion airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -979,16 +979,43 @@ export const UseTaskInstanceServiceGetTaskInstanceTriesKeyFn = (
{
dagId,
dagRunId,
mapIndex,
taskId,
}: {
dagId: string;
dagRunId: string;
mapIndex?: number;
taskId: string;
},
queryKey?: Array<unknown>,
) => [
useTaskInstanceServiceGetTaskInstanceTriesKey,
...(queryKey ?? [{ dagId, dagRunId, taskId }]),
...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId }]),
];
export type TaskInstanceServiceGetMappedTaskInstanceTriesDefaultResponse =
Awaited<ReturnType<typeof TaskInstanceService.getMappedTaskInstanceTries>>;
export type TaskInstanceServiceGetMappedTaskInstanceTriesQueryResult<
TData = TaskInstanceServiceGetMappedTaskInstanceTriesDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const useTaskInstanceServiceGetMappedTaskInstanceTriesKey =
"TaskInstanceServiceGetMappedTaskInstanceTries";
export const UseTaskInstanceServiceGetMappedTaskInstanceTriesKeyFn = (
{
dagId,
dagRunId,
mapIndex,
taskId,
}: {
dagId: string;
dagRunId: string;
mapIndex: number;
taskId: string;
},
queryKey?: Array<unknown>,
) => [
useTaskInstanceServiceGetMappedTaskInstanceTriesKey,
...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId }]),
];
export type TaskInstanceServiceGetMappedTaskInstanceDefaultResponse = Awaited<
ReturnType<typeof TaskInstanceService.getMappedTaskInstance>
Expand Down
50 changes: 49 additions & 1 deletion airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1292,6 +1292,7 @@ export const prefetchUseTaskInstanceServiceGetTaskInstanceDependencies1 = (
* @param data.dagId
* @param data.dagRunId
* @param data.taskId
* @param data.mapIndex
* @returns TaskInstanceHistoryCollectionResponse Successful Response
* @throws ApiError
*/
Expand All @@ -1300,21 +1301,68 @@ export const prefetchUseTaskInstanceServiceGetTaskInstanceTries = (
{
dagId,
dagRunId,
mapIndex,
taskId,
}: {
dagId: string;
dagRunId: string;
mapIndex?: number;
taskId: string;
},
) =>
queryClient.prefetchQuery({
queryKey: Common.UseTaskInstanceServiceGetTaskInstanceTriesKeyFn({
dagId,
dagRunId,
mapIndex,
taskId,
}),
queryFn: () =>
TaskInstanceService.getTaskInstanceTries({
dagId,
dagRunId,
mapIndex,
taskId,
}),
});
/**
* Get Mapped Task Instance Tries
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @param data.taskId
* @param data.mapIndex
* @returns TaskInstanceHistoryCollectionResponse Successful Response
* @throws ApiError
*/
export const prefetchUseTaskInstanceServiceGetMappedTaskInstanceTries = (
queryClient: QueryClient,
{
dagId,
dagRunId,
mapIndex,
taskId,
}: {
dagId: string;
dagRunId: string;
mapIndex: number;
taskId: string;
},
) =>
queryClient.prefetchQuery({
queryKey: Common.UseTaskInstanceServiceGetMappedTaskInstanceTriesKeyFn({
dagId,
dagRunId,
mapIndex,
taskId,
}),
queryFn: () =>
TaskInstanceService.getTaskInstanceTries({ dagId, dagRunId, taskId }),
TaskInstanceService.getMappedTaskInstanceTries({
dagId,
dagRunId,
mapIndex,
taskId,
}),
});
/**
* Get Mapped Task Instance
Expand Down
49 changes: 48 additions & 1 deletion airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1553,6 +1553,7 @@ export const useTaskInstanceServiceGetTaskInstanceDependencies1 = <
* @param data.dagId
* @param data.dagRunId
* @param data.taskId
* @param data.mapIndex
* @returns TaskInstanceHistoryCollectionResponse Successful Response
* @throws ApiError
*/
Expand All @@ -1564,24 +1565,70 @@ export const useTaskInstanceServiceGetTaskInstanceTries = <
{
dagId,
dagRunId,
mapIndex,
taskId,
}: {
dagId: string;
dagRunId: string;
mapIndex?: number;
taskId: string;
},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useQuery<TData, TError>({
queryKey: Common.UseTaskInstanceServiceGetTaskInstanceTriesKeyFn(
{ dagId, dagRunId, taskId },
{ dagId, dagRunId, mapIndex, taskId },
queryKey,
),
queryFn: () =>
TaskInstanceService.getTaskInstanceTries({
dagId,
dagRunId,
mapIndex,
taskId,
}) as TData,
...options,
});
/**
* Get Mapped Task Instance Tries
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @param data.taskId
* @param data.mapIndex
* @returns TaskInstanceHistoryCollectionResponse Successful Response
* @throws ApiError
*/
export const useTaskInstanceServiceGetMappedTaskInstanceTries = <
TData = Common.TaskInstanceServiceGetMappedTaskInstanceTriesDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
dagId,
dagRunId,
mapIndex,
taskId,
}: {
dagId: string;
dagRunId: string;
mapIndex: number;
taskId: string;
},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useQuery<TData, TError>({
queryKey: Common.UseTaskInstanceServiceGetMappedTaskInstanceTriesKeyFn(
{ dagId, dagRunId, mapIndex, taskId },
queryKey,
),
queryFn: () =>
TaskInstanceService.getMappedTaskInstanceTries({
dagId,
dagRunId,
mapIndex,
taskId,
}) as TData,
...options,
Expand Down
Loading

0 comments on commit a238d06

Please sign in to comment.