diff --git a/models/marts/enriched_history_operations.sql b/models/marts/enriched_history_operations.sql index 0648e38..1903046 100644 --- a/models/marts/enriched_history_operations.sql +++ b/models/marts/enriched_history_operations.sql @@ -33,11 +33,11 @@ with , batch_insert_ts from {{ ref('stg_history_ledgers') }} where - cast(batch_run_date as date) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 2 day) - and date(closed_at) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) + batch_run_date < date_add(TIMESTAMP '{{ dbt_airflow_macros.ts() }}', interval 60 minute) + and closed_at < date_add(TIMESTAMP '{{ dbt_airflow_macros.ts() }}', interval 30 minute) {% if is_incremental() %} - and cast(batch_run_date as date) >= date('{{ dbt_airflow_macros.ds() }}') -- batch run is the min bound of a batch - and date(closed_at) >= date_sub(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) + and batch_run_date >= TIMESTAMP '{{ dbt_airflow_macros.ts() }}' -- batch run is the min bound of a batch + and closed_at >= date_sub(TIMESTAMP '{{ dbt_airflow_macros.ts() }}', interval 30 minute) {% endif %} ) @@ -70,11 +70,11 @@ with , batch_insert_ts from {{ ref('stg_history_transactions') }} where - cast(batch_run_date as date) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 2 day) - and date(closed_at) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) + batch_run_date < date_add(TIMESTAMP '{{ dbt_airflow_macros.ts() }}', interval 60 minute) + and closed_at < date_add(TIMESTAMP '{{ dbt_airflow_macros.ts() }}', interval 30 minute) {% if is_incremental() %} - and cast(batch_run_date as date) >= date('{{ dbt_airflow_macros.ds() }}') -- batch run is the min bound of a batch - and date(closed_at) >= date_sub(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) + and batch_run_date >= TIMESTAMP '{{ dbt_airflow_macros.ts() }}' -- batch run is the min bound of a batch + and closed_at >= date_sub(TIMESTAMP '{{ dbt_airflow_macros.ts() }}', interval 30 minute) {% endif %} ) @@ -190,11 +190,11 @@ with , batch_insert_ts from {{ ref('stg_history_operations') }} where - cast(batch_run_date as date) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 2 day) - and date(closed_at) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) + batch_run_date < date_add(TIMESTAMP '{{ dbt_airflow_macros.ts() }}', interval 60 minute) + and closed_at < date_add(TIMESTAMP '{{ dbt_airflow_macros.ts() }}', interval 30 minute) {% if is_incremental() %} - and cast(batch_run_date as date) >= date('{{ dbt_airflow_macros.ds() }}') -- batch run is the min bound of a batch - and date(closed_at) >= date_sub(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) + and batch_run_date >= TIMESTAMP '{{ dbt_airflow_macros.ts() }}' -- batch run is the min bound of a batch + and closed_at >= date_sub(TIMESTAMP '{{ dbt_airflow_macros.ts() }}', interval 30 minute) {% endif %} )