diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 6ff78617c5f1e..2c3ccdad00b4a 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -26,6 +26,7 @@ import operator import os import signal +import traceback from collections import defaultdict from collections.abc import Collection, Generator, Iterable, Mapping from datetime import timedelta @@ -2810,6 +2811,7 @@ def signal_handler(signum, frame): os._exit(1) return self.log.error("Received SIGTERM. Terminating subprocesses.") + self.log.error("Stacktrace: \n%s", "".join(traceback.format_stack())) self.task.on_kill() raise AirflowTaskTerminated("Task received SIGTERM signal") diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index b1b8370f1881f..244fa2ed5bf3a 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -516,6 +516,27 @@ def task_function(ti): ti.run() assert "on_failure_callback called" in caplog.text + def test_task_sigterm_calls_with_traceback_in_logs(self, dag_maker, caplog): + """ + Test that ensures that tasks print traceback to the logs when they receive sigterm + """ + + def task_function(ti): + os.kill(ti.pid, signal.SIGTERM) + + with dag_maker(): + task_ = PythonOperator( + task_id="test_on_failure", + python_callable=task_function, + ) + + dr = dag_maker.create_dagrun() + ti = dr.task_instances[0] + ti.task = task_ + with pytest.raises(AirflowTaskTerminated): + ti.run() + assert "Stacktrace: " in caplog.text + def test_task_sigterm_works_with_retries(self, dag_maker): """ Test that ensures that tasks are retried when they receive sigterm