Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: create fact_enrollments as MV (FC-0033) #478

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""
create a materialized view to populate a denormalized fact table of enrollment events
"""

from alembic import op

revision = "0024"
down_revision = "0023"
branch_labels = None
depends_on = None
on_cluster = (
" ON CLUSTER '{{CLICKHOUSE_CLUSTER_NAME}}' "
if "{{CLICKHOUSE_CLUSTER_NAME}}"
else ""
)
engine = "ReplicatedMergeTree" if "{{CLICKHOUSE_CLUSTER_NAME}}" else "MergeTree"


def upgrade():
op.execute(
f"""
create table if not exists {{ ASPECTS_XAPI_DATABASE }}.fact_enrollments (
emission_time DateTime,
org String,
course_key String,
course_name String,
course_run String,
actor_id String,
enrollment_mode LowCardinality(String),
enrollment_status LowCardinality(String)
) ENGINE = {engine}
PRIMARY KEY (org, course_key)
ORDER BY (org, course_key, actor_id, enrollment_mode, enrollment_status, emission_time)
"""
)

op.execute(
f"""
CREATE MATERIALIZED VIEW IF NOT EXISTS {{ ASPECTS_XAPI_DATABASE }}.fact_enrollments_mv
{on_cluster}
TO {{ ASPECTS_XAPI_DATABASE }}.fact_enrollments AS
select
enrollments.emission_time as emission_time,
enrollments.org as org,
enrollments.course_key as course_key,
courses.course_name as course_name,
courses.course_run as course_run,
enrollments.actor_id as actor_id,
enrollments.enrollment_mode as enrollment_mode,
splitByString('/', enrollments.verb_id)[-1] as enrollment_status
from
{{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_ENROLLMENT_EVENTS_TABLE }} enrollments
join {{ ASPECTS_EVENT_SINK_DATABASE }}.course_names courses
on enrollments.course_key = courses.course_key
"""
)

op.execute(
"""
insert into {{ ASPECTS_XAPI_DATABASE }}.fact_enrollments
select
enrollments.emission_time as emission_time,
enrollments.org as org,
enrollments.course_key as course_key,
courses.course_name as course_name,
courses.course_run as course_run,
enrollments.actor_id as actor_id,
enrollments.enrollment_mode as enrollment_mode,
splitByString('/', enrollments.verb_id)[-1] as enrollment_status
from
{{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_ENROLLMENT_EVENTS_TABLE }} enrollments
join {{ ASPECTS_EVENT_SINK_DATABASE }}.course_names courses
on enrollments.course_key = courses.course_key
"""
)


def downgrade():
op.execute("DROP TABLE IF EXISTS {{ ASPECTS_XAPI_DATABASE }}.fact_enrollments")
op.execute("DROP VIEW IF EXISTS {{ ASPECTS_XAPI_DATABASE }}.fact_enrollments_mv")