Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bryce/bq time spine compatibility #83

Merged
merged 4 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.DS_Store
logs
.envrc
.env
6 changes: 6 additions & 0 deletions integration_tests/models/datasets/time_spine/schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@ models:
tests:
- dbt_utils.equality:
compare_model: ref("output__time_spine")
compare_columns:
- ts
- customer_id
- total_items_purchased_between
- total_sales_between
- total_purchases_between
6 changes: 5 additions & 1 deletion integration_tests/seeds/datasets/seed_schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,8 @@ seeds:
column_types:
first_before_referrer_url: "{{ 'string' if target.name == 'bigquery' else 'text' }}"
second_pageview_at: "{{ 'datetime' if target.name == 'bigquery' else 'timestamp' }}"


- name: output__time_spine
config:
column_types:
ts: "{{ 'datetime' if target.name == 'bigquery' else 'timestamp' }}"
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ ts,activity_id,customer_id,total_items_purchased_between,total_sales_between,tot
2022-01-01,ac78479a9f4e0a82141e63285e0bd937,1,6,350,2
2022-02-01,40baeaad6b5e05afb0f2346f3f73f2c4,1,5,250,1
2022-03-01,c258c810fde59ef5e4d5f0c5b3b59c50,1,0,0,0
2022-04-01,6dc2dde9fdba51de24cd3d537dd974c7,1,0,0,0
2022-01-01,d242adbbac9ebdbba8caf4d669a13db8,4,2,80,1
2022-02-01,b040c6925c8a8b4fda30d87bb3c828fe,4,6,600,2
2022-03-01,414fd7624da769643b82b680a0476d01,4,0,0,0
2022-04-01,9e0dfbdab056ac46796b461ce49845c2,4,0,0,0
2022-01-01,e93fa0bce5965387e1ee99086d13ebba,7,3,120,1
2022-02-01,6b62dceb80db68b6260c1ab313a13d36,7,2,150,2
2022-03-01,e554eceabffb4739dffa03e6d96d90b0,7,0,0,0
2022-04-01,90e45d8e8f5ab2fe50745d01e119e2c4,7,0,0,0
2022-01-01,5168a14e1555818beb23a394f9e2f95f,10,4,50,1
2022-02-01,91ee7a6ca7f704b424afae0af772078c,10,18,1600,2
2022-03-01,97e03bb3fa450367f3fff74634a2f886,10,0,0,0
2022-04-01,0deea889ea5d00d2c598ad774d598c72,10,0,0,0
49 changes: 34 additions & 15 deletions macros/activity_schema/dataset/_build_dataset.sql
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ time_spine_entities as (
{%- for column in primary_activity.columns %}
{{ dbt_activity_schema.select_column(stream, primary, column).column_sql }} as {{column.alias}},
{%- endfor %}
date_trunc('{{primary_activity.interval}}', min({{primary}}.{{columns.ts}})) as period_start,
{% if primary_activity.end_period=='current' %}current_timestamp{% else %}date_trunc('{{primary_activity.interval}}', max({{primary}}.{{columns.ts}})){% endif %} as period_end,
date_diff('{{primary_activity.interval}}', period_start::timestamp, {% if primary_activity.end_period=='current' %}current_timestamp{% else %}date_trunc('{{primary_activity.interval}}', max({{primary}}.{{columns.ts}})){% endif %}::timestamp) as active_periods
{{dbt_activity_schema.date_trunc(primary_activity.interval, 'min('~columns.ts~')')}} as period_start,
{{dbt_activity_schema.end_period_expression(primary_activity.end_period, columns.ts)}} as period_end,
{{dbt_activity_schema.date_diff(primary_activity.interval, dbt_activity_schema.date_trunc(primary_activity.interval, 'min('~columns.ts~')'), dbt_activity_schema.end_period_expression(primary_activity.end_period, columns.ts))}} as active_periods
from {% if primary_activity.filters is none %}{% if not skip_stream %}{{ stream_relation }}{% else %}{{ ref(primary_activity.model_name) }}{% endif %}{% else %}{{primary_activity_alias}}{{fs}}{% endif %} as {{primary}}
where true
{% if not skip_stream %}
Expand All @@ -109,34 +109,53 @@ time_spine_metadata as (
from time_spine_entities
),
number_spine as (
{% if target.type != 'bigquery' %}
with recursive number_spine as (
select 1 as n -- start the spine at 1
select 0 as n -- start the spine at 1
union all
select n + 1
from number_spine
where n <= (select max_active_periods from time_spine_metadata) -- adjust the upper limit as needed
)
select * from number_spine),
{{primary_activity_alias}} as (
select * from number_spine
{% else %}
select n
from unnest(generate_array(0, 10000)) as n
{% endif %}
),
joined_time_spine as (
select
{%- for column in primary_activity.columns %}
tse.{{column.alias}},
{%- endfor %}
ns.n-1 as n0,
{{ dbt_activity_schema.dateadd(primary_activity.interval, 'n0', 'tse.period_start') }} as {{columns.ts}},
{{ dbt_activity_schema.dateadd(primary_activity.interval, 1, columns.ts) }} as {{columns.activity_repeated_at}},
ns.n as {{columns.activity_occurrence}},
md5(tse.{{req}}{{columns.customer}} || {{columns.ts}}) as {{req}}{{columns.activity_id}},
tse.{{req}}{{columns.customer}},
ns.n,
tse.period_start,
{{ dbt_activity_schema.dateadd(primary_activity.interval, 'ns.n', 'tse.period_start') }} as {{columns.ts}},
{% if columns.anonymous_customer_id is defined %}
tse.{{req}}{{columns.anonymous_customer_id}},
{% endif %}
{{columns.ts}} as {{req}}{{columns.ts}},
{{columns.activity_occurrence}} as {{req}}{{columns.activity_occurrence}},
{{columns.activity_repeated_at}} as {{req}}{{columns.activity_repeated_at}}
tse.{{req}}{{columns.customer}}
from time_spine_entities tse
left join number_spine ns
on tse.active_periods >= ns.n
),
{{primary_activity_alias}} as (
select
{%- for column in primary_activity.columns %}
{{column.alias}},
{%- endfor %}
{{columns.ts}},
{{ dbt_activity_schema.dateadd(primary_activity.interval, 1, columns.ts) }} as {{columns.activity_repeated_at}},
n+1 as {{columns.activity_occurrence}},
{{dbt_activity_schema.md5(req~columns.customer~' || '~columns.ts)}} as {{req}}{{columns.activity_id}},
{{req}}{{columns.customer}},
{% if columns.anonymous_customer_id is defined %}
{{req}}{{columns.anonymous_customer_id}},
{% endif %}
{{columns.ts}} as {{req}}{{columns.ts}},
n+1 as {{req}}{{columns.activity_occurrence}},
{{ dbt_activity_schema.dateadd(primary_activity.interval, 1, columns.ts) }} as {{req}}{{columns.activity_repeated_at}},
from joined_time_spine
)
{% else %}
{% if primary_activity.filters is not none %}
Expand Down
104 changes: 104 additions & 0 deletions macros/activity_schema/dataset/aggregations/_helpers.sql
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,107 @@
{% macro bigquery__type_json() %}
{%- do return("json") -%}
{% endmacro %}

{% macro end_period_expression(end_period, ts_col) %}
{%- if end_period == 'current' -%}
{{dbt_activity_schema.current_timestamp()}}
{%- elif end_period == 'max' -%}
max({{dbt_activity_schema.primary()}}.{{ts_col}})
{%- else -%}
{%- endif -%}
{% endmacro %}


{% macro current_timestamp() %}
{{ return(adapter.dispatch("current_timestamp", "dbt_activity_schema")())}}
{% endmacro %}

{% macro default__current_timestamp() %}
current_timestamp
{% endmacro %}

{% macro bigquery__current_timestamp() %}
current_timestamp()
{% endmacro %}


{% macro dateadd(interval, periods, ts) %}
{{ return(adapter.dispatch('dateadd', 'dbt_activity_schema')(interval, periods, ts)) }}
{% endmacro %}

{% macro default__dateadd(interval, periods, ts) %}
dateadd({{interval}}, {{periods}}, {{ts}})
{% endmacro %}

{% macro duckdb__dateadd(interval, periods, ts) %}
date_add({{ts}}, {{periods}} * interval 1 {{interval}})
{% endmacro %}

{% macro snowflake__dateadd(interval, periods, ts) %}
dateadd({{interval}}, {{periods}}, {{ts}})
{% endmacro %}

{% macro redshift__dateadd(interval, periods, ts) %}
dateadd({{interval}}, {{periods}}, {{ts}})
{% endmacro %}

{% macro bigquery__dateadd(interval, periods, ts) %}
date_add({{ts}}, interval {{periods}} {{interval}})
{% endmacro %}


{% macro to_timestamp(ts) %}
{{ return(adapter.dispatch("to_timestamp", "dbt_activity_schema")(ts))}}
{% endmacro %}

{% macro default__to_timestamp(ts) %}
{{ts}}::timestamp
{% endmacro %}

{% macro bigquery__to_timestamp(ts) %}
timestamp({{ts}})
{% endmacro %}



{% macro date_trunc(period, ts) %}
{{ return(adapter.dispatch("date_trunc", "dbt_activity_schema")(period, ts))}}
{% endmacro %}

{% macro default__date_trunc(period, ts) %}
date_trunc('{{period}}', {{ts}})
{% endmacro %}

{% macro bigquery__date_trunc(period, ts) %}
date_trunc({{ts}}, {{period}})
{% endmacro %}


{% macro date_diff(period, start_ts, end_ts) %}
{{ return(adapter.dispatch("date_diff", "dbt_activity_schema")(period, start_ts, end_ts))}}
{% endmacro %}

{% macro default__date_diff(period, start_ts, end_ts) %}
datediff('{{period}}', {{start_ts}}, {{end_ts}})
{% endmacro %}

{% macro duckdb__date_diff(period, start_ts, end_ts) %}
date_diff('{{period}}', {{start_ts}}, {{end_ts}})
{% endmacro %}

{% macro bigquery__date_diff(period, start_ts, end_ts) %}
date_diff({{end_ts}}, {{start_ts}}, {{period}})
{% endmacro %}


{% macro md5(expr) %}
{{ return(adapter.dispatch("md5", "dbt_activity_schema")(expr))}}
{% endmacro %}

{% macro default__md5(expr) %}
md5({{expr}})
{% endmacro %}

{% macro bigquery__md5(expr) %}
to_hex(md5({{expr}}))
{% endmacro %}
27 changes: 0 additions & 27 deletions macros/activity_schema/dataset/utils.sql

This file was deleted.

2 changes: 1 addition & 1 deletion macros/aql/parse.sql
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ be wrapped in a valid aggregation function.

{%- set ws_as = ws~"as"~ws -%}
{%- if modules.re.search(ws_as, column_str, modules.re.IGNORECASE) is not none -%}
{%- set alias_base = modules.re.split(ws_as, column_str.strip())[-1] -%}
{%- set alias_base = modules.re.split(ws_as, column_str.strip(), modules.re.IGNORECASE)[-1] -%}
{%- set alias = alias_base.translate(alias_base.maketrans("","", punc)).strip() -%}
{%- else -%}
{%- set alias = dbt_activity_schema.alias_column(activity_name, column, verb, relationship_selector, join_condition, n) -%}
Expand Down
Loading