Skip to content

Commit

Permalink
Average bucket aggregation (#102)
Browse files Browse the repository at this point in the history
Some example charts:
<img width="1725" alt="Screenshot 2024-05-16 at 16 42 15"
src="https://github.com/QuesmaOrg/quesma/assets/5407146/4a2d4731-deb6-4370-84af-399463a04c3c">
<img width="1720" alt="Screenshot 2024-05-16 at 16 40 43"
src="https://github.com/QuesmaOrg/quesma/assets/5407146/89d01eb3-9ba1-4e66-9e19-efea6384c87d">
  • Loading branch information
trzysiek authored May 17, 2024
1 parent 26d0ca9 commit abc9bff
Show file tree
Hide file tree
Showing 11 changed files with 1,051 additions and 159 deletions.
109 changes: 109 additions & 0 deletions quesma/model/pipeline_aggregations/average_bucket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package pipeline_aggregations

import (
"context"
"fmt"
"mitmproxy/quesma/logger"
"mitmproxy/quesma/model"
"mitmproxy/quesma/queryprocessor"
"mitmproxy/quesma/util"
"strings"
)

// AverageBucket is the avg_bucket pipeline aggregation (see String).
// Its results are not read from the DB directly; CalculateResultWhenMissing
// derives them from the rows of the parent aggregation.
type AverageBucket struct {
// ctx is the context handed to the logger by this type's methods.
ctx context.Context
// Parent is the last '>'-separated element of the buckets_path given to
// NewAverageBucket (after dropping a trailing ">_count"); empty if the path was invalid.
Parent string
}

// NewAverageBucket builds an AverageBucket from a buckets_path such as "agg1>agg2>_count".
//
// A trailing ">_count" element is dropped, and the last remaining '>'-separated
// element becomes Parent. If nothing is left after the last separator (empty
// path, or a path ending in '>'), a warning is logged and Parent is "".
func NewAverageBucket(ctx context.Context, bucketsPath string) AverageBucket {
	const separator = ">"

	// TrimSuffix removes the suffix only when present, which is all we need here.
	trimmedPath := strings.TrimSuffix(bucketsPath, separator+BucketsPathCount)

	// LastIndex returns -1 when there is no separator, so parentStart is 0 in
	// that case and the whole trimmed path is taken as the parent.
	parentStart := strings.LastIndex(trimmedPath, separator) + 1
	if parentStart >= len(trimmedPath) {
		logger.WarnWithCtx(ctx).Msgf("invalid bucketsPath: %s, withoutUnnecessarySuffix: %s. Using empty string as parent.", bucketsPath, trimmedPath)
		return AverageBucket{ctx: ctx, Parent: ""}
	}
	return AverageBucket{ctx: ctx, Parent: trimmedPath[parentStart:]}
}

// IsBucketAggregation always reports false for AverageBucket.
func (query AverageBucket) IsBucketAggregation() bool {
return false
}

// TranslateSqlResponseToJson turns every row into a {"value": <last column value>} map.
//
// When rows is empty, a warning is logged and a single empty map is returned.
// The level argument is not used by this aggregation.
// Rows are expected to have at least one column; indexing an empty row panics.
func (query AverageBucket) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
	if len(rows) == 0 {
		logger.WarnWithCtx(query.ctx).Msg("no rows returned for average bucket aggregation")
		return []model.JsonMap{{}}
	}

	jsonRows := make([]model.JsonMap, 0, len(rows))
	for i := range rows {
		jsonRows = append(jsonRows, model.JsonMap{"value": rows[i].LastColValue()})
	}
	return jsonRows
}

// CalculateResultWhenMissing computes this aggregation's rows from the rows of
// its parent aggregation instead of querying the DB.
//
// Each parent row is laid out as [parent_cols..., current_key, current_value].
// The rows are split into buckets that share the same parent_cols, and one
// averaged row is produced per bucket. An empty input gives an empty, non-nil
// result. The qwa argument is not used.
func (query AverageBucket) CalculateResultWhenMissing(qwa *model.Query, parentRows []model.QueryResultRow) []model.QueryResultRow {
	if len(parentRows) == 0 {
		return []model.QueryResultRow{}
	}

	// The trailing key and value columns do not identify the parent bucket.
	const keyAndValueCols = 2
	parentColsCnt := len(parentRows[0].Cols) - keyAndValueCols

	averagedRows := make([]model.QueryResultRow, 0)
	processor := queryprocessor.NewQueryProcessor(query.ctx)
	for _, bucketRows := range processor.SplitResultSetIntoBuckets(parentRows, parentColsCnt) {
		averagedRows = append(averagedRows, query.calculateSingleAvgBucket(bucketRows))
	}
	return averagedRows
}

// calculateSingleAvgBucket averages the last-column values of parentRows (the
// rows of a single parent bucket) and returns a copy of the first row whose
// last column holds that average.
//
// The numeric kind is chosen from the first row: if util.ExtractFloat64Maybe
// accepts its value, every row is read as float64, otherwise as int64 (summed
// in int64 and converted once at the end). Rows whose value cannot be
// converted are logged and skipped; they count neither towards the sum nor
// towards the divisor.
//
// If no row yields a usable value, the result value is nil rather than
// 0/0 = NaN, because NaN has no JSON representation.
//
// Callers guarantee len(parentRows) > 0; an empty input is logged and yields
// an empty row.
func (query AverageBucket) calculateSingleAvgBucket(parentRows []model.QueryResultRow) model.QueryResultRow {
	if len(parentRows) == 0 {
		logger.WarnWithCtx(query.ctx).Msg("no parent rows, should NEVER happen")
		return model.QueryResultRow{}
	}

	var (
		total   float64
		rowsCnt int
	)
	if _, firstRowValueIsFloat := util.ExtractFloat64Maybe(parentRows[0].LastColValue()); firstRowValueIsFloat {
		for _, parentRow := range parentRows {
			value, ok := util.ExtractFloat64Maybe(parentRow.LastColValue())
			if !ok {
				logger.WarnWithCtx(query.ctx).Msgf("could not convert value to float: %v, type: %T. Skipping", parentRow.LastColValue(), parentRow.LastColValue())
				continue
			}
			total += value
			rowsCnt++
		}
	} else {
		// Sum in int64 so integer inputs are not rounded while accumulating.
		var sum int64
		for _, parentRow := range parentRows {
			value, ok := util.ExtractInt64Maybe(parentRow.LastColValue())
			if !ok {
				logger.WarnWithCtx(query.ctx).Msgf("could not convert value to int: %v, type: %T. Skipping", parentRow.LastColValue(), parentRow.LastColValue())
				continue
			}
			sum += value
			rowsCnt++
		}
		total = float64(sum)
	}

	resultRow := parentRows[0].Copy()
	lastCol := &resultRow.Cols[len(resultRow.Cols)-1]
	if rowsCnt == 0 {
		// Every row was skipped: there is nothing to average, so report "no value".
		lastCol.Value = nil
	} else {
		lastCol.Value = total / float64(rowsCnt)
	}
	return resultRow
}

// PostprocessResults returns rowsFromDB unchanged; this aggregation does no postprocessing.
func (query AverageBucket) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
return rowsFromDB
}

// String returns a short description of the aggregation in the form "avg_bucket(<Parent>)".
func (query AverageBucket) String() string {
return fmt.Sprintf("avg_bucket(%s)", query.Parent)
}
7 changes: 3 additions & 4 deletions quesma/model/pipeline_aggregations/bucket_script.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ import (
)

type BucketScript struct {
ctx context.Context
index string // name of the index (table)
ctx context.Context
}

func NewBucketScript(ctx context.Context) BucketScript {
Expand All @@ -31,8 +30,8 @@ func (query BucketScript) TranslateSqlResponseToJson(rows []model.QueryResultRow
return response
}

func (query BucketScript) CalculateResultWhenMissing(model.QueryResultRow, []model.QueryResultRow) model.QueryResultRow {
return model.NewQueryResultRowEmpty(query.index)
func (query BucketScript) CalculateResultWhenMissing(*model.Query, []model.QueryResultRow) []model.QueryResultRow {
return []model.QueryResultRow{}
}

func (query BucketScript) String() string {
Expand Down
62 changes: 29 additions & 33 deletions quesma/model/pipeline_aggregations/cumulative_sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ type CumulativeSum struct {
}

func NewCumulativeSum(ctx context.Context, bucketsPath string) CumulativeSum {
isCount := bucketsPath == bucketsPathCount
isCount := bucketsPath == BucketsPathCount
return CumulativeSum{ctx: ctx, Parent: bucketsPath, IsCount: isCount}
}

const bucketsPathCount = "_count" // special name for `buckets_path` parameter, normally it's some other aggregation's name
const BucketsPathCount = "_count" // special name for `buckets_path` parameter, normally it's some other aggregation's name

func (query CumulativeSum) IsBucketAggregation() bool {
return false
Expand All @@ -43,44 +43,40 @@ func (query CumulativeSum) TranslateSqlResponseToJson(rows []model.QueryResultRo
return response
}

func (query CumulativeSum) CalculateResultWhenMissing(rowIndex int, parentRows []model.QueryResultRow, previousResultsCurrentAggregation []model.QueryResultRow) model.QueryResultRow {
resultRow := parentRows[rowIndex].Copy() // result is the same as parent, with an exception of last element, which we'll change below
parentValue := parentRows[rowIndex].Cols[len(parentRows[rowIndex].Cols)-1].Value
var resultValue any
if rowIndex == 0 {
resultValue = parentValue
} else {
// I don't check types too much, they are expected to be numeric, so either floats or ints.
// I propose to keep it this way until at least one case arises as this method can be called a lot of times.
previousValue := previousResultsCurrentAggregation[len(previousResultsCurrentAggregation)-1].Cols[len(previousResultsCurrentAggregation[len(previousResultsCurrentAggregation)-1].Cols)-1].Value
parentValueAsFloat, ok := util.ExtractFloat64Maybe(parentValue)
if ok {
previousValueAsFloat, ok := util.ExtractFloat64Maybe(previousValue)
func (query CumulativeSum) CalculateResultWhenMissing(qwa *model.Query, parentRows []model.QueryResultRow) []model.QueryResultRow {
resultRows := make([]model.QueryResultRow, 0, len(parentRows))
if len(parentRows) == 0 {
return resultRows
}

if _, firstRowValueIsFloat := util.ExtractFloat64Maybe(parentRows[0].LastColValue()); firstRowValueIsFloat {
sum := 0.0
for _, parentRow := range parentRows {
value, ok := util.ExtractFloat64Maybe(parentRow.LastColValue())
if ok {
resultValue = parentValueAsFloat + previousValueAsFloat
sum += value
} else {
logger.WarnWithCtx(query.ctx).Msgf("could not convert previous value to float: %v, parentValue: %v", previousValue, parentValue)
resultValue = previousValue
logger.WarnWithCtx(query.ctx).Msgf("could not convert value to float: %v, type: %T. Skipping", parentRow.LastColValue(), parentRow.LastColValue())
}
} else {
previousValueAsInt, okPrevious := util.ExtractInt64Maybe(previousValue)
parentValueAsInt, okParent := util.ExtractInt64Maybe(parentValue)
if okPrevious && okParent {
resultValue = parentValueAsInt + previousValueAsInt
} else if okPrevious {
logger.WarnWithCtx(query.ctx).Msgf("could not convert parent value to int: %v, previousValue: %v. Using previousValue as sum", parentValue, previousValue)
resultValue = previousValue
} else if okParent {
logger.WarnWithCtx(query.ctx).Msgf("could not convert previous value to int: %v, parentValue: %v. Starting sum from 0", previousValue, parentValue)
resultValue = parentValue
resultRow := parentRow.Copy()
resultRow.Cols[len(resultRow.Cols)-1].Value = sum
resultRows = append(resultRows, resultRow)
}
} else { // cumulative sum must be on numeric, so if it's not float64, it should always be int
var sum int64
for _, parentRow := range parentRows {
value, ok := util.ExtractInt64Maybe(parentRow.LastColValue())
if ok {
sum += value
} else {
logger.WarnWithCtx(query.ctx).Msgf("could not convert previous and parent value to int, previousValue: %v, parentValue: %v. Using nil as result", previousValue, parentValue)
resultValue = nil
logger.WarnWithCtx(query.ctx).Msgf("could not convert value to int: %v, type: %T. Skipping", parentRow.LastColValue(), parentRow.LastColValue())
}
resultRow := parentRow.Copy()
resultRow.Cols[len(resultRow.Cols)-1].Value = sum
resultRows = append(resultRows, resultRow)
}
}
resultRow.Cols[len(resultRow.Cols)-1].Value = resultValue
return resultRow
return resultRows
}

func (query CumulativeSum) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
Expand Down
69 changes: 46 additions & 23 deletions quesma/model/pipeline_aggregations/derivative.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type Derivative struct {
}

func NewDerivative(ctx context.Context, bucketsPath string) Derivative {
isCount := bucketsPath == bucketsPathCount
isCount := bucketsPath == BucketsPathCount
return Derivative{ctx: ctx, Parent: bucketsPath, IsCount: isCount}
}

Expand All @@ -35,36 +35,59 @@ func (query Derivative) TranslateSqlResponseToJson(rows []model.QueryResultRow,
return response
}

func (query Derivative) CalculateResultWhenMissing(rowIndex int, parentRows []model.QueryResultRow, previousResultsCurrentAggregation []model.QueryResultRow) model.QueryResultRow {
resultRow := parentRows[rowIndex].Copy() // result is the same as parent, with an exception of last element, which we'll change below
var resultValue any
if rowIndex == 0 {
resultValue = nil
} else {
previousValue := parentRows[rowIndex-1].Cols[len(parentRows[rowIndex-1].Cols)-1].Value
currentValue := parentRows[rowIndex].Cols[len(parentRows[rowIndex].Cols)-1].Value
currentValueAsFloat, ok := util.ExtractFloat64Maybe(currentValue)
if ok {
previousValueAsFloat, ok := util.ExtractFloat64Maybe(previousValue)
if ok {
resultValue = currentValueAsFloat - previousValueAsFloat
func (query Derivative) CalculateResultWhenMissing(qwa *model.Query, parentRows []model.QueryResultRow) []model.QueryResultRow {
resultRows := make([]model.QueryResultRow, 0, len(parentRows))
if len(parentRows) == 0 {
return resultRows
}

firstRow := parentRows[0].Copy()
firstRow.Cols[len(firstRow.Cols)-1].Value = nil
resultRows = append(resultRows, firstRow)
if _, firstRowValueIsFloat := util.ExtractFloat64Maybe(parentRows[0].LastColValue()); firstRowValueIsFloat {
for i, currentRow := range parentRows[1:] {
previousRow := parentRows[i]
previousValueRaw := previousRow.LastColValue()
previousValue, okPrevious := util.ExtractFloat64Maybe(previousValueRaw)

currentValueRaw := currentRow.LastColValue()
currentValue, okCurrent := util.ExtractFloat64Maybe(currentValueRaw)

var resultValue any
if okPrevious && okCurrent {
resultValue = currentValue - previousValue
} else {
logger.WarnWithCtx(query.ctx).Msgf("could not convert previous value to float: %v, currentValue: %v", previousValue, currentValue)
logger.WarnWithCtx(query.ctx).Msgf("could not convert value to float: previousValue: %v, type: %T; currentValue: %v, type: %T. Skipping",
previousValueRaw, previousValueRaw, currentValueRaw, currentValueRaw)
resultValue = nil
}
} else {
previousValueAsInt, okPrevious := util.ExtractInt64Maybe(previousValue)
currentValueAsInt, okParent := util.ExtractInt64Maybe(currentValue)
if okPrevious && okParent {
resultValue = currentValueAsInt - previousValueAsInt
resultRow := currentRow.Copy()
resultRow.Cols[len(resultRow.Cols)-1].Value = resultValue
resultRows = append(resultRows, resultRow)
}
} else { // cumulative sum must be on numeric, so if it's not float64, it should always be int
for i, currentRow := range parentRows[1:] {
previousRow := parentRows[i]
previousValueRaw := previousRow.LastColValue()
previousValue, okPrevious := util.ExtractInt64Maybe(previousValueRaw)

currentValueRaw := currentRow.LastColValue()
currentValue, okCurrent := util.ExtractInt64Maybe(currentValueRaw)

var resultValue any
if okPrevious && okCurrent {
resultValue = currentValue - previousValue
} else {
logger.WarnWithCtx(query.ctx).Msgf("could not convert previous or current value to int, previousValue: %v, currentValue: %v. Using nil as result", previousValue, currentValue)
logger.WarnWithCtx(query.ctx).Msgf("could not convert value to float: previousValue: %v, type: %T; currentValue: %v, type: %T. Skipping",
previousValueRaw, previousValueRaw, currentValueRaw, currentValueRaw)
resultValue = nil
}
resultRow := currentRow.Copy()
resultRow.Cols[len(resultRow.Cols)-1].Value = resultValue
resultRows = append(resultRows, resultRow)
}
}
resultRow.Cols[len(resultRow.Cols)-1].Value = resultValue
return resultRow
return resultRows
}

func (query Derivative) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
Expand Down
4 changes: 4 additions & 0 deletions quesma/model/query_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,7 @@ func (r *QueryResultRow) Copy() QueryResultRow {
copy(newCols, r.Cols)
return QueryResultRow{Index: r.Index, Cols: newCols}
}

// LastColValue returns the Value of the row's last column.
// It panics with an out-of-range index if the row has no columns.
func (r *QueryResultRow) LastColValue() any {
	lastIdx := len(r.Cols) - 1
	return r.Cols[lastIdx].Value
}
3 changes: 1 addition & 2 deletions quesma/model/query_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ type PipelineQueryType interface {

// CalculateResultWhenMissing calculates the result of this aggregation when it's a NoDBQuery
// (we don't query the DB for the results, but calculate them from the parent aggregation)
// (it'll be changed in already done later PRs, so I don't comment on arguments, etc.)
CalculateResultWhenMissing(rowIndex int, parentRows []QueryResultRow, thisAggrPreviousResults []QueryResultRow) QueryResultRow
CalculateResultWhenMissing(query *Query, parentRows []QueryResultRow) []QueryResultRow

String() string
}
Expand Down
Loading

0 comments on commit abc9bff

Please sign in to comment.