Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for data pipeline load testing #675

Merged
merged 2 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions tutoraspects/patches/openedx-common-settings
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ SUPERSET_CONFIG = {
}
EVENT_SINK_CLICKHOUSE_PII_MODELS = {{ EVENT_SINK_PII_MODELS }}

EVENT_ROUTING_BACKEND_BATCH_SIZE = 10
EVENT_ROUTING_BACKEND_BATCHING_ENABLED = False
bmtcril marked this conversation as resolved.
Show resolved Hide resolved

ASPECTS_INSTRUCTOR_DASHBOARDS = {{ ASPECTS_INSTRUCTOR_DASHBOARDS }}
SUPERSET_EXTRA_FILTERS_FORMAT = {{ ASPECTS_SUPERSET_EXTRA_FILTERS_FORMAT }}
{% if ASPECTS_ENABLE_INSTRUCTOR_DASHBOARD_PLUGIN %}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
RUN --mount=type=cache,target=/openedx/.cache/pip,sharing=shared \
pip install "platform-plugin-aspects==0.4.0"
pip install "platform-plugin-aspects==v0.5.0"
RUN --mount=type=cache,target=/openedx/.cache/pip,sharing=shared \
pip install "edx-event-routing-backends==v8.1.1"
pip install "edx-event-routing-backends==v8.3.1"
7 changes: 5 additions & 2 deletions tutoraspects/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@
# Each new setting is a pair: (setting_name, default_value).
# Prefix your setting names with 'ASPECTS_'.
("ASPECTS_VERSION", __version__),
# General tutor specific settings
("RUN_VECTOR", True),
# For our default deployment we currently use Celery -> Ralph for transport,
# so Vector is off by default.
("RUN_VECTOR", False),
("RUN_CLICKHOUSE", True),
("RUN_RALPH", True),
("RUN_SUPERSET", True),
Expand Down Expand Up @@ -136,6 +137,8 @@
("ASPECTS_EVENT_SINK_RECENT_BLOCKS_MV", "most_recent_course_blocks_mv"),
# Vector settings
("ASPECTS_DOCKER_HOST_SOCK_PATH", "/var/run/docker.sock"),
("ASPECTS_VECTOR_STORE_TRACKING_LOGS", False),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These settings are new and let us turn on and off each piece of the Vector storage individually.

("ASPECTS_VECTOR_STORE_XAPI", True),
("ASPECTS_VECTOR_DATABASE", "openedx"),
("ASPECTS_VECTOR_RAW_TRACKING_LOGS_TABLE", "_tracking"),
("ASPECTS_VECTOR_RAW_XAPI_TABLE", "xapi_events_all"),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
"""
Partition the event_sink.user_profile table

.. pii: Stores Open edX user profile data.
.. pii_types: user_id, name, username, location, phone_number, email_address, birth_date, biography, gender
.. pii_retirement: local_api, consumer_api
Partition the xapi table by year and month
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything in this file is just cleaning up incorrect language from the original commit.

"""
from alembic import op

Expand All @@ -15,15 +11,15 @@
on_cluster = " ON CLUSTER '{{CLICKHOUSE_CLUSTER_NAME}}' " if "{{CLICKHOUSE_CLUSTER_NAME}}" else ""
engine = "ReplicatedReplacingMergeTree" if "{{CLICKHOUSE_CLUSTER_NAME}}" else "ReplacingMergeTree"

old_user_profile_table = "{{ASPECTS_XAPI_DATABASE}}.old_{{ASPECTS_RAW_XAPI_TABLE}}"
old_xapi_table = "{{ASPECTS_XAPI_DATABASE}}.old_{{ASPECTS_RAW_XAPI_TABLE}}"

def upgrade():
# Partition event_sink.user_profile table
# 1. Rename old table
op.execute(
f"""
RENAME TABLE {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }}
TO {old_user_profile_table}
TO {old_xapi_table}
{on_cluster}
"""
)
Expand All @@ -46,13 +42,13 @@ def upgrade():
op.execute(
f"""
INSERT INTO {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }}
SELECT * FROM {old_user_profile_table}
SELECT event_id, emission_time, event FROM {old_xapi_table}
"""
)
# 4. Drop the old table
op.execute(
f"""
DROP TABLE {old_user_profile_table}
DROP TABLE {old_xapi_table}
{on_cluster}
"""
)
Expand All @@ -64,7 +60,7 @@ def downgrade():
op.execute(
f"""
RENAME TABLE {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }}
TO {old_user_profile_table}
TO {old_xapi_table}
{on_cluster}
"""
)
Expand All @@ -87,14 +83,14 @@ def downgrade():
op.execute(
f"""
INSERT INTO {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }}
SELECT * FROM {old_user_profile_table}
SELECT * FROM {old_xapi_table}
"""

)
# 4. Drop the old table
op.execute(
f"""
DROP TABLE {old_user_profile_table}
DROP TABLE {old_xapi_table}
{on_cluster}
"""
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""
Create the load_test_stats table

This table is always created, but it will only be populated if the load test
management commands are run from the platform_plugin_aspects app.
"""
from alembic import op


revision = "0034"
down_revision = "0033"
branch_labels = None
depends_on = None
on_cluster = " ON CLUSTER '{{CLICKHOUSE_CLUSTER_NAME}}' " if "{{CLICKHOUSE_CLUSTER_NAME}}" else ""
engine = "ReplicatedMergeTree" if "{{CLICKHOUSE_CLUSTER_NAME}}" else "MergeTree"


def upgrade():
    """
    Create the load_test_runs and load_test_stats tables.

    The {{ ... }} placeholders are Jinja template variables rendered before
    this migration executes; CREATE ... IF NOT EXISTS keeps it idempotent.
    """
    # One row per load-test event for a given run; PRIMARY KEY must be a
    # prefix of ORDER BY in ClickHouse, here they are identical.
    op.execute(
        f"""
        CREATE TABLE IF NOT EXISTS {{ ASPECTS_EVENT_SINK_DATABASE }}.load_test_runs
        {on_cluster}
        (
            run_id String,
            timestamp DateTime default now(),
            event_type String,
            extra String
        )
        engine = {engine} PRIMARY KEY (run_id, timestamp)
        ORDER BY (run_id, timestamp)
        SETTINGS index_granularity = 8192;
        """
    )

    # Aggregated statistics captured during a run; `stats` is an opaque
    # string payload written by the load-test management commands.
    op.execute(
        f"""
        CREATE TABLE IF NOT EXISTS {{ ASPECTS_EVENT_SINK_DATABASE }}.load_test_stats
        {on_cluster}
        (
            run_id String,
            timestamp DateTime default now(),
            stats String
        )
        engine = {engine} PRIMARY KEY (run_id, timestamp)
        ORDER BY (run_id, timestamp)
        SETTINGS index_granularity = 8192;
        """
    )


def downgrade():
    """
    Drop the load-test tables created by upgrade().

    Uses the same multi-line f-string style as upgrade() rather than adjacent
    string concatenation, which silently depended on the leading space inside
    the templated on_cluster fragment to keep the SQL tokens separated.
    DROP ... IF EXISTS keeps the downgrade idempotent.
    """
    # Drop in reverse creation order.
    op.execute(
        f"""
        DROP TABLE IF EXISTS {{ ASPECTS_EVENT_SINK_DATABASE }}.load_test_stats
        {on_cluster}
        """
    )

    op.execute(
        f"""
        DROP TABLE IF EXISTS {{ ASPECTS_EVENT_SINK_DATABASE }}.load_test_runs
        {on_cluster}
        """
    )
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
### Transforms
### Tracking logs

{% if ASPECTS_VECTOR_STORE_TRACKING_LOGS %}
# Parse tracking logs: extract time
[transforms.tracking]
type = "remap"
Expand Down Expand Up @@ -30,6 +31,7 @@ if err_timestamp != null {
drop_on_error = true
drop_on_abort = true


[transforms.tracking_debug]
type = "remap"
inputs = ["tracking"]
Expand All @@ -38,6 +40,32 @@ source = '''
.message = parse_json!(.message)
'''

# Log all events to stdout, for debugging
[sinks.out]
type = "console"
inputs = ["tracking_debug"]
encoding.codec = "json"
encoding.only_fields = ["time", "message.context.course_id", "message.context.user_id", "message.name"]

# Send logs to clickhouse
[sinks.clickhouse]
type = "clickhouse"
auth.strategy = "basic"
auth.user = "{{ ASPECTS_CLICKHOUSE_VECTOR_USER }}"
auth.password = "{{ ASPECTS_CLICKHOUSE_VECTOR_PASSWORD }}"
# Required: https://github.com/timberio/vector/issues/5797
encoding.timestamp_format = "unix"
inputs = ["tracking"]
endpoint = "{% if CLICKHOUSE_SECURE_CONNECTION %}https{% else %}http{% endif %}://{{ CLICKHOUSE_HOST }}:{{ CLICKHOUSE_INTERNAL_HTTP_PORT }}"
database = "{{ ASPECTS_VECTOR_DATABASE }}"
table = "{{ ASPECTS_VECTOR_RAW_TRACKING_LOGS_TABLE }}"
healthcheck = true

{% endif %}

### xAPI
{% if ASPECTS_VECTOR_STORE_XAPI %}

[transforms.xapi]
type = "remap"
inputs = ["openedx_containers"]
Expand Down Expand Up @@ -80,35 +108,12 @@ source = '''
.message = parse_json!(.event)
'''

### Sinks

# Log all events to stdout, for debugging
[sinks.out]
type = "console"
inputs = ["tracking_debug"]
encoding.codec = "json"
encoding.only_fields = ["time", "message.context.course_id", "message.context.user_id", "message.name"]

[sinks.out_xapi]
type = "console"
inputs = ["xapi_debug"]
encoding.codec = "json"
encoding.only_fields = ["event_id", "emission_time", "event"]

# # Send logs to clickhouse
[sinks.clickhouse]
type = "clickhouse"
auth.strategy = "basic"
auth.user = "{{ ASPECTS_CLICKHOUSE_VECTOR_USER }}"
auth.password = "{{ ASPECTS_CLICKHOUSE_VECTOR_PASSWORD }}"
# Required: https://github.com/timberio/vector/issues/5797
encoding.timestamp_format = "unix"
inputs = ["tracking"]
endpoint = "{% if CLICKHOUSE_SECURE_CONNECTION %}https{% else %}http{% endif %}://{{ CLICKHOUSE_HOST }}:{{ CLICKHOUSE_INTERNAL_HTTP_PORT }}"
database = "{{ ASPECTS_VECTOR_DATABASE }}"
table = "{{ ASPECTS_VECTOR_RAW_TRACKING_LOGS_TABLE }}"
healthcheck = true

[sinks.clickhouse_xapi]
type = "clickhouse"
auth.strategy = "basic"
Expand All @@ -124,4 +129,6 @@ database = "{{ ASPECTS_VECTOR_DATABASE }}"
table = "{{ ASPECTS_VECTOR_RAW_XAPI_TABLE }}"
healthcheck = true

{% endif %}

{{ patch("vector-common-toml") }}
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ data_dir = "/vector-data-dir"
# Vector's API for introspection
[api]
enabled = true
address = "127.0.0.1:8686"
address = "0.0.0.0:8686"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed to support the load test scripts hitting the Vector API, but it should still not be available outside of the cluster unless someone configures it that way on purpose.