diff --git a/quesma/model/pipeline_aggregations/average_bucket.go b/quesma/model/pipeline_aggregations/average_bucket.go
new file mode 100644
index 000000000..fb8104dd3
--- /dev/null
+++ b/quesma/model/pipeline_aggregations/average_bucket.go
@@ -0,0 +1,111 @@
+package pipeline_aggregations
+
+import (
+	"context"
+	"fmt"
+	"mitmproxy/quesma/logger"
+	"mitmproxy/quesma/model"
+	"mitmproxy/quesma/queryprocessor"
+	"mitmproxy/quesma/util"
+	"strings"
+)
+
+type AverageBucket struct {
+	ctx    context.Context
+	Parent string
+}
+
+func NewAverageBucket(ctx context.Context, bucketsPath string) AverageBucket {
+	const delimiter = ">"
+	withoutUnnecessarySuffix, _ := strings.CutSuffix(bucketsPath, delimiter+BucketsPathCount)
+	lastDelimiterIdx := strings.LastIndex(withoutUnnecessarySuffix, delimiter)
+	var parent string
+	if lastDelimiterIdx+1 < len(withoutUnnecessarySuffix) {
+		parent = withoutUnnecessarySuffix[lastDelimiterIdx+1:]
+	} else {
+		logger.WarnWithCtx(ctx).Msgf("invalid bucketsPath: %s, withoutUnnecessarySuffix: %s. Using empty string as parent.", bucketsPath, withoutUnnecessarySuffix)
+		parent = ""
+	}
+	return AverageBucket{ctx: ctx, Parent: parent}
+}
+
+func (query AverageBucket) IsBucketAggregation() bool {
+	return false
+}
+
+func (query AverageBucket) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
+	if len(rows) == 0 {
+		logger.WarnWithCtx(query.ctx).Msg("no rows returned for average bucket aggregation")
+		return []model.JsonMap{{}}
+	}
+	var response []model.JsonMap
+	for _, row := range rows {
+		response = append(response, model.JsonMap{"value": row.Cols[len(row.Cols)-1].Value})
+	}
+	return response
+}
+
+func (query AverageBucket) CalculateResultWhenMissing(qwa *model.Query, parentRows []model.QueryResultRow) []model.QueryResultRow {
+	resultRows := make([]model.QueryResultRow, 0)
+	if len(parentRows) == 0 {
+		return resultRows // TODO: check whether returning null would match Elasticsearch more closely
+	}
+	qp := queryprocessor.NewQueryProcessor(query.ctx)
+	parentFieldsCnt := len(parentRows[0].Cols) - 2 // -2, because row is [parent_cols..., current_key, current_value]
+	// in calculateSingleAvgBucket we average the current_values of all rows with the same parent_cols,
+	// so we need to split into buckets based on parent_cols
+	for _, parentRowsOneBucket := range qp.SplitResultSetIntoBuckets(parentRows, parentFieldsCnt) {
+		resultRows = append(resultRows, query.calculateSingleAvgBucket(parentRowsOneBucket))
+	}
+	return resultRows
+}
+
+// calculateSingleAvgBucket expects len(parentRows) > 0; it still guards against an empty slice below
+func (query AverageBucket) calculateSingleAvgBucket(parentRows []model.QueryResultRow) model.QueryResultRow {
+	if len(parentRows) == 0 {
+		logger.WarnWithCtx(query.ctx).Msg("no parent rows, should NEVER happen")
+		return model.QueryResultRow{}
+	}
+
+	var resultValue float64
+	rowsCnt := 0
+	if _, firstRowValueIsFloat := util.ExtractFloat64Maybe(parentRows[0].LastColValue()); firstRowValueIsFloat {
+		sum := 0.0
+		for _, parentRow := range parentRows {
+			value, ok := util.ExtractFloat64Maybe(parentRow.LastColValue())
+			if ok {
+				sum += value
+				rowsCnt++
+			} else {
+				logger.WarnWithCtx(query.ctx).Msgf("could not convert value to float: %v, type: %T. Skipping", parentRow.LastColValue(), parentRow.LastColValue())
+			}
+		}
+		resultValue = sum / float64(rowsCnt)
+	} else {
+		var sum int64
+		for _, parentRow := range parentRows {
+			value, ok := util.ExtractInt64Maybe(parentRow.LastColValue())
+			if ok {
+				sum += value
+				rowsCnt++
+			} else {
+				logger.WarnWithCtx(query.ctx).Msgf("could not convert value to int: %v, type: %T. Skipping", parentRow.LastColValue(), parentRow.LastColValue())
+			}
+		}
+		if rowsCnt > 0 { // guard against division by zero (NaN) when no value could be converted
+			resultValue = float64(sum) / float64(rowsCnt)
+		}
+	}
+
+	resultRow := parentRows[0].Copy()
+	resultRow.Cols[len(resultRow.Cols)-1].Value = resultValue
+	return resultRow
+}
+
+func (query AverageBucket) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
+	return rowsFromDB
+}
+
+func (query AverageBucket) String() string {
+	return fmt.Sprintf("avg_bucket(%s)", query.Parent)
+}
diff --git a/quesma/model/pipeline_aggregations/bucket_script.go b/quesma/model/pipeline_aggregations/bucket_script.go
index fdd16e84f..bd79de4d2 100644
--- a/quesma/model/pipeline_aggregations/bucket_script.go
+++ b/quesma/model/pipeline_aggregations/bucket_script.go
@@ -7,8 +7,7 @@ import (
 )
 
 type BucketScript struct {
-	ctx   context.Context
-	index string // name of the index (table)
+	ctx context.Context
 }
 
 func NewBucketScript(ctx context.Context) BucketScript {
@@ -31,8 +30,8 @@ func (query BucketScript) TranslateSqlResponseToJson(rows []model.QueryResultRow
 	return response
 }
 
-func (query BucketScript) CalculateResultWhenMissing(model.QueryResultRow, []model.QueryResultRow) model.QueryResultRow {
-	return model.NewQueryResultRowEmpty(query.index)
+func (query BucketScript) CalculateResultWhenMissing(*model.Query, []model.QueryResultRow) []model.QueryResultRow {
+	return []model.QueryResultRow{}
 }
 
 func (query BucketScript) String() string {
diff --git a/quesma/model/pipeline_aggregations/cumulative_sum.go b/quesma/model/pipeline_aggregations/cumulative_sum.go
index a643ad558..447de3318 100644
--- a/quesma/model/pipeline_aggregations/cumulative_sum.go
+++ b/quesma/model/pipeline_aggregations/cumulative_sum.go
@@ -21,11 +21,11 @@ type CumulativeSum struct {
 }
 
 func NewCumulativeSum(ctx context.Context, bucketsPath string) CumulativeSum {
-	isCount := bucketsPath == bucketsPathCount
+	isCount := bucketsPath == BucketsPathCount
 	return CumulativeSum{ctx: ctx, Parent: bucketsPath, IsCount: isCount}
 }
 
-const bucketsPathCount = "_count" // special name for `buckets_path` parameter, normally it's some other aggregation's name
+const BucketsPathCount = "_count" // special name for `buckets_path` parameter, normally it's some other aggregation's name
 
 func (query CumulativeSum) IsBucketAggregation() bool {
 	return false
@@ -43,44 +43,40 @@ func (query CumulativeSum) TranslateSqlResponseToJson(rows []model.QueryResultRo
 	return response
 }
 
-func (query CumulativeSum) CalculateResultWhenMissing(rowIndex int, parentRows []model.QueryResultRow, previousResultsCurrentAggregation []model.QueryResultRow) model.QueryResultRow {
-	resultRow := parentRows[rowIndex].Copy() // result is the same as parent, with an exception of last element, which we'll change below
-	parentValue := parentRows[rowIndex].Cols[len(parentRows[rowIndex].Cols)-1].Value
-	var resultValue any
-	if rowIndex == 0 {
-		resultValue = parentValue
-	} else {
-		// I don't check types too much, they are expected to be numeric, so either floats or ints.
-		// I propose to keep it this way until at least one case arises as this method can be called a lot of times.
- previousValue := previousResultsCurrentAggregation[len(previousResultsCurrentAggregation)-1].Cols[len(previousResultsCurrentAggregation[len(previousResultsCurrentAggregation)-1].Cols)-1].Value - parentValueAsFloat, ok := util.ExtractFloat64Maybe(parentValue) - if ok { - previousValueAsFloat, ok := util.ExtractFloat64Maybe(previousValue) +func (query CumulativeSum) CalculateResultWhenMissing(qwa *model.Query, parentRows []model.QueryResultRow) []model.QueryResultRow { + resultRows := make([]model.QueryResultRow, 0, len(parentRows)) + if len(parentRows) == 0 { + return resultRows + } + + if _, firstRowValueIsFloat := util.ExtractFloat64Maybe(parentRows[0].LastColValue()); firstRowValueIsFloat { + sum := 0.0 + for _, parentRow := range parentRows { + value, ok := util.ExtractFloat64Maybe(parentRow.LastColValue()) if ok { - resultValue = parentValueAsFloat + previousValueAsFloat + sum += value } else { - logger.WarnWithCtx(query.ctx).Msgf("could not convert previous value to float: %v, parentValue: %v", previousValue, parentValue) - resultValue = previousValue + logger.WarnWithCtx(query.ctx).Msgf("could not convert value to float: %v, type: %T. Skipping", parentRow.LastColValue(), parentRow.LastColValue()) } - } else { - previousValueAsInt, okPrevious := util.ExtractInt64Maybe(previousValue) - parentValueAsInt, okParent := util.ExtractInt64Maybe(parentValue) - if okPrevious && okParent { - resultValue = parentValueAsInt + previousValueAsInt - } else if okPrevious { - logger.WarnWithCtx(query.ctx).Msgf("could not convert parent value to int: %v, previousValue: %v. Using previousValue as sum", parentValue, previousValue) - resultValue = previousValue - } else if okParent { - logger.WarnWithCtx(query.ctx).Msgf("could not convert previous value to int: %v, parentValue: %v. Starting sum from 0", previousValue, parentValue) - resultValue = parentValue + resultRow := parentRow.Copy() + resultRow.Cols[len(resultRow.Cols)-1].Value = sum + resultRows = append(resultRows, resultRow) + } + } else { // cumulative sum must be on numeric, so if it's not float64, it should always be int + var sum int64 + for _, parentRow := range parentRows { + value, ok := util.ExtractInt64Maybe(parentRow.LastColValue()) + if ok { + sum += value } else { - logger.WarnWithCtx(query.ctx).Msgf("could not convert previous and parent value to int, previousValue: %v, parentValue: %v. Using nil as result", previousValue, parentValue) - resultValue = nil + logger.WarnWithCtx(query.ctx).Msgf("could not convert value to int: %v, type: %T. 
Skipping", parentRow.LastColValue(), parentRow.LastColValue()) } + resultRow := parentRow.Copy() + resultRow.Cols[len(resultRow.Cols)-1].Value = sum + resultRows = append(resultRows, resultRow) } } - resultRow.Cols[len(resultRow.Cols)-1].Value = resultValue - return resultRow + return resultRows } func (query CumulativeSum) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow { diff --git a/quesma/model/pipeline_aggregations/derivative.go b/quesma/model/pipeline_aggregations/derivative.go index 233b0b450..89852ce6f 100644 --- a/quesma/model/pipeline_aggregations/derivative.go +++ b/quesma/model/pipeline_aggregations/derivative.go @@ -15,7 +15,7 @@ type Derivative struct { } func NewDerivative(ctx context.Context, bucketsPath string) Derivative { - isCount := bucketsPath == bucketsPathCount + isCount := bucketsPath == BucketsPathCount return Derivative{ctx: ctx, Parent: bucketsPath, IsCount: isCount} } diff --git a/quesma/model/query_result.go b/quesma/model/query_result.go index 065173f6e..fbf3d650d 100644 --- a/quesma/model/query_result.go +++ b/quesma/model/query_result.go @@ -136,3 +136,7 @@ func (r *QueryResultRow) Copy() QueryResultRow { copy(newCols, r.Cols) return QueryResultRow{Index: r.Index, Cols: newCols} } + +func (r *QueryResultRow) LastColValue() any { + return r.Cols[len(r.Cols)-1].Value +} diff --git a/quesma/model/query_type.go b/quesma/model/query_type.go index 249ba1f09..3a3d7dde3 100644 --- a/quesma/model/query_type.go +++ b/quesma/model/query_type.go @@ -31,8 +31,7 @@ type PipelineQueryType interface { // CalculateResultWhenMissing calculates the result of this aggregation when it's a NoDBQuery // (we don't query the DB for the results, but calculate them from the parent aggregation) - // (it'll be changed in already done later PRs, so I don't comment on arguments, etc.) 
- CalculateResultWhenMissing(rowIndex int, parentRows []QueryResultRow, thisAggrPreviousResults []QueryResultRow) QueryResultRow + CalculateResultWhenMissing(query *Query, parentRows []QueryResultRow) []QueryResultRow String() string } diff --git a/quesma/queryparser/pipeline_aggregations.go b/quesma/queryparser/pipeline_aggregations.go index d97e60d91..5db975ac2 100644 --- a/quesma/queryparser/pipeline_aggregations.go +++ b/quesma/queryparser/pipeline_aggregations.go @@ -21,6 +21,10 @@ func (cw *ClickhouseQueryTranslator) parsePipelineAggregations(queryMap QueryMap delete(queryMap, "derivative") return } + if aggregationType, success = cw.parseAverageBucket(queryMap); success { + delete(queryMap, "avg_bucket") + return + } return } @@ -29,23 +33,10 @@ func (cw *ClickhouseQueryTranslator) parseCumulativeSum(queryMap QueryMap) (aggr if !exists { return } - - cumulativeSum, ok := cumulativeSumRaw.(QueryMap) + bucketsPath, ok := cw.parseBucketsPath(cumulativeSumRaw, "cumulative_sum") if !ok { - logger.WarnWithCtx(cw.Ctx).Msgf("cumulative_sum is not a map, but %T, value: %v", cumulativeSumRaw, cumulativeSumRaw) - return - } - bucketsPathRaw, exists := cumulativeSum["buckets_path"] - if !exists { - logger.WarnWithCtx(cw.Ctx).Msg("no buckets_path in cumulative_sum") - return - } - bucketsPath, ok := bucketsPathRaw.(string) - if !ok { - logger.WarnWithCtx(cw.Ctx).Msgf("buckets_path is not a string, but %T, value: %v", bucketsPathRaw, bucketsPathRaw) return } - return pipeline_aggregations.NewCumulativeSum(cw.Ctx, bucketsPath), true } @@ -88,18 +79,12 @@ func (cw *ClickhouseQueryTranslator) parseBucketScriptBasic(queryMap QueryMap) ( } // if ["buckets_path"] != "_count", skip the aggregation - if bucketsPathRaw, exists := bucketScript["buckets_path"]; exists { - if bucketsPath, ok := bucketsPathRaw.(string); ok { - if bucketsPath != "_count" { - logger.WarnWithCtx(cw.Ctx).Msgf("buckets_path is not '_count', but %s. Skipping this aggregation", bucketsPath) - return - } - } else { - logger.WarnWithCtx(cw.Ctx).Msgf("buckets_path is not a string, but %T, value: %v. Skipping this aggregation", bucketsPathRaw, bucketsPathRaw) - return - } - } else { - logger.WarnWithCtx(cw.Ctx).Msg("no buckets_path in bucket_script. Skipping this aggregation") + bucketsPath, ok := cw.parseBucketsPath(bucketScript, "bucket_script") + if !ok { + return + } + if bucketsPath != pipeline_aggregations.BucketsPathCount { + logger.WarnWithCtx(cw.Ctx).Msgf("buckets_path is not '_count', but %s. 
Skipping this aggregation", bucketsPath) return } @@ -133,6 +118,37 @@ func (cw *ClickhouseQueryTranslator) parseBucketScriptBasic(queryMap QueryMap) ( return pipeline_aggregations.NewBucketScript(cw.Ctx), true } +func (cw *ClickhouseQueryTranslator) parseAverageBucket(queryMap QueryMap) (aggregationType model.QueryType, success bool) { + avgBucketRaw, exists := queryMap["avg_bucket"] + if !exists { + return + } + bucketsPath, ok := cw.parseBucketsPath(avgBucketRaw, "avg_bucket") + if !ok { + return + } + return pipeline_aggregations.NewAverageBucket(cw.Ctx, bucketsPath), true +} + +func (cw *ClickhouseQueryTranslator) parseBucketsPath(shouldBeQueryMap any, aggregationName string) (bucketsPath string, success bool) { + queryMap, ok := shouldBeQueryMap.(QueryMap) + if !ok { + logger.WarnWithCtx(cw.Ctx).Msgf("%s is not a map, but %T, value: %v", aggregationName, shouldBeQueryMap, shouldBeQueryMap) + return + } + bucketsPathRaw, exists := queryMap["buckets_path"] + if !exists { + logger.WarnWithCtx(cw.Ctx).Msg("no buckets_path in avg_bucket") + return + } + bucketsPath, ok = bucketsPathRaw.(string) + if !ok { + logger.WarnWithCtx(cw.Ctx).Msgf("buckets_path is not a string, but %T, value: %v", bucketsPathRaw, bucketsPathRaw) + return + } + return bucketsPath, true +} + func (b *aggrQueryBuilder) buildPipelineAggregation(aggregationType model.QueryType, metadata model.JsonMap) model.Query { query := b.buildAggregationCommon(metadata) query.Type = aggregationType @@ -142,7 +158,6 @@ func (b *aggrQueryBuilder) buildPipelineAggregation(aggregationType model.QueryT case pipeline_aggregations.CumulativeSum: query.NoDBQuery = true if aggrType.IsCount { - query.NonSchemaFields = append(query.NonSchemaFields, "count()") if len(query.Aggregators) < 2 { logger.WarnWithCtx(b.ctx).Msg("cumulative_sum with count as parent, but no parent aggregation found") } @@ -161,6 +176,9 @@ func (b *aggrQueryBuilder) buildPipelineAggregation(aggregationType model.QueryT } else { query.Parent = aggrType.Parent } + case pipeline_aggregations.AverageBucket: + query.NoDBQuery = true + query.Parent = aggrType.Parent } return query } diff --git a/quesma/queryparser/query_translator.go b/quesma/queryparser/query_translator.go index d5b9f405c..a123d3f9a 100644 --- a/quesma/queryparser/query_translator.go +++ b/quesma/queryparser/query_translator.go @@ -8,6 +8,7 @@ import ( "mitmproxy/quesma/logger" "mitmproxy/quesma/model" "mitmproxy/quesma/model/bucket_aggregations" + "mitmproxy/quesma/queryprocessor" "mitmproxy/quesma/util" "strconv" "strings" @@ -351,39 +352,6 @@ func (cw *ClickhouseQueryTranslator) finishMakeResponse(query model.Query, Resul } } -// Returns if row1 and row2 have the same values for the first level + 1 fields -func (cw *ClickhouseQueryTranslator) sameGroupByFields(row1, row2 model.QueryResultRow, level int) bool { - for i := 0; i <= level; i++ { - if row1.Cols[i].ExtractValue(cw.Ctx) != row2.Cols[i].ExtractValue(cw.Ctx) { - return false - } - } - return true -} - -// Splits ResultSet into buckets, based on the first level + 1 fields -// E.g. if level == 0, we split into buckets based on the first field, -// e.g. 
[row(1, ...), row(1, ...), row(2, ...), row(2, ...), row(3, ...)] -> [[row(1, ...), row(1, ...)], [row(2, ...), row(2, ...)], [row(3, ...)]] -func (cw *ClickhouseQueryTranslator) splitResultSetIntoBuckets(ResultSet []model.QueryResultRow, level int) [][]model.QueryResultRow { - if len(ResultSet) == 0 { - return [][]model.QueryResultRow{{}} - } - - buckets := [][]model.QueryResultRow{{}} - curBucket := 0 - lastRow := ResultSet[0] - for _, row := range ResultSet { - if cw.sameGroupByFields(row, lastRow, level) { - buckets[curBucket] = append(buckets[curBucket], row) - } else { - curBucket++ - buckets = append(buckets, []model.QueryResultRow{row}) - } - lastRow = row - } - return buckets -} - // DFS algorithm // 'aggregatorsLevel' - index saying which (sub)aggregation we're handling // 'selectLevel' - which field from select we're grouping by at current level (or not grouping by, if query.Aggregators[aggregatorsLevel].Empty == true) @@ -405,11 +373,12 @@ func (cw *ClickhouseQueryTranslator) makeResponseAggregationRecursive(query mode // fmt.Println("level1 :/", level1, " level2 B):", level2) // or we need to go deeper + qp := queryprocessor.NewQueryProcessor(cw.Ctx) var bucketsReturnMap []model.JsonMap if query.Aggregators[aggregatorsLevel].Empty { bucketsReturnMap = append(bucketsReturnMap, cw.makeResponseAggregationRecursive(query, ResultSet, aggregatorsLevel+1, selectLevel)...) } else { - buckets := cw.splitResultSetIntoBuckets(ResultSet, selectLevel) + buckets := qp.SplitResultSetIntoBuckets(ResultSet, selectLevel+1) for _, bucket := range buckets { bucketsReturnMap = append(bucketsReturnMap, cw.makeResponseAggregationRecursive(query, bucket, aggregatorsLevel+1, selectLevel+1)...) } @@ -520,7 +489,6 @@ func (cw *ClickhouseQueryTranslator) postprocessPipelineAggregations(queries []m // fmt.Println("qwerty", queryIterationOrder) let's remove all prints in this function after all pipeline aggregations are merged for _, queryIndex := range queryIterationOrder { query := queries[queryIndex] - //fmt.Println(queryIndex, query, ResultSets[queryIndex]) let's remove it after all pipeline aggregations implemented pipelineQueryType, isPipeline := query.Type.(model.PipelineQueryType) if !isPipeline || !query.HasParentAggregation() { continue @@ -538,11 +506,7 @@ func (cw *ClickhouseQueryTranslator) postprocessPipelineAggregations(queries []m logger.WarnWithCtx(cw.Ctx).Msgf("parent index not found for query %v", query) continue } - // fmt.Println("ResultSets[i]", ResultSets[queryIndex], queryIndex, parentIndex) - for rowNr := range ResultSets[parentIndex] { - ResultSets[queryIndex] = append(ResultSets[queryIndex], pipelineQueryType.CalculateResultWhenMissing(rowNr, ResultSets[parentIndex], ResultSets[queryIndex])) - } - // fmt.Println("ResultSets[i] - post", ResultSets[queryIndex], "i:", queryIndex, "parent:", parentIndex) + ResultSets[queryIndex] = pipelineQueryType.CalculateResultWhenMissing(&query, ResultSets[parentIndex]) } } diff --git a/quesma/queryprocessor/query_processor.go b/quesma/queryprocessor/query_processor.go new file mode 100644 index 000000000..4a52193fc --- /dev/null +++ b/quesma/queryprocessor/query_processor.go @@ -0,0 +1,47 @@ +package queryprocessor + +import ( + "context" + "mitmproxy/quesma/model" +) + +type QueryProcessor struct { + ctx context.Context +} + +func NewQueryProcessor(ctx context.Context) QueryProcessor { + return QueryProcessor{ctx: ctx} +} + +// Returns if row1 and row2 have the same values for the first level fields +func (qp *QueryProcessor) 
sameGroupByFields(row1, row2 model.QueryResultRow, level int) bool {
+	for i := 0; i < level; i++ {
+		if row1.Cols[i].ExtractValue(qp.ctx) != row2.Cols[i].ExtractValue(qp.ctx) {
+			return false
+		}
+	}
+	return true
+}
+
+// Splits ResultSet into buckets, based on the first level fields
+// E.g. if level == 1, we split into buckets based on the first field,
+// e.g. [row(1, ...), row(1, ...), row(2, ...), row(2, ...), row(3, ...)] -> [[row(1, ...), row(1, ...)], [row(2, ...), row(2, ...)], [row(3, ...)]]
+func (qp *QueryProcessor) SplitResultSetIntoBuckets(ResultSet []model.QueryResultRow, level int) [][]model.QueryResultRow {
+	if len(ResultSet) == 0 {
+		return [][]model.QueryResultRow{{}}
+	}
+
+	buckets := [][]model.QueryResultRow{{}}
+	curBucket := 0
+	lastRow := ResultSet[0]
+	for _, row := range ResultSet {
+		if qp.sameGroupByFields(row, lastRow, level) {
+			buckets[curBucket] = append(buckets[curBucket], row)
+		} else {
+			curBucket++
+			buckets = append(buckets, []model.QueryResultRow{row})
+		}
+		lastRow = row
+	}
+	return buckets
+}
diff --git a/quesma/testdata/opensearch-visualize/pipeline_aggregation_requests.go b/quesma/testdata/opensearch-visualize/pipeline_aggregation_requests.go
index 400bf8807..f15b09d9c 100644
--- a/quesma/testdata/opensearch-visualize/pipeline_aggregation_requests.go
+++ b/quesma/testdata/opensearch-visualize/pipeline_aggregation_requests.go
@@ -1172,4 +1172,767 @@ var PipelineAggregationTests = []testdata.AggregationTestCase{
 			"ORDER BY (toInt64(toUnixTimestamp64Milli(`timestamp`)/600000))",
 		},
 	},
+	{ // [7]
+		TestName: "Simplest avg_bucket. Reproduce: Visualize -> Vertical Bar: Metrics: Average Bucket (Bucket: Date Histogram, Metric: Count)",
+		QueryRequestJson: `
+		{
+			"_source": {
+				"excludes": []
+			},
+			"aggs": {
+				"1": {
+					"avg_bucket": {
+						"buckets_path": "1-bucket>_count"
+					}
+				},
+				"1-bucket": {
+					"date_histogram": {
+						"field": "timestamp",
+						"fixed_interval": "10m",
+						"min_doc_count": 1,
+						"time_zone": "Europe/Warsaw"
+					}
+				}
+			},
+			"docvalue_fields": [
+				{
+					"field": "@timestamp",
+					"format": "date_time"
+				},
+				{
+					"field": "timestamp",
+					"format": "date_time"
+				},
+				{
+					"field": "utc_time",
+					"format": "date_time"
+				}
+			],
+			"query": {
+				"bool": {
+					"filter": [],
+					"must": [],
+					"must_not": [],
+					"should": []
+				}
+			},
+			"script_fields": {
+				"hour_of_day": {
+					"script": {
+						"lang": "painless",
+						"source": "doc['timestamp'].value.getHour()"
+					}
+				}
+			},
+			"size": 0,
+			"stored_fields": [
+				"*"
+			]
+		}`,
+		ExpectedResponse: `
+		{
+			"_shards": {
+				"failed": 0,
+				"skipped": 0,
+				"successful": 1,
+				"total": 1
+			},
+			"aggregations": {
+				"1": {
+					"value": 1.3333333333333333
+				},
+				"1-bucket": {
+					"buckets": [
+						{
+							"doc_count": 1,
+							"key": 1715403000000,
+							"key_as_string": "2024-05-11T04:50:00.000"
+						},
+						{
+							"doc_count": 2,
+							"key": 1715403600000,
+							"key_as_string": "2024-05-11T05:00:00.000"
+						},
+						{
+							"doc_count": 1,
+							"key": 1715404200000,
+							"key_as_string": "2024-05-11T05:10:00.000"
+						}
+					]
+				}
+			},
+			"hits": {
+				"hits": [],
+				"max_score": null,
+				"total": {
+					"relation": "eq",
+					"value": 207
+				}
+			},
+			"timed_out": false,
+			"took": 81
+		}`,
+		ExpectedResults: [][]model.QueryResultRow{
+			{{Cols: []model.QueryResultCol{model.NewQueryResultCol("hits", uint64(1974))}}},
+			{}, // NoDBQuery
+			{
+				{Cols: []model.QueryResultCol{
+					model.NewQueryResultCol("key", int64(1715403000000/600000)),
+					model.NewQueryResultCol("doc_count", 1),
+				}},
+				{Cols: []model.QueryResultCol{
+					model.NewQueryResultCol("key", int64(1715403600000/600000)),
model.NewQueryResultCol("doc_count", 2), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("key", int64(1715404200000/600000)), + model.NewQueryResultCol("doc_count", 1), + }}, + }, + }, + ExpectedSQLs: []string{ + `SELECT count() ` + + `FROM ` + testdata.QuotedTableName + ` `, + `NoDBQuery`, + "SELECT toInt64(toUnixTimestamp64Milli(`timestamp`)/600000), count() " + + `FROM ` + testdata.QuotedTableName + ` ` + + "GROUP BY (toInt64(toUnixTimestamp64Milli(`timestamp`)/600000)) " + + "ORDER BY (toInt64(toUnixTimestamp64Milli(`timestamp`)/600000))", + }, + }, + { // [8] + TestName: "avg_bucket. Reproduce: Visualize -> Vertical Bar: Metrics: Average Bucket (Bucket: Date Histogram, Metric: Max)", + QueryRequestJson: ` + { + "_source": { + "excludes": [] + }, + "aggs": { + "1": { + "avg_bucket": { + "buckets_path": "1-bucket>1-metric" + } + }, + "1-bucket": { + "aggs": { + "1-metric": { + "max": { + "field": "bytes" + } + } + }, + "date_histogram": { + "field": "timestamp", + "fixed_interval": "10m", + "min_doc_count": 1, + "time_zone": "Europe/Warsaw" + } + } + }, + "docvalue_fields": [ + { + "field": "@timestamp", + "format": "date_time" + }, + { + "field": "timestamp", + "format": "date_time" + }, + { + "field": "utc_time", + "format": "date_time" + } + ], + "query": { + "bool": { + "filter": [], + "must": [], + "must_not": [], + "should": [] + } + }, + "script_fields": { + "hour_of_day": { + "script": { + "lang": "painless", + "source": "doc['timestamp'].value.getHour()" + } + } + }, + "size": 0, + "stored_fields": [ + "*" + ] + }`, + ExpectedResponse: ` + { + "_shards": { + "failed": 0, + "skipped": 0, + "successful": 1, + "total": 1 + }, + "aggregations": { + "1": { + "value": 8835.6666666666667 + }, + "1-bucket": { + "buckets": [ + { + "1-metric": { + "value": 8047.0 + }, + "doc_count": 1, + "key": 1715403000000, + "key_as_string": "2024-05-11T04:50:00.000" + }, + { + "1-metric": { + "value": 9261.0 + }, + "doc_count": 4, + "key": 1715413800000, + "key_as_string": "2024-05-11T07:50:00.000" + }, + { + "1-metric": { + "value": 9199.0 + }, + "doc_count": 2, + "key": 1715414400000, + "key_as_string": "2024-05-11T08:00:00.000" + } + ] + } + }, + "hits": { + "hits": [], + "max_score": null, + "total": { + "relation": "eq", + "value": 207 + } + }, + "timed_out": false, + "took": 121 + }`, + ExpectedResults: [][]model.QueryResultRow{ + {{Cols: []model.QueryResultCol{model.NewQueryResultCol("hits", uint64(207))}}}, + {}, // NoDBQuery + { + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("key", int64(1715403000000/600000)), + model.NewQueryResultCol("doc_count", 8047.0), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("key", int64(1715413800000/600000)), + model.NewQueryResultCol("doc_count", 9261.0), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("key", int64(1715414400000/600000)), + model.NewQueryResultCol("doc_count", 9199.0), + }}, + }, + { + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("key", int64(1715403000000/600000)), + model.NewQueryResultCol("doc_count", 1), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("key", int64(1715413800000/600000)), + model.NewQueryResultCol("doc_count", 4), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("key", int64(1715414400000/600000)), + model.NewQueryResultCol("doc_count", 2), + }}, + }, + }, + ExpectedSQLs: []string{ + `SELECT count() ` + + `FROM ` + testdata.QuotedTableName + ` `, + `NoDBQuery`, + "SELECT 
toInt64(toUnixTimestamp64Milli(`timestamp`)/600000), " + `maxOrNull("bytes") ` + + `FROM ` + testdata.QuotedTableName + ` ` + + "GROUP BY (toInt64(toUnixTimestamp64Milli(`timestamp`)/600000)) " + + "ORDER BY (toInt64(toUnixTimestamp64Milli(`timestamp`)/600000))", + "SELECT toInt64(toUnixTimestamp64Milli(`timestamp`)/600000), count() " + + `FROM ` + testdata.QuotedTableName + ` ` + + "GROUP BY (toInt64(toUnixTimestamp64Milli(`timestamp`)/600000)) " + + "ORDER BY (toInt64(toUnixTimestamp64Milli(`timestamp`)/600000))", + }, + }, + /* TODO need fix for date_range and subaggregations. Same one, as already merged ~1-2 weeks ago for range. It's WIP. + { // [9] + TestName: "avg_bucket. Reproduce: Visualize -> Vertical Bar: Metrics: Average Bucket (Bucket: Date Range, Metric: Average), Buckets: X-Asis: Range", + QueryRequestJson: ` + { + "_source": { + "excludes": [] + }, + "aggs": { + "2": { + "aggs": { + "1": { + "avg_bucket": { + "buckets_path": "1-bucket>1-metric" + } + }, + "1-bucket": { + "aggs": { + "1-metric": { + "avg": { + "field": "bytes" + } + } + }, + "date_range": { + "field": "timestamp", + "ranges": [ + { + "from": "now-1w/w", + "to": "now" + }, + { + "to": "now" + } + ], + "time_zone": "Europe/Warsaw" + } + } + }, + "range": { + "keyed": true, + "ranges": [ + { + "from": 3, + "to": 1000 + }, + { + "from": 2, + "to": 5 + } + ], + "field": "dayOfWeek" + } + } + }, + "docvalue_fields": [ + { + "field": "@timestamp", + "format": "date_time" + }, + { + "field": "timestamp", + "format": "date_time" + }, + { + "field": "utc_time", + "format": "date_time" + } + ], + "query": { + "bool": { + "filter": [ + { + "range": { + "timestamp": { + "format": "strict_date_optional_time", + "gte": "2024-05-11T01:55:02.236Z", + "lte": "2024-05-11T16:55:02.236Z" + } + } + } + ], + "must": [ + { + "match_all": {} + } + ], + "must_not": [], + "should": [] + } + }, + "script_fields": { + "hour_of_day": { + "script": { + "lang": "painless", + "source": "doc['timestamp'].value.getHour()" + } + } + }, + "size": 0, + "stored_fields": [ + "*" + ] + }`, + ExpectedResponse: ` + { + "_shards": { + "failed": 0, + "skipped": 0, + "successful": 1, + "total": 1 + }, + "aggregations": { + "2": { + "buckets": { + "2.0-5.0": { + "1": { + "value": 8047.0 + }, + "1-bucket": { + "buckets": [ + { + "1-metric": { + "value": 8047.0 + }, + "doc_count": 1, + "key": "*-2024-05-11T18:55:02.344+02:00", + "to": 1715446502344.0, + "to_as_string": "2024-05-11T18:55:02.344+02:00" + }, + { + "1-metric": { + "value": 8047.0 + }, + "doc_count": 1, + "from": 1714341600000.0, + "from_as_string": "2024-04-29T00:00:00.000+02:00", + "key": "2024-04-29T00:00:00.000+02:00-2024-05-11T18:55:02.344+02:00", + "to": 1715446502344.0, + "to_as_string": "2024-05-11T18:55:02.344+02:00" + } + ] + }, + "doc_count": 1, + "from": 2.0, + "to": 5.0 + }, + "3.0-1000.0": { + "1": { + "value": 5273.850241545893 + }, + "1-bucket": { + "buckets": [ + { + "1-metric": { + "value": 5273.850241545893 + }, + "doc_count": 207, + "key": "*-2024-05-11T18:55:02.344+02:00", + "to": 1715446502344.0, + "to_as_string": "2024-05-11T18:55:02.344+02:00" + }, + { + "1-metric": { + "value": 5273.850241545893 + }, + "doc_count": 207, + "from": 1714341600000.0, + "from_as_string": "2024-04-29T00:00:00.000+02:00", + "key": "2024-04-29T00:00:00.000+02:00-2024-05-11T18:55:02.344+02:00", + "to": 1715446502344.0, + "to_as_string": "2024-05-11T18:55:02.344+02:00" + } + ] + }, + "doc_count": 207, + "from": 3.0, + "to": 1000.0 + } + } + } + }, + "hits": { + "hits": [], + "max_score": null, + 
"total": { + "relation": "eq", + "value": 207 + } + }, + "timed_out": false, + "took": 28 + }`, + ExpectedResults: [][]model.QueryResultRow{ + {{Cols: []model.QueryResultCol{model.NewQueryResultCol("hits", uint64(207))}}}, + {}, // NoDBQuery + { + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("1", 1), + model.NewQueryResultCol("2", int64(1714341600000)), + model.NewQueryResultCol("3", int64(1715446502344)), + model.NewQueryResultCol("4", 1), + model.NewQueryResultCol("5", int64(1715446502344)), + model.NewQueryResultCol(`avgOrNull("bytes")`, 8047.0), + }}, + }, + { + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("1", 1), + model.NewQueryResultCol("2", int64(1714341600000)), + model.NewQueryResultCol("3", int64(1715446502344)), + model.NewQueryResultCol("4", 1), + model.NewQueryResultCol("5", int64(1715446502344)), + model.NewQueryResultCol(`count()`, 1), + }}, + }, + {}, // NoDBQuery + { + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("1", 207), + model.NewQueryResultCol("2", int64(1714341600000)), + model.NewQueryResultCol("3", int64(1715446502344)), + model.NewQueryResultCol("4", 207), + model.NewQueryResultCol("5", int64(1715446502344)), + model.NewQueryResultCol(`avgOrNull("bytes")`, 5273.850241545893), + }}, + }, + { + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("1", 207), + model.NewQueryResultCol("2", int64(1714341600000)), + model.NewQueryResultCol("3", int64(1715446502344)), + model.NewQueryResultCol("4", 207), + model.NewQueryResultCol("5", int64(1715446502344)), + model.NewQueryResultCol(`count()`, 207), + }}, + }, + { + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("1", 207), + model.NewQueryResultCol("2", 1), + model.NewQueryResultCol("3", 207), + }}, + }, + }, + ExpectedSQLs: []string{ + `SELECT count() ` + + `FROM ` + testdata.QuotedTableName + ` ` + + `WHERE "timestamp">=parseDateTime64BestEffort('2024-05-11T01:55:02.236Z') ` + + `AND "timestamp"<=parseDateTime64BestEffort('2024-05-11T16:55:02.236Z') `, + `NoDBQuery`, + `SELECT count(if("timestamp" >= toStartOfWeek(subDate(now(), INTERVAL 1 week)) AND "timestamp" < now(), 1, NULL)), ` + + `toInt64(toUnixTimestamp(toStartOfWeek(subDate(now(), INTERVAL 1 week)))), ` + + `toInt64(toUnixTimestamp(now())), ` + + `count(if("timestamp" < now(), 1, NULL)), ` + + `toInt64(toUnixTimestamp(now())), ` + + `avgOrNull("bytes") ` + + `FROM ` + testdata.QuotedTableName + ` ` + + `WHERE ("timestamp">=parseDateTime64BestEffort('2024-05-11T01:55:02.236Z') ` + + `AND "timestamp"<=parseDateTime64BestEffort('2024-05-11T16:55:02.236Z')) ` + + `AND "dayOfWeek">=2 AND "dayOfWeek"<5 `, + `SELECT count(if("timestamp" >= toStartOfWeek(subDate(now(), INTERVAL 1 week)) AND "timestamp" < now(), 1, NULL)), ` + + `toInt64(toUnixTimestamp(toStartOfWeek(subDate(now(), INTERVAL 1 week)))), ` + + `toInt64(toUnixTimestamp(now())), ` + + `count(if("timestamp" < now(), 1, NULL)), ` + + `toInt64(toUnixTimestamp(now())), ` + + `count() ` + + `FROM ` + testdata.QuotedTableName + ` ` + + `WHERE ("timestamp">=parseDateTime64BestEffort('2024-05-11T01:55:02.236Z') ` + + `AND "timestamp"<=parseDateTime64BestEffort('2024-05-11T16:55:02.236Z')) ` + + `AND "dayOfWeek">=2 AND "dayOfWeek"<5 `, + `NoDBQuery`, + `SELECT count(if("timestamp" >= toStartOfWeek(subDate(now(), INTERVAL 1 week)) AND "timestamp" < now(), 1, NULL)), ` + + `toInt64(toUnixTimestamp(toStartOfWeek(subDate(now(), INTERVAL 1 week)))), ` + + `toInt64(toUnixTimestamp(now())), ` + + `count(if("timestamp" < now(), 1, NULL)), 
toInt64(toUnixTimestamp(now())), ` + + `avgOrNull("bytes") ` + + `FROM ` + testdata.QuotedTableName + ` ` + + `WHERE ("timestamp"<=parseDateTime64BestEffort('2024-05-11T16:55:02.236Z') ` + + `AND "timestamp">=parseDateTime64BestEffort('2024-05-11T01:55:02.236Z')) ` + + `AND "dayOfWeek">=3 AND "dayOfWeek"<1000 `, + `SELECT count(if("timestamp" >= toStartOfWeek(subDate(now(), INTERVAL 1 week)) AND "timestamp" < now(), 1, NULL)), ` + + `toInt64(toUnixTimestamp(toStartOfWeek(subDate(now(), INTERVAL 1 week)))), ` + + `toInt64(toUnixTimestamp(now())), count(if("timestamp" < now(), 1, NULL)), ` + + `toInt64(toUnixTimestamp(now())), ` + + `count() ` + + `FROM ` + testdata.QuotedTableName + ` ` + + `WHERE ("timestamp">=parseDateTime64BestEffort('2024-05-11T01:55:02.236Z') ` + + `AND "timestamp"<=parseDateTime64BestEffort('2024-05-11T16:55:02.236Z')) ` + + `AND "dayOfWeek">=3 AND "dayOfWeek"<1000 `, + `SELECT count(if("dayOfWeek">=3 AND "dayOfWeek"<1000, 1, NULL)), ` + + `count(if("dayOfWeek">=2 AND "dayOfWeek"<5, 1, NULL)), ` + + `count() ` + + `FROM ` + testdata.QuotedTableName + ` ` + + `WHERE "timestamp">=parseDateTime64BestEffort('2024-05-11T01:55:02.236Z') ` + + `AND "timestamp"<=parseDateTime64BestEffort('2024-05-11T16:55:02.236Z') `, + }, + }, + */ + { // [10] + TestName: "avg_bucket. Reproduce: Visualize -> Horizontal Bar: Metrics: Average Bucket (Bucket: Histogram, Metric: Count), Buckets: X-Asis: Date Histogram", + QueryRequestJson: ` + { + "_source": { + "excludes": [] + }, + "aggs": { + "2": { + "aggs": { + "1": { + "avg_bucket": { + "buckets_path": "1-bucket>_count" + } + }, + "1-bucket": { + "histogram": { + "field": "bytes", + "interval": 1, + "min_doc_count": 1 + } + } + }, + "date_histogram": { + "field": "timestamp", + "fixed_interval": "10m", + "min_doc_count": 1, + "time_zone": "Europe/Warsaw" + } + } + }, + "docvalue_fields": [ + { + "field": "@timestamp", + "format": "date_time" + }, + { + "field": "timestamp", + "format": "date_time" + }, + { + "field": "utc_time", + "format": "date_time" + } + ], + "query": {}, + "script_fields": { + "hour_of_day": { + "script": { + "lang": "painless", + "source": "doc['timestamp'].value.getHour()" + } + } + }, + "size": 0, + "stored_fields": [ + "*" + ] + }`, + ExpectedResponse: ` + { + "_shards": { + "failed": 0, + "skipped": 0, + "successful": 1, + "total": 1 + }, + "aggregations": { + "2": { + "buckets": [ + { + "1": { + "value": 1.0 + }, + "1-bucket": { + "buckets": [ + { + "doc_count": 1, + "key": 4202.0 + } + ] + }, + "doc_count": 1, + "key": 1715818800000, + "key_as_string": "2024-05-16T00:20:00.000" + }, + { + "1": { + "value": 2.0 + }, + "1-bucket": { + "buckets": [ + { + "doc_count": 1, + "key": 0.0 + }, + { + "doc_count": 2, + "key": 293.0 + }, + { + "doc_count": 3, + "key": 1997.0 + } + ] + }, + "doc_count": 9, + "key": 1715863800000, + "key_as_string": "2024-05-16T12:50:00.000" + } + ] + } + }, + "hits": { + "hits": [], + "max_score": null, + "total": { + "relation": "eq", + "value": 141 + } + }, + "timed_out": false, + "took": 60 + }`, + ExpectedResults: [][]model.QueryResultRow{ + {{Cols: []model.QueryResultCol{model.NewQueryResultCol("hits", uint64(141))}}}, + {}, // NoDBQuery + { + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("key", int64(1715818800000/600000)), + model.NewQueryResultCol("bytes", 4202.0), + model.NewQueryResultCol("doc_count", 1), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("key", int64(1715863800000/600000)), + model.NewQueryResultCol("bytes", 0.0), + 
model.NewQueryResultCol("doc_count", 1), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("key", int64(1715863800000/600000)), + model.NewQueryResultCol("bytes", 293.0), + model.NewQueryResultCol("doc_count", 2), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("key", int64(1715863800000/600000)), + model.NewQueryResultCol("bytes", 1997.0), + model.NewQueryResultCol("doc_count", 3), + }}, + }, + { + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("key", int64(1715818800000/600000)), + model.NewQueryResultCol("doc_count", 1), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("key", int64(1715863800000/600000)), + model.NewQueryResultCol("doc_count", 9), + }}, + }, + }, + ExpectedSQLs: []string{ + `SELECT count() ` + + `FROM ` + testdata.QuotedTableName + ` `, + `NoDBQuery`, + "SELECT toInt64(toUnixTimestamp64Milli(`timestamp`)/600000), " + `"bytes", count() ` + + `FROM ` + testdata.QuotedTableName + ` ` + + "GROUP BY (toInt64(toUnixTimestamp64Milli(`timestamp`)/600000), " + `"bytes") ` + + "ORDER BY (toInt64(toUnixTimestamp64Milli(`timestamp`)/600000), " + `"bytes")`, + "SELECT toInt64(toUnixTimestamp64Milli(`timestamp`)/600000), count() " + + `FROM ` + testdata.QuotedTableName + ` ` + + "GROUP BY (toInt64(toUnixTimestamp64Milli(`timestamp`)/600000)) " + + "ORDER BY (toInt64(toUnixTimestamp64Milli(`timestamp`)/600000))", + }, + }, } diff --git a/quesma/testdata/unsupported_requests.go b/quesma/testdata/unsupported_requests.go index 7791ecbbf..3412c520f 100644 --- a/quesma/testdata/unsupported_requests.go +++ b/quesma/testdata/unsupported_requests.go @@ -691,36 +691,6 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{ }, // pipeline: - { // [37] - TestName: "pipeline aggregation: avg_bucket", - QueryType: "avg_bucket", - QueryRequestJson: ` - { - "size": 0, - "aggs": { - "sales_per_month": { - "date_histogram": { - "field": "date", - "calendar_interval": "month" - }, - "aggs": { - "sales": { - "sum": { - "field": "price" - } - } - } - }, - "avg_monthly_sales": { - "avg_bucket": { - "buckets_path": "sales_per_month>sales", - "gap_policy": "skip", - "format": "#,##0.00;(#,##0.00)" - } - } - } - }`, - }, { // [38] TestName: "pipeline aggregation: bucket_count_ks_test", QueryType: "bucket_count_ks_test",