Report errors in queries better #2 (#1012)
Similar to #1006, but for pipeline aggregations.

I want to simplify and unify the interfaces of our parsers, so that all of them
simply return something like `(aggregation, error)`. The code should be
cleaner, and properly returning errors to Kibana will be much easier.
trzysiek authored Nov 17, 2024
1 parent 00f0922 commit 15bcad3
Showing 3 changed files with 92 additions and 158 deletions.
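
For context, the unified parser shape this commit converges on looks roughly like the self-contained sketch below. The types here (`QueryMap` as a plain map, a local `QueryType` interface, a single example parser) are simplified stand-ins for the real `quesma/queryparser` and `model` types, not the actual implementation:

```go
// Minimal sketch of the unified "(aggregation, error)" parser shape.
// QueryMap, QueryType and cumulativeSum are simplified stand-ins for the
// real Quesma types in quesma/queryparser and quesma/model.
package main

import "fmt"

type QueryMap = map[string]any

// QueryType stands in for model.QueryType.
type QueryType interface{ Name() string }

type cumulativeSum struct{ bucketsPath string }

func (cumulativeSum) Name() string { return "cumulative_sum" }

// Every parser now has the same signature: (aggregation, error).
type aggregationParser = func(params QueryMap) (QueryType, error)

func parseCumulativeSum(params QueryMap) (QueryType, error) {
	path, ok := params["buckets_path"].(string)
	if !ok {
		return nil, fmt.Errorf("no buckets_path in cumulative_sum")
	}
	return cumulativeSum{bucketsPath: path}, nil
}

// parsePipelineAggregations dispatches on the aggregation name and returns
// either a parsed aggregation, an error, or (nil, nil) if nothing matched.
func parsePipelineAggregations(queryMap QueryMap) (QueryType, error) {
	parsers := map[string]aggregationParser{
		"cumulative_sum": parseCumulativeSum,
		// ... the remaining pipeline aggregations follow the same pattern.
	}
	for name, parse := range parsers {
		if paramsRaw, exists := queryMap[name]; exists {
			params, ok := paramsRaw.(QueryMap)
			if !ok {
				return nil, fmt.Errorf("%s is not a map, but %T", name, paramsRaw)
			}
			delete(queryMap, name)
			return parse(params)
		}
	}
	return nil, nil
}

func main() {
	aggr, err := parsePipelineAggregations(QueryMap{
		"cumulative_sum": QueryMap{"buckets_path": "1"},
	})
	fmt.Println(aggr, err) // {1} <nil>
}
```

The dispatcher does the `delete(queryMap, name)` bookkeeping and the "is it a map" validation once, instead of repeating both in every individual parser.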
2 changes: 2 additions & 0 deletions quesma/queryparser/aggregation_parser.go
@@ -29,6 +29,8 @@ type metricsAggregation struct {
sigma float64 // only for standard deviation
}

type aggregationParser = func(queryMap QueryMap) (model.QueryType, error)

const metricsAggregationDefaultFieldType = clickhouse.Invalid

// Tries to parse metrics aggregation from queryMap. If it's not a metrics aggregation, returns false.
7 changes: 5 additions & 2 deletions quesma/queryparser/pancake_aggregation_parser.go
@@ -114,7 +114,7 @@ func (cw *ClickhouseQueryTranslator) pancakeParseAggregation(aggregationName str
return nil, nil
}

// check if metadata's present
// check if metadata is present
var metadata model.JsonMap
if metaRaw, exists := queryMap["meta"]; exists {
metadata = metaRaw.(model.JsonMap)
@@ -143,7 +143,10 @@ func (cw *ClickhouseQueryTranslator) pancakeParseAggregation(aggregationName str
}

// 2. Pipeline aggregation => always leaf (for now)
if pipelineAggr, isPipeline := cw.parsePipelineAggregations(queryMap); isPipeline {
if pipelineAggr, err := cw.parsePipelineAggregations(queryMap); err != nil || pipelineAggr != nil {
if err != nil {
return nil, err
}
aggregation.queryType = pipelineAggr
return aggregation, nil
}
241 changes: 85 additions & 156 deletions quesma/queryparser/pipeline_aggregations.go
@@ -3,7 +3,8 @@
package queryparser

import (
"quesma/logger"
"errors"
"fmt"
"quesma/model"
"quesma/model/pipeline_aggregations"
"quesma/util"
@@ -12,217 +13,147 @@ import (

// CAUTION: maybe "return" everywhere isn't correct, as there may be multiple pipeline aggregations at one level.
// But I've tested some complex queries and it seems not to be the case. So let's keep it this way for now.
func (cw *ClickhouseQueryTranslator) parsePipelineAggregations(queryMap QueryMap) (aggregationType model.QueryType, success bool) {
if aggregationType, success = cw.parseBucketScriptBasic(queryMap); success {
delete(queryMap, "bucket_script")
return
}
if aggregationType, success = cw.parseCumulativeSum(queryMap); success {
delete(queryMap, "cumulative_sum")
return
}
if aggregationType, success = cw.parseDerivative(queryMap); success {
delete(queryMap, "derivative")
return
}
if aggregationType, success = cw.parseSerialDiff(queryMap); success {
delete(queryMap, "derivative")
return
}
if aggregationType, success = cw.parseAverageBucket(queryMap); success {
delete(queryMap, "avg_bucket")
return
}
if aggregationType, success = cw.parseMinBucket(queryMap); success {
delete(queryMap, "min_bucket")
return
}
if aggregationType, success = cw.parseMaxBucket(queryMap); success {
delete(queryMap, "max_bucket")
return
}
if aggregationType, success = cw.parseSumBucket(queryMap); success {
delete(queryMap, "sum_bucket")
return
func (cw *ClickhouseQueryTranslator) parsePipelineAggregations(queryMap QueryMap) (aggregationType model.QueryType, err error) {
parsers := map[string]aggregationParser{
"bucket_script": cw.parseBucketScriptBasic,
"cumulative_sum": cw.parseCumulativeSum,
"derivative": cw.parseDerivative,
"serial_diff": cw.parseSerialDiff,
"avg_bucket": cw.parseAverageBucket,
"min_bucket": cw.parseMinBucket,
"max_bucket": cw.parseMaxBucket,
"sum_bucket": cw.parseSumBucket,
}

for aggrName, aggrParser := range parsers {
if paramsRaw, exists := queryMap[aggrName]; exists {
if params, ok := paramsRaw.(QueryMap); ok {
delete(queryMap, aggrName)
return aggrParser(params)
}
return nil, fmt.Errorf("%s is not a map, but %T, value: %v", aggrName, paramsRaw, paramsRaw)
}
}
return
}

func (cw *ClickhouseQueryTranslator) parseCumulativeSum(queryMap QueryMap) (aggregationType model.QueryType, success bool) {
cumulativeSumRaw, exists := queryMap["cumulative_sum"]
if !exists {
return
}
bucketsPath, ok := cw.parseBucketsPath(cumulativeSumRaw, "cumulative_sum")
if !ok {
return
}
return pipeline_aggregations.NewCumulativeSum(cw.Ctx, bucketsPath), true
return nil, nil
}

func (cw *ClickhouseQueryTranslator) parseDerivative(queryMap QueryMap) (aggregationType model.QueryType, success bool) {
derivativeRaw, exists := queryMap["derivative"]
if !exists {
return
}
bucketsPath, ok := cw.parseBucketsPath(derivativeRaw, "derivative")
if !ok {
return
func (cw *ClickhouseQueryTranslator) parseCumulativeSum(params QueryMap) (model.QueryType, error) {
bucketsPath, err := cw.parseBucketsPath(params, "cumulative_sum")
if err != nil {
return nil, err
}
return pipeline_aggregations.NewDerivative(cw.Ctx, bucketsPath), true
return pipeline_aggregations.NewCumulativeSum(cw.Ctx, bucketsPath), nil
}

func (cw *ClickhouseQueryTranslator) parseAverageBucket(queryMap QueryMap) (aggregationType model.QueryType, success bool) {
avgBucketRaw, exists := queryMap["avg_bucket"]
if !exists {
return
}
bucketsPath, ok := cw.parseBucketsPath(avgBucketRaw, "avg_bucket")
if !ok {
return
func (cw *ClickhouseQueryTranslator) parseDerivative(params QueryMap) (model.QueryType, error) {
bucketsPath, err := cw.parseBucketsPath(params, "derivative")
if err != nil {
return nil, err
}
return pipeline_aggregations.NewAverageBucket(cw.Ctx, bucketsPath), true
return pipeline_aggregations.NewDerivative(cw.Ctx, bucketsPath), nil
}

func (cw *ClickhouseQueryTranslator) parseMinBucket(queryMap QueryMap) (aggregationType model.QueryType, success bool) {
minBucketRaw, exists := queryMap["min_bucket"]
if !exists {
return
func (cw *ClickhouseQueryTranslator) parseAverageBucket(params QueryMap) (model.QueryType, error) {
bucketsPath, err := cw.parseBucketsPath(params, "avg_bucket")
if err != nil {
return nil, err
}
bucketsPath, ok := cw.parseBucketsPath(minBucketRaw, "min_bucket")
if !ok {
return
}
return pipeline_aggregations.NewMinBucket(cw.Ctx, bucketsPath), true
return pipeline_aggregations.NewAverageBucket(cw.Ctx, bucketsPath), nil
}

func (cw *ClickhouseQueryTranslator) parseMaxBucket(queryMap QueryMap) (aggregationType model.QueryType, success bool) {
maxBucketRaw, exists := queryMap["max_bucket"]
if !exists {
return
}
bucketsPath, ok := cw.parseBucketsPath(maxBucketRaw, "max_bucket")
if !ok {
return
func (cw *ClickhouseQueryTranslator) parseMinBucket(params QueryMap) (model.QueryType, error) {
bucketsPath, err := cw.parseBucketsPath(params, "min_bucket")
if err != nil {
return nil, err
}
return pipeline_aggregations.NewMaxBucket(cw.Ctx, bucketsPath), true
return pipeline_aggregations.NewMinBucket(cw.Ctx, bucketsPath), nil
}

func (cw *ClickhouseQueryTranslator) parseSumBucket(queryMap QueryMap) (aggregationType model.QueryType, success bool) {
sumBucketRaw, exists := queryMap["sum_bucket"]
if !exists {
return
func (cw *ClickhouseQueryTranslator) parseMaxBucket(params QueryMap) (model.QueryType, error) {
bucketsPath, err := cw.parseBucketsPath(params, "max_bucket")
if err != nil {
return nil, err
}
bucketsPath, ok := cw.parseBucketsPath(sumBucketRaw, "sum_bucket")
if !ok {
return
}
return pipeline_aggregations.NewSumBucket(cw.Ctx, bucketsPath), true
return pipeline_aggregations.NewMaxBucket(cw.Ctx, bucketsPath), nil
}

func (cw *ClickhouseQueryTranslator) parseSerialDiff(queryMap QueryMap) (aggregationType model.QueryType, success bool) {
serialDiffRaw, exists := queryMap["serial_diff"]
if !exists {
return
func (cw *ClickhouseQueryTranslator) parseSumBucket(params QueryMap) (model.QueryType, error) {
bucketsPath, err := cw.parseBucketsPath(params, "sum_bucket")
if err != nil {
return nil, err
}
return pipeline_aggregations.NewSumBucket(cw.Ctx, bucketsPath), nil
}

func (cw *ClickhouseQueryTranslator) parseSerialDiff(params QueryMap) (model.QueryType, error) {
// buckets_path
bucketsPath, ok := cw.parseBucketsPath(serialDiffRaw, "serial_diff")
if !ok {
return
bucketsPath, err := cw.parseBucketsPath(params, "serial_diff")
if err != nil {
return nil, err
}

// lag
const defaultLag = 1
serialDiff, ok := serialDiffRaw.(QueryMap)
if !ok {
logger.WarnWithCtx(cw.Ctx).Msgf("serial_diff is not a map, but %T, value: %v", serialDiffRaw, serialDiffRaw)
return
}
lagRaw, exists := serialDiff["lag"]
lagRaw, exists := params["lag"]
if !exists {
return pipeline_aggregations.NewSerialDiff(cw.Ctx, bucketsPath, defaultLag), true
return pipeline_aggregations.NewSerialDiff(cw.Ctx, bucketsPath, defaultLag), nil
}
if lag, ok := lagRaw.(float64); ok {
return pipeline_aggregations.NewSerialDiff(cw.Ctx, bucketsPath, int(lag)), true
return pipeline_aggregations.NewSerialDiff(cw.Ctx, bucketsPath, int(lag)), nil
}

logger.WarnWithCtx(cw.Ctx).Msgf("lag is not a float64, but %T, value: %v", lagRaw, lagRaw)
return
return nil, fmt.Errorf("lag is not a float64, but %T, value: %v", lagRaw, lagRaw)
}

func (cw *ClickhouseQueryTranslator) parseBucketScriptBasic(queryMap QueryMap) (aggregationType model.QueryType, success bool) {
bucketScriptRaw, exists := queryMap["bucket_script"]
if !exists {
return
}

// so far we only handle "count" here :D
delete(queryMap, "bucket_script")
bucketScript, ok := bucketScriptRaw.(QueryMap)
if !ok {
logger.WarnWithCtx(cw.Ctx).Msgf("bucket_script is not a map, but %T, value: %v. Skipping this aggregation", bucketScriptRaw, bucketScriptRaw)
return
}

// if ["buckets_path"] != "_count", skip the aggregation
bucketsPath, ok := cw.parseBucketsPath(bucketScript, "bucket_script")
if !ok {
return
func (cw *ClickhouseQueryTranslator) parseBucketScriptBasic(params QueryMap) (model.QueryType, error) {
bucketsPath, err := cw.parseBucketsPath(params, "bucket_script")
if err != nil {
return nil, err
}
if !strings.HasSuffix(bucketsPath, pipeline_aggregations.BucketsPathCount) {
logger.WarnWithCtx(cw.Ctx).Msgf("buckets_path is not '_count', but %s. Skipping this aggregation", bucketsPath)
return
//lint:ignore ST1005 I want Quesma capitalized
return nil, fmt.Errorf("Quesma limitation, contact us if you need it fixed: buckets_path is not '_count', but %s", bucketsPath)
}

scriptRaw, exists := bucketScript["script"]
scriptRaw, exists := params["script"]
if !exists {
logger.WarnWithCtx(cw.Ctx).Msg("no script in bucket_script. Skipping this aggregation")
return
return nil, errors.New("no script in bucket_script")
}
if script, ok := scriptRaw.(string); ok {
return pipeline_aggregations.NewBucketScript(cw.Ctx, bucketsPath, script), true
return pipeline_aggregations.NewBucketScript(cw.Ctx, bucketsPath, script), nil
}

script, ok := scriptRaw.(QueryMap)
if !ok {
logger.WarnWithCtx(cw.Ctx).Msgf("script is not a map, but %T, value: %v. Skipping this aggregation", scriptRaw, scriptRaw)
return
return nil, fmt.Errorf("script is not a map, but %T, value: %v", scriptRaw, scriptRaw)
}
if sourceRaw, exists := script["source"]; exists {
if source, ok := sourceRaw.(string); ok {
if source != "_value" && source != "count * 1" {
logger.WarnWithCtx(cw.Ctx).Msgf("source is not '_value'/'count * 1', but %s. Skipping this aggregation", source)
return
//lint:ignore ST1005 I want Quesma capitalized
return nil, fmt.Errorf("Quesma limitation, contact us if you need it fixed: source is not '_value'/'count * 1', but %s", source)
}
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("source is not a string, but %T, value: %v. Skipping this aggregation", sourceRaw, sourceRaw)
return
return nil, fmt.Errorf("source is not a string, but %T, value: %v", sourceRaw, sourceRaw)
}
} else {
logger.WarnWithCtx(cw.Ctx).Msg("no source in script. Skipping this aggregation")
return
return nil, errors.New("no source in script")
}

// okay, we've checked everything, it's indeed a simple count
return pipeline_aggregations.NewBucketScript(cw.Ctx, bucketsPath, ""), true
return pipeline_aggregations.NewBucketScript(cw.Ctx, bucketsPath, ""), nil
}

func (cw *ClickhouseQueryTranslator) parseBucketsPath(shouldBeQueryMap any, aggregationName string) (bucketsPathStr string, success bool) {
queryMap, ok := shouldBeQueryMap.(QueryMap)
if !ok {
logger.WarnWithCtx(cw.Ctx).Msgf("%s is not a map, but %T, value: %v", aggregationName, shouldBeQueryMap, shouldBeQueryMap)
return
}
bucketsPathRaw, exists := queryMap["buckets_path"]
func (cw *ClickhouseQueryTranslator) parseBucketsPath(params QueryMap, aggregationName string) (bucketsPathStr string, err error) {
bucketsPathRaw, exists := params["buckets_path"]
if !exists {
logger.WarnWithCtx(cw.Ctx).Msg("no buckets_path in avg_bucket")
return
return "", fmt.Errorf("no buckets_path in %s", aggregationName)
}

switch bucketsPath := bucketsPathRaw.(type) {
case string:
return bucketsPath, true
return bucketsPath, nil
case QueryMap:
// TODO: handle arbitrary nr of keys (and arbitrary scripts, because we also handle only one special case)
if len(bucketsPath) == 1 || len(bucketsPath) == 2 {
@@ -231,17 +162,15 @@ func (cw *ClickhouseQueryTranslator) parseBucketsPath(shouldBeQueryMap any, aggr
// After fixing the TODO above, it should also get fixed.
for _, key := range util.MapKeysSorted(bucketsPath) {
if path, ok := bucketsPath[key].(string); ok {
return path, true
return path, nil
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("buckets_path is not a map with string values, but %T. Skipping this aggregation", path)
return
return "", fmt.Errorf("buckets_path is not a map with string values, but %T %v", bucketsPath[key], bucketsPath[key])
}
}
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("buckets_path is not a map with one or two keys, but %d. Skipping this aggregation", len(bucketsPath))
return "", fmt.Errorf("buckets_path is not a map with one or two keys, but it is: %v", bucketsPath)
}
}

logger.WarnWithCtx(cw.Ctx).Msgf("buckets_path in wrong format, type: %T, value: %v", bucketsPathRaw, bucketsPathRaw)
return
return "", fmt.Errorf("buckets_path in wrong format, type: %T, value: %v", bucketsPathRaw, bucketsPathRaw)
}
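
Beyond the signature change, the practical effect in pipeline_aggregations.go is that malformed input no longer just logs a warning and silently skips the aggregation; it now returns an error the caller can forward to Kibana. A minimal, self-contained before/after sketch of that idea (function names and messages here are illustrative, condensed from the real `parseBucketsPath`, which also accepts a map-valued `buckets_path`):

```go
// Illustration of the error-handling change: instead of logging a warning and
// signalling failure with a bool, the parser now returns a descriptive error.
// These helpers are illustrative stand-ins, not Quesma's real API.
package main

import (
	"fmt"
	"log"
)

// Old style: warn and return false; the caller cannot tell the user what went wrong.
func parseBucketsPathOld(params map[string]any, aggrName string) (string, bool) {
	raw, exists := params["buckets_path"]
	if !exists {
		log.Printf("no buckets_path in %s", aggrName)
		return "", false
	}
	path, ok := raw.(string)
	if !ok {
		log.Printf("buckets_path in wrong format, type: %T, value: %v", raw, raw)
		return "", false
	}
	return path, true
}

// New style: return an error the caller can propagate to the response layer.
func parseBucketsPathNew(params map[string]any, aggrName string) (string, error) {
	raw, exists := params["buckets_path"]
	if !exists {
		return "", fmt.Errorf("no buckets_path in %s", aggrName)
	}
	path, ok := raw.(string)
	if !ok {
		return "", fmt.Errorf("buckets_path in wrong format, type: %T, value: %v", raw, raw)
	}
	return path, nil
}

func main() {
	bad := map[string]any{"buckets_path": 42}
	if _, ok := parseBucketsPathOld(bad, "avg_bucket"); !ok {
		fmt.Println("old behaviour: warning logged, aggregation silently skipped")
	}
	if _, err := parseBucketsPathNew(bad, "avg_bucket"); err != nil {
		fmt.Println("new behaviour: error returned to the caller:", err)
	}
}
```

The sketch only covers the string case; the real parser additionally handles map-valued `buckets_path` entries and more shapes, as shown in the diff above.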
