Skip to content

Commit

Permalink
Add CLI support for bulk pause and resume of DAGs (apache#38265)
Browse files Browse the repository at this point in the history
  • Loading branch information
shahar1 authored Mar 20, 2024
1 parent c84a1ec commit fecc1ed
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 15 deletions.
27 changes: 23 additions & 4 deletions airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,11 +382,19 @@ def string_lower_type(val):
),
action="store_true",
)

ARG_TREAT_DAG_AS_REGEX = Arg(
("--treat-dag-as-regex",),
help=("Deprecated -- use `--treat-dag-id-as-regex` instead"),
action="store_true",
)

ARG_TREAT_DAG_ID_AS_REGEX = Arg(
("--treat-dag-id-as-regex",),
help=("if set, dag_id will be treated as regex instead of an exact string"),
action="store_true",
)

# test_dag
ARG_SHOW_DAGRUN = Arg(
("--show-dagrun",),
Expand Down Expand Up @@ -1098,15 +1106,25 @@ class GroupCommand(NamedTuple):
),
ActionCommand(
name="pause",
help="Pause a DAG",
help="Pause DAG(s)",
description=(
"Pause one or more DAGs. This command allows to halt the execution of specified DAGs, "
"disabling further task scheduling. Use `--treat-dag-id-as-regex` to target multiple DAGs by "
"treating the `--dag-id` as a regex pattern."
),
func=lazy_load_command("airflow.cli.commands.dag_command.dag_pause"),
args=(ARG_DAG_ID, ARG_SUBDIR, ARG_VERBOSE),
args=(ARG_DAG_ID, ARG_SUBDIR, ARG_TREAT_DAG_ID_AS_REGEX, ARG_YES, ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name="unpause",
help="Resume a paused DAG",
help="Resume paused DAG(s)",
description=(
"Resume one or more DAGs. This command allows to restore the execution of specified "
"DAGs, enabling further task scheduling. Use `--treat-dag-id-as-regex` to target multiple DAGs "
"treating the `--dag-id` as a regex pattern."
),
func=lazy_load_command("airflow.cli.commands.dag_command.dag_unpause"),
args=(ARG_DAG_ID, ARG_SUBDIR, ARG_VERBOSE),
args=(ARG_DAG_ID, ARG_SUBDIR, ARG_TREAT_DAG_ID_AS_REGEX, ARG_YES, ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name="trigger",
Expand Down Expand Up @@ -1222,6 +1240,7 @@ class GroupCommand(NamedTuple):
ARG_RERUN_FAILED_TASKS,
ARG_RUN_BACKWARDS,
ARG_TREAT_DAG_AS_REGEX,
ARG_TREAT_DAG_ID_AS_REGEX,
),
),
ActionCommand(
Expand Down
49 changes: 42 additions & 7 deletions airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from airflow.utils.cli import get_dag, get_dags, process_subdir, sigint_handler, suppress_logs_and_warning
from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager
from airflow.utils.dot_renderer import render_dag, render_dag_dependencies
from airflow.utils.helpers import ask_yesno
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.session import NEW_SESSION, create_session, provide_session
from airflow.utils.state import DagRunState
Expand Down Expand Up @@ -137,14 +138,21 @@ def dag_backfill(args, dag: list[DAG] | DAG | None = None) -> None:
category=RemovedInAirflow3Warning,
)

if not args.treat_dag_id_as_regex and args.treat_dag_as_regex:
warnings.warn(
"--treat-dag-as-regex is deprecated, use --treat-dag-id-as-regex instead",
category=RemovedInAirflow3Warning,
)
args.treat_dag_id_as_regex = args.treat_dag_as_regex

if args.ignore_first_depends_on_past is False:
args.ignore_first_depends_on_past = True

if not args.start_date and not args.end_date:
raise AirflowException("Provide a start_date and/or end_date")

if not dag:
dags = get_dags(args.subdir, dag_id=args.dag_id, use_regex=args.treat_dag_as_regex)
dags = get_dags(args.subdir, dag_id=args.dag_id, use_regex=args.treat_dag_id_as_regex)
elif isinstance(dag, list):
dags = dag
else:
Expand Down Expand Up @@ -214,14 +222,41 @@ def dag_unpause(args) -> None:
@providers_configuration_loaded
def set_is_paused(is_paused: bool, args) -> None:
"""Set is_paused for DAG by a given dag_id."""
dag = DagModel.get_dagmodel(args.dag_id)

if not dag:
raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table")
should_apply = True
dags = [
dag
for dag in get_dags(args.subdir, dag_id=args.dag_id, use_regex=args.treat_dag_id_as_regex)
if is_paused != dag.get_is_paused()
]

if not dags:
raise AirflowException(f"No {'un' if is_paused else ''}paused DAGs were found")

if not args.yes and args.treat_dag_id_as_regex:
dags_ids = [dag.dag_id for dag in dags]
question = (
f"You are about to {'un' if not is_paused else ''}pause {len(dags_ids)} DAGs:\n"
f"{','.join(dags_ids)}"
f"\n\nAre you sure? [y/n]"
)
should_apply = ask_yesno(question)

dag.set_is_paused(is_paused=is_paused)
if should_apply:
dags_models = [DagModel.get_dagmodel(dag.dag_id) for dag in dags]
for dag_model in dags_models:
if dag_model is not None:
dag_model.set_is_paused(is_paused=is_paused)

print(f"Dag: {args.dag_id}, paused: {is_paused}")
AirflowConsole().print_as(
data=[
{"dag_id": dag.dag_id, "is_paused": dag.get_is_paused()}
for dag in dags_models
if dag is not None
],
output=args.output,
)
else:
print("Operation cancelled by user")


@providers_configuration_loaded
Expand Down
64 changes: 60 additions & 4 deletions tests/cli/commands/test_dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ def test_backfill(self, mock_run):
"--task-regex",
"run_this_first",
"--dry-run",
"--treat-dag-as-regex",
"--treat-dag-id-as-regex",
"--start-date",
DEFAULT_DATE.isoformat(),
]
Expand All @@ -244,6 +244,24 @@ def test_backfill(self, mock_run):
assert f"Dry run of DAG example_branch_operator on {DEFAULT_DATE_REPR}\n" in output
assert "Task run_this_first located in DAG example_branch_operator\n" in output

@mock.patch("airflow.cli.commands.dag_command._run_dag_backfill")
def test_backfill_treat_dag_as_regex_deprecation(self, _):
run_date = DEFAULT_DATE + timedelta(days=1)
cli_args = self.parser.parse_args(
[
"dags",
"backfill",
"example_bash_operator",
"--treat-dag-as-regex",
"--start-date",
run_date.isoformat(),
]
)

with pytest.warns(DeprecationWarning, match="--treat-dag-as-regex is deprecated"):
dag_command.dag_backfill(cli_args)
assert cli_args.treat_dag_id_as_regex == cli_args.treat_dag_as_regex

@mock.patch("airflow.cli.commands.dag_command.get_dag")
def test_backfill_fails_without_loading_dags(self, mock_get_dag):
cli_args = self.parser.parse_args(["dags", "backfill", "example_bash_operator"])
Expand Down Expand Up @@ -650,11 +668,49 @@ def test_cli_list_jobs_with_args(self):
def test_pause(self):
args = self.parser.parse_args(["dags", "pause", "example_bash_operator"])
dag_command.dag_pause(args)
assert self.dagbag.dags["example_bash_operator"].get_is_paused() in [True, 1]
assert self.dagbag.dags["example_bash_operator"].get_is_paused()
dag_command.dag_unpause(args)
assert not self.dagbag.dags["example_bash_operator"].get_is_paused()

@mock.patch("airflow.cli.commands.dag_command.ask_yesno")
def test_pause_regex(self, mock_yesno):
args = self.parser.parse_args(["dags", "pause", "^example_.*$", "--treat-dag-id-as-regex"])
dag_command.dag_pause(args)
mock_yesno.assert_called_once()
assert self.dagbag.dags["example_bash_decorator"].get_is_paused()
assert self.dagbag.dags["example_kubernetes_executor"].get_is_paused()
assert self.dagbag.dags["example_xcom_args"].get_is_paused()

args = self.parser.parse_args(["dags", "unpause", "example_bash_operator"])
args = self.parser.parse_args(["dags", "unpause", "^example_.*$", "--treat-dag-id-as-regex"])
dag_command.dag_unpause(args)
assert self.dagbag.dags["example_bash_operator"].get_is_paused() in [False, 0]
assert not self.dagbag.dags["example_bash_decorator"].get_is_paused()
assert not self.dagbag.dags["example_kubernetes_executor"].get_is_paused()
assert not self.dagbag.dags["example_xcom_args"].get_is_paused()

@mock.patch("airflow.cli.commands.dag_command.ask_yesno")
def test_pause_regex_operation_cancelled(self, ask_yesno, capsys):
args = self.parser.parse_args(["dags", "pause", "example_bash_operator", "--treat-dag-id-as-regex"])
ask_yesno.return_value = False
dag_command.dag_pause(args)
stdout = capsys.readouterr().out
assert "Operation cancelled by user" in stdout

@mock.patch("airflow.cli.commands.dag_command.ask_yesno")
def test_pause_regex_yes(self, mock_yesno):
args = self.parser.parse_args(["dags", "pause", ".*", "--treat-dag-id-as-regex", "--yes"])
dag_command.dag_pause(args)
mock_yesno.assert_not_called()
dag_command.dag_unpause(args)

def test_pause_non_existing_dag_error(self):
args = self.parser.parse_args(["dags", "pause", "non_existing_dag"])
with pytest.raises(AirflowException):
dag_command.dag_pause(args)

def test_unpause_already_unpaused_dag_error(self):
args = self.parser.parse_args(["dags", "unpause", "example_bash_operator", "--yes"])
with pytest.raises(AirflowException, match="No paused DAGs were found"):
dag_command.dag_unpause(args)

def test_trigger_dag(self):
dag_command.dag_trigger(
Expand Down

0 comments on commit fecc1ed

Please sign in to comment.