Skip to content

Commit

Permalink
Merge branch 'main' into search-after
Browse files Browse the repository at this point in the history
  • Loading branch information
trzysiek committed Dec 28, 2024
2 parents 7afebeb + 9173255 commit af7b422
Show file tree
Hide file tree
Showing 16 changed files with 1,045 additions and 103 deletions.
39 changes: 39 additions & 0 deletions NOTICE.MD
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,45 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

--------------------------------------------------------------------------------
#### Module : github.com/H0llyW00dzZ/cidr
Version : v1.2.1
Time : 2024-03-27T02:04:51Z
Licence : BSD-3-Clause

Contents of probable licence file $GOMODCACHE/github.com/!h0lly!w00dz!z/[email protected]/LICENSE:

BSD 3-Clause License

Copyright (c) 2024, H0llyW00dzZ
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.

2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.

3. Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


--------------------------------------------------------------------------------
#### Module : github.com/antlr4-go/antlr/v4
Version : v4.13.1
Expand Down
8 changes: 5 additions & 3 deletions quesma/logger/log_with_throttling.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"time"
)

// throttleMap: (reason name -> last logged time)
// We log only once per throttleDuration for each reason name, so that we don't spam the logs.
var throttleMap = util.NewSyncMap[string, time.Time]()

const throttleDuration = 30 * time.Minute
Expand All @@ -28,11 +30,11 @@ func WarnWithCtxAndThrottling(ctx context.Context, aggrName, paramName, format s

// WarnWithThrottling - logs a warning message with throttling.
// We only log once per throttleDuration for each reasonName, so that we don't spam the logs.
func WarnWithThrottling(warnName, format string, v ...any) {
timestamp, ok := throttleMap.Load(warnName)
func WarnWithThrottling(reasonName, format string, v ...any) {
timestamp, ok := throttleMap.Load(reasonName)
weThrottle := ok && time.Since(timestamp) < throttleDuration
if !weThrottle {
Warn().Msgf(format, v...)
throttleMap.Store(warnName, time.Now())
throttleMap.Store(reasonName, time.Now())
}
}
65 changes: 56 additions & 9 deletions quesma/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,20 @@ func Test_Main(m *testing.T) {
_ = buildIngestOnlyQuesma()
}

func emitRequests(stop chan os.Signal) {
func emitRequests(stop chan os.Signal, t *testing.T, testData []struct {
url string
expectedResponse string
}) {
go func() {
time.Sleep(1 * time.Second)
requestBody := []byte(`{"query": {"match_all": {}}}`)
sendRequest("http://localhost:8888/_bulk", requestBody)
sendRequest("http://localhost:8888/_doc", requestBody)
sendRequest("http://localhost:8888/_search", requestBody)
sendRequest("http://localhost:8888/_search", requestBody)
var resp string
var err error
for _, test := range testData {
resp, err = sendRequest(test.url, requestBody)
assert.NoError(t, err)
assert.Contains(t, test.expectedResponse, resp)
}
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
close(stop)
}()
Expand Down Expand Up @@ -144,7 +150,16 @@ func Test_fallbackScenario(t *testing.T) {
q1, _ := qBuilder.Build()
q1.Start()
stop := make(chan os.Signal, 1)
emitRequests(stop)
testData := []struct {
url string
expectedResponse string
}{
{"http://localhost:8888/_bulk", "unknown\n"},
{"http://localhost:8888/_doc", "unknown\n"},
{"http://localhost:8888/_search", "unknown\n"},
{"http://localhost:8888/_search", "unknown\n"},
}
emitRequests(stop, t, testData)
<-stop
q1.Stop(context.Background())
atomic.LoadInt32(&fallbackCalled)
Expand All @@ -155,7 +170,20 @@ func Test_scenario1(t *testing.T) {
q1 := ab_testing_scenario()
q1.Start()
stop := make(chan os.Signal, 1)
emitRequests(stop)
testData := []struct {
url string
expectedResponse string
}{
{"http://localhost:8888/_bulk", `bulk->IngestProcessor->InnerIngestProcessor1->0ABIngestTestProcessor
bulk->IngestProcessor->InnerIngestProcessor2->0ABIngestTestProcessor
`},
{"http://localhost:8888/_doc", `doc->IngestProcessor->InnerIngestProcessor1->0ABIngestTestProcessor
doc->IngestProcessor->InnerIngestProcessor2->0ABIngestTestProcessor
`},
{"http://localhost:8888/_search", "ABTestProcessor processor: Responses are equal\n"},
{"http://localhost:8888/_search", "ABTestProcessor processor: Responses are not equal\n"},
}
emitRequests(stop, t, testData)
<-stop
q1.Stop(context.Background())
}
Expand Down Expand Up @@ -215,7 +243,17 @@ func Test_middleware(t *testing.T) {
quesmaBuilder.Build()
quesmaBuilder.Start()
stop := make(chan os.Signal, 1)
emitRequests(stop)
testData := []struct {
url string
expectedResponse string
}{
{"http://localhost:8888/_bulk", "middleware\n"},
{"http://localhost:8888/_doc", "middleware\n"},
{"http://localhost:8888/_search", "middleware\n"},
{"http://localhost:8888/_search", "middleware\n"},
}
emitRequests(stop, t, testData)

<-stop
quesmaBuilder.Stop(context.Background())
atomic.LoadInt32(&middlewareCallCount)
Expand All @@ -227,7 +265,16 @@ func Test_middleware(t *testing.T) {
quesmaBuilder.Build()
quesmaBuilder.Start()
stop := make(chan os.Signal, 1)
emitRequests(stop)
testData := []struct {
url string
expectedResponse string
}{
{"http://localhost:8888/_bulk", "middleware\n"},
{"http://localhost:8888/_doc", "middleware\n"},
{"http://localhost:8888/_search", "middleware\n"},
{"http://localhost:8888/_search", "middleware\n"},
}
emitRequests(stop, t, testData)
<-stop
quesmaBuilder.Stop(context.Background())
atomic.LoadInt32(&middlewareCallCount)
Expand Down
4 changes: 2 additions & 2 deletions quesma/model/bucket_aggregations/ip_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ func (interval IpInterval) ToWhereClause(field model.Expr) model.Expr {
isBegin := interval.begin != UnboundedInterval
isEnd := interval.end != UnboundedInterval && interval.end != BiggestIpv4

begin := model.NewInfixExpr(field, ">=", model.NewLiteralSingleQuoted(interval.begin))
end := model.NewInfixExpr(field, "<", model.NewLiteralSingleQuoted(interval.end))
begin := model.NewInfixExpr(field, ">=", model.NewLiteralSingleQuoteString(interval.begin))
end := model.NewInfixExpr(field, "<", model.NewLiteralSingleQuoteString(interval.end))

if isBegin && isEnd {
return model.NewInfixExpr(begin, "AND", end)
Expand Down
120 changes: 117 additions & 3 deletions quesma/model/bucket_aggregations/terms.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,32 @@ package bucket_aggregations

import (
"context"
"fmt"
"quesma/logger"
"quesma/model"
"quesma/util"
"quesma/util/regex"
"reflect"
)

// Terms is the query type shared by the terms and significant_terms bucket
// aggregations; the significant field tells the two apart.
//
// TODO when adding include/exclude, check escaping of ' and \ in those fields
type Terms struct {
ctx context.Context
significant bool // true <=> significant_terms, false <=> terms
OrderByExpr model.Expr
// include is either:
// - single value: then for strings, it can be a regex.
// - array: then field must match exactly one of the values (never a regex)
// Nil if missing in request.
// NOTE(review): include is only stored for now; UpdateFieldForIncludeAndExclude
// does not apply it yet (see the TODO there).
include any
// exclude is either:
// - single value: for strings it can be a regex; any other value is compared with !=.
// - array ([]any): field values equal to any of the listed values are excluded (never a regex)
// Excluded values are turned into NULL by UpdateFieldForIncludeAndExclude.
// Nil if missing in request.
exclude any
}

func NewTerms(ctx context.Context, significant bool, orderByExpr model.Expr) Terms {
return Terms{ctx: ctx, significant: significant, OrderByExpr: orderByExpr}
func NewTerms(ctx context.Context, significant bool, include, exclude any) Terms {
return Terms{ctx: ctx, significant: significant, include: include, exclude: exclude}
}

func (query Terms) AggregationType() model.AggregationType {
Expand Down Expand Up @@ -106,3 +119,104 @@ func (query Terms) key(row model.QueryResultRow) any {
func (query Terms) parentCount(row model.QueryResultRow) any {
return row.Cols[len(row.Cols)-3].Value
}

// UpdateFieldForIncludeAndExclude wraps field so that excluded values become NULL.
//
// The result uses the Clickhouse 'if' function: if(condition, then, else).
// Here that is if(<field is not excluded>, field, NULL), where the condition
// depends on the shape of the 'exclude' parameter:
//   - []any:  field NOT IN (v0, v1, ...)   (an empty array changes nothing)
//   - string: field NOT <func> <pattern>   (func/pattern come from regex.ToClickhouseExpr)
//   - other:  field != value
//
// It returns the (possibly unchanged) field and whether it was changed.
func (query Terms) UpdateFieldForIncludeAndExclude(field model.Expr) (updatedField model.Expr, didWeUpdateField bool) {
	if query.exclude == nil {
		return field, false // TODO implement similar support for 'include' in next PR
	}

	// notExcluded is the condition under which the field keeps its value.
	var notExcluded model.Expr
	switch excluded := query.exclude.(type) {
	case []any:
		if len(excluded) == 0 {
			return field, false
		}
		// Array form: values are matched exactly, never as a regex.
		literals := make([]model.Expr, len(excluded))
		for i, value := range excluded {
			literals[i] = model.NewLiteralSingleQuoteString(value)
		}
		notExcluded = model.NewInfixExpr(field, "NOT IN", model.NewTupleExpr(literals...))
	case string:
		// hard case, might be regex
		funcName, patternExpr := regex.ToClickhouseExpr(excluded)
		notExcluded = model.NewInfixExpr(field, "NOT "+funcName, patternExpr)
	default:
		// easy case, never regex
		notExcluded = model.NewInfixExpr(field, "!=", model.NewLiteral(query.exclude))
	}

	return model.NewFunction("if", notExcluded, field, model.NullExpr), true
}

// CheckParamsTerms validates the raw parameters of a terms aggregation.
//
// paramsRaw must be a model.JsonMap. The required "field" parameter has to be
// present and be a string. Every other key must be one of the known optional
// parameters and, where a simple type is listed for it, have that type.
// A parameter whose value is nil (e.g. JSON null) is reported as a type
// mismatch instead of panicking. Parameters we do not expect to see in
// practice are logged (with throttling) but are not an error.
//
// It returns nil when the parameters are acceptable, otherwise an error
// describing the first problem found.
//
// TODO make part of QueryType interface and implement for all aggregations
// TODO add bad requests to tests
// Doing so will ensure we see 100% of what we're interested in in our logs (now we see ~95%)
func CheckParamsTerms(ctx context.Context, paramsRaw any) error {
	requiredParams := map[string]string{"field": "string"}
	optionalParams := map[string]string{
		"size":                      "float64|string", // TODO should be int|string, will be fixed
		"shard_size":                "float64",        // TODO should be int, will be fixed
		"order":                     "order",          // TODO add order type
		"min_doc_count":             "float64",        // TODO should be int, will be fixed
		"shard_min_doc_count":       "float64",        // TODO should be int, will be fixed
		"show_term_doc_count_error": "bool",
		"exclude":                   "not-checking-type-now-complicated",
		"include":                   "not-checking-type-now-complicated",
		"collect_mode":              "string",
		"execution_hint":            "string",
		"missing":                   "string",
		"value_type":                "string",
	}
	logIfYouSeeThemParams := []string{
		"shard_size", "min_doc_count", "shard_min_doc_count",
		"show_term_doc_count_error", "collect_mode", "execution_hint", "value_type",
	}

	params, ok := paramsRaw.(model.JsonMap)
	if !ok {
		return fmt.Errorf("params is not a map, but %+v", paramsRaw)
	}

	// check if required are present
	for paramName, paramType := range requiredParams {
		paramVal, exists := params[paramName]
		if !exists {
			return fmt.Errorf("required parameter %s not found in Terms params", paramName)
		}
		// reflect.TypeOf returns a nil Type for a nil interface value; calling
		// Name() on it would panic, so a nil value is treated as a type mismatch.
		// TODO I'll make a small rewrite to not use reflect here
		if typ := reflect.TypeOf(paramVal); typ == nil || typ.Name() != paramType {
			return fmt.Errorf("required parameter %s is not of type %s, but %T", paramName, paramType, paramVal)
		}
	}

	// check if only required/optional are present
	for paramName, paramVal := range params {
		if _, isRequired := requiredParams[paramName]; isRequired {
			continue // already validated above
		}
		wantedType, isOptional := optionalParams[paramName]
		if !isOptional {
			return fmt.Errorf("unexpected parameter %s found in Terms params %v", paramName, params)
		}
		if wantedType == "not-checking-type-now-complicated" || wantedType == "order" || wantedType == "float64|string" {
			continue // TODO: add that later
		}
		// Same nil guard as for required parameters.
		// TODO I'll make a small rewrite to not use reflect here
		if typ := reflect.TypeOf(paramVal); typ == nil || typ.Name() != wantedType {
			return fmt.Errorf("optional parameter %s is not of type %s, but %T", paramName, wantedType, paramVal)
		}
	}

	// log if you see them
	for _, warnParam := range logIfYouSeeThemParams {
		if _, exists := params[warnParam]; exists {
			logger.WarnWithCtxAndThrottling(ctx, "terms", warnParam, "we didn't expect %s in Terms params %v", warnParam, params)
		}
	}

	return nil
}
11 changes: 9 additions & 2 deletions quesma/model/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ var (
InvalidExpr = Expr(nil)
TrueExpr = NewLiteral(true)
FalseExpr = NewLiteral(false)
NullExpr = NewLiteral("NULL")
)

// ColumnRef is a reference to a column in a table, we can enrich it with more information (e.g. type used) as we go
Expand Down Expand Up @@ -129,8 +130,14 @@ func NewLiteral(value any) LiteralExpr {
return LiteralExpr{Value: value}
}

func NewLiteralSingleQuoted(value string) LiteralExpr {
return LiteralExpr{Value: fmt.Sprintf("'%s'", value)}
// NewLiteralSingleQuoteString builds a literal from value.
// A string is wrapped in single quotes (abc -> 'abc'); a value of any other
// type is stored unchanged. No escaping is done here.
func NewLiteralSingleQuoteString(value any) LiteralExpr {
	if str, isString := value.(string); isString {
		return LiteralExpr{Value: fmt.Sprintf("'%s'", str)}
	}
	return LiteralExpr{Value: value}
}

// DistinctExpr is a representation of DISTINCT keyword in SQL, e.g. `SELECT DISTINCT` ... or `SELECT COUNT(DISTINCT ...)`
Expand Down
21 changes: 19 additions & 2 deletions quesma/model/expr_string_renderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"quesma/logger"
"quesma/quesma/types"
"quesma/util"
"regexp"
"sort"
"strconv"
Expand Down Expand Up @@ -66,7 +67,12 @@ func (v *renderer) VisitFunction(e FunctionExpr) interface{} {
}

func (v *renderer) VisitLiteral(l LiteralExpr) interface{} {
return fmt.Sprintf("%v", l.Value)
switch val := l.Value.(type) {
case string:
return escapeString(val)
default:
return fmt.Sprintf("%v", val)
}
}

func (v *renderer) VisitTuple(t TupleExpr) interface{} {
Expand Down Expand Up @@ -101,7 +107,7 @@ func (v *renderer) VisitInfix(e InfixExpr) interface{} {
// I think in the future every infix op should be in braces.
if strings.HasPrefix(e.Op, "_") || e.Op == "AND" || e.Op == "OR" {
return fmt.Sprintf("(%v %v %v)", lhs, e.Op, rhs)
} else if strings.Contains(e.Op, "LIKE") || e.Op == "IS" || e.Op == "IN" || e.Op == "REGEXP" || strings.Contains(e.Op, "UNION") {
} else if strings.Contains(e.Op, "LIKE") || e.Op == "IS" || e.Op == "IN" || e.Op == "NOT IN" || e.Op == "REGEXP" || strings.Contains(e.Op, "UNION") {
return fmt.Sprintf("%v %v %v", lhs, e.Op, rhs)
} else {
return fmt.Sprintf("%v%v%v", lhs, e.Op, rhs)
Expand Down Expand Up @@ -349,3 +355,14 @@ func (v *renderer) VisitJoinExpr(j JoinExpr) interface{} {
func (v *renderer) VisitCTE(c CTE) interface{} {
return fmt.Sprintf("%s AS (%s) ", c.Name, AsString(c.SelectCommand))
}

// escapeString prepares s for use inside a Clickhouse SQL query by escaping
// backslashes and single quotes: \ -> \\ and ' -> \'.
//
// If s is already wrapped in single quotes (at least two characters, first and
// last both '), the wrapping quotes are left alone and only the text between
// them is escaped.
func escapeString(s string) string {
	const quote = `'`

	// \ should be escaped with no exceptions
	escaped := strings.ReplaceAll(s, `\`, `\\`)

	isWrapped := len(escaped) >= 2 && strings.HasPrefix(escaped, quote) && strings.HasSuffix(escaped, quote)
	if !isWrapped {
		return strings.ReplaceAll(escaped, quote, `\'`)
	}

	// Escape only the inner part, then put the surrounding quotes back
	// (util.SingleQuote presumably wraps its argument in single quotes — confirm).
	inner := escaped[1 : len(escaped)-1]
	return util.SingleQuote(strings.ReplaceAll(inner, quote, `\'`))
}
Loading

0 comments on commit af7b422

Please sign in to comment.