From f0918387885f33b405434d9edb779dd9996c7d60 Mon Sep 17 00:00:00 2001
From: Przemyslaw Delewski <102958445+pdelewski@users.noreply.github.com>
Date: Fri, 19 Jul 2024 12:06:31 +0200
Subject: [PATCH] Index <-> tables mappings : fixing field caps and search
index resolving (#544)
This PR implements some missing pieces to connect index <-> tables
mappings e2e like field caps.
According configuration all request for `kibana_sample_data_flights` go
to `big_kibana_common_table`
```
indexMappings:
big_kibana_common_table:
sourceIndexes: ["kibana_sample_data_flights"]
```
---
quesma/eql/query_translator.go | 2 +-
quesma/model/typical_queries/hits.go | 9 +++++----
quesma/queryparser/query_parser.go | 2 +-
quesma/queryparser/query_translator.go | 5 +++--
quesma/queryparser/query_translator_test.go | 2 +-
.../field_capabilities/field_caps.go | 17 ++++++++++++++++-
quesma/quesma/query_translator.go | 4 ++--
quesma/quesma/search.go | 9 ++++++---
8 files changed, 35 insertions(+), 15 deletions(-)
diff --git a/quesma/eql/query_translator.go b/quesma/eql/query_translator.go
index 5de5d3ba9..a7adad1e2 100644
--- a/quesma/eql/query_translator.go
+++ b/quesma/eql/query_translator.go
@@ -88,7 +88,7 @@ func (cw *ClickhouseEQLQueryTranslator) ParseQuery(body types.JSON) ([]*model.Qu
if simpleQuery.CanParse {
canParse = true
query = query_util.BuildHitsQuery(cw.Ctx, cw.Table.Name, "*", &simpleQuery, queryInfo.I2)
- queryType := typical_queries.NewHits(cw.Ctx, cw.Table, &highlighter, query.SelectCommand.OrderByFieldNames(), true, false, false)
+ queryType := typical_queries.NewHits(cw.Ctx, cw.Table, &highlighter, query.SelectCommand.OrderByFieldNames(), true, false, false, cw.Table.Name)
query.Type = &queryType
query.Highlighter = highlighter
query.SelectCommand.OrderBy = simpleQuery.OrderBy
diff --git a/quesma/model/typical_queries/hits.go b/quesma/model/typical_queries/hits.go
index 438909ab3..e647e3a47 100644
--- a/quesma/model/typical_queries/hits.go
+++ b/quesma/model/typical_queries/hits.go
@@ -27,13 +27,14 @@ type Hits struct {
addSource bool // true <=> we add hit.Source field to the response
addScore bool // true <=> we add hit.Score field to the response (whose value is always 1)
addVersion bool // true <=> we add hit.Version field to the response (whose value is always 1)
+ indexName string
}
func NewHits(ctx context.Context, table *clickhouse.Table, highlighter *model.Highlighter,
- sortFieldNames []string, addSource, addScore, addVersion bool) Hits {
+ sortFieldNames []string, addSource, addScore, addVersion bool, incomingIndexName string) Hits {
return Hits{ctx: ctx, table: table, highlighter: highlighter, sortFieldNames: sortFieldNames,
- addSource: addSource, addScore: addScore, addVersion: addVersion}
+ addSource: addSource, addScore: addScore, addVersion: addVersion, indexName: incomingIndexName}
}
const (
@@ -48,7 +49,7 @@ func (query Hits) IsBucketAggregation() bool {
func (query Hits) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) model.JsonMap {
hits := make([]model.SearchHit, 0, len(rows))
for i, row := range rows {
- hit := model.NewSearchHit(query.table.Name)
+ hit := model.NewSearchHit(query.indexName)
if query.addScore {
hit.Score = defaultScore
}
@@ -140,7 +141,7 @@ func (query Hits) computeIdForDocument(doc model.SearchHit, defaultID string) st
}
func (query Hits) String() string {
- return fmt.Sprintf("hits(table: %v)", query.table.Name)
+ return fmt.Sprintf("hits(table: %v)", query.indexName)
}
func (query Hits) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
diff --git a/quesma/queryparser/query_parser.go b/quesma/queryparser/query_parser.go
index 1daec77e1..3291c6577 100644
--- a/quesma/queryparser/query_parser.go
+++ b/quesma/queryparser/query_parser.go
@@ -84,7 +84,7 @@ func (cw *ClickhouseQueryTranslator) buildListQueryIfNeeded(
if fullQuery != nil {
highlighter.SetTokensToHighlight(fullQuery.SelectCommand)
// TODO: pass right arguments
- queryType := typical_queries.NewHits(cw.Ctx, cw.Table, &highlighter, fullQuery.SelectCommand.OrderByFieldNames(), true, false, false)
+ queryType := typical_queries.NewHits(cw.Ctx, cw.Table, &highlighter, fullQuery.SelectCommand.OrderByFieldNames(), true, false, false, cw.IncomingIndexName)
fullQuery.Type = &queryType
fullQuery.Highlighter = highlighter
}
diff --git a/quesma/queryparser/query_translator.go b/quesma/queryparser/query_translator.go
index 28cc51af5..daf9a3ce4 100644
--- a/quesma/queryparser/query_translator.go
+++ b/quesma/queryparser/query_translator.go
@@ -24,8 +24,9 @@ type ClickhouseQueryTranslator struct {
Table *clickhouse.Table
Ctx context.Context
- DateMathRenderer string // "clickhouse_interval" or "literal" if not set, we use "clickhouse_interval"
- SchemaRegistry schema.Registry
+ DateMathRenderer string // "clickhouse_interval" or "literal" if not set, we use "clickhouse_interval"
+ SchemaRegistry schema.Registry
+ IncomingIndexName string
}
var completionStatusOK = func() *int { value := 200; return &value }()
diff --git a/quesma/queryparser/query_translator_test.go b/quesma/queryparser/query_translator_test.go
index ca97a0198..2259dbd76 100644
--- a/quesma/queryparser/query_translator_test.go
+++ b/quesma/queryparser/query_translator_test.go
@@ -188,7 +188,7 @@ func TestMakeResponseSearchQuery(t *testing.T) {
&model.SimpleQuery{FieldName: "*"}, model.WeNeedUnlimitedCount,
)
highlighter := NewEmptyHighlighter()
- queryType := typical_queries.NewHits(cw.Ctx, cw.Table, &highlighter, hitQuery.SelectCommand.OrderByFieldNames(), true, false, false)
+ queryType := typical_queries.NewHits(cw.Ctx, cw.Table, &highlighter, hitQuery.SelectCommand.OrderByFieldNames(), true, false, false, cw.Table.Name)
hitQuery.Type = &queryType
ourResponseRaw := cw.MakeSearchResponse(
[]*model.Query{hitQuery},
diff --git a/quesma/quesma/functionality/field_capabilities/field_caps.go b/quesma/quesma/functionality/field_capabilities/field_caps.go
index 1660dfd01..3f1bcbee5 100644
--- a/quesma/quesma/functionality/field_capabilities/field_caps.go
+++ b/quesma/quesma/functionality/field_capabilities/field_caps.go
@@ -41,7 +41,16 @@ func addFieldCapabilityFromSchemaRegistry(fields map[string]map[string]model.Fie
fields[colName][fieldTypeName] = fieldCapability
}
}
-
+func makeSourceToDestMappings(indexMappings map[string]config.IndexMappingsConfiguration) map[string]string {
+ sourceToDestMapping := make(map[string]string)
+ for _, indexMapping := range indexMappings {
+ for _, sourceIndex := range indexMapping.Mappings {
+ destIndex := indexMapping.Name
+ sourceToDestMapping[sourceIndex] = destIndex
+ }
+ }
+ return sourceToDestMapping
+}
func handleFieldCapsIndex(cfg config.QuesmaConfiguration, schemaRegistry schema.Registry, indexes []string) ([]byte, error) {
fields := make(map[string]map[string]model.FieldCapability)
for _, resolvedIndex := range indexes {
@@ -104,6 +113,12 @@ func EmptyFieldCapsResponse() []byte {
}
func HandleFieldCaps(ctx context.Context, cfg config.QuesmaConfiguration, schemaRegistry schema.Registry, index string, lm *clickhouse.LogManager) ([]byte, error) {
+ sourceToDestMapping := makeSourceToDestMappings(cfg.IndexSourceToInternalMappings)
+
+ if destIndex, ok := sourceToDestMapping[index]; ok {
+ index = destIndex
+ }
+
indexes, err := lm.ResolveIndexes(ctx, index)
if err != nil {
return nil, err
diff --git a/quesma/quesma/query_translator.go b/quesma/quesma/query_translator.go
index da0c10978..309b4d116 100644
--- a/quesma/quesma/query_translator.go
+++ b/quesma/quesma/query_translator.go
@@ -32,11 +32,11 @@ const (
QueryLanguageEQL = "eql"
)
-func NewQueryTranslator(ctx context.Context, language QueryLanguage, table *clickhouse.Table, logManager *clickhouse.LogManager, dateMathRenderer string, schemaRegistry schema.Registry) (queryTranslator IQueryTranslator) {
+func NewQueryTranslator(ctx context.Context, language QueryLanguage, table *clickhouse.Table, logManager *clickhouse.LogManager, dateMathRenderer string, schemaRegistry schema.Registry, incomingIndexName string) (queryTranslator IQueryTranslator) {
switch language {
case QueryLanguageEQL:
return &eql.ClickhouseEQLQueryTranslator{ClickhouseLM: logManager, Table: table, Ctx: ctx}
default:
- return &queryparser.ClickhouseQueryTranslator{ClickhouseLM: logManager, Table: table, Ctx: ctx, DateMathRenderer: dateMathRenderer, SchemaRegistry: schemaRegistry}
+ return &queryparser.ClickhouseQueryTranslator{ClickhouseLM: logManager, Table: table, Ctx: ctx, DateMathRenderer: dateMathRenderer, SchemaRegistry: schemaRegistry, IncomingIndexName: incomingIndexName}
}
}
diff --git a/quesma/quesma/search.go b/quesma/quesma/search.go
index 4e53a3d05..6b63faa90 100644
--- a/quesma/quesma/search.go
+++ b/quesma/quesma/search.go
@@ -213,17 +213,20 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
if err != nil {
return nil, err
}
-
+ sourceToDestMappings := makeSourceToDestMappings(q.cfg.IndexSourceToInternalMappings)
for _, resolvedTableName := range sourcesClickhouse {
var err error
doneCh := make(chan AsyncSearchWithError, 1)
-
+ incomingIndexName := resolvedTableName
+ if indexMapping, ok := sourceToDestMappings[resolvedTableName]; ok {
+ resolvedTableName = indexMapping
+ }
table, _ := tables.Load(resolvedTableName)
if table == nil {
return []byte{}, end_user_errors.ErrNoSuchTable.New(fmt.Errorf("can't load %s table", resolvedTableName)).Details("Table: %s", resolvedTableName)
}
- queryTranslator := NewQueryTranslator(ctx, queryLanguage, table, q.logManager, q.DateMathRenderer, q.schemaRegistry)
+ queryTranslator := NewQueryTranslator(ctx, queryLanguage, table, q.logManager, q.DateMathRenderer, q.schemaRegistry, incomingIndexName)
queries, canParse, err := queryTranslator.ParseQuery(body)
if err != nil {