diff --git a/quesma/clickhouse/schema.go b/quesma/clickhouse/schema.go
index 0302e1eeb..b48f9e08a 100644
--- a/quesma/clickhouse/schema.go
+++ b/quesma/clickhouse/schema.go
@@ -333,3 +333,7 @@ func NewDefaultBoolAttribute() Attribute {
 		Type: NewBaseType("Bool"),
 	}
 }
+
+func (dt DateTimeType) String() string {
+	return []string{"DateTime64", "DateTime", "Invalid"}[dt]
+}
diff --git a/quesma/model/metrics_aggregations/avg.go b/quesma/model/metrics_aggregations/avg.go
index d83baeaa2..88b57820c 100644
--- a/quesma/model/metrics_aggregations/avg.go
+++ b/quesma/model/metrics_aggregations/avg.go
@@ -2,15 +2,17 @@ package metrics_aggregations
 
 import (
 	"context"
+	"mitmproxy/quesma/clickhouse"
 	"mitmproxy/quesma/model"
 )
 
 type Avg struct {
-	ctx context.Context
+	ctx       context.Context
+	fieldType clickhouse.DateTimeType
 }
 
-func NewAvg(ctx context.Context) Avg {
-	return Avg{ctx: ctx}
+func NewAvg(ctx context.Context, fieldType clickhouse.DateTimeType) Avg {
+	return Avg{ctx: ctx, fieldType: fieldType}
 }
 
 func (query Avg) IsBucketAggregation() bool {
@@ -18,7 +20,7 @@ func (query Avg) IsBucketAggregation() bool {
 }
 
 func (query Avg) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
-	return metricsTranslateSqlResponseToJson(query.ctx, rows, level)
+	return metricsTranslateSqlResponseToJsonWithFieldTypeCheck(query.ctx, rows, level, query.fieldType)
 }
 
 func (query Avg) String() string {
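The new String() method indexes a string slice by the enum's numeric value, so it assumes the DateTimeType constants are declared in exactly this iota order (DateTime64 = 0, DateTime = 1, Invalid = 2) and will panic on any value outside 0-2. A minimal self-contained sketch of the pattern, with the constant declarations spelled out (the iota layout is an assumption, not shown in this patch):

    package main

    import "fmt"

    // Assumed declaration order - String() below indexes by this iota value.
    type DateTimeType int

    const (
        DateTime64 DateTimeType = iota // -> "DateTime64"
        DateTime                       // -> "DateTime"
        Invalid                        // -> "Invalid"
    )

    func (dt DateTimeType) String() string {
        return []string{"DateTime64", "DateTime", "Invalid"}[dt]
    }

    func main() {
        fmt.Println(DateTime) // prints "DateTime"
    }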
diff --git a/quesma/model/metrics_aggregations/common.go b/quesma/model/metrics_aggregations/common.go
index 1581e7f38..049cb32ad 100644
--- a/quesma/model/metrics_aggregations/common.go
+++ b/quesma/model/metrics_aggregations/common.go
@@ -2,22 +2,58 @@ package metrics_aggregations
 
 import (
 	"context"
+	"mitmproxy/quesma/clickhouse"
 	"mitmproxy/quesma/logger"
 	"mitmproxy/quesma/model"
+	"time"
 )
 
 func metricsTranslateSqlResponseToJson(ctx context.Context, rows []model.QueryResultRow, level int) []model.JsonMap {
 	var value any = nil
-	if len(rows) > 0 {
-		if len(rows[0].Cols) > 0 {
-			value = rows[0].Cols[len(rows[0].Cols)-1].Value
-		} else {
-			logger.WarnWithCtx(ctx).Msg("no columns returned for metrics aggregation")
-		}
-	} else {
-		logger.WarnWithCtx(ctx).Msg("no rows returned for metrics aggregation")
+	if resultRowsAreFine(ctx, rows) {
+		value = rows[0].Cols[len(rows[0].Cols)-1].Value
 	}
 	return []model.JsonMap{{
 		"value": value,
 	}}
 }
+
+// metricsTranslateSqlResponseToJsonWithFieldTypeCheck is the same as metricsTranslateSqlResponseToJson for all types except DateTimes.
+// The difference: with DateTimes we need to return 2 values instead of 1.
+func metricsTranslateSqlResponseToJsonWithFieldTypeCheck(
+	ctx context.Context, rows []model.QueryResultRow, level int, fieldType clickhouse.DateTimeType) []model.JsonMap {
+	if fieldType == clickhouse.Invalid {
+		// if it's not a date, we just return a normal response
+		return metricsTranslateSqlResponseToJson(ctx, rows, level)
+	}
+
+	var value, valueAsString any = nil, nil
+	if resultRowsAreFine(ctx, rows) {
+		valueAsAny := rows[0].Cols[len(rows[0].Cols)-1].Value
+		if valueAsTime, ok := valueAsAny.(time.Time); ok {
+			value = valueAsTime.UnixMilli()
+			valueAsString = valueAsTime.Format(time.RFC3339Nano)
+		} else {
+			logger.WarnWithCtx(ctx).Msg("could not parse date")
+		}
+	}
+	response := model.JsonMap{
+		"value": value,
+	}
+	if value != nil {
+		response["value_as_string"] = valueAsString
+	}
+	return []model.JsonMap{response}
+}
+
+func resultRowsAreFine(ctx context.Context, rows []model.QueryResultRow) bool {
+	if len(rows) == 0 {
+		logger.WarnWithCtx(ctx).Msg("no rows returned for metrics aggregation")
+		return false
+	}
+	if len(rows[0].Cols) == 0 {
+		logger.WarnWithCtx(ctx).Msg("no columns returned for metrics aggregation")
+		return false
+	}
+	return true
+}
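The DateTime branch above returns two fields instead of one: the epoch-millisecond number under "value" and its RFC3339Nano rendering under "value_as_string", which is the shape Elasticsearch emits for date metrics. A standalone sketch of just that conversion (toy map, not Quesma's actual types):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        // Same conversion as the time.Time branch in metricsTranslateSqlResponseToJsonWithFieldTypeCheck.
        t, _ := time.Parse(time.RFC3339Nano, "2024-05-02T21:58:16.297Z")
        response := map[string]any{
            "value":           t.UnixMilli(),              // 1714687096297
            "value_as_string": t.Format(time.RFC3339Nano), // "2024-05-02T21:58:16.297Z"
        }
        fmt.Println(response)
    }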
diff --git a/quesma/model/metrics_aggregations/max.go b/quesma/model/metrics_aggregations/max.go
index a0ea062ab..69311892b 100644
--- a/quesma/model/metrics_aggregations/max.go
+++ b/quesma/model/metrics_aggregations/max.go
@@ -2,15 +2,17 @@ package metrics_aggregations
 
 import (
 	"context"
+	"mitmproxy/quesma/clickhouse"
 	"mitmproxy/quesma/model"
 )
 
 type Max struct {
-	ctx context.Context
+	ctx       context.Context
+	fieldType clickhouse.DateTimeType
 }
 
-func NewMax(ctx context.Context) Max {
-	return Max{ctx: ctx}
+func NewMax(ctx context.Context, fieldType clickhouse.DateTimeType) Max {
+	return Max{ctx: ctx, fieldType: fieldType}
 }
 
 func (query Max) IsBucketAggregation() bool {
@@ -18,7 +20,7 @@ func (query Max) IsBucketAggregation() bool {
 }
 
 func (query Max) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
-	return metricsTranslateSqlResponseToJson(query.ctx, rows, level)
+	return metricsTranslateSqlResponseToJsonWithFieldTypeCheck(query.ctx, rows, level, query.fieldType)
 }
 
 func (query Max) String() string {
diff --git a/quesma/model/metrics_aggregations/min.go b/quesma/model/metrics_aggregations/min.go
index 7163c59e0..d06d94c74 100644
--- a/quesma/model/metrics_aggregations/min.go
+++ b/quesma/model/metrics_aggregations/min.go
@@ -2,15 +2,17 @@ package metrics_aggregations
 
 import (
 	"context"
+	"mitmproxy/quesma/clickhouse"
 	"mitmproxy/quesma/model"
 )
 
 type Min struct {
-	ctx context.Context
+	ctx       context.Context
+	fieldType clickhouse.DateTimeType
 }
 
-func NewMin(ctx context.Context) Min {
-	return Min{ctx: ctx}
+func NewMin(ctx context.Context, fieldType clickhouse.DateTimeType) Min {
+	return Min{ctx: ctx, fieldType: fieldType}
 }
 
 func (query Min) IsBucketAggregation() bool {
@@ -18,7 +20,7 @@ func (query Min) IsBucketAggregation() bool {
 }
 
 func (query Min) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
-	return metricsTranslateSqlResponseToJson(query.ctx, rows, level)
+	return metricsTranslateSqlResponseToJsonWithFieldTypeCheck(query.ctx, rows, level, query.fieldType)
 }
 
 func (query Min) String() string {
diff --git a/quesma/model/metrics_aggregations/quantile.go b/quesma/model/metrics_aggregations/quantile.go
index 8b1bdae27..444034b4f 100644
--- a/quesma/model/metrics_aggregations/quantile.go
+++ b/quesma/model/metrics_aggregations/quantile.go
@@ -3,19 +3,23 @@ package metrics_aggregations
 import (
 	"context"
 	"math"
+	"mitmproxy/quesma/clickhouse"
 	"mitmproxy/quesma/logger"
 	"mitmproxy/quesma/model"
+	"mitmproxy/quesma/util"
 	"strconv"
 	"strings"
+	"time"
 )
 
 type Quantile struct {
-	ctx   context.Context
-	keyed bool // https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-percentile-aggregation.html#_keyed_response_6
+	ctx       context.Context
+	keyed     bool // https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-percentile-aggregation.html#_keyed_response_6
+	fieldType clickhouse.DateTimeType
 }
 
-func NewQuantile(ctx context.Context, keyed bool) Quantile {
-	return Quantile{ctx, keyed}
+func NewQuantile(ctx context.Context, keyed bool, fieldType clickhouse.DateTimeType) Quantile {
+	return Quantile{ctx, keyed, fieldType}
 }
 
 func (query Quantile) IsBucketAggregation() bool {
@@ -24,6 +28,7 @@ func (query Quantile) IsBucketAggregation() bool {
 
 func (query Quantile) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
 	valueMap := make(model.JsonMap)
+	valueAsStringMap := make(model.JsonMap)
 
 	if len(rows) == 0 {
 		return emptyPercentilesResult
@@ -34,27 +39,22 @@ func (query Quantile) TranslateSqlResponseToJson(rows []model.QueryResultRow, le
 
 	for _, res := range rows[0].Cols {
 		if strings.HasPrefix(res.ColName, "quantile") {
-			percentile, ok := res.Value.([]float64)
-			if !ok {
-				logger.WarnWithCtx(query.ctx).Msgf(
-					"failed to convert percentile values to []float64, type: %T, value: %v. Skipping", res.Value, res.Value)
-				continue
-			}
+			// error handling is moved to processResult
+			percentile, percentileAsString, percentileIsNanOrInvalid := query.processResult(res.ColName, res.Value)
 			percentileName, _ := strings.CutPrefix(res.ColName, "quantile_")
-
 			// percentileName can't be an integer (doesn't work in Kibana that way), so we need to add .0 if it's missing
 			dotIndex := strings.Index(percentileName, ".")
 			if dotIndex == -1 {
 				percentileName += ".0"
 			}
-			if len(percentile) == 0 {
-				logger.WarnWithCtx(query.ctx).Msgf("empty percentile values for %s", percentileName)
-			}
-			if len(percentile) == 0 || math.IsNaN(percentile[0]) {
+			if percentileIsNanOrInvalid {
 				valueMap[percentileName] = nil
 			} else {
-				valueMap[percentileName] = percentile[0]
+				valueMap[percentileName] = percentile
+				if percentileAsString != nil {
+					valueAsStringMap[percentileName] = *percentileAsString
+				}
 			}
 		}
 	}
@@ -65,12 +65,18 @@ func (query Quantile) TranslateSqlResponseToJson(rows []model.QueryResultRow, le
 		}}
 	} else {
 		var values []model.JsonMap
-		for key, value := range valueMap {
+		keysSorted := util.MapKeysSorted(valueMap)
+		for _, key := range keysSorted {
+			value := valueMap[key]
 			keyAsFloat, _ := strconv.ParseFloat(key, 64)
-			values = append(values, model.JsonMap{
+			responseValue := model.JsonMap{
 				"key":   keyAsFloat,
 				"value": value,
-			})
+			}
+			if _, exists := valueAsStringMap[key]; exists {
+				responseValue["value_as_string"] = valueAsStringMap[key]
+			}
+			values = append(values, responseValue)
 		}
 		return []model.JsonMap{{
 			"values": values,
@@ -82,6 +88,64 @@ func (query Quantile) String() string {
 	return "quantile"
 }
 
+// processResult processes the result of a single quantile value from Clickhouse and handles all errors encountered.
+// Unfortunately percentileReturnedByClickhouse is an array, even though we're only interested in index [0].
+// That makes this function a bit messy.
+// It could be simplified by changing the Clickhouse query from `quantiles` to `quantile`, but the current version
+// is already well tested and more general, so I'd keep it as is for now, unless we find further problems with it.
+//
+// Returns:
+//   - percentile: float64 value of the percentile (or NaN if it's invalid)
+//   - percentileAsString: string representation of the percentile
+//     (or nil if we don't have it/don't need it - we'll just omit it in the response and that's fine)
+//   - percentileIsNanOrInvalid: true if the percentile is NaN or invalid. We know we'll need to return nil in the response
+func (query Quantile) processResult(colName string, percentileReturnedByClickhouse any) (
+	percentile float64, percentileAsString *string, percentileIsNanOrInvalid bool) {
+	var percentileAsArrayLen int
+	// We never return from this switch preemptively, to keep the code simpler.
+	// The assumption is: we know something is wrong if, after the switch, either
+	// a) percentileAsArrayLen == 0, or b) percentileIsNanOrInvalid == true. Else => we're good.
+	switch percentileTyped := percentileReturnedByClickhouse.(type) {
+	case []float64:
+		percentileAsArrayLen = len(percentileTyped)
+		if len(percentileTyped) > 0 {
+			percentileIsNanOrInvalid = math.IsNaN(percentileTyped[0])
+			percentile = percentileTyped[0]
+		}
+	case []time.Time:
+		percentileAsArrayLen = len(percentileTyped)
+		if len(percentileTyped) > 0 {
+			percentile = float64(percentileTyped[0].UnixMilli())
+			asString := percentileTyped[0].Format(time.RFC3339Nano)
+			percentileAsString = &asString
+		}
+	case []any:
+		percentileAsArrayLen = len(percentileTyped)
+		if len(percentileTyped) > 0 {
+			switch firstValue := percentileTyped[0].(type) {
+			case float64:
+				return query.processResult(colName, []float64{firstValue})
+			case time.Time:
+				return query.processResult(colName, []time.Time{firstValue})
+			default:
+				logger.WarnWithCtx(query.ctx).Msgf("unexpected type in percentile array: %T, array: %v", percentileTyped[0], percentileTyped)
+				percentileIsNanOrInvalid = true
+			}
+		}
+	default:
+		logger.WarnWithCtx(query.ctx).Msgf("unexpected type of percentile value: %T, value: %v", percentileReturnedByClickhouse, percentileReturnedByClickhouse)
+		percentileIsNanOrInvalid = true
+	}
+	if percentileAsArrayLen == 0 {
+		logger.WarnWithCtx(query.ctx).Msgf("empty percentile values for %s", colName)
+		return math.NaN(), nil, true
+	}
+	if percentileIsNanOrInvalid {
+		return math.NaN(), nil, true
+	}
+	return percentile, percentileAsString, percentileIsNanOrInvalid
+}
+
 var emptyPercentilesResult = []model.JsonMap{{
 	"values": 0,
 }}
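processResult's three cases mirror what the ClickHouse driver may hand back for a quantiles() column: []float64, []time.Time, or a generic []any whose first element is re-dispatched through the typed cases. A stripped-down sketch of that dispatch (simplified signature, no logging, hypothetical helper name firstAsFloat):

    package main

    import (
        "fmt"
        "math"
        "time"
    )

    // firstAsFloat mimics processResult's dispatch: take index [0] of whatever
    // slice ClickHouse returned and normalize it to a float64 (dates -> epoch ms).
    func firstAsFloat(v any) float64 {
        switch typed := v.(type) {
        case []float64:
            if len(typed) > 0 {
                return typed[0]
            }
        case []time.Time:
            if len(typed) > 0 {
                return float64(typed[0].UnixMilli())
            }
        case []any:
            if len(typed) > 0 {
                switch first := typed[0].(type) {
                case float64:
                    return firstAsFloat([]float64{first})
                case time.Time:
                    return firstAsFloat([]time.Time{first})
                }
            }
        }
        return math.NaN() // empty array or unexpected type
    }

    func main() {
        fmt.Println(firstAsFloat([]any{1.5, "ignored"})) // 1.5
    }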
"2024-05-02T21:58:16.297Z" + tests := []struct { + percentileReturnedByClickhouse any + wantedPercentile float64 + wantedPercentileAsString *string + }{ + {nil, math.NaN(), nil}, + {"", math.NaN(), nil}, + {"0", math.NaN(), nil}, + {0, math.NaN(), nil}, + {0.0, math.NaN(), nil}, + {[]string{"1.0"}, math.NaN(), nil}, + {[]string{"1.0", "5"}, math.NaN(), nil}, + {[]any{"1.0", "5"}, math.NaN(), nil}, + {[]any{"1.0", "5"}, math.NaN(), nil}, + {[]int{1}, math.NaN(), nil}, + {[]int{}, math.NaN(), nil}, + {[]float64{}, math.NaN(), nil}, + {[]float64{1.0}, 1.0, nil}, + {[]float64{1.0, 2.0}, 1.0, nil}, + {[]any{float64(1.0), 5}, 1.0, nil}, + {[]any{5, float64(1.0)}, math.NaN(), nil}, + {[]time.Time{util.ParseTime("2024-05-02T21:58:16.297Z"), util.ParseTime("5")}, 1714687096297.0, &wantedStr}, + {[]time.Time{util.ParseTime("2024-05-02T21:58:16.297Z")}, 1714687096297.0, &wantedStr}, + {[]any{util.ParseTime("2024-05-02T21:58:16.297Z"), 5, 10, 5.2}, 1714687096297.0, &wantedStr}, + {[]any{util.ParseTime("2024-05-02T21:58:16.297Z")}, 1714687096297.0, &wantedStr}, + } + for i, tt := range tests { + t.Run("testing processResult"+strconv.Itoa(i), func(t *testing.T) { + percentile, percentileAsString, _ := quantile.processResult(colName, tt.percentileReturnedByClickhouse) + if !equalFloats(percentile, tt.wantedPercentile) { + t.Errorf("got %v, wanted %v", percentile, tt.wantedPercentile) + } + if !equalStrings(percentileAsString, tt.wantedPercentileAsString) { + t.Errorf("got %v, wanted %v", percentileAsString, tt.wantedPercentileAsString) + } + }) + } +} diff --git a/quesma/model/metrics_aggregations/sum.go b/quesma/model/metrics_aggregations/sum.go index 68df1fa2f..0738ddea5 100644 --- a/quesma/model/metrics_aggregations/sum.go +++ b/quesma/model/metrics_aggregations/sum.go @@ -2,15 +2,17 @@ package metrics_aggregations import ( "context" + "mitmproxy/quesma/clickhouse" "mitmproxy/quesma/model" ) type Sum struct { - ctx context.Context + ctx context.Context + fieldType clickhouse.DateTimeType } -func NewSum(ctx context.Context) Sum { - return Sum{ctx: ctx} +func NewSum(ctx context.Context, fieldType clickhouse.DateTimeType) Sum { + return Sum{ctx: ctx, fieldType: fieldType} } func (query Sum) IsBucketAggregation() bool { @@ -18,7 +20,7 @@ func (query Sum) IsBucketAggregation() bool { } func (query Sum) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap { - return metricsTranslateSqlResponseToJson(query.ctx, rows, level) + return metricsTranslateSqlResponseToJsonWithFieldTypeCheck(query.ctx, rows, level, query.fieldType) } func (query Sum) String() string { diff --git a/quesma/queryparser/aggregation_parser.go b/quesma/queryparser/aggregation_parser.go index bfdffe31f..4a736c80a 100644 --- a/quesma/queryparser/aggregation_parser.go +++ b/quesma/queryparser/aggregation_parser.go @@ -5,11 +5,13 @@ import ( "encoding/json" "fmt" "github.com/barkimedes/go-deepcopy" + "mitmproxy/quesma/clickhouse" "mitmproxy/quesma/logger" "mitmproxy/quesma/model" "mitmproxy/quesma/model/bucket_aggregations" "mitmproxy/quesma/model/metrics_aggregations" "mitmproxy/quesma/model/pipeline_aggregations" + "mitmproxy/quesma/util" "slices" "strconv" "strings" @@ -32,14 +34,17 @@ type aggrQueryBuilder struct { type metricsAggregation struct { AggrType string - FieldNames []string // on these fields we're doing aggregation. Array, because e.g. 
diff --git a/quesma/model/metrics_aggregations/sum.go b/quesma/model/metrics_aggregations/sum.go
index 68df1fa2f..0738ddea5 100644
--- a/quesma/model/metrics_aggregations/sum.go
+++ b/quesma/model/metrics_aggregations/sum.go
@@ -2,15 +2,17 @@ package metrics_aggregations
 
 import (
 	"context"
+	"mitmproxy/quesma/clickhouse"
 	"mitmproxy/quesma/model"
 )
 
 type Sum struct {
-	ctx context.Context
+	ctx       context.Context
+	fieldType clickhouse.DateTimeType
 }
 
-func NewSum(ctx context.Context) Sum {
-	return Sum{ctx: ctx}
+func NewSum(ctx context.Context, fieldType clickhouse.DateTimeType) Sum {
+	return Sum{ctx: ctx, fieldType: fieldType}
 }
 
 func (query Sum) IsBucketAggregation() bool {
@@ -18,7 +20,7 @@ func (query Sum) IsBucketAggregation() bool {
 }
 
 func (query Sum) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
-	return metricsTranslateSqlResponseToJson(query.ctx, rows, level)
+	return metricsTranslateSqlResponseToJsonWithFieldTypeCheck(query.ctx, rows, level, query.fieldType)
 }
 
 func (query Sum) String() string {
diff --git a/quesma/queryparser/aggregation_parser.go b/quesma/queryparser/aggregation_parser.go
index bfdffe31f..4a736c80a 100644
--- a/quesma/queryparser/aggregation_parser.go
+++ b/quesma/queryparser/aggregation_parser.go
@@ -5,11 +5,13 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/barkimedes/go-deepcopy"
+	"mitmproxy/quesma/clickhouse"
 	"mitmproxy/quesma/logger"
 	"mitmproxy/quesma/model"
 	"mitmproxy/quesma/model/bucket_aggregations"
 	"mitmproxy/quesma/model/metrics_aggregations"
 	"mitmproxy/quesma/model/pipeline_aggregations"
+	"mitmproxy/quesma/util"
 	"slices"
 	"strconv"
 	"strings"
@@ -32,14 +34,17 @@ type aggrQueryBuilder struct {
 
 type metricsAggregation struct {
 	AggrType    string
-	FieldNames  []string      // on these fields we're doing aggregation. Array, because e.g. 'top_hits' can have multiple fields
-	Percentiles model.JsonMap // Only for percentiles aggregation
-	Keyed       bool          // Only for percentiles aggregation
-	SortBy      string        // Only for top_metrics
-	Size        int           // Only for top_metrics
-	Order       string        // Only for top_metrics
+	FieldNames  []string                // on these fields we're doing aggregation. Array, because e.g. 'top_hits' can have multiple fields
+	FieldType   clickhouse.DateTimeType // field type of FieldNames[0]. If it's a date field, a slightly different response is needed
+	Percentiles map[string]float64      // Only for percentiles aggregation
+	Keyed       bool                    // Only for percentiles aggregation
+	SortBy      string                  // Only for top_metrics
+	Size        int                     // Only for top_metrics
+	Order       string                  // Only for top_metrics
 }
+
+const metricsAggregationDefaultFieldType = clickhouse.Invalid
 
 func (b *aggrQueryBuilder) buildAggregationCommon(metadata model.JsonMap) model.QueryWithAggregation {
 	query := b.QueryWithAggregation
 	query.WhereClause = b.whereBuilder.Sql.Stmt
@@ -87,7 +92,11 @@ func (b *aggrQueryBuilder) buildMetricsAggregation(metricsAggr metricsAggregatio
 	case "sum", "min", "max", "avg":
 		query.NonSchemaFields = append(query.NonSchemaFields, metricsAggr.AggrType+`OrNull("`+getFirstFieldName()+`")`)
 	case "quantile":
-		for usersPercent, percentAsFloat := range metricsAggr.Percentiles {
+		// Sorting here is useful mostly for determinism in tests.
+		// It wasn't there before and everything worked fine, so it could be safely removed if needed.
+		usersPercents := util.MapKeysSortedByValue(metricsAggr.Percentiles)
+		for _, usersPercent := range usersPercents {
+			percentAsFloat := metricsAggr.Percentiles[usersPercent]
 			query.NonSchemaFields = append(query.NonSchemaFields, fmt.Sprintf(
 				"quantiles(%6f)(`%s`) AS `quantile_%s`", percentAsFloat, getFirstFieldName(), usersPercent))
 		}
@@ -155,19 +164,19 @@ func (b *aggrQueryBuilder) buildMetricsAggregation(metricsAggr metricsAggregatio
 	}
 	switch metricsAggr.AggrType {
 	case "sum":
-		query.Type = metrics_aggregations.NewSum(b.ctx)
+		query.Type = metrics_aggregations.NewSum(b.ctx, metricsAggr.FieldType)
 	case "min":
-		query.Type = metrics_aggregations.NewMin(b.ctx)
+		query.Type = metrics_aggregations.NewMin(b.ctx, metricsAggr.FieldType)
 	case "max":
-		query.Type = metrics_aggregations.NewMax(b.ctx)
+		query.Type = metrics_aggregations.NewMax(b.ctx, metricsAggr.FieldType)
 	case "avg":
-		query.Type = metrics_aggregations.NewAvg(b.ctx)
+		query.Type = metrics_aggregations.NewAvg(b.ctx, metricsAggr.FieldType)
 	case "stats":
 		query.Type = metrics_aggregations.NewStats(b.ctx)
 	case "cardinality":
 		query.Type = metrics_aggregations.NewCardinality(b.ctx)
 	case "quantile":
-		query.Type = metrics_aggregations.NewQuantile(b.ctx, metricsAggr.Keyed)
+		query.Type = metrics_aggregations.NewQuantile(b.ctx, metricsAggr.Keyed, metricsAggr.FieldType)
 	case "top_hits":
 		query.Type = metrics_aggregations.NewTopHits(b.ctx)
 	case "top_metrics":
@@ -427,9 +436,11 @@ func (cw *ClickhouseQueryTranslator) tryMetricsAggregation(queryMap QueryMap) (m
 	metricsAggregations := []string{"sum", "avg", "min", "max", "cardinality", "value_count", "stats"}
 	for k, v := range queryMap {
 		if slices.Contains(metricsAggregations, k) {
+			fieldName := cw.parseFieldField(v, k)
 			return metricsAggregation{
 				AggrType:   k,
-				FieldNames: []string{cw.parseFieldField(v, k)},
+				FieldNames: []string{fieldName},
+				FieldType:  cw.Table.GetDateTimeType(cw.Ctx, fieldName),
 			}, true
 		}
 	}
@@ -443,6 +454,7 @@ func (cw *ClickhouseQueryTranslator) tryMetricsAggregation(queryMap QueryMap) (m
 		return metricsAggregation{
 			AggrType:    "quantile",
 			FieldNames:  []string{fieldName},
+			FieldType:   cw.Table.GetDateTimeType(cw.Ctx, fieldName),
 			Percentiles: percentiles,
 			Keyed:       keyed,
 		}, true
@@ -473,6 +485,7 @@ func (cw *ClickhouseQueryTranslator) tryMetricsAggregation(queryMap QueryMap) (m
 		return metricsAggregation{
 			AggrType:   "top_hits",
 			FieldNames: fieldsAsStrings,
+			FieldType:  metricsAggregationDefaultFieldType, // don't need to check, it's unimportant for this aggregation
 		}, true
 	}
 
@@ -502,6 +515,7 @@ func (cw *ClickhouseQueryTranslator) tryMetricsAggregation(queryMap QueryMap) (m
 		return metricsAggregation{
 			AggrType:   "percentile_ranks",
 			FieldNames: fieldNames,
+			FieldType:  metricsAggregationDefaultFieldType, // don't need to check, it's unimportant for this aggregation
 		}, true
 	}
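For reference, the quantile branch emits one quantiles() column per requested percent, and iterating the percents in sorted order makes the generated column order deterministic. A quick sketch of what that format string produces (same format string as in the patch, sample values of mine):

    package main

    import "fmt"

    func main() {
        // Same format string as buildMetricsAggregation's "quantile" case.
        col := fmt.Sprintf("quantiles(%6f)(`%s`) AS `quantile_%s`", 0.95, "timestamp", "95")
        fmt.Println(col) // quantiles(0.950000)(`timestamp`) AS `quantile_95`
    }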
"quantile", FieldNames: []string{fieldName}, + FieldType: cw.Table.GetDateTimeType(cw.Ctx, fieldName), Percentiles: percentiles, Keyed: keyed, }, true @@ -473,6 +485,7 @@ func (cw *ClickhouseQueryTranslator) tryMetricsAggregation(queryMap QueryMap) (m return metricsAggregation{ AggrType: "top_hits", FieldNames: fieldsAsStrings, + FieldType: metricsAggregationDefaultFieldType, // don't need to check, it's unimportant for this aggregation }, true } @@ -502,6 +515,7 @@ func (cw *ClickhouseQueryTranslator) tryMetricsAggregation(queryMap QueryMap) (m return metricsAggregation{ AggrType: "percentile_ranks", FieldNames: fieldNames, + FieldType: metricsAggregationDefaultFieldType, // don't need to check, it's unimportant for this aggregation }, true } diff --git a/quesma/queryparser/aggregation_percentile_parser.go b/quesma/queryparser/aggregation_percentile_parser.go index 87d373d37..fe73cfc4b 100644 --- a/quesma/queryparser/aggregation_percentile_parser.go +++ b/quesma/queryparser/aggregation_percentile_parser.go @@ -3,12 +3,11 @@ package queryparser import ( "fmt" "mitmproxy/quesma/logger" - "mitmproxy/quesma/model" ) const maxPrecision = 0.999999 -var defaultPercentiles = model.JsonMap{ +var defaultPercentiles = map[string]float64{ "1.0": 0.01, "5.0": 0.05, "25.0": 0.25, @@ -20,7 +19,7 @@ var defaultPercentiles = model.JsonMap{ const keyedDefaultValue = true -func (cw *ClickhouseQueryTranslator) parsePercentilesAggregation(queryMap QueryMap) (fieldName string, keyed bool, percentiles model.JsonMap) { +func (cw *ClickhouseQueryTranslator) parsePercentilesAggregation(queryMap QueryMap) (fieldName string, keyed bool, percentiles map[string]float64) { fieldName = cw.parseFieldField(queryMap, "percentile") if keyedQueryMap, ok := queryMap["keyed"]; ok { if keyed, ok = keyedQueryMap.(bool); !ok { @@ -40,7 +39,7 @@ func (cw *ClickhouseQueryTranslator) parsePercentilesAggregation(queryMap QueryM logger.WarnWithCtx(cw.Ctx).Msgf("percents specified for percentiles aggregation is not an array. 
diff --git a/quesma/queryparser/aggregation_percentile_parser_test.go b/quesma/queryparser/aggregation_percentile_parser_test.go
index a23b7ca24..a69d4f762 100644
--- a/quesma/queryparser/aggregation_percentile_parser_test.go
+++ b/quesma/queryparser/aggregation_percentile_parser_test.go
@@ -54,12 +54,12 @@ func Test_parsePercentilesAggregationWithUserSpecifiedPercents(t *testing.T) {
 	assert.Equal(t, 0.00001, parsedMap["0.001"])
 	assert.Equal(t, 0.0001, parsedMap["0.01"])
 	assert.Equal(t, 0.0005, parsedMap["0.05"])
-	assert.True(t, isBetween(parsedMap["11.123123123123124"].(float64), 0.111231231, 0.111231232))
+	assert.True(t, isBetween(parsedMap["11.123123123123124"], 0.111231231, 0.111231232))
 	assert.Equal(t, 0.634, parsedMap["63.4"])
-	assert.True(t, isBetween(parsedMap["66.999999999999"].(float64), 0.66999999, 0.67))
+	assert.True(t, isBetween(parsedMap["66.999999999999"], 0.66999999, 0.67))
 	assert.Equal(t, 0.95, parsedMap["95"])
 	assert.Equal(t, 0.99, parsedMap["99"])
-	assert.True(t, isBetween(parsedMap["99.9"].(float64), 0.999, 0.9991))
+	assert.True(t, isBetween(parsedMap["99.9"], 0.999, 0.9991))
 	assert.Equal(t, 0.999999, parsedMap["99.9999"])
 	assert.Equal(t, maxPrecision, parsedMap["99.99999999"])
diff --git a/quesma/queryparser/top_metrics_aggregation_parser.go b/quesma/queryparser/top_metrics_aggregation_parser.go
index 954036b7c..d21cb08e6 100644
--- a/quesma/queryparser/top_metrics_aggregation_parser.go
+++ b/quesma/queryparser/top_metrics_aggregation_parser.go
@@ -45,6 +45,7 @@ func (cw *ClickhouseQueryTranslator) ParseTopMetricsAggregation(queryMap QueryMa
 	return metricsAggregation{
 		AggrType:   "top_metrics",
 		FieldNames: fieldNames,
+		FieldType:  metricsAggregationDefaultFieldType, // don't need to check, it's unimportant for this aggregation
 		SortBy:     sortBy,
 		Size:       size,
 		Order:      order,
diff --git a/quesma/testdata/opensearch-visualize/aggregation_requests.go b/quesma/testdata/opensearch-visualize/aggregation_requests.go
index a94c0d903..55251a79c 100644
--- a/quesma/testdata/opensearch-visualize/aggregation_requests.go
+++ b/quesma/testdata/opensearch-visualize/aggregation_requests.go
@@ -4,6 +4,8 @@ import (
 	"math"
 	"mitmproxy/quesma/model"
 	"mitmproxy/quesma/testdata"
+	"mitmproxy/quesma/util"
+	"time"
 )
 
 var AggregationTests = []testdata.AggregationTestCase{
@@ -620,4 +622,506 @@ var AggregationTests = []testdata.AggregationTestCase{
 			`WHERE "epoch_time">='2024-04-18T04:40:12.252Z' AND "epoch_time"<='2024-05-03T04:40:12.252Z' `,
 		},
 	},
+	{ // [4]
+		TestName: "Max on DateTime field. Reproduce: Visualize -> Line: Metrics -> Max @timestamp, Buckets: Add X-Axis, Aggregation: Significant Terms",
+		QueryRequestJson: `
+		{
+			"_source": {
+				"excludes": []
+			},
+			"aggs": {
+				"2": {
+					"aggs": {
+						"1": {
+							"max": {
+								"field": "timestamp"
+							}
+						}
+					},
+					"significant_terms": {
+						"field": "response.keyword",
+						"size": 3
+					}
+				}
+			},
+			"docvalue_fields": [
+				{
+					"field": "@timestamp",
+					"format": "date_time"
+				},
+				{
+					"field": "timestamp",
+					"format": "date_time"
+				},
+				{
+					"field": "utc_time",
+					"format": "date_time"
+				}
+			],
+			"query": {
+				"bool": {
+					"filter": [
+						{
+							"range": {
+								"timestamp": {
+									"format": "strict_date_optional_time",
+									"gte": "2024-04-18T00:49:59.517Z",
+									"lte": "2024-05-03T00:49:59.517Z"
+								}
+							}
+						}
+					],
+					"must": [
+						{
+							"match_all": {}
+						}
+					],
+					"must_not": [],
+					"should": []
+				}
+			},
+			"script_fields": {
+				"hour_of_day": {
+					"script": {
+						"lang": "painless",
+						"source": "doc['timestamp'].value.getHour()"
+					}
+				}
+			},
+			"size": 0,
+			"stored_fields": [
+				"*"
+			]
+		}`,
+		ExpectedResponse: // erased "2": "bg_count": 14074, "doc_count": 2786 from the real response. It should be there in 'significant_terms' (not in 'terms'), but it seems to work without it.
+		`{
+			"_shards": {
+				"failed": 0,
+				"skipped": 0,
+				"successful": 1,
+				"total": 1
+			},
+			"aggregations": {
+				"2": {
+					"buckets": [
+						{
+							"1": {
+								"value": 1714687096297.0,
+								"value_as_string": "2024-05-02T21:58:16.297Z"
+							},
+							"bg_count": 2570,
+							"doc_count": 2570,
+							"key": "200",
+							"score": 2570
+						},
+						{
+							"1": {
+								"value": 1714665552949.0,
+								"value_as_string": "2024-05-02T15:59:12.949Z"
+							},
+							"bg_count": 94,
+							"doc_count": 94,
+							"key": "503",
+							"score": 94
+						}
+					]
+				}
+			},
+			"hits": {
+				"hits": [],
+				"max_score": null,
+				"total": {
+					"relation": "eq",
+					"value": 2786
+				}
+			},
+			"timed_out": false,
+			"took": 91
+		}`,
+		ExpectedResults: [][]model.QueryResultRow{
+			{{Cols: []model.QueryResultCol{model.NewQueryResultCol("value", uint64(2786))}}},
+			{
+				{Cols: []model.QueryResultCol{
+					model.NewQueryResultCol("response", "200"),
+					model.NewQueryResultCol(`maxOrNull("timestamp")`, util.ParseTime("2024-05-02T21:58:16.297Z")),
+				}},
+				{Cols: []model.QueryResultCol{
+					model.NewQueryResultCol("response", "503"),
+					model.NewQueryResultCol(`maxOrNull("timestamp")`, util.ParseTime("2024-05-02T15:59:12.949Z")),
+				}},
+			},
+			{
+				{Cols: []model.QueryResultCol{
+					model.NewQueryResultCol("response", "200"),
+					model.NewQueryResultCol(`doc_count`, 2570),
+				}},
+				{Cols: []model.QueryResultCol{
+					model.NewQueryResultCol("response", "503"),
+					model.NewQueryResultCol(`doc_count`, 94),
+				}},
+			},
+		},
+		ExpectedSQLs: []string{
+			`SELECT count() FROM ` + testdata.QuotedTableName + ` ` +
+				`WHERE "timestamp">=parseDateTime64BestEffort('2024-04-18T00:49:59.517Z') ` +
+				`AND "timestamp"<=parseDateTime64BestEffort('2024-05-03T00:49:59.517Z') `,
+			`SELECT "response", maxOrNull("timestamp") ` +
+				`FROM ` + testdata.QuotedTableName + ` ` +
+				`WHERE "timestamp">=parseDateTime64BestEffort('2024-04-18T00:49:59.517Z') ` +
+				`AND "timestamp"<=parseDateTime64BestEffort('2024-05-03T00:49:59.517Z') ` +
+				`GROUP BY ("response") ` +
+				`ORDER BY ("response")`,
+			`SELECT "response", count() ` +
+				`FROM ` + testdata.QuotedTableName + ` ` +
+				`WHERE "timestamp">=parseDateTime64BestEffort('2024-04-18T00:49:59.517Z') ` +
+				`AND "timestamp"<=parseDateTime64BestEffort('2024-05-03T00:49:59.517Z') ` +
+				`GROUP BY ("response") ` +
+				`ORDER BY ("response")`,
+		},
+	},
+	{ // [5]
+		TestName: "Min on DateTime field. Reproduce: Visualize -> Line: Metrics -> Min @timestamp, Buckets: Add X-Axis, Aggregation: Significant Terms",
+		QueryRequestJson: `
+		{
+			"_source": {
+				"excludes": []
+			},
+			"aggs": {
+				"2": {
+					"aggs": {
+						"1": {
+							"min": {
+								"field": "timestamp"
+							}
+						}
+					},
+					"significant_terms": {
+						"field": "response.keyword",
+						"size": 3
+					}
+				}
+			},
+			"docvalue_fields": [
+				{
+					"field": "@timestamp",
+					"format": "date_time"
+				},
+				{
+					"field": "timestamp",
+					"format": "date_time"
+				},
+				{
+					"field": "utc_time",
+					"format": "date_time"
+				}
+			],
+			"query": {
+				"bool": {
+					"filter": [
+						{
+							"range": {
+								"timestamp": {
+									"format": "strict_date_optional_time",
+									"gte": "2024-04-18T00:51:00.471Z",
+									"lte": "2024-05-03T00:51:00.471Z"
+								}
+							}
+						}
+					],
+					"must": [
+						{
+							"match_all": {}
+						}
+					],
+					"must_not": [],
+					"should": []
+				}
+			},
+			"script_fields": {
+				"hour_of_day": {
+					"script": {
+						"lang": "painless",
+						"source": "doc['timestamp'].value.getHour()"
+					}
+				}
+			},
+			"size": 0,
+			"stored_fields": [
+				"*"
+			]
+		}`,
+		ExpectedResponse: `
+		{
+			"_shards": {
+				"failed": 0,
+				"skipped": 0,
+				"successful": 1,
+				"total": 1
+			},
+			"aggregations": {
+				"2": {
+					"bg_count": 14074,
+					"buckets": [
+						{
+							"1": {
+								"value": 1713659942912.0,
+								"value_as_string": "2024-04-21T00:39:02.912Z"
+							},
+							"bg_count": 2570,
+							"doc_count": 2570,
+							"key": "200",
+							"score": 2570
+						},
+						{
+							"1": {
+								"value": 1713670225131.0,
+								"value_as_string": "2024-04-21T03:30:25.131Z"
+							},
+							"bg_count": 94,
+							"doc_count": 94,
+							"key": "503",
+							"score": 94
+						}
+					],
+					"doc_count": 2786
+				}
+			},
+			"hits": {
+				"hits": [],
+				"max_score": null,
+				"total": {
+					"relation": "eq",
+					"value": 2786
+				}
+			},
+			"timed_out": false,
+			"took": 15
+		}`,
+		ExpectedResults: [][]model.QueryResultRow{
+			{{Cols: []model.QueryResultCol{model.NewQueryResultCol("value", uint64(2786))}}},
+			{
+				{Cols: []model.QueryResultCol{
+					model.NewQueryResultCol("response", "200"),
+					model.NewQueryResultCol(`minOrNull("timestamp")`, util.ParseTime("2024-04-21T00:39:02.912Z")),
+				}},
+				{Cols: []model.QueryResultCol{
+					model.NewQueryResultCol("response", "503"),
+					model.NewQueryResultCol(`minOrNull("timestamp")`, util.ParseTime("2024-04-21T03:30:25.131Z")),
+				}},
+			},
+			{
+				{Cols: []model.QueryResultCol{
+					model.NewQueryResultCol("response", "200"),
+					model.NewQueryResultCol(`doc_count`, 2570),
+				}},
+				{Cols: []model.QueryResultCol{
+					model.NewQueryResultCol("response", "503"),
+					model.NewQueryResultCol(`doc_count`, 94),
+				}},
+			},
+		},
+		ExpectedSQLs: []string{
+			`SELECT count() FROM ` + testdata.QuotedTableName + ` ` +
+				`WHERE "timestamp">=parseDateTime64BestEffort('2024-04-18T00:51:00.471Z') ` +
+				`AND "timestamp"<=parseDateTime64BestEffort('2024-05-03T00:51:00.471Z') `,
+			`SELECT "response", minOrNull("timestamp") ` +
+				`FROM ` + testdata.QuotedTableName + ` ` +
+				`WHERE "timestamp">=parseDateTime64BestEffort('2024-04-18T00:51:00.471Z') ` +
+				`AND "timestamp"<=parseDateTime64BestEffort('2024-05-03T00:51:00.471Z') ` +
+				`GROUP BY ("response") ` +
+				`ORDER BY ("response")`,
+			`SELECT "response", count() ` +
+				`FROM ` + testdata.QuotedTableName + ` ` +
+				`WHERE "timestamp">=parseDateTime64BestEffort('2024-04-18T00:51:00.471Z') ` +
+				`AND "timestamp"<=parseDateTime64BestEffort('2024-05-03T00:51:00.471Z') ` +
+				`GROUP BY ("response") ` +
+				`ORDER BY ("response")`,
+		},
+	},
+	{ // [6]
+		TestName: "Percentiles on DateTime field. Reproduce: Visualize -> Line: Metrics -> Percentiles (or Median, it's the same aggregation) @timestamp, Buckets: Add X-Axis, Aggregation: Significant Terms",
+		QueryRequestJson: `
+		{
+			"_source": {
+				"excludes": []
+			},
+			"aggs": {
+				"2": {
+					"aggs": {
+						"1": {
+							"percentiles": {
+								"field": "timestamp",
+								"keyed": false,
+								"percents": [1, 2, 25, 50, 75, 95, 99]
+							}
+						}
+					},
+					"significant_terms": {
+						"field": "response.keyword",
+						"size": 3
+					}
+				}
+			},
+			"docvalue_fields": [
+				{
+					"field": "@timestamp",
+					"format": "date_time"
+				},
+				{
+					"field": "timestamp",
+					"format": "date_time"
+				},
+				{
+					"field": "utc_time",
+					"format": "date_time"
+				}
+			],
+			"query": {
+				"bool": {
+					"filter": [
+						{
+							"range": {
+								"timestamp": {
+									"format": "strict_date_optional_time",
+									"gte": "2024-04-18T00:51:15.845Z",
+									"lte": "2024-05-03T00:51:15.845Z"
+								}
+							}
+						}
+					],
+					"must": [
+						{
+							"match_all": {}
+						}
+					],
+					"must_not": [],
+					"should": []
+				}
+			},
+			"script_fields": {
+				"hour_of_day": {
+					"script": {
+						"lang": "painless",
+						"source": "doc['timestamp'].value.getHour()"
+					}
+				}
+			},
+			"size": 0,
+			"stored_fields": [
+				"*"
+			]
+		}`,
+		ExpectedResponse: `
+		{
+			"_shards": {
+				"failed": 0,
+				"skipped": 0,
+				"successful": 1,
+				"total": 1
+			},
+			"aggregations": {
+				"2": {
+					"bg_count": 2786,
+					"buckets": [
+						{
+							"1": {
+								"values": [
+									{
+										"key": 1.0,
+										"value": 1713679873619.0,
+										"value_as_string": "2024-04-21T06:11:13.619Z"
+									},
+									{
+										"key": 2,
+										"value": 1713702073414.0,
+										"value_as_string": "2024-04-21T12:21:13.414Z"
+									},
+									{
+										"key": 25.0,
+										"value": 1713898065613.0,
+										"value_as_string": "2024-04-23T18:47:45.613Z"
+									},
+									{
+										"key": 50.0,
+										"value": 1714163505522.0,
+										"value_as_string": "2024-04-26T20:31:45.522Z"
+									},
+									{
+										"key": 75.0,
+										"value": 1714419555029.0,
+										"value_as_string": "2024-04-29T19:39:15.029Z"
+									},
+									{
+										"key": 95.0,
+										"value": 1714649082507.0,
+										"value_as_string": "2024-05-02T11:24:42.507Z"
+									},
+									{
+										"key": 99.0,
+										"value": 1714666168003.0,
+										"value_as_string": "2024-05-02T16:09:28.003Z"
+									}
+								]
+							},
+							"bg_count": 2570,
+							"doc_count": 2570,
+							"key": "200",
+							"score": 2570
+						}
+					],
+					"doc_count": 2786
+				}
+			},
+			"hits": {
+				"hits": [],
+				"max_score": null,
+				"total": {
+					"relation": "eq",
+					"value": 2786
+				}
+			},
+			"timed_out": false,
+			"took": 9
+		}`,
+		ExpectedResults: [][]model.QueryResultRow{
+			{{Cols: []model.QueryResultCol{model.NewQueryResultCol("value", uint64(2786))}}},
+			{{Cols: []model.QueryResultCol{
+				model.NewQueryResultCol("response", "200"),
+				model.NewQueryResultCol(`quantile_1`, []time.Time{util.ParseTime("2024-04-21T06:11:13.619Z")}),
+				model.NewQueryResultCol(`quantile_2`, []time.Time{util.ParseTime("2024-04-21T12:21:13.414Z")}),
+				model.NewQueryResultCol(`quantile_25`, []time.Time{util.ParseTime("2024-04-23T18:47:45.613Z")}),
+				model.NewQueryResultCol(`quantile_50`, []time.Time{util.ParseTime("2024-04-26T20:31:45.522Z")}),
+				model.NewQueryResultCol(`quantile_75`, []time.Time{util.ParseTime("2024-04-29T19:39:15.029Z")}),
+				model.NewQueryResultCol(`quantile_95`, []time.Time{util.ParseTime("2024-05-02T11:24:42.507Z")}),
+				model.NewQueryResultCol(`quantile_99`, []time.Time{util.ParseTime("2024-05-02T16:09:28.003Z")}),
+			}}},
+			{{Cols: []model.QueryResultCol{
+				model.NewQueryResultCol("response", "200"),
+				model.NewQueryResultCol(`doc_count`, 2570),
+			}}},
+		},
+		ExpectedSQLs: []string{
+			`SELECT count() FROM ` + testdata.QuotedTableName + ` ` +
+				`WHERE "timestamp">=parseDateTime64BestEffort('2024-04-18T00:51:15.845Z') ` +
+				`AND "timestamp"<=parseDateTime64BestEffort('2024-05-03T00:51:15.845Z') `,
+			`SELECT "response", ` +
+				"quantiles(0.010000)(`timestamp`) AS `quantile_1`, " +
+				"quantiles(0.020000)(`timestamp`) AS `quantile_2`, " +
+				"quantiles(0.250000)(`timestamp`) AS `quantile_25`, " +
+				"quantiles(0.500000)(`timestamp`) AS `quantile_50`, " +
+				"quantiles(0.750000)(`timestamp`) AS `quantile_75`, " +
+				"quantiles(0.950000)(`timestamp`) AS `quantile_95`, " +
+				"quantiles(0.990000)(`timestamp`) AS `quantile_99` " +
+				`FROM ` + testdata.QuotedTableName + ` ` +
+				`WHERE "timestamp">=parseDateTime64BestEffort('2024-04-18T00:51:15.845Z') ` +
+				`AND "timestamp"<=parseDateTime64BestEffort('2024-05-03T00:51:15.845Z') ` +
+				`GROUP BY ("response") ` +
+				`ORDER BY ("response")`,
+			`SELECT "response", count() FROM ` + testdata.QuotedTableName + ` ` +
+				`WHERE "timestamp">=parseDateTime64BestEffort('2024-04-18T00:51:15.845Z') ` +
+				`AND "timestamp"<=parseDateTime64BestEffort('2024-05-03T00:51:15.845Z') ` +
+				`GROUP BY ("response") ` +
+				`ORDER BY ("response")`,
+		},
+	},
"timestamp"<=parseDateTime64BestEffort('2024-05-03T00:51:15.845Z') `, + `SELECT "response", ` + + "quantiles(0.010000)(`timestamp`) AS `quantile_1`, " + + "quantiles(0.020000)(`timestamp`) AS `quantile_2`, " + + "quantiles(0.250000)(`timestamp`) AS `quantile_25`, " + + "quantiles(0.500000)(`timestamp`) AS `quantile_50`, " + + "quantiles(0.750000)(`timestamp`) AS `quantile_75`, " + + "quantiles(0.950000)(`timestamp`) AS `quantile_95`, " + + "quantiles(0.990000)(`timestamp`) AS `quantile_99` " + + `FROM ` + testdata.QuotedTableName + ` ` + + `WHERE "timestamp">=parseDateTime64BestEffort('2024-04-18T00:51:15.845Z') ` + + `AND "timestamp"<=parseDateTime64BestEffort('2024-05-03T00:51:15.845Z') ` + + `GROUP BY ("response") ` + + `ORDER BY ("response")`, + `SELECT "response", count() FROM ` + testdata.QuotedTableName + ` ` + + `WHERE "timestamp">=parseDateTime64BestEffort('2024-04-18T00:51:15.845Z') ` + + `AND "timestamp"<=parseDateTime64BestEffort('2024-05-03T00:51:15.845Z') ` + + `GROUP BY ("response") ` + + `ORDER BY ("response")`, + }, + }, } diff --git a/quesma/util/dates.go b/quesma/util/dates.go new file mode 100644 index 000000000..f0e21e474 --- /dev/null +++ b/quesma/util/dates.go @@ -0,0 +1,9 @@ +package util + +import "time" + +// ParseTime parses time from string in RFC3339Nano format, and discards error. Returns just time.Time value. +func ParseTime(asString string) time.Time { + t, _ := time.Parse(time.RFC3339Nano, asString) + return t +} diff --git a/quesma/util/map_utils.go b/quesma/util/map_utils.go index 7a3a554ea..77c533d56 100644 --- a/quesma/util/map_utils.go +++ b/quesma/util/map_utils.go @@ -1,5 +1,10 @@ package util +import ( + "cmp" + "sort" +) + func MapKeys[K comparable, V any](m map[K]V) []K { keys := make([]K, 0, len(m)) for k := range m { @@ -15,3 +20,19 @@ func MapValues[K comparable, V any](m map[K]V) []V { } return keys } + +func MapKeysSorted[K cmp.Ordered, V any](m map[K]V) []K { + keys := MapKeys(m) + sort.Slice(keys, func(i, j int) bool { + return keys[i] < keys[j] + }) + return keys +} + +func MapKeysSortedByValue[K comparable, V cmp.Ordered](m map[K]V) []K { + keys := MapKeys(m) + sort.Slice(keys, func(i, j int) bool { + return m[keys[i]] < m[keys[j]] + }) + return keys +}