Skip lightweight delete when there is no data to delete #370

Open · wants to merge 2 commits into main
@@ -220,23 +220,44 @@
{% endcall %}
{% endif %}

{% call statement('delete_existing_data') %}
{% if is_distributed %}
{% set existing_local = existing_relation.incorporate(path={"identifier": this.identifier + local_suffix, "schema": local_db_prefix + this.schema}) if existing_relation is not none else none %}
delete from {{ existing_local }} {{ on_cluster_clause(existing_relation) }} where ({{ unique_key }}) in (select {{ unique_key }}
from {{ inserting_relation }})
{% else %}
delete from {{ existing_relation }} where ({{ unique_key }}) in (select {{ unique_key }}
from {{ inserting_relation }})
{% set delete_filter %}
select distinct {{ unique_key }} from {{ inserting_relation }}
{% endset %}
{% set data_to_delete_count_query %}
select count(*) from {{ existing_relation }} where ({{ unique_key }}) global in ({{ delete_filter }})
{% endset %}
{% set data_to_delete_count = run_query(data_to_delete_count_query).rows[0].values()[0] %}
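{# Only issue the lightweight delete when the incoming batch actually overlaps existing rows #}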
{% if data_to_delete_count > 0 %}
{{ log(data_to_delete_count ~ " rows to be deleted.", info=True) }}
{% set unique_key_query %}
-- https://github.com/ClickHouse/ClickHouse/issues/69559
select count(distinct {{ unique_key }}) from {{ inserting_relation }}
{% endset %}
{% set unique_key_count = run_query(unique_key_query).rows[0].values()[0] %}
{% if unique_key_count == 1 %}
Review comment (Contributor):
To make sure I fully understood: we fetch the actual keys here because, when this is written as a subquery, you hit the issue mentioned in ClickHouse/ClickHouse#69559?
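For context, a minimal sketch of the two DELETE shapes this branch can emit (the target/staging names are placeholders, not from this PR):

-- usual case: several distinct keys, so the subquery filter is kept
delete from target where (key) in (select distinct key from staging);
-- single distinct key: the value is fetched first and inlined as a literal tuple,
-- sidestepping https://github.com/ClickHouse/ClickHouse/issues/69559
delete from target where (key) in ((1));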

{% set query %}
select toString(any(tuple({{ unique_key }}))) from {{ inserting_relation }}
{% endset %}
{% set delete_filter = run_query(query).rows[0].values()[0] %}
{{ log('Delete filter: ' ~ delete_filter) }}
{% endif %}
{%- if incremental_predicates %}
{% for predicate in incremental_predicates %}
and {{ predicate }}
{% endfor %}
{%- endif -%}
{{ adapter.get_model_query_settings(model) }}
{% endcall %}

{% call statement('delete_existing_data') %}
{% if is_distributed %}
{% set existing_local = existing_relation.incorporate(path={"identifier": this.identifier + local_suffix, "schema": local_db_prefix + this.schema}) if existing_relation is not none else none %}
delete from {{ existing_local }} {{ on_cluster_clause(existing_relation) }} where ({{ unique_key }}) in ({{ delete_filter }})
{% else %}
delete from {{ existing_relation }} where ({{ unique_key }}) in ({{ delete_filter }})
{% endif %}
{%- if incremental_predicates %}
{% for predicate in incremental_predicates %}
and {{ predicate }}
{% endfor %}
{%- endif -%}
{{ adapter.get_model_query_settings(model) }}
{% endcall %}
{% else %}
{{ log("No data to be deleted, skip lightweight delete.", info=True) }}
{% endif %}
{%- set dest_columns = adapter.get_columns_in_relation(existing_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
{% call statement('insert_new_data') %}
@@ -8,7 +8,7 @@
seeds_base_csv,
)
from dbt.tests.adapter.basic.test_incremental import BaseIncremental, BaseIncrementalNotSchemaChange
from dbt.tests.util import run_dbt
from dbt.tests.util import run_dbt, run_dbt_and_capture

from tests.integration.adapter.incremental.test_base_incremental import uniq_schema

@@ -56,15 +56,6 @@ def test_simple_incremental(self, project):
run_dbt(["run", "--select", "unique_source_one"])
run_dbt(["run", "--select", "unique_incremental_one"])


lw_delete_schema = """
version: 2

models:
- name: "lw_delete_inc"
description: "Incremental table"
"""

lw_delete_inc = """
{{ config(
materialized='distributed_incremental',
@@ -81,23 +72,93 @@
{% endif %}
"""

lw_delete_no_op = """
{{ config(
materialized='distributed_incremental',
order_by=['key'],
unique_key='key',
incremental_strategy='delete+insert'
)
}}
{% if is_incremental() %}
SELECT toUInt64(number) as key FROM numbers(50, 10)
{% else %}
SELECT toUInt64(number) as key FROM numbers(10)
{% endif %}
"""

LW_DELETE_UNIQUE_KEY_COMPILATION = """
{{ config(
materialized='distributed_incremental',
order_by=['key'],
unique_key='key',
incremental_strategy='delete+insert'
)
}}
SELECT 1 as key
UNION ALL
SELECT 1 as key
"""

LW_DELETE_COMPOSITE_UNIQUE_KEY_COMPILATION = """
{{ config(
materialized='distributed_incremental',
order_by=['key'],
unique_key=['key', 'date'],
incremental_strategy='delete+insert'
)
}}
SELECT 1 as key, toDate('2024-10-21') as date
UNION ALL
SELECT 1 as key, toDate('2024-10-21') as date
"""


@pytest.mark.skipif(os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster')
class TestLWDeleteDistributedIncremental:
@pytest.fixture(scope="class")
def models(self):
return {"lw_delete_inc.sql": lw_delete_inc}
return {
"lw_delete_inc.sql": lw_delete_inc,
'lw_delete_no_op.sql': lw_delete_no_op,
'lw_delete_unique_key_compilation.sql': LW_DELETE_UNIQUE_KEY_COMPILATION,
'lw_delete_composite_unique_key_compilation.sql': LW_DELETE_COMPOSITE_UNIQUE_KEY_COMPILATION,
}

@pytest.mark.skipif(
os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster'
)
def test_lw_delete(self, project):
run_dbt()
@pytest.mark.parametrize("model", ["lw_delete_inc"])
def test_lw_delete(self, project, model):
run_dbt(["run", "--select", model])
result = project.run_sql("select count(*) as num_rows from lw_delete_inc", fetch="one")
assert result[0] == 100
run_dbt()
_, log = run_dbt_and_capture(["run", "--select", model])
result = project.run_sql("select count(*) as num_rows from lw_delete_inc", fetch="one")
assert '20 rows to be deleted.' in log
assert result[0] == 180

@pytest.mark.parametrize("model", ["lw_delete_no_op"])
def test_lw_delete_no_op(self, project, model):
run_dbt(["run", "--select", model])
_, log = run_dbt_and_capture(["run", "--select", model])
# assert that no delete query is issued against table lw_delete_no_op
assert 'rows to be deleted.' not in log
assert 'No data to be deleted, skip lightweight delete.' in log

@pytest.mark.parametrize(
"model,delete_filter_log",
[
("lw_delete_unique_key_compilation", "Delete filter: (1)"),
("lw_delete_composite_unique_key_compilation", "Delete filter: (1,'2024-10-21')"),
],
)
def test_lw_delete_unique_key(self, project, model, delete_filter_log):
"""Assure that the delete_filter in `DELETE FROM <table> WHERE <unique_key> IN (<delete_filter>)` is templated
by a string of unique key value(s) when there is only one value (combination) for the unique key(s)."""
run_dbt(["run", "--select", model])
_, log = run_dbt_and_capture(["run", "--select", model, "--log-level", "debug"])
result = project.run_sql(f"select count(*) as num_rows from {model}", fetch="one")
assert delete_filter_log in log
assert result[0] == 1
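For illustration, the composite-key case above should render a DELETE of roughly this shape (the literal tuple comes from the "Delete filter" log line asserted in the parametrization):

delete from lw_delete_composite_unique_key_compilation where (key, date) in ((1,'2024-10-21'));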
Review comment (Contributor):
This test failed. lw_delete_unique_key_compilation and lw_delete_composite_unique_key_compilation have 2 rows each.



compound_key_schema = """
version: 2
10 changes: 5 additions & 5 deletions tests/integration/conftest.py
@@ -55,16 +55,16 @@ def test_config(ch_test_users, ch_test_version):
client_port = client_port or 10723
test_port = 10900 if test_driver == 'native' else client_port
try:
run_cmd(['docker-compose', '-f', compose_file, 'down', '-v'])
run_cmd(['docker', 'compose', '-f', compose_file, 'down', '-v'])
sys.stderr.write('Starting docker compose')
os.environ['PROJECT_ROOT'] = '.'
up_result = run_cmd(['docker-compose', '-f', compose_file, 'up', '-d'])
up_result = run_cmd(['docker', 'compose', '-f', compose_file, 'up', '-d'])
if up_result[0]:
raise Exception(f'Failed to start docker: {up_result[2]}')
url = f"http://{test_host}:{client_port}"
wait_until_responsive(timeout=30.0, pause=0.5, check=lambda: is_responsive(url))
except Exception as e:
raise Exception('Failed to run docker-compose: {}', str(e))
raise Exception('Failed to run docker compose: {}', str(e))
elif not client_port:
if test_driver == 'native':
client_port = 8443 if test_port == 9440 else 8123
@@ -102,9 +102,9 @@ def test_config(ch_test_users, ch_test_version):

if docker:
try:
run_cmd(['docker-compose', '-f', compose_file, 'down', '-v'])
run_cmd(['docker', 'compose', '-f', compose_file, 'down', '-v'])
except Exception as e:
raise Exception('Failed to run docker-compose while cleaning up: {}', str(e))
raise Exception('Failed to run docker compose while cleaning up: {}', str(e))
else:
for test_user in ch_test_users:
test_client.command('DROP USER %s', (test_user,))