Skip to content

Commit

Permalink
feat: Add command to monitor the load test
Browse files Browse the repository at this point in the history
This doesn't work in the current form, but is getting close.
  • Loading branch information
bmtcril committed Mar 8, 2024
1 parent 72e7980 commit 800b987
Showing 1 changed file with 71 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
"""
Monitors the load test tracking script and saves output for later analysis.
"""
import csv
import io
import json
import logging
import datetime
import uuid
from pprint import pprint
from time import sleep
from typing import Any

Check warning on line 12 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L4-L12

Added lines #L4 - L12 were not covered by tests
Expand All @@ -22,42 +26,77 @@

class Monitor:
def __init__(self, sleep_time: float):
    """
    Capture the ClickHouse connection details and the polling interval.

    Connection values are read from
    ``settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG``. The ``url``,
    ``username``, ``password``, ``database`` and ``timeout_secs`` keys are
    required (a missing one raises ``KeyError``); the xAPI and stats
    database/table names are optional and fall back to defaults.

    Args:
        sleep_time: Seconds to pause between stats collection passes.
    """
    # TODO: Eventually I'd like to store the stats data in Clickhouse with a sink
    # self.sink = BaseSink()
    ch_config = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG

    # Short identifier for this run: the first six characters of a random UUID.
    self.run_id = str(uuid.uuid4())[:6]
    self.sleep_time = sleep_time

    # Required connection settings.
    self.ch_url = ch_config["url"]
    self.ch_auth = ClickHouseAuth(ch_config["username"], ch_config["password"])
    self.ch_database = ch_config["database"]
    self.ch_timeout_secs = ch_config["timeout_secs"]

    # Optional settings, with defaults.
    self.ch_xapi_database = ch_config.get("xapi_database", "xapi")
    self.ch_xapi_table = ch_config.get("xapi_table", "xapi_events_all")
    self.ch_stats_database = ch_config.get("stats_database", "load_test")
    self.ch_stats_table = ch_config.get("stats_table", "load_test_stats")

Check warning on line 42 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L42

Added line #L42 was not covered by tests

def run(self) -> None:
    """
    Collect stats from every enabled source and store them, forever.

    Each pass gathers the ClickHouse stats plus the enabled optional
    sources, hands the combined dict to ``store_stats`` and then waits so
    that passes start roughly every ``self.sleep_time`` seconds.

    This method never returns; it ends only when a collector,
    ``store_stats`` or the sleep raises.
    """
    # Local import: this module only imports ``sleep`` from ``time``.
    from time import monotonic

    # TODO: Need to pass in which stats to collect, or infer them from config
    collect_redis_bus = True
    collect_celery = True
    # TODO: Kafka bus stats are not collected yet; add a flag and a
    # ``current_stats["kafka_bus"]`` entry once a collector exists.

    while True:
        pass_started = monotonic()

        current_stats = {"clickhouse": self.get_clickhouse_stats()}
        if collect_redis_bus:
            current_stats["redis_bus"] = self.get_redis_bus_stats()
        if collect_celery:
            current_stats["celery"] = self.get_celery_stats()

        self.store_stats(current_stats)

        # Sleep only for what is left of the interval so the cadence stays
        # steady even when collection itself takes a noticeable time.
        # TODO: It should also error if stats collection starts taking
        # longer than sleep time; for now an overrun just skips the wait.
        elapsed = monotonic() - pass_started
        sleep(max(0.0, self.sleep_time - elapsed))

Check warning on line 64 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L64

Added line #L64 was not covered by tests

def store_stats(self, current_stats: dict) -> None:
    """
    Insert one row of collected stats into the ClickHouse stats table.

    The stats are serialized to a JSON string and sent, together with this
    run's id, as a single CSV row in the body of an HTTP POST.

    Args:
        current_stats: JSON-serializable stats gathered during one pass.

    Raises:
        TypeError: If ``current_stats`` cannot be serialized by ``json.dumps``.
        requests.HTTPError: If ClickHouse responds with an error status.
    """
    stats = json.dumps(current_stats)

    # The request's default database is ``ch_database``, so the stats table
    # has to be qualified with its own database to be found.
    # NOTE(review): assumes the stats table lives in ``ch_stats_database`` - confirm.
    # TODO: There must be a way to parameterize this
    insert = (
        f"INSERT INTO {self.ch_stats_database}.{self.ch_stats_table} "
        "(run_id, stats) FORMAT CSV"
    )

    # QUOTE_NONNUMERIC quotes both string fields and doubles the quote
    # characters embedded in the JSON document.
    row = io.StringIO()
    csv.writer(row, quoting=csv.QUOTE_NONNUMERIC).writerow((self.run_id, stats))

    response = requests.post(
        url=self.ch_url,
        auth=self.ch_auth,
        params={"database": self.ch_database, "query": insert},
        data=row.getvalue().encode("utf-8"),
        timeout=self.ch_timeout_secs,
    )
    response.raise_for_status()

    # Progress marker: when this pass was stored.
    print(datetime.datetime.now())

Check warning on line 88 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L87-L88

Added lines #L87 - L88 were not covered by tests

def get_clickhouse_stats(self):
select = f"""

Check warning on line 91 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L90-L91

Added lines #L90 - L91 were not covered by tests
SELECT count(*) as ttl_count, max(emission_time) as most_recent,
date_diff('second', max(emission_time), now())
SELECT
count(*) as ttl_count,
max(emission_time) as most_recent,
date_diff('second', max(emission_time), now()) as lag_seconds
FROM {self.ch_xapi_table}
FORMAT JSON
"""

response = requests.post(

Check warning on line 100 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L100

Added line #L100 was not covered by tests
url=self.ch_url,
auth=self.ch_auth,
Expand All @@ -67,19 +106,29 @@ def get_clickhouse_stats(self):
},
timeout=self.ch_timeout_secs
)
print(response.text)

response.raise_for_status()
resp = response.json()["data"][0]
print(f"Clickhouse lag seconds: {resp['lag_seconds']}")

Check warning on line 112 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L110-L112

Added lines #L110 - L112 were not covered by tests

return response
return {

Check warning on line 114 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L114

Added line #L114 was not covered by tests
"total_rows": resp["ttl_count"],
"most_recent_event": resp["most_recent"],
"lag_seconds": resp["lag_seconds"],
}

def get_celery_stats(self):
    """
    Report the lengths of the LMS and CMS default Celery queues in Redis.

    Returns:
        dict: ``lms_queue_length`` and ``cms_queue_length``, each the
        ``LLEN`` result for the corresponding Redis list.
    """
    # TODO: Get this information from config
    client = redis.Redis(host="redis", port=6379, db=0)

    lms_queue = client.llen("edx.lms.core.default")
    cms_queue = client.llen("edx.cms.core.default")

    print(f"Celery queues: LMS {lms_queue}, CMS {cms_queue}")

    # A non-empty queue is a normal reading under load, so it is reported
    # rather than raised.
    return {
        "lms_queue_length": lms_queue,
        "cms_queue_length": cms_queue,
    }

def get_redis_bus_stats(self):

Check warning on line 133 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L133

Added line #L133 was not covered by tests
# TODO: Get this information from config
Expand Down Expand Up @@ -115,11 +164,15 @@ def get_redis_bus_stats(self):
consumer_stats = {
"total_events": info["length"],
"consumers": [
{"name": x[1], "processing": x[7], "queue_length": len(x[9])}
{"name": str(x[1]), "processing": x[7], "queue_length": len(x[9])}
for x in all_consumers
]
}

total_queue = sum([x["queue_length"] for x in consumer_stats["consumers"]])

print(f"Redis bus total queue length: {total_queue}")

Check warning on line 174 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L174

Added line #L174 was not covered by tests

return consumer_stats

Check warning on line 176 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L176

Added line #L176 was not covered by tests


Expand Down

0 comments on commit 800b987

Please sign in to comment.