Skip to content

Commit

Permalink
Create common table if needed (#828)
Browse files Browse the repository at this point in the history
We should create the common table if there is an index using it. In
other cases, it emits unnecessary logs.

It fixes a bug while processing the config.
  • Loading branch information
nablaone authored Oct 1, 2024
1 parent 4fe5a7c commit 06327f2
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 6 deletions.
6 changes: 4 additions & 2 deletions quesma/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,10 @@ func main() {
var ingestProcessor *ingest.IngestProcessor

if cfg.EnableIngest {
// Ensure common table exists. This table have to be created before ingest processor starts
common_table.EnsureCommonTableExists(connectionPool)
if cfg.CreateCommonTable {
// Ensure common table exists. This table have to be created before ingest processor starts
common_table.EnsureCommonTableExists(connectionPool)
}

ingestProcessor = ingest.NewEmptyIngestProcessor(&cfg, connectionPool, phoneHomeAgent, tableDisco, schemaRegistry, virtualTableStorage)
} else {
Expand Down
3 changes: 2 additions & 1 deletion quesma/quesma/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ type QuesmaConfiguration struct {
DisableAuth bool `koanf:"disableAuth"`
AutodiscoveryEnabled bool

EnableIngest bool // this is computed from the configuration 2.0
EnableIngest bool // this is computed from the configuration 2.0
CreateCommonTable bool
}

type LoggingConfiguration struct {
Expand Down
12 changes: 10 additions & 2 deletions quesma/quesma/config/config_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,10 +517,11 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {
processedConfig.QueryTarget = append(processedConfig.QueryTarget, ClickhouseTarget)
}

if len(processedConfig.QueryTarget) == 2 && !(processedConfig.QueryTarget[0] == ClickhouseTarget && processedConfig.QueryTarget[1] == ElasticsearchTarget) {
if len(processedConfig.QueryTarget) == 2 && !(indexConfig.Target[0] == relationalDBBackendName && indexConfig.Target[1] == elasticBackendName) {
errAcc = multierror.Append(errAcc, fmt.Errorf("index %s has invalid dual query target configuration - when you specify two targets, ClickHouse has to be the primary one and Elastic has to be the secondary one", indexName))
continue
}

if len(processedConfig.QueryTarget) == 2 {
// Turn on A/B testing
processedConfig.Optimizers = make(map[string]OptimizerConfiguration)
Expand Down Expand Up @@ -579,7 +580,7 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {
processedConfig.QueryTarget = append(processedConfig.QueryTarget, ClickhouseTarget)
}

if len(processedConfig.QueryTarget) == 2 && !(processedConfig.QueryTarget[0] == ClickhouseTarget && processedConfig.QueryTarget[1] == ElasticsearchTarget) {
if len(processedConfig.QueryTarget) == 2 && !(indexConfig.Target[0] == relationalDBBackendName && indexConfig.Target[1] == elasticBackendName) {
errAcc = multierror.Append(errAcc, fmt.Errorf("index %s has invalid dual query target configuration - when you specify two targets, ClickHouse has to be the primary one and Elastic has to be the secondary one", indexName))
continue
}
Expand Down Expand Up @@ -632,6 +633,13 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {

END:

for _, idxCfg := range conf.IndexConfig {
if idxCfg.UseCommonTable {
conf.CreateCommonTable = true
break
}
}

if !conf.TransparentProxy {
if relationalDBErr != nil {
errAcc = multierror.Append(errAcc, relationalDBErr)
Expand Down
32 changes: 31 additions & 1 deletion quesma/quesma/config/config_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"github.com/stretchr/testify/assert"
"os"
"strings"
"testing"
)

Expand Down Expand Up @@ -70,6 +71,8 @@ func TestQuesmaTransparentProxyConfiguration(t *testing.T) {
}
legacyConf := cfg.TranslateToLegacyConfig()
assert.True(t, legacyConf.TransparentProxy)
assert.Equal(t, false, legacyConf.EnableIngest)
assert.Equal(t, false, legacyConf.CreateCommonTable)
}

func TestQuesmaAddingHydrolixTablesToExistingElasticsearch(t *testing.T) {
Expand All @@ -89,7 +92,8 @@ func TestQuesmaAddingHydrolixTablesToExistingElasticsearch(t *testing.T) {

assert.Equal(t, []string{"clickhouse"}, logsIndexConf.QueryTarget)
assert.Equal(t, []string{"elasticsearch"}, logsIndexConf.IngestTarget)

assert.Equal(t, true, legacyConf.EnableIngest)
assert.Equal(t, false, legacyConf.CreateCommonTable)
}

func TestQuesmaHydrolixQueryOnly(t *testing.T) {
Expand All @@ -113,6 +117,32 @@ func TestQuesmaHydrolixQueryOnly(t *testing.T) {

assert.Equal(t, false, legacyConf.EnableIngest)
assert.Equal(t, false, legacyConf.IngestStatistics)
assert.Equal(t, false, legacyConf.CreateCommonTable)
}

func TestHasCommonTable(t *testing.T) {
os.Setenv(configFileLocationEnvVar, "./test_configs/has_common_table.yaml")
cfg := LoadV2Config()
if err := cfg.Validate(); err != nil {
t.Fatalf("error validating config: %v", err)
}
legacyConf := cfg.TranslateToLegacyConfig()

assert.Equal(t, true, legacyConf.EnableIngest)
assert.Equal(t, true, legacyConf.CreateCommonTable)
}

func TestInvalidDualTarget(t *testing.T) {
os.Setenv(configFileLocationEnvVar, "./test_configs/invalid_dual_target.yaml")
cfg := LoadV2Config()
if err := cfg.Validate(); err != nil {

if !strings.Contains(err.Error(), "has invalid dual query target configuration - when you specify two targets") {
t.Fatalf("unexpected error: %v", err)
}

t.Fatalf("expected error, but got none")
}
}

func TestMatchName(t *testing.T) {
Expand Down
70 changes: 70 additions & 0 deletions quesma/quesma/config/test_configs/has_common_table.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
installationId: #HYDROLIX_REQUIRES_THIS
frontendConnectors:
- name: elastic-ingest
type: elasticsearch-fe-ingest
config:
listenPort: 8080
- name: elastic-query
type: elasticsearch-fe-query
config:
listenPort: 8080
backendConnectors:
- name: E
type: elasticsearch
config:
url: "http://elasticsearch:9200"
user: elastic
password: quesmaquesma
- name: C
type: clickhouse-os
config:
url: "clickhouse://clickhouse:9000"
ingestStatistics: true
processors:
- name: QP
type: quesma-v1-processor-query
config:
indexes:
logs-1:
target: [ E ]
logs-2:
target: [ E ]
logs-3:
target: [ C, E ]
logs-4:
useCommonTable: true
target: [ C ]
logs-5:
useCommonTable: true
target: [ C ]
"*":
target: [ E ]

- name: IP
type: quesma-v1-processor-ingest
config:
indexes:
logs-1:
target: [ E ]
logs-2:
target: [ E ]
logs-3:
target: [ C, E ]
logs-4:
useCommonTable: true
target: [ C ]
"*":
target: [ E ]
logs-5:
useCommonTable: true
target: [ ]

pipelines:
- name: my-elasticsearch-proxy-read
frontendConnectors: [ elastic-query ]
processors: [ QP ]
backendConnectors: [ E, C ]
- name: my-elasticsearch-proxy-write
frontendConnectors: [ elastic-ingest ]
processors: [ IP ]
backendConnectors: [ E, C ]
53 changes: 53 additions & 0 deletions quesma/quesma/config/test_configs/invalid_dual_target.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
installationId: #HYDROLIX_REQUIRES_THIS
frontendConnectors:
- name: elastic-ingest
type: elasticsearch-fe-ingest
config:
listenPort: 8080
- name: elastic-query
type: elasticsearch-fe-query
config:
listenPort: 8080
backendConnectors:
- name: E
type: elasticsearch
config:
url: "http://elasticsearch:9200"
user: elastic
password: quesmaquesma
- name: C
type: clickhouse-os
config:
url: "clickhouse://clickhouse:9000"
ingestStatistics: true
processors:
- name: QP
type: quesma-v1-processor-query
config:
indexes:
invalid:
target: [ E,C ]
"*":
target: [ E ]

- name: IP
type: quesma-v1-processor-ingest
config:
indexes:
invalid:
target: [ C, E ]
"*":
target: [ E ]
logs-5:
useCommonTable: true
target: [ ]

pipelines:
- name: my-elasticsearch-proxy-read
frontendConnectors: [ elastic-query ]
processors: [ QP ]
backendConnectors: [ E, C ]
- name: my-elasticsearch-proxy-write
frontendConnectors: [ elastic-ingest ]
processors: [ IP ]
backendConnectors: [ E, C ]

0 comments on commit 06327f2

Please sign in to comment.