Skip to content

Commit

Permalink
Merge pull request #1076 from elementary-data/ele-1527-ttv-configurin…
Browse files Browse the repository at this point in the history
…g-tests-from-a-list-of-critical-tables

Added models and macros for tests recommendation.
  • Loading branch information
elongl authored Aug 21, 2023
2 parents a62fb61 + 00189cc commit 1614c21
Show file tree
Hide file tree
Showing 4 changed files with 314 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
{% macro get_recommended_tests(
depends_on_count=none,
dependant_on_count=none,
exposure_count=none,
critical_name_like_patterns=none,
table_types=none,
tags=none,
owners=none
) %}
{% set query %}
select resource_name, source_name, test_namespace, test_name, timestamp_column
from {{ ref("pending_test_recommendations") }}
where 1=0

{% if depends_on_count %}
or depends_on_count >= {{ depends_on_count }}
{% endif %}

{% if dependant_on_count %}
or dependant_on_count >= {{ dependant_on_count }}
{% endif %}

{% if exposure_count %}
or exposure_count >= {{ exposure_count }}
{% endif %}

{% if critical_name_like_patterns %}
{% for name_like_pattern in critical_name_like_patterns %}
or resource_name like '{{ name_like_pattern }}'
{% endfor %}
{% endif %}

{% if table_types %}
or table_type in ('{{ table_types | join("','") }}')
{% endif %}

{% if tags %}
or tags ?| array['{{ tags | join("','") }}']
{% endif %}

{% if owners %}
or owner ?| array['{{ owners | join("','") }}']
{% endif %}
{% endset %}

{% set result = elementary.run_query(query) %}
{% do return(elementary.agate_to_dicts(result)) %}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
{# Object structure is [test_namespace, test_name, requires_timestamp_column] #}
{% set recommended_tests = [
("elementary", "volume_anomalies", false),
("elementary", "freshness_anomalies", true),
] %}

with
tables_criticality as (
select
id,
lower(database_name) as database_name,
lower(schema_name) as schema_name,
lower(table_name) as table_name,
resource_name,
source_name,
tags,
owner,
depends_on_count,
dependant_on_count,
exposure_count,
table_type
from {{ ref("tables_criticality") }}
),

potential_recommended_tests as (
select id, test_namespace, short_name, requires_timestamp_column
from tables_criticality
cross join
(
{% for recommended_test in recommended_tests %}
select
'{{ recommended_test[0] }}' as test_namespace,
'{{ recommended_test[1] }}' as short_name,
{{ recommended_test[2] }} as requires_timestamp_column
{% if not loop.last %}
union all
{% endif %}
{% endfor %}
) rt
),

existing_recommended_tests as (
select parent_model_unique_id, test_namespace, short_name
from {{ ref("elementary", "dbt_tests") }}
),

pending_recommended_tests as (
select id, test_namespace, short_name, requires_timestamp_column
from potential_recommended_tests
where
(id, test_namespace, short_name) not in (
select parent_model_unique_id, test_namespace, short_name
from existing_recommended_tests
)
),

timestamp_columns as (
select database_name, schema_name, table_name, timestamp_column
from {{ ref("table_timestamp_columns") }}
),

pending_tests_with_table_info as (
select
resource_name,
source_name,
test_namespace,
short_name as test_name,
timestamp_column,
tags,
owner,
depends_on_count,
dependant_on_count,
exposure_count,
table_type
from pending_recommended_tests
join tables_criticality using (id)
left join timestamp_columns using (database_name, schema_name, table_name)
where requires_timestamp_column = false or timestamp_column is not null
)

select *
from pending_tests_with_table_info
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
{% set timestamp_column_names = [
"updated_at",
"created_at",
"_fivetran_synced",
"_airbyte_emitted_at",
"create_date",
"created",
"db_insert_time",
"create_ts",
"created_ts",
"update_ts",
"updated_ts",
"load_ts",
"loaded_at",
"date_created",
"dbt_updated_at",
"update_datetime",
"event_time",
"event_date",
"event_created_at",
"event_updated_at",
"event_event_time",
"_etl_loaded_at",
"__etl_loaded_at",
"_etl_inserted_at",
"_ingestion_time",
"last_modified_datetime",
] %}
{% set joined_timestamp_column_names = "'{}'".format(
"', '".join(timestamp_column_names)
) %}


with
columns as (
select distinct
lower(database_name) as database_name,
lower(schema_name) as schema_name,
lower(table_name) as table_name,
lower(column_name) as column_name
from {{ ref("elementary", "dbt_columns") }}
),

-- Inferring the timestamp column based on their names and assigning a confidence score.
inferred_timestamp_columns as (
select
database_name,
schema_name,
table_name,
column_name,
timestamp_column_names.confidence
from columns
join
(
values
{% for timestamp_column_name in timestamp_column_names %}
('{{ timestamp_column_name }}', {{ loop.index }})
{% if not loop.last %},{% endif %}
{% endfor %}
) as timestamp_column_names(column_name, confidence) using (column_name)
),

-- Users can provide the timestamp columns for their sources,
-- if provided, we assign a confidence score of 0 (certain).
source_provided_timestamp_columns as (
select
lower(database_name) as database_name,
lower(schema_name) as schema_name,
lower(name) as table_name,
lower(loaded_at_field) as column_name
from {{ ref("elementary", "dbt_sources") }}
where loaded_at_field is not null
),

-- Combining the inferred and source provided timestamp columns.
absolute_rated_timestamp_columns as (
select
database_name,
schema_name,
table_name,
column_name,
inferred.confidence as absolute_confidence
from inferred_timestamp_columns inferred
union all
select
database_name,
schema_name,
table_name,
column_name,
0 as absolute_confidence
from source_provided_timestamp_columns
),

-- Sort the timestamp columns by confidence and assign a rank.
relative_rated_timestamp_columns as (
select
database_name,
schema_name,
table_name,
column_name,
row_number() over (
partition by database_name, schema_name, table_name
order by absolute_confidence
) as relative_confidence
from absolute_rated_timestamp_columns
),

-- Select the timestamp columns with the highest confidence.
best_rated_timestamp_columns as (
select database_name, schema_name, table_name, column_name
from relative_rated_timestamp_columns
where relative_confidence = 1
)

select database_name, schema_name, table_name, column_name as timestamp_column
from best_rated_timestamp_columns
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
with
dbt_models_data as (
select
unique_id as id,
database_name,
schema_name,
alias as table_name,
name as resource_name,
null as source_name,
'model' as table_type,
cast(tags as jsonb) as tags,
cast(owner as jsonb) as owner,
cast(depends_on_nodes as jsonb) as depends_on
from {{ ref("elementary", "dbt_models") }}
),

dbt_sources_data as (
select
unique_id as id,
database_name,
schema_name,
name as table_name,
name as resource_name,
source_name,
'source' as table_type,
cast(tags as jsonb) as tags,
cast(owner as jsonb) as owner,
cast('[]' as jsonb) as depends_on
from {{ ref("elementary", "dbt_sources") }}
),

tables_information as (
select *
from dbt_models_data
union all
select *
from dbt_sources_data
),

dependant_on_counts as (
select t1.id, count(*) as dependant_on_count
from tables_information t1
join tables_information t2 on t2.depends_on ? t1.id
group by t1.id
),

exposure_counts as (
select t.id, count(*) as exposure_count
from tables_information t
join
{{ ref("elementary", "dbt_exposures") }} e
on e.depends_on_nodes::jsonb ? t.id
group by t.id
),

tables as (
select
tables_information.*,
jsonb_array_length(tables_information.depends_on) as depends_on_count,
coalesce(dependant_on_counts.dependant_on_count, 0) as dependant_on_count,
coalesce(exposure_counts.exposure_count, 0) as exposure_count
from tables_information
left join dependant_on_counts on tables_information.id = dependant_on_counts.id
left join exposure_counts on tables_information.id = exposure_counts.id
)

select *
from tables

0 comments on commit 1614c21

Please sign in to comment.