diff --git a/quesma/quesma/config/config_v2.go b/quesma/quesma/config/config_v2.go index 0cf8ebb29..2f993e0d3 100644 --- a/quesma/quesma/config/config_v2.go +++ b/quesma/quesma/config/config_v2.go @@ -377,18 +377,37 @@ func (c *QuesmaNewConfiguration) validateProcessor(p Processor) error { return fmt.Errorf("index name '%s' in processor configuration is an index pattern, not allowed", indexName) } if p.Type == QuesmaV1ProcessorQuery { - if len(indexConfig.Target) > 2 { - return fmt.Errorf("configuration of index %s must have at most two targets (query processor)", indexName) + if _, ok := indexConfig.Target.([]interface{}); ok { + if len(indexConfig.Target.([]interface{})) > 2 { + return fmt.Errorf("configuration of index %s must have at most two targets (query processor)", indexName) + } } } else { - if len(indexConfig.Target) > 2 { - return fmt.Errorf("configuration of index %s must have at most two targets (ingest processor)", indexName) + if _, ok := indexConfig.Target.([]interface{}); ok { + if len(indexConfig.Target.([]interface{})) > 2 { + return fmt.Errorf("configuration of index %s must have at most two targets (ingest processor)", indexName) + } } } - - for _, target := range indexConfig.Target { - if c.getBackendConnectorByName(target) == nil { - return fmt.Errorf("invalid target %s in configuration of index %s", target, indexName) + targets, errTarget := c.getTargetsExtendedConfig(indexConfig.Target) + if errTarget != nil { + return errTarget + } + // fallback to old style, simplified target configuration + if len(targets) > 0 { + for _, target := range targets { + if c.getBackendConnectorByName(target.target) == nil { + return fmt.Errorf("invalid target %s in configuration of index %s", target, indexName) + } + } + } + if len(targets) == 0 { + if _, ok := indexConfig.Target.([]interface{}); ok { + for _, target := range indexConfig.Target.([]interface{}) { + if c.getBackendConnectorByName(target.(string)) == nil { + return fmt.Errorf("invalid target %s in configuration of index %s", target, indexName) + } + } } } } @@ -545,13 +564,36 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration { // Handle default index configuration defaultConfig := queryProcessor.Config.IndexConfig[DefaultWildcardIndexName] - for _, target := range defaultConfig.Target { - if targetType, found := c.getTargetType(target); found { - defaultConfig.QueryTarget = append(defaultConfig.QueryTarget, targetType) - } else { - errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of %s", target, DefaultWildcardIndexName)) + targets, errTarget := c.getTargetsExtendedConfig(defaultConfig.Target) + if errTarget != nil { + errAcc = multierror.Append(errAcc, errTarget) + } + if len(targets) > 0 { + for _, target := range targets { + if targetType, found := c.getTargetType(target.target); found { + defaultConfig.QueryTarget = append(defaultConfig.QueryTarget, targetType) + } else { + errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of %s", target, DefaultWildcardIndexName)) + } + if val, exists := target.properties["useCommonTable"]; exists { + conf.CreateCommonTable = val == "true" + conf.UseCommonTableForWildcard = val == "true" + } } } + // fallback to old style, simplified target configuration + if len(targets) == 0 { + if _, ok := defaultConfig.Target.([]interface{}); ok { + for _, target := range defaultConfig.Target.([]interface{}) { + if targetType, found := c.getTargetType(target.(string)); found { + defaultConfig.QueryTarget = append(defaultConfig.QueryTarget, targetType) + } else { + errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of %s", target, DefaultWildcardIndexName)) + } + } + } + } + if defaultConfig.UseCommonTable { // We set both flags to true here // as creating common table depends on the first one @@ -569,15 +611,34 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration { for indexName, indexConfig := range queryProcessor.Config.IndexConfig { processedConfig := indexConfig processedConfig.Name = indexName - - for _, target := range indexConfig.Target { - if targetType, found := c.getTargetType(target); found { - processedConfig.QueryTarget = append(processedConfig.QueryTarget, targetType) - } else { - errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of index %s", target, indexName)) + targets, errTarget := c.getTargetsExtendedConfig(indexConfig.Target) + if errTarget != nil { + errAcc = multierror.Append(errAcc, errTarget) + } + if len(targets) > 0 { + for _, target := range targets { + if targetType, found := c.getTargetType(target.target); found { + processedConfig.QueryTarget = append(processedConfig.QueryTarget, targetType) + } else { + errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of index %s", target, indexName)) + } + if val, exists := target.properties["useCommonTable"]; exists { + processedConfig.UseCommonTable = val == "true" + } + } + } + // fallback to old style, simplified target configuration + if len(targets) == 0 { + if _, ok := indexConfig.Target.([]interface{}); ok { + for _, target := range indexConfig.Target.([]interface{}) { + if targetType, found := c.getTargetType(target.(string)); found { + processedConfig.QueryTarget = append(processedConfig.QueryTarget, targetType) + } else { + errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of index %s", target, indexName)) + } + } } } - if len(processedConfig.QueryTarget) == 2 && !((processedConfig.QueryTarget[0] == ClickhouseTarget && processedConfig.QueryTarget[1] == ElasticsearchTarget) || (processedConfig.QueryTarget[0] == ElasticsearchTarget && processedConfig.QueryTarget[1] == ClickhouseTarget)) { errAcc = multierror.Append(errAcc, fmt.Errorf("index %s has invalid dual query target configuration", indexName)) @@ -619,11 +680,33 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration { // Handle default index configuration defaultConfig := queryProcessor.Config.IndexConfig[DefaultWildcardIndexName] - for _, target := range defaultConfig.Target { - if targetType, found := c.getTargetType(target); found { - defaultConfig.QueryTarget = append(defaultConfig.QueryTarget, targetType) - } else { - errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of %s", target, DefaultWildcardIndexName)) + targets, errTarget := c.getTargetsExtendedConfig(defaultConfig.Target) + if errTarget != nil { + errAcc = multierror.Append(errAcc, errTarget) + } + if len(targets) > 0 { + for _, target := range targets { + if targetType, found := c.getTargetType(target.target); found { + defaultConfig.QueryTarget = append(defaultConfig.QueryTarget, targetType) + } else { + errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of %s", target, DefaultWildcardIndexName)) + } + if val, exists := target.properties["useCommonTable"]; exists { + conf.CreateCommonTable = val == "true" + conf.UseCommonTableForWildcard = val == "true" + } + } + } + // fallback to old style, simplified target configuration + if len(targets) == 0 { + if _, ok := defaultConfig.Target.([]interface{}); ok { + for _, target := range defaultConfig.Target.([]interface{}) { + if targetType, found := c.getTargetType(target.(string)); found { + defaultConfig.QueryTarget = append(defaultConfig.QueryTarget, targetType) + } else { + errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of %s", target, DefaultWildcardIndexName)) + } + } } } if defaultConfig.UseCommonTable { @@ -634,11 +717,33 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration { } ingestProcessorDefaultIndexConfig := ingestProcessor.Config.IndexConfig[DefaultWildcardIndexName] - for _, target := range ingestProcessor.Config.IndexConfig[DefaultWildcardIndexName].Target { - if targetType, found := c.getTargetType(target); found { - defaultConfig.IngestTarget = append(defaultConfig.IngestTarget, targetType) - } else { - errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of %s", target, DefaultWildcardIndexName)) + targets, errTarget = c.getTargetsExtendedConfig(ingestProcessorDefaultIndexConfig.Target) + if errTarget != nil { + errAcc = multierror.Append(errAcc, errTarget) + } + if len(targets) > 0 { + for _, target := range targets { + if targetType, found := c.getTargetType(target.target); found { + defaultConfig.IngestTarget = append(defaultConfig.IngestTarget, targetType) + } else { + errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of %s", target, DefaultWildcardIndexName)) + } + if val, exists := target.properties["useCommonTable"]; exists { + conf.CreateCommonTable = val == "true" + conf.UseCommonTableForWildcard = val == "true" + } + } + } + // fallback to old style, simplified target configuration + if len(targets) == 0 { + if _, ok := ingestProcessor.Config.IndexConfig[DefaultWildcardIndexName].Target.([]interface{}); ok { + for _, target := range ingestProcessor.Config.IndexConfig[DefaultWildcardIndexName].Target.([]interface{}) { + if targetType, found := c.getTargetType(target.(string)); found { + defaultConfig.IngestTarget = append(defaultConfig.IngestTarget, targetType) + } else { + errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of %s", target, DefaultWildcardIndexName)) + } + } } } if ingestProcessorDefaultIndexConfig.UseCommonTable { @@ -668,15 +773,34 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration { processedConfig.Name = indexName processedConfig.IngestTarget = defaultConfig.IngestTarget - - for _, target := range indexConfig.Target { - if targetType, found := c.getTargetType(target); found { - processedConfig.QueryTarget = append(processedConfig.QueryTarget, targetType) - } else { - errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of index %s", target, indexName)) + targets, errTarget = c.getTargetsExtendedConfig(indexConfig.Target) + if errTarget != nil { + errAcc = multierror.Append(errAcc, errTarget) + } + if len(targets) > 0 { + for _, target := range targets { + if targetType, found := c.getTargetType(target.target); found { + processedConfig.QueryTarget = append(processedConfig.QueryTarget, targetType) + } else { + errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of index %s", target, indexName)) + } + if val, exists := target.properties["useCommonTable"]; exists { + processedConfig.UseCommonTable = val == true + } + } + } + // fallback to old style, simplified target configuration + if len(targets) == 0 { + if _, ok := indexConfig.Target.([]interface{}); ok { + for _, target := range indexConfig.Target.([]interface{}) { + if targetType, found := c.getTargetType(target.(string)); found { + processedConfig.QueryTarget = append(processedConfig.QueryTarget, targetType) + } else { + errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of index %s", target, indexName)) + } + } } } - if len(processedConfig.QueryTarget) == 2 && !((processedConfig.QueryTarget[0] == ClickhouseTarget && processedConfig.QueryTarget[1] == ElasticsearchTarget) || (processedConfig.QueryTarget[0] == ElasticsearchTarget && processedConfig.QueryTarget[1] == ClickhouseTarget)) { errAcc = multierror.Append(errAcc, fmt.Errorf("index %s has invalid dual query target configuration", indexName)) @@ -709,14 +833,34 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration { } processedConfig.IngestTarget = make([]string, 0) // reset previously set defaultConfig.IngestTarget - for _, target := range indexConfig.Target { - if targetType, found := c.getTargetType(target); found { - processedConfig.IngestTarget = append(processedConfig.IngestTarget, targetType) - } else { - errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of index %s", target, indexName)) + targets, errTarget = c.getTargetsExtendedConfig(indexConfig.Target) + if errTarget != nil { + errAcc = multierror.Append(errAcc, errTarget) + } + if len(targets) > 0 { + for _, target := range targets { + if targetType, found := c.getTargetType(target.target); found { + processedConfig.IngestTarget = append(processedConfig.IngestTarget, targetType) + } else { + errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of index %s", target, indexName)) + } + if val, exists := target.properties["useCommonTable"]; exists { + processedConfig.UseCommonTable = val == true + } + } + } + // fallback to old style, simplified target configuration + if len(targets) == 0 { + if _, ok := indexConfig.Target.([]interface{}); ok { + for _, target := range indexConfig.Target.([]interface{}) { + if targetType, found := c.getTargetType(target.(string)); found { + processedConfig.IngestTarget = append(processedConfig.IngestTarget, targetType) + } else { + errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of index %s", target, indexName)) + } + } } } - conf.IndexConfig[indexName] = processedConfig } } @@ -824,3 +968,31 @@ func (c *QuesmaNewConfiguration) validateBackendConnectors() error { func getAllowedProcessorTypes() []ProcessorType { return []ProcessorType{QuesmaV1ProcessorNoOp, QuesmaV1ProcessorQuery, QuesmaV1ProcessorIngest} } + +func (c *QuesmaNewConfiguration) getTargetsExtendedConfig(target any) ([]struct { + target string + properties map[string]interface{} +}, error) { + result := make([]struct { + target string + properties map[string]interface{} + }, 0) + + if targets, ok := target.([]interface{}); ok { + for _, target := range targets { + if targetMap, ok := target.(map[string]interface{}); ok { + for name, settings := range targetMap { + if settingsMap, ok := settings.(map[string]interface{}); ok { + result = append(result, struct { + target string + properties map[string]interface{} + }{target: name, properties: settingsMap}) + } else { + return nil, fmt.Errorf("invalid target properties for target %s", name) + } + } + } + } + } + return result, nil +} diff --git a/quesma/quesma/config/config_v2_test.go b/quesma/quesma/config/config_v2_test.go index a8d05dce1..b61ea7843 100644 --- a/quesma/quesma/config/config_v2_test.go +++ b/quesma/quesma/config/config_v2_test.go @@ -229,3 +229,31 @@ func TestMatchName(t *testing.T) { }) } } + +func TestTargetNewVariant(t *testing.T) { + os.Setenv(configFileLocationEnvVar, "./test_configs/target_new_variant.yaml") + cfg := LoadV2Config() + if err := cfg.Validate(); err != nil { + t.Fatalf("error validating config: %v", err) + } + legacyConf := cfg.TranslateToLegacyConfig() + assert.False(t, legacyConf.TransparentProxy) + assert.Equal(t, 3, len(legacyConf.IndexConfig)) + ecommerce := legacyConf.IndexConfig["kibana_sample_data_ecommerce"] + flights := legacyConf.IndexConfig["kibana_sample_data_flights"] + logs := legacyConf.IndexConfig["kibana_sample_data_logs"] + + assert.Equal(t, []string{ClickhouseTarget}, ecommerce.QueryTarget) + assert.Equal(t, []string{ClickhouseTarget}, ecommerce.IngestTarget) + + assert.Equal(t, []string{ClickhouseTarget}, flights.QueryTarget) + assert.Equal(t, []string{ClickhouseTarget}, flights.IngestTarget) + + assert.Equal(t, []string{ClickhouseTarget}, logs.QueryTarget) + assert.Equal(t, []string{ClickhouseTarget}, logs.IngestTarget) + + assert.Equal(t, false, flights.UseCommonTable) + assert.Equal(t, false, ecommerce.UseCommonTable) + assert.Equal(t, true, logs.UseCommonTable) + assert.Equal(t, true, legacyConf.EnableIngest) +} diff --git a/quesma/quesma/config/index_config.go b/quesma/quesma/config/index_config.go index 0bc96f3c1..d5980cf5c 100644 --- a/quesma/quesma/config/index_config.go +++ b/quesma/quesma/config/index_config.go @@ -18,7 +18,7 @@ type IndexConfiguration struct { Optimizers map[string]OptimizerConfiguration `koanf:"optimizers"` Override string `koanf:"override"` UseCommonTable bool `koanf:"useCommonTable"` - Target []string `koanf:"target"` + Target any `koanf:"target"` // Computed based on the overall configuration Name string diff --git a/quesma/quesma/config/test_configs/target_new_variant.yaml b/quesma/quesma/config/test_configs/target_new_variant.yaml new file mode 100644 index 000000000..127deb721 --- /dev/null +++ b/quesma/quesma/config/test_configs/target_new_variant.yaml @@ -0,0 +1,71 @@ +# TEST CONFIGURATION +licenseKey: "cdd749a3-e777-11ee-bcf8-0242ac150004" +frontendConnectors: + - name: elastic-ingest + type: elasticsearch-fe-ingest + config: + listenPort: 8080 + - name: elastic-query + type: elasticsearch-fe-query + config: + listenPort: 8080 +backendConnectors: + - name: my-minimal-elasticsearch + type: elasticsearch + config: + url: "http://localhost:9200" + - name: my-clickhouse-data-source + type: clickhouse-os + config: + url: "clickhouse://localhost:9000" +ingestStatistics: true +internalTelemetryUrl: "https://api.quesma.com/phone-home" +logging: + remoteUrl: "https://api.quesma.com/phone-home" + path: "logs" + level: "info" +processors: + - name: my-query-processor + type: quesma-v1-processor-query + config: + indexes: + kibana_sample_data_ecommerce: + target: + - my-clickhouse-data-source: + useCommonTable: false + kibana_sample_data_flights: + target: + - my-clickhouse-data-source + kibana_sample_data_logs: + target: + - my-clickhouse-data-source: + useCommonTable: true + "*": + target: [ my-minimal-elasticsearch ] + + - name: my-ingest-processor + type: quesma-v1-processor-ingest + config: + indexes: + kibana_sample_data_ecommerce: + target: + - my-clickhouse-data-source: + useCommonTable: false + kibana_sample_data_flights: + target: + - my-clickhouse-data-source + kibana_sample_data_logs: + target: + - my-clickhouse-data-source: + useCommonTable: true + "*": + target: [ my-minimal-elasticsearch ] +pipelines: + - name: my-pipeline-elasticsearch-query-clickhouse + frontendConnectors: [ elastic-query ] + processors: [ my-query-processor ] + backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-data-source ] + - name: my-pipeline-elasticsearch-ingest-to-clickhouse + frontendConnectors: [ elastic-ingest ] + processors: [ my-ingest-processor ] + backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-data-source ] \ No newline at end of file