From afb3a0fbf763f7e54322ac4427fccee3503805dc Mon Sep 17 00:00:00 2001 From: Xiatong Zheng Date: Mon, 21 Oct 2024 16:34:41 +0200 Subject: [PATCH 1/2] feat: skip lightweight delete when there is no data to delete --- .../incremental/incremental.sql | 53 +++++++---- .../test_distributed_incremental.py | 91 +++++++++++++++---- tests/integration/conftest.py | 10 +- 3 files changed, 116 insertions(+), 38 deletions(-) diff --git a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql index c4dcb1b8..6e7e3af4 100644 --- a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql +++ b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql @@ -220,23 +220,44 @@ {% endcall %} {% endif %} - {% call statement('delete_existing_data') %} - {% if is_distributed %} - {% set existing_local = existing_relation.incorporate(path={"identifier": this.identifier + local_suffix, "schema": local_db_prefix + this.schema}) if existing_relation is not none else none %} - delete from {{ existing_local }} {{ on_cluster_clause(existing_relation) }} where ({{ unique_key }}) in (select {{ unique_key }} - from {{ inserting_relation }}) - {% else %} - delete from {{ existing_relation }} where ({{ unique_key }}) in (select {{ unique_key }} - from {{ inserting_relation }}) + {% set delete_filter %} + select distinct {{ unique_key }} from {{ inserting_relation }} + {% endset %} + {% set data_to_delete_count_query %} + select count(*) from {{ existing_relation }} where ({{ unique_key }}) global in ({{ delete_filter }}) + {% endset %} + {% set data_to_delete_count = run_query(data_to_delete_count_query).rows[0].values()[0] %} + {% if data_to_delete_count > 0 %} + {{ log(data_to_delete_count ~ " rows to be deleted.", info=True) }} + {% set unique_key_query %} + -- https://github.com/ClickHouse/ClickHouse/issues/69559 + select count(distinct {{ unique_key }}) from {{ inserting_relation }} + {% endset %} + {% set unique_key_count = run_query(unique_key_query).rows[0].values()[0] %} + {% if unique_key_count == 1 %} + {% set query %} + select toString(tuple({{ unique_key }})) from {{ inserting_relation }} + {% endset %} + {% set delete_filter = run_query(query).rows[0].values()[0] %} + {{ log('Delete filter: ' ~ delete_filter) }} {% endif %} - {%- if incremental_predicates %} - {% for predicate in incremental_predicates %} - and {{ predicate }} - {% endfor %} - {%- endif -%} - {{ adapter.get_model_query_settings(model) }} - {% endcall %} - + {% call statement('delete_existing_data') %} + {% if is_distributed %} + {% set existing_local = existing_relation.incorporate(path={"identifier": this.identifier + local_suffix, "schema": local_db_prefix + this.schema}) if existing_relation is not none else none %} + delete from {{ existing_local }} {{ on_cluster_clause(existing_relation) }} where ({{ unique_key }}) in ({{ delete_filter }}) + {% else %} + delete from {{ existing_relation }} where ({{ unique_key }}) in ({{ delete_filter }}) + {% endif %} + {%- if incremental_predicates %} + {% for predicate in incremental_predicates %} + and {{ predicate }} + {% endfor %} + {%- endif -%} + {{ adapter.get_model_query_settings(model) }} + {% endcall %} + {% else %} + {{ log("No data to be deleted, skip lightweight delete.", info=True) }} + {% endif %} {%- set dest_columns = adapter.get_columns_in_relation(existing_relation) -%} {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} {% call statement('insert_new_data') %} diff --git a/tests/integration/adapter/incremental/test_distributed_incremental.py b/tests/integration/adapter/incremental/test_distributed_incremental.py index f132933d..588a1023 100644 --- a/tests/integration/adapter/incremental/test_distributed_incremental.py +++ b/tests/integration/adapter/incremental/test_distributed_incremental.py @@ -8,7 +8,7 @@ seeds_base_csv, ) from dbt.tests.adapter.basic.test_incremental import BaseIncremental, BaseIncrementalNotSchemaChange -from dbt.tests.util import run_dbt +from dbt.tests.util import run_dbt, run_dbt_and_capture from tests.integration.adapter.incremental.test_base_incremental import uniq_schema @@ -56,15 +56,6 @@ def test_simple_incremental(self, project): run_dbt(["run", "--select", "unique_source_one"]) run_dbt(["run", "--select", "unique_incremental_one"]) - -lw_delete_schema = """ -version: 2 - -models: - - name: "lw_delete_inc" - description: "Incremental table" -""" - lw_delete_inc = """ {{ config( materialized='distributed_incremental', @@ -81,23 +72,89 @@ def test_simple_incremental(self, project): {% endif %} """ +lw_delete_no_op = """ +{{ config( + materialized='distributed_incremental', + order_by=['key'], + unique_key='key', + incremental_strategy='delete+insert' + ) +}} +{% if is_incremental() %} + SELECT toUInt64(number) as key FROM numbers(50, 10) +{% else %} + SELECT toUInt64(number) as key FROM numbers(10) +{% endif %} +""" + +LW_DELETE_UNIQUE_KEY_COMPILATION = """ +{{ config( + materialized='distributed_incremental', + order_by=['key'], + unique_key='key', + incremental_strategy='delete+insert' + ) +}} +SELECT 1 as key +""" + +LW_DELETE_COMPOSITE_UNIQUE_KEY_COMPILATION = """ +{{ config( + materialized='distributed_incremental', + order_by=['key'], + unique_key=['key', 'date'], + incremental_strategy='delete+insert' + ) +}} +SELECT 1 as key, toDate('2024-10-21') as date +""" + +@pytest.mark.skipif(os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster') class TestLWDeleteDistributedIncremental: @pytest.fixture(scope="class") def models(self): - return {"lw_delete_inc.sql": lw_delete_inc} + return { + "lw_delete_inc.sql": lw_delete_inc, + 'lw_delete_no_op.sql': lw_delete_no_op, + 'lw_delete_unique_key_compilation.sql': LW_DELETE_UNIQUE_KEY_COMPILATION, + 'lw_delete_composite_unique_key_compilation.sql': LW_DELETE_COMPOSITE_UNIQUE_KEY_COMPILATION, + } - @pytest.mark.skipif( - os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster' - ) - def test_lw_delete(self, project): - run_dbt() + @pytest.mark.parametrize("model", ["lw_delete_inc"]) + def test_lw_delete(self, project, model): + run_dbt(["run", "--select", model]) result = project.run_sql("select count(*) as num_rows from lw_delete_inc", fetch="one") assert result[0] == 100 - run_dbt() + _, log = run_dbt_and_capture(["run", "--select", model]) result = project.run_sql("select count(*) as num_rows from lw_delete_inc", fetch="one") + assert '20 rows to be deleted.' in log assert result[0] == 180 + @pytest.mark.parametrize("model", ["lw_delete_no_op"]) + def test_lw_delete_no_op(self, project, model): + run_dbt(["run", "--select", model]) + _, log = run_dbt_and_capture(["run", "--select", model]) + # assert that no delete query is issued against table lw_delete_no_op + assert 'rows to be deleted.' not in log + assert 'No data to be deleted, skip lightweight delete.' in log + + @pytest.mark.parametrize( + "model,delete_filter_log", + [ + ("lw_delete_unique_key_compilation", "Delete filter: (1)"), + ("lw_delete_composite_unique_key_compilation", "Delete filter: (1,'2024-10-21')"), + ], + ) + def test_lw_delete_unique_key(self, project, model, delete_filter_log): + """Assure that the delete_filter in `DELETE FROM WHERE IN ()` is templated + by a string of unique key value(s) when there is only one value (combination) for the unique key(s).""" + run_dbt(["run", "--select", model]) + _, log = run_dbt_and_capture(["run", "--select", model, "--log-level", "debug"]) + result = project.run_sql(f"select count(*) as num_rows from {model}", fetch="one") + assert delete_filter_log in log + assert result[0] == 1 + compound_key_schema = """ version: 2 diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 50b1af6a..5fdfc914 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -55,16 +55,16 @@ def test_config(ch_test_users, ch_test_version): client_port = client_port or 10723 test_port = 10900 if test_driver == 'native' else client_port try: - run_cmd(['docker-compose', '-f', compose_file, 'down', '-v']) + run_cmd(['docker', 'compose', '-f', compose_file, 'down', '-v']) sys.stderr.write('Starting docker compose') os.environ['PROJECT_ROOT'] = '.' - up_result = run_cmd(['docker-compose', '-f', compose_file, 'up', '-d']) + up_result = run_cmd(['docker', 'compose', '-f', compose_file, 'up', '-d']) if up_result[0]: raise Exception(f'Failed to start docker: {up_result[2]}') url = f"http://{test_host}:{client_port}" wait_until_responsive(timeout=30.0, pause=0.5, check=lambda: is_responsive(url)) except Exception as e: - raise Exception('Failed to run docker-compose: {}', str(e)) + raise Exception('Failed to run docker compose: {}', str(e)) elif not client_port: if test_driver == 'native': client_port = 8443 if test_port == 9440 else 8123 @@ -102,9 +102,9 @@ def test_config(ch_test_users, ch_test_version): if docker: try: - run_cmd(['docker-compose', '-f', compose_file, 'down', '-v']) + run_cmd(['docker', 'compose', '-f', compose_file, 'down', '-v']) except Exception as e: - raise Exception('Failed to run docker-compose while cleaning up: {}', str(e)) + raise Exception('Failed to run docker compose while cleaning up: {}', str(e)) else: for test_user in ch_test_users: test_client.command('DROP USER %s', (test_user,)) From 86f15c82be7bdd86354544d7f2676f88fa1386b5 Mon Sep 17 00:00:00 2001 From: Xiatong Zheng Date: Tue, 22 Oct 2024 16:13:36 +0200 Subject: [PATCH 2/2] fix: fetch only one unique key --- .../macros/materializations/incremental/incremental.sql | 2 +- .../adapter/incremental/test_distributed_incremental.py | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql index 6e7e3af4..1dc1566f 100644 --- a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql +++ b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql @@ -236,7 +236,7 @@ {% set unique_key_count = run_query(unique_key_query).rows[0].values()[0] %} {% if unique_key_count == 1 %} {% set query %} - select toString(tuple({{ unique_key }})) from {{ inserting_relation }} + select toString(any(tuple({{ unique_key }}))) from {{ inserting_relation }} {% endset %} {% set delete_filter = run_query(query).rows[0].values()[0] %} {{ log('Delete filter: ' ~ delete_filter) }} diff --git a/tests/integration/adapter/incremental/test_distributed_incremental.py b/tests/integration/adapter/incremental/test_distributed_incremental.py index 588a1023..32349b7c 100644 --- a/tests/integration/adapter/incremental/test_distributed_incremental.py +++ b/tests/integration/adapter/incremental/test_distributed_incremental.py @@ -96,6 +96,8 @@ def test_simple_incremental(self, project): ) }} SELECT 1 as key +UNION ALL +SELECT 1 as key """ LW_DELETE_COMPOSITE_UNIQUE_KEY_COMPILATION = """ @@ -107,6 +109,8 @@ def test_simple_incremental(self, project): ) }} SELECT 1 as key, toDate('2024-10-21') as date +UNION ALL +SELECT 1 as key, toDate('2024-10-21') as date """