Skip to content

Commit

Permalink
AIP-84 Convert async route to sync routes (apache#43797)
Browse files Browse the repository at this point in the history
* AIP-84 convert async route to sync routes

* Update following code review

* Fix CI
  • Loading branch information
pierrejeambrun authored Nov 8, 2024
1 parent 6613320 commit 36e716a
Show file tree
Hide file tree
Showing 24 changed files with 60 additions and 55 deletions.
12 changes: 6 additions & 6 deletions airflow/api_fastapi/core_api/routes/public/backfills.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
path="/",
responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]),
)
async def list_backfills(
def list_backfills(
dag_id: str,
limit: QueryLimit,
offset: QueryOffset,
Expand Down Expand Up @@ -81,7 +81,7 @@ async def list_backfills(
[status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND]
),
)
async def get_backfill(
def get_backfill(
backfill_id: str,
session: Annotated[Session, Depends(get_session)],
):
Expand All @@ -102,7 +102,7 @@ async def get_backfill(
]
),
)
async def pause_backfill(*, backfill_id, session: Annotated[Session, Depends(get_session)]):
def pause_backfill(*, backfill_id, session: Annotated[Session, Depends(get_session)]):
b = session.get(Backfill, backfill_id)
if not b:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Could not find backfill with id {backfill_id}")
Expand All @@ -125,7 +125,7 @@ async def pause_backfill(*, backfill_id, session: Annotated[Session, Depends(get
]
),
)
async def unpause_backfill(*, backfill_id, session: Annotated[Session, Depends(get_session)]):
def unpause_backfill(*, backfill_id, session: Annotated[Session, Depends(get_session)]):
b = session.get(Backfill, backfill_id)
if not b:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Could not find backfill with id {backfill_id}")
Expand All @@ -147,7 +147,7 @@ async def unpause_backfill(*, backfill_id, session: Annotated[Session, Depends(g
]
),
)
async def cancel_backfill(*, backfill_id, session: Annotated[Session, Depends(get_session)]):
def cancel_backfill(*, backfill_id, session: Annotated[Session, Depends(get_session)]):
b: Backfill = session.get(Backfill, backfill_id)
if not b:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Could not find backfill with id {backfill_id}")
Expand Down Expand Up @@ -194,7 +194,7 @@ async def cancel_backfill(*, backfill_id, session: Annotated[Session, Depends(ge
]
),
)
async def create_backfill(
def create_backfill(
backfill_request: BackfillPostBody,
):
from_date = timezone.coerce_datetime(backfill_request.from_date)
Expand Down
10 changes: 5 additions & 5 deletions airflow/api_fastapi/core_api/routes/public/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
[status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND]
),
)
async def delete_connection(
def delete_connection(
connection_id: str,
session: Annotated[Session, Depends(get_session)],
):
Expand All @@ -64,7 +64,7 @@ async def delete_connection(
[status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND]
),
)
async def get_connection(
def get_connection(
connection_id: str,
session: Annotated[Session, Depends(get_session)],
) -> ConnectionResponse:
Expand All @@ -85,7 +85,7 @@ async def get_connection(
[status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND]
),
)
async def get_connections(
def get_connections(
limit: QueryLimit,
offset: QueryOffset,
order_by: Annotated[
Expand Down Expand Up @@ -125,7 +125,7 @@ async def get_connections(
[status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_409_CONFLICT]
),
)
async def post_connection(
def post_connection(
post_body: ConnectionBody,
session: Annotated[Session, Depends(get_session)],
) -> ConnectionResponse:
Expand Down Expand Up @@ -156,7 +156,7 @@ async def post_connection(
]
),
)
async def patch_connection(
def patch_connection(
connection_id: str,
patch_body: ConnectionBody,
session: Annotated[Session, Depends(get_session)],
Expand Down
8 changes: 4 additions & 4 deletions airflow/api_fastapi/core_api/routes/public/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
]
),
)
async def get_dag_run(
def get_dag_run(
dag_id: str, dag_run_id: str, session: Annotated[Session, Depends(get_session)]
) -> DAGRunResponse:
dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id))
Expand All @@ -75,7 +75,7 @@ async def get_dag_run(
]
),
)
async def delete_dag_run(dag_id: str, dag_run_id: str, session: Annotated[Session, Depends(get_session)]):
def delete_dag_run(dag_id: str, dag_run_id: str, session: Annotated[Session, Depends(get_session)]):
"""Delete a DAG Run entry."""
dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id))

Expand All @@ -99,7 +99,7 @@ async def delete_dag_run(dag_id: str, dag_run_id: str, session: Annotated[Sessio
]
),
)
async def patch_dag_run_state(
def patch_dag_run_state(
dag_id: str,
dag_run_id: str,
patch_body: DAGRunPatchBody,
Expand Down Expand Up @@ -138,6 +138,6 @@ async def patch_dag_run_state(
else:
set_dag_run_state_to_failed(dag=dag, run_id=dag_run.run_id, commit=True)

dag_run = session.get(DagRun, dag_run.id)
session.refresh(dag_run)

return DAGRunResponse.model_validate(dag_run, from_attributes=True)
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/routes/public/dag_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
},
response_model=DAGSourceResponse,
)
async def get_dag_source(
def get_dag_source(
file_token: str,
session: Annotated[Session, Depends(get_session)],
request: Request,
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/routes/public/dag_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
]
),
)
async def get_dag_stats(
def get_dag_stats(
session: Annotated[Session, Depends(get_session)],
dag_ids: QueryDagIdsFilter,
) -> DagStatsCollectionResponse:
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/routes/public/dag_warning.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
"/dagWarnings",
responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]),
)
async def list_dag_warnings(
def list_dag_warnings(
dag_id: QueryDagIdInDagWarningFilter,
warning_type: QueryWarningTypeFilter,
limit: QueryLimit,
Expand Down
16 changes: 7 additions & 9 deletions airflow/api_fastapi/core_api/routes/public/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@


@dags_router.get("/")
async def get_dags(
def get_dags(
limit: QueryLimit,
offset: QueryOffset,
tags: QueryTagsFilter,
Expand Down Expand Up @@ -101,7 +101,7 @@ async def get_dags(
"/tags",
responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]),
)
async def get_dag_tags(
def get_dag_tags(
limit: QueryLimit,
offset: QueryOffset,
order_by: Annotated[
Expand Down Expand Up @@ -142,9 +142,7 @@ async def get_dag_tags(
]
),
)
async def get_dag(
dag_id: str, session: Annotated[Session, Depends(get_session)], request: Request
) -> DAGResponse:
def get_dag(dag_id: str, session: Annotated[Session, Depends(get_session)], request: Request) -> DAGResponse:
"""Get basic information about a DAG."""
dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
if not dag:
Expand Down Expand Up @@ -172,7 +170,7 @@ async def get_dag(
]
),
)
async def get_dag_details(
def get_dag_details(
dag_id: str, session: Annotated[Session, Depends(get_session)], request: Request
) -> DAGDetailsResponse:
"""Get details of DAG."""
Expand Down Expand Up @@ -202,7 +200,7 @@ async def get_dag_details(
]
),
)
async def patch_dag(
def patch_dag(
dag_id: str,
patch_body: DAGPatchBody,
session: Annotated[Session, Depends(get_session)],
Expand Down Expand Up @@ -241,7 +239,7 @@ async def patch_dag(
]
),
)
async def patch_dags(
def patch_dags(
patch_body: DAGPatchBody,
limit: QueryLimit,
offset: QueryOffset,
Expand Down Expand Up @@ -301,7 +299,7 @@ async def patch_dags(
]
),
)
async def delete_dag(
def delete_dag(
dag_id: str,
session: Annotated[Session, Depends(get_session)],
) -> Response:
Expand Down
4 changes: 2 additions & 2 deletions airflow/api_fastapi/core_api/routes/public/event_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
[status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND]
),
)
async def get_event_log(
def get_event_log(
event_log_id: int,
session: Annotated[Session, Depends(get_session)],
) -> EventLogResponse:
Expand All @@ -66,7 +66,7 @@ async def get_event_log(
"/",
responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]),
)
async def get_event_logs(
def get_event_logs(
limit: QueryLimit,
offset: QueryOffset,
session: Annotated[Session, Depends(get_session)],
Expand Down
4 changes: 2 additions & 2 deletions airflow/api_fastapi/core_api/routes/public/import_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
[status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND]
),
)
async def get_import_error(
def get_import_error(
import_error_id: int,
session: Annotated[Session, Depends(get_session)],
) -> ImportErrorResponse:
Expand All @@ -66,7 +66,7 @@ async def get_import_error(
"/",
responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]),
)
async def get_import_errors(
def get_import_errors(
limit: QueryLimit,
offset: QueryOffset,
order_by: Annotated[
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/routes/public/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@


@monitor_router.get("/health")
async def get_health() -> HealthInfoSchema:
def get_health() -> HealthInfoSchema:
airflow_health_status = get_airflow_health()
return HealthInfoSchema.model_validate(airflow_health_status)
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/routes/public/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@


@plugins_router.get("/")
async def get_plugins(
def get_plugins(
limit: QueryLimit,
offset: QueryOffset,
) -> PluginCollectionResponse:
Expand Down
10 changes: 5 additions & 5 deletions airflow/api_fastapi/core_api/routes/public/pools.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
]
),
)
async def delete_pool(
def delete_pool(
pool_name: str,
session: Annotated[Session, Depends(get_session)],
):
Expand All @@ -71,7 +71,7 @@ async def delete_pool(
[status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND]
),
)
async def get_pool(
def get_pool(
pool_name: str,
session: Annotated[Session, Depends(get_session)],
) -> PoolResponse:
Expand All @@ -89,7 +89,7 @@ async def get_pool(
[status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND]
),
)
async def get_pools(
def get_pools(
limit: QueryLimit,
offset: QueryOffset,
order_by: Annotated[
Expand Down Expand Up @@ -127,7 +127,7 @@ async def get_pools(
]
),
)
async def patch_pool(
def patch_pool(
pool_name: str,
patch_body: PoolPatchBody,
session: Annotated[Session, Depends(get_session)],
Expand Down Expand Up @@ -170,7 +170,7 @@ async def patch_pool(
status_code=status.HTTP_201_CREATED,
responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]),
)
async def post_pool(
def post_pool(
post_body: PoolPostBody,
session: Annotated[Session, Depends(get_session)],
) -> PoolResponse:
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/routes/public/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def _provider_mapper(provider: ProviderInfo) -> ProviderResponse:


@providers_router.get("/")
async def get_providers(
def get_providers(
limit: QueryLimit,
offset: QueryOffset,
) -> ProviderCollectionResponse:
Expand Down
4 changes: 2 additions & 2 deletions airflow/api_fastapi/core_api/routes/public/task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
[status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND]
),
)
async def get_task_instance(
def get_task_instance(
dag_id: str, dag_run_id: str, task_id: str, session: Annotated[Session, Depends(get_session)]
) -> TaskInstanceResponse:
"""Get task instance."""
Expand Down Expand Up @@ -236,7 +236,7 @@ async def get_task_instance_dependencies(
[status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND]
),
)
async def get_mapped_task_instance(
def get_mapped_task_instance(
dag_id: str,
dag_run_id: str,
task_id: str,
Expand Down
10 changes: 5 additions & 5 deletions airflow/api_fastapi/core_api/routes/public/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
[status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND]
),
)
async def delete_variable(
def delete_variable(
variable_key: str,
session: Annotated[Session, Depends(get_session)],
):
Expand All @@ -59,7 +59,7 @@ async def delete_variable(
[status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND]
),
)
async def get_variable(
def get_variable(
variable_key: str,
session: Annotated[Session, Depends(get_session)],
) -> VariableResponse:
Expand All @@ -78,7 +78,7 @@ async def get_variable(
"/",
responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]),
)
async def get_variables(
def get_variables(
limit: QueryLimit,
offset: QueryOffset,
order_by: Annotated[
Expand Down Expand Up @@ -121,7 +121,7 @@ async def get_variables(
]
),
)
async def patch_variable(
def patch_variable(
variable_key: str,
patch_body: VariableBody,
session: Annotated[Session, Depends(get_session)],
Expand Down Expand Up @@ -154,7 +154,7 @@ async def patch_variable(
status_code=status.HTTP_201_CREATED,
responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]),
)
async def post_variable(
def post_variable(
post_body: VariableBody,
session: Annotated[Session, Depends(get_session)],
) -> VariableResponse:
Expand Down
Loading

0 comments on commit 36e716a

Please sign in to comment.