diff --git a/quesma/table_resolver/rules.go b/quesma/table_resolver/rules.go index 3fd0a1ccd..f41e7c7ce 100644 --- a/quesma/table_resolver/rules.go +++ b/quesma/table_resolver/rules.go @@ -9,34 +9,83 @@ import ( "quesma/end_user_errors" "quesma/quesma/config" "quesma/util" - "slices" + "reflect" + "strings" ) // TODO these rules may be incorrect and incomplete // They will be fixed int the next iteration. -func patternIsNotAllowed(input parsedPattern) *Decision { - if !input.isPattern { - return nil - } - return &Decision{ - Reason: "Pattern is not allowed.", - Err: fmt.Errorf("pattern is not allowed"), +func (r *tableRegistryImpl) wildcardPatternSplitter(pattern string) (parsedPattern, *Decision) { + patterns := strings.Split(pattern, ",") + + // Given a (potentially wildcard) pattern, find all non-wildcard index names that match the pattern + var matchingSingleNames []string + for _, pattern := range patterns { + // If pattern is not an actual pattern (so it's a single index), just add it to the list + // and skip further processing. + // If pattern is an internal Kibana index, add it to the list without any processing - resolveInternalElasticName + // will take care of it. + if !elasticsearch.IsIndexPattern(pattern) || elasticsearch.IsInternalIndex(pattern) { + matchingSingleNames = append(matchingSingleNames, pattern) + continue + } + + for indexName := range r.conf.IndexConfig { + if util.IndexPatternMatches(pattern, indexName) { + matchingSingleNames = append(matchingSingleNames, indexName) + } + } + + // but maybe we should also check against the actual indexes ?? + for indexName := range r.elasticIndexes { + if util.IndexPatternMatches(pattern, indexName) { + matchingSingleNames = append(matchingSingleNames, indexName) + } + } + if r.conf.AutodiscoveryEnabled { + for tableName := range r.clickhouseIndexes { + if util.IndexPatternMatches(pattern, tableName) { + matchingSingleNames = append(matchingSingleNames, tableName) + } + } + } } + + matchingSingleNames = util.Distinct(matchingSingleNames) + + return parsedPattern{ + source: pattern, + isPattern: len(patterns) > 1 || strings.Contains(pattern, "*"), + parts: matchingSingleNames, + }, nil } -func makeIsDisabledInConfig(cfg map[string]config.IndexConfiguration, pipeline string) func(input parsedPattern) *Decision { +func singleIndexSplitter(pattern string) (parsedPattern, *Decision) { + patterns := strings.Split(pattern, ",") + if len(patterns) > 1 || strings.Contains(pattern, "*") { + return parsedPattern{}, &Decision{ + Reason: "Pattern is not allowed.", + Err: fmt.Errorf("pattern is not allowed"), + } + } - return func(input parsedPattern) *Decision { + return parsedPattern{ + source: pattern, + isPattern: false, + parts: patterns, + }, nil +} - if !input.isPattern { - idx, ok := cfg[input.source] - if ok { - if len(getTargets(idx, pipeline)) == 0 { - return &Decision{ - IsClosed: true, - Reason: "Index is disabled in config.", - } +func makeIsDisabledInConfig(cfg map[string]config.IndexConfiguration, pipeline string) func(part string) *Decision { + + return func(part string) *Decision { + idx, ok := cfg[part] + if ok { + if len(getTargets(idx, pipeline)) == 0 { + return &Decision{ + IsClosed: true, + Reason: "Index is disabled in config.", } } } @@ -45,9 +94,9 @@ func makeIsDisabledInConfig(cfg map[string]config.IndexConfiguration, pipeline s } } -func resolveInternalElasticName(pattern parsedPattern) *Decision { +func resolveInternalElasticName(part string) *Decision { - if elasticsearch.IsInternalIndex(pattern.source) { + if elasticsearch.IsInternalIndex(part) { return &Decision{ UseConnectors: []ConnectorDecision{&ConnectorDecisionElastic{ManagementCall: true}}, Reason: "It's kibana internals", @@ -57,8 +106,8 @@ func resolveInternalElasticName(pattern parsedPattern) *Decision { return nil } -func makeDefaultWildcard(quesmaConf config.QuesmaConfiguration, pipeline string) func(input parsedPattern) *Decision { - return func(input parsedPattern) *Decision { +func makeDefaultWildcard(quesmaConf config.QuesmaConfiguration, pipeline string) func(part string) *Decision { + return func(part string) *Decision { var targets []string var useConnectors []ConnectorDecision @@ -78,9 +127,9 @@ func makeDefaultWildcard(quesmaConf config.QuesmaConfiguration, pipeline string) switch target { case config.ClickhouseTarget: useConnectors = append(useConnectors, &ConnectorDecisionClickhouse{ - ClickhouseTableName: input.source, + ClickhouseTableName: part, IsCommonTable: quesmaConf.UseCommonTableForWildcard, - ClickhouseTables: []string{input.source}, + ClickhouseTables: []string{part}, }) case config.ElasticsearchTarget: useConnectors = append(useConnectors, &ConnectorDecisionElastic{}) @@ -100,15 +149,10 @@ func makeDefaultWildcard(quesmaConf config.QuesmaConfiguration, pipeline string) } } -func (r *tableRegistryImpl) singleIndex(indexConfig map[string]config.IndexConfiguration, pipeline string) func(input parsedPattern) *Decision { +func (r *tableRegistryImpl) singleIndex(indexConfig map[string]config.IndexConfiguration, pipeline string) func(part string) *Decision { - return func(input parsedPattern) *Decision { - - if input.isPattern { - return nil - } - - if cfg, ok := indexConfig[input.source]; ok { + return func(part string) *Decision { + if cfg, ok := indexConfig[part]; ok { if !cfg.UseCommonTable { targets := getTargets(cfg, pipeline) @@ -131,8 +175,8 @@ func (r *tableRegistryImpl) singleIndex(indexConfig map[string]config.IndexConfi targetDecision = &ConnectorDecisionElastic{} case config.ClickhouseTarget: targetDecision = &ConnectorDecisionClickhouse{ - ClickhouseTableName: input.source, - ClickhouseTables: []string{input.source}, + ClickhouseTableName: part, + ClickhouseTables: []string{part}, } default: return &Decision{ @@ -152,8 +196,8 @@ func (r *tableRegistryImpl) singleIndex(indexConfig map[string]config.IndexConfi Reason: "Enabled in the config. Dual write is enabled.", UseConnectors: []ConnectorDecision{&ConnectorDecisionClickhouse{ - ClickhouseTableName: input.source, - ClickhouseTables: []string{input.source}}, + ClickhouseTableName: part, + ClickhouseTables: []string{part}}, &ConnectorDecisionElastic{}}, } @@ -165,8 +209,8 @@ func (r *tableRegistryImpl) singleIndex(indexConfig map[string]config.IndexConfi Reason: "Enabled in the config. A/B testing.", EnableABTesting: true, UseConnectors: []ConnectorDecision{&ConnectorDecisionClickhouse{ - ClickhouseTableName: input.source, - ClickhouseTables: []string{input.source}}, + ClickhouseTableName: part, + ClickhouseTables: []string{part}}, &ConnectorDecisionElastic{}}, } } else if targets[0] == config.ElasticsearchTarget && targets[1] == config.ClickhouseTarget { @@ -177,8 +221,8 @@ func (r *tableRegistryImpl) singleIndex(indexConfig map[string]config.IndexConfi UseConnectors: []ConnectorDecision{ &ConnectorDecisionElastic{}, &ConnectorDecisionClickhouse{ - ClickhouseTableName: input.source, - ClickhouseTables: []string{input.source}}, + ClickhouseTableName: part, + ClickhouseTables: []string{part}}, }, } @@ -209,214 +253,168 @@ func (r *tableRegistryImpl) singleIndex(indexConfig map[string]config.IndexConfi } } -func (r *tableRegistryImpl) makeCheckIfPatternMatchesAllConnectors(pipeline string) func(input parsedPattern) *Decision { - - return func(input parsedPattern) *Decision { - if input.isPattern { - - var matchedElastic []string - var matchedClickhouse []string - - for _, pattern := range input.parts { - - // here we check against the config - - for indexName, index := range r.conf.IndexConfig { - targets := getTargets(index, pipeline) - - if util.IndexPatternMatches(pattern, indexName) { - - for _, target := range targets { - switch target { - case config.ElasticsearchTarget: - matchedElastic = append(matchedElastic, indexName) - case config.ClickhouseTarget: - matchedClickhouse = append(matchedClickhouse, indexName) - default: - return &Decision{ - Err: end_user_errors.ErrSearchCondition.New(fmt.Errorf("unsupported target: %s", target)), - Reason: "Unsupported target.", - } - } - } - } - } - - // but maybe we should also check against the actual indexes ?? - for indexName := range r.elasticIndexes { - if util.IndexPatternMatches(pattern, indexName) { - matchedElastic = append(matchedElastic, indexName) - } - } - if r.conf.AutodiscoveryEnabled { - for tableName := range r.clickhouseIndexes { - if util.IndexPatternMatches(pattern, tableName) { - matchedClickhouse = append(matchedClickhouse, tableName) - } - } - } +func (r *tableRegistryImpl) makeCommonTableResolver(cfg map[string]config.IndexConfiguration, pipeline string) func(part string) *Decision { + return func(part string) *Decision { + if part == common_table.TableName { + return &Decision{ + Err: fmt.Errorf("common table is not allowed to be queried directly"), + Reason: "It's internal table. Not allowed to be queried directly.", } + } - matchedElastic = util.Distinct(matchedElastic) - matchedClickhouse = util.Distinct(matchedClickhouse) - - nElastic := len(matchedElastic) - nClickhouse := len(matchedClickhouse) - - switch { - - case nElastic > 0 && nClickhouse > 0: - return &Decision{ - Err: end_user_errors.ErrSearchCondition.New(fmt.Errorf("index pattern [%s] resolved to both elasticsearch indices: [%s] and clickhouse tables: [%s]", input.parts, matchedElastic, matchedClickhouse)), - Reason: "Both Elastic and Clickhouse matched.", - } - - case nElastic > 0 && nClickhouse == 0: + var virtualTableExists bool - return &Decision{ - UseConnectors: []ConnectorDecision{&ConnectorDecisionElastic{}}, - Reason: "Only Elastic matched.", + if r.conf.AutodiscoveryEnabled { + for indexName, index := range r.clickhouseIndexes { + if index.isVirtual && indexName == part { + virtualTableExists = true + break } + } + } - case nElastic == 0 && nClickhouse > 0: - // it will be resolved by sth else later - return nil - - case nElastic == 0 && nClickhouse == 0: - return &Decision{ - IsEmpty: true, - Reason: "No indexes matched. Checked both connectors.", - } + if idxConfig, ok := cfg[part]; (ok && idxConfig.UseCommonTable) || (virtualTableExists) { + return &Decision{ + UseConnectors: []ConnectorDecision{&ConnectorDecisionClickhouse{ + ClickhouseTableName: common_table.TableName, + ClickhouseTables: []string{part}, + IsCommonTable: true, + }}, + Reason: "Common table will be used.", } } return nil } - } -func (r *tableRegistryImpl) makeCommonTableResolver(cfg map[string]config.IndexConfiguration, pipeline string) func(input parsedPattern) *Decision { - - return func(input parsedPattern) *Decision { - - if input.isPattern { - - // At this point we should do not have any elastic indexes. - // This is because we have already checked if the pattern matches any elastic indexes. - for _, pattern := range input.parts { - for indexName := range r.elasticIndexes { - if util.IndexPatternMatches(pattern, indexName) { - return &Decision{ - Err: end_user_errors.ErrSearchCondition.New(fmt.Errorf("index parsedPattern [%s] resolved to elasticsearch indices", input.parts)), - Reason: "We're not supporting common tables for Elastic.", - } - } +func mergeUseConnectors(lhs []ConnectorDecision, rhs []ConnectorDecision, rhsIndexName string) ([]ConnectorDecision, *Decision) { + for _, connDecisionRhs := range rhs { + foundMatching := false + for _, connDecisionLhs := range lhs { + if _, ok := connDecisionRhs.(*ConnectorDecisionElastic); ok { + if _, ok := connDecisionLhs.(*ConnectorDecisionElastic); ok { + foundMatching = true } } - - var matchedVirtualTables []string - var matchedTables []string - for _, pattern := range input.parts { - - // here we check against the config - - for indexName, index := range r.conf.IndexConfig { - if util.IndexPatternMatches(pattern, indexName) { - - targets := getTargets(index, pipeline) - - if slices.Contains(targets, config.ClickhouseTarget) { - if index.UseCommonTable { - matchedVirtualTables = append(matchedVirtualTables, indexName) - } else { - matchedTables = append(matchedTables, indexName) - } + if rhsClickhouse, ok := connDecisionRhs.(*ConnectorDecisionClickhouse); ok { + if lhsClickhouse, ok := connDecisionLhs.(*ConnectorDecisionClickhouse); ok { + if lhsClickhouse.ClickhouseTableName != rhsClickhouse.ClickhouseTableName { + return nil, &Decision{ + Reason: "Incompatible decisions for two indexes - they use a different ClickHouse table", + Err: fmt.Errorf("incompatible decisions for two indexes (different ClickHouse table) - %s and %s", connDecisionRhs, connDecisionLhs), } } - } - - // but maybe we should also check against the actual indexes ?? - if r.conf.AutodiscoveryEnabled { - for indexName, index := range r.clickhouseIndexes { - if util.IndexPatternMatches(pattern, indexName) { - if index.isVirtual { - matchedVirtualTables = append(matchedVirtualTables, indexName) - } else { - matchedTables = append(matchedTables, indexName) + if lhsClickhouse.IsCommonTable { + if !rhsClickhouse.IsCommonTable { + return nil, &Decision{ + Reason: "Incompatible decisions for two indexes - one uses the common table, the other does not", + Err: fmt.Errorf("incompatible decisions for two indexes (common table usage) - %s and %s", connDecisionRhs, connDecisionLhs), + } + } + lhsClickhouse.ClickhouseTables = append(lhsClickhouse.ClickhouseTables, rhsClickhouse.ClickhouseTables...) + lhsClickhouse.ClickhouseTables = util.Distinct(lhsClickhouse.ClickhouseTables) + } else { + if !reflect.DeepEqual(lhsClickhouse, rhsClickhouse) { + return nil, &Decision{ + Reason: "Incompatible decisions for two indexes - they use ClickHouse tables differently", + Err: fmt.Errorf("incompatible decisions for two indexes (different usage of ClickHouse) - %s and %s", connDecisionRhs, connDecisionLhs), } } } + foundMatching = true } } + } + if !foundMatching { + return nil, &Decision{ + Reason: "Incompatible decisions for two indexes - they use different connectors", + Err: fmt.Errorf("incompatible decisions for two indexes - they use different connectors: could not find connector %s used for index %s in decisions: %s", connDecisionRhs, rhsIndexName, lhs), + } + } + } - matchedTables = util.Distinct(matchedTables) - matchedVirtualTables = util.Distinct(matchedVirtualTables) - - switch { + return lhs, nil +} - case len(matchedTables) == 0 && len(matchedVirtualTables) == 0: - return &Decision{ - IsEmpty: true, - Reason: "No indexes found.", - } +func basicDecisionMerger(decisions []*Decision) *Decision { + if len(decisions) == 0 { + return &Decision{ + IsEmpty: true, + Reason: "No indexes matched, no decisions made.", + } + } + if len(decisions) == 1 { + return decisions[0] + } - case len(matchedTables) == 1 && len(matchedVirtualTables) == 0: - return &Decision{ - UseConnectors: []ConnectorDecision{&ConnectorDecisionClickhouse{ - ClickhouseTableName: matchedTables[0], - ClickhouseTables: []string{matchedTables[0]}, - }}, - Reason: "Pattern matches single standalone table.", - } + for _, decision := range decisions { + if decision == nil { + return &Decision{ + Reason: "Got a nil decision. This is a bug.", + Err: fmt.Errorf("could not resolve index"), + } + } - case len(matchedTables) == 0 && len(matchedVirtualTables) > 0: - return &Decision{ - UseConnectors: []ConnectorDecision{&ConnectorDecisionClickhouse{ - IsCommonTable: true, - ClickhouseTableName: common_table.TableName, - ClickhouseTables: matchedVirtualTables, - }}, - Reason: "Common table will be used. Querying multiple indexes.", - } + if decision.Err != nil { + return decision + } - default: - return &Decision{ - Err: end_user_errors.ErrSearchCondition.New(fmt.Errorf("index pattern [%s] resolved to both standalone table indices: [%s] and common table indices: [%s]", input.source, matchedTables, matchedVirtualTables)), - Reason: "Both standalone table and common table indexes matches the pattern", - } + if decision.IsEmpty { + return &Decision{ + Reason: "Got an empty decision. This is a bug.", + Err: fmt.Errorf("could not resolve index, empty index: %s", decision.IndexPattern), } } - if input.source == common_table.TableName { + if decision.EnableABTesting != decisions[0].EnableABTesting { return &Decision{ - Err: fmt.Errorf("common table is not allowed to be queried directly"), - Reason: "It's internal table. Not allowed to be queried directly.", + Reason: "One of the indexes matching the pattern does A/B testing, while another index does not - inconsistency.", + Err: fmt.Errorf("inconsistent A/B testing configuration - index %s (A/B testing: %v) and index %s (A/B testing: %v)", decision.IndexPattern, decision.EnableABTesting, decisions[0].IndexPattern, decisions[0].EnableABTesting), } } + } - var virtualTableExists bool - - if r.conf.AutodiscoveryEnabled { - for indexName, index := range r.clickhouseIndexes { - if index.isVirtual && indexName == input.source { - virtualTableExists = true - break - } - } + var nonClosedDecisions []*Decision + for _, decision := range decisions { + if !decision.IsClosed { + nonClosedDecisions = append(nonClosedDecisions, decision) } + } + if len(nonClosedDecisions) == 0 { + // All indexes are closed + return &Decision{ + IsClosed: true, + Reason: "All indexes matching the pattern are closed.", + } + } + // Discard all closed indexes + decisions = nonClosedDecisions - if idxConfig, ok := cfg[input.source]; (ok && idxConfig.UseCommonTable) || (virtualTableExists) { + useConnectors := decisions[0].UseConnectors + + for i, decision := range decisions { + if i == 0 { + continue + } + if len(decision.UseConnectors) != len(decisions[0].UseConnectors) { return &Decision{ - UseConnectors: []ConnectorDecision{&ConnectorDecisionClickhouse{ - ClickhouseTableName: common_table.TableName, - ClickhouseTables: []string{input.source}, - IsCommonTable: true, - }}, - Reason: "Common table will be used.", + Reason: "Inconsistent number of connectors", + Err: fmt.Errorf("inconsistent number of connectors - index %s (%d connectors) and index %s (%d connectors)", decision.IndexPattern, len(decision.UseConnectors), decisions[0].IndexPattern, len(decisions[0].UseConnectors)), } } - return nil + newUseConnectors, mergeDecision := mergeUseConnectors(useConnectors, decision.UseConnectors, decision.IndexPattern) + if mergeDecision != nil { + return mergeDecision + } + useConnectors = newUseConnectors + } + + return &Decision{ + UseConnectors: useConnectors, + EnableABTesting: decisions[0].EnableABTesting, + Reason: "Merged decisions", } } diff --git a/quesma/table_resolver/table_resolver.go b/quesma/table_resolver/table_resolver.go index 50a8e4450..0e9ea49e8 100644 --- a/quesma/table_resolver/table_resolver.go +++ b/quesma/table_resolver/table_resolver.go @@ -11,7 +11,6 @@ import ( "quesma/quesma/config" "quesma/quesma/recovery" "sort" - "strings" "sync" "time" ) @@ -30,36 +29,52 @@ type parsedPattern struct { parts []string } +type patternSplitter struct { + name string + resolver func(pattern string) (parsedPattern, *Decision) +} + type basicResolver struct { name string - resolver func(pattern parsedPattern) *Decision + resolver func(part string) *Decision } +type decisionMerger struct { + name string + merger func(decisions []*Decision) *Decision +} + +// Compound resolver works in the following way: +// 1. patternSplitter splits a pattern, for example: logs* into concrete single indexes (e.g. logs1, logs2) +// 2. decisionLadder rules are evaluated on each index separately, resulting in a decision for each index +// 3. decisionMerger merges those decisions, making sure that the decisions are compatible. It yields a single decision. type compoundResolver struct { - decisionLadder []basicResolver + patternSplitter patternSplitter + decisionLadder []basicResolver + decisionMerger decisionMerger } func (ir *compoundResolver) resolve(indexName string) *Decision { - patterns := strings.Split(indexName, ",") - - input := parsedPattern{ - source: indexName, - isPattern: len(patterns) > 1 || strings.Contains(indexName, "*"), - parts: patterns, + input, decision := ir.patternSplitter.resolver(indexName) + if decision != nil { + decision.ResolverName = ir.patternSplitter.name + return decision } - for _, resolver := range ir.decisionLadder { - decision := resolver.resolver(input) + var decisions []*Decision + for _, part := range input.parts { + for _, resolver := range ir.decisionLadder { + decision := resolver.resolver(part) - if decision != nil { - decision.ResolverName = resolver.name - return decision + if decision != nil { + decision.ResolverName = resolver.name + decisions = append(decisions, decision) + break + } } } - return &Decision{ - Reason: "Could not resolve pattern. This is a bug.", - Err: fmt.Errorf("could not resolve index"), // TODO better error - } + + return ir.decisionMerger.merger(decisions) } // HACK: we should have separate config for each pipeline @@ -281,8 +296,11 @@ func NewTableResolver(quesmaConf config.QuesmaConfiguration, discovery clickhous pipelineName: IngestPipeline, resolver: &compoundResolver{ + patternSplitter: patternSplitter{ + name: "singleIndexSplitter", + resolver: singleIndexSplitter, + }, decisionLadder: []basicResolver{ - {"patternIsNotAllowed", patternIsNotAllowed}, {"kibanaInternal", resolveInternalElasticName}, {"disabled", makeIsDisabledInConfig(indexConf, IngestPipeline)}, @@ -291,6 +309,10 @@ func NewTableResolver(quesmaConf config.QuesmaConfiguration, discovery clickhous {"defaultWildcard", makeDefaultWildcard(quesmaConf, IngestPipeline)}, }, + decisionMerger: decisionMerger{ + name: "basicDecisionMerger", + merger: basicDecisionMerger, + }, }, recentDecisions: make(map[string]*Decision), } @@ -301,10 +323,13 @@ func NewTableResolver(quesmaConf config.QuesmaConfiguration, discovery clickhous pipelineName: QueryPipeline, resolver: &compoundResolver{ + patternSplitter: patternSplitter{ + name: "wildcardPatternSplitter", + resolver: res.wildcardPatternSplitter, + }, decisionLadder: []basicResolver{ // checking if we can handle the parsedPattern {"kibanaInternal", resolveInternalElasticName}, - {"searchAcrossConnectors", res.makeCheckIfPatternMatchesAllConnectors(QueryPipeline)}, {"disabled", makeIsDisabledInConfig(indexConf, QueryPipeline)}, {"singleIndex", res.singleIndex(indexConf, QueryPipeline)}, @@ -313,6 +338,10 @@ func NewTableResolver(quesmaConf config.QuesmaConfiguration, discovery clickhous // default action {"defaultWildcard", makeDefaultWildcard(quesmaConf, QueryPipeline)}, }, + decisionMerger: decisionMerger{ + name: "basicDecisionMerger", + merger: basicDecisionMerger, + }, }, recentDecisions: make(map[string]*Decision), } diff --git a/quesma/table_resolver/table_resolver_test.go b/quesma/table_resolver/table_resolver_test.go index 8ff588ae4..ae4b092a0 100644 --- a/quesma/table_resolver/table_resolver_test.go +++ b/quesma/table_resolver/table_resolver_test.go @@ -9,7 +9,6 @@ import ( "quesma/clickhouse" "quesma/common_table" "quesma/elasticsearch" - "quesma/end_user_errors" "quesma/quesma/config" "reflect" "strings" @@ -81,7 +80,7 @@ func TestTableResolver(t *testing.T) { pipeline: QueryPipeline, pattern: "*", expected: Decision{ - Err: end_user_errors.ErrSearchCondition.New(fmt.Errorf("")), + Err: fmt.Errorf("inconsistent A/B testing configuration"), }, indexConf: indexConf, }, @@ -91,7 +90,7 @@ func TestTableResolver(t *testing.T) { pattern: "*", clickhouseIndexes: []string{"index1", "index2"}, expected: Decision{ - Err: end_user_errors.ErrSearchCondition.New(fmt.Errorf("")), + Err: fmt.Errorf(""), }, indexConf: indexConf, }, @@ -102,7 +101,7 @@ func TestTableResolver(t *testing.T) { clickhouseIndexes: []string{"index1", "index2"}, elasticIndexes: []string{"index3"}, expected: Decision{ - Err: end_user_errors.ErrSearchCondition.New(fmt.Errorf("")), + Err: fmt.Errorf(""), }, indexConf: indexConf, }, @@ -217,7 +216,7 @@ func TestTableResolver(t *testing.T) { pattern: "index1,index2", elasticIndexes: []string{"index3"}, expected: Decision{ - Err: end_user_errors.ErrSearchCondition.New(fmt.Errorf("")), + Err: fmt.Errorf(""), }, indexConf: indexConf, }, @@ -227,11 +226,7 @@ func TestTableResolver(t *testing.T) { pattern: "index1,index-not-existing", elasticIndexes: []string{"index1,index-not-existing"}, expected: Decision{ - UseConnectors: []ConnectorDecision{&ConnectorDecisionClickhouse{ - ClickhouseTableName: "index1", - ClickhouseTables: []string{"index1"}, - IsCommonTable: false, - }}, + Err: fmt.Errorf(""), // index1 in Clickhouse, index-not-existing in Elastic ('*') }, indexConf: indexConf, }, @@ -270,20 +265,6 @@ func TestTableResolver(t *testing.T) { }, indexConf: indexConf, }, - { - name: "query pattern", - pipeline: QueryPipeline, - pattern: "indexa,index2", - virtualTables: []string{"index2"}, - expected: Decision{ - UseConnectors: []ConnectorDecision{&ConnectorDecisionClickhouse{ - ClickhouseTableName: common_table.TableName, - ClickhouseTables: []string{"index2"}, - IsCommonTable: true, - }}, - }, - indexConf: indexConf, - }, { name: "query kibana internals", pipeline: QueryPipeline, @@ -338,6 +319,20 @@ func TestTableResolver(t *testing.T) { }, indexConf: indexConf, }, + { + name: "A/B testing (pattern)", + pipeline: QueryPipeline, + pattern: "logs*", + expected: Decision{ + EnableABTesting: true, + UseConnectors: []ConnectorDecision{&ConnectorDecisionClickhouse{ + ClickhouseTableName: "logs", + ClickhouseTables: []string{"logs"}, + }, + &ConnectorDecisionElastic{}}, + }, + indexConf: indexConf, + }, { name: "query both connectors", pipeline: QueryPipeline, @@ -346,7 +341,7 @@ func TestTableResolver(t *testing.T) { clickhouseIndexes: []string{"index1"}, elasticIndexes: []string{"logs"}, expected: Decision{ - Err: end_user_errors.ErrSearchCondition.New(fmt.Errorf("")), + Err: fmt.Errorf(""), }, }, { @@ -381,7 +376,6 @@ func TestTableResolver(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - tableDiscovery := clickhouse.NewEmptyTableDiscovery() for _, index := range tt.clickhouseIndexes {