Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-38 Invalidate DryRun query cache on submit #46238

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5754,12 +5754,6 @@ paths:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'409':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Conflict
'422':
description: Validation Error
content:
Expand Down Expand Up @@ -5846,12 +5840,6 @@ paths:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'409':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Conflict
'422':
description: Validation Error
content:
Expand Down
33 changes: 16 additions & 17 deletions airflow/api_fastapi/core_api/routes/public/task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -717,13 +717,13 @@ def _patch_ti_validate_request(
@task_instances_router.patch(
task_instances_prefix + "/{task_id}/dry_run",
responses=create_openapi_http_exception_doc(
[status.HTTP_404_NOT_FOUND, status.HTTP_400_BAD_REQUEST, status.HTTP_409_CONFLICT],
[status.HTTP_404_NOT_FOUND, status.HTTP_400_BAD_REQUEST],
),
)
@task_instances_router.patch(
task_instances_prefix + "/{task_id}/{map_index}/dry_run",
responses=create_openapi_http_exception_doc(
[status.HTTP_404_NOT_FOUND, status.HTTP_400_BAD_REQUEST, status.HTTP_409_CONFLICT],
[status.HTTP_404_NOT_FOUND, status.HTTP_400_BAD_REQUEST],
),
)
def patch_task_instance_dry_run(
Expand All @@ -744,23 +744,22 @@ def patch_task_instance_dry_run(
tis: list[TI] = []

if data.get("new_state"):
tis = dag.set_task_instance_state(
task_id=task_id,
run_id=dag_run_id,
map_indexes=[map_index],
state=data["new_state"],
upstream=body.include_upstream,
downstream=body.include_downstream,
future=body.include_future,
past=body.include_past,
commit=False,
session=session,
tis = (
dag.set_task_instance_state(
task_id=task_id,
run_id=dag_run_id,
map_indexes=[map_index],
state=data["new_state"],
upstream=body.include_upstream,
downstream=body.include_downstream,
future=body.include_future,
past=body.include_past,
commit=False,
session=session,
)
or []
)

if not tis:
raise HTTPException(
status.HTTP_409_CONFLICT, f"Task id {task_id} is already in {data['new_state']} state"
)
elif "note" in data:
tis = [ti]

Expand Down
2 changes: 0 additions & 2 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2525,7 +2525,6 @@ export class TaskInstanceService {
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
409: "Conflict",
422: "Validation Error",
},
});
Expand Down Expand Up @@ -2566,7 +2565,6 @@ export class TaskInstanceService {
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
409: "Conflict",
422: "Validation Error",
},
});
Expand Down
8 changes: 0 additions & 8 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4306,10 +4306,6 @@ export type $OpenApiTs = {
* Not Found
*/
404: HTTPExceptionResponse;
/**
* Conflict
*/
409: HTTPExceptionResponse;
/**
* Validation Error
*/
Expand Down Expand Up @@ -4341,10 +4337,6 @@ export type $OpenApiTs = {
* Not Found
*/
404: HTTPExceptionResponse;
/**
* Conflict
*/
409: HTTPExceptionResponse;
/**
* Validation Error
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ const MarkTaskInstanceAsButton = ({ taskInstance, withText = true }: Props) => {
}}
value={menuState}
>
<StateBadge state={menuState}>{menuState}</StateBadge>
<StateBadge my={1} state={menuState}>
{menuState}
</StateBadge>
</Menu.Item>
))}
</Menu.Content>
Expand Down
4 changes: 3 additions & 1 deletion airflow/ui/src/queries/useClearDagRunDryRun.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type Props<TData, TError> = {
requestBody: DAGRunClearBody;
};

export const useClearDagRunDryRunKey = "clearRunDryRun";

export const useClearDagRunDryRun = <TData = TaskInstanceCollectionResponse, TError = unknown>({
dagId,
dagRunId,
Expand All @@ -45,5 +47,5 @@ export const useClearDagRunDryRun = <TData = TaskInstanceCollectionResponse, TEr
...requestBody,
},
}) as TData,
queryKey: ["clearDagRun", dagId, requestBody.only_failed],
queryKey: [useClearDagRunDryRunKey, dagId, { only_failed: requestBody.only_failed }],
});
3 changes: 3 additions & 0 deletions airflow/ui/src/queries/useClearRun.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import {
} from "openapi/queries";
import { toaster } from "src/components/ui";

import { useClearDagRunDryRunKey } from "./useClearDagRunDryRun";

const onError = () => {
toaster.create({
description: "Clear Dag Run request failed",
Expand All @@ -52,6 +54,7 @@ export const useClearDagRun = ({
UseDagServiceGetDagDetailsKeyFn({ dagId }),
UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }),
[useDagRunServiceGetDagRunsKey],
[useClearDagRunDryRunKey, dagId],
];

await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key })));
Expand Down
5 changes: 5 additions & 0 deletions airflow/ui/src/queries/useClearTaskInstances.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import {
import type { ClearTaskInstancesBody, TaskInstanceCollectionResponse } from "openapi/requests/types.gen";
import { toaster } from "src/components/ui";

import { useClearTaskInstancesDryRunKey } from "./useClearTaskInstancesDryRun";
import { usePatchTaskInstanceDryRunKey } from "./usePatchTaskInstanceDryRun";

const onError = () => {
toaster.create({
description: "Clear Task Instance request failed",
Expand Down Expand Up @@ -76,6 +79,8 @@ export const useClearTaskInstances = ({
...taskInstanceKeys,
UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }),
[useDagRunServiceGetDagRunsKey],
[useClearTaskInstancesDryRunKey, dagId],
[usePatchTaskInstanceDryRunKey, dagId, dagRunId],
];

await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key })));
Expand Down
14 changes: 3 additions & 11 deletions airflow/ui/src/queries/useClearTaskInstancesDryRun.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type Props<TData, TError> = {
requestBody: ClearTaskInstancesBody;
};

export const useClearTaskInstancesDryRunKey = "clearTaskInstanceDryRun";

export const useClearTaskInstancesDryRun = <TData = PostClearTaskInstancesResponse, TError = unknown>({
dagId,
options,
Expand All @@ -42,15 +44,5 @@ export const useClearTaskInstancesDryRun = <TData = PostClearTaskInstancesRespon
...requestBody,
},
}) as TData,
queryKey: [
"clearTaskInstance",
dagId,
requestBody.dag_run_id,
requestBody.only_failed,
requestBody.task_ids,
requestBody.include_downstream,
requestBody.include_future,
requestBody.include_past,
requestBody.include_upstream,
],
queryKey: [useClearTaskInstancesDryRunKey, dagId, requestBody],
});
5 changes: 5 additions & 0 deletions airflow/ui/src/queries/usePatchTaskInstance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import {
} from "openapi/queries";
import { toaster } from "src/components/ui";

import { useClearTaskInstancesDryRunKey } from "./useClearTaskInstancesDryRun";
import { usePatchTaskInstanceDryRunKey } from "./usePatchTaskInstanceDryRun";

const onError = () => {
toaster.create({
description: "Patch Task Instance request failed",
Expand Down Expand Up @@ -54,6 +57,8 @@ export const usePatchTaskInstance = ({
UseTaskInstanceServiceGetTaskInstanceKeyFn({ dagId, dagRunId, taskId }),
UseTaskInstanceServiceGetMappedTaskInstanceKeyFn({ dagId, dagRunId, mapIndex, taskId }),
[useTaskInstanceServiceGetTaskInstancesKey],
[usePatchTaskInstanceDryRunKey, dagId, dagRunId, { mapIndex, taskId }],
[useClearTaskInstancesDryRunKey, dagId],
];

await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key })));
Expand Down
20 changes: 12 additions & 8 deletions airflow/ui/src/queries/usePatchTaskInstanceDryRun.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type Props<TData, TError> = {
taskId: string;
};

export const usePatchTaskInstanceDryRunKey = "patchTaskInstanceDryRun";

export const usePatchTaskInstanceDryRun = <TData = PatchTaskInstanceDryRunResponse, TError = unknown>({
dagId,
dagRunId,
Expand All @@ -49,15 +51,17 @@ export const usePatchTaskInstanceDryRun = <TData = PatchTaskInstanceDryRunRespon
taskId,
}) as TData,
queryKey: [
"patchTaskInstanceDryRun",
usePatchTaskInstanceDryRunKey,
dagId,
dagRunId,
taskId,
mapIndex,
requestBody.new_state,
requestBody.include_downstream,
requestBody.include_future,
requestBody.include_past,
requestBody.include_upstream,
{
include_downstream: requestBody.include_downstream,
include_future: requestBody.include_future,
include_past: requestBody.include_past,
include_upstream: requestBody.include_upstream,
mapIndex,
new_state: requestBody.new_state,
taskId,
},
],
});
Original file line number Diff line number Diff line change
Expand Up @@ -3492,7 +3492,7 @@ def test_update_mask_should_call_mocked_api(
assert mock_set_ti_state.call_count == set_ti_state_call_count

@mock.patch("airflow.models.dag.DAG.set_task_instance_state")
def test_should_raise_409_for_updating_same_task_instance_state(
def test_should_return_empty_list_for_updating_same_task_instance_state(
self, mock_set_ti_state, test_client, session
):
self.create_task_instances(session)
Expand All @@ -3505,5 +3505,5 @@ def test_should_raise_409_for_updating_same_task_instance_state(
"new_state": "success",
},
)
assert response.status_code == 409
assert "Task id print_the_context is already in success state" in response.text
assert response.status_code == 200
assert response.json() == {"task_instances": [], "total_entries": 0}
Loading