diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 1264af56a534c..b9d18098f6fec 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1711,6 +1711,7 @@ def check_trigger_timeouts(self, session: Session = NEW_SESSION) -> None:
         if num_timed_out_tasks:
             self.log.info("Timed out %i deferred tasks without fired triggers", num_timed_out_tasks)
 
+    # [START find_zombies]
     def _find_zombies(self) -> None:
         """
         Find zombie task instances and create a TaskCallbackRequest to be handled by the DAG processor.
@@ -1763,6 +1764,8 @@ def _find_zombies(self) -> None:
             self.job.executor.send_callback(request)
             Stats.incr("zombies_killed", tags={"dag_id": ti.dag_id, "task_id": ti.task_id})
 
+    # [END find_zombies]
+
     @staticmethod
     def _generate_zombie_message_details(ti: TI) -> dict[str, Any]:
         zombie_message_details = {
diff --git a/docs/apache-airflow/core-concepts/tasks.rst b/docs/apache-airflow/core-concepts/tasks.rst
index 5e305e2fe83e5..d72c14a27eb09 100644
--- a/docs/apache-airflow/core-concepts/tasks.rst
+++ b/docs/apache-airflow/core-concepts/tasks.rst
@@ -243,9 +243,82 @@ Zombie/Undead Tasks
 
 No system runs perfectly, and task instances are expected to die once in a while. Airflow detects two kinds of task/process mismatch:
 
-* *Zombie tasks* are tasks that are supposed to be running but suddenly died (e.g. their process was killed, or the machine died). Airflow will find these periodically, clean them up, and either fail or retry the task depending on its settings.
+* *Zombie tasks* are ``TaskInstances`` stuck in a ``running`` state despite their associated jobs being inactive
+  (e.g. their process stopped sending heartbeats because it was killed, or the machine died). Airflow will find these
+  periodically, clean them up, and either fail or retry the task depending on its settings.
+
+* *Undead tasks* are tasks that are *not* supposed to be running but are, often caused when you manually edit Task
+  Instances via the UI. Airflow will find them periodically and terminate them.
+
+
+Below is the code snippet from the Airflow scheduler that runs periodically to detect zombie tasks.
+
+.. exampleinclude:: /../../airflow/jobs/scheduler_job_runner.py
+    :language: python
+    :start-after: [START find_zombies]
+    :end-before: [END find_zombies]
+
+
+The criteria used in the above snippet to detect zombie tasks are as follows:
+
+1. **Task Instance State**
+
+   Only task instances in the RUNNING state are considered potential zombies.
+
+2. **Job State and Heartbeat Check**
+
+   A task is identified as a zombie if its associated job is not in the RUNNING state, or if the job's latest
+   heartbeat is earlier than the calculated time threshold (``limit_dttm``). The heartbeat is the mechanism by which
+   a task or job signals that it is still alive and running.
+
+3. **Job Type**
+
+   The job associated with the task must be of type ``LocalTaskJob``.
+
+4. **Queued by Job ID**
+
+   Only task instances queued by the scheduler job that is currently running are considered.
+
+These conditions collectively identify running tasks that may be zombies based on their state, associated job state,
+heartbeat status, job type, and the specific job that queued them. If a task meets all of these criteria, it is
+considered a potential zombie, and further actions, such as logging and sending a callback request, are taken.
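+
+For orientation, here is a simplified, illustrative sketch of how these four criteria could be combined into a single
+query. This is *not* the scheduler's exact implementation (see the snippet above for that); the import paths assume
+Airflow 2.7+, and ``zombie_threshold_secs`` and ``scheduler_job_id`` are placeholder variables:
+
+.. code-block:: python
+
+    from datetime import timedelta
+
+    from sqlalchemy import or_, select
+
+    from airflow.jobs.job import Job
+    from airflow.models.taskinstance import TaskInstance as TI
+    from airflow.utils import timezone
+    from airflow.utils.state import State, TaskInstanceState
+
+    # Placeholders: in the scheduler these come from config and the running scheduler job.
+    zombie_threshold_secs = 300
+    scheduler_job_id = 1
+
+    # Jobs whose latest heartbeat predates this cutoff are considered dead.
+    limit_dttm = timezone.utcnow() - timedelta(seconds=zombie_threshold_secs)
+
+    zombie_query = (
+        select(TI)
+        .join(Job, TI.job_id == Job.id)
+        # 1. Only task instances in the RUNNING state are candidates.
+        .where(TI.state == TaskInstanceState.RUNNING)
+        # 2. The job is no longer running, or its heartbeat is stale.
+        .where(or_(Job.state != State.RUNNING, Job.latest_heartbeat < limit_dttm))
+        # 3. Only jobs of type LocalTaskJob are considered.
+        .where(Job.job_type == "LocalTaskJob")
+        # 4. Only tasks queued by the currently running scheduler job.
+        .where(TI.queued_by_job_id == scheduler_job_id)
+    )
+
+Filtering on ``queued_by_job_id`` has a useful consequence: when several schedulers run in a high-availability setup,
+each scheduler only inspects the tasks that it queued itself.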
+
+Reproducing zombie tasks locally
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+If you'd like to reproduce zombie tasks for development/testing purposes, follow the steps below:
+
+1. Set the below environment variables for your local Airflow setup (alternatively, you can tweak the corresponding
+   config values in ``airflow.cfg``):
+
+.. code-block:: bash
+
+    export AIRFLOW__SCHEDULER__LOCAL_TASK_JOB_HEARTBEAT_SEC=600
+    export AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD=2
+    export AIRFLOW__SCHEDULER__ZOMBIE_DETECTION_INTERVAL=5
+
+With these values, the local task job heartbeats only every 600 seconds, while the scheduler checks every 5 seconds
+for running tasks whose latest heartbeat is older than 2 seconds, so a long-running task is quickly flagged as a zombie.
+
+2. Have a DAG with a task that takes about 10 minutes to complete (i.e. a long-running task). For example, you could
+   use the below DAG:
+
+.. code-block:: python
+
+    from datetime import datetime
+
+    from airflow.decorators import dag
+    from airflow.operators.bash import BashOperator
+
+
+    @dag(start_date=datetime(2021, 1, 1), schedule="@once", catchup=False)
+    def sleep_dag():
+        t1 = BashOperator(
+            task_id="sleep_10_minutes",
+            bash_command="sleep 600",
+        )
+
+
+    sleep_dag()
+
+
+3. Run the above DAG and wait for a while. You should see the task instance become a zombie and then be killed by the
+   scheduler.
 
-* *Undead tasks* are tasks that are *not* supposed to be running but are, often caused when you manually edit Task Instances via the UI. Airflow will find them periodically and terminate them.
 
 
 Executor Configuration