diff --git a/platform_plugin_aspects/management/commands/load_test_tracking_events.py b/platform_plugin_aspects/management/commands/load_test_tracking_events.py
new file mode 100644
index 0000000..b322fab
--- /dev/null
+++ b/platform_plugin_aspects/management/commands/load_test_tracking_events.py
@@ -0,0 +1,197 @@
+"""
+Generates tracking events by creating test users and fake activity.
+
+This should never be run on a production server as it will generate a lot of
+bad data. It is entirely for benchmarking purposes in load test environments.
+It is also fragile due to reaching into the edx-platform testing internals.
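+
+A typical load test session might pair this command with the monitor command
+in a second terminal, for example (a sketch; flags as defined in each
+command's --help):
+
+    tutor local run lms ./manage.py lms load_test_tracking_events --run_until_killed
+    tutor local run lms ./manage.py lms monitor_load_test_tracking --backend celery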
+"""
+
+import logging
+import uuid
+from datetime import datetime, timedelta
+from random import choice
+from textwrap import dedent
+from time import sleep
+from typing import Any
+
+from django.core.management.base import BaseCommand, CommandError
+
+try:
+    from cms.djangoapps.contentstore.views.course import create_new_course_in_store
+    from common.djangoapps.student.helpers import do_create_account
+    from common.djangoapps.student.models.course_enrollment import CourseEnrollment
+    from openedx.core.djangoapps.user_authn.views.registration_form import (
+        AccountCreationForm,
+    )
+    from xmodule.modulestore import ModuleStoreEnum
+
+    RUNNING_IN_PLATFORM = True
+except ImportError:
+    RUNNING_IN_PLATFORM = False
+
+log = logging.getLogger(__name__)
+
+
+class LoadTest:
+    """
+    Sets up a test course and users, then sends events for them.
+    """
+
+    def __init__(self, num_users: int, username_prefix: str):
+        # Instance (not class) attributes, so each load test run gets its
+        # own user list and counter.
+        self.users = []
+        self.sent_event_count = 0
+
+        course_shortname = str(uuid.uuid4())[:6]
+        self.instructor = self.create_user(
+            username=f"instructor_{course_shortname}",
+            name="Instructor",
+            password="aspects",
+            email=f"instructor_{course_shortname}@openedx.invalid",
+        )
+
+        start_date = datetime.now() - timedelta(days=7)
+
+        fields = {"start": start_date, "display_name": f"Course {course_shortname}"}
+
+        log.info(
+            f"""Creating course:
+            Instructor: {self.instructor.id}
+            Org: "OEX"
+            Number: "{course_shortname}"
+            Run: "2024-1"
+            Fields: {fields}
+            """
+        )
+
+        self.course = create_new_course_in_store(
+            ModuleStoreEnum.Type.split,
+            self.instructor,
+            "OEX",
+            course_shortname,
+            "2024-1",
+            fields,
+        )
+
+        log.info(f"Created course {self.course.id}")
+        self.create_and_enroll_learners(num_users, username_prefix)
+
+    def create_and_enroll_learners(self, num_users, username_prefix):
+        log.info(f"Creating {num_users} users prefixed with {username_prefix}.")
+
+        for _ in range(num_users):
+            user_short_name = str(uuid.uuid4())[:6]
+            u = self.create_user(
+                username=f"{username_prefix}_{user_short_name}",
+                name=f"Learner {user_short_name}",
+                password="aspects",
+                email=f"{user_short_name}@openedx.invalid",
+            )
+            self.users.append(u)
+            e = CourseEnrollment.get_or_create_enrollment(
+                user=u, course_key=self.course.id
+            )
+            e.is_active = True
+            e.save()
+
+    def create_user(self, **user_data):
+        account_creation_form = AccountCreationForm(data=user_data, tos_required=False)
+
+        user, _, _ = do_create_account(account_creation_form)
+        user.is_active = True
+        user.save()
+        return user
+
+    def trigger_events(
+        self, num_events: int, sleep_time: float, run_until_killed: bool
+    ) -> None:
+        if run_until_killed:
+            log.info(f"Creating events until killed with {sleep_time}s between them!")
+            while True:
+                self.trigger_event_and_sleep(sleep_time)
+        else:
+            log.info(f"Creating {num_events} events with {sleep_time}s between them!")
+            for _ in range(num_events):
+                self.trigger_event_and_sleep(sleep_time)
+
+    def trigger_event_and_sleep(self, sleep_time: float) -> None:
+        # Toggle the user's enrollment state, generating one tracking event
+        # per call.
+        user = choice(self.users)
+        log.info(f"Triggering event for user {user.username}.")
+        e = CourseEnrollment.get_or_create_enrollment(
+            user=user, course_key=self.course.id
+        )
+
+        if e.is_active:
+            e.unenroll(user, self.course.id)
+        else:
+            e.enroll(user, self.course.id)
+
+        self.sent_event_count += 1
+        sleep(sleep_time)
+
+
+class Command(BaseCommand):
+    """
+    Create tracking log events for load testing purposes.
+
+    Example:
+        tutor local run lms ./manage.py lms load_test_tracking_events --sleep_time 0
+    """
+
+    help = dedent(__doc__).strip()
+
+    def add_arguments(self, parser: Any) -> None:
+        parser.add_argument(
+            "--num_users",
+            type=int,
+            default=10,
+            help="The number of users to create. All events will be generated for these learners.",
+        )
+        parser.add_argument(
+            "--username_prefix",
+            type=str,
+            default="lt_",
+            help="Prefix for the generated usernames.",
+        )
+        parser.add_argument(
+            "--num_events",
+            type=int,
+            default=10,
+            help="The number of events to generate. This is ignored if --run_until_killed is set.",
+        )
+        parser.add_argument(
+            "--run_until_killed",
+            action="store_true",
+            default=False,
+            help="If this is set, the process will run endlessly until killed.",
+        )
+        parser.add_argument(
+            "--sleep_time",
+            type=float,
+            default=0.75,
+            help="Fractional number of seconds to sleep between sending events.",
+        )
+
+    def handle(self, *args, **options):
+        """
+        Creates users and triggers events for them as configured by the options.
+        """
+        if not RUNNING_IN_PLATFORM:
+            raise CommandError("This command must be run in the Open edX LMS or CMS.")
+
+        start = datetime.now()
+        lt = LoadTest(options["num_users"], options["username_prefix"])
+
+        try:
+            lt.trigger_events(
+                options["num_events"],
+                options["sleep_time"],
+                options["run_until_killed"],
+            )
+        except KeyboardInterrupt:
+            log.warning("Killed by keyboard, finishing.")
+
+        end = datetime.now()
+        log.info(f"Sent {lt.sent_event_count} events in {end - start}.")
diff --git a/platform_plugin_aspects/management/commands/monitor_load_test_tracking.py b/platform_plugin_aspects/management/commands/monitor_load_test_tracking.py
new file mode 100644
index 0000000..56199b4
--- /dev/null
+++ b/platform_plugin_aspects/management/commands/monitor_load_test_tracking.py
@@ -0,0 +1,288 @@
+"""
+Monitors the load test tracking script and saves output for later analysis.
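+
+Stats are stored as JSON snapshots in ClickHouse, one row per collection
+interval, so a run can be examined afterwards. A sketch of such a query
+(assuming the default "load_test_stats" table name and the JSON layout
+written by this script):
+
+    SELECT
+        run_id,
+        JSONExtractInt(stats, 'clickhouse', 'lag_seconds') AS ch_lag_seconds
+    FROM load_test_stats
+    WHERE run_id = '<run id>'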
+""" + +import csv +import datetime +import io +import json +import logging +import uuid +from textwrap import dedent +from time import sleep +from typing import Any + +import redis +import requests +from django.conf import settings +from django.core.management.base import BaseCommand + +from platform_plugin_aspects.sinks.base_sink import ClickHouseAuth + +log = logging.getLogger(__name__) + + +class Monitor: + def __init__(self, sleep_time: float, backend: str): + self.run_id = str(uuid.uuid4())[:6] + self.ch_url = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["url"] + self.ch_auth = ClickHouseAuth( + settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["username"], + settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["password"], + ) + self.ch_database = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["database"] + self.ch_xapi_database = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG.get( + "xapi_database", "xapi" + ) + self.ch_xapi_table = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG.get( + "xapi_table", "xapi_events_all" + ) + self.ch_stats_table = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG.get( + "stats_table", "load_test_stats" + ) + self.ch_timeout_secs = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG[ + "timeout_secs" + ] + self.sleep_time = sleep_time + self.backend = backend + + def run(self) -> None: + collect_redis_bus = True if self.backend == "redis_bus" else False + collect_celery = True if self.backend == "celery" else False + collect_kafka_bus = True if self.backend == "kafka_bus" else False + collect_vector = True if self.backend == "vector" else False + + while True: + start = datetime.datetime.now() + log.info(f"----------- {start} --------") + + current_stats = {"clickhouse": self.get_clickhouse_stats()} + if collect_redis_bus: + current_stats["redis_bus"] = self.get_redis_bus_stats() + if collect_celery: + current_stats["celery"] = self.get_celery_stats() + if collect_kafka_bus: + current_stats["kafka_bus"] = self.get_kafka_bus_stats() + if collect_vector: + current_stats["vector"] = self.get_vector_stats() + + self.store_stats(current_stats) + + # Try to keep our collection cadence to exactly what was asked + # otherwise + check_duration = start - datetime.datetime.now() + + if check_duration.total_seconds() >= self.sleep_time: + log.warning( + f"It took {check_duration} to collect and store stats, this is greater than the sleep time!" 
+    def store_stats(self, current_stats: dict) -> None:
+        stats = json.dumps(current_stats)
+
+        insert = f"""INSERT INTO {self.ch_stats_table} (run_id, stats) FORMAT CSV"""
+
+        output = io.StringIO()
+        writer = csv.writer(output, quoting=csv.QUOTE_NONNUMERIC)
+        writer.writerow((self.run_id, stats))
+
+        response = requests.post(
+            url=self.ch_url,
+            auth=self.ch_auth,
+            params={"database": self.ch_database, "query": insert},
+            data=output.getvalue().encode("utf-8"),
+            timeout=self.ch_timeout_secs,
+        )
+
+        response.raise_for_status()
+
+    def get_clickhouse_stats(self):
+        select = f"""
+            SELECT
+                count(*) as ttl_count,
+                max(emission_time) as most_recent,
+                date_diff('second', max(emission_time), now()) as lag_seconds
+            FROM {self.ch_xapi_table}
+            FORMAT JSON
+        """
+
+        response = requests.post(
+            url=self.ch_url,
+            auth=self.ch_auth,
+            params={"database": self.ch_xapi_database, "query": select},
+            timeout=self.ch_timeout_secs,
+        )
+
+        response.raise_for_status()
+        resp = response.json()["data"][0]
+        log.info(f"ClickHouse lag seconds: {resp['lag_seconds']}")
+
+        return {
+            "total_rows": resp["ttl_count"],
+            "most_recent_event": resp["most_recent"],
+            "lag_seconds": resp["lag_seconds"],
+        }
+
+    def get_celery_stats(self):
+        r = redis.Redis.from_url(settings.BROKER_URL)
+        lms_queue = r.llen("edx.lms.core.default")
+        cms_queue = r.llen("edx.cms.core.default")
+
+        log.info(f"Celery queues: LMS {lms_queue}, CMS {cms_queue}")
+
+        return {
+            "lms_queue_length": lms_queue,
+            "cms_queue_length": cms_queue,
+        }
+
+    def get_redis_bus_stats(self):
+        r = redis.Redis.from_url(settings.EVENT_BUS_REDIS_CONNECTION_URL)
+        info = r.xinfo_stream("openedx-analytics", full=True)
+
+        lag = []
+        for g in info["groups"]:
+            lag.append({str(g["name"]): g["lag"]})
+
+        consumer_stats = {
+            "total_events": info["length"],
+            "queue_lengths": lag,
+        }
+
+        log.info(f"Redis bus queue lengths: {consumer_stats['queue_lengths']}")
+
+        return consumer_stats
+
+    def get_kafka_bus_stats(self):
+        # This isn't always installed, but should be if the Kafka bus is on.
+        import confluent_kafka
+
+        brokers = settings.EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS
+        topic = f"{settings.EVENT_BUS_TOPIC_PREFIX}-analytics"
+        group = "analytics-service"
+
+        # This consumer will not join the group, but the group.id is required
+        # by committed() to know which group to get offsets for.
+        consumer = confluent_kafka.Consumer(
+            {"bootstrap.servers": brokers, "group.id": group}
+        )
+
+        # Get the topic's partitions.
+        metadata = consumer.list_topics(topic, timeout=10)
+
+        if metadata.topics[topic].error is not None:
+            log.info(metadata.topics[topic].error)
+
+        partitions = [
+            confluent_kafka.TopicPartition(topic, p)
+            for p in metadata.topics[topic].partitions
+        ]
+        committed = consumer.committed(partitions, timeout=10)
+
+        consumer_stats = {
+            "topic": topic,
+            "partitions": [],
+        }
+
+        for partition in committed:
+            # Get the partition's low and high watermark offsets.
+            low, high = consumer.get_watermark_offsets(
+                partition, timeout=10, cached=False
+            )
+
+            if high < 0:
+                lag = 0
+            elif partition.offset < 0:
+                # No committed offset, show total message count as lag.
+                # The actual message count may be lower due to compaction
+                # and record deletions.
+                lag = high - low
+            else:
+                lag = high - partition.offset
+
+            log.info(f"{partition.topic} [{partition.partition}] Lag: {lag}")
+
+            consumer_stats["partitions"].append(
+                {
+                    "partition": partition.partition,
+                    "lag": lag,
+                }
+            )
+
+        consumer.close()
+        return consumer_stats
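+
+    # Vector only answers this query if its GraphQL API is enabled
+    # (api.enabled in the Vector config); the URL below assumes the
+    # Tutor-managed "vector" container listening on the default API port.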
+    def get_vector_stats(self):
+        url = "http://vector:8686/graphql"
+        query = """
+{
+  sinks(filter:{componentId:{equals:"clickhouse_xapi"}}) {
+    edges {
+      node {
+        ... on Sink {
+          componentId,
+          componentType,
+          metrics {
+            receivedEventsTotal {timestamp, receivedEventsTotal},
+            sentEventsTotal {timestamp, sentEventsTotal}
+          }
+        }
+      }
+    }
+  }
+}
+        """
+        r = requests.post(url, json={"query": query}, timeout=10)
+        log.info(r.text)
+        r.raise_for_status()
+        metrics = r.json()["data"]["sinks"]["edges"][0]["node"]["metrics"]
+
+        # These will be null until events start arriving.
+        received = (
+            metrics["receivedEventsTotal"]["receivedEventsTotal"]
+            if metrics["receivedEventsTotal"]
+            else 0.0
+        )
+        sent = (
+            metrics["sentEventsTotal"]["sentEventsTotal"]
+            if metrics["sentEventsTotal"]
+            else 0.0
+        )
+
+        rtn = {
+            "events_received": received,
+            "events_sent": sent,
+            "lag": received - sent,
+        }
+
+        log.info(
+            f"Vector received: {rtn['events_received']} "
+            f"sent: {rtn['events_sent']} lag: {rtn['lag']}"
+        )
+        return rtn
+
+
+class Command(BaseCommand):
+    """
+    Monitor the tracking event pipeline and store stats to a ClickHouse instance.
+
+    Example:
+        tutor local run lms ./manage.py lms monitor_load_test_tracking --sleep_time 5 --backend redis_bus
+    """
+
+    help = dedent(__doc__).strip()
+
+    def add_arguments(self, parser: Any) -> None:
+        parser.add_argument(
+            "--sleep_time",
+            type=float,
+            default=10,
+            help="Fractional number of seconds to sleep between gathering data.",
+        )
+        parser.add_argument(
+            "--backend",
+            choices=["redis_bus", "kafka_bus", "celery", "vector"],
+            default="celery",
+            help="Backend used to send events to ClickHouse.",
+        )
+
+    def handle(self, *args, **options):
+        """
+        Runs the monitor until killed, then logs the monitored period.
+ """ + start = datetime.datetime.now() + monitor = Monitor(options["sleep_time"], options["backend"]) + + try: + monitor.run() + except KeyboardInterrupt: + log.warning("Killed by keyboard, finishing.") + # monitor.send_end_event() + + end = datetime.datetime.now() + log.info(f"Monitored from {start} to {end} (duration {end - start}).") diff --git a/requirements/base.in b/requirements/base.in index f0bb4ae..6c3bf19 100644 --- a/requirements/base.in +++ b/requirements/base.in @@ -9,6 +9,7 @@ web_fragments django_crum celery # Asynchronous task execution library Django # Web application framework +redis requests # HTTP request library edx-django-utils # Django utilities, we use caching and monitoring edx-opaque-keys # Parsing library for course and usage keys diff --git a/requirements/base.txt b/requirements/base.txt index 31818b7..7ed928a 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -10,6 +10,8 @@ appdirs==1.4.4 # via fs asgiref==3.7.2 # via django +async-timeout==4.0.3 + # via redis backports-zoneinfo[tzdata]==0.2.1 # via # celery @@ -123,6 +125,8 @@ pyyaml==6.0.1 # code-annotations # superset-api-client # xblock +redis==5.0.3 + # via -r requirements/base.in requests==2.31.0 # via # -r requirements/base.in diff --git a/requirements/dev.txt b/requirements/dev.txt index 2d36640..5beda19 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -21,6 +21,10 @@ astroid==3.1.0 # -r requirements/quality.txt # pylint # pylint-celery +async-timeout==4.0.3 + # via + # -r requirements/quality.txt + # redis backports-zoneinfo[tzdata]==0.2.1 # via # -r requirements/quality.txt @@ -364,6 +368,8 @@ pyyaml==6.0.1 # responses # superset-api-client # xblock +redis==5.0.3 + # via -r requirements/quality.txt requests==2.31.0 # via # -r requirements/quality.txt diff --git a/requirements/doc.txt b/requirements/doc.txt index 9c29f49..58c8b22 100644 --- a/requirements/doc.txt +++ b/requirements/doc.txt @@ -20,6 +20,10 @@ asgiref==3.7.2 # via # -r requirements/test.txt # django +async-timeout==4.0.3 + # via + # -r requirements/test.txt + # redis babel==2.14.0 # via # pydata-sphinx-theme @@ -298,6 +302,8 @@ pyyaml==6.0.1 # xblock readme-renderer==43.0 # via twine +redis==5.0.3 + # via -r requirements/test.txt requests==2.31.0 # via # -r requirements/test.txt diff --git a/requirements/quality.txt b/requirements/quality.txt index 81d83a1..c15ebff 100644 --- a/requirements/quality.txt +++ b/requirements/quality.txt @@ -20,6 +20,10 @@ astroid==3.1.0 # via # pylint # pylint-celery +async-timeout==4.0.3 + # via + # -r requirements/test.txt + # redis backports-zoneinfo[tzdata]==0.2.1 # via # -r requirements/test.txt @@ -275,6 +279,8 @@ pyyaml==6.0.1 # responses # superset-api-client # xblock +redis==5.0.3 + # via -r requirements/test.txt requests==2.31.0 # via # -r requirements/test.txt diff --git a/requirements/test.txt b/requirements/test.txt index 7c74ace..98f35cf 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -16,6 +16,10 @@ asgiref==3.7.2 # via # -r requirements/base.txt # django +async-timeout==4.0.3 + # via + # -r requirements/base.txt + # redis backports-zoneinfo[tzdata]==0.2.1 # via # -r requirements/base.txt @@ -213,6 +217,8 @@ pyyaml==6.0.1 # responses # superset-api-client # xblock +redis==5.0.3 + # via -r requirements/base.txt requests==2.31.0 # via # -r requirements/base.txt