From a825c95afab95b23f20aee2306eca81942a8a405 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 19 Nov 2024 13:49:51 -0800 Subject: [PATCH] Don't create new session in stuck queue reschedule handler (#44192) This is a fix up / followup to #43520. It does not really make a material difference; I'm just avoiding use of the session decorator, and the create / dispose session logic, when it is not needed. I also commit as I go along since there's no reason to handle multiple distinct tis in the same transaction. --- airflow/jobs/scheduler_job_runner.py | 8 ++++---- tests/jobs/test_scheduler_job.py | 9 +++------ 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index dda49e4f84386..96a09eb99bb28 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1800,6 +1800,7 @@ def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: ti=ti, session=session, ) + session.commit() except NotImplementedError: # this block only gets entered if the executor has not implemented `revoke_task`. # in which case, we try the fallback logic @@ -1838,7 +1839,7 @@ def _maybe_requeue_stuck_ti(self, *, ti, session): ), ) ) - self._reschedule_stuck_task(ti) + self._reschedule_stuck_task(ti, session=session) else: self.log.info( "Task requeue attempts exceeded max; marking failed. 
task_instance=%s", @@ -1875,8 +1876,7 @@ def _stuck_in_queued_backcompat_logic(self, executor, stuck_tis): ti_repr, ) - @provide_session - def _reschedule_stuck_task(self, ti, session=NEW_SESSION): + def _reschedule_stuck_task(self, ti: TaskInstance, session: Session): session.execute( update(TI) .where(TI.filter_for_tis([ti])) @@ -1890,7 +1890,7 @@ def _reschedule_stuck_task(self, ti, session=NEW_SESSION): @provide_session def _get_num_times_stuck_in_queued(self, ti: TaskInstance, session: Session = NEW_SESSION) -> int: """ - Check the Log table to see how many times a taskinstance has been stuck in queued. + Check the Log table to see how many times a task instance has been stuck in queued. We can then use this information to determine whether to reschedule a task or fail it. """ diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 12423157ccb55..fbf1d4228b56f 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -2276,8 +2276,7 @@ def _queue_tasks(tis): scheduler._task_queued_timeout = -300 # always in violation of timeout with _loader_mock(mock_executors): - scheduler._handle_tasks_stuck_in_queued(session=session) - + scheduler._handle_tasks_stuck_in_queued() # If the task gets stuck in queued once, we reset it to scheduled tis = dr.get_task_instances(session=session) assert [x.state for x in tis] == ["scheduled", "scheduled"] @@ -2291,8 +2290,7 @@ def _queue_tasks(tis): ] with _loader_mock(mock_executors): - scheduler._handle_tasks_stuck_in_queued(session=session) - session.commit() + scheduler._handle_tasks_stuck_in_queued() log_events = [x.event for x in session.scalars(select(Log).where(Log.run_id == run_id)).all()] assert log_events == [ @@ -2307,8 +2305,7 @@ def _queue_tasks(tis): _queue_tasks(tis=tis) with _loader_mock(mock_executors): - scheduler._handle_tasks_stuck_in_queued(session=session) - session.commit() + scheduler._handle_tasks_stuck_in_queued() log_events = [x.event for x in 
session.scalars(select(Log).where(Log.run_id == run_id)).all()] assert log_events == [ "stuck in queued reschedule",