diff --git a/models/docs/marts/ledger_current_state/account_signers_current.md b/models/docs/marts/ledger_current_state/account_signers_current.md new file mode 100644 index 0000000..d5e6b99 --- /dev/null +++ b/models/docs/marts/ledger_current_state/account_signers_current.md @@ -0,0 +1,11 @@ +[comment]: < Account Signers Current - + +{% docs account_signers_current %} + +The `account_signers_current` table is a nightly snapshotted table that represents the current status of all account signers associated with an account. The table returns the latest account entry, which is defined as the highest `last_modified_ledger` per account signer on the Stellar Network. Deleted signers are included in the table. For full state history, please use the `account_signers` table. + +**The `account_signers_current` table is only updated nightly. Intraday ledger state changes are not captured in the table.** + +The `account_signers_current` table may be joined to the `accounts` table in order to find out more information about the originating account. The signers table has a many-to-one relationship with the accounts table, so you should expect multiple records to be returned for each `account_id`. + +{% enddocs %} \ No newline at end of file diff --git a/models/docs/marts/ledger_current_state/accounts_current.md b/models/docs/marts/ledger_current_state/accounts_current.md new file mode 100644 index 0000000..c497c81 --- /dev/null +++ b/models/docs/marts/ledger_current_state/accounts_current.md @@ -0,0 +1,11 @@ +[comment]: < Accounts Current - + +{% docs accounts_current %} + +The `accounts_current` table is a nightly snapshotted table that represents the current state of account ledger entries. The table returns the latest account entry, which is defined as the highest `last_modified_ledger` per account on the Stellar Network. Deleted accounts are included in the table. For full state history, please use the `accounts` table. 
+ +**The `accounts_current` table is only updated nightly. Intraday ledger state changes are not captured in the table.** + +As a reminder, account ledger entries store detailed information for a given account, including current account status, preconditions for transaction authorization, security settings and account balance. The balance reported in the accounts table reflects the account’s XLM balance only. All other asset balances are reported in the `trust_lines` table. + +{% enddocs %} \ No newline at end of file diff --git a/models/docs/marts/ledger_current_state/liquidity_pools_current.md b/models/docs/marts/ledger_current_state/liquidity_pools_current.md new file mode 100644 index 0000000..93c7b54 --- /dev/null +++ b/models/docs/marts/ledger_current_state/liquidity_pools_current.md @@ -0,0 +1,15 @@ +[comment]: < Liquidity Pools Current - + +{% docs liquidity_pools_current %} + +The `liquidity_pools_current` table is a nightly snapshotted table that represents the current status of all liquidity pools. The table returns the latest pool entry, which is defined as the highest `last_modified_ledger` per liquidity pool on the Stellar Network. Deleted pools are included in the table. For full state history, please use the `liquidity_pools` table. + +**The `liquidity_pools_current` table is only updated nightly. Intraday ledger state changes are not captured in the table.** + +{% enddocs %} + +{% docs asset_pair %} + +A concatenated representation of the pair of assets in the liquidity pool, in the form `asset_a_code:asset_b_code`. In the case when the pool contains `XLM`, `XLM` is written as an asset code even though the native asset normally has a null asset code. Asset pair intends to make a human readable name for a pool, but does **not** represent a unique pool id. Different asset issuers can mint asset codes of the same name. In these cases, there will be duplicate `asset_pair` names. The pool ids are unique. 
+ +{% enddocs %} \ No newline at end of file diff --git a/models/docs/marts/ledger_current_state/offers_current.md b/models/docs/marts/ledger_current_state/offers_current.md new file mode 100644 index 0000000..3660f2f --- /dev/null +++ b/models/docs/marts/ledger_current_state/offers_current.md @@ -0,0 +1,11 @@ +[comment]: < Offers Current - + +{% docs offers_current %} + +The `offers_current` table is a nightly snapshotted table that represents the current orderbook of the Stellar Decentralized Exchange. The table returns the latest offer, which is defined as the highest `last_modified_ledger` per offer on the Stellar Network. Deleted offers are included in the table. An offer is deleted when the seller either updates their offer price to zero or an order is completely filled. For full orderbook history, please use the `offers` table. + +**The `offers_current` table is only updated nightly. Intraday ledger state changes are not captured in the table.** + +For more information on how the Stellar Decentralized Exchange works, please read [these docs](https://developers.stellar.org/docs/encyclopedia/liquidity-on-stellar-sdex-liquidity-pools#order-books). + +{% enddocs %} \ No newline at end of file diff --git a/models/docs/marts/ledger_current_state/trust_lines_current.md b/models/docs/marts/ledger_current_state/trust_lines_current.md new file mode 100644 index 0000000..3456530 --- /dev/null +++ b/models/docs/marts/ledger_current_state/trust_lines_current.md @@ -0,0 +1,11 @@ +[comment]: < Trust Lines Current - + +{% docs trust_lines_current %} + +The `trust_lines_current` table is a nightly snapshotted table that represents the current state of account trustlines. The table returns the latest trustline entry, per account, which is defined as the highest `last_modified_ledger` per trustline/account on the Stellar Network. Deleted trust lines are included in the table. For full state history, please use the `trust_lines` table. 
+ +**The `trust_lines_current` table is only updated nightly. Intraday ledger state changes are not captured in the table.** + +As a reminder, trustline ledger entries store detailed information for assets trusted by a given account. The balance reported in the trustlines table reflects the account’s trusted asset balances, `XLM` balance is reported in the `accounts` table. + +{% enddocs %} \ No newline at end of file diff --git a/models/intermediate/trades/int_trade_agg_day.sql b/models/intermediate/trades/int_trade_agg_day.sql index f2535ef..ba352c1 100644 --- a/models/intermediate/trades/int_trade_agg_day.sql +++ b/models/intermediate/trades/int_trade_agg_day.sql @@ -1,5 +1,6 @@ {{ config( - cluster_by =["asset_a", "asset_b"] + tags = ["trade_agg"] + , cluster_by =["asset_a", "asset_b"] ) }} diff --git a/models/intermediate/trades/int_trade_agg_month.sql b/models/intermediate/trades/int_trade_agg_month.sql index 05a7e82..3d844ce 100644 --- a/models/intermediate/trades/int_trade_agg_month.sql +++ b/models/intermediate/trades/int_trade_agg_month.sql @@ -1,5 +1,6 @@ {{ config( - cluster_by =["asset_a", "asset_b"] + tags = ["trade_agg"] + , cluster_by =["asset_a", "asset_b"] ) }} diff --git a/models/intermediate/trades/int_trade_agg_week.sql b/models/intermediate/trades/int_trade_agg_week.sql index cfa7d79..577879f 100644 --- a/models/intermediate/trades/int_trade_agg_week.sql +++ b/models/intermediate/trades/int_trade_agg_week.sql @@ -1,5 +1,6 @@ {{ config( - cluster_by =["asset_a", "asset_b"] + tags = ["trade_agg"] + , cluster_by =["asset_a", "asset_b"] ) }} diff --git a/models/intermediate/trades/int_trade_agg_year.sql b/models/intermediate/trades/int_trade_agg_year.sql index b5db74c..0d18732 100644 --- a/models/intermediate/trades/int_trade_agg_year.sql +++ b/models/intermediate/trades/int_trade_agg_year.sql @@ -1,5 +1,6 @@ {{ config( - cluster_by =["asset_a", "asset_b"] + tags = ["trade_agg"] + , cluster_by =["asset_a", "asset_b"] ) }} diff --git 
a/models/marts/enriched_history_operations.sql b/models/marts/enriched_history_operations.sql index 0648e38..621c5c0 100644 --- a/models/marts/enriched_history_operations.sql +++ b/models/marts/enriched_history_operations.sql @@ -1,5 +1,5 @@ {{ config( - tags = ["partnership_assets", "asset_stats"], + tags = ["enriched_history_operations"], materialized='incremental', unique_key=["op_id"], partition_by={ diff --git a/models/marts/enriched_history_operations.yml b/models/marts/enriched_history_operations.yml index 5470eb4..43e4e15 100644 --- a/models/marts/enriched_history_operations.yml +++ b/models/marts/enriched_history_operations.yml @@ -13,17 +13,6 @@ models: meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." - - elementary.dimension_anomalies: - timestamp_column: closed_at - backfill_days: 90 - dimensions: - - "type" - time_bucket: - period: day - count: 1 - meta: - description: - "Monitors the frequency of values in the type column over time." 
columns: - name: op_id description: '{{ doc("operation_id") }}' diff --git a/models/marts/fee_stats_agg.sql b/models/marts/fee_stats_agg.sql index dabbd65..1bd2a74 100644 --- a/models/marts/fee_stats_agg.sql +++ b/models/marts/fee_stats_agg.sql @@ -1,4 +1,5 @@ {{ config( + tags=["fee_stats"], materialized='incremental', unique_key=["day_agg"], partition_by={ @@ -164,4 +165,4 @@ with ) select distinct * -from renaming \ No newline at end of file +from renaming diff --git a/models/marts/history_assets.sql b/models/marts/history_assets.sql index 9bb11ce..849f43a 100644 --- a/models/marts/history_assets.sql +++ b/models/marts/history_assets.sql @@ -1,5 +1,6 @@ {{ config( - materialized='incremental' + tags = ["history_assets"] + , materialized='incremental' , unique_key=["asset_id"] , cluster_by= ["asset_id"] ) @@ -81,4 +82,4 @@ {% endif %} select * -from add_assets \ No newline at end of file +from add_assets diff --git a/models/marts/ledger_current_state/account_signers_current.sql b/models/marts/ledger_current_state/account_signers_current.sql new file mode 100644 index 0000000..c82852a --- /dev/null +++ b/models/marts/ledger_current_state/account_signers_current.sql @@ -0,0 +1,62 @@ +{{ + config( + tags = ["current_state"], + materialized = "incremental", + unique_key = "unique_id", + cluster_by = "account_id" + ) +}} + +/* Finds the latest state of each account signer in the `account_signers` table. + Ranks each record (grain: one row per account and signer) using + last modified ledger sequence number. View includes all account signers. + (Deleted and Existing). View matches the Horizon snapshotted state tables. 
*/ +with + current_signers as ( + select + s.account_id + , s.signer + , s.weight + , s.sponsor + , s.last_modified_ledger + , l.closed_at + , s.ledger_entry_change + , s.deleted + -- table only has natural keys, creating a primary key + , concat(s.account_id, '-', s.signer + ) as unique_id + , s.batch_run_date + , s.batch_insert_ts + , row_number() + over ( + partition by s.account_id, s.signer + order by s.last_modified_ledger desc, s.ledger_entry_change desc + ) as row_nr + from {{ ref('stg_account_signers') }} as s + join {{ ref('stg_history_ledgers') }} as l + on s.last_modified_ledger = l.sequence + + {% if is_incremental() %} + -- limit the number of partitions fetched + where + s.batch_run_date >= date_sub(current_date(), interval 30 day) + -- fetch the last week of records loaded + and timestamp_add(s.batch_insert_ts, interval 7 day) + > (select max(t.upstream_insert_ts) from {{ this }} as t) + {% endif %} + ) +select + account_id + , signer + , weight + , sponsor + , last_modified_ledger + , ledger_entry_change + , closed_at + , deleted + , unique_id + , batch_run_date + , batch_insert_ts as upstream_insert_ts + , current_timestamp() as batch_insert_ts +from current_signers +where row_nr = 1 diff --git a/models/marts/ledger_current_state/account_signers_current.yml b/models/marts/ledger_current_state/account_signers_current.yml new file mode 100644 index 0000000..6f5ad94 --- /dev/null +++ b/models/marts/ledger_current_state/account_signers_current.yml @@ -0,0 +1,121 @@ +version: 2 + +models: + - name: account_signers_current + description: '{{ doc("account_signers_current") }}' + tests: + - dbt_utils.recency: + datepart: hour + field: cast(closed_at as datetime) + interval: 12 + config: + severity: warn + meta: + description: + "Monitors the freshness of your table over time, as the expected time between data updates." 
+ - elementary.volume_anomalies: + timestamp_column: closed_at + backfill_days: 90 + time_bucket: + period: day + count: 1 + meta: + description: + "Monitors the row count of your table over time." + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - account_id + - signer + - ledger_entry_change + - last_modified_ledger + config: + where: closed_at >= timestamp_trunc(timestamp_sub(current_timestamp(), interval 2 day), day) + and closed_at < timestamp_trunc(current_timestamp(), day) + meta: + description: + "Tests the uniqueness combination of: account_id, signer, ledger_entry_change and last_modified_ledger." + columns: + - name: account_id + description: '{{ doc("account_id") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: signer + description: '{{ doc("signer") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: weight + description: '{{ doc("weight") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: sponsor + description: '{{ doc("sponsor") }}' + + - name: last_modified_ledger + description: '{{ doc("last_modified_ledger") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: ledger_entry_change + description: '{{ doc("ledger_entry_change") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + - accepted_values: + values: [0, 1, 2] + quote: false + + - name: closed_at + description: '{{ doc("closed_at") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: deleted + description: '{{ doc("deleted") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + - accepted_values: + values: ["true", "false"] + quote: false + + - name: batch_run_date + description: '{{ doc("batch_run_date") }}' + tests: + - 
not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: batch_insert_ts + description: '{{ doc("batch_insert_ts") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: upstream_insert_ts + description: '{{ doc("upstream_insert_ts") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: unique_id + description: '{{ doc("unique_id") }}' + tests: + - not_null + - unique \ No newline at end of file diff --git a/models/marts/ledger_current_state/accounts_current.sql b/models/marts/ledger_current_state/accounts_current.sql new file mode 100644 index 0000000..cd8409a --- /dev/null +++ b/models/marts/ledger_current_state/accounts_current.sql @@ -0,0 +1,91 @@ +{{ + config( + tags = ["current_state"], + materialized = "incremental", + unique_key = "account_id" + ) +}} +/* Returns the latest state of each account in the `accounts` table. + Table includes all accounts. (Deleted and Existing). + + Rank all rows for an account by last modified ledger and ledger entry type. 
+ Deleted entry types reuse the last modified ledger sequence */ + +with + current_accts as ( + select + a.account_id + , a.balance + , a.buying_liabilities + , a.selling_liabilities + , a.sequence_number + , a.num_subentries + , a.num_sponsoring + , a.num_sponsored + , a.inflation_destination + , a.flags + , a.home_domain + , a.master_weight + , a.threshold_low + , a.threshold_medium + , a.threshold_high + , a.last_modified_ledger + , a.ledger_entry_change + , l.closed_at + , a.deleted + , a.sponsor + , a.sequence_ledger + , a.sequence_time + , a.batch_run_date + , a.batch_insert_ts + , row_number() + over ( + partition by a.account_id + order by + a.last_modified_ledger desc + , a.ledger_entry_change desc + ) + as row_nr + from {{ ref('stg_accounts') }} as a + join {{ ref('stg_history_ledgers') }} as l + on a.last_modified_ledger = l.sequence + + {% if is_incremental() %} + -- limit the number of partitions fetched + where + a.batch_run_date >= date_sub(current_date(), interval 30 day) + -- fetch the last week of records loaded + and timestamp_add(a.batch_insert_ts, interval 7 day) + > (select max(t.upstream_insert_ts) from {{ this }} as t) + {% endif %} + ) + +/* Return the same fields as the `accounts` table */ +select + account_id + , balance + , buying_liabilities + , selling_liabilities + , sequence_number + , num_subentries + , num_sponsoring + , num_sponsored + , inflation_destination + , flags + , home_domain + , master_weight + , threshold_low + , threshold_medium + , threshold_high + , last_modified_ledger + , ledger_entry_change + , closed_at + , deleted + , sponsor + , sequence_ledger + , sequence_time + , batch_run_date + , batch_insert_ts as upstream_insert_ts + , current_timestamp() as batch_insert_ts +from current_accts +where row_nr = 1 diff --git a/models/marts/ledger_current_state/accounts_current.yml b/models/marts/ledger_current_state/accounts_current.yml new file mode 100644 index 0000000..577c945 --- /dev/null +++ 
b/models/marts/ledger_current_state/accounts_current.yml @@ -0,0 +1,212 @@ +version: 2 + +models: + - name: accounts_current + description: '{{ doc("accounts_current") }}' + tests: + - dbt_utils.recency: + datepart: hour + field: cast(closed_at as datetime) + interval: 12 + config: + severity: warn + meta: + description: + "Monitors the freshness of your table over time, as the expected time between data updates." + - elementary.volume_anomalies: + timestamp_column: closed_at + backfill_days: 90 + time_bucket: + period: day + count: 1 + meta: + description: + "Monitors the row count of your table over time." + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - account_id + - ledger_entry_change + - last_modified_ledger + config: + where: closed_at >= timestamp_trunc(timestamp_sub(current_timestamp(), interval 2 day), day) + and closed_at < timestamp_trunc(current_timestamp(), day) + meta: + description: + "Tests the uniqueness combination of: account_id, ledger_entry_change and last_modified_ledger." 
+ columns: + - name: account_id + description: '{{ doc("account_id") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: balance + description: '{{ doc("balance") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: buying_liabilities + description: '{{ doc("buying_liabilities") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: selling_liabilities + description: '{{ doc("selling_liabilities") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: sequence_number + description: '{{ doc("sequence_number") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: num_subentries + description: '{{ doc("num_subentries") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: num_sponsoring + description: '{{ doc("num_sponsoring") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: num_sponsored + description: '{{ doc("num_sponsored") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: inflation_destination + description: '{{ doc("inflation_destination") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: flags + description: '{{ doc("flags") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: home_domain + description: '{{ doc("home_domain") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: master_weight + description: '{{ doc("master_weight") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: threshold_low + description: '{{ doc("threshold_low") }}' + 
tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: threshold_medium + description: '{{ doc("threshold_medium") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: threshold_high + description: '{{ doc("threshold_high") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: last_modified_ledger + description: '{{ doc("last_modified_ledger") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: ledger_entry_change + description: '{{ doc("ledger_entry_change") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + - accepted_values: + values: [0, 1, 2] + quote: false + + - name: closed_at + description: '{{ doc("closed_at") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: deleted + description: '{{ doc("deleted") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + - accepted_values: + values: ["true", "false"] + quote: false + + - name: sponsor + description: '{{ doc("sponsor") }}' + + - name: sequence_ledger + description: '{{ doc("sequence_ledger") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: sequence_time + description: '{{ doc("sequence_time") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: batch_run_date + description: '{{ doc("batch_run_date") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: batch_insert_ts + description: '{{ doc("batch_insert_ts") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: upstream_insert_ts + description: '{{ doc("upstream_insert_ts") }}' + tests: + - not_null: + config: + 
where: closed_at > current_timestamp - interval 2 day diff --git a/models/marts/ledger_current_state/liquidity_pools_current.sql b/models/marts/ledger_current_state/liquidity_pools_current.sql new file mode 100644 index 0000000..7d8526b --- /dev/null +++ b/models/marts/ledger_current_state/liquidity_pools_current.sql @@ -0,0 +1,81 @@ +{{ + config( + tags = ["current_state"], + materialized = "incremental", + unique_key = "liquidity_pool_id", + cluster_by = ["asset_a_code", "asset_a_issuer", "asset_b_code", "asset_b_issuer"] + ) +}} + +/* Finds the latest state of each liquidity pool in the `liquidity_pools` table. + Ranks each record (grain: one row per pool) using + last modified ledger sequence number. View includes all pools. + (Deleted and Existing). View matches the Horizon snapshotted state tables. */ + +with + current_lps as ( + select + lp.liquidity_pool_id + , lp.fee + , lp.trustline_count + , lp.pool_share_count + , case + when lp.asset_a_type = 'native' then concat('XLM:', lp.asset_b_code) + else concat(lp.asset_a_code, ':', lp.asset_b_code) + end as asset_pair + , lp.asset_a_code + , lp.asset_a_issuer + , lp.asset_a_type + , lp.asset_b_code + , lp.asset_b_issuer + , lp.asset_b_type + , lp.asset_a_amount + , lp.asset_b_amount + , lp.last_modified_ledger + , lp.ledger_entry_change + , l.closed_at + , lp.deleted + , lp.batch_run_date + , lp.batch_insert_ts + , row_number() + over ( + partition by lp.liquidity_pool_id + order by lp.last_modified_ledger desc, lp.ledger_entry_change desc + ) as row_nr + from {{ ref('stg_liquidity_pools') }} as lp + join {{ ref('stg_history_ledgers') }} as l + on lp.last_modified_ledger = l.sequence + + {% if is_incremental() %} + -- limit the number of partitions fetched + where + lp.batch_run_date >= date_sub(current_date(), interval 30 day) + -- fetch the last week of records loaded + and timestamp_add(lp.batch_insert_ts, interval 7 day) + > (select max(t.upstream_insert_ts) from {{ this }} as t) + {% endif %} + + ) +select 
+ liquidity_pool_id + , fee + , trustline_count + , pool_share_count + , asset_pair + , asset_a_code + , asset_a_issuer + , asset_a_type + , asset_b_code + , asset_b_issuer + , asset_b_type + , asset_a_amount + , asset_b_amount + , last_modified_ledger + , ledger_entry_change + , closed_at + , deleted + , batch_run_date + , batch_insert_ts as upstream_insert_ts + , current_timestamp() as batch_insert_ts +from current_lps +where row_nr = 1 diff --git a/models/marts/ledger_current_state/liquidity_pools_current.yml b/models/marts/ledger_current_state/liquidity_pools_current.yml new file mode 100644 index 0000000..15a7955 --- /dev/null +++ b/models/marts/ledger_current_state/liquidity_pools_current.yml @@ -0,0 +1,191 @@ +version: 2 + +models: + - name: liquidity_pools_current + description: '{{ doc("liquidity_pools_current") }}' + tests: + - dbt_utils.recency: + datepart: hour + field: cast(closed_at as datetime) + interval: 12 + config: + severity: warn + meta: + description: + "Monitors the freshness of your table over time, as the expected time between data updates." + - elementary.volume_anomalies: + timestamp_column: closed_at + backfill_days: 90 + time_bucket: + period: day + count: 1 + meta: + description: + "Monitors the row count of your table over time." + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - liquidity_pool_id + - ledger_entry_change + - last_modified_ledger + config: + where: closed_at >= timestamp_trunc(timestamp_sub(current_timestamp(), interval 2 day), day) + and closed_at < timestamp_trunc(current_timestamp(), day) + meta: + description: + "Tests the uniqueness combination of: liquidity_pool_id, ledger_entry_change and last_modified_ledger." 
+ columns: + - name: liquidity_pool_id + description: '{{ doc("liquidity_pool_id") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: fee + description: '{{ doc("fee") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + - dbt_utils.expression_is_true: + expression: '= 30' + meta: + description: + "Test if fee is equal to 30." + + - name: trustline_count + description: '{{ doc("trustline_count") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + - dbt_utils.expression_is_true: + expression: '>= 1' + meta: + description: + "Test if trustline count is greater or equal to 1." + + - name: pool_share_count + description: '{{ doc("pool_share_count") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: asset_pair + description: '{{ doc("asset_pair") }}' + + - name: asset_a_code + description: '{{ doc("asset_a_code") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: asset_a_issuer + description: '{{ doc("asset_a_issuer") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: asset_a_type + description: '{{ doc("asset_a_type") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + - accepted_values: + values: ["credit_alphanum12", "credit_alphanum4", "native"] + + - name: asset_a_amount + description: '{{ doc("asset_a_amount") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: asset_b_code + description: '{{ doc("asset_b_code") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: asset_b_issuer + description: '{{ doc("asset_b_issuer") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: 
asset_b_type + description: '{{ doc("asset_b_type") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + - accepted_values: + values: ["credit_alphanum12", "credit_alphanum4", "native"] + + - name: asset_b_amount + description: '{{ doc("asset_b_amount") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: last_modified_ledger + description: '{{ doc("last_modified_ledger") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: ledger_entry_change + description: '{{ doc("ledger_entry_change") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + - accepted_values: + values: [0, 1, 2] + quote: false + + - name: closed_at + description: '{{ doc("closed_at") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: deleted + description: '{{ doc("deleted") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + - accepted_values: + values: ["true", "false"] + quote: false + + - name: batch_run_date + description: '{{ doc("batch_run_date") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: batch_insert_ts + description: '{{ doc("batch_insert_ts") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: upstream_insert_ts + description: '{{ doc("upstream_insert_ts") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day \ No newline at end of file diff --git a/models/marts/ledger_current_state/offers_current.sql b/models/marts/ledger_current_state/offers_current.sql new file mode 100644 index 0000000..baf398c --- /dev/null +++ b/models/marts/ledger_current_state/offers_current.sql @@ -0,0 +1,76 @@ +{{ + config( + tags = ["current_state"], + materialized = 
"incremental", + unique_key = "offer_id", + cluster_by = ["selling_asset_code", "selling_asset_issuer", "buying_asset_code", "buying_asset_issuer"] + ) +}} + + +with + current_offers as ( + select + o.seller_id + , o.offer_id + , o.selling_asset_type + , o.selling_asset_code + , o.selling_asset_issuer + , o.buying_asset_type + , o.buying_asset_code + , o.buying_asset_issuer + , o.amount + , o.pricen + , o.priced + , o.price + , o.flags + , o.last_modified_ledger + , l.closed_at + , o.ledger_entry_change + , o.deleted + , o.sponsor + , o.batch_run_date + , o.batch_insert_ts + , row_number() + over ( + partition by o.offer_id + order by o.last_modified_ledger desc, o.ledger_entry_change desc + ) as row_nr + from {{ ref('stg_offers') }} as o + join {{ ref('stg_history_ledgers') }} as l + on o.last_modified_ledger = l.sequence + + {% if is_incremental() %} + -- limit the number of partitions fetched + where + o.batch_run_date >= date_sub(current_date(), interval 30 day) + -- fetch the last week of records loaded + and timestamp_add(o.batch_insert_ts, interval 7 day) + > (select max(t.upstream_insert_ts) from {{ this }} as t) + {% endif %} + + ) +select + seller_id + , offer_id + , selling_asset_type + , selling_asset_code + , selling_asset_issuer + , buying_asset_type + , buying_asset_code + , buying_asset_issuer + , amount + , pricen + , priced + , price + , flags + , last_modified_ledger + , closed_at + , ledger_entry_change + , deleted + , sponsor + , batch_run_date + , batch_insert_ts as upstream_insert_ts + , current_timestamp() as batch_insert_ts +from current_offers +where row_nr = 1 diff --git a/models/marts/ledger_current_state/offers_current.yml b/models/marts/ledger_current_state/offers_current.yml new file mode 100644 index 0000000..f0b427c --- /dev/null +++ b/models/marts/ledger_current_state/offers_current.yml @@ -0,0 +1,203 @@ +version: 2 + +models: + - name: offers_current + description: '{{ doc("offers_current") }}' + tests: + - dbt_utils.recency: + 
datepart: hour + field: cast(closed_at as datetime) + interval: 12 + config: + severity: warn + meta: + description: + "Monitors the freshness of your table over time, as the expected time between data updates." + - elementary.volume_anomalies: + timestamp_column: closed_at + backfill_days: 90 + time_bucket: + period: day + count: 1 + meta: + description: + "Monitors the row count of your table over time." + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - offer_id + - ledger_entry_change + - last_modified_ledger + config: + where: closed_at >= timestamp_trunc(timestamp_sub(current_timestamp(), interval 2 day), day) + and closed_at < timestamp_trunc(current_timestamp(), day) + meta: + description: + "Tests the uniqueness combination of: offer_id, ledger_entry_change and last_modified_ledger." + columns: + - name: seller_id + description: '{{ doc("seller_id") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: offer_id + description: '{{ doc("offer_id") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: selling_asset_code + description: '{{ doc("asset_code") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: selling_asset_issuer + description: '{{ doc("asset_issuer") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: selling_asset_type + description: '{{ doc("asset_type") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + - accepted_values: + values: ["credit_alphanum12", "credit_alphanum4", "native"] + + - name: buying_asset_code + description: '{{ doc("asset_code") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: buying_asset_issuer + description: '{{ doc("asset_issuer") }}' + tests: + - not_null: + config: + where: closed_at > 
current_timestamp - interval 2 day + + - name: buying_asset_type + description: '{{ doc("asset_type") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + - accepted_values: + values: ["credit_alphanum12", "credit_alphanum4", "native"] + + - name: amount + description: '{{ doc("amount") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + - dbt_utils.expression_is_true: + expression: '> 0' + meta: + description: + "Test if the amount is positive." + + - name: pricen + description: '{{ doc("price_n") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + - dbt_utils.expression_is_true: + expression: '>= 1' + meta: + description: + "Test if the price_n is greater than or equal to 1." + + - name: priced + description: '{{ doc("price_d") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + - dbt_utils.expression_is_true: + expression: '>= 1' + meta: + description: + "Test if the price_d is greater than or equal to 1." + + - name: price + description: '{{ doc("price") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: flags + description: '{{ doc("flags") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: sponsor + description: '{{ doc("sponsor") }}' + + - name: last_modified_ledger + description: '{{ doc("last_modified_ledger") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: ledger_entry_change + description: '{{ doc("ledger_entry_change") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + - accepted_values: + values: [0, 1, 2] + quote: false + + - name: closed_at + description: '{{ doc("closed_at") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: deleted +
description: '{{ doc("deleted") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + - accepted_values: + values: ["true", "false"] + quote: false + + - name: batch_run_date + description: '{{ doc("batch_run_date") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: batch_insert_ts + description: '{{ doc("batch_insert_ts") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: upstream_insert_ts + description: '{{ doc("upstream_insert_ts") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day \ No newline at end of file diff --git a/models/marts/ledger_current_state/trust_lines_current.sql b/models/marts/ledger_current_state/trust_lines_current.sql new file mode 100644 index 0000000..b4962d2 --- /dev/null +++ b/models/marts/ledger_current_state/trust_lines_current.sql @@ -0,0 +1,77 @@ +{{ + config( + tags = ["current_state"], + materialized = "incremental", + unique_key = "unique_id", + cluster_by = ["asset_code", "asset_issuer"] + ) +}} + +/* Finds the latest state of each trust line in the `trust_lines` table. + Ranks each record (grain: one row per trust line) by + last modified ledger sequence number. The table includes all trust lines + (deleted and existing) and matches the Horizon snapshotted state tables. 
*/ +with + current_tls as ( + select + tl.account_id + , tl.asset_code + , tl.asset_issuer + , tl.asset_type + , tl.liquidity_pool_id + , tl.balance + , tl.buying_liabilities + , tl.selling_liabilities + , tl.flags + , tl.sponsor + , tl.trust_line_limit + , tl.last_modified_ledger + , tl.ledger_entry_change + , l.closed_at + , tl.deleted + -- table only has natural keys, creating a primary key + , concat(tl.account_id, '-', tl.asset_code, '-', tl.asset_issuer, '-', tl.liquidity_pool_id + ) as unique_id + , tl.batch_run_date + , tl.batch_insert_ts + , row_number() + over ( + partition by tl.account_id, tl.asset_code, tl.asset_issuer, tl.liquidity_pool_id + order by tl.last_modified_ledger desc, tl.ledger_entry_change desc + ) as row_nr + from {{ ref('stg_trust_lines') }} as tl + join {{ ref('stg_history_ledgers') }} as l + on tl.last_modified_ledger = l.sequence + + {% if is_incremental() %} + -- limit the number of partitions fetched incrementally + where + tl.batch_run_date >= date_sub(current_date(), interval 30 day) + -- fetch the last week of records loaded + and timestamp_add(tl.batch_insert_ts, interval 7 day) + > (select max(t.upstream_insert_ts) from {{ this }} as t) + {% endif %} + + ) +select + account_id + , asset_code + , asset_issuer + , asset_type + , liquidity_pool_id + , balance + , buying_liabilities + , selling_liabilities + , flags + , sponsor + , trust_line_limit + , last_modified_ledger + , ledger_entry_change + , closed_at + , deleted + , unique_id + , batch_run_date + , batch_insert_ts as upstream_insert_ts + , current_timestamp() as batch_insert_ts +from current_tls +where row_nr = 1 diff --git a/models/marts/ledger_current_state/trust_lines_current.yml b/models/marts/ledger_current_state/trust_lines_current.yml new file mode 100644 index 0000000..7c7186b --- /dev/null +++ b/models/marts/ledger_current_state/trust_lines_current.yml @@ -0,0 +1,177 @@ +version: 2 + +models: + - name: trust_lines_current + description: '{{ 
doc("trust_lines_current") }}' + tests: + - dbt_utils.recency: + datepart: hour + field: cast(closed_at as datetime) + interval: 12 + config: + severity: warn + meta: + description: + "Monitors the freshness of your table over time, as the expected time between data updates." + - elementary.volume_anomalies: + timestamp_column: closed_at + backfill_days: 90 + time_bucket: + period: day + count: 1 + meta: + description: + "Monitors the row count of your table over time." + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - account_id + - asset_code + - asset_issuer + - liquidity_pool_id + - ledger_entry_change + - last_modified_ledger + config: + where: closed_at >= timestamp_trunc(timestamp_sub(current_timestamp(), interval 2 day), day) + and closed_at < timestamp_trunc(current_timestamp(), day) + meta: + description: + "Tests the uniqueness combination of: account_id, asset_code, asset_issuer, liquidity_pool_id, ledger_entry_change and last_modified_ledger." + columns: + - name: account_id + description: '{{ doc("account_id") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: asset_code + description: '{{ doc("asset_code") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: asset_issuer + description: '{{ doc("asset_issuer") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: asset_type + description: '{{ doc("asset_type") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + - accepted_values: + values: [1, 2, 3] + quote: false + + - name: liquidity_pool_id + description: '{{ doc("liquidity_pool_id") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: balance + description: '{{ doc("balance") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - 
name: trust_line_limit + description: '{{ doc("trust_line_limit") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: buying_liabilities + description: '{{ doc("buying_liabilities") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: selling_liabilities + description: '{{ doc("selling_liabilities") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: flags + description: '{{ doc("flags") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: sponsor + description: '{{ doc("sponsor") }}' + + - name: last_modified_ledger + description: '{{ doc("last_modified_ledger") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: ledger_entry_change + description: '{{ doc("ledger_entry_change") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + - accepted_values: + values: [0, 1, 2] + quote: false + + - name: closed_at + description: '{{ doc("closed_at") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: deleted + description: '{{ doc("deleted") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + - accepted_values: + values: ["true", "false"] + quote: false + + - name: batch_run_date + description: '{{ doc("batch_run_date") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: batch_insert_ts + description: '{{ doc("batch_insert_ts") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: upstream_insert_ts + description: '{{ doc("upstream_insert_ts") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + + - name: unique_id + description: '{{ 
doc("unique_id") }}' + tests: + - not_null: + config: + where: closed_at > current_timestamp - interval 2 day + - unique \ No newline at end of file diff --git a/models/marts/trade_agg.sql b/models/marts/trade_agg.sql index f1c6929..72c4bd6 100644 --- a/models/marts/trade_agg.sql +++ b/models/marts/trade_agg.sql @@ -1,6 +1,7 @@ {{ config( - materialized='incremental', - partition_by={ + tags = ["trade_agg"], + materialized = 'incremental', + partition_by = { "field": "day_agg" , "data_type": "date" , "granularity": "month"} @@ -42,11 +43,6 @@ with {% endif %} ) - , history_assets as ( - select * - from {{ ref('history_assets') }} - ) - , join_trades as ( select join_table_yearly.day_agg @@ -117,4 +113,4 @@ with ) select * -from join_trades \ No newline at end of file +from join_trades diff --git a/models/sources/src_accounts_signers.yml b/models/sources/src_accounts_signers.yml index 427617c..e95852d 100644 --- a/models/sources/src_accounts_signers.yml +++ b/models/sources/src_accounts_signers.yml @@ -16,15 +16,6 @@ sources: meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." - - elementary.volume_anomalies: - timestamp_column: batch_run_date - backfill_days: 90 - time_bucket: - period: day - count: 1 - meta: - description: - "Monitors the row count of your table over time." 
- dbt_utils.unique_combination_of_columns: combination_of_columns: - account_id @@ -88,4 +79,4 @@ sources: tests: - not_null: config: - where: batch_run_date > current_datetime - interval 2 day \ No newline at end of file + where: batch_run_date > current_datetime - interval 2 day diff --git a/models/staging/stg_account_signers.sql b/models/staging/stg_account_signers.sql index a8a28fb..8beb969 100644 --- a/models/staging/stg_account_signers.sql +++ b/models/staging/stg_account_signers.sql @@ -1,3 +1,8 @@ +{{ config( + tags = ["enriched_history_operations"] + ) +}} + with raw_table as ( select * diff --git a/models/staging/stg_accounts.sql b/models/staging/stg_accounts.sql index dbffa6a..a3829a1 100644 --- a/models/staging/stg_accounts.sql +++ b/models/staging/stg_accounts.sql @@ -1,3 +1,8 @@ +{{ config( + tags = ["enriched_history_operations"] + ) +}} + with raw_table as ( select * diff --git a/models/staging/stg_claimable_balances.sql b/models/staging/stg_claimable_balances.sql index 8b384c6..8977483 100644 --- a/models/staging/stg_claimable_balances.sql +++ b/models/staging/stg_claimable_balances.sql @@ -1,3 +1,8 @@ +{{ config( + tags = ["enriched_history_operations"] + ) +}} + with raw_table as ( select * diff --git a/models/staging/stg_history_assets.sql b/models/staging/stg_history_assets.sql index d9ed55f..d85bcf4 100644 --- a/models/staging/stg_history_assets.sql +++ b/models/staging/stg_history_assets.sql @@ -1,3 +1,8 @@ +{{ config( + tags = ["enriched_history_operations"] + ) +}} + /* This query prepares the assets of each load for deduplication, in order to guarantee a new asset won't be loaded twice */ with diff --git a/models/staging/stg_history_effects.sql b/models/staging/stg_history_effects.sql index dd569f8..b31687c 100644 --- a/models/staging/stg_history_effects.sql +++ b/models/staging/stg_history_effects.sql @@ -1,3 +1,8 @@ +{{ config( + tags = ["enriched_history_operations"] + ) +}} + with raw_table as ( select * diff --git 
a/models/staging/stg_history_ledgers.sql b/models/staging/stg_history_ledgers.sql index 7bcf674..72659ce 100644 --- a/models/staging/stg_history_ledgers.sql +++ b/models/staging/stg_history_ledgers.sql @@ -1,3 +1,8 @@ +{{ config( + tags = ["enriched_history_operations"] + ) +}} + with raw_table as ( select * diff --git a/models/staging/stg_history_operations.sql b/models/staging/stg_history_operations.sql index 6c2b50f..0103eb3 100644 --- a/models/staging/stg_history_operations.sql +++ b/models/staging/stg_history_operations.sql @@ -1,3 +1,8 @@ +{{ config( + tags = ["enriched_history_operations"] + ) +}} + with raw_table as ( select * diff --git a/models/staging/stg_history_trades.sql b/models/staging/stg_history_trades.sql index 1da1914..48bc089 100644 --- a/models/staging/stg_history_trades.sql +++ b/models/staging/stg_history_trades.sql @@ -1,3 +1,8 @@ +{{ config( + tags = ["enriched_history_operations"] + ) +}} + with raw_table as ( select * diff --git a/models/staging/stg_history_transactions.sql b/models/staging/stg_history_transactions.sql index 0071800..38e4634 100644 --- a/models/staging/stg_history_transactions.sql +++ b/models/staging/stg_history_transactions.sql @@ -1,3 +1,8 @@ +{{ config( + tags = ["enriched_history_operations"] + ) +}} + with raw_table as ( select * diff --git a/models/staging/stg_liquidity_pools.sql b/models/staging/stg_liquidity_pools.sql index b5bdd76..b02ef8e 100644 --- a/models/staging/stg_liquidity_pools.sql +++ b/models/staging/stg_liquidity_pools.sql @@ -1,3 +1,8 @@ +{{ config( + tags = ["enriched_history_operations"] + ) +}} + with raw_table as ( select * diff --git a/models/staging/stg_offers.sql b/models/staging/stg_offers.sql index 68bcae5..2e1b9ca 100644 --- a/models/staging/stg_offers.sql +++ b/models/staging/stg_offers.sql @@ -1,3 +1,8 @@ +{{ config( + tags = ["enriched_history_operations"] + ) +}} + with raw_table as ( select * diff --git a/models/staging/stg_trust_lines.sql b/models/staging/stg_trust_lines.sql index 
7ef9565..1066b4f 100644 --- a/models/staging/stg_trust_lines.sql +++ b/models/staging/stg_trust_lines.sql @@ -1,3 +1,8 @@ +{{ config( + tags = ["enriched_history_operations"] + ) +}} + with raw_table as ( select *