Skip to content

Commit

Permalink
Target config extended configuration (#945)
Browse files Browse the repository at this point in the history
This PR implements new variant of `target` configuration where it's a
list not an array and can have more attributes (potentially in the
future)

Current variant is still supported and is a fallback in the case
extended version is missing

Below
```
  kibana_sample_data_flights:
    target:
    - my-clickhouse-data-source:
        useCommonTable: true
```
is equivalent of 
```
  kibana_sample_data_flights:
    target: [my-clickhouse-data-source]
    useCommonTable: true
```

PR contains a bit of duplicated code, something that I would prefer fix
in next PR.
  • Loading branch information
pdelewski authored Nov 4, 2024
1 parent c6ed018 commit 5d19d94
Show file tree
Hide file tree
Showing 4 changed files with 315 additions and 44 deletions.
258 changes: 215 additions & 43 deletions quesma/quesma/config/config_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,18 +377,37 @@ func (c *QuesmaNewConfiguration) validateProcessor(p Processor) error {
return fmt.Errorf("index name '%s' in processor configuration is an index pattern, not allowed", indexName)
}
if p.Type == QuesmaV1ProcessorQuery {
if len(indexConfig.Target) > 2 {
return fmt.Errorf("configuration of index %s must have at most two targets (query processor)", indexName)
if _, ok := indexConfig.Target.([]interface{}); ok {
if len(indexConfig.Target.([]interface{})) > 2 {
return fmt.Errorf("configuration of index %s must have at most two targets (query processor)", indexName)
}
}
} else {
if len(indexConfig.Target) > 2 {
return fmt.Errorf("configuration of index %s must have at most two targets (ingest processor)", indexName)
if _, ok := indexConfig.Target.([]interface{}); ok {
if len(indexConfig.Target.([]interface{})) > 2 {
return fmt.Errorf("configuration of index %s must have at most two targets (ingest processor)", indexName)
}
}
}

for _, target := range indexConfig.Target {
if c.getBackendConnectorByName(target) == nil {
return fmt.Errorf("invalid target %s in configuration of index %s", target, indexName)
targets, errTarget := c.getTargetsExtendedConfig(indexConfig.Target)
if errTarget != nil {
return errTarget
}
// fallback to old style, simplified target configuration
if len(targets) > 0 {
for _, target := range targets {
if c.getBackendConnectorByName(target.target) == nil {
return fmt.Errorf("invalid target %s in configuration of index %s", target, indexName)
}
}
}
if len(targets) == 0 {
if _, ok := indexConfig.Target.([]interface{}); ok {
for _, target := range indexConfig.Target.([]interface{}) {
if c.getBackendConnectorByName(target.(string)) == nil {
return fmt.Errorf("invalid target %s in configuration of index %s", target, indexName)
}
}
}
}
}
Expand Down Expand Up @@ -545,13 +564,36 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {

// Handle default index configuration
defaultConfig := queryProcessor.Config.IndexConfig[DefaultWildcardIndexName]
for _, target := range defaultConfig.Target {
if targetType, found := c.getTargetType(target); found {
defaultConfig.QueryTarget = append(defaultConfig.QueryTarget, targetType)
} else {
errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of %s", target, DefaultWildcardIndexName))
targets, errTarget := c.getTargetsExtendedConfig(defaultConfig.Target)
if errTarget != nil {
errAcc = multierror.Append(errAcc, errTarget)
}
if len(targets) > 0 {
for _, target := range targets {
if targetType, found := c.getTargetType(target.target); found {
defaultConfig.QueryTarget = append(defaultConfig.QueryTarget, targetType)
} else {
errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of %s", target, DefaultWildcardIndexName))
}
if val, exists := target.properties["useCommonTable"]; exists {
conf.CreateCommonTable = val == "true"
conf.UseCommonTableForWildcard = val == "true"
}
}
}
// fallback to old style, simplified target configuration
if len(targets) == 0 {
if _, ok := defaultConfig.Target.([]interface{}); ok {
for _, target := range defaultConfig.Target.([]interface{}) {
if targetType, found := c.getTargetType(target.(string)); found {
defaultConfig.QueryTarget = append(defaultConfig.QueryTarget, targetType)
} else {
errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of %s", target, DefaultWildcardIndexName))
}
}
}
}

if defaultConfig.UseCommonTable {
// We set both flags to true here
// as creating common table depends on the first one
Expand All @@ -569,15 +611,34 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {
for indexName, indexConfig := range queryProcessor.Config.IndexConfig {
processedConfig := indexConfig
processedConfig.Name = indexName

for _, target := range indexConfig.Target {
if targetType, found := c.getTargetType(target); found {
processedConfig.QueryTarget = append(processedConfig.QueryTarget, targetType)
} else {
errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of index %s", target, indexName))
targets, errTarget := c.getTargetsExtendedConfig(indexConfig.Target)
if errTarget != nil {
errAcc = multierror.Append(errAcc, errTarget)
}
if len(targets) > 0 {
for _, target := range targets {
if targetType, found := c.getTargetType(target.target); found {
processedConfig.QueryTarget = append(processedConfig.QueryTarget, targetType)
} else {
errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of index %s", target, indexName))
}
if val, exists := target.properties["useCommonTable"]; exists {
processedConfig.UseCommonTable = val == "true"
}
}
}
// fallback to old style, simplified target configuration
if len(targets) == 0 {
if _, ok := indexConfig.Target.([]interface{}); ok {
for _, target := range indexConfig.Target.([]interface{}) {
if targetType, found := c.getTargetType(target.(string)); found {
processedConfig.QueryTarget = append(processedConfig.QueryTarget, targetType)
} else {
errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of index %s", target, indexName))
}
}
}
}

if len(processedConfig.QueryTarget) == 2 && !((processedConfig.QueryTarget[0] == ClickhouseTarget && processedConfig.QueryTarget[1] == ElasticsearchTarget) ||
(processedConfig.QueryTarget[0] == ElasticsearchTarget && processedConfig.QueryTarget[1] == ClickhouseTarget)) {
errAcc = multierror.Append(errAcc, fmt.Errorf("index %s has invalid dual query target configuration", indexName))
Expand Down Expand Up @@ -619,11 +680,33 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {

// Handle default index configuration
defaultConfig := queryProcessor.Config.IndexConfig[DefaultWildcardIndexName]
for _, target := range defaultConfig.Target {
if targetType, found := c.getTargetType(target); found {
defaultConfig.QueryTarget = append(defaultConfig.QueryTarget, targetType)
} else {
errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of %s", target, DefaultWildcardIndexName))
targets, errTarget := c.getTargetsExtendedConfig(defaultConfig.Target)
if errTarget != nil {
errAcc = multierror.Append(errAcc, errTarget)
}
if len(targets) > 0 {
for _, target := range targets {
if targetType, found := c.getTargetType(target.target); found {
defaultConfig.QueryTarget = append(defaultConfig.QueryTarget, targetType)
} else {
errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of %s", target, DefaultWildcardIndexName))
}
if val, exists := target.properties["useCommonTable"]; exists {
conf.CreateCommonTable = val == "true"
conf.UseCommonTableForWildcard = val == "true"
}
}
}
// fallback to old style, simplified target configuration
if len(targets) == 0 {
if _, ok := defaultConfig.Target.([]interface{}); ok {
for _, target := range defaultConfig.Target.([]interface{}) {
if targetType, found := c.getTargetType(target.(string)); found {
defaultConfig.QueryTarget = append(defaultConfig.QueryTarget, targetType)
} else {
errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of %s", target, DefaultWildcardIndexName))
}
}
}
}
if defaultConfig.UseCommonTable {
Expand All @@ -634,11 +717,33 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {
}

ingestProcessorDefaultIndexConfig := ingestProcessor.Config.IndexConfig[DefaultWildcardIndexName]
for _, target := range ingestProcessor.Config.IndexConfig[DefaultWildcardIndexName].Target {
if targetType, found := c.getTargetType(target); found {
defaultConfig.IngestTarget = append(defaultConfig.IngestTarget, targetType)
} else {
errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of %s", target, DefaultWildcardIndexName))
targets, errTarget = c.getTargetsExtendedConfig(ingestProcessorDefaultIndexConfig.Target)
if errTarget != nil {
errAcc = multierror.Append(errAcc, errTarget)
}
if len(targets) > 0 {
for _, target := range targets {
if targetType, found := c.getTargetType(target.target); found {
defaultConfig.IngestTarget = append(defaultConfig.IngestTarget, targetType)
} else {
errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of %s", target, DefaultWildcardIndexName))
}
if val, exists := target.properties["useCommonTable"]; exists {
conf.CreateCommonTable = val == "true"
conf.UseCommonTableForWildcard = val == "true"
}
}
}
// fallback to old style, simplified target configuration
if len(targets) == 0 {
if _, ok := ingestProcessor.Config.IndexConfig[DefaultWildcardIndexName].Target.([]interface{}); ok {
for _, target := range ingestProcessor.Config.IndexConfig[DefaultWildcardIndexName].Target.([]interface{}) {
if targetType, found := c.getTargetType(target.(string)); found {
defaultConfig.IngestTarget = append(defaultConfig.IngestTarget, targetType)
} else {
errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of %s", target, DefaultWildcardIndexName))
}
}
}
}
if ingestProcessorDefaultIndexConfig.UseCommonTable {
Expand Down Expand Up @@ -668,15 +773,34 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {
processedConfig.Name = indexName

processedConfig.IngestTarget = defaultConfig.IngestTarget

for _, target := range indexConfig.Target {
if targetType, found := c.getTargetType(target); found {
processedConfig.QueryTarget = append(processedConfig.QueryTarget, targetType)
} else {
errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of index %s", target, indexName))
targets, errTarget = c.getTargetsExtendedConfig(indexConfig.Target)
if errTarget != nil {
errAcc = multierror.Append(errAcc, errTarget)
}
if len(targets) > 0 {
for _, target := range targets {
if targetType, found := c.getTargetType(target.target); found {
processedConfig.QueryTarget = append(processedConfig.QueryTarget, targetType)
} else {
errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of index %s", target, indexName))
}
if val, exists := target.properties["useCommonTable"]; exists {
processedConfig.UseCommonTable = val == true
}
}
}
// fallback to old style, simplified target configuration
if len(targets) == 0 {
if _, ok := indexConfig.Target.([]interface{}); ok {
for _, target := range indexConfig.Target.([]interface{}) {
if targetType, found := c.getTargetType(target.(string)); found {
processedConfig.QueryTarget = append(processedConfig.QueryTarget, targetType)
} else {
errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of index %s", target, indexName))
}
}
}
}

if len(processedConfig.QueryTarget) == 2 && !((processedConfig.QueryTarget[0] == ClickhouseTarget && processedConfig.QueryTarget[1] == ElasticsearchTarget) ||
(processedConfig.QueryTarget[0] == ElasticsearchTarget && processedConfig.QueryTarget[1] == ClickhouseTarget)) {
errAcc = multierror.Append(errAcc, fmt.Errorf("index %s has invalid dual query target configuration", indexName))
Expand Down Expand Up @@ -709,14 +833,34 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {
}

processedConfig.IngestTarget = make([]string, 0) // reset previously set defaultConfig.IngestTarget
for _, target := range indexConfig.Target {
if targetType, found := c.getTargetType(target); found {
processedConfig.IngestTarget = append(processedConfig.IngestTarget, targetType)
} else {
errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of index %s", target, indexName))
targets, errTarget = c.getTargetsExtendedConfig(indexConfig.Target)
if errTarget != nil {
errAcc = multierror.Append(errAcc, errTarget)
}
if len(targets) > 0 {
for _, target := range targets {
if targetType, found := c.getTargetType(target.target); found {
processedConfig.IngestTarget = append(processedConfig.IngestTarget, targetType)
} else {
errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of index %s", target, indexName))
}
if val, exists := target.properties["useCommonTable"]; exists {
processedConfig.UseCommonTable = val == true
}
}
}
// fallback to old style, simplified target configuration
if len(targets) == 0 {
if _, ok := indexConfig.Target.([]interface{}); ok {
for _, target := range indexConfig.Target.([]interface{}) {
if targetType, found := c.getTargetType(target.(string)); found {
processedConfig.IngestTarget = append(processedConfig.IngestTarget, targetType)
} else {
errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of index %s", target, indexName))
}
}
}
}

conf.IndexConfig[indexName] = processedConfig
}
}
Expand Down Expand Up @@ -824,3 +968,31 @@ func (c *QuesmaNewConfiguration) validateBackendConnectors() error {
func getAllowedProcessorTypes() []ProcessorType {
return []ProcessorType{QuesmaV1ProcessorNoOp, QuesmaV1ProcessorQuery, QuesmaV1ProcessorIngest}
}

func (c *QuesmaNewConfiguration) getTargetsExtendedConfig(target any) ([]struct {
target string
properties map[string]interface{}
}, error) {
result := make([]struct {
target string
properties map[string]interface{}
}, 0)

if targets, ok := target.([]interface{}); ok {
for _, target := range targets {
if targetMap, ok := target.(map[string]interface{}); ok {
for name, settings := range targetMap {
if settingsMap, ok := settings.(map[string]interface{}); ok {
result = append(result, struct {
target string
properties map[string]interface{}
}{target: name, properties: settingsMap})
} else {
return nil, fmt.Errorf("invalid target properties for target %s", name)
}
}
}
}
}
return result, nil
}
28 changes: 28 additions & 0 deletions quesma/quesma/config/config_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,31 @@ func TestMatchName(t *testing.T) {
})
}
}

func TestTargetNewVariant(t *testing.T) {
os.Setenv(configFileLocationEnvVar, "./test_configs/target_new_variant.yaml")
cfg := LoadV2Config()
if err := cfg.Validate(); err != nil {
t.Fatalf("error validating config: %v", err)
}
legacyConf := cfg.TranslateToLegacyConfig()
assert.False(t, legacyConf.TransparentProxy)
assert.Equal(t, 3, len(legacyConf.IndexConfig))
ecommerce := legacyConf.IndexConfig["kibana_sample_data_ecommerce"]
flights := legacyConf.IndexConfig["kibana_sample_data_flights"]
logs := legacyConf.IndexConfig["kibana_sample_data_logs"]

assert.Equal(t, []string{ClickhouseTarget}, ecommerce.QueryTarget)
assert.Equal(t, []string{ClickhouseTarget}, ecommerce.IngestTarget)

assert.Equal(t, []string{ClickhouseTarget}, flights.QueryTarget)
assert.Equal(t, []string{ClickhouseTarget}, flights.IngestTarget)

assert.Equal(t, []string{ClickhouseTarget}, logs.QueryTarget)
assert.Equal(t, []string{ClickhouseTarget}, logs.IngestTarget)

assert.Equal(t, false, flights.UseCommonTable)
assert.Equal(t, false, ecommerce.UseCommonTable)
assert.Equal(t, true, logs.UseCommonTable)
assert.Equal(t, true, legacyConf.EnableIngest)
}
2 changes: 1 addition & 1 deletion quesma/quesma/config/index_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type IndexConfiguration struct {
Optimizers map[string]OptimizerConfiguration `koanf:"optimizers"`
Override string `koanf:"override"`
UseCommonTable bool `koanf:"useCommonTable"`
Target []string `koanf:"target"`
Target any `koanf:"target"`

// Computed based on the overall configuration
Name string
Expand Down
Loading

0 comments on commit 5d19d94

Please sign in to comment.