diff --git a/airflow/cli/commands/local_commands/dag_processor_command.py b/airflow/cli/commands/local_commands/dag_processor_command.py
index 03513df17a093..f0c3bc5060ca1 100644
--- a/airflow/cli/commands/local_commands/dag_processor_command.py
+++ b/airflow/cli/commands/local_commands/dag_processor_command.py
@@ -43,7 +43,6 @@ def _create_dag_processor_job_runner(args: Any) -> DagProcessorJobRunner:
             processor_timeout=processor_timeout,
             dag_directory=args.subdir,
             max_runs=args.num_runs,
-            dag_ids=[],
         ),
     )
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 7d9c9298a996e..f60377d496625 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -116,7 +116,6 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
     :param max_runs: The number of times to parse and schedule each file. -1
         for unlimited.
     :param processor_timeout: How long to wait before timing out a DAG file processor
-    :param dag_ids: if specified, only schedule tasks with these DAG IDs
     """
 
     def __init__(
@@ -124,13 +123,11 @@ def __init__(
         self,
         dag_directory: os.PathLike,
         max_runs: int,
         processor_timeout: timedelta,
-        dag_ids: list[str] | None,
     ):
         super().__init__()
         self._dag_directory: os.PathLike = dag_directory
         self._max_runs = max_runs
         self._processor_timeout = processor_timeout
-        self._dag_ids = dag_ids
         # Map from file path to the processor
         self._processors: dict[str, DagFileProcessorProcess] = {}
         # Pipe for communicating signals
@@ -156,7 +153,6 @@ def start(self) -> None:
                 self._max_runs,
                 self._processor_timeout,
                 child_signal_conn,
-                self._dag_ids,
             ),
         )
         self._process = process
@@ -177,26 +173,19 @@ def _run_processor_manager(
         max_runs: int,
         processor_timeout: timedelta,
         signal_conn: MultiprocessingConnection,
-        dag_ids: list[str] | None,
     ) -> None:
         # Make this process start as a new process group - that makes it easy
         # to kill all sub-process of this at the OS-level, rather than having
         # to iterate the child processes
         set_new_process_group()
         span = Trace.get_current_span()
-        span.set_attributes(
-            {
-                "dag_directory": str(dag_directory),
-                "dag_ids": str(dag_ids),
-            }
-        )
+        span.set_attribute("dag_directory", str(dag_directory))
         setproctitle("airflow scheduler -- DagFileProcessorManager")
         reload_configuration_for_dag_processing()
         processor_manager = DagFileProcessorManager(
             dag_directory=dag_directory,
             max_runs=max_runs,
             processor_timeout=processor_timeout,
-            dag_ids=dag_ids,
             signal_conn=signal_conn,
         )
         processor_manager.start()
@@ -307,7 +296,6 @@ class DagFileProcessorManager(LoggingMixin):
         for unlimited.
     :param processor_timeout: How long to wait before timing out a DAG file processor
     :param signal_conn: connection to communicate signal with processor agent.
-    :param dag_ids: if specified, only schedule tasks with these DAG IDs
     """
 
     def __init__(
@@ -315,7 +303,6 @@ def __init__(
         dag_directory: os.PathLike[str],
         max_runs: int,
         processor_timeout: timedelta,
-        dag_ids: list[str] | None,
         signal_conn: MultiprocessingConnection | None = None,
     ):
         super().__init__()
@@ -325,7 +312,6 @@ def __init__(
         self._max_runs = max_runs
         # signal_conn is None for dag_processor_standalone mode.
         self._direct_scheduler_conn = signal_conn
-        self._dag_ids = dag_ids
         self._parsing_start_time: float | None = None
         self._dag_directory = dag_directory
         # Set the signal conn in to non-blocking mode, so that attempting to
@@ -1001,11 +987,10 @@ def collect_results(self) -> None:
         self.log.debug("%s file paths queued for processing", len(self._file_path_queue))
 
     @staticmethod
-    def _create_process(file_path, dag_ids, dag_directory, callback_requests):
+    def _create_process(file_path, dag_directory, callback_requests):
         """Create DagFileProcessorProcess instance."""
         return DagFileProcessorProcess(
             file_path=file_path,
-            dag_ids=dag_ids,
             dag_directory=dag_directory,
             callback_requests=callback_requests,
         )
@@ -1026,7 +1011,6 @@ def start_new_processes(self):
             callback_to_execute_for_file = self._callback_to_execute[file_path]
             processor = self._create_process(
                 file_path,
-                self._dag_ids,
                 self.get_dag_directory(),
                 callback_to_execute_for_file,
             )
diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index 840c17300f565..b3e6ff770b8c1 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -92,7 +92,6 @@ class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin):
     Runs DAG processing in a separate process using DagFileProcessor.
 
     :param file_path: a Python file containing Airflow DAG definitions
-    :param dag_ids: If specified, only look at these DAG ID's
     :param callback_requests: failure callback to execute
     """
@@ -102,13 +101,11 @@ def __init__(
         self,
         file_path: str,
-        dag_ids: list[str] | None,
         dag_directory: str,
         callback_requests: list[CallbackRequest],
     ):
         super().__init__()
         self._file_path = file_path
-        self._dag_ids = dag_ids
         self._dag_directory = dag_directory
         self._callback_requests = callback_requests
@@ -136,7 +133,6 @@ def _run_file_processor(
         result_channel: MultiprocessingConnection,
         parent_channel: MultiprocessingConnection,
         file_path: str,
-        dag_ids: list[str] | None,
         thread_name: str,
         dag_directory: str,
         callback_requests: list[CallbackRequest],
@@ -147,8 +143,6 @@ def _run_file_processor(
         :param result_channel: the connection to use for passing back the result
         :param parent_channel: the parent end of the channel to close in the child
         :param file_path: the file to process
-        :param dag_ids: if specified, only examine DAG ID's that are
-            in this list
         :param thread_name: the name to use for the process that is launched
         :param callback_requests: failure callback to execute
         :return: the process that was launched
@@ -174,7 +168,7 @@ def _handle_dag_file_processing():
             threading.current_thread().name = thread_name
             log.info("Started process (PID=%s) to work on %s", os.getpid(), file_path)
-            dag_file_processor = DagFileProcessor(dag_ids=dag_ids, dag_directory=dag_directory, log=log)
+            dag_file_processor = DagFileProcessor(dag_directory=dag_directory, log=log)
             result: tuple[int, int, int] = dag_file_processor.process_file(
                 file_path=file_path,
                 callback_requests=callback_requests,
@@ -241,7 +235,6 @@ def start(self) -> None:
                 _child_channel,
                 _parent_channel,
                 self.file_path,
-                self._dag_ids,
                 f"DagFileProcessor{self._instance_id}",
                 self._dag_directory,
                 self._callback_requests,
@@ -415,15 +408,13 @@ class DagFileProcessor(LoggingMixin):
 
     Returns a tuple of 'number of dags found' and 'the count of import errors'
 
-    :param dag_ids: If specified, only look at these DAG ID's
     :param log: Logger to save the processing process
     """
 
     UNIT_TEST_MODE: bool = conf.getboolean("core", "UNIT_TEST_MODE")
 
-    def __init__(self, dag_ids: list[str] | None, dag_directory: str, log: logging.Logger):
+    def __init__(self, dag_directory: str, log: logging.Logger):
         super().__init__()
-        self.dag_ids = dag_ids
         self._log = log
         self._dag_directory = dag_directory
         self.dag_warnings: set[tuple[str, str]] = set()
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 21fa41aa2c592..0254b417a71ad 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -934,7 +934,6 @@ def _execute(self) -> int | None:
                 dag_directory=Path(self.subdir),
                 max_runs=self.num_times_parse_dags,
                 processor_timeout=processor_timeout,
-                dag_ids=[],
             )
 
         reset_signals = self.register_signals()
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index 4a338e164d64b..fedc15a7437fb 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -80,8 +80,8 @@ class FakeDagFileProcessorRunner(DagFileProcessorProcess):
     # This fake processor will return the zombies it received in constructor
     # as its processing result w/o actually parsing anything.
-    def __init__(self, file_path, dag_ids, dag_directory, callbacks):
-        super().__init__(file_path, dag_ids, dag_directory, callbacks)
+    def __init__(self, file_path, dag_directory, callbacks):
+        super().__init__(file_path, dag_directory, callbacks)
         # We need a "real" selectable handle for waitable_handle to work
         readable, writable = multiprocessing.Pipe(duplex=False)
         writable.send("abc")
@@ -109,10 +109,9 @@ def result(self):
         return self._result
 
     @staticmethod
-    def _create_process(file_path, callback_requests, dag_ids, dag_directory):
+    def _create_process(file_path, callback_requests, dag_directory):
         return FakeDagFileProcessorRunner(
             file_path,
-            dag_ids,
             dag_directory,
             callback_requests,
         )
@@ -169,7 +168,6 @@ def test_remove_file_clears_import_error(self, tmp_path):
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=child_pipe,
-            dag_ids=[],
         )
 
         with create_session() as session:
@@ -199,7 +197,6 @@ def test_max_runs_when_no_files(self, tmp_path):
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=child_pipe,
-            dag_ids=[],
         )
 
         self.run_processor_manager_one_loop(manager, parent_pipe)
@@ -217,7 +214,6 @@ def test_start_new_processes_with_same_filepath(self, _):
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         file_1 = "file_1.py"
@@ -246,7 +242,6 @@ def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         mock_processor = MagicMock()
@@ -266,7 +261,6 @@ def test_set_file_paths_when_processor_file_path_is_in_new_file_paths(self):
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         mock_processor = MagicMock()
@@ -295,7 +289,6 @@ def test_file_paths_in_queue_sorted_alphabetically(
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         manager.set_file_paths(dag_files)
@@ -320,7 +313,6 @@ def test_file_paths_in_queue_sorted_random_seeded_by_host(
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         manager.set_file_paths(dag_files)
@@ -380,7 +372,6 @@ def test_file_paths_in_queue_sorted_by_modified_time(
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         manager.set_file_paths(dag_files)
@@ -413,7 +404,6 @@ def test_file_paths_in_queue_excludes_missing_file(
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         manager.set_file_paths(dag_files)
@@ -445,7 +435,6 @@ def test_add_new_file_to_parsing_queue(
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         manager.set_file_paths(dag_files)
@@ -486,7 +475,6 @@ def test_recently_modified_file_is_parsed_with_mtime_mode(
             max_runs=3,
             processor_timeout=timedelta(days=365),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         # let's say the DAG was just parsed 10 seconds before the Freezed time
@@ -542,7 +530,6 @@ def test_file_paths_in_queue_sorted_by_priority(
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         manager.set_file_paths(dag_files)
@@ -563,7 +550,6 @@ def test_scan_stale_dags(self):
             max_runs=1,
             processor_timeout=timedelta(minutes=10),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         test_dag_path = str(TEST_DAG_FOLDER / "test_example_bash_operator.py")
@@ -630,7 +616,6 @@ def test_scan_stale_dags_standalone_mode(self):
             max_runs=1,
             processor_timeout=timedelta(minutes=10),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         test_dag_path = str(TEST_DAG_FOLDER / "test_example_bash_operator.py")
@@ -682,12 +667,10 @@ def test_kill_timed_out_processors_kill(self, mock_kill, mock_pid, mock_waitable
             max_runs=1,
             processor_timeout=timedelta(seconds=5),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         processor = DagFileProcessorProcess(
             file_path="abc.txt",
-            dag_ids=[],
             dag_directory=TEST_DAG_FOLDER,
             callback_requests=[],
         )
@@ -709,12 +692,10 @@ def test_kill_timed_out_processors_no_kill(self, mock_dag_file_processor, mock_p
             max_runs=1,
             processor_timeout=timedelta(seconds=5),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         processor = DagFileProcessorProcess(
             file_path="abc.txt",
-            dag_ids=[],
             dag_directory=str(TEST_DAG_FOLDER),
             callback_requests=[],
         )
@@ -741,7 +722,6 @@ def test_dag_with_system_exit(self):
         manager = DagFileProcessorManager(
             dag_directory=dag_directory,
-            dag_ids=[],
             max_runs=1,
             processor_timeout=timedelta(seconds=5),
             signal_conn=child_pipe,
         )
@@ -782,7 +762,6 @@ def test_import_error_with_dag_directory(self, tmp_path):
         manager = DagFileProcessorManager(
             dag_directory=processor_dir_1,
-            dag_ids=[],
             max_runs=1,
             signal_conn=child_pipe,
             processor_timeout=timedelta(seconds=5),
         )
@@ -798,7 +777,6 @@ def test_import_error_with_dag_directory(self, tmp_path):
         manager = DagFileProcessorManager(
             dag_directory=processor_dir_2,
-            dag_ids=[],
             max_runs=1,
             signal_conn=child_pipe,
             processor_timeout=timedelta(seconds=5),
         )
@@ -863,7 +841,6 @@ def fake_processor_(*args, **kwargs):
         manager = DagFileProcessorManager(
             dag_directory=dag_filepath,
-            dag_ids=[],
             # A reasonable large number to ensure that we trigger the deadlock
             max_runs=100,
             processor_timeout=timedelta(seconds=5),
@@ -904,7 +881,6 @@ def test_send_file_processing_statsd_timing(self, statsd_timing_mock, tmp_path):
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=child_pipe,
-            dag_ids=[],
         )
 
         self.run_processor_manager_one_loop(manager, parent_pipe)
@@ -933,7 +909,6 @@ def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmp_path):
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
         dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
         zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")
@@ -957,7 +932,6 @@ def test_refresh_dags_dir_deactivates_deleted_zipped_dags(self, tmp_path):
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
         dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
         zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")
@@ -999,7 +973,6 @@ def test_refresh_dags_dir_does_not_interfer_with_dags_outside_its_subdir(self, t
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         manager.last_dag_dir_refresh_time = timezone.utcnow() - timedelta(minutes=10)
@@ -1044,7 +1017,6 @@ def test_fetch_callbacks_from_database(self, tmp_path):
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=child_pipe,
-            dag_ids=[],
         )
 
         with create_session() as session:
@@ -1086,7 +1058,6 @@ def test_fetch_callbacks_for_current_dag_directory_only(self, tmp_path):
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=child_pipe,
-            dag_ids=[],
         )
 
         with create_session() as session:
@@ -1121,7 +1092,6 @@ def test_fetch_callbacks_from_database_max_per_loop(self, tmp_path):
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=child_pipe,
-            dag_ids=[],
         )
 
         with create_session() as session:
@@ -1157,7 +1127,6 @@ def test_fetch_callbacks_from_database_not_standalone(self, tmp_path):
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=child_pipe,
-            dag_ids=[],
         )
 
         with create_session() as session:
@@ -1175,7 +1144,6 @@ def test_callback_queue(self, tmp_path):
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         dag1_req1 = DagCallbackRequest(
@@ -1273,7 +1241,7 @@ class path, thus when reloading logging module the airflow.processor_manager
             os.remove(log_file_loc)
 
         # Starting dag processing with 0 max_runs to avoid redundant operations.
-        processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365))
         processor_agent.start()
         processor_agent._process.join()
@@ -1288,7 +1256,7 @@ def test_parse_once(self):
         clear_db_dags()
 
         test_dag_path = TEST_DAG_FOLDER / "test_scheduler_dags.py"
-        processor_agent = DagFileProcessorAgent(test_dag_path, 1, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent(test_dag_path, 1, timedelta(days=365))
         processor_agent.start()
         while not processor_agent.done:
             processor_agent.heartbeat()
@@ -1311,7 +1279,7 @@ def test_launch_process(self):
             os.remove(log_file_loc)
 
         # Starting dag processing with 0 max_runs to avoid redundant operations.
-        processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365))
         processor_agent.start()
         processor_agent._process.join()
@@ -1319,25 +1287,25 @@ def test_launch_process(self):
         assert os.path.isfile(log_file_loc)
 
     def test_get_callbacks_pipe(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
         processor_agent._parent_signal_conn = Mock()
         retval = processor_agent.get_callbacks_pipe()
         assert retval == processor_agent._parent_signal_conn
 
     def test_get_callbacks_pipe_no_parent_signal_conn(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
         processor_agent._parent_signal_conn = None
         with pytest.raises(ValueError, match="Process not started"):
             processor_agent.get_callbacks_pipe()
 
     def test_heartbeat_no_parent_signal_conn(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
         processor_agent._parent_signal_conn = None
        with pytest.raises(ValueError, match="Process not started"):
             processor_agent.heartbeat()
 
     def test_heartbeat_poll_eof_error(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
         processor_agent._parent_signal_conn = Mock()
         processor_agent._parent_signal_conn.poll.return_value = True
         processor_agent._parent_signal_conn.recv = Mock()
@@ -1346,7 +1314,7 @@ def test_heartbeat_poll_eof_error(self):
         assert ret_val is None
 
     def test_heartbeat_poll_connection_error(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
         processor_agent._parent_signal_conn = Mock()
         processor_agent._parent_signal_conn.poll.return_value = True
         processor_agent._parent_signal_conn.recv = Mock()
@@ -1355,7 +1323,7 @@ def test_heartbeat_poll_process_message(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
         processor_agent._parent_signal_conn = Mock()
         processor_agent._parent_signal_conn.poll.side_effect = [True, False]
         processor_agent._parent_signal_conn.recv = Mock()
@@ -1366,19 +1334,19 @@ def test_heartbeat_poll_process_message(self):
 
     def test_process_message_invalid_type(self):
         message = "xyz"
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
         with pytest.raises(RuntimeError, match="Unexpected message received of type str"):
             processor_agent._process_message(message)
 
     def test_heartbeat_manager(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
         processor_agent._parent_signal_conn = None
         with pytest.raises(ValueError, match="Process not started"):
             processor_agent._heartbeat_manager()
 
     @mock.patch("airflow.utils.process_utils.reap_process_group")
     def test_heartbeat_manager_process_restart(self, mock_pg):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
         processor_agent._parent_signal_conn = Mock()
         processor_agent._process = MagicMock()
         processor_agent.start = Mock()
@@ -1392,7 +1360,7 @@ def test_heartbeat_manager_process_restart(self, mock_pg):
     @mock.patch("time.monotonic")
     @mock.patch("airflow.dag_processing.manager.reap_process_group")
     def test_heartbeat_manager_process_reap(self, mock_pg, mock_time_monotonic, mock_stats):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
         processor_agent._parent_signal_conn = Mock()
         processor_agent._process = Mock()
         processor_agent._process.pid = 12345
@@ -1413,7 +1381,7 @@ def test_heartbeat_manager_process_reap(self, mock_pg, mock_time_monotonic, mock
         processor_agent.start.assert_called()
 
     def test_heartbeat_manager_terminate(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
         processor_agent._parent_signal_conn = Mock()
         processor_agent._process = Mock()
         processor_agent._process.is_alive.return_value = True
@@ -1423,7 +1391,7 @@ def test_heartbeat_manager_terminate(self):
         processor_agent._parent_signal_conn.send.assert_called_with(DagParsingSignal.TERMINATE_MANAGER)
 
     def test_heartbeat_manager_terminate_conn_err(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
         processor_agent._process = Mock()
         processor_agent._process.is_alive.return_value = True
         processor_agent._parent_signal_conn = Mock()
@@ -1434,7 +1402,7 @@ def test_heartbeat_manager_terminate_conn_err(self):
         processor_agent._parent_signal_conn.send.assert_called_with(DagParsingSignal.TERMINATE_MANAGER)
 
     def test_heartbeat_manager_end_no_process(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
         processor_agent._process = Mock()
         processor_agent._process.__bool__ = Mock(return_value=False)
         processor_agent._process.side_effect = [None]
@@ -1449,7 +1417,7 @@ def test_log_to_stdout(self, capfd):
         test_dag_path = TEST_DAG_FOLDER / "test_scheduler_dags.py"
 
         # Starting dag processing with 0 max_runs to avoid redundant operations.
-        processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365))
         processor_agent.start()
         processor_agent._process.join()
@@ -1464,7 +1432,7 @@ def test_not_log_to_stdout(self, capfd):
         test_dag_path = TEST_DAG_FOLDER / "test_scheduler_dags.py"
 
         # Starting dag processing with 0 max_runs to avoid redundant operations.
-        processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365))
         processor_agent.start()
         processor_agent._process.join()
diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py
index c2962ea04115c..b23cd44f959ae 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -107,18 +107,14 @@ def teardown_method(self) -> None:
         self.clean_db()
 
     def _process_file(self, file_path, dag_directory, session):
-        dag_file_processor = DagFileProcessor(
-            dag_ids=[], dag_directory=str(dag_directory), log=mock.MagicMock()
-        )
+        dag_file_processor = DagFileProcessor(dag_directory=str(dag_directory), log=mock.MagicMock())
         dag_file_processor.process_file(file_path, [])
 
     @patch.object(TaskInstance, "handle_failure")
     def test_execute_on_failure_callbacks(self, mock_ti_handle_failure):
         dagbag = DagBag(dag_folder="/dev/null", include_examples=True, read_dags_from_db=False)
-        dag_file_processor = DagFileProcessor(
-            dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock()
-        )
+        dag_file_processor = DagFileProcessor(dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock())
         with create_session() as session:
             session.query(TaskInstance).delete()
             dag = dagbag.get_dag("example_branch_operator")
@@ -152,9 +148,7 @@ def test_execute_on_failure_callbacks(self, mock_ti_handle_failure):
     @patch.object(TaskInstance, "handle_failure")
     def test_execute_on_failure_callbacks_without_dag(self, mock_ti_handle_failure, has_serialized_dag):
         dagbag = DagBag(dag_folder="/dev/null", include_examples=True, read_dags_from_db=False)
-        dag_file_processor = DagFileProcessor(
-            dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock()
-        )
+        dag_file_processor = DagFileProcessor(dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock())
         with create_session() as session:
             session.query(TaskInstance).delete()
             dag = dagbag.get_dag("example_branch_operator")
@@ -188,9 +182,7 @@ def test_execute_on_failure_callbacks_without_dag(self, mock_ti_handle_failure,
 
     def test_failure_callbacks_should_not_drop_hostname(self):
         dagbag = DagBag(dag_folder="/dev/null", include_examples=True, read_dags_from_db=False)
-        dag_file_processor = DagFileProcessor(
-            dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock()
-        )
+        dag_file_processor = DagFileProcessor(dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock())
         dag_file_processor.UNIT_TEST_MODE = False
 
         with create_session() as session:
@@ -224,9 +216,7 @@ def test_process_file_should_failure_callback(self, monkeypatch, tmp_path, get_t
         callback_file = tmp_path.joinpath("callback.txt")
         callback_file.touch()
         monkeypatch.setenv("AIRFLOW_CALLBACK_FILE", str(callback_file))
-        dag_file_processor = DagFileProcessor(
-            dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock()
-        )
+        dag_file_processor = DagFileProcessor(dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock())
 
         dag = get_test_dag("test_on_failure_callback")
         task = dag.get_task(task_id="test_on_failure_callback_task")
@@ -576,7 +566,6 @@ def test_import_error_tracebacks_zip_depth(self, tmp_path):
     def test_dag_parser_output_when_logging_to_stdout(self, mock_redirect_stdout_for_file):
         processor = DagFileProcessorProcess(
             file_path="abc.txt",
-            dag_ids=[],
             dag_directory=[],
             callback_requests=[],
         )
@@ -584,7 +573,6 @@ def test_dag_parser_output_when_logging_to_stdout(self, mock_redirect_stdout_for
             result_channel=MagicMock(),
             parent_channel=MagicMock(),
             file_path="fake_file_path",
-            dag_ids=[],
             thread_name="fake_thread_name",
             callback_requests=[],
             dag_directory=[],
@@ -597,7 +585,6 @@ def test_dag_parser_output_when_logging_to_file(self, mock_redirect_stdout_for_f
         processor = DagFileProcessorProcess(
             file_path="abc.txt",
-            dag_ids=[],
             dag_directory=[],
             callback_requests=[],
         )
@@ -605,7 +592,6 @@ def test_dag_parser_output_when_logging_to_file(self, mock_redirect_stdout_for_f
             result_channel=MagicMock(),
             parent_channel=MagicMock(),
             file_path="fake_file_path",
-            dag_ids=[],
             thread_name="fake_thread_name",
             callback_requests=[],
             dag_directory=[],
@@ -622,7 +608,6 @@ def test_no_valueerror_with_parseable_dag_in_zip(self, mock_context, tmp_path):
             processor = DagFileProcessorProcess(
                 file_path=zip_filename,
-                dag_ids=[],
                 dag_directory=[],
                 callback_requests=[],
             )
@@ -638,7 +623,6 @@ def test_nullbyte_exception_handling_when_preimporting_airflow(self, mock_contex
             processor = DagFileProcessorProcess(
                 file_path=dag_filename,
-                dag_ids=[],
                 dag_directory=[],
                 callback_requests=[],
             )
diff --git a/tests/listeners/test_dag_import_error_listener.py b/tests/listeners/test_dag_import_error_listener.py
index ff63d141c7852..aa085d3cfd7f3 100644
--- a/tests/listeners/test_dag_import_error_listener.py
+++ b/tests/listeners/test_dag_import_error_listener.py
@@ -95,9 +95,7 @@ def teardown_method(self) -> None:
         self.clean_db()
 
     def _process_file(self, file_path, dag_directory, session):
-        dag_file_processor = DagFileProcessor(
-            dag_ids=[], dag_directory=str(dag_directory), log=mock.MagicMock()
-        )
+        dag_file_processor = DagFileProcessor(dag_directory=str(dag_directory), log=mock.MagicMock())
        dag_file_processor.process_file(file_path, [])
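
Taken together, these hunks drop the unused dag_ids plumbing from every constructor in the parsing path: DagFileProcessorAgent, DagFileProcessorManager, DagFileProcessorProcess, and DagFileProcessor. A minimal sketch of driving the updated agent, modeled on test_parse_once above (the DAG-folder path is illustrative, not part of the patch):

    from datetime import timedelta
    from pathlib import Path

    from airflow.dag_processing.manager import DagFileProcessorAgent

    # The agent is now built from just the directory, the number of parsing
    # runs, and the processor timeout; the trailing dag_ids list is gone.
    # "/files/dags" is a hypothetical path used only for this sketch.
    agent = DagFileProcessorAgent(
        dag_directory=Path("/files/dags"),
        max_runs=1,
        processor_timeout=timedelta(seconds=300),
    )
    agent.start()
    while not agent.done:
        agent.heartbeat()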