Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new aggregation IP Range #1100

Merged
merged 8 commits into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quesma/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ require (

require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/H0llyW00dzZ/cidr v1.2.1 // indirect
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions quesma/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7Oputl
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/DataDog/go-sqllexer v0.0.17 h1:u47fJAVg/+5DA74ZW3w0Qu+3qXHd3GtnA8ZBYixdPrM=
github.com/DataDog/go-sqllexer v0.0.17/go.mod h1:KwkYhpFEVIq+BfobkTC1vfqm4gTi65skV/DpDBXtexc=
github.com/H0llyW00dzZ/cidr v1.2.1 h1:DfRHX+RqVVKZijQGO1aJSaWvN9Saan8sycK/4wrfY5g=
github.com/H0llyW00dzZ/cidr v1.2.1/go.mod h1:S+EgYkMandSAN27mGNG/CB3jeoXDAyalsvvVFpWdnXc=
github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs=
github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA=
github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
Expand Down
187 changes: 187 additions & 0 deletions quesma/model/bucket_aggregations/ip_range.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package bucket_aggregations

import (
"context"
"fmt"
"quesma/logger"
"quesma/model"
"reflect"
)

// BiggestIpv4 is "255.255.255.255 + 1", so to say. Used in Elastic, because it always uses exclusive upper bounds.
// So instead of "<= 255.255.255.255", it uses "< ::1:0:0:0"
const BiggestIpv4 = "::1:0:0:0"

// Current limitation: we expect Clickhouse field to be IPv4 (and not IPv6)

// Clickhouse table to test SQLs:
// CREATE TABLE __quesma_table_name (clientip IPv4) ENGINE=Log
// INSERT INTO __quesma_table_name VALUES ('0.0.0.0'), ('5.5.5.5'), ('90.180.90.180'), ('128.200.0.8'), ('192.168.1.67'), ('222.168.22.67')

// TODO make part of QueryType interface and implement for all aggregations
// TODO add bad requests to tests
// Doing so will ensure we see 100% of what we're interested in in our logs (now we see ~95%)
// CheckParamsIpRange validates the raw params of an "ip_range" aggregation:
// all required parameters must be present with the expected type, and no
// parameter outside the required/optional sets may appear. Returns a
// descriptive error on the first violation, nil when the params are valid.
func CheckParamsIpRange(ctx context.Context, paramsRaw any) error {
	requiredParams := map[string]string{
		"field":  "string",
		"ranges": "map_todo_improve_this_check", // TODO should add same type check to this 'ranges' field, will be fixed
	}
	optionalParams := map[string]string{
		"keyed": "bool",
	}

	params, ok := paramsRaw.(model.JsonMap)
	if !ok {
		return fmt.Errorf("params is not a map, but %+v", paramsRaw)
	}

	// every required parameter must be present and of the declared type
	for name, wantedType := range requiredParams {
		value, present := params[name]
		if !present {
			return fmt.Errorf("required parameter %s not found in params", name)
		}
		if wantedType == "map_todo_improve_this_check" {
			continue // skip the type check until the TODO above is fixed
		}
		if reflect.TypeOf(value).Name() != wantedType { // TODO I'll make a small rewrite to not use reflect here
			return fmt.Errorf("required parameter %s is not of type %s, but %T", name, wantedType, value)
		}
	}

	// nothing besides required/optional parameters may be present
	for name := range params {
		if _, isRequired := requiredParams[name]; isRequired {
			continue
		}
		wantedType, isOptional := optionalParams[name]
		if !isOptional {
			return fmt.Errorf("unexpected parameter %s found in IP Range params %v", name, params)
		}
		if reflect.TypeOf(params[name]).Name() != wantedType { // TODO I'll make a small rewrite to not use reflect here
			return fmt.Errorf("optional parameter %s is not of type %s, but %T", name, wantedType, params[name])
		}
	}

	return nil
}

type (
	// IpRange implements the Elasticsearch "ip_range" bucket aggregation.
	// One instance may hold several intervals; CombinatorSplit breaks it
	// into single-interval copies for per-bucket SQL generation.
	IpRange struct {
		ctx       context.Context
		field     model.Expr   // the IP column the aggregation buckets on (Clickhouse IPv4 — see file header note)
		intervals []IpInterval // requested ranges, in request order
		keyed     bool         // whether the response buckets are keyed by name instead of listed as an array
	}
	// IpInterval is one requested range: [begin, end) with Elastic's
	// exclusive upper bound convention (see BiggestIpv4 above).
	IpInterval struct {
		begin string
		end   string
		key   *string // when nil, key is not present
	}
)

// NewIpRange builds an ip_range aggregation over the given field and intervals.
func NewIpRange(ctx context.Context, intervals []IpInterval, field model.Expr, keyed bool) *IpRange {
	aggregation := IpRange{
		ctx:       ctx,
		field:     field,
		intervals: intervals,
		keyed:     keyed,
	}
	return &aggregation
}

// NewIpInterval constructs an IpInterval; key may be nil when the request
// did not name the range explicitly.
func NewIpInterval(begin, end string, key *string) IpInterval {
	return IpInterval{
		begin: begin,
		end:   end,
		key:   key,
	}
}

// ToWhereClause translates the interval into a SQL condition on field.
// begin is inclusive, end exclusive (Elastic's convention). An unbounded
// side produces no condition — likewise an end of BiggestIpv4, which spans
// all of IPv4 — and a fully unbounded interval yields TRUE.
func (interval IpInterval) ToWhereClause(field model.Expr) model.Expr {
	isBegin := interval.begin != UnboundedInterval
	isEnd := interval.end != UnboundedInterval && interval.end != BiggestIpv4

	// Fix: the real comparisons were commented out and both bounds were
	// hard-wired to TrueExpr, so every interval matched all rows.
	begin := model.NewInfixExpr(field, ">=", model.NewLiteralSingleQuoted(interval.begin))
	end := model.NewInfixExpr(field, "<", model.NewLiteralSingleQuoted(interval.end))

	switch {
	case isBegin && isEnd:
		return model.NewInfixExpr(begin, "AND", end)
	case isBegin:
		return begin
	case isEnd:
		return end
	default:
		return model.TrueExpr
	}
}

// String renders the bucket key: the explicit key when one was given,
// otherwise "<begin>-<end>", e.g. "1.0-2.0" or "*-6.55".
func (interval IpInterval) String() string {
	if key := interval.key; key != nil {
		return *key
	}
	return interval.begin + "-" + interval.end
}

// AggregationType classifies ip_range as a bucket aggregation.
func (query *IpRange) AggregationType() model.AggregationType {
	return model.BucketAggregation
}

func (query *IpRange) TranslateSqlResponseToJson(rows []model.QueryResultRow) model.JsonMap {
fmt.Println("DUPA", rows)
trzysiek marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

// String returns the aggregation's name, used in logs and debug output.
func (query *IpRange) String() string {
	return "ip_range"
}

// DoesNotHaveGroupBy reports that ip_range produces no GROUP BY clause:
// each interval is evaluated as a separate filtered query instead.
func (query *IpRange) DoesNotHaveGroupBy() bool {
	return true
}

// CombinatorGroups returns one CombinatorGroup per configured interval.
// With a single interval the prefix is empty, otherwise "range_<idx>__".
func (query *IpRange) CombinatorGroups() (result []CombinatorGroup) {
	multipleIntervals := len(query.intervals) > 1
	for i, interval := range query.intervals {
		prefix := ""
		if multipleIntervals {
			prefix = fmt.Sprintf("range_%d__", i)
		}
		group := CombinatorGroup{
			idx:         i,
			Prefix:      prefix,
			Key:         interval.String(),
			WhereClause: interval.ToWhereClause(query.field),
		}
		result = append(result, group)
	}
	return
}

// bad requests: both to/from and mask

func (query *IpRange) CombinatorTranslateSqlResponseToJson(subGroup CombinatorGroup, rows []model.QueryResultRow) model.JsonMap {
fmt.Println("SB", subGroup, "ROWS", rows)
trzysiek marked this conversation as resolved.
Show resolved Hide resolved
if len(rows) == 0 || len(rows[0].Cols) == 0 {
logger.ErrorWithCtx(query.ctx).Msgf("need at least one row and column in ip_range aggregation response, rows: %d, cols: %d", len(rows), len(rows[0].Cols))
return model.JsonMap{}
}
count := rows[0].Cols[len(rows[0].Cols)-1].Value
response := model.JsonMap{
"key": subGroup.Key,
"doc_count": count,
}

interval := query.intervals[subGroup.idx]
if interval.begin != UnboundedInterval {
response["from"] = interval.begin
}
if interval.end != UnboundedInterval {
response["to"] = interval.end
}

return response
}

// CombinatorSplit breaks the aggregation into one single-interval IpRange
// per configured interval, preserving field and keyed settings.
func (query *IpRange) CombinatorSplit() []model.QueryType {
	split := make([]model.QueryType, len(query.intervals))
	for i, interval := range query.intervals {
		split[i] = NewIpRange(query.ctx, []IpInterval{interval}, query.field, query.keyed)
	}
	return split
}
9 changes: 8 additions & 1 deletion quesma/model/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
// SPDX-License-Identifier: Elastic-2.0
package model

import "strconv"
import (
	"fmt"
	"strconv"
	"strings"
)

// Expr is a generic representation of an expression which is a part of the SQL query.
type Expr interface {
Expand Down Expand Up @@ -113,6 +116,10 @@ func NewLiteral(value any) LiteralExpr {
return LiteralExpr{Value: value}
}

// NewLiteralSingleQuoted wraps value in single quotes for use as a SQL string
// literal, escaping embedded single quotes by SQL-standard doubling ('').
// Fix: without the escaping, a value containing ' produced a malformed
// literal and an injection vector for request-supplied strings.
func NewLiteralSingleQuoted(value string) LiteralExpr {
	return LiteralExpr{Value: fmt.Sprintf("'%s'", strings.ReplaceAll(value, "'", "''"))}
}

// DistinctExpr is a representation of DISTINCT keyword in SQL, e.g. `SELECT DISTINCT` ... or `SELECT COUNT(DISTINCT ...)`
type DistinctExpr struct {
Expr Expr
Expand Down
20 changes: 20 additions & 0 deletions quesma/queryparser/aggregation_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,16 @@ func (cw *ClickhouseQueryTranslator) parseIntField(queryMap QueryMap, fieldName
return defaultValue
}

// parseBoolField reads fieldName from queryMap as a bool. A missing field, or
// a present value of the wrong type (which is additionally logged), yields
// defaultValue.
func (cw *ClickhouseQueryTranslator) parseBoolField(queryMap QueryMap, fieldName string, defaultValue bool) bool {
	valueRaw, exists := queryMap[fieldName]
	if !exists {
		return defaultValue
	}
	asBool, ok := valueRaw.(bool)
	if !ok {
		logger.WarnWithCtx(cw.Ctx).Msgf("%s is not a bool, but %T, value: %v. Using default: %v", fieldName, valueRaw, valueRaw, defaultValue)
		return defaultValue
	}
	return asBool
}

func (cw *ClickhouseQueryTranslator) parseInt64Field(queryMap QueryMap, fieldName string, defaultValue int64) int64 {
if valueRaw, exists := queryMap[fieldName]; exists {
if asFloat, ok := valueRaw.(float64); ok {
Expand Down Expand Up @@ -292,6 +302,16 @@ func (cw *ClickhouseQueryTranslator) parseStringField(queryMap QueryMap, fieldNa
return defaultValue
}

// parseStringFieldExistCheck reads fieldName from queryMap as a string.
// The second result reports whether a string value was found; a present but
// non-string value is logged and treated as absent.
func (cw *ClickhouseQueryTranslator) parseStringFieldExistCheck(queryMap QueryMap, fieldName string) (value string, exists bool) {
	valueRaw, present := queryMap[fieldName]
	if !present {
		return "", false
	}
	if asString, ok := valueRaw.(string); ok {
		return asString, true
	}
	logger.WarnWithCtx(cw.Ctx).Msgf("%s is not a string, but %T, value: %v", fieldName, valueRaw, valueRaw)
	return "", false
}

func (cw *ClickhouseQueryTranslator) parseArrayField(queryMap QueryMap, fieldName string) ([]any, error) {
if valueRaw, exists := queryMap[fieldName]; exists {
if asArray, ok := valueRaw.([]any); ok {
Expand Down
47 changes: 47 additions & 0 deletions quesma/queryparser/pancake_aggregation_parser_buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@ package queryparser

import (
"fmt"
"github.com/H0llyW00dzZ/cidr"
"github.com/pkg/errors"
"math"
"net"
"quesma/clickhouse"
"quesma/logger"
"quesma/model"
"quesma/model/bucket_aggregations"
"quesma/util"
"strconv"
"strings"
)
Expand All @@ -36,6 +40,7 @@ func (cw *ClickhouseQueryTranslator) pancakeTryBucketAggregation(aggregation *pa
}},
{"multi_terms", cw.parseMultiTerms},
{"composite", cw.parseComposite},
{"ip_range", cw.parseIpRange},
}

for _, aggr := range aggregationHandlers {
Expand Down Expand Up @@ -359,6 +364,48 @@ func (cw *ClickhouseQueryTranslator) parseComposite(aggregation *pancakeAggregat
return nil
}

// parseIpRange parses an "ip_range" aggregation request into the pancake
// aggregation tree. Each range is given either as a CIDR "mask" (translated
// to a [begin, end) interval with Elastic's exclusive upper bound) or as
// explicit "from"/"to" bounds; an optional "key" names the bucket.
// Fix: the type assertions on "ranges" and its elements were unchecked — a
// malformed request (CheckParamsIpRange deliberately skips the 'ranges' type
// check, see its TODO) panicked instead of returning an error.
func (cw *ClickhouseQueryTranslator) parseIpRange(aggregation *pancakeAggregationTreeNode, params QueryMap) error {
	const defaultKeyed = false

	if err := bucket_aggregations.CheckParamsIpRange(cw.Ctx, params); err != nil {
		return err
	}

	rangesRaw, ok := params["ranges"].([]any)
	if !ok {
		return fmt.Errorf("ip_range: ranges is not an array, but %T", params["ranges"])
	}
	ranges := make([]bucket_aggregations.IpInterval, 0, len(rangesRaw))
	for _, rangeRaw := range rangesRaw {
		rangeMap, ok := rangeRaw.(QueryMap)
		if !ok {
			return fmt.Errorf("ip_range: range is not a map, but %T, value: %v", rangeRaw, rangeRaw)
		}
		var key *string
		if keyIfPresent, exists := cw.parseStringFieldExistCheck(rangeMap, "key"); exists {
			key = &keyIfPresent
		}
		var begin, end string
		if maskIfExists, exists := cw.parseStringFieldExistCheck(rangeMap, "mask"); exists {
			_, ipNet, err := net.ParseCIDR(maskIfExists)
			if err != nil {
				return err
			}
			beginAsInt, endAsInt := cidr.IPv4ToRange(ipNet)
			begin = util.IntToIpv4(beginAsInt)
			// endAsInt is inclusive; Elastic uses exclusive upper bounds, hence +1
			if endAsInt != math.MaxUint32 {
				end = util.IntToIpv4(endAsInt + 1)
			} else {
				end = bucket_aggregations.BiggestIpv4 // "255.255.255.255 + 1", so to say (value in compliance with Elastic)
			}
			// a mask without an explicit key is keyed by the mask itself
			if key == nil {
				key = &maskIfExists
			}
		} else {
			begin = cw.parseStringField(rangeMap, "from", bucket_aggregations.UnboundedInterval)
			end = cw.parseStringField(rangeMap, "to", bucket_aggregations.UnboundedInterval)
		}
		ranges = append(ranges, bucket_aggregations.NewIpInterval(begin, end, key))
	}
	aggregation.isKeyed = cw.parseBoolField(params, "keyed", defaultKeyed)
	aggregation.queryType = bucket_aggregations.NewIpRange(cw.Ctx, ranges, cw.parseFieldField(params, "ip_range"), aggregation.isKeyed)
	return nil
}

func (cw *ClickhouseQueryTranslator) parseOrder(params QueryMap, fieldExpressions []model.Expr) ([]model.OrderByExpr, error) {
defaultDirection := model.DescOrder
defaultOrderBy := model.NewOrderByExpr(model.NewCountFunc(), defaultDirection)
Expand Down
2 changes: 2 additions & 0 deletions quesma/queryparser/pancake_json_rendering.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ func (p *pancakeJSONRenderer) combinatorBucketToJSON(remainingLayers []*pancakeM
selectedRows := p.selectMetricRows(layer.nextBucketAggregation.InternalNameForCount(), selectedRowsWithoutPrefix)
aggJson := queryType.CombinatorTranslateSqlResponseToJson(subGroup, selectedRows)

fmt.Println("aggJson", aggJson, "subAggr", subAggr)
trzysiek marked this conversation as resolved.
Show resolved Hide resolved

mergeResult, mergeErr := util.MergeMaps(aggJson, subAggr)
if mergeErr != nil {
logger.ErrorWithCtx(p.ctx).Msgf("error merging maps: %v", mergeErr)
Expand Down
Loading
Loading