Skip to content

Commit

Permalink
Rename Fast API serializers/schemas to datamodels (apache#43823)
Browse files Browse the repository at this point in the history
Pierre, Ash & I discussed about making the naming consistent for Pydantic Models. We agreed on `datamodels` since it is what OpenAPI terms it too apart from schemas: https://swagger.io/docs/specification/v3_0/data-models/data-models/.

We didn't choose `models` because we have DB models named as such.
  • Loading branch information
kaxil authored Nov 8, 2024
1 parent f57db71 commit 2b79d18
Show file tree
Hide file tree
Showing 46 changed files with 56 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

from pydantic import AliasPath, BaseModel, BeforeValidator, ConfigDict, Field

from airflow.api_fastapi.core_api.serializers.job import JobResponse
from airflow.api_fastapi.core_api.serializers.trigger import TriggerResponse
from airflow.api_fastapi.core_api.datamodels.job import JobResponse
from airflow.api_fastapi.core_api.datamodels.trigger import TriggerResponse
from airflow.utils.state import TaskInstanceState


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

from pydantic import BaseModel

from airflow.api_fastapi.core_api.serializers.dag_run import DAGRunResponse
from airflow.api_fastapi.core_api.serializers.dags import DAGResponse
from airflow.api_fastapi.core_api.datamodels.dag_run import DAGRunResponse
from airflow.api_fastapi.core_api.datamodels.dags import DAGResponse


class DAGWithLatestDagRunsResponse(DAGResponse):
Expand Down
4 changes: 2 additions & 2 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4336,7 +4336,7 @@ components:
dag_run_states:
$ref: '#/components/schemas/DAGRunStates'
task_instance_states:
$ref: '#/components/schemas/airflow__api_fastapi__core_api__serializers__ui__dashboard__TaskInstanceState'
$ref: '#/components/schemas/airflow__api_fastapi__core_api__datamodels__ui__dashboard__TaskInstanceState'
type: object
required:
- dag_run_types
Expand Down Expand Up @@ -5056,7 +5056,7 @@ components:
- git_version
title: VersionInfo
description: Version information serializer for responses.
airflow__api_fastapi__core_api__serializers__ui__dashboard__TaskInstanceState:
airflow__api_fastapi__core_api__datamodels__ui__dashboard__TaskInstanceState:
properties:
no_status:
type: integer
Expand Down
8 changes: 4 additions & 4 deletions airflow/api_fastapi/core_api/routes/public/backfills.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
from airflow.api_fastapi.common.db.common import get_session, paginated_select
from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset, SortParam
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import (
create_openapi_http_exception_doc,
)
from airflow.api_fastapi.core_api.serializers.backfills import (
from airflow.api_fastapi.core_api.datamodels.backfills import (
BackfillCollectionResponse,
BackfillPostBody,
BackfillResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import (
create_openapi_http_exception_doc,
)
from airflow.models import DagRun
from airflow.models.backfill import (
AlreadyRunningBackfill,
Expand Down
4 changes: 2 additions & 2 deletions airflow/api_fastapi/core_api/routes/public/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
from airflow.api_fastapi.common.db.common import get_session, paginated_select
from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset, SortParam
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.connections import (
from airflow.api_fastapi.core_api.datamodels.connections import (
ConnectionBody,
ConnectionCollectionResponse,
ConnectionResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.models import Connection
from airflow.utils import helpers

Expand Down
4 changes: 2 additions & 2 deletions airflow/api_fastapi/core_api/routes/public/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@
)
from airflow.api_fastapi.common.db.common import get_session
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.dag_run import (
from airflow.api_fastapi.core_api.datamodels.dag_run import (
DAGRunPatchBody,
DAGRunPatchStates,
DAGRunResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.models import DAG, DagRun

dag_run_router = AirflowRouter(tags=["DagRun"], prefix="/dags/{dag_id}/dagRuns")
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/routes/public/dag_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@

from airflow.api_fastapi.common.db.common import get_session
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.dag_sources import DAGSourceResponse
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.dag_sources import DAGSourceResponse
from airflow.models.dagcode import DagCode

dag_sources_router = AirflowRouter(tags=["DagSource"], prefix="/dagSources")
Expand Down
4 changes: 2 additions & 2 deletions airflow/api_fastapi/core_api/routes/public/dag_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
from airflow.api_fastapi.common.db.dag_runs import dagruns_select_with_state_count
from airflow.api_fastapi.common.parameters import QueryDagIdsFilter
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.dag_stats import (
from airflow.api_fastapi.core_api.datamodels.dag_stats import (
DagStatsCollectionResponse,
DagStatsResponse,
DagStatsStateResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.utils.state import DagRunState

dag_stats_router = AirflowRouter(tags=["DagStats"], prefix="/dagStats")
Expand Down
4 changes: 2 additions & 2 deletions airflow/api_fastapi/core_api/routes/public/dag_warning.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@
SortParam,
)
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.dag_warning import (
from airflow.api_fastapi.core_api.datamodels.dag_warning import (
DAGWarningCollectionResponse,
DAGWarningResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.models import DagWarning

dag_warning_router = AirflowRouter(tags=["DagWarning"])
Expand Down
4 changes: 2 additions & 2 deletions airflow/api_fastapi/core_api/routes/public/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@
SortParam,
)
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.dags import (
from airflow.api_fastapi.core_api.datamodels.dags import (
DAGCollectionResponse,
DAGDetailsResponse,
DAGPatchBody,
DAGResponse,
DAGTagCollectionResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.exceptions import AirflowException, DagNotFound
from airflow.models import DAG, DagModel, DagTag

Expand Down
4 changes: 2 additions & 2 deletions airflow/api_fastapi/core_api/routes/public/event_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@
SortParam,
)
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.event_logs import (
from airflow.api_fastapi.core_api.datamodels.event_logs import (
EventLogCollectionResponse,
EventLogResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.models import Log

event_logs_router = AirflowRouter(tags=["Event Log"], prefix="/eventLogs")
Expand Down
4 changes: 2 additions & 2 deletions airflow/api_fastapi/core_api/routes/public/import_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@
SortParam,
)
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.import_error import (
from airflow.api_fastapi.core_api.datamodels.import_error import (
ImportErrorCollectionResponse,
ImportErrorResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.models.errors import ParseImportError

import_error_router = AirflowRouter(tags=["Import Error"], prefix="/importErrors")
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/routes/public/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from airflow.api.common.airflow_health import get_airflow_health
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.serializers.monitor import HealthInfoSchema
from airflow.api_fastapi.core_api.datamodels.monitor import HealthInfoSchema

monitor_router = AirflowRouter(tags=["Monitor"], prefix="/monitor")

Expand Down
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/routes/public/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.serializers.plugins import PluginCollectionResponse, PluginResponse
from airflow.api_fastapi.core_api.datamodels.plugins import PluginCollectionResponse, PluginResponse
from airflow.plugins_manager import get_plugin_info

plugins_router = AirflowRouter(tags=["Plugin"], prefix="/plugins")
Expand Down
4 changes: 2 additions & 2 deletions airflow/api_fastapi/core_api/routes/public/pools.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@
from airflow.api_fastapi.common.db.common import get_session, paginated_select
from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset, SortParam
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.pools import (
from airflow.api_fastapi.core_api.datamodels.pools import (
BasePool,
PoolCollectionResponse,
PoolPatchBody,
PoolPostBody,
PoolResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.models.pool import Pool

pools_router = AirflowRouter(tags=["Pool"], prefix="/pools")
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/routes/public/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.serializers.providers import ProviderCollectionResponse, ProviderResponse
from airflow.api_fastapi.core_api.datamodels.providers import ProviderCollectionResponse, ProviderResponse
from airflow.providers_manager import ProviderInfo, ProvidersManager

providers_router = AirflowRouter(tags=["Provider"], prefix="/providers")
Expand Down
4 changes: 2 additions & 2 deletions airflow/api_fastapi/core_api/routes/public/task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
float_range_filter_factory,
)
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.task_instances import (
from airflow.api_fastapi.core_api.datamodels.task_instances import (
TaskDependencyCollectionResponse,
TaskInstanceCollectionResponse,
TaskInstanceResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.exceptions import TaskNotFound
from airflow.models.taskinstance import TaskInstance as TI
from airflow.ti_deps.dep_context import DepContext
Expand Down
4 changes: 2 additions & 2 deletions airflow/api_fastapi/core_api/routes/public/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
from airflow.api_fastapi.common.db.common import get_session, paginated_select
from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset, SortParam
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.variables import (
from airflow.api_fastapi.core_api.datamodels.variables import (
VariableBody,
VariableCollectionResponse,
VariableResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.models.variable import Variable

variables_router = AirflowRouter(tags=["Variable"], prefix="/variables")
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/routes/public/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import airflow
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.serializers.version import VersionInfo
from airflow.api_fastapi.core_api.datamodels.version import VersionInfo
from airflow.utils.platform import get_airflow_git_version

version_router = AirflowRouter(tags=["Version"], prefix="/version")
Expand Down
6 changes: 3 additions & 3 deletions airflow/api_fastapi/core_api/routes/ui/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@
QueryTagsFilter,
)
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.serializers.dag_run import DAGRunResponse
from airflow.api_fastapi.core_api.serializers.dags import DAGResponse
from airflow.api_fastapi.core_api.serializers.ui.dags import (
from airflow.api_fastapi.core_api.datamodels.dag_run import DAGRunResponse
from airflow.api_fastapi.core_api.datamodels.dags import DAGResponse
from airflow.api_fastapi.core_api.datamodels.ui.dags import (
DAGWithLatestDagRunsCollectionResponse,
DAGWithLatestDagRunsResponse,
)
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/routes/ui/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
from typing_extensions import Annotated

from airflow.api_fastapi.common.parameters import DateTimeQuery
from airflow.api_fastapi.core_api.datamodels.ui.dashboard import HistoricalMetricDataResponse
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.ui.dashboard import HistoricalMetricDataResponse
from airflow.models.dagrun import DagRun, DagRunType
from airflow.models.taskinstance import TaskInstance
from airflow.utils.state import DagRunState, TaskInstanceState
Expand Down
14 changes: 7 additions & 7 deletions airflow/api_fastapi/execution_api/routes/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from typing_extensions import Annotated

from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.execution_api import schemas
from airflow.api_fastapi.execution_api import datamodels
from airflow.exceptions import AirflowNotFoundException
from airflow.models.connection import Connection

Expand All @@ -37,9 +37,9 @@
log = logging.getLogger(__name__)


def get_task_token() -> schemas.TIToken:
def get_task_token() -> datamodels.TIToken:
"""TODO: Placeholder for task identity authentication. This should be replaced with actual JWT decoding and validation."""
return schemas.TIToken(ti_key="test_key")
return datamodels.TIToken(ti_key="test_key")


@connection_router.get(
Expand All @@ -51,8 +51,8 @@ def get_task_token() -> schemas.TIToken:
)
async def get_connection(
connection_id: str,
token: Annotated[schemas.TIToken, Depends(get_task_token)],
) -> schemas.ConnectionResponse:
token: Annotated[datamodels.TIToken, Depends(get_task_token)],
) -> datamodels.ConnectionResponse:
"""Get an Airflow connection."""
if not has_connection_access(connection_id, token):
raise HTTPException(
Expand All @@ -72,10 +72,10 @@ async def get_connection(
"message": f"Connection with ID {connection_id} not found",
},
)
return schemas.ConnectionResponse.model_validate(connection, from_attributes=True)
return datamodels.ConnectionResponse.model_validate(connection, from_attributes=True)


def has_connection_access(connection_id: str, token: schemas.TIToken) -> bool:
def has_connection_access(connection_id: str, token: datamodels.TIToken) -> bool:
"""Check if the task has access to the connection."""
# TODO: Placeholder for actual implementation

Expand Down
12 changes: 6 additions & 6 deletions airflow/api_fastapi/execution_api/routes/task_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

from airflow.api_fastapi.common.db.common import get_session
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.execution_api import schemas
from airflow.api_fastapi.execution_api import datamodels
from airflow.models.taskinstance import TaskInstance as TI
from airflow.utils import timezone
from airflow.utils.state import State
Expand Down Expand Up @@ -58,14 +58,14 @@
)
def ti_update_state(
task_instance_id: UUID,
ti_patch_payload: Annotated[schemas.TIStateUpdate, Body()],
ti_patch_payload: Annotated[datamodels.TIStateUpdate, Body()],
session: Annotated[Session, Depends(get_session)],
):
"""
Update the state of a TaskInstance.
Not all state transitions are valid, and transitioning to some states requires extra information to be
passed along. (Check out the schemas for details, the rendered docs might not reflect this accurately)
passed along. (Check out the datamodels for details, the rendered docs might not reflect this accurately)
"""
# We only use UUID above for validation purposes
ti_id_str = str(task_instance_id)
Expand All @@ -88,7 +88,7 @@ def ti_update_state(

query = update(TI).where(TI.id == ti_id_str).values(data)

if isinstance(ti_patch_payload, schemas.TIEnterRunningPayload):
if isinstance(ti_patch_payload, datamodels.TIEnterRunningPayload):
if previous_state != State.QUEUED:
log.warning(
"Can not start Task Instance ('%s') in invalid state: %s",
Expand Down Expand Up @@ -118,7 +118,7 @@ def ti_update_state(
pid=ti_patch_payload.pid,
state=State.RUNNING,
)
elif isinstance(ti_patch_payload, schemas.TITerminalStatePayload):
elif isinstance(ti_patch_payload, datamodels.TITerminalStatePayload):
query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)

# TODO: Replace this with FastAPI's Custom Exception handling:
Expand Down Expand Up @@ -146,7 +146,7 @@ def ti_update_state(
)
def ti_heartbeat(
task_instance_id: UUID,
ti_payload: schemas.TIHeartbeatInfo,
ti_payload: datamodels.TIHeartbeatInfo,
session: Annotated[Session, Depends(get_session)],
):
"""Update the heartbeat of a TaskInstance to mark it as alive & still running."""
Expand Down
4 changes: 2 additions & 2 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1911,7 +1911,7 @@ export const $HistoricalMetricDataResponse = {
$ref: "#/components/schemas/DAGRunStates",
},
task_instance_states: {
$ref: "#/components/schemas/airflow__api_fastapi__core_api__serializers__ui__dashboard__TaskInstanceState",
$ref: "#/components/schemas/airflow__api_fastapi__core_api__datamodels__ui__dashboard__TaskInstanceState",
},
},
type: "object",
Expand Down Expand Up @@ -3000,7 +3000,7 @@ export const $VersionInfo = {
description: "Version information serializer for responses.",
} as const;

export const $airflow__api_fastapi__core_api__serializers__ui__dashboard__TaskInstanceState =
export const $airflow__api_fastapi__core_api__datamodels__ui__dashboard__TaskInstanceState =
{
properties: {
no_status: {
Expand Down
Loading

0 comments on commit 2b79d18

Please sign in to comment.