Skip to content

Commit

Permalink
Merge pull request #34 from SoryRawyer/rds/include-materialized-views
Browse files Browse the repository at this point in the history
feat: move second-tier MVs into dbt project (FC-0033)
  • Loading branch information
bmtcril authored Dec 18, 2023
2 parents a6321a8 + 66e30ee commit d74b350
Show file tree
Hide file tree
Showing 19 changed files with 193 additions and 112 deletions.
15 changes: 15 additions & 0 deletions macros/cluster_macros.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- Return an ON CLUSTER clause if we are running dbt against a cluster
{% macro on_cluster() -%}
{%- if env_var("CLICKHOUSE_CLUSTER_NAME", "") != "" -%}
ON CLUSTER {{ env_var("CLICKHOUSE_CLUSTER_NAME") }}
{%- endif -%}
{%- endmacro %}

-- Return a version of the given table engine suitable for use on ClickHouse clusters
{% macro get_engine(engine) -%}
{%- if env_var("CLICKHOUSE_CLUSTER_NAME", "") != "" -%}
{{ "Replicated" + engine }}
{% else %}
{{ engine }}
{%- endif -%}
{%- endmacro %}
103 changes: 4 additions & 99 deletions models/base/sources.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,109 +5,14 @@ sources:
database: "{{ env_var('XAPI_SCHEMA', 'xapi') }}"
description: "the xapi database in clickhouse"
tables:
- name: xapi_events_all_parsed
identifier: "{{ env_var('ASPECTS_XAPI_TABLE', 'xapi_events_all_parsed') }}"
description: "Materialized view of parsed xapi events"
- name: xapi_events_all
identifier: "{{ env_var('ASPECTS_VECTOR_RAW_XAPI_TABLE', 'xapi_events_all') }}"
description: "Base table for raw xAPI events"
columns:
- name: event_id
- name: verb_id
- name: actor_id
- name: org
- name: course_id
- name: emission_time
description: "timestamp of when the event was emitted"
- name: event
- name: event_str
description: "json string of the event"

- name: enrollment_events
identifier: "{{ env_var('ASPECTS_ENROLLMENT_EVENTS_TABLE', 'enrollment_events') }}"
description: "Materialized view of enrollment events"
columns:
- name: event_id
- name: emission_time
- name: actor_id
- name: object_id
- name: course_key
- name: org
- name: verb_id
- name: enrollment_mode

- name: video_playback_events
identifier: "{{ env_var('ASPECTS_VIDEO_PLAYBACK_EVENTS_TABLE', 'video_playback_events') }}"
columns:
- name: event_id
- name: emission_time
- name: actor_id
- name: object_id
- name: course_key
- name: org
- name: verb_id
- name: video_position

- name: problem_events
identifier: "{{ env_var('ASPECTS_PROBLEM_EVENTS_TABLE', 'problem_events') }}"
columns:
- name: event_id
- name: emission_time
- name: actor_id
- name: object_id
- name: course_key
- name: org
- name: verb_id
- name: responses
- name: scaled_score
- name: success
- name: interaction_type
- name: attempts

- name: navigation_events
identifier: "{{ env_var('ASPECTS_NAVIGATION_EVENTS_TABLE', 'navigation_events') }}"
columns:
- name: event_id
- name: emission_time
- name: actor_id
- name: object_id
- name: course_key
- name: org
- name: verb_id
- name: object_type
- name: starting_position
- name: ending_point

- name: grading_events
identifier: "{{ env_var('ASPECTS_GRADING_EVENTS_TABLE', 'grading_events') }}"
columns:
- name: event_id
- name: emission_time
- name: actor_id
- name: object_id
- name: course_key
- name: org
- name: verb_id
- name: scaled_score

- name: completions_events
identifier: "{{ env_var('ASPECTS_COMPLETION_EVENTS_TABLE', 'completion_events') }}"
columns:
- name: event_id
- name: emission_time
- name: actor_id
- name: object_id
- name: course_key
- name: org
- name: verb_id
- name: progress_percent

- name: forum_events
identifier: "{{ env_var('ASPECTS_FORUM_EVENTS_TABLE', 'forum_events') }}"
columns:
- name: event_id
- name: emission_time
- name: org
- name: course_key
- name: actor_id
- name: object_id
- name: verb_id

- name: event_sink
database: "{{ env_var('ASPECTS_EVENT_SINK_DATABASE', 'event_sink')}}"
Expand Down
30 changes: 30 additions & 0 deletions models/base/xapi_events_all_parsed.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{{ config(
materialized='materialized_view',
engine=get_engine('ReplacingMergeTree()'),
primary_key='(org, course_id, verb_id, actor_id, emission_time, event_id)',
order_by='(org, course_id, verb_id, actor_id, emission_time, event_id)',
post_hook='OPTIMIZE TABLE {{ this }} {{ on_cluster() }} FINAL'
) }}

SELECT
event_id as event_id,
JSON_VALUE(event_str, '$.verb.id') as verb_id,
COALESCE(
NULLIF(JSON_VALUE(event_str, '$.actor.account.name'), ''),
NULLIF(JSON_VALUE(event_str, '$.actor.mbox'), ''),
JSON_VALUE(event_str, '$.actor.mbox_sha1sum')
) as actor_id,
JSON_VALUE(event_str, '$.object.id') as object_id,
-- If the contextActivities parent is a course, use that. Otherwise use the object id for the course id
if(
JSON_VALUE(
event_str,
'$.context.contextActivities.parent[0].definition.type')
= 'http://adlnet.gov/expapi/activities/course',
JSON_VALUE(event_str, '$.context.contextActivities.parent[0].id'),
JSON_VALUE(event_str, '$.object.id')
) as course_id,
coalesce(get_org_from_course_url(course_id), '') as org,
emission_time as emission_time,
event_str as event_str
FROM {{ source('xapi', 'xapi_events_all') }}
18 changes: 18 additions & 0 deletions models/completion/completion_events.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{{ config(
materialized='materialized_view',
engine=get_engine('ReplacingMergeTree()'),
primary_key='(org, course_key, verb_id)',
order_by='(org, course_key, verb_id, emission_time, actor_id, object_id, event_id)'
) }}

SELECT
event_id,
CAST(emission_time, 'DateTime') AS emission_time,
actor_id,
object_id,
splitByString('/', course_id)[-1] AS course_key,
org,
verb_id,
JSON_VALUE(event_str, '$.result.extensions."https://w3id.org/xapi/cmi5/result/extensions/progress"') AS progress_percent
FROM {{ ref('xapi_events_all_parsed') }}
WHERE verb_id = 'http://adlnet.gov/expapi/verbs/progressed'
4 changes: 2 additions & 2 deletions models/completion/fact_completions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ with completions as (
splitByString('/course/', object_id)[-1],
splitByString('/xblock/', object_id)[-1]
) as entity_id,
progress_percent/100 as scaled_progress
cast(progress_percent as Float)/100 as scaled_progress
from
{{ source('xapi', 'completions_events') }}
{{ ref('completion_events') }}
)

select
Expand Down
21 changes: 21 additions & 0 deletions models/enrollment/enrollment_events.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{{ config(
materialized='materialized_view',
engine=get_engine('ReplacingMergeTree()'),
primary_key='(org, course_key)',
order_by='(org, course_key, emission_time, actor_id, enrollment_mode, event_id)'
) }}

SELECT
event_id,
cast(emission_time as DateTime) as emission_time,
actor_id,
object_id,
splitByString('/', course_id)[-1] AS course_key,
org,
verb_id,
JSON_VALUE(event_str, '$.object.definition.extensions."https://w3id.org/xapi/acrossx/extensions/type"') AS enrollment_mode
FROM {{ ref('xapi_events_all_parsed') }}
WHERE verb_id IN (
'http://adlnet.gov/expapi/verbs/registered',
'http://id.tincanapi.com/verb/unregistered'
)
2 changes: 1 addition & 1 deletion models/enrollment/fact_enrollments.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ with enrollments as (
enrollment_mode,
splitByString('/', verb_id)[-1] as enrollment_status
from
{{ source('xapi', 'enrollment_events') }}
{{ ref('enrollment_events') }}
)

select
Expand Down
2 changes: 1 addition & 1 deletion models/forum/fact_forum_interactions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ select
forum.actor_id as actor_id,
forum.verb_id as verb_id
from
{{ source('xapi', 'forum_events') }} forum
{{ ref('forum_events') }} forum
join {{ source('event_sink', 'course_names') }} courses
on (forum.course_key = courses.course_key)
17 changes: 17 additions & 0 deletions models/forum/forum_events.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{{ config(
materialized='materialized_view',
engine=get_engine('ReplacingMergeTree()'),
primary_key='(org, course_key, verb_id)',
order_by='(org, course_key, verb_id, emission_time, actor_id, object_id, event_id)'
) }}

SELECT
event_id,
CAST(emission_time, 'DateTime') AS emission_time,
org,
splitByString('/', course_id)[-1] AS course_key,
object_id,
actor_id,
verb_id
FROM {{ ref('xapi_events_all_parsed') }}
WHERE JSON_VALUE(event_str, '$.object.definition.type') = 'http://id.tincanapi.com/activitytype/discussion'
4 changes: 2 additions & 2 deletions models/grading/fact_grades.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ with grades as (
actor_id,
scaled_score
from
{{ source('xapi', 'grading_events') }}
{{ ref('grading_events') }}
)

select
Expand All @@ -30,7 +30,7 @@ select
if(blocks.block_name != '', blocks.display_name_with_location, null) as entity_name_with_location,
grades.grade_type as grade_type,
grades.actor_id as actor_id,
cast(grades.scaled_score as Float) as scaled_score,
grades.scaled_score as scaled_score,
case
when scaled_score >= 0.9 then '90-100%'
when scaled_score >= 0.8 and scaled_score < 0.9 then '80-89%'
Expand Down
19 changes: 19 additions & 0 deletions models/grading/grading_events.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{{ config(
materialized='materialized_view',
engine=get_engine('ReplacingMergeTree()'),
primary_key='(org, course_key, verb_id)',
order_by='(org, course_key, verb_id, emission_time, actor_id, object_id, scaled_score, event_id)'
) }}


SELECT
event_id,
CAST(emission_time, 'DateTime') AS emission_time,
actor_id,
object_id,
splitByString('/', course_id)[-1] AS course_key,
org,
verb_id,
JSONExtractFloat(event_str, 'result', 'score', 'scaled') AS scaled_score
FROM {{ ref('xapi_events_all_parsed') }}
WHERE verb_id IN ('http://id.tincanapi.com/verb/earned', 'https://w3id.org/xapi/acrossx/verbs/evaluated')
2 changes: 1 addition & 1 deletion models/problems/fact_problem_responses.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ with responses as (
success,
attempts
from
{{ source('xapi', 'problem_events') }}
{{ ref('problem_events') }}
where
verb_id = 'https://w3id.org/xapi/acrossx/verbs/evaluated'
)
Expand Down
2 changes: 1 addition & 1 deletion models/problems/int_problem_hints.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ with hints as (
else 'N/A'
end as help_type
from
{{ source('xapi', 'problem_events') }}
{{ ref('problem_events') }}
where
verb_id = 'http://adlnet.gov/expapi/verbs/asked'
)
Expand Down
36 changes: 36 additions & 0 deletions models/problems/problem_events.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{{ config(
materialized='materialized_view',
engine=get_engine('ReplacingMergeTree()'),
primary_key='(org, course_key, verb_id)',
order_by='(org, course_key, verb_id, emission_time, actor_id, object_id, responses, success, event_id)'
) }}

SELECT
event_id,
cast(emission_time as DateTime) as emission_time,
actor_id,
object_id,
splitByString('/', course_id)[-1] AS course_key,
org,
verb_id,
JSON_VALUE(event_str, '$.result.response') as responses,
JSON_VALUE(event_str, '$.result.score.scaled') as scaled_score,
if(
verb_id = 'https://w3id.org/xapi/acrossx/verbs/evaluated',
cast(JSON_VALUE(event_str, '$.result.success') as Bool),
false
) as success,
JSON_VALUE(event_str, '$.object.definition.interactionType') as interaction_type,
if(
verb_id = 'https://w3id.org/xapi/acrossx/verbs/evaluated',
cast(JSON_VALUE(event_str, '$.object.definition.extensions."http://id.tincanapi.com/extension/attempt-id"') as Int16),
0
) as attempts
FROM
{{ ref('xapi_events_all_parsed') }}
WHERE
verb_id in (
'https://w3id.org/xapi/acrossx/verbs/evaluated',
'http://adlnet.gov/expapi/verbs/passed',
'http://adlnet.gov/expapi/verbs/asked'
)
2 changes: 1 addition & 1 deletion models/video/fact_transcript_usage.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ with transcripts as (
splitByString('/xblock/', object_id)[2] as video_id,
actor_id
from
{{ source('xapi', 'xapi_events_all_parsed') }}
{{ ref('xapi_events_all_parsed') }}
where
verb_id = 'http://adlnet.gov/expapi/verbs/interacted'
and JSON_VALUE(event_str, '$.result.extensions."https://w3id.org/xapi/video/extensions/cc-enabled"') = 'true'
Expand Down
2 changes: 1 addition & 1 deletion models/video/fact_video_plays.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ with plays as (
splitByString('/xblock/', object_id)[-1] as video_id,
actor_id
from
{{ source('xapi', 'video_playback_events') }}
{{ ref('video_playback_events') }}
where
verb_id = 'https://w3id.org/xapi/video/verbs/played'
)
Expand Down
18 changes: 18 additions & 0 deletions models/video/video_playback_events.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{{ config(
materialized='materialized_view',
engine=get_engine('ReplacingMergeTree()'),
primary_key='(org, course_key, verb_id)',
order_by='(org, course_key, verb_id, emission_time, actor_id, video_position, event_id)'
) }}

SELECT
event_id,
CAST(emission_time, 'DateTime') AS emission_time,
actor_id,
object_id,
splitByString('/', course_id)[-1] AS course_key,
org,
verb_id,
ceil(CAST(coalesce(nullIf(JSON_VALUE(event_str, '$.result.extensions."https://w3id.org/xapi/video/extensions/time"'), ''), nullIf(JSON_VALUE(event_str, '$.result.extensions."https://w3id.org/xapi/video/extensions/time-from"'), ''), '0.0'), 'Decimal32(2)')) AS video_position
FROM {{ ref('xapi_events_all_parsed') }}
WHERE (verb_id IN ('http://adlnet.gov/expapi/verbs/completed', 'http://adlnet.gov/expapi/verbs/initialized', 'http://adlnet.gov/expapi/verbs/terminated', 'https://w3id.org/xapi/video/verbs/paused', 'https://w3id.org/xapi/video/verbs/played', 'https://w3id.org/xapi/video/verbs/seeked')) AND (object_id LIKE '%video+block%')
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
dbt-clickhouse==1.4.1
dbt-core==1.4.0
dbt-clickhouse==1.7.0
dbt-core==1.7.0
4 changes: 3 additions & 1 deletion tests/problem_results_uniqueness.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ select
course_key,
problem_id,
actor_id,
responses,
count(*) as num_rows
from
{{ ref('int_problem_results') }}
group by
org,
course_key,
problem_id,
actor_id
actor_id,
responses
having num_rows > 1

0 comments on commit d74b350

Please sign in to comment.