Skip to content

Commit

Permalink
Fix resolving indices when tables are auto-discovered (#765)
Browse files Browse the repository at this point in the history
Users who haven't specified any table configurations aren't able to
create data views today.

1. Because `_resolve` endpoint relies on schema registry, which - as
turned out - has never been populated when the index configuration has
been missing.
2. Because `_field_caps` were **not** routed properly.
`matchedAgainstPattern` did rely only on config, not on schema registry,
so it couldn't route queries to auto discovered tables. Therefore,
timestamp field list has not been expanding and data view could not have
been created.

This PR addresses both issues.

**How it works now:**
<img width="1173" alt="image"
src="https://github.com/user-attachments/assets/5845efb1-ecc1-4d56-86d0-1438c9cb888e">
  • Loading branch information
mieciu authored Sep 13, 2024
1 parent 54d168a commit 1e0eb3e
Show file tree
Hide file tree
Showing 11 changed files with 60 additions and 31 deletions.
7 changes: 4 additions & 3 deletions quesma/clickhouse/table_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type TableDiscovery interface {
LastAccessTime() time.Time
LastReloadTime() time.Time
ForceReloadCh() <-chan chan<- struct{}
AutodiscoveryEnabled() bool
}

type tableDiscovery struct {
Expand Down Expand Up @@ -100,8 +101,8 @@ func (td *tableDiscovery) TableDefinitionsFetchError() error {
return td.ReloadTablesError
}

func (td *tableDiscovery) TableAutodiscoveryEnabled() bool {
return len(td.cfg.IndexConfig) == 0
func (td *tableDiscovery) AutodiscoveryEnabled() bool {
return td.cfg.IndexAutodiscoveryEnabled()
}

func (td *tableDiscovery) LastAccessTime() time.Time {
Expand Down Expand Up @@ -138,7 +139,7 @@ func (td *tableDiscovery) ReloadTableDefinitions() {
td.tableDefinitionsLastReloadUnixSec.Store(time.Now().Unix())
return
} else {
if td.TableAutodiscoveryEnabled() {
if td.AutodiscoveryEnabled() {
configuredTables = td.autoConfigureTables(tables, databaseName)
} else {
configuredTables = td.configureTables(tables, databaseName)
Expand Down
11 changes: 3 additions & 8 deletions quesma/quesma/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,11 +351,6 @@ func (c *QuesmaConfiguration) validateSchemaConfiguration(config IndexConfigurat
return err
}

//func countPrimaryKeys(config IndexConfiguration) (count int) {
// for _, configuration := range config.SchemaOverrides.Fields {
// if configuration.IsPrimaryKey {
// count++
// }
// }
// return count
//}
func (c *QuesmaConfiguration) IndexAutodiscoveryEnabled() bool {
return len(c.IndexConfig) == 0
}
18 changes: 16 additions & 2 deletions quesma/quesma/matchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"quesma/quesma/config"
"quesma/quesma/mux"
"quesma/quesma/types"
"quesma/schema"
"quesma/tracing"
"strings"
)
Expand Down Expand Up @@ -44,7 +45,7 @@ func matchedAgainstBulkBody(configuration *config.QuesmaConfiguration) mux.Reque
})
}

func matchedAgainstPattern(configuration *config.QuesmaConfiguration) mux.RequestMatcher {
func matchedAgainstPattern(configuration *config.QuesmaConfiguration, sr schema.Registry) mux.RequestMatcher {
return mux.RequestMatcherFunc(func(req *mux.Request) bool {
indexPattern := elasticsearch.NormalizePattern(req.Params["index"])
if elasticsearch.IsInternalIndex(indexPattern) {
Expand All @@ -61,7 +62,13 @@ func matchedAgainstPattern(configuration *config.QuesmaConfiguration) mux.Reques
return false
}
}

if configuration.IndexAutodiscoveryEnabled() {
for tableName := range sr.AllSchemas() {
if config.MatchName(elasticsearch.NormalizePattern(indexPattern), string(tableName)) {
return true
}
}
}
for _, pattern := range indexPatterns {
for _, indexName := range configuration.IndexConfig {
if config.MatchName(elasticsearch.NormalizePattern(pattern), indexName.Name) {
Expand All @@ -73,6 +80,13 @@ func matchedAgainstPattern(configuration *config.QuesmaConfiguration) mux.Reques
}
return false
} else {
if configuration.IndexAutodiscoveryEnabled() {
for tableName := range sr.AllSchemas() {
if config.MatchName(elasticsearch.NormalizePattern(indexPattern), string(tableName)) {
return true
}
}
}
for _, index := range configuration.IndexConfig {
pattern := elasticsearch.NormalizePattern(indexPattern)
if config.MatchName(pattern, index.Name) {
Expand Down
20 changes: 10 additions & 10 deletions quesma/quesma/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl
return resolveIndexResult(sources)
})

router.Register(routes.IndexCountPath, and(method("GET"), matchedAgainstPattern(cfg)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
router.Register(routes.IndexCountPath, and(method("GET"), matchedAgainstPattern(cfg, sr)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
cnt, err := queryRunner.handleCount(ctx, req.Params["index"])
if err != nil {
if errors.Is(quesma_errors.ErrIndexNotExists(), err) {
Expand Down Expand Up @@ -137,7 +137,7 @@ func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
})

router.Register(routes.IndexSearchPath, and(method("GET", "POST"), matchedAgainstPattern(cfg)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
router.Register(routes.IndexSearchPath, and(method("GET", "POST"), matchedAgainstPattern(cfg, sr)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {

body, err := types.ExpectJSON(req.ParsedBody)
if err != nil {
Expand All @@ -159,7 +159,7 @@ func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl
}
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
})
router.Register(routes.IndexAsyncSearchPath, and(method("POST"), matchedAgainstPattern(cfg)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
router.Register(routes.IndexAsyncSearchPath, and(method("POST"), matchedAgainstPattern(cfg, sr)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
waitForResultsMs := 1000 // Defaults to 1 second as in docs
if v, ok := req.Params["wait_for_completion_timeout"]; ok {
if w, err := time.ParseDuration(v); err == nil {
Expand Down Expand Up @@ -196,7 +196,7 @@ func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
})

router.Register(routes.IndexMappingPath, and(method("PUT"), matchedAgainstPattern(cfg)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
router.Register(routes.IndexMappingPath, and(method("PUT"), matchedAgainstPattern(cfg, sr)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
index := req.Params["index"]

body, err := types.ExpectJSON(req.ParsedBody)
Expand All @@ -211,7 +211,7 @@ func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl
return putIndexResult(index)
})

router.Register(routes.IndexMappingPath, and(method("GET"), matchedAgainstPattern(cfg)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
router.Register(routes.IndexMappingPath, and(method("GET"), matchedAgainstPattern(cfg, sr)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
index := req.Params["index"]

foundSchema, found := sr.FindSchema(schema.TableName(index))
Expand Down Expand Up @@ -242,7 +242,7 @@ func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
})

router.Register(routes.FieldCapsPath, and(method("GET", "POST"), matchedAgainstPattern(cfg)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
router.Register(routes.FieldCapsPath, and(method("GET", "POST"), matchedAgainstPattern(cfg, sr)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {

responseBody, err := field_capabilities.HandleFieldCaps(ctx, cfg, sr, req.Params["index"], lm)
if err != nil {
Expand All @@ -257,7 +257,7 @@ func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl
}
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
})
router.Register(routes.TermsEnumPath, and(method("POST"), matchedAgainstPattern(cfg)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
router.Register(routes.TermsEnumPath, and(method("POST"), matchedAgainstPattern(cfg, sr)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
if strings.Contains(req.Params["index"], ",") {
return nil, errors.New("multi index terms enum is not yet supported")
} else {
Expand All @@ -278,7 +278,7 @@ func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl
}
})

router.Register(routes.EQLSearch, and(method("GET", "POST"), matchedAgainstPattern(cfg)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
router.Register(routes.EQLSearch, and(method("GET", "POST"), matchedAgainstPattern(cfg, sr)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
body, err := types.ExpectJSON(req.ParsedBody)
if err != nil {
return nil, err
Expand All @@ -295,7 +295,7 @@ func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
})

router.Register(routes.IndexPath, and(method("PUT"), matchedAgainstPattern(cfg)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
router.Register(routes.IndexPath, and(method("PUT"), matchedAgainstPattern(cfg, sr)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
index := req.Params["index"]

body, err := types.ExpectJSON(req.ParsedBody)
Expand All @@ -315,7 +315,7 @@ func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl
return putIndexResult(index)
})

router.Register(routes.IndexPath, and(method("GET"), matchedAgainstPattern(cfg)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
router.Register(routes.IndexPath, and(method("GET"), matchedAgainstPattern(cfg, sr)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
index := req.Params["index"]

foundSchema, found := sr.FindSchema(schema.TableName(index))
Expand Down
4 changes: 2 additions & 2 deletions quesma/quesma/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/stretchr/testify/assert"
"quesma/quesma/config"
"quesma/quesma/mux"
"quesma/schema"
"testing"
)

Expand Down Expand Up @@ -131,8 +132,7 @@ func Test_matchedAgainstPattern(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {

req := &mux.Request{Params: map[string]string{"index": tt.pattern}, Body: tt.body}

assert.Equalf(t, tt.want, matchedAgainstPattern(&tt.configuration).Matches(req), "matchedAgainstPattern(%v)", tt.configuration)
assert.Equalf(t, tt.want, matchedAgainstPattern(&tt.configuration, schema.StaticRegistry{}).Matches(req), "matchedAgainstPattern(%v)", tt.configuration)
})
}
}
Expand Down
2 changes: 2 additions & 0 deletions quesma/quesma/schema_transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ func (f fixedTableProvider) TableDefinitions() map[string]schema.Table {
return f.tables
}

func (f fixedTableProvider) AutodiscoveryEnabled() bool { return false }

func Test_ipRangeTransform(t *testing.T) {
const isIPAddressInRangePrimitive = "isIPAddressInRange"
const CASTPrimitive = "CAST"
Expand Down
2 changes: 1 addition & 1 deletion quesma/quesma/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (q *QueryRunner) executePlan(ctx context.Context, plan *model.ExecutionPlan
}

func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern string, body types.JSON, optAsync *AsyncQuery, queryLanguage QueryLanguage) ([]byte, error) {
sources, sourcesElastic, sourcesClickhouse := ResolveSources(indexPattern, q.cfg, q.im)
sources, sourcesElastic, sourcesClickhouse := ResolveSources(indexPattern, q.cfg, q.im, q.schemaRegistry)

switch sources {
case sourceBoth:
Expand Down
10 changes: 9 additions & 1 deletion quesma/quesma/source_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"quesma/elasticsearch"
"quesma/logger"
"quesma/quesma/config"
"quesma/schema"
"quesma/util"
"slices"
"strings"
Expand All @@ -18,7 +19,7 @@ const (
sourceNone = "none"
)

func ResolveSources(indexPattern string, cfg *config.QuesmaConfiguration, im elasticsearch.IndexManagement) (string, []string, []string) {
func ResolveSources(indexPattern string, cfg *config.QuesmaConfiguration, im elasticsearch.IndexManagement, sr schema.Registry) (string, []string, []string) {
if elasticsearch.IsIndexPattern(indexPattern) {
matchesElastic := []string{}
matchesClickhouse := []string{}
Expand All @@ -29,6 +30,13 @@ func ResolveSources(indexPattern string, cfg *config.QuesmaConfiguration, im ela
matchesElastic = append(matchesElastic, indexName)
}
}
if cfg.IndexAutodiscoveryEnabled() {
for tableName := range sr.AllSchemas() {
if config.MatchName(elasticsearch.NormalizePattern(indexPattern), string(tableName)) {
matchesClickhouse = append(matchesClickhouse, string(tableName))
}
}
}

for indexName, indexConfig := range cfg.IndexConfig {
if util.IndexPatternMatches(pattern, indexName) && !indexConfig.Disabled {
Expand Down
3 changes: 2 additions & 1 deletion quesma/quesma/source_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/stretchr/testify/assert"
"quesma/elasticsearch"
"quesma/quesma/config"
"quesma/schema"
"quesma/util"
"testing"
)
Expand Down Expand Up @@ -87,7 +88,7 @@ func TestResolveSources(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name+tt.args.indexPattern, func(t *testing.T) {
got, _, _ := ResolveSources(tt.args.indexPattern, &tt.args.cfg, tt.args.im)
got, _, _ := ResolveSources(tt.args.indexPattern, &tt.args.cfg, tt.args.im, schema.StaticRegistry{})
assert.Equalf(t, tt.want, got, "ResolveSources(%v, %v, %v)", tt.args.indexPattern, tt.args.cfg, tt.args.im)
})
}
Expand Down
9 changes: 9 additions & 0 deletions quesma/schema/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type (
}
TableProvider interface {
TableDefinitions() map[string]Table
AutodiscoveryEnabled() bool
}
Table struct {
Columns map[string]Column
Expand All @@ -39,6 +40,14 @@ type (
func (s *schemaRegistry) loadSchemas() (map[TableName]Schema, error) {
definitions := s.dataSourceTableProvider.TableDefinitions()
schemas := make(map[TableName]Schema)
if s.dataSourceTableProvider.AutodiscoveryEnabled() {
for tableName := range definitions {
fields := make(map[FieldName]Field)
existsInDataSource := s.populateSchemaFromTableDefinition(definitions, tableName, fields)
schemas[TableName(tableName)] = NewSchema(fields, existsInDataSource)
}
return schemas, nil
}

for indexName, indexConfiguration := range *s.indexConfiguration {
fields := make(map[FieldName]Field)
Expand Down
5 changes: 2 additions & 3 deletions quesma/schema/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,5 @@ type fixedTableProvider struct {
tables map[string]schema.Table
}

func (f fixedTableProvider) TableDefinitions() map[string]schema.Table {
return f.tables
}
func (f fixedTableProvider) TableDefinitions() map[string]schema.Table { return f.tables }
func (f fixedTableProvider) AutodiscoveryEnabled() bool { return false }

0 comments on commit 1e0eb3e

Please sign in to comment.