Skip to content

Commit

Permalink
feat: Use dbt state to avoid large table rebuilds
Browse files Browse the repository at this point in the history
  • Loading branch information
bmtcril committed Feb 28, 2024
1 parent f8afced commit a89c47b
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 14 deletions.
31 changes: 26 additions & 5 deletions .github/workflows/integration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,15 @@ jobs:
run: |
tutor local do alembic -c "downgrade base"
tutor local do alembic -c "upgrade head"
- name: Test DBT
run: tutor local do dbt -c "test"
# This should:
# 1. Find no models on the first run, since state should be up to date
# 2. Force run all models
# 3. Successfully run tests
- name: Test dbt
run: |
tutor local do dbt -c "run"
tutor local do dbt --only_changed False -c "run"
tutor local do dbt --only_changed False -c "test"
- name: Load test
run: tutor local do load-xapi-test-data
- name: Import demo course
Expand Down Expand Up @@ -111,8 +118,15 @@ jobs:
run: |
tutor dev do alembic -c "downgrade base"
tutor dev do alembic -c "upgrade head"
- name: Test DBT
run: tutor dev do dbt -c "test"
# This should:
# 1. Find no models on the first run, since state should be up to date
# 2. Force run all models
# 3. Successfully run tests
- name: Test dbt
run: |
tutor local do dbt -c "run"
tutor local do dbt --only_changed False -c "run"
tutor local do dbt --only_changed False -c "test"
- name: Load test
run: tutor dev do load-xapi-test-data
- name: Import demo course
Expand Down Expand Up @@ -183,8 +197,15 @@ jobs:
run: |
tutor k8s do alembic -c "downgrade base"
tutor k8s do alembic -c "upgrade head"
# This should:
# 1. Find no models on the first run, since state should be up to date
# 2. Force run all models
# 3. Successfully run tests
- name: Test DBT
run: tutor k8s do dbt -c "test"
run: |
tutor local do dbt -c "run"
tutor local do dbt --only_changed False -c "run"
tutor k8s do dbt --only_changed False -c "test"
- name: Load test
run: tutor k8s do load-xapi-test-data
- name: Import demo course
Expand Down
18 changes: 16 additions & 2 deletions tutoraspects/commands_v0.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,22 @@
tutor local do dbt -c "run -m enrollments_by_day --threads 4"
""",
)
@click.option(
"--only_changed",
default=True,
type=click.UNPROCESSED,
help="""Whether to only run models that have changed since the last dbt run. Since
re-running materialized views will recreate potentially huge datasets and
incur downtime, this defaults to true.
If no prior state is found, the command will run as if this was False.
If your command fails due to an issue with "state:modified", you may need to
set this to False.
""",
)
@click.pass_obj
def dbt(context, command) -> None:
def dbt(context, only_changed, command) -> None:
"""
Job that proxies dbt commands to a container which runs them against ClickHouse.
"""
Expand All @@ -33,7 +47,7 @@ def dbt(context, command) -> None:

command = f"""echo 'Making dbt script executable...'
echo 'Running dbt {command}'
bash /app/aspects/scripts/dbt.sh {command}
bash /app/aspects/scripts/dbt.sh {only_changed} {command}
echo 'Done!';
"""
runner.run_job("aspects", command)
Expand Down
20 changes: 17 additions & 3 deletions tutoraspects/commands_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,30 @@ def load_xapi_test_data(config_file) -> list[tuple[str, str]]:
tutor local do dbt -c "run -m enrollments_by_day --threads 4"
""",
)
def dbt(command: string) -> list[tuple[str, str]]:
@click.option(
"--only_changed",
default=True,
type=click.UNPROCESSED,
help="""Whether to only run models that have changed since the last dbt run. Since
re-running materialized views will recreate potentially huge datasets and
incur downtime, this defaults to true.
If no prior state is found, the command will run as if this was False.
If your command fails due to an issue with "state:modified", you may need to
set this to False.
""",
)
def dbt(only_changed: bool, command: string) -> list[tuple[str, str]]:
"""
Job that proxies dbt commands to a container which runs them against ClickHouse.
"""
return [
(
"aspects",
"echo 'Making dbt script executable...' && "
f"echo 'Running dbt {command}' && "
f"bash /app/aspects/scripts/dbt.sh {command} && "
f"echo 'Running dbt, only_changed: {only_changed} command: {command}' && "
f"bash /app/aspects/scripts/dbt.sh {only_changed} {command} && "
"echo 'Done!';",
),
]
Expand Down
7 changes: 7 additions & 0 deletions tutoraspects/patches/k8s-jobs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ spec:
value: {{ ASPECTS_XAPI_DATABASE }}
- name: ASPECTS_ENROLLMENT_EVENTS_TABLE
value: {{ ASPECTS_ENROLLMENT_EVENTS_TABLE }}
- name: DBT_STATE
value: {{ DBT_STATE_DIR }}
image: {{ DOCKER_IMAGE_ASPECTS }}
securityContext:
allowPrivilegeEscalation: false
Expand All @@ -35,7 +37,12 @@ spec:
name: alembic
- mountPath: /app/aspects/migrations/alembic/versions
name: versions
- mountPath: {{ DBT_STATE_DIR }}
name: dbt-state
volumes:
- name: dbt-state
persistentVolumeClaim:
claimName: aspects-dbt
- name: scripts
configMap:
name: aspects-scripts
Expand Down
15 changes: 15 additions & 0 deletions tutoraspects/patches/k8s-volumes
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: aspects-dbt
labels:
app.kubernetes.io/component: volume
app.kubernetes.io/name: aspects-dbt
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 10M

{% if RUN_CLICKHOUSE %}
---
apiVersion: v1
Expand Down
1 change: 1 addition & 0 deletions tutoraspects/patches/local-docker-compose-jobs-services
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ aspects-job:
volumes:
- ../../env/plugins/aspects/apps/aspects:/app/aspects
- ../../env/plugins/aspects/apps/aspects/scripts/:/app/aspects/scripts:ro
- ../../data/aspects-dbt:{{ DBT_STATE_DIR }}
{% if RUN_SUPERSET or RUN_CLICKHOUSE or RUN_RALPH %}depends_on:{% if RUN_SUPERSET %}
- superset{% endif %}{% if RUN_CLICKHOUSE%}
- clickhouse{% endif %}{% if RUN_RALPH %}
Expand Down
1 change: 1 addition & 0 deletions tutoraspects/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@
("DBT_REPOSITORY", "https://github.com/openedx/aspects-dbt"),
("DBT_BRANCH", "v3.5.0"),
("DBT_SSH_KEY", ""),
("DBT_STATE_DIR", "/app/aspects/dbt_state/"),
# This is a pip compliant list of Python packages to install to run dbt
# make sure packages with versions are enclosed in double quotes
("EXTRA_DBT_PACKAGES", []),
Expand Down
23 changes: 20 additions & 3 deletions tutoraspects/templates/aspects/apps/aspects/scripts/dbt.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,28 @@ cd aspects-dbt || exit

export ASPECTS_EVENT_SINK_DATABASE={{ASPECTS_EVENT_SINK_DATABASE}}
export ASPECTS_XAPI_DATABASE={{ASPECTS_XAPI_DATABASE}}
export DBT_STATE={{ DBT_STATE_DIR }}

echo "Installing dbt dependencies"
dbt deps --profiles-dir /app/aspects/dbt/

echo "Running dbt $*"
dbt "$@" --profiles-dir /app/aspects/dbt/

echo "Running dbt ${@:2}"

if [ "$1" == "True" ]
then
echo "Requested to only run modified state, checking for ${DBT_STATE}/manifest.json"
fi

# If state exists and we've asked to only run changed files, add the flag
if [ "$1" == "True" ] && [ -e "${DBT_STATE}/manifest.json" ]
then
echo "Found {{DBT_STATE_DIR}}/manifest.json so only running modified items and their downstreams"
dbt "${@:2}" --profiles-dir /app/aspects/dbt/ -s state:modified+
else
echo "Running command *without* state:modified+ this may take a long time."
dbt "${@:2}" --profiles-dir /app/aspects/dbt/
fi

rm -rf ${DBT_STATE}/*
cp -r ./target/manifest.json ${DBT_STATE}
rm -rf aspects-dbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

bash /app/aspects/scripts/alembic.sh upgrade head

bash /app/aspects/scripts/dbt.sh run
bash /app/aspects/scripts/dbt.sh True run

0 comments on commit a89c47b

Please sign in to comment.