Skip to content

Commit

Permalink
Order by: some_other_aggregation enhancements (#788)
Browse files Browse the repository at this point in the history
A couple of small enhancements here:
* During Kibana QA, I found out that it really happens a lot that in
`order by: some_other_aggregation`, this aggregation is not a direct
child, but some further descendant in the aggregation tree, so added
support for it (before `order by: "agg1"` worked; now `order by:
"agg1>agg2>agg3"` also works, where `agg2` is a child of `agg1`, `agg3` of
`agg2`, etc.)
* Before we had support for
a) `order by: "2"` (where `2` is a metric aggr with single value like
e.g. `avg`)
b) `order by: "2.10"` (where `2` is a `percentile[s|_ranks]` aggr, and
`10` is a percentile)
Here added also support for `2.count`, `2.std_deviation`, and other
stats from `stats` or `extended_stats` aggregations.
* Before we could only order by 1 expression, 2 or more weren't
supported. Fixed that.

Sorry for quite a big PR, but it turned out to be like that out of
necessity. E.g. I implemented the last point only because without it,
proper tests for previous points would need to be much larger 😆

---------

Co-authored-by: Jacek Migdal <[email protected]>
  • Loading branch information
trzysiek and jakozaur authored Oct 9, 2024
1 parent e2fe800 commit e3ca75a
Show file tree
Hide file tree
Showing 13 changed files with 1,070 additions and 361 deletions.
23 changes: 23 additions & 0 deletions quesma/model/metrics_aggregations/extended_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,26 @@ func (query ExtendedStats) getValue(row model.QueryResultRow, functionName strin
}
return row.Cols[column].Value
}

// ColumnIdx returns the index of the column holding the stat `name`
// (e.g. "count", "avg", "std_deviation") in this extended_stats query's result row.
// Implements MultipleMetricColumnsInterface.
// Returns -1 (and logs an error) when `name` is not a known extended_stats column.
func (query ExtendedStats) ColumnIdx(name string) int {
	// A switch avoids rebuilding a lookup map on every call (the original
	// allocated a fresh map per invocation).
	switch name {
	case "count":
		return 0
	case "min":
		return 1
	case "max":
		return 2
	case "avg":
		return 3
	case "sum":
		return 4
	case "sum_of_squares":
		return 5
	case "variance", "variance_population": // "variance" is an alias of the population variant
		return 6
	case "variance_sampling":
		return 7
	case "std_deviation", "std_deviation_population": // "std_deviation" is an alias of the population variant
		return 8
	case "std_deviation_sampling":
		return 9
	}

	logger.ErrorWithCtx(query.ctx).Msgf("extended_stats column %s not found", name)
	return -1
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package metrics_aggregations

// MultipleMetricColumnsInterface is implemented by metrics aggregations
// whose response spans multiple result columns.
// ColumnIdx maps a column name to its index in the result row, e.g.
// "count" or "std_deviation" for stats/extended_stats, or a percentile
// name like "50" for quantile.
// Implementations return -1 when the name is unknown.
type MultipleMetricColumnsInterface interface {
	ColumnIdx(name string) int
}
11 changes: 11 additions & 0 deletions quesma/model/metrics_aggregations/quantile.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,14 @@ func (query Quantile) createPercentileNameToReturn(percentileName string) string
}
return percentileName
}

// ColumnIdx returns the index of the percentile column named `name`
// among this quantile query's result columns.
// Implements MultipleMetricColumnsInterface.
// Returns -1 (and logs an error) when no percentile has that name.
func (query Quantile) ColumnIdx(name string) int {
	for idx := range query.percentileNames {
		if query.percentileNames[idx] == name {
			return idx
		}
	}

	logger.ErrorWithCtx(query.ctx).Msgf("quantile column %s not found", name)
	return -1
}
11 changes: 11 additions & 0 deletions quesma/model/metrics_aggregations/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,14 @@ func (query Stats) TranslateSqlResponseToJson(rows []model.QueryResultRow) model
// String returns the name of this aggregation type (implements fmt.Stringer).
func (query Stats) String() string {
	return "stats"
}

// ColumnIdx returns the index of the stat column named `name`,
// i.e. its position within statsColumnsInOrder.
// Implements MultipleMetricColumnsInterface.
// Returns -1 (and logs an error) when the name is unknown.
func (query Stats) ColumnIdx(name string) int {
	for idx, columnName := range statsColumnsInOrder {
		if name == columnName {
			return idx
		}
	}

	logger.ErrorWithCtx(query.ctx).Msgf("stats column %s not found", name)
	return -1
}
2 changes: 1 addition & 1 deletion quesma/queryparser/pancake_aggregation_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (cw *ClickhouseQueryTranslator) PancakeParseAggregationJson(body types.JSON
// Phase 3: Generate SQL queries from pancake model
aggregationQueries := make([]*model.Query, 0)
for _, pancakeQuery := range pancakeQueries {
generator := &pancakeSqlQueryGenerator{}
generator := newPancakeSqlQueryGeneratorr(cw.Ctx)
dbQuery, err := generator.generateQuery(pancakeQuery)
if err != nil {
return nil, err
Expand Down
157 changes: 42 additions & 115 deletions quesma/queryparser/pancake_aggregation_parser_buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"quesma/logger"
"quesma/model"
"quesma/model/bucket_aggregations"
"quesma/util"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -320,78 +319,6 @@ func (cw *ClickhouseQueryTranslator) pancakeTryBucketAggregation(aggregation *pa
return
}

// pancakeFindMetricAggregation resolves an aggregation name used in a terms
// `order` clause to the SQL expression of the metric aggregation it names,
// by looking it up among this bucket aggregation's direct children ("aggs").
// Returns a literal "" expression (and logs an error) when nothing matches.
//
// NOTE(review): only direct children of "aggs" are searched here — nested
// paths like "agg1>agg2" are not handled by this function.
func (cw *ClickhouseQueryTranslator) pancakeFindMetricAggregation(queryMap QueryMap, aggregationName string) model.Expr {
	notFoundValue := model.NewLiteral("")

	aggsRaw, exists := queryMap["aggs"]
	if !exists {
		logger.WarnWithCtx(cw.Ctx).Msgf("no aggs in queryMap, queryMap: %+v", queryMap)
		return notFoundValue
	}
	aggs, ok := aggsRaw.(QueryMap)
	if !ok {
		logger.WarnWithCtx(cw.Ctx).Msgf("aggs is not a map, but %T, value: %v. Skipping", aggsRaw, aggsRaw)
		return notFoundValue
	}

	var percentileNameWeLookFor string
	weTrySplitByDot := false

	// We try 2 things here:
	// First (always): maybe there exists an aggregation with exactly this name
	// Second (if aggregation_name == X.Y): maybe it's aggregationName.some_value, e.g. "2.75", when "2" aggregation is a percentile, and 75 is its value
	aggregationNamesToTry := []string{aggregationName}
	splitByDot := strings.Split(aggregationName, ".")
	if len(splitByDot) == 2 {
		weTrySplitByDot = true
		percentileNameWeLookFor = splitByDot[1]
		aggregationNamesToTry = append(aggregationNamesToTry, splitByDot[0])
	}

	for _, aggNameToTry := range aggregationNamesToTry {
		currentAggMapRaw, exists := aggs[aggNameToTry]
		if !exists {
			continue
		}

		currentAggMap, ok := currentAggMapRaw.(QueryMap)
		if !ok {
			logger.WarnWithCtx(cw.Ctx).Msgf("aggregation %s is not a map, but %T, value: %v. Skipping",
				aggregationName, currentAggMapRaw, currentAggMapRaw)
			continue
		}

		agg, success := cw.tryMetricsAggregation(currentAggMap)
		if !success {
			logger.WarnWithCtx(cw.Ctx).Msgf("failed to parse metric aggregation: %v", agg)
			continue
		}

		// we build a temporary query only to extract the name of the metric
		columns, err := generateMetricSelectedColumns(cw.Ctx, agg)
		if err != nil {
			continue
		}

		if aggNameToTry == aggregationName {
			// exact-name match: only usable when the metric yields a single column
			if len(columns) != 1 {
				continue
			}
			return columns[0]
		} else if weTrySplitByDot {
			// "X.Y" case: Y selects one percentile; columns are assumed to be
			// ordered like the percentile keys sorted by value — TODO confirm
			userPercents := util.MapKeysSortedByValue(agg.Percentiles)
			for i, percentileName := range userPercents {
				if percentileName == percentileNameWeLookFor {
					return columns[i]
				}
			}
		}
	}

	logger.ErrorWithCtx(cw.Ctx).Msgf("no given metric aggregation found (name: %v, queryMap: %+v)", aggregationName, queryMap)
	return notFoundValue
}

// samplerRaw - in a proper request should be of QueryMap type.
func (cw *ClickhouseQueryTranslator) parseSampler(samplerRaw any) bucket_aggregations.Sampler {
const defaultSize = 100
Expand Down Expand Up @@ -420,60 +347,60 @@ func (cw *ClickhouseQueryTranslator) parseRandomSampler(randomSamplerRaw any) bu
}

func (cw *ClickhouseQueryTranslator) parseOrder(terms, queryMap QueryMap, fieldExpressions []model.Expr) []model.OrderByExpr {
defaultMainOrderBy := model.NewCountFunc()
defaultDirection := model.DescOrder
defaultOrderBy := model.NewOrderByExpr(model.NewCountFunc(), defaultDirection)

fieldOrderBys := make([]model.OrderByExpr, 0, len(fieldExpressions))
for _, fieldExpression := range fieldExpressions {
fieldOrderBys = append(fieldOrderBys, model.OrderByExpr{Expr: fieldExpression})
}

var mainOrderBy model.Expr = defaultMainOrderBy
fullOrderBy := []model.OrderByExpr{ // default
{Expr: mainOrderBy, Direction: defaultDirection},
}
fullOrderBy = append(fullOrderBy, fieldOrderBys...)
direction := defaultDirection

orderRaw, exists := terms["order"]
ordersRaw, exists := terms["order"]
if !exists {
return fullOrderBy
return []model.OrderByExpr{defaultOrderBy}
}

order, ok := orderRaw.(QueryMap) // TODO it can be array too, don't handle it yet
if !ok {
logger.WarnWithCtx(cw.Ctx).Msgf("order is not a map, but %T, value: %v. Using default order", orderRaw, orderRaw)
return fullOrderBy
}
if len(order) != 1 {
logger.WarnWithCtx(cw.Ctx).Msgf("order should have 1 key, but has %d. Order: %+v. Using default", len(order), order)
return fullOrderBy
// order can be either a single order {}, or a list of such single orders [{}(,{}...)]
orders := make([]QueryMap, 0)
switch ordersTyped := ordersRaw.(type) {
case QueryMap:
orders = append(orders, ordersTyped)
case []any:
for _, order := range ordersTyped {
if orderTyped, ok := order.(QueryMap); ok {
orders = append(orders, orderTyped)
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("invalid order: %v", order)
}
}
default:
logger.WarnWithCtx(cw.Ctx).Msgf("order is not a map/list of maps, but %T, value: %v. Using default order", ordersRaw, ordersRaw)
return []model.OrderByExpr{defaultOrderBy}
}

for key, valueRaw := range order { // value == "asc" or "desc"
value, ok := valueRaw.(string)
if !ok {
logger.WarnWithCtx(cw.Ctx).Msgf("order value is not a string, but %T, value: %v. Using default (desc)", valueRaw, valueRaw)
value = "desc"
}
if strings.ToLower(value) == "asc" {
direction = model.AscOrder
fullOrderBy := make([]model.OrderByExpr, 0)

for _, order := range orders {
if len(order) != 1 {
logger.WarnWithCtx(cw.Ctx).Msgf("invalid order length, should be 1: %v", order)
}
for key, valueRaw := range order { // value == "asc" or "desc"
value, ok := valueRaw.(string)
if !ok {
logger.WarnWithCtx(cw.Ctx).Msgf("order value is not a string, but %T, value: %v. Using default (desc)", valueRaw, valueRaw)
value = "desc"
}

if key == "_key" {
fullOrderBy = fieldOrderBys
for i := range fullOrderBy {
fullOrderBy[i].Direction = direction
direction := defaultDirection
if strings.ToLower(value) == "asc" {
direction = model.AscOrder
}
break // mainOrderBy remains default
} else if key != "_count" {
mainOrderBy = cw.pancakeFindMetricAggregation(queryMap, key)
}

fullOrderBy = []model.OrderByExpr{
{Expr: mainOrderBy, Direction: direction},
if key == "_key" {
for _, fieldExpression := range fieldExpressions {
fullOrderBy = append(fullOrderBy, model.OrderByExpr{Expr: fieldExpression, Direction: direction})
}
} else if key == "_count" {
fullOrderBy = append(fullOrderBy, model.NewOrderByExpr(model.NewCountFunc(), direction))
} else {
fullOrderBy = append(fullOrderBy, model.OrderByExpr{Expr: model.NewLiteral(key), Direction: direction})
}
}
fullOrderBy = append(fullOrderBy, fieldOrderBys...)
}

return fullOrderBy
Expand Down
105 changes: 105 additions & 0 deletions quesma/queryparser/pancake_order_by_transformer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package queryparser

import (
"context"
"quesma/logger"
"quesma/model"
"quesma/model/metrics_aggregations"
"strings"
)

// pancakeOrderByTransformer resolves order-by expressions that reference
// metric aggregations by name into aliased column expressions
// (see transformSingleOrderBy).
type pancakeOrderByTransformer struct {
	ctx context.Context // request context, used for contextual error logging
}

// newPancakeOrderByTransformer returns a transformer bound to the given context.
func newPancakeOrderByTransformer(ctx context.Context) *pancakeOrderByTransformer {
	return &pancakeOrderByTransformer{ctx: ctx}
}

// transformSingleOrderBy transforms a single order by expression, of query `query` and bucket aggregation `bucketAggrInternalName`.
// What it does, it finds metric aggregation that corresponds to the order by expression, and returns a new aliased expression
//
// TODO: maybe the same logic needs to be applied to pipeline aggregations, needs checking.
// transformSingleOrderBy transforms a single order-by expression of bucket aggregation
// `bucketAggregation` inside `query`. It resolves the order-by path (e.g. "agg1>agg2>metric"
// or "metric.submetric") to the metric aggregation it points at, and returns the order-by
// expression aliased to that metric's internal column name.
// Returns nil (after logging an error) when the expression is not a string literal
// or the path cannot be resolved.
//
// TODO: maybe the same logic needs to be applied to pipeline aggregations, needs checking.
func (t *pancakeOrderByTransformer) transformSingleOrderBy(orderBy model.Expr, bucketAggregation *pancakeModelBucketAggregation, query *pancakeModel) *model.AliasedExpr {
	fullPathToOrderByExprRaw, isPath := orderBy.(model.LiteralExpr)
	if !isPath {
		return nil
	}

	fullPathToOrderByExpr, ok := fullPathToOrderByExprRaw.Value.(string)
	if !ok {
		logger.ErrorWithCtx(t.ctx).Msgf("path to metric is not a string, but %T (val: %v)",
			fullPathToOrderByExprRaw.Value, fullPathToOrderByExprRaw.Value)
		return nil
	}

	// fullPathToOrderByExpr is in the form of "[aggr1][>aggr2...]>metric_aggr[.submetric]" ([] means optional)
	// submetric: e.g. "percentiles.50", or "stats.sum", "extended_stats.std_deviation"
	// Most metric aggregations don't have submetrics
	var fullPathWithoutSubmetric, submetricName string
	splitByDot := strings.Split(fullPathToOrderByExpr, ".")
	switch len(splitByDot) {
	case 1:
		fullPathWithoutSubmetric = splitByDot[0]
	case 2:
		fullPathWithoutSubmetric, submetricName = splitByDot[0], splitByDot[1]
	default:
		logger.ErrorWithCtx(t.ctx).Msgf("path to metric is not valid: %s", fullPathToOrderByExpr)
		return nil
	}

	// Find the layer owning `bucketAggregation`; path resolution starts
	// from the layer directly below it.
	foundLayerIdx := -1
	for layerIdx, layer := range query.layers {
		if layer.nextBucketAggregation == bucketAggregation {
			foundLayerIdx = layerIdx
			break
		}
	}
	if foundLayerIdx == -1 {
		logger.ErrorWithCtx(t.ctx).Msgf("bucket aggregation not found in query")
		return nil
	}
	foundLayerIdx += 1

	// Walk down the aggregation tree: every path segment except the last must
	// match the next layer's bucket aggregation name.
	path := strings.Split(fullPathWithoutSubmetric, ">")
	for len(path) > 1 {
		if foundLayerIdx >= len(query.layers) {
			logger.ErrorWithCtx(t.ctx).Msgf("out of layers in path: %s", fullPathToOrderByExpr)
			return nil
		}
		if query.layers[foundLayerIdx].nextBucketAggregation == nil {
			logger.ErrorWithCtx(t.ctx).Msgf("no bucket aggregation in path: %s", fullPathToOrderByExpr)
			return nil
		}
		if query.layers[foundLayerIdx].nextBucketAggregation.name != path[0] {
			logger.ErrorWithCtx(t.ctx).Msgf("bucket aggregation mismatch in path: %s, expected: %s, was: %s",
				fullPathToOrderByExpr, path[0], query.layers[foundLayerIdx].nextBucketAggregation.name)
			return nil
		}
		foundLayerIdx += 1
		path = path[1:]
	}

	if foundLayerIdx >= len(query.layers) {
		logger.ErrorWithCtx(t.ctx).Msgf("out of layers in path: %s", fullPathToOrderByExpr)
		return nil
	}

	// The last path segment names a metric aggregation in the target layer.
	for _, metric := range query.layers[foundLayerIdx].currentMetricAggregations {
		if metric.name != path[0] {
			continue
		}
		// Resolve the column index only for the metric we actually matched;
		// previously ColumnIdx was called for every metric in the layer, which
		// made non-matching multi-column metrics log spurious "not found" errors.
		columnIdx := 0 // single-column metrics always use column 0
		if multipleColumnsMetric, ok := metric.queryType.(metrics_aggregations.MultipleMetricColumnsInterface); ok {
			// NOTE(review): if submetricName is empty here, ColumnIdx returns -1
			// (and logs an error), same as before — TODO confirm whether a
			// default column is desired instead.
			columnIdx = multipleColumnsMetric.ColumnIdx(submetricName)
		}
		result := model.NewAliasedExpr(orderBy, metric.InternalNameForCol(columnIdx))
		return &result
	}

	logger.ErrorWithCtx(t.ctx).Msgf("no metric found for path: %s", fullPathToOrderByExpr)
	return nil
}
Loading

0 comments on commit e3ca75a

Please sign in to comment.