diff --git a/quesma/go.mod b/quesma/go.mod index 0b7fe7dac..e8cd6832b 100644 --- a/quesma/go.mod +++ b/quesma/go.mod @@ -12,6 +12,7 @@ require ( github.com/gorilla/mux v1.8.1 github.com/hashicorp/go-multierror v1.1.1 github.com/k0kubun/pp v3.0.1+incompatible + github.com/knadh/koanf/parsers/json v0.1.0 github.com/knadh/koanf/parsers/yaml v0.1.0 github.com/knadh/koanf/providers/env v1.0.0 github.com/knadh/koanf/providers/file v1.1.2 @@ -21,6 +22,7 @@ require ( github.com/shirou/gopsutil/v3 v3.24.5 github.com/stretchr/testify v1.9.0 github.com/tailscale/hujson v0.0.0-20221223112325-20486734a56a + github.com/tidwall/sjson v1.2.5 github.com/ucarion/urlpath v0.0.0-20200424170820-7ccc79b76bbb golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 ) @@ -33,6 +35,9 @@ require ( github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/tidwall/gjson v1.18.0 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.1 // indirect ) require ( diff --git a/quesma/go.sum b/quesma/go.sum index 0908f95f4..a49da8bb0 100644 --- a/quesma/go.sum +++ b/quesma/go.sum @@ -60,6 +60,8 @@ github.com/klauspost/compress v1.17.10 h1:oXAz+Vh0PMUvJczoi+flxpnBEPxoER1IaAnU/N github.com/klauspost/compress v1.17.10/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs= github.com/knadh/koanf/maps v0.1.1/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= +github.com/knadh/koanf/parsers/json v0.1.0 h1:dzSZl5pf5bBcW0Acnu20Djleto19T0CfHcvZ14NJ6fU= +github.com/knadh/koanf/parsers/json v0.1.0/go.mod h1:ll2/MlXcZ2BfXD6YJcjVFzhG9P0TdJ207aIBKQhV2hY= github.com/knadh/koanf/parsers/yaml v0.1.0 h1:ZZ8/iGfRLvKSaMEECEBPM1HQslrZADk8fP1XFUxVI5w= github.com/knadh/koanf/parsers/yaml v0.1.0/go.mod h1:cvbUDC7AL23pImuQP0oRw/hPuccrNBS2bps8asS0CwY= github.com/knadh/koanf/providers/env v1.0.0 h1:ufePaI9BnWH+ajuxGGiJ8pdTG0uLEUWC7/HDDPGLah0= @@ -123,7 +125,17 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tailscale/hujson v0.0.0-20221223112325-20486734a56a h1:SJy1Pu0eH1C29XwJucQo73FrleVK6t4kYz4NVhp34Yw= github.com/tailscale/hujson v0.0.0-20221223112325-20486734a56a/go.mod h1:DFSS3NAGHthKo1gTlmEcSBiZrRJXi28rLNd/1udP1c8= +github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= +github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= +github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= +github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= diff --git a/quesma/quesma/config/config_v2.go b/quesma/quesma/config/config_v2.go index 9bbbb345a..f4ca53383 100644 --- a/quesma/quesma/config/config_v2.go +++ b/quesma/quesma/config/config_v2.go @@ -6,7 +6,8 @@ import ( "errors" "fmt" "github.com/hashicorp/go-multierror" - "github.com/knadh/koanf/providers/env" + "github.com/knadh/koanf/parsers/json" + "github.com/knadh/koanf/v2" "github.com/rs/zerolog" "log" "quesma/network" @@ -100,18 +101,14 @@ func LoadV2Config() QuesmaNewConfiguration { v2config.Logging.RemoteLogDrainUrl = telemetryUrl loadConfigFile() - if err := k.Load(env.Provider("QUESMA_", ".", func(s string) string { - // This enables overriding config values with environment variables. It's case-sensitive, just like the YAML. - // Examples: - // `QUESMA_logging_level=debug` overrides `logging.level` in the config file - // `QUESMA_licenseKey=arbitrary-license-key` overrides `licenseKey` in the config file - return strings.Replace(strings.TrimPrefix(s, "QUESMA_"), "_", ".", -1) - }), nil); err != nil { + // We have to use custom env provider to allow array overrides + if err := k.Load(Env2JsonProvider("QUESMA_", "_", nil), json.Parser(), koanf.WithMergeFunc(mergeDictFunc)); err != nil { log.Fatalf("error loading config form supplied env vars: %v", err) } if err := k.Unmarshal("", &v2config); err != nil { log.Fatalf("error unmarshalling config: %v", err) } + if err := v2config.Validate(); err != nil { log.Fatalf("Config validation failed: %v", err) } @@ -215,7 +212,11 @@ func (c *QuesmaNewConfiguration) validatePipelines() error { } var backendConnectorTypes []string for _, con := range declaredBackendConnectors { - backendConnectorTypes = append(backendConnectorTypes, c.getBackendConnectorByName(con).Type) + connector := c.getBackendConnectorByName(con) + if connector == nil { + return fmt.Errorf(fmt.Sprintf("backend connector named [%s] not found in configuration", con)) + } + backendConnectorTypes = append(backendConnectorTypes, connector.Type) } if !slices.Contains(backendConnectorTypes, ElasticsearchBackendConnectorName) { return fmt.Errorf("query processor requires having one elasticsearch backend connector") diff --git a/quesma/quesma/config/config_v2_test.go b/quesma/quesma/config/config_v2_test.go index defa0ff04..a8d05dce1 100644 --- a/quesma/quesma/config/config_v2_test.go +++ b/quesma/quesma/config/config_v2_test.go @@ -16,8 +16,15 @@ func TestQuesmaConfigurationLoading(t *testing.T) { logLevelPassedAsEnvVar := "debug" licenseKeyPassedAsEnvVar := "arbitraty-license-key" - os.Setenv("QUESMA_logging_level", logLevelPassedAsEnvVar) // overrides what's in the config file - os.Setenv("QUESMA_licenseKey", licenseKeyPassedAsEnvVar) // overrides what's in the config file + os.Setenv("QUESMA_logging_level", logLevelPassedAsEnvVar) // overrides what's in the config file + os.Setenv("QUESMA_licenseKey", licenseKeyPassedAsEnvVar) // overrides what's in the config file + os.Setenv("QUESMA_backendConnectors_1_config_user", "user") // overrides what's in the config file + t.Cleanup(func() { + os.Unsetenv(configFileLocationEnvVar) + os.Unsetenv("QUESMA_logging_level") + os.Unsetenv("QUESMA_licenseKey") + os.Unsetenv("QUESMA_backendConnectors_1_config_user") + }) cfg := LoadV2Config() if err := cfg.Validate(); err != nil { t.Fatalf("error validating config: %v", err) @@ -30,6 +37,7 @@ func TestQuesmaConfigurationLoading(t *testing.T) { assert.Equal(t, 8080, int(legacyCfg.PublicTcpPort)) assert.Equal(t, "http://localhost:9200", legacyCfg.Elasticsearch.Url.String()) assert.Equal(t, "clickhouse://localhost:9000", legacyCfg.ClickHouse.Url.String()) + assert.Equal(t, "user", legacyCfg.ClickHouse.User) assert.Equal(t, true, legacyCfg.IngestStatistics) assert.Equal(t, "logs", legacyCfg.Logging.Path) assert.Equal(t, logLevelPassedAsEnvVar, legacyCfg.Logging.Level.String()) diff --git a/quesma/quesma/config/env2json.go b/quesma/quesma/config/env2json.go new file mode 100644 index 000000000..9258e7b55 --- /dev/null +++ b/quesma/quesma/config/env2json.go @@ -0,0 +1,189 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 +package config + +import ( + "errors" + "fmt" + "github.com/tidwall/sjson" + "os" + "quesma/logger" + "strings" +) + +type Env2Json struct { + prefix string + separator string + callback func(key string, value string) (string, interface{}) + resultJson string +} + +func Env2JsonProvider(prefix, sep string, callback func(key string, value string) (string, interface{})) *Env2Json { + if len(prefix) == 0 || len(sep) == 0 { + logger.Error().Msgf("Env2JsonProvider: prefix '%s' and sep '%s' is required", prefix, sep) + return nil + } + if callback == nil { + callback = func(key string, value string) (string, interface{}) { + return key, value + } + } + e := &Env2Json{ + prefix: prefix, + separator: sep, + resultJson: "{}", + callback: callback, + } + return e +} + +func (e *Env2Json) ReadBytes() ([]byte, error) { + var envKeyValues []string + for _, keyValue := range os.Environ() { + if strings.HasPrefix(keyValue, e.prefix) { + envKeyValues = append(envKeyValues, strings.TrimPrefix(keyValue, e.prefix)) + } + } + + for _, keyValue := range envKeyValues { + parts := strings.SplitN(keyValue, "=", 2) + if len(parts) != 2 { + return []byte{}, fmt.Errorf("invalid environment variable '%s', no '='", keyValue) + } + key, value := e.callback(parts[0], parts[1]) + // Omit blank keys + if key == "" { + continue + } + + if err := e.set(key, value); err != nil { + return []byte{}, err + } + } + + return []byte(e.resultJson), nil +} + +func (e *Env2Json) set(key string, value interface{}) error { + resultJson, err := sjson.Set(e.resultJson, strings.Replace(key, e.separator, ".", -1), value) + if err == nil { + e.resultJson = resultJson + } + + return err +} + +func (e *Env2Json) Read() (map[string]interface{}, error) { + return nil, errors.New("env2json Provider does not support Read()") +} + +func mergeArrayFunc(src, dest []interface{}) ([]interface{}, error) { + newLen := len(src) + if len(dest) > newLen { + newLen = len(dest) + } + newArray := make([]interface{}, newLen) + + for i := 0; i < newLen; i++ { + if i >= len(src) { + newArray[i] = dest[i] + } else if i >= len(dest) { + newArray[i] = src[i] + } else if src[i] == nil { + newArray[i] = dest[i] + } else if dest[i] == nil { + newArray[i] = src[i] + } else { + if srcMap, isMap := src[i].(map[string]interface{}); isMap { + if destMap, isDestMap := dest[i].(map[string]interface{}); isDestMap { + if err := mergeDictFunc(srcMap, destMap); err != nil { + return nil, err + } + newArray[i] = destMap + continue + } + } + + newArray[i] = src[i] + } + } + + return newArray, nil +} + +func mergeDictIntoArrayFunc(src map[string]interface{}, dest []interface{}) ([]interface{}, error) { + newArray := make([]interface{}, len(dest)) + copy(newArray, dest) + for k, v := range src { + foundIdx := -1 + + // find existing element with same name + for i := range newArray { + if m, isMap := newArray[i].(map[string]interface{}); isMap { + if m["name"] == k { + foundIdx = i + break + } + } + } + + // if not exist add new element + if foundIdx == -1 { + foundIdx = len(newArray) + newMap := make(map[string]interface{}) + newMap["name"] = k + newArray = append(newArray, newMap) + } + + if m, isMap := newArray[foundIdx].(map[string]interface{}); isMap { + if vTyped, isMap2 := v.(map[string]interface{}); isMap2 { + if err := mergeDictFunc(vTyped, m); err != nil { + return nil, err + } + newArray[foundIdx] = m + continue + } + } + newArray[foundIdx] = v + } + return newArray, nil +} + +func mergeDictFunc(src, dest map[string]interface{}) error { + for k, v := range src { + switch vTyped := v.(type) { + case map[string]interface{}: + if destV, exist := dest[k]; exist { + if destMap, isMap := destV.(map[string]interface{}); isMap { + if err := mergeDictFunc(vTyped, destMap); err != nil { + return err + } + continue + } else if destArray, isArray := destV.([]interface{}); isArray { + if newV, err := mergeDictIntoArrayFunc(vTyped, destArray); err != nil { + return err + } else { + dest[k] = newV + } + continue + } + } + dest[k] = v + case []interface{}: + if destV, exist := dest[k]; exist { + if destMap, isArray := destV.([]interface{}); isArray { + if newV, err := mergeArrayFunc(vTyped, destMap); err != nil { + return err + } else { + dest[k] = newV + } + continue + } + } + dest[k] = v + default: + dest[k] = v + } + } + return nil +} diff --git a/quesma/quesma/config/env2json_test.go b/quesma/quesma/config/env2json_test.go new file mode 100644 index 000000000..d222bcc6f --- /dev/null +++ b/quesma/quesma/config/env2json_test.go @@ -0,0 +1,79 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 +package config + +import ( + "encoding/json" + "github.com/stretchr/testify/assert" + "os" + "testing" +) + +func TestEnv2Json_arrays(t *testing.T) { + provider := Env2JsonProvider("ENV2JSON_", "_", nil) + os.Setenv("ENV2JSON_licenseKey", "secret_key") + os.Setenv("ENV2JSON_backendConnectors_0_config_url", "http://localhost:8080") + os.Setenv("ENV2JSON_backendConnectors_0_config_user", "user") + os.Setenv("ENV2JSON_backendConnectors_0_config_password", "password") + t.Cleanup(func() { + os.Unsetenv("ENV2JSON_licenseKey") + os.Unsetenv("ENV2JSON_backendConnectors_0_config_url") + os.Unsetenv("ENV2JSON_backendConnectors_0_config_user") + os.Unsetenv("ENV2JSON_backendConnectors_0_config_password") + }) + resultJson, err := provider.ReadBytes() + assert.NoError(t, err) + + expectedJson := `{"licenseKey":"secret_key","backendConnectors":[{"config":{"url":"http://localhost:8080","user":"user","password":"password"}}]}` + assert.Equal(t, expectedJson, string(resultJson)) +} + +func TestEnv2Json_arraysByName(t *testing.T) { + os.Setenv(configFileLocationEnvVar, "./test_configs/test_config_v2.yaml") + os.Setenv("QUESMA_licenseKey", "secret_key") + os.Setenv("QUESMA_backendConnectors_my-clickhouse-data-source_config_url", "http://localhost:9201") + os.Setenv("QUESMA_backendConnectors_my-clickhouse-data-source_config_user", "user") + os.Setenv("QUESMA_backendConnectors_my-clickhouse-data-source_config_password", "password") + t.Cleanup(func() { + os.Unsetenv("QUESMA_licenseKey") + os.Unsetenv("QUESMA_backendConnectors_my-clickhouse-data-source_config_url") + os.Unsetenv("QUESMA_backendConnectors_my-clickhouse-data-source_config_user") + os.Unsetenv("QUESMA_backendConnectors_my-clickhouse-data-source_config_password") + }) + + cfg := LoadV2Config() + assert.Len(t, cfg.BackendConnectors, 2) + clickHouseBackend := cfg.BackendConnectors[1] + assert.Equal(t, "my-clickhouse-data-source", clickHouseBackend.Name) + assert.Equal(t, "http://localhost:9201", clickHouseBackend.Config.Url.String()) + assert.Equal(t, "user", clickHouseBackend.Config.User) + assert.Equal(t, "password", clickHouseBackend.Config.Password) +} + +func TestEnv2Json_empty(t *testing.T) { + provider := Env2JsonProvider("ENV2JSON2_", "_", nil) + resultJson, err := provider.ReadBytes() + assert.NoError(t, err) + + expectedJson := `{}` + assert.Equal(t, expectedJson, string(resultJson)) +} + +func TestEnv2Json_jsonMerge(t *testing.T) { + jsonA := `{"a":1,"b":2,"c":[{"d":1},{"d":2},{"d":3}]}` + jsonB := `{"a":3,"l":2,"c":[null,{"e":42}]}` + // turn into dicts + var dictA map[string]interface{} + var dictB map[string]interface{} + err := json.Unmarshal([]byte(jsonA), &dictA) + assert.NoError(t, err) + err = json.Unmarshal([]byte(jsonB), &dictB) + assert.NoError(t, err) + + err = mergeDictFunc(dictA, dictB) + assert.NoError(t, err) + mergedJson, err2 := json.Marshal(dictB) + assert.NoError(t, err2) + expectedJson := `{"a":1,"b":2,"c":[{"d":1},{"d":2,"e":42},{"d":3}],"l":2}` + assert.Equal(t, expectedJson, string(mergedJson)) +}