Remove fallback for old, pre 2.7, providers (apache#44755)
In ~2.7, we moved provider config from core into the providers themselves.
However, if providers released before that change were used on 2.7+,
there could be failures because the config wouldn't be in those providers!
So, we added a fallback for all of the configs that were moved.

As we move toward Airflow 3, we don't need to carry this baggage
forever. This does mean providers released before mid-2023 won't
work with Airflow 3 (and I'd imagine there will be other reasons they
won't work as well).
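
The replacement pattern shows up throughout the diffs below: call sites that previously leaned on the removed pre-2.7 defaults now pass an explicit `fallback=` to the config accessors. A minimal sketch of that pattern, using the two values touched in this commit (variable names here are illustrative):

```python
from airflow.configuration import conf

# Instead of relying on the removed pre-2.7 built-in defaults, callers now
# supply the old default explicitly. Both values below come from the
# settings.py and db.py diffs in this commit.
worker_precheck = conf.getboolean("celery", "worker_precheck", fallback=False)

es_log_id_fallback = "{dag_id}-{task_id}-{execution_date}-{try_number}"
elasticsearch_id = conf.get("elasticsearch", "log_id_template", fallback=es_log_id_fallback)
```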
jedcunningham authored Dec 12, 2024
1 parent 67cff15 commit 02821cc
Showing 13 changed files with 78 additions and 203 deletions.
1 change: 1 addition & 0 deletions airflow/cli/commands/remote_commands/task_command.py
@@ -405,6 +405,7 @@ class TaskCommandMarker:


@cli_utils.action_cli(check_db=False)
@providers_configuration_loaded
def task_run(args, dag: DAG | None = None) -> TaskReturnCode | None:
"""
Run a single task instance.
105 changes: 0 additions & 105 deletions airflow/config_templates/pre_2_7_defaults.cfg

This file was deleted.

26 changes: 2 additions & 24 deletions airflow/configuration.py
@@ -209,7 +209,6 @@ def __init__(
# interpolation placeholders. The _default_values config parser will interpolate them
# properly when we call get() on it.
self._default_values = create_default_config_parser(self.configuration_description)
self._pre_2_7_default_values = create_pre_2_7_defaults()
if default_config is not None:
self._update_defaults_from_string(default_config)
self._update_logging_deprecated_template_to_one_from_defaults()
@@ -292,10 +291,6 @@ def get_default_value(self, section: str, key: str, fallback: Any = None, raw=Fa
return value.replace("%", "%%")
return value

def get_default_pre_2_7_value(self, section: str, key: str, **kwargs) -> Any:
"""Get pre 2.7 default config values."""
return self._pre_2_7_default_values.get(section, key, fallback=None, **kwargs)

# These configuration elements can be fetched as the stdout of commands
# following the "{section}__{name}_cmd" pattern, the idea behind this
# is to not store password on boxes in text files.
@@ -454,6 +449,8 @@ def make_sure_configuration_loaded(self, with_providers: bool) -> Generator[None
elif not with_providers and self._providers_configuration_loaded:
reload_providers_when_leaving = True
self.restore_core_default_configuration()
elif self._providers_configuration_loaded:
reload_providers_when_leaving = True
yield
if reload_providers_when_leaving:
self.load_providers_configuration()
@@ -989,10 +986,6 @@ def get(  # type: ignore[override,misc]
if self.get_default_value(section, key) is not None or "fallback" in kwargs:
return expand_env_var(self.get_default_value(section, key, **kwargs))

if self.get_default_pre_2_7_value(section, key) is not None:
# no expansion needed
return self.get_default_pre_2_7_value(section, key, **kwargs)

if not suppress_warnings:
log.warning("section/key [%s/%s] not found in config", section, key)

@@ -1382,7 +1375,6 @@ def as_dict(

# We check sequentially all those sources and the last one we saw it in will "win"
configs: Iterable[tuple[str, ConfigParser]] = [
("default-pre-2-7", self._pre_2_7_default_values),
("default", self._default_values),
("airflow.cfg", self),
]
@@ -1901,20 +1893,6 @@ def create_default_config_parser(configuration_description: dict[str, dict[str,
return parser


def create_pre_2_7_defaults() -> ConfigParser:
"""
Create parser using the old defaults from Airflow < 2.7.0.
This is used in order to be able to fall-back to those defaults when old version of provider,
not supporting "config contribution" is installed with Airflow 2.7.0+. This "default"
configuration does not support variable expansion, those are pretty much hard-coded defaults '
we want to fall-back to in such case.
"""
config_parser = ConfigParser()
config_parser.read(_default_config_file_path("pre_2_7_defaults.cfg"))
return config_parser


def write_default_airflow_configuration_if_needed() -> AirflowConfigParser:
airflow_config = pathlib.Path(AIRFLOW_CONFIG)
if airflow_config.is_dir():
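Net effect of the configuration.py changes: the default lookup chain in `AirflowConfigParser.get()` is one step shorter. A simplified sketch of the tail of that lookup after this change, assuming the earlier steps (env vars, `_cmd` options, secrets backends) have already missed; the function name below is made up for illustration, while `expand_env_var` and `log` are module-level helpers in airflow/configuration.py:

```python
# Illustrative sketch, not the actual method body.
def _resolve_from_defaults(self, section, key, suppress_warnings=False, **kwargs):
    # Provider-contributed and core defaults, plus any caller-supplied fallback.
    if self.get_default_value(section, key) is not None or "fallback" in kwargs:
        return expand_env_var(self.get_default_value(section, key, **kwargs))
    # The pre-2.7 hard-coded defaults that used to be consulted here are gone.
    if not suppress_warnings:
        log.warning("section/key [%s/%s] not found in config", section, key)
    # The miss now surfaces as an exception, as the updated
    # tests/core/test_configuration.py below asserts.
    raise AirflowConfigException(f"section/key [{section}/{key}] not found in config")
```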
2 changes: 1 addition & 1 deletion airflow/settings.py
@@ -535,7 +535,7 @@ def validate_session():
"""Validate ORM Session."""
global engine

worker_precheck = conf.getboolean("celery", "worker_precheck")
worker_precheck = conf.getboolean("celery", "worker_precheck", fallback=False)
if not worker_precheck:
return True
else:
10 changes: 8 additions & 2 deletions airflow/utils/db.py
@@ -950,8 +950,12 @@ def synchronize_log_template(*, session: Session = NEW_SESSION) -> None:
log.info("Log template table does not exist (added in 2.3.0); skipping log template sync.")
return

es_log_it_template_fallback = "{dag_id}-{task_id}-{execution_date}-{try_number}"

filename = conf.get("logging", "log_filename_template")
elasticsearch_id = conf.get("elasticsearch", "log_id_template")
elasticsearch_id = conf.get("elasticsearch", "log_id_template", fallback=es_log_it_template_fallback)

# TODO: The elasticsearch specific stuff here is probably inappropriate - provider is bleeding into core

stored = session.execute(
select(
@@ -965,7 +969,9 @@ def synchronize_log_template(*, session: Session = NEW_SESSION) -> None:
# If we have an empty table, and the default values exist, we will seed the
# table with values from pre 2.3.0, so old logs will still be retrievable.
if not stored:
is_default_log_id = elasticsearch_id == conf.get_default_value("elasticsearch", "log_id_template")
is_default_log_id = elasticsearch_id == conf.get_default_value(
"elasticsearch", "log_id_template", fallback=es_log_it_template_fallback
)
is_default_filename = filename == conf.get_default_value("logging", "log_filename_template")
if is_default_log_id and is_default_filename:
session.add(
8 changes: 0 additions & 8 deletions airflow/utils/log/file_task_handler.py
@@ -404,14 +404,6 @@ def _read(
out_message = logs if "log_pos" in (metadata or {}) else messages + logs
return out_message, {"end_of_log": end_of_log, "log_pos": log_pos}

@staticmethod
def _get_pod_namespace(ti: TaskInstance):
pod_override = ti.executor_config.get("pod_override")
namespace = None
with suppress(Exception):
namespace = pod_override.metadata.namespace
return namespace or conf.get("kubernetes_executor", "namespace")

def _get_log_retrieval_url(
self, ti: TaskInstance, log_relative_path: str, log_type: LogType | None = None
) -> tuple[str, str]:
26 changes: 26 additions & 0 deletions newsfragments/44755.significant.rst
@@ -0,0 +1,26 @@
Provider configuration fallbacks removed from core

Around 2.7, we moved provider config from core into the providers themselves.
However, if providers released before that change were used on 2.7+,
there could be failures because the config wouldn't be in those providers!
So, we added a fallback for all of the configs that were moved. These fallbacks
have now been removed. You must use providers that contain their own config.

TODO: It's unlikely this will be the change that sets the "lowest" min provider version
for compatibility with Airflow 3. But it is possible, and if it is, we will then need
to determine what those versions are.

Note: This change doesn't really fit any of the following "types", but I've marked
dependency change so there is something checked.

.. Check the type of change that applies to this change
* Types of change

* [ ] DAG changes
* [ ] Config changes
* [ ] API changes
* [ ] CLI changes
* [ ] Behaviour changes
* [ ] Plugin changes
* [x] Dependency change
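
For a quick illustration of the user-visible behaviour change described above, here is a sketch assuming provider configuration has not been loaded (see the tests/core/test_configuration.py diff below for the authoritative assertion):

```python
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException

# Previously this returned the pre-2.7 built-in default
# "airflow.executors.celery_executor" even without the celery provider's
# config loaded. After this change it raises instead.
try:
    conf.get("celery", "celery_app_name")
except AirflowConfigException as err:
    assert "not found in config" in str(err)
```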
44 changes: 24 additions & 20 deletions tests/cli/conftest.py
@@ -23,29 +23,33 @@

from airflow.executors import local_executor
from airflow.models.dagbag import DagBag
from airflow.providers.celery.executors import celery_executor, celery_kubernetes_executor
from airflow.providers.cncf.kubernetes.executors import kubernetes_executor, local_kubernetes_executor

from tests_common.test_utils.config import conf_vars

# Create custom executors here because conftest is imported first
custom_executor_module = type(sys)("custom_executor")
custom_executor_module.CustomCeleryExecutor = type( # type: ignore
"CustomCeleryExecutor", (celery_executor.CeleryExecutor,), {}
)
custom_executor_module.CustomCeleryKubernetesExecutor = type( # type: ignore
"CustomCeleryKubernetesExecutor", (celery_kubernetes_executor.CeleryKubernetesExecutor,), {}
)
custom_executor_module.CustomLocalExecutor = type( # type: ignore
"CustomLocalExecutor", (local_executor.LocalExecutor,), {}
)
custom_executor_module.CustomLocalKubernetesExecutor = type( # type: ignore
"CustomLocalKubernetesExecutor", (local_kubernetes_executor.LocalKubernetesExecutor,), {}
)
custom_executor_module.CustomKubernetesExecutor = type( # type: ignore
"CustomKubernetesExecutor", (kubernetes_executor.KubernetesExecutor,), {}
)
sys.modules["custom_executor"] = custom_executor_module

@pytest.fixture
def custom_executors():
from airflow.providers.celery.executors import celery_executor, celery_kubernetes_executor
from airflow.providers.cncf.kubernetes.executors import kubernetes_executor, local_kubernetes_executor

# Create custom executors here because conftest is imported first
custom_executor_module = type(sys)("custom_executor")
custom_executor_module.CustomCeleryExecutor = type( # type: ignore
"CustomCeleryExecutor", (celery_executor.CeleryExecutor,), {}
)
custom_executor_module.CustomCeleryKubernetesExecutor = type( # type: ignore
"CustomCeleryKubernetesExecutor", (celery_kubernetes_executor.CeleryKubernetesExecutor,), {}
)
custom_executor_module.CustomLocalExecutor = type( # type: ignore
"CustomLocalExecutor", (local_executor.LocalExecutor,), {}
)
custom_executor_module.CustomLocalKubernetesExecutor = type( # type: ignore
"CustomLocalKubernetesExecutor", (local_kubernetes_executor.LocalKubernetesExecutor,), {}
)
custom_executor_module.CustomKubernetesExecutor = type( # type: ignore
"CustomKubernetesExecutor", (kubernetes_executor.KubernetesExecutor,), {}
)
sys.modules["custom_executor"] = custom_executor_module


@pytest.fixture(autouse=True)
7 changes: 3 additions & 4 deletions tests/cli/test_cli_parser.py
@@ -42,7 +42,6 @@
from airflow.executors.executor_utils import ExecutorName
from airflow.executors.local_executor import LocalExecutor
from airflow.providers.amazon.aws.executors.ecs.ecs_executor import AwsEcsExecutor
from airflow.providers.celery.executors.celery_executor import CeleryExecutor

from tests_common.test_utils.config import conf_vars

@@ -167,7 +166,7 @@ def test_dynamic_conflict_detection(self, cli_commands_mock: MagicMock):
# force re-evaluation of cli commands (done in top level code)
reload(cli_parser)

@patch.object(CeleryExecutor, "get_cli_commands")
@patch("airflow.providers.celery.executors.celery_executor.CeleryExecutor.get_cli_commands")
@patch.object(AwsEcsExecutor, "get_cli_commands")
def test_hybrid_executor_get_cli_commands(
self, ecs_executor_cli_commands_mock, celery_executor_cli_commands_mock
@@ -201,7 +200,7 @@ def test_hybrid_executor_get_cli_commands(
assert celery_executor_command.name in commands
assert ecs_executor_command.name in commands

@patch.object(CeleryExecutor, "get_cli_commands")
@patch("airflow.providers.celery.executors.celery_executor.CeleryExecutor.get_cli_commands")
@patch.object(AwsEcsExecutor, "get_cli_commands")
def test_hybrid_executor_get_cli_commands_with_error(
self, ecs_executor_cli_commands_mock, celery_executor_cli_commands_mock, caplog
@@ -378,7 +377,7 @@ def test_executor_specific_commands_not_accessible(self, command):
("custom_executor.CustomKubernetesExecutor", ["kubernetes"]),
],
)
def test_cli_parser_executors(self, executor, expected_args):
def test_cli_parser_executors(self, custom_executors, executor, expected_args):
"""Test that CLI commands for the configured executor are present"""
for expected_arg in expected_args:
with (
4 changes: 2 additions & 2 deletions tests/core/test_configuration.py
@@ -1594,8 +1594,8 @@ def test_restore_and_reload_provider_configuration():
assert conf.get("celery", "celery_app_name") == "airflow.providers.celery.executors.celery_executor"
conf.restore_core_default_configuration()
assert conf.providers_configuration_loaded is False
# built-in pre-2-7 celery executor
assert conf.get("celery", "celery_app_name") == "airflow.executors.celery_executor"
with pytest.raises(AirflowConfigException, match="not found in config"):
conf.get("celery", "celery_app_name")
conf.load_providers_configuration()
assert conf.providers_configuration_loaded is True
assert conf.get("celery", "celery_app_name") == "airflow.providers.celery.executors.celery_executor"
7 changes: 6 additions & 1 deletion tests/executors/test_executor_loader.py
@@ -26,7 +26,7 @@
from airflow.executors.executor_loader import ConnectorSource, ExecutorLoader, ExecutorName
from airflow.executors.local_executor import LocalExecutor
from airflow.providers.amazon.aws.executors.ecs.ecs_executor import AwsEcsExecutor
from airflow.providers.celery.executors.celery_executor import CeleryExecutor
from airflow.providers_manager import ProvidersManager

from tests_common.test_utils.config import conf_vars

@@ -141,6 +141,11 @@ def test_get_hybrid_executors_from_config(self, executor_config, expected_execut
assert executors == expected_executors_list

def test_init_executors(self):
# We need to init provider config in order to import CeleryExecutor
ProvidersManager().initialize_providers_configuration()

from airflow.providers.celery.executors.celery_executor import CeleryExecutor

with conf_vars({("core", "executor"): "CeleryExecutor"}):
executors = ExecutorLoader.init_executors()
executor_name = ExecutorLoader.get_default_executor_name()