diff --git a/NOTICE.MD b/NOTICE.MD
index d683c9264..08d966178 100644
--- a/NOTICE.MD
+++ b/NOTICE.MD
@@ -287,6 +287,45 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 SOFTWARE.
 
+--------------------------------------------------------------------------------
+#### Module : github.com/H0llyW00dzZ/cidr
+Version : v1.2.1
+Time : 2024-03-27T02:04:51Z
+Licence : BSD-3-Clause
+
+Contents of probable licence file $GOMODCACHE/github.com/!h0lly!w00dz!z/cidr@v1.2.1/LICENSE:
+
+BSD 3-Clause License
+
+Copyright (c) 2024, H0llyW00dzZ
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice, this
+   list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice,
+   this list of conditions and the following disclaimer in the documentation
+   and/or other materials provided with the distribution.
+
+3. Neither the name of the copyright holder nor the names of its
+   contributors may be used to endorse or promote products derived from
+   this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
 --------------------------------------------------------------------------------
 #### Module : github.com/antlr4-go/antlr/v4
 Version : v4.13.1
@@ -1564,11 +1603,11 @@ WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 --------------------------------------------------------------------------------
 #### Module : github.com/jackc/pgx/v5
-Version : v5.7.1
-Time : 2024-09-10T12:25:07Z
+Version : v5.7.2
+Time : 2024-12-21T15:25:36Z
 Licence : MIT
 
-Contents of probable licence file $GOMODCACHE/github.com/jackc/pgx/v5@v5.7.1/LICENSE:
+Contents of probable licence file $GOMODCACHE/github.com/jackc/pgx/v5@v5.7.2/LICENSE:
 
 Copyright (c) 2013-2021 Jack Christensen
 
diff --git a/quesma/quesma/elastic_http_frontend_connector.go b/quesma/frontend_connectors/elastic_http_frontend_connector.go
similarity index 56%
rename from quesma/quesma/elastic_http_frontend_connector.go
rename to quesma/frontend_connectors/elastic_http_frontend_connector.go
index 62f24ef81..ba41ffc1c 100644
--- a/quesma/quesma/elastic_http_frontend_connector.go
+++ b/quesma/frontend_connectors/elastic_http_frontend_connector.go
@@ -1,25 +1,19 @@
 // Copyright Quesma, licensed under the Elastic License 2.0.
 // SPDX-License-Identifier: Elastic-2.0
-package quesma
+package frontend_connectors
 
 import (
 	"context"
 	"net/http"
 	"quesma/clickhouse"
-	"quesma/frontend_connectors"
 	"quesma/quesma/config"
 	"quesma/schema"
 	quesma_api "quesma_v2/core"
-	"quesma_v2/core/diag"
 )
 
 type ElasticHttpIngestFrontendConnector struct {
-	*frontend_connectors.BasicHTTPFrontendConnector
-
-	Config *config.QuesmaConfiguration
-
-	phoneHomeClient diag.PhoneHomeClient
+	*BasicHTTPFrontendConnector
 }
 
 func NewElasticHttpIngestFrontendConnector(endpoint string,
@@ -28,7 +22,7 @@ func NewElasticHttpIngestFrontendConnector(endpoint string,
 	config *config.QuesmaConfiguration,
 	router quesma_api.Router) *ElasticHttpIngestFrontendConnector {
 	fc := &ElasticHttpIngestFrontendConnector{
-		BasicHTTPFrontendConnector: frontend_connectors.NewBasicHTTPFrontendConnector(endpoint, config),
+		BasicHTTPFrontendConnector: NewBasicHTTPFrontendConnector(endpoint, config),
 	}
 	fallback := func(ctx context.Context, req *quesma_api.Request, writer http.ResponseWriter) (*quesma_api.Result, error) {
 		fc.BasicHTTPFrontendConnector.GetRouterInstance().ElasticFallback(req.Decision, ctx, writer, req.OriginalRequest, []byte(req.Body), logManager, registry)
@@ -41,23 +35,8 @@ func NewElasticHttpIngestFrontendConnector(endpoint string,
 	return fc
 }
 
-func (h *ElasticHttpIngestFrontendConnector) GetChildComponents() []interface{} {
-	components := make([]interface{}, 0)
-	if h.BasicHTTPFrontendConnector != nil {
-		components = append(components, h.BasicHTTPFrontendConnector)
-	}
-
-	return components
-}
-
-func (h *ElasticHttpIngestFrontendConnector) SetDependencies(deps quesma_api.Dependencies) {
-	h.phoneHomeClient = deps.PhoneHomeAgent()
-}
-
 type ElasticHttpQueryFrontendConnector struct {
-	*frontend_connectors.BasicHTTPFrontendConnector
-
-	phoneHomeClient diag.PhoneHomeClient
+	*BasicHTTPFrontendConnector
 }
 
 func NewElasticHttpQueryFrontendConnector(endpoint string,
@@ -66,7 +45,7 @@ func NewElasticHttpQueryFrontendConnector(endpoint string,
 	config *config.QuesmaConfiguration,
 	router quesma_api.Router) *ElasticHttpIngestFrontendConnector {
 	fc := &ElasticHttpIngestFrontendConnector{
-		BasicHTTPFrontendConnector: frontend_connectors.NewBasicHTTPFrontendConnector(endpoint, config),
+		BasicHTTPFrontendConnector: NewBasicHTTPFrontendConnector(endpoint, config),
 	}
 	fallback := func(ctx context.Context, req *quesma_api.Request, writer http.ResponseWriter) (*quesma_api.Result, error) {
 		fc.BasicHTTPFrontendConnector.GetRouterInstance().ElasticFallback(req.Decision, ctx, writer, req.OriginalRequest, []byte(req.Body), logManager, registry)
@@ -76,15 +55,3 @@ func NewElasticHttpQueryFrontendConnector(endpoint string,
 	fc.AddRouter(router)
 	return fc
 }
-
-func (h *ElasticHttpQueryFrontendConnector) GetChildComponents() []interface{} {
-	components := make([]interface{}, 0)
-	if h.BasicHTTPFrontendConnector != nil {
-		components = append(components, h.BasicHTTPFrontendConnector)
-	}
-	return components
-}
-
-func (h *ElasticHttpQueryFrontendConnector) SetDependencies(deps quesma_api.Dependencies) {
-	h.phoneHomeClient = deps.PhoneHomeAgent()
-}
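Note: the deleted GetChildComponents/SetDependencies overrides become unnecessary once the connector embeds *BasicHTTPFrontendConnector from its own package — Go promotes the embedded type's methods, so (assuming BasicHTTPFrontendConnector itself implements them) callers reach those implementations without forwarding code. A minimal standalone sketch of the promotion mechanics; Basic and Wrapper are illustrative names, not repo types:

    package main

    import "fmt"

    type Basic struct{}

    func (b *Basic) GetChildComponents() []interface{} { return nil }

    // Embedding *Basic promotes GetChildComponents onto Wrapper;
    // no hand-written forwarding method is needed.
    type Wrapper struct {
        *Basic
    }

    func main() {
        w := &Wrapper{Basic: &Basic{}}
        fmt.Println(w.GetChildComponents() == nil) // true — resolved via the embedded *Basic
    }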
diff --git a/quesma/go.mod b/quesma/go.mod
index 2cb0924aa..055da1e49 100644
--- a/quesma/go.mod
+++ b/quesma/go.mod
@@ -19,7 +19,7 @@ require (
 	github.com/gorilla/sessions v1.4.0
 	github.com/hashicorp/go-multierror v1.1.1
 	github.com/jackc/pgx/v4 v4.18.3
-	github.com/jackc/pgx/v5 v5.7.1
+	github.com/jackc/pgx/v5 v5.7.2
 	github.com/k0kubun/pp v3.0.1+incompatible
 	github.com/knadh/koanf/parsers/json v0.1.0
 	github.com/knadh/koanf/parsers/yaml v0.1.0
@@ -40,6 +40,7 @@ require (
 
 require (
 	filippo.io/edwards25519 v1.1.0 // indirect
+	github.com/H0llyW00dzZ/cidr v1.2.1 // indirect
 	github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
 	github.com/hashicorp/errwrap v1.0.0 // indirect
 	github.com/jackc/chunkreader/v2 v2.0.1 // indirect
diff --git a/quesma/go.sum b/quesma/go.sum
index 5e696e0a3..39f2d9293 100644
--- a/quesma/go.sum
+++ b/quesma/go.sum
@@ -7,6 +7,8 @@ github.com/ClickHouse/clickhouse-go/v2 v2.30.0 h1:AG4D/hW39qa58+JHQIFOSnxyL46H6h
 github.com/ClickHouse/clickhouse-go/v2 v2.30.0/go.mod h1:i9ZQAojcayW3RsdCb3YR+n+wC2h65eJsZCscZ1Z1wyo=
 github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
 github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
+github.com/H0llyW00dzZ/cidr v1.2.1 h1:DfRHX+RqVVKZijQGO1aJSaWvN9Saan8sycK/4wrfY5g=
+github.com/H0llyW00dzZ/cidr v1.2.1/go.mod h1:S+EgYkMandSAN27mGNG/CB3jeoXDAyalsvvVFpWdnXc=
 github.com/DataDog/go-sqllexer v0.0.18 h1:ErBvoO7/srJLdA2ebwd+HPqD4g1kN++BP64A8qvmh9U=
 github.com/DataDog/go-sqllexer v0.0.18/go.mod h1:KwkYhpFEVIq+BfobkTC1vfqm4gTi65skV/DpDBXtexc=
 github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs=
@@ -116,8 +118,8 @@ github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQ
 github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgSXP7iUjYm9C1NxKhny7lq6ee99u/z+IHFcgs=
 github.com/jackc/pgx/v4 v4.18.3 h1:dE2/TrEsGX3RBprb3qryqSV9Y60iZN1C6i8IrmW9/BA=
 github.com/jackc/pgx/v4 v4.18.3/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw=
-github.com/jackc/pgx/v5 v5.7.1 h1:x7SYsPBYDkHDksogeSmZZ5xzThcTgRz++I5E+ePFUcs=
-github.com/jackc/pgx/v5 v5.7.1/go.mod h1:e7O26IywZZ+naJtWWos6i6fvWK+29etgITqrqHLfoZA=
+github.com/jackc/pgx/v5 v5.7.2 h1:mLoDLV6sonKlvjIEsV56SkWNCnuNv531l94GaIzO+XI=
+github.com/jackc/pgx/v5 v5.7.2/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ=
 github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
 github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
 github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
diff --git a/quesma/logger/log_with_throttling.go b/quesma/logger/log_with_throttling.go
index 5b24de79a..4d920ca9f 100644
--- a/quesma/logger/log_with_throttling.go
+++ b/quesma/logger/log_with_throttling.go
@@ -10,7 +10,7 @@ import (
 
 // throttleMap: (reason name -> last logged time)
 // We log only once per throttleDuration for each reason name, so that we don't spam the logs.
-var throttleMap = util.SyncMap[string, time.Time]{}
+var throttleMap = util.NewSyncMap[string, time.Time]()
 
 const throttleDuration = 30 * time.Minute
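Note: switching from a zero-value literal to a constructor matters if util.SyncMap keeps an ordinary map behind a mutex (an assumption — that type lives outside this diff): the zero value of such a wrapper contains a nil map, and the first write would panic. A self-contained sketch of the failure mode and the fix:

    package main

    import "sync"

    // Hypothetical stand-in for util.SyncMap: a mutex-guarded generic map.
    type SyncMap[K comparable, V any] struct {
        mu sync.Mutex
        m  map[K]V
    }

    func (s *SyncMap[K, V]) Store(k K, v V) {
        s.mu.Lock()
        defer s.mu.Unlock()
        s.m[k] = v // panics if s.m is nil, i.e. if the zero value was used
    }

    // NewSyncMap initializes the inner map, so Store is safe immediately.
    func NewSyncMap[K comparable, V any]() *SyncMap[K, V] {
        return &SyncMap[K, V]{m: make(map[K]V)}
    }

    func main() {
        good := NewSyncMap[string, int]()
        good.Store("ok", 1) // fine
        // bad := &SyncMap[string, int]{}
        // bad.Store("boom", 1) // would panic: assignment to entry in nil map
    }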
diff --git a/quesma/main_test.go b/quesma/main_test.go
index 03c6e507b..fc8fdd0d0 100644
--- a/quesma/main_test.go
+++ b/quesma/main_test.go
@@ -25,14 +25,20 @@ func Test_Main(m *testing.T) {
 	_ = buildIngestOnlyQuesma()
 }
 
-func emitRequests(stop chan os.Signal) {
+func emitRequests(stop chan os.Signal, t *testing.T, testData []struct {
+	url              string
+	expectedResponse string
+}) {
 	go func() {
 		time.Sleep(1 * time.Second)
 		requestBody := []byte(`{"query": {"match_all": {}}}`)
-		sendRequest("http://localhost:8888/_bulk", requestBody)
-		sendRequest("http://localhost:8888/_doc", requestBody)
-		sendRequest("http://localhost:8888/_search", requestBody)
-		sendRequest("http://localhost:8888/_search", requestBody)
+		var resp string
+		var err error
+		for _, test := range testData {
+			resp, err = sendRequest(test.url, requestBody)
+			assert.NoError(t, err)
+			assert.Contains(t, test.expectedResponse, resp)
+		}
 		signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
 		close(stop)
 	}()
@@ -144,7 +150,16 @@ func Test_fallbackScenario(t *testing.T) {
 	q1, _ := qBuilder.Build()
 	q1.Start()
 	stop := make(chan os.Signal, 1)
-	emitRequests(stop)
+	testData := []struct {
+		url              string
+		expectedResponse string
+	}{
+		{"http://localhost:8888/_bulk", "unknown\n"},
+		{"http://localhost:8888/_doc", "unknown\n"},
+		{"http://localhost:8888/_search", "unknown\n"},
+		{"http://localhost:8888/_search", "unknown\n"},
+	}
+	emitRequests(stop, t, testData)
 	<-stop
 	q1.Stop(context.Background())
 	atomic.LoadInt32(&fallbackCalled)
 }
@@ -155,7 +170,20 @@ func Test_scenario1(t *testing.T) {
 	q1 := ab_testing_scenario()
 	q1.Start()
 	stop := make(chan os.Signal, 1)
-	emitRequests(stop)
+	testData := []struct {
+		url              string
+		expectedResponse string
+	}{
+		{"http://localhost:8888/_bulk", `bulk->IngestProcessor->InnerIngestProcessor1->0ABIngestTestProcessor
+bulk->IngestProcessor->InnerIngestProcessor2->0ABIngestTestProcessor
+`},
+		{"http://localhost:8888/_doc", `doc->IngestProcessor->InnerIngestProcessor1->0ABIngestTestProcessor
+doc->IngestProcessor->InnerIngestProcessor2->0ABIngestTestProcessor
+`},
+		{"http://localhost:8888/_search", "ABTestProcessor processor: Responses are equal\n"},
+		{"http://localhost:8888/_search", "ABTestProcessor processor: Responses are not equal\n"},
+	}
+	emitRequests(stop, t, testData)
 	<-stop
 	q1.Stop(context.Background())
 }
@@ -215,7 +243,17 @@ func Test_middleware(t *testing.T) {
 	quesmaBuilder.Build()
 	quesmaBuilder.Start()
 	stop := make(chan os.Signal, 1)
-	emitRequests(stop)
+	testData := []struct {
+		url              string
+		expectedResponse string
+	}{
+		{"http://localhost:8888/_bulk", "middleware\n"},
+		{"http://localhost:8888/_doc", "middleware\n"},
+		{"http://localhost:8888/_search", "middleware\n"},
+		{"http://localhost:8888/_search", "middleware\n"},
+	}
+	emitRequests(stop, t, testData)
+	<-stop
 	quesmaBuilder.Stop(context.Background())
 	atomic.LoadInt32(&middlewareCallCount)
@@ -227,7 +265,16 @@ func Test_middleware(t *testing.T) {
 	quesmaBuilder.Build()
 	quesmaBuilder.Start()
 	stop := make(chan os.Signal, 1)
-	emitRequests(stop)
+	testData := []struct {
+		url              string
+		expectedResponse string
+	}{
+		{"http://localhost:8888/_bulk", "middleware\n"},
+		{"http://localhost:8888/_doc", "middleware\n"},
+		{"http://localhost:8888/_search", "middleware\n"},
+		{"http://localhost:8888/_search", "middleware\n"},
+	}
+	emitRequests(stop, t, testData)
 	<-stop
 	quesmaBuilder.Stop(context.Background())
 	atomic.LoadInt32(&middlewareCallCount)
diff --git a/quesma/model/bucket_aggregations/ip_range.go b/quesma/model/bucket_aggregations/ip_range.go
new file mode 100644
index 000000000..e34bdbd1f
--- /dev/null
+++ b/quesma/model/bucket_aggregations/ip_range.go
@@ -0,0 +1,185 @@
+// Copyright Quesma, licensed under the Elastic License 2.0.
+// SPDX-License-Identifier: Elastic-2.0
+package bucket_aggregations
+
+import (
+	"context"
+	"fmt"
+	"quesma/logger"
+	"quesma/model"
+	"reflect"
+)
+
+// BiggestIpv4 is "255.255.255.255 + 1", so to say. Used in Elastic, because it always uses exclusive upper bounds.
+// So instead of "<= 255.255.255.255", it uses "< ::1:0:0:0"
+const BiggestIpv4 = "::1:0:0:0"
+
+// Current limitation: we expect the Clickhouse field to be IPv4 (and not IPv6)
+
+// Clickhouse table to test SQLs:
+// CREATE TABLE __quesma_table_name (clientip IPv4) ENGINE=Log
+// INSERT INTO __quesma_table_name VALUES ('0.0.0.0'), ('5.5.5.5'), ('90.180.90.180'), ('128.200.0.8'), ('192.168.1.67'), ('222.168.22.67')
+
+// TODO make part of QueryType interface and implement for all aggregations
+// TODO add bad requests to tests
+// Doing so will ensure we see 100% of what we're interested in in our logs (now we see ~95%)
+func CheckParamsIpRange(ctx context.Context, paramsRaw any) error {
+	requiredParams := map[string]string{
+		"field":  "string",
+		"ranges": "map_todo_improve_this_check", // TODO should add the same type check to this 'ranges' field, will be fixed
+	}
+	optionalParams := map[string]string{
+		"keyed": "bool",
+	}
+
+	params, ok := paramsRaw.(model.JsonMap)
+	if !ok {
+		return fmt.Errorf("params is not a map, but %+v", paramsRaw)
+	}
+
+	// check if required are present
+	for paramName, paramType := range requiredParams {
+		paramVal, exists := params[paramName]
+		if !exists {
+			return fmt.Errorf("required parameter %s not found in params", paramName)
+		}
+		if paramType == "map_todo_improve_this_check" {
+			continue // remove this 'continue' once the TODO above is fixed
+		}
+		if reflect.TypeOf(paramVal).Name() != paramType { // TODO I'll make a small rewrite to not use reflect here
+			return fmt.Errorf("required parameter %s is not of type %s, but %T", paramName, paramType, paramVal)
+		}
+	}
+
+	// check if only required/optional are present
+	for paramName := range params {
+		if _, isRequired := requiredParams[paramName]; !isRequired {
+			wantedType, isOptional := optionalParams[paramName]
+			if !isOptional {
+				return fmt.Errorf("unexpected parameter %s found in IP Range params %v", paramName, params)
+			}
+			if reflect.TypeOf(params[paramName]).Name() != wantedType { // TODO I'll make a small rewrite to not use reflect here
+				return fmt.Errorf("optional parameter %s is not of type %s, but %T", paramName, wantedType, params[paramName])
+			}
+		}
+	}
+
+	return nil
+}
+
+type (
+	IpRange struct {
+		ctx       context.Context
+		field     model.Expr
+		intervals []IpInterval
+		keyed     bool
+	}
+	IpInterval struct {
+		begin string
+		end   string
+		key   *string // when nil, key is not present
+	}
+)
+
+func NewIpRange(ctx context.Context, intervals []IpInterval, field model.Expr, keyed bool) *IpRange {
+	return &IpRange{
+		ctx:       ctx,
+		field:     field,
+		intervals: intervals,
+		keyed:     keyed,
+	}
+}
+
+func NewIpInterval(begin, end string, key *string) IpInterval {
+	return IpInterval{begin: begin, end: end, key: key}
+}
+
+func (interval IpInterval) ToWhereClause(field model.Expr) model.Expr {
+	isBegin := interval.begin != UnboundedInterval
+	isEnd := interval.end != UnboundedInterval && interval.end != BiggestIpv4
+
+	begin := model.NewInfixExpr(field, ">=", model.NewLiteralSingleQuoteString(interval.begin))
+	end := model.NewInfixExpr(field, "<", model.NewLiteralSingleQuoteString(interval.end))
+
+	if isBegin && isEnd {
+		return model.NewInfixExpr(begin, "AND", end)
+	} else if isBegin {
+		return begin
+	} else if isEnd {
+		return end
+	} else {
+		return model.TrueExpr
+	}
+}
+
+// String returns the key part of the response, e.g. "1.0-2.0", or "*-6.55"
+func (interval IpInterval) String() string {
+	if interval.key != nil {
+		return *interval.key
+	}
+	return fmt.Sprintf("%s-%s", interval.begin, interval.end)
+}
+
+func (query *IpRange) AggregationType() model.AggregationType {
+	return model.BucketAggregation
+}
+
+func (query *IpRange) TranslateSqlResponseToJson(rows []model.QueryResultRow) model.JsonMap {
+	return nil
+}
+
+func (query *IpRange) String() string {
+	return "ip_range"
+}
+
+func (query *IpRange) DoesNotHaveGroupBy() bool {
+	return true
+}
+
+func (query *IpRange) CombinatorGroups() (result []CombinatorGroup) {
+	for intervalIdx, interval := range query.intervals {
+		prefix := fmt.Sprintf("range_%d__", intervalIdx)
+		if len(query.intervals) == 1 {
+			prefix = ""
+		}
+		result = append(result, CombinatorGroup{
+			idx:         intervalIdx,
+			Prefix:      prefix,
+			Key:         interval.String(),
+			WhereClause: interval.ToWhereClause(query.field),
+		})
+	}
+	return
+}
+
+// bad requests: both to/from and mask
+
+func (query *IpRange) CombinatorTranslateSqlResponseToJson(subGroup CombinatorGroup, rows []model.QueryResultRow) model.JsonMap {
+	if len(rows) == 0 || len(rows[0].Cols) == 0 {
+		// (only log len(rows) here: referencing rows[0].Cols in this message would itself panic when rows is empty)
+		logger.ErrorWithCtx(query.ctx).Msgf("need at least one row and column in ip_range aggregation response, rows: %d", len(rows))
+		return model.JsonMap{}
+	}
+	count := rows[0].Cols[len(rows[0].Cols)-1].Value
+	response := model.JsonMap{
+		"key":       subGroup.Key,
+		"doc_count": count,
+	}
+
+	interval := query.intervals[subGroup.idx]
+	if interval.begin != UnboundedInterval {
+		response["from"] = interval.begin
+	}
+	if interval.end != UnboundedInterval {
+		response["to"] = interval.end
+	}
+
+	return response
+}
+
+func (query *IpRange) CombinatorSplit() []model.QueryType {
+	result := make([]model.QueryType, 0, len(query.intervals))
+	for _, interval := range query.intervals {
+		result = append(result, NewIpRange(query.ctx, []IpInterval{interval}, query.field, query.keyed))
+	}
+	return result
+}
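Note: each interval becomes a half-open [begin, end) predicate on the field, keeping Elastic's exclusive upper bound. A minimal standalone sketch of the same decision tree as ToWhereClause (assuming UnboundedInterval renders as "*", as the String() example above suggests; the real code builds model.Expr trees, not strings):

    package main

    import "fmt"

    // Sketch of IpInterval.ToWhereClause: inclusive lower bound,
    // exclusive upper bound, "*" meaning unbounded on that side.
    func whereClause(field, begin, end string) string {
        const unbounded = "*"
        hasBegin, hasEnd := begin != unbounded, end != unbounded
        switch {
        case hasBegin && hasEnd:
            return fmt.Sprintf(`"%s">='%s' AND "%s"<'%s'`, field, begin, field, end)
        case hasBegin:
            return fmt.Sprintf(`"%s">='%s'`, field, begin)
        case hasEnd:
            return fmt.Sprintf(`"%s"<'%s'`, field, end)
        default:
            return "true"
        }
    }

    func main() {
        fmt.Println(whereClause("clientip", "10.0.0.0", "10.0.0.127")) // "clientip">='10.0.0.0' AND "clientip"<'10.0.0.127'
        fmt.Println(whereClause("clientip", "*", "10.0.0.127"))        // "clientip"<'10.0.0.127'
    }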
diff --git a/quesma/model/bucket_aggregations/terms.go b/quesma/model/bucket_aggregations/terms.go
index 14a50a308..2c62ec6b2 100644
--- a/quesma/model/bucket_aggregations/terms.go
+++ b/quesma/model/bucket_aggregations/terms.go
@@ -4,20 +4,32 @@ package bucket_aggregations
 
 import (
 	"context"
+	"fmt"
 	"quesma/logger"
 	"quesma/model"
 	"quesma/util"
+	"quesma/util/regex"
+	"reflect"
 )
 
 // TODO when adding include/exclude, check escaping of ' and \ in those fields
 type Terms struct {
 	ctx         context.Context
 	significant bool // true <=> significant_terms, false <=> terms
-	OrderByExpr model.Expr
+	// include is either:
+	// - a single value: then for strings, it can be a regex.
+	// - an array: then the field must match exactly one of the values (never a regex).
+	// Nil if missing in the request.
+	include any
+	// exclude is either:
+	// - a single value: then for strings, it can be a regex.
+	// - an array: then the field must match exactly one of the values (never a regex).
+	// Nil if missing in the request.
+	exclude any
 }
 
-func NewTerms(ctx context.Context, significant bool, orderByExpr model.Expr) Terms {
-	return Terms{ctx: ctx, significant: significant, OrderByExpr: orderByExpr}
+func NewTerms(ctx context.Context, significant bool, include, exclude any) Terms {
+	return Terms{ctx: ctx, significant: significant, include: include, exclude: exclude}
 }
 
 func (query Terms) AggregationType() model.AggregationType {
@@ -107,3 +119,104 @@ func (query Terms) key(row model.QueryResultRow) any {
 
 func (query Terms) parentCount(row model.QueryResultRow) any {
 	return row.Cols[len(row.Cols)-3].Value
 }
+
+func (query Terms) UpdateFieldForIncludeAndExclude(field model.Expr) (updatedField model.Expr, didWeUpdateField bool) {
+	// We'll use Clickhouse's 'if' function everywhere here: if(condition, then, else)
+	// In our case the field becomes: if(condition that the field is not excluded, field, NULL)
+	ifOrNull := func(condition model.Expr) model.FunctionExpr {
+		return model.NewFunction("if", condition, field, model.NullExpr)
+	}
+
+	hasExclude := query.exclude != nil
+	excludeArr, excludeIsArray := query.exclude.([]any)
+	switch {
+	case hasExclude && excludeIsArray:
+		if len(excludeArr) == 0 {
+			return field, false
+		}
+
+		// The select expr will be: if(field NOT IN (excludeArr[0], excludeArr[1], ...), field, NULL)
+		exprs := make([]model.Expr, 0, len(excludeArr))
+		for _, excludeVal := range excludeArr {
+			exprs = append(exprs, model.NewLiteralSingleQuoteString(excludeVal))
+		}
+		return ifOrNull(model.NewInfixExpr(field, "NOT IN", model.NewTupleExpr(exprs...))), true
+	case hasExclude:
+		switch exclude := query.exclude.(type) {
+		case string: // hard case, might be a regex
+			funcName, patternExpr := regex.ToClickhouseExpr(exclude)
+			return ifOrNull(model.NewInfixExpr(field, "NOT "+funcName, patternExpr)), true
+		default: // easy case, never a regex
+			return ifOrNull(model.NewInfixExpr(field, "!=", model.NewLiteral(query.exclude))), true
+		}
+
+	default:
+		return field, false // TODO implement similar support for 'include' in the next PR
+	}
+}
+
+// TODO make part of QueryType interface and implement for all aggregations
+// TODO add bad requests to tests
+// Doing so will ensure we see 100% of what we're interested in in our logs (now we see ~95%)
+func CheckParamsTerms(ctx context.Context, paramsRaw any) error {
+	requiredParams := map[string]string{"field": "string"}
+	optionalParams := map[string]string{
+		"size":                      "float64|string", // TODO should be int|string, will be fixed
+		"shard_size":                "float64",        // TODO should be int, will be fixed
+		"order":                     "order",          // TODO add order type
+		"min_doc_count":             "float64",        // TODO should be int, will be fixed
+		"shard_min_doc_count":       "float64",        // TODO should be int, will be fixed
+		"show_term_doc_count_error": "bool",
+		"exclude":                   "not-checking-type-now-complicated",
+		"include":                   "not-checking-type-now-complicated",
+		"collect_mode":              "string",
+		"execution_hint":            "string",
+		"missing":                   "string",
+		"value_type":                "string",
+	}
+	logIfYouSeeThemParams := []string{
+		"shard_size", "min_doc_count", "shard_min_doc_count",
+		"show_term_doc_count_error", "collect_mode", "execution_hint", "value_type",
+	}
+
+	params, ok := paramsRaw.(model.JsonMap)
+	if !ok {
+		return fmt.Errorf("params is not a map, but %+v", paramsRaw)
+	}
+
+	// check if required are present
+	for paramName, paramType := range requiredParams {
+		paramVal, exists := params[paramName]
+		if !exists {
+			return fmt.Errorf("required parameter %s not found in Terms params", paramName)
+		}
+		if reflect.TypeOf(paramVal).Name() != paramType { // TODO I'll make a small rewrite to not use reflect here
+			return fmt.Errorf("required parameter %s is not of type %s, but %T", paramName, paramType, paramVal)
+		}
+	}
+
+	// check if only required/optional are present
+	for paramName := range params {
+		if _, isRequired := requiredParams[paramName]; !isRequired {
+			wantedType, isOptional := optionalParams[paramName]
+			if !isOptional {
+				return fmt.Errorf("unexpected parameter %s found in Terms params %v", paramName, params)
+			}
+			if wantedType == "not-checking-type-now-complicated" || wantedType == "order" || wantedType == "float64|string" {
+				continue // TODO: add that later
+			}
+			if reflect.TypeOf(params[paramName]).Name() != wantedType { // TODO I'll make a small rewrite to not use reflect here
+				return fmt.Errorf("optional parameter %s is not of type %s, but %T", paramName, wantedType, params[paramName])
+			}
+		}
+	}
+
+	// log if you see them
+	for _, warnParam := range logIfYouSeeThemParams {
+		if _, exists := params[warnParam]; exists {
+			logger.WarnWithCtxAndThrottling(ctx, "terms", warnParam, "we didn't expect %s in Terms params %v", warnParam, params)
+		}
+	}
+
+	return nil
+}
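Note: the net effect of UpdateFieldForIncludeAndExclude on the generated SQL is easiest to see on the array form of exclude — the test cases at the end of this diff assert exactly this shape. A standalone sketch mimicking the rendered selector (assuming the renderer prints NOT IN with a tuple(...) right-hand side, as the VisitTuple/VisitInfix changes below suggest):

    package main

    import (
        "fmt"
        "strings"
    )

    // Mimics the if(field NOT IN tuple(...), field, NULL) selector that
    // Terms.UpdateFieldForIncludeAndExclude builds for an array-valued "exclude".
    func excludeSelector(field string, excluded []string) string {
        quoted := make([]string, len(excluded))
        for i, v := range excluded {
            quoted[i] = fmt.Sprintf("'%s'", v) // no escaping — same caveat as NewLiteralSingleQuoteString
        }
        return fmt.Sprintf(`if(%q NOT IN tuple(%s), %q, NULL)`, field, strings.Join(quoted, ", "), field)
    }

    func main() {
        fmt.Println(excludeSelector("chess_goat", []string{"Carlsen", "Kasparov"}))
        // if("chess_goat" NOT IN tuple('Carlsen', 'Kasparov'), "chess_goat", NULL)
    }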
diff --git a/quesma/model/expr.go b/quesma/model/expr.go
index ff7b1242a..3ad5b330e 100644
--- a/quesma/model/expr.go
+++ b/quesma/model/expr.go
@@ -2,7 +2,10 @@
 // SPDX-License-Identifier: Elastic-2.0
 package model
 
-import "strconv"
+import (
+	"fmt"
+	"strconv"
+)
 
 // Expr is a generic representation of an expression which is a part of the SQL query.
 type Expr interface {
@@ -12,6 +15,8 @@ type Expr interface {
 var (
 	InvalidExpr = Expr(nil)
 	TrueExpr    = NewLiteral(true)
+	FalseExpr   = NewLiteral(false)
+	NullExpr    = NewLiteral("NULL")
 )
 
 // ColumnRef is a reference to a column in a table, we can enrich it with more information (e.g. type used) as we go
@@ -125,6 +130,16 @@ func NewLiteral(value any) LiteralExpr {
 	return LiteralExpr{Value: value}
 }
 
+// NewLiteralSingleQuoteString simply does: string -> 'string', anything_else -> anything_else
+func NewLiteralSingleQuoteString(value any) LiteralExpr {
+	switch v := value.(type) {
+	case string:
+		return LiteralExpr{Value: fmt.Sprintf("'%s'", v)}
+	default:
+		return LiteralExpr{Value: v}
+	}
+}
+
 // DistinctExpr is a representation of DISTINCT keyword in SQL, e.g. `SELECT DISTINCT` ... or `SELECT COUNT(DISTINCT ...)`
 type DistinctExpr struct {
 	Expr Expr
@@ -290,7 +305,7 @@ func (e CTE) Accept(v ExprVisitor) interface{} {
 
 type ExprVisitor interface {
 	VisitFunction(e FunctionExpr) interface{}
 	VisitLiteral(l LiteralExpr) interface{}
-	VisitTuple(e TupleExpr) interface{}
+	VisitTuple(t TupleExpr) interface{}
 	VisitInfix(e InfixExpr) interface{}
 	VisitColumnRef(e ColumnRef) interface{}
 	VisitPrefixExpr(e PrefixExpr) interface{}
diff --git a/quesma/model/expr_string_renderer.go b/quesma/model/expr_string_renderer.go
index 304959469..e7b578270 100644
--- a/quesma/model/expr_string_renderer.go
+++ b/quesma/model/expr_string_renderer.go
@@ -91,6 +91,22 @@
 	}
 }
 
+func (v *renderer) VisitTuple(t TupleExpr) interface{} {
+	switch len(t.Exprs) {
+	case 0:
+		logger.WarnWithThrottling("VisitTuple", "TupleExpr with no expressions")
+		return "()"
+	case 1:
+		return t.Exprs[0].Accept(v)
+	default:
+		args := make([]string, len(t.Exprs))
+		for i, arg := range t.Exprs {
+			args[i] = arg.Accept(v).(string)
+		}
+		return fmt.Sprintf("tuple(%s)", strings.Join(args, ", ")) // we can omit "tuple", but I think the SQL's more readable with it
+	}
+}
+
 func (v *renderer) VisitInfix(e InfixExpr) interface{} {
 	var lhs, rhs interface{} // TODO FOR NOW LITTLE PARANOID BUT HELPS ME NOT SEE MANY PANICS WHEN TESTING
 	if e.Left != nil {
@@ -107,7 +123,7 @@ func (v *renderer) VisitInfix(e InfixExpr) interface{} {
 	// I think in the future every infix op should be in braces.
 	if strings.HasPrefix(e.Op, "_") || e.Op == "AND" || e.Op == "OR" {
 		return fmt.Sprintf("(%v %v %v)", lhs, e.Op, rhs)
-	} else if strings.Contains(e.Op, "LIKE") || e.Op == "IS" || e.Op == "IN" || e.Op == "REGEXP" || strings.Contains(e.Op, "UNION") {
+	} else if strings.Contains(e.Op, "LIKE") || e.Op == "IS" || e.Op == "IN" || e.Op == "NOT IN" || e.Op == "REGEXP" || strings.Contains(e.Op, "UNION") {
 		return fmt.Sprintf("%v %v %v", lhs, e.Op, rhs)
 	} else {
 		return fmt.Sprintf("%v%v%v", lhs, e.Op, rhs)
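Note: NewLiteralSingleQuoteString wraps, but does not escape — which is what the `TODO ... check escaping of ' and \` in terms.go (and the `TODO add ' somewhere in exclude after the merge!` in the test data below) are about. A tiny illustration of the caveat:

    package main

    import "fmt"

    // Same behavior as NewLiteralSingleQuoteString: wrap strings in single
    // quotes verbatim, pass everything else through.
    func singleQuote(value any) any {
        if s, ok := value.(string); ok {
            return fmt.Sprintf("'%s'", s)
        }
        return value
    }

    func main() {
        fmt.Println(singleQuote("Carlsen")) // 'Carlsen' — fine
        fmt.Println(singleQuote("O'Hara"))  // 'O'Hara' — unbalanced quote, would break the rendered SQL
        fmt.Println(singleQuote(2025))      // 2025 — non-strings stay as-is
    }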
diff --git a/quesma/model/search_after_strategy.go b/quesma/model/search_after_strategy.go
new file mode 100644
index 000000000..f94d607a8
--- /dev/null
+++ b/quesma/model/search_after_strategy.go
@@ -0,0 +1,26 @@
+package model
+
+import (
+	"quesma/schema"
+)
+
+type (
+	SearchAfterStrategy interface {
+		// ValidateAndParse validates the 'searchAfter', which is what came from the request's search_after field.
+		ValidateAndParse(query *Query, indexSchema schema.Schema) (searchAfterParamParsed []Expr, err error)
+		TransformQuery(query *Query, searchAfterParameterParsed []Expr) (*Query, error)
+		TransformHit(hit SearchHit) (SearchHit, error)
+	}
+	SearchAfterStrategyType int
+)
+
+const (
+	BasicAndFast SearchAfterStrategyType = iota
+	Bulletproof
+	JustDiscardTheParameter
+	DefaultSearchAfterStrategy = Bulletproof
+)
+
+func (s SearchAfterStrategyType) String() string {
+	return []string{"BasicAndFast", "Bulletproof", "JustDiscardTheParameter"}[s]
+}
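Note: one nit worth knowing when extending this enum — String() indexes a fixed slice, so a new SearchAfterStrategyType value (or a stray integer cast) panics instead of degrading gracefully. A standalone sketch of a defensive alternative (purely a suggestion, not what the PR does):

    package main

    import "fmt"

    type SearchAfterStrategyType int

    const (
        BasicAndFast SearchAfterStrategyType = iota
        Bulletproof
        JustDiscardTheParameter
    )

    // Switch-based variant: unknown values print as "Unknown" instead of panicking.
    func (s SearchAfterStrategyType) String() string {
        switch s {
        case BasicAndFast:
            return "BasicAndFast"
        case Bulletproof:
            return "Bulletproof"
        case JustDiscardTheParameter:
            return "JustDiscardTheParameter"
        default:
            return "Unknown"
        }
    }

    func main() {
        fmt.Println(Bulletproof)                 // Bulletproof
        fmt.Println(SearchAfterStrategyType(42)) // Unknown, rather than an index-out-of-range panic
    }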
diff --git a/quesma/model/simple_query.go b/quesma/model/simple_query.go
index 829759941..0265fa0f9 100644
--- a/quesma/model/simple_query.go
+++ b/quesma/model/simple_query.go
@@ -28,6 +28,10 @@ func NewSimpleQuery(whereClause Expr, canParse bool) SimpleQuery {
 	return SimpleQuery{WhereClause: whereClause, CanParse: canParse}
 }
 
+func NewSimpleQueryInvalid() SimpleQuery {
+	return SimpleQuery{CanParse: false}
+}
+
 // LimitForCount returns (limit, true) if we need count(*) with limit,
 // (not-important, false) if we don't need count/limit
 func (s *SimpleQuery) LimitForCount() (limit int, doWeNeedLimit bool) {
diff --git a/quesma/processors/ab_test_processor.go b/quesma/processors/ab_test_processor.go
index 260d242f9..c2ac82ab5 100644
--- a/quesma/processors/ab_test_processor.go
+++ b/quesma/processors/ab_test_processor.go
@@ -50,10 +50,8 @@ func (p *ABTestProcessor) compare(json1 string, json2 string) (bool, string) {
 
 	diff := cmp.Diff(obj1, obj2)
 	if diff == "" {
-		fmt.Println("JSON objects are equal")
 		return true, ""
 	}
-	fmt.Println("JSON objects are not equal:", diff)
 	return false, diff
 }
 
@@ -77,8 +75,6 @@ func (p *ABTestProcessor) Handle(metadata map[string]interface{}, message ...any
 			data = append(data, strconv.Itoa(level)...)
 			data = append(data, []byte(p.GetId())...)
-			data = append(data, []byte(",correlationId:")...)
-			data = append(data, []byte(correlationId)...)
 			data = append(data, []byte("\n")...)
 		}
 
@@ -88,16 +84,12 @@ func (p *ABTestProcessor) Handle(metadata map[string]interface{}, message ...any
 		resp := make([]byte, 0)
 		for _, messages := range p.messageStorage {
 			if len(messages) == 2 {
-				equal, diff := p.compare(string(messages[0]), string(messages[1]))
+				equal, _ := p.compare(string(messages[0]), string(messages[1]))
 				if equal {
-					resp = append(resp, []byte("ABTestProcessor processor: Responses are equal\n\n")...)
-					resp = append(resp, []byte("\n")...)
-					resp = append(resp, []byte(diff)...)
+					resp = append(resp, []byte("ABTestProcessor processor: Responses are equal\n")...)
 				} else {
-					resp = append(resp, []byte("ABTestProcessor processor: Responses are not equal\n\n")...)
-					resp = append(resp, []byte("\n")...)
-					resp = append(resp, []byte(diff)...)
+					resp = append(resp, []byte("ABTestProcessor processor: Responses are not equal\n")...)
 				}
 				// clean storage
 				p.messageStorage = make(map[string][][]byte)
diff --git a/quesma/queryparser/aggregation_parser.go b/quesma/queryparser/aggregation_parser.go
index c50bf5b9e..75698c2c3 100644
--- a/quesma/queryparser/aggregation_parser.go
+++ b/quesma/queryparser/aggregation_parser.go
@@ -291,6 +291,16 @@ func (cw *ClickhouseQueryTranslator) parseStringField(queryMap QueryMap, fieldNa
 	return defaultValue
 }
 
+func (cw *ClickhouseQueryTranslator) parseStringFieldExistCheck(queryMap QueryMap, fieldName string) (value string, exists bool) {
+	if valueRaw, exists := queryMap[fieldName]; exists {
+		if asString, ok := valueRaw.(string); ok {
+			return asString, true
+		}
+		logger.WarnWithCtx(cw.Ctx).Msgf("%s is not a string, but %T, value: %v", fieldName, valueRaw, valueRaw)
+	}
+	return "", false
+}
+
 func (cw *ClickhouseQueryTranslator) parseArrayField(queryMap QueryMap, fieldName string) ([]any, error) {
 	if valueRaw, exists := queryMap[fieldName]; exists {
 		if asArray, ok := valueRaw.([]any); ok {
diff --git a/quesma/queryparser/pancake_aggregation_parser_buckets.go b/quesma/queryparser/pancake_aggregation_parser_buckets.go
index 33257166b..4b65d0319 100644
--- a/quesma/queryparser/pancake_aggregation_parser_buckets.go
+++ b/quesma/queryparser/pancake_aggregation_parser_buckets.go
@@ -5,11 +5,15 @@ package queryparser
 
 import (
 	"fmt"
+	"github.com/H0llyW00dzZ/cidr"
 	"github.com/pkg/errors"
+	"math"
+	"net"
 	"quesma/clickhouse"
 	"quesma/logger"
 	"quesma/model"
 	"quesma/model/bucket_aggregations"
+	"quesma/util"
 	"sort"
 	"strconv"
 	"strings"
@@ -37,6 +41,7 @@ func (cw *ClickhouseQueryTranslator) pancakeTryBucketAggregation(aggregation *pa
 		}},
 		{"multi_terms", cw.parseMultiTerms},
 		{"composite", cw.parseComposite},
+		{"ip_range", cw.parseIpRange},
 		{"ip_prefix", cw.parseIpPrefix},
 	}
 
@@ -147,20 +152,33 @@ func (cw *ClickhouseQueryTranslator) parseDateHistogram(aggregation *pancakeAggr
 
 // aggrName - "terms" or "significant_terms"
 func (cw *ClickhouseQueryTranslator) parseTermsAggregation(aggregation *pancakeAggregationTreeNode, params QueryMap, aggrName string) error {
+	if err := bucket_aggregations.CheckParamsTerms(cw.Ctx, params); err != nil {
+		return err
+	}
+
+	terms := bucket_aggregations.NewTerms(
+		cw.Ctx, aggrName == "significant_terms", params["include"], params["exclude"],
+	)
+
+	var didWeAddMissing, didWeUpdateFieldHere bool
 	field := cw.parseFieldField(params, aggrName)
-	field, didWeAddMissing := cw.addMissingParameterIfPresent(field, params)
-	if !didWeAddMissing {
+	field, didWeAddMissing = cw.addMissingParameterIfPresent(field, params)
+	field, didWeUpdateFieldHere = terms.UpdateFieldForIncludeAndExclude(field)
+
+	// If we updated the field above, the select becomes if(condition, field, NULL), so we also need to filter out those NULLs later
+	if !didWeAddMissing || didWeUpdateFieldHere {
 		aggregation.filterOutEmptyKeyBucket = true
 	}
 
 	const defaultSize = 10
 	size := cw.parseSize(params, defaultSize)
+
 	orderBy, err := cw.parseOrder(params, []model.Expr{field})
 	if err != nil {
 		return err
 	}
-	aggregation.queryType = bucket_aggregations.NewTerms(cw.Ctx, aggrName == "significant_terms", orderBy[0]) // TODO probably full, not [0]
+
+	aggregation.queryType = terms
 	aggregation.selectedColumns = append(aggregation.selectedColumns, field)
 	aggregation.limit = size
 	aggregation.orderBy = orderBy
@@ -382,6 +400,48 @@ func (cw *ClickhouseQueryTranslator) parseComposite(aggregation *pancakeAggregat
 	return nil
 }
 
+func (cw *ClickhouseQueryTranslator) parseIpRange(aggregation *pancakeAggregationTreeNode, params QueryMap) error {
+	const defaultKeyed = false
+
+	if err := bucket_aggregations.CheckParamsIpRange(cw.Ctx, params); err != nil {
+		return err
+	}
+
+	rangesRaw := params["ranges"].([]any)
+	ranges := make([]bucket_aggregations.IpInterval, 0, len(rangesRaw))
+	for _, rangeRaw := range rangesRaw {
+		var key *string
+		if keyIfPresent, exists := cw.parseStringFieldExistCheck(rangeRaw.(QueryMap), "key"); exists {
+			key = &keyIfPresent
+		}
+		var begin, end string
+		if maskIfExists, exists := cw.parseStringFieldExistCheck(rangeRaw.(QueryMap), "mask"); exists {
+			_, ipNet, err := net.ParseCIDR(maskIfExists)
+			if err != nil {
+				return err
+			}
+			beginAsInt, endAsInt := cidr.IPv4ToRange(ipNet)
+			begin = util.IntToIpv4(beginAsInt)
+			// endAsInt is inclusive, we do +1, because we need it exclusive
+			if endAsInt != math.MaxUint32 {
+				end = util.IntToIpv4(endAsInt + 1)
+			} else {
+				end = bucket_aggregations.BiggestIpv4 // "255.255.255.255 + 1", so to say (value in compliance with Elastic)
+			}
+			if key == nil {
+				key = &maskIfExists
+			}
+		} else {
+			begin = cw.parseStringField(rangeRaw.(QueryMap), "from", bucket_aggregations.UnboundedInterval)
+			end = cw.parseStringField(rangeRaw.(QueryMap), "to", bucket_aggregations.UnboundedInterval)
+		}
+		ranges = append(ranges, bucket_aggregations.NewIpInterval(begin, end, key))
+	}
+	aggregation.isKeyed = cw.parseBoolField(params, "keyed", defaultKeyed)
+	aggregation.queryType = bucket_aggregations.NewIpRange(cw.Ctx, ranges, cw.parseFieldField(params, "ip_range"), aggregation.isKeyed)
+	return nil
+}
+
 func (cw *ClickhouseQueryTranslator) parseIpPrefix(aggregation *pancakeAggregationTreeNode, params QueryMap) error {
 	const (
 		defaultIsIpv6 = false
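Note: a stdlib-only sketch of the mask branch above — deriving the [begin, end) bounds that parseIpRange builds for a CIDR mask. The real code gets the inclusive first/last addresses from cidr.IPv4ToRange (assumed here to return them as uint32) and converts with util.IntToIpv4; this sketch computes them directly from the IPNet:

    package main

    import (
        "encoding/binary"
        "fmt"
        "math"
        "net"
    )

    func cidrToHalfOpenRange(mask string) (begin, end string, err error) {
        _, ipNet, err := net.ParseCIDR(mask)
        if err != nil {
            return "", "", err
        }
        first := binary.BigEndian.Uint32(ipNet.IP.To4())
        last := first | ^binary.BigEndian.Uint32(net.IP(ipNet.Mask).To4())
        begin = intToIpv4(first)
        if last != math.MaxUint32 {
            end = intToIpv4(last + 1) // exclusive upper bound, as Elastic expects
        } else {
            end = "::1:0:0:0" // BiggestIpv4: "255.255.255.255 + 1", so to say
        }
        return begin, end, nil
    }

    func intToIpv4(n uint32) string {
        b := make([]byte, 4)
        binary.BigEndian.PutUint32(b, n)
        return net.IP(b).String()
    }

    func main() {
        begin, end, _ := cidrToHalfOpenRange("10.0.0.0/25")
        fmt.Println(begin, end) // 10.0.0.0 10.0.0.128
    }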
diff --git a/quesma/queryparser/query_parser.go b/quesma/queryparser/query_parser.go
index 2e947a0c5..7a659de2c 100644
--- a/quesma/queryparser/query_parser.go
+++ b/quesma/queryparser/query_parser.go
@@ -18,6 +18,7 @@ import (
 	"quesma/quesma/types"
 	"quesma/schema"
 	"quesma/util"
+	"quesma/util/regex"
 	"strconv"
 	"strings"
 	"unicode"
@@ -302,7 +303,7 @@ func (cw *ClickhouseQueryTranslator) parseQueryMap(queryMap QueryMap) model.Simp
 		unparsedQuery = string(prettyMarshal)
 	}
 	logger.Error().Msgf("can't parse query: %s", unparsedQuery)
-	return model.NewSimpleQuery(nil, false)
+	return model.NewSimpleQueryInvalid()
 }
 
 // `constant_score` query is just a wrapper for filter query which returns constant relevance score, which we ignore anyway
@@ -311,30 +312,27 @@ func (cw *ClickhouseQueryTranslator) parseConstantScore(queryMap QueryMap) model
 		return cw.parseBool(queryMap)
 	} else {
 		logger.Error().Msgf("parsing error: `constant_score` needs to wrap `filter` query")
-		return model.NewSimpleQuery(nil, false)
+		return model.NewSimpleQueryInvalid()
 	}
 }
 
 func (cw *ClickhouseQueryTranslator) parseIds(queryMap QueryMap) model.SimpleQuery {
-	var ids, finalIds []string
-	if val, ok := queryMap["values"]; ok {
-		if values, ok := val.([]interface{}); ok {
-			for _, id := range values {
-				ids = append(ids, id.(string))
-			}
+	idsRaw, err := cw.parseArrayField(queryMap, "values")
+	if err != nil {
+		logger.ErrorWithCtx(cw.Ctx).Msgf("parsing error: %v", err)
+		return model.NewSimpleQueryInvalid()
+	}
+	ids := make([]string, 0, len(idsRaw))
+	for _, id := range idsRaw {
+		if idAsString, ok := id.(string); ok {
+			ids = append(ids, idAsString)
+		} else {
+			logger.ErrorWithCtx(cw.Ctx).Msgf("invalid id format, id value: %v type: %T", id, id)
+			return model.NewSimpleQueryInvalid()
 		}
-	} else {
-		logger.Error().Msgf("parsing error: missing mandatory `values` field")
-		return model.NewSimpleQuery(nil, false)
 	}
-	logger.Warn().Msgf("unsupported id query executed, requested ids of [%s]", strings.Join(ids, "','"))
-
-	timestampColumnName := model.TimestampFieldName
-	if len(ids) == 0 {
-		logger.Warn().Msgf("parsing error: empty _id array")
-		return model.NewSimpleQuery(nil, false)
-	}
+
+	logger.Warn().Msgf("unsupported id query executed, requested ids of [%s]", strings.Join(ids, "','"))
 
 	// when our generated ID appears in a query, it looks like this: `1d0b8q1`
 	// therefore we need to strip the hex part (before `q`) and convert it to decimal
@@ -343,39 +341,48 @@ func (cw *ClickhouseQueryTranslator) parseIds(queryMap QueryMap) model.SimpleQue
 		idInHex := strings.Split(id, "q")[0]
 		if idAsStr, err := hex.DecodeString(idInHex); err != nil {
 			logger.Error().Msgf("error parsing document id %s: %v", id, err)
-			return model.NewSimpleQuery(nil, true)
+			return model.NewSimpleQueryInvalid()
 		} else {
 			tsWithoutTZ := strings.TrimSuffix(string(idAsStr), " +0000 UTC")
 			ids[i] = fmt.Sprintf("'%s'", tsWithoutTZ)
 		}
 	}
 
-	var whereStmt model.Expr
 	// TODO replace with cw.Schema
-	if v, ok := cw.Table.Cols[timestampColumnName]; ok {
-		switch v.Type.String() {
+	var idToSql func(string) model.Expr
+	timestampColumnName := model.TimestampFieldName
+	if column, ok := cw.Table.Cols[timestampColumnName]; ok {
+		switch column.Type.String() {
 		case clickhouse.DateTime64.String():
-			for _, id := range ids {
-				finalIds = append(finalIds, fmt.Sprintf("toDateTime64(%s,3)", id))
-			}
-			if len(finalIds) == 1 {
-				whereStmt = model.NewInfixExpr(model.NewColumnRef(timestampColumnName), " = ", model.NewFunction("toDateTime64", model.NewLiteral(ids[0]), model.NewLiteral("3")))
-			} else {
-				whereStmt = model.NewInfixExpr(model.NewColumnRef(timestampColumnName), " IN ", model.NewFunction("toDateTime64", model.NewLiteral(strings.Join(ids, ",")), model.NewLiteral("3")))
+			idToSql = func(id string) model.Expr {
+				return model.NewFunction("toDateTime64", model.NewLiteral(id), model.NewLiteral(3))
 			}
 		case clickhouse.DateTime.String():
-			for _, id := range ids {
-				finalIds = append(finalIds, fmt.Sprintf("toDateTime(%s)", id))
-			}
-			if len(finalIds) == 1 {
-				whereStmt = model.NewInfixExpr(model.NewColumnRef(timestampColumnName), " = ", model.NewFunction("toDateTime", model.NewLiteral(finalIds[0])))
-			} else {
-				whereStmt = model.NewInfixExpr(model.NewColumnRef(timestampColumnName), " IN ", model.NewFunction("toDateTime", model.NewLiteral(strings.Join(ids, ","))))
+			idToSql = func(id string) model.Expr {
+				return model.NewFunction("toDateTime", model.NewLiteral(id))
 			}
 		default:
-			logger.Warn().Msgf("timestamp field of unsupported type %s", v.Type.String())
-			return model.NewSimpleQuery(nil, true)
+			logger.ErrorWithCtx(cw.Ctx).Msgf("timestamp field of unsupported type %s", column.Type.String())
+			return model.NewSimpleQueryInvalid()
+		}
+	} else {
+		logger.ErrorWithCtx(cw.Ctx).Msgf("timestamp field %s not found in schema", timestampColumnName)
+		return model.NewSimpleQueryInvalid()
+	}
+
+	var whereStmt model.Expr
+	switch len(ids) {
+	case 0:
+		whereStmt = model.FalseExpr // timestamp IN [] <=> false
+	case 1:
+		whereStmt = model.NewInfixExpr(model.NewColumnRef(timestampColumnName), " = ", idToSql(ids[0]))
+	default:
+		idsAsExprs := make([]model.Expr, len(ids))
+		for i, id := range ids {
+			idsAsExprs[i] = idToSql(id)
		}
+		idsTuple := model.NewTupleExpr(idsAsExprs...)
+		whereStmt = model.NewInfixExpr(model.NewColumnRef(timestampColumnName), " IN ", idsTuple)
 	}
 	return model.NewSimpleQuery(whereStmt, true)
 }
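Note: the id-to-timestamp trick above is easier to see with concrete values. A Quesma-generated _id is, by the look of this code, hex(timestamp-string) + "q" + a suffix, so parseIds reverses it by splitting on "q" (never a hex digit) and hex-decoding. A runnable round-trip; the exact timestamp format is inferred from the " +0000 UTC" suffix being trimmed, so treat it as illustrative:

    package main

    import (
        "encoding/hex"
        "fmt"
        "strings"
    )

    func main() {
        // Forward direction (how an id like "32303234...q1" is assumed to be built):
        ts := "2024-12-22 09:00:00.123 +0000 UTC"
        id := hex.EncodeToString([]byte(ts)) + "q1"

        // Reverse direction, as in parseIds:
        idInHex := strings.Split(id, "q")[0]
        decoded, err := hex.DecodeString(idInHex)
        if err != nil {
            panic(err)
        }
        tsWithoutTZ := strings.TrimSuffix(string(decoded), " +0000 UTC")
        fmt.Printf("'%s'\n", tsWithoutTZ) // '2024-12-22 09:00:00.123'
    }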
@@ -481,14 +488,14 @@ func (cw *ClickhouseQueryTranslator) parseTerm(queryMap QueryMap) model.SimpleQu
 		}
 	}
 	logger.WarnWithCtx(cw.Ctx).Msgf("we expect only 1 term, got: %d. value: %v", len(queryMap), queryMap)
-	return model.NewSimpleQuery(nil, false)
+	return model.NewSimpleQueryInvalid()
 }
 
 // TODO remove optional parameters like boost
 func (cw *ClickhouseQueryTranslator) parseTerms(queryMap QueryMap) model.SimpleQuery {
 	if len(queryMap) != 1 {
 		logger.WarnWithCtx(cw.Ctx).Msgf("we expect only 1 term, got: %d. value: %v", len(queryMap), queryMap)
-		return model.NewSimpleQuery(nil, false)
+		return model.NewSimpleQueryInvalid()
 	}
 
 	for k, v := range queryMap {
@@ -500,7 +507,7 @@ func (cw *ClickhouseQueryTranslator) parseTerms(queryMap QueryMap) model.SimpleQ
 		vAsArray, ok := v.([]interface{})
 		if !ok {
 			logger.WarnWithCtx(cw.Ctx).Msgf("invalid terms type: %T, value: %v", v, v)
-			return model.NewSimpleQuery(nil, false)
+			return model.NewSimpleQueryInvalid()
 		}
 		if len(vAsArray) == 1 {
 			simpleStatement := model.NewInfixExpr(model.NewColumnRef(k), "=", model.NewLiteral(sprint(vAsArray[0])))
@@ -517,7 +524,7 @@ func (cw *ClickhouseQueryTranslator) parseTerms(queryMap QueryMap) model.SimpleQ
 
 	// unreachable unless something really weird happens
 	logger.ErrorWithCtx(cw.Ctx).Msg("theoretically unreachable code")
-	return model.NewSimpleQuery(nil, false)
+	return model.NewSimpleQueryInvalid()
 }
 
 func (cw *ClickhouseQueryTranslator) parseMatchAll(_ QueryMap) model.SimpleQuery {
@@ -537,7 +544,7 @@ func (cw *ClickhouseQueryTranslator) parseMatchAll(_ QueryMap) model.SimpleQuery
 func (cw *ClickhouseQueryTranslator) parseMatch(queryMap QueryMap, matchPhrase bool) model.SimpleQuery {
 	if len(queryMap) != 1 {
 		logger.WarnWithCtx(cw.Ctx).Msgf("we expect only 1 match, got: %d. value: %v", len(queryMap), queryMap)
-		return model.NewSimpleQuery(nil, false)
+		return model.NewSimpleQueryInvalid()
 	}
 
 	for fieldName, v := range queryMap {
@@ -575,7 +582,7 @@ func (cw *ClickhouseQueryTranslator) parseMatch(queryMap QueryMap, matchPhrase b
 
 	// unreachable unless something really weird happens
 	logger.ErrorWithCtx(cw.Ctx).Msg("theoretically unreachable code")
-	return model.NewSimpleQuery(nil, false)
+	return model.NewSimpleQueryInvalid()
 }
 
 func (cw *ClickhouseQueryTranslator) parseMultiMatch(queryMap QueryMap) model.SimpleQuery {
value: %v", len(queryMap), queryMap) - return model.NewSimpleQuery(nil, false) + return model.NewSimpleQueryInvalid() } for fieldName, v := range queryMap { @@ -575,7 +582,7 @@ func (cw *ClickhouseQueryTranslator) parseMatch(queryMap QueryMap, matchPhrase b // unreachable unless something really weird happens logger.ErrorWithCtx(cw.Ctx).Msg("theoretically unreachable code") - return model.NewSimpleQuery(nil, false) + return model.NewSimpleQueryInvalid() } func (cw *ClickhouseQueryTranslator) parseMultiMatch(queryMap QueryMap) model.SimpleQuery { @@ -586,25 +593,25 @@ func (cw *ClickhouseQueryTranslator) parseMultiMatch(queryMap QueryMap) model.Si fields = cw.extractFields(fieldsAsArray) } else { logger.ErrorWithCtx(cw.Ctx).Msgf("invalid fields type: %T, value: %v", fieldsAsInterface, fieldsAsInterface) - return model.NewSimpleQuery(nil, false) + return model.NewSimpleQueryInvalid() } } else { fields = []string{model.FullTextFieldNamePlaceHolder} } - alwaysFalseStmt := model.NewLiteral("false") + if len(fields) == 0 { - return model.NewSimpleQuery(alwaysFalseStmt, true) + return model.NewSimpleQuery(model.FalseExpr, true) } query, ok := queryMap["query"] if !ok { logger.WarnWithCtx(cw.Ctx).Msgf("no query in multi_match query: %v", queryMap) - return model.NewSimpleQuery(alwaysFalseStmt, false) + return model.NewSimpleQueryInvalid() } queryAsString, ok := query.(string) if !ok { logger.WarnWithCtx(cw.Ctx).Msgf("invalid query type: %T, value: %v", query, query) - return model.NewSimpleQuery(alwaysFalseStmt, false) + return model.NewSimpleQueryInvalid() } var subQueries []string wereDone := false @@ -637,7 +644,7 @@ func (cw *ClickhouseQueryTranslator) parseMultiMatch(queryMap QueryMap) model.Si func (cw *ClickhouseQueryTranslator) parsePrefix(queryMap QueryMap) model.SimpleQuery { if len(queryMap) != 1 { logger.WarnWithCtx(cw.Ctx).Msgf("we expect only 1 prefix, got: %d. value: %v", len(queryMap), queryMap) - return model.NewSimpleQuery(nil, false) + return model.NewSimpleQueryInvalid() } for fieldName, v := range queryMap { @@ -652,13 +659,13 @@ func (cw *ClickhouseQueryTranslator) parsePrefix(queryMap QueryMap) model.Simple return model.NewSimpleQuery(simpleStat, true) default: logger.WarnWithCtx(cw.Ctx).Msgf("unsupported prefix type: %T, value: %v", v, v) - return model.NewSimpleQuery(nil, false) + return model.NewSimpleQueryInvalid() } } // unreachable unless something really weird happens logger.ErrorWithCtx(cw.Ctx).Msg("theoretically unreachable code") - return model.NewSimpleQuery(nil, false) + return model.NewSimpleQueryInvalid() } // Not supporting 'case_insensitive' (optional) @@ -667,7 +674,7 @@ func (cw *ClickhouseQueryTranslator) parsePrefix(queryMap QueryMap) model.Simple func (cw *ClickhouseQueryTranslator) parseWildcard(queryMap QueryMap) model.SimpleQuery { if len(queryMap) != 1 { logger.WarnWithCtx(cw.Ctx).Msgf("we expect only 1 wildcard, got: %d. 
value: %v", len(queryMap), queryMap) - return model.NewSimpleQuery(nil, false) + return model.NewSimpleQueryInvalid() } for fieldName, v := range queryMap { @@ -679,21 +686,21 @@ func (cw *ClickhouseQueryTranslator) parseWildcard(queryMap QueryMap) model.Simp return model.NewSimpleQuery(whereStatement, true) } else { logger.WarnWithCtx(cw.Ctx).Msgf("invalid value type: %T, value: %v", value, value) - return model.NewSimpleQuery(nil, false) + return model.NewSimpleQueryInvalid() } } else { logger.WarnWithCtx(cw.Ctx).Msgf("no value in wildcard query: %v", queryMap) - return model.NewSimpleQuery(nil, false) + return model.NewSimpleQueryInvalid() } } else { logger.WarnWithCtx(cw.Ctx).Msgf("invalid wildcard type: %T, value: %v", v, v) - return model.NewSimpleQuery(nil, false) + return model.NewSimpleQueryInvalid() } } // unreachable unless something really weird happens logger.ErrorWithCtx(cw.Ctx).Msg("theoretically unreachable code") - return model.NewSimpleQuery(nil, false) + return model.NewSimpleQueryInvalid() } // This one is really complicated (https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html) @@ -719,12 +726,12 @@ func (cw *ClickhouseQueryTranslator) parseNested(queryMap QueryMap) model.Simple return cw.parseQueryMap(queryAsMap) } else { logger.WarnWithCtx(cw.Ctx).Msgf("invalid nested query type: %T, value: %v", query, query) - return model.NewSimpleQuery(nil, false) + return model.NewSimpleQueryInvalid() } } logger.WarnWithCtx(cw.Ctx).Msgf("no query in nested query: %v", queryMap) - return model.NewSimpleQuery(nil, false) + return model.NewSimpleQueryInvalid() } func (cw *ClickhouseQueryTranslator) parseDateMathExpression(expr string) (string, error) { @@ -755,7 +762,7 @@ func (cw *ClickhouseQueryTranslator) parseDateMathExpression(expr string) (strin func (cw *ClickhouseQueryTranslator) parseRange(queryMap QueryMap) model.SimpleQuery { if len(queryMap) != 1 { logger.WarnWithCtx(cw.Ctx).Msgf("we expect only 1 range, got: %d. value: %v", len(queryMap), queryMap) - return model.NewSimpleQuery(nil, false) + return model.NewSimpleQueryInvalid() } // Maybe change to false if numeric fields exist. @@ -848,7 +855,7 @@ func (cw *ClickhouseQueryTranslator) parseRange(queryMap QueryMap) model.SimpleQ // unreachable unless something really weird happens logger.ErrorWithCtx(cw.Ctx).Msg("theoretically unreachable code") - return model.NewSimpleQuery(nil, false) + return model.NewSimpleQueryInvalid() } // TODO: not supported: @@ -862,7 +869,7 @@ func (cw *ClickhouseQueryTranslator) parseExists(queryMap QueryMap) model.Simple fieldName, ok := v.(string) if !ok { logger.WarnWithCtx(cw.Ctx).Msgf("invalid exists type: %T, value: %v", v, v) - return model.NewSimpleQuery(nil, false) + return model.NewSimpleQueryInvalid() } sql = model.NewInfixExpr(model.NewColumnRef(fieldName), "IS", model.NewLiteral("NOT NULL")) @@ -877,62 +884,37 @@ func (cw *ClickhouseQueryTranslator) parseExists(queryMap QueryMap) model.Simple func (cw *ClickhouseQueryTranslator) parseRegexp(queryMap QueryMap) (result model.SimpleQuery) { if len(queryMap) != 1 { logger.WarnWithCtx(cw.Ctx).Msgf("we expect only 1 regexp, got: %d. value: %v", len(queryMap), queryMap) - return + return model.NewSimpleQueryInvalid() } - // really simple == (out of all special characters, only . and .* may be present) - isPatternReallySimple := func(pattern string) bool { - // any special characters excluding . and * not allowed. Also (not the most important check) * can't be first character. 
@@ -1248,7 +1230,7 @@ func (cw *ClickhouseQueryTranslator) parseGeoBoundingBox(queryMap QueryMap) mode
 		}
 	} else {
 		logger.WarnWithCtx(cw.Ctx).Msgf("no bottom_right in geo_bounding_box query: %v", queryMap)
-		return model.NewSimpleQuery(nil, false)
+		return model.NewSimpleQueryInvalid()
 	}
 	if topLeft, ok := v.(QueryMap)["top_left"]; ok {
 		if topLeftCornerAsArray, ok := topLeft.([]interface{}); ok {
@@ -1257,7 +1239,7 @@ func (cw *ClickhouseQueryTranslator) parseGeoBoundingBox(queryMap QueryMap) mode
 		}
 	} else {
 		logger.WarnWithCtx(cw.Ctx).Msgf("no top_left in geo_bounding_box query: %v", queryMap)
-		return model.NewSimpleQuery(nil, false)
+		return model.NewSimpleQueryInvalid()
 	}
 	args := make([]model.Expr, 0)
 	args = append(args, model.NewColumnRef(field))
"quesma/model/typical_queries" "quesma/persistence" @@ -27,6 +28,7 @@ import ( // what should be? According to docs, I think so... Maybe test in Kibana? // OK, Kibana disagrees, it is indeed wrong. func TestQueryParserStringAttrConfig(t *testing.T) { + logger.InitSimpleLoggerForTestsWarnLevel() tableName := "logs-generic-default" table, err := clickhouse.NewTable(`CREATE TABLE `+tableName+` ( "message" String, "@timestamp" DateTime64(3, 'UTC'), "attributes_values" Map(String,String)) diff --git a/quesma/quesma/dual_write_proxy_v2.go b/quesma/quesma/dual_write_proxy_v2.go index 32594025a..81a9e7cca 100644 --- a/quesma/quesma/dual_write_proxy_v2.go +++ b/quesma/quesma/dual_write_proxy_v2.go @@ -4,11 +4,11 @@ package quesma import ( "context" - "errors" "net/http" "quesma/ab_testing" "quesma/clickhouse" "quesma/elasticsearch" + "quesma/frontend_connectors" "quesma/ingest" "quesma/logger" "quesma/queryparser" @@ -50,7 +50,7 @@ func (c *simultaneousClientsLimiterV2) ServeHTTP(w http.ResponseWriter, r *http. } type dualWriteHttpProxyV2 struct { - routingHttpServer *http.Server + quesmaV2 quesma_api.QuesmaBuilder indexManagement elasticsearch.IndexManagement logManager *clickhouse.LogManager publicPort util.Port @@ -79,10 +79,10 @@ func newDualWriteProxyV2(dependencies quesma_api.Dependencies, schemaLoader clic ingestRouter := ConfigureIngestRouterV2(config, dependencies, ingestProcessor, resolver) searchRouter := ConfigureSearchRouterV2(config, dependencies, registry, logManager, queryProcessor, resolver) - elasticHttpIngestFrontendConnector := NewElasticHttpIngestFrontendConnector(":"+strconv.Itoa(int(config.PublicTcpPort)), + elasticHttpIngestFrontendConnector := frontend_connectors.NewElasticHttpIngestFrontendConnector(":"+strconv.Itoa(int(config.PublicTcpPort)), logManager, registry, config, ingestRouter) - elasticHttpQueryFrontendConnector := NewElasticHttpQueryFrontendConnector(":"+strconv.Itoa(int(config.PublicTcpPort)), + elasticHttpQueryFrontendConnector := frontend_connectors.NewElasticHttpQueryFrontendConnector(":"+strconv.Itoa(int(config.PublicTcpPort)), logManager, registry, config, searchRouter) quesmaBuilder := quesma_api.NewQuesma(dependencies) @@ -91,33 +91,27 @@ func newDualWriteProxyV2(dependencies quesma_api.Dependencies, schemaLoader clic queryPipeline := quesma_api.NewPipeline() queryPipeline.AddFrontendConnector(elasticHttpQueryFrontendConnector) - quesmaBuilder.AddPipeline(ingestPipeline) quesmaBuilder.AddPipeline(queryPipeline) + quesmaBuilder.AddPipeline(ingestPipeline) - _, err := quesmaBuilder.Build() + quesmaV2, err := quesmaBuilder.Build() if err != nil { logger.Fatal().Msgf("Error building Quesma: %v", err) } - var limitedHandler http.Handler if config.DisableAuth { elasticHttpIngestFrontendConnector.AddMiddleware(newSimultaneousClientsLimiterV2(concurrentClientsLimitV2)) elasticHttpQueryFrontendConnector.AddMiddleware(newSimultaneousClientsLimiterV2(concurrentClientsLimitV2)) - limitedHandler = elasticHttpIngestFrontendConnector } else { elasticHttpQueryFrontendConnector.AddMiddleware(newSimultaneousClientsLimiterV2(concurrentClientsLimitV2)) elasticHttpQueryFrontendConnector.AddMiddleware(NewAuthMiddlewareV2(config.Elasticsearch)) elasticHttpIngestFrontendConnector.AddMiddleware(newSimultaneousClientsLimiterV2(concurrentClientsLimitV2)) elasticHttpIngestFrontendConnector.AddMiddleware(NewAuthMiddlewareV2(config.Elasticsearch)) - limitedHandler = elasticHttpIngestFrontendConnector } return &dualWriteHttpProxyV2{ - schemaRegistry: registry, - schemaLoader: 
schemaLoader, - routingHttpServer: &http.Server{ - Addr: ":" + strconv.Itoa(int(config.PublicTcpPort)), - Handler: limitedHandler, - }, + schemaRegistry: registry, + schemaLoader: schemaLoader, + quesmaV2: quesmaV2, indexManagement: indexManager, logManager: logManager, publicPort: config.PublicTcpPort, @@ -139,9 +133,7 @@ func (q *dualWriteHttpProxyV2) Close(ctx context.Context) { if q.asyncQueriesEvictor != nil { q.asyncQueriesEvictor.Close() } - if err := q.routingHttpServer.Shutdown(ctx); err != nil { - logger.Fatal().Msgf("Error during server shutdown: %v", err) - } + q.quesmaV2.Stop(ctx) } func (q *dualWriteHttpProxyV2) Ingest() { @@ -149,10 +141,5 @@ q.logManager.Start() q.indexManagement.Start() go q.asyncQueriesEvictor.AsyncQueriesGC() - go func() { - if err := q.routingHttpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { - logger.Fatal().Msgf("Error starting http server: %v", err) - } - logger.Info().Msgf("Accepting HTTP at :%d", q.publicPort) - }() + q.quesmaV2.Start() } diff --git a/quesma/test_utils.go b/quesma/test_utils.go index 990ed1e80..20ebd0d09 100644 --- a/quesma/test_utils.go +++ b/quesma/test_utils.go @@ -10,23 +10,20 @@ import ( "net/http" ) -func sendRequest(url string, requestBody []byte) { +func sendRequest(url string, requestBody []byte) (string, error) { // Send POST request resp, err := http.Post(url, "application/json", bytes.NewBuffer(requestBody)) if err != nil { fmt.Println("Error sending request:", err) - return + return "", err } defer resp.Body.Close() + respBody, err := io.ReadAll(resp.Body) + resp.Body = io.NopCloser(bytes.NewBuffer(respBody)) // restore the body so callers can read it again if err != nil { fmt.Println(err) } else { - respBody, err := io.ReadAll(resp.Body) - resp.Body = io.NopCloser(bytes.NewBuffer(respBody)) - if err != nil { - fmt.Println(err) - } else { - fmt.Println(string(respBody)) - } + fmt.Println(string(respBody)) } + return string(respBody), err // propagate the read error instead of swallowing it } diff --git a/quesma/testdata/aggregation_requests_2.go b/quesma/testdata/aggregation_requests_2.go index ec8faff67..dd4836151 100644 --- a/quesma/testdata/aggregation_requests_2.go +++ b/quesma/testdata/aggregation_requests_2.go @@ -4690,6 +4690,607 @@ var AggregationTests2 = []AggregationTestCase{ LIMIT 4`, }, { // [70] + TestName: "simplest terms with exclude (array of values)", + // TODO add ' somewhere in exclude after the merge!
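+ // Note: judging by the expected SQL below, an "exclude" given as an array is matched by exact values (NOT IN tuple(...)), so "Fis._er*" is not treated as a regex here; a single-string "exclude" (see test [74]) does go through the regex-to-LIKE conversion in util/regex.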
+ QueryRequestJson: ` + { + "aggs": { + "1": { + "terms": { + "field": "chess_goat", + "size": 2, + "exclude": ["Carlsen", "Kasparov", "Fis._er*"] + } + } + }, + "size": 0, + "track_total_hits": true + }`, + // I omit "took", "timed_out", "_shards", and "hits" from the response for brevity (they can also be easily unit-tested) + ExpectedResponse: ` + { + "aggregations": { + "1": { + "doc_count_error_upper_bound": 0, + "sum_other_doc_count": 7416, + "buckets": [ + { + "key": "My dad", + "doc_count": 3323 + }, + { + "key": "Barack Obama", + "doc_count": 3261 + } + ] + } + } + }`, + ExpectedPancakeResults: []model.QueryResultRow{ + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("aggr__1__parent_count", int64(14000)), + model.NewQueryResultCol("aggr__1__key_0", "My dad"), + model.NewQueryResultCol("aggr__1__count", int64(3323)), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("aggr__1__parent_count", int64(14000)), + model.NewQueryResultCol("aggr__1__key_0", "Barack Obama"), + model.NewQueryResultCol("aggr__1__count", int64(3261)), + }}, + }, + ExpectedPancakeSQL: ` + SELECT sum(count(*)) OVER () AS "aggr__1__parent_count", + if("chess_goat" NOT IN tuple('Carlsen', 'Kasparov', 'Fis._er*'), "chess_goat", NULL) + AS "aggr__1__key_0", count(*) AS "aggr__1__count" + FROM __quesma_table_name + GROUP BY if("chess_goat" NOT IN tuple('Carlsen', 'Kasparov', 'Fis._er*'), "chess_goat", NULL) AS "aggr__1__key_0" + ORDER BY "aggr__1__count" DESC, "aggr__1__key_0" ASC + LIMIT 3`, + }, + { // [71] + TestName: "simplest terms with exclude (single value, no regex)", + QueryRequestJson: ` + { + "aggs": { + "1": { + "terms": { + "field": "agi_birth_year", + "size": 1, + "exclude": 2025 + } + } + }, + "size": 0, + "track_total_hits": true + }`, + // I omit "took", "timed_out", "_shards", and "hits" from the response for brevity (they can also be easily unit-tested) + ExpectedResponse: ` + { + "aggregations": { + "1": { + "doc_count_error_upper_bound": 0, + "sum_other_doc_count": 10700, + "buckets": [ + { + "key": 2024, + "doc_count": 3300 + } + ] + } + } + }`, + ExpectedPancakeResults: []model.QueryResultRow{ + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("aggr__1__parent_count", int64(14000)), + model.NewQueryResultCol("aggr__1__key_0", nil), + model.NewQueryResultCol("aggr__1__count", int64(10000)), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("aggr__1__parent_count", int64(14000)), + model.NewQueryResultCol("aggr__1__key_0", 2024), + model.NewQueryResultCol("aggr__1__count", int64(3300)), + }}, + }, + ExpectedPancakeSQL: ` + SELECT sum(count(*)) OVER () AS "aggr__1__parent_count", + if("agi_birth_year"!=2025, "agi_birth_year", NULL) AS "aggr__1__key_0", + count(*) AS "aggr__1__count" + FROM __quesma_table_name + GROUP BY if("agi_birth_year"!=2025, "agi_birth_year", NULL) AS "aggr__1__key_0" + ORDER BY "aggr__1__count" DESC, "aggr__1__key_0" ASC + LIMIT 2`, + }, + { // [72] + TestName: "simplest terms with exclude (empty array)", + QueryRequestJson: ` + { + "aggs": { + "1": { + "terms": { + "field": "agi_birth_year", + "size": 1, + "exclude": [] + } + } + }, + "size": 0, + "track_total_hits": true + }`, + // I omit "took", "timed_out", "_shards", and "hits" from the response for brevity (they can also be easily unit-tested) + ExpectedResponse: ` + { + "aggregations": { + "1": { + "doc_count_error_upper_bound": 0, + "sum_other_doc_count": 700, + "buckets": [ + { + "key": 2024, + "doc_count": 300 + } + ] + } + } + }`, + ExpectedPancakeResults: 
[]model.QueryResultRow{ + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("aggr__1__parent_count", int64(1000)), + model.NewQueryResultCol("aggr__1__key_0", nil), + model.NewQueryResultCol("aggr__1__count", int64(600)), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("aggr__1__parent_count", int64(1000)), + model.NewQueryResultCol("aggr__1__key_0", 2024), + model.NewQueryResultCol("aggr__1__count", int64(300)), + }}, + }, + ExpectedPancakeSQL: ` + SELECT sum(count(*)) OVER () AS "aggr__1__parent_count", + "agi_birth_year" AS "aggr__1__key_0", count(*) AS "aggr__1__count" + FROM __quesma_table_name + GROUP BY "agi_birth_year" AS "aggr__1__key_0" + ORDER BY "aggr__1__count" DESC, "aggr__1__key_0" ASC + LIMIT 2`, + }, + { // [73] + TestName: "simplest terms with exclude (of strings), regression test", + QueryRequestJson: ` + { + "aggs": { + "1": { + "terms": { + "field": "chess_goat", + "size": 1, + "exclude": ["abc"] + } + } + }, + "size": 0, + "track_total_hits": true + }`, + // I omit "took", "timed_out", "_shards", and "hits" from the response for brevity (they can also be easily unit-tested) + ExpectedResponse: ` + { + "aggregations": { + "1": { + "doc_count_error_upper_bound": 0, + "sum_other_doc_count": 700, + "buckets": [ + { + "key": 2024, + "doc_count": 300 + } + ] + } + } + }`, + ExpectedPancakeResults: []model.QueryResultRow{ + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("aggr__1__parent_count", int64(1000)), + model.NewQueryResultCol("aggr__1__key_0", nil), + model.NewQueryResultCol("aggr__1__count", int64(600)), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("aggr__1__parent_count", int64(1000)), + model.NewQueryResultCol("aggr__1__key_0", 2024), + model.NewQueryResultCol("aggr__1__count", int64(300)), + }}, + }, + ExpectedPancakeSQL: ` + SELECT sum(count(*)) OVER () AS "aggr__1__parent_count", + if("chess_goat" NOT IN 'abc', "chess_goat", NULL) AS "aggr__1__key_0", + count(*) AS "aggr__1__count" + FROM __quesma_table_name + GROUP BY if("chess_goat" NOT IN 'abc', "chess_goat", NULL) AS "aggr__1__key_0" + ORDER BY "aggr__1__count" DESC, "aggr__1__key_0" ASC + LIMIT 2`, + }, + { // [74] + TestName: "terms with exclude (more complex, string field with exclude regex)", + // One simple test, for more regex tests see util/regex unit tests + QueryRequestJson: ` + { + "aggs": { + "1": { + "terms": { + "field": "chess_goat", + "size": 1, + "exclude": "K.*" + } + } + }, + "size": 0, + "track_total_hits": true + }`, + // I omit "took", "timed_out", "_shards", and "hits" from the response for brevity (they can also be easily unit-tested) + ExpectedResponse: ` + { + "aggregations": { + "1": { + "doc_count_error_upper_bound": 0, + "sum_other_doc_count": 1, + "buckets": [ + { + "key": "Paul Morphy", + "doc_count": 13999 + } + ] + } + } + }`, + ExpectedPancakeResults: []model.QueryResultRow{ + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("aggr__1__parent_count", int64(14000)), + model.NewQueryResultCol("aggr__1__key_0", "Paul Morphy"), + model.NewQueryResultCol("aggr__1__count", int64(13999)), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("aggr__1__parent_count", int64(14000)), + model.NewQueryResultCol("aggr__1__key_0", nil), + model.NewQueryResultCol("aggr__1__count", int64(1)), + }}, + }, + ExpectedPancakeSQL: ` + SELECT sum(count(*)) OVER () AS "aggr__1__parent_count", + if("chess_goat" NOT LIKE 'K%', "chess_goat", NULL) AS "aggr__1__key_0", + count(*) AS "aggr__1__count" + FROM __quesma_table_name + 
GROUP BY if("chess_goat" NOT LIKE 'K%', "chess_goat", NULL) AS "aggr__1__key_0" + ORDER BY "aggr__1__count" DESC, "aggr__1__key_0" ASC + LIMIT 2`, + }, + { // [75] + TestName: "complex terms with exclude: nested terms + 2 metrics", + QueryRequestJson: ` + { + "aggs": { + "terms1": { + "aggs": { + "metric1": { + "avg": { + "field": "DistanceMiles" + } + }, + "terms2": { + "aggs": { + "metric2": { + "sum": { + "field": "AvgTicketPrice" + } + } + }, + "terms": { + "field": "DestCityName", + "size": 1 + } + } + }, + "terms": { + "exclude": [ + "a", + "b" + ], + "field": "Carrier", + "size": 2 + } + } + }, + "size": 0, + "track_total_hits": true + }`, + // I omit "took", "timed_out", "_shards", and "hits" from the response for brevity (they can also be easily unit-tested) + ExpectedResponse: ` + { + "aggregations": { + "terms1": { + "buckets": [ + { + "doc_count": 3323, + "key": "Logstash Airways", + "metric1": { + "value": 4451.946294580208 + }, + "terms2": { + "buckets": [ + { + "doc_count": 173, + "key": "Zurich", + "metric2": { + "value": 102370.42402648926 + } + } + ], + "doc_count_error_upper_bound": 0, + "sum_other_doc_count": 3150 + } + }, + { + "doc_count": 3261, + "key": "JetBeats", + "metric1": { + "value": 4434.670874554115 + }, + "terms2": { + "buckets": [ + { + "doc_count": 167, + "key": "Zurich", + "metric2": { + "value": 92215.76377868652 + } + } + ], + "doc_count_error_upper_bound": 0, + "sum_other_doc_count": 3094 + } + } + ], + "doc_count_error_upper_bound": 0, + "sum_other_doc_count": 6430 + } + } + }`, + ExpectedPancakeResults: []model.QueryResultRow{ + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("aggr__terms1__parent_count", int64(13014)), + model.NewQueryResultCol("aggr__terms1__key_0", "Logstash Airways"), + model.NewQueryResultCol("aggr__terms1__count", int64(3323)), + model.NewQueryResultCol("metric__terms1__metric1_col_0", 4451.946294580208), + model.NewQueryResultCol("aggr__terms1__terms2__parent_count", int64(3323)), + model.NewQueryResultCol("aggr__terms1__terms2__key_0", "Zurich"), + model.NewQueryResultCol("aggr__terms1__terms2__count", int64(173)), + model.NewQueryResultCol("metric__terms1__terms2__metric2_col_0", 102370.42402648926), + }}, + {Cols: []model.QueryResultCol{ // should be discarded by us because of terms2's size=1 + model.NewQueryResultCol("aggr__terms1__parent_count", int64(13014)), + model.NewQueryResultCol("aggr__terms1__key_0", "Logstash Airways"), + model.NewQueryResultCol("aggr__terms1__count", int64(3323)), + model.NewQueryResultCol("metric__terms1__metric1_col_0", 4451.946294580208), + model.NewQueryResultCol("aggr__terms1__terms2__parent_count", int64(3323)), + model.NewQueryResultCol("aggr__terms1__terms2__key_0", "Wąchock"), + model.NewQueryResultCol("aggr__terms1__terms2__count", int64(150)), + model.NewQueryResultCol("metric__terms1__terms2__metric2_col_0", nil), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("aggr__terms1__parent_count", int64(13014)), + model.NewQueryResultCol("aggr__terms1__key_0", "JetBeats"), + model.NewQueryResultCol("aggr__terms1__count", int64(3261)), + model.NewQueryResultCol("metric__terms1__metric1_col_0", 4434.670878262596), + model.NewQueryResultCol("aggr__terms1__terms2__parent_count", int64(3261)), + model.NewQueryResultCol("aggr__terms1__terms2__key_0", "Zurich"), + model.NewQueryResultCol("aggr__terms1__terms2__count", int64(167)), + model.NewQueryResultCol("metric__terms1__terms2__metric2_col_0", 92215.763779), + }}, + {Cols: []model.QueryResultCol{ // should be 
discarded by us because of terms2's size=1 + model.NewQueryResultCol("aggr__terms1__parent_count", int64(13014)), + model.NewQueryResultCol("aggr__terms1__key_0", "JetBeats"), + model.NewQueryResultCol("aggr__terms1__count", int64(3261)), + model.NewQueryResultCol("metric__terms1__metric1_col_0", 4434.670878262596), + model.NewQueryResultCol("aggr__terms1__terms2__parent_count", int64(3261)), + model.NewQueryResultCol("aggr__terms1__terms2__key_0", "Wąchock"), + model.NewQueryResultCol("aggr__terms1__terms2__count", int64(147)), + model.NewQueryResultCol("metric__terms1__terms2__metric2_col_0", 90242.31663285477), + }}, + {Cols: []model.QueryResultCol{ // should be discarded by us because of terms1's size=2 + model.NewQueryResultCol("aggr__terms1__parent_count", int64(13014)), + model.NewQueryResultCol("aggr__terms1__key_0", "Kibana Airlines"), + model.NewQueryResultCol("aggr__terms1__count", int64(3219)), + model.NewQueryResultCol("metric__terms1__metric1_col_0", 4335.019248495363), + model.NewQueryResultCol("aggr__terms1__terms2__parent_count", int64(3219)), + model.NewQueryResultCol("aggr__terms1__terms2__key_0", "Zurich"), + model.NewQueryResultCol("aggr__terms1__terms2__count", int64(173)), + model.NewQueryResultCol("metric__terms1__terms2__metric2_col_0", 99314.3501429406), + }}, + }, + ExpectedPancakeSQL: ` + SELECT "aggr__terms1__parent_count", "aggr__terms1__key_0", + "aggr__terms1__count", "metric__terms1__metric1_col_0", + "aggr__terms1__terms2__parent_count", "aggr__terms1__terms2__key_0", + "aggr__terms1__terms2__count", "metric__terms1__terms2__metric2_col_0" + FROM ( + SELECT "aggr__terms1__parent_count", "aggr__terms1__key_0", + "aggr__terms1__count", "metric__terms1__metric1_col_0", + "aggr__terms1__terms2__parent_count", "aggr__terms1__terms2__key_0", + "aggr__terms1__terms2__count", "metric__terms1__terms2__metric2_col_0", + dense_rank() OVER (ORDER BY "aggr__terms1__count" DESC, + "aggr__terms1__key_0" ASC) AS "aggr__terms1__order_1_rank", + dense_rank() OVER (PARTITION BY "aggr__terms1__key_0" ORDER BY + "aggr__terms1__terms2__count" DESC, "aggr__terms1__terms2__key_0" ASC) AS + "aggr__terms1__terms2__order_1_rank" + FROM ( + SELECT sum(count(*)) OVER () AS "aggr__terms1__parent_count", + if("Carrier" NOT IN tuple('a', 'b'), "Carrier", NULL) AS "aggr__terms1__key_0", + sum(count(*)) OVER (PARTITION BY "aggr__terms1__key_0") AS + "aggr__terms1__count", + avgOrNullMerge(avgOrNullState("DistanceMiles")) OVER (PARTITION BY + "aggr__terms1__key_0") AS "metric__terms1__metric1_col_0", + sum(count(*)) OVER (PARTITION BY "aggr__terms1__key_0") AS + "aggr__terms1__terms2__parent_count", + "DestCityName" AS "aggr__terms1__terms2__key_0", + count(*) AS "aggr__terms1__terms2__count", + sumOrNull("AvgTicketPrice") AS "metric__terms1__terms2__metric2_col_0" + FROM __quesma_table_name + GROUP BY if("Carrier" NOT IN tuple('a', 'b'), "Carrier", NULL) AS + "aggr__terms1__key_0", "DestCityName" AS "aggr__terms1__terms2__key_0")) + WHERE ("aggr__terms1__order_1_rank"<=3 AND "aggr__terms1__terms2__order_1_rank" + <=2) + ORDER BY "aggr__terms1__order_1_rank" ASC, + "aggr__terms1__terms2__order_1_rank" ASC`, + }, + { // [76] + TestName: "terms with exclude, but with branched off aggregation tree", + QueryRequestJson: ` + { + "aggs": { + "terms1": { + "aggs": { + "metric1": { + "avg": { + "field": "DistanceMiles" + } + } + }, + "terms": { + "exclude": [ + "a", + "b" + ], + "field": "Carrier", + "size": 1 + } + }, + "terms2": { + "aggs": { + "metric1": { + "avg": { + "field": "DistanceMiles" + 
} + } + }, + "terms": { + "exclude": [ + "Logstash Airways", + ".*" + ], + "field": "Carrier", + "size": 2 + } + } + }, + "size": 0, + "track_total_hits": true + }`, + // Unlike in the tests above, here the response keeps "took", "timed_out", "_shards", and "hits" + ExpectedResponse: ` + { + "_shards": { + "failed": 0, + "skipped": 0, + "successful": 1, + "total": 1 + }, + "aggregations": { + "terms1": { + "buckets": [ + { + "doc_count": 3323, + "key": "Logstash Airways", + "metric1": { + "value": 4451.946294580208 + } + } + ], + "doc_count_error_upper_bound": 0, + "sum_other_doc_count": 9691 + }, + "terms2": { + "buckets": [ + { + "doc_count": 3261, + "key": "JetBeats", + "metric1": { + "value": 4434.670874554115 + } + }, + { + "doc_count": 3219, + "key": "Kibana Airlines", + "metric1": { + "value": 4335.019245198367 + } + } + ], + "doc_count_error_upper_bound": 0, + "sum_other_doc_count": 6534 + } + }, + "hits": { + "hits": [], + "max_score": null, + "total": { + "relation": "eq", + "value": 13014 + } + }, + "timed_out": false, + "took": 18 + }`, + ExpectedPancakeResults: []model.QueryResultRow{ + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("aggr__terms1__parent_count", int64(13014)), + model.NewQueryResultCol("aggr__terms1__key_0", "Logstash Airways"), + model.NewQueryResultCol("aggr__terms1__count", int64(3323)), + model.NewQueryResultCol("metric__terms1__metric1_col_0", 4451.946294580208), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("aggr__terms1__parent_count", int64(13014)), + model.NewQueryResultCol("aggr__terms1__key_0", "Discard"), + model.NewQueryResultCol("aggr__terms1__count", int64(5)), + model.NewQueryResultCol("metric__terms1__metric1_col_0", 6.20), + }}, + }, + ExpectedPancakeSQL: ` + SELECT sum(count(*)) OVER () AS "aggr__terms1__parent_count", + if("Carrier" NOT IN tuple('a', 'b'), "Carrier", NULL) AS "aggr__terms1__key_0" + , count(*) AS "aggr__terms1__count", + avgOrNull("DistanceMiles") AS "metric__terms1__metric1_col_0" + FROM __quesma_table_name + GROUP BY if("Carrier" NOT IN tuple('a', 'b'), "Carrier", NULL) AS + "aggr__terms1__key_0" + ORDER BY "aggr__terms1__count" DESC, "aggr__terms1__key_0" ASC + LIMIT 2`, + ExpectedAdditionalPancakeResults: [][]model.QueryResultRow{{ + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("aggr__terms2__parent_count", int64(13014)), + model.NewQueryResultCol("aggr__terms2__key_0", "JetBeats"), + model.NewQueryResultCol("aggr__terms2__count", int64(3261)), + model.NewQueryResultCol("metric__terms2__metric1_col_0", 4434.670874554115), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("aggr__terms2__parent_count", int64(13014)), + model.NewQueryResultCol("aggr__terms2__key_0", "Kibana Airlines"), + model.NewQueryResultCol("aggr__terms2__count", int64(3219)), + model.NewQueryResultCol("metric__terms2__metric1_col_0", 4335.019245198367), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("aggr__terms2__parent_count", int64(13014)), + model.NewQueryResultCol("aggr__terms2__key_0", "Discard"), + model.NewQueryResultCol("aggr__terms2__count", int64(11)), + model.NewQueryResultCol("metric__terms2__metric1_col_0", 42), + }}, + }}, + ExpectedAdditionalPancakeSQLs: []string{` + SELECT sum(count(*)) OVER () AS "aggr__terms2__parent_count", + if("Carrier" NOT IN tuple('Logstash Airways', '.*'), "Carrier", NULL) AS + "aggr__terms2__key_0", count(*) AS "aggr__terms2__count", + avgOrNull("DistanceMiles") AS "metric__terms2__metric1_col_0" + FROM
__quesma_table_name + GROUP BY if("Carrier" NOT IN tuple('Logstash Airways', '.*'), "Carrier", NULL) + AS "aggr__terms2__key_0" + ORDER BY "aggr__terms2__count" DESC, "aggr__terms2__key_0" ASC + LIMIT 3`}, + }, + { // [77] TestName: `Escaping of ', \, \n, and \t in some example aggregations. No tests for other escape characters, e.g. \r or 'b. Add if needed.`, QueryRequestJson: ` { diff --git a/quesma/testdata/kibana-visualize/aggregation_requests.go b/quesma/testdata/kibana-visualize/aggregation_requests.go index c2056a5fa..7a0f3fd2d 100644 --- a/quesma/testdata/kibana-visualize/aggregation_requests.go +++ b/quesma/testdata/kibana-visualize/aggregation_requests.go @@ -3274,4 +3274,269 @@ var AggregationTests = []testdata.AggregationTestCase{ GROUP BY intDiv("clientip", 2147483648) AS "aggr__2__key_0" ORDER BY "aggr__2__key_0" ASC`, }, + { // [24] + TestName: "Simplest IP range. In Kibana: Add panel > Aggregation Based > Area. Buckets: X-axis: IP Range", + QueryRequestJson: ` + { + "_source": { + "excludes": [] + }, + "aggs": { + "2": { + "ip_range": { + "field": "clientip", + "ranges": [ + { + "from": "0.0.0.0", + "to": "127.255.255.255" + }, + { + "from": "128.0.0.0" + }, + { + "from": "128.129.130.131", + "key": "my-custom-key" + }, + { + "to": "10.0.0.5" + } + ] + } + } + }, + "size": 0, + "track_total_hits": true + }`, + ExpectedResponse: ` + { + "took": 14, + "timed_out": false, + "_shards": { + "total": 1, + "successful": 1, + "skipped": 0, + "failed": 0 + }, + "hits": { + "total": { + "value": 14074, + "relation": "eq" + }, + "max_score": null, + "hits": [] + }, + "aggregations": { + "2": { + "buckets": [ + { + "key": "0.0.0.0-127.255.255.255", + "from": "0.0.0.0", + "to": "127.255.255.255", + "doc_count": 7290 + }, + { + "key": "128.0.0.0-*", + "from": "128.0.0.0", + "doc_count": 6784 + }, + { + "key": "my-custom-key", + "from": "128.129.130.131", + "doc_count": 6752 + }, + { + "key": "*-10.0.0.5", + "to": "10.0.0.5", + "doc_count": 534 + } + ] + } + } + }`, + ExpectedPancakeResults: []model.QueryResultRow{ + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("range_0__aggr__2__count", int64(7290)), + model.NewQueryResultCol("range_1__aggr__2__count", int64(6784)), + model.NewQueryResultCol("range_2__aggr__2__count", int64(6752)), + model.NewQueryResultCol("range_3__aggr__2__count", int64(534)), + }}, + }, + ExpectedPancakeSQL: ` + SELECT countIf(("clientip">='0.0.0.0' AND "clientip"<'127.255.255.255')) AS + "range_0__aggr__2__count", + countIf("clientip">='128.0.0.0') AS "range_1__aggr__2__count", + countIf("clientip">='128.129.130.131') AS "range_2__aggr__2__count", + countIf("clientip"<'10.0.0.5') AS "range_3__aggr__2__count" + FROM __quesma_table_name`, + }, + { // [25] + TestName: "IP range, with ranges as CIDR masks. In Kibana: Add panel > Aggregation Based > Area.
Buckets: X-axis: IP Range", + QueryRequestJson: ` + { + "_source": { + "excludes": [] + }, + "aggs": { + "2": { + "ip_range": { + "field": "clientip", + "ranges": [ + { + "mask": "255.255.255.253/30" + }, + { + "from": "128.129.130.131", + "key": "my-custom-key" + }, + { + "mask": "10.0.7.127/27", + "key": "custom-mask-key" + } + ] + } + } + }, + "size": 0, + "track_total_hits": true + }`, + ExpectedResponse: ` + { + "took": 14, + "timed_out": false, + "_shards": { + "total": 1, + "successful": 1, + "skipped": 0, + "failed": 0 + }, + "hits": { + "total": { + "value": 14074, + "relation": "eq" + }, + "max_score": null, + "hits": [] + }, + "aggregations": { + "2": { + "buckets": [ + { + "key": "255.255.255.253/30", + "from": "255.255.255.252", + "to": "::1:0:0:0", + "doc_count": 2 + }, + { + "key": "my-custom-key", + "from": "128.129.130.131", + "doc_count": 6752 + }, + { + "key": "custom-mask-key", + "from": "10.0.7.96", + "to": "10.0.7.128", + "doc_count": 3 + } + ] + } + } + }`, + ExpectedPancakeResults: []model.QueryResultRow{ + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("range_0__aggr__2__count", int64(2)), + model.NewQueryResultCol("range_1__aggr__2__count", int64(6752)), + model.NewQueryResultCol("range_2__aggr__2__count", int64(3)), + }}, + }, + ExpectedPancakeSQL: ` + SELECT countIf("clientip">='255.255.255.252') AS "range_0__aggr__2__count", + countIf("clientip">='128.129.130.131') AS "range_1__aggr__2__count", + countIf(("clientip">='10.0.7.96' AND "clientip"<'10.0.7.128')) AS + "range_2__aggr__2__count" + FROM __quesma_table_name`, + }, + { // [26] + TestName: "IP range, with ranges as CIDR masks, keyed=true. In Kibana: Add panel > Aggregation Based > Area. Buckets: X-axis: IP Range", + QueryRequestJson: ` + { + "_source": { + "excludes": [] + }, + "aggs": { + "2": { + "ip_range": { + "field": "clientip", + "keyed": true, + "ranges": [ + { + "mask": "255.255.255.255/31" + }, + { + "from": "128.129.130.131", + "key": "my-custom-key" + }, + { + "mask": "10.0.7.127/27", + "key": "custom-mask-key" + } + ] + } + } + }, + "size": 0, + "track_total_hits": true + }`, + ExpectedResponse: ` + { + "took": 14, + "timed_out": false, + "_shards": { + "total": 1, + "successful": 1, + "skipped": 0, + "failed": 0 + }, + "hits": { + "total": { + "value": 14074, + "relation": "eq" + }, + "max_score": null, + "hits": [] + }, + "aggregations": { + "2": { + "buckets": { + "custom-mask-key": { + "from": "10.0.7.96", + "to": "10.0.7.128", + "doc_count": 3 + }, + "my-custom-key": { + "from": "128.129.130.131", + "doc_count": 6752 + }, + "255.255.255.255/31": { + "from": "255.255.255.254", + "to": "::1:0:0:0", + "doc_count": 2 + } + } + } + } + }`, + ExpectedPancakeResults: []model.QueryResultRow{ + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("range_0__aggr__2__count", int64(2)), + model.NewQueryResultCol("range_1__aggr__2__count", int64(6752)), + model.NewQueryResultCol("range_2__aggr__2__count", int64(3)), + }}, + }, + ExpectedPancakeSQL: ` + SELECT countIf("clientip">='255.255.255.254') AS "range_0__aggr__2__count", + countIf("clientip">='128.129.130.131') AS "range_1__aggr__2__count", + countIf(("clientip">='10.0.7.96' AND "clientip"<'10.0.7.128')) AS + "range_2__aggr__2__count" + FROM __quesma_table_name`, + }, } diff --git a/quesma/testdata/requests.go b/quesma/testdata/requests.go index 61b2464a1..b9c5500b1 100644 --- a/quesma/testdata/requests.go +++ b/quesma/testdata/requests.go @@ -2344,6 +2344,69 @@ Men\'s Clothing \\ ')`}, Men\\'s Clothing \\\\ %' LIMIT 10`},
[]string{}, }, + { // [41] + "ids, 0 values", + `{ + "query": { + "ids": { + "values": [] + } + }, + "track_total_hits": false + }`, + []string{`false`}, + model.ListAllFields, + []string{ + `SELECT "message" ` + + `FROM ` + TableName + ` ` + + `WHERE false ` + + `LIMIT 10`, + }, + []string{}, + }, + { // [42] + "ids, 1 value", + `{ + "query": { + "ids": { + "values": ["323032342d31322d32312030373a32393a30332e333637202b3030303020555443q1"] + } + }, + "track_total_hits": false + }`, + []string{`"@timestamp" = toDateTime64('2024-12-21 07:29:03.367',3)`}, + model.ListAllFields, + []string{ + `SELECT "message" ` + + `FROM ` + TableName + ` ` + + `WHERE "@timestamp" = toDateTime64('2024-12-21 07:29:03.367',3) ` + + `LIMIT 10`, + }, + []string{}, + }, + { // [43] + "ids, 2+ values", + `{ + "query": { + "ids": { + "values": [ + "323032342d31322d32312030373a32393a30332e333637202b3030303020555443q1", + "323032342d31322d32312030373a32393a30322e393932202b3030303020555443q3" + ] + } + }, + "track_total_hits": false + }`, + []string{`"@timestamp" IN tuple(toDateTime64('2024-12-21 07:29:03.367',3), toDateTime64('2024-12-21 07:29:02.992',3))`}, + model.ListAllFields, + []string{ + `SELECT "message" ` + + `FROM ` + TableName + ` ` + + `WHERE "@timestamp" IN tuple(toDateTime64('2024-12-21 07:29:03.367',3), toDateTime64('2024-12-21 07:29:02.992',3)) ` + + `LIMIT 10`, + }, + []string{}, + }, } var TestSearchRuntimeMappings = []SearchTestCase{ diff --git a/quesma/testdata/unsupported_requests.go b/quesma/testdata/unsupported_requests.go index 971001474..b74f08289 100644 --- a/quesma/testdata/unsupported_requests.go +++ b/quesma/testdata/unsupported_requests.go @@ -25,7 +25,7 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{ } }`, }, - { // [2] + { // [1] TestName: "bucket aggregation: categorize_text", QueryType: "categorize_text", QueryRequestJson: ` @@ -39,7 +39,7 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{ } }`, }, - { // [3] + { // [2] TestName: "bucket aggregation: children", QueryType: "children", QueryRequestJson: ` @@ -69,7 +69,7 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{ } }`, }, - { // [5] + { // [3] TestName: "bucket aggregation: diversified_sampler", QueryType: "diversified_sampler", QueryRequestJson: ` @@ -97,7 +97,7 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{ } }`, }, - { // [6] + { // [4] TestName: "bucket aggregation: frequent_item_sets", QueryType: "frequent_item_sets", QueryRequestJson: ` @@ -122,7 +122,7 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{ } }`, }, - { // [7] + { // [5] TestName: "bucket aggregation: geo_distance", QueryType: "geo_distance", QueryRequestJson: ` @@ -143,7 +143,7 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{ } }`, }, - { // [8] + { // [6] TestName: "bucket aggregation: geohash_grid", QueryType: "geohash_grid", QueryRequestJson: ` @@ -162,7 +162,7 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{ } }`, }, - { // [9] + { // [7] TestName: "bucket aggregation: geohex_grid", QueryType: "geohex_grid", QueryRequestJson: ` @@ -181,7 +181,7 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{ } }`, }, - { // [11] + { // [8] TestName: "bucket aggregation: global", QueryType: "global", QueryRequestJson: ` @@ -200,26 +200,7 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{ } }`, }, - { // [13] - TestName: "bucket aggregation: ip_range", - QueryType: "ip_range", - QueryRequestJson: ` - { - "size": 10, - "aggs": { - "ip_ranges": { - "ip_range": { - "field": 
"ip", - "ranges": [ - { "to": "10.0.0.5" }, - { "from": "10.0.0.5" } - ] - } - } - } - }`, - }, - { // [14] + { // [9] TestName: "bucket aggregation: missing", QueryType: "missing", QueryRequestJson: ` @@ -231,7 +212,7 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{ } }`, }, - { // [16] + { // [10] TestName: "bucket aggregation: nested", QueryType: "nested", QueryRequestJson: ` @@ -257,7 +238,7 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{ } }`, }, - { // [17] + { // [11] TestName: "bucket aggregation: parent", QueryType: "parent", QueryRequestJson: ` @@ -287,7 +268,7 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{ } }`, }, - { // [18] + { // [12] TestName: "bucket aggregation: rare_terms", QueryType: "rare_terms", QueryRequestJson: ` @@ -301,7 +282,7 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{ } }`, }, - { // [19] + { // [13] TestName: "bucket aggregation: reverse_nested", QueryType: "reverse_nested", QueryRequestJson: ` @@ -334,7 +315,7 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{ } }`, }, - { // [20] + { // [14] TestName: "bucket aggregation: significant_text", QueryType: "significant_text", QueryRequestJson: ` @@ -356,7 +337,7 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{ } }`, }, - { // [21] + { // [15] TestName: "bucket aggregation: time_series", QueryType: "time_series", QueryRequestJson: ` @@ -368,7 +349,7 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{ } }`, }, - { // [22] + { // [16] TestName: "bucket aggregation: variable_width_histogram", QueryType: "variable_width_histogram", QueryRequestJson: ` @@ -384,7 +365,7 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{ }`, }, // metrics: - { // [23] + { // [17] TestName: "metrics aggregation: boxplot", QueryType: "boxplot", QueryRequestJson: ` @@ -399,7 +380,7 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{ } }`, }, - { // [25] + { // [18] TestName: "metrics aggregation: geo_bounds", QueryType: "geo_bounds", QueryRequestJson: ` @@ -413,7 +394,7 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{ } }`, }, - { // [27] + { // [19] TestName: "metrics aggregation: geo_line", QueryType: "geo_line", QueryRequestJson: ` @@ -428,7 +409,7 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{ } }`, }, - { // [28] + { // [20] TestName: "metrics aggregation: cartesian_bounds", QueryType: "cartesian_bounds", QueryRequestJson: ` @@ -445,7 +426,7 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{ } }`, }, - { // [29] + { // [21] TestName: "metrics aggregation: cartesian_centroid", QueryType: "cartesian_centroid", QueryRequestJson: ` @@ -459,7 +440,7 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{ } }`, }, - { // [30] + { // [22] TestName: "metrics aggregation: matrix_stats", QueryType: "matrix_stats", QueryRequestJson: ` @@ -473,7 +454,7 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{ } }`, }, - { // [31] + { // [23] TestName: "metrics aggregation: median_absolute_deviation", QueryType: "median_absolute_deviation", QueryRequestJson: ` @@ -493,7 +474,7 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{ } }`, }, - { // [32] + { // [24] TestName: "metrics aggregation: rate", QueryType: "rate", QueryRequestJson: ` @@ -516,7 +497,7 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{ } }`, }, - { // [33] + { // [25] TestName: "metrics aggregation: scripted_metric", QueryType: "scripted_metric", QueryRequestJson: ` @@ -536,7 +517,7 @@ var UnsupportedQueriesTests = 
[]UnsupportedQueryTestCase{ } }`, }, - { // [34] + { // [26] TestName: "metrics aggregation: string_stats", QueryType: "string_stats", QueryRequestJson: ` @@ -563,7 +544,7 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{ } }`, }, - { // [36] + { // [27] TestName: "metrics aggregation: weighted_avg", QueryType: "weighted_avg", QueryRequestJson: ` diff --git a/quesma/util/regex/regex.go b/quesma/util/regex/regex.go new file mode 100644 index 000000000..e5a42aa89 --- /dev/null +++ b/quesma/util/regex/regex.go @@ -0,0 +1,40 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 +package regex + +import ( + "quesma/model" + "strings" +) + +// ToClickhouseExpr converts a regex pattern to a Clickhouse expression (e.g. "K.*" becomes LIKE 'K%'). +// It's our old heuristic; it may need to be improved. +func ToClickhouseExpr(pattern string) (clickhouseFuncName string, patternExpr model.Expr) { + // "really simple" == out of all special characters, only . and .* may be present + isPatternReallySimple := func(pattern string) bool { + // special characters other than . and * are not allowed; also (a minor check) * can't be the first character + if strings.ContainsAny(pattern, `?+|{}[]()"\`) || (len(pattern) > 0 && pattern[0] == '*') { + return false + } + // .* is allowed, but [any other char]* is not + for i, char := range pattern[1:] { + prevChar := pattern[i] // i indexes pattern[1:], so pattern[i] is the byte preceding char + if char == '*' && prevChar != '.' { + return false + } + } + return true + } + + var funcName string + if isPatternReallySimple(pattern) { + pattern = strings.ReplaceAll(pattern, "_", `\_`) // _ gets escaped twice (here and in the renderer, which escapes every \); that's fine for Clickhouse + pattern = strings.ReplaceAll(pattern, ".*", "%") + pattern = strings.ReplaceAll(pattern, ".", "_") + funcName = "LIKE" + } else { // this Clickhouse function is much slower, so we use it only for complex regexps + funcName = "REGEXP" + } + + return funcName, model.NewLiteral("'" + pattern + "'") +} diff --git a/quesma/v2/core/mux.go b/quesma/v2/core/mux.go index 6e929a087..02b49deec 100644 --- a/quesma/v2/core/mux.go +++ b/quesma/v2/core/mux.go @@ -6,6 +6,7 @@ import ( "github.com/ucarion/urlpath" "net/http" "net/url" + "sort" "strings" ) @@ -205,14 +206,20 @@ func (p *PathRouter) GetHandlers() map[string]HandlersPipe { func (p *PathRouter) SetHandlers(handlers map[string]HandlersPipe) { newHandlers := make(map[string]HandlersPipe, 0) for path, handler := range handlers { - for index := range p.mappings { + var index int + var found bool + for index = range p.mappings { if p.mappings[index].pattern == path { - p.mappings[index].handler.Processors = handler.Processors - p.mappings[index].handler.Predicate = handler.Predicate - } else { - newHandlers[path] = handler + found = true + break } } + if found { + p.mappings[index].handler.Processors = handler.Processors + p.mappings[index].handler.Predicate = handler.Predicate + } else { + newHandlers[path] = handler + } } for path, handler := range newHandlers { p.mappings = append(p.mappings, mapping{pattern: path, @@ -222,4 +229,9 @@ func (p *PathRouter) SetHandlers(handlers map[string]HandlersPipe) { Predicate: handler.Predicate, Processors: handler.Processors}}) } + // mappings need to be sorted so that literal paths are matched first: + // e.g. /_search before /:index (descending order works because '_' (0x5F) sorts after ':' (0x3A)) + sort.Slice(p.mappings, func(i, j int) bool { + return p.mappings[i].pattern > p.mappings[j].pattern + }) } diff --git a/quesma/v2_test_objects.go b/quesma/v2_test_objects.go index a4ccf328d..ce7bcca15 100644 --- a/quesma/v2_test_objects.go +++ b/quesma/v2_test_objects.go
@@ -9,7 +9,6 @@ import ( "quesma/frontend_connectors" "quesma/processors" quesma_api "quesma_v2/core" - "strconv" "sync/atomic" ) @@ -86,7 +85,7 @@ func bulk(_ context.Context, request *quesma_api.Request, _ http.ResponseWriter) } metadata := quesma_api.MakeNewMetadata() metadata["level"] = 0 - resp := []byte("bulk\n") + resp := []byte("bulk->") atomic.AddInt64(&correlationId, 1) quesma_api.SetCorrelationId(metadata, correlationId) return &quesma_api.Result{Meta: metadata, GenericResult: resp, StatusCode: 200}, nil @@ -101,7 +100,7 @@ func doc(_ context.Context, request *quesma_api.Request, _ http.ResponseWriter) metadata["level"] = 0 atomic.AddInt64(&correlationId, 1) quesma_api.SetCorrelationId(metadata, correlationId) - resp := []byte("doc\n") + resp := []byte("doc->") return &quesma_api.Result{Meta: metadata, GenericResult: resp, StatusCode: 200}, nil } @@ -149,10 +148,8 @@ func (p *IngestProcessor) Handle(metadata map[string]interface{}, message ...any panic("IngestProcessor: invalid message type") } - level := metadata["level"].(int) - data = append(data, strconv.Itoa(level)...) data = append(data, []byte(p.GetId())...) - data = append(data, []byte("\n")...) + data = append(data, []byte("->")...) } return metadata, data, nil } @@ -262,10 +259,8 @@ func (p *InnerIngestProcessor2) Handle(metadata map[string]interface{}, message if err != nil { panic("InnerIngestProcessor2: invalid message type") } - level := metadata["level"].(int) - data = append(data, strconv.Itoa(level)...) data = append(data, []byte(p.GetId())...) - data = append(data, []byte("\n")...) + data = append(data, []byte("->")...) } return metadata, data, nil } @@ -296,10 +291,8 @@ func (p *InnerIngestProcessor1) Handle(metadata map[string]interface{}, message if err != nil { panic("InnerIngestProcessor1: invalid message type") } - level := metadata["level"].(int) - data = append(data, strconv.Itoa(level)...) data = append(data, []byte(p.GetId())...) - data = append(data, []byte("\n")...) + data = append(data, []byte("->")...) } return metadata, data, nil }
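A minimal usage sketch of the new util/regex helper introduced above, assuming it is called from inside the quesma module (so the "quesma/model" and "quesma/util/regex" import paths resolve); the "chess_goat" column name is borrowed from the test data and is illustrative only:

package main

import (
	"fmt"

	"quesma/model"
	"quesma/util/regex"
)

func main() {
	// "K.*" uses only the "simple" specials ('.' and ".*"), so ToClickhouseExpr
	// rewrites it into a LIKE pattern ('K%'), which Clickhouse executes much faster.
	funcName, patternExpr := regex.ToClickhouseExpr("K.*")
	whereExpr := model.NewInfixExpr(model.NewColumnRef("chess_goat"), funcName, patternExpr)
	fmt.Println(funcName, whereExpr) // prints "LIKE" and the expression's default %v rendering

	// "K+" contains '+', which the heuristic classifies as complex, so it falls
	// back to Clickhouse's slower REGEXP operator with the pattern left unchanged.
	funcName, _ = regex.ToClickhouseExpr("K+")
	fmt.Println(funcName) // REGEXP
}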