Skip to content

Commit

Permalink
Serial diff aggregation (#164)
Browse files Browse the repository at this point in the history
* Add support for `Serial Diff` It's simply `derivative`, but we
calculate `row[i] - row[i - lag]` instead of `row[i] - row[i - 1]`
* Unify code for all pipeline aggregations a bit, especially for
`serial_diff` and `derivative` as they are almost the same

Some screens:
<img width="1728" alt="Screenshot 2024-05-20 at 16 53 08"
src="https://github.com/QuesmaOrg/quesma/assets/5407146/f7bec1fe-4b3e-4f15-a106-196cf524f92c">
`Serial diff of cumulative sum of Avg bytes` should be exactly the same
as simply `Avg bytes`, and it is:
<img width="1725" alt="Screenshot 2024-05-20 at 16 53 37"
src="https://github.com/QuesmaOrg/quesma/assets/5407146/692563e8-e537-4687-9d33-7319ecb31f5d">
<img width="1728" alt="Screenshot 2024-05-20 at 16 54 57"
src="https://github.com/QuesmaOrg/quesma/assets/5407146/d81df69c-65d2-44ea-af26-ee532340a39c">
  • Loading branch information
trzysiek authored May 24, 2024
1 parent 526e176 commit dd5d126
Show file tree
Hide file tree
Showing 8 changed files with 1,068 additions and 133 deletions.
10 changes: 1 addition & 9 deletions quesma/model/pipeline_aggregations/average_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,7 @@ func (query AverageBucket) IsBucketAggregation() bool {
}

func (query AverageBucket) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
if len(rows) == 0 {
logger.WarnWithCtx(query.ctx).Msg("no rows returned for average bucket aggregation")
return []model.JsonMap{{}}
}
var response []model.JsonMap
for _, row := range rows {
response = append(response, model.JsonMap{"value": row.Cols[len(row.Cols)-1].Value})
}
return response
return translateSqlResponseToJsonCommon(query.ctx, rows, query.String())
}

func (query AverageBucket) CalculateResultWhenMissing(qwa *model.Query, parentRows []model.QueryResultRow) []model.QueryResultRow {
Expand Down
97 changes: 97 additions & 0 deletions quesma/model/pipeline_aggregations/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"mitmproxy/quesma/logger"
"mitmproxy/quesma/model"
"mitmproxy/quesma/util"
"strings"
)

Expand All @@ -27,3 +28,99 @@ func getKey(ctx context.Context, row model.QueryResultRow, query *model.Query) a
}
return row.Cols[len(row.Cols)-2].Value
}

// translateSqlResponseToJsonCommon translates rows from DB (maybe postprocessed later), into JSON's format in which
// we want to return them. It is common for a lot of pipeline aggregations
func translateSqlResponseToJsonCommon(ctx context.Context, rows []model.QueryResultRow, aggregationName string) []model.JsonMap {
if len(rows) == 0 {
logger.WarnWithCtx(ctx).Msgf("no rows returned for %s aggregation", aggregationName)
return []model.JsonMap{{}}
}
var response []model.JsonMap
for _, row := range rows {
response = append(response, model.JsonMap{"value": row.Cols[len(row.Cols)-1].Value})
}
return response
}

// calculateResultWhenMissingCommonForDiffAggregations is common for derivative/serial diff aggregations
func calculateResultWhenMissingCommonForDiffAggregations(ctx context.Context, parentRows []model.QueryResultRow, lag int) []model.QueryResultRow {
resultRows := make([]model.QueryResultRow, 0, len(parentRows))
if len(parentRows) == 0 {
return resultRows
}

// first "lag" rows have nil value
rowsWithNilValueCnt := min(lag, len(parentRows))
for _, parentRow := range parentRows[:rowsWithNilValueCnt] {
resultRow := parentRow.Copy()
resultRow.Cols[len(resultRow.Cols)-1].Value = nil
resultRows = append(resultRows, resultRow)
}

// until we find non-null row, still append nils
firstNonNilIndex := -1
for i, row := range parentRows[rowsWithNilValueCnt:] {
if row.LastColValue() != nil {
firstNonNilIndex = i + rowsWithNilValueCnt
break
} else {
resultRow := row.Copy()
resultRow.Cols[len(resultRow.Cols)-1].Value = nil
resultRows = append(resultRows, resultRow)
}
}
if firstNonNilIndex == -1 {
return resultRows
}

// normal calculation at last
if _, firstRowValueIsFloat := util.ExtractFloat64Maybe(parentRows[firstNonNilIndex].LastColValue()); firstRowValueIsFloat {
for i, currentRow := range parentRows[firstNonNilIndex:] {
previousRow := parentRows[i+firstNonNilIndex-rowsWithNilValueCnt]
previousValueRaw := previousRow.LastColValue()
previousValue, okPrevious := util.ExtractFloat64Maybe(previousValueRaw)

currentValueRaw := currentRow.LastColValue()
currentValue, okCurrent := util.ExtractFloat64Maybe(currentValueRaw)

var resultValue any
if okPrevious && okCurrent {
resultValue = currentValue - previousValue
} else {
resultValue = nil
}
resultRow := currentRow.Copy()
resultRow.Cols[len(resultRow.Cols)-1].Value = resultValue
resultRows = append(resultRows, resultRow)
}
} else if _, firstRowValueIsInt := util.ExtractInt64Maybe(parentRows[firstNonNilIndex].LastColValue()); firstRowValueIsInt {
for i, currentRow := range parentRows[firstNonNilIndex:] {
previousRow := parentRows[i+firstNonNilIndex-rowsWithNilValueCnt]
previousValueRaw := previousRow.LastColValue()
previousValue, okPrevious := util.ExtractInt64Maybe(previousValueRaw)

currentValueRaw := currentRow.LastColValue()
currentValue, okCurrent := util.ExtractInt64Maybe(currentValueRaw)

var resultValue any
if okPrevious && okCurrent {
resultValue = currentValue - previousValue
} else {
resultValue = nil
}
resultRow := currentRow.Copy()
resultRow.Cols[len(resultRow.Cols)-1].Value = resultValue
resultRows = append(resultRows, resultRow)
}
} else {
logger.WarnWithCtx(ctx).Msgf("could not convert value to float or int: %v, type: %T. Returning nil values.",
parentRows[firstNonNilIndex].LastColValue(), parentRows[firstNonNilIndex].LastColValue())
for _, row := range parentRows[firstNonNilIndex:] {
resultRow := row.Copy()
resultRow.Cols[len(resultRow.Cols)-1].Value = nil
resultRows = append(resultRows, resultRow)
}
}
return resultRows
}
10 changes: 1 addition & 9 deletions quesma/model/pipeline_aggregations/cumulative_sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,7 @@ func (query CumulativeSum) IsBucketAggregation() bool {
}

func (query CumulativeSum) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
if len(rows) == 0 {
logger.WarnWithCtx(query.ctx).Msg("no rows returned for cumulative sum aggregation")
return []model.JsonMap{{}}
}
var response []model.JsonMap
for _, row := range rows {
response = append(response, model.JsonMap{"value": row.Cols[len(row.Cols)-1].Value})
}
return response
return translateSqlResponseToJsonCommon(query.ctx, rows, query.String())
}

func (query CumulativeSum) CalculateResultWhenMissing(qwa *model.Query, parentRows []model.QueryResultRow) []model.QueryResultRow {
Expand Down
69 changes: 6 additions & 63 deletions quesma/model/pipeline_aggregations/derivative.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package pipeline_aggregations
import (
"context"
"fmt"
"mitmproxy/quesma/logger"
"mitmproxy/quesma/model"
"mitmproxy/quesma/util"
)

// Derivative is just Serial Diff, with lag = 1

const derivativeLag = 1

type Derivative struct {
ctx context.Context
Parent string
Expand All @@ -24,70 +26,11 @@ func (query Derivative) IsBucketAggregation() bool {
}

func (query Derivative) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
if len(rows) == 0 {
logger.WarnWithCtx(query.ctx).Msg("no rows returned for derivative aggregation")
return []model.JsonMap{{}}
}
var response []model.JsonMap
for _, row := range rows {
response = append(response, model.JsonMap{"value": row.Cols[len(row.Cols)-1].Value})
}
return response
return translateSqlResponseToJsonCommon(query.ctx, rows, query.String())
}

func (query Derivative) CalculateResultWhenMissing(qwa *model.Query, parentRows []model.QueryResultRow) []model.QueryResultRow {
resultRows := make([]model.QueryResultRow, 0, len(parentRows))
if len(parentRows) == 0 {
return resultRows
}

firstRow := parentRows[0].Copy()
firstRow.Cols[len(firstRow.Cols)-1].Value = nil
resultRows = append(resultRows, firstRow)
if _, firstRowValueIsFloat := util.ExtractFloat64Maybe(parentRows[0].LastColValue()); firstRowValueIsFloat {
for i, currentRow := range parentRows[1:] {
previousRow := parentRows[i]
previousValueRaw := previousRow.LastColValue()
previousValue, okPrevious := util.ExtractFloat64Maybe(previousValueRaw)

currentValueRaw := currentRow.LastColValue()
currentValue, okCurrent := util.ExtractFloat64Maybe(currentValueRaw)

var resultValue any
if okPrevious && okCurrent {
resultValue = currentValue - previousValue
} else {
logger.WarnWithCtx(query.ctx).Msgf("could not convert value to float: previousValue: %v, type: %T; currentValue: %v, type: %T. Skipping",
previousValueRaw, previousValueRaw, currentValueRaw, currentValueRaw)
resultValue = nil
}
resultRow := currentRow.Copy()
resultRow.Cols[len(resultRow.Cols)-1].Value = resultValue
resultRows = append(resultRows, resultRow)
}
} else { // cumulative sum must be on numeric, so if it's not float64, it should always be int
for i, currentRow := range parentRows[1:] {
previousRow := parentRows[i]
previousValueRaw := previousRow.LastColValue()
previousValue, okPrevious := util.ExtractInt64Maybe(previousValueRaw)

currentValueRaw := currentRow.LastColValue()
currentValue, okCurrent := util.ExtractInt64Maybe(currentValueRaw)

var resultValue any
if okPrevious && okCurrent {
resultValue = currentValue - previousValue
} else {
logger.WarnWithCtx(query.ctx).Msgf("could not convert value to int: previousValue: %v, type: %T; currentValue: %v, type: %T. Skipping",
previousValueRaw, previousValueRaw, currentValueRaw, currentValueRaw)
resultValue = nil
}
resultRow := currentRow.Copy()
resultRow.Cols[len(resultRow.Cols)-1].Value = resultValue
resultRows = append(resultRows, resultRow)
}
}
return resultRows
return calculateResultWhenMissingCommonForDiffAggregations(query.ctx, parentRows, derivativeLag)
}

func (query Derivative) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
Expand Down
44 changes: 44 additions & 0 deletions quesma/model/pipeline_aggregations/serial_diff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package pipeline_aggregations

import (
"context"
"fmt"
"mitmproxy/quesma/model"
)

type SerialDiff struct {
ctx context.Context
Parent string
IsCount bool
lag int
}

func NewSerialDiff(ctx context.Context, bucketsPath string, lag int) SerialDiff {
isCount := bucketsPath == BucketsPathCount
return SerialDiff{
ctx: ctx,
Parent: bucketsPath,
IsCount: isCount,
lag: lag,
}
}

func (query SerialDiff) IsBucketAggregation() bool {
return false
}

func (query SerialDiff) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
return translateSqlResponseToJsonCommon(query.ctx, rows, query.String())
}

func (query SerialDiff) CalculateResultWhenMissing(qwa *model.Query, parentRows []model.QueryResultRow) []model.QueryResultRow {
return calculateResultWhenMissingCommonForDiffAggregations(query.ctx, parentRows, query.lag)
}

func (query SerialDiff) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
return rowsFromDB
}

func (query SerialDiff) String() string {
return fmt.Sprintf("serial_diff(parent: %s, lag: %d)", query.Parent, query.lag)
}
48 changes: 47 additions & 1 deletion quesma/queryparser/pipeline_aggregations.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ func (cw *ClickhouseQueryTranslator) parsePipelineAggregations(queryMap QueryMap
delete(queryMap, "derivative")
return
}
if aggregationType, success = cw.parseSerialDiff(queryMap); success {
delete(queryMap, "derivative")
return
}
if aggregationType, success = cw.parseAverageBucket(queryMap); success {
delete(queryMap, "avg_bucket")
return
Expand Down Expand Up @@ -112,6 +116,37 @@ func (cw *ClickhouseQueryTranslator) parseSumBucket(queryMap QueryMap) (aggregat
return pipeline_aggregations.NewSumBucket(cw.Ctx, bucketsPath), true
}

func (cw *ClickhouseQueryTranslator) parseSerialDiff(queryMap QueryMap) (aggregationType model.QueryType, success bool) {
serialDiffRaw, exists := queryMap["serial_diff"]
if !exists {
return
}

// buckets_path
bucketsPath, ok := cw.parseBucketsPath(serialDiffRaw, "serial_diff")
if !ok {
return
}

// lag
const defaultLag = 1
serialDiff, ok := serialDiffRaw.(QueryMap)
if !ok {
logger.WarnWithCtx(cw.Ctx).Msgf("serial_diff is not a map, but %T, value: %v", serialDiffRaw, serialDiffRaw)
return
}
lagRaw, exists := serialDiff["lag"]
if !exists {
return pipeline_aggregations.NewSerialDiff(cw.Ctx, bucketsPath, defaultLag), true
}
if lag, ok := lagRaw.(float64); ok {
return pipeline_aggregations.NewSerialDiff(cw.Ctx, bucketsPath, int(lag)), true
}

logger.WarnWithCtx(cw.Ctx).Msgf("lag is not a float64, but %T, value: %v", lagRaw, lagRaw)
return
}

func (cw *ClickhouseQueryTranslator) parseBucketScriptBasic(queryMap QueryMap) (aggregationType model.QueryType, success bool) {
bucketScriptRaw, exists := queryMap["bucket_script"]
if !exists {
Expand Down Expand Up @@ -207,7 +242,18 @@ func (b *aggrQueryBuilder) finishBuildingAggregationPipeline(aggregationType mod
if aggrType.IsCount {
query.NonSchemaFields = append(query.NonSchemaFields, "count()")
if len(query.Aggregators) < 2 {
logger.WarnWithCtx(b.ctx).Msg("cumulative_sum with count as parent, but no parent aggregation found")
logger.WarnWithCtx(b.ctx).Msg("derivative with count as parent, but no parent aggregation found")
}
query.Parent = query.Aggregators[len(query.Aggregators)-2].Name
} else {
query.Parent = aggrType.Parent
}
case pipeline_aggregations.SerialDiff:
query.NoDBQuery = true
if aggrType.IsCount {
query.NonSchemaFields = append(query.NonSchemaFields, "count()")
if len(query.Aggregators) < 2 {
logger.WarnWithCtx(b.ctx).Msg("serial diff with count as parent, but no parent aggregation found")
}
query.Parent = query.Aggregators[len(query.Aggregators)-2].Name
} else {
Expand Down
Loading

0 comments on commit dd5d126

Please sign in to comment.