From 66e30eeaf4207f7647588447b395348fb3751c76 Mon Sep 17 00:00:00 2001 From: Rory Sawyer Date: Thu, 16 Nov 2023 13:34:59 -0500 Subject: [PATCH] feat: move MVs into dbt project This delegates all MV creation and maintenance to the dbt project now that materialized views in ClickHouse are supported by the dbt adapter. --- macros/cluster_macros.sql | 15 +++ models/base/sources.yml | 103 +-------------------- models/base/xapi_events_all_parsed.sql | 30 ++++++ models/completion/completion_events.sql | 18 ++++ models/completion/fact_completions.sql | 4 +- models/enrollment/enrollment_events.sql | 21 +++++ models/enrollment/fact_enrollments.sql | 2 +- models/forum/fact_forum_interactions.sql | 2 +- models/forum/forum_events.sql | 17 ++++ models/grading/fact_grades.sql | 4 +- models/grading/grading_events.sql | 19 ++++ models/problems/fact_problem_responses.sql | 2 +- models/problems/int_problem_hints.sql | 2 +- models/problems/problem_events.sql | 36 +++++++ models/video/fact_transcript_usage.sql | 2 +- models/video/fact_video_plays.sql | 2 +- models/video/video_playback_events.sql | 18 ++++ requirements.txt | 4 +- tests/problem_results_uniqueness.sql | 4 +- 19 files changed, 193 insertions(+), 112 deletions(-) create mode 100644 macros/cluster_macros.sql create mode 100644 models/base/xapi_events_all_parsed.sql create mode 100644 models/completion/completion_events.sql create mode 100644 models/enrollment/enrollment_events.sql create mode 100644 models/forum/forum_events.sql create mode 100644 models/grading/grading_events.sql create mode 100644 models/problems/problem_events.sql create mode 100644 models/video/video_playback_events.sql diff --git a/macros/cluster_macros.sql b/macros/cluster_macros.sql new file mode 100644 index 00000000..5b23a290 --- /dev/null +++ b/macros/cluster_macros.sql @@ -0,0 +1,15 @@ +-- Return an ON CLUSTER clause if we are running dbt against a cluster +{% macro on_cluster() -%} + {%- if env_var("CLICKHOUSE_CLUSTER_NAME", "") != "" -%} + ON CLUSTER {{ env_var("CLICKHOUSE_CLUSTER_NAME") }} + {%- endif -%} +{%- endmacro %} + +-- Return a version of the given table engine suitable for use on ClickHouse clusters +{% macro get_engine(engine) -%} + {%- if env_var("CLICKHOUSE_CLUSTER_NAME", "") != "" -%} + {{ "Replicated" + engine }} + {% else %} + {{ engine }} + {%- endif -%} +{%- endmacro %} diff --git a/models/base/sources.yml b/models/base/sources.yml index 72315d0e..9ea9e18b 100644 --- a/models/base/sources.yml +++ b/models/base/sources.yml @@ -5,109 +5,14 @@ sources: database: "{{ env_var('XAPI_SCHEMA', 'xapi') }}" description: "the xapi database in clickhouse" tables: - - name: xapi_events_all_parsed - identifier: "{{ env_var('ASPECTS_XAPI_TABLE', 'xapi_events_all_parsed') }}" - description: "Materialized view of parsed xapi events" + - name: xapi_events_all + identifier: "{{ env_var('ASPECTS_VECTOR_RAW_XAPI_TABLE', 'xapi_events_all') }}" + description: "Base table for raw xAPI events" columns: - name: event_id - - name: verb_id - - name: actor_id - - name: org - - name: course_id - name: emission_time - description: "timestamp of when the event was emitted" + - name: event - name: event_str - description: "json string of the event" - - - name: enrollment_events - identifier: "{{ env_var('ASPECTS_ENROLLMENT_EVENTS_TABLE', 'enrollment_events') }}" - description: "Materialized view of enrollment events" - columns: - - name: event_id - - name: emission_time - - name: actor_id - - name: object_id - - name: course_key - - name: org - - name: verb_id - - name: enrollment_mode - - - name: video_playback_events - identifier: "{{ env_var('ASPECTS_VIDEO_PLAYBACK_EVENTS_TABLE', 'video_playback_events') }}" - columns: - - name: event_id - - name: emission_time - - name: actor_id - - name: object_id - - name: course_key - - name: org - - name: verb_id - - name: video_position - - - name: problem_events - identifier: "{{ env_var('ASPECTS_PROBLEM_EVENTS_TABLE', 'problem_events') }}" - columns: - - name: event_id - - name: emission_time - - name: actor_id - - name: object_id - - name: course_key - - name: org - - name: verb_id - - name: responses - - name: scaled_score - - name: success - - name: interaction_type - - name: attempts - - - name: navigation_events - identifier: "{{ env_var('ASPECTS_NAVIGATION_EVENTS_TABLE', 'navigation_events') }}" - columns: - - name: event_id - - name: emission_time - - name: actor_id - - name: object_id - - name: course_key - - name: org - - name: verb_id - - name: object_type - - name: starting_position - - name: ending_point - - - name: grading_events - identifier: "{{ env_var('ASPECTS_GRADING_EVENTS_TABLE', 'grading_events') }}" - columns: - - name: event_id - - name: emission_time - - name: actor_id - - name: object_id - - name: course_key - - name: org - - name: verb_id - - name: scaled_score - - - name: completions_events - identifier: "{{ env_var('ASPECTS_COMPLETION_EVENTS_TABLE', 'completion_events') }}" - columns: - - name: event_id - - name: emission_time - - name: actor_id - - name: object_id - - name: course_key - - name: org - - name: verb_id - - name: progress_percent - - - name: forum_events - identifier: "{{ env_var('ASPECTS_FORUM_EVENTS_TABLE', 'forum_events') }}" - columns: - - name: event_id - - name: emission_time - - name: org - - name: course_key - - name: actor_id - - name: object_id - - name: verb_id - name: event_sink database: "{{ env_var('ASPECTS_EVENT_SINK_DATABASE', 'event_sink')}}" diff --git a/models/base/xapi_events_all_parsed.sql b/models/base/xapi_events_all_parsed.sql new file mode 100644 index 00000000..ce884958 --- /dev/null +++ b/models/base/xapi_events_all_parsed.sql @@ -0,0 +1,30 @@ +{{ config( + materialized='materialized_view', + engine=get_engine('ReplacingMergeTree()'), + primary_key='(org, course_id, verb_id, actor_id, emission_time, event_id)', + order_by='(org, course_id, verb_id, actor_id, emission_time, event_id)', + post_hook='OPTIMIZE TABLE {{ this }} {{ on_cluster() }} FINAL' + ) }} + +SELECT + event_id as event_id, + JSON_VALUE(event_str, '$.verb.id') as verb_id, + COALESCE( + NULLIF(JSON_VALUE(event_str, '$.actor.account.name'), ''), + NULLIF(JSON_VALUE(event_str, '$.actor.mbox'), ''), + JSON_VALUE(event_str, '$.actor.mbox_sha1sum') + ) as actor_id, + JSON_VALUE(event_str, '$.object.id') as object_id, + -- If the contextActivities parent is a course, use that. Otherwise use the object id for the course id + if( + JSON_VALUE( + event_str, + '$.context.contextActivities.parent[0].definition.type') + = 'http://adlnet.gov/expapi/activities/course', + JSON_VALUE(event_str, '$.context.contextActivities.parent[0].id'), + JSON_VALUE(event_str, '$.object.id') + ) as course_id, + coalesce(get_org_from_course_url(course_id), '') as org, + emission_time as emission_time, + event_str as event_str +FROM {{ source('xapi', 'xapi_events_all') }} diff --git a/models/completion/completion_events.sql b/models/completion/completion_events.sql new file mode 100644 index 00000000..84ce075d --- /dev/null +++ b/models/completion/completion_events.sql @@ -0,0 +1,18 @@ +{{ config( + materialized='materialized_view', + engine=get_engine('ReplacingMergeTree()'), + primary_key='(org, course_key, verb_id)', + order_by='(org, course_key, verb_id, emission_time, actor_id, object_id, event_id)' + ) }} + +SELECT + event_id, + CAST(emission_time, 'DateTime') AS emission_time, + actor_id, + object_id, + splitByString('/', course_id)[-1] AS course_key, + org, + verb_id, + JSON_VALUE(event_str, '$.result.extensions."https://w3id.org/xapi/cmi5/result/extensions/progress"') AS progress_percent +FROM {{ ref('xapi_events_all_parsed') }} +WHERE verb_id = 'http://adlnet.gov/expapi/verbs/progressed' diff --git a/models/completion/fact_completions.sql b/models/completion/fact_completions.sql index a0a45a29..f564ce6c 100644 --- a/models/completion/fact_completions.sql +++ b/models/completion/fact_completions.sql @@ -10,9 +10,9 @@ with completions as ( splitByString('/course/', object_id)[-1], splitByString('/xblock/', object_id)[-1] ) as entity_id, - progress_percent/100 as scaled_progress + cast(progress_percent as Float)/100 as scaled_progress from - {{ source('xapi', 'completions_events') }} + {{ ref('completion_events') }} ) select diff --git a/models/enrollment/enrollment_events.sql b/models/enrollment/enrollment_events.sql new file mode 100644 index 00000000..ab10a6e3 --- /dev/null +++ b/models/enrollment/enrollment_events.sql @@ -0,0 +1,21 @@ +{{ config( + materialized='materialized_view', + engine=get_engine('ReplacingMergeTree()'), + primary_key='(org, course_key)', + order_by='(org, course_key, emission_time, actor_id, enrollment_mode, event_id)' + ) }} + +SELECT + event_id, + cast(emission_time as DateTime) as emission_time, + actor_id, + object_id, + splitByString('/', course_id)[-1] AS course_key, + org, + verb_id, + JSON_VALUE(event_str, '$.object.definition.extensions."https://w3id.org/xapi/acrossx/extensions/type"') AS enrollment_mode +FROM {{ ref('xapi_events_all_parsed') }} +WHERE verb_id IN ( + 'http://adlnet.gov/expapi/verbs/registered', + 'http://id.tincanapi.com/verb/unregistered' +) diff --git a/models/enrollment/fact_enrollments.sql b/models/enrollment/fact_enrollments.sql index 2cafd611..b1159953 100644 --- a/models/enrollment/fact_enrollments.sql +++ b/models/enrollment/fact_enrollments.sql @@ -7,7 +7,7 @@ with enrollments as ( enrollment_mode, splitByString('/', verb_id)[-1] as enrollment_status from - {{ source('xapi', 'enrollment_events') }} + {{ ref('enrollment_events') }} ) select diff --git a/models/forum/fact_forum_interactions.sql b/models/forum/fact_forum_interactions.sql index 4612ca93..261e5e94 100644 --- a/models/forum/fact_forum_interactions.sql +++ b/models/forum/fact_forum_interactions.sql @@ -9,6 +9,6 @@ select forum.actor_id as actor_id, forum.verb_id as verb_id from - {{ source('xapi', 'forum_events') }} forum + {{ ref('forum_events') }} forum join {{ source('event_sink', 'course_names') }} courses on (forum.course_key = courses.course_key) diff --git a/models/forum/forum_events.sql b/models/forum/forum_events.sql new file mode 100644 index 00000000..932c2813 --- /dev/null +++ b/models/forum/forum_events.sql @@ -0,0 +1,17 @@ +{{ config( + materialized='materialized_view', + engine=get_engine('ReplacingMergeTree()'), + primary_key='(org, course_key, verb_id)', + order_by='(org, course_key, verb_id, emission_time, actor_id, object_id, event_id)' + ) }} + +SELECT + event_id, + CAST(emission_time, 'DateTime') AS emission_time, + org, + splitByString('/', course_id)[-1] AS course_key, + object_id, + actor_id, + verb_id +FROM {{ ref('xapi_events_all_parsed') }} +WHERE JSON_VALUE(event_str, '$.object.definition.type') = 'http://id.tincanapi.com/activitytype/discussion' diff --git a/models/grading/fact_grades.sql b/models/grading/fact_grades.sql index 0e31bc9e..d5512c8d 100644 --- a/models/grading/fact_grades.sql +++ b/models/grading/fact_grades.sql @@ -16,7 +16,7 @@ with grades as ( actor_id, scaled_score from - {{ source('xapi', 'grading_events') }} + {{ ref('grading_events') }} ) select @@ -30,7 +30,7 @@ select if(blocks.block_name != '', blocks.display_name_with_location, null) as entity_name_with_location, grades.grade_type as grade_type, grades.actor_id as actor_id, - cast(grades.scaled_score as Float) as scaled_score, + grades.scaled_score as scaled_score, case when scaled_score >= 0.9 then '90-100%' when scaled_score >= 0.8 and scaled_score < 0.9 then '80-89%' diff --git a/models/grading/grading_events.sql b/models/grading/grading_events.sql new file mode 100644 index 00000000..ac736bae --- /dev/null +++ b/models/grading/grading_events.sql @@ -0,0 +1,19 @@ +{{ config( + materialized='materialized_view', + engine=get_engine('ReplacingMergeTree()'), + primary_key='(org, course_key, verb_id)', + order_by='(org, course_key, verb_id, emission_time, actor_id, object_id, scaled_score, event_id)' + ) }} + + +SELECT + event_id, + CAST(emission_time, 'DateTime') AS emission_time, + actor_id, + object_id, + splitByString('/', course_id)[-1] AS course_key, + org, + verb_id, + JSONExtractFloat(event_str, 'result', 'score', 'scaled') AS scaled_score +FROM {{ ref('xapi_events_all_parsed') }} +WHERE verb_id IN ('http://id.tincanapi.com/verb/earned', 'https://w3id.org/xapi/acrossx/verbs/evaluated') diff --git a/models/problems/fact_problem_responses.sql b/models/problems/fact_problem_responses.sql index 832d71dc..eb6ffaa4 100644 --- a/models/problems/fact_problem_responses.sql +++ b/models/problems/fact_problem_responses.sql @@ -9,7 +9,7 @@ with responses as ( success, attempts from - {{ source('xapi', 'problem_events') }} + {{ ref('problem_events') }} where verb_id = 'https://w3id.org/xapi/acrossx/verbs/evaluated' ) diff --git a/models/problems/int_problem_hints.sql b/models/problems/int_problem_hints.sql index a193a227..d9e6c058 100644 --- a/models/problems/int_problem_hints.sql +++ b/models/problems/int_problem_hints.sql @@ -11,7 +11,7 @@ with hints as ( else 'N/A' end as help_type from - {{ source('xapi', 'problem_events') }} + {{ ref('problem_events') }} where verb_id = 'http://adlnet.gov/expapi/verbs/asked' ) diff --git a/models/problems/problem_events.sql b/models/problems/problem_events.sql new file mode 100644 index 00000000..c9d460e3 --- /dev/null +++ b/models/problems/problem_events.sql @@ -0,0 +1,36 @@ +{{ config( + materialized='materialized_view', + engine=get_engine('ReplacingMergeTree()'), + primary_key='(org, course_key, verb_id)', + order_by='(org, course_key, verb_id, emission_time, actor_id, object_id, responses, success, event_id)' + ) }} + +SELECT + event_id, + cast(emission_time as DateTime) as emission_time, + actor_id, + object_id, + splitByString('/', course_id)[-1] AS course_key, + org, + verb_id, + JSON_VALUE(event_str, '$.result.response') as responses, + JSON_VALUE(event_str, '$.result.score.scaled') as scaled_score, + if( + verb_id = 'https://w3id.org/xapi/acrossx/verbs/evaluated', + cast(JSON_VALUE(event_str, '$.result.success') as Bool), + false + ) as success, + JSON_VALUE(event_str, '$.object.definition.interactionType') as interaction_type, + if( + verb_id = 'https://w3id.org/xapi/acrossx/verbs/evaluated', + cast(JSON_VALUE(event_str, '$.object.definition.extensions."http://id.tincanapi.com/extension/attempt-id"') as Int16), + 0 + ) as attempts +FROM + {{ ref('xapi_events_all_parsed') }} +WHERE + verb_id in ( + 'https://w3id.org/xapi/acrossx/verbs/evaluated', + 'http://adlnet.gov/expapi/verbs/passed', + 'http://adlnet.gov/expapi/verbs/asked' + ) diff --git a/models/video/fact_transcript_usage.sql b/models/video/fact_transcript_usage.sql index 7ee162e2..37100ae7 100644 --- a/models/video/fact_transcript_usage.sql +++ b/models/video/fact_transcript_usage.sql @@ -6,7 +6,7 @@ with transcripts as ( splitByString('/xblock/', object_id)[2] as video_id, actor_id from - {{ source('xapi', 'xapi_events_all_parsed') }} + {{ ref('xapi_events_all_parsed') }} where verb_id = 'http://adlnet.gov/expapi/verbs/interacted' and JSON_VALUE(event_str, '$.result.extensions."https://w3id.org/xapi/video/extensions/cc-enabled"') = 'true' diff --git a/models/video/fact_video_plays.sql b/models/video/fact_video_plays.sql index 35d7d430..827e0769 100644 --- a/models/video/fact_video_plays.sql +++ b/models/video/fact_video_plays.sql @@ -8,7 +8,7 @@ with plays as ( splitByString('/xblock/', object_id)[-1] as video_id, actor_id from - {{ source('xapi', 'video_playback_events') }} + {{ ref('video_playback_events') }} where verb_id = 'https://w3id.org/xapi/video/verbs/played' ) diff --git a/models/video/video_playback_events.sql b/models/video/video_playback_events.sql new file mode 100644 index 00000000..bf453fa6 --- /dev/null +++ b/models/video/video_playback_events.sql @@ -0,0 +1,18 @@ +{{ config( + materialized='materialized_view', + engine=get_engine('ReplacingMergeTree()'), + primary_key='(org, course_key, verb_id)', + order_by='(org, course_key, verb_id, emission_time, actor_id, video_position, event_id)' + ) }} + +SELECT + event_id, + CAST(emission_time, 'DateTime') AS emission_time, + actor_id, + object_id, + splitByString('/', course_id)[-1] AS course_key, + org, + verb_id, + ceil(CAST(coalesce(nullIf(JSON_VALUE(event_str, '$.result.extensions."https://w3id.org/xapi/video/extensions/time"'), ''), nullIf(JSON_VALUE(event_str, '$.result.extensions."https://w3id.org/xapi/video/extensions/time-from"'), ''), '0.0'), 'Decimal32(2)')) AS video_position +FROM {{ ref('xapi_events_all_parsed') }} +WHERE (verb_id IN ('http://adlnet.gov/expapi/verbs/completed', 'http://adlnet.gov/expapi/verbs/initialized', 'http://adlnet.gov/expapi/verbs/terminated', 'https://w3id.org/xapi/video/verbs/paused', 'https://w3id.org/xapi/video/verbs/played', 'https://w3id.org/xapi/video/verbs/seeked')) AND (object_id LIKE '%video+block%') diff --git a/requirements.txt b/requirements.txt index d27c55be..d8ef41d8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ -dbt-clickhouse==1.4.1 -dbt-core==1.4.0 +dbt-clickhouse==1.7.0 +dbt-core==1.7.0 diff --git a/tests/problem_results_uniqueness.sql b/tests/problem_results_uniqueness.sql index d1532dd7..e8e7a363 100644 --- a/tests/problem_results_uniqueness.sql +++ b/tests/problem_results_uniqueness.sql @@ -7,6 +7,7 @@ select course_key, problem_id, actor_id, + responses, count(*) as num_rows from {{ ref('int_problem_results') }} @@ -14,5 +15,6 @@ group by org, course_key, problem_id, - actor_id + actor_id, + responses having num_rows > 1