Skip to content

Commit

Permalink
Merge pull request #7 from avkirilishin/33948_bug_eventstimetable_schedules_past_events
Browse files Browse the repository at this point in the history

EventsTimetable schedules past events even if catchup=False
  • Loading branch information
avkirilishin authored Dec 9, 2023
2 parents 845bcd1 + 29d02c5 commit 52a7e10
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 18 deletions.
45 changes: 30 additions & 15 deletions airflow/timetables/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,13 @@ def __init__(
self.event_dates.sort()
self.restrict_to_events = restrict_to_events
if description is None:
self.description = (
f"{len(self.event_dates)} Events between {self.event_dates[0]} and {self.event_dates[-1]}"
)
self._summary = f"{len(self.event_dates)} Events"
if self.event_dates:
self.description = (
f"{len(self.event_dates)} events between {self.event_dates[0]} and {self.event_dates[-1]}"
)
else:
self.description = "No events"
self._summary = f"{len(self.event_dates)} events"
else:
self._summary = description
self.description = description
Expand All @@ -79,22 +82,34 @@ def next_dagrun_info(
last_automated_data_interval: DataInterval | None,
restriction: TimeRestriction,
) -> DagRunInfo | None:
if last_automated_data_interval is None:
next_event = self.event_dates[0]
else:
future_dates = itertools.dropwhile(
lambda when: when <= last_automated_data_interval.end, # type: ignore
self.event_dates,
)
next_event = next(future_dates, None) # type: ignore
if next_event is None:
return None
earliest = restriction.earliest
if not restriction.catchup:
current_time = pendulum.DateTime.utcnow()
if earliest is None or current_time > earliest:
earliest = current_time

dates = iter(self.event_dates)
next_event = next(dates, None) # type: ignore
while next_event:
is_allowed = True
if earliest and next_event < earliest:
is_allowed = False
if last_automated_data_interval and next_event <= last_automated_data_interval.end:
is_allowed = False
if is_allowed:
break
next_event = next(dates, None) # type: ignore
if next_event is None:
return None

if restriction.latest is not None and next_event > restriction.latest:
return None

return DagRunInfo.exact(next_event)

def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
# If Timetable not restricted to events, run for the time specified
if not self.restrict_to_events:
if not self.restrict_to_events or not self.event_dates:
return DataInterval.exact(run_after)

# If restricted to events, run for the most recent past event
Expand Down
64 changes: 61 additions & 3 deletions tests/timetables/test_events_timetable.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

import pendulum
import pytest
import time_machine

from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from airflow.timetables.events import EventsTimetable
from airflow.utils.timezone import utc

START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc) # Precedes all events
BEFORE_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc) # Precedes all events
START_DATE = pendulum.DateTime(2021, 9, 7, tzinfo=utc)

EVENT_DATES = [
pendulum.DateTime(2021, 9, 6, tzinfo=utc),
Expand Down Expand Up @@ -93,16 +95,23 @@ def test_manual_with_restricted_before(restricted_timetable: Timetable, restrict
Test that when using strict event dates, manual runs before the first event have the first event's date
as the start interval
"""
manual_run_data_interval = restricted_timetable.infer_manual_data_interval(run_after=START_DATE)
manual_run_data_interval = restricted_timetable.infer_manual_data_interval(run_after=BEFORE_DATE)
expected_data_interval = DataInterval.exact(EVENT_DATES[0])
assert expected_data_interval == manual_run_data_interval


@pytest.mark.parametrize(
"last_automated_data_interval, expected_next_info",
[
pytest.param(None, DagRunInfo.interval(START_DATE, START_DATE)),
pytest.param(
DataInterval(EVENT_DATES_SORTED[0], EVENT_DATES_SORTED[0]),
DagRunInfo.interval(START_DATE, START_DATE),
),
]
+ [
pytest.param(DataInterval(day1, day1), DagRunInfo.interval(day2, day2))
for day1, day2 in zip(EVENT_DATES_SORTED, EVENT_DATES_SORTED[1:])
for day1, day2 in zip(EVENT_DATES_SORTED[1:], EVENT_DATES_SORTED[2:])
]
+ [pytest.param(DataInterval(EVENT_DATES_SORTED[-1], EVENT_DATES_SORTED[-1]), None)],
)
Expand All @@ -118,3 +127,52 @@ def test_subsequent_weekday_schedule(
restriction=restriction,
)
assert next_info == expected_next_info


@pytest.mark.parametrize(
    "current_date",
    [
        pytest.param(pendulum.DateTime(2021, 9, 1, tzinfo=utc), id="when-current-date-is-before-first-event"),
        pytest.param(pendulum.DateTime(2021, 9, 8, tzinfo=utc), id="when-current-date-is-in-the-middle"),
        pytest.param(pendulum.DateTime(2021, 12, 9, tzinfo=utc), id="when-current-date-is-after-last-event"),
    ],
)
@pytest.mark.parametrize(
    "last_automated_data_interval",
    [
        pytest.param(None, id="first-run"),
        pytest.param(DataInterval(start=BEFORE_DATE, end=BEFORE_DATE), id="subsequent-run"),
    ],
)
def test_no_catchup_first_starts(
    last_automated_data_interval: DataInterval | None,
    current_date,
    unrestricted_timetable: Timetable,
) -> None:
    """Check which run is scheduled first when ``catchup=False``.

    The timetable is queried while the clock is moved to ``current_date``. The expected
    run is placed at the latest of ``current_date``, ``START_DATE`` and the first event;
    when that point lies beyond the last event, no run is expected at all.
    """
    # last_automated_data_interval plays no part in the expectation: it's always less than the first event
    first_event, last_event = EVENT_DATES_SORTED[0], EVENT_DATES_SORTED[-1]
    threshold = max(current_date, START_DATE, first_event)
    if threshold <= last_event:
        expected_info = DagRunInfo.interval(start=threshold, end=threshold)
    else:
        expected_info = None

    no_catchup = TimeRestriction(earliest=START_DATE, latest=None, catchup=False)
    with time_machine.travel(current_date):
        next_info = unrestricted_timetable.next_dagrun_info(
            last_automated_data_interval=last_automated_data_interval,
            restriction=no_catchup,
        )
    assert next_info == expected_info


def test_empty_timetable() -> None:
    """An ``EventsTimetable`` built with no event dates yields no automated run."""
    timetable = EventsTimetable(event_dates=[])
    restriction = TimeRestriction(earliest=START_DATE, latest=None, catchup=False)
    scheduled = timetable.next_dagrun_info(
        last_automated_data_interval=None,
        restriction=restriction,
    )
    assert scheduled is None


def test_empty_timetable_manual_run() -> None:
    """A manual run on a timetable without event dates gets an interval that starts and ends at ``run_after``."""
    expected = DataInterval(start=START_DATE, end=START_DATE)
    timetable = EventsTimetable(event_dates=[])
    assert timetable.infer_manual_data_interval(run_after=START_DATE) == expected

0 comments on commit 52a7e10

Please sign in to comment.