Fix problems with dates part 1 (#832)
The query from the test caused Quesma to crash without any error message, as it'd run out of memory in 0.1 s or so. You can see it below:
![Screenshot 2024-10-02 at 15 33 54](https://github.com/user-attachments/assets/c1a4cda2-a302-4127-b6d4-570784060e4e)
![Screenshot 2024-10-02 at 15 34 09](https://github.com/user-attachments/assets/4e5565b8-829d-43fe-9ad7-3d1a4cf30faa)
It was a small bug in the code that adds empty rows. Usually keys in
`date_histogram` are `x, x+1, x+2, ...`, because we use `timestamp /
duration` as the key, so the number of added empty rows was fairly limited.
But with `calendar_interval` and big intervals, like 1 week, keys are
timestamps in milliseconds, so we were adding a new row for every single
millisecond, exhausting all the memory very quickly.
Fixed here by handling that second case: empty rows are now added in steps of
the parsed interval (in milliseconds), and the number of added rows is capped.
A standalone sketch of the idea is below.
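A minimal, standalone sketch of the idea behind the fix (illustrative names only; the real change lives in `date_histogram.go` below): step between buckets by the interval in milliseconds and cap how many empty buckets may be inserted.

```go
package main

import (
	"fmt"
	"time"
)

// maxEmptyBuckets caps how many zero-count buckets may be inserted between two
// existing keys, so a huge gap can never exhaust memory (hypothetical constant,
// mirroring maxEmptyBucketsAdded in the diff below).
const maxEmptyBuckets = 1000

// fillEmptyBuckets inserts zero-count bucket keys between consecutive keys,
// stepping by `step` milliseconds (the parsed calendar interval) instead of by 1.
func fillEmptyBuckets(keys []int64, step int64) []int64 {
	if step <= 0 || len(keys) < 2 {
		return keys
	}
	out := []int64{keys[0]}
	added := 0
	for i := 1; i < len(keys); i++ {
		for k := keys[i-1] + step; k < keys[i] && added < maxEmptyBuckets; k += step {
			out = append(out, k) // a zero-count bucket would be emitted for this key
			added++
		}
		out = append(out, keys[i])
	}
	return out
}

func main() {
	week := (7 * 24 * time.Hour).Milliseconds()
	// Two weekly buckets 4 weeks apart -> 3 keys inserted in between, not millions.
	fmt.Println(fillEmptyBuckets([]int64{0, 4 * week}, week))
}
```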
trzysiek authored Oct 3, 2024
1 parent 8441da6 commit b91f7b3
Showing 4 changed files with 167 additions and 13 deletions.
15 changes: 15 additions & 0 deletions quesma/kibana/intervals.go
@@ -11,6 +11,21 @@ import (
func ParseInterval(fixedInterval string) (time.Duration, error) {
	var unit time.Duration

+	switch fixedInterval {
+	case "minute":
+		return time.Minute, nil
+	case "hour":
+		return time.Hour, nil
+	case "day":
+		return time.Hour * 24, nil
+	case "week":
+		return time.Hour * 24 * 7, nil
+	case "month":
+		return time.Hour * 24 * 30, nil
+	case "year":
+		return time.Hour * 24 * 365, nil
+	}
+
	switch {
	case strings.HasSuffix(fixedInterval, "d"):
		unit = 24 * time.Hour
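A hedged usage sketch for the new calendar-interval branch (assuming the package is importable as `quesma/kibana`, consistent with the `kibana.ParseInterval` call in `date_histogram.go` below and the `quesma/model` import in `dates.go`): names like `"week"` now resolve to fixed durations, while suffix forms such as `"30d"` still fall through to the existing switch.

```go
package main

import (
	"fmt"

	"quesma/kibana"
)

func main() {
	for _, interval := range []string{"minute", "week", "month", "30d"} {
		d, err := kibana.ParseInterval(interval)
		if err != nil {
			fmt.Printf("%s -> error: %v\n", interval, err)
			continue
		}
		// e.g. "week" -> 168h0m0s (604800000 ms)
		fmt.Printf("%s -> %v (%d ms)\n", interval, d, d.Milliseconds())
	}
}
```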
43 changes: 30 additions & 13 deletions quesma/model/bucket_aggregations/date_histogram.go
@@ -26,7 +26,8 @@ const (
	// It's needed when date_histogram has subaggregations, because when we process them, we're merging subaggregation's
	// map (it has the original key, doesn't know about the processed one)
	// with date_histogram's map (it already has a "valid", processed key, after TranslateSqlResponseToJson)
-	OriginalKeyName = "__quesma_originalKey"
+	OriginalKeyName      = "__quesma_originalKey"
+	maxEmptyBucketsAdded = 1000
)

type DateHistogram struct {
@@ -121,7 +122,8 @@ func (query *DateHistogram) TranslateSqlResponseToJson(rows []model.QueryResultR
}

func (query *DateHistogram) String() string {
-	return "date_histogram(interval: " + query.interval + ")"
+	return fmt.Sprintf("date_histogram(field: %v, interval: %v, min_doc_count: %v, timezone: %v",
+		query.field, query.interval, query.minDocCount, query.timezone)
}

// only intervals <= days are needed
@@ -219,29 +221,43 @@ func (query *DateHistogram) getKey(row model.QueryResultRow) int64 {
}

func (query *DateHistogram) NewRowsTransformer() model.QueryRowsTransformer {
-	return &DateHistogramRowsTransformer{minDocCount: query.minDocCount}
+	differenceBetweenTwoNextKeys := int64(1)
+	if query.intervalType == DateHistogramCalendarInterval {
+		duration, err := kibana.ParseInterval(query.interval)
+		if err == nil {
+			differenceBetweenTwoNextKeys = duration.Milliseconds()
+		} else {
+			logger.ErrorWithCtx(query.ctx).Err(err)
+			differenceBetweenTwoNextKeys = 0
+		}
+	}
+	return &DateHistogramRowsTransformer{minDocCount: query.minDocCount, differenceBetweenTwoNextKeys: differenceBetweenTwoNextKeys}
}

// we're sure len(row.Cols) >= 2

type DateHistogramRowsTransformer struct {
-	minDocCount int
+	minDocCount                  int
+	differenceBetweenTwoNextKeys int64 // if 0, we don't add keys
}

// if minDocCount == 0, and we have buckets e.g. [key, value1], [key+10, value2], we need to insert [key+1, 0], [key+2, 0]...
// CAUTION: a different kind of postprocessing is needed for minDocCount > 1, but I haven't seen any query with that yet, so not implementing it now.
-func (query *DateHistogramRowsTransformer) Transform(ctx context.Context, rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
+func (qt *DateHistogramRowsTransformer) Transform(ctx context.Context, rowsFromDB []model.QueryResultRow) []model.QueryResultRow {

-	if query.minDocCount != 0 || len(rowsFromDB) < 2 {
+	if qt.minDocCount != 0 || qt.differenceBetweenTwoNextKeys == 0 || len(rowsFromDB) < 2 {
		// we only add empty rows, when
		// a) minDocCount == 0
-		// b) we have > 1 rows, with < 2 rows we can't add anything in between
+		// b) we have valid differenceBetweenTwoNextKeys (>0)
+		// c) we have > 1 rows, with < 2 rows we can't add anything in between
		return rowsFromDB
	}
-	if query.minDocCount < 0 {
-		logger.WarnWithCtx(ctx).Msgf("unexpected negative minDocCount: %d. Skipping postprocess", query.minDocCount)
+	if qt.minDocCount < 0 {
+		logger.WarnWithCtx(ctx).Msgf("unexpected negative minDocCount: %d. Skipping postprocess", qt.minDocCount)
		return rowsFromDB
	}

+	emptyRowsAdded := 0
	postprocessedRows := make([]model.QueryResultRow, 0, len(rowsFromDB))
	postprocessedRows = append(postprocessedRows, rowsFromDB[0])
	for i := 1; i < len(rowsFromDB); i++ {
@@ -252,19 +268,20 @@ func (query *DateHistogramRowsTransformer) Transform(ctx context.Context, rowsFr
				i-1, rowsFromDB[i-1], i, rowsFromDB[i],
			)
		}
-		lastKey := query.getKey(rowsFromDB[i-1])
-		currentKey := query.getKey(rowsFromDB[i])
-		for midKey := lastKey + 1; midKey < currentKey; midKey++ {
+		lastKey := qt.getKey(rowsFromDB[i-1])
+		currentKey := qt.getKey(rowsFromDB[i])
+		for midKey := lastKey + qt.differenceBetweenTwoNextKeys; midKey < currentKey && emptyRowsAdded < maxEmptyBucketsAdded; midKey += qt.differenceBetweenTwoNextKeys {
			midRow := rowsFromDB[i-1].Copy()
			midRow.Cols[len(midRow.Cols)-2].Value = midKey
			midRow.Cols[len(midRow.Cols)-1].Value = 0
			postprocessedRows = append(postprocessedRows, midRow)
+			emptyRowsAdded++
		}
		postprocessedRows = append(postprocessedRows, rowsFromDB[i])
	}
	return postprocessedRows
}

-func (query *DateHistogramRowsTransformer) getKey(row model.QueryResultRow) int64 {
+func (qt *DateHistogramRowsTransformer) getKey(row model.QueryResultRow) int64 {
	return row.Cols[len(row.Cols)-2].Value.(int64)
}
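A sketch of a regression test for the new capping and stepping behaviour (not part of this commit; it assumes the package is named `bucket_aggregations` and reuses the `model` helpers seen in `dates.go` below, since the transformer's fields are unexported it would have to live in the same package; column names are illustrative):

```go
package bucket_aggregations

import (
	"context"
	"testing"
	"time"

	"quesma/model"
)

func TestDateHistogramInsertsEmptyWeeklyBuckets(t *testing.T) {
	week := (7 * 24 * time.Hour).Milliseconds()
	row := func(key int64, count int) model.QueryResultRow {
		return model.QueryResultRow{Cols: []model.QueryResultCol{
			model.NewQueryResultCol("aggr__eventRate__key_0", key),
			model.NewQueryResultCol("aggr__eventRate__count", count),
		}}
	}
	qt := &DateHistogramRowsTransformer{minDocCount: 0, differenceBetweenTwoNextKeys: week}

	rows := qt.Transform(context.Background(), []model.QueryResultRow{
		row(1726358400000, 442),      // 2024-09-15
		row(1726358400000+4*week, 1), // 2024-10-13, 4 weeks later
	})

	// 2 original rows + 3 zero-count rows for the 3 skipped weeks, not millions of rows.
	if len(rows) != 5 {
		t.Fatalf("expected 5 rows, got %d", len(rows))
	}
}
```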
1 change: 1 addition & 0 deletions quesma/queryparser/aggregation_parser_test.go
@@ -657,6 +657,7 @@ func allAggregationTests() []testdata.AggregationTestCase {

	add(testdata.AggregationTests, "agg_req")
	add(testdata.AggregationTests2, "agg_req_2")
+	add(testdata.AggregationTestsWithDates, "dates")
	add(opensearch_visualize.AggregationTests, "opensearch-visualize/agg_req")
	add(dashboard_1.AggregationTests, "dashboard-1/agg_req")
	add(testdata.PipelineAggregationTests, "pipeline_agg_req")
121 changes: 121 additions & 0 deletions quesma/testdata/dates.go
@@ -0,0 +1,121 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package testdata

import "quesma/model"

var AggregationTestsWithDates = []AggregationTestCase{
{ // [0]
TestName: "simple max/min aggregation as 2 siblings",
QueryRequestJson: `
{
"aggs": {
"sampler": {
"aggs": {
"eventRate": {
"date_histogram": {
"extended_bounds": {
"max": 1727859403270,
"min": 1727858503270
},
"field": "order_date",
"calendar_interval": "1w",
"min_doc_count": 0
}
}
},
"random_sampler": {
"probability": 0.000001,
"seed": "1292529172"
}
}
},
"size": 0,
"track_total_hits": false
}`,
ExpectedResponse: `
{
"completion_time_in_millis": 1707486436398,
"expiration_time_in_millis": 1707486496397,
"is_partial": false,
"is_running": false,
"response": {
"_shards": {
"failed": 0,
"skipped": 0,
"successful": 1,
"total": 1
},
"aggregations": {
"sampler": {
"doc_count": 4675,
"eventRate": {
"buckets": [
{
"doc_count": 442,
"key": 1726358400000,
"key_as_string": "2024-09-15T00:00:00.000"
},
{
"doc_count": 0,
"key": 1726963200000,
"key_as_string": "2024-09-22T00:00:00.000"
},
{
"doc_count": 0,
"key": 1727568000000,
"key_as_string": "2024-09-29T00:00:00.000"
},
{
"doc_count": 0,
"key": 1728172800000,
"key_as_string": "2024-10-06T00:00:00.000"
},
{
"doc_count": 1,
"key": 1728777600000,
"key_as_string": "2024-10-13T00:00:00.000"
}
]
}
}
},
"hits": {
"hits": [],
"max_score": null,
"total": {
"relation": "eq",
"value": 2200
}
},
"timed_out": false,
"took": 1
},
"start_time_in_millis": 1707486436397
}`,
ExpectedPancakeResults: []model.QueryResultRow{
{Cols: []model.QueryResultCol{
model.NewQueryResultCol("aggr__sampler__count", int64(4675)),
model.NewQueryResultCol("aggr__sampler__eventRate__key_0", int64(1726358400000)),
model.NewQueryResultCol("aggr__sampler__eventRate__count", int64(442)),
}},
{Cols: []model.QueryResultCol{
model.NewQueryResultCol("aggr__sampler__count", int64(4675)),
model.NewQueryResultCol("aggr__sampler__eventRate__key_0", int64(1728777600000)),
model.NewQueryResultCol("aggr__sampler__eventRate__count", int64(1)),
}},
},
ExpectedPancakeSQL: `
SELECT sum(count(*)) OVER () AS "aggr__sampler__count",
toInt64(toUnixTimestamp(toStartOfWeek(toTimezone("order_date", 'UTC'))))*1000
AS "aggr__sampler__eventRate__key_0",
count(*) AS "aggr__sampler__eventRate__count"
FROM (
SELECT "order_date"
FROM __quesma_table_name
LIMIT 20000)
GROUP BY toInt64(toUnixTimestamp(toStartOfWeek(toTimezone("order_date", 'UTC')))
)*1000 AS "aggr__sampler__eventRate__key_0"
ORDER BY "aggr__sampler__eventRate__key_0" ASC`,
},
}
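A quick standalone check (not part of the commit) of why the expected response above contains exactly three zero-count buckets between the two rows ClickHouse returns: the two keys are week-start timestamps four weeks apart.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	first := int64(1726358400000) // aggr__sampler__eventRate__key_0 of the first row
	last := int64(1728777600000)  // aggr__sampler__eventRate__key_0 of the second row
	week := (7 * 24 * time.Hour).Milliseconds()

	fmt.Println(time.UnixMilli(first).UTC()) // 2024-09-15 00:00:00 +0000 UTC
	fmt.Println(time.UnixMilli(last).UTC())  // 2024-10-13 00:00:00 +0000 UTC
	fmt.Println((last-first)/week - 1)       // 3 empty weekly buckets in between
}
```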
