Skip to content

Commit

Permalink
Create SELECT in aggregations (#166)
Browse files Browse the repository at this point in the history
Fixed bugs:
- We should always parse `size` and create a hits query, in both the async
and the default (non-async) paths
- We should set hits total based on count if available, not based on
returned hits

Another step is to unify aggregates and non-aggregates.
  • Loading branch information
jakozaur authored May 20, 2024
1 parent af8d9cc commit e19f6f4
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 36 deletions.
11 changes: 11 additions & 0 deletions quesma/queryparser/aggregation_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,17 @@ func (cw *ClickhouseQueryTranslator) ParseAggregationJson(queryAsJson string) ([
return nil, fmt.Errorf("no aggs -> request is not an aggregation query")
}

const defaultSearchSize = 10
size := cw.parseSize(queryAsMap, defaultSearchSize)
if size > 0 {
simpleQuery := currentAggr.whereBuilder
if sort, ok := queryAsMap["sort"]; ok {
simpleQuery.SortFields = cw.parseSortFields(sort)
}
hitQuery := cw.BuildNRowsQuery("*", simpleQuery, size)
aggregations = append(aggregations, *hitQuery)
}

return aggregations, nil
}

Expand Down
42 changes: 28 additions & 14 deletions quesma/queryparser/aggregation_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ var aggregationTests = []struct {
"field": "AvgTicketPrice"
}
}
}
},
"size": 0
}`,
[]string{
`SELECT count() FROM ` + tableNameQuoted,
Expand Down Expand Up @@ -110,7 +111,8 @@ var aggregationTests = []struct {
"size": 1000
}
}
}
},
"size": 0
}`,
[]string{
`SELECT count() FROM ` + tableNameQuoted,
Expand Down Expand Up @@ -146,7 +148,8 @@ var aggregationTests = []struct {
"size": 10
}
}
}
},
"size": 0
}`,
[]string{
`SELECT count() FROM ` + tableNameQuoted,
Expand Down Expand Up @@ -181,7 +184,8 @@ var aggregationTests = []struct {
}
}
}
}
},
"size": 0
}`,
[]string{
`SELECT count() FROM ` + tableNameQuoted,
Expand Down Expand Up @@ -217,7 +221,8 @@ var aggregationTests = []struct {
}
}
}
}
},
"size": 0
}`,
[]string{
`SELECT count() FROM ` + tableNameQuoted,
Expand All @@ -236,7 +241,8 @@ var aggregationTests = []struct {
"min_doc_count": 1
}
}
}
},
"size": 0
}`,
[]string{
`SELECT count() FROM ` + tableNameQuoted,
Expand Down Expand Up @@ -284,7 +290,8 @@ var aggregationTests = []struct {
"size": 10000
}
}
}
},
"size": 0
}`,
[]string{
`SELECT count() FROM ` + tableNameQuoted,
Expand Down Expand Up @@ -321,7 +328,8 @@ var aggregationTests = []struct {
"size": 10
}
}
}
},
"size": 0
}`,
[]string{
`SELECT count() FROM ` + tableNameQuoted,
Expand All @@ -338,7 +346,8 @@ var aggregationTests = []struct {
"field": "taxful_total_price"
}
}
}
},
"size": 0
}`,
[]string{
`SELECT count() FROM ` + tableNameQuoted,
Expand All @@ -357,7 +366,8 @@ var aggregationTests = []struct {
]
}
}
}
},
"size": 0
}`,
[]string{
`SELECT count() FROM ` + tableNameQuoted,
Expand All @@ -373,7 +383,8 @@ var aggregationTests = []struct {
"field": "total_quantity"
}
}
}
},
"size": 0
}`,
[]string{
`SELECT count() FROM ` + tableNameQuoted,
Expand Down Expand Up @@ -438,7 +449,8 @@ var aggregationTests = []struct {
}
}
}
}
},
"size": 0
}`,
[]string{
`SELECT count() FROM ` + tableNameQuoted,
Expand All @@ -465,7 +477,8 @@ var aggregationTests = []struct {
"field": "OriginCityName"
}
}
}
},
"size": 0
}`,
[]string{
`SELECT count() FROM ` + tableNameQuoted,
Expand All @@ -489,7 +502,8 @@ var aggregationTests = []struct {
"shard_size": 5000
}
}
}
},
"size": 0
}`,
[]string{
`SELECT count() FROM ` + tableNameQuoted,
Expand Down
20 changes: 16 additions & 4 deletions quesma/queryparser/query_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,7 @@ func (cw *ClickhouseQueryTranslator) tryProcessSearchMetadata(queryMap QueryMap)
// case 3: maybe it's a normal request
var queryMapNested QueryMap
var ok bool
size, _ := cw.parseSize(metadata)
size := cw.parseSize(metadata, model.DefaultSizeListQuery)
if queryMapNested, ok = queryMap["aggs"].(QueryMap); !ok {
return model.SearchQueryInfo{Typ: model.Normal, I2: size}
}
Expand Down Expand Up @@ -1002,7 +1002,7 @@ func (cw *ClickhouseQueryTranslator) isItFacetsRequest(queryMap QueryMap) (model
return model.NewSearchQueryInfoNone(), false
}

size, ok := cw.parseSize(firstNestingMap)
size, ok := cw.parseSizeExists(firstNestingMap)
if !ok {
return model.NewSearchQueryInfoNone(), false
}
Expand Down Expand Up @@ -1046,7 +1046,7 @@ func (cw *ClickhouseQueryTranslator) isItFacetsRequest(queryMap QueryMap) (model
// returns (model.NewSearchQueryInfoNone, false) if it's not ListAllFields/ListByField request
func (cw *ClickhouseQueryTranslator) isItListRequest(queryMap QueryMap) (model.SearchQueryInfo, bool) {
// 1) case: very simple SELECT * kind of request
size, ok := cw.parseSize(queryMap)
size, ok := cw.parseSizeExists(queryMap)
if !ok {
return model.NewSearchQueryInfoNone(), false
}
Expand Down Expand Up @@ -1213,7 +1213,7 @@ func (cw *ClickhouseQueryTranslator) parseSortFields(sortMaps any) []string {
}
}

func (cw *ClickhouseQueryTranslator) parseSize(queryMap QueryMap) (size int, ok bool) {
func (cw *ClickhouseQueryTranslator) parseSizeExists(queryMap QueryMap) (size int, ok bool) {
sizeRaw, exists := queryMap["size"]
if !exists {
return model.DefaultSizeListQuery, false
Expand All @@ -1224,3 +1224,15 @@ func (cw *ClickhouseQueryTranslator) parseSize(queryMap QueryMap) (size int, ok
return model.DefaultSizeListQuery, false
}
}

// parseSize looks up the "size" entry of queryMap and returns it as an int.
//
// defaultSize is returned when the key is absent. It is also returned, after
// logging a warning, when the stored value is not a float64.
func (cw *ClickhouseQueryTranslator) parseSize(queryMap QueryMap, defaultSize int) int {
	rawSize, found := queryMap["size"]
	if !found {
		return defaultSize
	}

	asFloat, isFloat := rawSize.(float64)
	if !isFloat {
		logger.WarnWithCtx(cw.Ctx).Msgf("invalid size type: %T, value: %v. Expected float64", rawSize, rawSize)
		return defaultSize
	}

	// The float-to-int conversion drops any fractional part.
	return int(asFloat)
}
11 changes: 10 additions & 1 deletion quesma/queryparser/query_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,15 @@ func (cw *ClickhouseQueryTranslator) MakeAggregationPartOfResponse(queries []mod
}

func (cw *ClickhouseQueryTranslator) MakeResponseAggregation(queries []model.Query, ResultSets [][]model.QueryResultRow) *model.SearchResp {
hits := []model.SearchHit{}
// Process hits as last aggregation
if len(queries) > 0 && len(ResultSets) > 0 && queries[len(queries)-1].IsWildcard() {
response := cw.makeSearchResponseNormal(ResultSets[len(ResultSets)-1], queries[len(queries)-1].Highlighter)
hits = response.Hits.Hits
queries = queries[:len(queries)-1]
ResultSets = ResultSets[:len(ResultSets)-1]
}

var totalCount uint64
if len(ResultSets) > 0 && len(ResultSets[0]) > 0 && len(ResultSets[0][0].Cols) > 0 {
// This if: doesn't hurt much, but mostly for tests, never seen need for this on "production".
Expand All @@ -486,7 +495,7 @@ func (cw *ClickhouseQueryTranslator) MakeResponseAggregation(queries []model.Que
return &model.SearchResp{
Aggregations: cw.MakeAggregationPartOfResponse(queries, ResultSets),
Hits: model.SearchHits{
Hits: []model.SearchHit{}, // seems redundant, but can't remove this, created JSON won't match
Hits: hits,
Total: &model.Total{
Value: int(totalCount), // TODO just change this to uint64? It works now.
Relation: "eq",
Expand Down
20 changes: 3 additions & 17 deletions quesma/quesma/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
var hits []model.QueryResultRow
var aggregationResults [][]model.QueryResultRow
oldHandlingUsed := false
newAggregationHandlingUsed := false

tables := q.logManager.GetTableDefinitions()

Expand Down Expand Up @@ -259,7 +258,6 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
}
}
} else if aggregations, err = queryTranslator.ParseAggregationJson(string(body)); err == nil {
newAggregationHandlingUsed = true
columns := make([][]string, len(aggregations))
if optAsync != nil {
go func() {
Expand All @@ -272,16 +270,6 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
}()
} else {
translatedQueryBody, aggregationResults = q.searchWorker(ctx, aggregations, columns, table, true, nil)
if queryInfo.Size > 0 {
listQuery := queryTranslator.BuildNRowsQuery("*", simpleQuery, queryInfo.Size)
hits, err = q.logManager.ProcessQuery(ctx, table, listQuery, nil)
translatedQueryBody = append(translatedQueryBody, []byte("\n"+listQuery.String()+"\n")...)
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("error processing fallback query. Err: %v, query: %+v", err, listQuery)
pushSecondaryInfo(q.quesmaManagementConsole, id, path, body, translatedQueryBody, responseBody, startTime)
return responseBody, err
}
}
}
}
} else {
Expand All @@ -292,14 +280,12 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
}

if optAsync == nil {
var response, responseHits *model.SearchResp = nil, nil
var response *model.SearchResp = nil
err = nil
if oldHandlingUsed {
response, err = queryTranslator.MakeSearchResponse(hits, model.Query{QueryInfo: queryInfo, Highlighter: highlighter})
} else if newAggregationHandlingUsed {
} else {
response = queryTranslator.MakeResponseAggregation(aggregations, aggregationResults)
responseHits, err = queryTranslator.MakeSearchResponse(hits, model.Query{QueryInfo: queryInfo, Highlighter: highlighter})
response.Hits = responseHits.Hits
}
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("error making response: %v, queryInfo: %+v, rows: %v", err, queryInfo, hits)
Expand Down Expand Up @@ -508,7 +494,7 @@ func (q *QueryRunner) searchWorkerCommon(
logger.ErrorWithCtx(ctx).Msg(err.Error())
continue
}
if doPostProcessing {
if doPostProcessing && query.Type != nil {
rows = query.Type.PostprocessResults(rows)
}
hits = append(hits, rows)
Expand Down

0 comments on commit e19f6f4

Please sign in to comment.