
Commit 9aa2cea

Remove unused dag_ids argument to DagFile processor classes. (apache#44845)

It hasn't been possible to set this in a while (since sometime before 2.0, possibly
even before 1.8). The docstring hints at the intended behaviour, scheduling only
certain DAGs, but that isn't the job of the DAG file processor, which hasn't been
part of that flow since 2.0.

Time to go.
ashb authored Dec 11, 2024
1 parent cc41721 commit 9aa2cea
Showing 7 changed files with 31 additions and 108 deletions.
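
The change is a pure signature cleanup, so callers construct the processor classes exactly as before, just without the dead dag_ids argument. As a rough sketch (not part of the commit; the directory, run count, and timeout values below are made up, and actually running this needs a configured Airflow environment), the manager from airflow/dag_processing/manager.py would now be built like this:

from datetime import timedelta
from pathlib import Path

from airflow.dag_processing.manager import DagFileProcessorManager

# Hypothetical values; signal_conn is left at its default of None,
# as in standalone dag-processor mode.
manager = DagFileProcessorManager(
    dag_directory=Path("/files/dags"),        # where DAG files live
    max_runs=-1,                              # -1 = parse indefinitely
    processor_timeout=timedelta(seconds=50),  # per-file parse timeout
)
manager.start()  # runs the parsing loop, as _run_processor_manager does in the diff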
@@ -43,7 +43,6 @@ def _create_dag_processor_job_runner(args: Any) -> DagProcessorJobRunner:
             processor_timeout=processor_timeout,
             dag_directory=args.subdir,
             max_runs=args.num_runs,
-            dag_ids=[],
         ),
     )
20 changes: 2 additions & 18 deletions airflow/dag_processing/manager.py
@@ -116,21 +116,18 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
     :param max_runs: The number of times to parse and schedule each file. -1
         for unlimited.
     :param processor_timeout: How long to wait before timing out a DAG file processor
-    :param dag_ids: if specified, only schedule tasks with these DAG IDs
     """

     def __init__(
         self,
         dag_directory: os.PathLike,
         max_runs: int,
         processor_timeout: timedelta,
-        dag_ids: list[str] | None,
     ):
         super().__init__()
         self._dag_directory: os.PathLike = dag_directory
         self._max_runs = max_runs
         self._processor_timeout = processor_timeout
-        self._dag_ids = dag_ids
         # Map from file path to the processor
         self._processors: dict[str, DagFileProcessorProcess] = {}
         # Pipe for communicating signals
@@ -156,7 +153,6 @@ def start(self) -> None:
                 self._max_runs,
                 self._processor_timeout,
                 child_signal_conn,
-                self._dag_ids,
             ),
         )
         self._process = process
@@ -177,26 +173,19 @@ def _run_processor_manager(
         max_runs: int,
         processor_timeout: timedelta,
         signal_conn: MultiprocessingConnection,
-        dag_ids: list[str] | None,
     ) -> None:
         # Make this process start as a new process group - that makes it easy
         # to kill all sub-process of this at the OS-level, rather than having
         # to iterate the child processes
         set_new_process_group()
         span = Trace.get_current_span()
-        span.set_attributes(
-            {
-                "dag_directory": str(dag_directory),
-                "dag_ids": str(dag_ids),
-            }
-        )
+        span.set_attribute("dag_directory", str(dag_directory))
         setproctitle("airflow scheduler -- DagFileProcessorManager")
         reload_configuration_for_dag_processing()
         processor_manager = DagFileProcessorManager(
             dag_directory=dag_directory,
             max_runs=max_runs,
             processor_timeout=processor_timeout,
-            dag_ids=dag_ids,
             signal_conn=signal_conn,
         )
         processor_manager.start()
@@ -307,15 +296,13 @@ class DagFileProcessorManager(LoggingMixin):
         for unlimited.
     :param processor_timeout: How long to wait before timing out a DAG file processor
     :param signal_conn: connection to communicate signal with processor agent.
-    :param dag_ids: if specified, only schedule tasks with these DAG IDs
     """

     def __init__(
         self,
         dag_directory: os.PathLike[str],
         max_runs: int,
         processor_timeout: timedelta,
-        dag_ids: list[str] | None,
         signal_conn: MultiprocessingConnection | None = None,
     ):
         super().__init__()
@@ -325,7 +312,6 @@ def __init__(
         self._max_runs = max_runs
         # signal_conn is None for dag_processor_standalone mode.
         self._direct_scheduler_conn = signal_conn
-        self._dag_ids = dag_ids
         self._parsing_start_time: float | None = None
         self._dag_directory = dag_directory
         # Set the signal conn in to non-blocking mode, so that attempting to
@@ -1001,11 +987,10 @@ def collect_results(self) -> None:
         self.log.debug("%s file paths queued for processing", len(self._file_path_queue))

     @staticmethod
-    def _create_process(file_path, dag_ids, dag_directory, callback_requests):
+    def _create_process(file_path, dag_directory, callback_requests):
         """Create DagFileProcessorProcess instance."""
         return DagFileProcessorProcess(
             file_path=file_path,
-            dag_ids=dag_ids,
             dag_directory=dag_directory,
             callback_requests=callback_requests,
         )
@@ -1026,7 +1011,6 @@ def start_new_processes(self):
             callback_to_execute_for_file = self._callback_to_execute[file_path]
             processor = self._create_process(
                 file_path,
-                self._dag_ids,
                 self.get_dag_directory(),
                 callback_to_execute_for_file,
             )
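
The same applies to the agent the scheduler uses to drive parsing: after this commit DagFileProcessorAgent takes only the directory, the run count, and the timeout. A hedged sketch with made-up values (not part of the diff):

from datetime import timedelta
from pathlib import Path

from airflow.dag_processing.manager import DagFileProcessorAgent

# Hypothetical values mirroring the trimmed __init__ shown above.
agent = DagFileProcessorAgent(
    dag_directory=Path("/files/dags"),
    max_runs=-1,                              # -1 = no limit on parse loops
    processor_timeout=timedelta(seconds=50),
)
agent.start()  # forks the DagFileProcessorManager in a child process, per the diff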
13 changes: 2 additions & 11 deletions airflow/dag_processing/processor.py
@@ -92,7 +92,6 @@ class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin):
     Runs DAG processing in a separate process using DagFileProcessor.

     :param file_path: a Python file containing Airflow DAG definitions
-    :param dag_ids: If specified, only look at these DAG ID's
     :param callback_requests: failure callback to execute
     """
@@ -102,13 +101,11 @@ class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin):
     def __init__(
         self,
         file_path: str,
-        dag_ids: list[str] | None,
         dag_directory: str,
         callback_requests: list[CallbackRequest],
     ):
         super().__init__()
         self._file_path = file_path
-        self._dag_ids = dag_ids
         self._dag_directory = dag_directory
         self._callback_requests = callback_requests

@@ -136,7 +133,6 @@ def _run_file_processor(
         result_channel: MultiprocessingConnection,
         parent_channel: MultiprocessingConnection,
         file_path: str,
-        dag_ids: list[str] | None,
         thread_name: str,
         dag_directory: str,
         callback_requests: list[CallbackRequest],
@@ -147,8 +143,6 @@
         :param result_channel: the connection to use for passing back the result
         :param parent_channel: the parent end of the channel to close in the child
         :param file_path: the file to process
-        :param dag_ids: if specified, only examine DAG ID's that are
-            in this list
         :param thread_name: the name to use for the process that is launched
         :param callback_requests: failure callback to execute
         :return: the process that was launched
@@ -174,7 +168,7 @@ def _handle_dag_file_processing():
            threading.current_thread().name = thread_name

            log.info("Started process (PID=%s) to work on %s", os.getpid(), file_path)
-           dag_file_processor = DagFileProcessor(dag_ids=dag_ids, dag_directory=dag_directory, log=log)
+           dag_file_processor = DagFileProcessor(dag_directory=dag_directory, log=log)
            result: tuple[int, int, int] = dag_file_processor.process_file(
                file_path=file_path,
                callback_requests=callback_requests,
@@ -241,7 +235,6 @@ def start(self) -> None:
                 _child_channel,
                 _parent_channel,
                 self.file_path,
-                self._dag_ids,
                 f"DagFileProcessor{self._instance_id}",
                 self._dag_directory,
                 self._callback_requests,
@@ -415,15 +408,13 @@ class DagFileProcessor(LoggingMixin):
     Returns a tuple of 'number of dags found' and 'the count of import errors'

-    :param dag_ids: If specified, only look at these DAG ID's
     :param log: Logger to save the processing process
     """

     UNIT_TEST_MODE: bool = conf.getboolean("core", "UNIT_TEST_MODE")

-    def __init__(self, dag_ids: list[str] | None, dag_directory: str, log: logging.Logger):
+    def __init__(self, dag_directory: str, log: logging.Logger):
         super().__init__()
-        self.dag_ids = dag_ids
         self._log = log
         self._dag_directory = dag_directory
         self.dag_warnings: set[tuple[str, str]] = set()
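
At the bottom of the stack, DagFileProcessor itself is now built from just a directory and a logger, matching the updated call inside _run_file_processor above. A minimal illustrative sketch (the path and logger name are hypothetical):

import logging

from airflow.dag_processing.processor import DagFileProcessor

# Hypothetical DAG folder; any standard logging.Logger works for `log`.
dag_file_processor = DagFileProcessor(
    dag_directory="/files/dags",
    log=logging.getLogger("airflow.dag_processing"),
)

Parsing a single file then goes through process_file(file_path=..., callback_requests=...), which this commit leaves unchanged.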
1 change: 0 additions & 1 deletion airflow/jobs/scheduler_job_runner.py
@@ -934,7 +934,6 @@ def _execute(self) -> int | None:
             dag_directory=Path(self.subdir),
             max_runs=self.num_times_parse_dags,
             processor_timeout=processor_timeout,
-            dag_ids=[],
         )

         reset_signals = self.register_signals()
(Diffs for the remaining changed files were not loaded in this view.)
