Skip to content

Commit

Permalink
Execution plan refactor (#566)
Browse files Browse the repository at this point in the history
This PR adds the ability to execute an alternative execution plan in
parallel. The results of the main and alternative plans are compared.

There is no alternative plan enabled at this point.
  • Loading branch information
nablaone authored Jul 25, 2024
1 parent 2fd5d28 commit 73abee8
Show file tree
Hide file tree
Showing 14 changed files with 421 additions and 175 deletions.
4 changes: 3 additions & 1 deletion quesma/clickhouse/quesma_communicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ func executeQuery(ctx context.Context, lm *LogManager, query *model.Query, field
settings[k] = v
}

queryAsString = queryAsString + "\n-- optimizations: " + strings.Join(query.OptimizeHints.OptimizationsPerformed, ", ") + "\n"
if len(query.OptimizeHints.OptimizationsPerformed) > 0 {
queryAsString = queryAsString + "\n-- optimizations: " + strings.Join(query.OptimizeHints.OptimizationsPerformed, ", ") + "\n"
}
}

queryID := getQueryId(ctx)
Expand Down
12 changes: 6 additions & 6 deletions quesma/eql/query_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package eql

import (
"context"
"fmt"
"quesma/clickhouse"
"quesma/elasticsearch"
"quesma/eql/transform"
Expand Down Expand Up @@ -73,31 +74,30 @@ func (cw *ClickhouseEQLQueryTranslator) MakeSearchResponse(queries []*model.Quer
}
}

func (cw *ClickhouseEQLQueryTranslator) ParseQuery(body types.JSON) (*model.ExecutionPlan, bool, error) {
func (cw *ClickhouseEQLQueryTranslator) ParseQuery(body types.JSON) (*model.ExecutionPlan, error) {
simpleQuery, queryInfo, highlighter, err := cw.parseQuery(body)

if err != nil {
logger.ErrorWithCtx(cw.Ctx).Msgf("error parsing query: %v", err)
return nil, false, err
return nil, err
}

var query *model.Query
var queries []*model.Query
canParse := false

if simpleQuery.CanParse {
canParse = true

query = query_util.BuildHitsQuery(cw.Ctx, cw.Table.Name, "*", &simpleQuery, queryInfo.I2)
queryType := typical_queries.NewHits(cw.Ctx, cw.Table, &highlighter, query.SelectCommand.OrderByFieldNames(), true, false, false, cw.Table.Name)
query.Type = &queryType
query.Highlighter = highlighter
query.SelectCommand.OrderBy = simpleQuery.OrderBy
queries = append(queries, query)
return &model.ExecutionPlan{Queries: queries}, canParse, nil
return &model.ExecutionPlan{Queries: queries}, nil

}

return nil, false, err
return nil, fmt.Errorf("could not parse query")
}

func (cw *ClickhouseEQLQueryTranslator) parseQuery(queryAsMap types.JSON) (query model.SimpleQuery, searchQueryInfo model.SearchQueryInfo, highlighter model.Highlighter, err error) {
Expand Down
9 changes: 9 additions & 0 deletions quesma/model/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package model

import (
"context"
"time"
)

const (
Expand Down Expand Up @@ -80,7 +81,14 @@ type QueryRowsTransfomer interface {
Transform(ctx context.Context, rows []QueryResultRow) []QueryResultRow
}

const MainExecutionPlan = "main"
const AlternativeExecutionPlan = "alternative"

type ExecutionPlan struct {
Name string

IndexPattern string

Queries []*Query

QueryRowsTransformers []QueryRowsTransfomer
Expand All @@ -89,6 +97,7 @@ type ExecutionPlan struct {

// add more fields here
// JSON renderers
StartTime time.Time
}

func NewQueryExecutionHints() *QueryOptimizeHints {
Expand Down
3 changes: 1 addition & 2 deletions quesma/queryparser/aggregation_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,9 +807,8 @@ func Test2AggregationParserExternalTestcases(t *testing.T) {
body, parseErr := types.ParseJSON(test.QueryRequestJson)
assert.NoError(t, parseErr)

plan, canParse, err := cw.ParseQuery(body)
plan, err := cw.ParseQuery(body)
queries := plan.Queries
assert.True(t, canParse)
assert.NoError(t, err)
assert.Len(t, test.ExpectedResults, len(queries))
sortAggregations(queries) // to make test runs deterministic
Expand Down
8 changes: 4 additions & 4 deletions quesma/queryparser/query_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ func NewEmptyHighlighter() model.Highlighter {
}
}

func (cw *ClickhouseQueryTranslator) ParseQuery(body types.JSON) (*model.ExecutionPlan, bool, error) {
func (cw *ClickhouseQueryTranslator) ParseQuery(body types.JSON) (*model.ExecutionPlan, error) {
if cw.SchemaRegistry == nil {
logger.Error().Msg("Schema registry is not set")
return &model.ExecutionPlan{}, false, errors.New("schema registry is not set")
return &model.ExecutionPlan{}, errors.New("schema registry is not set")
}

simpleQuery, queryInfo, highlighter, err := cw.parseQueryInternal(body)
if err != nil || !simpleQuery.CanParse {
logger.WarnWithCtx(cw.Ctx).Msgf("error parsing query: %v", err)
return &model.ExecutionPlan{}, false, err
return &model.ExecutionPlan{}, err
}

var queries []*model.Query
Expand Down Expand Up @@ -86,7 +86,7 @@ func (cw *ClickhouseQueryTranslator) ParseQuery(body types.JSON) (*model.Executi
QueryRowsTransformers: queryResultTransformers,
}

return plan, true, err
return plan, err
}

func (cw *ClickhouseQueryTranslator) buildListQueryIfNeeded(
Expand Down
9 changes: 3 additions & 6 deletions quesma/queryparser/query_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,8 @@ func TestQueryParserStringAttrConfig(t *testing.T) {
t.Run(fmt.Sprintf("%s(%d)", tt.Name, i), func(t *testing.T) {
body, parseErr := types.ParseJSON(tt.QueryJson)
assert.NoError(t, parseErr)
plan, canParse, errQuery := cw.ParseQuery(body)
plan, errQuery := cw.ParseQuery(body)
queries := plan.Queries
assert.True(t, canParse, "can parse")
assert.NoError(t, errQuery, "no ParseQuery error")
assert.True(t, len(queries) > 0, "len queries > 0")
var simpleListQuery *model.Query
Expand Down Expand Up @@ -139,10 +138,9 @@ func TestQueryParserNoFullTextFields(t *testing.T) {
t.Run(strconv.Itoa(i), func(t *testing.T) {
body, parseErr := types.ParseJSON(tt.QueryJson)
assert.NoError(t, parseErr)
plan, canParse, errQuery := cw.ParseQuery(body)
plan, errQuery := cw.ParseQuery(body)
queries := plan.Queries
assert.NoError(t, errQuery, "no error in ParseQuery")
assert.True(t, canParse, "can parse")
assert.True(t, len(queries) > 0, "len queries > 0")
whereClause := model.AsString(queries[0].SelectCommand.WhereClause)
assert.Contains(t, tt.WantedSql, whereClause, "contains wanted sql")
Expand Down Expand Up @@ -207,10 +205,9 @@ func TestQueryParserNoAttrsConfig(t *testing.T) {
t.Run(tt.Name, func(t *testing.T) {
body, parseErr := types.ParseJSON(tt.QueryJson)
assert.NoError(t, parseErr)
plan, canParse, errQuery := cw.ParseQuery(body)
plan, errQuery := cw.ParseQuery(body)
queries := plan.Queries
assert.NoError(t, errQuery, "no error in ParseQuery")
assert.True(t, canParse, "can parse")
assert.True(t, len(queries) > 0, "len queries > 0")
whereClause := model.AsString(queries[0].SelectCommand.WhereClause)
assert.Contains(t, tt.WantedSql, whereClause)
Expand Down
2 changes: 1 addition & 1 deletion quesma/quesma/query_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
// 2. ClickhouseEQLQueryTranslator (implements only a subset of methods)

type IQueryTranslator interface {
ParseQuery(body types.JSON) (*model.ExecutionPlan, bool, error)
ParseQuery(body types.JSON) (*model.ExecutionPlan, error)
MakeSearchResponse(queries []*model.Query, ResultSets [][]model.QueryResultRow) *model.SearchResp
}

Expand Down
Loading

0 comments on commit 73abee8

Please sign in to comment.