From 529e5866a082820b09556d7daa578162079200cf Mon Sep 17 00:00:00 2001
From: harsha-stellar-data
Date: Thu, 31 Oct 2024 18:39:28 -0400
Subject: [PATCH] Combined incremental and transformation models into one to
 prevent dbt_project_evaluator spec violations

---
 .../intermediate/int_incr_contract_code.sql   | 103 -----------
 .../intermediate/int_incr_contract_code.yml   | 103 -----------
 .../intermediate/int_incr_contract_data.sql   | 103 -----------
 .../intermediate/int_incr_contract_data.yml   | 100 ----------
 models/intermediate/int_incr_ttl.sql          |  90 ---------
 models/intermediate/int_incr_ttl.yml          |  75 --------
 .../int_transform_contract_code.sql           | 162 +++++++++++-------
 .../int_transform_contract_code.yml           |  51 ++++--
 .../int_transform_contract_data.sql           | 153 ++++++++++-------
 .../int_transform_contract_data.yml           |  56 ++++--
 models/intermediate/int_transform_ttl.sql     |  78 ++++++---
 models/intermediate/int_transform_ttl.yml     |  11 +-
 12 files changed, 330 insertions(+), 755 deletions(-)
 delete mode 100644 models/intermediate/int_incr_contract_code.sql
 delete mode 100644 models/intermediate/int_incr_contract_code.yml
 delete mode 100644 models/intermediate/int_incr_contract_data.sql
 delete mode 100644 models/intermediate/int_incr_contract_data.yml
 delete mode 100644 models/intermediate/int_incr_ttl.sql
 delete mode 100644 models/intermediate/int_incr_ttl.yml

diff --git a/models/intermediate/int_incr_contract_code.sql b/models/intermediate/int_incr_contract_code.sql
deleted file mode 100644
index 3bdad38..0000000
--- a/models/intermediate/int_incr_contract_code.sql
+++ /dev/null
@@ -1,103 +0,0 @@
-{{
-    config(
-        materialized = 'incremental',
-        unique_key = ['ledger_key_hash', 'closed_at'],
-        cluster_by = ["ledger_key_hash", "closed_at"],
-        tags = ["soroban_analytics", "intermediate", "daily"]
-    )
-}}
-/*
-Model: int_incr_contract_code
-
-Description:
-------------
-This intermediate model handles the incremental loading of contract code from the staging layer.
-It processes only new or updated data up to the previous day based on `execution_date`.
-
-Features:
----------
-1. Uses execution_date from Airflow to determine the processing window.
-2. Processes new or updated records for completed days up to the previous day.
-3. Selects only the latest entry per `ledger_key_hash` per day within the processing window.
-
-Usage:
-------
-Compile:
-    dbt compile --models int_incr_contract_code
-    dbt compile --models int_incr_contract_code --vars '{"execution_date": "2024-10-25T00:00:00+00:00"}'
-Run:
-    dbt run --models int_incr_contract_code --full-refresh
-    dbt run --models int_incr_contract_code
-    dbt run --models int_incr_contract_code --vars '{"execution_date": "2024-10-25T00:00:00+00:00"}'
-*/
-
--- Set the execution date (use Airflow value, fallback to dbt default if absent)
-{% set execution_date = var('execution_date', dbt_airflow_macros.ts(timezone=none)) %}
-
-with
-    source_data as (
-        select
-            contract_code_hash
-            , ledger_key_hash
-            , last_modified_ledger
-            , ledger_entry_change
-            , ledger_sequence
-            , deleted
-            , closed_at
-            , n_instructions
-            , n_functions
-            , n_globals
-            , n_table_entries
-            , n_types
-            , n_data_segments
-            , n_elem_segments
-            , n_imports
-            , n_exports
-            , n_data_segment_bytes
-            , batch_id
-            , batch_run_date
-        from {{ ref('stg_contract_code') }}
-        where
-            -- Process only completed days up to execution_date for incremental loads
-            date(closed_at) < date('{{ execution_date }}')
-
-            -- Process records inserted since the last load (incremental only)
-            {% if is_incremental() %}
-                and date(closed_at) >= (select coalesce(max(date(closed_at)), '2000-01-01') from {{ this }})
-            {% endif %}
-    )
-
-    , ranked_data as (
-        select
-            *
-            , row_number() over (
-                partition by ledger_key_hash, cast(closed_at as date)
-                order by closed_at desc, ledger_sequence desc
-            ) as rn
-        from source_data
-    )
-
--- Pick only the latest change per ledger per day
-select
-    ledger_key_hash
-    , contract_code_hash
-    , last_modified_ledger
-    , ledger_entry_change
-    , ledger_sequence
-    , deleted
-    , closed_at
-    , n_instructions
-    , n_functions
-    , n_globals
-    , n_table_entries
-    , n_types
-    , n_data_segments
-    , n_elem_segments
-    , n_imports
-    , n_exports
-    , n_data_segment_bytes
-    , batch_id
-    , batch_run_date
-    , cast('{{ execution_date }}' as timestamp) as airflow_start_ts
-    , current_timestamp() as dw_load_ts
-from ranked_data
-where rn = 1
diff --git a/models/intermediate/int_incr_contract_code.yml b/models/intermediate/int_incr_contract_code.yml
deleted file mode 100644
index b92ab17..0000000
--- a/models/intermediate/int_incr_contract_code.yml
+++ /dev/null
@@ -1,103 +0,0 @@
-version: 2
-
-models:
-  - name: int_incr_contract_code
-    description: "This intermediate model handles the incremental loading of contract code data from the staging layer."
-    meta:
-      owner: "Data Team"
-      update_schedule: "daily"
-      description: "Incremental model for contract code with daily updates"
-    tests:
-      - dbt_utils.unique_combination_of_columns:
-          combination_of_columns:
-            - ledger_key_hash
-            - closed_at
-          meta:
-            description: "Tests the uniqueness combination of contract code tracking fields"
-    columns:
-      - name: ledger_key_hash
-        description: '{{ doc("ledger_key_hash") }}'
-        tests:
-          - not_null
-
-      - name: contract_code_hash
-        description: '{{ doc("contract_code_hash") }}'
-        tests:
-          - not_null
-
-      - name: last_modified_ledger
-        description: '{{ doc("last_modified_ledger") }}'
-        tests:
-          - not_null
-
-      - name: ledger_entry_change
-        description: '{{ doc("ledger_entry_change") }}'
-        tests:
-          - not_null
-          - incremental_accepted_values:
-              values: [0, 1, 2]
-
-      - name: ledger_sequence
-        description: '{{ doc("ledger_sequence") }}'
-        tests:
-          - not_null
-
-      - name: deleted
-        description: '{{ doc("deleted") }}'
-        tests:
-          - not_null
-
-      - name: closed_at
-        description: '{{ doc("closed_at") }}'
-        tests:
-          - not_null
-
-      - name: n_instructions
-        description: '{{ doc("n_instructions") }}'
-
-      - name: n_functions
-        description: '{{ doc("n_functions") }}'
-
-      - name: n_globals
-        description: '{{ doc("n_globals") }}'
-
-      - name: n_table_entries
-        description: '{{ doc("n_table_entries") }}'
-
-      - name: n_types
-        description: '{{ doc("n_types") }}'
-
-      - name: n_data_segments
-        description: '{{ doc("n_data_segments") }}'
-
-      - name: n_elem_segments
-        description: '{{ doc("n_elem_segments") }}'
-
-      - name: n_imports
-        description: '{{ doc("n_imports") }}'
-
-      - name: n_exports
-        description: '{{ doc("n_exports") }}'
-
-      - name: n_data_segment_bytes
-        description: '{{ doc("n_data_segment_bytes") }}'
-
-      - name: batch_id
-        description: '{{ doc("batch_id") }}'
-        tests:
-          - not_null
-
-      - name: batch_run_date
-        description: '{{ doc("batch_run_date") }}'
-        tests:
-          - not_null
-
-      - name: airflow_start_ts
-        description: "The timestamp indicating the start of the Airflow task."
-        tests:
-          - not_null
-
-      - name: dw_load_ts
-        description: "The timestamp for when the data was loaded into the data warehouse."
-        tests:
-          - not_null
diff --git a/models/intermediate/int_incr_contract_data.sql b/models/intermediate/int_incr_contract_data.sql
deleted file mode 100644
index 7fc1d37..0000000
--- a/models/intermediate/int_incr_contract_data.sql
+++ /dev/null
@@ -1,103 +0,0 @@
-{{
-    config(
-        materialized = 'incremental',
-        unique_key = ['ledger_key_hash', 'closed_at'],
-        partition_by = {
-            "field": "closed_at",
-            "data_type": "date",
-            "granularity": "month"
-        },
-        cluster_by = ['closed_at', 'ledger_key_hash'],
-        tags = ["soroban_analytics", "intermediate", "daily"]
-    )
-}}
-/*
-Model: int_incr_contract_data
-
-Description:
-------------
-This intermediate model handles the incremental loading of contract data from the staging layer.
-It processes only new or updated data up to the previous day based on `execution_date`.
-
-Features:
----------
-1. Uses execution_date from Airflow to determine the processing window.
-2. Processes new or updated records for completed days up to the previous day.
-3. Selects only the latest entry per `ledger_key_hash` per day within the processing window.
-
-Usage:
-------
-Compile:
-    dbt compile --models int_incr_contract_data
-    dbt compile --models int_incr_contract_data --vars '{"execution_date": "2024-10-25T00:00:00+00:00"}'
-Run:
-    dbt run --models int_incr_contract_data --full-refresh
-    dbt run --models int_incr_contract_data
-    dbt run --models int_incr_contract_data --vars '{"execution_date": "2024-10-25T00:00:00+00:00"}'
-*/
-
--- Set the execution date (use Airflow value, fallback to dbt default if absent)
-{% set execution_date = var('execution_date', dbt_airflow_macros.ts()) %}
-
-with
-    source_data as (
-        select
-            contract_id
-            , ledger_key_hash
-            , contract_key_type
-            , contract_durability
-            , last_modified_ledger
-            , ledger_entry_change
-            , ledger_sequence
-            , asset_code
-            , asset_issuer
-            , asset_type
-            , balance
-            , balance_holder
-            , deleted
-            , closed_at
-            , batch_insert_ts
-            , batch_id
-            , batch_run_date
-        from {{ ref('stg_contract_data') }}
-        where
-            -- Process only completed days up to execution_date for incremental loads
-            date(closed_at) < date('{{ execution_date }}')
-
-            -- Process records inserted since the last load (incremental only)
-            {% if is_incremental() %}
-                and date(closed_at) >= (select coalesce(max(date(closed_at)), '2000-01-01') from {{ this }})
-            {% endif %}
-    )
-
-    , ranked_data as (
-        select
-            *
-            , row_number() over (
-                partition by ledger_key_hash, cast(closed_at as date)
-                order by closed_at desc, ledger_sequence desc
-            ) as rn
-        from source_data
-    )
-
--- Pick only the latest change per ledger per day
-select
-    contract_id
-    , ledger_key_hash
-    , contract_key_type
-    , contract_durability
-    , last_modified_ledger
-    , ledger_entry_change
-    , ledger_sequence
-    , deleted
-    , closed_at
-    , asset_code
-    , asset_issuer
-    , asset_type
-    , balance
-    , balance_holder
-    , batch_id
-    , batch_run_date
-    , cast('{{ execution_date }}' as timestamp) as airflow_start_ts
-    , current_timestamp() as dw_load_ts
-from ranked_data
-where rn = 1
diff --git a/models/intermediate/int_incr_contract_data.yml b/models/intermediate/int_incr_contract_data.yml
deleted file mode 100644
index a1984f4..0000000
--- a/models/intermediate/int_incr_contract_data.yml
+++ /dev/null
@@ -1,100 +0,0 @@
-version: 2
-
-models:
-  - name: int_incr_contract_data
-    description: "This intermediate model handles the incremental loading of contract data from the staging layer."
-    meta:
-      owner: "Data Team"
-      update_schedule: "daily"
-      description: "Incremental model for contract data with daily updates"
-    tests:
-      - dbt_utils.unique_combination_of_columns:
-          combination_of_columns:
-            - ledger_key_hash
-            - closed_at
-          meta:
-            description: "Tests the uniqueness combination of contract data tracking fields"
-
-    columns:
-      - name: ledger_key_hash
-        description: '{{ doc("ledger_key_hash") }}'
-        tests:
-          - not_null
-
-      - name: contract_id
-        description: '{{ doc("contract_id") }}'
-        tests:
-          - not_null
-
-      - name: contract_key_type
-        description: '{{ doc("contract_key_type") }}'
-        tests:
-          - not_null
-
-      - name: contract_durability
-        description: '{{ doc("contract_durability") }}'
-        tests:
-          - not_null
-
-      - name: last_modified_ledger
-        description: '{{ doc("last_modified_ledger") }}'
-        tests:
-          - not_null
-
-      - name: ledger_entry_change
-        description: '{{ doc("ledger_entry_change") }}'
-        tests:
-          - not_null
-          - incremental_accepted_values:
-              values: [0, 1, 2]
-
-      - name: ledger_sequence
-        description: '{{ doc("ledger_sequence") }}'
-        tests:
-          - not_null
-
-      - name: asset_code
-        description: '{{ doc("asset_code") }}'
-
-      - name: asset_issuer
-        description: '{{ doc("asset_issuer") }}'
-
-      - name: asset_type
-        description: '{{ doc("asset_type") }}'
-
-      - name: balance_holder
-        description: '{{ doc("balance_holder") }}'
-
-      - name: balance
-        description: '{{ doc("balance") }}'
-
-      - name: deleted
-        description: '{{ doc("deleted") }}'
-
-      - name: closed_at
-        description: '{{ doc("closed_at") }}'
-        tests:
-          - not_null
-
-      - name: batch_insert_ts
-        description: '{{ doc("batch_insert_ts") }}'
-
-      - name: batch_id
-        description: '{{ doc("batch_id") }}'
-        tests:
-          - not_null
-
-      - name: batch_run_date
-        description: '{{ doc("batch_run_date") }}'
-        tests:
-          - not_null
-
-      - name: airflow_start_ts
-        description: "The timestamp indicating the start of the Airflow task."
-        tests:
-          - not_null
-
-      - name: dw_load_ts
-        description: "Timestamp when the record was loaded."
-        tests:
-          - not_null
diff --git a/models/intermediate/int_incr_ttl.sql b/models/intermediate/int_incr_ttl.sql
deleted file mode 100644
index 87a0780..0000000
--- a/models/intermediate/int_incr_ttl.sql
+++ /dev/null
@@ -1,90 +0,0 @@
-{{
-    config(
-        materialized = 'incremental',
-        unique_key = ['key_hash', 'closed_at'],
-        partition_by = {
-            "field": "closed_at",
-            "data_type": "date",
-            "granularity": "month"
-        },
-        cluster_by = ['closed_at', 'key_hash'],
-        tags = ["soroban_analytics", "intermediate", "daily"]
-    )
-}}
-/*
-Model: int_incr_ttl
-
-Description:
-------------
-This intermediate model handles the incremental loading of ledger time-to-live (TTL)
-data from the staging layer. It processes only new or updated data up to the previous day based on `execution_date`.
-
-Features:
----------
-1. Uses execution_date from Airflow to determine the processing window.
-2. Processes new or updated records for completed days up to the previous day.
-3. Selects only the latest entry per `key_hash` per day within the processing window.
-
-Usage:
-------
-Compile:
-    dbt compile --models int_incr_ttl
-    dbt compile --models int_incr_ttl --vars '{"execution_date": "2024-10-25T00:00:00+00:00"}'
-Run:
-    dbt run --models int_incr_ttl --full-refresh
-    dbt run --models int_incr_ttl
-    dbt run --models int_incr_ttl --vars '{"execution_date": "2024-10-25T00:00:00+00:00"}'
-*/
-
--- Set the execution date (use Airflow value, fallback to dbt default if absent)
-{% set execution_date = var('execution_date', dbt_airflow_macros.ts()) %}
-
-with
-    source_data as (
-        select
-            key_hash
-            , live_until_ledger_seq
-            , last_modified_ledger
-            , ledger_entry_change
-            , ledger_sequence
-            , deleted
-            , closed_at
-            , batch_insert_ts
-            , batch_id
-            , batch_run_date
-        from {{ ref('stg_ttl') }}
-        where
-            -- Process only completed days up to execution_date for incremental loads
-            date(closed_at) < date('{{ execution_date }}')
-
-            -- Process records inserted since the last load (incremental only)
-            {% if is_incremental() %}
-                and date(closed_at) >= (select coalesce(max(date(closed_at)), '2000-01-01') from {{ this }})
-            {% endif %}
-    )
-
-    , ranked_data as (
-        select
-            *
-            , row_number() over (
-                partition by key_hash, cast(closed_at as date)
-                order by closed_at desc, ledger_sequence desc
-            ) as rn
-        from source_data
-    )
-
--- Pick only the latest change per key_hash per day
-select
-    key_hash
-    , live_until_ledger_seq
-    , last_modified_ledger
-    , ledger_entry_change
-    , ledger_sequence
-    , deleted
-    , closed_at
-    , batch_insert_ts
-    , batch_id
-    , batch_run_date
-    , cast('{{ execution_date }}' as timestamp) as airflow_start_ts
-    , current_timestamp() as dw_load_ts
-from ranked_data
-where rn = 1
diff --git a/models/intermediate/int_incr_ttl.yml b/models/intermediate/int_incr_ttl.yml
deleted file mode 100644
index bdb47a1..0000000
--- a/models/intermediate/int_incr_ttl.yml
+++ /dev/null
@@ -1,75 +0,0 @@
-version: 2
-
-models:
-  - name: int_incr_ttl
-    description: |
-      This intermediate model handles the incremental loading of ledger time-to-live (TTL)
-      data from the staging layer. It processes only new or updated data up to the previous day based on `execution_date`.
-    meta:
-      owner: "Data Team"
-      update_schedule: "daily"
-      description: "Incremental model for TTL data with daily updates"
-    tests:
-      - dbt_utils.unique_combination_of_columns:
-          combination_of_columns:
-            - key_hash
-            - closed_at
-          meta:
-            description: "Tests the uniqueness combination of TTL tracking fields"
-    columns:
-      - name: key_hash
-        description: '{{ doc("key_hash") }}'
-        tests:
-          - not_null
-
-      - name: live_until_ledger_seq
-        description: '{{ doc("live_until_ledger_seq") }}'
-        tests:
-          - not_null
-
-      - name: last_modified_ledger
-        description: '{{ doc("last_modified_ledger") }}'
-        tests:
-          - not_null
-
-      - name: ledger_entry_change
-        description: '{{ doc("ledger_entry_change") }}'
-        tests:
-          - not_null
-          - incremental_accepted_values:
-              values: [0, 1, 2]
-
-      - name: ledger_sequence
-        description: '{{ doc("ledger_sequence") }}'
-        tests:
-          - not_null
-
-      - name: deleted
-        description: '{{ doc("deleted") }}'
-        tests:
-          - not_null
-
-      - name: closed_at
-        description: '{{ doc("closed_at") }}'
-        tests:
-          - not_null
-
-      - name: batch_id
-        description: '{{ doc("batch_id") }}'
-        tests:
-          - not_null
-
-      - name: batch_run_date
-        description: '{{ doc("batch_run_date") }}'
-        tests:
-          - not_null
-
-      - name: airflow_start_ts
-        description: "The timestamp indicating the start of the Airflow task."
-        tests:
-          - not_null
-
-      - name: dw_load_ts
-        description: "The timestamp for when the data was loaded into the data warehouse."
-        tests:
-          - not_null
diff --git a/models/intermediate/int_transform_contract_code.sql b/models/intermediate/int_transform_contract_code.sql
index 94b96fc..c637064 100644
--- a/models/intermediate/int_transform_contract_code.sql
+++ b/models/intermediate/int_transform_contract_code.sql
@@ -12,18 +12,19 @@ Model: int_transform_contract_code
 
 Description:
 ------------
-This intermediate model applies transformations to the contract code, calculates new fields,
-and performs deduplication based on the transformed data.
+This intermediate model combines incremental loading and transformations for contract code data.
+It processes new or updated data, calculates new fields, and performs deduplication.
 
-Load Type: Full Truncate and Reload
+Load Type: Truncate and Reload
 
 Features:
 ---------
-1. Calculates contract_create_ts and contract_delete_ts from staging data.
-2. Joins the calculated timestamps with the incremental data and applies transformations.
-3. Standardizes data by applying default values as well as type casting as applicable.
-4. Calculates row hash using sha256 on all relevant CDC fields.
-5. Performs deduplication based on the row hash of the fully transformed data. This step primarily detects dupes in consecutive rows.
+1. Uses execution_date from Airflow to determine the processing window.
+2. Fetches data through the previous day from the staging layer, keeping one row per key hash per day (the latest).
+3. Calculates `contract_create_ts` and `contract_delete_ts` from staging data.
+4. Joins the calculated timestamps with the incremental data and applies transformations.
+5. Calculates a row hash using SHA256 on all relevant fields.
+6. Performs deduplication based on the row hash of the fully transformed data. This step primarily detects duplicates in consecutive rows (see the worked example below).
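+
+Worked example (hypothetical values): for a single ledger_key_hash, the lag-based
+dedup in the final step keeps a row only when its hash differs from the previous row:
+
+    closed_at     row_hash   prev_row_hash   is_change   kept?
+    2024-10-01    aaa        null            1           yes (first record)
+    2024-10-02    aaa        aaa             0           no  (no change)
+    2024-10-03    bbb        aaa             1           yes (contract changed)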
 
 Usage:
 ------
 Compile:
     dbt compile --models int_transform_contract_code
     dbt compile --models int_transform_contract_code --vars '{"execution_date": "2024-10-01T00:00:00+00:00"}'
 Run:
     dbt run --models int_transform_contract_code --full-refresh
     dbt run --models int_transform_contract_code
+    dbt run --models int_transform_contract_code --vars '{"execution_date": "2024-10-01T00:00:00+00:00"}'
 */
 
+-- Set the execution date (use Airflow value, fallback to dbt default if absent)
+{% set execution_date = var('execution_date', dbt_airflow_macros.ts()) %}
+
 with
-    derived_data as (
+    source_data as (
         select
             contract_code_hash
-            , min(case when ledger_entry_change = 0 then closed_at end) as contract_create_ts  -- Calculate contract created timestamp
-            -- Calculate contract deletion timestamp
-            , max(case when ledger_entry_change = 2 and deleted = true then closed_at end) as contract_delete_ts
+            , ledger_key_hash
+            , last_modified_ledger
+            , ledger_entry_change
+            , ledger_sequence
+            , deleted
+            , closed_at
+            , n_instructions
+            , n_functions
+            , n_globals
+            , n_table_entries
+            , n_types
+            , n_data_segments
+            , n_elem_segments
+            , n_imports
+            , n_exports
+            , n_data_segment_bytes
+            , batch_id
+            , batch_run_date
+            , cast('{{ execution_date }}' as timestamp) as airflow_start_ts
+            -- Rank records to identify the latest entry
+            , row_number() over (
+                partition by ledger_key_hash, cast(closed_at as date)
+                order by closed_at desc, ledger_sequence desc
+            ) as row_num
         from {{ ref('stg_contract_code') }}
-        group by contract_code_hash
+        where
+            -- Process only completed days up to execution_date
+            date(closed_at) < date('{{ execution_date }}')
+    )
+
+    , filtered_data as (
+        select *
+        from source_data
+        where row_num = 1  -- Only consider the latest record
     )
 
-    , source_data as (
+    , derived_data as (
         select
-            int.contract_code_hash
-            , int.ledger_key_hash
-            , int.n_instructions
-            , int.n_functions
-            , int.n_globals
-            , int.n_table_entries
-            , int.n_types
-            , int.n_data_segments
-            , int.n_elem_segments
-            , int.n_imports
-            , int.n_exports
-            , int.n_data_segment_bytes
-            , derived.contract_create_ts
-            , derived.contract_delete_ts
-            , int.closed_at
-            , int.ledger_sequence
-            , int.airflow_start_ts
-            , int.batch_id
-            , int.batch_run_date
-        from {{ ref('int_incr_contract_code') }} as int
-        left join derived_data as derived on int.contract_code_hash = derived.contract_code_hash
+            ledger_key_hash
+            , min(case when ledger_entry_change = 0 then closed_at end) as contract_create_ts  -- Calculate contract created timestamp
+            -- Calculate contract deletion timestamp
+            , max(case when ledger_entry_change = 2 and deleted = true then closed_at end) as contract_delete_ts
+        from source_data
+        group by ledger_key_hash
     )
 
-    -- Calculate a hash for each row using sha256
-    , hashed_data as (
+    , transformed_data as (
         select
-            *
+            f.ledger_key_hash
+            , f.contract_code_hash
+            , f.last_modified_ledger
+            , f.ledger_entry_change
+            , f.ledger_sequence
+            , f.deleted
+            , f.closed_at
+            , f.n_instructions
+            , f.n_functions
+            , f.n_globals
+            , f.n_table_entries
+            , f.n_types
+            , f.n_data_segments
+            , f.n_elem_segments
+            , f.n_imports
+            , f.n_exports
+            , f.n_data_segment_bytes
+            , d.contract_create_ts
+            , d.contract_delete_ts
+            , f.airflow_start_ts
+            , f.batch_id
+            , f.batch_run_date
             , sha256(concat(
-                coalesce(ledger_key_hash, '')
-                , coalesce(cast(n_instructions as string), '')
-                , coalesce(cast(n_functions as string), '')
-                , coalesce(cast(n_globals as string), '')
-                , coalesce(cast(n_table_entries as string), '')
-                , coalesce(cast(n_types as string), '')
-                , coalesce(cast(n_data_segments as string), '')
-                , coalesce(cast(n_elem_segments as string), '')
-                , coalesce(cast(n_imports as string), '')
-                , coalesce(cast(n_exports as string), '')
-                , coalesce(cast(n_data_segment_bytes as string), '')
-                , coalesce(cast(contract_create_ts as string), '')
-                , coalesce(cast(contract_delete_ts as string), '')
+                coalesce(f.ledger_key_hash, '')
+                , coalesce(cast(f.n_instructions as string), '')
+                , coalesce(cast(f.n_functions as string), '')
+                , coalesce(cast(f.n_globals as string), '')
+                , coalesce(cast(f.n_table_entries as string), '')
+                , coalesce(cast(f.n_types as string), '')
+                , coalesce(cast(f.n_data_segments as string), '')
+                , coalesce(cast(f.n_elem_segments as string), '')
+                , coalesce(cast(f.n_imports as string), '')
+                , coalesce(cast(f.n_exports as string), '')
+                , coalesce(cast(f.n_data_segment_bytes as string), '')
+                , coalesce(cast(d.contract_create_ts as string), '')
+                , coalesce(cast(d.contract_delete_ts as string), '')
             )) as row_hash
-        from source_data
+        from filtered_data as f
+        left join derived_data as d on f.ledger_key_hash = d.ledger_key_hash
     )
 
-    -- Identify changes between consecutive records and pick the first record if duplicates
     , dedup_data as (
         select
             *
-            -- Get the hash of the previous record for each contract_code_hash
-            , lag(row_hash) over (partition by contract_code_hash order by closed_at, ledger_sequence) as prev_row_hash
+            , lag(row_hash) over (partition by ledger_key_hash order by closed_at, ledger_sequence) as prev_row_hash
             , case
-                when row_hash = lag(row_hash) over (partition by contract_code_hash order by closed_at, ledger_sequence)
+                when row_hash = lag(row_hash) over (partition by ledger_key_hash order by closed_at, ledger_sequence)
                     then 0  -- No change
                 else 1  -- Change
             end as is_change  -- Flag records that are different from their previous record
-        from hashed_data
+        from transformed_data
     )
 
--- Keep records that are different from their previous record (is_change = 1)
--- OR the first record for each ledger_key_hash (prev_row_hash is null)
+-- Select deduplicated records
 select
-    contract_code_hash
-    , ledger_key_hash
+    ledger_key_hash
+    , contract_code_hash
+    , last_modified_ledger
+    , ledger_entry_change
     , ledger_sequence
     , contract_create_ts
     , contract_delete_ts
+    , deleted
     , closed_at
     , n_instructions
     , n_functions
diff --git a/models/intermediate/int_transform_contract_code.yml b/models/intermediate/int_transform_contract_code.yml
index 223bfe2..8f1ffa1 100644
--- a/models/intermediate/int_transform_contract_code.yml
+++ b/models/intermediate/int_transform_contract_code.yml
@@ -2,48 +2,55 @@ version: 2
 
 models:
   - name: int_transform_contract_code
-    description: "This intermediate model applies transformations to the contract code, calculates new fields like contract_create_ts and contract_delete_ts, and performs deduplication based on row hash."
+    description: |
+      This intermediate model combines incremental loading and transformations for contract code data.
+      It processes new or updated data, calculates new fields like `contract_create_ts` and `contract_delete_ts`,
+      and performs deduplication based on row hash.
+
     meta:
-      owner: "Data team"
+      owner: "Data Team"
       update_schedule: "daily"
      description: "Transformed and deduplicated contract code data with derived timestamps"
-      upstream_dependencies: ["int_incr_contract_code"]
+      upstream_dependencies: ["stg_contract_code"]
+
     tests:
       - dbt_utils.unique_combination_of_columns:
           combination_of_columns:
             - ledger_key_hash
            - closed_at
           meta:
-            description: "Tests the uniqueness combination of contract code tracking fields"
+            description: "Tests the uniqueness combination of contract tracking fields"
+
     columns:
       - name: contract_code_hash
         description: '{{ doc("contract_code_hash") }}'
         tests:
           - not_null
-          - relationships:
-              to: ref('int_incr_contract_code')
-              field: contract_code_hash
 
       - name: ledger_key_hash
         description: '{{ doc("ledger_key_hash") }}'
         tests:
           - not_null
-          - relationships:
-              to: ref('int_incr_contract_code')
-              field: ledger_key_hash
 
-      - name: ledger_sequence
-        description: '{{ doc("ledger_sequence") }}'
+      - name: last_modified_ledger
+        description: '{{ doc("last_modified_ledger") }}'
         tests:
           - not_null
 
-      - name: contract_create_ts
-        description: "Timestamp when the contract was created (ledger_entry_change = 0)"
+      - name: ledger_entry_change
+        description: '{{ doc("ledger_entry_change") }}'
         tests:
           - not_null
+          - incremental_accepted_values:
+              values: [0, 1, 2]
 
-      - name: contract_delete_ts
-        description: "Timestamp when the contract was deleted (ledger_entry_change = 2 and deleted = true)"
+      - name: ledger_sequence
+        description: '{{ doc("ledger_sequence") }}'
+        tests:
+          - not_null
+
+      - name: deleted
+        description: '{{ doc("deleted") }}'
 
       - name: closed_at
         description: '{{ doc("closed_at") }}'
@@ -80,11 +87,14 @@ models:
       - name: n_data_segment_bytes
         description: '{{ doc("n_data_segment_bytes") }}'
 
-      - name: row_hash
-        description: "SHA256 hash of the row data for deduplication"
+      - name: contract_create_ts
+        description: "Timestamp when the contract was created (ledger_entry_change = 0)"
         tests:
           - not_null
 
+      - name: contract_delete_ts
+        description: "Timestamp when the contract was deleted (ledger_entry_change = 2 and deleted = true)"
+
       - name: batch_id
         description: '{{ doc("batch_id") }}'
         tests:
@@ -101,6 +111,11 @@ models:
           - not_null
 
       - name: dw_load_ts
-        description: "The timestamp for when the data was loaded into the data warehouse."
+        description: "Timestamp when the record was loaded."
+        tests:
+          - not_null
+
+      - name: row_hash
+        description: "SHA256 hash of the row data for deduplication"
         tests:
           - not_null
diff --git a/models/intermediate/int_transform_contract_data.sql b/models/intermediate/int_transform_contract_data.sql
index 3802a66..cf5e9ac 100644
--- a/models/intermediate/int_transform_contract_data.sql
+++ b/models/intermediate/int_transform_contract_data.sql
@@ -17,18 +17,19 @@ Model: int_transform_contract_data
 
 Description:
 ------------
-This intermediate model applies transformations to the contract data, calculates new fields,
-and performs deduplication based on the transformed data.
+This intermediate model combines incremental loading and transformations for contract data.
+It processes new or updated data, calculates new fields, and performs deduplication.
 
 Load Type: Truncate and Reload
 
 Features:
 ---------
-1. Calculates contract_create_ts and contract_delete_ts from staging data.
-2. Joins the calculated timestamps with the incremental data and applies transformations.
-3. Trims spaces on string fields and standardizes blank values as NULL.
-4. Calculates row hash using sha256 on all relevant fields.
-5. Performs deduplication based on the row hash of the fully transformed data. This step primarily detects dupes in consecutive rows.
+1. Uses execution_date from Airflow to determine the processing window.
+2. Fetches data through the previous day from the staging layer, keeping one row per key hash per day (the latest).
+3. Calculates `contract_create_ts` and `contract_delete_ts` from staging data.
+4. Joins the calculated timestamps with the incremental data and applies transformations.
+5. Calculates a row hash using SHA256 on all relevant fields.
+6. Performs deduplication based on the row hash of the fully transformed data. This step primarily detects duplicates in consecutive rows.
 
 Usage:
 ------
 Compile:
     dbt compile --models int_transform_contract_data
     dbt compile --models int_transform_contract_data --vars '{"execution_date": "2024-10-25T00:00:00+00:00"}'
 Run:
     dbt run --models int_transform_contract_data --full-refresh
     dbt run --models int_transform_contract_data
+    dbt run --models int_transform_contract_data --vars '{"execution_date": "2024-10-25T00:00:00+00:00"}'
 */
 
+-- Set the execution date (use Airflow value, fallback to dbt default if absent)
+{% set execution_date = var('execution_date', dbt_airflow_macros.ts()) %}
+
 with
-    derived_data as (
+    source_data as (
         select
             contract_id
-            -- Calculate contract created timestamp
-            , min(case when ledger_entry_change = 0 and contract_key_type = 'ScValTypeScvLedgerKeyContractInstance' then closed_at end)
-                as contract_create_ts
-            -- Calculate contract deletion timestamp
-            , max(case when ledger_entry_change = 2 and deleted = true then closed_at end) as contract_delete_ts
-        from {{ ref('stg_contract_data') }}
-        group by contract_id
+            , ledger_key_hash
+            , contract_key_type
+            , contract_durability
+            , last_modified_ledger
+            , ledger_entry_change
+            , ledger_sequence
+            , asset_code
+            , asset_issuer
+            , asset_type
+            , balance
+            , balance_holder
+            , deleted
+            , closed_at
+            , batch_insert_ts
+            , batch_id
+            , batch_run_date
+            , cast('{{ execution_date }}' as timestamp) as airflow_start_ts
+            -- Rank records to identify the latest entry
+            , row_number() over (
+                partition by ledger_key_hash, cast(closed_at as date)
+                order by closed_at desc, ledger_sequence desc
+            ) as row_num
+        from {{ ref('stg_contract_data') }}
+        where
+            -- Process only completed days up to execution_date
+            date(closed_at) < date('{{ execution_date }}')
+    )
+
+    , filtered_data as (
+        select *
+        from source_data
+        where row_num = 1  -- Only consider the latest record
+    )
+
+    , derived_data as (
+        select
+            ledger_key_hash
+            , min(case when ledger_entry_change = 0 then closed_at end) as contract_create_ts  -- Calculate contract created timestamp
+            -- Calculate contract deletion timestamp
+            , max(case when ledger_entry_change = 2 and deleted = true then closed_at end) as contract_delete_ts
+        from source_data
+        group by ledger_key_hash
     )
 
     , transformed_data as (
         select
-            incr.contract_id
-            , incr.ledger_key_hash
+            f.ledger_key_hash
+            , f.contract_id
+            , f.contract_key_type
             , case
-                when nullif(trim(incr.contract_durability), '') = 'ContractDataDurabilityPersistent' then 'persistent'
-                when nullif(trim(incr.contract_durability), '') = 'ContractDataDurabilityTemporary' then 'temporary'
-                else nullif(trim(incr.contract_durability), '')
+                when nullif(trim(f.contract_durability), '') = 'ContractDataDurabilityPersistent' then 'persistent'
+                when nullif(trim(f.contract_durability), '') = 'ContractDataDurabilityTemporary' then 'temporary'
+                else nullif(trim(f.contract_durability), '')
             end as contract_durability
-            , nullif(trim(incr.asset_code), '') as asset_code
-            , nullif(trim(incr.asset_issuer), '') as asset_issuer
-            , nullif(trim(incr.asset_type), '') as asset_type
-            , cast(round(safe_cast(incr.balance as float64) / pow(10, 7), 5) as numeric) as balance
-            , nullif(trim(incr.balance_holder), '') as balance_holder
-            , derived.contract_create_ts
-            , derived.contract_delete_ts
-            , incr.closed_at
-            , incr.ledger_sequence
-            , incr.airflow_start_ts
-            , incr.batch_id
-            , incr.batch_run_date
-        from {{ ref('int_incr_contract_data') }} as incr
-        left join derived_data as derived on incr.contract_id = derived.contract_id
-    )
-
-    -- Calculate a hash for each row using sha256
-    , hashed_data as (
-        select
-            *
+            , nullif(trim(f.asset_code), '') as asset_code
+            , nullif(trim(f.asset_issuer), '') as asset_issuer
+            , nullif(trim(f.asset_type), '') as asset_type
+            , cast(round(safe_cast(f.balance as float64) / pow(10, 7), 5) as numeric) as balance
+            , nullif(trim(f.balance_holder), '') as balance_holder
+            , d.contract_create_ts
+            , d.contract_delete_ts
+            , f.deleted
+            , f.closed_at
+            , f.ledger_sequence
+            , f.ledger_entry_change
+            , f.last_modified_ledger
+            , f.airflow_start_ts
+            , f.batch_id
+            , f.batch_run_date
             , sha256(concat(
-                coalesce(ledger_key_hash, '')
-                , coalesce(contract_durability, '')
-                , coalesce(asset_code, '')
-                , coalesce(asset_issuer, '')
-                , coalesce(asset_type, '')
-                , coalesce(cast(balance as string), '')
-                , coalesce(balance_holder, '')
-                , coalesce(cast(contract_create_ts as string), '')
-                , coalesce(cast(contract_delete_ts as string), '')
+                coalesce(f.ledger_key_hash, '')
+                , coalesce(f.contract_durability, '')
+                , coalesce(f.asset_code, '')
+                , coalesce(f.asset_issuer, '')
+                , coalesce(f.asset_type, '')
+                , coalesce(cast(f.balance as string), '')
+                , coalesce(f.balance_holder, '')
+                , coalesce(cast(d.contract_create_ts as string), '')
+                , coalesce(cast(d.contract_delete_ts as string), '')
             )) as row_hash
-        from transformed_data
+        from filtered_data as f
+        left join derived_data as d on f.ledger_key_hash = d.ledger_key_hash
     )
 
-    -- Identify changes between consecutive records and pick the first record if duplicates
     , dedup_data as (
         select
             *
-            -- Get the hash of the previous record for each contract_id
-            , lag(row_hash) over (partition by contract_id order by closed_at, ledger_sequence) as prev_row_hash
+            , lag(row_hash) over (partition by ledger_key_hash order by closed_at, ledger_sequence) as prev_row_hash
             , case
-                when row_hash = lag(row_hash) over (partition by contract_id order by closed_at, ledger_sequence)
+                when row_hash = lag(row_hash) over (partition by ledger_key_hash order by closed_at, ledger_sequence)
                     then 0  -- No change
                 else 1  -- Change
             end as is_change  -- Flag records that are different from their previous record
-        from hashed_data
+        from transformed_data
     )
 
 -- Select deduplicated records
--- Keep records that are different from their previous record (is_change = 1)
--- OR the first record for each ledger_key_hash (prev_row_hash is null)
 select
-    contract_id
-    , ledger_key_hash
+    ledger_key_hash
+    , contract_id
+    , contract_key_type
     , contract_durability
+    , last_modified_ledger
+    , ledger_entry_change
     , ledger_sequence
     , contract_create_ts
     , contract_delete_ts
+    , deleted
     , closed_at
     , asset_code
     , asset_issuer
     , asset_type
     , balance
     , balance_holder
+    , to_hex(row_hash) as row_hash  -- Convert the row hash to hex format to store it as a STRING field
     , batch_id
     , batch_run_date
     , airflow_start_ts
-    , to_hex(row_hash) as row_hash  -- Convert the row hash to hex format to store it as a STRING field
     , current_timestamp() as dw_load_ts
 from dedup_data
 where is_change = 1 or prev_row_hash is null
diff --git a/models/intermediate/int_transform_contract_data.yml b/models/intermediate/int_transform_contract_data.yml
index f190280..363af20 100644
--- a/models/intermediate/int_transform_contract_data.yml
+++ b/models/intermediate/int_transform_contract_data.yml
@@ -2,19 +2,25 @@ version: 2
 
 models:
   - name: int_transform_contract_data
-    description: "This intermediate model applies transformations to the contract data, calculates new fields like contract_create_ts and contract_delete_ts, and performs deduplication based on row hash."
+    description: |
+      This intermediate model combines incremental loading and transformations for contract data.
+      It processes new or updated data, calculates new fields like `contract_create_ts` and `contract_delete_ts`,
+      and performs deduplication based on row hash.
+
     meta:
-      owner: "Data team"
+      owner: "Data Team"
       update_schedule: "daily"
       description: "Transformed and deduplicated contract data with derived timestamps"
-      upstream_dependencies: ["int_incr_contract_data"]
+      upstream_dependencies: ["stg_contract_data"]
+
     tests:
       - dbt_utils.unique_combination_of_columns:
           combination_of_columns:
             - ledger_key_hash
            - closed_at
           meta:
-            description: "Tests the uniqueness combination of contract data tracking fields"
+            description: "Tests the uniqueness combination of contract tracking fields"
+
     columns:
       - name: contract_id
         description: '{{ doc("contract_id") }}'
@@ -26,24 +32,28 @@ models:
         tests:
           - not_null
 
+      - name: contract_key_type
+        description: '{{ doc("contract_key_type") }}'
+
       - name: contract_durability
         description: '{{ doc("contract_durability") }}'
-
-      - name: ledger_sequence
-        description: '{{ doc("ledger_sequence") }}'
         tests:
           - not_null
 
-      - name: contract_create_ts
-        description: "Timestamp when the contract was created"
+      - name: last_modified_ledger
+        description: '{{ doc("last_modified_ledger") }}'
         tests:
           - not_null
 
-      - name: contract_delete_ts
-        description: "Timestamp when the contract was deleted (if applicable)"
+      - name: ledger_entry_change
+        description: '{{ doc("ledger_entry_change") }}'
+        tests:
+          - not_null
+          - incremental_accepted_values:
+              values: [0, 1, 2]
 
-      - name: closed_at
-        description: '{{ doc("closed_at") }}'
+      - name: ledger_sequence
+        description: '{{ doc("ledger_sequence") }}'
         tests:
           - not_null
 
@@ -56,11 +66,19 @@ models:
       - name: asset_type
         description: '{{ doc("asset_type") }}'
 
+      - name: balance_holder
+        description: '{{ doc("balance_holder") }}'
+
       - name: balance
         description: '{{ doc("balance") }}'
 
-      - name: balance_holder
-        description: '{{ doc("balance_holder") }}'
+      - name: deleted
+        description: '{{ doc("deleted") }}'
+
+      - name: closed_at
+        description: '{{ doc("closed_at") }}'
+        tests:
+          - not_null
 
       - name: batch_id
         description: '{{ doc("batch_id") }}'
@@ -78,10 +96,18 @@ models:
           - not_null
 
       - name: dw_load_ts
-        description: "The timestamp for when the data was loaded into the data warehouse."
+        description: "Timestamp when the record was loaded."
         tests:
           - not_null
 
+      - name: contract_create_ts
+        description: "Timestamp when the contract was created (ledger_entry_change = 0)"
+        tests:
+          - not_null
+
+      - name: contract_delete_ts
+        description: "Timestamp when the contract was deleted (ledger_entry_change = 2 and deleted = true)"
+
       - name: row_hash
         description: "SHA256 hash of the row data for deduplication"
         tests:
diff --git a/models/intermediate/int_transform_ttl.sql b/models/intermediate/int_transform_ttl.sql
index 824872c..a241aa1 100644
--- a/models/intermediate/int_transform_ttl.sql
+++ b/models/intermediate/int_transform_ttl.sql
@@ -17,17 +17,19 @@ Model: int_transform_ttl
 
 Description:
 ------------
-This intermediate model applies transformations to ttl data, calculates new fields,
+This intermediate model combines incremental loading and transformations for ledger TTL data, calculates new fields,
 and performs deduplication based on the transformed data.
 
 Load Type: Truncate and Reload
 
 Features:
 ---------
-1. Calculates ttl_create_ts and ttl_delete_ts from staging data.
-2. Joins the calculated timestamps with the incremental data and applies transformations.
-3. Calculates row hash using sha256 on all relevant fields.
-4. Performs deduplication based on the row hash of the fully transformed data. This step primarily detects dupes in consecutive rows.
+1. Uses execution_date from Airflow to determine the processing window (see the worked example below).
+2. Fetches data through the previous day from the staging layer, keeping one row per key hash per day (the latest).
+3. Calculates `ttl_create_ts` and `ttl_delete_ts` from staging data.
+4. Joins the calculated timestamps with the incremental data and applies transformations.
+5. Calculates a row hash using SHA256 on all relevant fields.
+6. Performs deduplication based on the row hash of the fully transformed data. This step primarily detects duplicates in consecutive rows.
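+
+Worked example (hypothetical date): passing
+    --vars '{"execution_date": "2024-10-25T00:00:00+00:00"}'
+substitutes into the processing-window filter as
+    date(closed_at) < date('2024-10-25T00:00:00+00:00')
+so only completed days before 2024-10-25 are processed and the still-open
+current day is excluded; row_number() then keeps the single latest change
+per key_hash per day.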
 
 Usage:
 ------
 Compile:
     dbt compile --models int_transform_ttl
     dbt compile --models int_transform_ttl --vars '{"execution_date": "2024-10-25T00:00:00+00:00"}'
 Run:
     dbt run --models int_transform_ttl --full-refresh
     dbt run --models int_transform_ttl
+    dbt run --models int_transform_ttl --vars '{"execution_date": "2024-10-25T00:00:00+00:00"}'
 */
 
+-- Set the execution date (use Airflow value, fallback to dbt default if absent)
+{% set execution_date = var('execution_date', dbt_airflow_macros.ts()) %}
+
 with
-    derived_data as (
+    -- Extract data from the staging table and rank records based on key_hash and closed_at
+    source_data as (
         select
             key_hash
-            , min(case when ledger_entry_change = 0 then closed_at end) as ttl_create_ts  -- Calculate contract created timestamp
-            , max(case when ledger_entry_change = 2 and deleted = true then closed_at end) as ttl_delete_ts  -- Calculate contract deletion timestamp
+            , live_until_ledger_seq
+            , last_modified_ledger
+            , ledger_entry_change
+            , ledger_sequence
+            , deleted
+            , closed_at
+            , batch_insert_ts
+            , batch_id
+            , batch_run_date
+            , cast('{{ execution_date }}' as timestamp) as airflow_start_ts
+            -- Rank records to identify the latest entry
+            , row_number() over (
+                partition by key_hash, cast(closed_at as date)
+                order by closed_at desc, ledger_sequence desc
+            ) as row_num
         from {{ ref('stg_ttl') }}
+        where
+            -- Process only completed days up to execution_date
+            date(closed_at) < date('{{ execution_date }}')
+    )
+
+    -- Filter to retain only the latest record for each key_hash
+    , filtered_data as (
+        select *
+        from source_data
+        where row_num = 1  -- Only consider the latest record
+    )
+
+    -- Calculate TTL create and delete timestamps
+    , derived_data as (
+        select
+            key_hash
+            , min(case when ledger_entry_change = 0 then closed_at end) as ttl_create_ts
+            , max(case when ledger_entry_change = 2 and deleted = true then closed_at end) as ttl_delete_ts
+        from source_data
         group by key_hash
     )
 
+    -- Prepare transformed data with additional fields
     , transformed_data as (
         select
             ttl.key_hash
@@ -60,39 +100,29 @@ with
             , ttl.batch_id
             , ttl.batch_run_date
             , ttl.airflow_start_ts
-        from {{ ref('int_incr_ttl') }} as ttl
-        left join derived_data as derived on ttl.key_hash = derived.key_hash
-    )
-
-    -- Calculate a hash for each row using sha256
-    , hashed_data as (
-        select
-            *
             , sha256(concat(
-                coalesce(cast(live_until_ledger_seq as string), '')
-                , coalesce(cast(ttl_create_ts as string), '')
-                , coalesce(cast(ttl_delete_ts as string), '')
+                coalesce(cast(ttl.live_until_ledger_seq as string), '')
+                , coalesce(cast(derived.ttl_create_ts as string), '')
+                , coalesce(cast(derived.ttl_delete_ts as string), '')
             )) as row_hash
-        from transformed_data
+        from filtered_data as ttl
+        left join derived_data as derived on ttl.key_hash = derived.key_hash
     )
 
     -- Identify changes between consecutive records and pick the first record if duplicates
     , dedup_data as (
         select
             *
-            -- Get the hash of the previous record for each key_hash
             , lag(row_hash) over (partition by key_hash order by closed_at, ledger_sequence) as prev_row_hash
             , case
                 when row_hash = lag(row_hash) over (partition by key_hash order by closed_at, ledger_sequence)
                     then 0  -- No change
                 else 1  -- Change
             end as is_change  -- Flag records that are different from their previous record
-        from hashed_data
+        from transformed_data
     )
 
 -- Select deduplicated records
--- Keep records that are different from their previous record (is_change = 1)
--- OR the first record for each key_hash (prev_row_hash is null)
 select
     key_hash
     , live_until_ledger_seq
diff --git a/models/intermediate/int_transform_ttl.yml b/models/intermediate/int_transform_ttl.yml
index ab847f9..edad48e 100644
--- a/models/intermediate/int_transform_ttl.yml
+++ b/models/intermediate/int_transform_ttl.yml
@@ -2,12 +2,15 @@ version: 2
 
 models:
   - name: int_transform_ttl
-    description: "This intermediate model applies transformations to TTL data, calculates new fields like ttl_create_ts and ttl_delete_ts, and performs deduplication based on the transformed data."
+    description: |
+      This intermediate model combines incremental loading and transformations for ledger TTL data, calculates new fields,
+      and performs deduplication based on the transformed data.
     meta:
       owner: "Data Team"
       update_schedule: "daily"
       description: "Transformed and deduplicated TTL data with derived timestamps"
-      upstream_dependencies: ["int_incr_ttl"]
+      upstream_dependencies: ["stg_ttl"]
+
     tests:
       - dbt_utils.unique_combination_of_columns:
           combination_of_columns:
             - key_hash
@@ -15,14 +18,12 @@ models:
             - closed_at
           meta:
             description: "Tests the uniqueness combination of TTL tracking fields"
+
     columns:
       - name: key_hash
         description: '{{ doc("key_hash") }}'
         tests:
           - not_null
-          - relationships:
-              to: ref('int_incr_ttl')
-              field: key_hash
 
       - name: live_until_ledger_seq
         description: '{{ doc("live_until_ledger_seq") }}'
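
After a run, the (key_hash, closed_at) grain declared by the schema tests above can be spot-checked with an ad-hoc query; a minimal sketch (runnable as a dbt analysis; either of the other two transform models can be substituted for the ref):

    -- Any rows returned are duplicates at the tested grain
    select key_hash, closed_at, count(*) as n_rows
    from {{ ref('int_transform_ttl') }}
    group by key_hash, closed_at
    having count(*) > 1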