diff --git a/dbt_project.yml b/dbt_project.yml index 60bd2117..fe0e848a 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -12,7 +12,9 @@ clean-targets: # folders to be removed by `dbt clean` models: dbt_artifacts: +materialized: view - +file_format: delta + +file_format: "{% if target.type == 'athena' %}parquet[% else %}delta{% endif %}" + +table_type: "{% if target.type == 'athena' %}iceberg{% endif %}" + +incremental_strategy: "{% if target.type == 'athena' %}append{% endif %}" sources: +materialized: incremental +on_schema_change: append_new_columns @@ -20,3 +22,4 @@ models: +persist_docs: # Databricks doesn't offer column-level support for persisting docs columns: '{{ target.name != "databricks" }}' + diff --git a/integration_test_project/example-env.sh b/integration_test_project/example-env.sh index 47cb0d67..c556daea 100755 --- a/integration_test_project/example-env.sh +++ b/integration_test_project/example-env.sh @@ -16,7 +16,8 @@ export DBT_ENV_SECRET_DATABRICKS_TOKEN= export DBT_ENV_SECRET_GCP_PROJECT= export DBT_ENV_SPARK_DRIVER_PATH= # /Library/simba/spark/lib/libsparkodbc_sbu.dylib on a Mac export DBT_ENV_SPARK_ENDPOINT= # The endpoint ID from the Databricks HTTP path - +export DBT_ENV_ATHENA_S3_STAGING= +export DBT_ENV_ATHENA_S3_DATA= # dbt environment variables, change these export DBT_VERSION="1_5_0" export DBT_CLOUD_PROJECT_ID= diff --git a/integration_test_project/models/incremental.sql b/integration_test_project/models/incremental.sql index 7c756a9d..b4cfac0e 100644 --- a/integration_test_project/models/incremental.sql +++ b/integration_test_project/models/incremental.sql @@ -1,11 +1,23 @@ +{% if target.name == 'athena' %} {{ config( materialized='incremental', unique_key='id', meta={"meta_field": "description with an ' apostrophe"}, + table_type='iceberg', + file_format='parquet', + incremental_strategy='merge' ) }} - +{% else %} +{{ + config( + materialized='incremental', + unique_key='id', + meta={"meta_field": "description with an ' apostrophe"}, + ) +}} +{% endif %} -- {{ source('dummy_source', '"GROUP"') }} select diff --git a/integration_test_project/package-lock.yml b/integration_test_project/package-lock.yml new file mode 100644 index 00000000..55b46caf --- /dev/null +++ b/integration_test_project/package-lock.yml @@ -0,0 +1,3 @@ +packages: +- local: ../ +sha1_hash: 570155077da10b186bf3640e96c2b6746d0fc6dc diff --git a/integration_test_project/profiles.yml b/integration_test_project/profiles.yml index b24ad80d..3c34b074 100644 --- a/integration_test_project/profiles.yml +++ b/integration_test_project/profiles.yml @@ -52,3 +52,14 @@ dbt_artifacts: dbname: postgres schema: public threads: 8 + athena: + type: athena + s3_staging_dir: "{{ env_var('DBT_ENV_ATHENA_S3_STAGING') }}" + s3_data_dir: "{{ env_var('DBT_ENV_ATHENA_S3_DATA') }}" + s3_data_naming: schema_table_unique + region_name: "{{ env_var('AWS_DEFAULT_REGION') }}" + schema: public + database: awsdatacatalog + threads: 8 + seed_s3_upload_args: + ACL: bucket-owner-full-control diff --git a/macros/database_specific_helpers/type_helpers.sql b/macros/database_specific_helpers/type_helpers.sql index 4064ad46..c73c9a95 100644 --- a/macros/database_specific_helpers/type_helpers.sql +++ b/macros/database_specific_helpers/type_helpers.sql @@ -8,6 +8,18 @@ {{ return(api.Column.translate_type("boolean")) }} {% endmacro %} +{#- TIMESTAMP -#} +{% macro type_timestamp() %} + {{ return(adapter.dispatch('type_timestamp', 'dbt_artifacts')()) }} +{% endmacro %} + +{% macro default__type_timestamp() %} + {{ return(api.Column.translate_type("timestamp")) }} +{% endmacro %} + +{% macro athena__type_timestamp() %} + timestamp(6) +{% endmacro %} {#- JSON -#} {% macro type_json() %} diff --git a/macros/upload_individual_datasets/upload_exposures.sql b/macros/upload_individual_datasets/upload_exposures.sql index 9f0ec5d3..2f1d42b5 100644 --- a/macros/upload_individual_datasets/upload_exposures.sql +++ b/macros/upload_individual_datasets/upload_exposures.sql @@ -118,3 +118,37 @@ {{ return("") }} {% endif %} {%- endmacro %} + +{% macro athena__get_exposures_dml_sql(exposures) -%} + {% if exposures != [] %} + + {% set exposure_values %} + {% for exposure in exposures -%} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ exposure.unique_id | replace("\'", "\'\'") }}', {# node_id #} + cast('{{ run_started_at }}' as timestamp(6)), {# run_started_at #} + '{{ exposure.name | replace("\'", "\'\'") }}', {# name #} + '{{ exposure.type }}', {# type #} + '{{ tojson(exposure.owner) | replace("\'", "\'\'") }}', {# owner #} + '{{ exposure.maturity }}', {# maturity #} + '{{ exposure.original_file_path | replace("\'", "\'\'") }}', {# path #} + '{{ exposure.description | replace("\'", "\'\'") }}', {# description #} + '{{ exposure.url }}', {# url #} + '{{ exposure.package_name }}', {# package_name #} + '{{ tojson(exposure.depends_on.nodes) | replace("\'", "\'\'") }}', {# depends_on_nodes #} + '{{ tojson(exposure.tags) | replace("\'", "\'\'") }}', {# tags #} + {% if var('dbt_artifacts_exclude_all_results', false) %} + cast(null as varchar) + {% else %} + '{{ tojson(exposure).replace("\'", "\'\'") }}' {# all_results #} + {% endif %} + ) + {%- if not loop.last %},{%- endif %} + {%- endfor %} + {% endset %} + {{ exposure_values }} + {% else %} + {{ return("") }} + {% endif %} +{%- endmacro %} diff --git a/macros/upload_individual_datasets/upload_invocations.sql b/macros/upload_individual_datasets/upload_invocations.sql index 21c5574c..b5d8e3d1 100644 --- a/macros/upload_individual_datasets/upload_invocations.sql +++ b/macros/upload_individual_datasets/upload_invocations.sql @@ -221,3 +221,66 @@ {{ invocation_values }} {% endmacro -%} + +{% macro athena__get_invocations_dml_sql() -%} + {% set invocation_values %} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ dbt_version }}', {# dbt_version #} + '{{ project_name }}', {# project_name #} + cast('{{ run_started_at }}' as timestamp(6)), {# run_started_at #} + '{{ flags.WHICH }}', {# dbt_command #} + {{ flags.FULL_REFRESH }}, {# full_refresh_flag #} + '{{ target.profile_name }}', {# target_profile_name #} + '{{ target.name }}', {# target_name #} + '{{ target.schema }}', {# target_schema #} + {{ target.threads }}, {# target_threads #} + + '{{ env_var("DBT_CLOUD_PROJECT_ID", "") }}', {# dbt_cloud_project_id #} + '{{ env_var("DBT_CLOUD_JOB_ID", "") }}', {# dbt_cloud_job_id #} + '{{ env_var("DBT_CLOUD_RUN_ID", "") }}', {# dbt_cloud_run_id #} + '{{ env_var("DBT_CLOUD_RUN_REASON_CATEGORY", "") }}', {# dbt_cloud_run_reason_category #} + '{{ env_var("DBT_CLOUD_RUN_REASON", '') | replace("\'", "\'\'") }}', {# dbt_cloud_run_reason #} + + {% if var('env_vars', none) %} + {% set env_vars_dict = {} %} + {% for env_variable in var('env_vars') %} + {% do env_vars_dict.update({env_variable: (env_var(env_variable, ''))}) %} + {% endfor %} + '{{ tojson(env_vars_dict) | replace("\'", "\'\'") }}', {# env_vars #} + {% else %} + cast(null as varchar), {# env_vars #} + {% endif %} + + {% if var('dbt_vars', none) %} + {% set dbt_vars_dict = {} %} + {% for dbt_var in var('dbt_vars') %} + {% do dbt_vars_dict.update({dbt_var: (var(dbt_var, ''))}) %} + {% endfor %} + '{{ tojson(dbt_vars_dict) | replace("\'", "\'\'") }}', {# dbt_vars #} + {% else %} + cast(null as varchar), {# dbt_vars #} + {% endif %} + + {% if invocation_args_dict.vars %} + {# vars - different format for pre v1.5 (yaml vs list) #} + {% if invocation_args_dict.vars is string %} + {# BigQuery does not handle the yaml-string from "--vars" well, when passed to "parse_json". Workaround is to parse the string, and then "tojson" will properly format the dict as a json-object. #} + {% set parsed_inv_args_vars = fromyaml(invocation_args_dict.vars) %} + {% do invocation_args_dict.update({'vars': parsed_inv_args_vars}) %} + {% endif %} + {% endif %} + + '{{ tojson(invocation_args_dict) | replace("\'", "\'\'") }}', {# invocation_args #} + + {% set metadata_env = {} %} + {% for key, value in dbt_metadata_envs.items() %} + {% do metadata_env.update({key: value}) %} + {% endfor %} + '{{ tojson(metadata_env) | replace("\'", "\'\'") }}' {# dbt_custom_envs #} + ) + {% endset %} + {{ invocation_values }} + +{% endmacro -%} + diff --git a/macros/upload_individual_datasets/upload_model_executions.sql b/macros/upload_individual_datasets/upload_model_executions.sql index bca26fea..d89b9898 100644 --- a/macros/upload_individual_datasets/upload_model_executions.sql +++ b/macros/upload_individual_datasets/upload_model_executions.sql @@ -203,3 +203,45 @@ {{ return("") }} {% endif %} {%- endmacro %} + + +{% macro athena__get_model_executions_dml_sql(models) -%} + {% if models != [] %} + {% set model_execution_values %} + {% for model in models -%} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ model.node.unique_id }}', {# node_id #} + cast('{{ run_started_at }}' as timestamp(6)), {# run_started_at #} + + {% set config_full_refresh = model.node.config.full_refresh %} + {% if config_full_refresh is none %} + {% set config_full_refresh = flags.FULL_REFRESH %} + {% endif %} + {{ config_full_refresh }}, {# was_full_refresh #} + + '{{ model.thread_id }}', {# thread_id #} + '{{ model.status }}', {# status #} + + {% set compile_started_at = (model.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %} + {% if compile_started_at %}cast('{{ compile_started_at }}' as timestamp(6)){% else %}cast(null as timestamp(6)){% endif %}, {# compile_started_at #} + {% set query_completed_at = (model.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %} + {% if query_completed_at %}cast('{{ query_completed_at }}' as timestamp(6)){% else %}cast(null as timestamp(6)){% endif %}, {# query_completed_at #} + + {{ model.execution_time }}, {# total_node_runtime #} + cast({{ model.adapter_response.rows_affected }} as integer), {# rows_affected #} + '{{ model.node.config.materialized }}', {# materialization #} + '{{ model.node.schema }}', {# schema #} + '{{ model.node.name }}', {# name #} + '{{ model.node.alias }}', {# alias #} + '{{ model.message | replace("\'", "\'\'") }}', {# message #} + '{{ tojson(model.adapter_response) | replace("\'", "\'\'") }}' {# adapter_response #} + ) + {%- if not loop.last %},{%- endif %} + {%- endfor %} + {% endset %} + {{ model_execution_values }} + {% else %} + {{ return("") }} + {% endif %} +{%- endmacro %} diff --git a/macros/upload_individual_datasets/upload_models.sql b/macros/upload_individual_datasets/upload_models.sql index d8dc49fd..b6cad2da 100644 --- a/macros/upload_individual_datasets/upload_models.sql +++ b/macros/upload_individual_datasets/upload_models.sql @@ -124,3 +124,38 @@ {{ return("") }} {% endif %} {%- endmacro %} + +{% macro athena__get_models_dml_sql(models) -%} + {% if models != [] %} + {% set model_values %} + {% for model in models -%} + {% do model.pop('raw_code', None) %} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ model.unique_id }}', {# node_id #} + cast('{{ run_started_at }}' as timestamp(6)), {# run_started_at #} + '{{ model.database }}', {# database #} + '{{ model.schema }}', {# schema #} + '{{ model.name }}', {# name #} + '{{ tojson(model.depends_on.nodes) }}', {# depends_on_nodes #} + '{{ model.package_name }}', {# package_name #} + '{{ model.original_file_path | replace("\\", "\\\\") }}', {# path #} + '{{ model.checksum.checksum }}', {# checksum #} + '{{ model.config.materialized }}', {# materialization #} + '{{ tojson(model.tags) }}', {# tags #} + '{{ model.config.meta | replace("\'", "\'\'") }}', {# meta #} + '{{ model.alias }}', {# alias #} + {% if var('dbt_artifacts_exclude_all_results', false) %} + cast(null as varchar) + {% else %} + '{{ tojson(model) | replace("\'", "\'\'") }}' {# all_results #} + {% endif %} + ) + {%- if not loop.last %},{%- endif %} + {%- endfor %} + {% endset %} + {{ model_values }} + {% else %} + {{ return("") }} + {% endif %} +{%- endmacro %} diff --git a/macros/upload_individual_datasets/upload_seed_executions.sql b/macros/upload_individual_datasets/upload_seed_executions.sql index 1ccbfe2a..d8b52b1c 100644 --- a/macros/upload_individual_datasets/upload_seed_executions.sql +++ b/macros/upload_individual_datasets/upload_seed_executions.sql @@ -217,3 +217,60 @@ {{ return("") }} {% endif %} {% endmacro -%} + +{% macro athena__get_seed_executions_dml_sql(seeds) -%} + {% if seeds != [] %} + {% set seed_execution_values %} + {% for model in seeds -%} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ model.node.unique_id }}', {# node_id #} + cast('{{ run_started_at }}' as timestamp(6)), {# run_started_at #} + + {% set config_full_refresh = model.node.config.full_refresh %} + {% if config_full_refresh is none %} + {% set config_full_refresh = flags.FULL_REFRESH %} + {% endif %} + {{ config_full_refresh }}, {# was_full_refresh #} + + '{{ model.thread_id }}', {# thread_id #} + '{{ model.status }}', {# status #} + + {% if model.timing != [] %} + {% for stage in model.timing if stage.name == "compile" %} + {% if loop.length == 0 %} + cast(null as timestamp(6)), {# compile_started_at #} + {% else %} + cast('{{ stage.started_at }}' as timestamp(6)), {# compile_started_at #} + {% endif %} + {% endfor %} + + {% for stage in model.timing if stage.name == "execute" %} + {% if loop.length == 0 %} + cast(null as timestamp(6)), {# query_completed_at #} + {% else %} + cast('{{ stage.completed_at }}' as timestamp(6)), {# query_completed_at #} + {% endif %} + {% endfor %} + {% else %} + cast(null as timestamp(6)), {# compile_started_at #} + cast(null as timestamp(6)), {# query_completed_at #} + {% endif %} + + {{ model.execution_time }}, {# total_node_runtime #} + cast({{ model.adapter_response.rows_affected }} as integer), {# rows_affected #} + '{{ model.node.config.materialized }}', {# materialization #} + '{{ model.node.schema }}', {# schema #} + '{{ model.node.name }}', {# name #} + '{{ model.node.alias }}', {# alias #} + '{{ model.message | replace("\'", "\'\'") }}', {# message #} + '{{ tojson(model.adapter_response) | replace("\'", "\'\'") }}' {# adapter_response #} + ) + {%- if not loop.last %},{%- endif %} + {%- endfor %} + {% endset %} + {{ seed_execution_values }} + {% else %} + {{ return("") }} + {% endif %} +{% endmacro -%} diff --git a/macros/upload_individual_datasets/upload_seeds.sql b/macros/upload_individual_datasets/upload_seeds.sql index 32e67383..f8a3c543 100644 --- a/macros/upload_individual_datasets/upload_seeds.sql +++ b/macros/upload_individual_datasets/upload_seeds.sql @@ -109,3 +109,34 @@ {{ return("") }} {% endif %} {%- endmacro %} + +{% macro athena__get_seeds_dml_sql(seeds) -%} + {% if seeds != [] %} + {% set seed_values %} + {% for seed in seeds -%} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ seed.unique_id }}', {# node_id #} + cast('{{ run_started_at }}' as timestamp(6)), {# run_started_at #} + '{{ seed.database }}', {# database #} + '{{ seed.schema }}', {# schema #} + '{{ seed.name }}', {# name #} + '{{ seed.package_name }}', {# package_name #} + '{{ seed.original_file_path | replace('\\', '\\\\') }}', {# path #} + '{{ seed.checksum.checksum }}', {# checksum #} + '{{ tojson(seed.config.meta) | replace("\'", "\'\'") }}', {# meta #} + '{{ seed.alias }}', {# alias #} + {% if var('dbt_artifacts_exclude_all_results', false) %} + cast(null as varchar) + {% else %} + '{{ tojson(seed) | replace("\'", "\'\'") }}' {# all_results #} + {% endif %} + ) + {%- if not loop.last %},{%- endif %} + {%- endfor %} + {% endset %} + {{ seed_values }} + {% else %} + {{ return("") }} + {% endif %} +{%- endmacro %} diff --git a/macros/upload_individual_datasets/upload_snapshot_executions.sql b/macros/upload_individual_datasets/upload_snapshot_executions.sql index 2006b168..97273d58 100644 --- a/macros/upload_individual_datasets/upload_snapshot_executions.sql +++ b/macros/upload_individual_datasets/upload_snapshot_executions.sql @@ -217,3 +217,60 @@ {{ return("") }} {% endif %} {% endmacro -%} + +{% macro athena__get_snapshot_executions_dml_sql(snapshots) -%} + {% if snapshots != [] %} + {% set snapshot_execution_values %} + {% for model in snapshots -%} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ model.node.unique_id }}', {# node_id #} + cast('{{ run_started_at }}' as timestamp(6)), {# run_started_at #} + + {% set config_full_refresh = model.node.config.full_refresh %} + {% if config_full_refresh is none %} + {% set config_full_refresh = flags.FULL_REFRESH %} + {% endif %} + {{ config_full_refresh }}, {# was_full_refresh #} + + '{{ model.thread_id }}', {# thread_id #} + '{{ model.status }}', {# status #} + + {% if model.timing != [] %} + {% for stage in model.timing if stage.name == "compile" %} + {% if loop.length == 0 %} + cast(null as timestamp(6)), {# compile_started_at #} + {% else %} + cast('{{ stage.started_at }}' as timestamp(6)), {# compile_started_at #} + {% endif %} + {% endfor %} + + {% for stage in model.timing if stage.name == "execute" %} + {% if loop.length == 0 %} + cast(null as timestamp(6)), {# query_completed_at #} + {% else %} + cast('{{ stage.completed_at }}' as timestamp(6)), {# query_completed_at #} + {% endif %} + {% endfor %} + {% else %} + cast(null as timestamp(6)), {# compile_started_at #} + cast(null as timestamp(6)), {# query_completed_at #} + {% endif %} + + {{ model.execution_time }}, {# total_node_runtime #} + cast({{ model.adapter_response.rows_affected }} as integer), {# rows_affected #} + '{{ model.node.config.materialized }}', {# materialization #} + '{{ model.node.schema }}', {# schema #} + '{{ model.node.name }}', {# name #} + '{{ model.node.alias }}', {# alias #} + '{{ model.message | replace("\'", "\'\'") }}', {# message #} + '{{ tojson(model.adapter_response) | replace("\'", "\'\'") }}' {# adapter_response #} + ) + {%- if not loop.last %},{%- endif %} + {%- endfor %} + {% endset %} + {{ snapshot_execution_values }} + {% else %} + {{ return("") }} + {% endif %} +{% endmacro -%} diff --git a/macros/upload_individual_datasets/upload_snapshots.sql b/macros/upload_individual_datasets/upload_snapshots.sql index 0be6759b..846c63e0 100644 --- a/macros/upload_individual_datasets/upload_snapshots.sql +++ b/macros/upload_individual_datasets/upload_snapshots.sql @@ -119,3 +119,36 @@ {{ return("") }} {% endif %} {%- endmacro %} + +{% macro athena__get_snapshots_dml_sql(snapshots) -%} + {% if snapshots != [] %} + {% set snapshot_values %} + {% for snapshot in snapshots -%} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ snapshot.unique_id }}', {# node_id #} + cast('{{ run_started_at }}' as timestamp(6)), {# run_started_at #} + '{{ snapshot.database }}', {# database #} + '{{ snapshot.schema }}', {# schema #} + '{{ snapshot.name }}', {# name #} + '{{ tojson(snapshot.depends_on.nodes) | replace("\'", "\'\'") }}', {# depends_on_nodes #} + '{{ snapshot.package_name }}', {# package_name #} + '{{ snapshot.original_file_path | replace('\\', '\\\\') }}', {# path #} + '{{ snapshot.checksum.checksum }}', {# checksum #} + '{{ snapshot.config.strategy }}', {# strategy #} + '{{ tojson(snapshot.config.meta) | replace("\'", "\'\'") }}', {# meta #} + '{{ snapshot.alias }}', {# alias #} + {% if var('dbt_artifacts_exclude_all_results', false) %} + cast(null as varchar) + {% else %} + '{{ tojson(snapshot) | replace("\'", "\'\'") }}' {# all_results #} + {% endif %} + ) + {%- if not loop.last %},{%- endif %} + {%- endfor %} + {% endset %} + {{ snapshot_values }} + {% else %} + {{ return("") }} + {% endif %} +{%- endmacro %} diff --git a/macros/upload_individual_datasets/upload_sources.sql b/macros/upload_individual_datasets/upload_sources.sql index dbcb49e4..e2f6cb67 100644 --- a/macros/upload_individual_datasets/upload_sources.sql +++ b/macros/upload_individual_datasets/upload_sources.sql @@ -109,3 +109,34 @@ {{ return("") }} {% endif %} {%- endmacro %} + +{% macro athena__get_sources_dml_sql(sources) -%} + {% if sources != [] %} + {% set source_values %} + {% for source in sources -%} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ source.unique_id }}', {# node_id #} + cast('{{ run_started_at }}' as timestamp(6)), {# run_started_at #} + '{{ source.database }}', {# database #} + '{{ source.schema }}', {# schema #} + '{{ source.source_name }}', {# source_name #} + '{{ source.loader }}', {# loader #} + '{{ source.name }}', {# name #} + '{{ source.identifier }}', {# identifier #} + '{{ source.loaded_at_field | replace("\'", "\'\'") }}', {# loaded_at_field #} + '{{ tojson(source.freshness) | replace("\'", "\'\'") }}', {# freshness #} + {% if var('dbt_artifacts_exclude_all_results', false) %} + cast(null as varchar) + {% else %} + '{{ tojson(source) | replace("\'", "\'\'") }}' {# all_results #} + {% endif %} + ) + {%- if not loop.last %},{%- endif %} + {%- endfor %} + {% endset %} + {{ source_values }} + {% else %} + {{ return("") }} + {% endif %} +{%- endmacro %} diff --git a/macros/upload_individual_datasets/upload_test_executions.sql b/macros/upload_individual_datasets/upload_test_executions.sql index ea3553ae..9d4108f7 100644 --- a/macros/upload_individual_datasets/upload_test_executions.sql +++ b/macros/upload_individual_datasets/upload_test_executions.sql @@ -148,3 +148,58 @@ {{ return("") }} {% endif %} {% endmacro -%} + +{% macro athena__get_test_executions_dml_sql(tests) -%} + {% if tests != [] %} + {% set test_execution_values %} + {% for test in tests -%} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ test.node.unique_id }}', {# node_id #} + cast('{{ run_started_at }}' as timestamp(6)), {# run_started_at #} + + {% set config_full_refresh = test.node.config.full_refresh %} + {% if config_full_refresh is none %} + {% set config_full_refresh = flags.FULL_REFRESH %} + {% endif %} + {{ config_full_refresh }}, {# was_full_refresh #} + + '{{ test.thread_id }}', {# thread_id #} + '{{ test.status }}', {# status #} + + {% if test.timing != [] %} + {% for stage in test.timing if stage.name == "compile" %} + {% if loop.length == 0 %} + cast(null as timestamp(6)), {# compile_started_at #} + {% else %} + cast('{{ stage.started_at }}' as timestamp(6)), {# compile_started_at #} + {% endif %} + {% endfor %} + + {% for stage in test.timing if stage.name == "execute" %} + {% if loop.length == 0 %} + cast(null as timestamp(6)), {# query_completed_at #} + {% else %} + cast('{{ stage.completed_at }}' as timestamp(6)), {# query_completed_at #} + {% endif %} + {% endfor %} + {% else %} + cast(null as timestamp(6)), {# compile_started_at #} + cast(null as timestamp(6)), {# query_completed_at #} + {% endif %} + + {{ test.execution_time }}, {# total_node_runtime #} + cast(null as integer), {# rows_affected #} + {{ 'cast(null as integer)' if test.failures is none else test.failures }}, {# failures #} + '{{ test.message | replace("\'", "\'\'") }}', {# message #} + '{{ tojson(test.adapter_response) | replace("\'", "\'\'") }}' {# adapter_response #} + ) + {%- if not loop.last %},{%- endif %} + + {%- endfor %} + {% endset %} + {{ test_execution_values }} + {% else %} + {{ return("") }} + {% endif %} +{% endmacro -%} diff --git a/macros/upload_individual_datasets/upload_tests.sql b/macros/upload_individual_datasets/upload_tests.sql index cf75e9ba..a24b713f 100644 --- a/macros/upload_individual_datasets/upload_tests.sql +++ b/macros/upload_individual_datasets/upload_tests.sql @@ -97,3 +97,31 @@ {{ return("") }} {% endif %} {%- endmacro %} + +{% macro athena__get_tests_dml_sql(tests) -%} + {% if tests != [] %} + {% set test_values %} + {% for test in tests -%} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ test.unique_id }}', {# node_id #} + cast('{{ run_started_at }}' as timestamp(6)), {# run_started_at #} + '{{ test.name }}', {# name #} + '{{ tojson(test.depends_on.nodes) | replace("\'", "\'\'") }}', {# depends_on_nodes #} + '{{ test.package_name }}', {# package_name #} + '{{ test.original_file_path | replace('\\', '\\\\') }}', {# test_path #} + '{{ tojson(test.tags) | replace("\'", "\'\'") }}', {# tags #} + {% if var('dbt_artifacts_exclude_all_results', false) %} + cast(null as varchar) + {% else %} + '{{ tojson(test) | replace("\'", "\'\'") }}' {# all_results #} + {% endif %} + ) + {%- if not loop.last %},{%- endif %} + {%- endfor %} + {% endset %} + {{ test_values }} + {% else %} + {{ return("") }} + {% endif %} +{%- endmacro %} diff --git a/macros/upload_results/insert_into_metadata_table.sql b/macros/upload_results/insert_into_metadata_table.sql index 24f1eb77..259862ac 100644 --- a/macros/upload_results/insert_into_metadata_table.sql +++ b/macros/upload_results/insert_into_metadata_table.sql @@ -59,5 +59,17 @@ {%- endmacro %} +{% macro athena__insert_into_metadata_table(relation, fields, content) -%} + + {% set insert_into_table_query %} + insert into {{ relation }} {{ fields }} + values + {{ content }} + {% endset %} + + {% do run_query(insert_into_table_query) %} + +{%- endmacro %} + {% macro default__insert_into_metadata_table(relation, fields, content) -%} {%- endmacro %} diff --git a/tox.ini b/tox.ini index 542d6e21..bd2e18b9 100644 --- a/tox.ini +++ b/tox.ini @@ -69,6 +69,9 @@ profiles_dir = integration_test_project [testenv] passenv = + AWS_ACCESS_KEY_ID + AWS_SECRET_ACCESS_KEY + AWS_DEFAULT_REGION DBT_PROFILES_DIR GITHUB_SHA_OVERRIDE GITHUB_SHA @@ -85,6 +88,8 @@ passenv = DBT_ENV_SECRET_GCP_PROJECT DBT_ENV_SPARK_DRIVER_PATH DBT_ENV_SPARK_ENDPOINT + DBT_ENV_ATHENA_S3_STAGING + DBT_ENV_ATHENA_S3_DATA GOOGLE_APPLICATION_CREDENTIALS DBT_CLOUD_PROJECT_ID DBT_CLOUD_JOB_ID @@ -265,6 +270,14 @@ commands = dbt deps dbt build --target bigquery --vars '"my_var": "my value"' +[testenv:integration_test_athena] +changedir = integration_test_project +deps = dbt-athena-community~=1.7.0 +commands = + dbt clean + dbt deps + dbt build --target athena --vars '"my_var": "my value"' + # Spark integration test (disabled) [testenv:integration_spark] changedir = integration_test_project