Skip to content

Commit

Permalink
Merge pull request #955 from openedx/cag/metrics-improves
Browse files Browse the repository at this point in the history
feat(metrics): sort metrics by clickhouse query time
  • Loading branch information
Cristhian Garcia authored Sep 30, 2024
2 parents c75cd98 + 20b4f53 commit e56179f
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 72 deletions.
17 changes: 12 additions & 5 deletions tutoraspects/commands_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,14 @@ def init_clickhouse() -> list[tuple[str, str]]:
# Ex: "tutor local do performance-metrics "
@click.command(context_settings={"ignore_unknown_options": True})
@click.option(
"--course_key",
"--org",
default="",
help="A course_key to apply as a filter, you must include the 'course-v1:'.",
help="An organization to apply as a filter.",
)
@click.option(
"--course_name",
default="",
help="A course_name to apply as a filter.",
)
@click.option(
"--dashboard_slug", default="", help="Only run charts for the given dashboard."
Expand All @@ -168,13 +173,15 @@ def init_clickhouse() -> list[tuple[str, str]]:
@click.option(
"--fail_on_error", is_flag=True, default=False, help="Allow errors to fail the run."
)
def performance_metrics(
course_key, dashboard_slug, slice_name, print_sql, fail_on_error
def performance_metrics( # pylint: disable=too-many-arguments,too-many-positional-arguments
org, course_name, dashboard_slug, slice_name, print_sql, fail_on_error
) -> (list)[tuple[str, str]]:
"""
Job to measure performance metrics of charts and its queries in Superset and ClickHouse.
"""
options = f"--course_key {course_key}" if course_key else ""
options = ""
options += f"--org '{org}' " if org else ""
options += f"--course_name '{course_name}' " if course_name else ""
options += f" --dashboard_slug {dashboard_slug}" if dashboard_slug else ""
options += f' --slice_name "{slice_name}"' if slice_name else ""
options += " --print_sql" if print_sql else ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@
across Superset installations.
"""

from create_assets import BASE_DIR, ASSET_FOLDER_MAPPING, app

import json
import logging
import os
import time
Expand All @@ -20,10 +17,12 @@
import click
import sqlparse
import yaml
from create_assets import app

from flask import g
from superset import security_manager
from superset.commands.chart.data.get_data_command import ChartDataCommand
from superset.charts.schemas import ChartDataQueryContextSchema
from superset.commands.chart.data.get_data_command import ChartDataCommand
from superset.extensions import db
from superset.models.dashboard import Dashboard
from superset.models.slice import Slice
Expand All @@ -42,46 +41,53 @@
"Result rows: {result_rows}\n"
"Memory Usage (MB): {memory_usage_mb}\n"
"Row count (superset) {rowcount:}\n"
"Filters: {filters}\n\n"
"Filters: {filters}\n"
"SQL:\n"
"{sql}\n\n\n"
)


@click.command()
@click.option("--org", default="", help="An organization to apply as a filter.")
@click.option(
"--course_key",
"--course_name",
default="",
help="A course_key to apply as a filter, you must include the 'course-v1:'.")
help="A course_name to apply as a filter, you must include the 'course-v1:'.",
)
@click.option(
"--dashboard_slug",
default="",
help="Only run charts for the given dashboard.")
"--dashboard_slug", default="", help="Only run charts for the given dashboard."
)
@click.option(
"--slice_name",
default="",
help="Only run charts for the given slice name, if the name appears in more than "
"one dashboard it will be run for each.")
"one dashboard it will be run for each.",
)
@click.option(
"--print_sql",
is_flag=True,
default=False,
help="Whether to print the SQL run."
"--print_sql", is_flag=True, default=False, help="Whether to print the SQL run."
)
@click.option(
"--fail_on_error", is_flag=True, default=False, help="Allow errors to fail the run."
)
def performance_metrics(course_key, dashboard_slug, slice_name, print_sql,
fail_on_error):
def performance_metrics(
org, course_name, dashboard_slug, slice_name, print_sql, fail_on_error
):
"""
Measure the performance of the dashboard.
"""
    # Mock the client name to identify the queries in the clickhouse system.query_log
    # table by the http_user_agent field.
extra_filters = []
if course_key:
extra_filters += [{"col": "course_key", "op": "==", "val": course_key}]
if course_name:
extra_filters += [{"col": "course_name", "op": "IN", "val": course_name}]
if org:
extra_filters += [{"col": "org", "op": "IN", "val": org}]

with patch("clickhouse_connect.common.build_client_name") as mock_build_client_name:
mock_build_client_name.return_value = RUN_ID
target_dashboards = [dashboard_slug] if dashboard_slug else {{SUPERSET_EMBEDDABLE_DASHBOARDS}}
target_dashboards = (
[dashboard_slug] if dashboard_slug else {{SUPERSET_EMBEDDABLE_DASHBOARDS}}
)

dashboards = (
db.session.query(Dashboard)
Expand All @@ -98,14 +104,13 @@ def performance_metrics(course_key, dashboard_slug, slice_name, print_sql,
logger.info(f"Dashboard: {dashboard.slug}")
for slice in dashboard.slices:
if slice_name and not slice_name == slice.slice_name:
logger.info(f"{slice.slice_name} doesn't match {slice_name}, "
f"skipping.")
logger.info(
f"{slice.slice_name} doesn't match {slice_name}, " f"skipping."
)
continue

query_context = get_slice_query_context(
slice,
query_contexts,
extra_filters
slice, query_contexts, extra_filters
)
result = measure_chart(slice, query_context, fail_on_error)
if not result:
Expand Down Expand Up @@ -167,27 +172,32 @@ def get_slice_query_context(slice, query_contexts, extra_filters=None):
}
)

query_context["form_data"]["extra_form_data"] = {"filters": extra_filters}

if extra_filters:
for query in query_context["queries"]:
query["filters"] += extra_filters

return query_context


def measure_chart(slice, query_context, fail_on_error):
def measure_chart(slice, query_context_dict, fail_on_error):
"""
Measure the performance of a chart and return the results.
"""
logger.info(f"Fetching slice data: {slice}")

g.user = security_manager.find_user(username="{{SUPERSET_ADMIN_USERNAME}}")
query_context = ChartDataQueryContextSchema().load(query_context)
query_context = ChartDataQueryContextSchema().load(query_context_dict)
command = ChartDataCommand(query_context)

start_time = datetime.now()
command.validate()
g.form_data = query_context.form_data
try:
start_time = datetime.now()
result = command.run()

end_time = datetime.now()
result["time_elapsed"] = (end_time - start_time).total_seconds()
result["slice"] = slice
for query in result["queries"]:
if "error" in query and query["error"]:
raise query["error"]
Expand All @@ -197,11 +207,6 @@ def measure_chart(slice, query_context, fail_on_error):
raise e
return

end_time = datetime.now()

result["time_elapsed"] = (end_time - start_time).total_seconds()
result["slice"] = slice

return result


Expand All @@ -227,44 +232,38 @@ def get_query_log_from_clickhouse(report, query_contexts, print_sql, fail_on_err
parsed_sql = str(sqlparse.parse(row.pop("query"))[0])
clickhouse_queries[parsed_sql] = row

if print_sql:
logger.info("ClickHouse SQL: ")
logger.info(parsed_sql)

# Sort report by slowest queries
report = sorted(report, key=lambda x: x["time_elapsed"], reverse=True)

report_str = f"\nSuperset Reports: {RUN_ID}\n\n"
for i, chart_result in enumerate(report):
report_str += (
report_format.format(
i=(i + 1),
dashboard=chart_result["dashboard"],
slice=chart_result["slice"],
superset_time=chart_result["time_elapsed"]
)
)
for i, query in enumerate(chart_result["queries"]):
for k, chart_result in enumerate(report):
for query in chart_result["queries"]:
parsed_sql = (
str(sqlparse.parse(query["query"])[0]).replace(";", "")
+ "\n FORMAT Native"
)
chart_result["sql"] = parsed_sql
clickhouse_report = clickhouse_queries.get(parsed_sql, {})
chart_result.update(clickhouse_report)
chart_result.update(
{"query_duration_ms": chart_result.get("query_duration_ms", 0)}
)

if print_sql:
logger.info("Superset SQL: ")
logger.info(parsed_sql)
# Sort report by slowest queries
report = sorted(report, key=lambda x: x["query_duration_ms"], reverse=True)

clickhouse_report = clickhouse_queries.get(parsed_sql, {})
report_str += (
query_format.format(
query_duration_ms=clickhouse_report.get(
"query_duration_ms", 0
) / 1000,
memory_usage_mb=clickhouse_report.get("memory_usage_mb"),
result_rows=clickhouse_report.get("result_rows"),
rowcount=query["rowcount"],
filters=query["applied_filters"],
)
report_str = f"\nSuperset Reports: {RUN_ID}\n\n"
for k, chart_result in enumerate(report):
report_str += report_format.format(
i=(k + 1),
dashboard=chart_result["dashboard"],
slice=chart_result["slice"],
superset_time=chart_result["time_elapsed"],
)
for query in chart_result["queries"]:
report_str += query_format.format(
query_duration_ms=chart_result.get("query_duration_ms") / 1000,
memory_usage_mb=chart_result.get("memory_usage_mb"),
result_rows=chart_result.get("result_rows"),
rowcount=query["rowcount"],
filters=query["applied_filters"],
sql=chart_result["sql"] if print_sql else "",
)
logger.info(report_str)

Expand Down

0 comments on commit e56179f

Please sign in to comment.