diff --git a/dbt_project.yml b/dbt_project.yml
index 2bc01d4..7fc3456 100644
--- a/dbt_project.yml
+++ b/dbt_project.yml
@@ -31,6 +31,13 @@ vars:
   airflow_start_timestamp: "{{ env_var('AIRFLOW_START_TIMESTAMP', '2000-01-01') }}"
   is_singular_airflow_task: "{{ env_var('IS_SINGULAR_AIRFLOW_TASK', 'false') }}"
 
+  # Soroban-contract dim airflow vars
+  recovery: "{{ env_var('RECOVERY', 'False') }}"
+  recovery_type: "{{ env_var('RECOVERY_TYPE', 'SingleDay') }}"
+  recovery_date: "{{ env_var('RECOVERY_DATE', '2024-09-01') }}"
+  recovery_start_day: "{{ env_var('RECOVERY_START_DAY', '2024-09-10') }}"
+  recovery_end_day: "{{ env_var('RECOVERY_END_DAY', '2024-09-15') }}"
+
 # Configuring models
 # Full documentation: https://docs.getdbt.com/docs/configuring-models
 models:
@@ -39,6 +46,8 @@ models:
       +materialized: view # how the staging models will be materialized
     intermediate:
       +materialized: view # how the intermediate models will be materialized
+    dims:
+      +materialized: table # how the dims models will be materialized
     marts:
       +materialized: table # how the mart models will be materialized
 
diff --git a/models/dims/contract_details_hist.sql b/models/dims/contract_details_hist.sql
new file mode 100644
index 0000000..98ad83c
--- /dev/null
+++ b/models/dims/contract_details_hist.sql
@@ -0,0 +1,9 @@
+{{ config(materialized = 'table') }}
+
+select
+    'dummy_contract_id' as contract_id
+    , 'dummy_ledger_key_hash' as ledger_key_hash
+    , current_timestamp() as start_date
+    , current_timestamp() as end_date
+    , true as is_current
+where false
diff --git a/models/dims/contract_details_hist.yml b/models/dims/contract_details_hist.yml
new file mode 100644
index 0000000..92b85a0
--- /dev/null
+++ b/models/dims/contract_details_hist.yml
@@ -0,0 +1,111 @@
+version: 2
+
+models:
+  - name: contract_details_hist
+    description: "This table stores historical contract details with slowly changing dimension type 2 logic. It includes contract metadata and ledger information."
+    columns:
+      - name: contract_id
+        description: "Unique identifier for the contract."
+        tests:
+          - not_null
+      - name: ledger_key_hash
+        description: "Hash of the ledger key related to the contract."
+        tests:
+          - not_null
+      - name: start_date
+        description: "Start date of the contract's current or historical record."
+        tests:
+          - not_null
+      - name: end_date
+        description: "End date of the contract's historical record."
+      - name: is_current
+        description: "Indicates whether the record is the current version."
+        tests:
+          - not_null
+      - name: contract_key_type
+        description: "Type of contract key."
+      - name: contract_durability
+        description: "Durability level of the contract."
+      - name: contract_create_ts
+        description: "Timestamp when the contract was created."
+      - name: contract_update_ts
+        description: "Timestamp when the contract was last updated."
+      - name: contract_delete_ts
+        description: "Timestamp when the contract was deleted."
+      - name: last_modified_ledger
+        description: "Ledger sequence at the last modification."
+      - name: ledger_entry_change
+        description: "Change in the ledger entry."
+      - name: ledger_entry_change_type
+        description: "Type of change in the ledger entry."
+      - name: ledger_sequence
+        description: "Sequence number of the ledger."
+      - name: asset_code
+        description: "Code of the asset related to the contract."
+      - name: asset_issuer
+        description: "Issuer of the asset related to the contract."
+      - name: asset_type
+        description: "Type of asset related to the contract."
+      - name: balance
+        description: "Balance of the contract."
+      - name: balance_holder
+        description: "Holder of the contract balance."
+      - name: deleted
+        description: "Indicates whether the contract has been deleted."
+      - name: closed_at
+        description: "Timestamp when the contract was closed."
+        tests:
+          - not_null
+      - name: live_until_ledger_seq
+        description: "Ledger sequence up to which the contract is live."
+      - name: is_expired
+        description: "Indicates whether the contract has expired."
+      - name: n_instructions
+        description: "Number of instructions executed in the contract."
+      - name: n_functions
+        description: "Number of functions executed in the contract."
+      - name: n_globals
+        description: "Number of global variables in the contract."
+      - name: n_table_entries
+        description: "Number of table entries in the contract."
+      - name: n_types
+        description: "Number of types in the contract."
+      - name: n_data_segments
+        description: "Number of data segments in the contract."
+      - name: n_elem_segments
+        description: "Number of element segments in the contract."
+      - name: n_imports
+        description: "Number of imports in the contract."
+      - name: n_exports
+        description: "Number of exports in the contract."
+      - name: n_data_segment_bytes
+        description: "Number of data segment bytes in the contract."
+      - name: airflow_start_ts
+        description: "Timestamp for when the Airflow job started."
+        tests:
+          - not_null
+      - name: batch_id
+        description: "Unique identifier for the batch run."
+        tests:
+          - not_null
+      - name: batch_run_date
+        description: "Date of the batch run."
+        tests:
+          - not_null
+      - name: dw_load_ts
+        description: "Timestamp when the data was loaded into the data warehouse."
+        tests:
+          - not_null
+      - name: dw_update_ts
+        description: "Timestamp when the data was last updated in the data warehouse."
+        tests:
+          - not_null
+
+    # Model-level unique combination test on (ledger_key_hash, start_date)
+    tests:
+      - unique_combination_of_columns:
+          name: contract_details_hist_unique_constraint
+          columns:
+            - ledger_key_hash
+            - start_date
+          description: "Test to ensure that each combination of ledger_key_hash and start_date is unique."
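-- A minimal sketch (not part of the patch itself) of the check that the unique-combination
-- test above is meant to enforce, written as a plain query against the new dim model.
-- Any (ledger_key_hash, start_date) pair returned here would be a duplicate and therefore
-- a test failure. Only the column names and the ref() from the yml above are assumed.
select
    ledger_key_hash
    , start_date
    , count(*) as duplicate_rows
from {{ ref('contract_details_hist') }}
group by ledger_key_hash, start_date
having count(*) > 1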
diff --git a/models/staging/stg_soroban_contract_code.sql b/models/staging/stg_soroban_contract_code.sql
new file mode 100644
index 0000000..65ddc55
--- /dev/null
+++ b/models/staging/stg_soroban_contract_code.sql
@@ -0,0 +1,191 @@
+{{ config(
+    materialized = 'view',
+    tags = ["soroban_analytics"],
+    enabled = true,
+    tests = {
+        'unique': {
+            'column_name': 'ledger_key_hash'
+        },
+        'not_null': {
+            'column_name': 'ledger_key_hash'
+        }
+    }
+) }}
+
+{% set target_table_query %}
+    SELECT COUNT(*) AS tgt_row_count FROM {{ ref('contract_details_hist') }}
+{% endset %}
+
+{% set results = run_query(target_table_query) %}
+
+{% if execute %}
+    {% set is_empty_target_table = (results.columns[0].values()[0] == 0) %}
+{% else %}
+    {% set is_empty_target_table = false %}
+{% endif %}
+
+with
+    contract_code as (
+        -- Pull data from contract_code
+        -- Filtering depends on whether the target table is empty or on the recovery logic
+
+        select
+            cc.contract_code_hash
+            , cc.last_modified_ledger
+            , cc.ledger_entry_change
+            , cc.ledger_sequence
+            , cc.ledger_key_hash
+            , cc.n_instructions
+            , cc.n_functions
+            , cc.n_globals
+            , cc.n_table_entries
+            , cc.n_types
+            , cc.n_data_segments
+            , cc.n_elem_segments
+            , cc.n_imports
+            , cc.n_exports
+            , cc.n_data_segment_bytes
+            , cc.deleted
+            , cc.closed_at
+            , cc.batch_insert_ts
+            , '{{ var("airflow_start_timestamp") }}' as airflow_start_ts
+            , cc.batch_id
+            , cc.batch_run_date
+            , row_number() over (partition by cc.ledger_key_hash order by cc.closed_at desc) as row_num
+        from
+            {{ source('crypto_stellar', 'contract_code') }} as cc
+
+        -- If the target table is empty, pull all data for first-time run
+        {% if is_empty_target_table %}
+            -- No WHERE condition here, as we want to pull all records
+
+        {% elif var('recovery', 'False') == 'True' %}
+            -- If recovery is enabled, apply the appropriate recovery filter
+
+            -- Single day recovery
+            {% if var('recovery_type', 'SingleDay') == 'SingleDay' %}
+                WHERE DATE(cc.closed_at) = '{{ var("recovery_date") }}'
+
+            -- Date Range recovery
+            {% elif var('recovery_type', 'Range') == 'Range' %}
+                WHERE DATE(cc.closed_at) BETWEEN '{{ var("recovery_start_day") }}' AND '{{ var("recovery_end_day") }}'
+
+            -- Full recovery
+            {% elif var('recovery_type', 'Full') == 'Full' %}
+                -- No WHERE condition for full recovery, as we want all data
+            {% endif %}
+
+        -- Regular cadence filtering (2-hour forward window, 4-hour backward window)
+        {% else %}
+            WHERE
+                timestamp(cc.closed_at) < timestamp_add(timestamp('{{ var("airflow_start_timestamp") }}'), interval 2 hour)
+                AND timestamp(cc.closed_at) >= timestamp_sub(timestamp('{{ var("airflow_start_timestamp") }}'), interval 4 hour)
+        {% endif %}
+    )
+
+    , ttl_data as (
+        select
+            ttl.key_hash
+            , ttl.closed_at as ttl_closed_at
+            , ttl.live_until_ledger_seq
+            , ttl.deleted as ttl_deleted
+            , row_number() over (partition by ttl.key_hash order by ttl.closed_at desc) as row_num
+        from
+            {{ source('crypto_stellar', 'ttl') }} as ttl
+
+        -- If the target table is empty, pull all data for first-time run
+        {% if is_empty_target_table %}
+            -- No WHERE condition here, as we want to pull all records
+
+        {% elif var('recovery', 'False') == 'True' %}
+            -- If recovery is enabled, apply the appropriate recovery filter
+
+            -- Single day recovery
+            {% if var('recovery_type', 'SingleDay') == 'SingleDay' %}
+                WHERE DATE(ttl.closed_at) = '{{ var("recovery_date") }}'
+
+            -- Range recovery
+            {% elif var('recovery_type', 'Range') == 'Range' %}
+                WHERE DATE(ttl.closed_at) BETWEEN '{{ var("recovery_start_day") }}' AND '{{ var("recovery_end_day") }}'
+
+            -- Full recovery
+            {% elif var('recovery_type', 'Full') == 'Full' %}
+                -- No WHERE condition for full recovery, as we want all data
+            {% endif %}
+
+        -- Regular cadence filtering (2-hour forward window, 4-hour backward window)
+        {% else %}
+            WHERE
+                timestamp(ttl.closed_at) < timestamp_add(timestamp('{{ var("airflow_start_timestamp") }}'), interval 2 hour)
+                AND timestamp(ttl.closed_at) >= timestamp_sub(timestamp('{{ var("airflow_start_timestamp") }}'), interval 4 hour)
+        {% endif %}
+    )
+
+    , source_data as (
+        select
+            cc.contract_code_hash
+            , cc.last_modified_ledger
+            , cc.ledger_entry_change
+            , cc.ledger_sequence
+            , cc.ledger_key_hash
+            , cc.n_instructions
+            , cc.n_functions
+            , cc.n_globals
+            , cc.n_table_entries
+            , cc.n_types
+            , cc.n_data_segments
+            , cc.n_elem_segments
+            , cc.n_imports
+            , cc.n_exports
+            , cc.n_data_segment_bytes
+            , cc.deleted
+            , cc.closed_at
+            , ttl.key_hash
+            , ttl.ttl_closed_at
+            , ttl.live_until_ledger_seq
+            , ttl.ttl_deleted
+            , cc.batch_insert_ts
+            , cc.airflow_start_ts
+            , cc.batch_id
+            , cc.batch_run_date
+        from
+            contract_code as cc
+        left join
+            ttl_data as ttl
+            on
+                cc.ledger_key_hash = ttl.key_hash
+        where
+            cc.row_num = 1 -- Only the latest record per ledger_key_hash from contract_code
+            and (ttl.row_num = 1 or ttl.row_num is null) -- Only the latest record from ttl, if available
+    )
+
+select
+    contract_code_hash
+    , last_modified_ledger
+    , ledger_entry_change
+    , ledger_sequence
+    , ledger_key_hash
+    , n_instructions
+    , n_functions
+    , n_globals
+    , n_table_entries
+    , n_types
+    , n_data_segments
+    , n_elem_segments
+    , n_imports
+    , n_exports
+    , n_data_segment_bytes
+    , deleted
+    , closed_at
+    , key_hash
+    , ttl_closed_at
+    , live_until_ledger_seq
+    , ttl_deleted
+    , batch_insert_ts
+    , airflow_start_ts
+    , batch_id
+    , batch_run_date
+    , current_timestamp() as dw_load_ts
+from source_data
+order by
+    ledger_key_hash, closed_at
diff --git a/models/staging/stg_soroban_contract_data.sql b/models/staging/stg_soroban_contract_data.sql
new file mode 100644
index 0000000..cdff0bd
--- /dev/null
+++ b/models/staging/stg_soroban_contract_data.sql
@@ -0,0 +1,172 @@
+{{ config(
+    materialized = 'view',
+    tags = ["soroban_analytics"],
+    enabled = true,
+    tests = {
+        'unique': {
+            'column_name': 'ledger_key_hash'
+        },
+        'not_null': {
+            'column_name': 'ledger_key_hash'
+        }
+    }
+) }}
+
+{% set target_table_query %}
+    SELECT COUNT(*) AS tgt_row_count FROM {{ ref('contract_details_hist') }}
+{% endset %}
+
+{% set results = run_query(target_table_query) %}
+
+{% if execute %}
+    {% set is_empty_target_table = (results.columns[0].values()[0] == 0) %}
+{% else %}
+    {% set is_empty_target_table = false %}
+{% endif %}
+
+with
+    contract_data as (
+        select
+            cd.contract_id
+            , cd.ledger_key_hash
+            , cd.contract_key_type
+            , cd.contract_durability
+            , cd.last_modified_ledger
+            , cd.ledger_entry_change
+            , cd.ledger_sequence
+            , cd.asset_code
+            , cd.asset_issuer
+            , cd.asset_type
+            , cd.balance
+            , cd.balance_holder
+            , cd.deleted
+            , cd.closed_at
+            , cd.batch_insert_ts
+            , '{{ var("airflow_start_timestamp") }}' as airflow_start_ts
+            , cd.batch_id
+            , cd.batch_run_date
+            , row_number() over (partition by cd.ledger_key_hash order by cd.closed_at desc) as row_num
+        from
+            {{ source('crypto_stellar', 'contract_data') }} as cd
+
+        {% if is_empty_target_table %}
+            -- No WHERE condition here, as we want to pull all records
+        {% elif var('recovery', 'False') == 'True' %}
+            {% if var('recovery_type', 'SingleDay') == 'SingleDay' %}
+                WHERE DATE(cd.closed_at) = '{{ var("recovery_date") }}'
+            {% elif var('recovery_type', 'Range') == 'Range' %}
+                WHERE DATE(cd.closed_at) BETWEEN '{{ var("recovery_start_day") }}' AND '{{ var("recovery_end_day") }}'
+            {% elif var('recovery_type', 'Full') == 'Full' %}
+                -- No WHERE condition for full recovery, as we want all data
+            {% endif %}
+        {% else %}
+            WHERE
+                timestamp(cd.closed_at) < timestamp_add(timestamp('{{ var("airflow_start_timestamp") }}'), interval 2 hour)
+                AND timestamp(cd.closed_at) >= timestamp_sub(timestamp('{{ var("airflow_start_timestamp") }}'), interval 4 hour)
+        {% endif %}
+    )
+
+    , ttl_data as (
+        select
+            ttl.key_hash
+            , ttl.closed_at as ttl_closed_at
+            , ttl.live_until_ledger_seq
+            , ttl.deleted as ttl_deleted
+            , row_number() over (partition by ttl.key_hash order by ttl.closed_at desc) as row_num
+        from
+            {{ source('crypto_stellar', 'ttl') }} as ttl
+
+        {% if is_empty_target_table %}
+            -- No WHERE condition here, as we want to pull all records
+        {% elif var('recovery', 'False') == 'True' %}
+            {% if var('recovery_type', 'SingleDay') == 'SingleDay' %}
+                WHERE DATE(ttl.closed_at) = '{{ var("recovery_date") }}'
+            {% elif var('recovery_type', 'Range') == 'Range' %}
+                WHERE DATE(ttl.closed_at) BETWEEN '{{ var("recovery_start_day") }}' AND '{{ var("recovery_end_day") }}'
+            {% elif var('recovery_type', 'Full') == 'Full' %}
+                -- No WHERE condition for full recovery, as we want all data
+            {% endif %}
+        {% else %}
+            WHERE
+                timestamp(ttl.closed_at) < timestamp_add(timestamp('{{ var("airflow_start_timestamp") }}'), interval 2 hour)
+                AND timestamp(ttl.closed_at) >= timestamp_sub(timestamp('{{ var("airflow_start_timestamp") }}'), interval 4 hour)
+        {% endif %}
+    )
+
+    , derived_data as (
+        select
+            contract_id
+            , min(case when ledger_entry_change = 0 and contract_key_type = 'ScValTypeScvLedgerKeyContractInstance' then closed_at end)
+                as contract_create_ts
+            , max(case when ledger_entry_change = 1 then closed_at end) as contract_updated_ts
+            , max(case when ledger_entry_change = 2 and deleted = true then closed_at end) as contract_delete_ts
+        from {{ source('crypto_stellar', 'contract_data') }}
+        group by contract_id
+    )
+
+    , source_data as (
+        select
+            cd.contract_id
+            , cd.ledger_key_hash
+            , cd.contract_key_type
+            , cd.contract_durability
+            , cd.last_modified_ledger
+            , cd.ledger_entry_change
+            , cd.ledger_sequence
+            , cd.asset_code
+            , cd.asset_issuer
+            , cd.asset_type
+            , cd.balance
+            , cd.balance_holder
+            , cd.deleted
+            , cd.closed_at
+            , ttl.key_hash
+            , ttl.ttl_closed_at
+            , ttl.live_until_ledger_seq
+            , ttl.ttl_deleted
+            , cd.batch_insert_ts
+            , cd.airflow_start_ts
+            , cd.batch_id
+            , cd.batch_run_date
+        from
+            contract_data as cd
+        left join
+            ttl_data as ttl
+            on
+                cd.ledger_key_hash = ttl.key_hash
+        where
+            cd.row_num = 1 -- Only the latest record per ledger_key_hash from contract_data
+            and (ttl.row_num = 1 or ttl.row_num is null) -- Only the latest record from ttl, if available
+    )
+
+select
+    sd.contract_id
+    , sd.ledger_key_hash
+    , sd.contract_key_type
+    , sd.contract_durability
+    , dd.contract_create_ts
+    , dd.contract_updated_ts
+    , dd.contract_delete_ts
+    , sd.last_modified_ledger
+    , sd.ledger_entry_change
+    , sd.ledger_sequence
+    , sd.asset_code
+    , sd.asset_issuer
+    , sd.asset_type
+    , sd.balance
+    , sd.balance_holder
+    , sd.deleted
+    , sd.closed_at
+    , sd.key_hash
+    , sd.ttl_closed_at
+    , sd.live_until_ledger_seq
+    , sd.ttl_deleted
+    , sd.batch_insert_ts
+    , sd.airflow_start_ts
+    , sd.batch_id
+    , sd.batch_run_date
+    , current_timestamp() as dw_load_ts
+from
    source_data as sd
+left join derived_data as dd on sd.contract_id = dd.contract_id
+order by
+    sd.contract_id, sd.closed_at
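-- Illustrative only: a sketch of how the recovery vars from dbt_project.yml resolve inside the
-- staging filters above. The AIRFLOW_START_TIMESTAMP value below is hypothetical, and the
-- source() call is left unrendered since the compiled relation name depends on project config.
-- With recovery left at its default ('False'), the regular-cadence branch compiles to a
-- 2-hour forward / 4-hour backward window around the Airflow start:
select
    cd.ledger_key_hash
    , cd.closed_at
from {{ source('crypto_stellar', 'contract_data') }} as cd
where
    timestamp(cd.closed_at) < timestamp_add(timestamp('2024-09-20 10:00:00'), interval 2 hour)
    and timestamp(cd.closed_at) >= timestamp_sub(timestamp('2024-09-20 10:00:00'), interval 4 hour)
-- With RECOVERY='True' and RECOVERY_TYPE='Range', the same slot instead renders as
--     where date(cd.closed_at) between '2024-09-10' and '2024-09-15'
-- using the recovery_start_day / recovery_end_day defaults declared in dbt_project.yml.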