Skip to content

Commit

Permalink
Deprecated configuration stalled_task_timeout, task_adoption_timeout …
Browse files Browse the repository at this point in the history
…and worker_pods_pending_timeout removed (apache#42060)

* Deprecated configuration stalled_task_timeout, task_adoption_timeout and worker_pods_pending_timeout removed

* news fragment added
  • Loading branch information
dirrao authored Sep 9, 2024
1 parent 22bb433 commit 99de194
Show file tree
Hide file tree
Showing 4 changed files with 2 additions and 55 deletions.
11 changes: 0 additions & 11 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,17 +382,6 @@ def sensitive_config_values(self) -> Set[tuple[str, str]]: # noqa: UP006
("fab", "auth_rate_limit"): ("webserver", "auth_rate_limit", "2.9.0"),
}

# A mapping of new configurations to a list of old configurations for when one configuration
# deprecates more than one other deprecation. The deprecation logic for these configurations
# is defined in SchedulerJobRunner.
many_to_one_deprecated_options: dict[tuple[str, str], list[tuple[str, str, str]]] = {
("scheduler", "task_queued_timeout"): [
("celery", "stalled_task_timeout", "2.6.0"),
("celery", "task_adoption_timeout", "2.6.0"),
("kubernetes_executor", "worker_pods_pending_timeout", "2.6.0"),
]
}

# A mapping of new section -> (old section, since_version).
deprecated_sections: dict[str, tuple[str, str]] = {"kubernetes_executor": ("kubernetes", "2.5.0")}

Expand Down
41 changes: 1 addition & 40 deletions airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import signal
import sys
import time
import warnings
from collections import Counter, defaultdict, deque
from dataclasses import dataclass
from datetime import timedelta
Expand Down Expand Up @@ -178,45 +177,7 @@ def __init__(
self._zombie_threshold_secs = conf.getint("scheduler", "scheduler_zombie_task_threshold")
self._standalone_dag_processor = conf.getboolean("scheduler", "standalone_dag_processor")
self._dag_stale_not_seen_duration = conf.getint("scheduler", "dag_stale_not_seen_duration")

# Since the functionality for stalled_task_timeout, task_adoption_timeout, and
# worker_pods_pending_timeout are now handled by a single config (task_queued_timeout),
# we can't deprecate them as we normally would. So, we'll read each config and take
# the max value in order to ensure we're not undercutting a legitimate
# use of any of these configs.
stalled_task_timeout = conf.getfloat("celery", "stalled_task_timeout", fallback=0)
if stalled_task_timeout:
# TODO: Remove in Airflow 3.0
warnings.warn(
"The '[celery] stalled_task_timeout' config option is deprecated. "
"Please update your config to use '[scheduler] task_queued_timeout' instead.",
DeprecationWarning,
stacklevel=2,
)
task_adoption_timeout = conf.getfloat("celery", "task_adoption_timeout", fallback=0)
if task_adoption_timeout:
# TODO: Remove in Airflow 3.0
warnings.warn(
"The '[celery] task_adoption_timeout' config option is deprecated. "
"Please update your config to use '[scheduler] task_queued_timeout' instead.",
DeprecationWarning,
stacklevel=2,
)
worker_pods_pending_timeout = conf.getfloat(
"kubernetes_executor", "worker_pods_pending_timeout", fallback=0
)
if worker_pods_pending_timeout:
# TODO: Remove in Airflow 3.0
warnings.warn(
"The '[kubernetes_executor] worker_pods_pending_timeout' config option is deprecated. "
"Please update your config to use '[scheduler] task_queued_timeout' instead.",
DeprecationWarning,
stacklevel=2,
)
task_queued_timeout = conf.getfloat("scheduler", "task_queued_timeout")
self._task_queued_timeout = max(
stalled_task_timeout, task_adoption_timeout, worker_pods_pending_timeout, task_queued_timeout
)
self._task_queued_timeout = conf.getfloat("scheduler", "task_queued_timeout")

self.do_pickle = do_pickle

Expand Down
4 changes: 0 additions & 4 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,10 +438,6 @@ def get_configs_and_deprecations(
) in AirflowConfigParser.deprecated_options.items():
deprecated_options[deprecated_section][deprecated_key] = section, key, since_version

for (section, key), deprecated in AirflowConfigParser.many_to_one_deprecated_options.items():
for deprecated_section, deprecated_key, since_version in deprecated:
deprecated_options[deprecated_section][deprecated_key] = section, key, since_version

if package_name == "apache-airflow":
configs = retrieve_configuration_description(include_providers=False)
else:
Expand Down
1 change: 1 addition & 0 deletions newsfragments/42060.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Removed deprecated configuration ``stalled_task_timeout`` from ``celery``, ``task_adoption_timeout`` from ``celery`` and ``worker_pods_pending_timeout`` from ``worker_pods_pending_timeout``. Please use ``task_queued_timeout`` from ``scheduler`` instead.

0 comments on commit 99de194

Please sign in to comment.