First real support for pipeline aggregation: cumulative_sum (#63)
It's our first pipeline aggregation, so it lays the groundwork for all the PRs that follow.
It introduces the general logic for pipeline aggregations: calculating the results of
an aggregation not from rows returned by a ClickHouse query, but from the rows
returned for some other (parent) aggregation.
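
For reference, a minimal sketch of the kind of request this now handles (the shape follows the cumulative_sum example in the Elasticsearch docs; the field and aggregation names are illustrative):

```go
// Illustrative only: a cumulative_sum request nested under a date_histogram,
// as in the Elasticsearch docs. "sales" and "price" are example names.
const exampleCumulativeSumRequest = `{
  "aggs": {
    "sales_per_month": {
      "date_histogram": { "field": "date", "calendar_interval": "month" },
      "aggs": {
        "sales": { "sum": { "field": "price" } },
        "cumulative_sales": { "cumulative_sum": { "buckets_path": "sales" } }
      }
    }
  }
}`
```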

Some example charts:
<img width="1728" alt="Screenshot 2024-05-13 at 19 47 19"
src="https://github.com/QuesmaOrg/quesma/assets/5407146/b05ec180-8c98-4a42-afaa-8443e88cb37c">
<img width="1724" alt="Screenshot 2024-05-13 at 19 46 12"
src="https://github.com/QuesmaOrg/quesma/assets/5407146/058814ec-9e36-4729-8246-16fb521107af">
trzysiek authored May 14, 2024
1 parent 1261555 commit 64b700a
Showing 18 changed files with 1,074 additions and 128 deletions.
3 changes: 3 additions & 0 deletions quesma/clickhouse/quesma_communicator.go
@@ -42,6 +42,9 @@ func (lm *LogManager) GetAllColumns(table *Table, query *model.Query) []string {
// SQL statements that were already parsed, and not strings from which
// we have to extract the different parts (like the where clause and columns) again to build a proper result
func (lm *LogManager) ProcessQuery(ctx context.Context, table *Table, query *model.Query, columns []string) ([]model.QueryResultRow, error) {
if query.NoDBQuery {
return make([]model.QueryResultRow, 0), nil
}
colNames, err := table.extractColumns(query, false)
sort.Strings(colNames)
sort.Strings(columns)
2 changes: 1 addition & 1 deletion quesma/model/README.md
@@ -19,7 +19,7 @@ More info: https://www.elastic.co/guide/en/elasticsearch/reference/current/searc
Cardinality | :white_check_mark: | Date histogram | :white_check_mark: | Bucket sort | :x: |
Extended stats | :x: | Date range | :white_check_mark: | Change point | :x: |
Geo-bounds | :x: | Diversified sampler | :x: | Cumulative cardinality | :x: |
-Geo-centroid | :x: | Filter | :white_check_mark: | Cumulative sum | :x: |
+Geo-centroid | :x: | Filter | :white_check_mark: | Cumulative sum | :white_check_mark: |
Geo-line | :x: | Filters | :white_check_mark: | Derivative | :x: |
Cartesian-bounds | :x: | Frequent item sets | :x: | Extended stats bucket | :x: |
Cartesian-centroid | :x: | Geo-distance | :x: | Inference bucket | :x: |
7 changes: 6 additions & 1 deletion quesma/model/pipeline_aggregations/bucket_script.go
@@ -7,7 +7,8 @@ import (
)

type BucketScript struct {
-	ctx context.Context
+	ctx   context.Context
+	index string // name of the index (table)
}

func NewBucketScript(ctx context.Context) BucketScript {
@@ -30,6 +31,10 @@ func (query BucketScript) TranslateSqlResponseToJson(rows []model.QueryResultRow
return response
}

func (query BucketScript) CalculateResultWhenMissing(model.QueryResultRow, []model.QueryResultRow) model.QueryResultRow {
return model.NewQueryResultRowEmpty(query.index)
}

func (query BucketScript) String() string {
return "bucket script"
}
88 changes: 88 additions & 0 deletions quesma/model/pipeline_aggregations/cumulative_sum.go
@@ -0,0 +1,88 @@
package pipeline_aggregations

import (
"context"
"fmt"
"mitmproxy/quesma/logger"
"mitmproxy/quesma/model"
"mitmproxy/quesma/util"
)

// We fully support this aggregation.
// Description: A parent pipeline aggregation which calculates the cumulative sum of a specified metric
// in a parent histogram (or date_histogram) aggregation.
// The specified metric must be numeric and the enclosing histogram must have min_doc_count set to 0 (default for histogram aggregations).
// https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-pipeline-cumulative-sum-aggregation.html

type CumulativeSum struct {
ctx context.Context
Parent string
IsCount bool // count is a special case, `bucketsPath` is not a path to another aggregation, but path-to-aggregation>_count
}

func NewCumulativeSum(ctx context.Context, bucketsPath string) CumulativeSum {
isCount := bucketsPath == bucketsPathCount
return CumulativeSum{ctx: ctx, Parent: bucketsPath, IsCount: isCount}
}

const bucketsPathCount = "_count" // special name for `buckets_path` parameter, normally it's some other aggregation's name

func (query CumulativeSum) IsBucketAggregation() bool {
return false
}

func (query CumulativeSum) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
if len(rows) == 0 {
logger.WarnWithCtx(query.ctx).Msg("no rows returned for cumulative sum aggregation")
return []model.JsonMap{{}}
}
var response []model.JsonMap
for _, row := range rows {
response = append(response, model.JsonMap{"value": row.Cols[len(row.Cols)-1].Value})
}
return response
}

func (query CumulativeSum) CalculateResultWhenMissing(parentRow model.QueryResultRow, previousResultsCurrentAggregation []model.QueryResultRow) model.QueryResultRow {
	resultRow := parentRow.Copy() // result is the same as the parent row, except for the last element, which we'll change below
parentValue := parentRow.Cols[len(parentRow.Cols)-1].Value
var resultValue any
if len(previousResultsCurrentAggregation) == 0 {
resultValue = parentValue
} else {
		// I don't check types too strictly; they are expected to be numeric, so either floats or ints.
		// I propose to keep it this way until a problematic case arises, as this method can be called many times.
previousValue := previousResultsCurrentAggregation[len(previousResultsCurrentAggregation)-1].Cols[len(previousResultsCurrentAggregation[len(previousResultsCurrentAggregation)-1].Cols)-1].Value
parentValueAsFloat, ok := util.ExtractFloat64Maybe(parentValue)
if ok {
previousValueAsFloat, ok := util.ExtractFloat64Maybe(previousValue)
if ok {
resultValue = parentValueAsFloat + previousValueAsFloat
} else {
logger.WarnWithCtx(query.ctx).Msgf("could not convert previous value to float: %v, parentValue: %v", previousValue, parentValue)
resultValue = previousValue
}
} else {
previousValueAsInt, okPrevious := util.ExtractInt64Maybe(previousValue)
parentValueAsInt, okParent := util.ExtractInt64Maybe(parentValue)
if okPrevious && okParent {
resultValue = parentValueAsInt + previousValueAsInt
} else if okPrevious {
logger.WarnWithCtx(query.ctx).Msgf("could not convert parent value to int: %v, previousValue: %v. Using previousValue as sum", parentValue, previousValue)
resultValue = previousValue
} else if okParent {
logger.WarnWithCtx(query.ctx).Msgf("could not convert previous value to int: %v, parentValue: %v. Starting sum from 0", previousValue, parentValue)
resultValue = parentValue
} else {
logger.WarnWithCtx(query.ctx).Msgf("could not convert previous and parent value to int, previousValue: %v, parentValue: %v. Using nil as result", previousValue, parentValue)
resultValue = nil
}
}
}
resultRow.Cols[len(resultRow.Cols)-1].Value = resultValue
return resultRow
}

func (query CumulativeSum) String() string {
return fmt.Sprintf("cumulative_sum(%s)", query.Parent)
}
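
To make the row-by-row logic above concrete, here's a hedged usage sketch (not part of this diff; it assumes `util.ExtractInt64Maybe` accepts plain `int64` values): feeding parent count rows 2, 3, 5 through `CalculateResultWhenMissing` yields the running sums 2, 5, 10.

```go
package pipeline_aggregations

import (
	"context"
	"fmt"
	"mitmproxy/quesma/model"
)

// Sketch: accumulate cumulative sums from a parent aggregation's rows.
func ExampleCumulativeSum_calculateResultWhenMissing() {
	cumSum := NewCumulativeSum(context.Background(), "_count") // the `buckets_path: "_count"` special case
	parentRows := []model.QueryResultRow{
		{Cols: []model.QueryResultCol{model.NewQueryResultCol("key", int64(1)), model.NewQueryResultCol("count", int64(2))}},
		{Cols: []model.QueryResultCol{model.NewQueryResultCol("key", int64(2)), model.NewQueryResultCol("count", int64(3))}},
		{Cols: []model.QueryResultCol{model.NewQueryResultCol("key", int64(3)), model.NewQueryResultCol("count", int64(5))}},
	}
	var resultsSoFar []model.QueryResultRow
	for _, parentRow := range parentRows {
		// Each call sees all previously computed rows, so the sum keeps accumulating.
		resultsSoFar = append(resultsSoFar, cumSum.CalculateResultWhenMissing(parentRow, resultsSoFar))
	}
	for _, row := range resultsSoFar {
		fmt.Println(row.Cols[len(row.Cols)-1].Value) // prints 2, 5, 10
	}
}
```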
23 changes: 23 additions & 0 deletions quesma/model/query.go
@@ -31,6 +31,8 @@ type Query struct {
CanParse bool // true <=> query is valid
QueryInfo SearchQueryInfo
Highlighter Highlighter
	NoDBQuery   bool   // true <=> we don't need a query to the DB here; true for some pipeline aggregations
	Parent      string // the parent aggregation's name, used in some pipeline aggregations
}

var NoMetadataField JsonMap = nil
@@ -204,6 +206,27 @@ func (q *QueryWithAggregation) TrimKeywordFromFields(ctx context.Context) {
}
}

// Name returns the name of this aggregation (specifically, of its last aggregator).
// So for the nested aggregation {"a": {"b": {"c": this aggregation}}}, it returns "c".
// In some queries aggregations are referenced by their full name, e.g. "a>b>c", but so far this implementation seems sufficient.
func (q *QueryWithAggregation) Name() string {
if len(q.Aggregators) == 0 {
return ""
}
return q.Aggregators[len(q.Aggregators)-1].Name
}

// HasParentAggregation returns true <=> this aggregation has a parent aggregation, so there's no query to the DB,
// and results are calculated based on the parent aggregation's results.
func (q *QueryWithAggregation) HasParentAggregation() bool {
return q.NoDBQuery && len(q.Parent) > 0 // first condition should be enough, second just in case
}

// IsChild returns true <=> this aggregation is a child of maybeParent (so maybeParent is its parent).
func (q *QueryWithAggregation) IsChild(maybeParent QueryWithAggregation) bool {
return q.HasParentAggregation() && q.Parent == maybeParent.Name()
}

type Aggregator struct {
Name string
Empty bool // is this aggregator empty, so no buckets
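
Downstream code needs to pair each pipeline aggregation with its parent. A minimal sketch of how `IsChild` might be used for that (the helper is assumed, not part of this diff):

```go
// Sketch: find the index of a pipeline query's parent aggregation among all
// parsed queries, or -1 if there is none.
func findParentIndex(queries []QueryWithAggregation, child QueryWithAggregation) int {
	for i, maybeParent := range queries {
		if child.IsChild(maybeParent) {
			return i
		}
	}
	return -1
}
```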
21 changes: 16 additions & 5 deletions quesma/model/query_result.go
@@ -17,21 +17,25 @@ type QueryResultCol struct {
Value interface{}
}

-func NewQueryResultCol(colName string, value interface{}) QueryResultCol {
-	return QueryResultCol{ColName: colName, Value: value}
-}

type QueryResultRow struct {
Index string
Cols []QueryResultCol
}

+func NewQueryResultRowEmpty(index string) QueryResultRow {
+	return QueryResultRow{Index: index}
+}

const (
ResultColKeyIndex FieldAtIndex = iota // for facets/histogram Col[0] == Key
ResultColDocCountIndex // for facets/histogram Col[1] == DocCount
ResultColKeyAsStringIndex // for histogram Col[2] == KeyAsString
)

+func NewQueryResultCol(colName string, value interface{}) QueryResultCol {
+	return QueryResultCol{ColName: colName, Value: value}
+}

// String returns the string representation of the column in format `"<colName>": <value>`, properly quoted.
func (c QueryResultCol) String(ctx context.Context) string {
valueExtracted := c.ExtractValue(ctx)
@@ -109,7 +113,7 @@ func (c QueryResultCol) ExtractValue(ctx context.Context) any {
return c.Value
}

-func (r QueryResultRow) String(ctx context.Context) string {
+func (r *QueryResultRow) String(ctx context.Context) string {
str := strings.Builder{}
str.WriteString(util.Indent(1) + "{\n")
numCols := len(r.Cols)
@@ -125,3 +129,10 @@ func (r QueryResultRow) String(ctx context.Context) string {
str.WriteString("\n" + util.Indent(1) + "}")
return str.String()
}

// Copy returns a copy of the row with its own Cols slice; the column values themselves are copied shallowly.
func (r *QueryResultRow) Copy() QueryResultRow {
newCols := make([]QueryResultCol, len(r.Cols))
copy(newCols, r.Cols)
return QueryResultRow{Index: r.Index, Cols: newCols}
}
19 changes: 19 additions & 0 deletions quesma/model/query_type.go
@@ -14,6 +14,25 @@ type QueryType interface {
String() string
}

// PipelineQueryType is an interface for pipeline aggregations.
// It extends the QueryType interface with a method
// to calculate result rows from the parent aggregation's rows.
type PipelineQueryType interface {
// TranslateSqlResponseToJson 'level' - we want to translate [level:] (metrics aggr) or [level-1:] (bucket aggr) columns to JSON
// Previous columns are used for bucketing.
// For 'bucket' aggregation result is a slice of buckets, for 'metrics' aggregation it's a single bucket (only look at [0])
TranslateSqlResponseToJson(rows []QueryResultRow, level int) []JsonMap

// IsBucketAggregation if true, result from 'MakeResponse' will be a slice of buckets
// if false, it's a metrics aggregation and result from 'MakeResponse' will be a single bucket
IsBucketAggregation() bool

// CalculateResultWhenMissing calculates the result of this aggregation when it's a NoDBQuery
// (we don't query the DB for the results, but calculate them from the parent aggregation)
CalculateResultWhenMissing(parentRow QueryResultRow, thisAggrPreviousResults []QueryResultRow) QueryResultRow
String() string
}

// UnknownAggregationType is a placeholder for an aggregation type that'll be determined in the future,
// after descending further into the aggregation tree
type UnknownAggregationType struct {
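
A minimal sketch (the function is assumed, not part of this diff) of how a consumer can use `CalculateResultWhenMissing` to produce a NoDBQuery aggregation's rows from its parent's rows:

```go
// Sketch: derive a pipeline aggregation's result rows from its parent's rows.
// Passing the results accumulated so far lets running computations like
// cumulative_sum carry state from row to row.
func calculatePipelineResults(pipeline PipelineQueryType, parentRows []QueryResultRow) []QueryResultRow {
	results := make([]QueryResultRow, 0, len(parentRows))
	for _, parentRow := range parentRows {
		results = append(results, pipeline.CalculateResultWhenMissing(parentRow, results))
	}
	return results
}
```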
74 changes: 3 additions & 71 deletions quesma/queryparser/aggregation_parser.go
@@ -10,7 +10,6 @@ import (
"mitmproxy/quesma/model"
"mitmproxy/quesma/model/bucket_aggregations"
"mitmproxy/quesma/model/metrics_aggregations"
"mitmproxy/quesma/model/pipeline_aggregations"
"mitmproxy/quesma/util"
"slices"
"strconv"
@@ -69,13 +68,6 @@ func (b *aggrQueryBuilder) buildCountAggregation(metadata model.JsonMap) model.Q
return query
}

-func (b *aggrQueryBuilder) buildPipelineAggregation(metadata model.JsonMap) model.QueryWithAggregation {
-	query := b.buildAggregationCommon(metadata)
-	query.Type = pipeline_aggregations.NewBucketScript(b.ctx)
-	query.NonSchemaFields = append(query.NonSchemaFields, "count()")
-	return query
-}

func (b *aggrQueryBuilder) buildBucketAggregation(metadata model.JsonMap) model.QueryWithAggregation {
query := b.buildAggregationCommon(metadata)
query.NonSchemaFields = append(query.NonSchemaFields, "count()")
@@ -317,9 +309,9 @@ func (cw *ClickhouseQueryTranslator) parseAggregation(currentAggr *aggrQueryBuil
}

// 2. Pipeline aggregation => always leaf (for now)
-	if cw.isItSimplePipeline(queryMap) {
-		*resultAccumulator = append(*resultAccumulator, currentAggr.buildPipelineAggregation(metadata))
-		return
+	pipelineAggregationType, isPipelineAggregation := cw.parsePipelineAggregations(queryMap)
+	if isPipelineAggregation {
+		*resultAccumulator = append(*resultAccumulator, currentAggr.buildPipelineAggregation(pipelineAggregationType, metadata))
}

// 3. Now process filter(s) first, because they apply to everything else on the same level or below.
@@ -653,66 +645,6 @@ func (cw *ClickhouseQueryTranslator) tryBucketAggregation(currentAggr *aggrQuery
return
}

-func (cw *ClickhouseQueryTranslator) isItSimplePipeline(queryMap QueryMap) bool {
-	bucketScriptRaw, exists := queryMap["bucket_script"]
-	if !exists {
-		return false
-	}
-
-	// so far we only handle "count" here :D
-	delete(queryMap, "bucket_script")
-	bucketScript, ok := bucketScriptRaw.(QueryMap)
-	if !ok {
-		logger.WarnWithCtx(cw.Ctx).Msgf("bucket_script is not a map, but %T, value: %v. Skipping this aggregation", bucketScriptRaw, bucketScriptRaw)
-		return false
-	}
-
-	// if ["buckets_path"] != "_count", skip the aggregation
-	if bucketsPathRaw, exists := bucketScript["buckets_path"]; exists {
-		if bucketsPath, ok := bucketsPathRaw.(string); ok {
-			if bucketsPath != "_count" {
-				logger.WarnWithCtx(cw.Ctx).Msgf("buckets_path is not '_count', but %s. Skipping this aggregation", bucketsPath)
-				return false
-			}
-		} else {
-			logger.WarnWithCtx(cw.Ctx).Msgf("buckets_path is not a string, but %T, value: %v. Skipping this aggregation", bucketsPathRaw, bucketsPathRaw)
-			return false
-		}
-	} else {
-		logger.WarnWithCtx(cw.Ctx).Msg("no buckets_path in bucket_script. Skipping this aggregation")
-		return false
-	}
-
-	// if ["script"]["source"] != "_value", skip the aggregation
-	scriptRaw, exists := bucketScript["script"]
-	if !exists {
-		logger.WarnWithCtx(cw.Ctx).Msg("no script in bucket_script. Skipping this aggregation")
-		return false
-	}
-	script, ok := scriptRaw.(QueryMap)
-	if !ok {
-		logger.WarnWithCtx(cw.Ctx).Msgf("script is not a map, but %T, value: %v. Skipping this aggregation", scriptRaw, scriptRaw)
-		return false
-	}
-	if sourceRaw, exists := script["source"]; exists {
-		if source, ok := sourceRaw.(string); ok {
-			if source != "_value" {
-				logger.WarnWithCtx(cw.Ctx).Msgf("source is not '_value', but %s. Skipping this aggregation", source)
-				return false
-			}
-		} else {
-			logger.WarnWithCtx(cw.Ctx).Msgf("source is not a string, but %T, value: %v. Skipping this aggregation", sourceRaw, sourceRaw)
-			return false
-		}
-	} else {
-		logger.WarnWithCtx(cw.Ctx).Msg("no source in script. Skipping this aggregation")
-		return false
-	}
-
-	// okay, we've checked everything, it's indeed a simple count
-	return true
-}

// parseFieldField returns field 'field' from shouldBeMap, which should be a string. Logs some warnings in case of errors, and returns "" then
func (cw *ClickhouseQueryTranslator) parseFieldField(shouldBeMap any, aggregationType string) string {
Map, ok := shouldBeMap.(QueryMap)
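
The `parsePipelineAggregations` referenced above is not shown in this diff (it presumably lives in one of the other changed files). A hedged sketch of its likely shape, with `parseBucketsPath` as an assumed helper:

```go
// Sketch only: dispatch on the aggregation name to a pipeline aggregation type.
func (cw *ClickhouseQueryTranslator) parsePipelineAggregations(queryMap QueryMap) (model.QueryType, bool) {
	if cumulativeSumRaw, exists := queryMap["cumulative_sum"]; exists {
		delete(queryMap, "cumulative_sum")
		// parseBucketsPath (assumed) extracts the "buckets_path" parameter.
		return pipeline_aggregations.NewCumulativeSum(cw.Ctx, cw.parseBucketsPath(cumulativeSumRaw)), true
	}
	if _, exists := queryMap["bucket_script"]; exists {
		delete(queryMap, "bucket_script")
		return pipeline_aggregations.NewBucketScript(cw.Ctx), true
	}
	return nil, false
}
```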
17 changes: 9 additions & 8 deletions quesma/queryparser/aggregation_parser_test.go
@@ -3,6 +3,7 @@ package queryparser
import (
"cmp"
"context"
"github.com/barkimedes/go-deepcopy"
"github.com/stretchr/testify/assert"
"mitmproxy/quesma/clickhouse"
"mitmproxy/quesma/concurrent"
@@ -558,6 +559,7 @@ func Test2AggregationParserExternalTestcases(t *testing.T) {
allTests = append(allTests, opensearch_visualize.AggregationTests...)
allTests = append(allTests, dashboard_1.AggregationTests...)
allTests = append(allTests, testdata.PipelineAggregationTests...)
allTests = append(allTests, opensearch_visualize.PipelineAggregationTests...)
for i, test := range allTests {
t.Run(test.TestName+"("+strconv.Itoa(i)+")", func(t *testing.T) {
if i == 26 {
@@ -584,20 +586,19 @@
// fmt.Println("--- SQL string ", aggregation.String())
// fmt.Println()
// fmt.Println("--- Group by: ", aggregation.GroupByFields)
if test.ExpectedSQLs[j] != "TODO" {
if test.ExpectedSQLs[j] != "NoDBQuery" {
util.AssertSqlEqual(t, test.ExpectedSQLs[j], aggregation.String())
}
}

if test.ExpectedResponse == "" {
// We haven't recorded expected response yet, so we can't compare it
return
}

+		// I copy `test.ExpectedResults`, as it's processed 2 times and each time it might be modified by
+		// pipeline aggregation processing.
+		expectedResultsCopy := deepcopy.MustAnything(test.ExpectedResults).([][]model.QueryResultRow)
+		// pp.Println("EXPECTED", expectedResultsCopy)
actualAggregationsPart := cw.MakeAggregationPartOfResponse(aggregations, test.ExpectedResults)
// pp.Println("ACTUAL", actualAggregationsPart)

-		fullResponse, err := cw.MakeResponseAggregationMarshalled(aggregations, test.ExpectedResults)
+		fullResponse, err := cw.MakeResponseAggregationMarshalled(aggregations, expectedResultsCopy)
assert.NoError(t, err)

expectedResponseMap, _ := util.JsonToMap(test.ExpectedResponse)
@@ -612,7 +613,7 @@ func Test2AggregationParserExternalTestcases(t *testing.T) {
// probability and seed are present in random_sampler aggregation. I'd assume they are not needed, thus let's not care about it for now.
acceptableDifference := []string{"doc_count_error_upper_bound", "sum_other_doc_count", "probability", "seed", "bg_count", "doc_count"}
// pp.Println("ACTUAL", actualMinusExpected)
// pp.Print("EXPECTED", expectedMinusActual)
// pp.Println("EXPECTED", expectedMinusActual)
assert.True(t, util.AlmostEmpty(actualMinusExpected, acceptableDifference))
assert.True(t, util.AlmostEmpty(expectedMinusActual, acceptableDifference))
assert.Contains(t, string(fullResponse), `"value":`+strconv.FormatUint(test.ExpectedResults[0][0].Cols[0].Value.(uint64), 10)) // checks if hits nr is OK