From 474e4851633ad697876fc2cfeb71b16fe054f5ed Mon Sep 17 00:00:00 2001 From: Kirilishin Aleksei Date: Sat, 9 Dec 2023 03:21:15 +0400 Subject: [PATCH 1/2] Fix the EventsTimetable schedules past events bug --- airflow/timetables/events.py | 32 ++++++++++----- tests/timetables/test_events_timetable.py | 49 +++++++++++++++++++++-- 2 files changed, 68 insertions(+), 13 deletions(-) diff --git a/airflow/timetables/events.py b/airflow/timetables/events.py index 1998b12d46172..53d5019da4226 100644 --- a/airflow/timetables/events.py +++ b/airflow/timetables/events.py @@ -79,16 +79,28 @@ def next_dagrun_info( last_automated_data_interval: DataInterval | None, restriction: TimeRestriction, ) -> DagRunInfo | None: - if last_automated_data_interval is None: - next_event = self.event_dates[0] - else: - future_dates = itertools.dropwhile( - lambda when: when <= last_automated_data_interval.end, # type: ignore - self.event_dates, - ) - next_event = next(future_dates, None) # type: ignore - if next_event is None: - return None + earliest = restriction.earliest + if not restriction.catchup: + current_time = pendulum.DateTime.utcnow() + if earliest is None or current_time > earliest: + earliest = current_time + + dates = iter(self.event_dates) + next_event = next(dates, None) # type: ignore + while next_event: + is_allowed = True + if earliest and next_event < earliest: + is_allowed = False + if last_automated_data_interval and next_event <= last_automated_data_interval.end: + is_allowed = False + if is_allowed: + break + next_event = next(dates, None) # type: ignore + if next_event is None: + return None + + if restriction.latest is not None and next_event > restriction.latest: + return None return DagRunInfo.exact(next_event) diff --git a/tests/timetables/test_events_timetable.py b/tests/timetables/test_events_timetable.py index e743000f07360..131a618de6b8d 100644 --- a/tests/timetables/test_events_timetable.py +++ b/tests/timetables/test_events_timetable.py @@ -19,12 +19,14 @@ import pendulum import pytest +import time_machine from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable from airflow.timetables.events import EventsTimetable from airflow.utils.timezone import utc -START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc) # Precedes all events +BEFORE_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc) # Precedes all events +START_DATE = pendulum.DateTime(2021, 9, 7, tzinfo=utc) EVENT_DATES = [ pendulum.DateTime(2021, 9, 6, tzinfo=utc), @@ -93,7 +95,7 @@ def test_manual_with_restricted_before(restricted_timetable: Timetable, restrict Test that when using strict event dates, manual runs before the first event have the first event's date as the start interval """ - manual_run_data_interval = restricted_timetable.infer_manual_data_interval(run_after=START_DATE) + manual_run_data_interval = restricted_timetable.infer_manual_data_interval(run_after=BEFORE_DATE) expected_data_interval = DataInterval.exact(EVENT_DATES[0]) assert expected_data_interval == manual_run_data_interval @@ -101,8 +103,15 @@ def test_manual_with_restricted_before(restricted_timetable: Timetable, restrict @pytest.mark.parametrize( "last_automated_data_interval, expected_next_info", [ + pytest.param(None, DagRunInfo.interval(START_DATE, START_DATE)), + pytest.param( + DataInterval(EVENT_DATES_SORTED[0], EVENT_DATES_SORTED[0]), + DagRunInfo.interval(START_DATE, START_DATE), + ), + ] + + [ pytest.param(DataInterval(day1, day1), DagRunInfo.interval(day2, day2)) - for day1, day2 in zip(EVENT_DATES_SORTED, EVENT_DATES_SORTED[1:]) + for day1, day2 in zip(EVENT_DATES_SORTED[1:], EVENT_DATES_SORTED[2:]) ] + [pytest.param(DataInterval(EVENT_DATES_SORTED[-1], EVENT_DATES_SORTED[-1]), None)], ) @@ -118,3 +127,37 @@ def test_subsequent_weekday_schedule( restriction=restriction, ) assert next_info == expected_next_info + + +@pytest.mark.parametrize( + "current_date", + [ + pytest.param(pendulum.DateTime(2021, 9, 1, tzinfo=utc), id="when-current-date-is-before-first-event"), + pytest.param(pendulum.DateTime(2021, 9, 8, tzinfo=utc), id="when-current-date-is-in-the-middle"), + pytest.param(pendulum.DateTime(2021, 12, 9, tzinfo=utc), id="when-current-date-is-after-last-event"), + ], +) +@pytest.mark.parametrize( + "last_automated_data_interval", + [ + pytest.param(None, id="first-run"), + pytest.param(DataInterval(start=BEFORE_DATE, end=BEFORE_DATE), id="subsequent-run"), + ], +) +def test_no_catchup_first_starts( + last_automated_data_interval: DataInterval | None, + current_date, + unrestricted_timetable: Timetable, +) -> None: + # we don't use the last_automated_data_interval here because it's always less than the first event + expected_date = max(current_date, START_DATE, EVENT_DATES_SORTED[0]) + expected_info = None + if expected_date <= EVENT_DATES_SORTED[-1]: + expected_info = DagRunInfo.interval(start=expected_date, end=expected_date) + + with time_machine.travel(current_date): + next_info = unrestricted_timetable.next_dagrun_info( + last_automated_data_interval=last_automated_data_interval, + restriction=TimeRestriction(earliest=START_DATE, latest=None, catchup=False), + ) + assert next_info == expected_info From 29d02c56f5cada4896ae00613ab15574b5d29746 Mon Sep 17 00:00:00 2001 From: Kirilishin Aleksei Date: Sat, 9 Dec 2023 03:24:19 +0400 Subject: [PATCH 2/2] Add the empty timetable tests --- airflow/timetables/events.py | 13 ++++++++----- tests/timetables/test_events_timetable.py | 15 +++++++++++++++ 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/airflow/timetables/events.py b/airflow/timetables/events.py index 53d5019da4226..0edf1b101299c 100644 --- a/airflow/timetables/events.py +++ b/airflow/timetables/events.py @@ -58,10 +58,13 @@ def __init__( self.event_dates.sort() self.restrict_to_events = restrict_to_events if description is None: - self.description = ( - f"{len(self.event_dates)} Events between {self.event_dates[0]} and {self.event_dates[-1]}" - ) - self._summary = f"{len(self.event_dates)} Events" + if self.event_dates: + self.description = ( + f"{len(self.event_dates)} events between {self.event_dates[0]} and {self.event_dates[-1]}" + ) + else: + self.description = "No events" + self._summary = f"{len(self.event_dates)} events" else: self._summary = description self.description = description @@ -106,7 +109,7 @@ def next_dagrun_info( def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval: # If Timetable not restricted to events, run for the time specified - if not self.restrict_to_events: + if not self.restrict_to_events or not self.event_dates: return DataInterval.exact(run_after) # If restricted to events, run for the most recent past event diff --git a/tests/timetables/test_events_timetable.py b/tests/timetables/test_events_timetable.py index 131a618de6b8d..39d1fd3431e34 100644 --- a/tests/timetables/test_events_timetable.py +++ b/tests/timetables/test_events_timetable.py @@ -161,3 +161,18 @@ def test_no_catchup_first_starts( restriction=TimeRestriction(earliest=START_DATE, latest=None, catchup=False), ) assert next_info == expected_info + + +def test_empty_timetable() -> None: + empty_timetable = EventsTimetable(event_dates=[]) + next_info = empty_timetable.next_dagrun_info( + last_automated_data_interval=None, + restriction=TimeRestriction(earliest=START_DATE, latest=None, catchup=False), + ) + assert next_info is None + + +def test_empty_timetable_manual_run() -> None: + empty_timetable = EventsTimetable(event_dates=[]) + manual_run_data_interval = empty_timetable.infer_manual_data_interval(run_after=START_DATE) + assert manual_run_data_interval == DataInterval(start=START_DATE, end=START_DATE)