Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for Amazon Athena #408

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ clean-targets: # folders to be removed by `dbt clean`
models:
dbt_artifacts:
+materialized: view
+file_format: delta
# Athena targets get parquet / iceberg / append; every other target keeps
# the delta file format and receives empty strings for the two settings below.
+file_format: "{% if target.type == 'athena' %}parquet{% else %}delta{% endif %}"
+table_type: "{% if target.type == 'athena' %}iceberg{% endif %}"
+incremental_strategy: "{% if target.type == 'athena' %}append{% endif %}"
sources:
+materialized: incremental
+on_schema_change: append_new_columns
+full_refresh: false
+persist_docs:
# Databricks doesn't offer column-level support for persisting docs
columns: '{{ target.name != "databricks" }}'

3 changes: 2 additions & 1 deletion integration_test_project/example-env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ export DBT_ENV_SECRET_DATABRICKS_TOKEN=
export DBT_ENV_SECRET_GCP_PROJECT=
export DBT_ENV_SPARK_DRIVER_PATH= # /Library/simba/spark/lib/libsparkodbc_sbu.dylib on a Mac
export DBT_ENV_SPARK_ENDPOINT= # The endpoint ID from the Databricks HTTP path

# Athena S3 locations, read by the `athena` target in profiles.yml
# (s3_staging_dir and s3_data_dir respectively).
export DBT_ENV_ATHENA_S3_STAGING=
export DBT_ENV_ATHENA_S3_DATA=
# dbt environment variables, change these
export DBT_VERSION="1_5_0"
export DBT_CLOUD_PROJECT_ID=
Expand Down
14 changes: 13 additions & 1 deletion integration_test_project/models/incremental.sql
Original file line number Diff line number Diff line change
@@ -1,11 +1,23 @@
{# Model configuration: the Athena target adds iceberg/parquet/merge settings on top of the shared incremental config. #}
{# NOTE(review): this branches on target.name while dbt_project.yml branches on target.type — confirm the target is always named 'athena'. #}
{% if target.name == 'athena' %}
{{
config(
materialized='incremental',
unique_key='id',
meta={"meta_field": "description with an ' apostrophe"},
table_type='iceberg',
file_format='parquet',
incremental_strategy='merge'
)
}}

{% else %}
{# Every other target: plain incremental model keyed on id. #}
{{
config(
materialized='incremental',
unique_key='id',
meta={"meta_field": "description with an ' apostrophe"},
)
}}
{% endif %}
-- {{ source('dummy_source', '"GROUP"') }}

select
Expand Down
3 changes: 3 additions & 0 deletions integration_test_project/package-lock.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
packages:
- local: ../
sha1_hash: 570155077da10b186bf3640e96c2b6746d0fc6dc
11 changes: 11 additions & 0 deletions integration_test_project/profiles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,14 @@ dbt_artifacts:
dbname: postgres
schema: public
threads: 8
# Athena target for the integration tests.
athena:
type: athena
# Both S3 locations and the region are read from environment variables
# (see example-env.sh for the two DBT_ENV_ATHENA_* exports).
s3_staging_dir: "{{ env_var('DBT_ENV_ATHENA_S3_STAGING') }}"
s3_data_dir: "{{ env_var('DBT_ENV_ATHENA_S3_DATA') }}"
s3_data_naming: schema_table_unique
region_name: "{{ env_var('AWS_DEFAULT_REGION') }}"
schema: public
database: awsdatacatalog
threads: 8
# Extra arguments applied when seed files are uploaded to S3.
seed_s3_upload_args:
ACL: bucket-owner-full-control
12 changes: 12 additions & 0 deletions macros/database_specific_helpers/type_helpers.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,18 @@
{{ return(api.Column.translate_type("boolean")) }}
{% endmacro %}

{#- TIMESTAMP -#}
{% macro type_timestamp() %}
    {#- Entry point: resolves the adapter-specific `type_timestamp` implementation in the dbt_artifacts namespace and returns its result. -#}
    {% set timestamp_impl = adapter.dispatch('type_timestamp', 'dbt_artifacts') %}
    {% do return(timestamp_impl()) %}
{% endmacro %}

{% macro default__type_timestamp() %}
    {#- Fallback implementation: asks the adapter's Column class to translate the generic "timestamp" type. -#}
    {% set translated_type = api.Column.translate_type("timestamp") %}
    {% do return(translated_type) %}
{% endmacro %}

{% macro athena__type_timestamp() %}
    {#-
        Athena implementation of type_timestamp.
        Returns the literal type name via return(), like the default
        implementation, so callers receive "timestamp(6)" without the
        surrounding template whitespace.
    -#}
    {{ return("timestamp(6)") }}
{% endmacro %}

{#- JSON -#}

{% macro type_json() %}
Expand Down
34 changes: 34 additions & 0 deletions macros/upload_individual_datasets/upload_exposures.sql
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,37 @@
{{ return("") }}
{% endif %}
{%- endmacro %}

{% macro athena__get_exposures_dml_sql(exposures) -%}
    {#-
        Athena implementation of get_exposures_dml_sql.

        Args:
            exposures: list of exposure nodes to upload.
        Returns:
            One parenthesised VALUES tuple per exposure, comma separated,
            or an empty string when `exposures` is an empty list.
        The trailing comment on each line names the target column.
    -#}
    {%- if exposures == [] -%}
        {{ return("") }}
    {%- endif -%}

    {%- for exposure in exposures %}
    (
        '{{ invocation_id }}', {# command_invocation_id #}
        '{{ exposure.unique_id | replace("\'", "\'\'") }}', {# node_id #}
        cast('{{ run_started_at }}' as timestamp(6)), {# run_started_at #}
        '{{ exposure.name | replace("\'", "\'\'") }}', {# name #}
        '{{ exposure.type }}', {# type #}
        '{{ tojson(exposure.owner) | replace("\'", "\'\'") }}', {# owner #}
        '{{ exposure.maturity }}', {# maturity #}
        '{{ exposure.original_file_path | replace("\'", "\'\'") }}', {# path #}
        '{{ exposure.description | replace("\'", "\'\'") }}', {# description #}
        '{{ exposure.url }}', {# url #}
        '{{ exposure.package_name }}', {# package_name #}
        '{{ tojson(exposure.depends_on.nodes) | replace("\'", "\'\'") }}', {# depends_on_nodes #}
        '{{ tojson(exposure.tags) | replace("\'", "\'\'") }}', {# tags #}
        {#- all_results is nulled out when the exclude var is set -#}
        {% if not var('dbt_artifacts_exclude_all_results', false) %}
            '{{ tojson(exposure) | replace("\'", "\'\'") }}' {# all_results #}
        {% else %}
            cast(null as varchar)
        {% endif %}
    )
    {{ "," if not loop.last else "" }}
    {%- endfor %}
{%- endmacro %}
63 changes: 63 additions & 0 deletions macros/upload_individual_datasets/upload_invocations.sql
Original file line number Diff line number Diff line change
Expand Up @@ -221,3 +221,66 @@
{{ invocation_values }}

{% endmacro -%}

{% macro athena__get_invocations_dml_sql() -%}
    {#-
        Athena implementation of get_invocations_dml_sql.

        Takes no arguments; reads the invocation context (invocation_id,
        flags, target, env vars, project vars, invocation_args_dict,
        dbt_metadata_envs) and renders a single parenthesised VALUES tuple.
        The trailing comment on each line names the target column.
    -#}

    {#- Values of the environment variables listed in the `env_vars` var. -#}
    {%- set requested_env_vars = var('env_vars', none) -%}
    {%- set env_vars_dict = {} -%}
    {%- if requested_env_vars -%}
        {%- for env_name in requested_env_vars -%}
            {%- do env_vars_dict.update({env_name: (env_var(env_name, ''))}) -%}
        {%- endfor -%}
    {%- endif -%}

    {#- Values of the project vars listed in the `dbt_vars` var. -#}
    {%- set requested_dbt_vars = var('dbt_vars', none) -%}
    {%- set dbt_vars_dict = {} -%}
    {%- if requested_dbt_vars -%}
        {%- for var_name in requested_dbt_vars -%}
            {%- do dbt_vars_dict.update({var_name: (var(var_name, ''))}) -%}
        {%- endfor -%}
    {%- endif -%}

    {#- Before dbt v1.5 "--vars" arrives as a YAML string; parse it so tojson emits a JSON object. -#}
    {%- if invocation_args_dict.vars and invocation_args_dict.vars is string -%}
        {%- set parsed_cli_vars = fromyaml(invocation_args_dict.vars) -%}
        {%- do invocation_args_dict.update({'vars': parsed_cli_vars}) -%}
    {%- endif -%}

    {#- Plain-dict copy of the custom metadata environment variables. -#}
    {%- set metadata_env = {} -%}
    {%- for env_key, env_value in dbt_metadata_envs.items() -%}
        {%- do metadata_env.update({env_key: env_value}) -%}
    {%- endfor %}

    (
        '{{ invocation_id }}', {# command_invocation_id #}
        '{{ dbt_version }}', {# dbt_version #}
        '{{ project_name }}', {# project_name #}
        cast('{{ run_started_at }}' as timestamp(6)), {# run_started_at #}
        '{{ flags.WHICH }}', {# dbt_command #}
        {{ flags.FULL_REFRESH }}, {# full_refresh_flag #}
        '{{ target.profile_name }}', {# target_profile_name #}
        '{{ target.name }}', {# target_name #}
        '{{ target.schema }}', {# target_schema #}
        {{ target.threads }}, {# target_threads #}

        '{{ env_var("DBT_CLOUD_PROJECT_ID", "") }}', {# dbt_cloud_project_id #}
        '{{ env_var("DBT_CLOUD_JOB_ID", "") }}', {# dbt_cloud_job_id #}
        '{{ env_var("DBT_CLOUD_RUN_ID", "") }}', {# dbt_cloud_run_id #}
        '{{ env_var("DBT_CLOUD_RUN_REASON_CATEGORY", "") }}', {# dbt_cloud_run_reason_category #}
        '{{ env_var("DBT_CLOUD_RUN_REASON", '') | replace("\'", "\'\'") }}', {# dbt_cloud_run_reason #}

        {% if requested_env_vars %}'{{ tojson(env_vars_dict) | replace("\'", "\'\'") }}'{% else %}cast(null as varchar){% endif %}, {# env_vars #}
        {% if requested_dbt_vars %}'{{ tojson(dbt_vars_dict) | replace("\'", "\'\'") }}'{% else %}cast(null as varchar){% endif %}, {# dbt_vars #}

        '{{ tojson(invocation_args_dict) | replace("\'", "\'\'") }}', {# invocation_args #}
        '{{ tojson(metadata_env) | replace("\'", "\'\'") }}' {# dbt_custom_envs #}
    )

{% endmacro -%}

42 changes: 42 additions & 0 deletions macros/upload_individual_datasets/upload_model_executions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,45 @@
{{ return("") }}
{% endif %}
{%- endmacro %}


{% macro athena__get_model_executions_dml_sql(models) -%}
    {#-
        Athena implementation of get_model_executions_dml_sql.

        Args:
            models: list of model run results (each exposing `node`, `timing`,
                    `status`, `adapter_response`, ...).
        Returns:
            One parenthesised VALUES tuple per result, comma separated,
            or an empty string when `models` is an empty list.
        The trailing comment on each line names the target column.
    -#}
    {% if models != [] %}
        {% set model_execution_values %}
        {% for model in models -%}
            (
                '{{ invocation_id }}', {# command_invocation_id #}
                '{{ model.node.unique_id }}', {# node_id #}
                cast('{{ run_started_at }}' as timestamp(6)), {# run_started_at #}

                {#- The node-level full_refresh config wins; fall back to the CLI flag when it is unset. -#}
                {% set config_full_refresh = model.node.config.full_refresh %}
                {% if config_full_refresh is none %}
                    {% set config_full_refresh = flags.FULL_REFRESH %}
                {% endif %}
                {{ config_full_refresh }}, {# was_full_refresh #}

                '{{ model.thread_id }}', {# thread_id #}
                '{{ model.status }}', {# status #}

                {#- Timing entries are optional: a missing stage yields a typed null. -#}
                {% set compile_started_at = (model.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %}
                {% if compile_started_at %}cast('{{ compile_started_at }}' as timestamp(6)){% else %}cast(null as timestamp(6)){% endif %}, {# compile_started_at #}
                {% set query_completed_at = (model.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %}
                {% if query_completed_at %}cast('{{ query_completed_at }}' as timestamp(6)){% else %}cast(null as timestamp(6)){% endif %}, {# query_completed_at #}

                {{ model.execution_time }}, {# total_node_runtime #}

                {#- rows_affected may be absent or none in the adapter response; rendering it blindly would produce invalid SQL such as cast( as integer). -#}
                {% set rows_affected = model.adapter_response.rows_affected %}
                {% if rows_affected is number %}cast({{ rows_affected }} as integer){% else %}cast(null as integer){% endif %}, {# rows_affected #}

                '{{ model.node.config.materialized }}', {# materialization #}
                '{{ model.node.schema }}', {# schema #}
                '{{ model.node.name }}', {# name #}
                '{{ model.node.alias }}', {# alias #}
                '{{ model.message | replace("\'", "\'\'") }}', {# message #}
                '{{ tojson(model.adapter_response) | replace("\'", "\'\'") }}' {# adapter_response #}
            )
            {%- if not loop.last %},{%- endif %}
        {%- endfor %}
        {% endset %}
        {{ model_execution_values }}
    {% else %}
        {{ return("") }}
    {% endif %}
{%- endmacro %}
35 changes: 35 additions & 0 deletions macros/upload_individual_datasets/upload_models.sql
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,38 @@
{{ return("") }}
{% endif %}
{%- endmacro %}

{% macro athena__get_models_dml_sql(models) -%}
    {#-
        Athena implementation of get_models_dml_sql.

        Args:
            models: list of model node dicts to upload.
        Returns:
            One parenthesised VALUES tuple per model, comma separated,
            or an empty string when `models` is an empty list.
        The trailing comment on each line names the target column.
        Free-text and JSON values have single quotes doubled so they stay
        inside their SQL string literal.
    -#}
    {% if models != [] %}
        {% set model_values %}
        {% for model in models -%}
            {#- Drop raw_code from a shallow copy so the caller's node dict is not mutated. -#}
            {% set model_copy = model.copy() %}
            {% do model_copy.pop('raw_code', None) %}
            (
                '{{ invocation_id }}', {# command_invocation_id #}
                '{{ model.unique_id }}', {# node_id #}
                cast('{{ run_started_at }}' as timestamp(6)), {# run_started_at #}
                '{{ model.database }}', {# database #}
                '{{ model.schema }}', {# schema #}
                '{{ model.name }}', {# name #}
                '{{ tojson(model.depends_on.nodes) | replace("\'", "\'\'") }}', {# depends_on_nodes #}
                '{{ model.package_name }}', {# package_name #}
                {#- Athena string literals take backslashes literally, so only single quotes need escaping in the path. -#}
                '{{ model.original_file_path | replace("\'", "\'\'") }}', {# path #}
                '{{ model.checksum.checksum }}', {# checksum #}
                '{{ model.config.materialized }}', {# materialization #}
                '{{ tojson(model.tags) | replace("\'", "\'\'") }}', {# tags #}
                {#- Serialise meta as JSON (as the seeds macro does) instead of rendering the dict's repr. -#}
                '{{ tojson(model.config.meta) | replace("\'", "\'\'") }}', {# meta #}
                '{{ model.alias }}', {# alias #}
                {% if var('dbt_artifacts_exclude_all_results', false) %}
                    cast(null as varchar)
                {% else %}
                    '{{ tojson(model_copy) | replace("\'", "\'\'") }}' {# all_results #}
                {% endif %}
            )
            {%- if not loop.last %},{%- endif %}
        {%- endfor %}
        {% endset %}
        {{ model_values }}
    {% else %}
        {{ return("") }}
    {% endif %}
{%- endmacro %}
57 changes: 57 additions & 0 deletions macros/upload_individual_datasets/upload_seed_executions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,60 @@
{{ return("") }}
{% endif %}
{% endmacro -%}

{% macro athena__get_seed_executions_dml_sql(seeds) -%}
    {#-
        Athena implementation of get_seed_executions_dml_sql.

        Args:
            seeds: list of seed run results (each exposing `node`, `timing`,
                   `status`, `adapter_response`, ...).
        Returns:
            One parenthesised VALUES tuple per result, comma separated,
            or an empty string when `seeds` is an empty list.
        The trailing comment on each line names the target column.
    -#}
    {% if seeds != [] %}
        {% set seed_execution_values %}
        {% for model in seeds -%}
            (
                '{{ invocation_id }}', {# command_invocation_id #}
                '{{ model.node.unique_id }}', {# node_id #}
                cast('{{ run_started_at }}' as timestamp(6)), {# run_started_at #}

                {#- The node-level full_refresh config wins; fall back to the CLI flag when it is unset. -#}
                {% set config_full_refresh = model.node.config.full_refresh %}
                {% if config_full_refresh is none %}
                    {% set config_full_refresh = flags.FULL_REFRESH %}
                {% endif %}
                {{ config_full_refresh }}, {# was_full_refresh #}

                '{{ model.thread_id }}', {# thread_id #}
                '{{ model.status }}', {# status #}

                {#- Always emit exactly one value per timing column: take the first matching stage, or a typed null when the stage (or the whole timing list) is missing. A filtered for-loop would emit nothing in that case and leave the tuple one column short. -#}
                {% set compile_started_at = (model.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %}
                {% if compile_started_at %}cast('{{ compile_started_at }}' as timestamp(6)){% else %}cast(null as timestamp(6)){% endif %}, {# compile_started_at #}
                {% set query_completed_at = (model.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %}
                {% if query_completed_at %}cast('{{ query_completed_at }}' as timestamp(6)){% else %}cast(null as timestamp(6)){% endif %}, {# query_completed_at #}

                {{ model.execution_time }}, {# total_node_runtime #}

                {#- rows_affected may be absent or none in the adapter response; rendering it blindly would produce invalid SQL such as cast( as integer). -#}
                {% set rows_affected = model.adapter_response.rows_affected %}
                {% if rows_affected is number %}cast({{ rows_affected }} as integer){% else %}cast(null as integer){% endif %}, {# rows_affected #}

                '{{ model.node.config.materialized }}', {# materialization #}
                '{{ model.node.schema }}', {# schema #}
                '{{ model.node.name }}', {# name #}
                '{{ model.node.alias }}', {# alias #}
                '{{ model.message | replace("\'", "\'\'") }}', {# message #}
                '{{ tojson(model.adapter_response) | replace("\'", "\'\'") }}' {# adapter_response #}
            )
            {%- if not loop.last %},{%- endif %}
        {%- endfor %}
        {% endset %}
        {{ seed_execution_values }}
    {% else %}
        {{ return("") }}
    {% endif %}
{% endmacro -%}
31 changes: 31 additions & 0 deletions macros/upload_individual_datasets/upload_seeds.sql
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,34 @@
{{ return("") }}
{% endif %}
{%- endmacro %}

{% macro athena__get_seeds_dml_sql(seeds) -%}
    {#-
        Athena implementation of get_seeds_dml_sql.

        Args:
            seeds: list of seed nodes to upload.
        Returns:
            One parenthesised VALUES tuple per seed, comma separated,
            or an empty string when `seeds` is an empty list.
        The trailing comment on each line names the target column.
    -#}
    {% if seeds != [] %}
        {% set seed_values %}
        {% for seed in seeds -%}
            (
                '{{ invocation_id }}', {# command_invocation_id #}
                '{{ seed.unique_id }}', {# node_id #}
                cast('{{ run_started_at }}' as timestamp(6)), {# run_started_at #}
                '{{ seed.database }}', {# database #}
                '{{ seed.schema }}', {# schema #}
                '{{ seed.name }}', {# name #}
                '{{ seed.package_name }}', {# package_name #}
                {#- Athena string literals take backslashes literally, so only single quotes need escaping in the path. -#}
                '{{ seed.original_file_path | replace("\'", "\'\'") }}', {# path #}
                '{{ seed.checksum.checksum }}', {# checksum #}
                '{{ tojson(seed.config.meta) | replace("\'", "\'\'") }}', {# meta #}
                '{{ seed.alias }}', {# alias #}
                {% if var('dbt_artifacts_exclude_all_results', false) %}
                    cast(null as varchar)
                {% else %}
                    '{{ tojson(seed) | replace("\'", "\'\'") }}' {# all_results #}
                {% endif %}
            )
            {%- if not loop.last %},{%- endif %}
        {%- endfor %}
        {% endset %}
        {{ seed_values }}
    {% else %}
        {{ return("") }}
    {% endif %}
{%- endmacro %}
Loading
Loading