Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates to use closed_at instead of batch_run_date #73

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions models/marts/ledger_current_state/account_signers_current.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ with
, s.weight
, s.sponsor
, s.last_modified_ledger
, l.closed_at
, s.closed_at
, s.ledger_entry_change
, s.deleted
-- table only has natural keys, creating a primary key
Expand All @@ -33,16 +33,10 @@ with
order by s.last_modified_ledger desc, s.ledger_entry_change desc
) as row_nr
from {{ ref('stg_account_signers') }} as s
join {{ ref('stg_history_ledgers') }} as l
on s.last_modified_ledger = l.sequence

{% if is_incremental() %}
-- limit the number of partitions fetched
where
s.batch_run_date >= date_sub(current_date(), interval 30 day)
-- fetch the last week of records loaded
and timestamp_add(s.batch_insert_ts, interval 7 day)
> (select max(t.upstream_insert_ts) from {{ this }} as t)
s.closed_at >= timestamp_sub(current_timestamp(), interval 7 day)
{% endif %}
)
select
Expand Down
10 changes: 2 additions & 8 deletions models/marts/ledger_current_state/accounts_current.sql
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ with
, a.threshold_high
, a.last_modified_ledger
, a.ledger_entry_change
, l.closed_at
, a.closed_at
, a.deleted
, a.sponsor
, a.sequence_ledger
Expand All @@ -47,16 +47,10 @@ with
)
as row_nr
from {{ ref('stg_accounts') }} as a
join {{ ref('stg_history_ledgers') }} as l
on a.last_modified_ledger = l.sequence

{% if is_incremental() %}
-- limit the number of partitions fetched
where
a.batch_run_date >= date_sub(current_date(), interval 30 day)
-- fetch the last week of records loaded
and timestamp_add(a.batch_insert_ts, interval 7 day)
> (select max(t.upstream_insert_ts) from {{ this }} as t)
a.closed_at >= timestamp_sub(current_timestamp(), interval 7 day)
{% endif %}
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
config(
tags = ["current_state"],
materialized = "incremental",
unique_key = "balance_id"
unique_key = "balance_id",
cluster_by = "balance_id"
)
}}
/* Returns the latest state of each claimable balance in the `claimable_balances` table.
Expand Down Expand Up @@ -37,7 +38,7 @@ with
{% if is_incremental() %}
-- limit the number of partitions fetched incrementally
where
cb.batch_run_date >= date_sub(current_date(), interval 2 day)
cb.closed_at >= timestamp_sub(current_timestamp(), interval 2 day)
{% endif %}
)

Expand Down
32 changes: 16 additions & 16 deletions models/marts/ledger_current_state/claimable_balances_current.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ models:
tests:
- not_null:
config:
where: batch_run_date > current_datetime - interval 2 day
where: closed_at > current_timestamp - interval 2 day

- name: claimants
description: '{{ doc("claimants") }}'
Expand All @@ -28,58 +28,58 @@ models:
tests:
- not_null:
config:
where: batch_run_date > current_datetime - interval 2 day
where: closed_at > current_timestamp - interval 2 day

- name: asset_code
description: '{{ doc("asset_code") }}'
tests:
- not_null:
config:
where: asset_type != 'native'
and batch_run_date > current_datetime - interval 2 day
and closed_at > current_timestamp - interval 2 day

- name: asset_issuer
description: '{{ doc("asset_issuer") }}'
tests:
- not_null:
config:
where: asset_type != 'native'
and batch_run_date > current_datetime - interval 2 day
and closed_at > current_timestamp - interval 2 day

- name: asset_amount
description: '{{ doc("asset_amount") }}'
tests:
- not_null:
config:
where: batch_run_date > current_datetime - interval 2 day
where: closed_at > current_timestamp - interval 2 day

- name: sponsor
description: '{{ doc("sponsor") }}'
tests:
- not_null:
config:
where: batch_run_date > current_datetime - interval 2 day
where: closed_at > current_timestamp - interval 2 day

- name: flags
description: '{{ doc("flags_accounts_balances") }}'
tests:
- not_null:
config:
where: batch_run_date > current_datetime - interval 2 day
where: closed_at > current_timestamp - interval 2 day

- name: last_modified_ledger
description: '{{ doc("last_modified_ledger") }}'
tests:
- not_null:
config:
where: batch_run_date > current_datetime - interval 2 day
where: closed_at > current_timestamp - interval 2 day

- name: ledger_entry_change
description: '{{ doc("ledger_entry_change") }}'
tests:
- not_null:
config:
where: batch_run_date > current_datetime - interval 2 day
where: closed_at > current_timestamp - interval 2 day
- accepted_values:
values: [0, 1, 2]
quote: false
Expand All @@ -89,46 +89,46 @@ models:
tests:
- not_null:
config:
where: batch_run_date > current_datetime - interval 2 day
where: closed_at > current_timestamp - interval 2 day

- name: batch_id
description: '{{ doc("batch_id") }}'
tests:
- not_null:
config:
where: batch_run_date > current_datetime - interval 2 day
where: closed_at > current_timestamp - interval 2 day

- name: batch_run_date
description: '{{ doc("batch_run_date") }}'
tests:
- not_null:
config:
where: batch_run_date > current_datetime - interval 2 day
where: closed_at > current_timestamp - interval 2 day

- name: closed_at
description: '{{ doc("closed_at") }}'
tests:
- not_null:
config:
where: batch_run_date > current_datetime - interval 2 day
where: closed_at > current_timestamp - interval 2 day

- name: ledger_sequence
description: '{{ doc("ledger_sequence") }}'
tests:
- not_null:
config:
where: batch_run_date > current_datetime - interval 2 day
where: closed_at > current_timestamp - interval 2 day

- name: upstream_insert_ts
description: '{{ doc("upstream_insert_ts") }}'
tests:
- not_null:
config:
where: batch_run_date > current_datetime - interval 2 day
where: closed_at > current_timestamp - interval 2 day

- name: batch_insert_ts
description: '{{ doc("batch_insert_ts") }}'
tests:
- not_null:
config:
where: batch_run_date > current_datetime - interval 2 day
where: closed_at > current_timestamp - interval 2 day
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,7 @@ with
{% if is_incremental() %}
-- limit the number of partitions fetched incrementally
where
cfg.closed_at >= timestamp_sub(current_timestamp(), interval 30 day)
-- fetch the last week of records loaded
and timestamp_add(cfg.batch_insert_ts, interval 7 day)
> (select max(t.upstream_insert_ts) from {{ this }} as t)
cfg.closed_at >= timestamp_sub(current_timestamp(), interval 7 day)
{% endif %}
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,7 @@ with
{% if is_incremental() %}
-- limit the number of partitions fetched incrementally
where
cc.closed_at >= timestamp_sub(current_timestamp(), interval 30 day)
-- fetch the last week of records loaded
and timestamp_add(cc.batch_insert_ts, interval 7 day)
> (select max(t.upstream_insert_ts) from {{ this }} as t)
cc.closed_at >= timestamp_sub(current_timestamp(), interval 7 day)
{% endif %}
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ with
{% if is_incremental() %}
-- limit the number of partitions fetched incrementally
where
cd.closed_at >= timestamp_sub(current_timestamp(), interval 30 day)
-- fetch the last week of records loaded
and timestamp_add(cd.batch_insert_ts, interval 7 day)
> (select max(t.upstream_insert_ts) from {{ this }} as t)
cd.closed_at >= timestamp_sub(current_timestamp(), interval 7 day)
{% endif %}
)

Expand Down
10 changes: 2 additions & 8 deletions models/marts/ledger_current_state/liquidity_pools_current.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ with
, lp.asset_b_amount
, lp.last_modified_ledger
, lp.ledger_entry_change
, l.closed_at
, lp.closed_at
, lp.deleted
, lp.batch_run_date
, lp.batch_insert_ts
Expand All @@ -43,16 +43,10 @@ with
order by lp.last_modified_ledger desc, lp.ledger_entry_change desc
) as row_nr
from {{ ref('stg_liquidity_pools') }} as lp
join {{ ref('stg_history_ledgers') }} as l
on lp.last_modified_ledger = l.sequence

{% if is_incremental() %}
-- limit the number of partitions fetched
where
lp.batch_run_date >= date_sub(current_date(), interval 30 day)
-- fetch the last week of records loaded
and timestamp_add(lp.batch_insert_ts, interval 7 day)
> (select max(t.upstream_insert_ts) from {{ this }} as t)
lp.closed_at >= timestamp_sub(current_timestamp(), interval 7 day)
{% endif %}

)
Expand Down
10 changes: 2 additions & 8 deletions models/marts/ledger_current_state/offers_current.sql
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ with
, o.price
, o.flags
, o.last_modified_ledger
, l.closed_at
, o.closed_at
, o.ledger_entry_change
, o.deleted
, o.sponsor
Expand All @@ -37,16 +37,10 @@ with
order by o.last_modified_ledger desc, o.ledger_entry_change desc
) as row_nr
from {{ ref('stg_offers') }} as o
join {{ ref('stg_history_ledgers') }} as l
on o.last_modified_ledger = l.sequence

{% if is_incremental() %}
-- limit the number of partitions fetched
where
o.batch_run_date >= date_sub(current_date(), interval 30 day)
-- fetch the last week of records loaded
and timestamp_add(o.batch_insert_ts, interval 7 day)
> (select max(t.upstream_insert_ts) from {{ this }} as t)
o.closed_at >= timestamp_sub(current_timestamp(), interval 7 day)
{% endif %}

)
Expand Down
10 changes: 2 additions & 8 deletions models/marts/ledger_current_state/trust_lines_current.sql
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ with
, tl.trust_line_limit
, tl.last_modified_ledger
, tl.ledger_entry_change
, l.closed_at
, tl.closed_at
, tl.deleted
-- table only has natural keys, creating a primary key
, concat(tl.account_id, '-', tl.asset_code, '-', tl.asset_issuer, '-', tl.liquidity_pool_id
Expand All @@ -40,16 +40,10 @@ with
order by tl.last_modified_ledger desc, tl.ledger_entry_change desc
) as row_nr
from {{ ref('stg_trust_lines') }} as tl
join {{ ref('stg_history_ledgers') }} as l
on tl.last_modified_ledger = l.sequence

{% if is_incremental() %}
-- limit the number of partitions fetched incrementally
where
tl.batch_run_date >= date_sub(current_date(), interval 30 day)
-- fetch the last week of records loaded
and timestamp_add(tl.batch_insert_ts, interval 7 day)
> (select max(t.upstream_insert_ts) from {{ this }} as t)
tl.closed_at >= timestamp_sub(current_timestamp(), interval 7 day)
{% endif %}

)
Expand Down
5 changes: 1 addition & 4 deletions models/marts/ledger_current_state/ttl_current.sql
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ with
{% if is_incremental() %}
-- limit the number of partitions fetched incrementally
where
ttl.closed_at >= timestamp_sub(current_timestamp(), interval 30 day)
-- fetch the last week of records loaded
and timestamp_add(ttl.batch_insert_ts, interval 7 day)
> (select max(t.upstream_insert_ts) from {{ this }} as t)
ttl.closed_at >= timestamp_sub(current_timestamp(), interval 7 day)
{% endif %}
)

Expand Down
Loading
Loading