From 99de194f4abfed67e5568879df91971bacc3fc17 Mon Sep 17 00:00:00 2001 From: Gopal Dirisala <39794726+dirrao@users.noreply.github.com> Date: Mon, 9 Sep 2024 19:42:30 +0530 Subject: [PATCH] Deprecated configuration stalled_task_timeout, task_adoption_timeout and worker_pods_pending_timeout removed (#42060) * Deprecated configuration stalled_task_timeout, task_adoption_timeout and worker_pods_pending_timeout removed * news fragment added --- airflow/configuration.py | 11 -------- airflow/jobs/scheduler_job_runner.py | 41 +--------------------------- docs/conf.py | 4 --- newsfragments/42060.significant.rst | 1 + 4 files changed, 2 insertions(+), 55 deletions(-) create mode 100644 newsfragments/42060.significant.rst diff --git a/airflow/configuration.py b/airflow/configuration.py index b014fe46e86de..f4bea46c5c0ba 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -382,17 +382,6 @@ def sensitive_config_values(self) -> Set[tuple[str, str]]: # noqa: UP006 ("fab", "auth_rate_limit"): ("webserver", "auth_rate_limit", "2.9.0"), } - # A mapping of new configurations to a list of old configurations for when one configuration - # deprecates more than one other deprecation. The deprecation logic for these configurations - # is defined in SchedulerJobRunner. - many_to_one_deprecated_options: dict[tuple[str, str], list[tuple[str, str, str]]] = { - ("scheduler", "task_queued_timeout"): [ - ("celery", "stalled_task_timeout", "2.6.0"), - ("celery", "task_adoption_timeout", "2.6.0"), - ("kubernetes_executor", "worker_pods_pending_timeout", "2.6.0"), - ] - } - # A mapping of new section -> (old section, since_version). deprecated_sections: dict[str, tuple[str, str]] = {"kubernetes_executor": ("kubernetes", "2.5.0")} diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 9d58ebdca0bd3..ca6baaf2c0ac2 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -23,7 +23,6 @@ import signal import sys import time -import warnings from collections import Counter, defaultdict, deque from dataclasses import dataclass from datetime import timedelta @@ -178,45 +177,7 @@ def __init__( self._zombie_threshold_secs = conf.getint("scheduler", "scheduler_zombie_task_threshold") self._standalone_dag_processor = conf.getboolean("scheduler", "standalone_dag_processor") self._dag_stale_not_seen_duration = conf.getint("scheduler", "dag_stale_not_seen_duration") - - # Since the functionality for stalled_task_timeout, task_adoption_timeout, and - # worker_pods_pending_timeout are now handled by a single config (task_queued_timeout), - # we can't deprecate them as we normally would. So, we'll read each config and take - # the max value in order to ensure we're not undercutting a legitimate - # use of any of these configs. - stalled_task_timeout = conf.getfloat("celery", "stalled_task_timeout", fallback=0) - if stalled_task_timeout: - # TODO: Remove in Airflow 3.0 - warnings.warn( - "The '[celery] stalled_task_timeout' config option is deprecated. " - "Please update your config to use '[scheduler] task_queued_timeout' instead.", - DeprecationWarning, - stacklevel=2, - ) - task_adoption_timeout = conf.getfloat("celery", "task_adoption_timeout", fallback=0) - if task_adoption_timeout: - # TODO: Remove in Airflow 3.0 - warnings.warn( - "The '[celery] task_adoption_timeout' config option is deprecated. " - "Please update your config to use '[scheduler] task_queued_timeout' instead.", - DeprecationWarning, - stacklevel=2, - ) - worker_pods_pending_timeout = conf.getfloat( - "kubernetes_executor", "worker_pods_pending_timeout", fallback=0 - ) - if worker_pods_pending_timeout: - # TODO: Remove in Airflow 3.0 - warnings.warn( - "The '[kubernetes_executor] worker_pods_pending_timeout' config option is deprecated. " - "Please update your config to use '[scheduler] task_queued_timeout' instead.", - DeprecationWarning, - stacklevel=2, - ) - task_queued_timeout = conf.getfloat("scheduler", "task_queued_timeout") - self._task_queued_timeout = max( - stalled_task_timeout, task_adoption_timeout, worker_pods_pending_timeout, task_queued_timeout - ) + self._task_queued_timeout = conf.getfloat("scheduler", "task_queued_timeout") self.do_pickle = do_pickle diff --git a/docs/conf.py b/docs/conf.py index cbcdaa6970150..c87871e7ede6d 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -438,10 +438,6 @@ def get_configs_and_deprecations( ) in AirflowConfigParser.deprecated_options.items(): deprecated_options[deprecated_section][deprecated_key] = section, key, since_version - for (section, key), deprecated in AirflowConfigParser.many_to_one_deprecated_options.items(): - for deprecated_section, deprecated_key, since_version in deprecated: - deprecated_options[deprecated_section][deprecated_key] = section, key, since_version - if package_name == "apache-airflow": configs = retrieve_configuration_description(include_providers=False) else: diff --git a/newsfragments/42060.significant.rst b/newsfragments/42060.significant.rst new file mode 100644 index 0000000000000..d41937314644a --- /dev/null +++ b/newsfragments/42060.significant.rst @@ -0,0 +1 @@ +Removed deprecated configuration ``stalled_task_timeout`` from ``celery``, ``task_adoption_timeout`` from ``celery`` and ``worker_pods_pending_timeout`` from ``worker_pods_pending_timeout``. Please use ``task_queued_timeout`` from ``scheduler`` instead.