perf: slightly increased Airflow concurrency
Francesco Stablum committed Nov 11, 2021
1 parent 97b6e4c commit 7747d65
Showing 5 changed files with 27 additions and 14 deletions.
16 changes: 8 additions & 8 deletions airflow/airflow.cfg.m4
@@ -43,7 +43,7 @@ sql_alchemy_pool_enabled = True

# The SqlAlchemy pool size is the maximum number of database connections
# in the pool. 0 indicates no limit.
-sql_alchemy_pool_size = 16
+sql_alchemy_pool_size = 24

# The maximum overflow size of the pool.
# When the number of checked-out connections reaches the size set in pool_size,
@@ -54,7 +54,7 @@ sql_alchemy_pool_size = 16
# and the total number of "sleeping" connections the pool will allow is pool_size.
# max_overflow can be set to ``-1`` to indicate no overflow limit;
# no limit will be placed on the total number of concurrent connections. Defaults to ``10``.
-sql_alchemy_max_overflow = 20
+sql_alchemy_max_overflow = 40

# The SqlAlchemy pool recycle is the number of seconds a connection
# can be idle in the pool before it is invalidated. This config does
@@ -81,23 +81,23 @@ sql_alchemy_schema =
# This defines the maximum number of task instances that can run concurrently in Airflow
# regardless of scheduler count and worker count. Generally, this value is reflective of
# the number of task instances with the running state in the metadata database.
-parallelism = 32
+parallelism = 24

# The maximum number of task instances allowed to run concurrently in each DAG. To calculate
# the number of tasks that is running concurrently for a DAG, add up the number of running
# tasks for all DAG runs of the DAG. This is configurable at the DAG level with ``concurrency``,
# which is defaulted as ``dag_concurrency``.
-dag_concurrency = 16
+dag_concurrency = 24

# Are DAGs paused by default at creation
dags_are_paused_at_creation = True

# The maximum number of active DAG runs per DAG. The scheduler will not create more DAG runs
# if it reaches the limit. This is configurable at the DAG level with ``max_active_runs``,
# which is defaulted as ``max_active_runs_per_dag``.
-max_active_runs_per_dag = 16
-max_active_runs = 16
-max_threads = 16
+max_active_runs_per_dag = 24
+max_active_runs = 24
+max_threads = 24

# Whether to load the DAG examples that ship with Airflow. It's good to
# get started, but you probably want to set this to ``False`` in a production
@@ -685,7 +685,7 @@ celery_app_name = airflow.executors.celery_executor
# ``airflow celery worker`` command. This defines the number of task instances that
# a worker will take, so size up your workers based on the resources on
# your worker box and the nature of your tasks
-worker_concurrency = 16
+worker_concurrency = 24

# The maximum and minimum concurrency that will be used when starting workers with the
# ``airflow celery worker`` command (always keep minimum processes, but grow
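As a quick sanity check after regenerating airflow.cfg from this template, the effective values can be read back through the Airflow CLI. A minimal sketch, assuming an Airflow 2.x CLI and that these options live in the [core] and [celery] sections as in this cfg file:

#!/bin/bash
# Print the concurrency-related settings Airflow actually picked up.
for opt in parallelism dag_concurrency max_active_runs_per_dag; do
    echo "core.$opt = $(airflow config get-value core "$opt")"
done
echo "celery.worker_concurrency = $(airflow config get-value celery worker_concurrency)"
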
9 changes: 9 additions & 0 deletions airflow/last_run_id.sh
@@ -0,0 +1,9 @@
#!/bin/bash

RUN_ID=$(airflow dags list-runs --state running -d download_and_preprocess_sets -o plain |
grep download_and_preprocess_sets |
head -n 1 |
awk '{print $2}')

echo $RUN_ID

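This helper is consumed by the other maintenance scripts, which call it as RUN_ID=$(bash last_run_id.sh). A small hypothetical caller-side guard (not part of the commit) for the case where no run of download_and_preprocess_sets is currently running, in which case the pipeline prints an empty string:

#!/bin/bash
# Hypothetical usage sketch: bail out when last_run_id.sh finds no running run.
RUN_ID=$(bash last_run_id.sh)
if [ -z "$RUN_ID" ]; then
    echo "no running DAG run found for download_and_preprocess_sets" >&2
    exit 1
fi
echo "latest running run: $RUN_ID"
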
8 changes: 8 additions & 0 deletions airflow/retry_tasks.sh
@@ -0,0 +1,8 @@
#!/bin/bash

# Some tasks may fail, but that doesn't mean the entire dataset download and creation
# process has to fail as well. This script marks some failed tasks as successful so dependent
# tasks can keep running, even with slightly less data

RUN_ID=$(bash last_run_id.sh)
echo RUN_ID:$RUN_ID
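As committed, retry_tasks.sh stops after resolving the run id. One possible continuation, going by the file name rather than by the comment copied from unstuck_tasks.sh, would be to clear the failed task instances so the scheduler runs them again; a sketch only, assuming the stock airflow tasks clear flags:

#!/bin/bash
# Hypothetical continuation, not part of the commit: clear failed task
# instances of the DAG so the scheduler picks them up again.
airflow tasks clear download_and_preprocess_sets --only-failed --yes
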
6 changes: 1 addition & 5 deletions airflow/unstuck_tasks.sh
@@ -4,9 +4,5 @@
# process also has to fail. This script marks some failed tasks as successful so dependent
# tasks can continue being run even with slightly less data

-RUN_ID=$(airflow dags list-runs --state running -d download_and_preprocess_sets -o plain |
-grep download_and_preprocess_sets |
-head -n 1 |
-awk '{print $2}')
-
+RUN_ID=$(bash last_run_id.sh)
echo RUN_ID:$RUN_ID
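The step that actually marks failed tasks as successful is not part of this diff. For reference, one way to do it from the CLI is airflow tasks run --mark-success; the sketch below is an assumption on two counts: TASK_ID is a placeholder, and it presumes an Airflow release (2.2+) whose task commands accept a run id in place of an execution date:

#!/bin/bash
# Hypothetical sketch, not part of the commit: mark one failed task of the
# current run as succeeded without executing it. TASK_ID is a placeholder.
RUN_ID=$(bash last_run_id.sh)
airflow tasks run download_and_preprocess_sets "$TASK_ID" "$RUN_ID" --mark-success
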
2 changes: 1 addition & 1 deletion preprocess/dag.py
@@ -104,7 +104,7 @@ def parse(page, ti):
+ f"(rel:{rel}, lens:{lens} activity_id:{activity_id}, fields:{fields}"
)
# remove the invalid activity-set
-rels_vals.pop(activity_id)
+rels_vals.pop(activity_id, None)
large_mp.send(ti, rels_vals)
large_mp.clear_recv(ti, f"download_{page}")

