Skip to content

Commit

Permalink
Fix triggerer race condition in HA setting (apache#38666)
Browse files Browse the repository at this point in the history
Co-authored-by: Tzu-ping Chung <[email protected]>
  • Loading branch information
Lioscro and uranusjr authored Jun 3, 2024
1 parent 1bf8479 commit da3a77a
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 0 deletions.
14 changes: 14 additions & 0 deletions airflow/jobs/triggerer_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,20 @@ def update_triggers(self, requested_trigger_ids: set[int]):
self.failed_triggers.append((new_id, e))
continue

# If new_trigger_orm.task_instance is None, this means the TaskInstance
# row was updated by either Trigger.submit_event or Trigger.submit_failure
# and can happen when a single trigger Job is being run on multiple TriggerRunners
# in a High-Availability setup.
if new_trigger_orm.task_instance is None:
self.log.info(
(
"TaskInstance for Trigger ID %s is None. It was likely updated by another trigger job. "
"Skipping trigger instantiation."
),
new_id,
)
continue

try:
new_trigger_instance = trigger_class(**new_trigger_orm.kwargs)
except TypeError as err:
Expand Down
80 changes: 80 additions & 0 deletions tests/jobs/test_triggerer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,86 @@ def test_update_trigger_with_triggerer_argument_change(
assert "got an unexpected keyword argument 'not_exists_arg'" in caplog.text


@pytest.mark.asyncio
async def test_trigger_create_race_condition_38599(session, tmp_path):
"""
This verifies the resolution of race condition documented in github issue #38599.
More details in the issue description.
The race condition may occur in the following scenario:
1. TaskInstance TI1 defers itself, which creates Trigger T1, which holds a
reference to TI1.
2. T1 gets picked up by TriggererJobRunner TJR1 and starts running T1.
3. TJR1 misses a heartbeat, most likely due to high host load causing delays in
each TriggererJobRunner._run_trigger_loop loop.
4. A second TriggererJobRunner TJR2 notices that T1 has missed its heartbeat,
so it starts the process of picking up any Triggers that TJR1 may have had,
including T1.
5. Before TJR2 starts executing T1, TJR1 finishes execution of T1 and cleans it
up by clearing the trigger_id of TI1.
6. TJR2 tries to execute T1, but it crashes (with the above error) while trying to
look up TI1 (because T1 no longer has a TaskInstance linked to it).
"""
path = tmp_path / "test_trigger_create_after_completion.txt"
trigger = TimeDeltaTrigger_(delta=datetime.timedelta(microseconds=1), filename=path.as_posix())
trigger_orm = Trigger.from_object(trigger)
trigger_orm.id = 1
session.add(trigger_orm)

dag = DagModel(dag_id="test-dag")
dag_run = DagRun(dag.dag_id, run_id="abc", run_type="none")
ti = TaskInstance(
PythonOperator(task_id="dummy-task", python_callable=print),
run_id=dag_run.run_id,
state=TaskInstanceState.DEFERRED,
)
ti.dag_id = dag.dag_id
ti.trigger_id = 1
session.add(dag)
session.add(dag_run)
session.add(ti)

job1 = Job()
job2 = Job()
session.add(job1)
session.add(job2)

session.commit()

job_runner1 = TriggererJobRunner(job1)
job_runner2 = TriggererJobRunner(job2)

# Assign and run the trigger on the first TriggererJobRunner
# Instead of running job_runner1._execute, we will run the individual methods
# to control the timing of the execution.
job_runner1.load_triggers()
assert len(job_runner1.trigger_runner.to_create) == 1
# Before calling job_runner1.handle_events, run the trigger synchronously
await job_runner1.trigger_runner.create_triggers()
assert len(job_runner1.trigger_runner.triggers) == 1
_, trigger_task_info = next(iter(job_runner1.trigger_runner.triggers.items()))
await trigger_task_info["task"]
assert trigger_task_info["task"].done()

# In a real execution environment, a missed heartbeat would cause the trigger to be picked up
# by another TriggererJobRunner.
# In this test, however, this is not necessary because we are controlling the execution
# of the TriggererJobRunner.
# job1.latest_heartbeat = timezone.utcnow() - datetime.timedelta(hours=1)
# session.commit()

# This calls Trigger.submit_event, which will unlink the trigger from the task instance
job_runner1.handle_events()

# Simulate the second TriggererJobRunner picking up the trigger
job_runner2.trigger_runner.update_triggers({trigger_orm.id})
# The race condition happens here.
# AttributeError: 'NoneType' object has no attribute 'dag_id'
await job_runner2.trigger_runner.create_triggers()

assert path.read_text() == "hi\n"


def test_trigger_create_race_condition_18392(session, tmp_path):
"""
This verifies the resolution of race condition documented in github issue #18392.
Expand Down

0 comments on commit da3a77a

Please sign in to comment.