Move more DAG parsing related config to dag_processor section (apache#46034)

This moves most of the DAG parsing related config into the new
``dag_processor`` section.

The remaining options need more attention than just a
straightforward move, so I'll tackle those separately for
ease of review.
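
For deployments that pin these options explicitly, the move is a straight key relocation in ``airflow.cfg``. A minimal sketch of what the rename looks like for one of the options (the value shown is just the shipped default):

```ini
# Before this change (now deprecated):
[core]
dag_file_processor_timeout = 50

# After this change:
[dag_processor]
dag_file_processor_timeout = 50
```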
jedcunningham authored and got686-yandex committed Jan 30, 2025
1 parent e39f157 commit afc1ece
Showing 14 changed files with 178 additions and 125 deletions.
@@ -34,7 +34,7 @@

 def _create_dag_processor_job_runner(args: Any) -> DagProcessorJobRunner:
     """Create DagFileProcessorProcess instance."""
-    processor_timeout_seconds: int = conf.getint("core", "dag_file_processor_timeout")
+    processor_timeout_seconds: int = conf.getint("dag_processor", "dag_file_processor_timeout")
     return DagProcessorJobRunner(
         job=Job(),
         processor=DagFileProcessorManager(
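
As a quick check that the relocated option resolves, it can be read back the same way the runner above does; a small sketch assuming a stock Airflow install:

```python
from airflow.configuration import conf

# Post-change home of the timeout; config.yml ships "50" (seconds) as the default.
timeout = conf.getint("dag_processor", "dag_file_processor_timeout")
print(timeout)
```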
30 changes: 29 additions & 1 deletion airflow/cli/commands/remote_commands/config_command.py
@@ -199,6 +199,10 @@ def message(self) -> str:
     ),
     ConfigChange(config=ConfigParameter("core", "task_runner")),
     ConfigChange(config=ConfigParameter("core", "enable_xcom_pickling")),
+    ConfigChange(
+        config=ConfigParameter("core", "dag_file_processor_timeout"),
+        renamed_to=ConfigParameter("dag_processor", "dag_file_processor_timeout"),
+    ),
     # api
     ConfigChange(
         config=ConfigParameter("api", "access_control_allow_origin"),
@@ -289,7 +293,7 @@ def message(self) -> str:
     ),
     ConfigChange(
         config=ConfigParameter("scheduler", "max_threads"),
-        renamed_to=ConfigParameter("scheduler", "parsing_processes"),
+        renamed_to=ConfigParameter("dag_processor", "parsing_processes"),
     ),
     ConfigChange(
         config=ConfigParameter("scheduler", "statsd_host"),
@@ -327,6 +331,30 @@ def message(self) -> str:
         config=ConfigParameter("scheduler", "statsd_custom_client_path"),
         renamed_to=ConfigParameter("metrics", "statsd_custom_client_path"),
     ),
+    ConfigChange(
+        config=ConfigParameter("scheduler", "parsing_processes"),
+        renamed_to=ConfigParameter("dag_processor", "parsing_processes"),
+    ),
+    ConfigChange(
+        config=ConfigParameter("scheduler", "file_parsing_sort_mode"),
+        renamed_to=ConfigParameter("dag_processor", "file_parsing_sort_mode"),
+    ),
+    ConfigChange(
+        config=ConfigParameter("scheduler", "max_callbacks_per_loop"),
+        renamed_to=ConfigParameter("dag_processor", "max_callbacks_per_loop"),
+    ),
+    ConfigChange(
+        config=ConfigParameter("scheduler", "min_file_process_interval"),
+        renamed_to=ConfigParameter("dag_processor", "min_file_process_interval"),
+    ),
+    ConfigChange(
+        config=ConfigParameter("scheduler", "stale_dag_threshold"),
+        renamed_to=ConfigParameter("dag_processor", "stale_dag_threshold"),
+    ),
+    ConfigChange(
+        config=ConfigParameter("scheduler", "print_stats_interval"),
+        renamed_to=ConfigParameter("dag_processor", "print_stats_interval"),
+    ),
     ConfigChange(
         config=ConfigParameter("scheduler", "dag_dir_list_interval"),
         renamed_to=ConfigParameter("dag_processor", "refresh_interval"),
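
Each ``ConfigChange`` entry maps an old ``(section, option)`` pair to its new home so the CLI can flag stale settings. A self-contained sketch of the same pattern, using stand-in dataclasses rather than Airflow's internal types:

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class ConfigParameter:
    section: str
    option: str


@dataclass(frozen=True)
class ConfigChange:
    config: ConfigParameter
    renamed_to: ConfigParameter | None = None


CHANGES = [
    ConfigChange(
        config=ConfigParameter("scheduler", "parsing_processes"),
        renamed_to=ConfigParameter("dag_processor", "parsing_processes"),
    ),
]


def lint(user_settings: dict[tuple[str, str], str]) -> None:
    """Warn about options that have moved, mirroring the CLI's config check."""
    for change in CHANGES:
        old = (change.config.section, change.config.option)
        if old in user_settings and change.renamed_to is not None:
            new = change.renamed_to
            print(f"[{old[0]}] {old[1]} has moved to [{new.section}] {new.option}")


lint({("scheduler", "parsing_processes"): "4"})
```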
129 changes: 64 additions & 65 deletions airflow/config_templates/config.yml
@@ -229,13 +229,6 @@ core:
       type: integer
       example: ~
       default: "2"
-  dag_file_processor_timeout:
-    description: |
-      How long before timing out a DagFileProcessor, which processes a dag file
-    version_added: 1.10.6
-    type: string
-    example: ~
-    default: "50"
   default_impersonation:
     description: |
       If set, tasks without a ``run_as_user`` argument will be run with this user
@@ -2264,15 +2257,6 @@ scheduler:
       type: float
       example: ~
       default: "1"
-  min_file_process_interval:
-    description: |
-      Number of seconds after which a DAG file is parsed. The DAG file is parsed every
-      ``[scheduler] min_file_process_interval`` number of seconds. Updates to DAGs are reflected after
-      this interval. Keeping this number low will increase CPU usage.
-    version_added: ~
-    type: integer
-    example: ~
-    default: "30"
   parsing_cleanup_interval:
     description: |
       How often (in seconds) to check for stale DAGs (DAGs which are no longer present in
@@ -2282,25 +2266,6 @@
       type: integer
       example: ~
       default: "60"
-  stale_dag_threshold:
-    description: |
-      How long (in seconds) to wait after we have re-parsed a DAG file before deactivating stale
-      DAGs (DAGs which are no longer present in the expected files). The reason why we need
-      this threshold is to account for the time between when the file is parsed and when the
-      DAG is loaded. The absolute maximum that this could take is ``[core] dag_file_processor_timeout``,
-      but when you have a long timeout configured, it results in a significant delay in the
-      deactivation of stale dags.
-    version_added: 2.6.0
-    type: integer
-    example: ~
-    default: "50"
-  print_stats_interval:
-    description: |
-      How often should stats be printed to the logs. Setting to 0 will disable printing stats
-    version_added: ~
-    type: integer
-    example: ~
-    default: "30"
   pool_metrics_interval:
     description: |
       How often (in seconds) should pool usage stats be sent to StatsD (if statsd_on is enabled)
@@ -2440,36 +2405,6 @@
       type: boolean
       example: ~
       default: "True"
-  parsing_processes:
-    description: |
-      The scheduler can run multiple processes in parallel to parse dags.
-      This defines how many processes will run.
-    version_added: 1.10.14
-    type: integer
-    example: ~
-    default: "2"
-  file_parsing_sort_mode:
-    description: |
-      One of ``modified_time``, ``random_seeded_by_host`` and ``alphabetical``.
-      The scheduler will list and sort the dag files to decide the parsing order.
-
-      * ``modified_time``: Sort by modified time of the files. This is useful on large scale to parse the
-        recently modified DAGs first.
-      * ``random_seeded_by_host``: Sort randomly across multiple Schedulers but with same order on the
-        same host. This is useful when running with Scheduler in HA mode where each scheduler can
-        parse different DAG files.
-      * ``alphabetical``: Sort by filename
-    version_added: 2.1.0
-    type: string
-    example: ~
-    default: "modified_time"
-  max_callbacks_per_loop:
-    description: |
-      The maximum number of callbacks that are fetched during a single loop.
-    version_added: 2.3.0
-    type: integer
-    example: ~
-    default: "20"
   dag_stale_not_seen_duration:
     description: |
       Time in seconds after which dags, which were not updated by Dag Processor are deactivated.
@@ -2700,3 +2635,67 @@ dag_processor:
       type: integer
       example: ~
       default: "300"
+  parsing_processes:
+    description: |
+      The DAG processor can run multiple processes in parallel to parse dags.
+      This defines how many processes will run.
+    version_added: ~
+    type: integer
+    example: ~
+    default: "2"
+  file_parsing_sort_mode:
+    description: |
+      One of ``modified_time``, ``random_seeded_by_host`` and ``alphabetical``.
+      The DAG processor will list and sort the dag files to decide the parsing order.
+
+      * ``modified_time``: Sort by modified time of the files. This is useful on large scale to parse the
+        recently modified DAGs first.
+      * ``random_seeded_by_host``: Sort randomly across multiple DAG processors but with same order on the
+        same host, allowing each processor to parse the files in a different order.
+      * ``alphabetical``: Sort by filename
+    version_added: ~
+    type: string
+    example: ~
+    default: "modified_time"
+  max_callbacks_per_loop:
+    description: |
+      The maximum number of callbacks that are fetched during a single loop.
+    version_added: ~
+    type: integer
+    example: ~
+    default: "20"
+  min_file_process_interval:
+    description: |
+      Number of seconds after which a DAG file is parsed. The DAG file is parsed every
+      ``[dag_processor] min_file_process_interval`` number of seconds. Updates to DAGs are reflected after
+      this interval. Keeping this number low will increase CPU usage.
+    version_added: ~
+    type: integer
+    example: ~
+    default: "30"
+  stale_dag_threshold:
+    description: |
+      How long (in seconds) to wait after we have re-parsed a DAG file before deactivating stale
+      DAGs (DAGs which are no longer present in the expected files). The reason why we need
+      this threshold is to account for the time between when the file is parsed and when the
+      DAG is loaded. The absolute maximum that this could take is
+      ``[dag_processor] dag_file_processor_timeout``, but when you have a long timeout configured,
+      it results in a significant delay in the deactivation of stale dags.
+    version_added: ~
+    type: integer
+    example: ~
+    default: "50"
+  dag_file_processor_timeout:
+    description: |
+      How long before timing out a DagFileProcessor, which processes a dag file
+    version_added: ~
+    type: string
+    example: ~
+    default: "50"
+  print_stats_interval:
+    description: |
+      How often should DAG processor stats be printed to the logs. Setting to 0 will disable printing stats
+    version_added: ~
+    type: integer
+    example: ~
+    default: "30"
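
Assembled from the defaults above, the new section in ``airflow.cfg`` ends up looking like this:

```ini
[dag_processor]
parsing_processes = 2
file_parsing_sort_mode = modified_time
max_callbacks_per_loop = 20
min_file_process_interval = 30
stale_dag_threshold = 50
dag_file_processor_timeout = 50
print_stats_interval = 30
```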
2 changes: 2 additions & 0 deletions airflow/config_templates/unit_tests.cfg
@@ -87,6 +87,8 @@ result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
 # Those values are set so that during unit tests things run faster than usual
 job_heartbeat_sec = 1
 scheduler_heartbeat_sec = 5
+
+[dag_processor]
 parsing_processes = 2
 
 [triggerer]
Expand Down
6 changes: 5 additions & 1 deletion airflow/configuration.py
@@ -389,7 +389,11 @@ def inversed_deprecated_sections(self):
         ("core", "default_task_weight_rule"): sorted(WeightRule.all_weight_rules()),
         ("core", "dag_ignore_file_syntax"): ["regexp", "glob"],
         ("core", "mp_start_method"): multiprocessing.get_all_start_methods(),
-        ("scheduler", "file_parsing_sort_mode"): ["modified_time", "random_seeded_by_host", "alphabetical"],
+        ("dag_processor", "file_parsing_sort_mode"): [
+            "modified_time",
+            "random_seeded_by_host",
+            "alphabetical",
+        ],
         ("logging", "logging_level"): _available_logging_levels,
         ("logging", "fab_logging_level"): _available_logging_levels,
         # celery_logging_level can be empty, which uses logging_level as fallback
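
The enums table above whitelists the values accepted for the option under its new section. A small sketch of reading and checking it (assuming a default Airflow config is on the path):

```python
from airflow.configuration import conf

# Per the enums table: only these three values pass validation.
ALLOWED = {"modified_time", "random_seeded_by_host", "alphabetical"}

sort_mode = conf.get("dag_processor", "file_parsing_sort_mode")
assert sort_mode in ALLOWED, f"unexpected sort mode: {sort_mode}"
```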
26 changes: 14 additions & 12 deletions airflow/dag_processing/manager.py
@@ -133,31 +133,33 @@ class DagFileProcessorManager:
     processors finish, more are launched. The files are processed over and
     over again, but no more often than the specified interval.

-    :param max_runs: The number of times to parse and schedule each file. -1
-        for unlimited.
+    :param max_runs: The number of times to parse each file. -1 for unlimited.
     :param processor_timeout: How long to wait before timing out a DAG file processor
-    :param signal_conn: connection to communicate signal with processor agent.
     """

     max_runs: int
-    processor_timeout: float = attrs.field(factory=_config_int_factory("core", "dag_file_processor_timeout"))
+    processor_timeout: float = attrs.field(
+        factory=_config_int_factory("dag_processor", "dag_file_processor_timeout")
+    )
     selector: selectors.BaseSelector = attrs.field(factory=selectors.DefaultSelector)

-    _parallelism: int = attrs.field(factory=_config_int_factory("scheduler", "parsing_processes"))
+    _parallelism: int = attrs.field(factory=_config_int_factory("dag_processor", "parsing_processes"))

     parsing_cleanup_interval: float = attrs.field(
         factory=_config_int_factory("scheduler", "parsing_cleanup_interval")
     )
     _file_process_interval: float = attrs.field(
-        factory=_config_int_factory("scheduler", "min_file_process_interval")
+        factory=_config_int_factory("dag_processor", "min_file_process_interval")
     )
-    stale_dag_threshold: float = attrs.field(factory=_config_int_factory("scheduler", "stale_dag_threshold"))
+    stale_dag_threshold: float = attrs.field(
+        factory=_config_int_factory("dag_processor", "stale_dag_threshold")
+    )

     log: logging.Logger = attrs.field(default=log, init=False)

     _last_deactivate_stale_dags_time: float = attrs.field(default=0, init=False)
     print_stats_interval: float = attrs.field(
-        factory=_config_int_factory("scheduler", "print_stats_interval")
+        factory=_config_int_factory("dag_processor", "print_stats_interval")
     )
     last_stat_print_time: float = attrs.field(default=0, init=False)

@@ -183,7 +185,7 @@ class DagFileProcessorManager:
     )

     max_callbacks_per_loop: int = attrs.field(
-        factory=_config_int_factory("scheduler", "max_callbacks_per_loop")
+        factory=_config_int_factory("dag_processor", "max_callbacks_per_loop")
     )

     base_log_dir: str = attrs.field(factory=_config_get_factory("scheduler", "CHILD_PROCESS_LOG_DIRECTORY"))
@@ -802,7 +804,7 @@ def prepare_file_path_queue(self):
         now = timezone.utcnow()

         # Sort the file paths by the parsing order mode
-        list_mode = conf.get("scheduler", "file_parsing_sort_mode")
+        list_mode = conf.get("dag_processor", "file_parsing_sort_mode")

         files_with_mtime: dict[str, datetime] = {}
         file_paths = []
@@ -841,7 +843,7 @@ def prepare_file_path_queue(self):
         elif list_mode == "alphabetical":
             file_paths.sort()
         elif list_mode == "random_seeded_by_host":
-            # Shuffle the list seeded by hostname so multiple schedulers can work on different
+            # Shuffle the list seeded by hostname so multiple DAG processors can work on different
             # set of files. Since we set the seed, the sort order will remain same per host
             random.Random(get_hostname()).shuffle(file_paths)

@@ -860,7 +862,7 @@ def prepare_file_path_queue(self):
         )

         # Do not convert the following list to set as set does not preserve the order
-        # and we need to maintain the order of file_paths for `[scheduler] file_parsing_sort_mode`
+        # and we need to maintain the order of file_paths for `[dag_processor] file_parsing_sort_mode`
         files_paths_to_queue = [
             file_path for file_path in file_paths if file_path not in file_paths_to_exclude
         ]
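
The ``random_seeded_by_host`` branch above deserves a note: seeding the shuffle with the hostname keeps the order stable on each host while differing across hosts, so several DAG processors naturally spread their attention. A standalone sketch, with ``socket.gethostname`` standing in for Airflow's ``get_hostname``:

```python
import random
import socket

file_paths = ["dags/b.py", "dags/a.py", "dags/c.py"]

# Same host => same seed => identical order on every loop; a different host
# shuffles differently, so multiple DAG processors work through the same
# files in different orders.
random.Random(socket.gethostname()).shuffle(file_paths)
print(file_paths)
```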
@@ -31,15 +31,15 @@ The ``DagFileProcessorManager`` runs user codes. As a result, it runs as a stand
 ``DagFileProcessorManager`` has the following steps:

 1. Check for new files: If the elapsed time since the DAG was last refreshed is > :ref:`config:scheduler__dag_dir_list_interval` then update the file paths list
-2. Exclude recently processed files: Exclude files that have been processed more recently than :ref:`min_file_process_interval<config:scheduler__min_file_process_interval>` and have not been modified
+2. Exclude recently processed files: Exclude files that have been processed more recently than :ref:`min_file_process_interval<config:dag_processor__min_file_process_interval>` and have not been modified
 3. Queue file paths: Add files discovered to the file path queue
-4. Process files: Start a new ``DagFileProcessorProcess`` for each file, up to a maximum of :ref:`config:scheduler__parsing_processes`
+4. Process files: Start a new ``DagFileProcessorProcess`` for each file, up to a maximum of :ref:`config:dag_processor__parsing_processes`
 5. Collect results: Collect the result from any finished DAG processors
 6. Log statistics: Print statistics and emit ``dag_processing.total_parse_time``

 ``DagFileProcessorProcess`` has the following steps:

-1. Process file: The entire process must complete within :ref:`dag_file_processor_timeout<config:core__dag_file_processor_timeout>`
+1. Process file: The entire process must complete within :ref:`dag_file_processor_timeout<config:dag_processor__dag_file_processor_timeout>`
 2. The DAG files are loaded as Python module: Must complete within :ref:`dagbag_import_timeout<config:core__dagbag_import_timeout>`
 3. Process modules: Find DAG objects within Python module
 4. Return DagBag: Provide the ``DagFileProcessorManager`` a list of the discovered DAG objects
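
The two timeouts in these steps nest: the module import in step 2 runs inside the overall per-file budget from step 1. A sketch that reads both and checks the implied ordering (the assertion is an illustrative sanity check, not something Airflow enforces):

```python
from airflow.configuration import conf

processor_timeout = conf.getint("dag_processor", "dag_file_processor_timeout")
import_timeout = conf.getfloat("core", "dagbag_import_timeout")

# The import happens inside the processor run, so a sane setup keeps the
# import timeout at or below the overall file-processing timeout.
assert import_timeout <= processor_timeout
```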
Expand Down Expand Up @@ -136,12 +136,12 @@ There are several areas of resource usage that you should pay attention to:
* CPU usage is most important for FileProcessors - those are the processes that parse and execute
Python DAG files. Since DAG processors typically triggers such parsing continuously, when you have a lot of DAGs,
the processing might take a lot of CPU. You can mitigate it by increasing the
:ref:`config:scheduler__min_file_process_interval`, but this is one of the mentioned trade-offs,
:ref:`config:dag_processor__min_file_process_interval`, but this is one of the mentioned trade-offs,
result of this is that changes to such files will be picked up slower and you will see delays between
submitting the files and getting them available in Airflow UI and executed by Scheduler. Optimizing
the way how your DAGs are built, avoiding external data sources is your best approach to improve CPU
usage. If you have more CPUs available, you can increase number of processing threads
:ref:`config:scheduler__parsing_processes`.
:ref:`config:dag_processor__parsing_processes`.
* Airflow might use quite a significant amount of memory when you try to get more performance out of it.
Often more performance is achieved in Airflow by increasing the number of processes handling the load,
and each process requires whole interpreter of Python loaded, a lot of classes imported, temporary
Expand Down Expand Up @@ -185,14 +185,14 @@ The following config settings can be used to control aspects of the Scheduler.
However, you can also look at other non-performance-related scheduler configuration parameters available at
:doc:`../configurations-ref` in the ``[scheduler]`` section.

- :ref:`config:scheduler__file_parsing_sort_mode`
- :ref:`config:dag_processor__file_parsing_sort_mode`
The scheduler will list and sort the DAG files to decide the parsing order.

- :ref:`config:scheduler__min_file_process_interval`
- :ref:`config:dag_processor__min_file_process_interval`
Number of seconds after which a DAG file is re-parsed. The DAG file is parsed every
min_file_process_interval number of seconds. Updates to DAGs are reflected after
this interval. Keeping this number low will increase CPU usage.

- :ref:`config:scheduler__parsing_processes`
- :ref:`config:dag_processor__parsing_processes`
The scheduler can run multiple processes in parallel to parse DAG files. This defines
how many processes will run.
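
For instance, a deployment with many rarely-changing DAG files might trade freshness for CPU with settings along these lines (values are illustrative, not recommendations):

```ini
[dag_processor]
# Re-parse each file at most every 5 minutes instead of the default 30s.
min_file_process_interval = 300
# More parser processes, if CPU headroom allows.
parsing_processes = 4
file_parsing_sort_mode = modified_time
```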
8 changes: 4 additions & 4 deletions docs/apache-airflow/best-practices.rst
@@ -104,7 +104,7 @@ and build DAG relations between them. This is because of the design decision for
 and the impact the top-level code parsing speed on both performance and scalability of Airflow.

 Airflow scheduler executes the code outside the Operator's ``execute`` methods with the minimum interval of
-:ref:`min_file_process_interval<config:scheduler__min_file_process_interval>` seconds. This is done in order
+:ref:`min_file_process_interval<config:dag_processor__min_file_process_interval>` seconds. This is done in order
 to allow dynamic scheduling of the DAGs - where scheduling and dependencies might change over time and
 impact the next schedule of the DAG. Airflow scheduler tries to continuously make sure that what you have
 in DAGs is correctly reflected in scheduled tasks.
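
In practice that means keeping expensive work out of module top level, since top-level code runs on every parse (at least every ``min_file_process_interval`` seconds) while code inside a task runs only at execution time. A hedged sketch (DAG id, URL, and schedule are illustrative):

```python
import pendulum
from airflow.decorators import dag, task


@dag(schedule=None, start_date=pendulum.datetime(2025, 1, 1), catchup=False)
def example_no_top_level_work():
    @task
    def fetch_status() -> int:
        # Heavy import and network call live inside the task, so they run
        # at execution time rather than on every DAG file parse.
        import requests

        return requests.get("https://example.com/api/health").status_code

    fetch_status()


example_no_top_level_work()
```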
@@ -442,10 +442,10 @@ at the following configuration parameters and fine tune them according your need
 each parameter by following the links):

 * :ref:`config:scheduler__scheduler_idle_sleep_time`
-* :ref:`config:scheduler__min_file_process_interval`
+* :ref:`config:dag_processor__min_file_process_interval`
 * :ref:`config:dag_processor__refresh_interval`
-* :ref:`config:scheduler__parsing_processes`
-* :ref:`config:scheduler__file_parsing_sort_mode`
+* :ref:`config:dag_processor__parsing_processes`
+* :ref:`config:dag_processor__file_parsing_sort_mode`

 Example of watcher pattern with trigger rules
 ---------------------------------------------