diff --git a/models/intermediate/int_incr_contract_code.sql b/models/intermediate/int_incr_contract_code.sql
new file mode 100644
index 0000000..de56414
--- /dev/null
+++ b/models/intermediate/int_incr_contract_code.sql
@@ -0,0 +1,103 @@
+{{
+    config(
+        materialized = 'incremental',
+        unique_key = ['ledger_key_hash', 'closed_at'],
+        cluster_by = ["ledger_key_hash", "closed_at"],
+        tags = ["soroban_analytics"]
+    )
+}}
+/*
+Model: int_incr_contract_code
+
+Description:
+------------
+This intermediate model handles the incremental loading of contract code from the staging layer.
+It processes only new or updated data up to the previous day based on `execution_date`.
+
+Features:
+---------
+1. Uses execution_date from Airflow to determine the processing window.
+2. Processes new or updated records for completed days up to the previous day.
+3. Selects only the latest entry per `ledger_key_hash` per day within the processing window.
+
+Usage:
+------
+Compile:
+    dbt compile --models int_incr_contract_code
+    dbt compile --models int_incr_contract_code --vars '{"execution_date": "2024-10-25T00:00:00+00:00"}'
+Run:
+    dbt run --models int_incr_contract_code --full-refresh
+    dbt run --models int_incr_contract_code
+    dbt run --models int_incr_contract_code --vars '{"execution_date": "2024-10-25T00:00:00+00:00"}'
+*/
+
+-- Set the execution date (use the Airflow value, falling back to the dbt default if absent)
+{% set execution_date = var('execution_date', dbt_airflow_macros.ts(timezone=none)) %}
+
+with
+    source_data as (
+        select
+            contract_code_hash
+            , ledger_key_hash
+            , last_modified_ledger
+            , ledger_entry_change
+            , ledger_sequence
+            , deleted
+            , closed_at
+            , n_instructions
+            , n_functions
+            , n_globals
+            , n_table_entries
+            , n_types
+            , n_data_segments
+            , n_elem_segments
+            , n_imports
+            , n_exports
+            , n_data_segment_bytes
+            , batch_id
+            , batch_run_date
+        from {{ ref('stg_contract_code') }}
+        where
+            -- Process only completed days up to execution_date for incremental loads
+            date(closed_at) < date('{{ execution_date }}')
+
+            -- Process records inserted since the last load (incremental only)
+            {% if is_incremental() %}
+                and date(closed_at) >= (select coalesce(max(date(closed_at)), '2000-01-01') from {{ this }})
+            {% endif %}
+    )
+
+    , ranked_data as (
+        select
+            *
+            , row_number() over (
+                partition by ledger_key_hash, cast(closed_at as date)
+                order by closed_at desc, ledger_sequence desc
+            ) as rn
+        from source_data
+    )
+
+-- Pick only the latest change per ledger_key_hash per day
+select
+    ledger_key_hash
+    , contract_code_hash
+    , last_modified_ledger
+    , ledger_entry_change
+    , ledger_sequence
+    , deleted
+    , closed_at
+    , n_instructions
+    , n_functions
+    , n_globals
+    , n_table_entries
+    , n_types
+    , n_data_segments
+    , n_elem_segments
+    , n_imports
+    , n_exports
+    , n_data_segment_bytes
+    , batch_id
+    , batch_run_date
+    , cast('{{ execution_date }}' as timestamp) as airflow_start_ts
+    , current_timestamp() as dw_load_ts
+from ranked_data
+where rn = 1
diff --git a/models/intermediate/int_incr_contract_data.sql b/models/intermediate/int_incr_contract_data.sql
new file mode 100644
index 0000000..5b2568b
--- /dev/null
+++ b/models/intermediate/int_incr_contract_data.sql
@@ -0,0 +1,103 @@
+{{
+    config(
+        materialized = 'incremental',
+        unique_key = ['ledger_key_hash', 'closed_at'],
+        partition_by = {
+            "field": "closed_at",
+            "data_type": "date",
+            "granularity": "month"
+        },
+        cluster_by = ['closed_at', 'ledger_key_hash'],
+        tags = ["soroban_analytics"]
+    )
+}}
+/*
+Model: int_incr_contract_data
+
+Description:
+------------
+This intermediate model handles the incremental loading of contract data from the staging layer.
+It processes only new or updated data up to the previous day based on `execution_date`.
+
+Features:
+---------
+1. Uses execution_date from Airflow to determine the processing window.
+2. Processes new or updated records for completed days up to the previous day.
+3. Selects only the latest entry per `ledger_key_hash` per day within the processing window.
+
+Usage:
+------
+Compile:
+    dbt compile --models int_incr_contract_data
+    dbt compile --models int_incr_contract_data --vars '{"execution_date": "2024-10-25T00:00:00+00:00"}'
+Run:
+    dbt run --models int_incr_contract_data --full-refresh
+    dbt run --models int_incr_contract_data
+    dbt run --models int_incr_contract_data --vars '{"execution_date": "2024-10-25T00:00:00+00:00"}'
+*/
+
+-- Set the execution date (use the Airflow value, falling back to the dbt default if absent)
+{% set execution_date = var('execution_date', dbt_airflow_macros.ts()) %}
+
+with
+    source_data as (
+        select
+            contract_id
+            , ledger_key_hash
+            , contract_key_type
+            , contract_durability
+            , last_modified_ledger
+            , ledger_entry_change
+            , ledger_sequence
+            , asset_code
+            , asset_issuer
+            , asset_type
+            , balance
+            , balance_holder
+            , deleted
+            , closed_at
+            , batch_insert_ts
+            , batch_id
+            , batch_run_date
+        from {{ ref('stg_contract_data') }}
+        where
+            -- Process only completed days up to execution_date for incremental loads
+            date(closed_at) < date('{{ execution_date }}')
+
+            -- Process records inserted since the last load (incremental only)
+            {% if is_incremental() %}
+                and date(closed_at) >= (select coalesce(max(date(closed_at)), '2000-01-01') from {{ this }})
+            {% endif %}
+    )
+
+    , ranked_data as (
+        select
+            *
+            , row_number() over (
+                partition by ledger_key_hash, cast(closed_at as date)
+                order by closed_at desc, ledger_sequence desc
+            ) as rn
+        from source_data
+    )
+
+-- Pick only the latest change per ledger_key_hash per day
+select
+    contract_id
+    , ledger_key_hash
+    , contract_key_type
+    , contract_durability
+    , last_modified_ledger
+    , ledger_entry_change
+    , ledger_sequence
+    , deleted
+    , closed_at
+    , asset_code
+    , asset_issuer
+    , asset_type
+    , balance
+    , balance_holder
+    , batch_id
+    , batch_run_date
+    , cast('{{ execution_date }}' as timestamp) as airflow_start_ts
+    , current_timestamp() as dw_load_ts
+from ranked_data
+where rn = 1
diff --git a/models/intermediate/int_incr_ttl.sql b/models/intermediate/int_incr_ttl.sql
new file mode 100644
index 0000000..0e8bebd
--- /dev/null
+++ b/models/intermediate/int_incr_ttl.sql
@@ -0,0 +1,90 @@
+{{
+    config(
+        materialized = 'incremental',
+        unique_key = ['key_hash', 'closed_at'],
+        partition_by = {
+            "field": "closed_at",
+            "data_type": "date",
+            "granularity": "month"
+        },
+        cluster_by = ['closed_at', 'key_hash'],
+        tags = ["soroban_analytics"]
+    )
+}}
+/*
+Model: int_incr_ttl
+
+Description:
+------------
+This intermediate model handles the incremental loading of ledger time-to-live (TTL)
+data from the staging layer. It processes only new or updated data up to the previous day based on `execution_date`.
+
+Features:
+---------
+1. Uses execution_date from Airflow to determine the processing window.
+2. Processes new or updated records for completed days up to the previous day.
+3. Selects only the latest entry per `key_hash` per day within the processing window (see the example below).
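+
+Example:
+--------
+Illustrative sketch of the per-day dedup for a single `key_hash`; the timestamps
+and sequence numbers below are hypothetical:
+
+    closed_at              ledger_sequence    rn
+    2024-10-24 03:15:00    51200              2    -- superseded later the same day
+    2024-10-24 21:40:00    51890              1    -- only this row is kept for the day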
+
+Usage:
+------
+Compile:
+    dbt compile --models int_incr_ttl
+    dbt compile --models int_incr_ttl --vars '{"execution_date": "2024-10-25T00:00:00+00:00"}'
+Run:
+    dbt run --models int_incr_ttl --full-refresh
+    dbt run --models int_incr_ttl
+    dbt run --models int_incr_ttl --vars '{"execution_date": "2024-10-25T00:00:00+00:00"}'
+*/
+
+-- Set the execution date (use the Airflow value, falling back to the dbt default if absent)
+{% set execution_date = var('execution_date', dbt_airflow_macros.ts()) %}
+
+with
+    source_data as (
+        select
+            key_hash
+            , live_until_ledger_seq
+            , last_modified_ledger
+            , ledger_entry_change
+            , ledger_sequence
+            , deleted
+            , closed_at
+            , batch_insert_ts
+            , batch_id
+            , batch_run_date
+        from {{ ref('stg_ttl') }}
+        where
+            -- Process only completed days up to execution_date for incremental loads
+            date(closed_at) < date('{{ execution_date }}')
+
+            -- Process records inserted since the last load (incremental only)
+            {% if is_incremental() %}
+                and date(closed_at) >= (select coalesce(max(date(closed_at)), '2000-01-01') from {{ this }})
+            {% endif %}
+    )
+
+    , ranked_data as (
+        select
+            *
+            , row_number() over (
+                partition by key_hash, cast(closed_at as date)
+                order by closed_at desc, ledger_sequence desc
+            ) as rn
+        from source_data
+    )
+
+-- Pick only the latest change per key_hash per day
+select
+    key_hash
+    , live_until_ledger_seq
+    , last_modified_ledger
+    , ledger_entry_change
+    , ledger_sequence
+    , deleted
+    , closed_at
+    , batch_insert_ts
+    , batch_id
+    , batch_run_date
+    , cast('{{ execution_date }}' as timestamp) as airflow_start_ts
+    , current_timestamp() as dw_load_ts
+from ranked_data
+where rn = 1
diff --git a/models/intermediate/int_transform_contract_code.sql b/models/intermediate/int_transform_contract_code.sql
new file mode 100644
index 0000000..88d2345
--- /dev/null
+++ b/models/intermediate/int_transform_contract_code.sql
@@ -0,0 +1,135 @@
+{{
+    config(
+        materialized = 'table',
+        unique_key = ['ledger_key_hash', 'closed_at'],
+        cluster_by = ["ledger_key_hash", "closed_at", "row_hash"],
+        tags = ["soroban_analytics"]
+    )
+}}
+
+/*
+Model: int_transform_contract_code
+
+Description:
+------------
+This intermediate model applies transformations to the contract code, calculates new fields,
+and performs deduplication based on the transformed data.
+
+Load Type: Full Truncate and Reload
+
+Features:
+---------
+1. Calculates contract_create_ts and contract_delete_ts from staging data.
+2. Joins the calculated timestamps with the incremental data and applies transformations.
+3. Standardizes data by applying default values and type casts where applicable.
+4. Calculates row hash using sha256 on all relevant CDC fields.
+5. Performs deduplication based on the row hash of the fully transformed data. This step primarily detects duplicates in consecutive rows (see the example below).
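+
+Example:
+--------
+Illustrative sketch of the consecutive-row dedup for one contract_code_hash; the
+dates and hash values below are hypothetical:
+
+    closed_at     row_hash    prev_row_hash    is_change
+    2024-10-01    aaa         null             1    -- kept (first record)
+    2024-10-02    aaa         aaa              0    -- dropped (consecutive duplicate)
+    2024-10-03    bbb         aaa              1    -- kept (attributes changed)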
+
+Usage:
+------
+Compile:
+    dbt compile --models int_transform_contract_code
+
+Run:
+    dbt run --models int_transform_contract_code --full-refresh
+    dbt run --models int_transform_contract_code
+*/
+
+with
+    derived_data as (
+        select
+            contract_code_hash
+            , min(case when ledger_entry_change = 0 then closed_at end) as contract_create_ts -- Calculate the contract creation timestamp
+            -- Calculate the contract deletion timestamp
+            , max(case when ledger_entry_change = 2 and deleted = true then closed_at end) as contract_delete_ts
+        from {{ ref('stg_contract_code') }}
+        group by contract_code_hash
+    )
+
+    , source_data as (
+        select
+            int.contract_code_hash
+            , int.ledger_key_hash
+            , int.n_instructions
+            , int.n_functions
+            , int.n_globals
+            , int.n_table_entries
+            , int.n_types
+            , int.n_data_segments
+            , int.n_elem_segments
+            , int.n_imports
+            , int.n_exports
+            , int.n_data_segment_bytes
+            , derived.contract_create_ts
+            , derived.contract_delete_ts
+            , int.closed_at
+            , int.ledger_sequence
+            , int.airflow_start_ts
+            , int.batch_id
+            , int.batch_run_date
+        from {{ ref('int_incr_contract_code') }} as int
+        left join derived_data as derived on int.contract_code_hash = derived.contract_code_hash
+    )
+
+    -- Calculate a hash for each row using sha256
+    , hashed_data as (
+        select
+            *
+            , sha256(concat(
+                coalesce(ledger_key_hash, '')
+                , coalesce(cast(n_instructions as string), '')
+                , coalesce(cast(n_functions as string), '')
+                , coalesce(cast(n_globals as string), '')
+                , coalesce(cast(n_table_entries as string), '')
+                , coalesce(cast(n_types as string), '')
+                , coalesce(cast(n_data_segments as string), '')
+                , coalesce(cast(n_elem_segments as string), '')
+                , coalesce(cast(n_imports as string), '')
+                , coalesce(cast(n_exports as string), '')
+                , coalesce(cast(n_data_segment_bytes as string), '')
+                , coalesce(cast(contract_create_ts as string), '')
+                , coalesce(cast(contract_delete_ts as string), '')
+            )) as row_hash
+        from source_data
+    )
+
+    -- Identify changes between consecutive records and pick the first record if duplicates
+    , dedup_data as (
+        select
+            *
+            -- Get the hash of the previous record for each contract_code_hash
+            , lag(row_hash) over (partition by contract_code_hash order by closed_at, ledger_sequence) as prev_row_hash
+            , case
+                when row_hash = lag(row_hash) over (partition by contract_code_hash order by closed_at, ledger_sequence)
+                    then 0 -- No change
+                else 1 -- Change
+            end as is_change -- Flag records that are different from their previous record
+        from hashed_data
+    )
+
+-- Keep records that are different from their previous record (is_change = 1)
+-- OR the first record for each contract_code_hash (prev_row_hash is null)
+select
+    contract_code_hash
+    , ledger_key_hash
+    , ledger_sequence
+    , contract_create_ts
+    , contract_delete_ts
+    , closed_at
+    , n_instructions
+    , n_functions
+    , n_globals
+    , n_table_entries
+    , n_types
+    , n_data_segments
+    , n_elem_segments
+    , n_imports
+    , n_exports
+    , n_data_segment_bytes
+    , to_hex(row_hash) as row_hash -- Convert the row hash to hex format to store it as a STRING field
+    , batch_id
+    , batch_run_date
+    , airflow_start_ts
+    , current_timestamp() as dw_load_ts
+from dedup_data
+where is_change = 1 or prev_row_hash is null
diff --git a/models/intermediate/int_transform_contract_data.sql b/models/intermediate/int_transform_contract_data.sql
new file mode 100644
index 0000000..cd6d62d
--- /dev/null
+++ b/models/intermediate/int_transform_contract_data.sql
@@ -0,0 +1,135 @@
+{{
+    config(
+        materialized = 'table',
+        unique_key = ['ledger_key_hash', 'closed_at'],
+        partition_by = {
+            "field": "closed_at",
+            "data_type": "date",
+            "granularity": "month"
+        },
+        cluster_by = ["ledger_key_hash", "closed_at", "row_hash"],
+        tags = ["soroban_analytics"]
+    )
+}}
+
+/*
+Model: int_transform_contract_data
+
+Description:
+------------
+This intermediate model applies transformations to the contract data, calculates new fields,
+and performs deduplication based on the transformed data.
+
+Load Type: Truncate and Reload
+
+Features:
+---------
+1. Calculates contract_create_ts and contract_delete_ts from staging data.
+2. Joins the calculated timestamps with the incremental data and applies transformations.
+3. Trims spaces on string fields and standardizes blank values as NULL.
+4. Calculates row hash using sha256 on all relevant fields.
+5. Performs deduplication based on the row hash of the fully transformed data. This step primarily detects duplicates in consecutive rows.
+
+Usage:
+------
+Compile:
+    dbt compile --models int_transform_contract_data
+
+Run:
+    dbt run --models int_transform_contract_data --full-refresh
+    dbt run --models int_transform_contract_data
+*/
+
+with
+    derived_data as (
+        select
+            contract_id
+            -- Calculate the contract creation timestamp
+            , min(case when ledger_entry_change = 0 and contract_key_type = 'ScValTypeScvLedgerKeyContractInstance' then closed_at end)
+                as contract_create_ts
+            -- Calculate the contract deletion timestamp
+            , max(case when ledger_entry_change = 2 and deleted = true then closed_at end) as contract_delete_ts
+        from {{ ref('stg_contract_data') }}
+        group by contract_id
+    )
+
+    , transformed_data as (
+        select
+            incr.contract_id
+            , incr.ledger_key_hash
+            , case
+                when nullif(trim(incr.contract_durability), '') = 'ContractDataDurabilityPersistent' then 'persistent'
+                when nullif(trim(incr.contract_durability), '') = 'ContractDataDurabilityTemporary' then 'temporary'
+                else nullif(trim(incr.contract_durability), '')
+            end as contract_durability
+            , nullif(trim(incr.asset_code), '') as asset_code
+            , nullif(trim(incr.asset_issuer), '') as asset_issuer
+            , nullif(trim(incr.asset_type), '') as asset_type
+            , cast(round(safe_cast(incr.balance as float64) / pow(10, 7), 5) as numeric) as balance
+            , nullif(trim(incr.balance_holder), '') as balance_holder
+            , derived.contract_create_ts
+            , derived.contract_delete_ts
+            , incr.closed_at
+            , incr.ledger_sequence
+            , incr.airflow_start_ts
+            , incr.batch_id
+            , incr.batch_run_date
+        from {{ ref('int_incr_contract_data') }} as incr
+        left join derived_data as derived on incr.contract_id = derived.contract_id
+    )
+
+    -- Calculate a hash for each row using sha256
+    , hashed_data as (
+        select
+            *
+            , sha256(concat(
+                coalesce(ledger_key_hash, '')
+                , coalesce(contract_durability, '')
+                , coalesce(asset_code, '')
+                , coalesce(asset_issuer, '')
+                , coalesce(asset_type, '')
+                , coalesce(cast(balance as string), '')
+                , coalesce(balance_holder, '')
+                , coalesce(cast(contract_create_ts as string), '')
+                , coalesce(cast(contract_delete_ts as string), '')
+            )) as row_hash
+        from transformed_data
+    )
+
+    -- Identify changes between consecutive records and pick the first record if duplicates
+    , dedup_data as (
+        select
+            *
+            -- Get the hash of the previous record for each contract_id
+            , lag(row_hash) over (partition by contract_id order by closed_at, ledger_sequence) as prev_row_hash
+            , case
+                when row_hash = lag(row_hash) over (partition by contract_id order by closed_at, ledger_sequence)
+                    then 0 -- No change
+                else 1 -- Change
+            end as is_change -- Flag records that are different from their previous record
+        from hashed_data
+    )
+
+-- Select deduplicated records
+-- Keep records that are different from their previous record (is_change = 1)
+-- OR the first record for each contract_id (prev_row_hash is null)
+select
+    contract_id
+    , ledger_key_hash
+    , contract_durability
+    , ledger_sequence
+    , contract_create_ts
+    , contract_delete_ts
+    , closed_at
+    , asset_code
+    , asset_issuer
+    , asset_type
+    , balance
+    , balance_holder
+    , batch_id
+    , batch_run_date
+    , airflow_start_ts
+    , to_hex(row_hash) as row_hash -- Convert the row hash to hex format to store it as a STRING field
+    , current_timestamp() as dw_load_ts
+from dedup_data
+where is_change = 1 or prev_row_hash is null
diff --git a/models/intermediate/int_transform_ttl.sql b/models/intermediate/int_transform_ttl.sql
new file mode 100644
index 0000000..9a04e09
--- /dev/null
+++ b/models/intermediate/int_transform_ttl.sql
@@ -0,0 +1,109 @@
+{{
+    config(
+        materialized = 'table',
+        unique_key = ['key_hash', 'closed_at'],
+        partition_by = {
+            "field": "closed_at",
+            "data_type": "date",
+            "granularity": "month"
+        },
+        cluster_by = ["key_hash", "closed_at", "row_hash"],
+        tags = ["soroban_analytics"]
+    )
+}}
+
+/*
+Model: int_transform_ttl
+
+Description:
+------------
+This intermediate model applies transformations to TTL data, calculates new fields,
+and performs deduplication based on the transformed data.
+
+Load Type: Truncate and Reload
+
+Features:
+---------
+1. Calculates ttl_create_ts and ttl_delete_ts from staging data.
+2. Joins the calculated timestamps with the incremental data and applies transformations.
+3. Calculates row hash using sha256 on all relevant fields.
+4. Performs deduplication based on the row hash of the fully transformed data. This step primarily detects duplicates in consecutive rows (see the example below).
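+
+Example:
+--------
+A spot check of the dedup invariant: no two consecutive rows for a `key_hash`
+should share a row_hash. Sketch in BigQuery syntax (the dataset name is
+hypothetical); the expected result is zero rows:
+
+    select key_hash, closed_at
+    from analytics.int_transform_ttl
+    where true
+    qualify row_hash = lag(row_hash) over (partition by key_hash order by closed_at, ledger_sequence)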
+
+Usage:
+------
+Compile:
+    dbt compile --models int_transform_ttl
+
+Run:
+    dbt run --models int_transform_ttl --full-refresh
+    dbt run --models int_transform_ttl
+*/
+
+with
+    derived_data as (
+        select
+            key_hash
+            , min(case when ledger_entry_change = 0 then closed_at end) as ttl_create_ts -- Calculate the TTL creation timestamp
+            , max(case when ledger_entry_change = 2 and deleted = true then closed_at end) as ttl_delete_ts -- Calculate the TTL deletion timestamp
+        from {{ ref('stg_ttl') }}
+        group by key_hash
+    )
+
+    , transformed_data as (
+        select
+            ttl.key_hash
+            , ttl.live_until_ledger_seq
+            , derived.ttl_create_ts
+            , derived.ttl_delete_ts
+            , ttl.closed_at
+            , ttl.ledger_sequence
+            , ttl.batch_id
+            , ttl.batch_run_date
+            , ttl.airflow_start_ts
+        from {{ ref('int_incr_ttl') }} as ttl
+        left join derived_data as derived on ttl.key_hash = derived.key_hash
+    )
+
+    -- Calculate a hash for each row using sha256
+    , hashed_data as (
+        select
+            *
+            , sha256(concat(
+                coalesce(cast(live_until_ledger_seq as string), '')
+                , coalesce(cast(ttl_create_ts as string), '')
+                , coalesce(cast(ttl_delete_ts as string), '')
+            )) as row_hash
+        from transformed_data
+    )
+
+    -- Identify changes between consecutive records and pick the first record if duplicates
+    , dedup_data as (
+        select
+            *
+            -- Get the hash of the previous record for each key_hash
+            , lag(row_hash) over (partition by key_hash order by closed_at, ledger_sequence) as prev_row_hash
+            , case
+                when row_hash = lag(row_hash) over (partition by key_hash order by closed_at, ledger_sequence)
+                    then 0 -- No change
+                else 1 -- Change
+            end as is_change -- Flag records that are different from their previous record
+        from hashed_data
+    )
+
+-- Select deduplicated records
+-- Keep records that are different from their previous record (is_change = 1)
+-- OR the first record for each key_hash (prev_row_hash is null)
+select
+    key_hash
+    , live_until_ledger_seq
+    , ttl_create_ts
+    , ttl_delete_ts
+    , closed_at
+    , ledger_sequence
+    , to_hex(row_hash) as row_hash -- Convert the row hash to hex format to store it as a STRING field
+    , batch_id
+    , batch_run_date
+    , airflow_start_ts
+    , current_timestamp() as dw_load_ts
+from dedup_data
+where is_change = 1 or prev_row_hash is null
diff --git a/models/marts/dim_contract_code_current.sql b/models/marts/dim_contract_code_current.sql
new file mode 100644
index 0000000..5274823
--- /dev/null
+++ b/models/marts/dim_contract_code_current.sql
@@ -0,0 +1,76 @@
+{{
+    config(
+        materialized = 'table',
+        unique_key = ['ledger_key_hash'],
+        cluster_by = ["ledger_key_hash"],
+        tags = ["soroban_analytics"]
+    )
+}}
+
+-- Model: dim_contract_code_current
+--
+-- Description:
+-- ------------
+-- This model filters the `dim_contract_code_hist` table to select only the current records (where `is_current` is TRUE).
+-- This creates a snapshot of the latest active state for each `ledger_key_hash`.
+--
+-- Features:
+-- ---------
+-- 1. Selects only the records where `is_current` is TRUE from the historical table.
+-- 2. Includes the relevant columns to represent the current state of each contract code (see the example query below).
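+--
+-- Example:
+-- --------
+-- Illustrative consumer query (the dataset name is hypothetical): list the ten
+-- largest currently-live contracts by instruction count.
+--
+--     select ledger_key_hash, contract_code_hash, n_instructions
+--     from analytics.dim_contract_code_current
+--     where contract_delete_ts is null
+--     order by n_instructions desc
+--     limit 10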
+--
+-- Usage:
+-- ------
+-- Compile:
+--     dbt compile --models dim_contract_code_current
+--
+-- Run:
+--     dbt run --models dim_contract_code_current
+
+with
+    current_records as (
+        select
+            ledger_key_hash
+            , contract_code_hash
+            , n_instructions
+            , n_functions
+            , n_globals
+            , n_table_entries
+            , n_types
+            , n_data_segments
+            , n_elem_segments
+            , n_imports
+            , n_exports
+            , n_data_segment_bytes
+            , contract_create_ts
+            , contract_delete_ts
+            , closed_at
+            , batch_id
+            , batch_run_date
+            , airflow_start_ts
+        from {{ ref('dim_contract_code_hist') }}
+        where is_current is true -- Select only the current version of each contract
+    )
+
+-- Final output: Current state snapshot of all contracts
+select
+    ledger_key_hash
+    , contract_code_hash
+    , closed_at
+    , contract_create_ts
+    , contract_delete_ts
+    , n_instructions
+    , n_functions
+    , n_globals
+    , n_table_entries
+    , n_types
+    , n_data_segments
+    , n_elem_segments
+    , n_imports
+    , n_exports
+    , n_data_segment_bytes
+    , batch_id
+    , batch_run_date
+    , airflow_start_ts
+    , current_timestamp() as dw_load_ts
+from current_records
diff --git a/models/marts/dim_contract_code_hist.sql b/models/marts/dim_contract_code_hist.sql
new file mode 100644
index 0000000..dab69d5
--- /dev/null
+++ b/models/marts/dim_contract_code_hist.sql
@@ -0,0 +1,242 @@
+{{
+    config(
+        materialized = 'incremental',
+        unique_key = ['ledger_key_hash', 'start_date'],
+        cluster_by = ["ledger_key_hash", "start_date", "row_hash"],
+        tags = ["soroban_analytics"]
+    )
+}}
+
+/*
+Model: dim_contract_code_hist
+
+Description:
+------------
+This model implements Slowly Changing Dimension (SCD) Type 2 logic to track historical changes
+in contract code data over time. The model supports initial loads and incremental updates, including catch-up.
+
+Features:
+---------
+1. Creates a record for each change in `ledger_key_hash`.
+2. Applies change tracking logic using row hash comparison to detect updates.
+3. Implements date chaining to maintain historical records, marking `is_current` for the latest active record (illustrated below).
+4. Supports multi-day catch-up scenarios by processing all relevant records based on the incremental logic.
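+
+Example:
+--------
+Illustrative version chain for one `ledger_key_hash`; the dates are hypothetical:
+
+    start_date    end_date      is_current
+    2024-10-01    2024-10-04    false
+    2024-10-05    9999-12-31    true
+
+A change arriving on 2024-10-05 closes the prior version at 2024-10-04
+(next_start_date minus one day) and opens a new current version.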
+
+Usage:
+------
+Compile:
+    dbt compile --models dim_contract_code_hist
+
+Run:
+    dbt run --models dim_contract_code_hist --full-refresh
+    dbt run --models dim_contract_code_hist
+*/
+
+-- Load latest changes from transformed contract code data, applying incremental filter
+with
+    source_data as (
+        select
+            contract_code_hash
+            , ledger_key_hash
+            , n_instructions
+            , n_functions
+            , n_globals
+            , n_table_entries
+            , n_types
+            , n_data_segments
+            , n_elem_segments
+            , n_imports
+            , n_exports
+            , n_data_segment_bytes
+            , contract_create_ts
+            , contract_delete_ts
+            , closed_at
+            , date(closed_at) as start_date
+            , airflow_start_ts
+            , batch_id
+            , batch_run_date
+            , row_hash
+        from {{ ref('int_transform_contract_code') }}
+        {% if is_incremental() %}
+            where closed_at > (select coalesce(max(closed_at), '2000-01-01T00:00:00+00:00') from {{ this }})
+        {% endif %}
+    )
+
+    -- Get existing history records for affected contracts, or create an empty shell for the initial load
+    , target_data as (
+        {% if is_incremental() %}
+            select *
+            from {{ this }}
+            where ledger_key_hash in (select distinct ledger_key_hash from source_data)
+        {% else %}
+            select
+                cast(null as string) as ledger_key_hash
+                , cast(null as date) as start_date
+                , cast(null as string) as contract_code_hash
+                , cast(null as int64) as n_instructions
+                , cast(null as int64) as n_functions
+                , cast(null as int64) as n_globals
+                , cast(null as int64) as n_table_entries
+                , cast(null as int64) as n_types
+                , cast(null as int64) as n_data_segments
+                , cast(null as int64) as n_elem_segments
+                , cast(null as int64) as n_imports
+                , cast(null as int64) as n_exports
+                , cast(null as int64) as n_data_segment_bytes
+                , cast(null as timestamp) as contract_create_ts
+                , cast(null as timestamp) as contract_delete_ts
+                , cast(null as timestamp) as closed_at
+                , cast(null as date) as end_date
+                , cast(null as boolean) as is_current
+                , cast(null as timestamp) as airflow_start_ts
+                , cast(null as string) as batch_id
+                , cast(null as datetime) as batch_run_date
+                , cast(null as string) as row_hash
+                , cast(null as timestamp) as dw_load_ts
+                , cast(null as timestamp) as dw_update_ts
+            from source_data
+            where 1 = 0
+        {% endif %}
+    )
+
+    -- Detect changes by comparing source and target data attributes
+    -- Outputs: Change type (Insert/Update/NoChange) based on row hash comparison
+    , combined_cdc_data as (
+        select
+            coalesce(s.ledger_key_hash, t.ledger_key_hash) as ledger_key_hash
+            , coalesce(s.start_date, t.start_date) as start_date
+            , s.contract_code_hash as source_contract_code_hash
+            , t.contract_code_hash as target_contract_code_hash
+            , s.n_instructions as source_n_instructions
+            , t.n_instructions as target_n_instructions
+            , s.n_functions as source_n_functions
+            , t.n_functions as target_n_functions
+            , s.n_globals as source_n_globals
+            , t.n_globals as target_n_globals
+            , s.n_table_entries as source_n_table_entries
+            , t.n_table_entries as target_n_table_entries
+            , s.n_types as source_n_types
+            , t.n_types as target_n_types
+            , s.n_data_segments as source_n_data_segments
+            , t.n_data_segments as target_n_data_segments
+            , s.n_elem_segments as source_n_elem_segments
+            , t.n_elem_segments as target_n_elem_segments
+            , s.n_imports as source_n_imports
+            , t.n_imports as target_n_imports
+            , s.n_exports as source_n_exports
+            , t.n_exports as target_n_exports
+            , s.n_data_segment_bytes as source_n_data_segment_bytes
+            , t.n_data_segment_bytes as target_n_data_segment_bytes
+            , s.contract_create_ts as source_contract_create_ts
+            , t.contract_create_ts as target_contract_create_ts
+            , s.contract_delete_ts as source_contract_delete_ts
+            , t.contract_delete_ts as target_contract_delete_ts
+            , s.closed_at as source_closed_at
+            , t.closed_at as target_closed_at
+            , t.end_date as target_end_date
+            , t.is_current as target_is_current
+            , s.airflow_start_ts as source_airflow_start_ts
+            , t.airflow_start_ts as target_airflow_start_ts
+            , s.batch_id as source_batch_id
+            , t.batch_id as target_batch_id
+            , s.batch_run_date as source_batch_run_date
+            , t.batch_run_date as target_batch_run_date
+            , s.row_hash as source_row_hash
+            , t.row_hash as target_row_hash
+            , t.dw_load_ts as target_dw_load_ts
+            , t.dw_update_ts as target_dw_update_ts
+            , case
+                when t.ledger_key_hash is null then 'Insert' -- New record
+                when s.row_hash != t.row_hash then 'Update' -- Changed record
+                else 'NoChange' -- No change in the current record
+            end as change_type
+        from source_data as s
+        full outer join target_data as t
+            on s.ledger_key_hash = t.ledger_key_hash
+                and s.start_date = t.start_date
+    )
+
+    -- Date chaining for SCD Type 2 with separated CTEs
+    , date_chained as (
+        select
+            cdc.*
+            , lead(cdc.start_date) over (partition by cdc.ledger_key_hash order by cdc.start_date) as next_start_date
+            -- Operation Types:
+            --   INSERT_NEW_KEY: First occurrence of a ledger_key_hash
+            --   START_NEW_VERSION: New version of an existing record due to attribute changes
+            --   END_CURRENT_VERSION: Close the current version due to future changes
+            --   KEEP_CURRENT: No changes needed, maintain the current record
+            , case
+                when cdc.change_type = 'Insert' then 'INSERT_NEW_KEY'
+                when cdc.change_type = 'Update' then 'START_NEW_VERSION'
+                when
+                    cdc.change_type = 'NoChange'
+                    and lead(cdc.start_date) over (partition by cdc.ledger_key_hash order by cdc.start_date) is not null
+                    then 'END_CURRENT_VERSION'
+                else 'KEEP_CURRENT'
+            end as operation_type
+        from combined_cdc_data as cdc
+    )
+
+    -- Final data processing with all transformations
+    , final_data as (
+        select
+            dc.ledger_key_hash
+            , coalesce(dc.source_contract_code_hash, dc.target_contract_code_hash) as contract_code_hash
+            , coalesce(dc.source_n_instructions, dc.target_n_instructions) as n_instructions
+            , coalesce(dc.source_n_functions, dc.target_n_functions) as n_functions
+            , coalesce(dc.source_n_globals, dc.target_n_globals) as n_globals
+            , coalesce(dc.source_n_table_entries, dc.target_n_table_entries) as n_table_entries
+            , coalesce(dc.source_n_types, dc.target_n_types) as n_types
+            , coalesce(dc.source_n_data_segments, dc.target_n_data_segments) as n_data_segments
+            , coalesce(dc.source_n_elem_segments, dc.target_n_elem_segments) as n_elem_segments
+            , coalesce(dc.source_n_imports, dc.target_n_imports) as n_imports
+            , coalesce(dc.source_n_exports, dc.target_n_exports) as n_exports
+            , coalesce(dc.source_n_data_segment_bytes, dc.target_n_data_segment_bytes) as n_data_segment_bytes
+            , coalesce(dc.source_contract_create_ts, dc.target_contract_create_ts) as contract_create_ts
+            , coalesce(dc.source_contract_delete_ts, dc.target_contract_delete_ts) as contract_delete_ts
+            , coalesce(dc.source_closed_at, dc.target_closed_at) as closed_at
+            , dc.start_date
+            , coalesce(date_sub(dc.next_start_date, interval 1 day), date('9999-12-31')) as end_date
+            , coalesce(row_number() over (partition by dc.ledger_key_hash order by dc.start_date desc) = 1, false) as is_current
+            , coalesce(dc.source_airflow_start_ts, dc.target_airflow_start_ts) as airflow_start_ts
+            , coalesce(dc.source_batch_id, dc.target_batch_id) as batch_id
+            , coalesce(dc.source_batch_run_date, dc.target_batch_run_date) as batch_run_date
+            , coalesce(dc.source_row_hash, dc.target_row_hash) as row_hash
+            , coalesce(dc.target_dw_load_ts, current_timestamp()) as dw_load_ts
+            , current_timestamp() as dw_update_ts
+            , dc.operation_type
+        from date_chained as dc
+        where dc.operation_type in (
+            'INSERT_NEW_KEY'
+            , 'START_NEW_VERSION'
+            , 'END_CURRENT_VERSION'
+        )
+    )
+
+select
+    ledger_key_hash
+    , contract_code_hash
+    , start_date
+    , end_date
+    , is_current
+    , contract_create_ts
+    , contract_delete_ts
+    , closed_at
+    , n_instructions
+    , n_functions
+    , n_globals
+    , n_table_entries
+    , n_types
+    , n_data_segments
+    , n_elem_segments
+    , n_imports
+    , n_exports
+    , n_data_segment_bytes
+    , row_hash
+    , batch_id
+    , batch_run_date
+    , airflow_start_ts
+    , dw_load_ts
+    , dw_update_ts
+from final_data
diff --git a/models/marts/dim_contract_data_current.sql b/models/marts/dim_contract_data_current.sql
new file mode 100644
index 0000000..b23ab06
--- /dev/null
+++ b/models/marts/dim_contract_data_current.sql
@@ -0,0 +1,70 @@
+{{ config(
+    materialized = 'table',
+    unique_key = ['ledger_key_hash'],
+    partition_by = {
+        "field": "TIMESTAMP_TRUNC(closed_at, MONTH)",
+        "data_type": "DATE"
+    },
+    cluster_by = ["ledger_key_hash"],
+    tags = ["soroban_analytics"]
+) }}
+
+-- Model: dim_contract_data_current
+--
+-- Description:
+-- ------------
+-- This model filters the `dim_contract_data_hist` table to select only the current records (where `is_current` is TRUE).
+-- This creates a snapshot of the latest active state for each `ledger_key_hash`.
+--
+-- Features:
+-- ---------
+-- 1. Selects only the records where `is_current` is TRUE from the historical table.
+-- 2. Includes relevant columns to represent the current state of each contract.
+--
+-- Usage:
+-- ------
+-- Compile:
+--     dbt compile --models dim_contract_data_current
+--
+-- Run:
+--     dbt run --models dim_contract_data_current

+with
+    current_records as (
+        select
+            ledger_key_hash
+            , contract_id
+            , contract_durability
+            , contract_create_ts
+            , contract_delete_ts
+            , closed_at
+            , asset_code
+            , asset_issuer
+            , asset_type
+            , balance
+            , balance_holder
+            , batch_id
+            , batch_run_date
+            , airflow_start_ts
+        from {{ ref('dim_contract_data_hist') }}
+        where is_current = true -- Select only the current version of each contract
+    )
+
+-- Final output: Current state snapshot of all contracts
+select
+    ledger_key_hash
+    , contract_id
+    , contract_durability
+    , contract_create_ts
+    , contract_delete_ts
+    , closed_at
+    , asset_code
+    , asset_issuer
+    , asset_type
+    , balance
+    , balance_holder
+    , batch_id
+    , batch_run_date
+    , airflow_start_ts
+    , current_timestamp() as dw_load_ts -- Load timestamp to track data freshness
+from current_records
diff --git a/models/marts/dim_contract_data_hist.sql b/models/marts/dim_contract_data_hist.sql
new file mode 100644
index 0000000..446b10a
--- /dev/null
+++ b/models/marts/dim_contract_data_hist.sql
@@ -0,0 +1,216 @@
+{{ config(
+    materialized = 'incremental',
+    unique_key = ['ledger_key_hash', 'start_date'],
+    partition_by = {
+        "field": "start_date",
+        "data_type": "date",
+        "granularity": "month"
+    },
+    cluster_by = ["ledger_key_hash", "start_date", "row_hash"],
+    tags = ["soroban_analytics"]
+) }}
+
+/*
+Model: dim_contract_data_hist
+
+Description:
+------------
+This model implements Slowly Changing Dimension (SCD) Type 2 logic to track historical changes
+in contract data over time. The model supports initial loads and incremental updates.
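+
+A history table like this supports point-in-time lookups. A sketch in BigQuery
+syntax (the dataset name and date are hypothetical):
+
+    select ledger_key_hash, balance, contract_durability
+    from analytics.dim_contract_data_hist
+    where date '2024-10-15' between start_date and end_date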
+
+Features:
+---------
+1. Creates a record for each change in `ledger_key_hash`.
+2. Applies change tracking logic using row hash comparison to detect updates.
+3. Implements date chaining to maintain historical records, marking `is_current` for the latest active record.
+
+Usage:
+------
+Compile:
+    dbt compile --models dim_contract_data_hist
+
+Run:
+    dbt run --models dim_contract_data_hist --full-refresh
+    dbt run --models dim_contract_data_hist
+*/
+
+-- Load latest changes from transformed contract data, applying incremental filter
+with
+    source_data as (
+        select
+            contract_id
+            , ledger_key_hash
+            , contract_durability
+            , asset_code
+            , asset_issuer
+            , asset_type
+            , balance
+            , balance_holder
+            , contract_create_ts
+            , contract_delete_ts
+            , closed_at
+            , date(closed_at) as start_date
+            , airflow_start_ts
+            , batch_id
+            , batch_run_date
+            , row_hash
+        from {{ ref('int_transform_contract_data') }}
+        {% if is_incremental() %}
+            where closed_at > (select coalesce(max(closed_at), '2000-01-01T00:00:00+00:00') from {{ this }})
+        {% endif %}
+    )
+
+    -- Get existing history records for affected contracts, or create an empty shell for the initial load
+    , target_data as (
+        {% if is_incremental() %}
+            select *
+            from {{ this }}
+            where ledger_key_hash in (select distinct ledger_key_hash from source_data)
+        {% else %}
+            select
+                cast(null as string) as ledger_key_hash
+                , cast(null as date) as start_date
+                , cast(null as string) as contract_id
+                , cast(null as string) as contract_durability
+                , cast(null as string) as asset_code
+                , cast(null as string) as asset_issuer
+                , cast(null as string) as asset_type
+                , cast(null as numeric) as balance
+                , cast(null as string) as balance_holder
+                , cast(null as timestamp) as contract_create_ts
+                , cast(null as timestamp) as contract_delete_ts
+                , cast(null as timestamp) as closed_at
+                , cast(null as date) as end_date
+                , cast(null as boolean) as is_current
+                , cast(null as timestamp) as airflow_start_ts
+                , cast(null as string) as batch_id
+                , cast(null as datetime) as batch_run_date
+                , cast(null as string) as row_hash
+                , cast(null as timestamp) as dw_load_ts
+                , cast(null as timestamp) as dw_update_ts
+            from source_data
+            where 1 = 0
+        {% endif %}
+    )
+
+    -- Detect changes by comparing source and target data attributes
+    -- Outputs: Change type (Insert/Update/NoChange) based on row hash comparison
+    , combined_cdc_data as (
+        select
+            coalesce(s.ledger_key_hash, t.ledger_key_hash) as ledger_key_hash
+            , coalesce(s.start_date, t.start_date) as start_date
+            , s.contract_id as source_contract_id
+            , t.contract_id as target_contract_id
+            , s.contract_durability as source_contract_durability
+            , t.contract_durability as target_contract_durability
+            , s.asset_code as source_asset_code
+            , t.asset_code as target_asset_code
+            , s.asset_issuer as source_asset_issuer
+            , t.asset_issuer as target_asset_issuer
+            , s.asset_type as source_asset_type
+            , t.asset_type as target_asset_type
+            , s.balance as source_balance
+            , t.balance as target_balance
+            , s.balance_holder as source_balance_holder
+            , t.balance_holder as target_balance_holder
+            , s.contract_create_ts as source_contract_create_ts
+            , t.contract_create_ts as target_contract_create_ts
+            , s.contract_delete_ts as source_contract_delete_ts
+            , t.contract_delete_ts as target_contract_delete_ts
+            , s.closed_at as source_closed_at
+            , t.closed_at as target_closed_at
+            , t.end_date as target_end_date
+            , t.is_current as target_is_current
+            , s.airflow_start_ts as source_airflow_start_ts
+            , t.airflow_start_ts as target_airflow_start_ts
+            , s.batch_id as source_batch_id
+            , t.batch_id as target_batch_id
+            , s.batch_run_date as source_batch_run_date
+            , t.batch_run_date as target_batch_run_date
+            , s.row_hash as source_row_hash
+            , t.row_hash as target_row_hash
+            , t.dw_load_ts as target_dw_load_ts
+            , t.dw_update_ts as target_dw_update_ts
+            , case
+                when t.ledger_key_hash is null then 'Insert' -- New record
+                when s.row_hash != t.row_hash then 'Update' -- Changed record
+                else 'NoChange' -- No change in the current record
+            end as change_type
+        from source_data as s
+        full outer join target_data as t
+            on s.ledger_key_hash = t.ledger_key_hash
+                and s.start_date = t.start_date
+    )
+
+    -- Date chaining for SCD Type 2 with separated CTEs
+    , date_chained as (
+        select
+            cdc.*
+            , lead(cdc.start_date) over (partition by cdc.ledger_key_hash order by cdc.start_date) as next_start_date
+            -- Operation Types:
+            --   INSERT_NEW_KEY: First occurrence of a ledger_key_hash
+            --   START_NEW_VERSION: New version of an existing record due to attribute changes
+            --   END_CURRENT_VERSION: Close the current version due to future changes
+            --   KEEP_CURRENT: No changes needed, maintain the current record
+            , case
+                when cdc.change_type = 'Insert' then 'INSERT_NEW_KEY'
+                when cdc.change_type = 'Update' then 'START_NEW_VERSION'
+                when
+                    cdc.change_type = 'NoChange'
+                    and lead(cdc.start_date) over (partition by cdc.ledger_key_hash order by cdc.start_date) is not null
+                    then 'END_CURRENT_VERSION'
+                else 'KEEP_CURRENT'
+            end as operation_type
+        from combined_cdc_data as cdc
+    )
+
+    -- Final data processing with all transformations
+    , final_data as (
+        select
+            dc.ledger_key_hash
+            , coalesce(dc.source_contract_id, dc.target_contract_id) as contract_id
+            , coalesce(dc.source_contract_durability, dc.target_contract_durability) as contract_durability
+            , coalesce(dc.source_asset_code, dc.target_asset_code) as asset_code
+            , coalesce(dc.source_asset_issuer, dc.target_asset_issuer) as asset_issuer
+            , coalesce(dc.source_asset_type, dc.target_asset_type) as asset_type
+            , coalesce(dc.source_balance, dc.target_balance) as balance
+            , coalesce(dc.source_balance_holder, dc.target_balance_holder) as balance_holder
+            , coalesce(dc.source_contract_create_ts, dc.target_contract_create_ts) as contract_create_ts
+            , coalesce(dc.source_contract_delete_ts, dc.target_contract_delete_ts) as contract_delete_ts
+            , coalesce(dc.source_closed_at, dc.target_closed_at) as closed_at
+            , dc.start_date
+            , coalesce(date_sub(dc.next_start_date, interval 1 day), date('9999-12-31')) as end_date
+            , coalesce(row_number() over (partition by dc.ledger_key_hash order by dc.start_date desc) = 1, false) as is_current
+            , coalesce(dc.source_airflow_start_ts, dc.target_airflow_start_ts) as airflow_start_ts
+            , coalesce(dc.source_batch_id, dc.target_batch_id) as batch_id
+            , coalesce(dc.source_batch_run_date, dc.target_batch_run_date) as batch_run_date
+            , coalesce(dc.source_row_hash, dc.target_row_hash) as row_hash
+            , coalesce(dc.target_dw_load_ts, current_timestamp()) as dw_load_ts
+            , current_timestamp() as dw_update_ts
+            , dc.operation_type
+        from date_chained as dc
+        where dc.operation_type in ('INSERT_NEW_KEY', 'START_NEW_VERSION', 'END_CURRENT_VERSION')
+    )
+
+select
+    ledger_key_hash
+    , contract_id
+    , start_date
+    , end_date
+    , is_current
+    , contract_create_ts
+    , contract_delete_ts
+    , closed_at
+    , contract_durability
+    , asset_code
+    , asset_issuer
+    , asset_type
+    , balance
+    , balance_holder
+    , row_hash
+    , batch_id
+    , batch_run_date
+    , airflow_start_ts
+    , dw_load_ts
+    , dw_update_ts
+from final_data
diff --git a/models/marts/dim_ttl_current.sql b/models/marts/dim_ttl_current.sql
new file mode 100644
index 0000000..0cafce7
--- /dev/null
+++ b/models/marts/dim_ttl_current.sql
@@ -0,0 +1,59 @@
+{{ config(
+    materialized = 'table',
+    unique_key = ['key_hash'],
+    partition_by = {
+        "field": "TIMESTAMP_TRUNC(closed_at, MONTH)",
+        "data_type": "DATE"
+    },
+    cluster_by = ["key_hash"],
+    tags = ["soroban_analytics"]
+) }}
+
+-- Model: dim_ttl_current
+--
+-- Description:
+-- ------------
+-- This model filters the `dim_ttl_hist` table to select only the current records (where `is_current` is TRUE).
+-- This creates a snapshot of the latest active state for each `key_hash`.
+--
+-- Features:
+-- ---------
+-- 1. Selects only the records where `is_current` is TRUE from the historical table.
+-- 2. Includes relevant columns to represent the current state of each TTL.
+-- 3. Adds a `dw_load_ts` column to capture the timestamp when the data was loaded into the data warehouse.
+--
+-- Usage:
+-- ------
+-- Compile:
+--     dbt compile --models dim_ttl_current
+--
+-- Run:
+--     dbt run --models dim_ttl_current

+with
+    current_records as (
+        select
+            key_hash
+            , live_until_ledger_seq
+            , ttl_create_ts
+            , ttl_delete_ts
+            , closed_at
+            , batch_id
+            , batch_run_date
+            , airflow_start_ts
+        from {{ ref('dim_ttl_hist') }}
+        where is_current = true -- Select only the current version of each TTL record
+    )
+
+-- Final output: Current state snapshot of all TTL records
+select
+    key_hash
+    , live_until_ledger_seq
+    , ttl_create_ts
+    , ttl_delete_ts
+    , closed_at
+    , batch_id
+    , batch_run_date
+    , airflow_start_ts
+    , current_timestamp() as dw_load_ts -- Load timestamp to track data freshness
+from current_records
diff --git a/models/marts/dim_ttl_hist.sql b/models/marts/dim_ttl_hist.sql
new file mode 100644
index 0000000..2c74d9a
--- /dev/null
+++ b/models/marts/dim_ttl_hist.sql
@@ -0,0 +1,178 @@
+{{ config(
+    materialized = 'incremental',
+    unique_key = ['key_hash', 'start_date'],
+    partition_by = {
+        "field": "start_date",
+        "data_type": "date",
+        "granularity": "month"
+    },
+    cluster_by = ["key_hash", "start_date", "row_hash"],
+    tags = ["soroban_analytics"]
+) }}
+
+/*
+Model: dim_ttl_hist
+
+Description:
+------------
+This model implements Slowly Changing Dimension (SCD) Type 2 logic to track historical changes
+in TTL data over time. The model supports initial loads and incremental updates.
+
+Features:
+---------
+1. Creates a record for each change in `key_hash`.
+2. Applies change tracking logic using row hash comparison to detect updates.
+3. Implements date chaining to maintain historical records, marking `is_current` for the latest active record (see the example below).
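+
+Example:
+--------
+Integrity check: every `key_hash` should carry exactly one current row. Sketch in
+BigQuery syntax (the dataset name is hypothetical); the expected result is zero rows:
+
+    select key_hash
+    from analytics.dim_ttl_hist
+    group by key_hash
+    having countif(is_current) != 1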
+
+Usage:
+------
+Compile:
+    dbt compile --models dim_ttl_hist
+
+Run:
+    dbt run --models dim_ttl_hist --full-refresh
+    dbt run --models dim_ttl_hist
+*/
+
+-- Load latest changes from transformed ttl data, applying incremental filter
+with
+    source_data as (
+        select
+            key_hash
+            , live_until_ledger_seq
+            , ttl_create_ts
+            , ttl_delete_ts
+            , closed_at
+            , date(closed_at) as start_date
+            , airflow_start_ts
+            , batch_id
+            , batch_run_date
+            , row_hash
+        from {{ ref('int_transform_ttl') }}
+        {% if is_incremental() %}
+            where closed_at > (select coalesce(max(closed_at), '2000-01-01T00:00:00+00:00') from {{ this }})
+        {% endif %}
+    )
+
+    -- Get existing history records for affected ttl entries, or create an empty shell for the initial load
+    , target_data as (
+        {% if is_incremental() %}
+            select *
+            from {{ this }}
+            where key_hash in (select distinct key_hash from source_data)
+        {% else %}
+            select
+                cast(null as string) as key_hash
+                , cast(null as date) as start_date
+                , cast(null as int64) as live_until_ledger_seq
+                , cast(null as timestamp) as ttl_create_ts
+                , cast(null as timestamp) as ttl_delete_ts
+                , cast(null as timestamp) as closed_at
+                , cast(null as timestamp) as airflow_start_ts
+                , cast(null as string) as batch_id
+                , cast(null as datetime) as batch_run_date
+                , cast(null as string) as row_hash
+                , cast(null as date) as end_date
+                , true as is_current
+                , current_timestamp() as dw_load_ts
+                , current_timestamp() as dw_update_ts
+            from source_data
+            where 1 = 0
+        {% endif %}
+    )
+
+    -- Detect changes by comparing source and target data attributes
+    -- Outputs: Change type (Insert/Update/NoChange) based on row hash comparison
+    , combined_cdc_data as (
+        select
+            coalesce(s.key_hash, t.key_hash) as key_hash
+            , coalesce(s.start_date, t.start_date) as start_date
+            , s.live_until_ledger_seq as source_live_until_ledger_seq
+            , t.live_until_ledger_seq as target_live_until_ledger_seq
+            , s.ttl_create_ts as source_ttl_create_ts
+            , t.ttl_create_ts as target_ttl_create_ts
+            , s.ttl_delete_ts as source_ttl_delete_ts
+            , t.ttl_delete_ts as target_ttl_delete_ts
+            , s.closed_at as source_closed_at
+            , t.closed_at as target_closed_at
+            , t.end_date as target_end_date
+            , t.is_current as target_is_current
+            , s.airflow_start_ts as source_airflow_start_ts
+            , t.airflow_start_ts as target_airflow_start_ts
+            , s.batch_id as source_batch_id
+            , t.batch_id as target_batch_id
+            , s.batch_run_date as source_batch_run_date
+            , t.batch_run_date as target_batch_run_date
+            , s.row_hash as source_row_hash
+            , t.row_hash as target_row_hash
+            , t.dw_load_ts as target_dw_load_ts
+            , t.dw_update_ts as target_dw_update_ts
+            , case
+                when t.key_hash is null then 'Insert' -- New record
+                when s.row_hash != t.row_hash then 'Update' -- Changed record
+                else 'NoChange' -- No change in the current record
+            end as change_type
+        from source_data as s
+        full outer join target_data as t
+            on s.key_hash = t.key_hash
+                and s.start_date = t.start_date
+    )
+
+    -- Date chaining for SCD Type 2 with separated CTEs
+    , date_chained as (
+        select
+            cdc.*
+            , lead(cdc.start_date) over (partition by cdc.key_hash order by cdc.start_date) as next_start_date
+            -- Operation Types:
+            --   INSERT_NEW_KEY: First occurrence of a key_hash
+            --   START_NEW_VERSION: New version of an existing record due to attribute changes
+            --   END_CURRENT_VERSION: Close the current version due to future changes
+            --   KEEP_CURRENT: No changes needed, maintain the current record
+            , case
+                when cdc.change_type = 'Insert' then 'INSERT_NEW_KEY'
+                when cdc.change_type = 'Update' then 'START_NEW_VERSION'
+                when
+                    cdc.change_type = 'NoChange'
+                    and lead(cdc.start_date) over (partition by cdc.key_hash order by cdc.start_date) is not null
+                    then 'END_CURRENT_VERSION'
+                else 'KEEP_CURRENT'
+            end as operation_type
+        from combined_cdc_data as cdc
+    )
+
+    -- Final data processing with all transformations
+    , final_data as (
+        select
+            dc.key_hash
+            , coalesce(dc.source_live_until_ledger_seq, dc.target_live_until_ledger_seq) as live_until_ledger_seq
+            , coalesce(dc.source_ttl_create_ts, dc.target_ttl_create_ts) as ttl_create_ts
+            , coalesce(dc.source_ttl_delete_ts, dc.target_ttl_delete_ts) as ttl_delete_ts
+            , coalesce(dc.source_closed_at, dc.target_closed_at) as closed_at
+            , dc.start_date
+            , coalesce(date_sub(dc.next_start_date, interval 1 day), date('9999-12-31')) as end_date
+            , coalesce(row_number() over (partition by dc.key_hash order by dc.start_date desc) = 1, false) as is_current
+            , coalesce(dc.source_airflow_start_ts, dc.target_airflow_start_ts) as airflow_start_ts
+            , coalesce(dc.source_batch_id, dc.target_batch_id) as batch_id
+            , coalesce(dc.source_batch_run_date, dc.target_batch_run_date) as batch_run_date
+            , coalesce(dc.source_row_hash, dc.target_row_hash) as row_hash
+            , coalesce(dc.target_dw_load_ts, current_timestamp()) as dw_load_ts
+            , current_timestamp() as dw_update_ts
+            , dc.operation_type
+        from date_chained as dc
+        where dc.operation_type in ('INSERT_NEW_KEY', 'START_NEW_VERSION', 'END_CURRENT_VERSION')
+    )
+
+select
+    key_hash
+    , live_until_ledger_seq
+    , start_date
+    , end_date
+    , is_current
+    , ttl_create_ts
+    , ttl_delete_ts
+    , closed_at
+    , row_hash
+    , batch_id
+    , batch_run_date
+    , airflow_start_ts
+    , dw_load_ts
+    , dw_update_ts
+from final_data