Skip to content

Commit

Permalink
raise 404 when dag or dag_run_id is not found in List TI endpoint (ap…
Browse files Browse the repository at this point in the history
…ache#44156)

* dag and dag_run not found

* refactor
  • Loading branch information
rawwar authored Nov 19, 2024
1 parent 24811f7 commit e8fe1bd
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 1 deletion.
12 changes: 11 additions & 1 deletion airflow/api_fastapi/core_api/routes/public/task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.exceptions import TaskNotFound
from airflow.models import Base
from airflow.models import Base, DagRun
from airflow.models.taskinstance import TaskInstance as TI
from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH
from airflow.ti_deps.dep_context import DepContext
Expand Down Expand Up @@ -303,8 +303,18 @@ def get_task_instances(
base_query = select(TI).join(TI.dag_run)

if dag_id != "~":
dag = request.app.state.dag_bag.get_dag(dag_id)
if not dag:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG with dag_id: `{dag_id}` was not found")
base_query = base_query.where(TI.dag_id == dag_id)

if dag_run_id != "~":
dag_run = session.scalar(select(DagRun).filter_by(run_id=dag_run_id))
if not dag_run:
raise HTTPException(
status.HTTP_404_NOT_FOUND,
f"DagRun with run_id: `{dag_run_id}` was not found",
)
base_query = base_query.where(TI.run_id == dag_run_id)

task_instance_select, total_entries = paginated_select(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -947,6 +947,15 @@ def test_should_respond_200(
assert response.json()["total_entries"] == expected_ti
assert len(response.json()["task_instances"]) == expected_ti

def test_not_found(self, test_client):
response = test_client.get("/public/dags/invalid/dagRuns/~/taskInstances")
assert response.status_code == 404
assert response.json() == {"detail": "DAG with dag_id: `invalid` was not found"}

response = test_client.get("/public/dags/~/dagRuns/invalid/taskInstances")
assert response.status_code == 404
assert response.json() == {"detail": "DagRun with run_id: `invalid` was not found"}

@pytest.mark.xfail(reason="permissions not implemented yet.")
def test_return_TI_only_from_readable_dags(self, test_client, session):
task_instances = {
Expand Down

0 comments on commit e8fe1bd

Please sign in to comment.