Skip to content

Commit

Permalink
adding get_task_instance_tries code
Browse files Browse the repository at this point in the history
  • Loading branch information
kandharvishnuu committed Nov 23, 2024
1 parent 3c58e01 commit d9fe941
Show file tree
Hide file tree
Showing 11 changed files with 424 additions and 1 deletion.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/task_instance_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,7 @@ def get_mapped_task_instance_try_details(
)


@mark_fastapi_migration_done
@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
@provide_session
def get_task_instance_tries(
Expand Down
80 changes: 80 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3787,6 +3787,70 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/tries:
get:
tags:
- Task Instance
summary: Get Task Instance Tries
description: Get list of task instances history.
operationId: get_task_instance_tries
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: dag_run_id
in: path
required: true
schema:
type: string
title: Dag Run Id
- name: task_id
in: path
required: true
schema:
type: string
title: Task Id
- name: map_index
in: query
required: false
schema:
type: integer
default: -1
title: Map Index
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/TaskInstanceHistoryCollectionResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}:
get:
tags:
Expand Down Expand Up @@ -7147,6 +7211,22 @@ components:
- total_entries
title: TaskInstanceCollectionResponse
description: Task Instance Collection serializer for responses.
TaskInstanceHistoryCollectionResponse:
properties:
task_instances:
items:
$ref: '#/components/schemas/TaskInstanceHistoryResponse'
type: array
title: Task Instances
total_entries:
type: integer
title: Total Entries
type: object
required:
- task_instances
- total_entries
title: TaskInstanceHistoryCollectionResponse
description: TaskInstanceHistory Collection serializer for responses.
TaskInstanceHistoryResponse:
properties:
task_id:
Expand Down
48 changes: 47 additions & 1 deletion airflow/api_fastapi/core_api/routes/public/task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@

from fastapi import Depends, HTTPException, Request, status
from sqlalchemy.orm import Session, joinedload
from sqlalchemy.sql import select
from sqlalchemy.sql import or_, select
from sqlalchemy.sql.selectable import Select

from airflow.api_fastapi.common.db.common import get_session, paginated_select
from airflow.api_fastapi.common.parameters import (
Expand Down Expand Up @@ -51,6 +52,7 @@
ClearTaskInstancesBody,
TaskDependencyCollectionResponse,
TaskInstanceCollectionResponse,
TaskInstanceHistoryCollectionResponse,
TaskInstanceHistoryResponse,
TaskInstanceReferenceCollectionResponse,
TaskInstanceReferenceResponse,
Expand Down Expand Up @@ -234,6 +236,50 @@ def get_task_instance_dependencies(
return TaskDependencyCollectionResponse.model_validate({"dependencies": deps})


@task_instances_router.get(
task_instances_prefix + "/{task_id}/tries",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
)
def get_task_instance_tries(
dag_id: str,
dag_run_id: str,
task_id: str,
session: Annotated[Session, Depends(get_session)],
map_index: int = -1,
) -> TaskInstanceHistoryCollectionResponse:
"""Get list of task instances history."""

def _query(orm_object: Base) -> Select:
query = select(orm_object).where(
orm_object.dag_id == dag_id,
orm_object.run_id == dag_run_id,
orm_object.task_id == task_id,
orm_object.map_index == map_index,
)
print(type(query))
return query

# Exclude TaskInstance with state UP_FOR_RETRY since they have been recorded in TaskInstanceHistory
tis = session.scalars(
_query(TI).where(or_(TI.state != TaskInstanceState.UP_FOR_RETRY, TI.state.is_(None)))
).all()
task_instances = session.scalars(_query(TIH)).all() + tis

if not task_instances:
raise HTTPException(
status.HTTP_404_NOT_FOUND,
f"The Task Instance with dag_id: `{dag_id}`, run_id: `{dag_run_id}`, task_id: `{task_id}` and map_index: `{map_index}` was not found",
)
task_instances_data = [
TaskInstanceHistoryResponse.model_validate(instance, from_attributes=True)
for instance in task_instances
]
return TaskInstanceHistoryCollectionResponse(
task_instances=task_instances_data,
total_entries=len(task_instances_data),
)


@task_instances_router.get(
task_instances_prefix + "/{task_id}/{map_index}",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
Expand Down
26 changes: 26 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1032,6 +1032,32 @@ export const UseTaskInstanceServiceGetTaskInstanceDependencies1KeyFn = (
useTaskInstanceServiceGetTaskInstanceDependencies1Key,
...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId }]),
];
export type TaskInstanceServiceGetTaskInstanceTriesDefaultResponse = Awaited<
ReturnType<typeof TaskInstanceService.getTaskInstanceTries>
>;
export type TaskInstanceServiceGetTaskInstanceTriesQueryResult<
TData = TaskInstanceServiceGetTaskInstanceTriesDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const useTaskInstanceServiceGetTaskInstanceTriesKey =
"TaskInstanceServiceGetTaskInstanceTries";
export const UseTaskInstanceServiceGetTaskInstanceTriesKeyFn = (
{
dagId,
dagRunId,
mapIndex,
taskId,
}: {
dagId: string;
dagRunId: string;
mapIndex?: number;
taskId: string;
},
queryKey?: Array<unknown>,
) => [
useTaskInstanceServiceGetTaskInstanceTriesKey,
...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId }]),
];
export type TaskInstanceServiceGetMappedTaskInstanceDefaultResponse = Awaited<
ReturnType<typeof TaskInstanceService.getMappedTaskInstance>
>;
Expand Down
40 changes: 40 additions & 0 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1366,6 +1366,46 @@ export const prefetchUseTaskInstanceServiceGetTaskInstanceDependencies1 = (
taskId,
}),
});
/**
* Get Task Instance Tries
* Get list of task instances history.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @param data.taskId
* @param data.mapIndex
* @returns TaskInstanceHistoryCollectionResponse Successful Response
* @throws ApiError
*/
export const prefetchUseTaskInstanceServiceGetTaskInstanceTries = (
queryClient: QueryClient,
{
dagId,
dagRunId,
mapIndex,
taskId,
}: {
dagId: string;
dagRunId: string;
mapIndex?: number;
taskId: string;
},
) =>
queryClient.prefetchQuery({
queryKey: Common.UseTaskInstanceServiceGetTaskInstanceTriesKeyFn({
dagId,
dagRunId,
mapIndex,
taskId,
}),
queryFn: () =>
TaskInstanceService.getTaskInstanceTries({
dagId,
dagRunId,
mapIndex,
taskId,
}),
});
/**
* Get Mapped Task Instance
* Get task instance.
Expand Down
44 changes: 44 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1660,6 +1660,50 @@ export const useTaskInstanceServiceGetTaskInstanceDependencies1 = <
}) as TData,
...options,
});
/**
* Get Task Instance Tries
* Get list of task instances history.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @param data.taskId
* @param data.mapIndex
* @returns TaskInstanceHistoryCollectionResponse Successful Response
* @throws ApiError
*/
export const useTaskInstanceServiceGetTaskInstanceTries = <
TData = Common.TaskInstanceServiceGetTaskInstanceTriesDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
dagId,
dagRunId,
mapIndex,
taskId,
}: {
dagId: string;
dagRunId: string;
mapIndex?: number;
taskId: string;
},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useQuery<TData, TError>({
queryKey: Common.UseTaskInstanceServiceGetTaskInstanceTriesKeyFn(
{ dagId, dagRunId, mapIndex, taskId },
queryKey,
),
queryFn: () =>
TaskInstanceService.getTaskInstanceTries({
dagId,
dagRunId,
mapIndex,
taskId,
}) as TData,
...options,
});
/**
* Get Mapped Task Instance
* Get task instance.
Expand Down
44 changes: 44 additions & 0 deletions airflow/ui/openapi-gen/queries/suspense.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1640,6 +1640,50 @@ export const useTaskInstanceServiceGetTaskInstanceDependencies1Suspense = <
}) as TData,
...options,
});
/**
* Get Task Instance Tries
* Get list of task instances history.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @param data.taskId
* @param data.mapIndex
* @returns TaskInstanceHistoryCollectionResponse Successful Response
* @throws ApiError
*/
export const useTaskInstanceServiceGetTaskInstanceTriesSuspense = <
TData = Common.TaskInstanceServiceGetTaskInstanceTriesDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
dagId,
dagRunId,
mapIndex,
taskId,
}: {
dagId: string;
dagRunId: string;
mapIndex?: number;
taskId: string;
},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useSuspenseQuery<TData, TError>({
queryKey: Common.UseTaskInstanceServiceGetTaskInstanceTriesKeyFn(
{ dagId, dagRunId, mapIndex, taskId },
queryKey,
),
queryFn: () =>
TaskInstanceService.getTaskInstanceTries({
dagId,
dagRunId,
mapIndex,
taskId,
}) as TData,
...options,
});
/**
* Get Mapped Task Instance
* Get task instance.
Expand Down
20 changes: 20 additions & 0 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3365,6 +3365,26 @@ export const $TaskInstanceCollectionResponse = {
description: "Task Instance Collection serializer for responses.",
} as const;

export const $TaskInstanceHistoryCollectionResponse = {
properties: {
task_instances: {
items: {
$ref: "#/components/schemas/TaskInstanceHistoryResponse",
},
type: "array",
title: "Task Instances",
},
total_entries: {
type: "integer",
title: "Total Entries",
},
},
type: "object",
required: ["task_instances", "total_entries"],
title: "TaskInstanceHistoryCollectionResponse",
description: "TaskInstanceHistory Collection serializer for responses.",
} as const;

export const $TaskInstanceHistoryResponse = {
properties: {
task_id: {
Expand Down
Loading

0 comments on commit d9fe941

Please sign in to comment.