Skip to content

Commit

Permalink
Fix override (#1086)
Browse files Browse the repository at this point in the history
Overriding table name does not work, this fix it, but it was a hole of
other issues. Got longer vision, but would love to validate before
digging more in that direction.

---------

Signed-off-by: Jacek Migdal <[email protected]>
  • Loading branch information
jakozaur authored Dec 12, 2024
1 parent f9e6b4e commit 70c6402
Show file tree
Hide file tree
Showing 44 changed files with 285 additions and 212 deletions.
33 changes: 18 additions & 15 deletions quesma/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"quesma/persistence"
"quesma/quesma/config"
"quesma/quesma/recovery"
"quesma/schema"
"quesma/telemetry"
"quesma/util"
"slices"
Expand Down Expand Up @@ -134,7 +135,7 @@ func (lm *LogManager) Close() {
// and returns all matching indexes. Empty pattern means all indexes, "_all" index name means all indexes
//
// Note: Empty pattern means all indexes, "_all" index name means all indexes
func (lm *LogManager) ResolveIndexPattern(ctx context.Context, pattern string) (results []string, err error) {
func (lm *LogManager) ResolveIndexPattern(ctx context.Context, schema schema.Registry, pattern string) (results []string, err error) {
if err = lm.tableDiscovery.TableDefinitionsFetchError(); err != nil {
return nil, err
}
Expand All @@ -143,11 +144,13 @@ func (lm *LogManager) ResolveIndexPattern(ctx context.Context, pattern string) (
if strings.Contains(pattern, ",") {
for _, pattern := range strings.Split(pattern, ",") {
if pattern == allElasticsearchIndicesPattern || pattern == "" {
results = lm.tableDiscovery.TableDefinitions().Keys()
for k := range schema.AllSchemas() {
results = append(results, k.AsString())
}
slices.Sort(results)
return results, nil
} else {
indexes, err := lm.ResolveIndexPattern(ctx, pattern)
indexes, err := lm.ResolveIndexPattern(ctx, schema, pattern)
if err != nil {
return nil, err
}
Expand All @@ -156,21 +159,21 @@ func (lm *LogManager) ResolveIndexPattern(ctx context.Context, pattern string) (
}
} else {
if pattern == allElasticsearchIndicesPattern || len(pattern) == 0 {
results = lm.tableDiscovery.TableDefinitions().Keys()
for k := range schema.AllSchemas() {
results = append(results, k.AsString())
}
slices.Sort(results)
return results, nil
} else {
lm.tableDiscovery.TableDefinitions().
Range(func(tableName string, v *Table) bool {
matches, err := util.IndexPatternMatches(pattern, tableName)
if err != nil {
logger.Error().Msgf("error matching index pattern: %v", err)
}
if matches {
results = append(results, tableName)
}
return true
})
for schemaName := range schema.AllSchemas() {
matches, err := util.IndexPatternMatches(pattern, schemaName.AsString())
if err != nil {
logger.Error().Msgf("error matching index pattern: %v", err)
}
if matches {
results = append(results, schemaName.AsString())
}
}
}
}

Expand Down
11 changes: 10 additions & 1 deletion quesma/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"quesma/quesma/config"
"quesma/quesma/types"
schema2 "quesma/schema"
"quesma/util"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -774,9 +775,17 @@ func TestLogManager_ResolveIndexes(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var tableDefinitions = atomic.Pointer[TableMap]{}
schemaTables := make(map[schema2.IndexName]schema2.Schema)

for _, name := range tt.tables.Keys() {
schemaTables[schema2.IndexName(name)] = schema2.Schema{}
}
schemaRegistry := schema2.StaticRegistry{
Tables: schemaTables,
}
tableDefinitions.Store(tt.tables)
lm := &LogManager{tableDiscovery: NewTableDiscoveryWith(&config.QuesmaConfiguration{}, nil, *tt.tables)}
indexes, err := lm.ResolveIndexPattern(context.Background(), tt.patterns)
indexes, err := lm.ResolveIndexPattern(context.Background(), &schemaRegistry, tt.patterns)
assert.NoError(t, err)
assert.Equalf(t, tt.resolved, indexes, tt.patterns, "ResolveIndexPattern(%v)", tt.patterns)
})
Expand Down
12 changes: 8 additions & 4 deletions quesma/frontend_connectors/router_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"quesma/quesma/recovery"
"quesma/quesma/types"
"quesma/quesma/ui"
"quesma/schema"
"quesma/telemetry"
"quesma/util"
quesma_api "quesma_v2/core"
Expand Down Expand Up @@ -142,7 +143,7 @@ func (*RouterV2) closedIndexResponse(ctx context.Context, w http.ResponseWriter,

func (r *RouterV2) elasticFallback(decision *quesma_api.Decision,
ctx context.Context, w http.ResponseWriter,
req *http.Request, reqBody []byte, logManager *clickhouse.LogManager) {
req *http.Request, reqBody []byte, logManager *clickhouse.LogManager, schemaRegistry schema.Registry) {

var sendToElastic bool

Expand Down Expand Up @@ -185,7 +186,10 @@ func (r *RouterV2) elasticFallback(decision *quesma_api.Decision,
}

if sendToElastic {
feature.AnalyzeUnsupportedCalls(ctx, req.Method, req.URL.Path, req.Header.Get(OpaqueIdHeaderKey), logManager.ResolveIndexPattern)
resolveIndexPattern := func(ctx context.Context, pattern string) ([]string, error) {
return logManager.ResolveIndexPattern(ctx, schemaRegistry, pattern)
}
feature.AnalyzeUnsupportedCalls(ctx, req.Method, req.URL.Path, req.Header.Get(OpaqueIdHeaderKey), resolveIndexPattern)

rawResponse := <-r.sendHttpRequestToElastic(ctx, req, reqBody, true)
response := rawResponse.response
Expand All @@ -203,7 +207,7 @@ func (r *RouterV2) elasticFallback(decision *quesma_api.Decision,
}
}

func (r *RouterV2) Reroute(ctx context.Context, w http.ResponseWriter, req *http.Request, reqBody []byte, router quesma_api.Router, logManager *clickhouse.LogManager) {
func (r *RouterV2) Reroute(ctx context.Context, w http.ResponseWriter, req *http.Request, reqBody []byte, router quesma_api.Router, logManager *clickhouse.LogManager, schemaRegistry schema.Registry) {
defer recovery.LogAndHandlePanic(ctx, func(err error) {
w.WriteHeader(500)
w.Write(queryparser.InternalQuesmaError("Unknown Quesma error"))
Expand Down Expand Up @@ -256,7 +260,7 @@ func (r *RouterV2) Reroute(ctx context.Context, w http.ResponseWriter, req *http
r.errorResponseV2(ctx, err, w)
}
} else {
r.elasticFallback(decision, ctx, w, req, reqBody, logManager)
r.elasticFallback(decision, ctx, w, req, reqBody, logManager, schemaRegistry)
}
}

Expand Down
3 changes: 2 additions & 1 deletion quesma/ingest/common_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func TestIngestToCommonTable(t *testing.T) {
quesmaConfig := &config.QuesmaConfiguration{
IndexConfig: map[string]config.IndexConfiguration{
indexName: {
Name: indexName,
UseCommonTable: true,
},
},
Expand Down Expand Up @@ -196,7 +197,7 @@ func TestIngestToCommonTable(t *testing.T) {
UseConnectors: []mux.ConnectorDecision{
&mux.ConnectorDecisionClickhouse{
ClickhouseTableName: common_table.TableName,
ClickhouseTables: []string{indexName},
ClickhouseIndexes: []string{indexName},
IsCommonTable: true,
},
},
Expand Down
4 changes: 2 additions & 2 deletions quesma/ingest/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,9 +420,9 @@ func TestCreateTableIfSomeFieldsExistsInSchemaAlready(t *testing.T) {

virtualTableStorage := persistence.NewStaticJSONDatabase()
schemaRegistry := &schema.StaticRegistry{
Tables: make(map[schema.TableName]schema.Schema),
Tables: make(map[schema.IndexName]schema.Schema),
}
schemaRegistry.Tables[schema.TableName(indexName)] = indexSchema
schemaRegistry.Tables[schema.IndexName(indexName)] = indexSchema

resolver := table_resolver.NewEmptyTableResolver()
decision := &mux.Decision{
Expand Down
4 changes: 2 additions & 2 deletions quesma/ingest/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (ip *IngestProcessor) createTableObjectAndAttributes(ctx context.Context, q
}

func findSchemaPointer(schemaRegistry schema.Registry, tableName string) *schema.Schema {
if foundSchema, found := schemaRegistry.FindSchema(schema.TableName(tableName)); found {
if foundSchema, found := schemaRegistry.FindSchema(schema.IndexName(tableName)); found {
return &foundSchema
}
return nil
Expand Down Expand Up @@ -631,7 +631,7 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
fieldOrigins[schema.FieldName(column.ClickHouseColumnName)] = schema.FieldSourceIngest
}

ip.schemaRegistry.UpdateFieldsOrigins(schema.TableName(tableName), fieldOrigins)
ip.schemaRegistry.UpdateFieldsOrigins(schema.IndexName(tableName), fieldOrigins)

// This comes externally from (configuration)
// So we need to convert that separately
Expand Down
2 changes: 1 addition & 1 deletion quesma/ingest/processor2.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ func (ip *IngestProcessor2) processInsertQuery(ctx context.Context,
fieldOrigins[schema.FieldName(column.ClickHouseColumnName)] = schema.FieldSourceIngest
}

ip.schemaRegistry.UpdateFieldsOrigins(schema.TableName(tableName), fieldOrigins)
ip.schemaRegistry.UpdateFieldsOrigins(schema.IndexName(tableName), fieldOrigins)

// This comes externally from (configuration)
// So we need to convert that separately
Expand Down
4 changes: 2 additions & 2 deletions quesma/queryparser/aggregation_parser_new_logic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func Test3AggregationParserNewLogic(t *testing.T) {
lm := clickhouse.NewLogManager(util.NewSyncMapWith(tableName, &table), &config.QuesmaConfiguration{})

s := schema.StaticRegistry{
Tables: map[schema.TableName]schema.Schema{
Tables: map[schema.IndexName]schema.Schema{
"logs-generic-default": {
Fields: map[schema.FieldName]schema.Field{
"host.name": {PropertyName: "host.name", InternalPropertyName: "host.name", Type: schema.QuesmaTypeObject},
Expand All @@ -54,7 +54,7 @@ func Test3AggregationParserNewLogic(t *testing.T) {
},
},
}
cw := ClickhouseQueryTranslator{ClickhouseLM: lm, Table: &table, Ctx: context.Background(), Schema: s.Tables[schema.TableName(tableName)]}
cw := ClickhouseQueryTranslator{ClickhouseLM: lm, Table: &table, Ctx: context.Background(), Schema: s.Tables[schema.IndexName(tableName)]}

for i, test := range testdata.NewLogicTestCases {
t.Run(test.TestName+"("+strconv.Itoa(i)+")", func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion quesma/queryparser/aggregation_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ func Test_parseFieldFromScriptField(t *testing.T) {
{QueryMap{"script": QueryMap{"source": 1}}, nil, false},
}
s := schema.StaticRegistry{
Tables: map[schema.TableName]schema.Schema{
Tables: map[schema.IndexName]schema.Schema{
"logs-generic-default": {
Fields: map[schema.FieldName]schema.Field{
"host.name": {PropertyName: "host.name", InternalPropertyName: "host.name", Type: schema.QuesmaTypeObject},
Expand Down
8 changes: 4 additions & 4 deletions quesma/queryparser/aggregation_percentile_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func Test_parsePercentilesAggregationWithDefaultPercents(t *testing.T) {
"field": "custom_name",
}
s := schema.StaticRegistry{
Tables: map[schema.TableName]schema.Schema{
Tables: map[schema.IndexName]schema.Schema{
"logs-generic-default": {
Fields: map[schema.FieldName]schema.Field{
"host.name": {PropertyName: "host.name", InternalPropertyName: "host.name", Type: schema.QuesmaTypeObject},
Expand All @@ -34,7 +34,7 @@ func Test_parsePercentilesAggregationWithDefaultPercents(t *testing.T) {
},
},
}
cw := &ClickhouseQueryTranslator{Table: &clickhouse.Table{}, Ctx: context.Background(), Schema: s.Tables[schema.TableName("logs-generic-default")]}
cw := &ClickhouseQueryTranslator{Table: &clickhouse.Table{}, Ctx: context.Background(), Schema: s.Tables[schema.IndexName("logs-generic-default")]}
field, _, userSpecifiedPercents := cw.parsePercentilesAggregation(payload)
assert.Equal(t, model.NewColumnRef("custom_name"), field)
assert.Equal(t, defaultPercentiles, userSpecifiedPercents)
Expand Down Expand Up @@ -64,7 +64,7 @@ func Test_parsePercentilesAggregationWithUserSpecifiedPercents(t *testing.T) {
expectedOutputMapKeys = append(expectedOutputMapKeys, k)
}
s := schema.StaticRegistry{
Tables: map[schema.TableName]schema.Schema{
Tables: map[schema.IndexName]schema.Schema{
"logs-generic-default": {
Fields: map[schema.FieldName]schema.Field{
"host.name": {PropertyName: "host.name", InternalPropertyName: "host.name", Type: schema.QuesmaTypeObject},
Expand All @@ -81,7 +81,7 @@ func Test_parsePercentilesAggregationWithUserSpecifiedPercents(t *testing.T) {
},
},
}
cw := &ClickhouseQueryTranslator{Table: &clickhouse.Table{}, Ctx: context.Background(), Schema: s.Tables[schema.TableName("logs-generic-default")]}
cw := &ClickhouseQueryTranslator{Table: &clickhouse.Table{}, Ctx: context.Background(), Schema: s.Tables[schema.IndexName("logs-generic-default")]}
fieldName, _, parsedMap := cw.parsePercentilesAggregation(payload)
assert.Equal(t, model.NewColumnRef("custom_name"), fieldName)

Expand Down
4 changes: 2 additions & 2 deletions quesma/queryparser/query_parser_async_search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestQueryParserAsyncSearch(t *testing.T) {
}
lm := clickhouse.NewLogManager(util.NewSyncMapWith(tableName, &table), &config.QuesmaConfiguration{})
s := schema.StaticRegistry{
Tables: map[schema.TableName]schema.Schema{
Tables: map[schema.IndexName]schema.Schema{
"logs-generic-default": {
Fields: map[schema.FieldName]schema.Field{
"host.name": {PropertyName: "host.name", InternalPropertyName: "host.name", Type: schema.QuesmaTypeObject},
Expand Down Expand Up @@ -59,7 +59,7 @@ func TestQueryParserAggregation(t *testing.T) {
table := clickhouse.NewEmptyTable("tablename")
lm := clickhouse.NewLogManager(util.NewSyncMapWith("tablename", table), &config.QuesmaConfiguration{})
s := schema.StaticRegistry{
Tables: map[schema.TableName]schema.Schema{
Tables: map[schema.IndexName]schema.Schema{
"tablename": {
Fields: map[schema.FieldName]schema.Field{
"host.name": {PropertyName: "host.name", InternalPropertyName: "host.name", Type: schema.QuesmaTypeObject},
Expand Down
4 changes: 2 additions & 2 deletions quesma/queryparser/query_parser_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ var parseRangeTests = []parseRangeTest{

func Test_parseRange(t *testing.T) {
s := schema.StaticRegistry{
Tables: map[schema.TableName]schema.Schema{
Tables: map[schema.IndexName]schema.Schema{
"logs-generic-default": {
Fields: map[schema.FieldName]schema.Field{
"host.name": {PropertyName: "host.name", InternalPropertyName: "host.name", Type: schema.QuesmaTypeObject},
Expand All @@ -103,7 +103,7 @@ func Test_parseRange(t *testing.T) {
}
assert.NoError(t, err)
lm := clickhouse.NewLogManager(util.NewSyncMapWith(tableName, table), &config.QuesmaConfiguration{})
cw := ClickhouseQueryTranslator{ClickhouseLM: lm, Table: table, Ctx: context.Background(), Schema: s.Tables[schema.TableName(tableName)]}
cw := ClickhouseQueryTranslator{ClickhouseLM: lm, Table: table, Ctx: context.Background(), Schema: s.Tables[schema.IndexName(tableName)]}

simpleQuery := cw.parseRange(test.rangePartOfQuery)
assert.Equal(t, test.expectedWhere, simpleQuery.WhereClauseAsString())
Expand Down
14 changes: 7 additions & 7 deletions quesma/queryparser/query_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestQueryParserStringAttrConfig(t *testing.T) {
lm := clickhouse.NewEmptyLogManager(&cfg, nil, telemetry.NewPhoneHomeEmptyAgent(), clickhouse.NewTableDiscovery(&config.QuesmaConfiguration{}, nil, persistence.NewStaticJSONDatabase()))
lm.AddTableIfDoesntExist(table)
s := schema.StaticRegistry{
Tables: map[schema.TableName]schema.Schema{
Tables: map[schema.IndexName]schema.Schema{
"logs-generic-default": {
Fields: map[schema.FieldName]schema.Field{
"host.name": {PropertyName: "host.name", InternalPropertyName: "host.name", Type: schema.QuesmaTypeObject},
Expand All @@ -64,7 +64,7 @@ func TestQueryParserStringAttrConfig(t *testing.T) {
},
},
}
cw := ClickhouseQueryTranslator{ClickhouseLM: lm, Table: table, Ctx: context.Background(), Config: &cfg, Schema: s.Tables[schema.TableName(tableName)]}
cw := ClickhouseQueryTranslator{ClickhouseLM: lm, Table: table, Ctx: context.Background(), Config: &cfg, Schema: s.Tables[schema.IndexName(tableName)]}

for i, tt := range testdata.TestsSearch {
t.Run(fmt.Sprintf("%s(%d)", tt.Name, i), func(t *testing.T) {
Expand Down Expand Up @@ -111,7 +111,7 @@ func TestQueryParserNoFullTextFields(t *testing.T) {

cfg.IndexConfig[indexConfig.Name] = indexConfig
s := schema.StaticRegistry{
Tables: map[schema.TableName]schema.Schema{
Tables: map[schema.IndexName]schema.Schema{
"logs-generic-default": {
Fields: map[schema.FieldName]schema.Field{
"host.name": {PropertyName: "host.name", InternalPropertyName: "host.name", Type: schema.QuesmaTypeObject},
Expand All @@ -128,7 +128,7 @@ func TestQueryParserNoFullTextFields(t *testing.T) {
},
},
}
cw := ClickhouseQueryTranslator{ClickhouseLM: lm, Table: &table, Ctx: context.Background(), Config: &cfg, Schema: s.Tables[schema.TableName(tableName)]}
cw := ClickhouseQueryTranslator{ClickhouseLM: lm, Table: &table, Ctx: context.Background(), Config: &cfg, Schema: s.Tables[schema.IndexName(tableName)]}

for i, tt := range testdata.TestsSearchNoFullTextFields {
t.Run(strconv.Itoa(i), func(t *testing.T) {
Expand Down Expand Up @@ -177,7 +177,7 @@ func TestQueryParserNoAttrsConfig(t *testing.T) {

cfg.IndexConfig[indexConfig.Name] = indexConfig
s := schema.StaticRegistry{
Tables: map[schema.TableName]schema.Schema{
Tables: map[schema.IndexName]schema.Schema{
"logs-generic-default": {
Fields: map[schema.FieldName]schema.Field{
"host.name": {PropertyName: "host.name", InternalPropertyName: "host.name", Type: schema.QuesmaTypeObject},
Expand Down Expand Up @@ -401,7 +401,7 @@ func TestNew(t *testing.T) {
}
lm := clickhouse.NewLogManager(util.NewSyncMapWith(tableName, table), &config.QuesmaConfiguration{})
s := schema.StaticRegistry{
Tables: map[schema.TableName]schema.Schema{
Tables: map[schema.IndexName]schema.Schema{
"logs-generic-default": {
Fields: map[schema.FieldName]schema.Field{
"host.name": {PropertyName: "host.name", InternalPropertyName: "host.name", Type: schema.QuesmaTypeObject},
Expand All @@ -419,7 +419,7 @@ func TestNew(t *testing.T) {
},
}

cw := ClickhouseQueryTranslator{ClickhouseLM: lm, Table: table, Ctx: context.Background(), Schema: s.Tables[schema.TableName("logs-generic-default")]}
cw := ClickhouseQueryTranslator{ClickhouseLM: lm, Table: table, Ctx: context.Background(), Schema: s.Tables[schema.IndexName("logs-generic-default")]}
for _, tt := range tests {
t.Run("test", func(t *testing.T) {
simpleQuery, _, _ := cw.ParseQueryAsyncSearch(tt)
Expand Down
4 changes: 2 additions & 2 deletions quesma/queryparser/query_translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ const (
func TestSearchResponse(t *testing.T) {
row := []model.QueryResultRow{{}}
s := schema.StaticRegistry{
Tables: map[schema.TableName]schema.Schema{
Tables: map[schema.IndexName]schema.Schema{
"test": {
Fields: map[schema.FieldName]schema.Field{
"host.name": {PropertyName: "host.name", InternalPropertyName: "host.name", Type: schema.QuesmaTypeObject},
Expand Down Expand Up @@ -160,7 +160,7 @@ func TestMakeResponseSearchQuery(t *testing.T) {
},
}
s := schema.StaticRegistry{
Tables: map[schema.TableName]schema.Schema{
Tables: map[schema.IndexName]schema.Schema{
"test": {
Fields: map[schema.FieldName]schema.Field{
"host.name": {PropertyName: "host.name", InternalPropertyName: "host.name", Type: schema.QuesmaTypeObject},
Expand Down
2 changes: 1 addition & 1 deletion quesma/quesma/config/config_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func (c *QuesmaNewConfiguration) validatePipelines() error {
continue
}
if queryIndexConf.Override != ingestIndexConf.Override {
return fmt.Errorf("ingest and query processors must have the same configuration of 'override' for index '%s' due to current limitations", indexName)
return fmt.Errorf("ingest and query processors must have the same configuration of 'Override' for index '%s' due to current limitations", indexName)
}
if queryIndexConf.UseCommonTable != ingestIndexConf.UseCommonTable {
return fmt.Errorf("ingest and query processors must have the same configuration of 'useCommonTable' for index '%s' due to current limitations", indexName)
Expand Down
Loading

0 comments on commit 70c6402

Please sign in to comment.