Skip to content

Commit

Permalink
AIP-38 Invalidate DryRun query cache on submit (apache#46238)
Browse files Browse the repository at this point in the history
  • Loading branch information
pierrejeambrun authored and got686-yandex committed Jan 30, 2025
1 parent 62b0be7 commit d3d1dde
Show file tree
Hide file tree
Showing 12 changed files with 53 additions and 63 deletions.
12 changes: 0 additions & 12 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5754,12 +5754,6 @@ paths:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'409':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Conflict
'422':
description: Validation Error
content:
Expand Down Expand Up @@ -5846,12 +5840,6 @@ paths:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'409':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Conflict
'422':
description: Validation Error
content:
Expand Down
33 changes: 16 additions & 17 deletions airflow/api_fastapi/core_api/routes/public/task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -717,13 +717,13 @@ def _patch_ti_validate_request(
@task_instances_router.patch(
task_instances_prefix + "/{task_id}/dry_run",
responses=create_openapi_http_exception_doc(
[status.HTTP_404_NOT_FOUND, status.HTTP_400_BAD_REQUEST, status.HTTP_409_CONFLICT],
[status.HTTP_404_NOT_FOUND, status.HTTP_400_BAD_REQUEST],
),
)
@task_instances_router.patch(
task_instances_prefix + "/{task_id}/{map_index}/dry_run",
responses=create_openapi_http_exception_doc(
[status.HTTP_404_NOT_FOUND, status.HTTP_400_BAD_REQUEST, status.HTTP_409_CONFLICT],
[status.HTTP_404_NOT_FOUND, status.HTTP_400_BAD_REQUEST],
),
)
def patch_task_instance_dry_run(
Expand All @@ -744,23 +744,22 @@ def patch_task_instance_dry_run(
tis: list[TI] = []

if data.get("new_state"):
tis = dag.set_task_instance_state(
task_id=task_id,
run_id=dag_run_id,
map_indexes=[map_index],
state=data["new_state"],
upstream=body.include_upstream,
downstream=body.include_downstream,
future=body.include_future,
past=body.include_past,
commit=False,
session=session,
tis = (
dag.set_task_instance_state(
task_id=task_id,
run_id=dag_run_id,
map_indexes=[map_index],
state=data["new_state"],
upstream=body.include_upstream,
downstream=body.include_downstream,
future=body.include_future,
past=body.include_past,
commit=False,
session=session,
)
or []
)

if not tis:
raise HTTPException(
status.HTTP_409_CONFLICT, f"Task id {task_id} is already in {data['new_state']} state"
)
elif "note" in data:
tis = [ti]

Expand Down
2 changes: 0 additions & 2 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2525,7 +2525,6 @@ export class TaskInstanceService {
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
409: "Conflict",
422: "Validation Error",
},
});
Expand Down Expand Up @@ -2566,7 +2565,6 @@ export class TaskInstanceService {
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
409: "Conflict",
422: "Validation Error",
},
});
Expand Down
8 changes: 0 additions & 8 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4306,10 +4306,6 @@ export type $OpenApiTs = {
* Not Found
*/
404: HTTPExceptionResponse;
/**
* Conflict
*/
409: HTTPExceptionResponse;
/**
* Validation Error
*/
Expand Down Expand Up @@ -4341,10 +4337,6 @@ export type $OpenApiTs = {
* Not Found
*/
404: HTTPExceptionResponse;
/**
* Conflict
*/
409: HTTPExceptionResponse;
/**
* Validation Error
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ const MarkTaskInstanceAsButton = ({ taskInstance, withText = true }: Props) => {
}}
value={menuState}
>
<StateBadge state={menuState}>{menuState}</StateBadge>
<StateBadge my={1} state={menuState}>
{menuState}
</StateBadge>
</Menu.Item>
))}
</Menu.Content>
Expand Down
4 changes: 3 additions & 1 deletion airflow/ui/src/queries/useClearDagRunDryRun.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type Props<TData, TError> = {
requestBody: DAGRunClearBody;
};

export const useClearDagRunDryRunKey = "clearRunDryRun";

export const useClearDagRunDryRun = <TData = TaskInstanceCollectionResponse, TError = unknown>({
dagId,
dagRunId,
Expand All @@ -45,5 +47,5 @@ export const useClearDagRunDryRun = <TData = TaskInstanceCollectionResponse, TEr
...requestBody,
},
}) as TData,
queryKey: ["clearDagRun", dagId, requestBody.only_failed],
queryKey: [useClearDagRunDryRunKey, dagId, { only_failed: requestBody.only_failed }],
});
3 changes: 3 additions & 0 deletions airflow/ui/src/queries/useClearRun.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import {
} from "openapi/queries";
import { toaster } from "src/components/ui";

import { useClearDagRunDryRunKey } from "./useClearDagRunDryRun";

const onError = () => {
toaster.create({
description: "Clear Dag Run request failed",
Expand All @@ -52,6 +54,7 @@ export const useClearDagRun = ({
UseDagServiceGetDagDetailsKeyFn({ dagId }),
UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }),
[useDagRunServiceGetDagRunsKey],
[useClearDagRunDryRunKey, dagId],
];

await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key })));
Expand Down
5 changes: 5 additions & 0 deletions airflow/ui/src/queries/useClearTaskInstances.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import {
import type { ClearTaskInstancesBody, TaskInstanceCollectionResponse } from "openapi/requests/types.gen";
import { toaster } from "src/components/ui";

import { useClearTaskInstancesDryRunKey } from "./useClearTaskInstancesDryRun";
import { usePatchTaskInstanceDryRunKey } from "./usePatchTaskInstanceDryRun";

const onError = () => {
toaster.create({
description: "Clear Task Instance request failed",
Expand Down Expand Up @@ -76,6 +79,8 @@ export const useClearTaskInstances = ({
...taskInstanceKeys,
UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }),
[useDagRunServiceGetDagRunsKey],
[useClearTaskInstancesDryRunKey, dagId],
[usePatchTaskInstanceDryRunKey, dagId, dagRunId],
];

await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key })));
Expand Down
14 changes: 3 additions & 11 deletions airflow/ui/src/queries/useClearTaskInstancesDryRun.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type Props<TData, TError> = {
requestBody: ClearTaskInstancesBody;
};

export const useClearTaskInstancesDryRunKey = "clearTaskInstanceDryRun";

export const useClearTaskInstancesDryRun = <TData = PostClearTaskInstancesResponse, TError = unknown>({
dagId,
options,
Expand All @@ -42,15 +44,5 @@ export const useClearTaskInstancesDryRun = <TData = PostClearTaskInstancesRespon
...requestBody,
},
}) as TData,
queryKey: [
"clearTaskInstance",
dagId,
requestBody.dag_run_id,
requestBody.only_failed,
requestBody.task_ids,
requestBody.include_downstream,
requestBody.include_future,
requestBody.include_past,
requestBody.include_upstream,
],
queryKey: [useClearTaskInstancesDryRunKey, dagId, requestBody],
});
5 changes: 5 additions & 0 deletions airflow/ui/src/queries/usePatchTaskInstance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import {
} from "openapi/queries";
import { toaster } from "src/components/ui";

import { useClearTaskInstancesDryRunKey } from "./useClearTaskInstancesDryRun";
import { usePatchTaskInstanceDryRunKey } from "./usePatchTaskInstanceDryRun";

const onError = () => {
toaster.create({
description: "Patch Task Instance request failed",
Expand Down Expand Up @@ -54,6 +57,8 @@ export const usePatchTaskInstance = ({
UseTaskInstanceServiceGetTaskInstanceKeyFn({ dagId, dagRunId, taskId }),
UseTaskInstanceServiceGetMappedTaskInstanceKeyFn({ dagId, dagRunId, mapIndex, taskId }),
[useTaskInstanceServiceGetTaskInstancesKey],
[usePatchTaskInstanceDryRunKey, dagId, dagRunId, { mapIndex, taskId }],
[useClearTaskInstancesDryRunKey, dagId],
];

await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key })));
Expand Down
20 changes: 12 additions & 8 deletions airflow/ui/src/queries/usePatchTaskInstanceDryRun.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type Props<TData, TError> = {
taskId: string;
};

export const usePatchTaskInstanceDryRunKey = "patchTaskInstanceDryRun";

export const usePatchTaskInstanceDryRun = <TData = PatchTaskInstanceDryRunResponse, TError = unknown>({
dagId,
dagRunId,
Expand All @@ -49,15 +51,17 @@ export const usePatchTaskInstanceDryRun = <TData = PatchTaskInstanceDryRunRespon
taskId,
}) as TData,
queryKey: [
"patchTaskInstanceDryRun",
usePatchTaskInstanceDryRunKey,
dagId,
dagRunId,
taskId,
mapIndex,
requestBody.new_state,
requestBody.include_downstream,
requestBody.include_future,
requestBody.include_past,
requestBody.include_upstream,
{
include_downstream: requestBody.include_downstream,
include_future: requestBody.include_future,
include_past: requestBody.include_past,
include_upstream: requestBody.include_upstream,
mapIndex,
new_state: requestBody.new_state,
taskId,
},
],
});
Original file line number Diff line number Diff line change
Expand Up @@ -3492,7 +3492,7 @@ def test_update_mask_should_call_mocked_api(
assert mock_set_ti_state.call_count == set_ti_state_call_count

@mock.patch("airflow.models.dag.DAG.set_task_instance_state")
def test_should_raise_409_for_updating_same_task_instance_state(
def test_should_return_empty_list_for_updating_same_task_instance_state(
self, mock_set_ti_state, test_client, session
):
self.create_task_instances(session)
Expand All @@ -3505,5 +3505,5 @@ def test_should_raise_409_for_updating_same_task_instance_state(
"new_state": "success",
},
)
assert response.status_code == 409
assert "Task id print_the_context is already in success state" in response.text
assert response.status_code == 200
assert response.json() == {"task_instances": [], "total_entries": 0}

0 comments on commit d3d1dde

Please sign in to comment.