Remove "single process" restrictions on SQLite in favour of using WAL…
Browse files Browse the repository at this point in the history
… mode (apache#44839)

Since 2010(!) SQLite has had WAL, or Write-Ahead Log, journalling mode, which
allows multiple concurrent readers and one writer. That is more than good
enough for our "local" use case.

The primary driver for this change was the realisation that it makes it
possible to reduce the amount and complexity of code in DagProcessorManager
before reworking it for AIP-72 support: we have a lot of code in
DagProcessorManager to support `if async_mode`, which makes the flow hard to
follow.

Some useful docs and articles about this mode:

- [The official docs](https://sqlite.org/wal.html)
- [Simon Willison's TIL](https://til.simonwillison.net/sqlite/enabling-wal-mode)
- [fly.io article about scaling read concurrency](https://fly.io/blog/sqlite-internals-wal/)

This still keeps the warning against using SQLite in production, but it
greatly reduces the restrictions on which combinations of executors and
settings can use it. In short, when using a SQLite DB it is now possible to
(see the configuration sketch after this list):

- use LocalExecutor, including with more than one concurrent worker slot
- have multiple DAG parsing processes (even before the AIP-72/TaskSDK changes
  to that)
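
For illustration, a local setup that is now allowed might look roughly like
this, using Airflow's standard `AIRFLOW__{SECTION}__{KEY}` environment-variable
overrides (the specific values are examples only, not recommendations):

```python
import os

# Hypothetical local configuration: a SQLite database combined with
# LocalExecutor and more than one DAG parsing process, a combination that was
# previously rejected.
os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = "sqlite:////tmp/airflow.db"
os.environ["AIRFLOW__CORE__EXECUTOR"] = "LocalExecutor"
os.environ["AIRFLOW__SCHEDULER__PARSING_PROCESSES"] = "2"
```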

We execute the `PRAGMA journal_mode` statement every time we connect, which is
more often than strictly needed, since this is one of the few pragmas that is
persistent and a property of the DB file itself; we do it anyway for ease and
to ensure the database is in the mode we want.
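
A minimal sketch of the connect-time pragma, using SQLAlchemy's standard
connect event (an illustration, not the exact code added in this commit):

```python
from sqlalchemy import create_engine, event

engine = create_engine("sqlite:////tmp/airflow.db")

@event.listens_for(engine, "connect")
def _set_sqlite_wal(dbapi_connection, connection_record):
    # journal_mode=WAL is persistent in the DB file, so re-issuing it on every
    # new connection is redundant but harmless, and guarantees the mode we expect.
    cursor = dbapi_connection.cursor()
    cursor.execute("PRAGMA journal_mode=WAL")
    cursor.close()
```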

I have tested this with `breeze -b sqlite start_airflow` and by kicking off a
lot of tasks concurrently.

Will this be entirely without problems? No, but with the scheduler, webserver,
and API server processes we have _already_ got multiple processes operating on
the DB file. This change just makes the best of that, following the guidance
of the SQLite project: ensuring that only a single process accesses the DB at
a time is no longer a requirement!
ashb authored Dec 11, 2024
1 parent 7f1d54a commit cb74a41
Showing 20 changed files with 51 additions and 478 deletions.
1 change: 0 additions & 1 deletion airflow/cli/commands/local_commands/scheduler_command.py
@@ -40,7 +40,6 @@

def _run_scheduler_job(args) -> None:
job_runner = SchedulerJobRunner(job=Job(), subdir=process_subdir(args.subdir), num_runs=args.num_runs)
ExecutorLoader.validate_database_executor_compatibility(job_runner.job.executor.__class__)
enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK")
with _serve_logs(args.skip_serve_logs), _serve_health_check(enable_health_check):
run_job(job=job_runner.job, execute_callable=job_runner._execute)
119 changes: 11 additions & 108 deletions airflow/dag_processing/manager.py
@@ -97,7 +97,6 @@ class DagFileStat:
class DagParsingSignal(enum.Enum):
"""All signals sent to parser."""

AGENT_RUN_ONCE = "agent_run_once"
TERMINATE_MANAGER = "terminate_manager"
END_MANAGER = "end_manager"

@@ -118,7 +117,6 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
for unlimited.
:param processor_timeout: How long to wait before timing out a DAG file processor
:param dag_ids: if specified, only schedule tasks with these DAG IDs
:param async_mode: Whether to start agent in async mode
"""

def __init__(
@@ -127,14 +125,12 @@ def __init__(
max_runs: int,
processor_timeout: timedelta,
dag_ids: list[str] | None,
async_mode: bool,
):
super().__init__()
self._dag_directory: os.PathLike = dag_directory
self._max_runs = max_runs
self._processor_timeout = processor_timeout
self._dag_ids = dag_ids
self._async_mode = async_mode
# Map from file path to the processor
self._processors: dict[str, DagFileProcessorProcess] = {}
# Pipe for communicating signals
@@ -161,7 +157,6 @@ def start(self) -> None:
self._processor_timeout,
child_signal_conn,
self._dag_ids,
self._async_mode,
),
)
self._process = process
@@ -170,57 +165,19 @@ def start(self) -> None:

self.log.info("Launched DagFileProcessorManager with pid: %s", process.pid)

def run_single_parsing_loop(self) -> None:
"""
Send agent heartbeat signal to the manager, requesting that it runs one processing "loop".
Should only be used when launched DAG file processor manager in sync mode.
Call wait_until_finished to ensure that any launched processors have finished before continuing.
"""
if not self._parent_signal_conn or not self._process:
raise ValueError("Process not started.")
if not self._process.is_alive():
return

try:
self._parent_signal_conn.send(DagParsingSignal.AGENT_RUN_ONCE)
except ConnectionError:
# If this died cos of an error then we will noticed and restarted
# when harvest_serialized_dags calls _heartbeat_manager.
pass

def get_callbacks_pipe(self) -> MultiprocessingConnection:
"""Return the pipe for sending Callbacks to DagProcessorManager."""
if not self._parent_signal_conn:
raise ValueError("Process not started.")
return self._parent_signal_conn

def wait_until_finished(self) -> None:
"""Wait until DAG parsing is finished."""
if not self._parent_signal_conn:
raise ValueError("Process not started.")
if self._async_mode:
raise RuntimeError("wait_until_finished should only be called in sync_mode")
while self._parent_signal_conn.poll(timeout=None):
try:
result = self._parent_signal_conn.recv()
except EOFError:
return
self._process_message(result)
if isinstance(result, DagParsingStat):
# In sync mode (which is the only time we call this function) we don't send this message from
# the Manager until all the running processors have finished
return

@staticmethod
def _run_processor_manager(
dag_directory: os.PathLike,
max_runs: int,
processor_timeout: timedelta,
signal_conn: MultiprocessingConnection,
dag_ids: list[str] | None,
async_mode: bool,
) -> None:
# Make this process start as a new process group - that makes it easy
# to kill all sub-process of this at the OS-level, rather than having
@@ -241,7 +198,6 @@ def _run_processor_manager(
processor_timeout=processor_timeout,
dag_ids=dag_ids,
signal_conn=signal_conn,
async_mode=async_mode,
)
processor_manager.start()

@@ -352,7 +308,6 @@ class DagFileProcessorManager(LoggingMixin):
:param processor_timeout: How long to wait before timing out a DAG file processor
:param signal_conn: connection to communicate signal with processor agent.
:param dag_ids: if specified, only schedule tasks with these DAG IDs
:param async_mode: whether to start the manager in async mode
"""

def __init__(
@@ -362,7 +317,6 @@ def __init__(
processor_timeout: timedelta,
dag_ids: list[str] | None,
signal_conn: MultiprocessingConnection | None = None,
async_mode: bool = True,
):
super().__init__()
# known files; this will be updated every `dag_dir_list_interval` and stuff added/removed accordingly
@@ -372,30 +326,16 @@ def __init__(
# signal_conn is None for dag_processor_standalone mode.
self._direct_scheduler_conn = signal_conn
self._dag_ids = dag_ids
self._async_mode = async_mode
self._parsing_start_time: float | None = None
self._dag_directory = dag_directory
# Set the signal conn in to non-blocking mode, so that attempting to
# send when the buffer is full errors, rather than hangs for-ever
# attempting to send (this is to avoid deadlocks!)
#
# Don't do this in sync_mode, as we _need_ the DagParsingStat sent to
# continue the scheduler
if self._async_mode and self._direct_scheduler_conn is not None:
if self._direct_scheduler_conn:
os.set_blocking(self._direct_scheduler_conn.fileno(), False)

self.standalone_dag_processor = conf.getboolean("scheduler", "standalone_dag_processor")
self._parallelism = conf.getint("scheduler", "parsing_processes")
if (
conf.get_mandatory_value("database", "sql_alchemy_conn").startswith("sqlite")
and self._parallelism > 1
):
self.log.warning(
"Because we cannot use more than 1 thread (parsing_processes = "
"%d) when using sqlite. So we set parallelism to 1.",
self._parallelism,
)
self._parallelism = 1

# Parse and schedule each file no faster than this interval.
self._file_process_interval = conf.getint("scheduler", "min_file_process_interval")
@@ -531,20 +471,13 @@ def deactivate_stale_dags(
cls.logger().info("Deactivated %i DAGs which are no longer present in file.", deactivated)

def _run_parsing_loop(self):
# In sync mode we want timeout=None -- wait forever until a message is received
if self._async_mode:
poll_time = 0.0
else:
poll_time = None
poll_time = 0.0

self._refresh_dag_dir()
self.prepare_file_path_queue()
max_callbacks_per_loop = conf.getint("scheduler", "max_callbacks_per_loop")

if self._async_mode:
# If we're in async mode, we can start up straight away. If we're
# in sync mode we need to be told to start a "loop"
self.start_new_processes()
self.start_new_processes()
while True:
with Trace.start_span(span_name="dag_parsing_loop", component="DagFileProcessorManager") as span:
loop_start_time = time.monotonic()
@@ -557,36 +490,16 @@ def _run_parsing_loop(self):

self.log.debug("Received %s signal from DagFileProcessorAgent", agent_signal)
if agent_signal == DagParsingSignal.TERMINATE_MANAGER:
if span.is_recording():
span.add_event(name="terminate")
self.terminate()
break
elif agent_signal == DagParsingSignal.END_MANAGER:
if span.is_recording():
span.add_event(name="end")
self.end()
sys.exit(os.EX_OK)
elif agent_signal == DagParsingSignal.AGENT_RUN_ONCE:
# continue the loop to parse dags
pass
elif isinstance(agent_signal, CallbackRequest):
self._add_callback_to_queue(agent_signal)
else:
raise ValueError(f"Invalid message {type(agent_signal)}")

if not ready and not self._async_mode:
# In "sync" mode we don't want to parse the DAGs until we
# are told to (as that would open another connection to the
# SQLite DB which isn't a good practice

# This shouldn't happen, as in sync mode poll should block for
# ever. Lets be defensive about that.
self.log.warning(
"wait() unexpectedly returned nothing ready after infinite timeout (%r)!", poll_time
)

continue

for sentinel in ready:
if sentinel is not self._direct_scheduler_conn:
processor = self.waitables.get(sentinel)
@@ -631,14 +544,6 @@ def _run_parsing_loop(self):
# Update number of loop iteration.
self._num_run += 1

if not self._async_mode:
self.log.debug("Waiting for processors to finish since we're using sqlite")
# Wait until the running DAG processors are finished before
# sending a DagParsingStat message back. This means the Agent
# can tell we've got to the end of this iteration when it sees
# this type of message
self.wait_until_finished()

# Collect anything else that has finished, but don't kick off any more processors
if span.is_recording():
span.add_event(name="collect_results")
@@ -664,10 +569,9 @@ def _run_parsing_loop(self):
except BlockingIOError:
# Try again next time around the loop!

# It is better to fail, than it is deadlock. This should
# "almost never happen" since the DagParsingStat object is
# small, and in async mode this stat is not actually _required_
# for normal operation (It only drives "max runs")
# It is better to fail, than it is deadlock. This should "almost never happen" since the
# DagParsingStat object is small, and is not actually _required_ for normal operation (It
# only drives "max runs")
self.log.debug("BlockingIOError received trying to send DagParsingStat, ignoring")

if max_runs_reached:
@@ -683,12 +587,11 @@ def _run_parsing_loop(self):
)
break

if self._async_mode:
loop_duration = time.monotonic() - loop_start_time
if loop_duration < 1:
poll_time = 1 - loop_duration
else:
poll_time = 0.0
loop_duration = time.monotonic() - loop_start_time
if loop_duration < 1:
poll_time = 1 - loop_duration
else:
poll_time = 0.0

@classmethod
@provide_session
1 change: 0 additions & 1 deletion airflow/executors/base_executor.py
@@ -119,7 +119,6 @@ class BaseExecutor(LoggingMixin):
supports_sentry: bool = False

is_local: bool = False
is_single_threaded: bool = False
is_production: bool = True

change_sensor_mode_to_reschedule: bool = False
1 change: 0 additions & 1 deletion airflow/executors/debug_executor.py
@@ -47,7 +47,6 @@ class DebugExecutor(BaseExecutor):

_terminated = threading.Event()

is_single_threaded: bool = True
is_production: bool = False

change_sensor_mode_to_reschedule: bool = True
49 changes: 4 additions & 45 deletions airflow/executors/executor_loader.py
@@ -18,9 +18,7 @@

from __future__ import annotations

import functools
import logging
import os
from typing import TYPE_CHECKING

from airflow.exceptions import AirflowConfigException, UnknownExecutorException
@@ -236,68 +234,29 @@ def load_executor(cls, executor_name: ExecutorName | str | None) -> BaseExecutor
return executor

@classmethod
def import_executor_cls(
cls, executor_name: ExecutorName, validate: bool = True
) -> tuple[type[BaseExecutor], ConnectorSource]:
def import_executor_cls(cls, executor_name: ExecutorName) -> tuple[type[BaseExecutor], ConnectorSource]:
"""
Import the executor class.
Supports the same formats as ExecutorLoader.load_executor.
:param executor_name: Name of core executor or module path to executor.
:param validate: Whether or not to validate the executor before returning
:return: executor class via executor_name and executor import source
"""

def _import_and_validate(path: str) -> type[BaseExecutor]:
executor = import_string(path)
if validate:
cls.validate_database_executor_compatibility(executor)
return executor

return _import_and_validate(executor_name.module_path), executor_name.connector_source
return import_string(executor_name.module_path), executor_name.connector_source

@classmethod
def import_default_executor_cls(cls, validate: bool = True) -> tuple[type[BaseExecutor], ConnectorSource]:
def import_default_executor_cls(cls) -> tuple[type[BaseExecutor], ConnectorSource]:
"""
Import the default executor class.
:param validate: Whether or not to validate the executor before returning
:return: executor class and executor import source
"""
executor_name = cls.get_default_executor_name()
executor, source = cls.import_executor_cls(executor_name, validate=validate)
executor, source = cls.import_executor_cls(executor_name)
return executor, source

@classmethod
@functools.cache
def validate_database_executor_compatibility(cls, executor: type[BaseExecutor]) -> None:
"""
Validate database and executor compatibility.
Most of the databases work universally, but SQLite can only work with
single-threaded executors (e.g. Sequential).
This is NOT done in ``airflow.configuration`` (when configuration is
initialized) because loading the executor class is heavy work we want to
avoid unless needed.
"""
# Single threaded executors can run with any backend.
if executor.is_single_threaded:
return

# This is set in tests when we want to be able to use SQLite.
if os.environ.get("_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK") == "1":
return

from airflow.settings import engine

# SQLite only works with single threaded executors
if engine.dialect.name == "sqlite":
raise AirflowConfigException(f"error: cannot use SQLite with the {executor.__name__}")

@classmethod
def __load_celery_kubernetes_executor(cls) -> BaseExecutor:
celery_executor = import_string(cls.executors[CELERY_EXECUTOR])()
1 change: 0 additions & 1 deletion airflow/executors/sequential_executor.py
@@ -49,7 +49,6 @@ class SequentialExecutor(BaseExecutor):
"""

is_local: bool = True
is_single_threaded: bool = True
is_production: bool = False

serve_logs: bool = True