Skip to content

Commit

Permalink
Live view - show query time and transformations (#534)
Browse files Browse the repository at this point in the history
In the "Live View" tab we display additional information for each
executed SQL query:
- duration time
- which schema transformation was applied
- which optimizations were performed 

<img width="705" alt="Screenshot 2024-07-17 at 12 02 47"
src="https://github.com/user-attachments/assets/011df75f-3008-44e7-9e5f-d479a194cc60">
  • Loading branch information
nablaone authored Jul 17, 2024
1 parent c7f78e7 commit 11f1ab3
Show file tree
Hide file tree
Showing 10 changed files with 125 additions and 53 deletions.
31 changes: 19 additions & 12 deletions quesma/clickhouse/quesma_communicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,18 @@ func (lm *LogManager) GetAllColumns(table *Table, query *model.Query) []string {
return columns
}

type PerformanceResult struct {
Duration time.Duration
ExplainPlan string
}

// ProcessQuery - only WHERE clause
// TODO query param should be type safe Query representing all parts of
// sql statement that were already parsed and not string from which
// we have to extract again different parts like where clause and columns to build a proper result
func (lm *LogManager) ProcessQuery(ctx context.Context, table *Table, query *model.Query) ([]model.QueryResultRow, error) {
func (lm *LogManager) ProcessQuery(ctx context.Context, table *Table, query *model.Query) (rows []model.QueryResultRow, performanceResult PerformanceResult, err error) {
if query.NoDBQuery {
return make([]model.QueryResultRow, 0), nil
return make([]model.QueryResultRow, 0), performanceResult, nil
}

table.applyTableSchema(query)
Expand All @@ -72,14 +77,14 @@ func (lm *LogManager) ProcessQuery(ctx context.Context, table *Table, query *mod

}

rows, err := executeQuery(ctx, lm, query, columns, rowToScan)
rows, performanceResult, err = executeQuery(ctx, lm, query, columns, rowToScan)

if err == nil {
for _, row := range rows {
row.Index = table.Name
}
}
return rows, err
return rows, performanceResult, err
}

var random = rand.New(rand.NewSource(time.Now().UnixNano()))
Expand All @@ -91,22 +96,22 @@ func (lm *LogManager) shouldExplainQuery(elapsed time.Duration) bool {
return elapsed > slowQueryThreshold && random.Float64() < slowQuerySampleRate
}

func (lm *LogManager) explainQuery(ctx context.Context, query string, elapsed time.Duration) {
func (lm *LogManager) explainQuery(ctx context.Context, query string, elapsed time.Duration) string {

explainQuery := "EXPLAIN json=1, indexes=1 " + query

rows, err := lm.chDb.QueryContext(ctx, explainQuery)
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("failed to explain slow query: %v", err)
}
var explain string

defer rows.Close()
if rows.Next() {
var explain string
err := rows.Scan(&explain)
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("failed to scan slow query explain: %v", err)
return
return ""
}

// reformat the explain output to make it one line and more readable
Expand All @@ -119,9 +124,10 @@ func (lm *LogManager) explainQuery(ctx context.Context, query string, elapsed ti
if rows.Err() != nil {
logger.ErrorWithCtx(ctx).Msgf("failed to read slow query explain: %v", rows.Err())
}
return explain
}

func executeQuery(ctx context.Context, lm *LogManager, query *model.Query, fields []string, rowToScan []interface{}) ([]model.QueryResultRow, error) {
func executeQuery(ctx context.Context, lm *LogManager, query *model.Query, fields []string, rowToScan []interface{}) (res []model.QueryResultRow, performanceResult PerformanceResult, err error) {
span := lm.phoneHomeAgent.ClickHouseQueryDuration().Begin()

queryAsString := query.SelectCommand.String()
Expand All @@ -148,18 +154,19 @@ func executeQuery(ctx context.Context, lm *LogManager, query *model.Query, field
rows, err := lm.Query(ctx, queryAsString)
if err != nil {
span.End(err)
return nil, end_user_errors.GuessClickhouseErrorType(err).InternalDetails("clickhouse: query failed. err: %v, query: %v", err, queryAsString)
return nil, performanceResult, end_user_errors.GuessClickhouseErrorType(err).InternalDetails("clickhouse: query failed. err: %v, query: %v", err, queryAsString)
}

res, err := read(rows, fields, rowToScan)
res, err = read(rows, fields, rowToScan)
elapsed := span.End(nil)
performanceResult.Duration = elapsed
if err == nil {
if lm.shouldExplainQuery(elapsed) {
lm.explainQuery(ctx, queryAsString, elapsed)
performanceResult.ExplainPlan = lm.explainQuery(ctx, queryAsString, elapsed)
}
}

return res, err
return res, performanceResult, err
}

// 'selectFields' are all values that we return from the query, both columns and non-schema fields,
Expand Down
9 changes: 8 additions & 1 deletion quesma/model/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,18 @@ type QueryOptimizeHints struct {
OptimizationsPerformed []string
}

type TransformationHistory struct {
SchemaTransformers []string
// we may keep AST for each transformation here
// or anything that will help to understand what was done
}

type (
Query struct {
SelectCommand SelectCommand // The representation of SELECT query

OptimizeHints *QueryOptimizeHints // it can be optional
OptimizeHints *QueryOptimizeHints // it can be optional
TransformationHistory TransformationHistory // it can be optional

Type QueryType
TableName string
Expand Down
4 changes: 2 additions & 2 deletions quesma/quesma/functionality/terms_enum/terms_enum.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func handleTermsEnumRequest(ctx context.Context, body types.JSON, qt *queryparse
dbQueryCtx, cancel := context.WithCancel(ctx)
// TODO this will be used to cancel goroutine that is executing the query
_ = cancel
if rows, err2 := qt.ClickhouseLM.ProcessQuery(dbQueryCtx, qt.Table, selectQuery); err2 != nil {
if rows, _, err2 := qt.ClickhouseLM.ProcessQuery(dbQueryCtx, qt.Table, selectQuery); err2 != nil {
logger.Error().Msgf("terms enum failed - error processing SQL query [%s]", err2)
result, err = json.Marshal(emptyTermsEnumResponse())
} else {
Expand All @@ -102,7 +102,7 @@ func handleTermsEnumRequest(ctx context.Context, body types.JSON, qt *queryparse
Id: ctx.Value(tracing.RequestIdCtxKey).(string),
Path: path,
IncomingQueryBody: reqBody,
QueryBodyTranslated: [][]byte{[]byte(selectQuery.SelectCommand.String())},
QueryBodyTranslated: []types.TranslatedSQLQuery{{Query: []byte(selectQuery.SelectCommand.String())}},
QueryTranslatedResults: result,
SecondaryTook: time.Since(startTime),
})
Expand Down
3 changes: 3 additions & 0 deletions quesma/quesma/schema_transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,9 @@ func (s *SchemaCheckPass) Transform(queries []*model.Query) ([]*model.Query, err
inputQuery := query.SelectCommand.String()
query, err = transformation.Transformation(query)
if query.SelectCommand.String() != inputQuery {

query.TransformationHistory.SchemaTransformers = append(query.TransformationHistory.SchemaTransformers, transformation.TransformationName)

logger.Info().Msgf(transformation.TransformationName+" triggered, input query: %s", inputQuery)
logger.Info().Msgf(transformation.TransformationName+" triggered, output query: %s", query.SelectCommand.String())
}
Expand Down
72 changes: 40 additions & 32 deletions quesma/quesma/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (q *QueryRunner) handleAsyncSearch(ctx context.Context, indexPattern string

type AsyncSearchWithError struct {
response *model.SearchResp
translatedQueryBody [][]byte
translatedQueryBody []types.TranslatedSQLQuery
err error
}

Expand Down Expand Up @@ -278,10 +278,10 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
}()

} else {
queriesBody := make([][]byte, len(queries))
queriesBody := make([]types.TranslatedSQLQuery, len(queries))
queriesBodyConcat := ""
for i, query := range queries {
queriesBody[i] = []byte(query.SelectCommand.String())
queriesBody[i].Query = []byte(query.SelectCommand.String())
queriesBodyConcat += query.SelectCommand.String() + "\n"
}
responseBody = []byte(fmt.Sprintf("Invalid Queries: %s, err: %v", queriesBody, err))
Expand Down Expand Up @@ -465,26 +465,30 @@ func (q *QueryRunner) isInternalKibanaQuery(query *model.Query) bool {
return false
}

type QueryJob func(ctx context.Context) ([]model.QueryResultRow, error)
type QueryJob func(ctx context.Context) ([]model.QueryResultRow, clickhouse.PerformanceResult, error)

func (q *QueryRunner) runQueryJobsSequence(jobs []QueryJob) ([][]model.QueryResultRow, error) {
func (q *QueryRunner) runQueryJobsSequence(jobs []QueryJob) ([][]model.QueryResultRow, []clickhouse.PerformanceResult, error) {
var results = make([][]model.QueryResultRow, 0)
var performance = make([]clickhouse.PerformanceResult, 0)
for _, job := range jobs {
rows, err := job(q.executionCtx)
rows, perf, err := job(q.executionCtx)
if err != nil {
return nil, err
return nil, nil, err
}

results = append(results, rows)
performance = append(performance, perf)
}
return results, nil
return results, performance, nil
}

func (q *QueryRunner) runQueryJobsParallel(jobs []QueryJob) ([][]model.QueryResultRow, error) {
func (q *QueryRunner) runQueryJobsParallel(jobs []QueryJob) ([][]model.QueryResultRow, []clickhouse.PerformanceResult, error) {

var results = make([][]model.QueryResultRow, len(jobs))

var performances = make([]clickhouse.PerformanceResult, len(jobs))
type result struct {
rows []model.QueryResultRow
perf clickhouse.PerformanceResult
err error
jobId int
}
Expand All @@ -505,9 +509,9 @@ func (q *QueryRunner) runQueryJobsParallel(jobs []QueryJob) ([][]model.QueryResu
collector <- result{err: err, jobId: jobId}
})
start := time.Now()
rows, err := j(ctx)
rows, perf, err := j(ctx)
logger.DebugWithCtx(ctx).Msgf("parallel job %d finished in %v", jobId, time.Since(start))
collector <- result{rows: rows, err: err, jobId: jobId}
collector <- result{rows: rows, perf: perf, err: err, jobId: jobId}
}(ctx, n, job)
}

Expand All @@ -516,15 +520,16 @@ func (q *QueryRunner) runQueryJobsParallel(jobs []QueryJob) ([][]model.QueryResu
res := <-collector
if res.err == nil {
results[res.jobId] = res.rows
performances[res.jobId] = res.perf
} else {
return nil, res.err
return nil, nil, res.err
}
}

return results, nil
return results, performances, nil
}

func (q *QueryRunner) runQueryJobs(jobs []QueryJob) ([][]model.QueryResultRow, error) {
func (q *QueryRunner) runQueryJobs(jobs []QueryJob) ([][]model.QueryResultRow, []clickhouse.PerformanceResult, error) {
const maxParallelQueries = 25 // this is arbitrary value

numberOfJobs := len(jobs)
Expand Down Expand Up @@ -557,9 +562,9 @@ func (q *QueryRunner) runQueryJobs(jobs []QueryJob) ([][]model.QueryResultRow, e
func (q *QueryRunner) searchWorkerCommon(
ctx context.Context,
queries []*model.Query,
table *clickhouse.Table) (translatedQueryBody [][]byte, hits [][]model.QueryResultRow, err error) {
table *clickhouse.Table) (translatedQueryBody []types.TranslatedSQLQuery, hits [][]model.QueryResultRow, err error) {

translatedQueryBody = make([][]byte, len(queries))
translatedQueryBody = make([]types.TranslatedSQLQuery, len(queries))
hits = make([][]model.QueryResultRow, len(queries))

var jobs []QueryJob
Expand All @@ -573,44 +578,47 @@ func (q *QueryRunner) searchWorkerCommon(
}

sql := query.SelectCommand.String()
// TODO we should return what optimizations were performed
// TODO translatedQueryBody should be a struct (sql, optimizations, query time, etc)
//
//if query.OptimizeHints != nil {
// sql = sql + "\n-- optimizations: " + strings.Join(query.OptimizeHints.OptimizationsPerformed, ", ") + "\n"
//}

logger.InfoWithCtx(ctx).Msgf("SQL: %s", sql)
translatedQueryBody[i] = []byte(sql)
translatedQueryBody[i].Query = []byte(sql)
if query.OptimizeHints != nil {
translatedQueryBody[i].PerformedOptimizations = query.OptimizeHints.OptimizationsPerformed
}

translatedQueryBody[i].QueryTransformations = query.TransformationHistory.SchemaTransformers

if q.isInternalKibanaQuery(query) {
hits[i] = make([]model.QueryResultRow, 0)
continue
}

job := func(ctx context.Context) ([]model.QueryResultRow, error) {
job := func(ctx context.Context) ([]model.QueryResultRow, clickhouse.PerformanceResult, error) {
var err error
rows, err := q.logManager.ProcessQuery(ctx, table, query)
rows, performance, err := q.logManager.ProcessQuery(ctx, table, query)

if err != nil {
logger.ErrorWithCtx(ctx).Msg(err.Error())
return nil, err
return nil, clickhouse.PerformanceResult{}, err
}

if query.Type != nil {
rows = query.Type.PostprocessResults(rows)
}

return rows, nil
return rows, performance, nil
}
jobs = append(jobs, job)
jobHitsPosition = append(jobHitsPosition, i)
}
dbHits, err := q.runQueryJobs(jobs)
dbHits, performance, err := q.runQueryJobs(jobs)
if err != nil {
return
}

for i, p := range performance {
translatedQueryBody[i].Duration = p.Duration
translatedQueryBody[i].ExplainPlan = p.ExplainPlan
}

// fill the hits array with the results in the order of the database queries
for jobId := range jobHitsPosition {
hitsPosition := jobHitsPosition[jobId]
Expand All @@ -624,7 +632,7 @@ func (q *QueryRunner) searchWorker(ctx context.Context,
queries []*model.Query,
table *clickhouse.Table,
doneCh chan<- AsyncSearchWithError,
optAsync *AsyncQuery) (translatedQueryBody [][]byte, resultRows [][]model.QueryResultRow, err error) {
optAsync *AsyncQuery) (translatedQueryBody []types.TranslatedSQLQuery, resultRows [][]model.QueryResultRow, err error) {
if optAsync != nil {
if q.reachedQueriesLimit(ctx, optAsync.asyncId, doneCh) {
return
Expand Down Expand Up @@ -681,7 +689,7 @@ func (q *QueryRunner) postProcessResults(table *clickhouse.Table, results [][]mo
return geoIpTransformer.Transform(res)
}

func pushSecondaryInfo(qmc *ui.QuesmaManagementConsole, Id, AsyncId, Path string, IncomingQueryBody []byte, QueryBodyTranslated [][]byte, QueryTranslatedResults []byte, startTime time.Time) {
func pushSecondaryInfo(qmc *ui.QuesmaManagementConsole, Id, AsyncId, Path string, IncomingQueryBody []byte, QueryBodyTranslated []types.TranslatedSQLQuery, QueryTranslatedResults []byte, startTime time.Time) {
qmc.PushSecondaryInfo(&ui.QueryDebugSecondarySource{
Id: Id,
AsyncId: AsyncId,
Expand Down
17 changes: 17 additions & 0 deletions quesma/quesma/types/translated_sql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package types

import "time"

// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
type TranslatedSQLQuery struct {
Query []byte

PerformedOptimizations []string
QueryTransformations []string

Duration time.Duration
ExplainPlan string
}
2 changes: 1 addition & 1 deletion quesma/quesma/ui/html_pages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestHtmlPages(t *testing.T) {
qmc.PushSecondaryInfo(&QueryDebugSecondarySource{Id: id,
Path: xss,
IncomingQueryBody: xssBytes,
QueryBodyTranslated: [][]byte{xssBytes},
QueryBodyTranslated: []types.TranslatedSQLQuery{{Query: xssBytes}},
QueryTranslatedResults: xssBytes,
})
log := fmt.Sprintf(`{"request_id": "%s", "message": "%s"}`, id, xss)
Expand Down
Loading

0 comments on commit 11f1ab3

Please sign in to comment.