Report errors in queries better #2 (in parsing bucket aggregations) (#1006)

**old comment (motivation):**
Our error messages for errors encountered while parsing a query request
aren't as good as Elastic's; I'll try to improve them here. (This is not
unimportant: for an invalid query request, where Elastic always fails with
an error response, we often don't, and e.g. return empty results, which
might trick the user into thinking the query went fine.)

I think I'll split it into a few PRs; this might be a good starting point
for another one. (update: actually, a similar small improvement was already
merged, but for pipeline aggregations)

**new comment (what's been done):**
There's really not much going on here.
The crux of this PR is to change our very ugly 400-line `func (cw
*ClickhouseQueryTranslator) pancakeTryBucketAggregation` function into
something a bit nicer, and also to report errors in more places, to be
consistent with Elastic's behaviour.

There are also quite a few style improvements that I caught along the way,
but only trivial/very local ones: extracting copy/pasted code used 5 times
into one function, consistent naming across parsers of different
aggregations, etc. (not much more). A rough illustration of the behavioural
change follows the file summary below.
trzysiek authored Dec 12, 2024
1 parent 8ac3b1f commit 5ed84b9
Showing 11 changed files with 803 additions and 540 deletions.
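To illustrate the behavioural change before the diffs (a standalone sketch, not the actual Quesma code; the real helper is parseArrayField in the aggregation_parser.go diff below): instead of logging a warning and silently falling back to a default, parsers now return an error, so an invalid request fails the way Elastic does.

package main

import "fmt"

// parseRanges sketches the new error-returning style: a missing or
// mis-typed field is reported to the caller instead of being papered
// over with a default that later yields empty results.
func parseRanges(params map[string]any) ([]any, error) {
	rangesRaw, exists := params["ranges"]
	if !exists {
		return nil, fmt.Errorf("array field 'ranges' not found in aggregation params: %v", params)
	}
	ranges, ok := rangesRaw.([]any)
	if !ok {
		return nil, fmt.Errorf("ranges is not an array, but %T, value: %v", rangesRaw, rangesRaw)
	}
	return ranges, nil
}

func main() {
	// An invalid request: "ranges" is a string, not an array.
	_, err := parseRanges(map[string]any{"ranges": "oops"})
	fmt.Println(err) // ranges is not an array, but string, value: oops
}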
72 changes: 36 additions & 36 deletions quesma/model/bucket_aggregations/dateRange.go
@@ -16,23 +16,23 @@ const UnboundedInterval = "*"
// 1) in Clickhouse's proper format, e.g. toStartOfDay(subDate(now(), INTERVAL 3 week))
// 2) * (UnboundedInterval), which means no bound
type DateTimeInterval struct {
-	Begin string
-	End   string
+	begin string
+	end   string
}

func NewDateTimeInterval(begin, end string) DateTimeInterval {
return DateTimeInterval{
-		Begin: begin,
-		End:   end,
+		begin: begin,
+		end:   end,
}
}

// BeginTimestampToSQL returns SQL select for the begin timestamp, and a boolean indicating if the select is needed
// We query Clickhouse for this timestamp, as it's defined in Clickhouse's format, e.g. now()-1d.
// It's only 1 more field to our SELECT query, so it shouldn't be a performance issue.
func (interval DateTimeInterval) BeginTimestampToSQL() (sqlSelect model.Expr, selectNeeded bool) {
-	if interval.Begin != UnboundedInterval {
-		return model.NewFunction("toInt64", model.NewFunction("toUnixTimestamp", model.NewLiteral(interval.Begin))), true
+	if interval.begin != UnboundedInterval {
+		return model.NewFunction("toInt64", model.NewFunction("toUnixTimestamp", model.NewLiteral(interval.begin))), true
}
return nil, false
}
@@ -41,21 +41,21 @@ func (interval DateTimeInterval) BeginTimestampToSQL() (sqlSelect model.Expr, se
// We query Clickhouse for this timestamp, as it's defined in Clickhouse's format, e.g. now()-1d.
// It's only 1 more field to our SELECT query, so it isn't a performance issue.
func (interval DateTimeInterval) EndTimestampToSQL() (sqlSelect model.Expr, selectNeeded bool) {
-	if interval.End != UnboundedInterval {
-		return model.NewFunction("toInt64", model.NewFunction("toUnixTimestamp", model.NewLiteral(interval.End))), true
+	if interval.end != UnboundedInterval {
+		return model.NewFunction("toInt64", model.NewFunction("toUnixTimestamp", model.NewLiteral(interval.end))), true
}
return nil, false
}

-func (interval DateTimeInterval) ToWhereClause(fieldName string) model.Expr {
+func (interval DateTimeInterval) ToWhereClause(field model.Expr) model.Expr {
begin, isBegin := interval.BeginTimestampToSQL()
end, isEnd := interval.EndTimestampToSQL()

if isBegin {
-		begin = model.NewInfixExpr(model.NewColumnRef(fieldName), ">=", begin)
+		begin = model.NewInfixExpr(field, ">=", begin)
}
if isEnd {
-		end = model.NewInfixExpr(model.NewColumnRef(fieldName), "<", end)
+		end = model.NewInfixExpr(field, "<", end)
}

if isBegin && isEnd {
@@ -65,20 +65,20 @@ func (interval DateTimeInterval) ToWhereClause(fieldName string) model.Expr {
} else if isEnd {
return end
} else {
-		return model.NewLiteral("TRUE")
+		return model.TrueExpr
}
}

type DateRange struct {
ctx context.Context
-	FieldName       string
-	Format          string
-	Intervals       []DateTimeInterval
-	SelectColumnsNr int // how many columns we add to the query because of date_range aggregation, e.g. SELECT x,y,z -> 3
+	field           model.Expr
+	format          string
+	intervals       []DateTimeInterval
+	selectColumnsNr int // how many columns we add to the query because of date_range aggregation, e.g. SELECT x,y,z -> 3
}

-func NewDateRange(ctx context.Context, fieldName string, format string, intervals []DateTimeInterval, selectColumnsNr int) DateRange {
-	return DateRange{ctx: ctx, FieldName: fieldName, Format: format, Intervals: intervals, SelectColumnsNr: selectColumnsNr}
+func NewDateRange(ctx context.Context, field model.Expr, format string, intervals []DateTimeInterval, selectColumnsNr int) DateRange {
+	return DateRange{ctx: ctx, field: field, format: format, intervals: intervals, selectColumnsNr: selectColumnsNr}
}

func (query DateRange) AggregationType() model.AggregationType {
@@ -92,15 +92,15 @@ func (query DateRange) TranslateSqlResponseToJson(rows []model.QueryResultRow) m
}

response := make([]model.JsonMap, 0)
-	startIteration := len(rows[0].Cols) - 1 - query.SelectColumnsNr
+	startIteration := len(rows[0].Cols) - 1 - query.selectColumnsNr
if startIteration < 0 || startIteration >= len(rows[0].Cols) {
logger.ErrorWithCtx(query.ctx).Msgf(
"unexpected column nr in aggregation response, startIteration: %d, len(rows[0].Cols): %d",
startIteration, len(rows[0].Cols),
)
return nil
}
-	for intervalIdx, columnIdx := 0, startIteration; intervalIdx < len(query.Intervals); intervalIdx++ {
+	for intervalIdx, columnIdx := 0, startIteration; intervalIdx < len(query.intervals); intervalIdx++ {
responseForInterval, nextColumnIdx := query.responseForInterval(&rows[0], intervalIdx, columnIdx)
response = append(response, responseForInterval)
columnIdx = nextColumnIdx
@@ -111,7 +111,7 @@ func (query DateRange) TranslateSqlResponseToJson(rows []model.QueryResultRow) m
}

func (query DateRange) String() string {
return "date_range, intervals: " + fmt.Sprintf("%v", query.Intervals)
return "date_range, intervals: " + fmt.Sprintf("%v", query.intervals)
}

func (query DateRange) responseForInterval(row *model.QueryResultRow, intervalIdx, columnIdx int) (
@@ -123,7 +123,7 @@ func (query DateRange) responseForInterval(row *model.QueryResultRow, intervalId

var from, to int64
var fromString, toString string
-	if query.Intervals[intervalIdx].Begin == UnboundedInterval {
+	if query.intervals[intervalIdx].begin == UnboundedInterval {
fromString = UnboundedInterval
} else {
if columnIdx >= len(row.Cols) {
@@ -137,7 +137,7 @@ func (query DateRange) responseForInterval(row *model.QueryResultRow, intervalId
columnIdx++
}

-	if query.Intervals[intervalIdx].End == UnboundedInterval {
+	if query.intervals[intervalIdx].end == UnboundedInterval {
toString = UnboundedInterval
} else {
if columnIdx >= len(row.Cols) {
@@ -173,16 +173,16 @@ func (query DateRange) DoesNotHaveGroupBy() bool {
}

func (query DateRange) CombinatorGroups() (result []CombinatorGroup) {
-	for intervalIdx, interval := range query.Intervals {
+	for intervalIdx, interval := range query.intervals {
prefix := fmt.Sprintf("range_%d__", intervalIdx)
-		if len(query.Intervals) == 1 {
+		if len(query.intervals) == 1 {
prefix = ""
}
result = append(result, CombinatorGroup{
idx: intervalIdx,
Prefix: prefix,
Key: prefix, // TODO: we need translate date to real time
-			WhereClause: interval.ToWhereClause(query.FieldName),
+			WhereClause: interval.ToWhereClause(query.field),
})
}
return
@@ -199,23 +199,23 @@ func (query DateRange) CombinatorTranslateSqlResponseToJson(subGroup CombinatorG
}

// TODO: we need translate relative to real time
-	interval := query.Intervals[subGroup.idx]
-	if interval.Begin != UnboundedInterval {
-		response["from"] = interval.Begin
-		response["from_as_string"] = interval.Begin
+	interval := query.intervals[subGroup.idx]
+	if interval.begin != UnboundedInterval {
+		response["from"] = interval.begin
+		response["from_as_string"] = interval.begin
}
-	if interval.End != UnboundedInterval {
-		response["to"] = interval.End
-		response["to_as_string"] = interval.End
+	if interval.end != UnboundedInterval {
+		response["to"] = interval.end
+		response["to_as_string"] = interval.end
}

return response
}

func (query DateRange) CombinatorSplit() []model.QueryType {
-	result := make([]model.QueryType, 0, len(query.Intervals))
-	for _, interval := range query.Intervals {
-		result = append(result, NewDateRange(query.ctx, query.FieldName, query.Format, []DateTimeInterval{interval}, query.SelectColumnsNr))
+	result := make([]model.QueryType, 0, len(query.intervals))
+	for _, interval := range query.intervals {
+		result = append(result, NewDateRange(query.ctx, query.field, query.format, []DateTimeInterval{interval}, query.selectColumnsNr))
}
return result
}
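For intuition, ToWhereClause above combines the two optional bounds of an interval into a single predicate. A self-contained sketch of the same logic over plain SQL strings (the real code builds model.Expr nodes via BeginTimestampToSQL/EndTimestampToSQL, as shown in the diff):

package main

import "fmt"

const unbounded = "*"

// whereClause mirrors DateTimeInterval.ToWhereClause, but over plain SQL
// strings instead of model.Expr nodes: each present bound contributes a
// comparison, both are ANDed together, and no bounds at all yields TRUE.
func whereClause(field, begin, end string) string {
	var parts []string
	if begin != unbounded {
		parts = append(parts, fmt.Sprintf("%s >= toInt64(toUnixTimestamp(%s))", field, begin))
	}
	if end != unbounded {
		parts = append(parts, fmt.Sprintf("%s < toInt64(toUnixTimestamp(%s))", field, end))
	}
	switch len(parts) {
	case 2:
		return parts[0] + " AND " + parts[1]
	case 1:
		return parts[0]
	default:
		return "TRUE"
	}
}

func main() {
	// An interval bounded only from below, like {"from": "now()-1d"}:
	fmt.Println(whereClause("timestamp", "now()-1d", unbounded))
	// timestamp >= toInt64(toUnixTimestamp(now()-1d))
}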
6 changes: 0 additions & 6 deletions quesma/model/bucket_aggregations/range.go
@@ -12,8 +12,6 @@ import (
"strings"
)

-const keyedDefaultValue = false
-
var IntervalInfiniteRange = math.NaN()

type Interval struct {
@@ -91,10 +89,6 @@ func NewRange(ctx context.Context, expr model.Expr, intervals []Interval, keyed
return Range{ctx, expr, intervals, keyed}
}

-func NewRangeWithDefaultKeyed(ctx context.Context, expr model.Expr, intervals []Interval) Range {
-	return Range{ctx, expr, intervals, keyedDefaultValue}
-}
-
func (query Range) AggregationType() model.AggregationType {
return model.BucketAggregation
}
93 changes: 33 additions & 60 deletions quesma/queryparser/aggregation_date_range_parser.go
@@ -3,80 +3,53 @@
package queryparser

import (
"quesma/logger"
"fmt"
"quesma/model/bucket_aggregations"
"unicode"
)

-func (cw *ClickhouseQueryTranslator) parseDateRangeAggregation(dateRange QueryMap) (bucket_aggregations.DateRange, error) {
-	var err error
-	var fieldName, format string
-
-	if field, exists := dateRange["field"]; exists {
-		if fieldNameRaw, ok := field.(string); ok {
-			fieldName = cw.ResolveField(cw.Ctx, fieldNameRaw)
-		} else {
-			logger.WarnWithCtx(cw.Ctx).Msgf("field specified for date range aggregation is not a string. Using empty. Querymap: %v", dateRange)
-		}
-	} else {
-		logger.WarnWithCtx(cw.Ctx).Msgf("no field specified for date range aggregation. Using empty. Querymap: %v", dateRange)
-	}
-	var ranges []any
-	var ok bool
-	if formatRaw, exists := dateRange["format"]; exists {
-		if formatParsed, ok := formatRaw.(string); ok {
-			format = formatParsed
-		} else {
-			logger.WarnWithCtx(cw.Ctx).Msgf("format specified for date range aggregation is not a string. Using empty. Querymap: %v", dateRange)
-		}
+func (cw *ClickhouseQueryTranslator) parseDateRangeAggregation(aggregation *pancakeAggregationTreeNode, params QueryMap) (err error) {
+	field := cw.parseFieldField(params, "date_range")
+	if field == nil {
+		return fmt.Errorf("no field specified for date range aggregation, params: %v", params)
	}
-	if rangesRaw, exists := dateRange["ranges"]; exists {
-		if ranges, ok = rangesRaw.([]any); !ok {
-			logger.WarnWithCtx(cw.Ctx).Msgf("ranges specified for date range aggregation is not an array. Using empty. Querymap: %v", dateRange)
-		}
-	} else {
-		logger.WarnWithCtx(cw.Ctx).Msgf("no ranges specified for date range aggregation. Using empty. Querymap: %v", dateRange)
+	format := cw.parseStringField(params, "format", "")
+	ranges, err := cw.parseArrayField(params, "ranges")
+	if err != nil {
+		return err
	}

intervals := make([]bucket_aggregations.DateTimeInterval, 0, len(ranges))
selectColumnsNr := len(ranges) // we query Clickhouse for every unbounded part of interval (begin and end)
-	for _, Range := range ranges {
-		rangeMap := Range.(QueryMap)
-		var intervalBegin, intervalEnd string
-		from, exists := rangeMap["from"]
-		if exists {
-			if fromRaw, ok := from.(string); ok {
-				intervalBegin, err = cw.parseDateTimeInClickhouseMathLanguage(fromRaw)
-				if err != nil {
-					return bucket_aggregations.DateRange{}, err
-				}
-				selectColumnsNr++
-			} else {
-				logger.WarnWithCtx(cw.Ctx).Msgf("from specified for date range aggregation is not a string. Querymap: %v "+
-					"Using default (unbounded).", dateRange)
-				intervalBegin = bucket_aggregations.UnboundedInterval
+	for _, rangeRaw := range ranges {
+		rangeMap, ok := rangeRaw.(QueryMap)
+		if !ok {
+			return fmt.Errorf("range is not a map, but %T, range: %v", rangeRaw, rangeRaw)
+		}
+
+		const defaultIntervalBound = bucket_aggregations.UnboundedInterval
+		intervalBegin := defaultIntervalBound
+		if from := cw.parseStringField(rangeMap, "from", defaultIntervalBound); from != defaultIntervalBound {
+			intervalBegin, err = cw.parseDateTimeInClickhouseMathLanguage(from)
+			if err != nil {
+				return err
+			}
-		} else {
-			intervalBegin = bucket_aggregations.UnboundedInterval
+			selectColumnsNr++
		}
-		to, exists := rangeMap["to"]
-		if exists {
-			if toRaw, ok := to.(string); ok {
-				intervalEnd, err = cw.parseDateTimeInClickhouseMathLanguage(toRaw)
-				if err != nil {
-					return bucket_aggregations.DateRange{}, err
-				}
-				selectColumnsNr++
-			} else {
-				logger.WarnWithCtx(cw.Ctx).Msgf("To specified for date range aggregation is not a string. Querymap: %v "+
-					"Using default (unbounded).", dateRange)
-				intervalEnd = bucket_aggregations.UnboundedInterval
+
+		intervalEnd := bucket_aggregations.UnboundedInterval
+		if to := cw.parseStringField(rangeMap, "to", defaultIntervalBound); to != defaultIntervalBound {
+			intervalEnd, err = cw.parseDateTimeInClickhouseMathLanguage(to)
+			if err != nil {
+				return err
+			}
-		} else {
-			intervalEnd = bucket_aggregations.UnboundedInterval
+			selectColumnsNr++
		}
}
intervals = append(intervals, bucket_aggregations.NewDateTimeInterval(intervalBegin, intervalEnd))
}
-	return bucket_aggregations.NewDateRange(cw.Ctx, fieldName, format, intervals, selectColumnsNr), nil
+
+	aggregation.queryType = bucket_aggregations.NewDateRange(cw.Ctx, field, format, intervals, selectColumnsNr)
+	return nil
}

// parseDateTimeInClickhouseMathLanguage parses dateTime from Clickhouse's format
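Worth noting in the rewritten parser above: the unbounded marker "*" (bucket_aggregations.UnboundedInterval) doubles as the parse default, so a missing "from"/"to" and an explicit "*" take the same code path. A minimal standalone sketch of that idiom (parseStringField here is a simplified stand-in, not the actual Quesma helper):

package main

import "fmt"

const unboundedInterval = "*"

// parseStringField is a simplified stand-in for the helper used above:
// return the value if it's present and a string, else the default.
func parseStringField(m map[string]any, field, defaultValue string) string {
	if raw, exists := m[field]; exists {
		if s, ok := raw.(string); ok {
			return s
		}
	}
	return defaultValue
}

func main() {
	rangeMap := map[string]any{"from": "now-1d/d"}
	from := parseStringField(rangeMap, "from", unboundedInterval)
	to := parseStringField(rangeMap, "to", unboundedInterval)
	// Only the bounded side triggers datetime parsing and an extra SELECT column:
	fmt.Println(from != unboundedInterval, to != unboundedInterval) // true false
}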
17 changes: 16 additions & 1 deletion quesma/queryparser/aggregation_parser.go
@@ -3,6 +3,7 @@
package queryparser

import (
"fmt"
"quesma/clickhouse"
"quesma/logger"
"quesma/model"
@@ -175,7 +176,11 @@ func (cw *ClickhouseQueryTranslator) parseTopHits(queryMap QueryMap) (parsedTopH
const defaultSize = 1
size := cw.parseSize(params, defaultSize)

-	orderBy := cw.parseOrder(params, queryMap, []model.Expr{})
+	orderBy, err := cw.parseOrder(params, []model.Expr{})
+	if err != nil {
+		logger.WarnWithCtx(cw.Ctx).Msgf("error parsing order in top_hits: %v", err)
+		return
+	}
if len(orderBy) == 1 && orderBy[0].IsCountDesc() { // we don't need count DESC
orderBy = []model.OrderByExpr{}
}
@@ -287,6 +292,16 @@ func (cw *ClickhouseQueryTranslator) parseStringField(queryMap QueryMap, fieldNa
return defaultValue
}

+func (cw *ClickhouseQueryTranslator) parseArrayField(queryMap QueryMap, fieldName string) ([]any, error) {
+	if valueRaw, exists := queryMap[fieldName]; exists {
+		if asArray, ok := valueRaw.([]any); ok {
+			return asArray, nil
+		}
+		return nil, fmt.Errorf("%s is not an array, but %T, value: %v", fieldName, valueRaw, valueRaw)
+	}
+	return nil, fmt.Errorf("array field '%s' not found in aggregation queryMap: %v", fieldName, queryMap)
+}

// parseFieldFieldMaybeScript is basically almost a copy of parseFieldField above, but it also handles a basic script, if "field" is missing.
func (cw *ClickhouseQueryTranslator) parseFieldFieldMaybeScript(shouldBeMap any, aggregationType string) (field model.Expr, isFromScript bool) {
Map, ok := shouldBeMap.(QueryMap)
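A side note on the error messages above: they lean on fmt's %T verb to name the offending type, which is what makes messages like "ranges is not an array, but string" actionable for the user. A tiny standalone demonstration (not Quesma code):

package main

import "fmt"

func main() {
	// %T prints the dynamic type of the value stored in the interface.
	for _, v := range []any{"text", 3.14, []any{1, 2}} {
		fmt.Println(fmt.Errorf("field is not an array, but %T, value: %v", v, v))
	}
}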