Skip to content

Commit

Permalink
Fix graying date_histogram (#286)
Browse files Browse the repository at this point in the history
I'll quickfix that:
![image
(3)](https://github.com/QuesmaOrg/quesma/assets/5407146/da9d8fec-c151-467e-8a80-2bc648fd3ff9)

Also while at it, `calendar_interval` and `fixed_interval` have
different sematic (slightly, but it can very easily be reproduced making
our responses incorrect). `Date histograms` are very very common, so
it'd be good to have it correct, it's not much work.
  • Loading branch information
trzysiek authored Jul 11, 2024
1 parent d69f3b2 commit 95521b5
Show file tree
Hide file tree
Showing 11 changed files with 594 additions and 56 deletions.
9 changes: 9 additions & 0 deletions quesma/clickhouse/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,15 @@ func (t *Table) GetDateTimeType(ctx context.Context, fieldName string) DateTimeT
if t.Config.hasTimestamp && fieldName == timestampFieldName {
return DateTime64
}
logger.WarnWithCtx(ctx).Msgf("datetime field '%s' not found in table '%s'", fieldName, t.Name)
return Invalid
}

func (t *Table) GetDateTimeTypeFromExpr(ctx context.Context, expr model.Expr) DateTimeType {
if ref, ok := expr.(model.ColumnRef); ok {
return t.GetDateTimeType(ctx, ref.ColumnName)
}
logger.WarnWithCtx(ctx).Msgf("datetime field '%v' not found in table '%s'", expr, t.Name)
return Invalid
}

Expand Down
144 changes: 118 additions & 26 deletions quesma/model/bucket_aggregations/date_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,56 @@ package bucket_aggregations

import (
"context"
"quesma/clickhouse"
"quesma/kibana"
"quesma/logger"
"quesma/model"
"strconv"
"strings"
"time"
)

const DefaultMinDocCount = 1
type DateHistogramIntervalType bool

const (
DefaultMinDocCount = 1
DateHistogramFixedInterval DateHistogramIntervalType = true
DateHistogramCalendarInterval DateHistogramIntervalType = false
defaultDateTimeType = clickhouse.DateTime64
)

type DateHistogram struct {
ctx context.Context
minDocCount int
Interval string
ctx context.Context
field model.Expr // name of the field, e.g. timestamp
interval string
minDocCount int
intervalType DateHistogramIntervalType
fieldDateTimeType clickhouse.DateTimeType
}

func NewDateHistogram(ctx context.Context, minDocCount int, interval string) DateHistogram {
return DateHistogram{ctx, minDocCount, interval}
func NewDateHistogram(ctx context.Context, field model.Expr, interval string,
minDocCount int, intervalType DateHistogramIntervalType, fieldDateTimeType clickhouse.DateTimeType) *DateHistogram {
return &DateHistogram{ctx: ctx, field: field, interval: interval,
minDocCount: minDocCount, intervalType: intervalType, fieldDateTimeType: fieldDateTimeType}
}

func (typ DateHistogramIntervalType) String(ctx context.Context) string {
switch typ {
case DateHistogramFixedInterval:
return "fixed_interval"
case DateHistogramCalendarInterval:
return "calendar_interval"
default:
logger.ErrorWithCtx(ctx).Msgf("unexpected DateHistogramIntervalType: %v", typ) // error as it should be impossible
return "invalid"
}
}

func (query DateHistogram) IsBucketAggregation() bool {
func (query *DateHistogram) IsBucketAggregation() bool {
return true
}

func (query DateHistogram) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
func (query *DateHistogram) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
if len(rows) > 0 && len(rows[0].Cols) < 2 {
logger.ErrorWithCtx(query.ctx).Msgf(
"unexpected number of columns in date_histogram aggregation response, len(rows[0].Cols): "+
Expand All @@ -36,13 +62,14 @@ func (query DateHistogram) TranslateSqlResponseToJson(rows []model.QueryResultRo
}
var response []model.JsonMap
for _, row := range rows {
intervalInMilliseconds := query.IntervalAsDuration().Milliseconds()
var key int64
if keyValue, ok := row.Cols[len(row.Cols)-2].Value.(int64); ok { // used to be [level-1], but because some columns are duplicated, it doesn't work in 100% cases now
key = keyValue * intervalInMilliseconds
if query.intervalType == DateHistogramCalendarInterval {
key = query.getKey(row)
} else {
logger.WarnWithCtx(query.ctx).Msgf("unexpected type of key value: %T, %+v, Should be int64", row.Cols[len(row.Cols)-2].Value, row.Cols[len(row.Cols)-2].Value)
intervalInMilliseconds := query.intervalAsDuration().Milliseconds()
key = query.getKey(row) * intervalInMilliseconds
}

intervalStart := time.UnixMilli(key).UTC().Format("2006-01-02T15:04:05.000")
response = append(response, model.JsonMap{
"key": key,
Expand All @@ -53,35 +80,100 @@ func (query DateHistogram) TranslateSqlResponseToJson(rows []model.QueryResultRo
return response
}

func (query DateHistogram) String() string {
return "date_histogram(interval: " + query.Interval + ")"
func (query *DateHistogram) String() string {
return "date_histogram(interval: " + query.interval + ")"
}

// TODO implement this also for intervals longer than days ("d")
func (query DateHistogram) IntervalAsDuration() time.Duration {
// time.ParseDuration doesn't accept > hours
if strings.HasSuffix(query.Interval, "d") {
daysNr, err := strconv.Atoi(strings.TrimSuffix(query.Interval, "d"))
// only intervals <= days are needed
func (query *DateHistogram) intervalAsDuration() time.Duration {
var intervalInHoursOrLess string
if strings.HasSuffix(query.interval, "d") {
// time.ParseDuration doesn't accept > hours, we need to convert days to hours
daysNr, err := strconv.Atoi(strings.TrimSuffix(query.interval, "d"))
if err != nil {
logger.ErrorWithCtx(query.ctx).Msgf("error parsing interval %s: [%v]. Returning 0", query.Interval, err)
logger.ErrorWithCtx(query.ctx).Msgf("error parsing interval %s: [%v]. Returning 0", query.interval, err)
return time.Duration(0)
}
intervalInHours := strconv.Itoa(daysNr*24) + "h"
duration, _ := time.ParseDuration(intervalInHours)
return duration
intervalInHoursOrLess = strconv.Itoa(daysNr*24) + "h"
} else {
intervalInHoursOrLess = query.interval
}
duration, _ := time.ParseDuration(query.Interval)
duration, _ := time.ParseDuration(intervalInHoursOrLess)
return duration
}

func (query *DateHistogram) GenerateSQL() model.Expr {
switch query.intervalType {
case DateHistogramFixedInterval:
return query.generateSQLForFixedInterval()
case DateHistogramCalendarInterval:
return query.generateSQLForCalendarInterval()
default:
logger.WarnWithCtx(query.ctx).Msgf("invalid interval type: %v (should be impossible). Returning InvalidExpr",
query.intervalType.String(query.ctx))
return model.InvalidExpr
}
}

func (query *DateHistogram) generateSQLForFixedInterval() model.Expr {
interval, err := kibana.ParseInterval(query.interval)
if err != nil {
logger.ErrorWithCtx(query.ctx).Msg(err.Error())
}
dateTimeType := query.fieldDateTimeType
if query.fieldDateTimeType == clickhouse.Invalid {
logger.ErrorWithCtx(query.ctx).Msgf("invalid date type for DateHistogram %+v. Using DateTime64 as default.", query)
dateTimeType = defaultDateTimeType
}
return clickhouse.TimestampGroupBy(query.field, dateTimeType, interval)
}

func (query *DateHistogram) generateSQLForCalendarInterval() model.Expr {
exprForBiggerIntervals := func(toIntervalStartFuncName string) model.Expr {
// returned expr as string:
// "1000 * toInt64(toUnixTimestamp(toStartOf[Week|Month|Quarter|Year](timestamp)))"
toStartOf := model.NewFunction(toIntervalStartFuncName, query.field)
toUnixTimestamp := model.NewFunction("toUnixTimestamp", toStartOf)
toInt64 := model.NewFunction("toInt64", toUnixTimestamp)
return model.NewInfixExpr(toInt64, "*", model.NewLiteral(1000))
}

// calendar_interval: minute/hour/day are the same as fixed_interval: 1m/1h/1d
switch query.interval {
case "minute", "1m":
query.interval = "1m"
query.intervalType = DateHistogramFixedInterval
return query.generateSQLForFixedInterval()
case "hour", "1h":
query.interval = "1h"
query.intervalType = DateHistogramFixedInterval
return query.generateSQLForFixedInterval()
case "day", "1d":
query.interval = "1d"
query.intervalType = DateHistogramFixedInterval
return query.generateSQLForFixedInterval()
case "week", "1w":
return exprForBiggerIntervals("toStartOfWeek")
case "month", "1M":
return exprForBiggerIntervals("toStartOfMonth")
case "quarter", "1q":
return exprForBiggerIntervals("toStartOfQuarter")
case "year", "1y":
return exprForBiggerIntervals("toStartOfYear")
}

logger.WarnWithCtx(query.ctx).Msgf("unexpected calendar interval: %s. Returning InvalidExpr", query.interval)
return model.InvalidExpr
}

// we're sure len(row.Cols) >= 2
func (query DateHistogram) getKey(row model.QueryResultRow) int64 {
func (query *DateHistogram) getKey(row model.QueryResultRow) int64 {
return row.Cols[len(row.Cols)-2].Value.(int64)
}

// if minDocCount == 0, and we have buckets e.g. [key, value1], [key+10, value2], we need to insert [key+1, 0], [key+2, 0]...
// CAUTION: a different kind of postprocessing is needed for minDocCount > 1, but I haven't seen any query with that yet, so not implementing it now.
func (query DateHistogram) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
func (query *DateHistogram) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
if query.minDocCount != 0 || len(rowsFromDB) < 2 {
// we only add empty rows, when
// a) minDocCount == 0
Expand Down
2 changes: 1 addition & 1 deletion quesma/model/bucket_aggregations/date_histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ func TestTranslateSqlResponseToJson(t *testing.T) {
{"key": int64(56962398) * 30_000, "doc_count": 8, "key_as_string": "2024-02-25T14:39:00.000"},
{"key": int64(56962370) * 30_000, "doc_count": 14, "key_as_string": "2024-02-25T14:25:00.000"},
}
response := DateHistogram{Interval: interval}.TranslateSqlResponseToJson(resultRows, 1)
response := (&DateHistogram{interval: interval, intervalType: DateHistogramFixedInterval}).TranslateSqlResponseToJson(resultRows, 1)
assert.Equal(t, expectedResponse, response)
}
2 changes: 2 additions & 0 deletions quesma/model/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ type Expr interface {
Accept(v ExprVisitor) interface{}
}

var InvalidExpr = Expr(nil)

// ColumnRef is a reference to a column in a table, we can enrich it with more information (e.g. type used) as we go
type ColumnRef struct {
ColumnName string
Expand Down
3 changes: 3 additions & 0 deletions quesma/model/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ func (q *Query) CopyAggregationFields(qwa Query) {
q.SelectCommand.Columns = make([]Expr, len(qwa.SelectCommand.Columns))
copy(q.SelectCommand.Columns, qwa.SelectCommand.Columns)

q.SelectCommand.OrderBy = make([]OrderByExpr, len(qwa.SelectCommand.OrderBy))
copy(q.SelectCommand.OrderBy, qwa.SelectCommand.OrderBy)

q.Aggregators = make([]Aggregator, len(qwa.Aggregators))
copy(q.Aggregators, qwa.Aggregators)
}
Expand Down
19 changes: 14 additions & 5 deletions quesma/queryparser/aggregation_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,13 +733,22 @@ func (cw *ClickhouseQueryTranslator) tryBucketAggregation(currentAggr *aggrQuery
if !ok {
logger.WarnWithCtx(cw.Ctx).Msgf("date_histogram is not a map, but %T, value: %v", dateHistogramRaw, dateHistogramRaw)
}
field := cw.parseFieldField(dateHistogram, "date_histogram")
minDocCount := cw.parseMinDocCount(dateHistogram)
currentAggr.Type = bucket_aggregations.NewDateHistogram(cw.Ctx, minDocCount, cw.extractInterval(dateHistogram))
histogramPartOfQuery := cw.createHistogramPartOfQuery(dateHistogram)
interval, intervalType := cw.extractInterval(dateHistogram)
dateTimeType := cw.Table.GetDateTimeTypeFromExpr(cw.Ctx, field)

currentAggr.SelectCommand.Columns = append(currentAggr.SelectCommand.Columns, histogramPartOfQuery)
currentAggr.SelectCommand.GroupBy = append(currentAggr.SelectCommand.GroupBy, histogramPartOfQuery)
currentAggr.SelectCommand.OrderBy = append(currentAggr.SelectCommand.OrderBy, model.NewOrderByExprWithoutOrder(histogramPartOfQuery))
if dateTimeType == clickhouse.Invalid {
logger.WarnWithCtx(cw.Ctx).Msgf("invalid date time type for field %s", field)
}

dateHistogramAggr := bucket_aggregations.NewDateHistogram(cw.Ctx, field, interval, minDocCount, intervalType, dateTimeType)
currentAggr.Type = dateHistogramAggr

sqlQuery := dateHistogramAggr.GenerateSQL()
currentAggr.SelectCommand.Columns = append(currentAggr.SelectCommand.Columns, sqlQuery)
currentAggr.SelectCommand.GroupBy = append(currentAggr.SelectCommand.GroupBy, sqlQuery)
currentAggr.SelectCommand.OrderBy = append(currentAggr.SelectCommand.OrderBy, model.NewOrderByExprWithoutOrder(sqlQuery))

delete(queryMap, "date_histogram")
return success, 1, nil
Expand Down
1 change: 1 addition & 0 deletions quesma/queryparser/aggregation_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,7 @@ func Test2AggregationParserExternalTestcases(t *testing.T) {
}
cw := ClickhouseQueryTranslator{ClickhouseLM: lm, Table: &table, Ctx: context.Background(), SchemaRegistry: s}
allTests := testdata.AggregationTests
allTests = append(allTests, testdata.AggregationTests2...)
allTests = append(allTests, opensearch_visualize.AggregationTests...)
allTests = append(allTests, dashboard_1.AggregationTests...)
allTests = append(allTests, testdata.PipelineAggregationTests...)
Expand Down
17 changes: 10 additions & 7 deletions quesma/queryparser/query_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"quesma/clickhouse"
"quesma/logger"
"quesma/model"
"quesma/model/bucket_aggregations"
"quesma/model/typical_queries"
"quesma/queryparser/lucene"
"quesma/quesma/types"
Expand Down Expand Up @@ -1202,27 +1203,29 @@ func (cw *ClickhouseQueryTranslator) isItListRequest(queryMap QueryMap) (model.S
}
}

func (cw *ClickhouseQueryTranslator) extractInterval(queryMap QueryMap) string {
func (cw *ClickhouseQueryTranslator) extractInterval(queryMap QueryMap) (interval string, intervalType bucket_aggregations.DateHistogramIntervalType) {
const defaultInterval = "30s"
const defaultIntervalType = bucket_aggregations.DateHistogramFixedInterval
if fixedInterval, exists := queryMap["fixed_interval"]; exists {
if asString, ok := fixedInterval.(string); ok {
return asString
return asString, bucket_aggregations.DateHistogramFixedInterval
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("unexpected type of interval: %T, value: %v. Returning default", fixedInterval, fixedInterval)
return defaultInterval
return defaultInterval, bucket_aggregations.DateHistogramFixedInterval
}
}
if calendarInterval, exists := queryMap["calendar_interval"]; exists {
if asString, ok := calendarInterval.(string); ok {
return asString
return asString, bucket_aggregations.DateHistogramCalendarInterval
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("unexpected type of interval: %T, value: %v. Returning default", calendarInterval, calendarInterval)
return defaultInterval
return defaultInterval, bucket_aggregations.DateHistogramCalendarInterval
}
}

logger.WarnWithCtx(cw.Ctx).Msgf("extractInterval: no interval found, returning default: %s", defaultInterval)
return defaultInterval
// this should NEVER happen (query should always have either fixed_interval, or calendar_interval_field), so defaultIntervalType is totally arbitrary
logger.WarnWithCtx(cw.Ctx).Msgf("extractInterval: no interval found, returning default: %s (%s)", defaultInterval, defaultIntervalType.String(cw.Ctx))
return defaultInterval, defaultIntervalType
}

// parseSortFields parses sort fields from the query
Expand Down
16 changes: 0 additions & 16 deletions quesma/queryparser/query_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package queryparser
import (
"context"
"quesma/clickhouse"
"quesma/kibana"
"quesma/logger"
"quesma/model"
"quesma/model/bucket_aggregations"
Expand Down Expand Up @@ -490,21 +489,6 @@ func (cw *ClickhouseQueryTranslator) BuildFacetsQuery(fieldName string, simpleQu
}
}

func (cw *ClickhouseQueryTranslator) createHistogramPartOfQuery(queryMap QueryMap) model.Expr {
const defaultDateTimeType = clickhouse.DateTime64
field := cw.parseFieldField(queryMap, "histogram")
interval, err := kibana.ParseInterval(cw.extractInterval(queryMap))
if err != nil {
logger.ErrorWithCtx(cw.Ctx).Msg(err.Error())
}
dateTimeType := cw.GetDateTimeTypeFromSelectClause(cw.Ctx, field)
if dateTimeType == clickhouse.Invalid {
logger.ErrorWithCtx(cw.Ctx).Msgf("invalid date type for field %+v. Using DateTime64 as default.", field)
dateTimeType = defaultDateTimeType
}
return clickhouse.TimestampGroupBy(field, dateTimeType, interval)
}

// sortInTopologicalOrder sorts all our queries to DB, which we send to calculate response for a single query request.
// It sorts them in a way that we can calculate them in the returned order, so any parent aggregation needs to be calculated before its child.
// It's only really needed for pipeline aggregations, as only they have parent-child relationships.
Expand Down
2 changes: 1 addition & 1 deletion quesma/testdata/aggregation_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -4150,7 +4150,7 @@ var AggregationTests = []AggregationTestCase{
}
},
"date_histogram": {
"calendar_interval": "22h",
"fixed_interval": "22h",
"field": "@timestamp"
},
"meta": {
Expand Down
Loading

0 comments on commit 95521b5

Please sign in to comment.