diff --git a/models/marts/enriched_history_operations.sql b/models/marts/enriched_history_operations.sql index 0648e38..dd63f48 100644 --- a/models/marts/enriched_history_operations.sql +++ b/models/marts/enriched_history_operations.sql @@ -33,11 +33,11 @@ with , batch_insert_ts from {{ ref('stg_history_ledgers') }} where - cast(batch_run_date as date) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 2 day) - and date(closed_at) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) + cast(batch_run_date as datetime) < date_add(datetime('{{ dbt_airflow_macros.ts() }}'), interval 60 minute) + and datetime(closed_at) < date_add(datetime('{{ dbt_airflow_macros.ts() }}'), interval 30 minute) {% if is_incremental() %} - and cast(batch_run_date as date) >= date('{{ dbt_airflow_macros.ds() }}') -- batch run is the min bound of a batch - and date(closed_at) >= date_sub(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) + and cast(batch_run_date as datetime) >= datetime('{{ dbt_airflow_macros.ts() }}') -- batch run is the min bound of a batch + and datetime(closed_at) >= date_sub(datetime('{{ dbt_airflow_macros.ts() }}'), interval 30 minute) {% endif %} ) @@ -70,11 +70,11 @@ with , batch_insert_ts from {{ ref('stg_history_transactions') }} where - cast(batch_run_date as date) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 2 day) - and date(closed_at) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) + cast(batch_run_date as datetime) < date_add(datetime('{{ dbt_airflow_macros.ts() }}'), interval 60 minute) + and datetime(closed_at) < date_add(datetime('{{ dbt_airflow_macros.ts() }}'), interval 30 minute) {% if is_incremental() %} - and cast(batch_run_date as date) >= date('{{ dbt_airflow_macros.ds() }}') -- batch run is the min bound of a batch - and date(closed_at) >= date_sub(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) + and cast(batch_run_date as datetime) >= datetime('{{ dbt_airflow_macros.ts() }}') -- batch run is the min bound of a batch + and datetime(closed_at) >= date_sub(datetime('{{ dbt_airflow_macros.ts() }}'), interval 30 minute) {% endif %} ) @@ -190,11 +190,11 @@ with , batch_insert_ts from {{ ref('stg_history_operations') }} where - cast(batch_run_date as date) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 2 day) - and date(closed_at) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) + cast(batch_run_date as datetime) < date_add(datetime('{{ dbt_airflow_macros.ts() }}'), interval 60 minute) + and datetime(closed_at) < date_add(datetime('{{ dbt_airflow_macros.ts() }}'), interval 30 minute) {% if is_incremental() %} - and cast(batch_run_date as date) >= date('{{ dbt_airflow_macros.ds() }}') -- batch run is the min bound of a batch - and date(closed_at) >= date_sub(date('{{ dbt_airflow_macros.ds() }}'), interval 1 day) + and cast(batch_run_date as datetime) >= datetime('{{ dbt_airflow_macros.ts() }}') -- batch run is the min bound of a batch + and datetime(closed_at) >= date_sub(datetime('{{ dbt_airflow_macros.ts() }}'), interval 30 minute) {% endif %} )