From fdc60c0910519ca1971de4aaf7934fc52b40da4d Mon Sep 17 00:00:00 2001 From: Brian Mesick Date: Thu, 7 Mar 2024 09:20:09 -0500 Subject: [PATCH] feat: Add commands to load test tracking events --- docs/how-tos/index.rst | 5 + docs/how-tos/load_test_tracking_pipeline.rst | 145 ++++++ .../commands/load_test_tracking_events.py | 313 +++++++++++++ .../commands/monitor_load_test_tracking.py | 438 ++++++++++++++++++ .../tests/commands/test_load_test_monitor.py | 154 ++++++ .../test_load_test_tracking_events.py | 110 +++++ platform_plugin_aspects/utils.py | 2 +- requirements/base.in | 1 + requirements/base.txt | 4 + requirements/dev.txt | 6 + requirements/doc.txt | 6 + requirements/quality.txt | 6 + requirements/test.txt | 6 + test_settings.py | 1 - tox.ini | 2 +- 15 files changed, 1196 insertions(+), 3 deletions(-) create mode 100644 docs/how-tos/load_test_tracking_pipeline.rst create mode 100644 platform_plugin_aspects/management/commands/load_test_tracking_events.py create mode 100644 platform_plugin_aspects/management/commands/monitor_load_test_tracking.py create mode 100644 platform_plugin_aspects/tests/commands/test_load_test_monitor.py create mode 100644 platform_plugin_aspects/tests/commands/test_load_test_tracking_events.py diff --git a/docs/how-tos/index.rst b/docs/how-tos/index.rst index 5147f80..1c28996 100644 --- a/docs/how-tos/index.rst +++ b/docs/how-tos/index.rst @@ -1,2 +1,7 @@ How-tos ####### + +.. toctree:: + :maxdepth: 2 + + load_test_tracking_pipeline diff --git a/docs/how-tos/load_test_tracking_pipeline.rst b/docs/how-tos/load_test_tracking_pipeline.rst new file mode 100644 index 0000000..6d97e75 --- /dev/null +++ b/docs/how-tos/load_test_tracking_pipeline.rst @@ -0,0 +1,145 @@ +Load Testing Tracking Event Pipelines +##################################### + +Aspects has several ways of getting learning traces from tracking events to xAPI statements and then to ClickHouse. In order to be able to emulate them all in a realistic way for performance testing we've created two management commands that are available in edx-platform when ``platform-plugin-aspects`` is installed: + +``load_test_tracking_events`` generates tracking events by creating a test course, a configured number of test users, and enrolling / unenrolling them in a loop. There are options to run a fixed number of events, or to run until killed. There is also a configurable sleep time that can be calibrated for long soak tests, or set to 0 to run in a tight loop. `Note: This needs to be run in CMS as it uses CMS APIs to create the course!` + +All options and defaults are described by running: + +``tutor local run cms ./manage.py cms load_test_tracking_events --help`` + + +``monitor_load_test_tracking`` gathers performance data on the given system while a test is running and stores the data in ClickHouse for later reporting. A configurable sleep time allows you to determine how often stats are collected. The supported backends are: + +- ``celery`` +- ``vector`` +- ``redis_bus`` +- ``kafka_bus`` + +All options and defaults are described by running: + +``tutor local run lms ./manage.py lms monitor_load_test_tracking --help`` + +Each backend stores different statistics, since they each have unique properties. They log the most important stats to the the console for easy monitoring of the test. The data points we're most interested in capturing are: + +- How old is the most recent xAPI event in ClickHouse? What is the lag from event generation to being queryable? +- How far behind is the pipeline? How many events per second can this configuration handle? When will we run out of storage and break the system? + + +Running a test +-------------- + +**The test will create persistent data in your environment! Do not run on production server!** + +#. Make sure you have an isolated test system that is configured in a way that you understand. The point of the test is to stress the system, so using shared infrastructure will skew the test results or potentially cause outages. + +#. Note which backend your system is configured to use, by default Open edX uses Celery. + +#. Make sure this plugin is installed and activated, configuration has been saved, and you have rebuilt the ``openedx`` image. + +#. Start up your environment and let it hit steady state, you may wish to log in and go to the course homepage just to make sure all startup is complete, otherwise the first events will be artificially slow. + +#. Start the monitor. This command will pull ClickHouse lag and Celery queue lengths every 5 seconds: ``tutor local run lms ./manage.py lms monitor_load_test_tracking --backend celery --sleep_time 5`` + +#. Start the test. This command will create a test course, 10 test users, and create 100 events with 0.5 seconds sleep in between: ``tutor local run cms ./manage.py cms load_test_tracking_events --num_users 10 --num_events 100 --sleep_time 0.5`` + +#. Alternatively you can run a test that will continue until stopped, and configure a prefix to your user names if you want to separate them from other tests in the database: ``tutor local run cms ./manage.py cms load_test_tracking_events --num_users 10 --run_until_killed --sleep_time 0.5 --username_prefix loadtest2_`` + +#. Stop the test and monitor with ``ctrl-c``, you may want to let the monitor run until the queue of your system is cleared to get full data on how long it takes to recover from a backlog of events. + +#. Check the table in ClickHouse for the results of the run: ``event_sink.load_test_stats``. Each run has a unique identifier, and each row has a timestamp. The stats themselves are stored in JSON as they differ a great deal between backends. With this information you should be able to chart a run and see how the system performs at various levels of load. + +Celery Notes +------------ + +Celery can be scaled by adding more CMS workers. + +The JSON in ClickHouse for Celery looks like this:: + + { + "clickhouse": { + "total_rows": "1273", # Total xAPI rows in the database + "most_recent_event": "2024-03-12 14:46:32.828206", + "lag_seconds": "2912" # Difference between now() and the most_recent_event + }, + "celery": { + "lms_queue_length": 0, # Size of the redis queue of pending Celery tasks for the LMS workers + "cms_queue_length": 0 # Size of the redis queue of pending Celery tasks for the CMS workers + } + } + + +Vector Notes +------------ + +Vector scales differently depending on your deployment strategy. Note that Vector's stats are different from other backends. We are only able to calculate the lag between when Vector reads a line from the logs and when it is sent to ClickHouse. There is no way of telling how far behind the log Vector is, but this lag is still useful to see if ClickHouse insert times are slowing down Vector. Instead we should rely on the ClickHouse lag_seconds metric to get a better idea of how far behind Vector is. + +The JSON in ClickHouse for Vector looks like this:: + + { + "clickhouse": { + "total_rows": "1273", # Total xAPI rows in the database + "most_recent_event": "2024-03-12 14:46:32.828206", + "lag_seconds": "2912" # Difference between now() and the most_recent_event + }, + "vector": { + "events_received": 20.0, + "events_sent": 10.0, + "lag": 10.0 + } + } + } + + + +Redis Bus Notes +--------------- + +The redis bus can be scaled by adding more consumers. + +The JSON in ClickHouse for redis bus looks like this:: + + { + "clickhouse": { + "total_rows": "1273", # Total xAPI rows in the database + "most_recent_event": "2024-03-12 14:46:32.828206", + "lag_seconds": "2912" # Difference between now() and the most_recent_event + }, + "redis_bus": { + "total_events": 77, # Total number of events that have been added to the redis Stream + "consumers": [ + { + "name": "aspects", # Name of each consumer in the consumer group + "processing": 0, # How many events are currently being processed by that consumer (should be 0 or 1) + "queue_length": 0 # How many events are waiting to be processed in the stream + } + ] + } + } + + +Kafka Bus Notes +--------------- + +The Kafka bus can be scaled by adding more consumers. + +The JSON in ClickHouse for the Kafka bus looks like this:: + + { + "clickhouse": { + "total_rows": "1273", # Total xAPI rows in the database + "most_recent_event": "2024-03-12 14:46:32.828206", + "lag_seconds": "2912" # Difference between now() and the most_recent_event + }, + "kafka_bus": { + "topic": "dev-analytics", # The name of the Kafka topic that's being read + "partitions": [ + { + "partition": 0, # The index of the partition + "lag": 150 # How many events are waiting to be processed by the partition + } + ] + } + } + diff --git a/platform_plugin_aspects/management/commands/load_test_tracking_events.py b/platform_plugin_aspects/management/commands/load_test_tracking_events.py new file mode 100644 index 0000000..d6a58ad --- /dev/null +++ b/platform_plugin_aspects/management/commands/load_test_tracking_events.py @@ -0,0 +1,313 @@ +""" +Generates tracking events by creating test users and fake activity. + +This should never be run on a production server as it will generate a lot of +bad data. It is entirely for benchmarking purposes in load test environments. +It is also fragile due to reaching into the edx-platform testing internals. +""" + +import csv +import io +import json +import logging +import uuid +from datetime import datetime, timedelta +from random import choice +from textwrap import dedent +from time import sleep +from typing import Any, List + +import requests +from django.conf import settings +from django.core.management.base import BaseCommand, CommandError + +from platform_plugin_aspects.sinks.base_sink import ClickHouseAuth + +# For testing we won't be able to import from edx-platform +try: # pragma: no cover + from cms.djangoapps.contentstore.views.course import create_new_course_in_store + from common.djangoapps.student.helpers import do_create_account + from common.djangoapps.student.models.course_enrollment import CourseEnrollment + from openedx.core.djangoapps.user_authn.views.registration_form import ( + AccountCreationForm, + ) + from xmodule.modulestore import ModuleStoreEnum + + RUNNING_IN_PLATFORM = True +except ImportError: + create_new_course_in_store = None + do_create_account = None + CourseEnrollment = None + AccountCreationForm = None + ModuleStoreEnum = None + + RUNNING_IN_PLATFORM = False + +log = logging.getLogger("tracking_event_loadtest") + + +class LoadTest: + """ + Runs the load test and reports results to ClickHouse. + """ + + users = [] + sent_event_count = 0 + ch_runs_table = "load_test_runs" + + def __init__(self, num_users: int, username_prefix: str, tags: List[str]): + self.num_users = num_users + self.username_prefix = username_prefix + self.tags = tags + self.run_id = str(uuid.uuid4())[:6] + + self.course_shortname = str(uuid.uuid4())[:6] + + self.ch_url = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["url"] + self.ch_auth = ClickHouseAuth( + settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["username"], + settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["password"], + ) + self.ch_database = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["database"] + self.ch_xapi_database = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG.get( + "xapi_database", "xapi" + ) + self.ch_runs_table = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG.get( + "runs_table", "load_test_runs" + ) + self.ch_timeout_secs = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG[ + "timeout_secs" + ] + + self.instructor = self.create_user( + username=f"instructor_{self.course_shortname}", + name="Instructor", + password="aspects", + email=f"instructor_{self.course_shortname}@openedx.invalid", + ) + self.create_course() + self.record_start() + self.create_and_enroll_learners(num_users, username_prefix) + + def create_course(self) -> None: + """ + Create a course using the CMS API. + """ + start_date = datetime.now() - timedelta(days=7) + fields = { + "start": start_date, + "display_name": f"Course {self.course_shortname}", + } + + log.info( + f"""Creating course: + Instructor: {self.instructor.id} + Org: "OEX" + Number: "{self.course_shortname}" + Run: "2024-1" + Fields: {fields} + """ + ) + + self.course = create_new_course_in_store( + ModuleStoreEnum.Type.split, + self.instructor, + "OEX", + self.course_shortname, + "2024-1", + fields, + ) + + log.info(f"Created course {self.course.id}") + + def record_start(self) -> None: + """ + Send a start event to ClickHouse. + + Start and end events are used by the monitor script to know when to + begin and end monitoring. + """ + self.record_to_clickhouse( + "start", + { + "tags": self.tags, + "course_id": str(self.course.id), + "num_users": self.num_users, + "username_prefix": self.username_prefix, + }, + ) + + # Let the monitoring script connect, otherwise we can finish the test before it even + # knows we've started. + sleep(5) + + def record_end(self) -> None: + """ + Send an end event to ClickHouse. + """ + self.record_to_clickhouse("end", {"sent_event_count": self.sent_event_count}) + + def record_to_clickhouse(self, event_type, extra) -> None: + """ + Send the run events to ClickHouse. + """ + insert = ( + f"INSERT INTO {self.ch_runs_table} (run_id, event_type, extra) FORMAT CSV " + ) + + output = io.StringIO() + writer = csv.writer(output) + writer.writerow((self.run_id, event_type, json.dumps(extra))) + print(output.getvalue().encode("utf-8")) + + response = requests.post( + url=self.ch_url, + auth=self.ch_auth, + params={"database": self.ch_database, "query": insert}, + data=output.getvalue().encode("utf-8"), + timeout=self.ch_timeout_secs, + ) + + response.raise_for_status() + + def create_and_enroll_learners(self, num_users, username_prefix): + """ + Uses create test users and enroll them in our test course. + """ + log.info(f"Creating {num_users} users prefixed with {username_prefix}.") + + for _ in range(num_users): + user_short_name = str(uuid.uuid4())[:6] + u = self.create_user( + username=f"{username_prefix}_{user_short_name}", + name=f"Learner {user_short_name}", + password="aspects", + email=f"{user_short_name}@openedx.invalid", + ) + self.users.append(u) + e = CourseEnrollment.get_or_create_enrollment( + user=u, course_key=self.course.id + ) + e.is_active = True + e.save() + + def create_user(self, **user_data): + """ + Create, activate, and return a user using the edx-platform API. + """ + account_creation_form = AccountCreationForm(data=user_data, tos_required=False) + + user, _, _ = do_create_account(account_creation_form) + user.is_active = True + user.save() + return user + + def trigger_events( + self, num_events: int, sleep_time: float, run_until_killed: bool + ) -> None: + """ + Trigger the appropriate number of events based on configuration. + """ + + if run_until_killed: + log.info(f"Creating events until killed with {sleep_time} sleep between!") + while True: + self.trigger_event_and_sleep(sleep_time) + else: + log.info(f"Creating {num_events} event with {sleep_time} sleep between!") + for _ in range(num_events): + self.trigger_event_and_sleep(sleep_time) + + def trigger_event_and_sleep(self, sleep_time: float) -> None: + """ + Cause a tracking log to be emitted and sleep the specified amount of time. + """ + user = choice(self.users) + + e = CourseEnrollment.get_or_create_enrollment( + user=user, course_key=self.course.id + ) + + if e.is_active: + e.unenroll(user, self.course.id) + else: + e.enroll(user, self.course.id) # pragma: no cover + + self.sent_event_count += 1 + sleep(sleep_time) + + +class Command(BaseCommand): + """ + Create tracking log events for load testing purposes. + + Example: + tutor local run lms ./manage.py lms load_test_tracking_events --sleep_time 0 --tags celery 1worker + """ + + help = dedent(__doc__).strip() + + def add_arguments(self, parser: Any) -> None: + parser.add_argument( + "--num_users", + type=int, + default=10, + help="The number of users to create. All events will be generated for these learners.", + ) + parser.add_argument( + "--username_prefix", + type=str, + default="lt_", + help="Prefix for the generated user names.", + ) + parser.add_argument( + "--num_events", + type=int, + default=10, + help="The number of events to generate. This is ignored if --run_until_killed is set.", + ) + parser.add_argument( + "--run_until_killed", + action="store_true", + default=False, + help="If this is set, the process will run endlessly until killed.", + ) + parser.add_argument( + "--sleep_time", + type=float, + default=0.75, + help="Fractional number of seconds to sleep between sending events.", + ) + parser.add_argument( + "--tags", + nargs="*", + help="Tags to help define the run (ex: --tags celery 3workers k8s).", + ) + + def handle(self, *args, **options): + """ + Create users and trigger events for them as configured above. + """ + if not RUNNING_IN_PLATFORM: # pragma: no cover + raise CommandError("This command must be run in the Open edX LMS or CMS.") + + start = datetime.now() + lt = LoadTest(options["num_users"], options["username_prefix"], options["tags"]) + + try: + lt.trigger_events( + options["num_events"], + options["sleep_time"], + options["run_until_killed"], + ) + lt.record_end() + except KeyboardInterrupt: + log.warning("Killed by keyboard, finishing.") + lt.record_end() + + end = datetime.now() + log.info(f"Sent {lt.sent_event_count} events in {end - start}.") + + # Wait 5 seconds for Kafka to clear the queue otherwise not all events + # may be delivered. + sleep(5) diff --git a/platform_plugin_aspects/management/commands/monitor_load_test_tracking.py b/platform_plugin_aspects/management/commands/monitor_load_test_tracking.py new file mode 100644 index 0000000..fe4ba86 --- /dev/null +++ b/platform_plugin_aspects/management/commands/monitor_load_test_tracking.py @@ -0,0 +1,438 @@ +""" +Monitors the load test tracking script and saves output for later analysis. +""" + +import csv +import datetime +import io +import json +import logging +from textwrap import dedent +from time import sleep +from typing import Any, Union + +import redis +import requests +from django.conf import settings +from django.core.management.base import BaseCommand, CommandError + +from platform_plugin_aspects.sinks.base_sink import ClickHouseAuth + +try: + import confluent_kafka +except ImportError: + confluent_kafka = None + +log = logging.getLogger("tracking_event_loadtest_monitor") + + +class Monitor: + """ + Manages the configuration and state of the load test monitor. + """ + + run_id = None + + def __init__(self, sleep_time: float, backend: str): + self.ch_url = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["url"] + self.ch_auth = ClickHouseAuth( + settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["username"], + settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["password"], + ) + self.ch_database = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["database"] + self.ch_xapi_database = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG.get( + "xapi_database", "xapi" + ) + self.ch_xapi_table = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG.get( + "xapi_table", "xapi_events_all" + ) + self.ch_stats_table = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG.get( + "stats_table", "load_test_stats" + ) + self.ch_runs_table = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG.get( + "runs_table", "load_test_runs" + ) + self.ch_timeout_secs = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG[ + "timeout_secs" + ] + self.sleep_time = sleep_time + self.backend = backend + + def check_for_run_id(self) -> Union[str, None]: + """ + Return a run id for any unfinished run started in the last minute. + """ + query = f""" + SELECT run_id + FROM {self.ch_runs_table} runs + WHERE event_type = 'start' + AND run_id NOT IN ( + SELECT run_id + FROM {self.ch_runs_table} + WHERE event_type = 'end' + ) + AND dateDiff('second', now(), timestamp) <= 60 + """ + + response = requests.post( + url=self.ch_url, + auth=self.ch_auth, + params={"database": self.ch_database, "query": query}, + timeout=self.ch_timeout_secs, + ) + + response.raise_for_status() + + # ClickHouse will respond with the ID and a newline or an empty string + return response.text.strip() + + def wait_for_start(self) -> None: + """ + Wait for a run to start, tested by checking ClickHouse for new run ids. + """ + while True: + self.run_id = self.check_for_run_id() + + if self.run_id: + log.info(f"Found run id {self.run_id}! Starting monitor.") + return + + log.info("No run id from the last 60 seconds found...") # pragma: no cover + sleep(2) # pragma: no cover + + def test_has_ended(self) -> bool: + """ + Return True if the current run has finished. + """ + query = f""" + SELECT run_id + FROM {self.ch_runs_table} runs + WHERE run_id = '{self.run_id}' + AND event_type = 'end' + """ + + response = requests.post( + url=self.ch_url, + auth=self.ch_auth, + params={"database": self.ch_database, "query": query}, + timeout=self.ch_timeout_secs, + ) + + response.raise_for_status() + + # ClickHouse will respond with the ID and a newline or an empty string + return response.text.strip() == self.run_id + + def run(self) -> None: + """ + Wait for a new test to start, then run the monitor until killed. + """ + collect_redis_bus = self.backend == "redis_bus" + collect_celery = self.backend == "celery" + collect_kafka_bus = self.backend == "kafka_bus" + collect_vector = self.backend == "vector" + + log.info("Waiting for test to start...") + self.wait_for_start() + + # Once the test has ended we will wait for the backend to drain the + # backlog before exiting. This tracks the "in between" state. + shutting_down = False + + while True: + start = datetime.datetime.now() + log.info(f"----------- {start} --------") + + current_stats = {"clickhouse": self.get_clickhouse_stats()} + if collect_redis_bus: + current_stats["redis_bus"] = self.get_redis_bus_stats() + lag = current_stats["redis_bus"]["lag"] + elif collect_celery: + current_stats["celery"] = self.get_celery_stats() + lag = current_stats["celery"]["lag"] + elif collect_kafka_bus: + current_stats["kafka_bus"] = self.get_kafka_bus_stats() + lag = current_stats["kafka_bus"]["lag"] + elif collect_vector: + current_stats["vector"] = self.get_vector_stats() + lag = current_stats["vector"]["lag"] + + self.store_stats(current_stats) + + if not shutting_down and self.test_has_ended(): # pragma: no cover + shutting_down = True + log.info( + "----- Test has ended, waiting for events to drain off or force end with CTRL-C" + ) + + if shutting_down and not lag: + log.info("----- Test has ended, events are drained. Ending monitor!") + break + + # Try to keep our collection cadence to exactly what was asked + # otherwise + check_duration = datetime.datetime.now() - start + + if check_duration.total_seconds() >= self.sleep_time: + log.warning( + f"It took {check_duration} to collect and store stats, this is greater than the sleep time!" + ) + + next_sleep = self.sleep_time - check_duration.total_seconds() + sleep(next_sleep) + + def store_stats(self, current_stats: dict) -> None: + """ + Send the results for this iteration to ClickHouse. + """ + stats = json.dumps(current_stats) + + insert = f"""INSERT INTO {self.ch_stats_table} (run_id, stats) FORMAT CSV""" + + output = io.StringIO() + writer = csv.writer(output, quoting=csv.QUOTE_NONNUMERIC) + writer.writerow((self.run_id, stats)) + + response = requests.post( + url=self.ch_url, + auth=self.ch_auth, + params={"database": self.ch_database, "query": insert}, + data=output.getvalue().encode("utf-8"), + timeout=self.ch_timeout_secs, + ) + + response.raise_for_status() + + def get_clickhouse_stats(self): + """ + Get the current state of ClickHouse for this iteration. + """ + select = f""" + SELECT + count(*) as ttl_count, + max(emission_time) as most_recent, + date_diff('second', max(emission_time), now()) as lag_seconds + FROM {self.ch_xapi_table} + FINAL + FORMAT JSON + """ + + response = requests.post( + url=self.ch_url, + auth=self.ch_auth, + params={"database": self.ch_xapi_database, "query": select}, + timeout=self.ch_timeout_secs, + ) + + response.raise_for_status() + + resp = response.json()["data"][0] + log.info(f"Clickhouse lag seconds: {resp['lag_seconds']}") + + return { + "total_rows": resp["ttl_count"], + "most_recent_event": resp["most_recent"], + "lag_seconds": resp["lag_seconds"], + } + + def get_celery_stats(self): + """ + Get the current state of Celery for this iteration. + """ + r = redis.Redis.from_url(settings.BROKER_URL) + lms_queue = r.llen("edx.lms.core.default") + cms_queue = r.llen("edx.cms.core.default") + + log.info(f"Celery queues: LMS {lms_queue}, CMS {cms_queue}") + + return { + "lms_queue_length": lms_queue, + "cms_queue_length": cms_queue, + "lag": lms_queue + cms_queue, + } + + def get_redis_bus_stats(self): + """ + Get the current state of redis for this iteration. + """ + r = redis.Redis.from_url(settings.EVENT_BUS_REDIS_CONNECTION_URL) + info = r.xinfo_stream("openedx-analytics", full=True) + + lag = 0 + + try: + for g in info["groups"]: + lag += g["lag"] + # Older versions of redis don't have "lag". + except KeyError: # pragma: no cover + pass + + consumer_stats = { + "total_events": info["length"], + "lag": lag, + } + + log.info(f"Redis bus queue length: {consumer_stats['lag']}") + + return consumer_stats + + def get_kafka_bus_stats(self): + """ + Get the current state of ClickHouse for this iteration. + """ + if not confluent_kafka: # pragma: no cover + raise CommandError( + "Trying to monitor Kafka bus, but confluent_kafka is not installed" + ) + + brokers = settings.EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS + topic = f"{settings.EVENT_BUS_TOPIC_PREFIX}-analytics" + group = "analytics-service" + + # This consumer will not join the group, but the group.id is required by + # committed() to know which group to get offsets for. + consumer = confluent_kafka.Consumer( + {"bootstrap.servers": brokers, "group.id": group} + ) + + # Get the topic's partitions + metadata = consumer.list_topics(topic, timeout=10) + + if metadata.topics[topic].error is not None: # pragma: no cover + log.info(metadata.topics[topic].error) + + partitions = [ + confluent_kafka.TopicPartition(topic, p) + for p in metadata.topics[topic].partitions + ] + committed = consumer.committed(partitions, timeout=10) + + consumer_stats = { + "topic": topic, + "partitions": [], + } + + total_lag = 0 + + for partition in committed: + # Get the partitions low and high watermark offsets. + low, high = consumer.get_watermark_offsets( + partition, timeout=10, cached=False + ) + + if high < 0: # pragma: no cover + lag = 0 + elif partition.offset < 0: # pragma: no cover + # No committed offset, show total message count as lag. + # The actual message count may be lower due to compaction + # and record deletions. + lag = high - low + else: + lag = high - partition.offset + + log.info(f"{partition.topic} [{partition.partition}] Lag: {lag}") + total_lag += lag + + consumer_stats["partitions"].append( + { + "partition": partition.partition, + "lag": lag, + } + ) + + consumer.close() + consumer_stats["lag"] = total_lag + return consumer_stats + + def _call_vector_graphql(self): + """ + Make the actual GraphQL call to the Vector API. + """ + # These values are hard coded in tutor local, K8s changes TBD + url = "http://vector:8686/graphql" + query = """ + { + sinks(filter:{componentId:{equals:"clickhouse_xapi"}}) { + edges { + node { + ...on Sink { + componentId, + componentType, + metrics { + receivedEventsTotal {timestamp, receivedEventsTotal}, + sentEventsTotal {timestamp, sentEventsTotal} + } + } + } + } + } + } + """ + r = requests.post(url, json={"query": query}, timeout=10) + r.raise_for_status() + return r.json()["data"]["sinks"]["edges"][0]["node"]["metrics"] + + def get_vector_stats(self): + """ + Get the current state of Vector for this iteration. + """ + metrics = self._call_vector_graphql() + + # These will be null until events start arriving + received = ( + metrics["receivedEventsTotal"]["receivedEventsTotal"] + if metrics["receivedEventsTotal"] + else 0.0 + ) + sent = ( + metrics["sentEventsTotal"]["sentEventsTotal"] + if metrics["sentEventsTotal"] + else 0.0 + ) + + rtn = {"events_received": received, "events_sent": sent, "lag": received - sent} + + log.info( + f"Vector received: {rtn['events_received']} sent: {rtn['events_sent']} lag: {rtn.get('lag')}" + ) + return rtn + + +class Command(BaseCommand): + """ + Dump objects to a ClickHouse instance. + + Example: + tutor local run lms ./manage.py lms monitor_load_test_tracking --sleep_time 5 --backend redis_bus + """ + + help = dedent(__doc__).strip() + + def add_arguments(self, parser: Any) -> None: + parser.add_argument( + "--sleep_time", + type=float, + default=10, + help="Fractional number of seconds to sleep between gathering data.", + ) + parser.add_argument( + "--backend", + choices=["redis_bus", "kafka_bus", "celery", "vector"], + default="celery", + help="Backend used to send events to ClickHouse", + ) + + def handle(self, *_, **options): + """ + Creates users and triggers events for them as configured above. + """ + start = datetime.datetime.now() + log.info( + f"Starting monitor for {options['backend']} with sleep of {options['sleep_time']} seconds" + ) + + monitor = Monitor(options["sleep_time"], options["backend"]) + monitor.run() + + end = datetime.datetime.now() + log.info(f"Monitored from {start} to {end} (duration {end - start}).") diff --git a/platform_plugin_aspects/tests/commands/test_load_test_monitor.py b/platform_plugin_aspects/tests/commands/test_load_test_monitor.py new file mode 100644 index 0000000..15579f6 --- /dev/null +++ b/platform_plugin_aspects/tests/commands/test_load_test_monitor.py @@ -0,0 +1,154 @@ +""" +Tests for the monitor_load_test_tracking management command. +""" + +import datetime +from collections import namedtuple +from unittest.mock import DEFAULT, Mock, patch + +import pytest +from django.core.management import call_command + +CommandOptions = namedtuple("TestCommandOptions", ["options", "expected_logs"]) +KafkaPartition = namedtuple("KafkaPartition", ["offset", "topic", "partition"]) + + +def load_test_command_basic_options(): + """ + Pytest params for all the different non-ClickHouse command options. + """ + options = [ + # Test our defaults + CommandOptions( + options={}, + expected_logs=[ + "Clickhouse lag seconds: 1", + "Celery queues: LMS 0, CMS 0", + "Starting monitor for celery with sleep of 10 seconds", + ], + ), + # Test overriding sleep time + CommandOptions( + options={"sleep_time": 5}, + expected_logs=[ + "Clickhouse lag seconds: 1", + "Celery queues: LMS 0, CMS 0", + "Starting monitor for celery with sleep of 5 seconds", + ], + ), + # Test explicit celery backend + CommandOptions( + options={"backend": "celery"}, + expected_logs=[ + "Clickhouse lag seconds: 1", + "Celery queues: LMS 0, CMS 0", + "Starting monitor for celery with sleep of 10 seconds", + ], + ), + # Test redis bus backend + CommandOptions( + options={"backend": "redis_bus"}, + expected_logs=[ + "Clickhouse lag seconds: 1", + "Starting monitor for redis_bus with sleep of 10 seconds", + "Redis bus queue length: 0", + ], + ), + # Test kafka bus backend + CommandOptions( + options={"backend": "kafka_bus"}, + expected_logs=[ + "Clickhouse lag seconds: 1", + "Starting monitor for kafka_bus with sleep of 10 seconds", + "test [test] Lag: 0", + ], + ), + # Test vector backend + CommandOptions( + options={"backend": "vector"}, + expected_logs=[ + "Clickhouse lag seconds: 1", + "Vector received: 10 sent: 10 lag: 0", + "Starting monitor for vector with sleep of 10 seconds", + ], + ), + ] + + for option in options: + yield option + + +@pytest.mark.parametrize("test_command_option", load_test_command_basic_options()) +def test_monitor_options(test_command_option, caplog): + option_combination, expected_outputs = test_command_option + patch_prefix = ( + "platform_plugin_aspects.management.commands.monitor_load_test_tracking" + ) + + with patch.multiple( + f"{patch_prefix}", + requests=DEFAULT, + settings=DEFAULT, + redis=DEFAULT, + json=DEFAULT, + confluent_kafka=DEFAULT, + sleep=DEFAULT, + ) as patches: + # First response is the ClickHouse call to get the run id + patches["requests"].post.return_value.text.strip.return_value = "runabc" + + # Then the call to get clickhouse lag data + patches["requests"].post.return_value.json.side_effect = ( + { + "data": [ + { + "lag_seconds": 1, + "ttl_count": 100, + "most_recent": datetime.datetime.now().isoformat(), + }, + ] + }, + # Then the Vector API call, GraphQL is awful. + { + "data": { + "sinks": { + "edges": [ + { + "node": { + "metrics": { + "sentEventsTotal": {"sentEventsTotal": 10}, + "receivedEventsTotal": { + "receivedEventsTotal": 10 + }, + }, + }, + } + ], + } + } + }, + ) + + patches["redis"].Redis.from_url.return_value.llen.return_value = 0 + patches["redis"].Redis.from_url.return_value.xinfo_stream.return_value = { + "length": 100, + "groups": [ + {"name": "group1", "lag": 0}, + {"name": "group2", "lag": 0}, + ], + } + + patches["confluent_kafka"].Consumer.return_value.committed.return_value = { + KafkaPartition(offset=10, topic="test", partition="test"): Mock() + } + + patches[ + "confluent_kafka" + ].Consumer.return_value.get_watermark_offsets.return_value = (10, 10) + + call_command("monitor_load_test_tracking", **option_combination) + + print(caplog.text) + + for expected_output in expected_outputs: + assert expected_output in caplog.text diff --git a/platform_plugin_aspects/tests/commands/test_load_test_tracking_events.py b/platform_plugin_aspects/tests/commands/test_load_test_tracking_events.py new file mode 100644 index 0000000..a41a513 --- /dev/null +++ b/platform_plugin_aspects/tests/commands/test_load_test_tracking_events.py @@ -0,0 +1,110 @@ +""" +Tests for the load_test_tracking_events management command. +""" + +from collections import namedtuple +from unittest.mock import DEFAULT, Mock, patch + +import pytest +from django.core.management import call_command + +CommandOptions = namedtuple("TestCommandOptions", ["options", "expected_logs"]) + + +def load_test_command_basic_options(): + """ + Pytest params for all the different non-ClickHouse command options. + + "--num_users", + "--username_prefix", + "--num_events", + "--run_until_killed", + "--sleep_time", + """ + options = [ + CommandOptions( + options={"sleep_time": 0}, + expected_logs=[ + "events in", + "Creating 10 users prefixed with lt_", + "Creating 10 event with 0 sleep between!", + ], + ), + CommandOptions( + options={"sleep_time": 0.1, "num_users": 2, "username_prefix": "cheese_"}, + expected_logs=[ + "events in", + "Creating 2 users prefixed with cheese_", + "Creating 10 event with 0.1 sleep between!", + ], + ), + ] + + for option in options: + yield option + + +@pytest.mark.parametrize("test_command_option", load_test_command_basic_options()) +def test_load_test_options(test_command_option, caplog): + option_combination, expected_outputs = test_command_option + + fake_course = Mock() + fake_course.return_value.id = "fake_course_id" + + patch_prefix = ( + "platform_plugin_aspects.management.commands.load_test_tracking_events" + ) + with patch.multiple( + f"{patch_prefix}", + create_new_course_in_store=fake_course, + do_create_account=lambda _: (Mock(), DEFAULT, DEFAULT), + CourseEnrollment=DEFAULT, + AccountCreationForm=DEFAULT, + ModuleStoreEnum=DEFAULT, + RUNNING_IN_PLATFORM=True, + requests=DEFAULT, + sleep=DEFAULT, + ) as _: + call_command("load_test_tracking_events", **option_combination) + + print(caplog.text) + + for expected_output in expected_outputs: + assert expected_output in caplog.text + + +def test_load_test_run_until_killed(caplog): + fake_course = Mock() + fake_course.return_value.id = "fake_course_id" + + patch_prefix = ( + "platform_plugin_aspects.management.commands.load_test_tracking_events" + ) + + def fake_sleep_or_raise(sleep_time): + # Magic 5 is the hard coded time for start/end events + if sleep_time == 5: + return + # We use the sleep at the end of the loop to break out of it + else: + raise KeyboardInterrupt() + + with patch.multiple( + f"{patch_prefix}", + create_new_course_in_store=fake_course, + do_create_account=lambda _: (Mock(), DEFAULT, DEFAULT), + CourseEnrollment=DEFAULT, + AccountCreationForm=DEFAULT, + ModuleStoreEnum=DEFAULT, + RUNNING_IN_PLATFORM=True, + requests=DEFAULT, + sleep=Mock(side_effect=fake_sleep_or_raise), + ) as _: + call_command( + "load_test_tracking_events", **{"run_until_killed": True, "sleep_time": 0} + ) + + print(caplog.text) + + assert f"Creating events until killed with 0 sleep between!" in caplog.text + assert f"Killed by keyboard, finishing" in caplog.text diff --git a/platform_plugin_aspects/utils.py b/platform_plugin_aspects/utils.py index 1c14094..d44aeb6 100644 --- a/platform_plugin_aspects/utils.py +++ b/platform_plugin_aspects/utils.py @@ -14,7 +14,7 @@ logger = logging.getLogger(__name__) -if settings.DEBUG: +if settings.DEBUG: # pragma: no cover os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1" diff --git a/requirements/base.in b/requirements/base.in index f0bb4ae..6c3bf19 100644 --- a/requirements/base.in +++ b/requirements/base.in @@ -9,6 +9,7 @@ web_fragments django_crum celery # Asynchronous task execution library Django # Web application framework +redis requests # HTTP request library edx-django-utils # Django utilities, we use caching and monitoring edx-opaque-keys # Parsing library for course and usage keys diff --git a/requirements/base.txt b/requirements/base.txt index fc48dd7..0773f60 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -10,6 +10,8 @@ appdirs==1.4.4 # via fs asgiref==3.8.1 # via django +async-timeout==4.0.3 + # via redis backports-zoneinfo[tzdata]==0.2.1 # via # celery @@ -123,6 +125,8 @@ pyyaml==6.0.1 # code-annotations # superset-api-client # xblock +redis==5.0.3 + # via -r requirements/base.in requests==2.31.0 # via # -r requirements/base.in diff --git a/requirements/dev.txt b/requirements/dev.txt index fb9acfd..8064c4a 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -21,6 +21,10 @@ astroid==3.1.0 # -r requirements/quality.txt # pylint # pylint-celery +async-timeout==4.0.3 + # via + # -r requirements/quality.txt + # redis backports-zoneinfo[tzdata]==0.2.1 # via # -r requirements/quality.txt @@ -364,6 +368,8 @@ pyyaml==6.0.1 # responses # superset-api-client # xblock +redis==5.0.3 + # via -r requirements/quality.txt requests==2.31.0 # via # -r requirements/quality.txt diff --git a/requirements/doc.txt b/requirements/doc.txt index 242d585..695e8e1 100644 --- a/requirements/doc.txt +++ b/requirements/doc.txt @@ -20,6 +20,10 @@ asgiref==3.8.1 # via # -r requirements/test.txt # django +async-timeout==4.0.3 + # via + # -r requirements/test.txt + # redis babel==2.14.0 # via # pydata-sphinx-theme @@ -304,6 +308,8 @@ pyyaml==6.0.1 # xblock readme-renderer==43.0 # via twine +redis==5.0.3 + # via -r requirements/test.txt requests==2.31.0 # via # -r requirements/test.txt diff --git a/requirements/quality.txt b/requirements/quality.txt index 6a1a896..ce3b7c7 100644 --- a/requirements/quality.txt +++ b/requirements/quality.txt @@ -20,6 +20,10 @@ astroid==3.1.0 # via # pylint # pylint-celery +async-timeout==4.0.3 + # via + # -r requirements/test.txt + # redis backports-zoneinfo[tzdata]==0.2.1 # via # -r requirements/test.txt @@ -275,6 +279,8 @@ pyyaml==6.0.1 # responses # superset-api-client # xblock +redis==5.0.3 + # via -r requirements/test.txt requests==2.31.0 # via # -r requirements/test.txt diff --git a/requirements/test.txt b/requirements/test.txt index 90bdc27..9a602e4 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -16,6 +16,10 @@ asgiref==3.8.1 # via # -r requirements/base.txt # django +async-timeout==4.0.3 + # via + # -r requirements/base.txt + # redis backports-zoneinfo[tzdata]==0.2.1 # via # -r requirements/base.txt @@ -213,6 +217,8 @@ pyyaml==6.0.1 # responses # superset-api-client # xblock +redis==5.0.3 + # via -r requirements/base.txt requests==2.31.0 # via # -r requirements/base.txt diff --git a/test_settings.py b/test_settings.py index 01e636c..f3a099c 100644 --- a/test_settings.py +++ b/test_settings.py @@ -24,7 +24,6 @@ }, } - INSTALLED_APPS = ("platform_plugin_aspects",) EVENT_SINK_CLICKHOUSE_MODEL_CONFIG = { diff --git a/tox.ini b/tox.ini index e57b6ff..c02f993 100644 --- a/tox.ini +++ b/tox.ini @@ -43,7 +43,7 @@ deps = -r{toxinidir}/requirements/test.txt commands = python manage.py check - pytest {posargs} + pytest {posargs} --full-trace [testenv:docs] setenv =