Move LatestOnlyOperator operator to standard provider. (apache#44309)
* Move the latest_only operator to the standard provider, along with all references

* Add the latest_only operator to the standard provider.yaml file

* Move the latest_only operator test file to the standard provider

* Add a skip condition to the latest_only operator tests that are not backward compatible

* Replace the test skips with version-conditional code based on the Airflow version
hardeybisey authored Nov 30, 2024
1 parent 14b32ea commit 57d109c
Showing 6 changed files with 35 additions and 12 deletions.
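
For DAG authors, the practical effect of this commit is the import-path change visible in the example DAG diffs below. A minimal sketch of a version-tolerant import, assuming code that may need to run both before and after the operator moved to the standard provider:

    try:
        from airflow.providers.standard.operators.latest_only import LatestOnlyOperator
    except ImportError:  # Airflow versions from before the move to the standard provider
        from airflow.operators.latest_only import LatestOnlyOperator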
2 changes: 1 addition & 1 deletion airflow/example_dags/example_latest_only.py
@@ -23,7 +23,7 @@

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
-from airflow.operators.latest_only import LatestOnlyOperator
+from airflow.providers.standard.operators.latest_only import LatestOnlyOperator

with DAG(
dag_id="latest_only",
2 changes: 1 addition & 1 deletion airflow/example_dags/example_latest_only_with_trigger.py
@@ -28,7 +28,7 @@

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
-from airflow.operators.latest_only import LatestOnlyOperator
+from airflow.providers.standard.operators.latest_only import LatestOnlyOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
2 changes: 1 addition & 1 deletion docs/apache-airflow/operators-and-hooks-ref.rst
@@ -62,7 +62,7 @@ For details see: :doc:`apache-airflow-providers:operators-and-hooks-ref/index`.
* - :mod:`airflow.operators.generic_transfer`
-

-* - :mod:`airflow.operators.latest_only`
+* - :mod:`airflow.providers.standard.operators.latest_only`
-

* - :mod:`airflow.providers.standard.operators.trigger_dagrun`
File renamed without changes.
1 change: 1 addition & 0 deletions providers/src/airflow/providers/standard/provider.yaml
@@ -50,6 +50,7 @@ operators:
- airflow.providers.standard.operators.python
- airflow.providers.standard.operators.generic_transfer
- airflow.providers.standard.operators.trigger_dagrun
+- airflow.providers.standard.operators.latest_only

sensors:
- integration-name: Standard
@@ -25,7 +25,7 @@
from airflow import settings
from airflow.models import DagRun, TaskInstance
from airflow.operators.empty import EmptyOperator
-from airflow.operators.latest_only import LatestOnlyOperator
+from airflow.providers.standard.operators.latest_only import LatestOnlyOperator
from airflow.utils import timezone
from airflow.utils.state import State
from airflow.utils.trigger_rule import TriggerRule
@@ -48,11 +48,12 @@

def get_task_instances(task_id):
    session = settings.Session()
+    logical_date = DagRun.logical_date if AIRFLOW_V_3_0_PLUS else DagRun.execution_date
    return (
        session.query(TaskInstance)
        .join(TaskInstance.dag_run)
        .filter(TaskInstance.task_id == task_id)
-        .order_by(DagRun.logical_date)
+        .order_by(logical_date)
        .all()
    )
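
The version branch above relies on an AIRFLOW_V_3_0_PLUS flag that the test module imports from a shared compatibility helper (not shown in this diff). A minimal sketch of how such a flag is commonly derived, assuming the packaging library is available:

    from packaging.version import Version

    from airflow import __version__ as airflow_version

    # True when running against Airflow 3.0.0 or newer (pre-releases of 3.0
    # count as well, since base_version strips dev/rc suffixes).
    AIRFLOW_V_3_0_PLUS = Version(Version(airflow_version).base_version) >= Version("3.0.0")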

@@ -130,31 +131,43 @@ def test_skipping_non_latest(self, dag_maker):
        downstream_task3.run(start_date=DEFAULT_DATE, end_date=END_DATE)

        latest_instances = get_task_instances("latest")
-        exec_date_to_latest_state = {ti.logical_date: ti.state for ti in latest_instances}
+        if AIRFLOW_V_3_0_PLUS:
+            exec_date_to_latest_state = {ti.logical_date: ti.state for ti in latest_instances}
+        else:
+            exec_date_to_latest_state = {ti.execution_date: ti.state for ti in latest_instances}
        assert exec_date_to_latest_state == {
            timezone.datetime(2016, 1, 1): "success",
            timezone.datetime(2016, 1, 1, 12): "success",
            timezone.datetime(2016, 1, 2): "success",
        }

        downstream_instances = get_task_instances("downstream")
-        exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances}
+        if AIRFLOW_V_3_0_PLUS:
+            exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances}
+        else:
+            exec_date_to_downstream_state = {ti.execution_date: ti.state for ti in downstream_instances}
        assert exec_date_to_downstream_state == {
            timezone.datetime(2016, 1, 1): "skipped",
            timezone.datetime(2016, 1, 1, 12): "skipped",
            timezone.datetime(2016, 1, 2): "success",
        }

        downstream_instances = get_task_instances("downstream_2")
-        exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances}
+        if AIRFLOW_V_3_0_PLUS:
+            exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances}
+        else:
+            exec_date_to_downstream_state = {ti.execution_date: ti.state for ti in downstream_instances}
        assert exec_date_to_downstream_state == {
            timezone.datetime(2016, 1, 1): None,
            timezone.datetime(2016, 1, 1, 12): None,
            timezone.datetime(2016, 1, 2): "success",
        }

        downstream_instances = get_task_instances("downstream_3")
-        exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances}
+        if AIRFLOW_V_3_0_PLUS:
+            exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances}
+        else:
+            exec_date_to_downstream_state = {ti.execution_date: ti.state for ti in downstream_instances}
        assert exec_date_to_downstream_state == {
            timezone.datetime(2016, 1, 1): "success",
            timezone.datetime(2016, 1, 1, 12): "success",
@@ -210,23 +223,32 @@ def test_not_skipping_external(self, dag_maker):
        downstream_task2.run(start_date=DEFAULT_DATE, end_date=END_DATE)

        latest_instances = get_task_instances("latest")
-        exec_date_to_latest_state = {ti.logical_date: ti.state for ti in latest_instances}
+        if AIRFLOW_V_3_0_PLUS:
+            exec_date_to_latest_state = {ti.logical_date: ti.state for ti in latest_instances}
+        else:
+            exec_date_to_latest_state = {ti.execution_date: ti.state for ti in latest_instances}
        assert exec_date_to_latest_state == {
            timezone.datetime(2016, 1, 1): "success",
            timezone.datetime(2016, 1, 1, 12): "success",
            timezone.datetime(2016, 1, 2): "success",
        }

        downstream_instances = get_task_instances("downstream")
-        exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances}
+        if AIRFLOW_V_3_0_PLUS:
+            exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances}
+        else:
+            exec_date_to_downstream_state = {ti.execution_date: ti.state for ti in downstream_instances}
        assert exec_date_to_downstream_state == {
            timezone.datetime(2016, 1, 1): "success",
            timezone.datetime(2016, 1, 1, 12): "success",
            timezone.datetime(2016, 1, 2): "success",
        }

        downstream_instances = get_task_instances("downstream_2")
-        exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances}
+        if AIRFLOW_V_3_0_PLUS:
+            exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances}
+        else:
+            exec_date_to_downstream_state = {ti.execution_date: ti.state for ti in downstream_instances}
        assert exec_date_to_downstream_state == {
            timezone.datetime(2016, 1, 1): "success",
            timezone.datetime(2016, 1, 1, 12): "success",
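
The same four-line version branch is repeated for every task id checked in these tests. A hypothetical helper (not part of this commit) that would express the pattern once, assuming the same AIRFLOW_V_3_0_PLUS flag:

    def state_by_run_date(task_instances):
        # The tests read ti.logical_date on Airflow 3+ and ti.execution_date
        # on older versions; pick the attribute for the running version.
        date_attr = "logical_date" if AIRFLOW_V_3_0_PLUS else "execution_date"
        return {getattr(ti, date_attr): ti.state for ti in task_instances}

    # e.g. assert state_by_run_date(get_task_instances("latest")) == {...}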
