Enhance docs for zombie tasks (apache#35825)
closes: apache#35698
pankajkoti authored Nov 25, 2023
1 parent 7c2885d commit 177da90
Showing 2 changed files with 78 additions and 2 deletions.
3 changes: 3 additions & 0 deletions airflow/jobs/scheduler_job_runner.py
@@ -1711,6 +1711,7 @@ def check_trigger_timeouts(self, session: Session = NEW_SESSION) -> None:
if num_timed_out_tasks:
self.log.info("Timed out %i deferred tasks without fired triggers", num_timed_out_tasks)

# [START find_zombies]
def _find_zombies(self) -> None:
"""
Find zombie task instances and create a TaskCallbackRequest to be handled by the DAG processor.
@@ -1763,6 +1764,8 @@ def _find_zombies(self) -> None:
self.job.executor.send_callback(request)
Stats.incr("zombies_killed", tags={"dag_id": ti.dag_id, "task_id": ti.task_id})

# [END find_zombies]

@staticmethod
def _generate_zombie_message_details(ti: TI) -> dict[str, Any]:
zombie_message_details = {
77 changes: 75 additions & 2 deletions docs/apache-airflow/core-concepts/tasks.rst
@@ -243,9 +243,82 @@ Zombie/Undead Tasks

No system runs perfectly, and task instances are expected to die once in a while. Airflow detects two kinds of task/process mismatch:

* *Zombie tasks* are tasks that are supposed to be running but suddenly died (e.g. their process was killed, or the machine died). Airflow will find these periodically, clean them up, and either fail or retry the task depending on its settings.
* *Zombie tasks* are ``TaskInstances`` stuck in a ``running`` state despite their associated jobs being inactive
(e.g. their process didn't send a recent heartbeat as it got killed, or the machine died). Airflow will find these
periodically, clean them up, and either fail or retry the task depending on its settings.

* *Undead tasks* are tasks that are *not* supposed to be running but are, often caused when you manually edit Task
Instances via the UI. Airflow will find them periodically and terminate them.


Below is the code snippet from the Airflow scheduler that runs periodically to detect zombie tasks.

.. exampleinclude:: /../../airflow/jobs/scheduler_job_runner.py
:language: python
:start-after: [START find_zombies]
:end-before: [END find_zombies]


The criteria used in the above snippet to detect zombie tasks are explained below; a condensed sketch of how they combine follows the list:

1. **Task Instance State**

Only task instances in the RUNNING state are considered potential zombies.

2. **Job State and Heartbeat Check**

Zombie tasks are identified if the associated job is not in the RUNNING state or if the latest heartbeat of the job is
earlier than the calculated time threshold (``limit_dttm``). The heartbeat is a mechanism to indicate that a task or job is
still alive and running.

3. **Job Type**

The job associated with the task must be of type ``LocalTaskJob``.

4. **Queued by Job ID**

Only tasks queued by the same job that is currently being processed are considered.

These conditions collectively help identify running tasks that may be zombies based on their state, associated job
state, heartbeat status, job type, and the specific job that queued them. If a task meets these criteria, it is
considered a potential zombie, and further actions, such as logging and sending a callback request, are taken.
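
For illustration, the four conditions above can be sketched as a single SQLAlchemy query. This is a
condensed, approximate sketch rather than the exact query the scheduler builds; ``zombie_threshold_secs``
and ``current_scheduler_job_id`` are placeholders standing in for the scheduler's configured threshold
and its own job id:

.. code-block:: python

    from datetime import timedelta

    from sqlalchemy import or_, select

    from airflow.jobs.job import Job
    from airflow.models.taskinstance import TaskInstance
    from airflow.utils import timezone
    from airflow.utils.state import JobState, TaskInstanceState

    # Placeholder values: the scheduler derives these from its config and its own job row.
    zombie_threshold_secs = 300  # scheduler_zombie_task_threshold
    current_scheduler_job_id = 1  # stands in for self.job.id

    # Heartbeats older than this moment are considered stale.
    limit_dttm = timezone.utcnow() - timedelta(seconds=zombie_threshold_secs)

    zombie_candidates = (
        select(TaskInstance)
        .join(Job, TaskInstance.job_id == Job.id)
        # 1. Only RUNNING task instances are candidates.
        .where(TaskInstance.state == TaskInstanceState.RUNNING)
        # 2. The associated job is no longer running, or its heartbeat is stale.
        .where(or_(Job.state != JobState.RUNNING, Job.latest_heartbeat < limit_dttm))
        # 3. The job must be a LocalTaskJob.
        .where(Job.job_type == "LocalTaskJob")
        # 4. Only tasks queued by the scheduler job running this check.
        .where(TaskInstance.queued_by_job_id == current_scheduler_job_id)
    )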

Reproducing zombie tasks locally
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If you'd like to reproduce zombie tasks for development/testing processes, follow the steps below:

1. Set the below environment variables for your local Airflow setup (alternatively, you could tweak the corresponding config values in ``airflow.cfg``):

   .. code-block:: bash

      export AIRFLOW__SCHEDULER__LOCAL_TASK_JOB_HEARTBEAT_SEC=600
      export AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD=2
      export AIRFLOW__SCHEDULER__ZOMBIE_DETECTION_INTERVAL=5

2. Have a DAG with a task that takes about 10 minutes to complete (i.e. a long-running task). For example, you could use the below DAG:

   .. code-block:: python

      from airflow.decorators import dag
      from airflow.operators.bash import BashOperator
      from datetime import datetime


      @dag(start_date=datetime(2021, 1, 1), schedule="@once", catchup=False)
      def sleep_dag():
          t1 = BashOperator(
              task_id="sleep_10_minutes",
              bash_command="sleep 600",
          )


      sleep_dag()

3. Run the above DAG and wait for a while. You should see the task instance becoming a zombie task and then being killed by the scheduler.
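
To get a feel for why these settings surface a zombie almost immediately, the arithmetic below walks
through the timing. This is purely illustrative Python using the values set in step 1, not Airflow code:

.. code-block:: python

    # Illustrative timing arithmetic for the settings from step 1 (not Airflow code).
    local_task_job_heartbeat_sec = 600   # the task's job heartbeats only every 10 minutes
    scheduler_zombie_task_threshold = 2  # a heartbeat older than 2 seconds counts as stale
    zombie_detection_interval = 5        # the scheduler sweeps for zombies every 5 seconds

    # Soon after the task starts, its job's last heartbeat is older than the 2-second
    # threshold (the next heartbeat would only arrive after 600 seconds), so one of the
    # scheduler's 5-second sweeps flags the still-running task as a zombie.
    worst_case_seconds = scheduler_zombie_task_threshold + zombie_detection_interval
    print(f"The task should be flagged as a zombie within roughly {worst_case_seconds} seconds")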

* *Undead tasks* are tasks that are *not* supposed to be running but are, often caused when you manually edit Task Instances via the UI. Airflow will find them periodically and terminate them.


Executor Configuration
