Skip to content

Commit

Permalink
Removing bizarre import cycles pt 1 (#751)
Browse files Browse the repository at this point in the history
This PR aims to remove some odd dependencies which still prevents us
from adding "physical" table information to schema registry.

* removal of `"quesma/elasticsearch"` import from the ClickHouse package
* Remove deprecated `ResolveTableName`
* `ResolveIndexes` renamed to more appropriate `ResolveIndexPattern`
* further cleanup, around the IngestProcessor 😉
  • Loading branch information
mieciu authored Sep 12, 2024
1 parent 0b9707c commit bdfac95
Show file tree
Hide file tree
Showing 13 changed files with 45 additions and 199 deletions.
56 changes: 13 additions & 43 deletions quesma/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
"math"
"quesma/concurrent"
"quesma/elasticsearch"
"quesma/end_user_errors"
"quesma/index"
"quesma/logger"
Expand All @@ -24,11 +23,11 @@ import (
)

const (
timestampFieldName = "@timestamp" // it's always DateTime64 for now, don't want to waste time changing that, we don't seem to use that anyway
timestampFieldName = "@timestamp" // it's always DateTime64 for now, don't want to waste time changing that, we don't seem to use that anyway
allElasticsearchIndicesPattern = "_all"
)

type (
// LogManager should be renamed to Connector -> TODO !!!
LogManager struct {
ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -132,54 +131,39 @@ func (lm *LogManager) Close() {
_ = lm.chDb.Close()
}

// Deprecated: use ResolveIndexes instead, this method will be removed once we switch to the new one
// Indexes can be in a form of wildcard, e.g. "index-*"
// If we have such index, we need to resolve it to a real table name.
func (lm *LogManager) ResolveTableName(index string) (result string) {
lm.tableDiscovery.TableDefinitions().
Range(func(k string, v *Table) bool {
if elasticsearch.IndexMatches(index, k) {
result = k
return false
}
return true
})
return result
}

// Indexes can be in a form of wildcard, e.g. "index-*" or even contain multiple patterns like "index-*,logs-*",
// this method returns all matching indexes
// empty pattern means all indexes
// "_all" index name means all indexes
func (lm *LogManager) ResolveIndexes(ctx context.Context, patterns string) (results []string, err error) {
// ResolveIndexPattern - takes incoming index pattern (e.g. "index-*" or multiple patterns like "index-*,logs-*")
// and returns all matching indexes. Empty pattern means all indexes, "_all" index name means all indexes
//
// Note: Empty pattern means all indexes, "_all" index name means all indexes
func (lm *LogManager) ResolveIndexPattern(ctx context.Context, pattern string) (results []string, err error) {
if err = lm.tableDiscovery.TableDefinitionsFetchError(); err != nil {
return nil, err
}

results = make([]string, 0)
if strings.Contains(patterns, ",") {
for _, pattern := range strings.Split(patterns, ",") {
if pattern == elasticsearch.AllIndexesAliasIndexName || pattern == "" {
if strings.Contains(pattern, ",") {
for _, pattern := range strings.Split(pattern, ",") {
if pattern == allElasticsearchIndicesPattern || pattern == "" {
results = lm.tableDiscovery.TableDefinitions().Keys()
slices.Sort(results)
return results, nil
} else {
indexes, err := lm.ResolveIndexes(ctx, pattern)
indexes, err := lm.ResolveIndexPattern(ctx, pattern)
if err != nil {
return nil, err
}
results = append(results, indexes...)
}
}
} else {
if patterns == elasticsearch.AllIndexesAliasIndexName || len(patterns) == 0 {
if pattern == allElasticsearchIndicesPattern || len(pattern) == 0 {
results = lm.tableDiscovery.TableDefinitions().Keys()
slices.Sort(results)
return results, nil
} else {
lm.tableDiscovery.TableDefinitions().
Range(func(tableName string, v *Table) bool {
if elasticsearch.IndexMatches(patterns, tableName) {
if util.IndexPatternMatches(pattern, tableName) {
results = append(results, tableName)
}
return true
Expand Down Expand Up @@ -294,20 +278,6 @@ func (lm *LogManager) CheckIfConnectedPaidService(service PaidServiceName) (retu
return returnedErr
}

func Indexes(m SchemaMap) string {
var result strings.Builder
for col := range m {
index := GetIndexStatement(col)
if index != "" {
result.WriteString(",\n")
result.WriteString(util.Indent(1))
result.WriteString(index.Statement())
}
}
result.WriteString(",\n")
return result.String()
}

func (lm *LogManager) FindTable(tableName string) (result *Table) {
tableNamePattern := index.TableNamePatternRegexp(tableName)
lm.tableDiscovery.TableDefinitions().
Expand Down
4 changes: 2 additions & 2 deletions quesma/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,9 +776,9 @@ func TestLogManager_ResolveIndexes(t *testing.T) {
var tableDefinitions = atomic.Pointer[TableMap]{}
tableDefinitions.Store(tt.tables)
lm := &LogManager{tableDiscovery: NewTableDiscoveryWith(&config.QuesmaConfiguration{}, nil, *tt.tables)}
indexes, err := lm.ResolveIndexes(context.Background(), tt.patterns)
indexes, err := lm.ResolveIndexPattern(context.Background(), tt.patterns)
assert.NoError(t, err)
assert.Equalf(t, tt.resolved, indexes, tt.patterns, "ResolveIndexes(%v)", tt.patterns)
assert.Equalf(t, tt.resolved, indexes, tt.patterns, "ResolveIndexPattern(%v)", tt.patterns)
})
}
}
Expand Down
14 changes: 1 addition & 13 deletions quesma/elasticsearch/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@
package elasticsearch

import (
"quesma/logger"
"regexp"
"strings"
)

const (
AllIndexesAliasIndexName = "_all"
internalIndexPrefix = "."
internalIndexPrefix = "."
)

func IsIndexPattern(index string) bool {
Expand All @@ -20,12 +17,3 @@ func IsIndexPattern(index string) bool {
func IsInternalIndex(index string) bool {
return strings.HasPrefix(index, internalIndexPrefix)
}

func IndexMatches(indexNamePattern, indexName string) bool {
r, err := regexp.Compile("^" + strings.Replace(indexNamePattern, "*", ".*", -1) + "$")
if err != nil {
logger.Error().Msgf("invalid index name pattern [%s]: %s", indexNamePattern, err)
return false
}
return r.MatchString(indexName)
}
45 changes: 0 additions & 45 deletions quesma/ingest/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/ClickHouse/clickhouse-go/v2"
chLib "quesma/clickhouse"
"quesma/concurrent"
"quesma/elasticsearch"
"quesma/end_user_errors"
"quesma/index"
"quesma/jsonprocessor"
Expand All @@ -22,7 +21,6 @@ import (
"quesma/stats"
"quesma/telemetry"
"quesma/util"
"slices"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -119,49 +117,6 @@ func (ip *IngestProcessor) Close() {
_ = ip.chDb.Close()
}

// Indexes can be in a form of wildcard, e.g. "index-*" or even contain multiple patterns like "index-*,logs-*",
// this method returns all matching indexes
// empty pattern means all indexes
// "_all" index name means all indexes
func (ip *IngestProcessor) ResolveIndexes(ctx context.Context, patterns string) (results []string, err error) {
if err = ip.tableDiscovery.TableDefinitionsFetchError(); err != nil {
return nil, err
}

results = make([]string, 0)
if strings.Contains(patterns, ",") {
for _, pattern := range strings.Split(patterns, ",") {
if pattern == elasticsearch.AllIndexesAliasIndexName || pattern == "" {
results = ip.tableDiscovery.TableDefinitions().Keys()
slices.Sort(results)
return results, nil
} else {
indexes, err := ip.ResolveIndexes(ctx, pattern)
if err != nil {
return nil, err
}
results = append(results, indexes...)
}
}
} else {
if patterns == elasticsearch.AllIndexesAliasIndexName || len(patterns) == 0 {
results = ip.tableDiscovery.TableDefinitions().Keys()
slices.Sort(results)
return results, nil
} else {
ip.tableDiscovery.TableDefinitions().
Range(func(tableName string, v *chLib.Table) bool {
if elasticsearch.IndexMatches(patterns, tableName) {
results = append(results, tableName)
}
return true
})
}
}

return util.Distinct(results), nil
}

// updates also Table TODO stop updating table here, find a better solution
func addOurFieldsToCreateTableQuery(q string, config *chLib.ChTableConfig, table *chLib.Table) string {
if len(config.Attributes) == 0 {
Expand Down
88 changes: 0 additions & 88 deletions quesma/ingest/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,91 +816,3 @@ func TestLogManager_GetTable(t *testing.T) {
})
}
}

func TestLogManager_ResolveIndexes(t *testing.T) {
tests := []struct {
name string
tables *TableMap
patterns string
resolved []string
}{
{
name: "empty table map, non-empty pattern",
tables: NewTableMap(),
patterns: "table",
resolved: []string{},
},
{
name: "empty table map, empty pattern",
tables: NewTableMap(),
patterns: "table",
resolved: []string{},
},
{
name: "non-empty table map, empty pattern",
tables: newTableMap("table1", "table2"),
patterns: "",
resolved: []string{"table1", "table2"},
},
{
name: "non-empty table map, _all pattern",
tables: newTableMap("table1", "table2"),
patterns: "_all",
resolved: []string{"table1", "table2"},
},
{
name: "non-empty table map, * pattern",
tables: newTableMap("table1", "table2"),
patterns: "*",
resolved: []string{"table1", "table2"},
},
{
name: "non-empty table map, *,* pattern",
tables: newTableMap("table1", "table2"),
patterns: "*,*",
resolved: []string{"table1", "table2"},
},
{
name: "non-empty table map, table* pattern",
tables: newTableMap("table1", "table2"),
patterns: "table*",
resolved: []string{"table1", "table2"},
},
{
name: "non-empty table map, table1,table2 pattern",
tables: newTableMap("table1", "table2"),
patterns: "table1,table2",
resolved: []string{"table1", "table2"},
},
{
name: "non-empty table map, table1 pattern",
tables: newTableMap("table1", "table2"),
patterns: "table1",
resolved: []string{"table1"},
},
{
name: "non-empty table map, table2 pattern",
tables: newTableMap("table1", "table2"),
patterns: "table2",
resolved: []string{"table2"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var tableDefinitions = atomic.Pointer[TableMap]{}
tableDefinitions.Store(tt.tables)
ip := &IngestProcessor{tableDiscovery: clickhouse.NewTableDiscoveryWith(&config.QuesmaConfiguration{}, nil, *tt.tables)}
indexes, err := ip.ResolveIndexes(context.Background(), tt.patterns)
assert.NoError(t, err)
assert.Equalf(t, tt.resolved, indexes, tt.patterns, "ResolveIndexes(%v)", tt.patterns)
})
}
}

func newTableMap(tables ...string) *TableMap {
newMap := concurrent.NewMap[string, *clickhouse.Table]()
for _, table := range tables {
newMap.Store(table, &clickhouse.Table{Name: table})
}
return newMap
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func HandleFieldCaps(ctx context.Context, cfg *config.QuesmaConfiguration, schem
if len(cfg.IndexConfig[index].Override) > 0 {
index = cfg.IndexConfig[index].Override
}
indexes, err := lm.ResolveIndexes(ctx, index)
indexes, err := lm.ResolveIndexPattern(ctx, index)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions quesma/quesma/functionality/terms_enum/terms_enum.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ import (

func HandleTermsEnum(ctx context.Context, index string, body types.JSON, lm *clickhouse.LogManager,
schemaRegistry schema.Registry, qmc *ui.QuesmaManagementConsole) ([]byte, error) {
if resolvedTableName := lm.ResolveTableName(index); resolvedTableName == "" {
if indices, err := lm.ResolveIndexPattern(ctx, index); err != nil || len(indices) != 1 { // multi index terms enum is not yet supported
errorMsg := fmt.Sprintf("terms enum failed - could not resolve table name for index: %s", index)
logger.Error().Msg(errorMsg)
return nil, fmt.Errorf(errorMsg)
} else {
return handleTermsEnumRequest(ctx, body, &queryparser.ClickhouseQueryTranslator{
ClickhouseLM: lm, Table: lm.FindTable(resolvedTableName), SchemaRegistry: schemaRegistry, Ctx: context.Background(),
ClickhouseLM: lm, Table: lm.FindTable(indices[0]), SchemaRegistry: schemaRegistry, Ctx: context.Background(),
}, qmc)
}
}
Expand Down
2 changes: 1 addition & 1 deletion quesma/quesma/quesma.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (r *router) reroute(ctx context.Context, w http.ResponseWriter, req *http.R
}
} else {

feature.AnalyzeUnsupportedCalls(ctx, req.Method, req.URL.Path, req.Header.Get(opaqueIdHeaderKey), logManager.ResolveIndexes)
feature.AnalyzeUnsupportedCalls(ctx, req.Method, req.URL.Path, req.Header.Get(opaqueIdHeaderKey), logManager.ResolveIndexPattern)

rawResponse := <-r.sendHttpRequestToElastic(ctx, req, reqBody, true)
response := rawResponse.response
Expand Down
2 changes: 1 addition & 1 deletion quesma/quesma/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func NewAsyncQueryContext(ctx context.Context, cancel context.CancelFunc, id str

// returns -1 when table name could not be resolved
func (q *QueryRunner) handleCount(ctx context.Context, indexPattern string) (int64, error) {
indexes, err := q.logManager.ResolveIndexes(ctx, indexPattern)
indexes, err := q.logManager.ResolveIndexPattern(ctx, indexPattern)
if err != nil {
return 0, err
}
Expand Down
2 changes: 1 addition & 1 deletion quesma/quesma/source_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func ResolveSources(indexPattern string, cfg *config.QuesmaConfiguration, im ela
}

for indexName, indexConfig := range cfg.IndexConfig {
if elasticsearch.IndexMatches(pattern, indexName) && !indexConfig.Disabled {
if util.IndexPatternMatches(pattern, indexName) && !indexConfig.Disabled {
matchesClickhouse = append(matchesClickhouse, indexName)
}
}
Expand Down
3 changes: 2 additions & 1 deletion quesma/quesma/source_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/stretchr/testify/assert"
"quesma/elasticsearch"
"quesma/quesma/config"
"quesma/util"
"testing"
)

Expand Down Expand Up @@ -122,7 +123,7 @@ func (s stubIndexManagement) GetSourceNames() map[string]bool {
func (s stubIndexManagement) GetSourceNamesMatching(indexPattern string) map[string]bool {
var result = make(map[string]bool)
for _, index := range s.indexes {
if elasticsearch.IndexMatches(indexPattern, index) {
if util.IndexPatternMatches(indexPattern, index) {
result[index] = true
}
}
Expand Down
4 changes: 3 additions & 1 deletion quesma/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
// SPDX-License-Identifier: Elastic-2.0
package schema

import "strings"
import (
"strings"
)

type (
Schema struct {
Expand Down
Loading

0 comments on commit bdfac95

Please sign in to comment.