Skip to content

Commit

Permalink
Remove AIP-44 from models/dagrun (apache#44496)
Browse files Browse the repository at this point in the history
  • Loading branch information
rawwar authored Nov 30, 2024
1 parent 57d109c commit f36e2bf
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 18 deletions.
22 changes: 5 additions & 17 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
from sqlalchemy_utils import UUIDType

from airflow import settings
from airflow.api_internal.internal_api_call import internal_api_call
from airflow.callbacks.callback_requests import DagCallbackRequest
from airflow.configuration import conf as airflow_conf
from airflow.exceptions import AirflowException, TaskNotFound
Expand Down Expand Up @@ -86,9 +85,6 @@

from airflow.models.dag import DAG
from airflow.models.operator import Operator
from airflow.serialization.pydantic.dag_run import DagRunPydantic
from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
from airflow.serialization.pydantic.tasklog import LogTemplatePydantic
from airflow.typing_compat import Literal
from airflow.utils.types import ArgNotSet

Expand Down Expand Up @@ -611,7 +607,6 @@ def generate_run_id(run_type: DagRunType, logical_date: datetime) -> str:
return DagRunType(run_type).generate_run_id(logical_date)

@staticmethod
@internal_api_call
@provide_session
def fetch_task_instances(
dag_id: str | None = None,
Expand Down Expand Up @@ -648,7 +643,6 @@ def fetch_task_instances(
tis = tis.where(TI.task_id.in_(task_ids))
return session.scalars(tis).all()

@internal_api_call
def _check_last_n_dagruns_failed(self, dag_id, max_consecutive_failed_dag_runs, session):
"""Check if last N dags failed."""
dag_runs = (
Expand Down Expand Up @@ -719,7 +713,7 @@ def get_task_instance(
session: Session = NEW_SESSION,
*,
map_index: int = -1,
) -> TI | TaskInstancePydantic | None:
) -> TI | None:
"""
Return the task instance specified by task_id for this dag run.
Expand All @@ -735,15 +729,14 @@ def get_task_instance(
)

@staticmethod
@internal_api_call
@provide_session
def fetch_task_instance(
dag_id: str,
dag_run_id: str,
task_id: str,
session: Session = NEW_SESSION,
map_index: int = -1,
) -> TI | TaskInstancePydantic | None:
) -> TI | None:
"""
Return the task instance specified by task_id for this dag run.
Expand All @@ -768,10 +761,9 @@ def get_dag(self) -> DAG:
return self.dag

@staticmethod
@internal_api_call
@provide_session
def get_previous_dagrun(
dag_run: DagRun | DagRunPydantic, state: DagRunState | None = None, session: Session = NEW_SESSION
dag_run: DagRun, state: DagRunState | None = None, session: Session = NEW_SESSION
) -> DagRun | None:
"""
Return the previous DagRun, if there is one.
Expand All @@ -789,7 +781,6 @@ def get_previous_dagrun(
return session.scalar(select(DagRun).where(*filters).order_by(DagRun.logical_date.desc()).limit(1))

@staticmethod
@internal_api_call
@provide_session
def get_previous_scheduled_dagrun(
dag_run_id: int,
Expand Down Expand Up @@ -1706,15 +1697,12 @@ def schedule_tis(
return count

@provide_session
def get_log_template(self, *, session: Session = NEW_SESSION) -> LogTemplate | LogTemplatePydantic:
def get_log_template(self, *, session: Session = NEW_SESSION) -> LogTemplate:
return DagRun._get_log_template(log_template_id=self.log_template_id, session=session)

@staticmethod
@internal_api_call
@provide_session
def _get_log_template(
log_template_id: int | None, session: Session = NEW_SESSION
) -> LogTemplate | LogTemplatePydantic:
def _get_log_template(log_template_id: int | None, session: Session = NEW_SESSION) -> LogTemplate:
template: LogTemplate | None
if log_template_id is None: # DagRun created before LogTemplate introduction.
template = session.scalar(select(LogTemplate).order_by(LogTemplate.id).limit(1))
Expand Down
2 changes: 1 addition & 1 deletion airflow/serialization/pydantic/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ def xcom_push(
session=session,
)

def get_dagrun(self, session: Session | None = None) -> DagRunPydantic:
def get_dagrun(self, session: Session | None = None) -> DagRun:
"""
Return the DagRun for this TaskInstance.
Expand Down

0 comments on commit f36e2bf

Please sign in to comment.