Skip to content

Commit

Permalink
Source resolver should resolve sources by investigating configured in…
Browse files Browse the repository at this point in the history
…dexes and not actual tables found in Clickhouse (#183)

Fixes the `Panic recovered: elasticsearch-only indexes should not be routed here at all` panic, which should never be triggered.

This could occur when:
- index existed in Elasticsearch
- index was configured in Quesma
- matching table was absent in Clickhouse

Additionally:
- moved `IndexMatches` method to `elasticsearch` package
- created tests for `ResolveSources()`
  • Loading branch information
pivovarit authored May 22, 2024
1 parent 83e2c0a commit 880e84f
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 31 deletions.
16 changes: 3 additions & 13 deletions quesma/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"mitmproxy/quesma/quesma/types"
"mitmproxy/quesma/telemetry"
"mitmproxy/quesma/util"
"regexp"
"slices"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -115,22 +114,13 @@ func (lm *LogManager) Close() {
_ = lm.chDb.Close()
}

// matchIndex reports whether indexName matches indexNamePattern.
//
// In the pattern, "*" stands for any sequence of characters (including none,
// newlines excluded); every other character is matched literally. The match is
// anchored, so the whole of indexName has to be covered by the pattern.
//
// If the resulting expression cannot be compiled, the problem is logged using
// ctx and false is returned.
func (lm *LogManager) matchIndex(ctx context.Context, indexNamePattern, indexName string) bool {
	// Escape each literal piece of the pattern before joining the pieces with
	// ".*". Without this, characters that are ordinary in index names but
	// special in regular expressions (most notably ".") would act as regex
	// operators, e.g. "logs.2024" would also match "logsX2024".
	literals := strings.Split(indexNamePattern, "*")
	for i, literal := range literals {
		literals[i] = regexp.QuoteMeta(literal)
	}

	r, err := regexp.Compile("^" + strings.Join(literals, ".*") + "$")
	if err != nil {
		logger.ErrorWithCtx(ctx).Msgf("invalid index name pattern [%s]: %s", indexNamePattern, err)
		return false
	}
	return r.MatchString(indexName)
}

// ResolveTableName resolves an index to a real table name. The index may be
// given as a wildcard pattern, e.g. "index-*".
//
// Deprecated: use ResolveIndexes instead; this method will be removed once we
// switch to the new one.
func (lm *LogManager) ResolveTableName(ctx context.Context, index string) (result string) {
func (lm *LogManager) ResolveTableName(index string) (result string) {
lm.schemaLoader.TableDefinitions().
Range(func(k string, v *Table) bool {
if lm.matchIndex(ctx, index, k) {
if elasticsearch.IndexMatches(index, k) {
result = k
return false
}
Expand Down Expand Up @@ -163,7 +153,7 @@ func (lm *LogManager) ResolveIndexes(ctx context.Context, patterns string) (resu
} else {
lm.schemaLoader.TableDefinitions().
Range(func(tableName string, v *Table) bool {
if lm.matchIndex(ctx, patterns, tableName) {
if elasticsearch.IndexMatches(patterns, tableName) {
results = append(results, tableName)
}
return true
Expand Down
15 changes: 14 additions & 1 deletion quesma/elasticsearch/index.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package elasticsearch

import "strings"
import (
"mitmproxy/quesma/logger"
"regexp"
"strings"
)

const (
AllIndexesAliasIndexName = "_all"
Expand All @@ -14,3 +18,12 @@ func IsIndexPattern(index string) bool {
// IsInternalIndex reports whether the given index name begins with
// internalIndexPrefix.
func IsInternalIndex(index string) bool {
	isInternal := strings.HasPrefix(index, internalIndexPrefix)
	return isInternal
}

// IndexMatches reports whether indexName matches indexNamePattern.
//
// In the pattern, "*" stands for any sequence of characters (including none,
// newlines excluded); every other character is matched literally. The match is
// anchored, so the whole of indexName has to be covered by the pattern.
//
// If the resulting expression cannot be compiled, the problem is logged and
// false is returned.
func IndexMatches(indexNamePattern, indexName string) bool {
	// Escape each literal piece of the pattern before joining the pieces with
	// ".*". Without this, characters that are ordinary in index names but
	// special in regular expressions (most notably ".") would act as regex
	// operators, e.g. "logs.2024" would also match "logsX2024".
	literals := strings.Split(indexNamePattern, "*")
	for i, literal := range literals {
		literals[i] = regexp.QuoteMeta(literal)
	}

	r, err := regexp.Compile("^" + strings.Join(literals, ".*") + "$")
	if err != nil {
		logger.Error().Msgf("invalid index name pattern [%s]: %s", indexNamePattern, err)
		return false
	}
	return r.MatchString(indexName)
}
21 changes: 10 additions & 11 deletions quesma/elasticsearch/index_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ type (
startable
ReloadIndices()
GetSources() Sources
GetSourceNames() map[string]interface{}
GetSourceNamesMatching(indexPattern string) map[string]interface{}
GetSourceNames() map[string]bool
GetSourceNamesMatching(indexPattern string) map[string]bool
}
indexManagement struct {
ElasticsearchUrl string
Expand Down Expand Up @@ -51,17 +51,17 @@ func (im *indexManagement) GetSources() Sources {
return *im.sources.Load()
}

func (im *indexManagement) GetSourceNames() map[string]interface{} {
names := make(map[string]interface{})
func (im *indexManagement) GetSourceNames() map[string]bool {
names := make(map[string]bool)
sources := *im.sources.Load()
for _, stream := range sources.DataStreams {
names[stream.Name] = struct{}{}
names[stream.Name] = true
}
for _, index := range sources.Indices {
names[index.Name] = struct{}{}
names[index.Name] = true
}
for _, alias := range sources.Aliases {
names[alias.Name] = struct{}{}
names[alias.Name] = true
}
for key := range names {
if strings.TrimSpace(key) == "" {
Expand All @@ -71,16 +71,16 @@ func (im *indexManagement) GetSourceNames() map[string]interface{} {
return names
}

func (im *indexManagement) GetSourceNamesMatching(indexPattern string) map[string]interface{} {
func (im *indexManagement) GetSourceNamesMatching(indexPattern string) map[string]bool {
all := im.GetSourceNames()
filtered := make(map[string]interface{})
filtered := make(map[string]bool)

if indexPattern == "*" || indexPattern == "_all" || indexPattern == "" {
return all
} else {
for key := range all {
if config.MatchName(indexPattern, key) {
filtered[key] = struct{}{}
filtered[key] = true
}
}
}
Expand All @@ -106,6 +106,5 @@ func (im *indexManagement) Start() {
}

// Stop invokes the cancel function held by the index management.
// NOTE(review): presumably this ends the background work begun in Start — confirm.
func (im *indexManagement) Stop() {

im.cancel()
}
2 changes: 1 addition & 1 deletion quesma/quesma/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ type AsyncQuery struct {
}

func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern string, body []byte, optAsync *AsyncQuery, queryLanguage QueryLanguage) ([]byte, error) {
sources, sourcesElastic, sourcesClickhouse := ResolveSources(indexPattern, q.cfg, q.im, q.logManager)
sources, sourcesElastic, sourcesClickhouse := ResolveSources(indexPattern, q.cfg, q.im)

switch sources {
case sourceBoth:
Expand Down
10 changes: 6 additions & 4 deletions quesma/quesma/source_resolver.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package quesma

import (
"context"
"mitmproxy/quesma/clickhouse"
"mitmproxy/quesma/elasticsearch"
"mitmproxy/quesma/logger"
"mitmproxy/quesma/quesma/config"
Expand All @@ -17,7 +15,7 @@ const (
sourceNone = "none"
)

func ResolveSources(indexPattern string, cfg config.QuesmaConfiguration, im elasticsearch.IndexManagement, lm *clickhouse.LogManager) (string, []string, []string) {
func ResolveSources(indexPattern string, cfg config.QuesmaConfiguration, im elasticsearch.IndexManagement) (string, []string, []string) {
if elasticsearch.IsIndexPattern(indexPattern) {
matchesElastic := []string{}
matchesClickhouse := []string{}
Expand All @@ -29,7 +27,11 @@ func ResolveSources(indexPattern string, cfg config.QuesmaConfiguration, im elas
}
}

matchesClickhouse = append(matchesClickhouse, lm.ResolveIndexes(context.Background(), pattern)...)
for indexName, indexConfig := range cfg.IndexConfig {
if elasticsearch.IndexMatches(pattern, indexName) && indexConfig.Enabled {
matchesClickhouse = append(matchesClickhouse, indexName)
}
}
}
slices.Sort(matchesElastic)
slices.Sort(matchesClickhouse)
Expand Down
128 changes: 128 additions & 0 deletions quesma/quesma/source_resolver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package quesma

import (
"github.com/stretchr/testify/assert"
"mitmproxy/quesma/elasticsearch"
"mitmproxy/quesma/quesma/config"
"testing"
)

// TestResolveSources checks which source kind ResolveSources reports for a
// given index pattern, Quesma index configuration and set of Elasticsearch
// source names. Only the first return value is asserted; the two resolved
// name lists are ignored.
func TestResolveSources(t *testing.T) {
	// withIndex builds a configuration containing a single index entry.
	withIndex := func(name string, enabled bool) config.QuesmaConfiguration {
		return config.QuesmaConfiguration{
			IndexConfig: map[string]config.IndexConfiguration{name: {Enabled: enabled}},
		}
	}
	// withoutIndexes builds a configuration with an empty index map.
	withoutIndexes := func() config.QuesmaConfiguration {
		return config.QuesmaConfiguration{IndexConfig: map[string]config.IndexConfiguration{}}
	}

	cases := []struct {
		name         string
		indexPattern string
		cfg          config.QuesmaConfiguration
		im           elasticsearch.IndexManagement
		want         string
	}{
		{
			name:         "Index only in Clickhouse,pattern:",
			indexPattern: "test",
			cfg:          withIndex("test", true),
			im:           NewFixedIndexManagement(),
			want:         sourceClickhouse,
		},
		{
			name:         "Index only in Clickhouse,pattern:",
			indexPattern: "*",
			cfg:          withIndex("test", true),
			im:           NewFixedIndexManagement(),
			want:         sourceClickhouse,
		},
		{
			name:         "Index only in Elasticsearch,pattern:",
			indexPattern: "test",
			cfg:          withoutIndexes(),
			im:           NewFixedIndexManagement("test"),
			want:         sourceElasticsearch,
		},
		{
			name:         "Index only in Elasticsearch,pattern:",
			indexPattern: "*",
			cfg:          withoutIndexes(),
			im:           NewFixedIndexManagement("test"),
			want:         sourceElasticsearch,
		},
		{
			name:         "Indexes both in Elasticsearch and Clickhouse",
			indexPattern: "*",
			cfg:          withIndex("kibana-sample-data-logs", true),
			im:           NewFixedIndexManagement("logs-generic-default"),
			want:         sourceBoth,
		},
		{
			// The index is known to Elasticsearch and listed in the
			// configuration, but the configuration entry is disabled.
			name:         "Indexes both in Elasticsearch and Clickhouse, but explicitly disabled",
			indexPattern: "*",
			cfg:          withIndex("logs-generic-default", false),
			im:           NewFixedIndexManagement("logs-generic-default"),
			want:         sourceElasticsearch,
		},
		{
			name:         "Index neither in Clickhouse nor in Elasticsearch",
			indexPattern: "*",
			cfg:          withoutIndexes(),
			im:           NewFixedIndexManagement(),
			want:         sourceNone,
		},
	}

	for _, tc := range cases {
		// The pattern is appended to the case name so that cases sharing a
		// name still produce distinct sub-test names.
		t.Run(tc.name+tc.indexPattern, func(t *testing.T) {
			source, _, _ := ResolveSources(tc.indexPattern, tc.cfg, tc.im)
			assert.Equalf(t, tc.want, source, "ResolveSources(%v, %v, %v)", tc.indexPattern, tc.cfg, tc.im)
		})
	}
}

// NewFixedIndexManagement returns an elasticsearch.IndexManagement backed by
// stubIndexManagement, holding exactly the index names passed in.
func NewFixedIndexManagement(indexes ...string) elasticsearch.IndexManagement {
	var stub elasticsearch.IndexManagement = stubIndexManagement{indexes: indexes}
	return stub
}

// stubIndexManagement is a test stand-in that NewFixedIndexManagement returns
// as an elasticsearch.IndexManagement; it answers from a fixed list of names.
type stubIndexManagement struct {
// indexes holds the names the stub reports from its lookup methods.
indexes []string
}

// Start is a no-op on the stub.
func (s stubIndexManagement) Start() {}
// Stop is a no-op on the stub.
func (s stubIndexManagement) Stop() {}
// ReloadIndices is a no-op on the stub; the list of names never changes.
func (s stubIndexManagement) ReloadIndices() {}
// GetSources reports every stubbed name as a data stream; the other fields of
// elasticsearch.Sources are left at their zero values.
func (s stubIndexManagement) GetSources() elasticsearch.Sources {
	streams := make([]elasticsearch.DataStream, 0, len(s.indexes))
	for i := range s.indexes {
		streams = append(streams, elasticsearch.DataStream{Name: s.indexes[i]})
	}
	return elasticsearch.Sources{DataStreams: streams}
}

// GetSourceNames returns a set (name -> true) containing every stubbed name.
func (s stubIndexManagement) GetSourceNames() map[string]bool {
	names := make(map[string]bool, len(s.indexes))
	for i := range s.indexes {
		names[s.indexes[i]] = true
	}
	return names
}

// GetSourceNamesMatching returns a set (name -> true) of the stubbed names for
// which elasticsearch.IndexMatches(indexPattern, name) holds.
func (s stubIndexManagement) GetSourceNamesMatching(indexPattern string) map[string]bool {
	matching := map[string]bool{}
	for _, name := range s.indexes {
		if !elasticsearch.IndexMatches(indexPattern, name) {
			continue
		}
		matching[name] = true
	}
	return matching
}
2 changes: 1 addition & 1 deletion quesma/quesma/termsenum/terms_enum.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

func HandleTermsEnum(ctx context.Context, index string, body types.JSON, lm *clickhouse.LogManager,
qmc *ui.QuesmaManagementConsole) ([]byte, error) {
if resolvedTableName := lm.ResolveTableName(ctx, index); resolvedTableName == "" {
if resolvedTableName := lm.ResolveTableName(index); resolvedTableName == "" {
errorMsg := fmt.Sprintf("terms enum failed - could not resolve table name for index: %s", index)
logger.Error().Msg(errorMsg)
return nil, fmt.Errorf(errorMsg)
Expand Down

0 comments on commit 880e84f

Please sign in to comment.