Skip to content

Commit

Permalink
Updates to use closed_at instead of batch_run_date
Browse files Browse the repository at this point in the history
  • Loading branch information
chowbao committed Aug 5, 2024
1 parent 8fe5708 commit 4e02dd5
Show file tree
Hide file tree
Showing 20 changed files with 221 additions and 234 deletions.
10 changes: 2 additions & 8 deletions models/marts/ledger_current_state/account_signers_current.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ with
, s.weight
, s.sponsor
, s.last_modified_ledger
, l.closed_at
, s.closed_at
, s.ledger_entry_change
, s.deleted
-- table only has natural keys, creating a primary key
Expand All @@ -33,16 +33,10 @@ with
order by s.last_modified_ledger desc, s.ledger_entry_change desc
) as row_nr
from {{ ref('stg_account_signers') }} as s
join {{ ref('stg_history_ledgers') }} as l
on s.last_modified_ledger = l.sequence

{% if is_incremental() %}
-- limit the number of partitions fetched
where
s.batch_run_date >= date_sub(current_date(), interval 30 day)
-- fetch the last week of records loaded
and timestamp_add(s.batch_insert_ts, interval 7 day)
> (select max(t.upstream_insert_ts) from {{ this }} as t)
s.closed_at >= timestamp_sub(current_timestamp(), interval 7 day)
{% endif %}
)
select
Expand Down
10 changes: 2 additions & 8 deletions models/marts/ledger_current_state/accounts_current.sql
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ with
, a.threshold_high
, a.last_modified_ledger
, a.ledger_entry_change
, l.closed_at
, a.closed_at
, a.deleted
, a.sponsor
, a.sequence_ledger
Expand All @@ -47,16 +47,10 @@ with
)
as row_nr
from {{ ref('stg_accounts') }} as a
join {{ ref('stg_history_ledgers') }} as l
on a.last_modified_ledger = l.sequence

{% if is_incremental() %}
-- limit the number of partitions fetched
where
a.batch_run_date >= date_sub(current_date(), interval 30 day)
-- fetch the last week of records loaded
and timestamp_add(a.batch_insert_ts, interval 7 day)
> (select max(t.upstream_insert_ts) from {{ this }} as t)
a.closed_at >= timestamp_sub(current_timestamp(), interval 7 day)
{% endif %}
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
config(
tags = ["current_state"],
materialized = "incremental",
unique_key = "balance_id"
unique_key = "balance_id",
cluster_by = "balance_id"
)
}}
/* Returns the latest state of each claimable balance in the `claimable_balances` table.
Expand Down Expand Up @@ -37,7 +38,7 @@ with
{% if is_incremental() %}
-- limit the number of partitions fetched incrementally
where
cb.batch_run_date >= date_sub(current_date(), interval 2 day)
cb.closed_at >= timestamp_sub(current_timestamp(), interval 2 day)
{% endif %}
)

Expand Down
32 changes: 16 additions & 16 deletions models/marts/ledger_current_state/claimable_balances_current.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ models:
tests:
- not_null:
config:
where: batch_run_date > current_datetime - interval 2 day
where: closed_at > current_timestamp - interval 2 day

- name: claimants
description: '{{ doc("claimants") }}'
Expand All @@ -28,58 +28,58 @@ models:
tests:
- not_null:
config:
where: batch_run_date > current_datetime - interval 2 day
where: closed_at > current_timestamp - interval 2 day

- name: asset_code
description: '{{ doc("asset_code") }}'
tests:
- not_null:
config:
where: asset_type != 'native'
and batch_run_date > current_datetime - interval 2 day
and closed_at > current_timestamp - interval 2 day

- name: asset_issuer
description: '{{ doc("asset_issuer") }}'
tests:
- not_null:
config:
where: asset_type != 'native'
and batch_run_date > current_datetime - interval 2 day
and closed_at > current_timestamp - interval 2 day

- name: asset_amount
description: '{{ doc("asset_amount") }}'
tests:
- not_null:
config:
where: batch_run_date > current_datetime - interval 2 day
where: closed_at > current_timestamp - interval 2 day

- name: sponsor
description: '{{ doc("sponsor") }}'
tests:
- not_null:
config:
where: batch_run_date > current_datetime - interval 2 day
where: closed_at > current_timestamp - interval 2 day

- name: flags
description: '{{ doc("flags_accounts_balances") }}'
tests:
- not_null:
config:
where: batch_run_date > current_datetime - interval 2 day
where: closed_at > current_timestamp - interval 2 day

- name: last_modified_ledger
description: '{{ doc("last_modified_ledger") }}'
tests:
- not_null:
config:
where: batch_run_date > current_datetime - interval 2 day
where: closed_at > current_timestamp - interval 2 day

- name: ledger_entry_change
description: '{{ doc("ledger_entry_change") }}'
tests:
- not_null:
config:
where: batch_run_date > current_datetime - interval 2 day
where: closed_at > current_timestamp - interval 2 day
- accepted_values:
values: [0, 1, 2]
quote: false
Expand All @@ -89,46 +89,46 @@ models:
tests:
- not_null:
config:
where: batch_run_date > current_datetime - interval 2 day
where: closed_at > current_timestamp - interval 2 day

- name: batch_id
description: '{{ doc("batch_id") }}'
tests:
- not_null:
config:
where: batch_run_date > current_datetime - interval 2 day
where: closed_at > current_timestamp - interval 2 day

- name: batch_run_date
description: '{{ doc("batch_run_date") }}'
tests:
- not_null:
config:
where: batch_run_date > current_datetime - interval 2 day
where: closed_at > current_timestamp - interval 2 day

- name: closed_at
description: '{{ doc("closed_at") }}'
tests:
- not_null:
config:
where: batch_run_date > current_datetime - interval 2 day
where: closed_at > current_timestamp - interval 2 day

- name: ledger_sequence
description: '{{ doc("ledger_sequence") }}'
tests:
- not_null:
config:
where: batch_run_date > current_datetime - interval 2 day
where: closed_at > current_timestamp - interval 2 day

- name: upstream_insert_ts
description: '{{ doc("upstream_insert_ts") }}'
tests:
- not_null:
config:
where: batch_run_date > current_datetime - interval 2 day
where: closed_at > current_timestamp - interval 2 day

- name: batch_insert_ts
description: '{{ doc("batch_insert_ts") }}'
tests:
- not_null:
config:
where: batch_run_date > current_datetime - interval 2 day
where: closed_at > current_timestamp - interval 2 day
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,7 @@ with
{% if is_incremental() %}
-- limit the number of partitions fetched incrementally
where
cfg.closed_at >= timestamp_sub(current_timestamp(), interval 30 day)
-- fetch the last week of records loaded
and timestamp_add(cfg.batch_insert_ts, interval 7 day)
> (select max(t.upstream_insert_ts) from {{ this }} as t)
cfg.closed_at >= timestamp_sub(current_timestamp(), interval 7 day)
{% endif %}
)

Expand Down
5 changes: 1 addition & 4 deletions models/marts/ledger_current_state/contract_code_current.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,7 @@ with
{% if is_incremental() %}
-- limit the number of partitions fetched incrementally
where
cc.closed_at >= timestamp_sub(current_timestamp(), interval 30 day)
-- fetch the last week of records loaded
and timestamp_add(cc.batch_insert_ts, interval 7 day)
> (select max(t.upstream_insert_ts) from {{ this }} as t)
cc.closed_at >= timestamp_sub(current_timestamp(), interval 7 day)
{% endif %}
)

Expand Down
5 changes: 1 addition & 4 deletions models/marts/ledger_current_state/contract_data_current.sql
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ with
{% if is_incremental() %}
-- limit the number of partitions fetched incrementally
where
cd.closed_at >= timestamp_sub(current_timestamp(), interval 30 day)
-- fetch the last week of records loaded
and timestamp_add(cd.batch_insert_ts, interval 7 day)
> (select max(t.upstream_insert_ts) from {{ this }} as t)
cd.closed_at >= timestamp_sub(current_timestamp(), interval 7 day)
{% endif %}
)

Expand Down
10 changes: 2 additions & 8 deletions models/marts/ledger_current_state/liquidity_pools_current.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ with
, lp.asset_b_amount
, lp.last_modified_ledger
, lp.ledger_entry_change
, l.closed_at
, lp.closed_at
, lp.deleted
, lp.batch_run_date
, lp.batch_insert_ts
Expand All @@ -43,16 +43,10 @@ with
order by lp.last_modified_ledger desc, lp.ledger_entry_change desc
) as row_nr
from {{ ref('stg_liquidity_pools') }} as lp
join {{ ref('stg_history_ledgers') }} as l
on lp.last_modified_ledger = l.sequence

{% if is_incremental() %}
-- limit the number of partitions fetched
where
lp.batch_run_date >= date_sub(current_date(), interval 30 day)
-- fetch the last week of records loaded
and timestamp_add(lp.batch_insert_ts, interval 7 day)
> (select max(t.upstream_insert_ts) from {{ this }} as t)
lp.closed_at >= timestamp_sub(current_timestamp(), interval 7 day)
{% endif %}

)
Expand Down
10 changes: 2 additions & 8 deletions models/marts/ledger_current_state/offers_current.sql
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ with
, o.price
, o.flags
, o.last_modified_ledger
, l.closed_at
, o.closed_at
, o.ledger_entry_change
, o.deleted
, o.sponsor
Expand All @@ -37,16 +37,10 @@ with
order by o.last_modified_ledger desc, o.ledger_entry_change desc
) as row_nr
from {{ ref('stg_offers') }} as o
join {{ ref('stg_history_ledgers') }} as l
on o.last_modified_ledger = l.sequence

{% if is_incremental() %}
-- limit the number of partitions fetched
where
o.batch_run_date >= date_sub(current_date(), interval 30 day)
-- fetch the last week of records loaded
and timestamp_add(o.batch_insert_ts, interval 7 day)
> (select max(t.upstream_insert_ts) from {{ this }} as t)
o.closed_at >= timestamp_sub(current_timestamp(), interval 7 day)
{% endif %}

)
Expand Down
10 changes: 2 additions & 8 deletions models/marts/ledger_current_state/trust_lines_current.sql
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ with
, tl.trust_line_limit
, tl.last_modified_ledger
, tl.ledger_entry_change
, l.closed_at
, tl.closed_at
, tl.deleted
-- table only has natural keys, creating a primary key
, concat(tl.account_id, '-', tl.asset_code, '-', tl.asset_issuer, '-', tl.liquidity_pool_id
Expand All @@ -40,16 +40,10 @@ with
order by tl.last_modified_ledger desc, tl.ledger_entry_change desc
) as row_nr
from {{ ref('stg_trust_lines') }} as tl
join {{ ref('stg_history_ledgers') }} as l
on tl.last_modified_ledger = l.sequence

{% if is_incremental() %}
-- limit the number of partitions fetched incrementally
where
tl.batch_run_date >= date_sub(current_date(), interval 30 day)
-- fetch the last week of records loaded
and timestamp_add(tl.batch_insert_ts, interval 7 day)
> (select max(t.upstream_insert_ts) from {{ this }} as t)
tl.closed_at >= timestamp_sub(current_timestamp(), interval 7 day)
{% endif %}

)
Expand Down
5 changes: 1 addition & 4 deletions models/marts/ledger_current_state/ttl_current.sql
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ with
{% if is_incremental() %}
-- limit the number of partitions fetched incrementally
where
ttl.closed_at >= timestamp_sub(current_timestamp(), interval 30 day)
-- fetch the last week of records loaded
and timestamp_add(ttl.batch_insert_ts, interval 7 day)
> (select max(t.upstream_insert_ts) from {{ this }} as t)
ttl.closed_at >= timestamp_sub(current_timestamp(), interval 7 day)
{% endif %}
)

Expand Down
Loading

0 comments on commit 4e02dd5

Please sign in to comment.