Skip to content

Commit

Permalink
Altinity / clustering fixes (#767)
Browse files Browse the repository at this point in the history
These changes should fix the remaining issues with running Aspects in a ClickHouse cluster, such as one hosted on Altinity or managed by their k8s operator. They assume that you can run DDL on one node in the cluster, and that the `user_defined_zookeeper_path` ClickHouse setting is set so that user-defined functions don't specifically need to be created `ON CLUSTER`.
  • Loading branch information
bmtcril authored Aug 21, 2024
1 parent b76e3ea commit 726b821
Show file tree
Hide file tree
Showing 13 changed files with 104 additions and 31 deletions.
26 changes: 16 additions & 10 deletions .github/workflows/integration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,13 @@ jobs:
- name: Init clickhouse
run: tutor local do init-clickhouse
# This should:
# 1. Find no models on the first run, since state should be up to date
# 2. Force run all models
# 3. Successfully run tests
# 1. Run all models, since alembic test removed our state
# 2. Find no models on the second run, since state should now be up to date
# 3. Force run all models
# 4. Successfully run tests
- name: Test dbt
run: |
tutor local do dbt -c "run"
tutor local do dbt -c "run"
tutor local do dbt --only_changed False -c "run"
tutor local do dbt --only_changed False -c "test"
Expand Down Expand Up @@ -126,11 +128,13 @@ jobs:
- name: Init clickhouse
run: tutor dev do init-clickhouse
# This should:
# 1. Find no models on the first run, since state should be up to date
# 2. Force run all models
# 3. Successfully run tests
# 1. Run all models, since alembic test removed our state
# 2. Find no models on the second run, since state should now be up to date
# 3. Force run all models
# 4. Successfully run tests
- name: Test dbt
run: |
tutor dev do dbt -c "run"
tutor dev do dbt -c "run"
tutor dev do dbt --only_changed False -c "run"
tutor dev do dbt --only_changed False -c "test"
Expand Down Expand Up @@ -212,11 +216,13 @@ jobs:
- name: Init clickhouse
run: tutor k8s do init-clickhouse
# This should:
# 1. Find no models on the first run, since state should be up to date
# 2. Force run all models
# 3. Successfully run tests
- name: Test DBT
# 1. Run all models, since alembic test removed our state
# 2. Find no models on the second run, since state should now be up to date
# 3. Force run all models
# 4. Successfully run tests
- name: Test dbt
run: |
tutor k8s do dbt -c "run"
tutor k8s do dbt -c "run"
tutor k8s do dbt --only_changed False -c "run"
tutor k8s do dbt --only_changed False -c "test"
Expand Down
2 changes: 1 addition & 1 deletion tutoraspects/commands_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def load_xapi_test_data(config_file) -> list[tuple[str, str]]:
return [
(
"aspects",
"echo 'Running script...' && "
f"echo 'Running script... {config_file}' && "
"cd /app/aspects/scripts/ && "
f"bash clickhouse-demo-xapi-data.sh {config_file} && "
"echo 'Done!';",
Expand Down
2 changes: 1 addition & 1 deletion tutoraspects/patches/k8s-jobs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ spec:
- name: ASPECTS_XAPI_DATABASE
value: {{ASPECTS_XAPI_DATABASE}}
- name: CLICKHOUSE_CLUSTER_NAME
value: {{CLICKHOUSE_CLUSTER_NAME}}
value: "{{CLICKHOUSE_CLUSTER_NAME}}"
- name: DBT_STATE
value: {{ DBT_STATE_DIR }}
- name: ASPECTS_DATA_TTL_EXPRESSION
Expand Down
2 changes: 1 addition & 1 deletion tutoraspects/patches/local-docker-compose-jobs-services
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ aspects-job:
VENV_DIR: /opt/venv
ASPECTS_EVENT_SINK_DATABASE: {{ASPECTS_EVENT_SINK_DATABASE}}
ASPECTS_XAPI_DATABASE: {{ASPECTS_XAPI_DATABASE}}
CLICKHOUSE_CLUSTER_NAME: {{CLICKHOUSE_CLUSTER_NAME}}
CLICKHOUSE_CLUSTER_NAME: "{{CLICKHOUSE_CLUSTER_NAME}}"
DBT_STATE: {{ DBT_STATE_DIR }}
ASPECTS_DATA_TTL_EXPRESSION: "{{ ASPECTS_DATA_TTL_EXPRESSION }}"
DBT_PROFILE_TARGET_DATABASE: "{{ DBT_PROFILE_TARGET_DATABASE }}"
Expand Down
14 changes: 12 additions & 2 deletions tutoraspects/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@
# the cluster here. All objects will be created "ON CLUSTER" with replicated
# table types, otherwise a single server deployment will be assumed.
("CLICKHOUSE_CLUSTER_NAME", ""),
# We need to connect to a single node for Alembic and dbt DDL queries,
# otherwise the fast pace of the changes will outpace the propagation of
# changes to the cluster. We assume connection details are all the same
# except for the host.
("CLICKHOUSE_CLUSTER_DDL_NODE_HOST", ""),
# Port for the native interface exposed on the host container in Docker Compose,
# this is changed from 9000 to prevent conflicts with MinIO, which also listens
# on 9000.
Expand Down Expand Up @@ -239,8 +244,13 @@
(
"CLICKHOUSE_ADMIN_SQLALCHEMY_URI",
"clickhouse+native://{{CLICKHOUSE_ADMIN_USER}}:{{CLICKHOUSE_ADMIN_PASSWORD}}"
"@{{CLICKHOUSE_HOST}}:{{CLICKHOUSE_INTERNAL_NATIVE_PORT}}/{{"
"ASPECTS_XAPI_DATABASE}}"
"@"
"{% if CLICKHOUSE_CLUSTER_DDL_NODE_HOST %}"
"{{CLICKHOUSE_CLUSTER_DDL_NODE_HOST}}"
"{% else %}"
"{{CLICKHOUSE_HOST}}"
"{% endif %}"
":{{CLICKHOUSE_INTERNAL_NATIVE_PORT}}/{{ASPECTS_XAPI_DATABASE}}"
"{% if CLICKHOUSE_SECURE_CONNECTION %}?secure=True{% endif %}",
),
######################
Expand Down
15 changes: 10 additions & 5 deletions tutoraspects/templates/aspects/apps/aspects/dbt/profiles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ aspects: # this needs to match the profile in your dbt_project.yml file
prod:
type: clickhouse
schema: {{ DBT_PROFILE_TARGET_DATABASE }}
host: {{ CLICKHOUSE_HOST }}
host: {% if CLICKHOUSE_CLUSTER_DDL_NODE_HOST %}{{CLICKHOUSE_CLUSTER_DDL_NODE_HOST}}{% else %}{{ CLICKHOUSE_HOST }}{% endif %}
port: {{ CLICKHOUSE_INTERNAL_NATIVE_PORT }}
user: {{ CLICKHOUSE_ADMIN_USER }}
password: '{{ CLICKHOUSE_ADMIN_PASSWORD }}'
cluster: {{ CLICKHOUSE_CLUSTER_NAME }}
password: {{ CLICKHOUSE_ADMIN_PASSWORD }}
cluster: "{{ CLICKHOUSE_CLUSTER_NAME }}"

# These are ClickHouse provider values and map directly to ClickHouse connection settings.
verify: True
Expand All @@ -17,15 +17,20 @@ aspects: # this needs to match the profile in your dbt_project.yml file
compression: lz4
connect_timeout: 10
send_receive_timeout: 300
cluster_mode: false
cluster_mode: {% if CLICKHOUSE_CLUSTER_NAME %}true{% else %}false{% endif %}
use_lw_deletes: false
check_exchange: false
sync_request_timeout: 5
compress_block_size: 1048576
threads: 2

# Without this dbt queries populating tables can be killed for using too much memory
custom_settings:
# Without this dbt queries populating tables can be killed for using too much memory
memory_overcommit_ratio_denominator_for_user: 0
# Drop tables immediately so they can be recreated
database_atomic_delay_before_drop_table_sec: 0
# Make sure all drops and detaches complete before continuing
database_atomic_wait_for_drop_and_detach_synchronously: 1


{{ patch("dbt-profiles") | indent(6)}}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@
from clickhouse_sqlalchemy.alembic.dialect import include_object, patch_alembic_version


def make_replicated_zk_path(cluster, table_name):
    """
    Build the ZooKeeper path used for a replicated table on a cluster.

    The result has the form
    ``/clickhouse/tables/<cluster>/<database>/<table_name>``, where the
    database segment is the templated xAPI database name.

    :param cluster: Name of the ClickHouse cluster.
    :param table_name: Name of the table the path is built for.
    :return: The path as a string.
    """
    # The database segment comes from the template placeholder below.
    database = "{{ ASPECTS_XAPI_DATABASE }}"
    zk_path = f'/clickhouse/tables/{cluster}/{database}/{table_name}'
    return zk_path


def process_revision_directives(context, revision, directives):
# extract Migration
migration_script = directives[0]
Expand Down Expand Up @@ -64,7 +72,23 @@ def run_migrations_offline():
)

with context.begin_transaction():
{% if CLICKHOUSE_CLUSTER_NAME %}
# Without this, the Alembic versions table will only be created on one
# shard and will cause errors as migrations are run.
migration_context = context._proxy._migration_context
kwargs = {
'cluster': "{{CLICKHOUSE_CLUSTER_NAME}}",
'table_path': make_replicated_zk_path(
"{{CLICKHOUSE_CLUSTER_NAME}}",
migration_context.version_table
),
'replica_name': '{replica}'
}

patch_alembic_version(context, **kwargs)
{% else %}
patch_alembic_version(context)
{% endif %}
context.run_migrations()


Expand All @@ -88,7 +112,23 @@ def run_migrations_online():
)

with context.begin_transaction():
{% if CLICKHOUSE_CLUSTER_NAME %}
# Without this, the Alembic versions table will only be created on one
# shard and will cause errors as migrations are run.
migration_context = context._proxy._migration_context
kwargs = {
'cluster': "{{CLICKHOUSE_CLUSTER_NAME}}",
'table_path': make_replicated_zk_path(
"{{CLICKHOUSE_CLUSTER_NAME}}",
migration_context.version_table
),
'replica_name': '{replica}'
}

patch_alembic_version(context, **kwargs)
{% else %}
patch_alembic_version(context)
{% endif %}
context.run_migrations()


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
down_revision = None
branch_labels = None
depends_on = None
on_cluster = " ON CLUSTER '{{CLICKHOUSE_CLUSTER_NAME}}' " if "{{CLICKHOUSE_CLUSTER_NAME}}" else ""


def upgrade():
Expand All @@ -19,7 +18,7 @@ def upgrade():
"""
op.execute(
f"""
CREATE OR REPLACE FUNCTION get_org_from_course_url {on_cluster}
CREATE OR REPLACE FUNCTION get_org_from_course_url
AS (
course_url) ->
nullIf(EXTRACT(course_url, 'course-v1:([a-zA-Z0-9]*)'), '');
Expand All @@ -28,4 +27,4 @@ def upgrade():


def downgrade():
op.execute("DROP FUNCTION IF EXISTS get_org_from_course_url;")
op.execute(f"DROP FUNCTION IF EXISTS get_org_from_course_url;")
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,34 @@ def upgrade():
# We include these drop statements here because "CREATE OR REPLACE DICTIONARY"
# currently throws a file rename error and you can't drop a dictionary with a
# table referring to it.
op.execute(f"""
DETACH DICTIONARY IF EXISTS {{ ASPECTS_EVENT_SINK_DATABASE }}.course_names
{on_cluster}
""")
op.execute(f"""
DROP TABLE IF EXISTS {{ ASPECTS_EVENT_SINK_DATABASE }}.course_names
{on_cluster}
SYNC
""")
op.execute(f"""
DROP DICTIONARY IF EXISTS {{ ASPECTS_EVENT_SINK_DATABASE }}.course_names_dict
{on_cluster}
SYNC
""")
op.execute(f"""
DETACH DICTIONARY IF EXISTS {{ ASPECTS_EVENT_SINK_DATABASE }}.course_block_names
{on_cluster}
""")
op.execute(f"""
DROP TABLE IF EXISTS {{ ASPECTS_EVENT_SINK_DATABASE }}.course_block_names
{on_cluster}
SYNC
""")
op.execute(f"""
DROP DICTIONARY IF EXISTS
{{ ASPECTS_EVENT_SINK_DATABASE }}.course_block_names_dict
{on_cluster}
SYNC
""")
op.execute(
f"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,23 @@
down_revision = "0017"
branch_labels = None
depends_on = None
on_cluster = " ON CLUSTER '{{CLICKHOUSE_CLUSTER_NAME}}' " if "{{CLICKHOUSE_CLUSTER_NAME}}" else ""


def upgrade():
op.execute(
f"""
CREATE OR REPLACE FUNCTION get_org_from_course_url {on_cluster}
CREATE OR REPLACE FUNCTION get_org_from_course_url
AS (
course_url) ->
nullIf(EXTRACT(course_url, 'course-v1:([a-zA-Z0-9\w\-~.:%]*)'), '');
nullIf(EXTRACT(course_url, 'course-v1:([a-zA-Z0-9\\w\\-~.:%]*)'), '');
"""
)


def downgrade():
op.execute(
f"""
CREATE OR REPLACE FUNCTION get_org_from_course_url {on_cluster}
CREATE OR REPLACE FUNCTION get_org_from_course_url
AS (
course_url) ->
nullIf(EXTRACT(course_url, 'course-v1:([a-zA-Z0-9]*)'), '');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@
down_revision = "0032"
branch_labels = None
depends_on = None
on_cluster = " ON CLUSTER '{{CLICKHOUSE_CLUSTER_NAME}}' " if "{{CLICKHOUSE_CLUSTER_NAME}}" else ""

def upgrade():
op.execute(
f"""
CREATE OR REPLACE FUNCTION get_org_from_ccx_course_url {on_cluster}
CREATE OR REPLACE FUNCTION get_org_from_ccx_course_url
AS (
course_url) ->
nullIf(EXTRACT(course_url, 'ccx-v1:([a-zA-Z0-9]*)'), '');
Expand All @@ -24,4 +23,4 @@ def upgrade():


def downgrade():
op.execute("DROP FUNCTION IF EXISTS get_org_from_ccx_course_url;")
op.execute(f"DROP FUNCTION IF EXISTS get_org_from_ccx_course_url;")
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env bash
# Load demo xAPI data using the config file passed as the first argument ($1).
# Exits with xapi-db-load's exit code if the load fails.
echo "Loading demo xAPI data..."
# Print the config file for debugging.
# NOTE(review): the config may contain connection credentials that would end
# up in job logs -- confirm this is acceptable.
cat "$1"
# "$1" is quoted so a path containing spaces or glob characters is passed
# through as a single, unmodified argument.
xapi-db-load load-db --config_file "$1" || { ec=$?; printf '%s\n' "Error loading demo data!" >&2; exit $ec; }

echo "Done."
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
client = clickhouse_connect.get_client(
host="{{CLICKHOUSE_HOST}}",
username='{{CLICKHOUSE_ADMIN_USER}}',
password='{{CLICKHOUSE_ADMIN_PASSWORD}}'
password='{{CLICKHOUSE_ADMIN_PASSWORD}}',
port={{ CLICKHOUSE_INTERNAL_HTTP_PORT }},
secure={{ CLICKHOUSE_SECURE_CONNECTION }}
)

def sink_files():
Expand Down

0 comments on commit 726b821

Please sign in to comment.