Skip to content

Commit

Permalink
Return empty results if needed in aggregations (#71)
Browse files Browse the repository at this point in the history
Such aggregation:
```
"date_histogram": {
    "field": "@timestamp",
    "fixed_interval": "1h",
    "min_doc_count": 0,
}
```
Should return:
```
"buckets": 
[
    {
	    "key_as_string": "2024-04-15T00:00:00.000",
	    "key": 1713139200000,
	    "doc_count": 10
    },
    {
	    "key_as_string": "2024-04-15T01:00:00.000",
	    "key": 1713142800000,
	    "doc_count": 0
    },
    {
	    "key_as_string": "2024-04-15T02:00:00.000",
	    "key": 1713146400000,
	    "doc_count": 0
    },
    {
	    "key_as_string": "2024-04-15T03:00:00.000",
	    "key": 1713150000000,
	    "doc_count": 9
    }
]
```
Notice elements with `doc_count: 0`. It's because of this parameter
`min_doc_count: 0`.
Currently we only return the results we get from ClickHouse, so we'd return
2 buckets instead of 4. This commit fixes that.

Previously it wasn't very important (or at all), but pipeline
aggregations simply don't work properly without it.

This works: the 2 new tests for `histogram` and `date_histogram` pass.
I also introduce 4 harder tests here (with subaggregations) which don't
fully work yet. Making them work isn't completely trivial, so I left
that for another 2 PRs. Most of the work for both is already done, but
I need to wait for some pipeline merges to finish.
I also commented out 2 tests from our dashboard, which gave incorrect
results (fortunately dashboard worked fine anyway). They'll also be
fixed by next PRs.
  • Loading branch information
trzysiek authored May 16, 2024
1 parent c68599e commit 0275cac
Show file tree
Hide file tree
Showing 29 changed files with 1,279 additions and 53 deletions.
4 changes: 4 additions & 0 deletions quesma/model/bucket_aggregations/dateRange.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,7 @@ func (query DateRange) parseTimestamp(timestamp any) int64 {
}
return timestamp.(int64)
}

// PostprocessResults is a no-op for date_range: rows from the DB are returned unchanged.
func (query DateRange) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
return rowsFromDB
}
52 changes: 48 additions & 4 deletions quesma/model/bucket_aggregations/date_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ import (
"time"
)

// DefaultMinDocCount is used when min_doc_count is absent (or malformed) in the query.
const DefaultMinDocCount = 1

// DateHistogram is the date_histogram bucket aggregation.
type DateHistogram struct {
	ctx         context.Context
	minDocCount int
	Interval    string // e.g. "1h", as sent by Elasticsearch clients
}

// NewDateHistogram creates a date_histogram bucket aggregation with the given
// min_doc_count and interval.
func NewDateHistogram(ctx context.Context, minDocCount int, interval string) DateHistogram {
	return DateHistogram{ctx, minDocCount, interval}
}

func (query DateHistogram) IsBucketAggregation() bool {
Expand Down Expand Up @@ -63,3 +66,44 @@ func (query DateHistogram) IntervalAsDuration() time.Duration {
duration, _ := time.ParseDuration(query.Interval)
return duration
}

// getKey returns the bucket key of a result row. The key lives in the
// second-to-last column (the last column is the count).
// Callers guarantee len(row.Cols) >= 2.
func (query DateHistogram) getKey(row model.QueryResultRow) int64 {
	keyColumn := row.Cols[len(row.Cols)-2]
	return keyColumn.Value.(int64)
}

// PostprocessResults fills gaps between buckets with zero-count buckets.
// If minDocCount == 0, and we have buckets e.g. [key, value1], [key+10, value2], we need to insert [key+1, 0], [key+2, 0]...
// CAUTION: a different kind of postprocessing is needed for minDocCount > 1, but I haven't seen any query with that yet, so not implementing it now.
func (query DateHistogram) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
	// BUGFIX: check negativity first — previously this branch was unreachable,
	// because the `minDocCount != 0` guard below returned for any non-zero value.
	if query.minDocCount < 0 {
		logger.WarnWithCtx(query.ctx).Msgf("unexpected negative minDocCount: %d. Skipping postprocess", query.minDocCount)
		return rowsFromDB
	}
	if query.minDocCount != 0 || len(rowsFromDB) < 2 {
		// we only add empty rows, when
		// a) minDocCount == 0
		// b) we have > 1 rows, with < 2 rows we can't add anything in between
		return rowsFromDB
	}
	postprocessedRows := make([]model.QueryResultRow, 0, len(rowsFromDB))
	postprocessedRows = append(postprocessedRows, rowsFromDB[0])
	for i := 1; i < len(rowsFromDB); i++ {
		if len(rowsFromDB[i-1].Cols) < 2 || len(rowsFromDB[i].Cols) < 2 {
			logger.ErrorWithCtx(query.ctx).Msgf(
				"unexpected number of columns in date_histogram aggregation response (< 2),"+
					"rowsFromDB[%d]: %+v, rowsFromDB[%d]: %+v. Skipping those rows in postprocessing",
				i-1, rowsFromDB[i-1], i, rowsFromDB[i],
			)
			// BUGFIX: actually skip gap-filling for this pair — getKey would panic
			// (negative index / failed type assertion) on a row with < 2 columns.
			postprocessedRows = append(postprocessedRows, rowsFromDB[i])
			continue
		}
		lastKey := query.getKey(rowsFromDB[i-1])
		currentKey := query.getKey(rowsFromDB[i])
		// insert one zero-count bucket per missing key between the two real buckets
		for midKey := lastKey + 1; midKey < currentKey; midKey++ {
			midRow := rowsFromDB[i-1].Copy()
			midRow.Cols[len(midRow.Cols)-2].Value = midKey
			midRow.Cols[len(midRow.Cols)-1].Value = 0
			postprocessedRows = append(postprocessedRows, midRow)
		}
		postprocessedRows = append(postprocessedRows, rowsFromDB[i])
	}
	return postprocessedRows
}
4 changes: 4 additions & 0 deletions quesma/model/bucket_aggregations/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,7 @@ func (query Filters) TranslateSqlResponseToJson(rows []model.QueryResultRow, lev
// String returns the aggregation's name, used in logs and debug output.
func (query Filters) String() string {
return "filters"
}

// PostprocessResults is a no-op for filters: rows from the DB are returned unchanged.
func (query Filters) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
return rowsFromDB
}
47 changes: 44 additions & 3 deletions quesma/model/bucket_aggregations/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ import (
"context"
"mitmproxy/quesma/logger"
"mitmproxy/quesma/model"
"mitmproxy/quesma/util"
)

// Histogram is the histogram bucket aggregation (numeric buckets of fixed interval width).
type Histogram struct {
	ctx         context.Context
	interval    float64
	minDocCount int
}

// NewHistogram creates a histogram bucket aggregation with the given
// interval and min_doc_count.
func NewHistogram(ctx context.Context, interval float64, minDocCount int) Histogram {
	return Histogram{ctx: ctx, interval: interval, minDocCount: minDocCount}
}

func (query Histogram) IsBucketAggregation() bool {
Expand All @@ -38,3 +41,41 @@ func (query Histogram) TranslateSqlResponseToJson(rows []model.QueryResultRow, l
// String returns the aggregation's name, used in logs and debug output.
func (query Histogram) String() string {
return "histogram"
}

// getKey returns the bucket key of a result row. The key lives in the
// second-to-last column (the last column is the count).
// Callers guarantee len(row.Cols) >= 2.
func (query Histogram) getKey(row model.QueryResultRow) float64 {
	keyColumn := row.Cols[len(row.Cols)-2]
	return keyColumn.Value.(float64)
}

// PostprocessResults fills gaps between buckets with zero-count buckets.
// If minDocCount == 0, and we have buckets e.g. [key, value1], [key+2*interval, value2], we need to insert [key+1*interval, 0]
// CAUTION: a different kind of postprocessing is needed for minDocCount > 1, but I haven't seen any query with that yet, so not implementing it now.
func (query Histogram) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
	if query.minDocCount != 0 || len(rowsFromDB) < 2 {
		// we only add empty rows, when
		// a) minDocCount == 0
		// b) we have > 1 rows, with < 2 rows we can't add anything in between
		return rowsFromDB
	}
	postprocessedRows := make([]model.QueryResultRow, 0, len(rowsFromDB))
	postprocessedRows = append(postprocessedRows, rowsFromDB[0])
	for i := 1; i < len(rowsFromDB); i++ {
		if len(rowsFromDB[i-1].Cols) < 2 || len(rowsFromDB[i].Cols) < 2 {
			logger.ErrorWithCtx(query.ctx).Msgf(
				"unexpected number of columns in histogram aggregation response (< 2),"+
					"rowsFromDB[%d]: %+v, rowsFromDB[%d]: %+v. Skipping those rows in postprocessing",
				i-1, rowsFromDB[i-1], i, rowsFromDB[i],
			)
			// BUGFIX: actually skip gap-filling for this pair — getKey would panic
			// (negative index / failed type assertion) on a row with < 2 columns.
			postprocessedRows = append(postprocessedRows, rowsFromDB[i])
			continue
		}
		lastKey := query.getKey(rowsFromDB[i-1])
		currentKey := query.getKey(rowsFromDB[i])
		// insert one zero-count bucket per missing interval step between the two real buckets
		// (util.IsSmaller is an epsilon-aware float comparison)
		for midKey := lastKey + query.interval; util.IsSmaller(midKey, currentKey); midKey += query.interval {
			midRow := rowsFromDB[i-1].Copy()
			midRow.Cols[len(midRow.Cols)-2].Value = midKey
			midRow.Cols[len(midRow.Cols)-1].Value = 0
			postprocessedRows = append(postprocessedRows, midRow)
		}
		postprocessedRows = append(postprocessedRows, rowsFromDB[i])
	}
	return postprocessedRows
}
4 changes: 4 additions & 0 deletions quesma/model/bucket_aggregations/range.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,7 @@ func (query Range) responseForInterval(interval Interval, value any) model.JsonM
}
return response
}

// PostprocessResults is a no-op for range: rows from the DB are returned unchanged.
func (query Range) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
return rowsFromDB
}
4 changes: 4 additions & 0 deletions quesma/model/bucket_aggregations/terms.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,7 @@ func (query Terms) String() string {
}
return "significant_terms"
}

// PostprocessResults is a no-op for terms: rows from the DB are returned unchanged.
func (query Terms) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
return rowsFromDB
}
4 changes: 4 additions & 0 deletions quesma/model/metrics_aggregations/avg.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,7 @@ func (query Avg) TranslateSqlResponseToJson(rows []model.QueryResultRow, level i
// String returns the aggregation's name, used in logs and debug output.
func (query Avg) String() string {
return "avg"
}

// PostprocessResults is a no-op for avg: rows from the DB are returned unchanged.
func (query Avg) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
return rowsFromDB
}
4 changes: 4 additions & 0 deletions quesma/model/metrics_aggregations/cardinality.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,7 @@ func (query Cardinality) TranslateSqlResponseToJson(rows []model.QueryResultRow,
// String returns the aggregation's name, used in logs and debug output.
func (query Cardinality) String() string {
return "cardinality"
}

// PostprocessResults is a no-op for cardinality: rows from the DB are returned unchanged.
func (query Cardinality) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
return rowsFromDB
}
4 changes: 4 additions & 0 deletions quesma/model/metrics_aggregations/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,7 @@ func (query Count) TranslateSqlResponseToJson(rows []model.QueryResultRow, level
// String returns the aggregation's name, used in logs and debug output.
func (query Count) String() string {
return "count"
}

// PostprocessResults is a no-op for count: rows from the DB are returned unchanged.
func (query Count) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
return rowsFromDB
}
4 changes: 4 additions & 0 deletions quesma/model/metrics_aggregations/max.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,7 @@ func (query Max) TranslateSqlResponseToJson(rows []model.QueryResultRow, level i
// String returns the aggregation's name, used in logs and debug output.
func (query Max) String() string {
return "max"
}

// PostprocessResults is a no-op for max: rows from the DB are returned unchanged.
func (query Max) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
return rowsFromDB
}
4 changes: 4 additions & 0 deletions quesma/model/metrics_aggregations/min.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,7 @@ func (query Min) TranslateSqlResponseToJson(rows []model.QueryResultRow, level i
// String returns the aggregation's name, used in logs and debug output.
func (query Min) String() string {
return "min"
}

// PostprocessResults is a no-op for min: rows from the DB are returned unchanged.
func (query Min) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
return rowsFromDB
}
4 changes: 4 additions & 0 deletions quesma/model/metrics_aggregations/percentile_ranks.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,7 @@ func (query PercentileRanks) TranslateSqlResponseToJson(rows []model.QueryResult
// String returns the aggregation's name, used in logs and debug output.
func (query PercentileRanks) String() string {
return "percentile_ranks"
}

// PostprocessResults is a no-op for percentile_ranks: rows from the DB are returned unchanged.
func (query PercentileRanks) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
return rowsFromDB
}
4 changes: 4 additions & 0 deletions quesma/model/metrics_aggregations/quantile.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,7 @@ func (query Quantile) processResult(colName string, percentileReturnedByClickhou
// emptyPercentilesResult is the JSON returned when no percentile values are available.
var emptyPercentilesResult = []model.JsonMap{{
"values": 0,
}}

// PostprocessResults is a no-op for quantile: rows from the DB are returned unchanged.
func (query Quantile) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
return rowsFromDB
}
4 changes: 4 additions & 0 deletions quesma/model/metrics_aggregations/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,7 @@ func (query Stats) TranslateSqlResponseToJson(rows []model.QueryResultRow, level
// String returns the aggregation's name, used in logs and debug output.
func (query Stats) String() string {
return "stats"
}

// PostprocessResults is a no-op for stats: rows from the DB are returned unchanged.
func (query Stats) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
return rowsFromDB
}
4 changes: 4 additions & 0 deletions quesma/model/metrics_aggregations/sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,7 @@ func (query Sum) TranslateSqlResponseToJson(rows []model.QueryResultRow, level i
// String returns the aggregation's name, used in logs and debug output.
func (query Sum) String() string {
return "sum"
}

// PostprocessResults is a no-op for sum: rows from the DB are returned unchanged.
func (query Sum) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
return rowsFromDB
}
4 changes: 4 additions & 0 deletions quesma/model/metrics_aggregations/top_hits.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,7 @@ func (query TopHits) TranslateSqlResponseToJson(rows []model.QueryResultRow, lev
// String returns the aggregation's name, used in logs and debug output.
func (query TopHits) String() string {
return "top_hits"
}

// PostprocessResults is a no-op for top_hits: rows from the DB are returned unchanged.
func (query TopHits) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
return rowsFromDB
}
4 changes: 4 additions & 0 deletions quesma/model/metrics_aggregations/top_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,7 @@ func (query TopMetrics) TranslateSqlResponseToJson(rows []model.QueryResultRow,
// String returns the aggregation's name, used in logs and debug output.
func (query TopMetrics) String() string {
return "top_metrics"
}

// PostprocessResults is a no-op for top_metrics: rows from the DB are returned unchanged.
func (query TopMetrics) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
return rowsFromDB
}
4 changes: 4 additions & 0 deletions quesma/model/metrics_aggregations/value_count.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,7 @@ func (query ValueCount) TranslateSqlResponseToJson(rows []model.QueryResultRow,
// String returns the aggregation's name, used in logs and debug output.
func (query ValueCount) String() string {
return "value_count"
}

// PostprocessResults is a no-op for value_count: rows from the DB are returned unchanged.
func (query ValueCount) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
return rowsFromDB
}
4 changes: 4 additions & 0 deletions quesma/model/pipeline_aggregations/bucket_script.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,7 @@ func (query BucketScript) CalculateResultWhenMissing(model.QueryResultRow, []mod
// String returns the aggregation's name, used in logs and debug output.
func (query BucketScript) String() string {
return "bucket script"
}

// PostprocessResults is a no-op for bucket_script: rows from the DB are returned unchanged.
func (query BucketScript) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
return rowsFromDB
}
4 changes: 4 additions & 0 deletions quesma/model/pipeline_aggregations/cumulative_sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ func (query CumulativeSum) CalculateResultWhenMissing(parentRow model.QueryResul
return resultRow
}

// PostprocessResults is a no-op for cumulative_sum: rows from the DB are returned unchanged.
func (query CumulativeSum) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
return rowsFromDB
}

// String returns the aggregation's name (with its parent), used in logs and debug output.
func (query CumulativeSum) String() string {
return fmt.Sprintf("cumulative_sum(%s)", query.Parent)
}
6 changes: 6 additions & 0 deletions quesma/model/query_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ type QueryType interface {
// For 'bucket' aggregation result is a slice of buckets, for 'metrics' aggregation it's a single bucket (only look at [0])
TranslateSqlResponseToJson(rows []QueryResultRow, level int) []JsonMap

PostprocessResults(rowsFromDB []QueryResultRow) (ultimateRows []QueryResultRow)

// IsBucketAggregation if true, result from 'MakeResponse' will be a slice of buckets
// if false, it's a metrics aggregation and result from 'MakeResponse' will be a single bucket
IsBucketAggregation() bool
Expand Down Expand Up @@ -54,3 +56,7 @@ func (query UnknownAggregationType) TranslateSqlResponseToJson(rows []QueryResul
// String identifies this fallback type in logs when an aggregation couldn't be recognized.
func (query UnknownAggregationType) String() string {
return "unknown aggregation type"
}

// PostprocessResults is a no-op for the unknown aggregation fallback: rows are returned unchanged.
func (query UnknownAggregationType) PostprocessResults(rowsFromDB []QueryResultRow) []QueryResultRow {
return rowsFromDB
}
43 changes: 33 additions & 10 deletions quesma/queryparser/aggregation_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,8 +545,11 @@ func (cw *ClickhouseQueryTranslator) tryBucketAggregation(currentAggr *aggrQuery
success bool, nonSchemaFieldsAddedCount, groupByFieldsAddedCount int) {

success = true // returned in most cases
if histogram, ok := queryMap["histogram"]; ok {
currentAggr.Type = bucket_aggregations.NewHistogram(cw.Ctx)
if histogramRaw, ok := queryMap["histogram"]; ok {
histogram, ok := histogramRaw.(QueryMap)
if !ok {
logger.WarnWithCtx(cw.Ctx).Msgf("date_histogram is not a map, but %T, value: %v", histogramRaw, histogramRaw)
}
fieldName, isFieldNameFromScript := cw.parseFieldFieldMaybeScript(histogram, "histogram")
var fieldNameProperlyQuoted string
if isFieldNameFromScript {
Expand All @@ -555,26 +558,29 @@ func (cw *ClickhouseQueryTranslator) tryBucketAggregation(currentAggr *aggrQuery
fieldNameProperlyQuoted = strconv.Quote(fieldName)
}
var interval float64
intervalQueryMap, ok := histogram.(QueryMap)["interval"]
intervalRaw, ok := histogram["interval"]
if !ok {
logger.WarnWithCtx(cw.Ctx).Msgf("interval not found in histogram: %v", histogram)
}
switch intervalRaw := intervalQueryMap.(type) {
switch intervalTyped := intervalRaw.(type) {
case string:
var err error
interval, err = strconv.ParseFloat(intervalRaw, 64)
interval, err = strconv.ParseFloat(intervalTyped, 64)
if err != nil {
logger.ErrorWithCtx(cw.Ctx).Err(err).Msgf("failed to parse interval: %v", intervalRaw)
}
case int:
interval = float64(intervalRaw)
interval = float64(intervalTyped)
case float64:
interval = intervalRaw
interval = intervalTyped
default:
logger.ErrorWithCtx(cw.Ctx).Msgf("unexpected type of interval: %T, value: %v", intervalRaw, intervalRaw)
interval = 1.0
logger.ErrorWithCtx(cw.Ctx).Msgf("unexpected type of interval: %T, value: %v", intervalTyped, intervalTyped)
}
minDocCount := cw.parseMinDocCount(histogram)
currentAggr.Type = bucket_aggregations.NewHistogram(cw.Ctx, interval, minDocCount)
groupByStr := fieldNameProperlyQuoted
if interval != 1 {
if interval != 1.0 {
groupByStr = fmt.Sprintf("floor(%s / %f) * %f", fieldNameProperlyQuoted, interval, interval)
}
currentAggr.GroupByFields = append(currentAggr.GroupByFields, groupByStr)
Expand All @@ -587,7 +593,8 @@ func (cw *ClickhouseQueryTranslator) tryBucketAggregation(currentAggr *aggrQuery
if !ok {
logger.WarnWithCtx(cw.Ctx).Msgf("date_histogram is not a map, but %T, value: %v", dateHistogramRaw, dateHistogramRaw)
}
currentAggr.Type = bucket_aggregations.NewDateHistogram(cw.Ctx, cw.extractInterval(dateHistogram))
minDocCount := cw.parseMinDocCount(dateHistogram)
currentAggr.Type = bucket_aggregations.NewDateHistogram(cw.Ctx, minDocCount, cw.extractInterval(dateHistogram))
histogramPartOfQuery := cw.createHistogramPartOfQuery(dateHistogram)
currentAggr.GroupByFields = append(currentAggr.GroupByFields, histogramPartOfQuery)
currentAggr.NonSchemaFields = append(currentAggr.NonSchemaFields, histogramPartOfQuery)
Expand Down Expand Up @@ -752,6 +759,22 @@ func (cw *ClickhouseQueryTranslator) parseFieldFromScriptField(queryMap QueryMap
return
}

// parseMinDocCount extracts "min_doc_count" from an aggregation's query map.
// Returns DefaultMinDocCount when the parameter is absent or isn't a number.
// Values other than 0 and 1 are accepted but logged as unsupported.
func (cw *ClickhouseQueryTranslator) parseMinDocCount(queryMap QueryMap) int {
	minDocCountRaw, exists := queryMap["min_doc_count"]
	if !exists {
		return bucket_aggregations.DefaultMinDocCount
	}
	// encoding/json decodes numbers as float64, but accept a plain int too
	// (e.g. when the map was built programmatically) instead of warning "not a number".
	var minDocCount int
	switch typed := minDocCountRaw.(type) {
	case float64:
		minDocCount = int(typed)
	case int:
		minDocCount = typed
	default:
		logger.WarnWithCtx(cw.Ctx).Msgf("min_doc_count is not a number, but %T, value: %v. Using default value: %d",
			minDocCountRaw, minDocCountRaw, bucket_aggregations.DefaultMinDocCount)
		return bucket_aggregations.DefaultMinDocCount
	}
	if minDocCount != 0 && minDocCount != 1 {
		logger.WarnWithCtx(cw.Ctx).Msgf("min_doc_count is not 0 or 1, but %d. Not really supported", minDocCount)
	}
	return minDocCount
}

func (cw *ClickhouseQueryTranslator) parseFilters(filtersMap QueryMap) []filter {
var filters []filter
filtersMap = filtersMap["filters"].(QueryMap)
Expand Down
Loading

0 comments on commit 0275cac

Please sign in to comment.