Skip to content

Commit

Permalink
WIP - Feature Branch for Contract Data and Code Staging tables
Browse files Browse the repository at this point in the history
  • Loading branch information
harsha-stellar-data committed Sep 20, 2024
1 parent bfa1997 commit 09731a4
Show file tree
Hide file tree
Showing 5 changed files with 492 additions and 0 deletions.
9 changes: 9 additions & 0 deletions dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ vars:
airflow_start_timestamp: "{{ env_var('AIRFLOW_START_TIMESTAMP', '2000-01-01') }}"
is_singular_airflow_task: "{{ env_var('IS_SINGULAR_AIRFLOW_TASK', 'false') }}"

# Soroban-contract dim airflow vars
recovery: "{{ env_var('RECOVERY', 'False') }}"
recovery_type: "{{ env_var('RECOVERY_TYPE', 'SingleDay') }}"
recovery_date: "{{ env_var('RECOVERY_DATE', '2024-09-01') }}"
recovery_start_day: "{{ env_var('RECOVERY_START_DAY', '2024-09-10') }}"
recovery_end_day: "{{ env_var('RECOVERY_END_DAY', '2024-09-15') }}"

# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models
models:
Expand All @@ -39,6 +46,8 @@ models:
+materialized: view # how the staging models will be materialized
intermediate:
+materialized: view # how the intermediate models will be materialized
dims:
+materialized: table # how the dims models will be materialized
marts:
+materialized: table # how the mart models will be materialized

Expand Down
9 changes: 9 additions & 0 deletions models/dims/contract_details_hist.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{{ config(materialized = 'table') }}

select
'dummy_contract_id' as contract_id
, 'dummy_ledger_key_hash' as ledger_key_hash
, current_timestamp() as start_date
, current_timestamp() as end_date
, true as is_current
where false
111 changes: 111 additions & 0 deletions models/dims/contract_details_hist.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
version: 2

models:
- name: contract_details_hist
description: "This table stores historical contract details with slowly changing dimension type 2 logic. It includes contract metadata and ledger information."
columns:
- name: contract_id
description: "Unique identifier for the contract."
tests:
- not_null
- name: ledger_key_hash
description: "Hash of the ledger key related to the contract."
tests:
- not_null
- name: start_date
description: "Start date of the contract's current or historical record."
tests:
- not_null
- name: end_date
description: "End date of the contract's historical record."
- name: is_current
description: "Indicates whether the record is the current version."
tests:
- not_null
- name: contract_key_type
description: "Type of contract key."
- name: contract_durability
description: "Durability level of the contract."
- name: contract_create_ts
description: "Timestamp when the contract was created."
- name: contract_update_ts
description: "Timestamp when the contract was last updated."
- name: contract_delete_ts
description: "Timestamp when the contract was deleted."
- name: last_modified_ledger
description: "Ledger sequence at the last modification."
- name: ledger_entry_change
description: "Change in the ledger entry."
- name: ledger_entry_change_type
description: "Type of change in the ledger entry."
- name: ledger_sequence
description: "Sequence number of the ledger."
- name: asset_code
description: "Code of the asset related to the contract."
- name: asset_issuer
description: "Issuer of the asset related to the contract."
- name: asset_type
description: "Type of asset related to the contract."
- name: balance
description: "Balance of the contract."
- name: balance_holder
description: "Holder of the contract balance."
- name: deleted
description: "Indicates whether the contract has been deleted."
- name: closed_at
description: "Timestamp when the contract was closed."
tests:
- not_null
- name: live_until_ledger_seq
description: "Ledger sequence up to which the contract is live."
- name: is_expired
description: "Indicates whether the contract has expired."
- name: n_instructions
description: "Number of instructions executed in the contract."
- name: n_functions
description: "Number of functions executed in the contract."
- name: n_globals
description: "Number of global variables in the contract."
- name: n_table_entries
description: "Number of table entries in the contract."
- name: n_types
description: "Number of types in the contract."
- name: n_data_segments
description: "Number of data segments in the contract."
- name: n_elem_segments
description: "Number of element segments in the contract."
- name: n_imports
description: "Number of imports in the contract."
- name: n_exports
description: "Number of exports in the contract."
- name: n_data_segment_bytes
description: "Number of data segment bytes in the contract."
- name: airflow_start_ts
description: "Timestamp for when the Airflow job started."
tests:
- not_null
- name: batch_id
description: "Unique identifier for the batch run."
tests:
- not_null
- name: batch_run_date
description: "Date of the batch run."
tests:
- not_null
- name: dw_load_ts
description: "Timestamp when the data was loaded into the data warehouse."
tests:
- not_null
- name: dw_update_ts
description: "Timestamp when the data was last updated in the data warehouse."
tests:
- not_null

# Unique Combination Test
tests:
- unique_combination_of_columns:
name: contract_details_hist_unique_constraint
columns:
- ledger_key_hash
- start_date
description: "Test to ensure that each combination of ledger_key_hash and start_date is unique."
191 changes: 191 additions & 0 deletions models/staging/stg_soroban_contract_code.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
{{ config(
materialized = 'view',
tags = ["soroban_analytics"],
enabled = true,
tests = {
'unique': {
'column_name': 'ledger_key_hash'
},
'not_null': {
'column_name': 'ledger_key_hash'
}
}
) }}

{% set target_table_query %}
SELECT COUNT(*) AS tgt_row_count FROM {{ ref('contract_details_hist') }}
{% endset %}

{% set results = run_query(target_table_query) %}

{% if execute %}
{% set is_empty_target_table = (results.columns[0].values()[0] == 0) %}
{% else %}
{% set is_empty_target_table = false %}
{% endif %}

with
contract_code as (
-- Pull data from contract_data
-- Depending on whether the target table is empty or based on recovery logic

select
cc.contract_code_hash
, cc.last_modified_ledger
, cc.ledger_entry_change
, cc.ledger_sequence
, cc.ledger_key_hash
, cc.n_instructions
, cc.n_functions
, cc.n_globals
, cc.n_table_entries
, cc.n_types
, cc.n_data_segments
, cc.n_elem_segments
, cc.n_imports
, cc.n_exports
, cc.n_data_segment_bytes
, cc.deleted
, cc.closed_at
, cc.batch_insert_ts
, '{{ var("airflow_start_timestamp") }}' as airflow_start_ts
, cc.batch_id
, cc.batch_run_date
, row_number() over (partition by cc.ledger_key_hash order by cc.closed_at desc) as row_num
from
{{ source('crypto_stellar', 'contract_code') }} as cc

-- If the target table is empty, pull all data for first-time run
{% if is_empty_target_table %}
-- No WHERE condition here, as we want to pull all records

{% elif var('recovery', 'False') == 'True' %}
-- If recovery is enabled, apply the appropriate recovery filter

-- Single day recovery
{% if var('recovery_type', 'SingleDay') == 'SingleDay' %}
WHERE DATE(cc.closed_at) = '{{ var("recovery_date") }}'

-- Date Range recovery
{% elif var('recovery_type', 'Range') == 'Range' %}
WHERE DATE(cc.closed_at) BETWEEN '{{ var("recovery_start_day") }}' AND '{{ var("recovery_end_day") }}'

-- Full recovery
{% elif var('recovery_type', 'Full') == 'Full' %}
-- No WHERE condition for full recovery, as we want all data
{% endif %}

-- Regular cadence filtering (2-hour forward window, 4-hour backward window)
{% else %}
WHERE
timestamp(cc.closed_at) < timestamp_add(timestamp('{{ var("airflow_start_timestamp") }}'), interval 2 hour)
AND timestamp(cc.closed_at) >= timestamp_sub(timestamp('{{ var("airflow_start_timestamp") }}'), interval 4 hour)
{% endif %}
)

, ttl_data as (
select
ttl.key_hash
, ttl.closed_at as ttl_closed_at
, ttl.live_until_ledger_seq
, ttl.deleted as ttl_deleted
, row_number() over (partition by ttl.key_hash order by ttl.closed_at desc) as row_num
from
{{ source('crypto_stellar', 'ttl') }} as ttl

-- If the target table is empty, pull all data for first-time run
{% if is_empty_target_table %}
-- No WHERE condition here, as we want to pull all records

{% elif var('recovery', 'False') == 'True' %}
-- If recovery is enabled, apply the appropriate recovery filter

-- Single day recovery
{% if var('recovery_type', 'SingleDay') == 'SingleDay' %}
WHERE DATE(ttl.closed_at) = '{{ var("recovery_date") }}'

-- Range recovery
{% elif var('recovery_type', 'Range') == 'Range' %}
WHERE DATE(ttl.closed_at) BETWEEN '{{ var("recovery_start_day") }}' AND '{{ var("recovery_end_day") }}'

-- Full recovery
{% elif var('recovery_type', 'Full') == 'Full' %}
-- No WHERE condition for full recovery, as we want all data
{% endif %}

-- Regular cadence filtering (2-hour forward window, 4-hour backward window)
{% else %}
WHERE
timestamp(ttl.closed_at) < timestamp_add(timestamp('{{ var("airflow_start_timestamp") }}'), interval 2 hour)
AND timestamp(ttl.closed_at) >= timestamp_sub(timestamp('{{ var("airflow_start_timestamp") }}'), interval 4 hour)
{% endif %}
)

, source_data as (
select
cc.contract_code_hash
, cc.last_modified_ledger
, cc.ledger_entry_change
, cc.ledger_sequence
, cc.ledger_key_hash
, cc.n_instructions
, cc.n_functions
, cc.n_globals
, cc.n_table_entries
, cc.n_types
, cc.n_data_segments
, cc.n_elem_segments
, cc.n_imports
, cc.n_exports
, cc.n_data_segment_bytes
, cc.deleted
, cc.closed_at
, ttl.key_hash
, ttl.ttl_closed_at
, ttl.live_until_ledger_seq
, ttl.ttl_deleted
, cc.batch_insert_ts
, cc.airflow_start_ts
, cc.batch_id
, cc.batch_run_date
from
contract_code as cc
left join
ttl_data as ttl
on
cc.ledger_key_hash = ttl.key_hash
where
cc.row_num = 1 -- Only the latest record per ledger_key_hash from contract_code
and (ttl.row_num = 1 or ttl.row_num is null) -- Only the latest record from ttl, if available
)

select
contract_code_hash
, last_modified_ledger
, ledger_entry_change
, ledger_sequence
, ledger_key_hash
, n_instructions
, n_functions
, n_globals
, n_table_entries
, n_types
, n_data_segments
, n_elem_segments
, n_imports
, n_exports
, n_data_segment_bytes
, deleted
, closed_at
, key_hash
, ttl_closed_at
, live_until_ledger_seq
, ttl_deleted
, batch_insert_ts
, airflow_start_ts
, batch_id
, batch_run_date
, current_timestamp() as dw_load_ts
from source_data
order by
ledger_key_hash, closed_at
Loading

0 comments on commit 09731a4

Please sign in to comment.