diff --git a/models/marts/enriched_history/enriched_history_operations.yml b/models/marts/enriched_history/enriched_history_operations.yml index f807d85..29738bf 100644 --- a/models/marts/enriched_history/enriched_history_operations.yml +++ b/models/marts/enriched_history/enriched_history_operations.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: warn + severity: error meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." columns: diff --git a/models/marts/enriched_history/enriched_history_operations_soroban.yml b/models/marts/enriched_history/enriched_history_operations_soroban.yml index f5f66e9..9a8d9f9 100644 --- a/models/marts/enriched_history/enriched_history_operations_soroban.yml +++ b/models/marts/enriched_history/enriched_history_operations_soroban.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: warn + severity: error meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." columns: diff --git a/models/marts/fee_stats_agg.yml b/models/marts/fee_stats_agg.yml index 7b8e200..556109d 100644 --- a/models/marts/fee_stats_agg.yml +++ b/models/marts/fee_stats_agg.yml @@ -9,7 +9,7 @@ models: field: cast(day_agg as timestamp) interval: 2 config: - severity: warn + severity: error meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." columns: diff --git a/models/marts/history_assets.yml b/models/marts/history_assets.yml index 4c0a7a6..0050a26 100644 --- a/models/marts/history_assets.yml +++ b/models/marts/history_assets.yml @@ -9,9 +9,16 @@ models: field: cast(batch_run_date as timestamp) interval: 2 config: - severity: warn + severity: error meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." + - incremental_unique_combination_of_columns: + combination_of_columns: + - asset_type + - asset_code + - asset_issuer + date_column_name: "batch_run_date" + greater_than_equal_to: "2 day" columns: - name: asset_id description: '{{ doc("assets_id") }}' diff --git a/models/marts/ledger_current_state/account_signers_current.yml b/models/marts/ledger_current_state/account_signers_current.yml index cd19957..5108298 100644 --- a/models/marts/ledger_current_state/account_signers_current.yml +++ b/models/marts/ledger_current_state/account_signers_current.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: warn + severity: error meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." - incremental_unique_combination_of_columns: diff --git a/models/marts/ledger_current_state/accounts_current.yml b/models/marts/ledger_current_state/accounts_current.yml index 80f66c7..de962f3 100644 --- a/models/marts/ledger_current_state/accounts_current.yml +++ b/models/marts/ledger_current_state/accounts_current.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: warn + severity: error meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." - incremental_unique_combination_of_columns: diff --git a/models/marts/ledger_current_state/claimable_balances_current.yml b/models/marts/ledger_current_state/claimable_balances_current.yml index ca9d406..7afeab1 100644 --- a/models/marts/ledger_current_state/claimable_balances_current.yml +++ b/models/marts/ledger_current_state/claimable_balances_current.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: warn + severity: error meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." columns: diff --git a/models/marts/ledger_current_state/contract_data_current.yml b/models/marts/ledger_current_state/contract_data_current.yml index 3443f8c..952f62f 100644 --- a/models/marts/ledger_current_state/contract_data_current.yml +++ b/models/marts/ledger_current_state/contract_data_current.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: warn + severity: error meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." columns: diff --git a/models/marts/ledger_current_state/liquidity_pools_current.yml b/models/marts/ledger_current_state/liquidity_pools_current.yml index 39c6567..b256305 100644 --- a/models/marts/ledger_current_state/liquidity_pools_current.yml +++ b/models/marts/ledger_current_state/liquidity_pools_current.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: warn + severity: error meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." - incremental_unique_combination_of_columns: diff --git a/models/marts/ledger_current_state/offers_current.yml b/models/marts/ledger_current_state/offers_current.yml index b5dd807..3f0573c 100644 --- a/models/marts/ledger_current_state/offers_current.yml +++ b/models/marts/ledger_current_state/offers_current.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: warn + severity: error meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." - incremental_unique_combination_of_columns: diff --git a/models/marts/ledger_current_state/trust_lines_current.yml b/models/marts/ledger_current_state/trust_lines_current.yml index e780478..fadd38c 100644 --- a/models/marts/ledger_current_state/trust_lines_current.yml +++ b/models/marts/ledger_current_state/trust_lines_current.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: warn + severity: error meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." - incremental_unique_combination_of_columns: diff --git a/models/marts/ledger_current_state/ttl_current.yml b/models/marts/ledger_current_state/ttl_current.yml index 32013e5..e131d84 100644 --- a/models/marts/ledger_current_state/ttl_current.yml +++ b/models/marts/ledger_current_state/ttl_current.yml @@ -9,7 +9,7 @@ models: field: closed_at interval: 12 config: - severity: warn + severity: error meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." columns: diff --git a/models/marts/trade_agg.yml b/models/marts/trade_agg.yml index 2847a89..f34e96b 100644 --- a/models/marts/trade_agg.yml +++ b/models/marts/trade_agg.yml @@ -9,7 +9,7 @@ models: field: cast(day_agg as timestamp) interval: 2 config: - severity: warn + severity: error meta: description: "Monitors the freshness of your table over time, as the expected time between data updates." - incremental_unique_combination_of_columns: diff --git a/tests/bucketlist_db_size_check.sql b/tests/bucketlist_db_size_check.sql new file mode 100644 index 0000000..a1d869b --- /dev/null +++ b/tests/bucketlist_db_size_check.sql @@ -0,0 +1,17 @@ +{{ config( + severity="error" + , tags=["singular_test"] + ) +}} + +with bucketlist_db_size as ( + select sequence, + closed_at, + total_byte_size_of_bucket_list / 1000000000 as bl_db_gb + from {{ ref('stg_history_ledgers') }} + where closed_at >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 HOUR ) + -- alert when the bucketlist has grown larger than 12 gb + and total_byte_size_of_bucket_list / 1000000000 >= 12 +) + +select * from bucketlist_db_size diff --git a/tests/eho_by_ops.sql b/tests/eho_by_ops.sql new file mode 100644 index 0000000..6691f4b --- /dev/null +++ b/tests/eho_by_ops.sql @@ -0,0 +1,36 @@ +{{ config( + severity="error" + , tags=["singular_test"] + ) +}} + +-- Enriched_history_operations table is dependent on the +-- history_operations table to load. It is assumed that +-- any id present in the upstream table should be loaded in +-- the downstream. If records are not present, alert the team. +WITH find_missing AS ( + SELECT op.id, + op.batch_run_date, + op.batch_id + FROM {{ ref('stg_history_operations') }} op + LEFT OUTER JOIN {{ ref('enriched_history_operations') }} eho + ON op.id = eho.op_id + WHERE eho.op_id IS NULL + -- Scan only the last 24 hours of data. Alert runs intraday so failures + -- are caught and resolved quickly. + AND TIMESTAMP(op.batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY ) +), +find_max_batch AS ( + SELECT MAX(batch_run_date) AS max_batch + FROM {{ ref('stg_history_operations') }} + WHERE TIMESTAMP(batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY ) +) +SELECT batch_run_date, + batch_id, + count(*) +FROM find_missing +-- Account for delay in loading history_operations table prior to +-- enriched_history_operations table being loaded. +WHERE batch_run_date != (SELECT max_batch FROM find_max_batch) +GROUP BY 1, 2 +ORDER BY 1 diff --git a/tests/ledger_sequence_increment.sql b/tests/ledger_sequence_increment.sql index d1ce9e9..59c72ef 100644 --- a/tests/ledger_sequence_increment.sql +++ b/tests/ledger_sequence_increment.sql @@ -11,7 +11,7 @@ with , batch_id , closed_at , max(sequence) as max_sequence - from {{ source('crypto_stellar', 'history_ledgers') }} + from {{ ref('stg_history_ledgers') }} where closed_at > TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 7 DAY ) group by id, batch_id, closed_at ) diff --git a/tests/num_txns_and_ops.sql b/tests/num_txns_and_ops.sql new file mode 100644 index 0000000..06d161f --- /dev/null +++ b/tests/num_txns_and_ops.sql @@ -0,0 +1,65 @@ +{{ config( + severity="error" + , tags=["singular_test"] + ) +}} + +-- Query studies the number of reported transactions and operations +-- reported and committed per ledger in history_ledgers with the +-- actual transaction count and operation count in the ledger. +-- If the counts mismatch, there was a batch processing error +-- and transactions or operations were dropped from the dataset. +-- Get the actual count of transactions per ledger +WITH txn_count AS ( + SELECT ledger_sequence, COUNT(id) as txn_transaction_count + FROM {{ ref('stg_history_transactions') }} + --Take all ledgers committed in the last 36 hours to validate newly written data + -- Alert runs at 12pm UTC in GCP which creates the 36 hour interval + WHERE TIMESTAMP(batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY ) + GROUP BY ledger_sequence +), +-- Get the actual count of operations per ledger + operation_count AS ( + SELECT A.ledger_sequence, COUNT(B.id) AS op_operation_count + FROM {{ ref('stg_history_transactions') }} A + JOIN {{ ref('stg_history_operations') }} B + ON A.id = B.transaction_id + WHERE TIMESTAMP(A.batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY ) + AND TIMESTAMP(B.batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY ) + GROUP BY A.ledger_sequence + ), +-- compare actual counts with the counts reported in the ledgers table + final_counts AS ( + SELECT A.sequence, A.closed_at, A.batch_id, + A.tx_set_operation_count as expected_operation_count, + A.operation_count, + (A.failed_transaction_count + A.successful_transaction_count) as expected_transaction_count, + COALESCE(B.txn_transaction_count, 0) as actual_transaction_count, + COALESCE(C.op_operation_count, 0) as actual_operation_count + FROM {{ ref('stg_history_ledgers') }} A + LEFT OUTER JOIN txn_count B + ON A.sequence = B.ledger_sequence + LEFT OUTER JOIN operation_count C + ON A.sequence = C.ledger_sequence + WHERE TIMESTAMP(A.batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY ) + ) + , raw_values AS ( + SELECT sequence, closed_at, batch_id, + expected_transaction_count, actual_transaction_count, + expected_operation_count, actual_operation_count + FROM final_counts + WHERE + ((expected_transaction_count <> actual_transaction_count) + OR (expected_operation_count <> actual_operation_count)) +) +SELECT batch_id, + SUM(expected_transaction_count) as exp_txn_count, + SUM(actual_transaction_count ) as actual_txn_count, + SUM(expected_operation_count ) as exp_op_count, + SUM(actual_operation_count ) as actual_op_count +FROM raw_values +--@TODO: figure out a more precise delay for ledgers. Since tables are loaded on a 15-30 min delay, +-- we do not want a premature alert to row count mismatches when it could be loading latency +WHERE closed_at <= TIMESTAMP_ADD('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL -180 MINUTE ) +GROUP BY batch_id +ORDER BY batch_id diff --git a/tests/sorobon_surge_pricing_check.sql b/tests/sorobon_surge_pricing_check.sql new file mode 100644 index 0000000..6374b9b --- /dev/null +++ b/tests/sorobon_surge_pricing_check.sql @@ -0,0 +1,17 @@ +{{ config( + severity="warn" + , tags=["singular_test"] + ) +}} + +with surge_pricing_check as ( + select inclusion_fee_charged, + ledger_sequence, + closed_at +from {{ ref('enriched_history_operations_soroban') }} +where closed_at >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 HOUR ) + -- inclusion fees over 100 stroops indicate surge pricing on the network + and inclusion_fee_charged > 100 +) + +select * from surge_pricing_check