From f49cc187f8c2108a7a024c7bf25e69e3aa0ef509 Mon Sep 17 00:00:00 2001 From: PerriLucas <90099396+PerriLucas@users.noreply.github.com> Date: Mon, 11 Dec 2023 11:01:27 -0300 Subject: [PATCH 1/4] Update enriched_history_operations.sql added missing endif to incremental logic --- models/marts/enriched_history_operations.sql | 2 ++ 1 file changed, 2 insertions(+) diff --git a/models/marts/enriched_history_operations.sql b/models/marts/enriched_history_operations.sql index 35d4e32..0648e38 100644 --- a/models/marts/enriched_history_operations.sql +++ b/models/marts/enriched_history_operations.sql @@ -38,6 +38,7 @@ with {% if is_incremental() %} and cast(batch_run_date as date) >= date('{{ dbt_airflow_macros.ds() }}') -- batch run is the min bound of a batch and date(closed_at) >= date_sub(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) + {% endif %} ) , history_transactions as ( @@ -194,6 +195,7 @@ with {% if is_incremental() %} and cast(batch_run_date as date) >= date('{{ dbt_airflow_macros.ds() }}') -- batch run is the min bound of a batch and date(closed_at) >= date_sub(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) + {% endif %} ) From c05da4123afd5c3040adec6370b0b42d0d2a6898 Mon Sep 17 00:00:00 2001 From: lucas zanotelli Date: Mon, 11 Dec 2023 16:27:46 -0300 Subject: [PATCH 2/4] testing datetime interval --- models/marts/enriched_history_operations.sql | 24 ++++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/models/marts/enriched_history_operations.sql b/models/marts/enriched_history_operations.sql index 0648e38..83ab15a 100644 --- a/models/marts/enriched_history_operations.sql +++ b/models/marts/enriched_history_operations.sql @@ -33,11 +33,11 @@ with , batch_insert_ts from {{ ref('stg_history_ledgers') }} where - cast(batch_run_date as date) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 2 day) - and date(closed_at) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) + batch_run_date < date_add('{{ dbt_airflow_macros.ts() }}', interval 60 minute) + and closed_at < date_add('{{ dbt_airflow_macros.ts() }}', interval 30 minute) {% if is_incremental() %} - and cast(batch_run_date as date) >= date('{{ dbt_airflow_macros.ds() }}') -- batch run is the min bound of a batch - and date(closed_at) >= date_sub(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) + and batch_run_date >= '{{ dbt_airflow_macros.ts() }}' -- batch run is the min bound of a batch + and closed_at >= date_sub('{{ dbt_airflow_macros.ts() }}', interval 30 minute) {% endif %} ) @@ -70,11 +70,11 @@ with , batch_insert_ts from {{ ref('stg_history_transactions') }} where - cast(batch_run_date as date) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 2 day) - and date(closed_at) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) + batch_run_date < date_add('{{ dbt_airflow_macros.ts() }}', interval 60 minute) + and closed_at < date_add('{{ dbt_airflow_macros.ts() }}', interval 30 minute) {% if is_incremental() %} - and cast(batch_run_date as date) >= date('{{ dbt_airflow_macros.ds() }}') -- batch run is the min bound of a batch - and date(closed_at) >= date_sub(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) + and batch_run_date >= '{{ dbt_airflow_macros.ts() }}' -- batch run is the min bound of a batch + and closed_at >= date_sub('{{ dbt_airflow_macros.ts() }}', interval 30 minute) {% endif %} ) @@ -190,11 +190,11 @@ with , batch_insert_ts from {{ ref('stg_history_operations') }} where - cast(batch_run_date as date) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 2 day) - and date(closed_at) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) + batch_run_date < date_add('{{ dbt_airflow_macros.ts() }}', interval 60 minute) + and closed_at < date_add('{{ dbt_airflow_macros.ts() }}', interval 30 minute) {% if is_incremental() %} - and cast(batch_run_date as date) >= date('{{ dbt_airflow_macros.ds() }}') -- batch run is the min bound of a batch - and date(closed_at) >= date_sub(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) + and batch_run_date >= '{{ dbt_airflow_macros.ts() }}' -- batch run is the min bound of a batch + and closed_at >= date_sub('{{ dbt_airflow_macros.ts() }}', interval 30 minute) {% endif %} ) From 7580f1ea53fe661541bf2ab574399d1219e105f1 Mon Sep 17 00:00:00 2001 From: Lucas Perri Date: Tue, 12 Dec 2023 08:41:37 -0300 Subject: [PATCH 3/4] revert --- models/marts/enriched_history_operations.sql | 24 ++++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/models/marts/enriched_history_operations.sql b/models/marts/enriched_history_operations.sql index 83ab15a..0648e38 100644 --- a/models/marts/enriched_history_operations.sql +++ b/models/marts/enriched_history_operations.sql @@ -33,11 +33,11 @@ with , batch_insert_ts from {{ ref('stg_history_ledgers') }} where - batch_run_date < date_add('{{ dbt_airflow_macros.ts() }}', interval 60 minute) - and closed_at < date_add('{{ dbt_airflow_macros.ts() }}', interval 30 minute) + cast(batch_run_date as date) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 2 day) + and date(closed_at) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) {% if is_incremental() %} - and batch_run_date >= '{{ dbt_airflow_macros.ts() }}' -- batch run is the min bound of a batch - and closed_at >= date_sub('{{ dbt_airflow_macros.ts() }}', interval 30 minute) + and cast(batch_run_date as date) >= date('{{ dbt_airflow_macros.ds() }}') -- batch run is the min bound of a batch + and date(closed_at) >= date_sub(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) {% endif %} ) @@ -70,11 +70,11 @@ with , batch_insert_ts from {{ ref('stg_history_transactions') }} where - batch_run_date < date_add('{{ dbt_airflow_macros.ts() }}', interval 60 minute) - and closed_at < date_add('{{ dbt_airflow_macros.ts() }}', interval 30 minute) + cast(batch_run_date as date) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 2 day) + and date(closed_at) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) {% if is_incremental() %} - and batch_run_date >= '{{ dbt_airflow_macros.ts() }}' -- batch run is the min bound of a batch - and closed_at >= date_sub('{{ dbt_airflow_macros.ts() }}', interval 30 minute) + and cast(batch_run_date as date) >= date('{{ dbt_airflow_macros.ds() }}') -- batch run is the min bound of a batch + and date(closed_at) >= date_sub(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) {% endif %} ) @@ -190,11 +190,11 @@ with , batch_insert_ts from {{ ref('stg_history_operations') }} where - batch_run_date < date_add('{{ dbt_airflow_macros.ts() }}', interval 60 minute) - and closed_at < date_add('{{ dbt_airflow_macros.ts() }}', interval 30 minute) + cast(batch_run_date as date) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 2 day) + and date(closed_at) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) {% if is_incremental() %} - and batch_run_date >= '{{ dbt_airflow_macros.ts() }}' -- batch run is the min bound of a batch - and closed_at >= date_sub('{{ dbt_airflow_macros.ts() }}', interval 30 minute) + and cast(batch_run_date as date) >= date('{{ dbt_airflow_macros.ds() }}') -- batch run is the min bound of a batch + and date(closed_at) >= date_sub(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) {% endif %} ) From fa94bcdcfbebdea86d29fff23b7d6a62bbf9dea3 Mon Sep 17 00:00:00 2001 From: Lucas Perri Date: Wed, 13 Dec 2023 12:53:10 -0300 Subject: [PATCH 4/4] partition fee_stats --- models/marts/fee_stats_agg.sql | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/models/marts/fee_stats_agg.sql b/models/marts/fee_stats_agg.sql index fe9d99c..dabbd65 100644 --- a/models/marts/fee_stats_agg.sql +++ b/models/marts/fee_stats_agg.sql @@ -1,6 +1,10 @@ {{ config( materialized='incremental', - unique_key=["day_agg"] + unique_key=["day_agg"], + partition_by={ + "field": "day_agg" + , "data_type": "date" + , "granularity": "month"} ) }}