diff --git a/clowder/clowder_test.go b/clowder/clowder_test.go new file mode 100644 index 0000000..b4c226c --- /dev/null +++ b/clowder/clowder_test.go @@ -0,0 +1,193 @@ +package clowder_test + +import ( + "github.com/RedHatInsights/insights-operator-utils/clowder" + "github.com/RedHatInsights/insights-operator-utils/kafka" + "github.com/RedHatInsights/insights-operator-utils/postgres" + api "github.com/redhatinsights/app-common-go/pkg/api/v1" + "github.com/stretchr/testify/assert" + "github.com/tisnik/go-capture" + "testing" +) + +func TestUseDBConfig(t *testing.T) { + cfg := postgres.StorageConfiguration{} + expected := postgres.StorageConfiguration{ + PGUsername: "username", + PGPassword: "password", + PGHost: "hostname", + PGPort: 1234, + PGDBName: "dbname", + } + loadedCfg := api.AppConfig{ + Database: &api.DatabaseConfig{ + AdminPassword: "adminpw", + AdminUsername: "admin", + Hostname: "hostname", + Name: "dbname", + Password: "password", + Port: 1234, + RdsCa: nil, + Username: "username", + }, + } + + clowder.UseDBConfig(&cfg, &loadedCfg) + assert.Equal(t, expected, cfg, "Clowder database config was not used") +} + +func TestUseClowderTopicsTopicFound(t *testing.T) { + originalTopicName := "topic1" + clowderTopicName := "NewTopicName" + brokerCfg := kafka.BrokerConfiguration{ + Topic: originalTopicName, + } + kafkaTopics := map[string]api.TopicConfig{ + originalTopicName: { + Name: clowderTopicName, + }, + "topic2": { + Name: "AnotherTopicName", + }, + } + + clowder.UseClowderTopics(&brokerCfg, kafkaTopics) + assert.Equal(t, clowderTopicName, brokerCfg.Topic, "Clowder topic name was not used") +} + +func TestUseClowderTopicsTopicNotFound(t *testing.T) { + originalTopicName := "topic1" + + brokerCfg := kafka.BrokerConfiguration{ + Topic: originalTopicName, + } + kafkaTopics := map[string]api.TopicConfig{ + "topic2": { + Name: "AnotherTopicName", + }, + } + + output, _ := capture.StandardOutput(func() { + clowder.UseClowderTopics(&brokerCfg, kafkaTopics) + }) + assert.Equal(t, originalTopicName, brokerCfg.Topic, "topic name should not change") + assert.Contains(t, output, "warning: no kafka mapping found for topic topic1") +} + +func TestUseBrokerConfigNoKafkaConfig(t *testing.T) { + brokerCfg := kafka.BrokerConfiguration{} + loadedConfig := api.AppConfig{} + + output, _ := capture.StandardOutput(func() { + clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) + }) + assert.Contains(t, output, clowder.NoBrokerCfg) +} + +func TestUseBrokerConfigNoKafkaBrokers(t *testing.T) { + brokerCfg := kafka.BrokerConfiguration{} + loadedConfig := api.AppConfig{ + Kafka: &api.KafkaConfig{}, + } + + output, _ := capture.StandardOutput(func() { + clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) + }) + assert.Contains(t, output, clowder.NoBrokerCfg) +} + +func TestUseBrokerConfigNoAuthNoPort(t *testing.T) { + brokerCfg := kafka.BrokerConfiguration{} + loadedConfig := api.AppConfig{ + Kafka: &api.KafkaConfig{ + Brokers: []api.BrokerConfig{ + { + Hostname: "test_broker", + Port: nil, + }, + }, + }, + } + + clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) + assert.Equal(t, "test_broker", brokerCfg.Address) +} + +func TestUseBrokerConfigNoAuth(t *testing.T) { + brokerCfg := kafka.BrokerConfiguration{} + port := 12345 + loadedConfig := api.AppConfig{ + Kafka: &api.KafkaConfig{ + Brokers: []api.BrokerConfig{ + { + Hostname: "test_broker", + Port: &port, + }, + }, + }, + } + + clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) + assert.Equal(t, "test_broker:12345", brokerCfg.Address) +} + +func TestUseBrokerConfigAuthEnabledNoSasl(t *testing.T) { + brokerCfg := kafka.BrokerConfiguration{} + port := 12345 + authType := api.BrokerConfigAuthtypeSasl + loadedConfig := api.AppConfig{ + Kafka: &api.KafkaConfig{ + Brokers: []api.BrokerConfig{ + { + Hostname: "test_broker", + Port: &port, + Authtype: &authType, + }, + }, + }, + } + + output, _ := capture.StandardOutput(func() { + clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) + }) + + assert.Equal(t, "test_broker:12345", brokerCfg.Address) + assert.Contains(t, output, clowder.NoSaslCfg) +} + +func TestUseBrokerConfigAuthEnabledWithSaslConfig(t *testing.T) { + brokerCfg := kafka.BrokerConfiguration{} + port := 12345 + saslCfg := "user_pwd" + protocol := "tls" + + authType := api.BrokerConfigAuthtypeSasl + loadedConfig := api.AppConfig{ + Kafka: &api.KafkaConfig{ + Brokers: []api.BrokerConfig{ + { + Hostname: "test_broker", + Port: &port, + Authtype: &authType, + Sasl: &api.KafkaSASLConfig{ + Password: &saslCfg, + Username: &saslCfg, + SaslMechanism: &saslCfg, + }, + SecurityProtocol: &protocol, + }, + }, + }, + } + + output, _ := capture.StandardOutput(func() { + clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) + }) + + assert.Equal(t, "test_broker:12345", brokerCfg.Address) + assert.Contains(t, output, "kafka is configured to use authentication") + assert.Equal(t, saslCfg, brokerCfg.SaslUsername) + assert.Equal(t, saslCfg, brokerCfg.SaslPassword) + assert.Equal(t, saslCfg, brokerCfg.SaslMechanism) + assert.Equal(t, protocol, brokerCfg.SecurityProtocol) +} diff --git a/clowder/export_test.go b/clowder/export_test.go new file mode 100644 index 0000000..6dd3c81 --- /dev/null +++ b/clowder/export_test.go @@ -0,0 +1,6 @@ +package clowder + +var ( + NoBrokerCfg = noBrokerConfig + NoSaslCfg = noSaslConfig +) diff --git a/clowder/kafka.go b/clowder/kafka.go new file mode 100644 index 0000000..35b63d0 --- /dev/null +++ b/clowder/kafka.go @@ -0,0 +1,56 @@ +package clowder + +import ( + "fmt" + "github.com/RedHatInsights/insights-operator-utils/kafka" + api "github.com/redhatinsights/app-common-go/pkg/api/v1" +) + +// Common constants used for logging and error reporting +const ( + noBrokerConfig = "warning: no broker configurations found in clowder config" + noSaslConfig = "warning: SASL configuration is missing" + noTopicMapping = "warning: no kafka mapping found for topic %s" +) + +func UseBrokerConfig(brokerCfg *kafka.BrokerConfiguration, loadedConfig *api.AppConfig) { + // make sure broker(s) are configured in Clowder + if loadedConfig.Kafka != nil && len(loadedConfig.Kafka.Brokers) > 0 { + broker := loadedConfig.Kafka.Brokers[0] + // port can be empty in api, so taking it into account + if broker.Port != nil { + brokerCfg.Address = fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port) + } else { + brokerCfg.Address = broker.Hostname + } + + // SSL config + if broker.Authtype != nil { + fmt.Println("kafka is configured to use authentication") + if broker.Sasl != nil { + // we are trusting that these values are set and + // dereferencing the pointers without any check... + brokerCfg.SaslUsername = *broker.Sasl.Username + brokerCfg.SaslPassword = *broker.Sasl.Password + brokerCfg.SaslMechanism = *broker.Sasl.SaslMechanism + brokerCfg.SecurityProtocol = *broker.SecurityProtocol + + if caPath, err := loadedConfig.KafkaCa(broker); err == nil { + brokerCfg.CertPath = caPath + } + } else { + fmt.Println(noSaslConfig) + } + } + } else { + fmt.Println(noBrokerConfig) + } +} +func UseClowderTopics(configuration *kafka.BrokerConfiguration, kafkaTopics map[string]api.TopicConfig) { + // Get the correct topic name from clowder mapping if available + if clowderTopic, ok := kafkaTopics[configuration.Topic]; ok { + configuration.Topic = clowderTopic.Name + } else { + fmt.Printf(noTopicMapping, configuration.Topic) + } +} diff --git a/clowder/storage.go b/clowder/storage.go new file mode 100644 index 0000000..2c543cc --- /dev/null +++ b/clowder/storage.go @@ -0,0 +1,14 @@ +package clowder + +import ( + "github.com/RedHatInsights/insights-operator-utils/postgres" + api "github.com/redhatinsights/app-common-go/pkg/api/v1" +) + +func UseDBConfig(storageCfg *postgres.StorageConfiguration, loadedConfig *api.AppConfig) { + storageCfg.PGDBName = loadedConfig.Database.Name + storageCfg.PGHost = loadedConfig.Database.Hostname + storageCfg.PGPort = loadedConfig.Database.Port + storageCfg.PGUsername = loadedConfig.Database.Username + storageCfg.PGPassword = loadedConfig.Database.Password +} diff --git a/go.mod b/go.mod index ca70b8a..f954cfc 100644 --- a/go.mod +++ b/go.mod @@ -20,9 +20,11 @@ require ( github.com/mozillazg/request v0.8.0 github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_model v0.5.0 + github.com/redhatinsights/app-common-go v1.6.7 github.com/redis/go-redis/v9 v9.3.1 github.com/rs/zerolog v1.31.0 github.com/stretchr/testify v1.8.4 + github.com/tisnik/go-capture v1.0.1 github.com/verdverm/frisby v0.0.0-20170604211311-b16556248a9a github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c gopkg.in/h2non/gock.v1 v1.1.2 diff --git a/go.sum b/go.sum index a245d91..bbbc325 100644 --- a/go.sum +++ b/go.sum @@ -555,6 +555,8 @@ github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40T github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ= github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/redhatinsights/app-common-go v1.6.7 h1:cXWW0F6ZW53RLRr54gn7Azo9CLTysYOmFDR0D0Qd0Fs= +github.com/redhatinsights/app-common-go v1.6.7/go.mod h1:6gzRyg8ZyejwMCksukeAhh2ZXOB3uHSmBsbP06fG2PQ= github.com/redis/go-redis/v9 v9.3.1 h1:KqdY8U+3X6z+iACvumCNxnoluToB+9Me+TvyFa21Mds= github.com/redis/go-redis/v9 v9.3.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= @@ -618,6 +620,7 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= +github.com/tisnik/go-capture v1.0.1 h1:o4zZpOlC01qCifeh0fj4SoUkt8UHFassn1+blmFN3BQ= github.com/tisnik/go-capture v1.0.1/go.mod h1:NArgKXuvcG6gOW2SQoPGKy6TuiKBttQ2ZV0/zC4zVaY= github.com/tj/go-gracefully v0.0.0-20141227061038-005c1d102f1b/go.mod h1:uqlTeGUUfRdQvlQGkv+DYe3lLST3DionEwMA9YAYibY= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= diff --git a/kafka/configuration.go b/kafka/configuration.go index 148c88b..70c9a32 100644 --- a/kafka/configuration.go +++ b/kafka/configuration.go @@ -14,8 +14,9 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package broker contains data types, interfaces, and methods related to -// brokers that can be used to consume input messages by aggegator. +// Package kafka contains data types, interfaces, and methods related to +// Kafka that can be used to configure brokers, as well as consume/produce +// messages. package kafka import ( diff --git a/postgres/config.go b/postgres/config.go new file mode 100644 index 0000000..5dbf956 --- /dev/null +++ b/postgres/config.go @@ -0,0 +1,13 @@ +package postgres + +// StorageConfiguration represents common configuration of data storage +type StorageConfiguration struct { + Driver string `mapstructure:"db_driver" toml:"db_driver"` + PGUsername string `mapstructure:"pg_username" toml:"pg_username"` + PGPassword string `mapstructure:"pg_password" toml:"pg_password"` + PGHost string `mapstructure:"pg_host" toml:"pg_host"` + PGPort int `mapstructure:"pg_port" toml:"pg_port"` + PGDBName string `mapstructure:"pg_db_name" toml:"pg_db_name"` + PGParams string `mapstructure:"pg_params" toml:"pg_params"` + LogSQLQueries bool `mapstructure:"log_sql_queries" toml:"log_sql_queries"` +}