diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py index 008d9daecbecb..cd2a25b074c80 100644 --- a/airflow/cli/cli_config.py +++ b/airflow/cli/cli_config.py @@ -382,11 +382,19 @@ def string_lower_type(val): ), action="store_true", ) + ARG_TREAT_DAG_AS_REGEX = Arg( ("--treat-dag-as-regex",), + help=("Deprecated -- use `--treat-dag-id-as-regex` instead"), + action="store_true", +) + +ARG_TREAT_DAG_ID_AS_REGEX = Arg( + ("--treat-dag-id-as-regex",), help=("if set, dag_id will be treated as regex instead of an exact string"), action="store_true", ) + # test_dag ARG_SHOW_DAGRUN = Arg( ("--show-dagrun",), @@ -1098,15 +1106,25 @@ class GroupCommand(NamedTuple): ), ActionCommand( name="pause", - help="Pause a DAG", + help="Pause DAG(s)", + description=( + "Pause one or more DAGs. This command allows to halt the execution of specified DAGs, " + "disabling further task scheduling. Use `--treat-dag-id-as-regex` to target multiple DAGs by " + "treating the `--dag-id` as a regex pattern." + ), func=lazy_load_command("airflow.cli.commands.dag_command.dag_pause"), - args=(ARG_DAG_ID, ARG_SUBDIR, ARG_VERBOSE), + args=(ARG_DAG_ID, ARG_SUBDIR, ARG_TREAT_DAG_ID_AS_REGEX, ARG_YES, ARG_OUTPUT, ARG_VERBOSE), ), ActionCommand( name="unpause", - help="Resume a paused DAG", + help="Resume paused DAG(s)", + description=( + "Resume one or more DAGs. This command allows to restore the execution of specified " + "DAGs, enabling further task scheduling. Use `--treat-dag-id-as-regex` to target multiple DAGs " + "treating the `--dag-id` as a regex pattern." + ), func=lazy_load_command("airflow.cli.commands.dag_command.dag_unpause"), - args=(ARG_DAG_ID, ARG_SUBDIR, ARG_VERBOSE), + args=(ARG_DAG_ID, ARG_SUBDIR, ARG_TREAT_DAG_ID_AS_REGEX, ARG_YES, ARG_OUTPUT, ARG_VERBOSE), ), ActionCommand( name="trigger", @@ -1222,6 +1240,7 @@ class GroupCommand(NamedTuple): ARG_RERUN_FAILED_TASKS, ARG_RUN_BACKWARDS, ARG_TREAT_DAG_AS_REGEX, + ARG_TREAT_DAG_ID_AS_REGEX, ), ), ActionCommand( diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index c36b30bd70886..137cd3827e392 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -45,6 +45,7 @@ from airflow.utils.cli import get_dag, get_dags, process_subdir, sigint_handler, suppress_logs_and_warning from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager from airflow.utils.dot_renderer import render_dag, render_dag_dependencies +from airflow.utils.helpers import ask_yesno from airflow.utils.providers_configuration_loader import providers_configuration_loaded from airflow.utils.session import NEW_SESSION, create_session, provide_session from airflow.utils.state import DagRunState @@ -137,6 +138,13 @@ def dag_backfill(args, dag: list[DAG] | DAG | None = None) -> None: category=RemovedInAirflow3Warning, ) + if not args.treat_dag_id_as_regex and args.treat_dag_as_regex: + warnings.warn( + "--treat-dag-as-regex is deprecated, use --treat-dag-id-as-regex instead", + category=RemovedInAirflow3Warning, + ) + args.treat_dag_id_as_regex = args.treat_dag_as_regex + if args.ignore_first_depends_on_past is False: args.ignore_first_depends_on_past = True @@ -144,7 +152,7 @@ def dag_backfill(args, dag: list[DAG] | DAG | None = None) -> None: raise AirflowException("Provide a start_date and/or end_date") if not dag: - dags = get_dags(args.subdir, dag_id=args.dag_id, use_regex=args.treat_dag_as_regex) + dags = get_dags(args.subdir, dag_id=args.dag_id, use_regex=args.treat_dag_id_as_regex) elif isinstance(dag, list): dags = dag else: @@ -214,14 +222,41 @@ def dag_unpause(args) -> None: @providers_configuration_loaded def set_is_paused(is_paused: bool, args) -> None: """Set is_paused for DAG by a given dag_id.""" - dag = DagModel.get_dagmodel(args.dag_id) - - if not dag: - raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table") + should_apply = True + dags = [ + dag + for dag in get_dags(args.subdir, dag_id=args.dag_id, use_regex=args.treat_dag_id_as_regex) + if is_paused != dag.get_is_paused() + ] + + if not dags: + raise AirflowException(f"No {'un' if is_paused else ''}paused DAGs were found") + + if not args.yes and args.treat_dag_id_as_regex: + dags_ids = [dag.dag_id for dag in dags] + question = ( + f"You are about to {'un' if not is_paused else ''}pause {len(dags_ids)} DAGs:\n" + f"{','.join(dags_ids)}" + f"\n\nAre you sure? [y/n]" + ) + should_apply = ask_yesno(question) - dag.set_is_paused(is_paused=is_paused) + if should_apply: + dags_models = [DagModel.get_dagmodel(dag.dag_id) for dag in dags] + for dag_model in dags_models: + if dag_model is not None: + dag_model.set_is_paused(is_paused=is_paused) - print(f"Dag: {args.dag_id}, paused: {is_paused}") + AirflowConsole().print_as( + data=[ + {"dag_id": dag.dag_id, "is_paused": dag.get_is_paused()} + for dag in dags_models + if dag is not None + ], + output=args.output, + ) + else: + print("Operation cancelled by user") @providers_configuration_loaded diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py index 77ae9dea6c191..cc458b7cf7525 100644 --- a/tests/cli/commands/test_dag_command.py +++ b/tests/cli/commands/test_dag_command.py @@ -230,7 +230,7 @@ def test_backfill(self, mock_run): "--task-regex", "run_this_first", "--dry-run", - "--treat-dag-as-regex", + "--treat-dag-id-as-regex", "--start-date", DEFAULT_DATE.isoformat(), ] @@ -244,6 +244,24 @@ def test_backfill(self, mock_run): assert f"Dry run of DAG example_branch_operator on {DEFAULT_DATE_REPR}\n" in output assert "Task run_this_first located in DAG example_branch_operator\n" in output + @mock.patch("airflow.cli.commands.dag_command._run_dag_backfill") + def test_backfill_treat_dag_as_regex_deprecation(self, _): + run_date = DEFAULT_DATE + timedelta(days=1) + cli_args = self.parser.parse_args( + [ + "dags", + "backfill", + "example_bash_operator", + "--treat-dag-as-regex", + "--start-date", + run_date.isoformat(), + ] + ) + + with pytest.warns(DeprecationWarning, match="--treat-dag-as-regex is deprecated"): + dag_command.dag_backfill(cli_args) + assert cli_args.treat_dag_id_as_regex == cli_args.treat_dag_as_regex + @mock.patch("airflow.cli.commands.dag_command.get_dag") def test_backfill_fails_without_loading_dags(self, mock_get_dag): cli_args = self.parser.parse_args(["dags", "backfill", "example_bash_operator"]) @@ -650,11 +668,49 @@ def test_cli_list_jobs_with_args(self): def test_pause(self): args = self.parser.parse_args(["dags", "pause", "example_bash_operator"]) dag_command.dag_pause(args) - assert self.dagbag.dags["example_bash_operator"].get_is_paused() in [True, 1] + assert self.dagbag.dags["example_bash_operator"].get_is_paused() + dag_command.dag_unpause(args) + assert not self.dagbag.dags["example_bash_operator"].get_is_paused() + + @mock.patch("airflow.cli.commands.dag_command.ask_yesno") + def test_pause_regex(self, mock_yesno): + args = self.parser.parse_args(["dags", "pause", "^example_.*$", "--treat-dag-id-as-regex"]) + dag_command.dag_pause(args) + mock_yesno.assert_called_once() + assert self.dagbag.dags["example_bash_decorator"].get_is_paused() + assert self.dagbag.dags["example_kubernetes_executor"].get_is_paused() + assert self.dagbag.dags["example_xcom_args"].get_is_paused() - args = self.parser.parse_args(["dags", "unpause", "example_bash_operator"]) + args = self.parser.parse_args(["dags", "unpause", "^example_.*$", "--treat-dag-id-as-regex"]) dag_command.dag_unpause(args) - assert self.dagbag.dags["example_bash_operator"].get_is_paused() in [False, 0] + assert not self.dagbag.dags["example_bash_decorator"].get_is_paused() + assert not self.dagbag.dags["example_kubernetes_executor"].get_is_paused() + assert not self.dagbag.dags["example_xcom_args"].get_is_paused() + + @mock.patch("airflow.cli.commands.dag_command.ask_yesno") + def test_pause_regex_operation_cancelled(self, ask_yesno, capsys): + args = self.parser.parse_args(["dags", "pause", "example_bash_operator", "--treat-dag-id-as-regex"]) + ask_yesno.return_value = False + dag_command.dag_pause(args) + stdout = capsys.readouterr().out + assert "Operation cancelled by user" in stdout + + @mock.patch("airflow.cli.commands.dag_command.ask_yesno") + def test_pause_regex_yes(self, mock_yesno): + args = self.parser.parse_args(["dags", "pause", ".*", "--treat-dag-id-as-regex", "--yes"]) + dag_command.dag_pause(args) + mock_yesno.assert_not_called() + dag_command.dag_unpause(args) + + def test_pause_non_existing_dag_error(self): + args = self.parser.parse_args(["dags", "pause", "non_existing_dag"]) + with pytest.raises(AirflowException): + dag_command.dag_pause(args) + + def test_unpause_already_unpaused_dag_error(self): + args = self.parser.parse_args(["dags", "unpause", "example_bash_operator", "--yes"]) + with pytest.raises(AirflowException, match="No paused DAGs were found"): + dag_command.dag_unpause(args) def test_trigger_dag(self): dag_command.dag_trigger(