Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Athena support to dbt_artifacts #447

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions macros/upload_individual_datasets/upload_exposures.sql
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,39 @@
{% endif %}
{% endmacro -%}

{% macro athena__get_exposures_dml_sql(exposures) -%}
    {#-
        Athena implementation of the exposures DML payload.

        Args:
            exposures: list of exposure nodes to upload.

        Renders a `values (...)` list with one tuple per exposure. When
        `exposures` is empty the macro returns an empty string.

        String literals: Athena escapes a single quote inside a literal by
        doubling it. Backslash-style and triple-quote escapes produce an
        unterminated or malformed literal, so every free-text field below
        uses the doubled-quote form.

        NOTE(review): the existing backslash-doubling on path / all_results is
        kept as-is; confirm whether Athena needs it, since backslash is
        presumably not an escape character in its string literals.
    -#}
    {% if exposures != [] %}
        {% set exposure_values %}
        values
        {% for exposure in exposures -%}
            (
                '{{ invocation_id }}', {# command_invocation_id #}
                '{{ exposure.unique_id | replace("'", "''") }}', {# node_id #}
                CAST('{{ run_started_at }}' AS timestamp), {# run_started_at #}
                '{{ exposure.name | replace("'", "''") }}', {# name #}
                '{{ exposure.type }}', {# type #}
                '{{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(exposure.owner) | replace("'", "''")) }}', {# owner #}
                '{{ exposure.maturity }}', {# maturity #}
                '{{ exposure.original_file_path | replace('\\', '\\\\') }}', {# path #}
                '{{ exposure.description | replace("'","''") | replace("\n", "\\n") }}', {# description #}
                '{{ exposure.url | replace("'", "''") }}', {# url #}
                '{{ exposure.package_name }}', {# package_name #}
                '{{ tojson(exposure.depends_on.nodes) }}', {# depends_on_nodes #}
                '{{ tojson(exposure.tags) | replace("'", "''") }}', {# tags #}
                {% if var('dbt_artifacts_exclude_all_results', false) %}
                    null
                {% else %}
                    '{{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(exposure) | replace("\\", "\\\\") | replace("'", "''")) }}' {# all_results #}
                {% endif %}
            )
            {%- if not loop.last %},{%- endif %}
        {%- endfor %}
        {% endset %}
        {{ exposure_values }}
    {% else %} {{ return("") }}
    {% endif %}
{%- endmacro %}

{% macro bigquery__get_exposures_dml_sql(exposures) -%}
{% if exposures != [] %}
{% set exposure_values %}
Expand Down
63 changes: 63 additions & 0 deletions macros/upload_individual_datasets/upload_invocations.sql
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,69 @@

{% endmacro -%}

{% macro athena__get_invocations_dml_sql() -%}
    {#-
        Athena implementation of the invocations DML payload.

        Renders a single-row `values (...)` clause describing the current dbt
        invocation: ids, version, target settings, dbt Cloud environment
        variables, the configured env_vars / dbt_vars, invocation args and the
        custom metadata envs.

        JSON columns are emitted as quoted string literals with single quotes
        doubled (the Athena escape for a quote inside a literal). The env_vars
        and dbt_vars columns follow the same quoting as dbt_custom_envs so that
        the raw JSON text is never placed unquoted into the statement.

        NOTE(review): this assumes the Athena dispatch of parse_json returns
        its argument unchanged, as the other quoted call sites in the Athena
        macros imply; confirm against the parse_json macro.
    -#}
    {% set invocation_values %}
    values
    (
        '{{ invocation_id }}', {# command_invocation_id #}
        '{{ dbt_version }}', {# dbt_version #}
        '{{ project_name }}', {# project_name #}
        CAST('{{ run_started_at }}' AS timestamp), {# run_started_at #}
        '{{ flags.WHICH }}', {# dbt_command #}
        {{ flags.FULL_REFRESH }}, {# full_refresh_flag #}
        '{{ target.profile_name }}', {# target_profile_name #}
        '{{ target.name }}', {# target_name #}
        '{{ target.schema }}', {# target_schema #}
        {{ target.threads }}, {# target_threads #}

        '{{ env_var('DBT_CLOUD_PROJECT_ID', '') }}', {# dbt_cloud_project_id #}
        '{{ env_var('DBT_CLOUD_JOB_ID', '') }}', {# dbt_cloud_job_id #}
        '{{ env_var('DBT_CLOUD_RUN_ID', '') }}', {# dbt_cloud_run_id #}
        '{{ env_var('DBT_CLOUD_RUN_REASON_CATEGORY', '') }}', {# dbt_cloud_run_reason_category #}
        '{{ env_var('DBT_CLOUD_RUN_REASON', '') | replace("'", "''") }}', {# dbt_cloud_run_reason #}

        {% if var('env_vars', none) %}
            {# Collect the requested environment variables into one JSON object. #}
            {% set env_vars_dict = {} %}
            {% for env_variable in var('env_vars') %}
                {% do env_vars_dict.update({env_variable: (env_var(env_variable, ''))}) %}
            {% endfor %}
            '{{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(env_vars_dict) | replace("'", "''")) }}', {# env_vars #}
        {% else %}
            null, {# env_vars #}
        {% endif %}

        {% if var('dbt_vars', none) %}
            {# Collect the requested dbt vars into one JSON object. #}
            {% set dbt_vars_dict = {} %}
            {% for dbt_var in var('dbt_vars') %}
                {% do dbt_vars_dict.update({dbt_var: (var(dbt_var, ''))}) %}
            {% endfor %}
            '{{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(dbt_vars_dict) | replace("'", "''")) }}', {# dbt_vars #}
        {% else %}
            null, {# dbt_vars #}
        {% endif %}

        {% if invocation_args_dict.vars %}
            {# vars - different format for pre v1.5 (yaml vs list) #}
            {% if invocation_args_dict.vars is string %}
                {% set parsed_inv_args_vars = fromyaml(invocation_args_dict.vars) %}
                {% do invocation_args_dict.update({'vars': parsed_inv_args_vars}) %}
            {% endif %}
        {% endif %}

        '{{ tojson(invocation_args_dict) | replace('\\', '\\\\') | replace("'", "''") }}', {# invocation_args #}

        {# Copy dbt_metadata_envs into a plain dict before serialising it. #}
        {% set metadata_env = {} %}
        {% for key, value in dbt_metadata_envs.items() %}
            {% do metadata_env.update({key: value}) %}
        {% endfor %}
        '{{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(metadata_env) | replace('\\', '\\\\') | replace("'", "''")) }}' {# dbt_custom_envs #}

    )
    {% endset %}
    {{ invocation_values }}

{% endmacro -%}

{% macro bigquery__get_invocations_dml_sql() -%}
{% set invocation_values %}
(
Expand Down
41 changes: 41 additions & 0 deletions macros/upload_individual_datasets/upload_model_executions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,47 @@
{% endif %}
{% endmacro -%}

{% macro athena__get_model_executions_dml_sql(models) -%}
    {#-
        Athena implementation of the model executions DML payload.

        Args:
            models: list of model run results; each item exposes node, timing,
                    status, thread_id, execution_time, message and
                    adapter_response.

        Renders a `values (...)` list with one tuple per model result. When
        `models` is empty the macro returns an empty string.

        rows_affected uses try_cast so that a missing or non-numeric
        adapter_response value becomes NULL instead of raising a cast error
        that would fail the whole insert.
    -#}
    {% if models != [] %}
        {% set model_execution_values %}
        values
        {% for model in models -%}
            (
                '{{ invocation_id }}', {# command_invocation_id #}
                '{{ model.node.unique_id }}', {# node_id #}
                CAST('{{ run_started_at }}' AS timestamp), {# run_started_at #}

                {# The node-level full_refresh config wins; fall back to the CLI flag when unset. #}
                {% set config_full_refresh = model.node.config.full_refresh %}
                {% if config_full_refresh is none %}
                    {% set config_full_refresh = flags.FULL_REFRESH %}
                {% endif %}
                {{ config_full_refresh }}, {# was_full_refresh #}

                '{{ model.thread_id }}', {# thread_id #}
                '{{ model.status }}', {# status #}

                {# Timing entries are looked up by name; a missing entry yields null. #}
                {% set compile_started_at = (model.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %}
                {% if compile_started_at %}CAST('{{ compile_started_at }}' AS timestamp){% else %}null{% endif %}, {# compile_started_at #}
                {% set query_completed_at = (model.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %}
                {% if query_completed_at %}CAST('{{ query_completed_at }}' AS timestamp){% else %}null{% endif %}, {# query_completed_at #}

                {{ model.execution_time }}, {# total_node_runtime #}
                try_cast('{{ model.adapter_response.rows_affected }}' as integer), {# rows_affected #}
                '{{ model.node.config.materialized }}', {# materialization #}
                '{{ model.node.schema }}', {# schema #}
                '{{ model.node.name }}', {# name #}
                '{{ model.node.alias }}', {# alias #}
                '{{ model.message | replace("\\", "\\\\") | replace("'", "''") | replace("\n", "\\n") }}', {# message #}
                '{{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(model.adapter_response) | replace("\\", "\\\\") | replace("'", "''")) }}' {# adapter_response #}
            )
            {%- if not loop.last %},{%- endif %}
        {%- endfor %}
        {% endset %}
        {{ model_execution_values }}
    {% else %} {{ return("") }}
    {% endif %}
{%- endmacro %}

{% macro bigquery__get_model_executions_dml_sql(models) -%}
{% if models != [] %}
{% set model_execution_values %}
Expand Down
36 changes: 36 additions & 0 deletions macros/upload_individual_datasets/upload_models.sql
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,42 @@
{% endif %}
{% endmacro -%}

{% macro athena__get_models_dml_sql(models) -%}
    {#-
        Athena implementation of the models DML payload.

        Args:
            models: list of model nodes to upload.

        Renders a `values (...)` list with one tuple per model. When `models`
        is empty the macro returns an empty string.

        Each model is copied and its raw_code entry dropped before it is
        serialised, so the all_results column does not carry the model source.

        The tags and meta JSON are user-authored, so single quotes in them are
        doubled (the Athena escape inside a string literal), matching what the
        all_results column already does.
    -#}
    {% if models != [] %}
        {% set model_values %}
        values
        {% for model in models -%}
            {% set model_copy = model.copy() -%}
            {% do model_copy.pop('raw_code', None) %}
            (
                '{{ invocation_id }}', {# command_invocation_id #}
                '{{ model_copy.unique_id }}', {# node_id #}
                CAST('{{ run_started_at }}' AS timestamp), {# run_started_at #}
                '{{ model_copy.database }}', {# database #}
                '{{ model_copy.schema }}', {# schema #}
                '{{ model_copy.name }}', {# name #}
                '{{ tojson(model_copy.depends_on.nodes) }}', {# depends_on_nodes #}
                '{{ model_copy.package_name }}', {# package_name #}
                '{{ model_copy.original_file_path | replace('\\', '\\\\') }}', {# path #}
                '{{ model_copy.checksum.checksum | replace('\\', '\\\\') }}', {# checksum #}
                '{{ model_copy.config.materialized }}', {# materialization #}
                '{{ tojson(model_copy.tags) | replace("'", "''") }}', {# tags #}
                '{{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(model_copy.config.meta) | replace("'", "''")) }}', {# meta #}
                '{{ model_copy.alias }}', {# alias #}
                {% if var('dbt_artifacts_exclude_all_results', false) %}
                    null
                {% else %}
                    '{{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(model_copy) | replace("\\", "\\\\") | replace("'","''")) }}' {# all_results #}
                {% endif %}
            )
            {%- if not loop.last %},{%- endif %}
        {%- endfor %}
        {% endset %}
        {{ model_values }}
    {% else %} {{ return("") }}
    {% endif %}
{%- endmacro %}

{% macro bigquery__get_models_dml_sql(models) -%}
{% if models != [] %}
{% set model_values %}
Expand Down
31 changes: 31 additions & 0 deletions macros/upload_individual_datasets/upload_sources.sql
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,37 @@
{% endif %}
{% endmacro -%}

{% macro athena__get_sources_dml_sql(sources) -%}
    {#-
        Athena implementation of the sources DML payload.

        Args:
            sources: list of source nodes to upload.

        Renders a `values (...)` list with one tuple per source. When
        `sources` is empty the macro returns an empty string.

        loaded_at_field can hold an arbitrary SQL expression, so single quotes
        in it are doubled (the Athena escape inside a string literal) rather
        than backslash-escaped, which would leave the literal malformed.
    -#}
    {% if sources != [] %}
        {% set source_values %}
        values
        {% for source in sources -%}
            (
                '{{ invocation_id }}', {# command_invocation_id #}
                '{{ source.unique_id }}', {# node_id #}
                CAST('{{ run_started_at }}' AS timestamp), {# run_started_at #}
                '{{ source.database }}', {# database #}
                '{{ source.schema }}', {# schema #}
                '{{ source.source_name }}', {# source_name #}
                '{{ source.loader }}', {# loader #}
                '{{ source.name }}', {# name #}
                '{{ source.identifier }}', {# identifier #}
                '{{ source.loaded_at_field | replace("'", "''") }}', {# loaded_at_field #}
                '{{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(source.freshness) | replace("'","''")) }}', {# freshness #}
                {% if var('dbt_artifacts_exclude_all_results', false) %}
                    null
                {% else %}
                    '{{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(source) | replace("\\", "\\\\") | replace("'", "''")) }}' {# all_results #}
                {% endif %}
            )
            {%- if not loop.last %},{%- endif %}
        {%- endfor %}
        {% endset %}
        {{ source_values }}
    {% else %} {{ return("") }}
    {% endif %}
{%- endmacro %}

{% macro bigquery__get_sources_dml_sql(sources) -%}
{% if sources != [] %}
{% set source_values %}
Expand Down
38 changes: 38 additions & 0 deletions macros/upload_individual_datasets/upload_test_executions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,44 @@
{% endif %}
{% endmacro -%}

{% macro athena__get_test_executions_dml_sql(tests) -%}
    {#-
        Athena implementation of the test executions DML payload.

        Args:
            tests: list of test run results; each item exposes node, timing,
                   status, thread_id, execution_time, failures, message and
                   adapter_response.

        Renders a `values (...)` list with one tuple per test result. When
        `tests` is empty the macro returns an empty string.

        message and adapter_response use the same escaping chain as the Athena
        model executions macro: backslashes doubled, single quotes doubled,
        newlines in message rendered as a literal backslash-n. Double quotes
        are left alone; prefixing them with a backslash would alter the JSON
        in adapter_response and the text of message.
    -#}
    {% if tests != [] %}
        {% set test_execution_values %}
        values
        {% for test in tests -%}
            (
                '{{ invocation_id }}', {# command_invocation_id #}
                '{{ test.node.unique_id }}', {# node_id #}
                CAST('{{ run_started_at }}' AS timestamp), {# run_started_at #}

                {# The node-level full_refresh config wins; fall back to the CLI flag when unset. #}
                {% set config_full_refresh = test.node.config.full_refresh %}
                {% if config_full_refresh is none %}
                    {% set config_full_refresh = flags.FULL_REFRESH %}
                {% endif %}
                {{ config_full_refresh }}, {# was_full_refresh #}

                '{{ test.thread_id }}', {# thread_id #}
                '{{ test.status }}', {# status #}

                {# Timing entries are looked up by name; a missing entry yields null. #}
                {% set compile_started_at = (test.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %}
                {% if compile_started_at %}CAST('{{ compile_started_at }}' AS timestamp){% else %}null{% endif %}, {# compile_started_at #}
                {% set query_completed_at = (test.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %}
                {% if query_completed_at %}CAST('{{ query_completed_at }}' AS timestamp){% else %}null{% endif %}, {# query_completed_at #}

                {{ test.execution_time }}, {# total_node_runtime #}
                null, {# rows_affected is not recorded for test executions on Athena #}
                {{ 'null' if test.failures is none else test.failures }}, {# failures #}
                '{{ test.message | replace("\\", "\\\\") | replace("'", "''") | replace("\n", "\\n") }}', {# message #}
                '{{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(test.adapter_response) | replace("\\", "\\\\") | replace("'", "''")) }}' {# adapter_response #}
            )
            {%- if not loop.last %},{%- endif %}
        {%- endfor %}
        {% endset %}
        {{ test_execution_values }}
    {% else %} {{ return("") }}
    {% endif %}
{% endmacro -%}

{% macro bigquery__get_test_executions_dml_sql(tests) -%}
{% if tests != [] %}
{% set test_execution_values %}
Expand Down
28 changes: 28 additions & 0 deletions macros/upload_individual_datasets/upload_tests.sql
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,34 @@
{% endif %}
{% endmacro -%}

{% macro athena__get_tests_dml_sql(tests) -%}
    {#-
        Athena implementation of the tests DML payload.

        Args:
            tests: list of test nodes to upload.

        Renders a `values (...)` list with one tuple per test. When `tests`
        is empty the macro returns an empty string.

        The tags JSON is user-authored, so single quotes in it are doubled
        (the Athena escape inside a string literal), matching what the
        all_fields column already does.
    -#}
    {% if tests != [] %}
        {% set test_values %}
        values
        {% for test in tests -%}
            (
                '{{ invocation_id }}', {# command_invocation_id #}
                '{{ test.unique_id }}', {# node_id #}
                CAST('{{ run_started_at }}' AS timestamp), {# run_started_at #}
                '{{ test.name }}', {# name #}
                '{{ tojson(test.depends_on.nodes) }}', {# depends_on_nodes #}
                '{{ test.package_name }}', {# package_name #}
                '{{ test.original_file_path | replace('\\', '\\\\') }}', {# test_path #}
                '{{ tojson(test.tags) | replace("'", "''") }}', {# tags #}
                {% if var('dbt_artifacts_exclude_all_results', false) %}
                    null
                {% else %}
                    '{{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(test) | replace("\\", "\\\\") | replace("'","''")) }}' {# all_fields #}
                {% endif %}
            )
            {%- if not loop.last %},{%- endif %}
        {%- endfor %}
        {% endset %}
        {{ test_values }}
    {% else %} {{ return("") }}
    {% endif %}
{%- endmacro %}

{% macro bigquery__get_tests_dml_sql(tests) -%}
{% if tests != [] %}
{% set test_values %}
Expand Down
11 changes: 11 additions & 0 deletions macros/upload_results/insert_into_metadata_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@

{%- endmacro %}

{% macro athena__insert_into_metadata_table(relation, fields, content) -%}

    {#-
        Athena implementation: assemble one insert statement for `relation`
        from the pre-rendered `fields` and `content` fragments and execute it
        with run_query.
    -#}
    {% set insert_statement %}
    insert into {{ relation }} {{ fields }}
    {{ content }}
    {% endset %}

    {% do run_query(insert_statement) %}

{%- endmacro %}

{% macro spark__insert_into_metadata_table(relation, fields, content) -%}

{% set insert_into_table_query %}
Expand Down
6 changes: 3 additions & 3 deletions macros/upload_results/upload_results.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
{% set objects = dbt_artifacts.get_dataset_content(dataset) %}

{# Upload in chunks to reduce the query size #}
{% if dataset == 'models' %}
{% set upload_limit = 50 if target.type == 'bigquery' else 100 %}
{% if dataset in ('models', 'tests') %}
{% set upload_limit = 50 if target.type in ('athena', 'bigquery') else 100 %}
{% else %}
{% set upload_limit = 300 if target.type == 'bigquery' else 5000 %}
{% set upload_limit = 300 if target.type in ('athena', 'bigquery') else 5000 %}
{% endif %}

{# Loop through each chunk in turn #}
Expand Down
Loading