Skip to content

Commit

Permalink
Respect order (#504)
Browse files Browse the repository at this point in the history
Checked the dashboards at the end, and they still seem fine.
  • Loading branch information
trzysiek authored Jul 17, 2024
1 parent f858dc7 commit c7f78e7
Show file tree
Hide file tree
Showing 17 changed files with 2,568 additions and 211 deletions.
2 changes: 1 addition & 1 deletion quesma/model/base_visitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (v *BaseExprVisitor) VisitOrderByExpr(e OrderByExpr) interface{} {
if v.OverrideVisitOrderByExpr != nil {
return v.OverrideVisitOrderByExpr(v, e)
}
return OrderByExpr{Exprs: v.VisitChildren(e.Exprs), Direction: e.Direction}
return OrderByExpr{Exprs: v.VisitChildren(e.Exprs), Direction: e.Direction, ExchangeToAliasInCTE: e.ExchangeToAliasInCTE}
}

func (v *BaseExprVisitor) VisitDistinctExpr(e DistinctExpr) interface{} {
Expand Down
5 changes: 3 additions & 2 deletions quesma/model/bucket_aggregations/terms.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ import (
// Terms holds the parameters of a terms / significant_terms bucket aggregation.
type Terms struct {
ctx context.Context
significant bool // true <=> significant_terms, false <=> terms
// OrderByExpr is the expression handed to NewTerms as orderByExpr. The aggregation
// parser selects it under the cte_<n>_cnt alias when it builds the CTE for this aggregation.
OrderByExpr model.Expr
}

func NewTerms(ctx context.Context, significant bool) Terms {
return Terms{ctx: ctx, significant: significant}
func NewTerms(ctx context.Context, significant bool, orderByExpr model.Expr) Terms {
return Terms{ctx: ctx, significant: significant, OrderByExpr: orderByExpr}
}

func (query Terms) IsBucketAggregation() bool {
Expand Down
5 changes: 3 additions & 2 deletions quesma/model/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,9 @@ const (
)

type OrderByExpr struct {
Exprs []Expr
Direction OrderByDirection
Exprs []Expr
Direction OrderByDirection
ExchangeToAliasInCTE bool
}

func (o OrderByExpr) Accept(v ExprVisitor) interface{} {
Expand Down
3 changes: 2 additions & 1 deletion quesma/model/expr_string_renderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func (v *renderer) VisitSelectCommand(c SelectCommand) interface{} {
} else {
sb.WriteString(AsString(c.FromClause))
}

if len(c.CTEs) > 0 {
for cteIdx, cte := range c.CTEs {
sb.WriteString(" INNER JOIN ")
Expand Down Expand Up @@ -254,7 +255,7 @@ func (v *renderer) VisitSelectCommand(c SelectCommand) interface{} {
orderBy := make([]string, 0, len(c.OrderBy))
orderByReplaced, orderByToReplace := 0, len(c.CTEs)
for _, col := range c.OrderBy {
if col.IsCountDesc() && orderByReplaced < orderByToReplace {
if col.ExchangeToAliasInCTE && orderByReplaced < orderByToReplace {
orderBy = append(orderBy, fmt.Sprintf("%s DESC", cteCountAlias(orderByReplaced)))
orderByReplaced++
} else {
Expand Down
3 changes: 1 addition & 2 deletions quesma/model/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ type SelectCommand struct {
func NewSelectCommand(columns, groupBy []Expr, orderBy []OrderByExpr, from, where Expr, limitBy []Expr,
limit, sampleLimit int, isDistinct bool, CTEs []*SelectCommand) *SelectCommand {
return &SelectCommand{
IsDistinct: isDistinct,

IsDistinct: isDistinct,
Columns: columns,
GroupBy: groupBy,
OrderBy: orderBy,
Expand Down
182 changes: 127 additions & 55 deletions quesma/queryparser/aggregation_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,6 @@ func updateInnerQueryColumns(query model.SelectCommand, whereClause model.Expr)
return query
}

/* code from my previous approach to this issue. Let's keep for now, 95% it'll be not needed, I'll remove it then.
func (b *aggrQueryBuilder) applyTermsSubSelect(terms bucket_aggregations.Terms) {
termsField := b.Query.GroupByFields[len(b.Query.GroupByFields)-1]
pp.Println(b, terms, termsField, b.Query.String())
whereLimitStmt := fmt.Sprintf("%s IN (%s)", termsField, b.String())
fmt.Println("WHERE LIMIT STMT:", whereLimitStmt)
fmt.Println("where before:", b.whereBuilder.Sql.Stmt)
b.whereBuilder = combineWheres(b.whereBuilder, newSimpleQuery(NewSimpleStatement(whereLimitStmt), true))
fmt.Println("where after:", b.whereBuilder.Sql.Stmt)
}
*/

func (b *aggrQueryBuilder) buildAggregationCommon(metadata model.JsonMap) *model.Query {
query := b.Query
query.SelectCommand.WhereClause = b.whereBuilder.WhereClause
Expand Down Expand Up @@ -461,14 +449,14 @@ func (cw *ClickhouseQueryTranslator) parseAggregation(prevAggr *aggrQueryBuilder
cw.processRangeAggregation(&currentAggr, Range, queryMap, resultQueries, metadata)
}

_, isTerms := currentAggr.Type.(bucket_aggregations.Terms)
terms, isTerms := currentAggr.Type.(bucket_aggregations.Terms)
if isTerms {
*resultQueries = append(*resultQueries, currentAggr.buildBucketAggregation(metadata))
cte := currentAggr.Query
cte.CopyAggregationFields(currentAggr.Query)
cte.SelectCommand.WhereClause = currentAggr.whereBuilder.WhereClause
cte.SelectCommand.Columns = append(cte.SelectCommand.Columns,
model.NewAliasedExpr(model.NewCountFunc(), fmt.Sprintf("cte_%d_cnt", len(currentAggr.SelectCommand.CTEs)+1))) // FIXME unify this name creation with one in model/expr_as_string
model.NewAliasedExpr(terms.OrderByExpr, fmt.Sprintf("cte_%d_cnt", len(currentAggr.SelectCommand.CTEs)+1))) // FIXME unify this name creation with one in model/expr_as_string
cte.SelectCommand.CTEs = nil // CTEs don't have CTEs themselves (so far, maybe that'll need to change)
if len(cte.SelectCommand.OrderBy) > 2 {
// we can reduce nr of ORDER BYs in CTEs. Last 2 seem to be always enough. Proper ordering is done anyway in the outer SELECT.
Expand Down Expand Up @@ -731,64 +719,111 @@ func (cw *ClickhouseQueryTranslator) tryBucketAggregation(currentAggr *aggrQuery
sqlQuery := dateHistogramAggr.GenerateSQL()
currentAggr.SelectCommand.Columns = append(currentAggr.SelectCommand.Columns, sqlQuery)
currentAggr.SelectCommand.GroupBy = append(currentAggr.SelectCommand.GroupBy, sqlQuery)
currentAggr.SelectCommand.LimitBy = append(currentAggr.SelectCommand.LimitBy, sqlQuery)
currentAggr.SelectCommand.OrderBy = append(currentAggr.SelectCommand.OrderBy, model.NewOrderByExprWithoutOrder(sqlQuery))

delete(queryMap, "date_histogram")
return success, 1, nil
}
for _, termsType := range []string{"terms", "significant_terms"} {
if terms, ok := queryMap[termsType]; ok {
currentAggr.Type = bucket_aggregations.NewTerms(cw.Ctx, termsType == "significant_terms")

// Parse 'missing' parameter. It can be any type.
var missingPlaceholder any
if m, ok := terms.(QueryMap); ok {
if m["missing"] != nil {
missingPlaceholder = m["missing"]
}
}
termsRaw, ok := queryMap[termsType]
if !ok {
continue
}
terms, ok := termsRaw.(QueryMap)
if !ok {
logger.WarnWithCtx(cw.Ctx).Msgf("%s is not a map, but %T, value: %v", termsType, termsRaw, termsRaw)
continue
}

fieldExpression := cw.parseFieldField(terms, termsType)
// Parse 'missing' parameter. It can be any type.
var missingPlaceholder any
if terms["missing"] != nil {
missingPlaceholder = terms["missing"]
}

// apply missing placeholder if it is set
if missingPlaceholder != nil {
var value model.LiteralExpr
fieldExpression := cw.parseFieldField(terms, termsType)

// Maybe we should check the input type against the schema?
// Right now we quote if it's a string.
switch val := missingPlaceholder.(type) {
case string:
value = model.NewLiteral("'" + val + "'")
default:
value = model.NewLiteral(missingPlaceholder)
}
// apply missing placeholder if it is set
if missingPlaceholder != nil {
var value model.LiteralExpr

fieldExpression = model.NewFunction("COALESCE", fieldExpression, value)
// Maybe we should check the input type against the schema?
// Right now we quote if it's a string.
switch val := missingPlaceholder.(type) {
case string:
value = model.NewLiteral("'" + val + "'")
default:
value = model.NewLiteral(missingPlaceholder)
}

size := 10
if jsonMap, ok := terms.(QueryMap); ok {
if sizeRaw, ok := jsonMap["size"]; ok {
if sizeParsed, ok := sizeRaw.(float64); ok {
size = int(sizeParsed)
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("size is not an float64, but %T, value: %v. Using default", sizeRaw, sizeRaw)
}
fieldExpression = model.NewFunction("COALESCE", fieldExpression, value)
}

}
size := 10
if sizeRaw, ok := terms["size"]; ok {
if sizeParsed, ok := sizeRaw.(float64); ok {
size = int(sizeParsed)
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("size is not an float64, but %T, value: %v. Using default", sizeRaw, sizeRaw)
}
}

currentAggr.Type = bucket_aggregations.NewTerms(cw.Ctx, termsType == "significant_terms")
currentAggr.SelectCommand.Limit = size
currentAggr.SelectCommand.Columns = append(currentAggr.SelectCommand.Columns, fieldExpression)
currentAggr.SelectCommand.GroupBy = append(currentAggr.SelectCommand.GroupBy, fieldExpression)
currentAggr.SelectCommand.LimitBy = append(currentAggr.SelectCommand.LimitBy, fieldExpression)
currentAggr.SelectCommand.OrderBy = append(currentAggr.SelectCommand.OrderBy, model.NewSortByCountColumn(model.DescOrder))
currentAggr.SelectCommand.OrderBy = append(currentAggr.SelectCommand.OrderBy, model.OrderByExpr{Exprs: []model.Expr{fieldExpression}})
defaultMainOrderBy := model.NewCountFunc()
defaultDirection := model.DescOrder

delete(queryMap, termsType)
return success, 1, nil
var mainOrderBy model.Expr = defaultMainOrderBy
fullOrderBy := []model.OrderByExpr{ // default
{Exprs: []model.Expr{mainOrderBy}, Direction: defaultDirection, ExchangeToAliasInCTE: true},
{Exprs: []model.Expr{fieldExpression}},
}
direction := defaultDirection
if orderRaw, exists := terms["order"]; exists {
if order, ok := orderRaw.(QueryMap); ok { // TODO it can be array too, don't handle it yet
if len(order) == 1 {
for key, valueRaw := range order { // value == "asc" or "desc"
value, ok := valueRaw.(string)
if !ok {
logger.WarnWithCtx(cw.Ctx).Msgf("order value is not a string, but %T, value: %v. Using default (desc)", valueRaw, valueRaw)
value = "desc"
}
if strings.ToLower(value) == "asc" {
direction = model.AscOrder
}

if key == "_key" {
fullOrderBy = []model.OrderByExpr{{Exprs: []model.Expr{fieldExpression}, Direction: direction}}
break // mainOrderBy remains default
} else if key != "_count" {
mainOrderBy = cw.findMetricAggregation(queryMap, key, currentAggr)
}

fullOrderBy = []model.OrderByExpr{
{Exprs: []model.Expr{mainOrderBy}, Direction: direction, ExchangeToAliasInCTE: true},
{Exprs: []model.Expr{fieldExpression}},
}
}
} else {
logger.ErrorWithCtx(cw.Ctx).Msgf("order has more than 1 key, but %d. Order: %+v. Using default", len(order), order)
}
} else {
logger.ErrorWithCtx(cw.Ctx).Msgf("order is not a map, but %T, value: %v. Using default order", orderRaw, orderRaw)
}
}

currentAggr.Type = bucket_aggregations.NewTerms(cw.Ctx, termsType == "significant_terms", mainOrderBy)
currentAggr.SelectCommand.Limit = size
currentAggr.SelectCommand.Columns = append(currentAggr.SelectCommand.Columns, fieldExpression)
currentAggr.SelectCommand.GroupBy = append(currentAggr.SelectCommand.GroupBy, fieldExpression)
currentAggr.SelectCommand.LimitBy = append(currentAggr.SelectCommand.LimitBy, fieldExpression)
currentAggr.SelectCommand.OrderBy = append(currentAggr.SelectCommand.OrderBy, fullOrderBy...)
if missingPlaceholder == nil { // TODO replace with schema
currentAggr.whereBuilder = model.CombineWheres(cw.Ctx, currentAggr.whereBuilder,
model.NewSimpleQuery(model.NewInfixExpr(fieldExpression, "IS", model.NewLiteral("NOT NULL")), true))
}

delete(queryMap, termsType)
return success, 1, nil
}
if multiTermsRaw, exists := queryMap["multi_terms"]; exists {
multiTerms, ok := multiTermsRaw.(QueryMap)
Expand Down Expand Up @@ -1080,6 +1115,43 @@ func (cw *ClickhouseQueryTranslator) parseMinDocCount(queryMap QueryMap) int {
return bucket_aggregations.DefaultMinDocCount
}

// findMetricAggregation looks up the aggregation called aggregationName directly under
// queryMap["aggs"], parses it as a metrics aggregation and returns the select expression
// that computes it, so that the caller can use it as an ORDER BY expression.
//
// The expression is obtained by building a temporary metrics query on top of currentAggr
// and taking its last column; the temporary query is expected to have exactly one column
// more than currentAggr.
//
// On any failure (no "aggs", "aggs" or the aggregation not being a map, unknown
// aggregation name, unparsable metric, unexpected column count) a warning is logged and
// an empty literal is returned.
func (cw *ClickhouseQueryTranslator) findMetricAggregation(queryMap QueryMap, aggregationName string, currentAggr *aggrQueryBuilder) model.Expr {
	notFoundValue := model.NewLiteral("")

	aggsRaw, exists := queryMap["aggs"]
	if !exists {
		logger.WarnWithCtx(cw.Ctx).Msgf("no aggs in queryMap, queryMap: %+v", queryMap)
		return notFoundValue
	}
	aggs, ok := aggsRaw.(QueryMap)
	if !ok {
		logger.WarnWithCtx(cw.Ctx).Msgf("aggs is not a map, but %T, value: %v. Skipping", aggsRaw, aggsRaw)
		return notFoundValue
	}

	aggMapRaw, exists := aggs[aggregationName]
	if !exists {
		// Previously this case fell through silently, leaving an empty literal as the
		// ORDER BY expression with no trace of why. Log it like every other failure path.
		logger.WarnWithCtx(cw.Ctx).Msgf("no aggregation named %q in aggs, aggs: %+v", aggregationName, aggs)
		return notFoundValue
	}
	aggMap, ok := aggMapRaw.(QueryMap)
	if !ok {
		logger.WarnWithCtx(cw.Ctx).Msgf("aggregation %s is not a map, but %T, value: %v. Skipping", aggregationName, aggMapRaw, aggMapRaw)
		return notFoundValue
	}

	agg, success := cw.tryMetricsAggregation(aggMap)
	if !success {
		logger.WarnWithCtx(cw.Ctx).Msgf("failed to parse metric aggregation: %v", agg)
		return notFoundValue
	}

	// we build a temporary query only to extract the name of the metric
	tmpQuery := currentAggr.buildMetricsAggregation(agg, model.NoMetadataField)
	gotColumns := len(tmpQuery.SelectCommand.Columns)
	wantColumns := len(currentAggr.SelectCommand.Columns) + 1
	if gotColumns != wantColumns {
		logger.WarnWithCtx(cw.Ctx).Msgf("unexpected number of columns in metric aggregation: %d, expected %d",
			gotColumns, wantColumns)
		return notFoundValue
	}
	return tmpQuery.SelectCommand.Columns[gotColumns-1]
}

// quoteArray returns a new array with the same elements, but quoted
func quoteArray(array []string) []string {
quotedArray := make([]string, 0, len(array))
Expand Down
Loading

0 comments on commit c7f78e7

Please sign in to comment.