From ef0ee705febd6bec1c5ca013bef46339db6e6e7f Mon Sep 17 00:00:00 2001
From: Jed Cunningham
Date: Mon, 9 Dec 2024 16:21:46 -0700
Subject: [PATCH] Fix standalone dag processor startup

---
 .../local_commands/dag_processor_command.py | 22 ----------------------
 airflow/dag_processing/manager.py           |  4 +++-
 2 files changed, 3 insertions(+), 23 deletions(-)

diff --git a/airflow/cli/commands/local_commands/dag_processor_command.py b/airflow/cli/commands/local_commands/dag_processor_command.py
index fb2b601e48947..07d845150c099 100644
--- a/airflow/cli/commands/local_commands/dag_processor_command.py
+++ b/airflow/cli/commands/local_commands/dag_processor_command.py
@@ -19,39 +19,19 @@
 from __future__ import annotations
 
 import logging
-from datetime import timedelta
-from typing import Any
 
 from airflow.cli.commands.local_commands.daemon_utils import run_command_with_daemon_option
 from airflow.configuration import conf
 from airflow.dag_processing.manager import (
-    DagFileProcessorManager,
     TaskSDKBasedDagCollector,
     reload_configuration_for_dag_processing,
 )
-from airflow.jobs.dag_processor_job_runner import DagProcessorJobRunner
-from airflow.jobs.job import Job
 from airflow.utils import cli as cli_utils
 from airflow.utils.providers_configuration_loader import providers_configuration_loaded
 
 log = logging.getLogger(__name__)
 
 
-def _create_dag_processor_job_runner(args: Any) -> DagProcessorJobRunner:
-    """Create DagFileProcessorProcess instance."""
-    processor_timeout_seconds: int = conf.getint("core", "dag_file_processor_timeout")
-    processor_timeout = timedelta(seconds=processor_timeout_seconds)
-    return DagProcessorJobRunner(
-        job=Job(),
-        processor=DagFileProcessorManager(
-            processor_timeout=processor_timeout,
-            dag_directory=args.subdir,
-            max_runs=args.num_runs,
-            dag_ids=[],
-        ),
-    )
-
-
 @cli_utils.action_cli
 @providers_configuration_loaded
 def dag_processor(args):
@@ -63,9 +43,7 @@ def dag_processor(args):
     # if sql_conn.startswith("sqlite"):
     #     raise SystemExit("Standalone DagProcessor is not supported when using sqlite.")
 
-    # job_runner = _create_dag_processor_job_runner(args)
     collector = TaskSDKBasedDagCollector(dag_directory=args.subdir, max_runs=args.num_runs, parallelism=1)
-    collector.run()
 
     reload_configuration_for_dag_processing()
     run_command_with_daemon_option(
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 146df97b4e641..11836657e18b2 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -396,7 +396,9 @@ def _scan_stale_dags(self):
         now = time.monotonic()
         elapsed_time_since_refresh = now - self._last_deactivate_stale_dags_time
         if elapsed_time_since_refresh > self.parsing_cleanup_interval:
-            last_parsed = {fp: stat.last_finish_time for fp, stat in self._file_stats.items()}
+            last_parsed = {
+                fp: stat.last_finish_time for fp, stat in self._file_stats.items() if stat.last_finish_time
+            }
             self.deactivate_stale_dags(
                 last_parsed=last_parsed,
                 dag_directory=self.get_dag_directory(),
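
The dag_processor_command.py hunks delete the eager `collector.run()` call that ran before `run_command_with_daemon_option`. The likely intent, given the commit subject, is that the parsing loop should start only inside the daemon context rather than blocking the command up front. Below is a minimal sketch of that ordering bug; `Collector` and `run_with_daemon_option` are simplified hypothetical stand-ins, not the real Airflow implementations, and the assumption that the daemon runner's callback is `collector.run` is inferred from the unchanged surrounding code:

# startup_order_sketch.py -- hypothetical stand-ins, not Airflow APIs.
from collections.abc import Callable


class Collector:
    """Stand-in for TaskSDKBasedDagCollector: run() is a blocking loop."""

    def run(self) -> None:
        print("parsing loop running")


def run_with_daemon_option(callback: Callable[[], None]) -> None:
    """Stand-in for run_command_with_daemon_option: set up the daemon
    context first, then hand control to the callback exactly once."""
    print("daemon context set up")
    callback()


collector = Collector()

# Buggy ordering (what the patch deletes): the blocking run() would
# execute here, before any daemon setup, so the process never reaches
# the daemon runner until the parsing loop exits.
# collector.run()

# Fixed ordering: the loop starts only inside the daemon context.
run_with_daemon_option(callback=collector.run)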
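
The manager.py hunk guards against files that have been queued but have never finished a parse: their stat's `last_finish_time` is still unset, and letting those `None` values through to `deactivate_stale_dags` would break any downstream timestamp arithmetic or comparison. A standalone sketch of the comprehension change, assuming a simplified `FileStat` record (a stand-in for Airflow's per-file stat) whose `last_finish_time` is `None` until the first parse completes:

# stale_dag_filter_sketch.py -- FileStat is a simplified assumption.
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime


@dataclass
class FileStat:
    last_finish_time: datetime | None = None  # None until a parse finishes


file_stats = {
    "dags/parsed.py": FileStat(last_finish_time=datetime(2024, 12, 9, 16, 0)),
    "dags/queued.py": FileStat(),  # picked up, but no parse completed yet
}

# Old comprehension: None leaks into the mapping, and any downstream
# datetime comparison or subtraction against it raises TypeError.
last_parsed_old = {fp: stat.last_finish_time for fp, stat in file_stats.items()}
assert last_parsed_old["dags/queued.py"] is None

# Patched comprehension: files with no completed parse are dropped, so
# only real timestamps reach deactivate_stale_dags().
last_parsed_new = {
    fp: stat.last_finish_time for fp, stat in file_stats.items() if stat.last_finish_time
}
assert "dags/queued.py" not in last_parsed_new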