Skip to content

Commit

Permalink
Remove unused code from airflow/api/common/mark_tasks.py (apache#44296
Browse files Browse the repository at this point in the history
)

PR title is self explanatory :)
  • Loading branch information
kaxil authored Nov 23, 2024
1 parent 3c58e01 commit 2179b17
Showing 1 changed file with 2 additions and 74 deletions.
76 changes: 2 additions & 74 deletions airflow/api/common/mark_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from __future__ import annotations

from typing import TYPE_CHECKING, Collection, Iterable, Iterator, NamedTuple
from typing import TYPE_CHECKING, Collection, Iterable

from sqlalchemy import and_, or_, select
from sqlalchemy.orm import lazyload
Expand All @@ -29,7 +29,6 @@
from airflow.utils import timezone
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.types import DagRunTriggeredByType, DagRunType

if TYPE_CHECKING:
from datetime import datetime
Expand All @@ -38,47 +37,6 @@

from airflow.models.dag import DAG
from airflow.models.operator import Operator
from airflow.utils.types import DagRunType


class _DagRunInfo(NamedTuple):
logical_date: datetime
data_interval: tuple[datetime, datetime]


def _create_dagruns(
dag: DAG,
infos: Iterable[_DagRunInfo],
state: DagRunState,
run_type: DagRunType,
) -> Iterable[DagRun]:
"""
Infers from data intervals which DAG runs need to be created and does so.
:param dag: The DAG to create runs for.
:param infos: List of logical dates and data intervals to evaluate.
:param state: The state to set the dag run to
:param run_type: The prefix will be used to construct dag run id: ``{run_id_prefix}__{logical_date}``.
:return: Newly created and existing dag runs for the logical dates supplied.
"""
# Find out existing DAG runs that we don't need to create.
dag_runs = {
run.logical_date: run
for run in DagRun.find(dag_id=dag.dag_id, logical_date=[info.logical_date for info in infos])
}

for info in infos:
if info.logical_date not in dag_runs:
dag_runs[info.logical_date] = dag.create_dagrun(
logical_date=info.logical_date,
data_interval=info.data_interval,
start_date=timezone.utcnow(),
external_trigger=False,
state=state,
run_type=run_type,
triggered_by=DagRunTriggeredByType.TIMETABLE,
)
return dag_runs.values()


@provide_session
Expand Down Expand Up @@ -131,7 +89,7 @@ def set_state(
task_id_map_index_list = list(find_task_relatives(tasks, downstream, upstream))
# now look for the task instances that are affected

qry_dag = get_all_dag_task_query(dag, session, state, task_id_map_index_list, dag_run_ids)
qry_dag = get_all_dag_task_query(dag, state, task_id_map_index_list, dag_run_ids)

if commit:
tis_altered = session.scalars(qry_dag.with_for_update()).all()
Expand All @@ -145,7 +103,6 @@ def set_state(

def get_all_dag_task_query(
dag: DAG,
session: SASession,
state: TaskInstanceState,
task_ids: list[str | tuple[str, int]],
run_ids: Iterable[str],
Expand All @@ -163,13 +120,6 @@ def get_all_dag_task_query(
return qry_dag


def _iter_existing_dag_run_infos(dag: DAG, run_ids: list[str], session: SASession) -> Iterator[_DagRunInfo]:
for dag_run in DagRun.find(dag_id=dag.dag_id, run_id=run_ids, session=session):
dag_run.dag = dag
dag_run.verify_integrity(session=session)
yield _DagRunInfo(dag_run.logical_date, dag.get_run_data_interval(dag_run))


def find_task_relatives(tasks, downstream, upstream):
"""Yield task ids and optionally ancestor and descendant ids."""
for item in tasks:
Expand Down Expand Up @@ -417,28 +367,6 @@ def __set_dag_run_state_to_running_or_queued(
return res


@provide_session
def set_dag_run_state_to_running(
*,
dag: DAG,
run_id: str | None = None,
commit: bool = False,
session: SASession = NEW_SESSION,
) -> list[TaskInstance]:
"""
Set the dag run's state to running.
Set for a specific logical date and its task instances to running.
"""
return __set_dag_run_state_to_running_or_queued(
new_state=DagRunState.RUNNING,
dag=dag,
run_id=run_id,
commit=commit,
session=session,
)


@provide_session
def set_dag_run_state_to_queued(
*,
Expand Down

0 comments on commit 2179b17

Please sign in to comment.