diff --git a/.github/workflows/integration-test.yml b/.github/workflows/integration-test.yml index 7cf97ec5f..b561d3913 100644 --- a/.github/workflows/integration-test.yml +++ b/.github/workflows/integration-test.yml @@ -58,11 +58,13 @@ jobs: - name: Init clickhouse run: tutor local do init-clickhouse # This should: - # 1. Find no models on the first run, since state should be up to date - # 2. Force run all models - # 3. Successfully run tests + # 1. Run all models, since alembic test removed our state + # 2. Find no models on the first run, state should now be up to date now + # 3. Force run all models + # 4. Successfully run tests - name: Test dbt run: | + tutor local do dbt -c "run" tutor local do dbt -c "run" tutor local do dbt --only_changed False -c "run" tutor local do dbt --only_changed False -c "test" @@ -126,11 +128,13 @@ jobs: - name: Init clickhouse run: tutor dev do init-clickhouse # This should: - # 1. Find no models on the first run, since state should be up to date - # 2. Force run all models - # 3. Successfully run tests + # 1. Run all models, since alembic test removed our state + # 2. Find no models on the first run, state should now be up to date now + # 3. Force run all models + # 4. Successfully run tests - name: Test dbt run: | + tutor dev do dbt -c "run" tutor dev do dbt -c "run" tutor dev do dbt --only_changed False -c "run" tutor dev do dbt --only_changed False -c "test" @@ -212,11 +216,13 @@ jobs: - name: Init clickhouse run: tutor k8s do init-clickhouse # This should: - # 1. Find no models on the first run, since state should be up to date - # 2. Force run all models - # 3. Successfully run tests - - name: Test DBT + # 1. Run all models, since alembic test removed our state + # 2. Find no models on the first run, state should now be up to date now + # 3. Force run all models + # 4. Successfully run tests + - name: Test dbt run: | + tutor k8s do dbt -c "run" tutor k8s do dbt -c "run" tutor k8s do dbt --only_changed False -c "run" tutor k8s do dbt --only_changed False -c "test" diff --git a/tutoraspects/commands_v1.py b/tutoraspects/commands_v1.py index 7a7f77363..dffdbb23e 100644 --- a/tutoraspects/commands_v1.py +++ b/tutoraspects/commands_v1.py @@ -27,7 +27,7 @@ def load_xapi_test_data(config_file) -> list[tuple[str, str]]: return [ ( "aspects", - "echo 'Running script...' && " + f"echo 'Running script... {config_file}' && " "cd /app/aspects/scripts/ && " f"bash clickhouse-demo-xapi-data.sh {config_file} && " "echo 'Done!';", diff --git a/tutoraspects/patches/k8s-jobs b/tutoraspects/patches/k8s-jobs index 55380aace..4d8591e0a 100644 --- a/tutoraspects/patches/k8s-jobs +++ b/tutoraspects/patches/k8s-jobs @@ -19,7 +19,7 @@ spec: - name: ASPECTS_XAPI_DATABASE value: {{ASPECTS_XAPI_DATABASE}} - name: CLICKHOUSE_CLUSTER_NAME - value: {{CLICKHOUSE_CLUSTER_NAME}} + value: "{{CLICKHOUSE_CLUSTER_NAME}}" - name: DBT_STATE value: {{ DBT_STATE_DIR }} - name: ASPECTS_DATA_TTL_EXPRESSION diff --git a/tutoraspects/patches/local-docker-compose-jobs-services b/tutoraspects/patches/local-docker-compose-jobs-services index 5944a021a..71a186691 100644 --- a/tutoraspects/patches/local-docker-compose-jobs-services +++ b/tutoraspects/patches/local-docker-compose-jobs-services @@ -5,7 +5,7 @@ aspects-job: VENV_DIR: /opt/venv ASPECTS_EVENT_SINK_DATABASE: {{ASPECTS_EVENT_SINK_DATABASE}} ASPECTS_XAPI_DATABASE: {{ASPECTS_XAPI_DATABASE}} - CLICKHOUSE_CLUSTER_NAME: {{CLICKHOUSE_CLUSTER_NAME}} + CLICKHOUSE_CLUSTER_NAME: "{{CLICKHOUSE_CLUSTER_NAME}}" DBT_STATE: {{ DBT_STATE_DIR }} ASPECTS_DATA_TTL_EXPRESSION: "{{ ASPECTS_DATA_TTL_EXPRESSION }}" DBT_PROFILE_TARGET_DATABASE: "{{ DBT_PROFILE_TARGET_DATABASE }}" diff --git a/tutoraspects/plugin.py b/tutoraspects/plugin.py index 8a2194856..b8733da60 100644 --- a/tutoraspects/plugin.py +++ b/tutoraspects/plugin.py @@ -166,6 +166,11 @@ # the cluster here. All objects will be created "ON CLUSTER" with replicated # table types, otherwise a single server deployment will be assumed. ("CLICKHOUSE_CLUSTER_NAME", ""), + # We need to connect to a single node for Alembic and dbt DDL queries, + # otherwise the fast pace of the changes will outpace the propagation of + # changes to the cluster. We assume connection details are all the same + # except for the host. + ("CLICKHOUSE_CLUSTER_DDL_NODE_HOST", ""), # Port for the native interface exposed on the host container in Docker Compose, # this is changed from 9000 to prevent conflicts with MinIO, which also listens # on 9000. @@ -239,8 +244,13 @@ ( "CLICKHOUSE_ADMIN_SQLALCHEMY_URI", "clickhouse+native://{{CLICKHOUSE_ADMIN_USER}}:{{CLICKHOUSE_ADMIN_PASSWORD}}" - "@{{CLICKHOUSE_HOST}}:{{CLICKHOUSE_INTERNAL_NATIVE_PORT}}/{{" - "ASPECTS_XAPI_DATABASE}}" + "@" + "{% if CLICKHOUSE_CLUSTER_DDL_NODE_HOST %}" + "{{CLICKHOUSE_CLUSTER_DDL_NODE_HOST}}" + "{% else %}" + "{{CLICKHOUSE_HOST}}" + "{% endif %}" + ":{{CLICKHOUSE_INTERNAL_NATIVE_PORT}}/{{ASPECTS_XAPI_DATABASE}}" "{% if CLICKHOUSE_SECURE_CONNECTION %}?secure=True{% endif %}", ), ###################### diff --git a/tutoraspects/templates/aspects/apps/aspects/dbt/profiles.yml b/tutoraspects/templates/aspects/apps/aspects/dbt/profiles.yml index f8688ad21..074b260f2 100644 --- a/tutoraspects/templates/aspects/apps/aspects/dbt/profiles.yml +++ b/tutoraspects/templates/aspects/apps/aspects/dbt/profiles.yml @@ -4,11 +4,11 @@ aspects: # this needs to match the profile in your dbt_project.yml file prod: type: clickhouse schema: {{ DBT_PROFILE_TARGET_DATABASE }} - host: {{ CLICKHOUSE_HOST }} + host: {% if CLICKHOUSE_CLUSTER_DDL_NODE_HOST %}{{CLICKHOUSE_CLUSTER_DDL_NODE_HOST}}{% else %}{{ CLICKHOUSE_HOST }}{% endif %} port: {{ CLICKHOUSE_INTERNAL_NATIVE_PORT }} user: {{ CLICKHOUSE_ADMIN_USER }} - password: '{{ CLICKHOUSE_ADMIN_PASSWORD }}' - cluster: {{ CLICKHOUSE_CLUSTER_NAME }} + password: {{ CLICKHOUSE_ADMIN_PASSWORD }} + cluster: "{{ CLICKHOUSE_CLUSTER_NAME }}" # These are ClickHouse provider values and map directly to ClickHouse connection settings. verify: True @@ -17,15 +17,20 @@ aspects: # this needs to match the profile in your dbt_project.yml file compression: lz4 connect_timeout: 10 send_receive_timeout: 300 - cluster_mode: false + cluster_mode: {% if CLICKHOUSE_CLUSTER_NAME %}true{% else %}false{% endif %} use_lw_deletes: false check_exchange: false sync_request_timeout: 5 compress_block_size: 1048576 threads: 2 - # Without this dbt queries populating tables can be killed for using too much memory custom_settings: + # Without this dbt queries populating tables can be killed for using too much memory memory_overcommit_ratio_denominator_for_user: 0 + # Drop tables immediately so they can be recreated + database_atomic_delay_before_drop_table_sec: 0 + # Make sure all drops and detaches complete before continuing + database_atomic_wait_for_drop_and_detach_synchronously: 1 + {{ patch("dbt-profiles") | indent(6)}} diff --git a/tutoraspects/templates/aspects/apps/aspects/migrations/alembic/env.py b/tutoraspects/templates/aspects/apps/aspects/migrations/alembic/env.py index 452d13681..94f8a9418 100644 --- a/tutoraspects/templates/aspects/apps/aspects/migrations/alembic/env.py +++ b/tutoraspects/templates/aspects/apps/aspects/migrations/alembic/env.py @@ -28,6 +28,14 @@ from clickhouse_sqlalchemy.alembic.dialect import include_object, patch_alembic_version +def make_replicated_zk_path(cluster, table_name): + """ + Allows CH cluster functionality in Alembic + """ + database = "{{ ASPECTS_XAPI_DATABASE }}" + return f'/clickhouse/tables/{cluster}/{database}/{table_name}' + + def process_revision_directives(context, revision, directives): # extract Migration migration_script = directives[0] @@ -64,7 +72,23 @@ def run_migrations_offline(): ) with context.begin_transaction(): + {% if CLICKHOUSE_CLUSTER_NAME %} + # Without this, the Alembic versions table will only be created on one + # shard and will cause errors as migrations are run. + migration_context = context._proxy._migration_context + kwargs = { + 'cluster': "{{CLICKHOUSE_CLUSTER_NAME}}", + 'table_path': make_replicated_zk_path( + "{{CLICKHOUSE_CLUSTER_NAME}}", + migration_context.version_table + ), + 'replica_name': '{replica}' + } + + patch_alembic_version(context, **kwargs) + {% else %} patch_alembic_version(context) + {% endif %} context.run_migrations() @@ -88,7 +112,23 @@ def run_migrations_online(): ) with context.begin_transaction(): + {% if CLICKHOUSE_CLUSTER_NAME %} + # Without this, the Alembic versions table will only be created on one + # shard and will cause errors as migrations are run. + migration_context = context._proxy._migration_context + kwargs = { + 'cluster': "{{CLICKHOUSE_CLUSTER_NAME}}", + 'table_path': make_replicated_zk_path( + "{{CLICKHOUSE_CLUSTER_NAME}}", + migration_context.version_table + ), + 'replica_name': '{replica}' + } + + patch_alembic_version(context, **kwargs) + {% else %} patch_alembic_version(context) + {% endif %} context.run_migrations() diff --git a/tutoraspects/templates/aspects/apps/aspects/migrations/alembic/versions/0001_get_org_from_course_url.py b/tutoraspects/templates/aspects/apps/aspects/migrations/alembic/versions/0001_get_org_from_course_url.py index 7dc577812..6bc3b671b 100644 --- a/tutoraspects/templates/aspects/apps/aspects/migrations/alembic/versions/0001_get_org_from_course_url.py +++ b/tutoraspects/templates/aspects/apps/aspects/migrations/alembic/versions/0001_get_org_from_course_url.py @@ -5,7 +5,6 @@ down_revision = None branch_labels = None depends_on = None -on_cluster = " ON CLUSTER '{{CLICKHOUSE_CLUSTER_NAME}}' " if "{{CLICKHOUSE_CLUSTER_NAME}}" else "" def upgrade(): @@ -19,7 +18,7 @@ def upgrade(): """ op.execute( f""" - CREATE OR REPLACE FUNCTION get_org_from_course_url {on_cluster} + CREATE OR REPLACE FUNCTION get_org_from_course_url AS ( course_url) -> nullIf(EXTRACT(course_url, 'course-v1:([a-zA-Z0-9]*)'), ''); @@ -28,4 +27,4 @@ def upgrade(): def downgrade(): - op.execute("DROP FUNCTION IF EXISTS get_org_from_course_url;") + op.execute(f"DROP FUNCTION IF EXISTS get_org_from_course_url;") diff --git a/tutoraspects/templates/aspects/apps/aspects/migrations/alembic/versions/0010_course_dictionaries.py b/tutoraspects/templates/aspects/apps/aspects/migrations/alembic/versions/0010_course_dictionaries.py index 35cf13bee..654b33db0 100644 --- a/tutoraspects/templates/aspects/apps/aspects/migrations/alembic/versions/0010_course_dictionaries.py +++ b/tutoraspects/templates/aspects/apps/aspects/migrations/alembic/versions/0010_course_dictionaries.py @@ -13,22 +13,34 @@ def upgrade(): # We include these drop statements here because "CREATE OR REPLACE DICTIONARY" # currently throws a file rename error and you can't drop a dictionary with a # table referring to it. + op.execute(f""" + DETACH DICTIONARY IF EXISTS {{ ASPECTS_EVENT_SINK_DATABASE }}.course_names + {on_cluster} + """) op.execute(f""" DROP TABLE IF EXISTS {{ ASPECTS_EVENT_SINK_DATABASE }}.course_names {on_cluster} + SYNC """) op.execute(f""" DROP DICTIONARY IF EXISTS {{ ASPECTS_EVENT_SINK_DATABASE }}.course_names_dict {on_cluster} + SYNC + """) + op.execute(f""" + DETACH DICTIONARY IF EXISTS {{ ASPECTS_EVENT_SINK_DATABASE }}.course_block_names + {on_cluster} """) op.execute(f""" DROP TABLE IF EXISTS {{ ASPECTS_EVENT_SINK_DATABASE }}.course_block_names {on_cluster} + SYNC """) op.execute(f""" DROP DICTIONARY IF EXISTS {{ ASPECTS_EVENT_SINK_DATABASE }}.course_block_names_dict {on_cluster} + SYNC """) op.execute( f""" diff --git a/tutoraspects/templates/aspects/apps/aspects/migrations/alembic/versions/0020_get_org_from_course_url.py b/tutoraspects/templates/aspects/apps/aspects/migrations/alembic/versions/0020_get_org_from_course_url.py index 8036b1722..f075c017f 100644 --- a/tutoraspects/templates/aspects/apps/aspects/migrations/alembic/versions/0020_get_org_from_course_url.py +++ b/tutoraspects/templates/aspects/apps/aspects/migrations/alembic/versions/0020_get_org_from_course_url.py @@ -6,16 +6,15 @@ down_revision = "0017" branch_labels = None depends_on = None -on_cluster = " ON CLUSTER '{{CLICKHOUSE_CLUSTER_NAME}}' " if "{{CLICKHOUSE_CLUSTER_NAME}}" else "" def upgrade(): op.execute( f""" - CREATE OR REPLACE FUNCTION get_org_from_course_url {on_cluster} + CREATE OR REPLACE FUNCTION get_org_from_course_url AS ( course_url) -> - nullIf(EXTRACT(course_url, 'course-v1:([a-zA-Z0-9\w\-~.:%]*)'), ''); + nullIf(EXTRACT(course_url, 'course-v1:([a-zA-Z0-9\\w\\-~.:%]*)'), ''); """ ) @@ -23,7 +22,7 @@ def upgrade(): def downgrade(): op.execute( f""" - CREATE OR REPLACE FUNCTION get_org_from_course_url {on_cluster} + CREATE OR REPLACE FUNCTION get_org_from_course_url AS ( course_url) -> nullIf(EXTRACT(course_url, 'course-v1:([a-zA-Z0-9]*)'), ''); diff --git a/tutoraspects/templates/aspects/apps/aspects/migrations/alembic/versions/0033_ccx_function.py b/tutoraspects/templates/aspects/apps/aspects/migrations/alembic/versions/0033_ccx_function.py index 1e6682a2a..426d66e62 100644 --- a/tutoraspects/templates/aspects/apps/aspects/migrations/alembic/versions/0033_ccx_function.py +++ b/tutoraspects/templates/aspects/apps/aspects/migrations/alembic/versions/0033_ccx_function.py @@ -8,12 +8,11 @@ down_revision = "0032" branch_labels = None depends_on = None -on_cluster = " ON CLUSTER '{{CLICKHOUSE_CLUSTER_NAME}}' " if "{{CLICKHOUSE_CLUSTER_NAME}}" else "" def upgrade(): op.execute( f""" - CREATE OR REPLACE FUNCTION get_org_from_ccx_course_url {on_cluster} + CREATE OR REPLACE FUNCTION get_org_from_ccx_course_url AS ( course_url) -> nullIf(EXTRACT(course_url, 'ccx-v1:([a-zA-Z0-9]*)'), ''); @@ -24,4 +23,4 @@ def upgrade(): def downgrade(): - op.execute("DROP FUNCTION IF EXISTS get_org_from_ccx_course_url;") + op.execute(f"DROP FUNCTION IF EXISTS get_org_from_ccx_course_url;") diff --git a/tutoraspects/templates/aspects/apps/aspects/scripts/clickhouse-demo-xapi-data.sh b/tutoraspects/templates/aspects/apps/aspects/scripts/clickhouse-demo-xapi-data.sh index 2e572c309..219ac932d 100755 --- a/tutoraspects/templates/aspects/apps/aspects/scripts/clickhouse-demo-xapi-data.sh +++ b/tutoraspects/templates/aspects/apps/aspects/scripts/clickhouse-demo-xapi-data.sh @@ -1,5 +1,6 @@ #!/usr/bin/env bash echo "Loading demo xAPI data..." +cat $1 xapi-db-load load-db --config_file $1 || { ec=$?; printf '%s\n' "Error loading demo data!" >&2; exit $ec; } echo "Done." diff --git a/tutoraspects/templates/aspects/apps/aspects/scripts/insert_data.py b/tutoraspects/templates/aspects/apps/aspects/scripts/insert_data.py index 9ce591066..db7d83d67 100644 --- a/tutoraspects/templates/aspects/apps/aspects/scripts/insert_data.py +++ b/tutoraspects/templates/aspects/apps/aspects/scripts/insert_data.py @@ -11,7 +11,9 @@ client = clickhouse_connect.get_client( host="{{CLICKHOUSE_HOST}}", username='{{CLICKHOUSE_ADMIN_USER}}', - password='{{CLICKHOUSE_ADMIN_PASSWORD}}' + password='{{CLICKHOUSE_ADMIN_PASSWORD}}', + port={{ CLICKHOUSE_INTERNAL_HTTP_PORT }}, + secure={{ CLICKHOUSE_SECURE_CONNECTION }} ) def sink_files():