Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for DateTime values in metrics aggregations #29

Merged
merged 8 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions quesma/clickhouse/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,3 +333,7 @@ func NewDefaultBoolAttribute() Attribute {
Type: NewBaseType("Bool"),
}
}

// String returns the human-readable name of the DateTimeType.
// Out-of-range values fall back to "Invalid" instead of panicking
// with an index-out-of-range error on the lookup slice.
func (dt DateTimeType) String() string {
	names := []string{"DateTime64", "DateTime", "Invalid"}
	if int(dt) < 0 || int(dt) >= len(names) {
		return "Invalid"
	}
	return names[dt]
}
10 changes: 6 additions & 4 deletions quesma/model/metrics_aggregations/avg.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,25 @@ package metrics_aggregations

import (
"context"
"mitmproxy/quesma/clickhouse"
"mitmproxy/quesma/model"
)

type Avg struct {
ctx context.Context
ctx context.Context
fieldType clickhouse.DateTimeType
}

func NewAvg(ctx context.Context) Avg {
return Avg{ctx: ctx}
func NewAvg(ctx context.Context, fieldType clickhouse.DateTimeType) Avg {
return Avg{ctx: ctx, fieldType: fieldType}
}

// IsBucketAggregation returns false: avg is a metrics aggregation,
// so it yields a single value instead of a set of buckets.
func (query Avg) IsBucketAggregation() bool {
return false
}

func (query Avg) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
return metricsTranslateSqlResponseToJson(query.ctx, rows, level)
return metricsTranslateSqlResponseToJsonWithFieldTypeCheck(query.ctx, rows, level, query.fieldType)
}

func (query Avg) String() string {
Expand Down
52 changes: 44 additions & 8 deletions quesma/model/metrics_aggregations/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,58 @@ package metrics_aggregations

import (
"context"
"mitmproxy/quesma/clickhouse"
"mitmproxy/quesma/logger"
"mitmproxy/quesma/model"
"time"
)

func metricsTranslateSqlResponseToJson(ctx context.Context, rows []model.QueryResultRow, level int) []model.JsonMap {
var value any = nil
if len(rows) > 0 {
if len(rows[0].Cols) > 0 {
value = rows[0].Cols[len(rows[0].Cols)-1].Value
} else {
logger.WarnWithCtx(ctx).Msg("no columns returned for metrics aggregation")
}
} else {
logger.WarnWithCtx(ctx).Msg("no rows returned for metrics aggregation")
if resultRowsAreFine(ctx, rows) {
value = rows[0].Cols[len(rows[0].Cols)-1].Value
}
return []model.JsonMap{{
"value": value,
}}
}

// metricsTranslateSqlResponseToJsonWithFieldTypeCheck is the same as metricsTranslateSqlResponseToJson for all types except DateTimes.
// With DateTimes, we need to return 2 values instead of 1: "value" (milliseconds since epoch)
// and "value_as_string" (RFC3339Nano), that's the difference.
// If the last column's value is not a time.Time despite the DateTime field type, "value" stays nil
// and "value_as_string" is omitted from the response.
func metricsTranslateSqlResponseToJsonWithFieldTypeCheck(
	ctx context.Context, rows []model.QueryResultRow, level int, fieldType clickhouse.DateTimeType) []model.JsonMap {
	if fieldType == clickhouse.Invalid {
		// not a date: just a normal single-value response
		return metricsTranslateSqlResponseToJson(ctx, rows, level)
	}

	var value, valueAsString any = nil, nil
	if resultRowsAreFine(ctx, rows) {
		// metrics aggregations put their result in the last column
		valueAsAny := rows[0].Cols[len(rows[0].Cols)-1].Value
		if valueAsTime, ok := valueAsAny.(time.Time); ok {
			value = valueAsTime.UnixMilli()
			valueAsString = valueAsTime.Format(time.RFC3339Nano)
		} else {
			// include type/value in the log, consistent with the quantile warnings
			logger.WarnWithCtx(ctx).Msgf("could not parse date, type: %T, value: %v", valueAsAny, valueAsAny)
		}
	}
	response := model.JsonMap{
		"value": value,
	}
	if value != nil {
		response["value_as_string"] = valueAsString
	}
	return []model.JsonMap{response}
}

// resultRowsAreFine reports whether the query result has at least one row
// with at least one column, logging a warning for each failure mode.
func resultRowsAreFine(ctx context.Context, rows []model.QueryResultRow) bool {
	switch {
	case len(rows) == 0:
		logger.WarnWithCtx(ctx).Msg("no rows returned for metrics aggregation")
		return false
	case len(rows[0].Cols) == 0:
		logger.WarnWithCtx(ctx).Msg("no columns returned for metrics aggregation")
		return false
	default:
		return true
	}
}
10 changes: 6 additions & 4 deletions quesma/model/metrics_aggregations/max.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,25 @@ package metrics_aggregations

import (
"context"
"mitmproxy/quesma/clickhouse"
"mitmproxy/quesma/model"
)

type Max struct {
ctx context.Context
ctx context.Context
fieldType clickhouse.DateTimeType
}

func NewMax(ctx context.Context) Max {
return Max{ctx: ctx}
func NewMax(ctx context.Context, fieldType clickhouse.DateTimeType) Max {
return Max{ctx: ctx, fieldType: fieldType}
}

// IsBucketAggregation returns false: max is a metrics aggregation,
// so it yields a single value instead of a set of buckets.
func (query Max) IsBucketAggregation() bool {
return false
}

func (query Max) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
return metricsTranslateSqlResponseToJson(query.ctx, rows, level)
return metricsTranslateSqlResponseToJsonWithFieldTypeCheck(query.ctx, rows, level, query.fieldType)
}

func (query Max) String() string {
Expand Down
10 changes: 6 additions & 4 deletions quesma/model/metrics_aggregations/min.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,25 @@ package metrics_aggregations

import (
"context"
"mitmproxy/quesma/clickhouse"
"mitmproxy/quesma/model"
)

type Min struct {
ctx context.Context
ctx context.Context
fieldType clickhouse.DateTimeType
}

func NewMin(ctx context.Context) Min {
return Min{ctx: ctx}
func NewMin(ctx context.Context, fieldType clickhouse.DateTimeType) Min {
return Min{ctx: ctx, fieldType: fieldType}
}

// IsBucketAggregation returns false: min is a metrics aggregation,
// so it yields a single value instead of a set of buckets.
func (query Min) IsBucketAggregation() bool {
return false
}

func (query Min) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
return metricsTranslateSqlResponseToJson(query.ctx, rows, level)
return metricsTranslateSqlResponseToJsonWithFieldTypeCheck(query.ctx, rows, level, query.fieldType)
}

func (query Min) String() string {
Expand Down
102 changes: 83 additions & 19 deletions quesma/model/metrics_aggregations/quantile.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,23 @@ package metrics_aggregations
import (
"context"
"math"
"mitmproxy/quesma/clickhouse"
"mitmproxy/quesma/logger"
"mitmproxy/quesma/model"
"mitmproxy/quesma/util"
"strconv"
"strings"
"time"
)

type Quantile struct {
ctx context.Context
keyed bool // https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-percentile-aggregation.html#_keyed_response_6
ctx context.Context
keyed bool // https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-percentile-aggregation.html#_keyed_response_6
fieldType clickhouse.DateTimeType
}

func NewQuantile(ctx context.Context, keyed bool) Quantile {
return Quantile{ctx, keyed}
func NewQuantile(ctx context.Context, keyed bool, fieldType clickhouse.DateTimeType) Quantile {
return Quantile{ctx, keyed, fieldType}
}

func (query Quantile) IsBucketAggregation() bool {
Expand All @@ -24,6 +28,7 @@ func (query Quantile) IsBucketAggregation() bool {

func (query Quantile) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
valueMap := make(model.JsonMap)
valueAsStringMap := make(model.JsonMap)

if len(rows) == 0 {
return emptyPercentilesResult
Expand All @@ -34,27 +39,22 @@ func (query Quantile) TranslateSqlResponseToJson(rows []model.QueryResultRow, le

for _, res := range rows[0].Cols {
if strings.HasPrefix(res.ColName, "quantile") {
percentile, ok := res.Value.([]float64)
if !ok {
logger.WarnWithCtx(query.ctx).Msgf(
"failed to convert percentile values to []float64, type: %T, value: %v. Skipping", res.Value, res.Value)
continue
}
// error handling is moved to processResult
percentile, percentileAsString, percentileIsNanOrInvalid := query.processResult(res.ColName, res.Value)
percentileName, _ := strings.CutPrefix(res.ColName, "quantile_")

// percentileName can't be an integer (doesn't work in Kibana that way), so we need to add .0 if it's missing
dotIndex := strings.Index(percentileName, ".")
if dotIndex == -1 {
percentileName += ".0"
}

if len(percentile) == 0 {
logger.WarnWithCtx(query.ctx).Msgf("empty percentile values for %s", percentileName)
}
if len(percentile) == 0 || math.IsNaN(percentile[0]) {
if percentileIsNanOrInvalid {
valueMap[percentileName] = nil
} else {
valueMap[percentileName] = percentile[0]
valueMap[percentileName] = percentile
if percentileAsString != nil {
valueAsStringMap[percentileName] = *percentileAsString
}
}
}
}
Expand All @@ -65,12 +65,18 @@ func (query Quantile) TranslateSqlResponseToJson(rows []model.QueryResultRow, le
}}
} else {
var values []model.JsonMap
for key, value := range valueMap {
keysSorted := util.MapKeysSorted(valueMap)
for _, key := range keysSorted {
value := valueMap[key]
keyAsFloat, _ := strconv.ParseFloat(key, 64)
values = append(values, model.JsonMap{
responseValue := model.JsonMap{
"key": keyAsFloat,
"value": value,
})
}
if _, exists := valueAsStringMap[key]; exists {
responseValue["value_as_string"] = valueAsStringMap[key]
}
values = append(values, responseValue)
}
return []model.JsonMap{{
"values": values,
Expand All @@ -82,6 +88,64 @@ func (query Quantile) String() string {
return "quantile"
}

// processResult processes the result of a single quantile value from Clickhouse, and handles all errors encountered.
// Unfortunately valueFromClickhouse is an array, even though we're only interested in [0] index.
// It makes this function a bit messy.
// That can be changed by changing the Clickhouse query, from `quantiles` to `quantile`, but it's well tested already + more general,
// I'd keep as it is for now, unless we find some further problems with it.
//
// Returns:
//   - percentile: float64 value of the percentile (or NaN if it's invalid)
//   - percentileAsString: string representation of the percentile
//     (or nil if we don't have it/don't need it - we'll just omit it in the response and that's fine)
//   - percentileIsNanOrInvalid: true if the percentile is NaN or invalid. We know we'll need to return nil in the response
func (query Quantile) processResult(colName string, percentileReturnedByClickhouse any) (
	percentile float64, percentileAsString *string, percentileIsNanOrInvalid bool) {
	var percentileAsArrayLen int
	// We never return from this switch preemptively to make code easier,
	// assumption is following: we know something is wrong if after the switch either
	// a) percentileAsArrayLen == 0, or b) percentileIsNanOrInvalid == true. Else => we're good.
	switch percentileTyped := percentileReturnedByClickhouse.(type) {
	case []float64:
		percentileAsArrayLen = len(percentileTyped)
		if len(percentileTyped) > 0 {
			percentileIsNanOrInvalid = math.IsNaN(percentileTyped[0])
			percentile = percentileTyped[0]
		}
	case []time.Time:
		percentileAsArrayLen = len(percentileTyped)
		if len(percentileTyped) > 0 {
			// DateTime percentiles: "value" is millis since epoch, string form is RFC3339Nano
			percentile = float64(percentileTyped[0].UnixMilli())
			asString := percentileTyped[0].Format(time.RFC3339Nano)
			percentileAsString = &asString
		}
	case []any:
		percentileAsArrayLen = len(percentileTyped)
		if len(percentileTyped) > 0 {
			// bind the switch variable to avoid re-asserting percentileTyped[0] in each case
			switch firstValue := percentileTyped[0].(type) {
			case float64:
				return query.processResult(colName, []float64{firstValue})
			case time.Time:
				return query.processResult(colName, []time.Time{firstValue})
			default:
				logger.WarnWithCtx(query.ctx).Msgf("unexpected type in percentile array: %T, array: %v", percentileTyped[0], percentileTyped)
				percentileIsNanOrInvalid = true
			}
		}
	default:
		logger.WarnWithCtx(query.ctx).Msgf("unexpected type in percentile array: %T, value: %v", percentileReturnedByClickhouse, percentileReturnedByClickhouse)
		percentileIsNanOrInvalid = true
	}
	if percentileAsArrayLen == 0 {
		logger.WarnWithCtx(query.ctx).Msgf("empty percentile values for %s", colName)
		return math.NaN(), nil, true
	}
	if percentileIsNanOrInvalid {
		return math.NaN(), nil, true
	}
	return percentile, percentileAsString, percentileIsNanOrInvalid
}

// emptyPercentilesResult is the response used when the query returned no rows.
// NOTE(review): Elasticsearch usually returns an empty object/array for "values"
// in an empty percentiles response — confirm that 0 is the shape Kibana expects here.
var emptyPercentilesResult = []model.JsonMap{{
"values": 0,
}}
74 changes: 74 additions & 0 deletions quesma/model/metrics_aggregations/quantile_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package metrics_aggregations

import (
"context"
"fmt"
"math"
"mitmproxy/quesma/clickhouse"
"mitmproxy/quesma/util"
"strconv"
"testing"
"time"
)

// equalFloats reports whether a and b are equal within a 1e-9 tolerance,
// treating two NaNs as equal (plain == would make NaN != NaN).
func equalFloats(a, b float64) bool {
	bothNaN := math.IsNaN(a) && math.IsNaN(b)
	return bothNaN || math.Abs(a-b) < 1e-9
}

// equalStrings reports whether two *string are equal: both nil, or both
// non-nil and pointing at equal values.
func equalStrings(a, b *string) bool {
	if a == nil || b == nil {
		return a == b
	}
	return *a == *b
}

// Test_processResult covers Quantile.processResult across the value shapes
// Clickhouse can return: invalid scalars/strings, []float64, []time.Time,
// and []any mixes. A NaN wantedPercentile with a nil string encodes the
// "invalid" outcome.
func Test_processResult(t *testing.T) {
// NOTE(review): the next two lines look like leftover debug output — consider
// removing them (the fmt import would then become unused and must go too).
a := time.Now()
fmt.Println(a.Format(time.RFC3339))
quantile := NewQuantile(context.Background(), false, clickhouse.DateTime)
colName := "not-important"
// RFC3339Nano string form expected for 1714687096297 ms since epoch
wantedStr := "2024-05-02T21:58:16.297Z"
tests := []struct {
percentileReturnedByClickhouse any // raw value as processResult receives it
wantedPercentile float64 // NaN means "invalid/unparseable"
wantedPercentileAsString *string // nil when no string form is expected
}{
{nil, math.NaN(), nil},
{"", math.NaN(), nil},
{"0", math.NaN(), nil},
{0, math.NaN(), nil},
{0.0, math.NaN(), nil},
{[]string{"1.0"}, math.NaN(), nil},
{[]string{"1.0", "5"}, math.NaN(), nil},
{[]any{"1.0", "5"}, math.NaN(), nil},
{[]any{"1.0", "5"}, math.NaN(), nil},
{[]int{1}, math.NaN(), nil},
{[]int{}, math.NaN(), nil},
{[]float64{}, math.NaN(), nil},
{[]float64{1.0}, 1.0, nil},
{[]float64{1.0, 2.0}, 1.0, nil},
{[]any{float64(1.0), 5}, 1.0, nil},
{[]any{5, float64(1.0)}, math.NaN(), nil},
{[]time.Time{util.ParseTime("2024-05-02T21:58:16.297Z"), util.ParseTime("5")}, 1714687096297.0, &wantedStr},
{[]time.Time{util.ParseTime("2024-05-02T21:58:16.297Z")}, 1714687096297.0, &wantedStr},
{[]any{util.ParseTime("2024-05-02T21:58:16.297Z"), 5, 10, 5.2}, 1714687096297.0, &wantedStr},
{[]any{util.ParseTime("2024-05-02T21:58:16.297Z")}, 1714687096297.0, &wantedStr},
}
for i, tt := range tests {
t.Run("testing processResult"+strconv.Itoa(i), func(t *testing.T) {
// third return (percentileIsNanOrInvalid) is ignored: a NaN
// wantedPercentile already encodes the invalid cases above
percentile, percentileAsString, _ := quantile.processResult(colName, tt.percentileReturnedByClickhouse)
if !equalFloats(percentile, tt.wantedPercentile) {
t.Errorf("got %v, wanted %v", percentile, tt.wantedPercentile)
}
if !equalStrings(percentileAsString, tt.wantedPercentileAsString) {
t.Errorf("got %v, wanted %v", percentileAsString, tt.wantedPercentileAsString)
}
})
}
}
Loading
Loading