diff --git a/tests/bucketlist_db_size_check.sql b/tests/bucketlist_db_size_check.sql
index 06155d8..a16e42f 100644
--- a/tests/bucketlist_db_size_check.sql
+++ b/tests/bucketlist_db_size_check.sql
@@ -8,7 +8,7 @@ with bucketlist_db_size as (
 select sequence,
     closed_at,
     total_byte_size_of_bucket_list / 1000000000 as bl_db_gb
-from `crypto-stellar.crypto_stellar.history_ledgers`
+from {{ source('crypto_stellar', 'history_ledgers') }}
 where closed_at >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 HOUR )
 -- alert when the bucketlist has grown larger than 12 gb
 and total_byte_size_of_bucket_list / 1000000000 >= 12
diff --git a/tests/eho_by_ops.sql b/tests/eho_by_ops.sql
new file mode 100644
index 0000000..11b3ed8
--- /dev/null
+++ b/tests/eho_by_ops.sql
@@ -0,0 +1,36 @@
+{{ config(
+    severity="error"
+    , tags=["singular_test"]
+    )
+}}
+
+-- The enriched_history_operations table depends on the
+-- history_operations table having loaded. Any id present
+-- in the upstream table is expected to be present in the
+-- downstream table. If records are missing, alert the team.
+WITH find_missing AS (
+    SELECT op.id,
+        op.batch_run_date,
+        op.batch_id
+    FROM {{ source('crypto_stellar', 'history_operations') }} op
+    LEFT OUTER JOIN {{ source('crypto_stellar', 'enriched_history_operations') }} eho
+        ON op.id = eho.op_id
+    WHERE eho.op_id IS NULL
+        -- Scan only the last 24 hours of data. Alert runs intraday so failures
+        -- are caught and resolved quickly.
+        AND TIMESTAMP(op.batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY )
+),
+find_max_batch AS (
+    SELECT MAX(batch_run_date) AS max_batch
+    FROM {{ source('crypto_stellar', 'history_operations') }}
+    WHERE TIMESTAMP(batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY )
+)
+SELECT batch_run_date,
+    batch_id,
+    COUNT(*)
+FROM find_missing
+-- Account for the delay in loading the history_operations table before the
+-- enriched_history_operations table is loaded.
+WHERE batch_run_date != (SELECT max_batch FROM find_max_batch)
+GROUP BY 1, 2
+ORDER BY 1
diff --git a/tests/num_txns_and_ops.sql b/tests/num_txns_and_ops.sql
new file mode 100644
index 0000000..4d7c18d
--- /dev/null
+++ b/tests/num_txns_and_ops.sql
@@ -0,0 +1,65 @@
+{{ config(
+    severity="error"
+    , tags=["singular_test"]
+    )
+}}
+
+-- Query compares the number of transactions and operations
+-- reported and committed per ledger in history_ledgers with
+-- the actual transaction and operation counts in the ledger.
+-- If the counts mismatch, there was a batch processing error
+-- and transactions or operations were dropped from the dataset.
+-- Get the actual count of transactions per ledger
+WITH txn_count AS (
+    SELECT ledger_sequence, COUNT(id) AS txn_transaction_count
+    FROM {{ source('crypto_stellar', 'history_transactions') }}
+    -- Take all ledgers committed in the last 24 hours to validate newly written data
+    -- Alert runs at 12pm UTC in GCP, which covers the daily scan interval
+    WHERE TIMESTAMP(batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY )
+    GROUP BY ledger_sequence
+),
+-- Get the actual count of operations per ledger
+operation_count AS (
+    SELECT A.ledger_sequence, COUNT(B.id) AS op_operation_count
+    FROM {{ source('crypto_stellar', 'history_transactions') }} A
+    JOIN {{ source('crypto_stellar', 'history_operations') }} B
+        ON A.id = B.transaction_id
+    WHERE TIMESTAMP(A.batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY )
+        AND TIMESTAMP(B.batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY )
+    GROUP BY A.ledger_sequence
+),
+-- Compare actual counts with the counts reported in the ledgers table
+final_counts AS (
+    SELECT A.sequence, A.closed_at, A.batch_id,
+        A.tx_set_operation_count AS expected_operation_count,
+        A.operation_count,
+        (A.failed_transaction_count + A.successful_transaction_count) AS expected_transaction_count,
+        COALESCE(B.txn_transaction_count, 0) AS actual_transaction_count,
+        COALESCE(C.op_operation_count, 0) AS actual_operation_count
+    FROM {{ source('crypto_stellar', 'history_ledgers') }} A
+    LEFT OUTER JOIN txn_count B
+        ON A.sequence = B.ledger_sequence
+    LEFT OUTER JOIN operation_count C
+        ON A.sequence = C.ledger_sequence
+    WHERE TIMESTAMP(A.batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY )
+),
+raw_values AS (
+    SELECT sequence, closed_at, batch_id,
+        expected_transaction_count, actual_transaction_count,
+        expected_operation_count, actual_operation_count
+    FROM final_counts
+    WHERE
+        (expected_transaction_count <> actual_transaction_count)
+        OR (expected_operation_count <> actual_operation_count)
+)
+SELECT batch_id,
+    SUM(expected_transaction_count) AS exp_txn_count,
+    SUM(actual_transaction_count) AS actual_txn_count,
+    SUM(expected_operation_count) AS exp_op_count,
+    SUM(actual_operation_count) AS actual_op_count
+FROM raw_values
+-- @TODO: figure out a more precise delay for ledgers. Since tables are loaded with a 15-30 min delay,
+-- we do not want premature alerts on row count mismatches that are really loading latency
+WHERE closed_at <= TIMESTAMP_ADD('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL -180 MINUTE )
+GROUP BY batch_id
+ORDER BY batch_id
diff --git a/tests/sorobon_surge_pricing_check.sql b/tests/sorobon_surge_pricing_check.sql
index 8eabff7..ffc0785 100644
--- a/tests/sorobon_surge_pricing_check.sql
+++ b/tests/sorobon_surge_pricing_check.sql
@@ -8,7 +8,7 @@ with surge_pricing_check as (
 select inclusion_fee_charged,
     ledger_sequence,
     closed_at
-from `crypto-stellar.crypto_stellar_dbt.enriched_history_operations_soroban`
+from {{ ref('enriched_history_operations_soroban') }}
 where closed_at >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 HOUR )
 -- inclusion fees over 100 stroops indicate surge pricing on the network
 and inclusion_fee_charged > 100
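
The source() and ref() macros above resolve at compile time to fully qualified BigQuery table names, so the rewritten tests query the same tables as the hard-coded references they replace. A minimal sketch of the compiled bucketlist query, assuming the crypto_stellar source maps to the `crypto-stellar.crypto_stellar` dataset (as the removed line suggests) and that dbt_airflow_macros.ts(timezone=none) renders the run's execution timestamp; the literal timestamp below is a hypothetical rendered value, not output from an actual run:

    -- hypothetical compiled output of tests/bucketlist_db_size_check.sql
    select sequence,
        closed_at,
        total_byte_size_of_bucket_list / 1000000000 as bl_db_gb
    from `crypto-stellar.crypto_stellar.history_ledgers`
    where closed_at >= TIMESTAMP_SUB('2024-01-01T12:00:00', INTERVAL 1 HOUR )
    -- alert when the bucketlist has grown larger than 12 gb
    and total_byte_size_of_bucket_list / 1000000000 >= 12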