Skip to content

Commit

Permalink
AIP-84 Invalid states raises 422 status on get dag_runs and task_inst…
Browse files Browse the repository at this point in the history
…ances endpoints (apache#44237)

* invalid state raises 422 for list ti's

* invalid state raises 422 for list dag_runs

* fix tests
  • Loading branch information
rawwar authored Nov 21, 2024
1 parent 48eb430 commit 440c224
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 7 deletions.
22 changes: 17 additions & 5 deletions airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
overload,
)

from fastapi import Depends, HTTPException, Query
from fastapi import Depends, HTTPException, Query, status
from pendulum.parsing.exceptions import ParserError
from pydantic import AfterValidator, BaseModel, NonNegativeInt
from sqlalchemy import Column, case, or_
Expand Down Expand Up @@ -337,9 +337,15 @@ def to_orm(self, select: Select) -> Select:

@staticmethod
def _convert_dag_run_states(states: Iterable[str] | None) -> list[DagRunState | None] | None:
if not states:
return None
return [None if s in ("none", None) else DagRunState(s) for s in states]
try:
if not states:
return None
return [None if s in ("none", None) else DagRunState(s) for s in states]
except ValueError:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Invalid value for state. Valid values are {', '.join(DagRunState)}",
)

def depends(self, state: list[str] = Query(default_factory=list)) -> DagRunStateFilter:
states = self._convert_dag_run_states(state)
Expand All @@ -360,7 +366,13 @@ def to_orm(self, select: Select) -> Select:
return select.where(or_(*conditions))

def depends(self, state: list[str] = Query(default_factory=list)) -> TIStateFilter:
states = _convert_ti_states(state)
try:
states = _convert_ti_states(state)
except ValueError:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Invalid value for state. Valid values are {', '.join(TaskInstanceState)}",
)
return self.set_value(states)


Expand Down
7 changes: 5 additions & 2 deletions tests/api_fastapi/core_api/routes/public/test_dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,11 @@ def test_bad_filters(self, test_client):
assert body["detail"] == expected_detail

def test_invalid_state(self, test_client):
with pytest.raises(ValueError, match="'invalid' is not a valid DagRunState"):
test_client.get(f"/public/dags/{DAG1_ID}/dagRuns", params={"state": "invalid"})
response = test_client.get(f"/public/dags/{DAG1_ID}/dagRuns", params={"state": ["invalid"]})
assert response.status_code == 422
assert (
response.json()["detail"] == f"Invalid value for state. Valid values are {', '.join(DagRunState)}"
)


class TestPatchDagRun:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -956,6 +956,14 @@ def test_not_found(self, test_client):
assert response.status_code == 404
assert response.json() == {"detail": "DagRun with run_id: `invalid` was not found"}

def test_bad_state(self, test_client):
response = test_client.get("/public/dags/~/dagRuns/~/taskInstances", params={"state": "invalid"})
assert response.status_code == 422
assert (
response.json()["detail"]
== f"Invalid value for state. Valid values are {', '.join(TaskInstanceState)}"
)

@pytest.mark.xfail(reason="permissions not implemented yet.")
def test_return_TI_only_from_readable_dags(self, test_client, session):
task_instances = {
Expand Down

0 comments on commit 440c224

Please sign in to comment.