Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EventsTimetable schedules past events even if catchup=False #7

Merged
merged 2 commits into from
Dec 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 30 additions & 15 deletions airflow/timetables/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,13 @@ def __init__(
self.event_dates.sort()
self.restrict_to_events = restrict_to_events
if description is None:
self.description = (
f"{len(self.event_dates)} Events between {self.event_dates[0]} and {self.event_dates[-1]}"
)
self._summary = f"{len(self.event_dates)} Events"
if self.event_dates:
self.description = (
f"{len(self.event_dates)} events between {self.event_dates[0]} and {self.event_dates[-1]}"
)
else:
self.description = "No events"
self._summary = f"{len(self.event_dates)} events"
else:
self._summary = description
self.description = description
Expand All @@ -79,22 +82,34 @@ def next_dagrun_info(
last_automated_data_interval: DataInterval | None,
restriction: TimeRestriction,
) -> DagRunInfo | None:
if last_automated_data_interval is None:
next_event = self.event_dates[0]
else:
future_dates = itertools.dropwhile(
lambda when: when <= last_automated_data_interval.end, # type: ignore
self.event_dates,
)
next_event = next(future_dates, None) # type: ignore
if next_event is None:
return None
earliest = restriction.earliest
if not restriction.catchup:
current_time = pendulum.DateTime.utcnow()
if earliest is None or current_time > earliest:
earliest = current_time

dates = iter(self.event_dates)
next_event = next(dates, None) # type: ignore
while next_event:
is_allowed = True
if earliest and next_event < earliest:
is_allowed = False
if last_automated_data_interval and next_event <= last_automated_data_interval.end:
is_allowed = False
if is_allowed:
break
next_event = next(dates, None) # type: ignore
if next_event is None:
return None

if restriction.latest is not None and next_event > restriction.latest:
return None

return DagRunInfo.exact(next_event)

def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
# If Timetable not restricted to events, run for the time specified
if not self.restrict_to_events:
if not self.restrict_to_events or not self.event_dates:
return DataInterval.exact(run_after)

# If restricted to events, run for the most recent past event
Expand Down
64 changes: 61 additions & 3 deletions tests/timetables/test_events_timetable.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

import pendulum
import pytest
import time_machine

from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from airflow.timetables.events import EventsTimetable
from airflow.utils.timezone import utc

START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc) # Precedes all events
BEFORE_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc) # Precedes all events
START_DATE = pendulum.DateTime(2021, 9, 7, tzinfo=utc)

EVENT_DATES = [
pendulum.DateTime(2021, 9, 6, tzinfo=utc),
Expand Down Expand Up @@ -93,16 +95,23 @@ def test_manual_with_restricted_before(restricted_timetable: Timetable, restrict
Test that when using strict event dates, manual runs before the first event have the first event's date
as the start interval
"""
manual_run_data_interval = restricted_timetable.infer_manual_data_interval(run_after=START_DATE)
manual_run_data_interval = restricted_timetable.infer_manual_data_interval(run_after=BEFORE_DATE)
expected_data_interval = DataInterval.exact(EVENT_DATES[0])
assert expected_data_interval == manual_run_data_interval


@pytest.mark.parametrize(
"last_automated_data_interval, expected_next_info",
[
pytest.param(None, DagRunInfo.interval(START_DATE, START_DATE)),
pytest.param(
DataInterval(EVENT_DATES_SORTED[0], EVENT_DATES_SORTED[0]),
DagRunInfo.interval(START_DATE, START_DATE),
),
]
+ [
pytest.param(DataInterval(day1, day1), DagRunInfo.interval(day2, day2))
for day1, day2 in zip(EVENT_DATES_SORTED, EVENT_DATES_SORTED[1:])
for day1, day2 in zip(EVENT_DATES_SORTED[1:], EVENT_DATES_SORTED[2:])
]
+ [pytest.param(DataInterval(EVENT_DATES_SORTED[-1], EVENT_DATES_SORTED[-1]), None)],
)
Expand All @@ -118,3 +127,52 @@ def test_subsequent_weekday_schedule(
restriction=restriction,
)
assert next_info == expected_next_info


@pytest.mark.parametrize(
"current_date",
[
pytest.param(pendulum.DateTime(2021, 9, 1, tzinfo=utc), id="when-current-date-is-before-first-event"),
pytest.param(pendulum.DateTime(2021, 9, 8, tzinfo=utc), id="when-current-date-is-in-the-middle"),
pytest.param(pendulum.DateTime(2021, 12, 9, tzinfo=utc), id="when-current-date-is-after-last-event"),
],
)
@pytest.mark.parametrize(
"last_automated_data_interval",
[
pytest.param(None, id="first-run"),
pytest.param(DataInterval(start=BEFORE_DATE, end=BEFORE_DATE), id="subsequent-run"),
],
)
def test_no_catchup_first_starts(
last_automated_data_interval: DataInterval | None,
current_date,
unrestricted_timetable: Timetable,
) -> None:
# we don't use the last_automated_data_interval here because it's always less than the first event
expected_date = max(current_date, START_DATE, EVENT_DATES_SORTED[0])
expected_info = None
if expected_date <= EVENT_DATES_SORTED[-1]:
expected_info = DagRunInfo.interval(start=expected_date, end=expected_date)

with time_machine.travel(current_date):
next_info = unrestricted_timetable.next_dagrun_info(
last_automated_data_interval=last_automated_data_interval,
restriction=TimeRestriction(earliest=START_DATE, latest=None, catchup=False),
)
assert next_info == expected_info


def test_empty_timetable() -> None:
empty_timetable = EventsTimetable(event_dates=[])
next_info = empty_timetable.next_dagrun_info(
last_automated_data_interval=None,
restriction=TimeRestriction(earliest=START_DATE, latest=None, catchup=False),
)
assert next_info is None


def test_empty_timetable_manual_run() -> None:
empty_timetable = EventsTimetable(event_dates=[])
manual_run_data_interval = empty_timetable.infer_manual_data_interval(run_after=START_DATE)
assert manual_run_data_interval == DataInterval(start=START_DATE, end=START_DATE)
Loading