diff --git a/macros/upload_individual_datasets/upload_exposures.sql b/macros/upload_individual_datasets/upload_exposures.sql index e097be7e..ce17b5cb 100644 --- a/macros/upload_individual_datasets/upload_exposures.sql +++ b/macros/upload_individual_datasets/upload_exposures.sql @@ -51,6 +51,39 @@ {% endif %} {% endmacro -%} +{% macro athena__get_exposures_dml_sql(exposures) -%} + {% if exposures != [] %} + {% set exposure_values %} + values + {% for exposure in exposures -%} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ exposure.unique_id | replace("'","\\'") }}', {# node_id #} + CAST('{{ run_started_at }}' AS timestamp), {# run_started_at #} + '{{ exposure.name | replace("'","'''") }}', {# name #} + '{{ exposure.type }}', {# type #} + '{{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(exposure.owner) | replace("'","\\'")) }}', {# owner #} + '{{ exposure.maturity }}', {# maturity #} + '{{ exposure.original_file_path | replace('\\', '\\\\') }}', {# path #} + '{{ exposure.description | replace("'","''") | replace("\n", "\\n") }}', {# description #} + '{{ exposure.url }}', {# url #} + '{{ exposure.package_name }}', {# package_name #} + '{{ tojson(exposure.depends_on.nodes) }}', {# depends_on_nodes #} + '{{ tojson(exposure.tags) }}', {# tags #} + {% if var('dbt_artifacts_exclude_all_results', false) %} + null + {% else %} + '{{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(exposure) | replace("\\", "\\\\") | replace("'", "''")) }}' {# all_results #} + {% endif %} + ) + {%- if not loop.last %},{%- endif %} + {%- endfor %} + {% endset %} + {{ exposure_values }} + {% else %} {{ return("") }} + {% endif %} +{%- endmacro %} + {% macro bigquery__get_exposures_dml_sql(exposures) -%} {% if exposures != [] %} {% set exposure_values %} diff --git a/macros/upload_individual_datasets/upload_invocations.sql b/macros/upload_individual_datasets/upload_invocations.sql index 775861ac..aa2496db 100644 --- a/macros/upload_individual_datasets/upload_invocations.sql +++ b/macros/upload_individual_datasets/upload_invocations.sql @@ -95,6 +95,69 @@ {% endmacro -%} +{% macro athena__get_invocations_dml_sql() -%} + {% set invocation_values %} + values + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ dbt_version }}', {# dbt_version #} + '{{ project_name }}', {# project_name #} + CAST('{{ run_started_at }}' AS timestamp), {# run_started_at #} + '{{ flags.WHICH }}', {# dbt_command #} + {{ flags.FULL_REFRESH }}, {# full_refresh_flag #} + '{{ target.profile_name }}', {# target_profile_name #} + '{{ target.name }}', {# target_name #} + '{{ target.schema }}', {# target_schema #} + {{ target.threads }}, {# target_threads #} + + '{{ env_var('DBT_CLOUD_PROJECT_ID', '') }}', {# dbt_cloud_project_id #} + '{{ env_var('DBT_CLOUD_JOB_ID', '') }}', {# dbt_cloud_job_id #} + '{{ env_var('DBT_CLOUD_RUN_ID', '') }}', {# dbt_cloud_run_id #} + '{{ env_var('DBT_CLOUD_RUN_REASON_CATEGORY', '') }}', {# dbt_cloud_run_reason_category #} + '{{ env_var('DBT_CLOUD_RUN_REASON', '') | replace("'","\\'") }}', {# dbt_cloud_run_reason #} + + {% if var('env_vars', none) %} + {% set env_vars_dict = {} %} + {% for env_variable in var('env_vars') %} + {% do env_vars_dict.update({env_variable: (env_var(env_variable, ''))}) %} + {% endfor %} + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(env_vars_dict)) }}, {# env_vars #} + {% else %} + null, {# env_vars #} + {% endif %} + + {% if var('dbt_vars', none) %} + {% set dbt_vars_dict = {} %} + {% for dbt_var in var('dbt_vars') %} + {% do dbt_vars_dict.update({dbt_var: (var(dbt_var, ''))}) %} + {% endfor %} + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(dbt_vars_dict)) }}, {# dbt_vars #} + {% else %} + null, {# dbt_vars #} + {% endif %} + + {% if invocation_args_dict.vars %} + {# vars - different format for pre v1.5 (yaml vs list) #} + {% if invocation_args_dict.vars is string %} + {% set parsed_inv_args_vars = fromyaml(invocation_args_dict.vars) %} + {% do invocation_args_dict.update({'vars': parsed_inv_args_vars}) %} + {% endif %} + {% endif %} + + '{{ tojson(invocation_args_dict) | replace('\\', '\\\\') | replace("'", "''") }}', {# invocation_args #} + + {% set metadata_env = {} %} + {% for key, value in dbt_metadata_envs.items() %} + {% do metadata_env.update({key: value}) %} + {% endfor %} + '{{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(metadata_env) | replace('\\', '\\\\')) }}' {# dbt_custom_envs #} + + ) + {% endset %} + {{ invocation_values }} + +{% endmacro -%} + {% macro bigquery__get_invocations_dml_sql() -%} {% set invocation_values %} ( diff --git a/macros/upload_individual_datasets/upload_model_executions.sql b/macros/upload_individual_datasets/upload_model_executions.sql index 1d5fa83b..d76bda3d 100644 --- a/macros/upload_individual_datasets/upload_model_executions.sql +++ b/macros/upload_individual_datasets/upload_model_executions.sql @@ -61,6 +61,47 @@ {% endif %} {% endmacro -%} +{% macro athena__get_model_executions_dml_sql(models) -%} + {% if models != [] %} + {% set model_execution_values %} + values + {% for model in models -%} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ model.node.unique_id }}', {# node_id #} + CAST('{{ run_started_at }}' AS timestamp), {# run_started_at #} + + {% set config_full_refresh = model.node.config.full_refresh %} + {% if config_full_refresh is none %} + {% set config_full_refresh = flags.FULL_REFRESH %} + {% endif %} + {{ config_full_refresh }}, {# was_full_refresh #} + + '{{ model.thread_id }}', {# thread_id #} + '{{ model.status }}', {# status #} + + {% set compile_started_at = (model.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %} + {% if compile_started_at %}CAST('{{ compile_started_at }}' AS timestamp){% else %}null{% endif %}, {# compile_started_at #} + {% set query_completed_at = (model.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %} + {% if query_completed_at %}CAST('{{ query_completed_at }}' AS timestamp){% else %}null{% endif %}, {# query_completed_at #} + + {{ model.execution_time }}, {# total_node_runtime #} + cast('{{ model.adapter_response.rows_affected }}' as int), + '{{ model.node.config.materialized }}', {# materialization #} + '{{ model.node.schema }}', {# schema #} + '{{ model.node.name }}', {# name #} + '{{ model.node.alias }}', {# alias #} + '{{ model.message | replace("\\", "\\\\") | replace("'", "''") | replace("\n", "\\n") }}', {# message #} + '{{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(model.adapter_response) | replace("\\", "\\\\") | replace("'", "''")) }}' {# adapter_response #} + ) + {%- if not loop.last %},{%- endif %} + {%- endfor %} + {% endset %} + {{ model_execution_values }} + {% else %} {{ return("") }} + {% endif %} +{%- endmacro %} + {% macro bigquery__get_model_executions_dml_sql(models) -%} {% if models != [] %} {% set model_execution_values %} diff --git a/macros/upload_individual_datasets/upload_models.sql b/macros/upload_individual_datasets/upload_models.sql index 2c27daec..b73e96f1 100644 --- a/macros/upload_individual_datasets/upload_models.sql +++ b/macros/upload_individual_datasets/upload_models.sql @@ -55,6 +55,42 @@ {% endif %} {% endmacro -%} +{% macro athena__get_models_dml_sql(models) -%} + {% if models != [] %} + {% set model_values %} + values + {% for model in models -%} + {% set model_copy = model.copy() -%} + {% do model_copy.pop('raw_code', None) %} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ model_copy.unique_id }}', {# node_id #} + CAST('{{ run_started_at }}' AS timestamp), {# run_started_at #} + '{{ model_copy.database }}', {# database #} + '{{ model_copy.schema }}', {# schema #} + '{{ model_copy.name }}', {# name #} + '{{ tojson(model_copy.depends_on.nodes) }}', {# depends_on_nodes #} + '{{ model_copy.package_name }}', {# package_name #} + '{{ model_copy.original_file_path | replace('\\', '\\\\') }}', {# path #} + '{{ model_copy.checksum.checksum | replace('\\', '\\\\') }}', {# checksum #} + '{{ model_copy.config.materialized }}', {# materialization #} + '{{ tojson(model_copy.tags) }}', {# tags #} + '{{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(model_copy.config.meta)) }}', {# meta #} + '{{ model_copy.alias }}', {# alias #} + {% if var('dbt_artifacts_exclude_all_results', false) %} + null + {% else %} + '{{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(model_copy) | replace("\\", "\\\\") | replace("'","''")) }}' {# all_results #} + {% endif %} + ) + {%- if not loop.last %},{%- endif %} + {%- endfor %} + {% endset %} + {{ model_values }} + {% else %} {{ return("") }} + {% endif %} +{%- endmacro %} + {% macro bigquery__get_models_dml_sql(models) -%} {% if models != [] %} {% set model_values %} diff --git a/macros/upload_individual_datasets/upload_sources.sql b/macros/upload_individual_datasets/upload_sources.sql index 3d899755..36a9ce63 100644 --- a/macros/upload_individual_datasets/upload_sources.sql +++ b/macros/upload_individual_datasets/upload_sources.sql @@ -47,6 +47,37 @@ {% endif %} {% endmacro -%} +{% macro athena__get_sources_dml_sql(sources) -%} + {% if sources != [] %} + {% set source_values %} + values + {% for source in sources -%} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ source.unique_id }}', {# node_id #} + CAST('{{ run_started_at }}' AS timestamp), {# run_started_at #} + '{{ source.database }}', {# database #} + '{{ source.schema }}', {# schema #} + '{{ source.source_name }}', {# source_name #} + '{{ source.loader }}', {# loader #} + '{{ source.name }}', {# name #} + '{{ source.identifier }}', {# identifier #} + '{{ source.loaded_at_field | replace("'","\\'") }}', {# loaded_at_field #} + '{{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(source.freshness) | replace("'","''")) }}', {# freshness #} + {% if var('dbt_artifacts_exclude_all_results', false) %} + null + {% else %} + '{{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(source) | replace("\\", "\\\\") | replace("'", "''")) }}' {# all_results #} + {% endif %} + ) + {%- if not loop.last %},{%- endif %} + {%- endfor %} + {% endset %} + {{ source_values }} + {% else %} {{ return("") }} + {% endif %} +{%- endmacro %} + {% macro bigquery__get_sources_dml_sql(sources) -%} {% if sources != [] %} {% set source_values %} diff --git a/macros/upload_individual_datasets/upload_test_executions.sql b/macros/upload_individual_datasets/upload_test_executions.sql index 3af31d73..12afde92 100644 --- a/macros/upload_individual_datasets/upload_test_executions.sql +++ b/macros/upload_individual_datasets/upload_test_executions.sql @@ -54,6 +54,44 @@ {% endif %} {% endmacro -%} +{% macro athena__get_test_executions_dml_sql(tests) -%} + {% if tests != [] %} + {% set test_execution_values %} + values + {% for test in tests -%} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ test.node.unique_id }}', {# node_id #} + CAST('{{ run_started_at }}' AS timestamp), {# run_started_at #} + + {% set config_full_refresh = test.node.config.full_refresh %} + {% if config_full_refresh is none %} + {% set config_full_refresh = flags.FULL_REFRESH %} + {% endif %} + {{ config_full_refresh }}, {# was_full_refresh #} + + '{{ test.thread_id }}', {# thread_id #} + '{{ test.status }}', {# status #} + + {% set compile_started_at = (test.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %} + {% if compile_started_at %}CAST('{{ compile_started_at }}' AS timestamp){% else %}null{% endif %}, {# compile_started_at #} + {% set query_completed_at = (test.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %} + {% if query_completed_at %}CAST('{{ query_completed_at }}' AS timestamp){% else %}null{% endif %}, {# query_completed_at #} + + {{ test.execution_time }}, {# total_node_runtime #} + null, {# rows_affected not available in Databricks #} + {{ 'null' if test.failures is none else test.failures }}, {# failures #} + '{{ test.message | replace("\\", "\\\\") | replace("'", "'''") | replace('"', '\\"') | replace("\n", "\\n") }}', {# message #} + '{{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(test.adapter_response) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"')) }}' {# adapter_response #} + ) + {%- if not loop.last %},{%- endif %} + {%- endfor %} + {% endset %} + {{ test_execution_values }} + {% else %} {{ return("") }} + {% endif %} +{% endmacro -%} + {% macro bigquery__get_test_executions_dml_sql(tests) -%} {% if tests != [] %} {% set test_execution_values %} diff --git a/macros/upload_individual_datasets/upload_tests.sql b/macros/upload_individual_datasets/upload_tests.sql index 18d7171c..bc529312 100644 --- a/macros/upload_individual_datasets/upload_tests.sql +++ b/macros/upload_individual_datasets/upload_tests.sql @@ -41,6 +41,34 @@ {% endif %} {% endmacro -%} +{% macro athena__get_tests_dml_sql(tests) -%} + {% if tests != [] %} + {% set test_values %} + values + {% for test in tests -%} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ test.unique_id }}', {# node_id #} + CAST('{{ run_started_at }}' AS timestamp), {# run_started_at #} + '{{ test.name }}', {# name #} + '{{ tojson(test.depends_on.nodes) }}', {# depends_on_nodes #} + '{{ test.package_name }}', {# package_name #} + '{{ test.original_file_path | replace('\\', '\\\\') }}', {# test_path #} + '{{ tojson(test.tags) }}', {# tags #} + {% if var('dbt_artifacts_exclude_all_results', false) %} + null + {% else %} + '{{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(test) | replace("\\", "\\\\") | replace("'","''")) }}' {# all_fields #} + {% endif %} + ) + {%- if not loop.last %},{%- endif %} + {%- endfor %} + {% endset %} + {{ test_values }} + {% else %} {{ return("") }} + {% endif %} +{%- endmacro %} + {% macro bigquery__get_tests_dml_sql(tests) -%} {% if tests != [] %} {% set test_values %} diff --git a/macros/upload_results/insert_into_metadata_table.sql b/macros/upload_results/insert_into_metadata_table.sql index 8d4afa40..937c7e94 100644 --- a/macros/upload_results/insert_into_metadata_table.sql +++ b/macros/upload_results/insert_into_metadata_table.sql @@ -11,6 +11,17 @@ {%- endmacro %} +{% macro athena__insert_into_metadata_table(relation, fields, content) -%} + + {% set insert_into_table_query %} + insert into {{ relation }} {{ fields }} + {{ content }} + {% endset %} + + {% do run_query(insert_into_table_query) %} + +{%- endmacro %} + {% macro spark__insert_into_metadata_table(relation, fields, content) -%} {% set insert_into_table_query %} diff --git a/macros/upload_results/upload_results.sql b/macros/upload_results/upload_results.sql index 114a667d..5dfe475d 100644 --- a/macros/upload_results/upload_results.sql +++ b/macros/upload_results/upload_results.sql @@ -19,10 +19,10 @@ {% set objects = dbt_artifacts.get_dataset_content(dataset) %} {# Upload in chunks to reduce the query size #} - {% if dataset == 'models' %} - {% set upload_limit = 50 if target.type == 'bigquery' else 100 %} + {% if dataset in ('models', 'tests') %} + {% set upload_limit = 50 if target.type in ('athena', 'bigquery') else 100 %} {% else %} - {% set upload_limit = 300 if target.type == 'bigquery' else 5000 %} + {% set upload_limit = 300 if target.type in ('athena', 'bigquery') else 5000 %} {% endif %} {# Loop through each chunk in turn #}