Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
trzysiek committed Dec 27, 2024
1 parent 808aaa0 commit b688e83
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 56 deletions.
132 changes: 94 additions & 38 deletions quesma/model/metrics_aggregations/rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"quesma/logger"
"quesma/model"
"quesma/util"
"reflect"
"strings"
)

Expand All @@ -18,24 +19,28 @@ type (
multiplier float64
}
RateUnit int
RateMode int
)

const (
Second RateUnit = iota
Minute
Hour
Day
Week
Month
Quarter
Year
Invalid
second RateUnit = iota
minute
hour
day
week
month
quarter
year
)
const (
sum RateMode = iota
valueCount
)

// NewRate creates a new Rate aggregation, during parsing.
// 'multiplier' is set later, during pancake transformation.
func NewRate(ctx context.Context, unit string) *Rate {
return &Rate{ctx: ctx, unit: NewRateUnit(unit)}
return &Rate{ctx: ctx, unit: newRateUnit(ctx, unit)}
}

func (query *Rate) AggregationType() model.AggregationType {
Expand Down Expand Up @@ -64,13 +69,13 @@ func (query *Rate) CalcAndSetMultiplier(parentIntervalInMs int64) {

rateInMs := query.unit.ToMilliseconds(query.ctx)
// unit month/quarter/year is special, only compatible with month/quarter/year calendar intervals
if query.unit == Month || query.unit == Quarter || query.unit == Year {
if query.unit == month || query.unit == quarter || query.unit == year {
oneMonthInMs := int64(30 * 24 * 60 * 60 * 1000)
if parentIntervalInMs < oneMonthInMs {
logger.WarnWithCtx(query.ctx).Msgf("parent interval (%d ms) is not compatible with rate unit %s", parentIntervalInMs, query.unit)
return
}
if query.unit == Year {
if query.unit == year {
rateInMs = 360 * 24 * 60 * 60 * 1000 // round to 360 days, so year/month = 12, year/quarter = 3, as should be
}
}
Expand All @@ -86,72 +91,123 @@ func (query *Rate) String() string {
return fmt.Sprintf("rate(unit: %s)", query.unit)
}

func NewRateUnit(unit string) RateUnit {
func newRateUnit(ctx context.Context, unit string) RateUnit {
switch strings.ToLower(unit) {
case "second":
return Second
return second
case "minute":
return Minute
return minute
case "hour":
return Hour
return hour
case "day":
return Day
return day
case "week":
return Week
return week
case "month":
return Month
return month
case "quarter":
return Quarter
return quarter
case "year":
return Year
return year
default:
return Invalid
// theoretically unreachable, as this is checked during parsing
logger.ErrorWithCtx(ctx).Msgf("invalid rate unit: %s", unit)
return second
}
}

func (u RateUnit) String() string {
switch u {
case Second:
case second:
return "second"
case Minute:
case minute:
return "minute"
case Hour:
case hour:
return "hour"
case Day:
case day:
return "day"
case Week:
case week:
return "week"
case Month:
case month:
return "month"
case Quarter:
case quarter:
return "quarter"
case Year:
case year:
return "year"
default:
// theoretically unreachable
return "invalid"
}
}

func (u RateUnit) ToMilliseconds(ctx context.Context) int64 {
switch u {
case Second:
case second:
return 1000
case Minute:
case minute:
return 60 * 1000
case Hour:
case hour:
return 60 * 60 * 1000
case Day:
case day:
return 24 * 60 * 60 * 1000
case Week:
case week:
return 7 * 24 * 60 * 60 * 1000
case Month:
case month:
return 30 * 24 * 60 * 60 * 1000
case Quarter:
case quarter:
return 3 * 30 * 24 * 60 * 60 * 1000
case Year:
case year:
return 365 * 24 * 60 * 60 * 1000
default:
logger.ErrorWithCtx(ctx).Msgf("invalid rate unit: %s", u)
return 0
}
}

// mode: sum or value_count (sum default)

// TODO make part of QueryType interface and implement for all aggregations
// TODO add bad requests to tests
// Doing so will ensure we see 100% of what we're interested in in our logs (now we see ~95%)
func CheckParamsRate(ctx context.Context, paramsRaw any) error {
requiredParams := map[string]string{
"unit": "string",
}
optionalParams := map[string]string{
"field": "string",
"mode": "string",
}

params, ok := paramsRaw.(model.JsonMap)
if !ok {
return fmt.Errorf("params is not a map, but %+v", paramsRaw)
}

// check if required are present
for paramName, paramType := range requiredParams {
paramVal, exists := params[paramName]
if !exists {
return fmt.Errorf("required parameter %s not found in params", paramName)
}
if reflect.TypeOf(paramVal).Name() != paramType { // TODO I'll make a small rewrite to not use reflect here
return fmt.Errorf("required parameter %s is not of type %s, but %T", paramName, paramType, paramVal)
}
}
// TODO additional check for unit

// check if only required/optional are present
for paramName := range params {
if _, isRequired := requiredParams[paramName]; !isRequired {
wantedType, isOptional := optionalParams[paramName]
if !isOptional {
return fmt.Errorf("unexpected parameter %s found in IP Range params %v", paramName, params)
}
if reflect.TypeOf(params[paramName]).Name() != wantedType { // TODO I'll make a small rewrite to not use reflect here
return fmt.Errorf("optional parameter %s is not of type %s, but %T", paramName, wantedType, params[paramName])
}
}
}
// TODO additional check for field (resolve) + mode (one of 2 values)

return nil
}
27 changes: 9 additions & 18 deletions quesma/queryparser/aggregation_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,26 +160,17 @@ func (cw *ClickhouseQueryTranslator) tryMetricsAggregation(queryMap QueryMap) (m
}, true
}

if rateRaw, exists := queryMap["rate"]; exists {
rate, ok := rateRaw.(QueryMap)
if !ok {
logger.WarnWithCtx(cw.Ctx).Msgf("rate is not a map, but %T, value: %v. Skipping.", rate, rate)
return metricsAggregation{}, false
}

unit := cw.parseStringField(rate, "unit", "")
if metrics_aggregations.NewRateUnit(unit) == metrics_aggregations.Invalid {
logger.WarnWithCtx(cw.Ctx).Msgf("unit in rate aggregation is not a valid unit: %s. Skipping.", unit)
return metricsAggregation{}, false
}

var fields []model.Expr
if field := cw.parseFieldField(rate, "rate"); field != nil {
fields = append(fields, field)
return metricsAggregation{AggrType: "rate", Fields: fields, unit: unit}, true
} else {
if rate, exists := queryMap["rate"]; exists {
if err := metrics_aggregations.CheckParamsRate(cw.Ctx, rate); err != nil {
logger.WarnWithCtx(cw.Ctx).Msgf("rate aggregation has invalid parameters: %v. Skipping.", rate)
return metricsAggregation{}, false
}
return metricsAggregation{
AggrType: "rate",
Fields: []model.Expr{cw.parseFieldField(rate, "rate")},
// default unit doesn't matter, it's checked in CheckParamsRate, it will never be empty here
unit: cw.parseStringField(rate.(JsonMap), "unit", ""),
}, true
}

return metricsAggregation{}, false
Expand Down

0 comments on commit b688e83

Please sign in to comment.