diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py index 503397064cbf4..66c5770db5188 100644 --- a/airflow/cli/cli_config.py +++ b/airflow/cli/cli_config.py @@ -857,6 +857,27 @@ def string_lower_type(val): ("option",), help="The option name", ) + +ARG_LINT_CONFIG_SECTION = Arg( + ("--section",), + help="The section name(s) to lint in the airflow config.", + type=string_list_type, +) +ARG_LINT_CONFIG_OPTION = Arg( + ("--option",), + help="The option name(s) to lint in the airflow config.", + type=string_list_type, +) +ARG_LINT_CONFIG_IGNORE_SECTION = Arg( + ("--ignore-section",), + help="The section name(s) to ignore to lint in the airflow config.", + type=string_list_type, +) +ARG_LINT_CONFIG_IGNORE_OPTION = Arg( + ("--ignore-option",), + help="The option name(s) to ignore to lint in the airflow config.", + type=string_list_type, +) ARG_OPTIONAL_SECTION = Arg( ("--section",), help="The section name", @@ -1733,6 +1754,18 @@ class GroupCommand(NamedTuple): ARG_VERBOSE, ), ), + ActionCommand( + name="lint", + help="lint options for the configuration changes while migrating from Airflow 2.x to Airflow 3.0", + func=lazy_load_command("airflow.cli.commands.remote_commands.config_command.lint_config"), + args=( + ARG_LINT_CONFIG_SECTION, + ARG_LINT_CONFIG_OPTION, + ARG_LINT_CONFIG_IGNORE_SECTION, + ARG_LINT_CONFIG_IGNORE_OPTION, + ARG_VERBOSE, + ), + ), ) JOBS_COMMANDS = ( diff --git a/airflow/cli/commands/remote_commands/config_command.py b/airflow/cli/commands/remote_commands/config_command.py index 82f1943d4cf84..17e7b62321761 100644 --- a/airflow/cli/commands/remote_commands/config_command.py +++ b/airflow/cli/commands/remote_commands/config_command.py @@ -18,11 +18,14 @@ from __future__ import annotations +from dataclasses import dataclass from io import StringIO +from typing import NamedTuple import pygments from pygments.lexers.configs import IniLexer +from airflow.cli.simple_table import AirflowConsole from airflow.configuration import conf from airflow.exceptions import AirflowConfigException from airflow.utils.cli import should_use_colors @@ -64,3 +67,354 @@ def get_value(args): print(value) except AirflowConfigException: pass + + +class ConfigParameter(NamedTuple): + """Represents a configuration parameter.""" + + section: str + option: str + + +@dataclass +class ConfigChange: + """ + Class representing the configuration changes in Airflow 3.0. + + :param config: The configuration parameter being changed. + :param suggestion: A suggestion for replacing or handling the removed configuration. + :param renamed_to: The new section and option if the configuration is renamed. + """ + + config: ConfigParameter + suggestion: str = "" + renamed_to: ConfigParameter | None = None + + @property + def message(self) -> str: + """Generate a message for this configuration change.""" + if self.renamed_to: + if self.config.section != self.renamed_to.section: + return ( + f"`{self.config.option}` configuration parameter moved from `{self.config.section}` section to `" + f"{self.renamed_to.section}` section as `{self.renamed_to.option}`." + ) + return ( + f"`{self.config.option}` configuration parameter renamed to `{self.renamed_to.option}` " + f"in the `{self.config.section}` section." + ) + return ( + f"Removed deprecated `{self.config.option}` configuration parameter from `{self.config.section}` section. " + f"{self.suggestion}" + ) + + +CONFIGS_CHANGES = [ + ConfigChange( + config=ConfigParameter("admin", "hide_sensitive_variable_fields"), + renamed_to=ConfigParameter("core", "hide_sensitive_var_conn_fields"), + ), + ConfigChange( + config=ConfigParameter("admin", "sensitive_variable_fields"), + renamed_to=ConfigParameter("core", "sensitive_var_conn_names"), + ), + ConfigChange( + config=ConfigParameter("core", "check_slas"), + suggestion="The SLA feature is removed in Airflow 3.0, to be replaced with Airflow Alerts in " + "future", + ), + ConfigChange( + config=ConfigParameter("core", "strict_asset_uri_validation"), + suggestion="Asset URI with a defined scheme will now always be validated strictly, " + "raising a hard error on validation failure.", + ), + ConfigChange( + config=ConfigParameter("core", "worker_precheck"), + renamed_to=ConfigParameter("celery", "worker_precheck"), + ), + ConfigChange( + config=ConfigParameter("core", "non_pooled_task_slot_count"), + renamed_to=ConfigParameter("core", "default_pool_task_slot_count"), + ), + ConfigChange( + config=ConfigParameter("core", "dag_concurrency"), + renamed_to=ConfigParameter("core", "max_active_tasks_per_dag"), + ), + ConfigChange( + config=ConfigParameter("core", "sql_alchemy_conn"), + renamed_to=ConfigParameter("database", "sql_alchemy_conn"), + ), + ConfigChange( + config=ConfigParameter("core", "sql_engine_encoding"), + renamed_to=ConfigParameter("database", "sql_engine_encoding"), + ), + ConfigChange( + config=ConfigParameter("core", "sql_engine_collation_for_ids"), + renamed_to=ConfigParameter("database", "sql_engine_collation_for_ids"), + ), + ConfigChange( + config=ConfigParameter("core", "sql_alchemy_pool_enabled"), + renamed_to=ConfigParameter("database", "sql_alchemy_pool_enabled"), + ), + ConfigChange( + config=ConfigParameter("core", "sql_alchemy_pool_size"), + renamed_to=ConfigParameter("database", "sql_alchemy_pool_size"), + ), + ConfigChange( + config=ConfigParameter("core", "sql_alchemy_max_overflow"), + renamed_to=ConfigParameter("database", "sql_alchemy_max_overflow"), + ), + ConfigChange( + config=ConfigParameter("core", "sql_alchemy_pool_recycle"), + renamed_to=ConfigParameter("database", "sql_alchemy_pool_recycle"), + ), + ConfigChange( + config=ConfigParameter("core", "sql_alchemy_pool_pre_ping"), + renamed_to=ConfigParameter("database", "sql_alchemy_pool_pre_ping"), + ), + ConfigChange( + config=ConfigParameter("core", "sql_alchemy_schema"), + renamed_to=ConfigParameter("database", "sql_alchemy_schema"), + ), + ConfigChange( + config=ConfigParameter("core", "sql_alchemy_connect_args"), + renamed_to=ConfigParameter("database", "sql_alchemy_connect_args"), + ), + ConfigChange( + config=ConfigParameter("core", "load_default_connections"), + renamed_to=ConfigParameter("database", "load_default_connections"), + ), + ConfigChange( + config=ConfigParameter("core", "max_db_retries"), + renamed_to=ConfigParameter("database", "max_db_retries"), + ), + ConfigChange( + config=ConfigParameter("api", "access_control_allow_origin"), + renamed_to=ConfigParameter("api", "access_control_allow_origins"), + ), + ConfigChange( + config=ConfigParameter("api", "auth_backend"), + renamed_to=ConfigParameter("api", "auth_backends"), + ), + ConfigChange( + config=ConfigParameter("logging", "enable_task_context_logger"), + suggestion="Remove TaskContextLogger: Replaced by the Log table for better handling of task log " + "messages outside the execution context.", + ), + ConfigChange( + config=ConfigParameter("metrics", "metrics_use_pattern_match"), + ), + ConfigChange( + config=ConfigParameter("metrics", "timer_unit_consistency"), + suggestion="In Airflow 3.0, the `timer_unit_consistency` setting in the `metrics` section is " + "removed as it is now the default behaviour. This is done to standardize all timer and " + "timing metrics to milliseconds across all metric loggers", + ), + ConfigChange( + config=ConfigParameter("metrics", "statsd_allow_list"), + renamed_to=ConfigParameter("metrics", "metrics_allow_list"), + ), + ConfigChange( + config=ConfigParameter("metrics", "statsd_block_list"), + renamed_to=ConfigParameter("metrics", "metrics_block_list"), + ), + ConfigChange( + config=ConfigParameter("traces", "otel_task_log_event"), + ), + ConfigChange( + config=ConfigParameter("operators", "allow_illegal_arguments"), + ), + ConfigChange( + config=ConfigParameter("webserver", "allow_raw_html_descriptions"), + ), + ConfigChange( + config=ConfigParameter("webserver", "update_fab_perms"), + renamed_to=ConfigParameter("fab", "update_fab_perms"), + ), + ConfigChange( + config=ConfigParameter("webserver", "auth_rate_limited"), + renamed_to=ConfigParameter("fab", "auth_rate_limited"), + ), + ConfigChange( + config=ConfigParameter("webserver", option="auth_rate_limit"), + renamed_to=ConfigParameter("fab", "auth_rate_limit"), + ), + ConfigChange( + config=ConfigParameter("webserver", "session_lifetime_days"), + renamed_to=ConfigParameter("webserver", "session_lifetime_minutes"), + ), + ConfigChange( + config=ConfigParameter("webserver", "force_log_out_after"), + renamed_to=ConfigParameter("webserver", "session_lifetime_minutes"), + ), + ConfigChange( + config=ConfigParameter("policy", "airflow_local_settings"), + renamed_to=ConfigParameter("policy", "task_policy"), + ), + ConfigChange( + config=ConfigParameter("scheduler", "dependency_detector"), + ), + ConfigChange( + config=ConfigParameter("scheduler", "processor_poll_interval"), + renamed_to=ConfigParameter("scheduler", "scheduler_idle_sleep_time"), + ), + ConfigChange( + config=ConfigParameter("scheduler", "deactivate_stale_dags_interval"), + renamed_to=ConfigParameter("scheduler", "parsing_cleanup_interval"), + ), + ConfigChange( + config=ConfigParameter("scheduler", "statsd_on"), renamed_to=ConfigParameter("metrics", "statsd_on") + ), + ConfigChange( + config=ConfigParameter("scheduler", "max_threads"), + renamed_to=ConfigParameter("scheduler", "parsing_processes"), + ), + ConfigChange( + config=ConfigParameter("scheduler", "statsd_host"), + renamed_to=ConfigParameter("metrics", "statsd_host"), + ), + ConfigChange( + config=ConfigParameter("scheduler", "statsd_port"), + renamed_to=ConfigParameter("metrics", "statsd_port"), + ), + ConfigChange( + config=ConfigParameter("scheduler", "statsd_prefix"), + renamed_to=ConfigParameter("metrics", "statsd_prefix"), + ), + ConfigChange( + config=ConfigParameter("scheduler", "statsd_allow_list"), + renamed_to=ConfigParameter("metrics", "statsd_allow_list"), + ), + ConfigChange( + config=ConfigParameter("scheduler", "stat_name_handler"), + renamed_to=ConfigParameter("metrics", "stat_name_handler"), + ), + ConfigChange( + config=ConfigParameter("scheduler", "statsd_datadog_enabled"), + renamed_to=ConfigParameter("metrics", "statsd_datadog_enabled"), + ), + ConfigChange( + config=ConfigParameter("scheduler", "statsd_datadog_tags"), + renamed_to=ConfigParameter("metrics", "statsd_datadog_tags"), + ), + ConfigChange( + config=ConfigParameter("scheduler", "statsd_datadog_metrics_tags"), + renamed_to=ConfigParameter("metrics", "statsd_datadog_metrics_tags"), + ), + ConfigChange( + config=ConfigParameter("scheduler", "statsd_custom_client_path"), + renamed_to=ConfigParameter("metrics", "statsd_custom_client_path"), + ), + ConfigChange( + config=ConfigParameter("celery", "stalled_task_timeout"), + renamed_to=ConfigParameter("scheduler", "task_queued_timeout"), + ), + ConfigChange( + config=ConfigParameter("celery", "default_queue"), + renamed_to=ConfigParameter("operators", "default_queue"), + ), + ConfigChange( + config=ConfigParameter("celery", "task_adoption_timeout"), + renamed_to=ConfigParameter("scheduler", "task_queued_timeout"), + ), + ConfigChange( + config=ConfigParameter("kubernetes_executor", "worker_pods_pending_timeout"), + renamed_to=ConfigParameter("scheduler", "task_queued_timeout"), + ), + ConfigChange( + config=ConfigParameter("kubernetes_executor", "worker_pods_pending_timeout_check_interval"), + renamed_to=ConfigParameter("scheduler", "task_queued_timeout_check_interval"), + ), + ConfigChange( + config=ConfigParameter("smtp", "smtp_user"), + suggestion="Please use the SMTP connection (`smtp_default`).", + ), + ConfigChange( + config=ConfigParameter("smtp", "smtp_password"), + suggestion="Please use the SMTP connection (`smtp_default`).", + ), +] + + +@providers_configuration_loaded +def lint_config(args) -> None: + """ + Lint the airflow.cfg file for removed, or renamed configurations. + + This function scans the Airflow configuration file for parameters that are removed or renamed in + Airflow 3.0. It provides suggestions for alternative parameters or settings where applicable. + CLI Arguments: + --section: str (optional) + The specific section of the configuration to lint. + Example: --section core + + --option: str (optional) + The specific option within a section to lint. + Example: --option check_slas + + --ignore-section: str (optional) + A section to ignore during linting. + Example: --ignore-section webserver + + --ignore-option: str (optional) + An option to ignore during linting. + Example: --ignore-option smtp_user + + --verbose: flag (optional) + Enables detailed output, including the list of ignored sections and options. + Example: --verbose + + Examples: + 1. Lint all sections and options: + airflow config lint + + 2. Lint a specific sections: + airflow config lint --section core,webserver + + 3. Lint a specific sections and options: + airflow config lint --section smtp --option smtp_user + + 4. Ignore a sections: + irflow config lint --ignore-section webserver,api + + 5. Ignore an options: + airflow config lint --ignore-option smtp_user,session_lifetime_days + + 6. Enable verbose output: + airflow config lint --verbose + + :param args: The CLI arguments for linting configurations. + """ + console = AirflowConsole() + lint_issues = [] + + section_to_check_if_provided = args.section or [] + option_to_check_if_provided = args.option or [] + + ignore_sections = args.ignore_section or [] + ignore_options = args.ignore_option or [] + + for configuration in CONFIGS_CHANGES: + if section_to_check_if_provided and configuration.config.section not in section_to_check_if_provided: + continue + + if option_to_check_if_provided and configuration.config.option not in option_to_check_if_provided: + continue + + if configuration.config.section in ignore_sections or configuration.config.option in ignore_options: + continue + + if conf.has_option(configuration.config.section, configuration.config.option): + lint_issues.append(configuration.message) + + if lint_issues: + console.print("[red]Found issues in your airflow.cfg:[/red]") + for issue in lint_issues: + console.print(f" - [yellow]{issue}[/yellow]") + if args.verbose: + console.print("\n[blue]Detailed Information:[/blue]") + console.print(f"Ignored sections: [green]{', '.join(ignore_sections)}[/green]") + console.print(f"Ignored options: [green]{', '.join(ignore_options)}[/green]") + console.print("\n[red]Please update your configuration file accordingly.[/red]") + else: + console.print("[green]No issues found in your airflow.cfg. It is ready for Airflow 3![/green]") diff --git a/newsfragments/44908.feature.rst b/newsfragments/44908.feature.rst new file mode 100644 index 0000000000000..2a711a0149322 --- /dev/null +++ b/newsfragments/44908.feature.rst @@ -0,0 +1 @@ +The ``airflow config lint`` command has been introduced to help users migrate from Airflow 2.x to 3.0 by identifying removed or renamed configuration parameters in airflow.cfg. diff --git a/tests/cli/commands/remote_commands/test_config_command.py b/tests/cli/commands/remote_commands/test_config_command.py index 57354fda7694a..f932b1851d227 100644 --- a/tests/cli/commands/remote_commands/test_config_command.py +++ b/tests/cli/commands/remote_commands/test_config_command.py @@ -17,11 +17,16 @@ from __future__ import annotations import contextlib +import os +import re from io import StringIO from unittest import mock +import pytest + from airflow.cli import cli_parser from airflow.cli.commands.remote_commands import config_command +from airflow.cli.commands.remote_commands.config_command import ConfigChange, ConfigParameter from tests_common.test_utils.config import conf_vars @@ -232,3 +237,211 @@ def test_should_raise_exception_when_option_is_missing(self, caplog): self.parser.parse_args(["config", "get-value", "missing-section", "dags_folder"]) ) assert "section/key [missing-section/dags_folder] not found in config" in caplog.text + + +class TestConfigLint: + @pytest.mark.parametrize("removed_config", config_command.CONFIGS_CHANGES) + def test_lint_detects_removed_configs(self, removed_config): + with mock.patch("airflow.configuration.conf.has_option", return_value=True): + with contextlib.redirect_stdout(StringIO()) as temp_stdout: + config_command.lint_config(cli_parser.get_parser().parse_args(["config", "lint"])) + + output = temp_stdout.getvalue() + + normalized_output = re.sub(r"\s+", " ", output.strip()) + normalized_message = re.sub(r"\s+", " ", removed_config.message.strip()) + + assert normalized_message in normalized_output + + @pytest.mark.parametrize( + "section, option, suggestion", + [ + ( + "core", + "check_slas", + "The SLA feature is removed in Airflow 3.0, to be replaced with Airflow Alerts in future", + ), + ( + "core", + "strict_asset_uri_validation", + "Asset URI with a defined scheme will now always be validated strictly, raising a hard error on validation failure.", + ), + ( + "logging", + "enable_task_context_logger", + "Remove TaskContextLogger: Replaced by the Log table for better handling of task log messages outside the execution context.", + ), + ], + ) + def test_lint_with_specific_removed_configs(self, section, option, suggestion): + with mock.patch("airflow.configuration.conf.has_option", return_value=True): + with contextlib.redirect_stdout(StringIO()) as temp_stdout: + config_command.lint_config(cli_parser.get_parser().parse_args(["config", "lint"])) + + output = temp_stdout.getvalue() + + normalized_output = re.sub(r"\s+", " ", output.strip()) + + expected_message = f"Removed deprecated `{option}` configuration parameter from `{section}` section." + assert expected_message in normalized_output + + assert suggestion in normalized_output + + def test_lint_specific_section_option(self): + with mock.patch("airflow.configuration.conf.has_option", return_value=True): + with contextlib.redirect_stdout(StringIO()) as temp_stdout: + config_command.lint_config( + cli_parser.get_parser().parse_args( + ["config", "lint", "--section", "core", "--option", "check_slas"] + ) + ) + + output = temp_stdout.getvalue() + + normalized_output = re.sub(r"\s+", " ", output.strip()) + + assert ( + "Removed deprecated `check_slas` configuration parameter from `core` section." + in normalized_output + ) + + def test_lint_with_invalid_section_option(self): + with mock.patch("airflow.configuration.conf.has_option", return_value=False): + with contextlib.redirect_stdout(StringIO()) as temp_stdout: + config_command.lint_config( + cli_parser.get_parser().parse_args( + ["config", "lint", "--section", "invalid_section", "--option", "invalid_option"] + ) + ) + + output = temp_stdout.getvalue() + + normalized_output = re.sub(r"\s+", " ", output.strip()) + + assert "No issues found in your airflow.cfg." in normalized_output + + def test_lint_detects_multiple_issues(self): + with mock.patch( + "airflow.configuration.conf.has_option", + side_effect=lambda s, o: o in ["check_slas", "strict_asset_uri_validation"], + ): + with contextlib.redirect_stdout(StringIO()) as temp_stdout: + config_command.lint_config(cli_parser.get_parser().parse_args(["config", "lint"])) + + output = temp_stdout.getvalue() + + normalized_output = re.sub(r"\s+", " ", output.strip()) + + assert ( + "Removed deprecated `check_slas` configuration parameter from `core` section." + in normalized_output + ) + assert ( + "Removed deprecated `strict_asset_uri_validation` configuration parameter from `core` section." + in normalized_output + ) + + @pytest.mark.parametrize( + "removed_configs", + [ + [ + ( + "core", + "check_slas", + "The SLA feature is removed in Airflow 3.0, to be replaced with Airflow Alerts in future", + ), + ( + "core", + "strict_asset_uri_validation", + "Asset URI with a defined scheme will now always be validated strictly, raising a hard error on validation failure.", + ), + ( + "logging", + "enable_task_context_logger", + "Remove TaskContextLogger: Replaced by the Log table for better handling of task log messages outside the execution context.", + ), + ], + ], + ) + def test_lint_detects_multiple_removed_configs(self, removed_configs): + with mock.patch("airflow.configuration.conf.has_option", return_value=True): + with contextlib.redirect_stdout(StringIO()) as temp_stdout: + config_command.lint_config(cli_parser.get_parser().parse_args(["config", "lint"])) + + output = temp_stdout.getvalue() + + normalized_output = re.sub(r"\s+", " ", output.strip()) + + for section, option, suggestion in removed_configs: + expected_message = ( + f"Removed deprecated `{option}` configuration parameter from `{section}` section." + ) + assert expected_message in normalized_output + + if suggestion: + assert suggestion in normalized_output + + @pytest.mark.parametrize( + "renamed_configs", + [ + # Case 1: Renamed configurations within the same section + [ + ("core", "non_pooled_task_slot_count", "core", "default_pool_task_slot_count"), + ("scheduler", "processor_poll_interval", "scheduler", "scheduler_idle_sleep_time"), + ], + # Case 2: Renamed configurations across sections + [ + ("admin", "hide_sensitive_variable_fields", "core", "hide_sensitive_var_conn_fields"), + ("core", "worker_precheck", "celery", "worker_precheck"), + ], + ], + ) + def test_lint_detects_renamed_configs(self, renamed_configs): + with mock.patch("airflow.configuration.conf.has_option", return_value=True): + with contextlib.redirect_stdout(StringIO()) as temp_stdout: + config_command.lint_config(cli_parser.get_parser().parse_args(["config", "lint"])) + + output = temp_stdout.getvalue() + + normalized_output = re.sub(r"\s+", " ", output.strip()) + + for old_section, old_option, new_section, new_option in renamed_configs: + if old_section == new_section: + expected_message = f"`{old_option}` configuration parameter renamed to `{new_option}` in the `{old_section}` section." + else: + expected_message = f"`{old_option}` configuration parameter moved from `{old_section}` section to `{new_section}` section as `{new_option}`." + assert expected_message in normalized_output + + @pytest.mark.parametrize( + "env_var, config_change, expected_message", + [ + ( + "AIRFLOW__CORE__CHECK_SLAS", + ConfigChange( + config=ConfigParameter("core", "check_slas"), + suggestion="The SLA feature is removed in Airflow 3.0, to be replaced with Airflow Alerts in future", + ), + "Removed deprecated `check_slas` configuration parameter from `core` section.", + ), + ( + "AIRFLOW__CORE__STRICT_ASSET_URI_VALIDATION", + ConfigChange( + config=ConfigParameter("core", "strict_asset_uri_validation"), + suggestion="Asset URI with a defined scheme will now always be validated strictly, raising a hard error on validation failure.", + ), + "Removed deprecated `strict_asset_uri_validation` configuration parameter from `core` section.", + ), + ], + ) + def test_lint_detects_configs_with_env_vars(self, env_var, config_change, expected_message): + with mock.patch.dict(os.environ, {env_var: "some_value"}): + with mock.patch("airflow.configuration.conf.has_option", return_value=True): + with contextlib.redirect_stdout(StringIO()) as temp_stdout: + config_command.lint_config(cli_parser.get_parser().parse_args(["config", "lint"])) + + output = temp_stdout.getvalue() + + normalized_output = re.sub(r"\s+", " ", output.strip()) + + assert expected_message in normalized_output + assert config_change.suggestion in normalized_output