Skip to content

Commit

Permalink
Respect size (nr of buckets) (#454)
Browse files Browse the repository at this point in the history
PR which finishes "respect `size` when splitting `terms` aggregation
into buckets" issue.

Some example of triple bucketing (terms {terms {terms {} } })
aggregation, like in the testcase I added with this PR, showing that our
SQLs seem to be correct (what Clickhouse returns for each query is good)
We have such a table
<img width="1559" alt="Screenshot 2024-07-07 at 19 54 14"
src="https://github.com/QuesmaOrg/quesma/assets/5407146/82b4c21d-c60a-4616-b8b3-791ff69d1334">
Topmost SQL for bucketing by `surname` (top 200 `surname`s) generated by
our code is simple:
<img width="1720" alt="Screenshot 2024-07-07 at 19 56 26"
src="https://github.com/QuesmaOrg/quesma/assets/5407146/67ea6c14-c534-476b-a89d-0188581062f0">
Double nested bucketing (top 20 `limbName`s for each `surname`) is
<img width="1718" alt="Screenshot 2024-07-07 at 19 57 36"
src="https://github.com/QuesmaOrg/quesma/assets/5407146/937ba2da-a776-4625-a3a4-4598edbf690e">
Triple nested bucketing (top 1 `organName` for each `surname, limbName`
pair)
<img width="1716" alt="Screenshot 2024-07-07 at 20 00 47"
src="https://github.com/QuesmaOrg/quesma/assets/5407146/e1b8caee-fb1c-48fc-bcd0-e85f0d48dce3">

---------

Co-authored-by: Przemek Delewski <[email protected]>
Co-authored-by: Jacek Migdal <[email protected]>
  • Loading branch information
3 people authored Jul 15, 2024
1 parent 20d4a1e commit 4871087
Show file tree
Hide file tree
Showing 20 changed files with 2,212 additions and 192 deletions.
9 changes: 9 additions & 0 deletions quesma/model/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,15 @@ func NewOrderByExprWithoutOrder(exprs ...Expr) OrderByExpr {
return OrderByExpr{Exprs: exprs, Direction: DefaultOrder}
}

// IsCountDesc returns true <=> this OrderByExpr is count() DESC
func (o OrderByExpr) IsCountDesc() bool {
if len(o.Exprs) != 1 || o.Direction != DescOrder {
return false
}
function, ok := o.Exprs[0].(FunctionExpr)
return ok && function.Name == "count"
}

func NewInfixExpr(lhs Expr, operator string, rhs Expr) InfixExpr {
return InfixExpr{lhs, operator, rhs}
}
Expand Down
75 changes: 65 additions & 10 deletions quesma/model/expr_string_renderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package model

import (
"fmt"
"quesma/logger"
"strconv"
"strings"
)
Expand Down Expand Up @@ -134,6 +135,33 @@ func (v *renderer) VisitAliasedExpr(e AliasedExpr) interface{} {
func (v *renderer) VisitSelectCommand(c SelectCommand) interface{} {
// THIS SHOULD PRODUCE QUERY IN BRACES
var sb strings.Builder

const cteNamePrefix = "cte"
cteName := func(cteIdx int) string {
return fmt.Sprintf("%s_%d", cteNamePrefix, cteIdx+1)
}
cteFieldAlias := func(cteIdx, fieldIdx int) string {
return fmt.Sprintf("%s_%d_%d", cteNamePrefix, cteIdx+1, fieldIdx+1)
}
cteCountAlias := func(ctxIdx int) string {
return fmt.Sprintf("%s_%d_cnt", cteNamePrefix, ctxIdx+1)
}
if len(c.CTEs) > 0 {
CTEsStrings := make([]string, 0, len(c.CTEs))
for i, cte := range c.CTEs {
for j, col := range cte.Columns {
if _, alreadyAliased := cte.Columns[j].(AliasedExpr); !alreadyAliased {
cte.Columns[j] = AliasedExpr{Expr: col, Alias: cteFieldAlias(i, j)}
} else {
logger.Warn().Msgf("Subquery column already aliased: %s, %+v", AsString(col), col)
}
}
str := fmt.Sprintf("%s AS (%s)", cteName(i), AsString(cte))
CTEsStrings = append(CTEsStrings, str)
}
sb.WriteString(fmt.Sprintf("WITH %s ", strings.Join(CTEsStrings, ", ")))
}

sb.WriteString("SELECT ")
if c.IsDistinct {
sb.WriteString("DISTINCT ")
Expand Down Expand Up @@ -182,16 +210,25 @@ func (v *renderer) VisitSelectCommand(c SelectCommand) interface{} {
/* HACK ALERT END */
if c.FromClause != nil { // here we have to handle nested
if nestedCmd, isNested := c.FromClause.(SelectCommand); isNested {
sb.WriteString("(")
sb.WriteString(AsString(nestedCmd))
sb.WriteString(")")
} else if nestedCmd, ok := c.FromClause.(*SelectCommand); ok {
sb.WriteString("(")
sb.WriteString(AsString(nestedCmd))
sb.WriteString(")")
sb.WriteString(fmt.Sprintf("(%s)", AsString(nestedCmd)))
} else if nestedCmdPtr, isNested := c.FromClause.(*SelectCommand); isNested {
sb.WriteString(fmt.Sprintf("(%s)", AsString(nestedCmdPtr)))
} else {
sb.WriteString(AsString(c.FromClause))
}
if len(c.CTEs) > 0 {
for cteIdx, cte := range c.CTEs {
sb.WriteString(" INNER JOIN ")
sb.WriteString(strconv.Quote(cteName(cteIdx)))
sb.WriteString(" ON ")
for colIdx := range len(cte.Columns) - 1 { // at least so far, last one is always count() or some other metric aggr, on which we don't need to GROUP BY
sb.WriteString(fmt.Sprintf("%s = %s", AsString(c.Columns[colIdx]), strconv.Quote(cteFieldAlias(cteIdx, colIdx))))
if colIdx < len(cte.Columns)-2 {
sb.WriteString(" AND ")
}
}
}
}
}
if c.WhereClause != nil {
sb.WriteString(" WHERE ")
Expand All @@ -207,20 +244,38 @@ func (v *renderer) VisitSelectCommand(c SelectCommand) interface{} {
}
if len(groupBy) > 0 {
sb.WriteString(" GROUP BY ")
sb.WriteString(strings.Join(groupBy, ", "))
fullGroupBy := groupBy
for i := range c.CTEs {
fullGroupBy = append(fullGroupBy, cteCountAlias(i))
}
sb.WriteString(strings.Join(fullGroupBy, ", "))
}

orderBy := make([]string, 0, len(c.OrderBy))
orderByReplaced, orderByToReplace := 0, len(c.CTEs)
for _, col := range c.OrderBy {
orderBy = append(orderBy, AsString(col))
if col.IsCountDesc() && orderByReplaced < orderByToReplace {
orderBy = append(orderBy, fmt.Sprintf("%s DESC", cteCountAlias(orderByReplaced)))
orderByReplaced++
} else {
orderBy = append(orderBy, AsString(col))
}
}
if len(orderBy) > 0 {
sb.WriteString(" ORDER BY ")
sb.WriteString(strings.Join(orderBy, ", "))
}

if c.Limit != noLimit {
sb.WriteString(fmt.Sprintf(" LIMIT %d", c.Limit))
if len(c.LimitBy) <= 1 {
sb.WriteString(fmt.Sprintf(" LIMIT %d", c.Limit))
} else {
limitBys := make([]string, 0, len(c.LimitBy)-1)
for _, col := range c.LimitBy[:len(c.LimitBy)-1] {
limitBys = append(limitBys, AsString(col))
}
sb.WriteString(fmt.Sprintf(" LIMIT %d BY %s", c.Limit, strings.Join(limitBys, ", ")))
}
}

return sb.String()
Expand Down
2 changes: 1 addition & 1 deletion quesma/model/highlighter.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (v *highlighter) VisitSelectCommand(c SelectCommand) interface{} {
if c.WhereClause != nil {
where = c.WhereClause.Accept(v).(Expr)
}
return *NewSelectCommand(columns, groupBy, orderBy, from, where, c.Limit, c.SampleLimit, c.IsDistinct)
return *NewSelectCommand(columns, groupBy, orderBy, from, where, []Expr{}, c.Limit, c.SampleLimit, c.IsDistinct, c.CTEs)
}

func (v *highlighter) VisitWindowFunction(f WindowFunction) interface{} {
Expand Down
11 changes: 10 additions & 1 deletion quesma/model/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,21 @@ func (q *Query) CopyAggregationFields(qwa Query) {
q.SelectCommand.GroupBy = make([]Expr, len(qwa.SelectCommand.GroupBy))
copy(q.SelectCommand.GroupBy, qwa.SelectCommand.GroupBy)

q.SelectCommand.OrderBy = make([]OrderByExpr, len(qwa.SelectCommand.OrderBy))
copy(q.SelectCommand.OrderBy, qwa.SelectCommand.OrderBy)

q.SelectCommand.LimitBy = make([]Expr, len(qwa.SelectCommand.LimitBy))
copy(q.SelectCommand.LimitBy, qwa.SelectCommand.LimitBy)

q.SelectCommand.Columns = make([]Expr, len(qwa.SelectCommand.Columns))
copy(q.SelectCommand.Columns, qwa.SelectCommand.Columns)

q.SelectCommand.OrderBy = make([]OrderByExpr, len(qwa.SelectCommand.OrderBy))
copy(q.SelectCommand.OrderBy, qwa.SelectCommand.OrderBy)

q.SelectCommand.CTEs = make([]SelectCommand, len(qwa.SelectCommand.CTEs))
copy(q.SelectCommand.CTEs, qwa.SelectCommand.CTEs)

q.Aggregators = make([]Aggregator, len(qwa.Aggregators))
copy(q.Aggregators, qwa.Aggregators)
}
Expand Down Expand Up @@ -116,7 +125,7 @@ func (q *Query) NewSelectExprWithRowNumber(selectFields []Expr, groupByFields []
"ROW_NUMBER", nil, groupByFields, orderByExpr,
), RowNumberColumnName))

return *NewSelectCommand(selectFields, nil, nil, q.SelectCommand.FromClause, whereClause, 0, 0, false)
return *NewSelectCommand(selectFields, nil, nil, q.SelectCommand.FromClause, whereClause, []Expr{}, 0, 0, false, []SelectCommand{})
}

// Aggregator is always initialized as "empty", so with SplitOverHowManyFields == 0, Keyed == false, Filters == false.
Expand Down
14 changes: 10 additions & 4 deletions quesma/model/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@ type SelectCommand struct {
GroupBy []Expr // if not empty, we do GROUP BY GroupBy...
OrderBy []OrderByExpr // if not empty, we do ORDER BY OrderBy...

Limit int // LIMIT clause, noLimit (0) means no limit
SampleLimit int // LIMIT, but before grouping, 0 means no limit
LimitBy []Expr // LIMIT BY clause (empty => maybe LIMIT, but no LIMIT BY)
Limit int // LIMIT clause, noLimit (0) means no limit
SampleLimit int // LIMIT, but before grouping, 0 means no limit

CTEs []SelectCommand // Common Table Expressions, so these parts of query: WITH cte_1 AS SELECT ..., cte_2 AS SELECT ...
}

func NewSelectCommand(columns, groupBy []Expr, orderBy []OrderByExpr, from, where Expr, limit, sampleLimit int, isDistinct bool) *SelectCommand {
func NewSelectCommand(columns, groupBy []Expr, orderBy []OrderByExpr, from, where Expr, limitBy []Expr,
limit, sampleLimit int, isDistinct bool, CTEs []SelectCommand) *SelectCommand {
return &SelectCommand{
IsDistinct: isDistinct,

Expand All @@ -24,8 +28,10 @@ func NewSelectCommand(columns, groupBy []Expr, orderBy []OrderByExpr, from, wher
OrderBy: orderBy,
FromClause: from,
WhereClause: where,
LimitBy: limitBy,
Limit: limit,
SampleLimit: sampleLimit,
CTEs: CTEs,
}
}

Expand All @@ -35,7 +41,7 @@ func (c SelectCommand) Accept(v ExprVisitor) interface{} {
return v.VisitSelectCommand(c)
}

func (c SelectCommand) String() string {
func (c *SelectCommand) String() string {
// TODO - we might need to verify queries nested N-times (N>=3), perhaps this should strip the outermost braces
return AsString(c)
}
Expand Down
10 changes: 9 additions & 1 deletion quesma/optimize/trunc_date.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,16 @@ func (v *truncateDateVisitor) VisitSelectCommand(e model.SelectCommand) interfac
whereClause = e.WhereClause.Accept(v).(model.Expr)
}

var ctes []model.SelectCommand
if e.CTEs != nil {
ctes = make([]model.SelectCommand, 0)
for _, cte := range e.CTEs {
ctes = append(ctes, cte.Accept(v).(model.SelectCommand))
}
}

return model.NewSelectCommand(columns, groupBy, e.OrderBy,
fromClause, whereClause, e.Limit, e.SampleLimit, e.IsDistinct)
fromClause, whereClause, e.LimitBy, e.Limit, e.SampleLimit, e.IsDistinct, ctes)

}

Expand Down
73 changes: 34 additions & 39 deletions quesma/queryparser/aggregation_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ func (cw *ClickhouseQueryTranslator) parseAggregation(prevAggr *aggrQueryBuilder
}

currentAggr := *prevAggr
currentAggr.SelectCommand.Limit = 0

// check if metadata's present
var metadata model.JsonMap
Expand Down Expand Up @@ -495,6 +496,22 @@ func (cw *ClickhouseQueryTranslator) parseAggregation(prevAggr *aggrQueryBuilder
cw.processRangeAggregation(&currentAggr, Range, queryMap, resultQueries, metadata)
}

_, isTerms := currentAggr.Type.(bucket_aggregations.Terms)
if isTerms {
*resultQueries = append(*resultQueries, currentAggr.buildBucketAggregation(metadata))
cte := currentAggr.Query
cte.CopyAggregationFields(currentAggr.Query)
cte.SelectCommand.WhereClause = currentAggr.whereBuilder.WhereClause
cte.SelectCommand.Columns = append(cte.SelectCommand.Columns,
model.NewAliasedExpr(model.NewCountFunc(), fmt.Sprintf("cte_%d_cnt", len(currentAggr.SelectCommand.CTEs)+1))) // FIXME unify this name creation with one in model/expr_as_string
cte.SelectCommand.CTEs = nil // CTEs don't have CTEs themselves (so far, maybe that'll need to change)
if len(cte.SelectCommand.OrderBy) > 2 {
// we can reduce nr of ORDER BYs in CTEs. Last 2 seem to be always enough. Proper ordering is done anyway in the outer SELECT.
cte.SelectCommand.OrderBy = cte.SelectCommand.OrderBy[len(cte.SelectCommand.OrderBy)-2:]
}
currentAggr.SelectCommand.CTEs = append(currentAggr.SelectCommand.CTEs, cte.SelectCommand)
}

// TODO what happens if there's all: filters, range, and subaggregations at current level?
// We probably need to do |ranges| * |filters| * |subaggregations| queries, but we don't do that yet.
// Or probably a bit less, if optimized correctly.
Expand All @@ -514,7 +531,7 @@ func (cw *ClickhouseQueryTranslator) parseAggregation(prevAggr *aggrQueryBuilder
}
delete(queryMap, "aggs") // no-op if no "aggs"

if bucketAggrPresent && !aggsHandledSeparately {
if bucketAggrPresent && !aggsHandledSeparately && !isTerms {
// range aggregation has separate, optimized handling
*resultQueries = append(*resultQueries, currentAggr.buildBucketAggregation(metadata))
}
Expand Down Expand Up @@ -723,6 +740,7 @@ func (cw *ClickhouseQueryTranslator) tryBucketAggregation(currentAggr *aggrQuery

currentAggr.SelectCommand.Columns = append(currentAggr.SelectCommand.Columns, col)
currentAggr.SelectCommand.GroupBy = append(currentAggr.SelectCommand.GroupBy, col)
currentAggr.SelectCommand.LimitBy = append(currentAggr.SelectCommand.LimitBy, col)
currentAggr.SelectCommand.OrderBy = append(currentAggr.SelectCommand.OrderBy, model.NewOrderByExprWithoutOrder(col))

delete(queryMap, "histogram")
Expand Down Expand Up @@ -757,8 +775,6 @@ func (cw *ClickhouseQueryTranslator) tryBucketAggregation(currentAggr *aggrQuery
if terms, ok := queryMap[termsType]; ok {
currentAggr.Type = bucket_aggregations.NewTerms(cw.Ctx, termsType == "significant_terms")

isEmptyGroupBy := len(currentAggr.SelectCommand.GroupBy) == 0

// Parse 'missing' parameter. It can be any type.
var missingPlaceholder any
if m, ok := terms.(QueryMap); ok {
Expand All @@ -785,49 +801,28 @@ func (cw *ClickhouseQueryTranslator) tryBucketAggregation(currentAggr *aggrQuery
fieldExpression = model.NewFunction("COALESCE", fieldExpression, value)
}

currentAggr.SelectCommand.GroupBy = append(currentAggr.SelectCommand.GroupBy, fieldExpression)
currentAggr.SelectCommand.Columns = append(currentAggr.SelectCommand.Columns, fieldExpression)

orderByAdded := false
size := 10
if _, ok := queryMap["aggs"]; isEmptyGroupBy && !ok { // we can do limit only it terms are not nested
if jsonMap, ok := terms.(QueryMap); ok {
if sizeRaw, ok := jsonMap["size"]; ok {
if sizeParsed, ok := sizeRaw.(float64); ok {
size = int(sizeParsed)
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("size is not an float64, but %T, value: %v. Using default", sizeRaw, sizeRaw)
}
if jsonMap, ok := terms.(QueryMap); ok {
if sizeRaw, ok := jsonMap["size"]; ok {
if sizeParsed, ok := sizeRaw.(float64); ok {
size = int(sizeParsed)
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("size is not an float64, but %T, value: %v. Using default", sizeRaw, sizeRaw)
}

}
currentAggr.SelectCommand.Limit = size
currentAggr.SelectCommand.OrderBy = append(currentAggr.SelectCommand.OrderBy, model.NewSortByCountColumn(model.DescOrder))
orderByAdded = true
}

currentAggr.Type = bucket_aggregations.NewTerms(cw.Ctx, termsType == "significant_terms")
currentAggr.SelectCommand.Limit = size
currentAggr.SelectCommand.Columns = append(currentAggr.SelectCommand.Columns, fieldExpression)
currentAggr.SelectCommand.GroupBy = append(currentAggr.SelectCommand.GroupBy, fieldExpression)
currentAggr.SelectCommand.LimitBy = append(currentAggr.SelectCommand.LimitBy, fieldExpression)
currentAggr.SelectCommand.OrderBy = append(currentAggr.SelectCommand.OrderBy, model.NewSortByCountColumn(model.DescOrder))
currentAggr.SelectCommand.OrderBy = append(currentAggr.SelectCommand.OrderBy, model.OrderByExpr{Exprs: []model.Expr{fieldExpression}})

delete(queryMap, termsType)
if !orderByAdded {
currentAggr.SelectCommand.OrderBy = append(currentAggr.SelectCommand.OrderBy, model.NewOrderByExprWithoutOrder(fieldExpression))
}
return success, 1, nil
/* will remove later
var size int
if sizeRaw, exists := terms.(QueryMap)["size"]; exists {
size = (int)(sizeRaw.(float64))
} else {
size = bucket_aggregations.DefaultSize
}
currentAggr.Type = bucket_aggregations.NewTerms(cw.Ctx, size, termsType == "significant_terms")
fieldName := strconv.Quote(cw.parseFieldField(terms, termsType))
currentAggr.GroupByFields = append(currentAggr.GroupByFields, fieldName)
currentAggr.NonSchemaFields = append(currentAggr.NonSchemaFields, fieldName)
currentAggr.SuffixClauses = append(currentAggr.SuffixClauses, fmt.Sprintf("LIMIT %d", size))
currentAggr.SubSelect = currentAggr.Query.String()
fmt.Println("SUB:", currentAggr.SubSelect)
delete(queryMap, termsType)
return success, 1, 1, nil
*/
}
}
if multiTermsRaw, exists := queryMap["multi_terms"]; exists {
Expand Down
Loading

0 comments on commit 4871087

Please sign in to comment.