Add avg_bucket aggregation
trzysiek committed May 17, 2024
1 parent 26d0ca9 commit 089ef0c

Showing 11 changed files with 1,006 additions and 137 deletions.
109 changes: 109 additions & 0 deletions quesma/model/pipeline_aggregations/average_bucket.go
@@ -0,0 +1,109 @@
package pipeline_aggregations

import (
    "context"
    "fmt"
    "mitmproxy/quesma/logger"
    "mitmproxy/quesma/model"
    "mitmproxy/quesma/queryprocessor"
    "mitmproxy/quesma/util"
    "strings"
)

type AverageBucket struct {
    ctx    context.Context
    Parent string
}

func NewAverageBucket(ctx context.Context, bucketsPath string) AverageBucket {
    const delimiter = ">"
    withoutUnnecessarySuffix, _ := strings.CutSuffix(bucketsPath, delimiter+BucketsPathCount)
    lastDelimiterIdx := strings.LastIndex(withoutUnnecessarySuffix, delimiter)
    var parent string
    if lastDelimiterIdx+1 < len(withoutUnnecessarySuffix) {
        parent = withoutUnnecessarySuffix[lastDelimiterIdx+1:]
    } else {
        logger.WarnWithCtx(ctx).Msgf("invalid bucketsPath: %s, withoutUnnecessarySuffix: %s. Using empty string as parent.", bucketsPath, withoutUnnecessarySuffix)
        parent = ""
    }
    return AverageBucket{ctx: ctx, Parent: parent}
}

func (query AverageBucket) IsBucketAggregation() bool {
    return false
}

func (query AverageBucket) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
    if len(rows) == 0 {
        logger.WarnWithCtx(query.ctx).Msg("no rows returned for average bucket aggregation")
        return []model.JsonMap{{}}
    }
    var response []model.JsonMap
    for _, row := range rows {
        response = append(response, model.JsonMap{"value": row.Cols[len(row.Cols)-1].Value})
    }
    return response
}

func (query AverageBucket) CalculateResultWhenMissing(qwa *model.Query, parentRows []model.QueryResultRow) []model.QueryResultRow {
    resultRows := make([]model.QueryResultRow, 0)
    if len(parentRows) == 0 {
        return resultRows // maybe null?
    }
    qp := queryprocessor.NewQueryProcessor(query.ctx)
    parentFieldsCnt := len(parentRows[0].Cols) - 2 // -2, because row is [parent_cols..., current_key, current_value]
    // in calculateSingleAvgBucket we calculate the avg of all current_keys with the same parent_cols,
    // so we need to split the rows into buckets based on parent_cols
    for _, parentRowsOneBucket := range qp.SplitResultSetIntoBuckets(parentRows, parentFieldsCnt) {
        resultRows = append(resultRows, query.calculateSingleAvgBucket(parentRowsOneBucket))
    }
    return resultRows
}

// we're sure len(parentRows) > 0
func (query AverageBucket) calculateSingleAvgBucket(parentRows []model.QueryResultRow) model.QueryResultRow {
    if len(parentRows) == 0 {
        logger.WarnWithCtx(query.ctx).Msg("no parent rows, should NEVER happen")
        return model.QueryResultRow{}
    }

    var resultValue float64
    rowsCnt := 0
    if _, firstRowValueIsFloat := util.ExtractFloat64Maybe(parentRows[0].LastColValue()); firstRowValueIsFloat {
        sum := 0.0
        for _, parentRow := range parentRows {
            value, ok := util.ExtractFloat64Maybe(parentRow.LastColValue())
            if ok {
                sum += value
                rowsCnt++
            } else {
                logger.WarnWithCtx(query.ctx).Msgf("could not convert value to float: %v, type: %T. Skipping", parentRow.LastColValue(), parentRow.LastColValue())
            }
        }
        resultValue = sum / float64(rowsCnt)
    } else {
        var sum int64
        for _, parentRow := range parentRows {
            value, ok := util.ExtractInt64Maybe(parentRow.LastColValue())
            if ok {
                sum += value
                rowsCnt++
            } else {
                logger.WarnWithCtx(query.ctx).Msgf("could not convert value to int: %v, type: %T. Skipping", parentRow.LastColValue(), parentRow.LastColValue())
            }
        }
        resultValue = float64(sum) / float64(rowsCnt)
    }

    resultRow := parentRows[0].Copy()
    resultRow.Cols[len(resultRow.Cols)-1].Value = resultValue
    return resultRow
}

func (query AverageBucket) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
    return rowsFromDB
}

func (query AverageBucket) String() string {
    return fmt.Sprintf("avg_bucket(%s)", query.Parent)
}
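
For reference, a minimal test-style sketch (not part of this commit, assuming it sits next to average_bucket.go in the same package) of how NewAverageBucket resolves Parent from a few buckets_path inputs:

package pipeline_aggregations

import (
    "context"
    "testing"
)

// Hypothetical test illustrating the Parent resolution performed by NewAverageBucket.
func TestNewAverageBucketParent(t *testing.T) {
    cases := map[string]string{
        "my_sum":             "my_sum", // no delimiter: the whole path is the parent
        "histo>my_sum":       "my_sum", // parent is the segment after the last '>'
        "histo>_count":       "histo",  // a trailing '>_count' is stripped first
        "outer>inner>_count": "inner",
    }
    for bucketsPath, want := range cases {
        if got := NewAverageBucket(context.Background(), bucketsPath).Parent; got != want {
            t.Errorf("buckets_path %q: got parent %q, want %q", bucketsPath, got, want)
        }
    }
}
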
7 changes: 3 additions & 4 deletions quesma/model/pipeline_aggregations/bucket_script.go
@@ -7,8 +7,7 @@ import (
)

type BucketScript struct {
-    ctx   context.Context
-    index string // name of the index (table)
+    ctx context.Context
}

func NewBucketScript(ctx context.Context) BucketScript {
@@ -31,8 +30,8 @@ func (query BucketScript) TranslateSqlResponseToJson(rows []model.QueryResultRow
    return response
}

-func (query BucketScript) CalculateResultWhenMissing(model.QueryResultRow, []model.QueryResultRow) model.QueryResultRow {
-    return model.NewQueryResultRowEmpty(query.index)
+func (query BucketScript) CalculateResultWhenMissing(*model.Query, []model.QueryResultRow) []model.QueryResultRow {
+    return []model.QueryResultRow{}
}

func (query BucketScript) String() string {
62 changes: 29 additions & 33 deletions quesma/model/pipeline_aggregations/cumulative_sum.go
@@ -21,11 +21,11 @@ type CumulativeSum struct {
}

func NewCumulativeSum(ctx context.Context, bucketsPath string) CumulativeSum {
-    isCount := bucketsPath == bucketsPathCount
+    isCount := bucketsPath == BucketsPathCount
    return CumulativeSum{ctx: ctx, Parent: bucketsPath, IsCount: isCount}
}

-const bucketsPathCount = "_count" // special name for `buckets_path` parameter, normally it's some other aggregation's name
+const BucketsPathCount = "_count" // special name for `buckets_path` parameter, normally it's some other aggregation's name

func (query CumulativeSum) IsBucketAggregation() bool {
    return false
@@ -43,44 +43,40 @@ func (query CumulativeSum) TranslateSqlResponseToJson(rows []model.QueryResultRo
    return response
}

-func (query CumulativeSum) CalculateResultWhenMissing(rowIndex int, parentRows []model.QueryResultRow, previousResultsCurrentAggregation []model.QueryResultRow) model.QueryResultRow {
-    resultRow := parentRows[rowIndex].Copy() // result is the same as parent, with an exception of last element, which we'll change below
-    parentValue := parentRows[rowIndex].Cols[len(parentRows[rowIndex].Cols)-1].Value
-    var resultValue any
-    if rowIndex == 0 {
-        resultValue = parentValue
-    } else {
-        // I don't check types too much, they are expected to be numeric, so either floats or ints.
-        // I propose to keep it this way until at least one case arises as this method can be called a lot of times.
-        previousValue := previousResultsCurrentAggregation[len(previousResultsCurrentAggregation)-1].Cols[len(previousResultsCurrentAggregation[len(previousResultsCurrentAggregation)-1].Cols)-1].Value
-        parentValueAsFloat, ok := util.ExtractFloat64Maybe(parentValue)
-        if ok {
-            previousValueAsFloat, ok := util.ExtractFloat64Maybe(previousValue)
+func (query CumulativeSum) CalculateResultWhenMissing(qwa *model.Query, parentRows []model.QueryResultRow) []model.QueryResultRow {
+    resultRows := make([]model.QueryResultRow, 0, len(parentRows))
+    if len(parentRows) == 0 {
+        return resultRows
+    }
+
+    if _, firstRowValueIsFloat := util.ExtractFloat64Maybe(parentRows[0].LastColValue()); firstRowValueIsFloat {
+        sum := 0.0
+        for _, parentRow := range parentRows {
+            value, ok := util.ExtractFloat64Maybe(parentRow.LastColValue())
            if ok {
-                resultValue = parentValueAsFloat + previousValueAsFloat
+                sum += value
            } else {
-                logger.WarnWithCtx(query.ctx).Msgf("could not convert previous value to float: %v, parentValue: %v", previousValue, parentValue)
-                resultValue = previousValue
+                logger.WarnWithCtx(query.ctx).Msgf("could not convert value to float: %v, type: %T. Skipping", parentRow.LastColValue(), parentRow.LastColValue())
            }
-        } else {
-            previousValueAsInt, okPrevious := util.ExtractInt64Maybe(previousValue)
-            parentValueAsInt, okParent := util.ExtractInt64Maybe(parentValue)
-            if okPrevious && okParent {
-                resultValue = parentValueAsInt + previousValueAsInt
-            } else if okPrevious {
-                logger.WarnWithCtx(query.ctx).Msgf("could not convert parent value to int: %v, previousValue: %v. Using previousValue as sum", parentValue, previousValue)
-                resultValue = previousValue
-            } else if okParent {
-                logger.WarnWithCtx(query.ctx).Msgf("could not convert previous value to int: %v, parentValue: %v. Starting sum from 0", previousValue, parentValue)
-                resultValue = parentValue
+            resultRow := parentRow.Copy()
+            resultRow.Cols[len(resultRow.Cols)-1].Value = sum
+            resultRows = append(resultRows, resultRow)
+        }
+    } else { // cumulative sum must be on numeric, so if it's not float64, it should always be int
+        var sum int64
+        for _, parentRow := range parentRows {
+            value, ok := util.ExtractInt64Maybe(parentRow.LastColValue())
+            if ok {
+                sum += value
            } else {
-                logger.WarnWithCtx(query.ctx).Msgf("could not convert previous and parent value to int, previousValue: %v, parentValue: %v. Using nil as result", previousValue, parentValue)
-                resultValue = nil
+                logger.WarnWithCtx(query.ctx).Msgf("could not convert value to int: %v, type: %T. Skipping", parentRow.LastColValue(), parentRow.LastColValue())
            }
+            resultRow := parentRow.Copy()
+            resultRow.Cols[len(resultRow.Cols)-1].Value = sum
+            resultRows = append(resultRows, resultRow)
        }
    }
-    resultRow.Cols[len(resultRow.Cols)-1].Value = resultValue
-    return resultRow
+    return resultRows
}

func (query CumulativeSum) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
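
For intuition about the rewritten CalculateResultWhenMissing above, here is a standalone sketch (not the committed code) of the running-sum behaviour it applies to the last column of each parent row, where every output row carries the sum of all values seen so far:

package main

import "fmt"

// cumulativeSums mirrors the float64 branch above on a plain slice:
// element i of the result is the sum of the first i+1 inputs.
func cumulativeSums(lastColValues []float64) []float64 {
    sums := make([]float64, 0, len(lastColValues))
    sum := 0.0
    for _, v := range lastColValues {
        sum += v
        sums = append(sums, sum)
    }
    return sums
}

func main() {
    fmt.Println(cumulativeSums([]float64{2, 3, 5})) // prints [2 5 10]
}
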
2 changes: 1 addition & 1 deletion quesma/model/pipeline_aggregations/derivative.go
@@ -15,7 +15,7 @@ type Derivative struct {
}

func NewDerivative(ctx context.Context, bucketsPath string) Derivative {
-    isCount := bucketsPath == bucketsPathCount
+    isCount := bucketsPath == BucketsPathCount
    return Derivative{ctx: ctx, Parent: bucketsPath, IsCount: isCount}
}

4 changes: 4 additions & 0 deletions quesma/model/query_result.go
@@ -136,3 +136,7 @@ func (r *QueryResultRow) Copy() QueryResultRow {
    copy(newCols, r.Cols)
    return QueryResultRow{Index: r.Index, Cols: newCols}
}
+
+func (r *QueryResultRow) LastColValue() any {
+    return r.Cols[len(r.Cols)-1].Value
+}
3 changes: 1 addition & 2 deletions quesma/model/query_type.go
@@ -31,8 +31,7 @@ type PipelineQueryType interface {

    // CalculateResultWhenMissing calculates the result of this aggregation when it's a NoDBQuery
    // (we don't query the DB for the results, but calculate them from the parent aggregation)
-    // (it'll be changed in already done later PRs, so I don't comment on arguments, etc.)
-    CalculateResultWhenMissing(rowIndex int, parentRows []QueryResultRow, thisAggrPreviousResults []QueryResultRow) QueryResultRow
+    CalculateResultWhenMissing(query *Query, parentRows []QueryResultRow) []QueryResultRow

    String() string
}
72 changes: 45 additions & 27 deletions quesma/queryparser/pipeline_aggregations.go
@@ -21,6 +21,10 @@ func (cw *ClickhouseQueryTranslator) parsePipelineAggregations(queryMap QueryMap
        delete(queryMap, "derivative")
        return
    }
+    if aggregationType, success = cw.parseAverageBucket(queryMap); success {
+        delete(queryMap, "avg_bucket")
+        return
+    }
    return
}

@@ -29,23 +33,10 @@ func (cw *ClickhouseQueryTranslator) parseCumulativeSum(queryMap QueryMap) (aggr
    if !exists {
        return
    }
-
-    cumulativeSum, ok := cumulativeSumRaw.(QueryMap)
+    bucketsPath, ok := cw.parseBucketsPath(cumulativeSumRaw, "cumulative_sum")
    if !ok {
-        logger.WarnWithCtx(cw.Ctx).Msgf("cumulative_sum is not a map, but %T, value: %v", cumulativeSumRaw, cumulativeSumRaw)
        return
    }
-    bucketsPathRaw, exists := cumulativeSum["buckets_path"]
-    if !exists {
-        logger.WarnWithCtx(cw.Ctx).Msg("no buckets_path in cumulative_sum")
-        return
-    }
-    bucketsPath, ok := bucketsPathRaw.(string)
-    if !ok {
-        logger.WarnWithCtx(cw.Ctx).Msgf("buckets_path is not a string, but %T, value: %v", bucketsPathRaw, bucketsPathRaw)
-        return
-    }
-
    return pipeline_aggregations.NewCumulativeSum(cw.Ctx, bucketsPath), true
}

@@ -88,18 +79,12 @@ func (cw *ClickhouseQueryTranslator) parseBucketScriptBasic(queryMap QueryMap) (
    }

    // if ["buckets_path"] != "_count", skip the aggregation
-    if bucketsPathRaw, exists := bucketScript["buckets_path"]; exists {
-        if bucketsPath, ok := bucketsPathRaw.(string); ok {
-            if bucketsPath != "_count" {
-                logger.WarnWithCtx(cw.Ctx).Msgf("buckets_path is not '_count', but %s. Skipping this aggregation", bucketsPath)
-                return
-            }
-        } else {
-            logger.WarnWithCtx(cw.Ctx).Msgf("buckets_path is not a string, but %T, value: %v. Skipping this aggregation", bucketsPathRaw, bucketsPathRaw)
-            return
-        }
-    } else {
-        logger.WarnWithCtx(cw.Ctx).Msg("no buckets_path in bucket_script. Skipping this aggregation")
+    bucketsPath, ok := cw.parseBucketsPath(bucketScript, "bucket_script")
+    if !ok {
        return
    }
+    if bucketsPath != pipeline_aggregations.BucketsPathCount {
+        logger.WarnWithCtx(cw.Ctx).Msgf("buckets_path is not '_count', but %s. Skipping this aggregation", bucketsPath)
+        return
+    }

@@ -133,6 +118,37 @@ func (cw *ClickhouseQueryTranslator) parseBucketScriptBasic(queryMap QueryMap) (
    return pipeline_aggregations.NewBucketScript(cw.Ctx), true
}

+func (cw *ClickhouseQueryTranslator) parseAverageBucket(queryMap QueryMap) (aggregationType model.QueryType, success bool) {
+    avgBucketRaw, exists := queryMap["avg_bucket"]
+    if !exists {
+        return
+    }
+    bucketsPath, ok := cw.parseBucketsPath(avgBucketRaw, "avg_bucket")
+    if !ok {
+        return
+    }
+    return pipeline_aggregations.NewAverageBucket(cw.Ctx, bucketsPath), true
+}
+
+func (cw *ClickhouseQueryTranslator) parseBucketsPath(shouldBeQueryMap any, aggregationName string) (bucketsPath string, success bool) {
+    queryMap, ok := shouldBeQueryMap.(QueryMap)
+    if !ok {
+        logger.WarnWithCtx(cw.Ctx).Msgf("%s is not a map, but %T, value: %v", aggregationName, shouldBeQueryMap, shouldBeQueryMap)
+        return
+    }
+    bucketsPathRaw, exists := queryMap["buckets_path"]
+    if !exists {
+        logger.WarnWithCtx(cw.Ctx).Msg("no buckets_path in avg_bucket")
+        return
+    }
+    bucketsPath, ok = bucketsPathRaw.(string)
+    if !ok {
+        logger.WarnWithCtx(cw.Ctx).Msgf("buckets_path is not a string, but %T, value: %v", bucketsPathRaw, bucketsPathRaw)
+        return
+    }
+    return bucketsPath, true
+}
+
func (b *aggrQueryBuilder) buildPipelineAggregation(aggregationType model.QueryType, metadata model.JsonMap) model.Query {
    query := b.buildAggregationCommon(metadata)
    query.Type = aggregationType
@@ -142,7 +158,6 @@ func (b *aggrQueryBuilder) buildPipelineAggregation(aggregationType model.QueryT
    case pipeline_aggregations.CumulativeSum:
        query.NoDBQuery = true
        if aggrType.IsCount {
-            query.NonSchemaFields = append(query.NonSchemaFields, "count()")
            if len(query.Aggregators) < 2 {
                logger.WarnWithCtx(b.ctx).Msg("cumulative_sum with count as parent, but no parent aggregation found")
}
Expand All @@ -161,6 +176,9 @@ func (b *aggrQueryBuilder) buildPipelineAggregation(aggregationType model.QueryT
} else {
query.Parent = aggrType.Parent
}
case pipeline_aggregations.AverageBucket:
query.NoDBQuery = true
query.Parent = aggrType.Parent
}
return query
}
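
To show the request shape these new parsers expect, here is a standalone sketch (the aggregation fragment is a hypothetical example following Elasticsearch's avg_bucket syntax) of extracting buckets_path from an avg_bucket node the same way parseAverageBucket and parseBucketsPath do:

package main

import (
    "encoding/json"
    "fmt"
)

func main() {
    // Hypothetical aggregation fragment: an avg_bucket whose buckets_path
    // points at a metric nested under another aggregation.
    raw := `{"avg_bucket": {"buckets_path": "sales_per_month>sales"}}`

    var queryMap map[string]any
    if err := json.Unmarshal([]byte(raw), &queryMap); err != nil {
        panic(err)
    }

    // The same two steps parseBucketsPath performs: assert the aggregation
    // node is a map, then read its "buckets_path" string.
    avgBucket, ok := queryMap["avg_bucket"].(map[string]any)
    if !ok {
        panic("avg_bucket is not a map")
    }
    bucketsPath, ok := avgBucket["buckets_path"].(string)
    if !ok {
        panic("buckets_path is not a string")
    }
    fmt.Println(bucketsPath) // sales_per_month>sales
}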