Skip to content

Commit

Permalink
new aggregation IP Range (#1100)
Browse files Browse the repository at this point in the history
  • Loading branch information
trzysiek authored Dec 27, 2024
1 parent 31a39e1 commit 820fdeb
Show file tree
Hide file tree
Showing 8 changed files with 545 additions and 47 deletions.
1 change: 1 addition & 0 deletions quesma/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ require (

require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/H0llyW00dzZ/cidr v1.2.1 // indirect
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions quesma/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ github.com/ClickHouse/clickhouse-go/v2 v2.30.0 h1:AG4D/hW39qa58+JHQIFOSnxyL46H6h
github.com/ClickHouse/clickhouse-go/v2 v2.30.0/go.mod h1:i9ZQAojcayW3RsdCb3YR+n+wC2h65eJsZCscZ1Z1wyo=
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/H0llyW00dzZ/cidr v1.2.1 h1:DfRHX+RqVVKZijQGO1aJSaWvN9Saan8sycK/4wrfY5g=
github.com/H0llyW00dzZ/cidr v1.2.1/go.mod h1:S+EgYkMandSAN27mGNG/CB3jeoXDAyalsvvVFpWdnXc=
github.com/DataDog/go-sqllexer v0.0.18 h1:ErBvoO7/srJLdA2ebwd+HPqD4g1kN++BP64A8qvmh9U=
github.com/DataDog/go-sqllexer v0.0.18/go.mod h1:KwkYhpFEVIq+BfobkTC1vfqm4gTi65skV/DpDBXtexc=
github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs=
Expand Down
185 changes: 185 additions & 0 deletions quesma/model/bucket_aggregations/ip_range.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package bucket_aggregations

import (
"context"
"fmt"
"quesma/logger"
"quesma/model"
"reflect"
)

// BiggestIpv4 is "255.255.255.255 + 1", so to say. Used in Elastic, because it always uses exclusive upper bounds.
// So instead of "<= 255.255.255.255", it uses "< ::1:0:0:0"
const BiggestIpv4 = "::1:0:0:0"

// Current limitation: we expect Clickhouse field to be IPv4 (and not IPv6)

// Clickhouse table to test SQLs:
// CREATE TABLE __quesma_table_name (clientip IPv4) ENGINE=Log
// INSERT INTO __quesma_table_name VALUES ('0.0.0.0'), ('5.5.5.5'), ('90.180.90.180'), ('128.200.0.8'), ('192.168.1.67'), ('222.168.22.67')

// TODO make part of QueryType interface and implement for all aggregations
// TODO add bad requests to tests
// Doing so will ensure we see 100% of what we're interested in in our logs (now we see ~95%)
func CheckParamsIpRange(ctx context.Context, paramsRaw any) error {
requiredParams := map[string]string{
"field": "string",
"ranges": "map_todo_improve_this_check", // TODO should add same type check to this 'ranges' field, will be fixed
}
optionalParams := map[string]string{
"keyed": "bool",
}

params, ok := paramsRaw.(model.JsonMap)
if !ok {
return fmt.Errorf("params is not a map, but %+v", paramsRaw)
}

// check if required are present
for paramName, paramType := range requiredParams {
paramVal, exists := params[paramName]
if !exists {
return fmt.Errorf("required parameter %s not found in params", paramName)
}
if paramType == "map_todo_improve_this_check" {
continue // uncontinue after TODO is fixed
}
if reflect.TypeOf(paramVal).Name() != paramType { // TODO I'll make a small rewrite to not use reflect here
return fmt.Errorf("required parameter %s is not of type %s, but %T", paramName, paramType, paramVal)
}
}

// check if only required/optional are present
for paramName := range params {
if _, isRequired := requiredParams[paramName]; !isRequired {
wantedType, isOptional := optionalParams[paramName]
if !isOptional {
return fmt.Errorf("unexpected parameter %s found in IP Range params %v", paramName, params)
}
if reflect.TypeOf(params[paramName]).Name() != wantedType { // TODO I'll make a small rewrite to not use reflect here
return fmt.Errorf("optional parameter %s is not of type %s, but %T", paramName, wantedType, params[paramName])
}
}
}

return nil
}

type (
IpRange struct {
ctx context.Context
field model.Expr
intervals []IpInterval
keyed bool
}
IpInterval struct {
begin string
end string
key *string // when nil, key is not present
}
)

func NewIpRange(ctx context.Context, intervals []IpInterval, field model.Expr, keyed bool) *IpRange {
return &IpRange{
ctx: ctx,
field: field,
intervals: intervals,
keyed: keyed,
}
}

func NewIpInterval(begin, end string, key *string) IpInterval {
return IpInterval{begin: begin, end: end, key: key}
}

func (interval IpInterval) ToWhereClause(field model.Expr) model.Expr {
isBegin := interval.begin != UnboundedInterval
isEnd := interval.end != UnboundedInterval && interval.end != BiggestIpv4

begin := model.NewInfixExpr(field, ">=", model.NewLiteralSingleQuoted(interval.begin))
end := model.NewInfixExpr(field, "<", model.NewLiteralSingleQuoted(interval.end))

if isBegin && isEnd {
return model.NewInfixExpr(begin, "AND", end)
} else if isBegin {
return begin
} else if isEnd {
return end
} else {
return model.TrueExpr
}
}

// String returns key part of the response, e.g. "1.0-2.0", or "*-6.55"
func (interval IpInterval) String() string {
if interval.key != nil {
return *interval.key
}
return fmt.Sprintf("%s-%s", interval.begin, interval.end)
}

func (query *IpRange) AggregationType() model.AggregationType {
return model.BucketAggregation
}

func (query *IpRange) TranslateSqlResponseToJson(rows []model.QueryResultRow) model.JsonMap {
return nil
}

func (query *IpRange) String() string {
return "ip_range"
}

func (query *IpRange) DoesNotHaveGroupBy() bool {
return true
}

func (query *IpRange) CombinatorGroups() (result []CombinatorGroup) {
for intervalIdx, interval := range query.intervals {
prefix := fmt.Sprintf("range_%d__", intervalIdx)
if len(query.intervals) == 1 {
prefix = ""
}
result = append(result, CombinatorGroup{
idx: intervalIdx,
Prefix: prefix,
Key: interval.String(),
WhereClause: interval.ToWhereClause(query.field),
})
}
return
}

// bad requests: both to/from and mask

func (query *IpRange) CombinatorTranslateSqlResponseToJson(subGroup CombinatorGroup, rows []model.QueryResultRow) model.JsonMap {
if len(rows) == 0 || len(rows[0].Cols) == 0 {
logger.ErrorWithCtx(query.ctx).Msgf("need at least one row and column in ip_range aggregation response, rows: %d, cols: %d", len(rows), len(rows[0].Cols))
return model.JsonMap{}
}
count := rows[0].Cols[len(rows[0].Cols)-1].Value
response := model.JsonMap{
"key": subGroup.Key,
"doc_count": count,
}

interval := query.intervals[subGroup.idx]
if interval.begin != UnboundedInterval {
response["from"] = interval.begin
}
if interval.end != UnboundedInterval {
response["to"] = interval.end
}

return response
}

func (query *IpRange) CombinatorSplit() []model.QueryType {
result := make([]model.QueryType, 0, len(query.intervals))
for _, interval := range query.intervals {
result = append(result, NewIpRange(query.ctx, []IpInterval{interval}, query.field, query.keyed))
}
return result
}
9 changes: 8 additions & 1 deletion quesma/model/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
// SPDX-License-Identifier: Elastic-2.0
package model

import "strconv"
import (
"fmt"
"strconv"
)

// Expr is a generic representation of an expression which is a part of the SQL query.
type Expr interface {
Expand Down Expand Up @@ -126,6 +129,10 @@ func NewLiteral(value any) LiteralExpr {
return LiteralExpr{Value: value}
}

func NewLiteralSingleQuoted(value string) LiteralExpr {
return LiteralExpr{Value: fmt.Sprintf("'%s'", value)}
}

// DistinctExpr is a representation of DISTINCT keyword in SQL, e.g. `SELECT DISTINCT` ... or `SELECT COUNT(DISTINCT ...)`
type DistinctExpr struct {
Expr Expr
Expand Down
10 changes: 10 additions & 0 deletions quesma/queryparser/aggregation_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,16 @@ func (cw *ClickhouseQueryTranslator) parseStringField(queryMap QueryMap, fieldNa
return defaultValue
}

func (cw *ClickhouseQueryTranslator) parseStringFieldExistCheck(queryMap QueryMap, fieldName string) (value string, exists bool) {
if valueRaw, exists := queryMap[fieldName]; exists {
if asString, ok := valueRaw.(string); ok {
return asString, true
}
logger.WarnWithCtx(cw.Ctx).Msgf("%s is not a string, but %T, value: %v", fieldName, valueRaw, valueRaw)
}
return "", false
}

func (cw *ClickhouseQueryTranslator) parseArrayField(queryMap QueryMap, fieldName string) ([]any, error) {
if valueRaw, exists := queryMap[fieldName]; exists {
if asArray, ok := valueRaw.([]any); ok {
Expand Down
47 changes: 47 additions & 0 deletions quesma/queryparser/pancake_aggregation_parser_buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@ package queryparser

import (
"fmt"
"github.com/H0llyW00dzZ/cidr"
"github.com/pkg/errors"
"math"
"net"
"quesma/clickhouse"
"quesma/logger"
"quesma/model"
"quesma/model/bucket_aggregations"
"quesma/util"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -37,6 +41,7 @@ func (cw *ClickhouseQueryTranslator) pancakeTryBucketAggregation(aggregation *pa
}},
{"multi_terms", cw.parseMultiTerms},
{"composite", cw.parseComposite},
{"ip_range", cw.parseIpRange},
{"ip_prefix", cw.parseIpPrefix},
}

Expand Down Expand Up @@ -382,6 +387,48 @@ func (cw *ClickhouseQueryTranslator) parseComposite(aggregation *pancakeAggregat
return nil
}

func (cw *ClickhouseQueryTranslator) parseIpRange(aggregation *pancakeAggregationTreeNode, params QueryMap) error {
const defaultKeyed = false

if err := bucket_aggregations.CheckParamsIpRange(cw.Ctx, params); err != nil {
return err
}

rangesRaw := params["ranges"].([]any)
ranges := make([]bucket_aggregations.IpInterval, 0, len(rangesRaw))
for _, rangeRaw := range rangesRaw {
var key *string
if keyIfPresent, exists := cw.parseStringFieldExistCheck(rangeRaw.(QueryMap), "key"); exists {
key = &keyIfPresent
}
var begin, end string
if maskIfExists, exists := cw.parseStringFieldExistCheck(rangeRaw.(QueryMap), "mask"); exists {
_, ipNet, err := net.ParseCIDR(maskIfExists)
if err != nil {
return err
}
beginAsInt, endAsInt := cidr.IPv4ToRange(ipNet)
begin = util.IntToIpv4(beginAsInt)
// endAsInt is inclusive, we do +1, because we need it exclusive
if endAsInt != math.MaxUint32 {
end = util.IntToIpv4(endAsInt + 1)
} else {
end = bucket_aggregations.BiggestIpv4 // "255.255.255.255 + 1", so to say (value in compliance with Elastic)
}
if key == nil {
key = &maskIfExists
}
} else {
begin = cw.parseStringField(rangeRaw.(QueryMap), "from", bucket_aggregations.UnboundedInterval)
end = cw.parseStringField(rangeRaw.(QueryMap), "to", bucket_aggregations.UnboundedInterval)
}
ranges = append(ranges, bucket_aggregations.NewIpInterval(begin, end, key))
}
aggregation.isKeyed = cw.parseBoolField(params, "keyed", defaultKeyed)
aggregation.queryType = bucket_aggregations.NewIpRange(cw.Ctx, ranges, cw.parseFieldField(params, "ip_range"), aggregation.isKeyed)
return nil
}

func (cw *ClickhouseQueryTranslator) parseIpPrefix(aggregation *pancakeAggregationTreeNode, params QueryMap) error {
const (
defaultIsIpv6 = false
Expand Down
Loading

0 comments on commit 820fdeb

Please sign in to comment.