From 880e84f738353b1346f8c4710333e3be1ee76b5b Mon Sep 17 00:00:00 2001 From: Grzegorz Piwowarek Date: Wed, 22 May 2024 16:33:37 +0200 Subject: [PATCH] Source resolver should resolve sources by investigating configured indexes and not actual tables found in Clickhouse (#183) Solves the issue of getting `Panic recovered: elasticsearch-only indexes should not be routed here at all` which should never be the case. This could occur when: - index existed in Elasticsearch - index was configured in Quesma - matching table was absent in Clickhouse Additionally: - moved `IndexMatches` method to `elasticsearch` package - created tests for `ResolveSources()` --- quesma/clickhouse/clickhouse.go | 16 +-- quesma/elasticsearch/index.go | 15 ++- quesma/elasticsearch/index_management.go | 21 ++-- quesma/quesma/search.go | 2 +- quesma/quesma/source_resolver.go | 10 +- quesma/quesma/source_resolver_test.go | 128 +++++++++++++++++++++++ quesma/quesma/termsenum/terms_enum.go | 2 +- 7 files changed, 163 insertions(+), 31 deletions(-) create mode 100644 quesma/quesma/source_resolver_test.go diff --git a/quesma/clickhouse/clickhouse.go b/quesma/clickhouse/clickhouse.go index 5682e9567..c5342fbbc 100644 --- a/quesma/clickhouse/clickhouse.go +++ b/quesma/clickhouse/clickhouse.go @@ -16,7 +16,6 @@ import ( "mitmproxy/quesma/quesma/types" "mitmproxy/quesma/telemetry" "mitmproxy/quesma/util" - "regexp" "slices" "strings" "sync/atomic" @@ -115,22 +114,13 @@ func (lm *LogManager) Close() { _ = lm.chDb.Close() } -func (lm *LogManager) matchIndex(ctx context.Context, indexNamePattern, indexName string) bool { - r, err := regexp.Compile("^" + strings.Replace(indexNamePattern, "*", ".*", -1) + "$") - if err != nil { - logger.ErrorWithCtx(ctx).Msgf("invalid index name pattern [%s]: %s", indexNamePattern, err) - return false - } - return r.MatchString(indexName) -} - // Deprecated: use ResolveIndexes instead, this method will be removed once we switch to the new one // Indexes can be in a form of wildcard, e.g. "index-*" // If we have such index, we need to resolve it to a real table name. -func (lm *LogManager) ResolveTableName(ctx context.Context, index string) (result string) { +func (lm *LogManager) ResolveTableName(index string) (result string) { lm.schemaLoader.TableDefinitions(). Range(func(k string, v *Table) bool { - if lm.matchIndex(ctx, index, k) { + if elasticsearch.IndexMatches(index, k) { result = k return false } @@ -163,7 +153,7 @@ func (lm *LogManager) ResolveIndexes(ctx context.Context, patterns string) (resu } else { lm.schemaLoader.TableDefinitions(). Range(func(tableName string, v *Table) bool { - if lm.matchIndex(ctx, patterns, tableName) { + if elasticsearch.IndexMatches(patterns, tableName) { results = append(results, tableName) } return true diff --git a/quesma/elasticsearch/index.go b/quesma/elasticsearch/index.go index 81cf451e3..0bf511129 100644 --- a/quesma/elasticsearch/index.go +++ b/quesma/elasticsearch/index.go @@ -1,6 +1,10 @@ package elasticsearch -import "strings" +import ( + "mitmproxy/quesma/logger" + "regexp" + "strings" +) const ( AllIndexesAliasIndexName = "_all" @@ -14,3 +18,12 @@ func IsIndexPattern(index string) bool { func IsInternalIndex(index string) bool { return strings.HasPrefix(index, internalIndexPrefix) } + +func IndexMatches(indexNamePattern, indexName string) bool { + r, err := regexp.Compile("^" + strings.Replace(indexNamePattern, "*", ".*", -1) + "$") + if err != nil { + logger.Error().Msgf("invalid index name pattern [%s]: %s", indexNamePattern, err) + return false + } + return r.MatchString(indexName) +} diff --git a/quesma/elasticsearch/index_management.go b/quesma/elasticsearch/index_management.go index 864aa590f..da421a2b0 100644 --- a/quesma/elasticsearch/index_management.go +++ b/quesma/elasticsearch/index_management.go @@ -15,8 +15,8 @@ type ( startable ReloadIndices() GetSources() Sources - GetSourceNames() map[string]interface{} - GetSourceNamesMatching(indexPattern string) map[string]interface{} + GetSourceNames() map[string]bool + GetSourceNamesMatching(indexPattern string) map[string]bool } indexManagement struct { ElasticsearchUrl string @@ -51,17 +51,17 @@ func (im *indexManagement) GetSources() Sources { return *im.sources.Load() } -func (im *indexManagement) GetSourceNames() map[string]interface{} { - names := make(map[string]interface{}) +func (im *indexManagement) GetSourceNames() map[string]bool { + names := make(map[string]bool) sources := *im.sources.Load() for _, stream := range sources.DataStreams { - names[stream.Name] = struct{}{} + names[stream.Name] = true } for _, index := range sources.Indices { - names[index.Name] = struct{}{} + names[index.Name] = true } for _, alias := range sources.Aliases { - names[alias.Name] = struct{}{} + names[alias.Name] = true } for key := range names { if strings.TrimSpace(key) == "" { @@ -71,16 +71,16 @@ func (im *indexManagement) GetSourceNames() map[string]interface{} { return names } -func (im *indexManagement) GetSourceNamesMatching(indexPattern string) map[string]interface{} { +func (im *indexManagement) GetSourceNamesMatching(indexPattern string) map[string]bool { all := im.GetSourceNames() - filtered := make(map[string]interface{}) + filtered := make(map[string]bool) if indexPattern == "*" || indexPattern == "_all" || indexPattern == "" { return all } else { for key := range all { if config.MatchName(indexPattern, key) { - filtered[key] = struct{}{} + filtered[key] = true } } } @@ -106,6 +106,5 @@ func (im *indexManagement) Start() { } func (im *indexManagement) Stop() { - im.cancel() } diff --git a/quesma/quesma/search.go b/quesma/quesma/search.go index 00edb2ee3..258c6054e 100644 --- a/quesma/quesma/search.go +++ b/quesma/quesma/search.go @@ -124,7 +124,7 @@ type AsyncQuery struct { } func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern string, body []byte, optAsync *AsyncQuery, queryLanguage QueryLanguage) ([]byte, error) { - sources, sourcesElastic, sourcesClickhouse := ResolveSources(indexPattern, q.cfg, q.im, q.logManager) + sources, sourcesElastic, sourcesClickhouse := ResolveSources(indexPattern, q.cfg, q.im) switch sources { case sourceBoth: diff --git a/quesma/quesma/source_resolver.go b/quesma/quesma/source_resolver.go index fb751b1db..89362d311 100644 --- a/quesma/quesma/source_resolver.go +++ b/quesma/quesma/source_resolver.go @@ -1,8 +1,6 @@ package quesma import ( - "context" - "mitmproxy/quesma/clickhouse" "mitmproxy/quesma/elasticsearch" "mitmproxy/quesma/logger" "mitmproxy/quesma/quesma/config" @@ -17,7 +15,7 @@ const ( sourceNone = "none" ) -func ResolveSources(indexPattern string, cfg config.QuesmaConfiguration, im elasticsearch.IndexManagement, lm *clickhouse.LogManager) (string, []string, []string) { +func ResolveSources(indexPattern string, cfg config.QuesmaConfiguration, im elasticsearch.IndexManagement) (string, []string, []string) { if elasticsearch.IsIndexPattern(indexPattern) { matchesElastic := []string{} matchesClickhouse := []string{} @@ -29,7 +27,11 @@ func ResolveSources(indexPattern string, cfg config.QuesmaConfiguration, im elas } } - matchesClickhouse = append(matchesClickhouse, lm.ResolveIndexes(context.Background(), pattern)...) + for indexName, indexConfig := range cfg.IndexConfig { + if elasticsearch.IndexMatches(pattern, indexName) && indexConfig.Enabled { + matchesClickhouse = append(matchesClickhouse, indexName) + } + } } slices.Sort(matchesElastic) slices.Sort(matchesClickhouse) diff --git a/quesma/quesma/source_resolver_test.go b/quesma/quesma/source_resolver_test.go new file mode 100644 index 000000000..05b259c3f --- /dev/null +++ b/quesma/quesma/source_resolver_test.go @@ -0,0 +1,128 @@ +package quesma + +import ( + "github.com/stretchr/testify/assert" + "mitmproxy/quesma/elasticsearch" + "mitmproxy/quesma/quesma/config" + "testing" +) + +func TestResolveSources(t *testing.T) { + type args struct { + indexPattern string + cfg config.QuesmaConfiguration + im elasticsearch.IndexManagement + } + tests := []struct { + name string + args args + want string + }{ + { + name: "Index only in Clickhouse,pattern:", + args: args{ + indexPattern: "test", + cfg: config.QuesmaConfiguration{IndexConfig: map[string]config.IndexConfiguration{"test": {Enabled: true}}}, + im: NewFixedIndexManagement(), + }, + want: sourceClickhouse, + }, + { + name: "Index only in Clickhouse,pattern:", + args: args{ + indexPattern: "*", + cfg: config.QuesmaConfiguration{IndexConfig: map[string]config.IndexConfiguration{"test": {Enabled: true}}}, + im: NewFixedIndexManagement(), + }, + want: sourceClickhouse, + }, + { + name: "Index only in Elasticsearch,pattern:", + args: args{ + indexPattern: "test", + cfg: config.QuesmaConfiguration{IndexConfig: map[string]config.IndexConfiguration{}}, + im: NewFixedIndexManagement("test"), + }, + want: sourceElasticsearch, + }, + { + name: "Index only in Elasticsearch,pattern:", + args: args{ + indexPattern: "*", + cfg: config.QuesmaConfiguration{IndexConfig: map[string]config.IndexConfiguration{}}, + im: NewFixedIndexManagement("test"), + }, + want: sourceElasticsearch, + }, + { + name: "Indexes both in Elasticsearch and Clickhouse", + args: args{ + indexPattern: "*", + cfg: config.QuesmaConfiguration{IndexConfig: map[string]config.IndexConfiguration{"kibana-sample-data-logs": {Enabled: true}}}, + im: NewFixedIndexManagement("logs-generic-default"), + }, + want: sourceBoth, + }, + { + name: "Indexes both in Elasticsearch and Clickhouse, but explicitly disabled", + args: args{ + indexPattern: "*", + cfg: config.QuesmaConfiguration{IndexConfig: map[string]config.IndexConfiguration{"logs-generic-default": {Enabled: false}}}, + im: NewFixedIndexManagement("logs-generic-default"), + }, + want: sourceElasticsearch, + }, + { + name: "Index neither in Clickhouse nor in Elasticsearch", + args: args{ + indexPattern: "*", + cfg: config.QuesmaConfiguration{IndexConfig: map[string]config.IndexConfiguration{}}, + im: NewFixedIndexManagement(), + }, + want: sourceNone, + }, + } + for _, tt := range tests { + t.Run(tt.name+tt.args.indexPattern, func(t *testing.T) { + got, _, _ := ResolveSources(tt.args.indexPattern, tt.args.cfg, tt.args.im) + assert.Equalf(t, tt.want, got, "ResolveSources(%v, %v, %v)", tt.args.indexPattern, tt.args.cfg, tt.args.im) + }) + } +} + +func NewFixedIndexManagement(indexes ...string) elasticsearch.IndexManagement { + return stubIndexManagement{indexes: indexes} +} + +type stubIndexManagement struct { + indexes []string +} + +func (s stubIndexManagement) Start() {} +func (s stubIndexManagement) Stop() {} +func (s stubIndexManagement) ReloadIndices() {} +func (s stubIndexManagement) GetSources() elasticsearch.Sources { + var dataStreams = []elasticsearch.DataStream{} + for _, index := range s.indexes { + dataStreams = append(dataStreams, elasticsearch.DataStream{Name: index}) + } + return elasticsearch.Sources{DataStreams: dataStreams} +} + +func (s stubIndexManagement) GetSourceNames() map[string]bool { + var result = make(map[string]bool) + for _, index := range s.indexes { + result[index] = true + } + return result +} + +func (s stubIndexManagement) GetSourceNamesMatching(indexPattern string) map[string]bool { + var result = make(map[string]bool) + for _, index := range s.indexes { + if elasticsearch.IndexMatches(indexPattern, index) { + result[index] = true + } + } + return result +} diff --git a/quesma/quesma/termsenum/terms_enum.go b/quesma/quesma/termsenum/terms_enum.go index d1c70677d..ceb2315c8 100644 --- a/quesma/quesma/termsenum/terms_enum.go +++ b/quesma/quesma/termsenum/terms_enum.go @@ -16,7 +16,7 @@ import ( func HandleTermsEnum(ctx context.Context, index string, body types.JSON, lm *clickhouse.LogManager, qmc *ui.QuesmaManagementConsole) ([]byte, error) { - if resolvedTableName := lm.ResolveTableName(ctx, index); resolvedTableName == "" { + if resolvedTableName := lm.ResolveTableName(index); resolvedTableName == "" { errorMsg := fmt.Sprintf("terms enum failed - could not resolve table name for index: %s", index) logger.Error().Msg(errorMsg) return nil, fmt.Errorf(errorMsg)