Skip to content

Commit

Permalink
Fix the EventsTimetable schedules past events bug
Browse files Browse the repository at this point in the history
  • Loading branch information
avkirilishin committed Dec 8, 2023
1 parent 845bcd1 commit 474e485
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 13 deletions.
32 changes: 22 additions & 10 deletions airflow/timetables/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,28 @@ def next_dagrun_info(
last_automated_data_interval: DataInterval | None,
restriction: TimeRestriction,
) -> DagRunInfo | None:
if last_automated_data_interval is None:
next_event = self.event_dates[0]
else:
future_dates = itertools.dropwhile(
lambda when: when <= last_automated_data_interval.end, # type: ignore
self.event_dates,
)
next_event = next(future_dates, None) # type: ignore
if next_event is None:
return None
earliest = restriction.earliest
if not restriction.catchup:
current_time = pendulum.DateTime.utcnow()
if earliest is None or current_time > earliest:
earliest = current_time

dates = iter(self.event_dates)
next_event = next(dates, None) # type: ignore
while next_event:
is_allowed = True
if earliest and next_event < earliest:
is_allowed = False
if last_automated_data_interval and next_event <= last_automated_data_interval.end:
is_allowed = False
if is_allowed:
break
next_event = next(dates, None) # type: ignore
if next_event is None:
return None

if restriction.latest is not None and next_event > restriction.latest:
return None

return DagRunInfo.exact(next_event)

Expand Down
49 changes: 46 additions & 3 deletions tests/timetables/test_events_timetable.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

import pendulum
import pytest
import time_machine

from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from airflow.timetables.events import EventsTimetable
from airflow.utils.timezone import utc

START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc) # Precedes all events
BEFORE_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc) # Precedes all events
START_DATE = pendulum.DateTime(2021, 9, 7, tzinfo=utc)

EVENT_DATES = [
pendulum.DateTime(2021, 9, 6, tzinfo=utc),
Expand Down Expand Up @@ -93,16 +95,23 @@ def test_manual_with_restricted_before(restricted_timetable: Timetable, restrict
Test that when using strict event dates, manual runs before the first event have the first event's date
as the start interval
"""
manual_run_data_interval = restricted_timetable.infer_manual_data_interval(run_after=START_DATE)
manual_run_data_interval = restricted_timetable.infer_manual_data_interval(run_after=BEFORE_DATE)
expected_data_interval = DataInterval.exact(EVENT_DATES[0])
assert expected_data_interval == manual_run_data_interval


@pytest.mark.parametrize(
"last_automated_data_interval, expected_next_info",
[
pytest.param(None, DagRunInfo.interval(START_DATE, START_DATE)),
pytest.param(
DataInterval(EVENT_DATES_SORTED[0], EVENT_DATES_SORTED[0]),
DagRunInfo.interval(START_DATE, START_DATE),
),
]
+ [
pytest.param(DataInterval(day1, day1), DagRunInfo.interval(day2, day2))
for day1, day2 in zip(EVENT_DATES_SORTED, EVENT_DATES_SORTED[1:])
for day1, day2 in zip(EVENT_DATES_SORTED[1:], EVENT_DATES_SORTED[2:])
]
+ [pytest.param(DataInterval(EVENT_DATES_SORTED[-1], EVENT_DATES_SORTED[-1]), None)],
)
Expand All @@ -118,3 +127,37 @@ def test_subsequent_weekday_schedule(
restriction=restriction,
)
assert next_info == expected_next_info


@pytest.mark.parametrize(
"current_date",
[
pytest.param(pendulum.DateTime(2021, 9, 1, tzinfo=utc), id="when-current-date-is-before-first-event"),
pytest.param(pendulum.DateTime(2021, 9, 8, tzinfo=utc), id="when-current-date-is-in-the-middle"),
pytest.param(pendulum.DateTime(2021, 12, 9, tzinfo=utc), id="when-current-date-is-after-last-event"),
],
)
@pytest.mark.parametrize(
"last_automated_data_interval",
[
pytest.param(None, id="first-run"),
pytest.param(DataInterval(start=BEFORE_DATE, end=BEFORE_DATE), id="subsequent-run"),
],
)
def test_no_catchup_first_starts(
last_automated_data_interval: DataInterval | None,
current_date,
unrestricted_timetable: Timetable,
) -> None:
# we don't use the last_automated_data_interval here because it's always less than the first event
expected_date = max(current_date, START_DATE, EVENT_DATES_SORTED[0])
expected_info = None
if expected_date <= EVENT_DATES_SORTED[-1]:
expected_info = DagRunInfo.interval(start=expected_date, end=expected_date)

with time_machine.travel(current_date):
next_info = unrestricted_timetable.next_dagrun_info(
last_automated_data_interval=last_automated_data_interval,
restriction=TimeRestriction(earliest=START_DATE, latest=None, catchup=False),
)
assert next_info == expected_info

0 comments on commit 474e485

Please sign in to comment.