Skip to content

Commit

Permalink
Preserve order of targets in QueryTarget, IngestTarget (#897)
Browse files Browse the repository at this point in the history
A user in the configuration of a query/ingest processor can define
targets:
```yaml
my_index:
  target: [ elastic-conn, clickhouse-conn ]
```
We subsequently take those values and populate `QueryTarget` and
`IngestTarget` arrays based on it, however the code did not preserve the
order of the targets. The order is important for A/B testing case and
`rules.go` had to incorrectly allow an unsupported scenario for
everything to work correctly.

Fix the issue by preserving the same order in `QueryTarget`,
`IngestTarget` as in the original `target` user configuration.
`rules.go` workaround can be now removed.
  • Loading branch information
avelanarius authored Oct 16, 2024
1 parent c8d579e commit 1d982f4
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 31 deletions.
64 changes: 35 additions & 29 deletions quesma/quesma/config/config_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,21 @@ func (c *QuesmaNewConfiguration) getProcessorByName(name string) *Processor {
return nil
}

func (c *QuesmaNewConfiguration) getTargetType(backendConnectorName string) (string, bool) {
backendConnector := c.getBackendConnectorByName(backendConnectorName)
if backendConnector == nil {
return "", false
}
switch backendConnector.Type {
case ElasticsearchBackendConnectorName:
return ElasticsearchTarget, true
case ClickHouseOSBackendConnectorName, ClickHouseBackendConnectorName, HydrolixBackendConnectorName:
return ClickhouseTarget, true
default:
return "", false
}
}

func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {
var err, errAcc error
var conf QuesmaConfiguration
Expand Down Expand Up @@ -505,25 +520,20 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {
// this a COPY-PASTE from the dual pipeline case, but we need to do it here as well
// TODO refactor this to a separate function

elasticBackendName := c.getElasticsearchBackendConnector().Name
var relationalDBBackendName string
if relationalDBBackend, _ := c.getRelationalDBBackendConnector(); relationalDBBackend != nil {
relationalDBBackendName = relationalDBBackend.Name
}

conf.IndexConfig = make(map[string]IndexConfiguration)
for indexName, indexConfig := range queryProcessor.Config.IndexConfig {
processedConfig := indexConfig
processedConfig.Name = indexName

if slices.Contains(indexConfig.Target, elasticBackendName) {
processedConfig.QueryTarget = append(processedConfig.QueryTarget, ElasticsearchTarget)
}
if slices.Contains(indexConfig.Target, relationalDBBackendName) {
processedConfig.QueryTarget = append(processedConfig.QueryTarget, ClickhouseTarget)
for _, target := range indexConfig.Target {
if targetType, found := c.getTargetType(target); found {
processedConfig.QueryTarget = append(processedConfig.QueryTarget, targetType)
} else {
errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of index %s", target, indexName))
}
}

if len(processedConfig.QueryTarget) == 2 && !(indexConfig.Target[0] == relationalDBBackendName && indexConfig.Target[1] == elasticBackendName) {
if len(processedConfig.QueryTarget) == 2 && !(processedConfig.QueryTarget[0] == ClickhouseTarget && processedConfig.QueryTarget[1] == ElasticsearchTarget) {
errAcc = multierror.Append(errAcc, fmt.Errorf("index %s has invalid dual query target configuration - when you specify two targets, ClickHouse has to be the primary one and Elastic has to be the secondary one", indexName))
continue
}
Expand Down Expand Up @@ -566,27 +576,22 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {
goto END
}

elasticBackendName := c.getElasticsearchBackendConnector().Name
var relationalDBBackendName string
if relationalDBBackend, _ := c.getRelationalDBBackendConnector(); relationalDBBackend != nil {
relationalDBBackendName = relationalDBBackend.Name
}

conf.IndexConfig = make(map[string]IndexConfiguration)
for indexName, indexConfig := range queryProcessor.Config.IndexConfig {
processedConfig := indexConfig
processedConfig.Name = indexName

processedConfig.IngestTarget = DefaultIngestTarget

if slices.Contains(indexConfig.Target, elasticBackendName) {
processedConfig.QueryTarget = append(processedConfig.QueryTarget, ElasticsearchTarget)
}
if slices.Contains(indexConfig.Target, relationalDBBackendName) {
processedConfig.QueryTarget = append(processedConfig.QueryTarget, ClickhouseTarget)
for _, target := range indexConfig.Target {
if targetType, found := c.getTargetType(target); found {
processedConfig.QueryTarget = append(processedConfig.QueryTarget, targetType)
} else {
errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of index %s", target, indexName))
}
}

if len(processedConfig.QueryTarget) == 2 && !(indexConfig.Target[0] == relationalDBBackendName && indexConfig.Target[1] == elasticBackendName) {
if len(processedConfig.QueryTarget) == 2 && !(processedConfig.QueryTarget[0] == ClickhouseTarget && processedConfig.QueryTarget[1] == ElasticsearchTarget) {
errAcc = multierror.Append(errAcc, fmt.Errorf("index %s has invalid dual query target configuration - when you specify two targets, ClickHouse has to be the primary one and Elastic has to be the secondary one", indexName))
continue
}
Expand Down Expand Up @@ -616,11 +621,12 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {
}

processedConfig.IngestTarget = make([]string, 0) // reset previously set DefaultIngestTarget
if slices.Contains(indexConfig.Target, elasticBackendName) {
processedConfig.IngestTarget = append(processedConfig.IngestTarget, ElasticsearchTarget)
}
if slices.Contains(indexConfig.Target, relationalDBBackendName) {
processedConfig.IngestTarget = append(processedConfig.IngestTarget, ClickhouseTarget)
for _, target := range indexConfig.Target {
if targetType, found := c.getTargetType(target); found {
processedConfig.IngestTarget = append(processedConfig.IngestTarget, targetType)
} else {
errAcc = multierror.Append(errAcc, fmt.Errorf("invalid target %s in configuration of index %s", target, indexName))
}
}

conf.IndexConfig[indexName] = processedConfig
Expand Down
3 changes: 1 addition & 2 deletions quesma/table_resolver/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,7 @@ func (r *tableRegistryImpl) singleIndex(indexConfig map[string]config.IndexConfi

case QueryPipeline:

if (targets[0] == config.ClickhouseTarget && targets[1] == config.ElasticsearchTarget) ||
(targets[0] == config.ElasticsearchTarget && targets[1] == config.ClickhouseTarget) {
if targets[0] == config.ClickhouseTarget && targets[1] == config.ElasticsearchTarget {

return &Decision{
Reason: "Enabled in the config. A/B testing.",
Expand Down

0 comments on commit 1d982f4

Please sign in to comment.