Skip to content

Commit

Permalink
Separate parsing query from its execution (#174)
Browse files Browse the repository at this point in the history
This PR extracts a `ParseQuery` function that encapsulates query parsing.
It wraps the parsing logic without extensive implementation refactoring,
which makes it possible to apply transformations to the parsed results.
  • Loading branch information
pdelewski authored May 21, 2024
1 parent 6270b59 commit 6d7a425
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 44 deletions.
2 changes: 1 addition & 1 deletion quesma/eql/query_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (cw *ClickhouseEQLQueryTranslator) ParseQuery(queryAsJson string) (query qu

query.Sql.Stmt = where
query.CanParse = true
query.SortFields = []queryparser.SortField{{Field: "@timestamp"}}
query.SortFields = []model.SortField{{Field: "@timestamp"}}

return query, searchQueryInfo, highlighter, nil
}
Expand Down
6 changes: 6 additions & 0 deletions quesma/model/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ import (
const RowNumberColumnName = "row_number"
const EmptyFieldSelection = "''" // we can query SELECT '', that's why such quotes

// SortField describes a single sort criterion for query results:
// the field to order by and the direction of the ordering.
type SortField struct {
Field string // name of the field to order by
Desc bool // true for descending order; false (the zero value) for ascending
}

type Highlighter struct {
Tokens []string
Fields map[string]bool
Expand All @@ -35,6 +40,7 @@ type Query struct {
Parent string // parent aggregation name, used in some pipeline aggregations
Aggregators []Aggregator // keeps names of aggregators, e.g. "0", "1", "2", "suggestions". Needed for JSON response.
Type QueryType
SortFields []SortField // fields to sort by
// dictionary to add as 'meta' field in the response.
// WARNING: it's probably not passed everywhere where it's needed, just in one place.
// But it works for the test + our dashboards, so let's fix it later if necessary.
Expand Down
23 changes: 9 additions & 14 deletions quesma/queryparser/query_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,13 @@ type (
Sql Statement
CanParse bool
FieldName string
SortFields []SortField
SortFields []model.SortField
}
Statement struct {
Stmt string
isCompound bool // "a" -> not compound, "a AND b" -> compound. Used to not make unnecessary brackets (not always, but usually)
FieldName string
}

SortField struct {
Field string
Desc bool
}
)

// Added to the generated SQL where the query is fine, but we're sure no rows will match it
Expand Down Expand Up @@ -1149,8 +1144,8 @@ func (cw *ClickhouseQueryTranslator) extractInterval(queryMap QueryMap) string {

// parseSortFields parses sort fields from the query
// We're skipping ELK internal fields, like "_doc", "_id", etc. (we only accept field starting with "_" if it exists in our table)
func (cw *ClickhouseQueryTranslator) parseSortFields(sortMaps any) (sortFields []SortField) {
sortFields = make([]SortField, 0)
func (cw *ClickhouseQueryTranslator) parseSortFields(sortMaps any) (sortFields []model.SortField) {
sortFields = make([]model.SortField, 0)
switch sortMaps := sortMaps.(type) {
case []any:
for _, sortMapAsAny := range sortMaps {
Expand All @@ -1173,20 +1168,20 @@ func (cw *ClickhouseQueryTranslator) parseSortFields(sortMaps any) (sortFields [
if orderAsString, ok := order.(string); ok {
orderAsString = strings.ToLower(orderAsString)
if orderAsString == "asc" || orderAsString == "desc" {
sortFields = append(sortFields, SortField{Field: fieldName, Desc: orderAsString == "desc"})
sortFields = append(sortFields, model.SortField{Field: fieldName, Desc: orderAsString == "desc"})
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("unexpected order value: %s. Skipping", orderAsString)
}
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("unexpected order type: %T, value: %v. Skipping", order, order)
}
} else {
sortFields = append(sortFields, SortField{Field: fieldName, Desc: false})
sortFields = append(sortFields, model.SortField{Field: fieldName, Desc: false})
}
case string:
v = strings.ToLower(v)
if v == "asc" || v == "desc" {
sortFields = append(sortFields, SortField{Field: fieldName, Desc: v == "desc"})
sortFields = append(sortFields, model.SortField{Field: fieldName, Desc: v == "desc"})
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("unexpected order value: %s. Skipping", v)
}
Expand All @@ -1205,7 +1200,7 @@ func (cw *ClickhouseQueryTranslator) parseSortFields(sortMaps any) (sortFields [
if fieldValue, ok := fieldValue.(string); ok {
fieldValue = strings.ToLower(fieldValue)
if fieldValue == "asc" || fieldValue == "desc" {
sortFields = append(sortFields, SortField{Field: fieldName, Desc: fieldValue == "desc"})
sortFields = append(sortFields, model.SortField{Field: fieldName, Desc: fieldValue == "desc"})
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("unexpected order value: %s. Skipping", fieldValue)
}
Expand All @@ -1222,7 +1217,7 @@ func (cw *ClickhouseQueryTranslator) parseSortFields(sortMaps any) (sortFields [
}
fieldValue = strings.ToLower(fieldValue)
if fieldValue == "asc" || fieldValue == "desc" {
sortFields = append(sortFields, SortField{Field: fieldName, Desc: fieldValue == "desc"})
sortFields = append(sortFields, model.SortField{Field: fieldName, Desc: fieldValue == "desc"})
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("unexpected order value: %s. Skipping", fieldValue)
}
Expand All @@ -1231,7 +1226,7 @@ func (cw *ClickhouseQueryTranslator) parseSortFields(sortMaps any) (sortFields [
return sortFields
default:
logger.ErrorWithCtx(cw.Ctx).Msgf("unexpected type of sortMaps: %T, value: %v", sortMaps, sortMaps)
return []SortField{}
return []model.SortField{}
}
}

Expand Down
12 changes: 6 additions & 6 deletions quesma/queryparser/query_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ func Test_parseSortFields(t *testing.T) {
tests := []struct {
name string
sortMap any
sortFields []SortField
sortFields []model.SortField
}{
{
name: "compound",
Expand All @@ -454,7 +454,7 @@ func Test_parseSortFields(t *testing.T) {
QueryMap{"_table_field_with_underscore": QueryMap{"order": "asc", "unmapped_type": "boolean"}}, // this should be accepted, as it exists in the table
QueryMap{"_doc": QueryMap{"order": "desc", "unmapped_type": "boolean"}}, // this should be discarded, as it doesn't exist in the table
},
sortFields: []SortField{
sortFields: []model.SortField{
{Field: "@timestamp", Desc: true},
{Field: "service.name", Desc: false},
{Field: "no_order_field", Desc: false},
Expand All @@ -464,15 +464,15 @@ func Test_parseSortFields(t *testing.T) {
{
name: "empty",
sortMap: []any{},
sortFields: []SortField{},
sortFields: []model.SortField{},
},
{
name: "map[string]string",
sortMap: map[string]string{
"timestamp": "desc",
"_doc": "desc",
},
sortFields: []SortField{
sortFields: []model.SortField{
{Field: "timestamp", Desc: true},
},
},
Expand All @@ -482,7 +482,7 @@ func Test_parseSortFields(t *testing.T) {
"timestamp": "desc",
"_doc": "desc",
},
sortFields: []SortField{
sortFields: []model.SortField{
{Field: "timestamp", Desc: true},
},
}, {
Expand All @@ -491,7 +491,7 @@ func Test_parseSortFields(t *testing.T) {
QueryMap{"@timestamp": "asc"},
QueryMap{"_doc": "asc"},
},
sortFields: []SortField{
sortFields: []model.SortField{
{Field: "@timestamp", Desc: false},
},
},
Expand Down
2 changes: 1 addition & 1 deletion quesma/queryparser/query_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ func (cw *ClickhouseQueryTranslator) sortInTopologicalOrder(queries []model.Quer
return indexesSorted
}

func AsQueryString(sortFields []SortField) string {
func AsQueryString(sortFields []model.SortField) string {
if len(sortFields) == 0 {
return ""
}
Expand Down
76 changes: 54 additions & 22 deletions quesma/quesma/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,43 @@ func isNonAggregationQuery(queryInfo model.SearchQueryInfo, body []byte) bool {
queryInfo.Typ == model.CountAsync
}

// ParseQuery translates the incoming request body into executable queries
// without executing them, separating the parsing phase from execution.
//
// It returns:
//   - the parsed queries,
//   - the columns selected by a non-aggregation query (nil for aggregations),
//   - whether the request is an aggregation,
//   - whether the body could be parsed at all (canParse),
//   - a non-nil error only when parsing actually failed.
func (q *QueryRunner) ParseQuery(ctx context.Context,
	queryTranslator IQueryTranslator,
	body []byte,
	table *clickhouse.Table) ([]model.Query, []string, bool, bool, error) {
	simpleQuery, queryInfo, highlighter, err := queryTranslator.ParseQuery(string(body))
	if err != nil {
		logger.ErrorWithCtx(ctx).Msgf("error parsing query: %v", err)
		return nil, nil, false, false, err
	}

	if !simpleQuery.CanParse {
		// Not parseable, but not a hard failure either: report it through the
		// canParse flag with an explicit nil error (the previous code returned
		// a stale `err` variable here, which was always nil but read as an
		// error return).
		return nil, nil, false, false, nil
	}

	if isNonAggregationQuery(queryInfo, body) {
		query, columns := q.makeBasicQuery(ctx, queryTranslator, table, simpleQuery, queryInfo, highlighter)
		query.SortFields = simpleQuery.SortFields
		return []model.Query{*query}, columns, false, true, nil
	}

	queries, err := queryTranslator.ParseAggregationJson(string(body))
	if err != nil {
		logger.ErrorWithCtx(ctx).Msgf("error parsing aggregation: %v", err)
		return nil, nil, false, false, err
	}
	// Aggregation path: no column list is produced here; the caller allocates
	// per-query column slices itself.
	return queries, nil, true, true, nil
}

func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern string, body []byte, optAsync *AsyncQuery, queryLanguage QueryLanguage) ([]byte, error) {
sources, sourcesElastic, sourcesClickhouse := ResolveSources(indexPattern, q.cfg, q.im, q.logManager)

Expand Down Expand Up @@ -198,8 +235,6 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin

for _, resolvedTableName := range sourcesClickhouse {
var queryTranslator IQueryTranslator
var highlighter model.Highlighter
var aggregations []model.Query
var err error
var queryInfo model.SearchQueryInfo
doneCh := make(chan AsyncSearchWithError, 1)
Expand All @@ -212,45 +247,42 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin

queryTranslator = NewQueryTranslator(ctx, queryLanguage, table, q.logManager, q.DateMathRenderer)

simpleQuery, queryInfo, highlighter, err = queryTranslator.ParseQuery(string(body))
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("error parsing query: %v", err)
return nil, errors.Join(errCouldNotParseRequest, err)
}

if simpleQuery.CanParse {
if isNonAggregationQuery(queryInfo, body) {
logger.InfoWithCtx(ctx).Msgf("received search request, type: %v, async: %v", queryInfo.Typ, optAsync != nil)
queries, columns, isAggregation, canParse, err := q.ParseQuery(ctx, queryTranslator, body, table)

if properties := q.findNonexistingProperties(queryInfo, simpleQuery, table); len(properties) > 0 {
if canParse {
if isNonAggregationQuery(queries[0].QueryInfo, body) {
if properties := q.findNonexistingProperties(queries[0].QueryInfo, queries[0].SortFields, table); len(properties) > 0 {
logger.DebugWithCtx(ctx).Msgf("properties %s not found in table %s", properties, table.Name)
if elasticsearch.IsIndexPattern(indexPattern) {
return queryparser.EmptySearchResponse(ctx), nil
} else {
return nil, fmt.Errorf("properties %s not found in table %s", properties, table.Name)
}
}
fullQuery, columns := q.makeBasicQuery(ctx, queryTranslator, table, simpleQuery, queryInfo, highlighter)
}
}
if canParse {
if !isAggregation {
var columnsSlice [][]string
go func() {
defer recovery.LogAndHandlePanic(ctx, func() {
doneCh <- AsyncSearchWithError{err: errors.New("panic")}
})
translatedQueryBody, hitsSlice := q.searchWorker(ctx, []model.Query{*fullQuery}, append(columnsSlice, columns), table, false, doneCh, optAsync)
searchResponse, err := queryTranslator.MakeSearchResponse(hitsSlice[0], *fullQuery)
translatedQueryBody, hitsSlice := q.searchWorker(ctx, queries, append(columnsSlice, columns), table, false, doneCh, optAsync)
searchResponse, err := queryTranslator.MakeSearchResponse(hitsSlice[0], queries[0])
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("error making response: %v, queryInfo: %+v, rows: %v", err, fullQuery.QueryInfo, hits)
logger.ErrorWithCtx(ctx).Msgf("error making response: %v, queryInfo: %+v, rows: %v", err, queries[0].QueryInfo, hits)
}
doneCh <- AsyncSearchWithError{response: searchResponse, translatedQueryBody: translatedQueryBody, err: err}
}()
} else if aggregations, err = queryTranslator.ParseAggregationJson(string(body)); err == nil {
columns := make([][]string, len(aggregations))
} else {
columns := make([][]string, len(queries))
go func() {
defer recovery.LogAndHandlePanic(ctx, func() {
doneCh <- AsyncSearchWithError{err: errors.New("panic")}
})
translatedQueryBody, aggregationResults = q.searchWorker(ctx, aggregations, columns, table, true, doneCh, optAsync)
searchResponse := queryTranslator.MakeResponseAggregation(aggregations, aggregationResults)
translatedQueryBody, aggregationResults = q.searchWorker(ctx, queries, columns, table, true, doneCh, optAsync)
searchResponse := queryTranslator.MakeResponseAggregation(queries, aggregationResults)
doneCh <- AsyncSearchWithError{response: searchResponse, translatedQueryBody: translatedQueryBody}
}()
}
Expand Down Expand Up @@ -489,11 +521,11 @@ func (q *QueryRunner) Close() {
logger.Info().Msg("queryRunner Stopped")
}

func (q *QueryRunner) findNonexistingProperties(queryInfo model.SearchQueryInfo, simpleQuery queryparser.SimpleQuery, table *clickhouse.Table) []string {
func (q *QueryRunner) findNonexistingProperties(queryInfo model.SearchQueryInfo, sortFields []model.SortField, table *clickhouse.Table) []string {
var results = make([]string, 0)
var allReferencedFields = make([]string, 0)
allReferencedFields = append(allReferencedFields, queryInfo.RequestedFields...)
for _, field := range simpleQuery.SortFields {
for _, field := range sortFields {
allReferencedFields = append(allReferencedFields, field.Field)
}

Expand Down

0 comments on commit 6d7a425

Please sign in to comment.