Skip to content

Commit

Permalink
Add support for DateTime values in metrics aggregations (#29)
Browse files Browse the repository at this point in the history
`max`, `min`, and `percentiles` aggregations need separate handling for
`DateTime` fields, as the response format is slightly different. Added
that, plus 3 tests — one for each aggregation.
Before: `percentiles` didn't work at all, and `min`/`max` only partially.
<img width="1714" alt="Screenshot 2024-05-07 at 20 05 01"
src="https://github.com/QuesmaOrg/quesma/assets/5407146/c6f67faf-2684-4234-92a5-150f8d68a4b3">
<img width="1728" alt="Screenshot 2024-05-07 at 20 04 23"
src="https://github.com/QuesmaOrg/quesma/assets/5407146/a55d8449-4f9b-48a0-821d-b72de4507336">
After:
<img width="1728" alt="Screenshot 2024-05-07 at 20 35 11"
src="https://github.com/QuesmaOrg/quesma/assets/5407146/f2183e74-926e-48d5-914f-a1ebfc24e107">

<img width="1714" alt="Screenshot 2024-05-07 at 20 01 34"
src="https://github.com/QuesmaOrg/quesma/assets/5407146/73805381-310b-4481-8097-d672c290e9b6">
<img width="1720" alt="Screenshot 2024-05-07 at 20 01 45"
src="https://github.com/QuesmaOrg/quesma/assets/5407146/99f3669d-936c-4926-99f3-71bbba308a7b">
  • Loading branch information
trzysiek authored May 8, 2024
1 parent 9afc1c9 commit 7bc45ec
Show file tree
Hide file tree
Showing 15 changed files with 797 additions and 63 deletions.
4 changes: 4 additions & 0 deletions quesma/clickhouse/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,3 +333,7 @@ func NewDefaultBoolAttribute() Attribute {
Type: NewBaseType("Bool"),
}
}

// String returns the human-readable name of the DateTime type variant.
// An out-of-range value maps to "Invalid" instead of panicking on a bad
// slice index, so formatting a corrupted enum value stays safe.
func (dt DateTimeType) String() string {
	names := []string{"DateTime64", "DateTime", "Invalid"}
	if int(dt) < 0 || int(dt) >= len(names) {
		return "Invalid"
	}
	return names[dt]
}
10 changes: 6 additions & 4 deletions quesma/model/metrics_aggregations/avg.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,25 @@ package metrics_aggregations

import (
"context"
"mitmproxy/quesma/clickhouse"
"mitmproxy/quesma/model"
)

type Avg struct {
ctx context.Context
ctx context.Context
fieldType clickhouse.DateTimeType
}

func NewAvg(ctx context.Context) Avg {
return Avg{ctx: ctx}
func NewAvg(ctx context.Context, fieldType clickhouse.DateTimeType) Avg {
return Avg{ctx: ctx, fieldType: fieldType}
}

func (query Avg) IsBucketAggregation() bool {
return false
}

func (query Avg) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
return metricsTranslateSqlResponseToJson(query.ctx, rows, level)
return metricsTranslateSqlResponseToJsonWithFieldTypeCheck(query.ctx, rows, level, query.fieldType)
}

func (query Avg) String() string {
Expand Down
52 changes: 44 additions & 8 deletions quesma/model/metrics_aggregations/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,58 @@ package metrics_aggregations

import (
"context"
"mitmproxy/quesma/clickhouse"
"mitmproxy/quesma/logger"
"mitmproxy/quesma/model"
"time"
)

func metricsTranslateSqlResponseToJson(ctx context.Context, rows []model.QueryResultRow, level int) []model.JsonMap {
var value any = nil
if len(rows) > 0 {
if len(rows[0].Cols) > 0 {
value = rows[0].Cols[len(rows[0].Cols)-1].Value
} else {
logger.WarnWithCtx(ctx).Msg("no columns returned for metrics aggregation")
}
} else {
logger.WarnWithCtx(ctx).Msg("no rows returned for metrics aggregation")
if resultRowsAreFine(ctx, rows) {
value = rows[0].Cols[len(rows[0].Cols)-1].Value
}
return []model.JsonMap{{
"value": value,
}}
}

// metricsTranslateSqlResponseToJsonWithFieldTypeCheck behaves exactly like
// metricsTranslateSqlResponseToJson for non-DateTime fields. For DateTime
// fields it returns two entries instead of one: "value" (Unix milliseconds)
// and "value_as_string" (RFC3339Nano), which is the shape Kibana expects.
func metricsTranslateSqlResponseToJsonWithFieldTypeCheck(
	ctx context.Context, rows []model.QueryResultRow, level int, fieldType clickhouse.DateTimeType) []model.JsonMap {
	if fieldType == clickhouse.Invalid {
		// Not a date: fall back to the plain single-value response.
		return metricsTranslateSqlResponseToJson(ctx, rows, level)
	}

	var value, valueAsString any
	if resultRowsAreFine(ctx, rows) {
		lastColValue := rows[0].Cols[len(rows[0].Cols)-1].Value
		asTime, isTime := lastColValue.(time.Time)
		if isTime {
			value = asTime.UnixMilli()
			valueAsString = asTime.Format(time.RFC3339Nano)
		} else {
			logger.WarnWithCtx(ctx).Msg("could not parse date")
		}
	}

	response := model.JsonMap{"value": value}
	if value != nil {
		response["value_as_string"] = valueAsString
	}
	return []model.JsonMap{response}
}

// resultRowsAreFine reports whether the query result has at least one row
// with at least one column; it logs a warning and returns false otherwise.
func resultRowsAreFine(ctx context.Context, rows []model.QueryResultRow) bool {
	switch {
	case len(rows) == 0:
		logger.WarnWithCtx(ctx).Msg("no rows returned for metrics aggregation")
		return false
	case len(rows[0].Cols) == 0:
		logger.WarnWithCtx(ctx).Msg("no columns returned for metrics aggregation")
		return false
	default:
		return true
	}
}
10 changes: 6 additions & 4 deletions quesma/model/metrics_aggregations/max.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,25 @@ package metrics_aggregations

import (
"context"
"mitmproxy/quesma/clickhouse"
"mitmproxy/quesma/model"
)

type Max struct {
ctx context.Context
ctx context.Context
fieldType clickhouse.DateTimeType
}

func NewMax(ctx context.Context) Max {
return Max{ctx: ctx}
func NewMax(ctx context.Context, fieldType clickhouse.DateTimeType) Max {
return Max{ctx: ctx, fieldType: fieldType}
}

func (query Max) IsBucketAggregation() bool {
return false
}

func (query Max) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
return metricsTranslateSqlResponseToJson(query.ctx, rows, level)
return metricsTranslateSqlResponseToJsonWithFieldTypeCheck(query.ctx, rows, level, query.fieldType)
}

func (query Max) String() string {
Expand Down
10 changes: 6 additions & 4 deletions quesma/model/metrics_aggregations/min.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,25 @@ package metrics_aggregations

import (
"context"
"mitmproxy/quesma/clickhouse"
"mitmproxy/quesma/model"
)

type Min struct {
ctx context.Context
ctx context.Context
fieldType clickhouse.DateTimeType
}

func NewMin(ctx context.Context) Min {
return Min{ctx: ctx}
func NewMin(ctx context.Context, fieldType clickhouse.DateTimeType) Min {
return Min{ctx: ctx, fieldType: fieldType}
}

func (query Min) IsBucketAggregation() bool {
return false
}

func (query Min) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
return metricsTranslateSqlResponseToJson(query.ctx, rows, level)
return metricsTranslateSqlResponseToJsonWithFieldTypeCheck(query.ctx, rows, level, query.fieldType)
}

func (query Min) String() string {
Expand Down
102 changes: 83 additions & 19 deletions quesma/model/metrics_aggregations/quantile.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,23 @@ package metrics_aggregations
import (
"context"
"math"
"mitmproxy/quesma/clickhouse"
"mitmproxy/quesma/logger"
"mitmproxy/quesma/model"
"mitmproxy/quesma/util"
"strconv"
"strings"
"time"
)

type Quantile struct {
ctx context.Context
keyed bool // https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-percentile-aggregation.html#_keyed_response_6
ctx context.Context
keyed bool // https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-percentile-aggregation.html#_keyed_response_6
fieldType clickhouse.DateTimeType
}

func NewQuantile(ctx context.Context, keyed bool) Quantile {
return Quantile{ctx, keyed}
func NewQuantile(ctx context.Context, keyed bool, fieldType clickhouse.DateTimeType) Quantile {
return Quantile{ctx, keyed, fieldType}
}

func (query Quantile) IsBucketAggregation() bool {
Expand All @@ -24,6 +28,7 @@ func (query Quantile) IsBucketAggregation() bool {

func (query Quantile) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
valueMap := make(model.JsonMap)
valueAsStringMap := make(model.JsonMap)

if len(rows) == 0 {
return emptyPercentilesResult
Expand All @@ -34,27 +39,22 @@ func (query Quantile) TranslateSqlResponseToJson(rows []model.QueryResultRow, le

for _, res := range rows[0].Cols {
if strings.HasPrefix(res.ColName, "quantile") {
percentile, ok := res.Value.([]float64)
if !ok {
logger.WarnWithCtx(query.ctx).Msgf(
"failed to convert percentile values to []float64, type: %T, value: %v. Skipping", res.Value, res.Value)
continue
}
// error handling is moved to processResult
percentile, percentileAsString, percentileIsNanOrInvalid := query.processResult(res.ColName, res.Value)
percentileName, _ := strings.CutPrefix(res.ColName, "quantile_")

// percentileName can't be an integer (doesn't work in Kibana that way), so we need to add .0 if it's missing
dotIndex := strings.Index(percentileName, ".")
if dotIndex == -1 {
percentileName += ".0"
}

if len(percentile) == 0 {
logger.WarnWithCtx(query.ctx).Msgf("empty percentile values for %s", percentileName)
}
if len(percentile) == 0 || math.IsNaN(percentile[0]) {
if percentileIsNanOrInvalid {
valueMap[percentileName] = nil
} else {
valueMap[percentileName] = percentile[0]
valueMap[percentileName] = percentile
if percentileAsString != nil {
valueAsStringMap[percentileName] = *percentileAsString
}
}
}
}
Expand All @@ -65,12 +65,18 @@ func (query Quantile) TranslateSqlResponseToJson(rows []model.QueryResultRow, le
}}
} else {
var values []model.JsonMap
for key, value := range valueMap {
keysSorted := util.MapKeysSorted(valueMap)
for _, key := range keysSorted {
value := valueMap[key]
keyAsFloat, _ := strconv.ParseFloat(key, 64)
values = append(values, model.JsonMap{
responseValue := model.JsonMap{
"key": keyAsFloat,
"value": value,
})
}
if _, exists := valueAsStringMap[key]; exists {
responseValue["value_as_string"] = valueAsStringMap[key]
}
values = append(values, responseValue)
}
return []model.JsonMap{{
"values": values,
Expand All @@ -82,6 +88,64 @@ func (query Quantile) String() string {
return "quantile"
}

// processResult processes the result of a single quantile value from Clickhouse, and handles all errors encountered.
// Unfortunately valueFromClickhouse is an array, even though we're only interested in [0] index.
// It makes this function a bit messy.
// That can be changed by changing the Clickhouse query, from `quantiles` to `quantile`, but it's well tested already + more general,
// I'd keep as it is for now, unless we find some further problems with it.
//
// Returns:
//   - percentile: float64 value of the percentile (or NaN if it's invalid)
//   - percentileAsString: string representation of the percentile
//     (or nil if we don't have it/don't need it - we'll just omit it in the response and that's fine)
//   - percentileIsNanOrInvalid: true if the percentile is NaN or invalid. We know we'll need to return nil in the response
func (query Quantile) processResult(colName string, percentileReturnedByClickhouse any) (
	percentile float64, percentileAsString *string, percentileIsNanOrInvalid bool) {
	var percentileAsArrayLen int
	// We never return from this switch preemptively to make code easier,
	// assumption is following: we know something is wrong if after the switch either
	// a) percentileAsArrayLen == 0, or b) percentileIsNanOrInvalid == true. Else => we're good.
	switch percentileTyped := percentileReturnedByClickhouse.(type) {
	case []float64:
		percentileAsArrayLen = len(percentileTyped)
		if len(percentileTyped) > 0 {
			percentileIsNanOrInvalid = math.IsNaN(percentileTyped[0])
			percentile = percentileTyped[0]
		}
	case []time.Time:
		percentileAsArrayLen = len(percentileTyped)
		if len(percentileTyped) > 0 {
			percentile = float64(percentileTyped[0].UnixMilli())
			asString := percentileTyped[0].Format(time.RFC3339Nano)
			percentileAsString = &asString
		}
	case []any:
		percentileAsArrayLen = len(percentileTyped)
		if len(percentileTyped) > 0 {
			// Bind the first element in the type switch instead of switching on
			// its type and then re-asserting the same value (.(float64)/.(time.Time)),
			// which duplicated the assertion in the original.
			switch first := percentileTyped[0].(type) {
			case float64:
				return query.processResult(colName, []float64{first})
			case time.Time:
				return query.processResult(colName, []time.Time{first})
			default:
				logger.WarnWithCtx(query.ctx).Msgf("unexpected type in percentile array: %T, array: %v", percentileTyped[0], percentileTyped)
				percentileIsNanOrInvalid = true
			}
		}
	default:
		logger.WarnWithCtx(query.ctx).Msgf("unexpected type in percentile array: %T, value: %v", percentileReturnedByClickhouse, percentileReturnedByClickhouse)
		percentileIsNanOrInvalid = true
	}
	if percentileAsArrayLen == 0 {
		logger.WarnWithCtx(query.ctx).Msgf("empty percentile values for %s", colName)
		return math.NaN(), nil, true
	}
	if percentileIsNanOrInvalid {
		return math.NaN(), nil, true
	}
	return percentile, percentileAsString, percentileIsNanOrInvalid
}

// emptyPercentilesResult is returned by TranslateSqlResponseToJson when
// Clickhouse returns no rows at all for the percentiles aggregation.
var emptyPercentilesResult = []model.JsonMap{{
	"values": 0,
}}
74 changes: 74 additions & 0 deletions quesma/model/metrics_aggregations/quantile_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package metrics_aggregations

import (
"context"
"fmt"
"math"
"mitmproxy/quesma/clickhouse"
"mitmproxy/quesma/util"
"strconv"
"testing"
"time"
)

func equalFloats(a, b float64) bool {
if math.IsNaN(a) && math.IsNaN(b) {
return true
}
return math.Abs(a-b) < 1e-9
}

func equalStrings(a, b *string) bool {
if a == nil && b == nil {
return true
}
if a == nil || b == nil {
return false
}
return *a == *b
}

func Test_processResult(t *testing.T) {
a := time.Now()
fmt.Println(a.Format(time.RFC3339))
quantile := NewQuantile(context.Background(), false, clickhouse.DateTime)
colName := "not-important"
wantedStr := "2024-05-02T21:58:16.297Z"
tests := []struct {
percentileReturnedByClickhouse any
wantedPercentile float64
wantedPercentileAsString *string
}{
{nil, math.NaN(), nil},
{"", math.NaN(), nil},
{"0", math.NaN(), nil},
{0, math.NaN(), nil},
{0.0, math.NaN(), nil},
{[]string{"1.0"}, math.NaN(), nil},
{[]string{"1.0", "5"}, math.NaN(), nil},
{[]any{"1.0", "5"}, math.NaN(), nil},
{[]any{"1.0", "5"}, math.NaN(), nil},
{[]int{1}, math.NaN(), nil},
{[]int{}, math.NaN(), nil},
{[]float64{}, math.NaN(), nil},
{[]float64{1.0}, 1.0, nil},
{[]float64{1.0, 2.0}, 1.0, nil},
{[]any{float64(1.0), 5}, 1.0, nil},
{[]any{5, float64(1.0)}, math.NaN(), nil},
{[]time.Time{util.ParseTime("2024-05-02T21:58:16.297Z"), util.ParseTime("5")}, 1714687096297.0, &wantedStr},
{[]time.Time{util.ParseTime("2024-05-02T21:58:16.297Z")}, 1714687096297.0, &wantedStr},
{[]any{util.ParseTime("2024-05-02T21:58:16.297Z"), 5, 10, 5.2}, 1714687096297.0, &wantedStr},
{[]any{util.ParseTime("2024-05-02T21:58:16.297Z")}, 1714687096297.0, &wantedStr},
}
for i, tt := range tests {
t.Run("testing processResult"+strconv.Itoa(i), func(t *testing.T) {
percentile, percentileAsString, _ := quantile.processResult(colName, tt.percentileReturnedByClickhouse)
if !equalFloats(percentile, tt.wantedPercentile) {
t.Errorf("got %v, wanted %v", percentile, tt.wantedPercentile)
}
if !equalStrings(percentileAsString, tt.wantedPercentileAsString) {
t.Errorf("got %v, wanted %v", percentileAsString, tt.wantedPercentileAsString)
}
})
}
}
Loading

0 comments on commit 7bc45ec

Please sign in to comment.