From f36e2bf67f778b86c97579d10b6e3b179853ceb3 Mon Sep 17 00:00:00 2001 From: Kalyan R Date: Sat, 30 Nov 2024 21:28:18 +0530 Subject: [PATCH] Remove AIP-44 from models/dagrun (#44496) --- airflow/models/dagrun.py | 22 +++++-------------- .../serialization/pydantic/taskinstance.py | 2 +- 2 files changed, 6 insertions(+), 18 deletions(-) diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index e7fa37c82f93a..b28d2fbf02fc1 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -52,7 +52,6 @@ from sqlalchemy_utils import UUIDType from airflow import settings -from airflow.api_internal.internal_api_call import internal_api_call from airflow.callbacks.callback_requests import DagCallbackRequest from airflow.configuration import conf as airflow_conf from airflow.exceptions import AirflowException, TaskNotFound @@ -86,9 +85,6 @@ from airflow.models.dag import DAG from airflow.models.operator import Operator - from airflow.serialization.pydantic.dag_run import DagRunPydantic - from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic - from airflow.serialization.pydantic.tasklog import LogTemplatePydantic from airflow.typing_compat import Literal from airflow.utils.types import ArgNotSet @@ -611,7 +607,6 @@ def generate_run_id(run_type: DagRunType, logical_date: datetime) -> str: return DagRunType(run_type).generate_run_id(logical_date) @staticmethod - @internal_api_call @provide_session def fetch_task_instances( dag_id: str | None = None, @@ -648,7 +643,6 @@ def fetch_task_instances( tis = tis.where(TI.task_id.in_(task_ids)) return session.scalars(tis).all() - @internal_api_call def _check_last_n_dagruns_failed(self, dag_id, max_consecutive_failed_dag_runs, session): """Check if last N dags failed.""" dag_runs = ( @@ -719,7 +713,7 @@ def get_task_instance( session: Session = NEW_SESSION, *, map_index: int = -1, - ) -> TI | TaskInstancePydantic | None: + ) -> TI | None: """ Return the task instance specified by task_id for this dag run. @@ -735,7 +729,6 @@ def get_task_instance( ) @staticmethod - @internal_api_call @provide_session def fetch_task_instance( dag_id: str, @@ -743,7 +736,7 @@ def fetch_task_instance( task_id: str, session: Session = NEW_SESSION, map_index: int = -1, - ) -> TI | TaskInstancePydantic | None: + ) -> TI | None: """ Return the task instance specified by task_id for this dag run. @@ -768,10 +761,9 @@ def get_dag(self) -> DAG: return self.dag @staticmethod - @internal_api_call @provide_session def get_previous_dagrun( - dag_run: DagRun | DagRunPydantic, state: DagRunState | None = None, session: Session = NEW_SESSION + dag_run: DagRun, state: DagRunState | None = None, session: Session = NEW_SESSION ) -> DagRun | None: """ Return the previous DagRun, if there is one. @@ -789,7 +781,6 @@ def get_previous_dagrun( return session.scalar(select(DagRun).where(*filters).order_by(DagRun.logical_date.desc()).limit(1)) @staticmethod - @internal_api_call @provide_session def get_previous_scheduled_dagrun( dag_run_id: int, @@ -1706,15 +1697,12 @@ def schedule_tis( return count @provide_session - def get_log_template(self, *, session: Session = NEW_SESSION) -> LogTemplate | LogTemplatePydantic: + def get_log_template(self, *, session: Session = NEW_SESSION) -> LogTemplate: return DagRun._get_log_template(log_template_id=self.log_template_id, session=session) @staticmethod - @internal_api_call @provide_session - def _get_log_template( - log_template_id: int | None, session: Session = NEW_SESSION - ) -> LogTemplate | LogTemplatePydantic: + def _get_log_template(log_template_id: int | None, session: Session = NEW_SESSION) -> LogTemplate: template: LogTemplate | None if log_template_id is None: # DagRun created before LogTemplate introduction. template = session.scalar(select(LogTemplate).order_by(LogTemplate.id).limit(1)) diff --git a/airflow/serialization/pydantic/taskinstance.py b/airflow/serialization/pydantic/taskinstance.py index 87d6f48f111b8..431903a8b9fce 100644 --- a/airflow/serialization/pydantic/taskinstance.py +++ b/airflow/serialization/pydantic/taskinstance.py @@ -220,7 +220,7 @@ def xcom_push( session=session, ) - def get_dagrun(self, session: Session | None = None) -> DagRunPydantic: + def get_dagrun(self, session: Session | None = None) -> DagRun: """ Return the DagRun for this TaskInstance.