Move adhoc business queries
amishas157 committed Sep 11, 2024
1 parent acfc893 · commit eed3dec
Showing 4 changed files with 103 additions and 2 deletions.
2 changes: 1 addition & 1 deletion tests/bucketlist_db_size_check.sql
@@ -8,7 +8,7 @@ with bucketlist_db_size as (
select sequence,
closed_at,
total_byte_size_of_bucket_list / 1000000000 as bl_db_gb
- from `crypto-stellar.crypto_stellar.history_ledgers`
+ from {{ source('crypto_stellar', 'history_ledgers') }}
where closed_at >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 HOUR )
-- alert when the bucketlist has grown larger than 12 gb
and total_byte_size_of_bucket_list / 1000000000 >= 12
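For reference, the source() macro should compile back to the fully qualified table this test previously hardcoded, assuming the project's sources.yml maps the crypto_stellar source to the crypto-stellar BigQuery project. A sketch of the compiled output, not the committed artifact:

    -- Illustrative compiled form of the templated FROM clause, assuming the
    -- source definition matches the previously hardcoded reference:
    select sequence,
        closed_at,
        total_byte_size_of_bucket_list / 1000000000 as bl_db_gb
    from `crypto-stellar.crypto_stellar.history_ledgers`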
36 changes: 36 additions & 0 deletions tests/eho_by_ops.sql
@@ -0,0 +1,36 @@
{{ config(
severity="error"
, tags=["singular_test"]
)
}}

-- The enriched_history_operations table depends on the
-- history_operations table loading first. Any id present in
-- the upstream table is expected to be loaded in the
-- downstream table. If records are missing, alert the team.
WITH find_missing AS (
SELECT op.id,
op.batch_run_date,
op.batch_id
FROM {{ source('crypto_stellar', 'history_operations') }} op
LEFT OUTER JOIN {{ source('crypto_stellar', 'enriched_history_operations') }} eho
ON op.id = eho.op_id
WHERE eho.op_id IS NULL
-- Scan only the last 24 hours of data. Alert runs intraday so failures
-- are caught and resolved quickly.
AND TIMESTAMP(op.batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY )
),
find_max_batch AS (
SELECT MAX(batch_run_date) AS max_batch
FROM {{ source('crypto_stellar', 'history_operations') }}
WHERE TIMESTAMP(batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY )
)
SELECT batch_run_date,
batch_id,
count(*)
FROM find_missing
-- Account for delay in loading history_operations table prior to
-- enriched_history_operations table being loaded.
WHERE batch_run_date != (SELECT max_batch FROM find_max_batch)
GROUP BY 1, 2
ORDER BY 1
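A note on the templating used throughout these tests: dbt_airflow_macros.ts(timezone=none) renders the Airflow execution timestamp as a plain string literal at compile time, so the freshness filter above should compile to something like the following (the timestamp value here is purely illustrative):

    -- Illustrative compiled filter; the literal timestamp is supplied by
    -- the Airflow run that invokes dbt (the value shown is hypothetical):
    AND TIMESTAMP(op.batch_run_date) >= TIMESTAMP_SUB('2024-09-11T12:00:00', INTERVAL 1 DAY)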
65 changes: 65 additions & 0 deletions tests/num_txns_and_ops.sql
@@ -0,0 +1,65 @@
{{ config(
severity="error"
, tags=["singular_test"]
)
}}

-- The query compares the number of transactions and operations
-- reported and committed per ledger in history_ledgers with the
-- actual transaction count and operation count in the ledger.
-- If the counts mismatch, there was a batch processing error
-- and transactions or operations were dropped from the dataset.
-- Get the actual count of transactions per ledger
WITH txn_count AS (
SELECT ledger_sequence, COUNT(id) as txn_transaction_count
FROM {{ source('crypto_stellar', 'history_transactions') }}
-- Take all ledgers committed in the last 36 hours to validate newly written data.
-- The alert runs at 12pm UTC in GCP, which creates the 36-hour interval.
WHERE TIMESTAMP(batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY )
GROUP BY ledger_sequence
),
-- Get the actual count of operations per ledger
operation_count AS (
SELECT A.ledger_sequence, COUNT(B.id) AS op_operation_count
FROM {{ source('crypto_stellar', 'history_transactions') }} A
JOIN {{ source('crypto_stellar', 'history_operations') }} B
ON A.id = B.transaction_id
WHERE TIMESTAMP(A.batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY )
AND TIMESTAMP(B.batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY )
GROUP BY A.ledger_sequence
),
-- compare actual counts with the counts reported in the ledgers table
final_counts AS (
SELECT A.sequence, A.closed_at, A.batch_id,
A.tx_set_operation_count as expected_operation_count,
A.operation_count,
(A.failed_transaction_count + A.successful_transaction_count) as expected_transaction_count,
COALESCE(B.txn_transaction_count, 0) as actual_transaction_count,
COALESCE(C.op_operation_count, 0) as actual_operation_count
FROM {{ source('crypto_stellar', 'history_ledgers') }} A
LEFT OUTER JOIN txn_count B
ON A.sequence = B.ledger_sequence
LEFT OUTER JOIN operation_count C
ON A.sequence = C.ledger_sequence
WHERE TIMESTAMP(A.batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY )
)
, raw_values AS (
SELECT sequence, closed_at, batch_id,
expected_transaction_count, actual_transaction_count,
expected_operation_count, actual_operation_count
FROM final_counts
WHERE
((expected_transaction_count <> actual_transaction_count)
OR (expected_operation_count <> actual_operation_count))
)
SELECT batch_id,
    SUM(expected_transaction_count) as exp_txn_count,
    SUM(actual_transaction_count) as actual_txn_count,
    SUM(expected_operation_count) as exp_op_count,
    SUM(actual_operation_count) as actual_op_count
FROM raw_values
-- @TODO: figure out a more precise delay for ledgers. Since tables are loaded on a 15-30 minute delay,
-- we do not want to alert prematurely on row count mismatches that may just be loading latency.
WHERE closed_at <= TIMESTAMP_ADD('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL -180 MINUTE )
GROUP BY batch_id
ORDER BY batch_id
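One design note on the final filter: TIMESTAMP_ADD with a negative interval is equivalent to TIMESTAMP_SUB with a positive one; either spelling gives loaders a three-hour grace window before the alert fires. A sketch of the equivalent form:

    -- Equivalent grace-window filter written with TIMESTAMP_SUB
    -- (same semantics as TIMESTAMP_ADD(..., INTERVAL -180 MINUTE) above):
    WHERE closed_at <= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 180 MINUTE)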
2 changes: 1 addition & 1 deletion tests/sorobon_surge_pricing_check.sql
@@ -8,7 +8,7 @@ with surge_pricing_check as (
select inclusion_fee_charged,
ledger_sequence,
closed_at
- from `crypto-stellar.crypto_stellar_dbt.enriched_history_operations_soroban`
+ from {{ ref('enriched_history_operations_soroban') }}
where closed_at >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 HOUR )
-- inclusion fees over 100 stroops indicate surge pricing on the network
and inclusion_fee_charged > 100
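Note the distinction from the other tests in this commit: ref() points at a model built by this dbt project, while source() points at raw upstream tables. Assuming the model's configured database and schema match the previously hardcoded reference, the templated query should compile to roughly:

    -- Illustrative compiled form, assuming the project's standard
    -- database/schema configuration for this model:
    select inclusion_fee_charged,
        ledger_sequence,
        closed_at
    from `crypto-stellar.crypto_stellar_dbt.enriched_history_operations_soroban`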
