diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index dda49e4f84386..96a09eb99bb28 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1800,6 +1800,7 @@ def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: ti=ti, session=session, ) + session.commit() except NotImplementedError: # this block only gets entered if the executor has not implemented `revoke_task`. # in which case, we try the fallback logic @@ -1838,7 +1839,7 @@ def _maybe_requeue_stuck_ti(self, *, ti, session): ), ) ) - self._reschedule_stuck_task(ti) + self._reschedule_stuck_task(ti, session=session) else: self.log.info( "Task requeue attempts exceeded max; marking failed. task_instance=%s", @@ -1875,8 +1876,7 @@ def _stuck_in_queued_backcompat_logic(self, executor, stuck_tis): ti_repr, ) - @provide_session - def _reschedule_stuck_task(self, ti, session=NEW_SESSION): + def _reschedule_stuck_task(self, ti: TaskInstance, session: Session): session.execute( update(TI) .where(TI.filter_for_tis([ti])) @@ -1890,7 +1890,7 @@ def _reschedule_stuck_task(self, ti, session=NEW_SESSION): @provide_session def _get_num_times_stuck_in_queued(self, ti: TaskInstance, session: Session = NEW_SESSION) -> int: """ - Check the Log table to see how many times a taskinstance has been stuck in queued. + Check the Log table to see how many times a task instance has been stuck in queued. We can then use this information to determine whether to reschedule a task or fail it. """ diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 12423157ccb55..fbf1d4228b56f 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -2276,8 +2276,7 @@ def _queue_tasks(tis): scheduler._task_queued_timeout = -300 # always in violation of timeout with _loader_mock(mock_executors): - scheduler._handle_tasks_stuck_in_queued(session=session) - + scheduler._handle_tasks_stuck_in_queued() # If the task gets stuck in queued once, we reset it to scheduled tis = dr.get_task_instances(session=session) assert [x.state for x in tis] == ["scheduled", "scheduled"] @@ -2291,8 +2290,7 @@ def _queue_tasks(tis): ] with _loader_mock(mock_executors): - scheduler._handle_tasks_stuck_in_queued(session=session) - session.commit() + scheduler._handle_tasks_stuck_in_queued() log_events = [x.event for x in session.scalars(select(Log).where(Log.run_id == run_id)).all()] assert log_events == [ @@ -2307,8 +2305,7 @@ def _queue_tasks(tis): _queue_tasks(tis=tis) with _loader_mock(mock_executors): - scheduler._handle_tasks_stuck_in_queued(session=session) - session.commit() + scheduler._handle_tasks_stuck_in_queued() log_events = [x.event for x in session.scalars(select(Log).where(Log.run_id == run_id)).all()] assert log_events == [ "stuck in queued reschedule",