diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml
new file mode 100644
index 0000000..d62899a
--- /dev/null
+++ b/.github/workflows/lint.yml
@@ -0,0 +1,25 @@
+name: lint dbt project on push
+
+on:
+  push:
+    branches-ignore:
+      - 'main'
+      - 'master'
+
+jobs:
+  lint_project:
+    name: Run SQLFluff linter
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: "actions/checkout@v3"
+      - uses: "actions/setup-python@v2"
+        with:
+          python-version: "3.9"
+      - name: Install SQLFluff
+        run: "pip install sqlfluff-templater-dbt==2.3.5 dbt-core==1.6.0 pytz"
+      - name: Lint project
+        run: "sqlfluff lint -p 0 models/base"
+      - name: Lint project 2
+        run: "sqlfluff lint -p 0 models/transform"
+
diff --git a/.sqlfluff b/.sqlfluff
new file mode 100644
index 0000000..c613f26
--- /dev/null
+++ b/.sqlfluff
@@ -0,0 +1,43 @@
+[sqlfluff]
+templater = jinja
+dialect = snowflake
+rules = L001, L003, L006, L008, L009, L010, L014, L017, L019, L030, L036, L039, L046, L050, L063
+exclude_rules = L011, L026, L027, L029, L031, L034, L051
+large_file_skip_byte_limit = 0
+
+; Autofixable Rules
+; L001 // trailing whitespace
+; L003 // indent
+; L006 // whitespace around =
+; L008 // commas followed by whitespace unless comment
+; L009 // files ending in newline
+; L010 // keyword case
+; L014 // case of identifiers
+; L017 // function not followed by paren
+; L019 // no leading commas
+; L030 // function name case
+; L036 // select on newline
+; L039 // extra whitespace
+; L050 // files must not start with newline or whitespace
+; L063 // datatypes case
+
+; Can't autofix
+; L046 // jinja tag spacing
+
+[sqlfluff:indentation]
+indented_joins = false
+indented_using_on = true
+template_blocks_indent = false
+tab_space_size = 4
+indent_unit = space
+
+[sqlfluff:rules:capitalisation.keywords]
+capitalisation_policy = consistent
+
+[sqlfluff:rules:capitalisation.identifiers]
+extended_capitalisation_policy = lower
+
+[sqlfluff:rules:aliasing.length]
+min_alias_length = 1
+
+
diff --git a/.sqlfluffignore b/.sqlfluffignore
new file mode 100644
index 0000000..ebfe68f
--- /dev/null
+++ b/.sqlfluffignore
@@ -0,0 +1,3 @@
+target/
+dbt_packages/
+macros/
diff --git a/dbt_project.yml b/dbt_project.yml
index 4ee567b..13cfbda 100644
--- a/dbt_project.yml
+++ b/dbt_project.yml
@@ -4,17 +4,22 @@
 # name or the intended use of these models
 name: 'rudder_customer_journey_analysis'
 version: '1.0.0'
+config-version: 2

 # This setting configures which "profile" dbt uses for this project.
-profile: 'default'
+profile: 'snowflake_dw'

+vars:
+
+  rudder_customer_journey_schema: freshwatersystemscom
+  rudder_customer_journey_database: raw

 # These configurations specify where dbt should look for different types of files.
 # The `source-paths` config, for example, states that models in this project can be
 # found in the "models/" directory. You probably won't need to change these!
-source-paths: ["models"]
+model-paths: ["models"]
 analysis-paths: ["analysis"]
 test-paths: ["tests"]
-data-paths: ["data"]
+seed-paths: ["seeds"]
 macro-paths: ["macros"]
 snapshot-paths: ["snapshots"]

@@ -31,7 +36,8 @@ clean-targets:         # directories to be removed by `dbt clean`
 # as tables. These settings can be overridden in the individual model files
 # using the `{{ config(...) }}` macro.
 models:
-  my_new_project:
-      # Applies to all files under models/example/
-      example:
-          materialized: view
+  rudder_customer_journey_analysis:
+    base:
+      schema: dbt_customer_journey_analysis
+    transform:
+      schema: dbt_customer_journey_analysis
diff --git a/models/base/dbt_aliases_mapping.sql b/models/base/dbt_aliases_mapping.sql
new file mode 100644
index 0000000..8537a7e
--- /dev/null
+++ b/models/base/dbt_aliases_mapping.sql
@@ -0,0 +1,35 @@
+/*
+
+The first intent is to be able to link all events from the same user via some common identifier. The Rudder data payload contains two fields, anonymous_id and user_id. The first is device specific and generated by the Rudder SDK; the second is the user identifier that can be assigned by application code depending on a user’s registration or login.
+
+Typically, a user would log in to an application at least once while using it on a particular device. The user_id should then become available and be included in event messages. The code below creates a unique identifier, dbt_visitor_id, that links the user_id to the anonymous_id. The analytic function first_value is used here.
+
+*/
+
+{{ config(materialized='table') }}
+
+with dbt_all_mappings as (
+    select
+        anonymous_id,
+        user_id,
+        timestamp as timestamp
+    from {{ source("schema","tracks") }}
+
+    union
+    distinct
+    select
+        user_id,
+        null,
+        timestamp
+    from {{ source("schema","tracks") }}
+)
+select
+distinct
+    anonymous_id as alias,
+    coalesce(first_value(user_id ignore nulls)
+        over (
+            partition by anonymous_id
+            order by timestamp desc
+            rows between unbounded preceding and unbounded following
+        ), user_id, anonymous_id) as dbt_visitor_id
+from dbt_all_mappings
diff --git a/models/base/dbt_mapped_tracks.sql b/models/base/dbt_mapped_tracks.sql
new file mode 100644
index 0000000..88e1e91
--- /dev/null
+++ b/models/base/dbt_mapped_tracks.sql
@@ -0,0 +1,22 @@
+/*
+
+    Use the ID generated while creating dbt_aliases_mapping to link all events for the same user on that device. Also note the idle time between events.
+
+*/
+
+{{ config(materialized='table') }}
+
+select
+    *,
+    timestampdiff(minutes, cast(lag(timestamp) over (partition by dbt_visitor_id order by timestamp) as timestamp), cast(timestamp as timestamp)) as idle_time_minutes
+from (
+    select
+        t.id as event_id,
+        t.anonymous_id,
+        a2v.dbt_visitor_id,
+        t.timestamp,
+        t.event as event
+    from {{ source("schema","tracks") }} as t
+    inner join {{ ref('dbt_aliases_mapping') }} as a2v
+        on a2v.alias = coalesce(t.user_id, t.anonymous_id)
+)
diff --git a/models/base/dbt_session_tracks.sql b/models/base/dbt_session_tracks.sql
new file mode 100644
index 0000000..21cbcfc
--- /dev/null
+++ b/models/base/dbt_session_tracks.sql
@@ -0,0 +1,16 @@
+/*
+
+A decision is made to treat two events, for the same user, that are separated by more than 30 minutes as belonging to two different user sessions. The choice of 30 minutes is arbitrary and can be modified as per requirements. A sequence number is assigned to each session for a particular user. Also, the timestamp of the first event in a session is taken as the session start time, and the start time of the next session is also calculated.
+
+*/
+
+{{ config(materialized='table') }}
+
+select
+    concat(cast(row_number() over (partition by dbt_visitor_id order by timestamp) as string), ' - ', dbt_visitor_id) as session_id,
+    dbt_visitor_id,
+    timestamp as session_start_at,
+    row_number() over (partition by dbt_visitor_id order by timestamp) as session_sequence_number,
+    lead(timestamp) over (partition by dbt_visitor_id order by timestamp) as next_session_start_at
+from {{ ref('dbt_mapped_tracks') }}
+where (idle_time_minutes > 30 or idle_time_minutes is null)
diff --git a/models/base/dbt_track_facts.sql b/models/base/dbt_track_facts.sql
new file mode 100644
index 0000000..7717046
--- /dev/null
+++ b/models/base/dbt_track_facts.sql
@@ -0,0 +1,25 @@
+/*
+
+The code below creates a table that links track events to the session they belong to. The session association is established via the user identifier linkage and the session start timestamp.
+
+So if a user U1 has session S1 with start time T1 and session S2 with start time T2, then event E for user U1 belongs to session S1 if its timestamp falls between T1 and T2, or if T2 is null. The second case occurs for the last recorded session for that user.
+
+*/
+
+
+{{ config(materialized='table') }}
+
+select
+    t.anonymous_id,
+    t.timestamp,
+    t.event_id,
+    t.event as event,
+    s.session_id,
+    t.dbt_visitor_id,
+    row_number() over (partition by s.session_id order by t.timestamp) as track_sequence_number
+from {{ ref('dbt_mapped_tracks') }} as t
+inner join {{ ref('dbt_session_tracks') }} as s
+    on
+        t.dbt_visitor_id = s.dbt_visitor_id
+        and t.timestamp >= s.session_start_at
+        and (t.timestamp < s.next_session_start_at or s.next_session_start_at is null)
diff --git a/models/base/dbt_tracks_flow.sql b/models/base/dbt_tracks_flow.sql
new file mode 100644
index 0000000..fe1c746
--- /dev/null
+++ b/models/base/dbt_tracks_flow.sql
@@ -0,0 +1,34 @@
+/*
+
+We leverage analytic functions like first_value and nth_value to create 5-event sequences that capture the flow of events during a session. The number 5 can be increased or decreased as per requirements.
+
+*/
+
+{{ config(materialized='table') }}
+
+with derived_table as (
+    select
+        event_id,
+        session_id,
+        track_sequence_number,
+        first_value(event ignore nulls) over (partition by session_id order by track_sequence_number asc) as event,
+        dbt_visitor_id,
+        timestamp,
+        nth_value(event, 2) ignore nulls over (partition by session_id order by track_sequence_number asc) as second_event,
+        nth_value(event, 3) ignore nulls over (partition by session_id order by track_sequence_number asc) as third_event,
+        nth_value(event, 4) ignore nulls over (partition by session_id order by track_sequence_number asc) as fourth_event,
+        nth_value(event, 5) ignore nulls over (partition by session_id order by track_sequence_number asc) as fifth_event
+    from {{ ref('dbt_track_facts') }}
+)
+select
+    event_id,
+    session_id,
+    track_sequence_number,
+    event,
+    dbt_visitor_id,
+    cast(timestamp as timestamp) as timestamp,
+    second_event as event_2,
+    third_event as event_3,
+    fourth_event as event_4,
+    fifth_event as event_5
+from derived_table a
diff --git a/models/dbt_aliases_mapping.sql b/models/dbt_aliases_mapping.sql
deleted file mode 100644
index 3a298b7..0000000
--- a/models/dbt_aliases_mapping.sql
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
-
-The first intent is to be able to link all events from the same user via some common identifier. Rudder data payload contains two fields anonymous_id and user_id. The first is device specific and generated by the Rudder SDK, the second is the user identifier that can be assigned by application code depending on a user’s registration or login.
-
-Typically a user would login to an application at least once while using the application on a particular device. The user_id should then become available and be included in event messages. Below code creates a unique identifier dbt_visitor_id that links the user_id to the anonymous_id. The analytic function first_value has been used here
-
-*/
-
-{{ config(materialized='table') }}
-
- with
- dbt_all_mappings as (
-    select anonymous_id
-    , user_id
-    , timestamp as timestamp
-    from {{ source("","tracks")}}
-
-    union distinct
-
-    select user_id
-    , null
-    , timestamp
-    from {{ source("","tracks")}}
-
- )
-
- select
- distinct anonymous_id as alias
-    ,coalesce(first_value(user_id IGNORE NULLS)
-        over(
-            partition by anonymous_id
-            order by timestamp desc
-            rows between unbounded preceding and unbounded following), user_id, anonymous_id) as dbt_visitor_id
- from dbt_all_mappings
diff --git a/models/dbt_mapped_tracks.sql b/models/dbt_mapped_tracks.sql
deleted file mode 100644
index 7ad95ff..0000000
--- a/models/dbt_mapped_tracks.sql
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
-
-    Use the ID generated while creating dbt_aliases_mapping to link all events for the same user on that device. Also note the idle time between events
-
-*/
-
-{{ config(materialized='table') }}
-
-select *
-    ,timestamp_diff(cast(timestamp as timestamp), cast(lag(timestamp) over(partition by dbt_visitor_id order by timestamp) as timestamp), minute) as idle_time_minutes
-    from (
-    select t.id as event_id
-        ,t.anonymous_id
-        ,a2v.dbt_visitor_id
-        ,t.timestamp
-        ,t.event as event
-        from {{ source("Torpedo_Redshift_Migration","tracks")}} as t
-        inner join {{ ref('dbt_aliases_mapping') }} as a2v
-        on a2v.alias = coalesce(t.user_id, t.anonymous_id)
-    )
\ No newline at end of file
diff --git a/models/dbt_session_tracks.sql b/models/dbt_session_tracks.sql
deleted file mode 100644
index fbe9efa..0000000
--- a/models/dbt_session_tracks.sql
+++ /dev/null
@@ -1,15 +0,0 @@
-/*
-
-A decision is made to treat two events, for the same user, that are separated by 30 minutes or more - as belonging to two different user sessions. The choice of 30 minutes is arbitrary and can be modified as per requirements. Sequence number is assigned to each event within a particular session. Also, the timestamp for the first event in the session is considered as session start time. Start time of the next session is also calculated.
-
-*/
-
-{{ config(materialized='table') }}
-
- select concat(cast(row_number() over(partition by dbt_visitor_id order by timestamp) AS string), ' - ', dbt_visitor_id) as session_id
-    , dbt_visitor_id
-    , timestamp as session_start_at
-    , row_number() over(partition by dbt_visitor_id order by timestamp) as session_sequence_number
-    , lead(timestamp) over(partition by dbt_visitor_id order by timestamp) as next_session_start_at
-from {{ ref('dbt_mapped_tracks') }}
-where (idle_time_minutes > 30 or idle_time_minutes is null)
diff --git a/models/dbt_track_facts.sql b/models/dbt_track_facts.sql
deleted file mode 100644
index b7fec20..0000000
--- a/models/dbt_track_facts.sql
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
-
-Below code creates a table to link the track events to the session they belong to. The session association is established via the user identifier linkage and the user session start timestamp.
-
-So if a user U1 has session S1 with start time as T1 and session S2 with start time as T2 - then event E for user U1 would belong to session S1 if its timestamp falls between T1 and T2 or if T2 is null. The second case occurs for the last recorded session for that user.
-
-*/
-
-
-{{ config(materialized='table') }}
-
-select t.anonymous_id
-    , t.timestamp
-    , t.event_id
-    , t.event AS event
-    , s.session_id
-    , t.dbt_visitor_id
-    , row_number() over(partition by s.session_id order by t.timestamp) as track_sequence_number
-    from {{ ref('dbt_mapped_tracks') }} as t
-    inner join {{ ref('dbt_session_tracks') }} as s
-    on t.dbt_visitor_id = s.dbt_visitor_id
-    and t.timestamp >= s.session_start_at
-    and (t.timestamp < s.next_session_start_at or s.next_session_start_at is null)
diff --git a/models/dbt_tracks_flow.sql b/models/dbt_tracks_flow.sql
deleted file mode 100644
index f8ce9ee..0000000
--- a/models/dbt_tracks_flow.sql
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
-
-We leverage analytic functions like first_value and nth_value to create 5-event sequences that capture the flow of events during a session. 5 can be increased or decreased as per requirements.
-
-*/
-
-{{ config(materialized='table') }}
-
-with derived_table as (
-    select
-        event_id,
-        session_id,
-        track_sequence_number,
-        first_value(event IGNORE NULLS) over(partition by session_id order by track_sequence_number asc) as event,
-        dbt_visitor_id,
-        timestamp,
-        nth_value(event,2 IGNORE NULLS) over(partition by session_id order by track_sequence_number asc) as second_event,
-        nth_value(event,3 IGNORE NULLS) over(partition by session_id order by track_sequence_number asc) as third_event,
-        nth_value(event,4 IGNORE NULLS) over(partition by session_id order by track_sequence_number asc) as fourth_event,
-        nth_value(event,5 IGNORE NULLS) over(partition by session_id order by track_sequence_number asc) as fifth_event,
-    from {{ ref('dbt_track_facts') }}
-    )
-
-    select event_id
-    , session_id
-    , track_sequence_number
-    , event
-    , dbt_visitor_id
-    , cast(timestamp as timestamp) as timestamp
-    , second_event as event_2
-    , third_event as event_3
-    , fourth_event as event_4
-    , fifth_event as event_5
-    from derived_table a
\ No newline at end of file
diff --git a/models/tracks.yml b/models/tracks.yml
index e994ef6..a401812 100644
--- a/models/tracks.yml
+++ b/models/tracks.yml
@@ -1,6 +1,8 @@
 version: 2
 sources:
-  - name:
+  - name: schema
+    database: "{{ var('rudder_customer_journey_database', target.database) }}"
+    schema: "{{ var('rudder_customer_journey_schema', 'rudder' ) }}"
     tables:
       - name: tracks
         columns:
@@ -8,4 +10,4 @@ sources:
          - name: anonymous_id
          - name: user_id

-          - name: timestamp
\ No newline at end of file
+          - name: timestamp
diff --git a/models/transform/.gitkeep b/models/transform/.gitkeep
new file mode 100644
index 0000000..e69de29
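
Note: the CI checks above can be reproduced locally before pushing. This is a minimal sketch that assumes Python 3.9 and the repository root as the working directory (so that .sqlfluff and .sqlfluffignore are picked up); the package versions are the ones pinned in lint.yml:

    # install the same toolchain the workflow installs
    pip install sqlfluff-templater-dbt==2.3.5 dbt-core==1.6.0 pytz
    # lint the two model directories the workflow lints
    sqlfluff lint -p 0 models/base
    sqlfluff lint -p 0 models/transform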
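Because tracks.yml reads the source database and schema through var() with fallbacks (target.database and 'rudder'), they can also be overridden at run time instead of editing dbt_project.yml. A sketch of such an invocation, shown here with the default values declared in the diff:

    # override the source location for a single run; swap in any database/schema pair
    dbt run --vars '{rudder_customer_journey_database: raw, rudder_customer_journey_schema: freshwatersystemscom}'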