diff --git a/models/marts/enriched_history_operations.sql b/models/marts/enriched_history_operations.sql index 83ab15a..0648e38 100644 --- a/models/marts/enriched_history_operations.sql +++ b/models/marts/enriched_history_operations.sql @@ -33,11 +33,11 @@ with , batch_insert_ts from {{ ref('stg_history_ledgers') }} where - batch_run_date < date_add('{{ dbt_airflow_macros.ts() }}', interval 60 minute) - and closed_at < date_add('{{ dbt_airflow_macros.ts() }}', interval 30 minute) + cast(batch_run_date as date) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 2 day) + and date(closed_at) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) {% if is_incremental() %} - and batch_run_date >= '{{ dbt_airflow_macros.ts() }}' -- batch run is the min bound of a batch - and closed_at >= date_sub('{{ dbt_airflow_macros.ts() }}', interval 30 minute) + and cast(batch_run_date as date) >= date('{{ dbt_airflow_macros.ds() }}') -- batch run is the min bound of a batch + and date(closed_at) >= date_sub(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) {% endif %} ) @@ -70,11 +70,11 @@ with , batch_insert_ts from {{ ref('stg_history_transactions') }} where - batch_run_date < date_add('{{ dbt_airflow_macros.ts() }}', interval 60 minute) - and closed_at < date_add('{{ dbt_airflow_macros.ts() }}', interval 30 minute) + cast(batch_run_date as date) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 2 day) + and date(closed_at) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) {% if is_incremental() %} - and batch_run_date >= '{{ dbt_airflow_macros.ts() }}' -- batch run is the min bound of a batch - and closed_at >= date_sub('{{ dbt_airflow_macros.ts() }}', interval 30 minute) + and cast(batch_run_date as date) >= date('{{ dbt_airflow_macros.ds() }}') -- batch run is the min bound of a batch + and date(closed_at) >= date_sub(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) {% endif %} ) @@ -190,11 +190,11 @@ with , batch_insert_ts from {{ ref('stg_history_operations') }} where - batch_run_date < date_add('{{ dbt_airflow_macros.ts() }}', interval 60 minute) - and closed_at < date_add('{{ dbt_airflow_macros.ts() }}', interval 30 minute) + cast(batch_run_date as date) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 2 day) + and date(closed_at) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) {% if is_incremental() %} - and batch_run_date >= '{{ dbt_airflow_macros.ts() }}' -- batch run is the min bound of a batch - and closed_at >= date_sub('{{ dbt_airflow_macros.ts() }}', interval 30 minute) + and cast(batch_run_date as date) >= date('{{ dbt_airflow_macros.ds() }}') -- batch run is the min bound of a batch + and date(closed_at) >= date_sub(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) {% endif %} )