From ed5057d07e978c26761f2a4e1a78a195142e4fec Mon Sep 17 00:00:00 2001 From: Brian Mesick Date: Thu, 7 Mar 2024 09:20:09 -0500 Subject: [PATCH] feat: Add command to load test tracking events --- docs/how-tos/index.rst | 5 + docs/how-tos/load_test_tracking_pipeline.rst | 145 ++++++++ .../commands/load_test_tracking_events.py | 216 +++++++++++ .../commands/monitor_load_test_tracking.py | 341 ++++++++++++++++++ .../tests/commands/test_load_test_monitor.py | 158 ++++++++ .../test_load_test_tracking_events.py | 92 +++++ platform_plugin_aspects/utils.py | 2 +- requirements/base.in | 1 + requirements/base.txt | 4 + requirements/dev.txt | 6 + requirements/doc.txt | 6 + requirements/quality.txt | 6 + requirements/test.txt | 6 + test_settings.py | 10 +- 14 files changed, 995 insertions(+), 3 deletions(-) create mode 100644 docs/how-tos/load_test_tracking_pipeline.rst create mode 100644 platform_plugin_aspects/management/commands/load_test_tracking_events.py create mode 100644 platform_plugin_aspects/management/commands/monitor_load_test_tracking.py create mode 100644 platform_plugin_aspects/tests/commands/test_load_test_monitor.py create mode 100644 platform_plugin_aspects/tests/commands/test_load_test_tracking_events.py diff --git a/docs/how-tos/index.rst b/docs/how-tos/index.rst index 5147f80..1c28996 100644 --- a/docs/how-tos/index.rst +++ b/docs/how-tos/index.rst @@ -1,2 +1,7 @@ How-tos ####### + +.. toctree:: + :maxdepth: 2 + + load_test_tracking_pipeline diff --git a/docs/how-tos/load_test_tracking_pipeline.rst b/docs/how-tos/load_test_tracking_pipeline.rst new file mode 100644 index 0000000..6d97e75 --- /dev/null +++ b/docs/how-tos/load_test_tracking_pipeline.rst @@ -0,0 +1,145 @@ +Load Testing Tracking Event Pipelines +##################################### + +Aspects has several ways of getting learning traces from tracking events to xAPI statements and then to ClickHouse. In order to be able to emulate them all in a realistic way for performance testing we've created two management commands that are available in edx-platform when ``platform-plugin-aspects`` is installed: + +``load_test_tracking_events`` generates tracking events by creating a test course, a configured number of test users, and enrolling / unenrolling them in a loop. There are options to run a fixed number of events, or to run until killed. There is also a configurable sleep time that can be calibrated for long soak tests, or set to 0 to run in a tight loop. `Note: This needs to be run in CMS as it uses CMS APIs to create the course!` + +All options and defaults are described by running: + +``tutor local run cms ./manage.py cms load_test_tracking_events --help`` + + +``monitor_load_test_tracking`` gathers performance data on the given system while a test is running and stores the data in ClickHouse for later reporting. A configurable sleep time allows you to determine how often stats are collected. The supported backends are: + +- ``celery`` +- ``vector`` +- ``redis_bus`` +- ``kafka_bus`` + +All options and defaults are described by running: + +``tutor local run lms ./manage.py lms monitor_load_test_tracking --help`` + +Each backend stores different statistics, since they each have unique properties. They log the most important stats to the the console for easy monitoring of the test. The data points we're most interested in capturing are: + +- How old is the most recent xAPI event in ClickHouse? What is the lag from event generation to being queryable? +- How far behind is the pipeline? How many events per second can this configuration handle? When will we run out of storage and break the system? + + +Running a test +-------------- + +**The test will create persistent data in your environment! Do not run on production server!** + +#. Make sure you have an isolated test system that is configured in a way that you understand. The point of the test is to stress the system, so using shared infrastructure will skew the test results or potentially cause outages. + +#. Note which backend your system is configured to use, by default Open edX uses Celery. + +#. Make sure this plugin is installed and activated, configuration has been saved, and you have rebuilt the ``openedx`` image. + +#. Start up your environment and let it hit steady state, you may wish to log in and go to the course homepage just to make sure all startup is complete, otherwise the first events will be artificially slow. + +#. Start the monitor. This command will pull ClickHouse lag and Celery queue lengths every 5 seconds: ``tutor local run lms ./manage.py lms monitor_load_test_tracking --backend celery --sleep_time 5`` + +#. Start the test. This command will create a test course, 10 test users, and create 100 events with 0.5 seconds sleep in between: ``tutor local run cms ./manage.py cms load_test_tracking_events --num_users 10 --num_events 100 --sleep_time 0.5`` + +#. Alternatively you can run a test that will continue until stopped, and configure a prefix to your user names if you want to separate them from other tests in the database: ``tutor local run cms ./manage.py cms load_test_tracking_events --num_users 10 --run_until_killed --sleep_time 0.5 --username_prefix loadtest2_`` + +#. Stop the test and monitor with ``ctrl-c``, you may want to let the monitor run until the queue of your system is cleared to get full data on how long it takes to recover from a backlog of events. + +#. Check the table in ClickHouse for the results of the run: ``event_sink.load_test_stats``. Each run has a unique identifier, and each row has a timestamp. The stats themselves are stored in JSON as they differ a great deal between backends. With this information you should be able to chart a run and see how the system performs at various levels of load. + +Celery Notes +------------ + +Celery can be scaled by adding more CMS workers. + +The JSON in ClickHouse for Celery looks like this:: + + { + "clickhouse": { + "total_rows": "1273", # Total xAPI rows in the database + "most_recent_event": "2024-03-12 14:46:32.828206", + "lag_seconds": "2912" # Difference between now() and the most_recent_event + }, + "celery": { + "lms_queue_length": 0, # Size of the redis queue of pending Celery tasks for the LMS workers + "cms_queue_length": 0 # Size of the redis queue of pending Celery tasks for the CMS workers + } + } + + +Vector Notes +------------ + +Vector scales differently depending on your deployment strategy. Note that Vector's stats are different from other backends. We are only able to calculate the lag between when Vector reads a line from the logs and when it is sent to ClickHouse. There is no way of telling how far behind the log Vector is, but this lag is still useful to see if ClickHouse insert times are slowing down Vector. Instead we should rely on the ClickHouse lag_seconds metric to get a better idea of how far behind Vector is. + +The JSON in ClickHouse for Vector looks like this:: + + { + "clickhouse": { + "total_rows": "1273", # Total xAPI rows in the database + "most_recent_event": "2024-03-12 14:46:32.828206", + "lag_seconds": "2912" # Difference between now() and the most_recent_event + }, + "vector": { + "events_received": 20.0, + "events_sent": 10.0, + "lag": 10.0 + } + } + } + + + +Redis Bus Notes +--------------- + +The redis bus can be scaled by adding more consumers. + +The JSON in ClickHouse for redis bus looks like this:: + + { + "clickhouse": { + "total_rows": "1273", # Total xAPI rows in the database + "most_recent_event": "2024-03-12 14:46:32.828206", + "lag_seconds": "2912" # Difference between now() and the most_recent_event + }, + "redis_bus": { + "total_events": 77, # Total number of events that have been added to the redis Stream + "consumers": [ + { + "name": "aspects", # Name of each consumer in the consumer group + "processing": 0, # How many events are currently being processed by that consumer (should be 0 or 1) + "queue_length": 0 # How many events are waiting to be processed in the stream + } + ] + } + } + + +Kafka Bus Notes +--------------- + +The Kafka bus can be scaled by adding more consumers. + +The JSON in ClickHouse for the Kafka bus looks like this:: + + { + "clickhouse": { + "total_rows": "1273", # Total xAPI rows in the database + "most_recent_event": "2024-03-12 14:46:32.828206", + "lag_seconds": "2912" # Difference between now() and the most_recent_event + }, + "kafka_bus": { + "topic": "dev-analytics", # The name of the Kafka topic that's being read + "partitions": [ + { + "partition": 0, # The index of the partition + "lag": 150 # How many events are waiting to be processed by the partition + } + ] + } + } + diff --git a/platform_plugin_aspects/management/commands/load_test_tracking_events.py b/platform_plugin_aspects/management/commands/load_test_tracking_events.py new file mode 100644 index 0000000..5e5b142 --- /dev/null +++ b/platform_plugin_aspects/management/commands/load_test_tracking_events.py @@ -0,0 +1,216 @@ +""" +Generates tracking events by creating test users and fake activity. + +This should never be run on a production server as it will generate a lot of +bad data. It is entirely for benchmarking purposes in load test environments. +It is also fragile due to reaching into the edx-platform testing internals. +""" + +import logging +import uuid +from datetime import datetime, timedelta +from random import choice +from textwrap import dedent +from time import sleep +from typing import Any + +from django.core.management.base import BaseCommand, CommandError + +# For testing we won't be able to import from edx-platform +try: # pragma: no cover + from cms.djangoapps.contentstore.views.course import create_new_course_in_store + from common.djangoapps.student.helpers import do_create_account + from common.djangoapps.student.models.course_enrollment import CourseEnrollment + from openedx.core.djangoapps.user_authn.views.registration_form import ( + AccountCreationForm, + ) + from xmodule.modulestore import ModuleStoreEnum + + RUNNING_IN_PLATFORM = True +except ImportError: + create_new_course_in_store = None + do_create_account = None + CourseEnrollment = None + AccountCreationForm = None + ModuleStoreEnum = None + + RUNNING_IN_PLATFORM = False + +log = logging.getLogger(__name__) + + +class LoadTest: + """ + Runs the load test and reports results to ClickHouse. + """ + + course = None + instructor = None + users = [] + sent_event_count = 0 + + def __init__(self, num_users: int, username_prefix: str): + course_shortname = str(uuid.uuid4())[:6] + self.instructor = self.create_user( + username=f"instructor_{course_shortname}", + name="Instructor", + password="aspects", + email=f"instructor_{course_shortname}@openedx.invalid", + ) + + start_date = datetime.now() - timedelta(days=7) + + fields = {"start": start_date, "display_name": f"Course {course_shortname}"} + + log.info( + f"""Creating course: + Instructor: {self.instructor.id} + Org: "OEX" + Number: "{course_shortname}" + Run: "2024-1" + Fields: {fields} + """ + ) + + self.course = create_new_course_in_store( + ModuleStoreEnum.Type.split, + self.instructor, + "OEX", + course_shortname, + "2024-1", + fields, + ) + + log.info(f"Created course {self.course.id}") + self.create_and_enroll_learners(num_users, username_prefix) + + def create_and_enroll_learners(self, num_users, username_prefix): + """ + Uses create test users and enroll them in our test course. + """ + log.info(f"Creating {num_users} users prefixed with {username_prefix}.") + + for _ in range(num_users): + user_short_name = str(uuid.uuid4())[:6] + u = self.create_user( + username=f"{username_prefix}_{user_short_name}", + name=f"Learner {user_short_name}", + password="aspects", + email=f"{user_short_name}@openedx.invalid", + ) + self.users.append(u) + e = CourseEnrollment.get_or_create_enrollment( + user=u, course_key=self.course.id + ) + e.is_active = True + e.save() + + def create_user(self, **user_data): + """ + Create, activate, and return a user using the edx-platform API. + """ + account_creation_form = AccountCreationForm(data=user_data, tos_required=False) + + user, _, _ = do_create_account(account_creation_form) + user.is_active = True + user.save() + return user + + def trigger_events( + self, num_events: int, sleep_time: float, run_until_killed: bool + ) -> None: + """ + Trigger the appropriate number of events based on configuration. + """ + + if run_until_killed: + log.info(f"Creating events until killed with {sleep_time} sleep between!") + while True: + self.trigger_event_and_sleep(sleep_time) + else: + log.info(f"Creating {num_events} event with {sleep_time} sleep between!") + for _ in range(num_events): + self.trigger_event_and_sleep(sleep_time) + + def trigger_event_and_sleep(self, sleep_time: float) -> None: + """ + Cause a tracking log to be emitted and sleep the specified amount of time. + """ + user = choice(self.users) + log.info(f"Triggering event for user {user.username}.") + e = CourseEnrollment.get_or_create_enrollment( + user=user, course_key=self.course.id + ) + + if e.is_active: + e.unenroll(user, self.course.id) + else: + e.enroll(user, self.course.id) # pragma: no cover + + self.sent_event_count += 1 + sleep(sleep_time) + + +class Command(BaseCommand): + """ + Create tracking log events for load testing purposes. + + Example: + tutor local run lms ./manage.py lms load_test_tracking_events --sleep_time 0 + """ + + help = dedent(__doc__).strip() + + def add_arguments(self, parser: Any) -> None: + parser.add_argument( + "--num_users", + type=int, + default=10, + help="The number of users to create. All events will be generated for these learners.", + ) + parser.add_argument( + "--username_prefix", + type=str, + default="lt_", + help="Prefix for the generated user names.", + ) + parser.add_argument( + "--num_events", + type=int, + default=10, + help="The number of events to generate. This is ignored if --run_until_killed is set.", + ) + parser.add_argument( + "--run_until_killed", + action="store_true", + default=False, + help="If this is set, the process will run endlessly until killed.", + ) + parser.add_argument( + "--sleep_time", + type=float, + default=0.75, + help="Fractional number of seconds to sleep between sending events.", + ) + + def handle(self, *args, **options): + """ + Create users and trigger events for them as configured above. + """ + if not RUNNING_IN_PLATFORM: # pragma: no cover + raise CommandError("This command must be run in the Open edX LMS or CMS.") + + start = datetime.now() + lt = LoadTest(options["num_users"], options["username_prefix"]) + + try: + lt.trigger_events( + options["num_events"], + options["sleep_time"], + options["run_until_killed"], + ) + except KeyboardInterrupt: + log.warning("Killed by keyboard, finishing.") + + end = datetime.now() + log.info(f"Sent {lt.sent_event_count} events in {end - start}.") diff --git a/platform_plugin_aspects/management/commands/monitor_load_test_tracking.py b/platform_plugin_aspects/management/commands/monitor_load_test_tracking.py new file mode 100644 index 0000000..c32f280 --- /dev/null +++ b/platform_plugin_aspects/management/commands/monitor_load_test_tracking.py @@ -0,0 +1,341 @@ +""" +Monitors the load test tracking script and saves output for later analysis. +""" + +import csv +import datetime +import io +import json +import logging +import uuid +from textwrap import dedent +from time import sleep +from typing import Any + +import redis +import requests +from django.conf import settings +from django.core.management.base import BaseCommand, CommandError + +from platform_plugin_aspects.sinks.base_sink import ClickHouseAuth + +try: + import confluent_kafka +except ImportError: + confluent_kafka = None + +log = logging.getLogger(__name__) + + +class Monitor: + """ + Manages the configuration and state of the load test monitor. + """ + + def __init__(self, sleep_time: float, backend: str): + self.run_id = str(uuid.uuid4())[:6] + self.ch_url = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["url"] + self.ch_auth = ClickHouseAuth( + settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["username"], + settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["password"], + ) + self.ch_database = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["database"] + self.ch_xapi_database = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG.get( + "xapi_database", "xapi" + ) + self.ch_xapi_table = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG.get( + "xapi_table", "xapi_events_all" + ) + self.ch_stats_table = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG.get( + "stats_table", "load_test_stats" + ) + self.ch_timeout_secs = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG[ + "timeout_secs" + ] + self.sleep_time = sleep_time + self.backend = backend + + def run(self) -> None: + """ + Run the monitor until killed. + """ + collect_redis_bus = self.backend == "redis_bus" + collect_celery = self.backend == "celery" + collect_kafka_bus = self.backend == "kafka_bus" + collect_vector = self.backend == "vector" + + while True: + start = datetime.datetime.now() + log.info(f"----------- {start} --------") + + current_stats = {"clickhouse": self.get_clickhouse_stats()} + if collect_redis_bus: + current_stats["redis_bus"] = self.get_redis_bus_stats() + if collect_celery: + current_stats["celery"] = self.get_celery_stats() + if collect_kafka_bus: + current_stats["kafka_bus"] = self.get_kafka_bus_stats() + if collect_vector: + current_stats["vector"] = self.get_vector_stats() + + self.store_stats(current_stats) + + # Try to keep our collection cadence to exactly what was asked + # otherwise + check_duration = datetime.datetime.now() - start + + if check_duration.total_seconds() >= self.sleep_time: + log.warning( + f"It took {check_duration} to collect and store stats, this is greater than the sleep time!" + ) + + next_sleep = self.sleep_time - check_duration.total_seconds() + sleep(next_sleep) + + def store_stats(self, current_stats: dict) -> None: + """ + Send the results for this iteration to ClickHouse. + """ + stats = json.dumps(current_stats) + + insert = f"""INSERT INTO {self.ch_stats_table} (run_id, stats) FORMAT CSV""" + + output = io.StringIO() + writer = csv.writer(output, quoting=csv.QUOTE_NONNUMERIC) + writer.writerow((self.run_id, stats)) + + response = requests.post( + url=self.ch_url, + auth=self.ch_auth, + params={"database": self.ch_database, "query": insert}, + data=output.getvalue().encode("utf-8"), + timeout=self.ch_timeout_secs, + ) + + response.raise_for_status() + + def get_clickhouse_stats(self): + """ + Get the current state of ClickHouse for this iteration. + """ + select = f""" + SELECT + count(*) as ttl_count, + max(emission_time) as most_recent, + date_diff('second', max(emission_time), now()) as lag_seconds + FROM {self.ch_xapi_table} + FORMAT JSON + """ + + response = requests.post( + url=self.ch_url, + auth=self.ch_auth, + params={"database": self.ch_xapi_database, "query": select}, + timeout=self.ch_timeout_secs, + ) + + response.raise_for_status() + resp = response.json()["data"][0] + log.info(f"Clickhouse lag seconds: {resp['lag_seconds']}") + + return { + "total_rows": resp["ttl_count"], + "most_recent_event": resp["most_recent"], + "lag_seconds": resp["lag_seconds"], + } + + def get_celery_stats(self): + """ + Get the current state of Celery for this iteration. + """ + r = redis.Redis.from_url(settings.BROKER_URL) + lms_queue = r.llen("edx.lms.core.default") + cms_queue = r.llen("edx.cms.core.default") + + log.info(f"Celery queues: LMS {lms_queue}, CMS {cms_queue}") + + return { + "lms_queue_length": lms_queue, + "cms_queue_length": cms_queue, + } + + def get_redis_bus_stats(self): + """ + Get the current state of redis for this iteration. + """ + r = redis.Redis.from_url(settings.EVENT_BUS_REDIS_CONNECTION_URL) + info = r.xinfo_stream("openedx-analytics", full=True) + + lag = [] + for g in info["groups"]: + lag.append({str(g["name"]): g["lag"]}) + + consumer_stats = { + "total_events": info["length"], + "queue_lengths": lag, + } + + log.info(f"Redis bus queue length: {consumer_stats['queue_lengths']}") + + return consumer_stats + + def get_kafka_bus_stats(self): + """ + Get the current state of ClickHouse for this iteration. + """ + if not confluent_kafka: # pragma: no cover + raise CommandError( + "Trying to monitor Kafka bus, but confluent_kafka is not installed" + ) + + brokers = settings.EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS + topic = f"{settings.EVENT_BUS_TOPIC_PREFIX}-analytics" + group = "analytics-service" + + # This consumer will not join the group, but the group.id is required by + # committed() to know which group to get offsets for. + consumer = confluent_kafka.Consumer( + {"bootstrap.servers": brokers, "group.id": group} + ) + + # Get the topic's partitions + metadata = consumer.list_topics(topic, timeout=10) + + if metadata.topics[topic].error is not None: # pragma: no cover + log.info(metadata.topics[topic].error) + + partitions = [ + confluent_kafka.TopicPartition(topic, p) + for p in metadata.topics[topic].partitions + ] + committed = consumer.committed(partitions, timeout=10) + + consumer_stats = { + "topic": topic, + "partitions": [], + } + + for partition in committed: + # Get the partitions low and high watermark offsets. + low, high = consumer.get_watermark_offsets( + partition, timeout=10, cached=False + ) + + if high < 0: # pragma: no cover + lag = 0 + elif partition.offset < 0: # pragma: no cover + # No committed offset, show total message count as lag. + # The actual message count may be lower due to compaction + # and record deletions. + lag = high - low + else: + lag = high - partition.offset + + log.info(f"{partition.topic} [{partition.partition}] Lag: {lag}") + + consumer_stats["partitions"].append( + { + "partition": partition.partition, + "lag": lag, + } + ) + + consumer.close() + return consumer_stats + + def _call_vector_graphql(self): + """ + Make the actual GraphQL call to the Vector API. + """ + # FIXME: Pull this from settings + url = "http://vector:8686/graphql" + query = """ + { + sinks(filter:{componentId:{equals:"clickhouse_xapi"}}) { + edges { + node { + ...on Sink { + componentId, + componentType, + metrics { + receivedEventsTotal {timestamp, receivedEventsTotal}, + sentEventsTotal {timestamp, sentEventsTotal} + } + } + } + } + } + } + """ + r = requests.post(url, json={"query": query}, timeout=10) + r.raise_for_status() + return r.json()["data"]["sinks"]["edges"][0]["node"]["metrics"] + + def get_vector_stats(self): + """ + Get the current state of Vector for this iteration. + """ + metrics = self._call_vector_graphql() + + # These will be null until events start arriving + received = ( + metrics["receivedEventsTotal"]["receivedEventsTotal"] + if metrics["receivedEventsTotal"] + else 0.0 + ) + sent = ( + metrics["sentEventsTotal"]["sentEventsTotal"] + if metrics["sentEventsTotal"] + else 0.0 + ) + + rtn = {"events_received": received, "events_sent": sent, "lag": received - sent} + + log.info( + f"Vector received: {rtn['events_received']} sent: {rtn['events_sent']} lag: {rtn.get('lag')}" + ) + return rtn + + +class Command(BaseCommand): + """ + Dump objects to a ClickHouse instance. + + Example: + tutor local run lms ./manage.py lms monitor_load_test_tracking --sleep_time 5 --backend redis_bus + """ + + help = dedent(__doc__).strip() + + def add_arguments(self, parser: Any) -> None: + parser.add_argument( + "--sleep_time", + type=float, + default=10, + help="Fractional number of seconds to sleep between gathering data.", + ) + parser.add_argument( + "--backend", + choices=["redis_bus", "kafka_bus", "celery", "vector"], + default="celery", + help="Backend used to send events to ClickHouse", + ) + + def handle(self, *_, **options): + """ + Creates users and triggers events for them as configured above. + """ + start = datetime.datetime.now() + log.info( + f"Starting monitor for {options['backend']} with sleep of {options['sleep_time']} seconds" + ) + monitor = Monitor(options["sleep_time"], options["backend"]) + + try: + monitor.run() + except KeyboardInterrupt: + log.warning("Killed by keyboard, finishing.") + # monitor.send_end_event() + + end = datetime.datetime.now() + log.info(f"Monitored from {start} to {end} (duration {end - start}).") diff --git a/platform_plugin_aspects/tests/commands/test_load_test_monitor.py b/platform_plugin_aspects/tests/commands/test_load_test_monitor.py new file mode 100644 index 0000000..2f09aee --- /dev/null +++ b/platform_plugin_aspects/tests/commands/test_load_test_monitor.py @@ -0,0 +1,158 @@ +""" +Tests for the monitor_load_test_tracking management command. +""" + +import datetime +from collections import namedtuple +from unittest.mock import DEFAULT, Mock, patch + +import pytest +from django.core.management import call_command + +CommandOptions = namedtuple("TestCommandOptions", ["options", "expected_logs"]) +KafkaPartition = namedtuple("KafkaPartition", ["offset", "topic", "partition"]) + + +def load_test_command_basic_options(): + """ + Pytest params for all the different non-ClickHouse command options. + """ + options = [ + # Test our defaults + CommandOptions( + options={}, + expected_logs=[ + "Clickhouse lag seconds: 1", + "Celery queues: LMS 5, CMS 5", + "Starting monitor for celery with sleep of 10 seconds", + ], + ), + # Test when sleep time is shorter than the time it takes to gather stats + CommandOptions( + options={"sleep_time": 0}, + expected_logs=[ + "this is greater than the sleep time!", + ], + ), + # Test overriding sleep time + CommandOptions( + options={"sleep_time": 5}, + expected_logs=[ + "Clickhouse lag seconds: 1", + "Celery queues: LMS 5, CMS 5", + "Starting monitor for celery with sleep of 5 seconds", + ], + ), + # Test explicit celery backend + CommandOptions( + options={"backend": "celery"}, + expected_logs=[ + "Clickhouse lag seconds: 1", + "Celery queues: LMS 5, CMS 5", + "Starting monitor for celery with sleep of 10 seconds", + ], + ), + # Test redis bus backend + CommandOptions( + options={"backend": "redis_bus"}, + expected_logs=[ + "Clickhouse lag seconds: 1", + "Starting monitor for redis_bus with sleep of 10 seconds", + "Redis bus queue length: [{'group1': 2}, {'group2': 3}]", + ], + ), + # Test kafka bus backend + CommandOptions( + options={"backend": "kafka_bus"}, + expected_logs=[ + "Clickhouse lag seconds: 1", + "Starting monitor for kafka_bus with sleep of 10 seconds", + "test [test] Lag: 95", + ], + ), + # Test vector backend + CommandOptions( + options={"backend": "vector"}, + expected_logs=[ + "Clickhouse lag seconds: 1", + "Vector received: 10 sent: 10 lag: 0", + "Starting monitor for vector with sleep of 10 seconds", + ], + ), + ] + + for option in options: + yield option + + +@pytest.mark.parametrize("test_command_option", load_test_command_basic_options()) +def test_monitor_options(test_command_option, caplog): + option_combination, expected_outputs = test_command_option + patch_prefix = ( + "platform_plugin_aspects.management.commands.monitor_load_test_tracking" + ) + + with patch.multiple( + f"{patch_prefix}", + requests=DEFAULT, + settings=DEFAULT, + redis=DEFAULT, + json=DEFAULT, + confluent_kafka=DEFAULT, + sleep=Mock(side_effect=KeyboardInterrupt), + ) as patches: + patches["requests"].post.return_value.json.side_effect = ( + # First response is the ClickHouse call + { + "data": [ + { + "lag_seconds": 1, + "ttl_count": 100, + "most_recent": datetime.datetime.now().isoformat(), + } + ] + }, + # Second response is the Vector API call, GraphQL is awful. + { + "data": { + "sinks": { + "edges": [ + { + "node": { + "metrics": { + "sentEventsTotal": {"sentEventsTotal": 10}, + "receivedEventsTotal": { + "receivedEventsTotal": 10 + }, + }, + }, + } + ], + } + } + }, + ) + + patches["redis"].Redis.from_url.return_value.llen.return_value = 5 + patches["redis"].Redis.from_url.return_value.xinfo_stream.return_value = { + "length": 100, + "groups": [ + {"name": "group1", "lag": 2}, + {"name": "group2", "lag": 3}, + ], + } + + patches["confluent_kafka"].Consumer.return_value.committed.return_value = { + KafkaPartition(offset=5, topic="test", partition="test"): Mock() + } + + patches[ + "confluent_kafka" + ].Consumer.return_value.get_watermark_offsets.return_value = (10, 100) + + call_command("monitor_load_test_tracking", **option_combination) + + print(caplog.text) + + for expected_output in expected_outputs: + assert expected_output in caplog.text diff --git a/platform_plugin_aspects/tests/commands/test_load_test_tracking_events.py b/platform_plugin_aspects/tests/commands/test_load_test_tracking_events.py new file mode 100644 index 0000000..8cee9bf --- /dev/null +++ b/platform_plugin_aspects/tests/commands/test_load_test_tracking_events.py @@ -0,0 +1,92 @@ +""" +Tests for the load_test_tracking_events management command. +""" + +from collections import namedtuple +from unittest.mock import DEFAULT, Mock, patch + +import pytest +from django.core.management import call_command + +CommandOptions = namedtuple("TestCommandOptions", ["options", "expected_logs"]) + + +def load_test_command_basic_options(): + """ + Pytest params for all the different non-ClickHouse command options. + + "--num_users", + "--username_prefix", + "--num_events", + "--run_until_killed", + "--sleep_time", + """ + options = [ + CommandOptions( + options={"sleep_time": 0}, + expected_logs=[ + "events in", + "Creating 10 users prefixed with lt_", + "Creating 10 event with 0 sleep between!", + ], + ), + CommandOptions( + options={"sleep_time": 0.1, "num_users": 2, "username_prefix": "cheese_"}, + expected_logs=[ + "events in", + "Creating 2 users prefixed with cheese_", + "Creating 10 event with 0.1 sleep between!", + ], + ), + ] + + for option in options: + yield option + + +@pytest.mark.parametrize("test_command_option", load_test_command_basic_options()) +def test_load_test_options(test_command_option, caplog): + option_combination, expected_outputs = test_command_option + patch_prefix = ( + "platform_plugin_aspects.management.commands.load_test_tracking_events" + ) + + with patch.multiple( + f"{patch_prefix}", + create_new_course_in_store=DEFAULT, + do_create_account=lambda _: (Mock(), DEFAULT, DEFAULT), + CourseEnrollment=DEFAULT, + AccountCreationForm=DEFAULT, + ModuleStoreEnum=DEFAULT, + RUNNING_IN_PLATFORM=True, + ) as _: + call_command("load_test_tracking_events", **option_combination) + + print(caplog.text) + + for expected_output in expected_outputs: + assert expected_output in caplog.text + + +def test_load_test_run_until_killed(caplog): + patch_prefix = ( + "platform_plugin_aspects.management.commands.load_test_tracking_events" + ) + with patch.multiple( + f"{patch_prefix}", + create_new_course_in_store=DEFAULT, + do_create_account=lambda _: (Mock(), DEFAULT, DEFAULT), + CourseEnrollment=DEFAULT, + AccountCreationForm=DEFAULT, + ModuleStoreEnum=DEFAULT, + RUNNING_IN_PLATFORM=True, + sleep=Mock(side_effect=KeyboardInterrupt), + ) as _: + call_command( + "load_test_tracking_events", **{"run_until_killed": True, "sleep_time": 0} + ) + + print(caplog.text) + + assert f"Creating events until killed with 0 sleep between!" in caplog.text + assert f"Killed by keyboard, finishing" in caplog.text diff --git a/platform_plugin_aspects/utils.py b/platform_plugin_aspects/utils.py index 1c14094..d44aeb6 100644 --- a/platform_plugin_aspects/utils.py +++ b/platform_plugin_aspects/utils.py @@ -14,7 +14,7 @@ logger = logging.getLogger(__name__) -if settings.DEBUG: +if settings.DEBUG: # pragma: no cover os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1" diff --git a/requirements/base.in b/requirements/base.in index f0bb4ae..6c3bf19 100644 --- a/requirements/base.in +++ b/requirements/base.in @@ -9,6 +9,7 @@ web_fragments django_crum celery # Asynchronous task execution library Django # Web application framework +redis requests # HTTP request library edx-django-utils # Django utilities, we use caching and monitoring edx-opaque-keys # Parsing library for course and usage keys diff --git a/requirements/base.txt b/requirements/base.txt index 31818b7..7ed928a 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -10,6 +10,8 @@ appdirs==1.4.4 # via fs asgiref==3.7.2 # via django +async-timeout==4.0.3 + # via redis backports-zoneinfo[tzdata]==0.2.1 # via # celery @@ -123,6 +125,8 @@ pyyaml==6.0.1 # code-annotations # superset-api-client # xblock +redis==5.0.3 + # via -r requirements/base.in requests==2.31.0 # via # -r requirements/base.in diff --git a/requirements/dev.txt b/requirements/dev.txt index 2d36640..5beda19 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -21,6 +21,10 @@ astroid==3.1.0 # -r requirements/quality.txt # pylint # pylint-celery +async-timeout==4.0.3 + # via + # -r requirements/quality.txt + # redis backports-zoneinfo[tzdata]==0.2.1 # via # -r requirements/quality.txt @@ -364,6 +368,8 @@ pyyaml==6.0.1 # responses # superset-api-client # xblock +redis==5.0.3 + # via -r requirements/quality.txt requests==2.31.0 # via # -r requirements/quality.txt diff --git a/requirements/doc.txt b/requirements/doc.txt index 9c29f49..58c8b22 100644 --- a/requirements/doc.txt +++ b/requirements/doc.txt @@ -20,6 +20,10 @@ asgiref==3.7.2 # via # -r requirements/test.txt # django +async-timeout==4.0.3 + # via + # -r requirements/test.txt + # redis babel==2.14.0 # via # pydata-sphinx-theme @@ -298,6 +302,8 @@ pyyaml==6.0.1 # xblock readme-renderer==43.0 # via twine +redis==5.0.3 + # via -r requirements/test.txt requests==2.31.0 # via # -r requirements/test.txt diff --git a/requirements/quality.txt b/requirements/quality.txt index 81d83a1..c15ebff 100644 --- a/requirements/quality.txt +++ b/requirements/quality.txt @@ -20,6 +20,10 @@ astroid==3.1.0 # via # pylint # pylint-celery +async-timeout==4.0.3 + # via + # -r requirements/test.txt + # redis backports-zoneinfo[tzdata]==0.2.1 # via # -r requirements/test.txt @@ -275,6 +279,8 @@ pyyaml==6.0.1 # responses # superset-api-client # xblock +redis==5.0.3 + # via -r requirements/test.txt requests==2.31.0 # via # -r requirements/test.txt diff --git a/requirements/test.txt b/requirements/test.txt index 7c74ace..98f35cf 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -16,6 +16,10 @@ asgiref==3.7.2 # via # -r requirements/base.txt # django +async-timeout==4.0.3 + # via + # -r requirements/base.txt + # redis backports-zoneinfo[tzdata]==0.2.1 # via # -r requirements/base.txt @@ -213,6 +217,8 @@ pyyaml==6.0.1 # responses # superset-api-client # xblock +redis==5.0.3 + # via -r requirements/base.txt requests==2.31.0 # via # -r requirements/base.txt diff --git a/test_settings.py b/test_settings.py index 01e636c..79d2bff 100644 --- a/test_settings.py +++ b/test_settings.py @@ -24,8 +24,14 @@ }, } - -INSTALLED_APPS = ("platform_plugin_aspects",) +INSTALLED_APPS = ( + "django.contrib.admin", + "django.contrib.auth", + "django.contrib.contenttypes", + "django.contrib.messages", + "django.contrib.sessions", + "platform_plugin_aspects", +) EVENT_SINK_CLICKHOUSE_MODEL_CONFIG = { "user_profile": {