Sum_bucket aggregation (#156)
Tests for this PR revealed 2 issues affecting all pipeline aggregations:
* we should stop logging warnings when numeric values are null
* so far every pipeline aggregation's "parent" has been at the same nesting level, but that doesn't have to be the case. I'll need to fix it, which shouldn't be very hard; for now I've commented out the test for that case (a sketch of such a request follows below).
As these issues affect all types of pipeline aggregations, I'll fix them in a separate PR.
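To make that concrete, here is a minimal `sum_bucket` request of the kind involved, written as a raw-string constant in the style of our Go test cases (the aggregation and field names `histo`, `avg_price`, and `total_avg_price` are hypothetical, not taken from the test suite):

```go
// total_avg_price sums the avg_price metric across all histo buckets.
// Its buckets_path crosses a nesting level ("histo>avg_price"), which is
// exactly where the parent-nesting handling needs care.
const sumBucketRequest = `{
    "aggs": {
        "histo": {
            "histogram": {"field": "price", "interval": 100},
            "aggs": {
                "avg_price": {"avg": {"field": "price"}}
            }
        },
        "total_avg_price": {
            "sum_bucket": {"buckets_path": "histo>avg_price"}
        }
    }
}`
```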

Some screenshots:
<img width="1723" alt="Screenshot 2024-05-20 at 12 07 50"
src="https://github.com/QuesmaOrg/quesma/assets/5407146/caeaf9b0-7d70-4a44-9ef2-753482a917cb">
<img width="1728" alt="Screenshot 2024-05-20 at 12 08 29"
src="https://github.com/QuesmaOrg/quesma/assets/5407146/f8af46d4-8a38-480c-a1bf-8edec3a89b27">
trzysiek authored May 22, 2024
1 parent 9846584 commit 28e911b
Showing 8 changed files with 1,140 additions and 36 deletions.
3 changes: 3 additions & 0 deletions quesma/model/pipeline_aggregations/average_bucket.go
@@ -43,6 +43,9 @@ func (query AverageBucket) CalculateResultWhenMissing(qwa *model.Query, parentRo
     parentFieldsCnt := len(parentRows[0].Cols) - 2 // -2, because row is [parent_cols..., current_key, current_value]
     // in calculateSingleAvgBucket we calculate the avg of all current_keys with the same parent_cols,
     // so we need to split into buckets based on parent_cols
+    if parentFieldsCnt < 0 {
+        logger.WarnWithCtx(query.ctx).Msgf("parentFieldsCnt is less than 0: %d", parentFieldsCnt)
+    }
     for _, parentRowsOneBucket := range qp.SplitResultSetIntoBuckets(parentRows, parentFieldsCnt) {
         resultRows = append(resultRows, query.calculateSingleAvgBucket(parentRowsOneBucket))
     }
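The comment above spells out the row layout all of these pipeline aggregations rely on: `[parent_cols..., current_key, current_value]`. As a minimal, self-contained sketch of the splitting step — a simplified stand-in for `qp.SplitResultSetIntoBuckets`, not the real `queryprocessor` implementation — assuming rows arrive sorted by their parent columns and the column values are comparable:

```go
// Consecutive rows sharing the same first parentFieldsCnt columns form one
// bucket; a single avg/min/sum is later computed per bucket.
func splitIntoBuckets(rows [][]any, parentFieldsCnt int) [][][]any {
    var buckets [][][]any
    for _, row := range rows {
        n := len(buckets)
        if n > 0 && samePrefix(buckets[n-1][0], row, parentFieldsCnt) {
            buckets[n-1] = append(buckets[n-1], row) // same parent_cols -> same bucket
        } else {
            buckets = append(buckets, [][]any{row}) // new parent_cols -> new bucket
        }
    }
    return buckets
}

func samePrefix(a, b []any, n int) bool {
    for i := 0; i < n; i++ {
        if a[i] != b[i] {
            return false
        }
    }
    return true
}
```

With `parentFieldsCnt == 0` every row lands in a single bucket, so in this sketch only a negative count (what the new guard above warns about) signals a malformed result set.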
2 changes: 1 addition & 1 deletion quesma/model/pipeline_aggregations/derivative.go
@@ -78,7 +78,7 @@ func (query Derivative) CalculateResultWhenMissing(qwa *model.Query, parentRows
     if okPrevious && okCurrent {
         resultValue = currentValue - previousValue
     } else {
-        logger.WarnWithCtx(query.ctx).Msgf("could not convert value to float: previousValue: %v, type: %T; currentValue: %v, type: %T. Skipping",
+        logger.WarnWithCtx(query.ctx).Msgf("could not convert value to int: previousValue: %v, type: %T; currentValue: %v, type: %T. Skipping",
             previousValueRaw, previousValueRaw, currentValueRaw, currentValueRaw)
         resultValue = nil
     }
10 changes: 6 additions & 4 deletions quesma/model/pipeline_aggregations/min_bucket.go
@@ -33,10 +33,9 @@ func (query MinBucket) TranslateSqlResponseToJson(rows []model.QueryResultRow, l
     }
     if returnMap, ok := rows[0].LastColValue().(model.JsonMap); ok {
         return []model.JsonMap{returnMap}
-    } else {
-        logger.WarnWithCtx(query.ctx).Msgf("could not convert value to JsonMap: %v, type: %T", rows[0].LastColValue(), rows[0].LastColValue())
-        return []model.JsonMap{nil}
     }
+    logger.WarnWithCtx(query.ctx).Msgf("could not convert value to JsonMap: %v, type: %T", rows[0].LastColValue(), rows[0].LastColValue())
+    return []model.JsonMap{nil}
 }
 
 func (query MinBucket) CalculateResultWhenMissing(qwa *model.Query, parentRows []model.QueryResultRow) []model.QueryResultRow {
@@ -48,6 +47,9 @@ func (query MinBucket) CalculateResultWhenMissing(qwa *model.Query, parentRows [
     parentFieldsCnt := len(parentRows[0].Cols) - 2 // -2, because row is [parent_cols..., current_key, current_value]
     // in calculateSingleMinBucket we calculate the min of all current_keys with the same parent_cols,
     // so we need to split into buckets based on parent_cols
+    if parentFieldsCnt < 0 {
+        logger.WarnWithCtx(query.ctx).Msgf("parentFieldsCnt is less than 0: %d", parentFieldsCnt)
+    }
     for _, parentRowsOneBucket := range qp.SplitResultSetIntoBuckets(parentRows, parentFieldsCnt) {
         resultRows = append(resultRows, query.calculateSingleMinBucket(qwa, parentRowsOneBucket))
     }
@@ -84,7 +86,7 @@ func (query MinBucket) calculateSingleMinBucket(qwa *model.Query, parentRows []m
     if ok {
         minValue = min(minValue, value)
     } else {
-        logger.WarnWithCtx(query.ctx).Msgf("could not convert value to float: %v, type: %T. Skipping", row.LastColValue(), row.LastColValue())
+        logger.WarnWithCtx(query.ctx).Msgf("could not convert value to int: %v, type: %T. Skipping", row.LastColValue(), row.LastColValue())
     }
 }
 resultValue = minValue
117 changes: 117 additions & 0 deletions quesma/model/pipeline_aggregations/sum_bucket.go
@@ -0,0 +1,117 @@
package pipeline_aggregations

import (
    "context"
    "fmt"
    "mitmproxy/quesma/logger"
    "mitmproxy/quesma/model"
    "mitmproxy/quesma/queryprocessor"
    "mitmproxy/quesma/util"
)

type SumBucket struct {
    ctx    context.Context
    Parent string
}

func NewSumBucket(ctx context.Context, bucketsPath string) SumBucket {
    return SumBucket{ctx: ctx, Parent: parseBucketsPathIntoParentAggregationName(ctx, bucketsPath)}
}

func (query SumBucket) IsBucketAggregation() bool {
    return false
}

func (query SumBucket) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
    if len(rows) == 0 {
        logger.WarnWithCtx(query.ctx).Msg("no rows returned for sum bucket aggregation")
        return []model.JsonMap{nil}
    }
    if len(rows) > 1 {
        logger.WarnWithCtx(query.ctx).Msg("more than one row returned for sum bucket aggregation")
    }
    if returnMap, ok := rows[0].LastColValue().(model.JsonMap); ok {
        return []model.JsonMap{returnMap}
    }
    logger.WarnWithCtx(query.ctx).Msgf("could not convert value to JsonMap: %v, type: %T", rows[0].LastColValue(), rows[0].LastColValue())
    return []model.JsonMap{nil}
}

func (query SumBucket) CalculateResultWhenMissing(qwa *model.Query, parentRows []model.QueryResultRow) []model.QueryResultRow {
    resultRows := make([]model.QueryResultRow, 0)
    if len(parentRows) == 0 {
        return resultRows // maybe null?
    }
    qp := queryprocessor.NewQueryProcessor(query.ctx)
    parentFieldsCnt := len(parentRows[0].Cols) - 2 // -2, because row is [parent_cols..., current_key, current_value]
    // in calculateSingleSumBucket we calculate the sum of all current_keys with the same parent_cols,
    // so we need to split into buckets based on parent_cols
    if parentFieldsCnt < 0 {
        logger.WarnWithCtx(query.ctx).Msgf("parentFieldsCnt is less than 0: %d", parentFieldsCnt)
    }
    for _, parentRowsOneBucket := range qp.SplitResultSetIntoBuckets(parentRows, parentFieldsCnt) {
        resultRows = append(resultRows, query.calculateSingleSumBucket(parentRowsOneBucket))
    }
    return resultRows
}

// we're sure len(parentRows) > 0
func (query SumBucket) calculateSingleSumBucket(parentRows []model.QueryResultRow) model.QueryResultRow {
    var resultValue any

    firstNonNilIndex := -1
    for i, row := range parentRows {
        if row.LastColValue() != nil {
            firstNonNilIndex = i
            break
        }
    }
    if firstNonNilIndex == -1 {
        resultRow := parentRows[0].Copy()
        resultRow.Cols[len(resultRow.Cols)-1].Value = model.JsonMap{
            "value": resultValue,
        }
        return resultRow
    }

    if firstRowValueFloat, firstRowValueIsFloat := util.ExtractFloat64Maybe(parentRows[firstNonNilIndex].LastColValue()); firstRowValueIsFloat {
        sum := firstRowValueFloat
        for _, row := range parentRows[firstNonNilIndex+1:] {
            value, ok := util.ExtractFloat64Maybe(row.LastColValue())
            if ok {
                sum += value
            } else {
                logger.WarnWithCtx(query.ctx).Msgf("could not convert value to float: %v, type: %T. Skipping", row.LastColValue(), row.LastColValue())
            }
        }
        resultValue = sum
    } else if firstRowValueInt, firstRowValueIsInt := util.ExtractInt64Maybe(parentRows[firstNonNilIndex].LastColValue()); firstRowValueIsInt {
        sum := firstRowValueInt
        for _, row := range parentRows[firstNonNilIndex+1:] {
            value, ok := util.ExtractInt64Maybe(row.LastColValue())
            if ok {
                sum += value
            } else {
                logger.WarnWithCtx(query.ctx).Msgf("could not convert value to int: %v, type: %T. Skipping", row.LastColValue(), row.LastColValue())
            }
        }
        resultValue = sum
    } else {
        logger.WarnWithCtx(query.ctx).Msgf("could not convert value to float or int: %v, type: %T. Returning nil.",
            parentRows[firstNonNilIndex].LastColValue(), parentRows[firstNonNilIndex].LastColValue())
    }

    resultRow := parentRows[0].Copy()
    resultRow.Cols[len(resultRow.Cols)-1].Value = model.JsonMap{
        "value": resultValue,
    }
    return resultRow
}

func (query SumBucket) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
    return rowsFromDB
}

func (query SumBucket) String() string {
    return fmt.Sprintf("sum_bucket(%s)", query.Parent)
}
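Condensed to its core, the summing rule in `calculateSingleSumBucket` is the following — a simplified, self-contained sketch that uses plain type switches where the real code goes through `util.ExtractFloat64Maybe`/`util.ExtractInt64Maybe` and logs the skipped values:

```go
// Skip leading nils; the first non-nil value picks float vs int arithmetic;
// later values of the other kind are skipped. An all-nil bucket yields nil,
// which surfaces in the response as "value": null.
func sumLastColumn(values []any) any {
    first := 0
    for first < len(values) && values[first] == nil {
        first++
    }
    if first == len(values) {
        return nil
    }
    switch v := values[first].(type) {
    case float64:
        sum := v
        for _, raw := range values[first+1:] {
            if f, ok := raw.(float64); ok {
                sum += f
            }
        }
        return sum
    case int64:
        sum := v
        for _, raw := range values[first+1:] {
            if n, ok := raw.(int64); ok {
                sum += n
            }
        }
        return sum
    default:
        return nil // neither float nor int -> "value": null
    }
}
```

For example, last-column values `[nil, int64(2), int64(3)]` produce `5`, which the real code wraps as `model.JsonMap{"value": ...}` in the bucket's result row.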
5 changes: 4 additions & 1 deletion quesma/queryparser/aggregation_parser_test.go
@@ -577,9 +577,12 @@ func Test2AggregationParserExternalTestcases(t *testing.T) {
     allTests = append(allTests, opensearch_visualize.PipelineAggregationTests...)
     for i, test := range allTests {
         t.Run(test.TestName+"("+strconv.Itoa(i)+")", func(t *testing.T) {
-            if i == 57 {
+            if test.TestName == "Max/Sum bucket with some null buckets. Reproduce: Visualize -> Vertical Bar: Metrics: Max (Sum) Bucket (Aggregation: Date Histogram, Metric: Min)" {
                 t.Skip("Needs to be fixed by keeping last key for every aggregation. Now we sometimes don't know it. Hard to reproduce, leaving it for separate PR")
             }
+            if test.TestName == "complex sum_bucket. Reproduce: Visualize -> Vertical Bar: Metrics: Sum Bucket (Bucket: Date Histogram, Metric: Average), Buckets: X-Asis: Histogram" {
+                t.Skip("Waiting for fix. Now we handle only the case where pipeline agg is at the same nesting level as its parent. Should be quick to fix.")
+            }
             if i > 26 && i <= 30 {
                 t.Skip("New tests, harder, failing for now. Fixes for them in 2 next PRs")
             }
20 changes: 20 additions & 0 deletions quesma/queryparser/pipeline_aggregations.go
@@ -31,6 +31,11 @@ func (cw *ClickhouseQueryTranslator) parsePipelineAggregations(queryMap QueryMap
     }
     if aggregationType, success = cw.parseMaxBucket(queryMap); success {
         delete(queryMap, "max_bucket")
+        return
     }
+    if aggregationType, success = cw.parseSumBucket(queryMap); success {
+        delete(queryMap, "sum_bucket")
+        return
+    }
     return
 }
@@ -95,6 +100,18 @@ func (cw *ClickhouseQueryTranslator) parseMaxBucket(queryMap QueryMap) (aggregat
     return pipeline_aggregations.NewMaxBucket(cw.Ctx, bucketsPath), true
 }
 
+func (cw *ClickhouseQueryTranslator) parseSumBucket(queryMap QueryMap) (aggregationType model.QueryType, success bool) {
+    sumBucketRaw, exists := queryMap["sum_bucket"]
+    if !exists {
+        return
+    }
+    bucketsPath, ok := cw.parseBucketsPath(sumBucketRaw, "sum_bucket")
+    if !ok {
+        return
+    }
+    return pipeline_aggregations.NewSumBucket(cw.Ctx, bucketsPath), true
+}
+
 func (cw *ClickhouseQueryTranslator) parseBucketScriptBasic(queryMap QueryMap) (aggregationType model.QueryType, success bool) {
     bucketScriptRaw, exists := queryMap["bucket_script"]
     if !exists {
@@ -205,6 +222,9 @@ func (b *aggrQueryBuilder) finishBuildingAggregationPipeline(aggregationType mod
     case pipeline_aggregations.MaxBucket:
         query.NoDBQuery = true
         query.Parent = aggrType.Parent
+    case pipeline_aggregations.SumBucket:
+        query.NoDBQuery = true
+        query.Parent = aggrType.Parent
     }
     return query
 }
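A hypothetical test sketch for the new parser path — the translator's construction, the `QueryMap` literal, and the assumption that `parseBucketsPathIntoParentAggregationName` returns a single-element path unchanged are all mine, not taken from the repo:

```go
func TestParseSumBucket(t *testing.T) {
    cw := ClickhouseQueryTranslator{Ctx: context.Background()} // assumed to be constructable like this
    queryMap := QueryMap{"sum_bucket": QueryMap{"buckets_path": "avg_price"}}

    aggType, ok := cw.parseSumBucket(queryMap)
    if !ok {
        t.Fatal("expected sum_bucket to be recognized")
    }
    // If Parent == "avg_price", SumBucket's String() renders as below;
    // %s dispatches to it via the value's dynamic type.
    if got := fmt.Sprintf("%s", aggType); got != "sum_bucket(avg_price)" {
        t.Errorf("got %q, want sum_bucket(avg_price)", got)
    }
}
```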