feat(metrics): sort metrics by clickhouse query time #955

Merged
merged 10 commits on Sep 30, 2024
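
In outline, the change matches each chart's normalized SQL against ClickHouse's system.query_log (queries are tagged with a mocked client name so they can be found there), attaches the logged duration to the chart result, and sorts the report by that duration instead of the Superset response time. A minimal sketch of the idea, with illustrative names only (report as a list of per-chart dicts, clickhouse_queries keyed by normalized SQL); the real implementation is in get_query_log_from_clickhouse in the diff below:

def sort_report_by_clickhouse_time(report, clickhouse_queries):
    """Attach ClickHouse timings to each chart result and sort slowest first."""
    for chart_result in report:
        # Match on the normalized SQL that ClickHouse logged for this chart.
        log_row = clickhouse_queries.get(chart_result.get("sql"), {})
        chart_result.update(log_row)
        # Charts with no matching log row still need a sort key.
        chart_result.setdefault("query_duration_ms", 0)
    return sorted(report, key=lambda r: r["query_duration_ms"], reverse=True)
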
17 changes: 12 additions & 5 deletions tutoraspects/commands_v1.py
@@ -149,9 +149,14 @@ def init_clickhouse() -> list[tuple[str, str]]:
# Ex: "tutor local do performance-metrics "
@click.command(context_settings={"ignore_unknown_options": True})
@click.option(
"--course_key",
"--org",
default="",
help="A course_key to apply as a filter, you must include the 'course-v1:'.",
help="An organization to apply as a filter.",
)
@click.option(
"--course_name",
default="",
help="A course_name to apply as a filter.",
)
@click.option(
"--dashboard_slug", default="", help="Only run charts for the given dashboard."
@@ -168,13 +173,15 @@ def init_clickhouse() -> list[tuple[str, str]]:
@click.option(
"--fail_on_error", is_flag=True, default=False, help="Allow errors to fail the run."
)
def performance_metrics(
course_key, dashboard_slug, slice_name, print_sql, fail_on_error
def performance_metrics( # pylint: disable=too-many-arguments,too-many-positional-arguments
org, course_name, dashboard_slug, slice_name, print_sql, fail_on_error
) -> list[tuple[str, str]]:
"""
Job to measure performance metrics of charts and their queries in Superset and ClickHouse.
"""
options = f"--course_key {course_key}" if course_key else ""
options = ""
options += f"--org '{org}' " if org else ""
options += f"--course_name '{course_name}' " if course_name else ""
options += f" --dashboard_slug {dashboard_slug}" if dashboard_slug else ""
options += f' --slice_name "{slice_name}"' if slice_name else ""
options += " --print_sql" if print_sql else ""
@@ -7,9 +7,6 @@
across Superset installations.
"""

from create_assets import BASE_DIR, ASSET_FOLDER_MAPPING, app

import json
import logging
import os
import time
@@ -20,10 +17,12 @@
import click
import sqlparse
import yaml
from create_assets import app

from flask import g
from superset import security_manager
from superset.commands.chart.data.get_data_command import ChartDataCommand
from superset.charts.schemas import ChartDataQueryContextSchema
from superset.commands.chart.data.get_data_command import ChartDataCommand
from superset.extensions import db
from superset.models.dashboard import Dashboard
from superset.models.slice import Slice
@@ -42,46 +41,53 @@
"Result rows: {result_rows}\n"
"Memory Usage (MB): {memory_usage_mb}\n"
"Row count (superset) {rowcount:}\n"
"Filters: {filters}\n\n"
"Filters: {filters}\n"
"SQL:\n"
"{sql}\n\n\n"
)


@click.command()
@click.option("--org", default="", help="An organization to apply as a filter.")
@click.option(
"--course_key",
"--course_name",
default="",
help="A course_key to apply as a filter, you must include the 'course-v1:'.")
help="A course_name to apply as a filter, you must include the 'course-v1:'.",
)
@click.option(
"--dashboard_slug",
default="",
help="Only run charts for the given dashboard.")
"--dashboard_slug", default="", help="Only run charts for the given dashboard."
)
@click.option(
"--slice_name",
default="",
help="Only run charts for the given slice name, if the name appears in more than "
"one dashboard it will be run for each.")
"one dashboard it will be run for each.",
)
@click.option(
"--print_sql",
is_flag=True,
default=False,
help="Whether to print the SQL run."
"--print_sql", is_flag=True, default=False, help="Whether to print the SQL run."
)
@click.option(
"--fail_on_error", is_flag=True, default=False, help="Allow errors to fail the run."
)
def performance_metrics(course_key, dashboard_slug, slice_name, print_sql,
fail_on_error):
def performance_metrics(
org, course_name, dashboard_slug, slice_name, print_sql, fail_on_error
):
"""
Measure the performance of the dashboard.
"""
# Mock the client name to identify the queries in the clickhouse system.query_log
# table by the http_user_agent field.
extra_filters = []
if course_key:
extra_filters += [{"col": "course_key", "op": "==", "val": course_key}]
if course_name:
extra_filters += [{"col": "course_name", "op": "IN", "val": course_name}]
if org:
extra_filters += [{"col": "org", "op": "IN", "val": org}]

with patch("clickhouse_connect.common.build_client_name") as mock_build_client_name:
mock_build_client_name.return_value = RUN_ID
target_dashboards = [dashboard_slug] if dashboard_slug else {{SUPERSET_EMBEDDABLE_DASHBOARDS}}
target_dashboards = (
[dashboard_slug] if dashboard_slug else {{SUPERSET_EMBEDDABLE_DASHBOARDS}}
)

dashboards = (
db.session.query(Dashboard)
@@ -98,14 +104,13 @@ def performance_metrics(course_key, dashboard_slug, slice_name, print_sql,
logger.info(f"Dashboard: {dashboard.slug}")
for slice in dashboard.slices:
if slice_name and not slice_name == slice.slice_name:
logger.info(f"{slice.slice_name} doesn't match {slice_name}, "
f"skipping.")
logger.info(
f"{slice.slice_name} doesn't match {slice_name}, " f"skipping."
)
continue

query_context = get_slice_query_context(
slice,
query_contexts,
extra_filters
slice, query_contexts, extra_filters
)
result = measure_chart(slice, query_context, fail_on_error)
if not result:
@@ -167,27 +172,32 @@ def get_slice_query_context(slice, query_contexts, extra_filters=None):
}
)

query_context["form_data"]["extra_form_data"] = {"filters": extra_filters}

if extra_filters:
for query in query_context["queries"]:
query["filters"] += extra_filters

return query_context


def measure_chart(slice, query_context, fail_on_error):
def measure_chart(slice, query_context_dict, fail_on_error):
"""
Measure the performance of a chart and return the results.
"""
logger.info(f"Fetching slice data: {slice}")

g.user = security_manager.find_user(username="{{SUPERSET_ADMIN_USERNAME}}")
query_context = ChartDataQueryContextSchema().load(query_context)
query_context = ChartDataQueryContextSchema().load(query_context_dict)
command = ChartDataCommand(query_context)

start_time = datetime.now()
command.validate()
g.form_data = query_context.form_data
try:
start_time = datetime.now()
result = command.run()

end_time = datetime.now()
result["time_elapsed"] = (end_time - start_time).total_seconds()
result["slice"] = slice
for query in result["queries"]:
if "error" in query and query["error"]:
raise query["error"]
@@ -197,11 +207,6 @@ def measure_chart(slice, query_context, fail_on_error):
raise e
return

end_time = datetime.now()

result["time_elapsed"] = (end_time - start_time).total_seconds()
result["slice"] = slice

return result


@@ -227,44 +232,38 @@ def get_query_log_from_clickhouse(report, query_contexts, print_sql, fail_on_err
parsed_sql = str(sqlparse.parse(row.pop("query"))[0])
clickhouse_queries[parsed_sql] = row

if print_sql:
logger.info("ClickHouse SQL: ")
logger.info(parsed_sql)

# Sort report by slowest queries
report = sorted(report, key=lambda x: x["time_elapsed"], reverse=True)

report_str = f"\nSuperset Reports: {RUN_ID}\n\n"
for i, chart_result in enumerate(report):
report_str += (
report_format.format(
i=(i + 1),
dashboard=chart_result["dashboard"],
slice=chart_result["slice"],
superset_time=chart_result["time_elapsed"]
)
)
for i, query in enumerate(chart_result["queries"]):
for k, chart_result in enumerate(report):
for query in chart_result["queries"]:
parsed_sql = (
str(sqlparse.parse(query["query"])[0]).replace(";", "")
+ "\n FORMAT Native"
)
chart_result["sql"] = parsed_sql
clickhouse_report = clickhouse_queries.get(parsed_sql, {})
chart_result.update(clickhouse_report)
chart_result.update(
{"query_duration_ms": chart_result.get("query_duration_ms", 0)}
)

if print_sql:
logger.info("Superset SQL: ")
logger.info(parsed_sql)
# Sort report by slowest queries
report = sorted(report, key=lambda x: x["query_duration_ms"], reverse=True)

clickhouse_report = clickhouse_queries.get(parsed_sql, {})
report_str += (
query_format.format(
query_duration_ms=clickhouse_report.get(
"query_duration_ms", 0
) / 1000,
memory_usage_mb=clickhouse_report.get("memory_usage_mb"),
result_rows=clickhouse_report.get("result_rows"),
rowcount=query["rowcount"],
filters=query["applied_filters"],
)
report_str = f"\nSuperset Reports: {RUN_ID}\n\n"
for k, chart_result in enumerate(report):
report_str += report_format.format(
i=(k + 1),
dashboard=chart_result["dashboard"],
slice=chart_result["slice"],
superset_time=chart_result["time_elapsed"],
)
for query in chart_result["queries"]:
report_str += query_format.format(
query_duration_ms=chart_result.get("query_duration_ms") / 1000,
memory_usage_mb=chart_result.get("memory_usage_mb"),
result_rows=chart_result.get("result_rows"),
rowcount=query["rowcount"],
filters=query["applied_filters"],
sql=chart_result["sql"] if print_sql else "",
)
logger.info(report_str)
