diff --git a/quesma/model/pipeline_aggregations/average_bucket.go b/quesma/model/pipeline_aggregations/average_bucket.go index e49579f97..5a5d4d799 100644 --- a/quesma/model/pipeline_aggregations/average_bucket.go +++ b/quesma/model/pipeline_aggregations/average_bucket.go @@ -23,15 +23,7 @@ func (query AverageBucket) IsBucketAggregation() bool { } func (query AverageBucket) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap { - if len(rows) == 0 { - logger.WarnWithCtx(query.ctx).Msg("no rows returned for average bucket aggregation") - return []model.JsonMap{{}} - } - var response []model.JsonMap - for _, row := range rows { - response = append(response, model.JsonMap{"value": row.Cols[len(row.Cols)-1].Value}) - } - return response + return translateSqlResponseToJsonCommon(query.ctx, rows, query.String()) } func (query AverageBucket) CalculateResultWhenMissing(qwa *model.Query, parentRows []model.QueryResultRow) []model.QueryResultRow { diff --git a/quesma/model/pipeline_aggregations/common.go b/quesma/model/pipeline_aggregations/common.go index 65d1b659c..d34188198 100644 --- a/quesma/model/pipeline_aggregations/common.go +++ b/quesma/model/pipeline_aggregations/common.go @@ -4,6 +4,7 @@ import ( "context" "mitmproxy/quesma/logger" "mitmproxy/quesma/model" + "mitmproxy/quesma/util" "strings" ) @@ -27,3 +28,99 @@ func getKey(ctx context.Context, row model.QueryResultRow, query *model.Query) a } return row.Cols[len(row.Cols)-2].Value } + +// translateSqlResponseToJsonCommon translates rows from the DB (possibly postprocessed later) into the JSON format +// in which we want to return them. It is shared by many pipeline aggregations +func translateSqlResponseToJsonCommon(ctx context.Context, rows []model.QueryResultRow, aggregationName string) []model.JsonMap { + if len(rows) == 0 { + logger.WarnWithCtx(ctx).Msgf("no rows returned for %s aggregation", aggregationName) + return []model.JsonMap{{}} + } + var response []model.JsonMap + for _, row := range rows { + response = append(response, model.JsonMap{"value": row.Cols[len(row.Cols)-1].Value}) + } + return response +} + +// calculateResultWhenMissingCommonForDiffAggregations is common to the derivative/serial diff aggregations +func calculateResultWhenMissingCommonForDiffAggregations(ctx context.Context, parentRows []model.QueryResultRow, lag int) []model.QueryResultRow { + resultRows := make([]model.QueryResultRow, 0, len(parentRows)) + if len(parentRows) == 0 { + return resultRows + } + + // the first "lag" rows get a nil value + rowsWithNilValueCnt := min(lag, len(parentRows)) + for _, parentRow := range parentRows[:rowsWithNilValueCnt] { + resultRow := parentRow.Copy() + resultRow.Cols[len(resultRow.Cols)-1].Value = nil + resultRows = append(resultRows, resultRow) + } + + // keep appending nils until we find the first non-nil row + firstNonNilIndex := -1 + for i, row := range parentRows[rowsWithNilValueCnt:] { + if row.LastColValue() != nil { + firstNonNilIndex = i + rowsWithNilValueCnt + break + } else { + resultRow := row.Copy() + resultRow.Cols[len(resultRow.Cols)-1].Value = nil + resultRows = append(resultRows, resultRow) + } + } + if firstNonNilIndex == -1 { + return resultRows + } + + // finally, the normal calculation + if _, firstRowValueIsFloat := util.ExtractFloat64Maybe(parentRows[firstNonNilIndex].LastColValue()); firstRowValueIsFloat { + for i, currentRow := range parentRows[firstNonNilIndex:] { + previousRow := parentRows[i+firstNonNilIndex-rowsWithNilValueCnt] + previousValueRaw := previousRow.LastColValue() +
previousValue, okPrevious := util.ExtractFloat64Maybe(previousValueRaw) + + currentValueRaw := currentRow.LastColValue() + currentValue, okCurrent := util.ExtractFloat64Maybe(currentValueRaw) + + var resultValue any + if okPrevious && okCurrent { + resultValue = currentValue - previousValue + } else { + resultValue = nil + } + resultRow := currentRow.Copy() + resultRow.Cols[len(resultRow.Cols)-1].Value = resultValue + resultRows = append(resultRows, resultRow) + } + } else if _, firstRowValueIsInt := util.ExtractInt64Maybe(parentRows[firstNonNilIndex].LastColValue()); firstRowValueIsInt { + for i, currentRow := range parentRows[firstNonNilIndex:] { + previousRow := parentRows[i+firstNonNilIndex-rowsWithNilValueCnt] + previousValueRaw := previousRow.LastColValue() + previousValue, okPrevious := util.ExtractInt64Maybe(previousValueRaw) + + currentValueRaw := currentRow.LastColValue() + currentValue, okCurrent := util.ExtractInt64Maybe(currentValueRaw) + + var resultValue any + if okPrevious && okCurrent { + resultValue = currentValue - previousValue + } else { + resultValue = nil + } + resultRow := currentRow.Copy() + resultRow.Cols[len(resultRow.Cols)-1].Value = resultValue + resultRows = append(resultRows, resultRow) + } + } else { + logger.WarnWithCtx(ctx).Msgf("could not convert value to float or int: %v, type: %T. Returning nil values.", + parentRows[firstNonNilIndex].LastColValue(), parentRows[firstNonNilIndex].LastColValue()) + for _, row := range parentRows[firstNonNilIndex:] { + resultRow := row.Copy() + resultRow.Cols[len(resultRow.Cols)-1].Value = nil + resultRows = append(resultRows, resultRow) + } + } + return resultRows +} diff --git a/quesma/model/pipeline_aggregations/cumulative_sum.go b/quesma/model/pipeline_aggregations/cumulative_sum.go index 447de3318..cb22d1d9e 100644 --- a/quesma/model/pipeline_aggregations/cumulative_sum.go +++ b/quesma/model/pipeline_aggregations/cumulative_sum.go @@ -32,15 +32,7 @@ func (query CumulativeSum) IsBucketAggregation() bool { } func (query CumulativeSum) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap { - if len(rows) == 0 { - logger.WarnWithCtx(query.ctx).Msg("no rows returned for cumulative sum aggregation") - return []model.JsonMap{{}} - } - var response []model.JsonMap - for _, row := range rows { - response = append(response, model.JsonMap{"value": row.Cols[len(row.Cols)-1].Value}) - } - return response + return translateSqlResponseToJsonCommon(query.ctx, rows, query.String()) } func (query CumulativeSum) CalculateResultWhenMissing(qwa *model.Query, parentRows []model.QueryResultRow) []model.QueryResultRow { diff --git a/quesma/model/pipeline_aggregations/derivative.go b/quesma/model/pipeline_aggregations/derivative.go index 695490032..27e3609d8 100644 --- a/quesma/model/pipeline_aggregations/derivative.go +++ b/quesma/model/pipeline_aggregations/derivative.go @@ -3,11 +3,13 @@ package pipeline_aggregations import ( "context" "fmt" - "mitmproxy/quesma/logger" "mitmproxy/quesma/model" - "mitmproxy/quesma/util" ) +// Derivative is just Serial Diff, with lag = 1 + +const derivativeLag = 1 + type Derivative struct { ctx context.Context Parent string @@ -24,70 +26,11 @@ func (query Derivative) IsBucketAggregation() bool { } func (query Derivative) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap { - if len(rows) == 0 { - logger.WarnWithCtx(query.ctx).Msg("no rows returned for derivative aggregation") - return []model.JsonMap{{}} - } - var response []model.JsonMap - for _, 
row := range rows { - response = append(response, model.JsonMap{"value": row.Cols[len(row.Cols)-1].Value}) - } - return response + return translateSqlResponseToJsonCommon(query.ctx, rows, query.String()) } func (query Derivative) CalculateResultWhenMissing(qwa *model.Query, parentRows []model.QueryResultRow) []model.QueryResultRow { - resultRows := make([]model.QueryResultRow, 0, len(parentRows)) - if len(parentRows) == 0 { - return resultRows - } - - firstRow := parentRows[0].Copy() - firstRow.Cols[len(firstRow.Cols)-1].Value = nil - resultRows = append(resultRows, firstRow) - if _, firstRowValueIsFloat := util.ExtractFloat64Maybe(parentRows[0].LastColValue()); firstRowValueIsFloat { - for i, currentRow := range parentRows[1:] { - previousRow := parentRows[i] - previousValueRaw := previousRow.LastColValue() - previousValue, okPrevious := util.ExtractFloat64Maybe(previousValueRaw) - - currentValueRaw := currentRow.LastColValue() - currentValue, okCurrent := util.ExtractFloat64Maybe(currentValueRaw) - - var resultValue any - if okPrevious && okCurrent { - resultValue = currentValue - previousValue - } else { - logger.WarnWithCtx(query.ctx).Msgf("could not convert value to float: previousValue: %v, type: %T; currentValue: %v, type: %T. Skipping", - previousValueRaw, previousValueRaw, currentValueRaw, currentValueRaw) - resultValue = nil - } - resultRow := currentRow.Copy() - resultRow.Cols[len(resultRow.Cols)-1].Value = resultValue - resultRows = append(resultRows, resultRow) - } - } else { // cumulative sum must be on numeric, so if it's not float64, it should always be int - for i, currentRow := range parentRows[1:] { - previousRow := parentRows[i] - previousValueRaw := previousRow.LastColValue() - previousValue, okPrevious := util.ExtractInt64Maybe(previousValueRaw) - - currentValueRaw := currentRow.LastColValue() - currentValue, okCurrent := util.ExtractInt64Maybe(currentValueRaw) - - var resultValue any - if okPrevious && okCurrent { - resultValue = currentValue - previousValue - } else { - logger.WarnWithCtx(query.ctx).Msgf("could not convert value to int: previousValue: %v, type: %T; currentValue: %v, type: %T. 
Skipping", - previousValueRaw, previousValueRaw, currentValueRaw, currentValueRaw) - resultValue = nil - } - resultRow := currentRow.Copy() - resultRow.Cols[len(resultRow.Cols)-1].Value = resultValue - resultRows = append(resultRows, resultRow) - } - } - return resultRows + return calculateResultWhenMissingCommonForDiffAggregations(query.ctx, parentRows, derivativeLag) } func (query Derivative) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow { diff --git a/quesma/model/pipeline_aggregations/serial_diff.go b/quesma/model/pipeline_aggregations/serial_diff.go new file mode 100644 index 000000000..9f2fb995c --- /dev/null +++ b/quesma/model/pipeline_aggregations/serial_diff.go @@ -0,0 +1,44 @@ +package pipeline_aggregations + +import ( + "context" + "fmt" + "mitmproxy/quesma/model" +) + +type SerialDiff struct { + ctx context.Context + Parent string + IsCount bool + lag int +} + +func NewSerialDiff(ctx context.Context, bucketsPath string, lag int) SerialDiff { + isCount := bucketsPath == BucketsPathCount + return SerialDiff{ + ctx: ctx, + Parent: bucketsPath, + IsCount: isCount, + lag: lag, + } +} + +func (query SerialDiff) IsBucketAggregation() bool { + return false +} + +func (query SerialDiff) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap { + return translateSqlResponseToJsonCommon(query.ctx, rows, query.String()) +} + +func (query SerialDiff) CalculateResultWhenMissing(qwa *model.Query, parentRows []model.QueryResultRow) []model.QueryResultRow { + return calculateResultWhenMissingCommonForDiffAggregations(query.ctx, parentRows, query.lag) +} + +func (query SerialDiff) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow { + return rowsFromDB +} + +func (query SerialDiff) String() string { + return fmt.Sprintf("serial_diff(parent: %s, lag: %d)", query.Parent, query.lag) +} diff --git a/quesma/queryparser/pipeline_aggregations.go b/quesma/queryparser/pipeline_aggregations.go index de9fc9f25..f5d12a206 100644 --- a/quesma/queryparser/pipeline_aggregations.go +++ b/quesma/queryparser/pipeline_aggregations.go @@ -21,6 +21,10 @@ func (cw *ClickhouseQueryTranslator) parsePipelineAggregations(queryMap QueryMap delete(queryMap, "derivative") return } + if aggregationType, success = cw.parseSerialDiff(queryMap); success { + delete(queryMap, "derivative") + return + } if aggregationType, success = cw.parseAverageBucket(queryMap); success { delete(queryMap, "avg_bucket") return @@ -112,6 +116,37 @@ func (cw *ClickhouseQueryTranslator) parseSumBucket(queryMap QueryMap) (aggregat return pipeline_aggregations.NewSumBucket(cw.Ctx, bucketsPath), true } +func (cw *ClickhouseQueryTranslator) parseSerialDiff(queryMap QueryMap) (aggregationType model.QueryType, success bool) { + serialDiffRaw, exists := queryMap["serial_diff"] + if !exists { + return + } + + // buckets_path + bucketsPath, ok := cw.parseBucketsPath(serialDiffRaw, "serial_diff") + if !ok { + return + } + + // lag + const defaultLag = 1 + serialDiff, ok := serialDiffRaw.(QueryMap) + if !ok { + logger.WarnWithCtx(cw.Ctx).Msgf("serial_diff is not a map, but %T, value: %v", serialDiffRaw, serialDiffRaw) + return + } + lagRaw, exists := serialDiff["lag"] + if !exists { + return pipeline_aggregations.NewSerialDiff(cw.Ctx, bucketsPath, defaultLag), true + } + if lag, ok := lagRaw.(float64); ok { + return pipeline_aggregations.NewSerialDiff(cw.Ctx, bucketsPath, int(lag)), true + } + + logger.WarnWithCtx(cw.Ctx).Msgf("lag is not a float64, but %T, value: %v", lagRaw, 
lagRaw) + return +} + func (cw *ClickhouseQueryTranslator) parseBucketScriptBasic(queryMap QueryMap) (aggregationType model.QueryType, success bool) { bucketScriptRaw, exists := queryMap["bucket_script"] if !exists { @@ -207,7 +242,18 @@ func (b *aggrQueryBuilder) finishBuildingAggregationPipeline(aggregationType mod if aggrType.IsCount { query.NonSchemaFields = append(query.NonSchemaFields, "count()") if len(query.Aggregators) < 2 { - logger.WarnWithCtx(b.ctx).Msg("cumulative_sum with count as parent, but no parent aggregation found") + logger.WarnWithCtx(b.ctx).Msg("derivative with count as parent, but no parent aggregation found") + } + query.Parent = query.Aggregators[len(query.Aggregators)-2].Name + } else { + query.Parent = aggrType.Parent + } + case pipeline_aggregations.SerialDiff: + query.NoDBQuery = true + if aggrType.IsCount { + query.NonSchemaFields = append(query.NonSchemaFields, "count()") + if len(query.Aggregators) < 2 { + logger.WarnWithCtx(b.ctx).Msg("serial diff with count as parent, but no parent aggregation found") } query.Parent = query.Aggregators[len(query.Aggregators)-2].Name } else { diff --git a/quesma/testdata/opensearch-visualize/pipeline_aggregation_requests.go b/quesma/testdata/opensearch-visualize/pipeline_aggregation_requests.go index f47d58136..686f6b13c 100644 --- a/quesma/testdata/opensearch-visualize/pipeline_aggregation_requests.go +++ b/quesma/testdata/opensearch-visualize/pipeline_aggregation_requests.go @@ -1173,6 +1173,782 @@ var PipelineAggregationTests = []testdata.AggregationTestCase{ }, }, { // [7] + TestName: "Simplest Serial Diff (count), lag=default (1). Reproduce: Visualize -> Vertical Bar: Metrics: Serial Diff (Aggregation: Count), Buckets: Histogram", + QueryRequestJson: ` + { + "_source": { + "excludes": [] + }, + "aggs": { + "2": { + "aggs": { + "1": { + "serial_diff": { + "buckets_path": "_count" + } + } + }, + "histogram": { + "field": "bytes", + "interval": 200, + "min_doc_count": 0 + } + } + }, + "docvalue_fields": [ + { + "field": "@timestamp", + "format": "date_time" + }, + { + "field": "timestamp", + "format": "date_time" + }, + { + "field": "utc_time", + "format": "date_time" + } + ], + "query": { + "bool": { + "filter": [], + "must": [ + { + "match_all": {} + } + ], + "must_not": [], + "should": [] + } + }, + "script_fields": { + "hour_of_day": { + "script": { + "lang": "painless", + "source": "doc['timestamp'].value.hourOfDay" + } + } + }, + "size": 0, + "stored_fields": [ + "*" + ] + }`, + ExpectedResponse: ` + { + "_shards": { + "failed": 0, + "skipped": 0, + "successful": 1, + "total": 1 + }, + "aggregations": { + "2": { + "buckets": [ + { + "doc_count": 106, + "key": 0.0, + "1": { + "value": null + } + }, + { + "1": { + "value": -67.0 + }, + "doc_count": 39, + "key": 200.0 + }, + { + "1": { + "value": -39.0 + }, + "doc_count": 0, + "key": 400.0 + }, + { + "1": { + "value": 0.0 + }, + "doc_count": 0, + "key": 600.0 + }, + { + "1": { + "value": 21.0 + }, + "doc_count": 21, + "key": 800.0 + } + ] + } + }, + "hits": { + "hits": [], + "max_score": null, + "total": { + "relation": "eq", + "value": 2553 + } + }, + "timed_out": false, + "took": 40 + }`, + ExpectedResults: [][]model.QueryResultRow{ + {{Cols: []model.QueryResultCol{model.NewQueryResultCol("hits", uint64(2553))}}}, + {}, // NoDBQuery + { + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("key", 0.0), + model.NewQueryResultCol("doc_count", 106), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("key", 200.0), + 
model.NewQueryResultCol("doc_count", 39), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("key", 800.0), + model.NewQueryResultCol("doc_count", 21), + }}, + }, + }, + ExpectedSQLs: []string{ + `SELECT count() ` + + `FROM ` + testdata.QuotedTableName, + `NoDBQuery`, + `SELECT floor("bytes" / 200.000000) * 200.000000, count() ` + + `FROM ` + testdata.QuotedTableName + ` ` + + `GROUP BY (floor("bytes" / 200.000000) * 200.000000) ` + + `ORDER BY (floor("bytes" / 200.000000) * 200.000000)`, + }, + }, + { // [8] + TestName: "Simplest Serial Diff (count), lag=2. Don't know how to reproduce in OpenSearch, but you can click out:" + + "Reproduce: Visualize -> Vertical Bar: Metrics: Serial Diff (Aggregation: Count), Buckets: Histogram" + + "And then change the request manually", + QueryRequestJson: ` + { + "_source": { + "excludes": [] + }, + "aggs": { + "2": { + "aggs": { + "1": { + "serial_diff": { + "buckets_path": "_count", + "lag": 2 + } + } + }, + "histogram": { + "field": "bytes", + "interval": 200, + "min_doc_count": 0 + } + } + }, + "docvalue_fields": [ + { + "field": "@timestamp", + "format": "date_time" + }, + { + "field": "timestamp", + "format": "date_time" + }, + { + "field": "utc_time", + "format": "date_time" + } + ], + "query": { + "bool": { + "filter": [], + "must": [ + { + "match_all": {} + } + ], + "must_not": [], + "should": [] + } + }, + "script_fields": { + "hour_of_day": { + "script": { + "lang": "painless", + "source": "doc['timestamp'].value.getHour()" + } + } + }, + "size": 0, + "stored_fields": [ + "*" + ] + }`, + ExpectedResponse: ` + { + "_shards": { + "failed": 0, + "skipped": 0, + "successful": 1, + "total": 1 + }, + "aggregations": { + "2": { + "buckets": [ + { + "doc_count": 106, + "key": 0.0, + "1": { + "value": null + } + }, + { + "1": { + "value": null + }, + "doc_count": 39, + "key": 200.0 + }, + { + "1": { + "value": -106.0 + }, + "doc_count": 0, + "key": 400.0 + }, + { + "1": { + "value": -39.0 + }, + "doc_count": 0, + "key": 600.0 + }, + { + "1": { + "value": 21.0 + }, + "doc_count": 21, + "key": 800.0 + } + ] + } + }, + "hits": { + "hits": [], + "max_score": null, + "total": { + "relation": "eq", + "value": 2553 + } + }, + "timed_out": false, + "took": 40 + }`, + ExpectedResults: [][]model.QueryResultRow{ + {{Cols: []model.QueryResultCol{model.NewQueryResultCol("hits", uint64(2553))}}}, + {}, // NoDBQuery + { + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("key", 0.0), + model.NewQueryResultCol("doc_count", 106), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("key", 200.0), + model.NewQueryResultCol("doc_count", 39), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("key", 800.0), + model.NewQueryResultCol("doc_count", 21), + }}, + }, + }, + ExpectedSQLs: []string{ + `SELECT count() ` + + `FROM ` + testdata.QuotedTableName, + `NoDBQuery`, + `SELECT floor("bytes" / 200.000000) * 200.000000, count() ` + + `FROM ` + testdata.QuotedTableName + ` ` + + `GROUP BY (floor("bytes" / 200.000000) * 200.000000) ` + + `ORDER BY (floor("bytes" / 200.000000) * 200.000000)`, + }, + }, + { // [9] + TestName: "Serial diff with other aggregation. 
Reproduce: Visualize -> Vertical Bar: Metrics: Serial Diff (Aggregation: Sum), Buckets: Date Histogram", + QueryRequestJson: ` + { + "_source": { + "excludes": [] + }, + "aggs": { + "2": { + "aggs": { + "1": { + "serial_diff": { + "buckets_path": "1-metric" + } + }, + "1-metric": { + "sum": { + "script": { + "lang": "painless", + "source": "doc['timestamp'].value.getHour()" + } + } + } + }, + "date_histogram": { + "field": "timestamp", + "fixed_interval": "10m", + "time_zone": "Europe/Warsaw" + } + } + }, + "docvalue_fields": [ + { + "field": "@timestamp", + "format": "date_time" + }, + { + "field": "timestamp", + "format": "date_time" + }, + { + "field": "utc_time", + "format": "date_time" + } + ], + "query": { + "bool": { + "filter": [], + "must": [ + { + "match_all": {} + } + ], + "must_not": [], + "should": [] + } + }, + "script_fields": { + "hour_of_day": { + "script": { + "lang": "painless", + "source": "doc['timestamp'].value.getHour()" + } + } + }, + "size": 0, + "stored_fields": [ + "*" + ] + }`, + ExpectedResponse: ` + { + "_shards": { + "failed": 0, + "skipped": 0, + "successful": 1, + "total": 1 + }, + "aggregations": { + "2": { + "buckets": [ + { + "1": { + "value": null + }, + "1-metric": { + "value": 19.0 + }, + "doc_count": 1, + "key": 1715196000000, + "key_as_string": "2024-05-08T19:20:00.000" + }, + { + "1": { + "value": 0.0 + }, + "1-metric": { + "value": 19.0 + }, + "doc_count": 1, + "key": 1715196600000, + "key_as_string": "2024-05-08T19:30:00.000" + }, + { + "1": { + "value": 1.0 + }, + "1-metric": { + "value": 20.0 + }, + "doc_count": 1, + "key": 1715198400000, + "key_as_string": "2024-05-08T20:00:00.000" + }, + { + "1": { + "value": 12.0 + }, + "1-metric": { + "value": 32.0 + }, + "doc_count": 4, + "key": 1715199000000, + "key_as_string": "2024-05-08T20:10:00.000" + }, + { + "1": { + "value": -5.0 + }, + "1-metric": { + "value": 27.0 + }, + "doc_count": 3, + "key": 1715199600000, + "key_as_string": "2024-05-08T20:20:00.000" + } + ] + } + }, + "hits": { + "hits": [], + "max_score": null, + "total": { + "relation": "eq", + "value": 2553 + } + }, + "timed_out": false, + "took": 40 + }`, + ExpectedResults: [][]model.QueryResultRow{ + {{Cols: []model.QueryResultCol{model.NewQueryResultCol("hits", uint64(2553))}}}, + {}, // NoDBQuery + { + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("toInt64(toUnixTimestamp64Milli(`timestamp`)/600000)", int64(1715196000000/600000)), + model.NewQueryResultCol("count()", 19.0), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("toInt64(toUnixTimestamp64Milli(`timestamp`)/600000)", int64(1715196600000/600000)), + model.NewQueryResultCol("count()", 19.0), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("toInt64(toUnixTimestamp64Milli(`timestamp`)/600000)", int64(1715198400000/600000)), + model.NewQueryResultCol("count()", 20.0), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("toInt64(toUnixTimestamp64Milli(`timestamp`)/600000)", int64(1715199000000/600000)), + model.NewQueryResultCol("count()", 32.0), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("toInt64(toUnixTimestamp64Milli(`timestamp`)/600000)", int64(1715199600000/600000)), + model.NewQueryResultCol("count()", 27.0), + }}, + }, + { + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("toInt64(toUnixTimestamp64Milli(`timestamp`)/600000)", int64(1715196000000/600000)), + model.NewQueryResultCol("count()", 1), + }}, + {Cols: []model.QueryResultCol{ + 
model.NewQueryResultCol("toInt64(toUnixTimestamp64Milli(`timestamp`)/600000)", int64(1715196600000/600000)), + model.NewQueryResultCol("count()", 1), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("toInt64(toUnixTimestamp64Milli(`timestamp`)/600000)", int64(1715198400000/600000)), + model.NewQueryResultCol("count()", 1), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("toInt64(toUnixTimestamp64Milli(`timestamp`)/600000)", int64(1715199000000/600000)), + model.NewQueryResultCol("count()", 4), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("toInt64(toUnixTimestamp64Milli(`timestamp`)/600000)", int64(1715199600000/600000)), + model.NewQueryResultCol("count()", 3), + }}, + }, + }, + ExpectedSQLs: []string{ + `SELECT count() ` + + `FROM ` + testdata.QuotedTableName, + `NoDBQuery`, + "SELECT toInt64(toUnixTimestamp64Milli(`timestamp`)/600000), " + + "sumOrNull(toHour(`timestamp`)) " + + "FROM " + testdata.QuotedTableName + " " + + "GROUP BY (toInt64(toUnixTimestamp64Milli(`timestamp`)/600000)) " + + "ORDER BY (toInt64(toUnixTimestamp64Milli(`timestamp`)/600000))", + "SELECT toInt64(toUnixTimestamp64Milli(`timestamp`)/600000), " + + "count() " + + "FROM " + testdata.QuotedTableName + " " + + "GROUP BY (toInt64(toUnixTimestamp64Milli(`timestamp`)/600000)) " + + "ORDER BY (toInt64(toUnixTimestamp64Milli(`timestamp`)/600000))", + }, + }, + { // [10] + TestName: "Serial Diff to cumulative sum. Reproduce: Visualize -> Vertical Bar: Metrics: Serial Diff (Aggregation: Cumulative Sum (Aggregation: Count)), Buckets: Date Histogram", + QueryRequestJson: ` + { + "_source": { + "excludes": [] + }, + "aggs": { + "2": { + "aggs": { + "1": { + "serial_diff": { + "buckets_path": "1-metric" + } + }, + "1-metric": { + "cumulative_sum": { + "buckets_path": "_count" + } + } + }, + "date_histogram": { + "field": "timestamp", + "fixed_interval": "10m", + "time_zone": "Europe/Warsaw" + } + } + }, + "docvalue_fields": [ + { + "field": "@timestamp", + "format": "date_time" + }, + { + "field": "timestamp", + "format": "date_time" + }, + { + "field": "utc_time", + "format": "date_time" + } + ], + "query": { + "bool": { + "filter": [], + "must": [ + { + "match_all": {} + } + ], + "must_not": [], + "should": [] + } + }, + "script_fields": { + "hour_of_day": { + "script": { + "lang": "painless", + "source": "doc['timestamp'].value.getHour()" + } + } + }, + "size": 0, + "stored_fields": [ + "*" + ] + }`, + ExpectedResponse: // I changed this a bit. Opensearch returns "1": {null} for 2nd, 3rd and 3 last buckets. I think it's not correct... I return 0, and it seems working too. 
+ `{ + "_shards": { + "failed": 0, + "skipped": 0, + "successful": 1, + "total": 1 + }, + "aggregations": { + "2": { + "buckets": [ + { + "1": { + "value": null + }, + "1-metric": { + "value": 2.0 + }, + "doc_count": 2, + "key": 1714869000000, + "key_as_string": "2024-05-05T00:30:00.000" + }, + { + "1": { + "value": 0.0 + }, + "1-metric": { + "value": 2.0 + }, + "doc_count": 0, + "key": 1714869600000, + "key_as_string": "2024-05-05T00:40:00.000" + }, + { + "1": { + "value": 0.0 + }, + "1-metric": { + "value": 2.0 + }, + "doc_count": 0, + "key": 1714878600000, + "key_as_string": "2024-05-05T03:10:00.000" + }, + { + "1": { + "value": 2.0 + }, + "1-metric": { + "value": 4.0 + }, + "doc_count": 2, + "key": 1714879200000, + "key_as_string": "2024-05-05T03:20:00.000" + }, + { + "1": { + "value": 6.0 + }, + "1-metric": { + "value": 10.0 + }, + "doc_count": 6, + "key": 1714879800000, + "key_as_string": "2024-05-05T03:30:00.000" + }, + { + "1": { + "value": 2.0 + }, + "1-metric": { + "value": 12.0 + }, + "doc_count": 2, + "key": 1714880400000, + "key_as_string": "2024-05-05T03:40:00.000" + }, + { + "1": { + "value": 2.0 + }, + "1-metric": { + "value": 14.0 + }, + "doc_count": 2, + "key": 1714881000000, + "key_as_string": "2024-05-05T03:50:00.000" + }, + { + "1": { + "value": 0.0 + }, + "1-metric": { + "value": 14.0 + }, + "doc_count": 0, + "key": 1714881600000, + "key_as_string": "2024-05-05T04:00:00.000" + }, + { + "1": { + "value": 2.0 + }, + "1-metric": { + "value": 16.0 + }, + "doc_count": 2, + "key": 1714882200000, + "key_as_string": "2024-05-05T04:10:00.000" + }, + { + "1": { + "value": 0.0 + }, + "1-metric": { + "value": 16.0 + }, + "doc_count": 0, + "key": 1714882800000, + "key_as_string": "2024-05-05T04:20:00.000" + } + ] + } + }, + "hits": { + "hits": [], + "max_score": null, + "total": { + "relation": "eq", + "value": 1974 + } + }, + "timed_out": false, + "took": 10 + }`, + ExpectedResults: [][]model.QueryResultRow{ + {{Cols: []model.QueryResultCol{model.NewQueryResultCol("hits", uint64(1974))}}}, + {}, // NoDBQuery + {}, // NoDBQuery + { + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("toInt64(toUnixTimestamp64Milli(`timestamp`)/600000)", int64(1714869000000/600000)), + model.NewQueryResultCol("count()", 2), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("toInt64(toUnixTimestamp64Milli(`timestamp`)/600000)", int64(1714869600000/600000)), + model.NewQueryResultCol("count()", 0), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("toInt64(toUnixTimestamp64Milli(`timestamp`)/600000)", int64(1714878600000/600000)), + model.NewQueryResultCol("count()", 0), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("toInt64(toUnixTimestamp64Milli(`timestamp`)/600000)", int64(1714879200000/600000)), + model.NewQueryResultCol("count()", 2), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("toInt64(toUnixTimestamp64Milli(`timestamp`)/600000)", int64(1714879800000/600000)), + model.NewQueryResultCol("count()", 6), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("toInt64(toUnixTimestamp64Milli(`timestamp`)/600000)", int64(1714880400000/600000)), + model.NewQueryResultCol("count()", 2), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("toInt64(toUnixTimestamp64Milli(`timestamp`)/600000)", int64(1714881000000/600000)), + model.NewQueryResultCol("count()", 2), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("toInt64(toUnixTimestamp64Milli(`timestamp`)/600000)", 
int64(1714881600000/600000)), + model.NewQueryResultCol("count()", 0), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("toInt64(toUnixTimestamp64Milli(`timestamp`)/600000)", int64(1714882200000/600000)), + model.NewQueryResultCol("count()", 2), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("toInt64(toUnixTimestamp64Milli(`timestamp`)/600000)", int64(1714882800000/600000)), + model.NewQueryResultCol("count()", 0), + }}, + }, + }, + ExpectedSQLs: []string{ + `SELECT count() FROM ` + testdata.QuotedTableName, + `NoDBQuery`, + `NoDBQuery`, + "SELECT toInt64(toUnixTimestamp64Milli(`timestamp`)/600000), count() " + + `FROM ` + testdata.QuotedTableName + ` ` + + "GROUP BY (toInt64(toUnixTimestamp64Milli(`timestamp`)/600000)) " + + "ORDER BY (toInt64(toUnixTimestamp64Milli(`timestamp`)/600000))", + }, + }, + { // [11] TestName: "Simplest avg_bucket. Reproduce: Visualize -> Vertical Bar: Metrics: Average Bucket (Bucket: Date Histogram, Metric: Count)", QueryRequestJson: ` { @@ -1300,7 +2076,7 @@ var PipelineAggregationTests = []testdata.AggregationTestCase{ "ORDER BY (toInt64(toUnixTimestamp64Milli(`timestamp`)/600000))", }, }, - { // [8] + { // [12] TestName: "avg_bucket. Reproduce: Visualize -> Vertical Bar: Metrics: Average Bucket (Bucket: Date Histogram, Metric: Max)", QueryRequestJson: ` { @@ -1463,7 +2239,7 @@ var PipelineAggregationTests = []testdata.AggregationTestCase{ }, }, /* TODO need fix for date_range and subaggregations. Same one, as already merged ~1-2 weeks ago for range. It's WIP. - { // [9] + { // [13] TestName: "avg_bucket. Reproduce: Visualize -> Vertical Bar: Metrics: Average Bucket (Bucket: Date Range, Metric: Average), Buckets: X-Asis: Range", QueryRequestJson: ` { @@ -1761,7 +2537,7 @@ var PipelineAggregationTests = []testdata.AggregationTestCase{ }, }, */ - { // [10] + { // [14] TestName: "avg_bucket. Reproduce: Visualize -> Horizontal Bar: Metrics: Average Bucket (Bucket: Histogram, Metric: Count), Buckets: X-Asis: Date Histogram", QueryRequestJson: ` { @@ -1935,7 +2711,7 @@ var PipelineAggregationTests = []testdata.AggregationTestCase{ "ORDER BY (toInt64(toUnixTimestamp64Milli(`timestamp`)/600000))", }, }, - { // [11] + { // [15] TestName: "Simplest min_bucket. Reproduce: Visualize -> Vertical Bar: Metrics: Min Bucket (Bucket: Terms, Metric: Count)", QueryRequestJson: ` { @@ -2104,7 +2880,7 @@ var PipelineAggregationTests = []testdata.AggregationTestCase{ `LIMIT 5`, }, }, - { // [12] + { // [16] TestName: "min_bucket. Reproduce: Visualize -> Vertical Bar: Metrics: Min Bucket (Bucket: Terms, Metric: Unique Count)", QueryRequestJson: ` { @@ -2303,7 +3079,7 @@ var PipelineAggregationTests = []testdata.AggregationTestCase{ `ORDER BY ("clientip")`, }, }, - { // [13] + { // [17] TestName: "complex min_bucket. Reproduce: Visualize -> Vertical Bar: Metrics: Min Bucket (Bucket: Terms, Metric: Sum), Buckets: Split Series: Histogram", QueryRequestJson: ` { @@ -2542,7 +3318,7 @@ var PipelineAggregationTests = []testdata.AggregationTestCase{ `ORDER BY (floor("bytes" / 200.000000) * 200.000000)`, }, }, - { // [14] + { // [18] TestName: "Simplest max_bucket. Reproduce: Visualize -> Line: Metrics: Max Bucket (Bucket: Terms, Metric: Count)", QueryRequestJson: ` { @@ -2676,7 +3452,7 @@ var PipelineAggregationTests = []testdata.AggregationTestCase{ `LIMIT 5`, }, }, - { // [15] + { // [19] TestName: "Max/Sum bucket with some null buckets. 
Reproduce: Visualize -> Vertical Bar: Metrics: Max (Sum) Bucket (Aggregation: Date Histogram, Metric: Min)", QueryRequestJson: ` { @@ -2855,8 +3631,8 @@ var PipelineAggregationTests = []testdata.AggregationTestCase{ "ORDER BY (toInt64(toUnixTimestamp64Milli(`timestamp`)/600000))", }, }, - { // [16] - TestName: "Max/Sum bucket with some null buckets. Reproduce: Visualize -> Vertical Bar: Metrics: Max/Sum Bucket (Aggregation: Histogram, Metric: Max)", + { // [20] + TestName: "Different pipeline aggrs with some null buckets. Reproduce: Visualize -> Vertical Bar: Metrics: Max/Sum Bucket/etc. (Aggregation: Histogram, Metric: Max)", QueryRequestJson: ` { "_source": { "excludes": [] @@ -2868,7 +3644,7 @@ var PipelineAggregationTests = []testdata.AggregationTestCase{ "buckets_path": "1-bucket>1-metric" } }, - "2":{ + "2": { "sum_bucket": { "buckets_path": "1-bucket>1-metric" } @@ -2879,7 +3655,17 @@ var PipelineAggregationTests = []testdata.AggregationTestCase{ "max": { "field": "memory" } - } + }, + "3": { + "derivative": { + "buckets_path": "1-metric" + } + }, + "4": { + "serial_diff": { + "buckets_path": "1-metric" + } + } + }, "histogram": { "field": "bytes", @@ -2941,11 +3727,11 @@ var PipelineAggregationTests = []testdata.AggregationTestCase{ "keys": [ 5296 ], - "value": 211840 + "value": 211840.2 }, "2": { - "value": 212292 + "value": 212292.2 }, "1-bucket": { "buckets": [ { "1-metric": { "value": null }, + "3": { + "value": null + }, + "4": { + "value": null + }, "doc_count": 5, "key": 0.0 }, { "1-metric": { - "value": 211840 + "value": null + }, + "3": { + "value": null + }, + "4": { + "value": null + }, + "doc_count": 6, + "key": 200.0 + }, + { + "1-metric": { + "value": null + }, + "3": { + "value": null + }, + "4": { + "value": null + }, + "doc_count": 7, + "key": 400.0 + }, + { + "1-metric": { + "value": 211840.2 + }, + "3": { + "value": null + }, + "4": { + "value": null }, "doc_count": 1, "key": 5296.0 @@ -2967,6 +3791,12 @@ var PipelineAggregationTests = []testdata.AggregationTestCase{ "1-metric": { "value": 452 }, + "3": { + "value": -211388.2 + }, + "4": { + "value": -211388.2 + }, "doc_count": 1, "key": 16837.0 } @@ -2992,20 +3822,38 @@ var PipelineAggregationTests = []testdata.AggregationTestCase{ model.NewQueryResultCol("bytes", 0.0), model.NewQueryResultCol(`maxOrNull("memory")`, nil), }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("bytes", 200.0), + model.NewQueryResultCol(`maxOrNull("memory")`, nil), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("bytes", 400.0), + model.NewQueryResultCol(`maxOrNull("memory")`, nil), + }}, {Cols: []model.QueryResultCol{ model.NewQueryResultCol("bytes", 5296.0), - model.NewQueryResultCol(`maxOrNull("memory")`, 211840), + model.NewQueryResultCol(`maxOrNull("memory")`, 211840.2), }}, {Cols: []model.QueryResultCol{ model.NewQueryResultCol("bytes", 16837.0), - model.NewQueryResultCol(`maxOrNull("memory")`, 452), + model.NewQueryResultCol(`maxOrNull("memory")`, float64(452)), }}, }, + {}, // NoDBQuery + {}, // NoDBQuery { {Cols: []model.QueryResultCol{ model.NewQueryResultCol("bytes", 0.0), model.NewQueryResultCol("count()", 5), }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("bytes", 200.0), + model.NewQueryResultCol("count()", 6), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("bytes", 400.0), + model.NewQueryResultCol("count()", 7), + }}, {Cols: []model.QueryResultCol{ model.NewQueryResultCol("bytes", 5296.0),
model.NewQueryResultCol("count()", 1), @@ -3024,6 +3872,8 @@ var PipelineAggregationTests = []testdata.AggregationTestCase{ `FROM ` + testdata.QuotedTableName + ` ` + `GROUP BY ("bytes") ` + `ORDER BY ("bytes")`, + `NoDBQuery`, + `NoDBQuery`, `SELECT "bytes", count() ` + `FROM ` + testdata.QuotedTableName + ` ` + `GROUP BY ("bytes") ` + @@ -3032,7 +3882,7 @@ var PipelineAggregationTests = []testdata.AggregationTestCase{ }, }, /* waits for probably a simple filters fix - { // [17] + { // [21] TestName: "max_bucket. Reproduce: Visualize -> Line: Metrics: Max Bucket (Bucket: Filters, Metric: Sum)", QueryRequestJson: ` { @@ -3205,7 +4055,7 @@ var PipelineAggregationTests = []testdata.AggregationTestCase{ }, */ /* waits for probably a simple filters fix - { // [18] TODO check this test with other pipeline aggregations + { // [22] TODO check this test with other pipeline aggregations TestName: "complex max_bucket. Reproduce: Visualize -> Line: Metrics: Max Bucket (Bucket: Filters, Metric: Sum), Buckets: Split chart: Rows -> Range", QueryRequestJson: ` { @@ -3445,7 +4295,7 @@ var PipelineAggregationTests = []testdata.AggregationTestCase{ `FROM ` + testdata.QuotedTableName + ` `, }, }, */ - { // [19] + { // [23] TestName: "Simplest sum_bucket. Reproduce: Visualize -> Horizontal Bar: Metrics: Sum Bucket (B ucket: Terms, Metric: Count)", QueryRequestJson: ` { @@ -3608,7 +4458,7 @@ var PipelineAggregationTests = []testdata.AggregationTestCase{ `LIMIT 5`, }, }, - { // [20] + { // [24] TestName: "sum_bucket. Reproduce: Visualize -> Horizontal Bar: Metrics: Sum Bucket (Bucket: Significant Terms, Metric: Average)", QueryRequestJson: ` { @@ -3775,7 +4625,7 @@ var PipelineAggregationTests = []testdata.AggregationTestCase{ `ORDER BY ("extension")`, }, }, - { // [21] + { // [25] TestName: "complex sum_bucket. Reproduce: Visualize -> Vertical Bar: Metrics: Sum Bucket (Bucket: Date Histogram, Metric: Average), Buckets: X-Asis: Histogram", QueryRequestJson: ` { diff --git a/quesma/testdata/unsupported_requests.go b/quesma/testdata/unsupported_requests.go index 9e8bda599..2b23e5c31 100644 --- a/quesma/testdata/unsupported_requests.go +++ b/quesma/testdata/unsupported_requests.go @@ -1101,35 +1101,6 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{ } }`, }, - { // [54] - TestName: "pipeline aggregation: serial_diff", - QueryType: "serial_diff", - QueryRequestJson: ` - { - "size": 0, - "aggs": { - "my_date_histo": { - "date_histogram": { - "field": "timestamp", - "calendar_interval": "day" - }, - "aggs": { - "the_sum": { - "sum": { - "field": "lemmings" - } - }, - "thirtieth_difference": { - "serial_diff": { - "buckets_path": "the_sum", - "lag" : 30 - } - } - } - } - } - }`, - }, { // [55] TestName: "pipeline aggregation: stats_bucket", QueryType: "stats_bucket",