Skip to content

Commit

Permalink
Support for IPv6 in IP Prefix aggr (#1112)
Browse files Browse the repository at this point in the history
We had support for IPv4, but Jacek wanted for a) IPv6 and b) IPs stored
in Clickhouse as ints.
This PR adds support for IPv6, and I think having IPs as ints in
Clickhouse will also work, because whenever we query for IPs, we use
`intDiv(ip_field, some_number)`. So we already make use of the fact that
Clickhouse treats all IPs as ints (`IPv4` type: `uint32`, `IPv6` type:
`uint128`)
Didn't test it yet, though.
  • Loading branch information
trzysiek authored Dec 14, 2024
1 parent fe52aa6 commit cea1f7a
Show file tree
Hide file tree
Showing 6 changed files with 501 additions and 29 deletions.
17 changes: 17 additions & 0 deletions quesma/clickhouse/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,20 @@ func TestParseTypeFromShowColumnsTuple_2(t *testing.T) {
assert.Equal(t, NewBaseType("String"), mvt.Cols[1].Type)
assert.Equal(t, "c", mvt.Cols[1].Name)
}

// TestWhatDriverWillReturn is a helper test for manual testing of the Clickhouse driver
// E.g. I wasn't sure what type will be returned for intDiv(ipv6, 1) in Clickhouse, so this test gave me the answer
func TestWhatDriverWillReturn(t *testing.T) {
/*
options := clickhouse.Options{Addr: []string{"localhost:9000"}}
db := clickhouse.OpenDB(&options)
defer db.Close()
rows, _ := db.Query("SELECT intDiv(ipv6, 1) from i LIMIT 10")
var q big.Int // replacing big.Int with any might be useful
for rows.Next() {
rows.Scan(&q)
fmt.Printf("%v %T\n", q, q)
}
*/
}
78 changes: 58 additions & 20 deletions quesma/model/bucket_aggregations/ip_prefix.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@ package bucket_aggregations
import (
"context"
"fmt"
"math/big"
"quesma/logger"
"quesma/model"
"quesma/util"
"reflect"
)

// Current limitation: we expect Clickhouse field to be IPv4 (and not IPv6)

// Clickhouse table to test SQLs:
// CREATE TABLE __quesma_table_name (clientip IPv4) ENGINE=Log
// INSERT INTO __quesma_table_name VALUES ('0.0.0.0'), ('5.5.5.5'), ('90.180.90.180'), ('128.200.0.8'), ('192.168.1.67'), ('222.168.22.67')
// Testing helpers:
// * (ipv4) Clickhouse table to test SQLs:
// CREATE TABLE __quesma_table_name (clientip IPv4) ENGINE=Log
// INSERT INTO __quesma_table_name VALUES ('0.0.0.0'), ('5.5.5.5'), ('90.180.90.180'), ('128.200.0.8'), ('192.168.1.67'), ('222.168.22.67')
// * (ipv6) If ip field in Clickhouse is string, not IPv6, just change "ip_fieldname" to "ip_fieldname"::IPv6, to test SQLs from tests
// (careful with that, most of the time it works, but sometimes some differences arise, I guess from big/little endian differences)

// TODO make part of QueryType interface and implement for all aggregations
// TODO add bad requests to tests
Expand All @@ -30,7 +33,7 @@ func CheckParamsIpPrefix(ctx context.Context, paramsRaw any) error {
"keyed": "bool",
"min_doc_count": "int",
}
logIfYouSeeThemParams := []string{"min_doc_count"} // we don't use min_doc_count yet. We'll log if "is_ipv6" == true, also.
logIfYouSeeThemParams := []string{"min_doc_count"} // we don't use min_doc_count yet.

params, ok := paramsRaw.(model.JsonMap)
if !ok {
Expand All @@ -47,6 +50,15 @@ func CheckParamsIpPrefix(ctx context.Context, paramsRaw any) error {
return fmt.Errorf("required parameter %s is not of type %s, but %T", paramName, paramType, paramVal)
}
}
// prefixLength must be [0, 32] for ipv4, [0, 128] for ipv6
prefixLength := params["prefix_length"].(float64) // will never panic because of checks above
upperBound := 32.0
if ipv6, exists := params["is_ipv6"]; exists && ipv6.(bool) {
upperBound = 128.0
}
if util.IsSmaller(prefixLength, 0) || util.IsSmaller(upperBound, prefixLength) {
return fmt.Errorf("prefix_length must be in range [0, %d], but got %f", int(upperBound), prefixLength)
}

// check if only required/optional are present
for paramName := range params {
Expand All @@ -67,9 +79,6 @@ func CheckParamsIpPrefix(ctx context.Context, paramsRaw any) error {
logger.WarnWithCtxAndThrottling(ctx, "ip_prefix", warnParam, "we didn't expect %s in IP Range params %v", warnParam, params)
}
}
if isIpv6, exists := params["is_ipv6"]; exists && isIpv6.(bool) {
logger.WarnWithCtxAndThrottling(ctx, "ip_prefix", "is_ipv6", "is_ipv6 is true in IP Range params %v, we don't support IPv6 yet", params)
}

return nil
}
Expand Down Expand Up @@ -101,7 +110,12 @@ func (query *IpPrefix) AggregationType() model.AggregationType {
}

func (query *IpPrefix) TranslateSqlResponseToJson(rows []model.QueryResultRow) model.JsonMap {
var netmask, keySuffix string
var (
ok bool
key, netmask, keySuffix string
originalKeyIpv4 uint32 // if is_ipv6 is false, Clickhouse will always return uint32 as the key
originalKeyIpv6 big.Int // if is_ipv6 is true, Clickhouse will always return big.Int as the key
)
if !query.isIpv6 {
netmask = query.calcNetMask()
}
Expand All @@ -111,7 +125,6 @@ func (query *IpPrefix) TranslateSqlResponseToJson(rows []model.QueryResultRow) m
buckets := make([]model.JsonMap, 0, len(rows))
for _, row := range rows {
var docCount any
var originalKey uint32
if query.prefixLength == 0 {
if len(row.Cols) != 1 {
logger.ErrorWithCtx(query.ctx).Msgf(
Expand All @@ -125,17 +138,26 @@ func (query *IpPrefix) TranslateSqlResponseToJson(rows []model.QueryResultRow) m
"unexpected number of columns in ip_prefix aggregation response, len: %d, row: %v", len(row.Cols), row)
continue
}
var ok bool
originalKey, ok = row.Cols[0].Value.(uint32)

docCount = row.Cols[1].Value
if query.isIpv6 {
originalKeyIpv6, ok = row.Cols[0].Value.(big.Int)
} else {
originalKeyIpv4, ok = row.Cols[0].Value.(uint32)
}
if !ok {
logger.ErrorWithCtx(query.ctx).Msgf("unexpected type of key in ip_prefix aggregation response, got %T", row.Cols[0])
continue
}
docCount = row.Cols[1].Value
}

if query.isIpv6 {
key = query.calcKeyIPv6(originalKeyIpv6) + keySuffix
} else {
key = query.calcKeyIPv4(originalKeyIpv4) + keySuffix
}
bucket := model.JsonMap{
"key": query.calcKey(originalKey) + keySuffix,
"key": key,
"doc_count": docCount,
"prefix_length": query.prefixLength,
"is_ipv6": query.isIpv6,
Expand Down Expand Up @@ -175,18 +197,27 @@ func (query *IpPrefix) SqlSelectQuery() model.Expr {
if query.prefixLength == 0 {
return nil
}
return model.NewFunction("intDiv", query.field, model.NewLiteral(query.divideByToGroupBy()))
if query.isIpv6 {
return model.NewFunction("intDiv", query.field, model.NewLiteral(query.divideByToGroupByIpv6().String()))
} else {
return model.NewFunction("intDiv", query.field, model.NewLiteral(query.divideByToGroupByIpv4()))
}
}

func (query *IpPrefix) divideByToGroupBy() uint32 {
func (query *IpPrefix) divideByToGroupByIpv4() uint32 {
return 1 << (32 - query.prefixLength)
}

func (query *IpPrefix) calcKey(originalKey uint32) string {
// divideByToGroupByIpv6 returns 2^(128-prefixLength)
func (query *IpPrefix) divideByToGroupByIpv6() *big.Int {
return big.NewInt(1).Lsh(big.NewInt(1), uint(128-query.prefixLength))
}

func (query *IpPrefix) calcKeyIPv4(originalKey uint32) string {
if query.prefixLength == 0 {
return "0.0.0.0"
}
ipAsInt := originalKey * query.divideByToGroupBy()
ipAsInt := originalKey * query.divideByToGroupByIpv4()
part4 := ipAsInt % 256
ipAsInt /= 256
part3 := ipAsInt % 256
Expand All @@ -197,10 +228,17 @@ func (query *IpPrefix) calcKey(originalKey uint32) string {
return fmt.Sprintf("%d.%d.%d.%d", part1, part2, part3, part4)
}

func (query *IpPrefix) calcKeyIPv6(originalKey big.Int) string {
// ipAsInt = originalKey * 2^(128-prefixLength)
ipAsInt := originalKey.Mul(&originalKey, big.NewInt(1).Lsh(big.NewInt(1), uint(128-query.prefixLength)))
return util.BigIntToIpv6(*ipAsInt)
}

// calcNetMask is only called for ipv4, so 1<<(query.prefixLength-1) will never overflow
func (query *IpPrefix) calcNetMask() string {
if query.prefixLength == 0 {
return "0.0.0.0"
}
biggestPossibleKey := uint32(1<<query.prefixLength - 1)
return query.calcKey(biggestPossibleKey) // netmask is the same as ip of biggest possible key
return query.calcKeyIPv4(biggestPossibleKey) // netmask is the same as ip of biggest possible key
}
11 changes: 10 additions & 1 deletion quesma/queryparser/pancake_json_rendering.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package queryparser
import (
"context"
"fmt"
"math/big"
"quesma/logger"
"quesma/model"
"quesma/model/bucket_aggregations"
Expand Down Expand Up @@ -105,7 +106,15 @@ func (p *pancakeJSONRenderer) splitBucketRows(bucket *pancakeModelBucketAggregat
if strings.HasPrefix(cols.ColName, bucketKeyName) {
for _, previousCols := range previousBucket.Cols {
if cols.ColName == previousCols.ColName {
if cols.Value != previousCols.Value {
var isEqual bool
switch val := cols.Value.(type) {
case big.Int:
prevVal := previousCols.Value.(big.Int)
isEqual = val.Cmp(&prevVal) == 0
default:
isEqual = val == previousCols.Value
}
if !isEqual {
isNewBucket = true
}
break
Expand Down
Loading

0 comments on commit cea1f7a

Please sign in to comment.