From fec884bbe68176053204e7284e6941494c9cce00 Mon Sep 17 00:00:00 2001 From: Papa Bakary Camara Date: Mon, 15 Jan 2024 10:38:20 +0100 Subject: [PATCH] Revert "Refactor MultiBrokerConfiguration into a slice of BrokerConfiguration objects" This reverts commit ee63cd11459da087ac734039ab360f318f4addee. --- clowder/clowder_test.go | 133 ++++++++++++------------------------ clowder/export_test.go | 5 +- clowder/kafka.go | 70 ++++++++----------- kafka/configuration.go | 28 ++++---- kafka/configuration_test.go | 15 ++-- 5 files changed, 95 insertions(+), 156 deletions(-) diff --git a/clowder/clowder_test.go b/clowder/clowder_test.go index 3c70704..bc0680d 100644 --- a/clowder/clowder_test.go +++ b/clowder/clowder_test.go @@ -16,14 +16,13 @@ package clowder_test import ( "fmt" - "testing" - "github.com/RedHatInsights/insights-operator-utils/clowder" "github.com/RedHatInsights/insights-operator-utils/kafka" "github.com/RedHatInsights/insights-operator-utils/postgres" api "github.com/redhatinsights/app-common-go/pkg/api/v1" "github.com/stretchr/testify/assert" "github.com/tisnik/go-capture" + "testing" ) func TestUseDBConfig(t *testing.T) { @@ -55,10 +54,8 @@ func TestUseDBConfig(t *testing.T) { func TestUseClowderTopicsTopicFound(t *testing.T) { originalTopicName := "topic1" clowderTopicName := "NewTopicName" - brokerCfg := kafka.BrokersConfig{ - { - Topic: originalTopicName, - }, + brokerCfg := kafka.SingleBrokerConfiguration{ + Topic: originalTopicName, } kafkaTopics := map[string]api.TopicConfig{ originalTopicName: { @@ -69,17 +66,15 @@ func TestUseClowderTopicsTopicFound(t *testing.T) { }, } - clowder.UseClowderTopics(brokerCfg, kafkaTopics) - assert.Equal(t, clowderTopicName, brokerCfg[0].Topic, "Clowder topic name was not used") + clowder.UseClowderTopics(&brokerCfg, kafkaTopics) + assert.Equal(t, clowderTopicName, brokerCfg.Topic, "Clowder topic name was not used") } func TestUseClowderTopicsTopicFoundMultiBrokers(t *testing.T) { originalTopicName := "topic1" clowderTopicName := "NewTopicName" - brokerCfg := kafka.BrokersConfig{ - { - Topic: originalTopicName, - }, + brokerCfg := kafka.MultiBrokerConfiguration{ + Topic: originalTopicName, } kafkaTopics := map[string]api.TopicConfig{ originalTopicName: { @@ -90,17 +85,15 @@ func TestUseClowderTopicsTopicFoundMultiBrokers(t *testing.T) { }, } - clowder.UseClowderTopics(brokerCfg, kafkaTopics) - assert.Equal(t, clowderTopicName, brokerCfg[0].Topic, "Clowder topic name was not used") + clowder.UseClowderTopics(&brokerCfg, kafkaTopics) + assert.Equal(t, clowderTopicName, brokerCfg.Topic, "Clowder topic name was not used") } func TestUseClowderTopicsTopicNotFound(t *testing.T) { originalTopicName := "topic1" - brokerCfg := kafka.BrokersConfig{ - { - Topic: originalTopicName, - }, + brokerCfg := kafka.SingleBrokerConfiguration{ + Topic: originalTopicName, } kafkaTopics := map[string]api.TopicConfig{ "topic2": { @@ -109,73 +102,39 @@ func TestUseClowderTopicsTopicNotFound(t *testing.T) { } output, _ := capture.StandardOutput(func() { - clowder.UseClowderTopics(brokerCfg, kafkaTopics) + clowder.UseClowderTopics(&brokerCfg, kafkaTopics) }) - assert.Equal(t, originalTopicName, brokerCfg[0].Topic, "topic name should not change") + assert.Equal(t, originalTopicName, brokerCfg.Topic, "topic name should not change") assert.Contains(t, output, "warning: no kafka mapping found for topic topic1") } -func TestGetBrokersAddressesNoBrokerConfig(t *testing.T) { - cfg := kafka.BrokersConfig{} - assert.Equal(t, []string{}, kafka.GetBrokersAddresses(cfg)) -} - -func TestGetBrokersAddressesSingleBrokerConfig(t *testing.T) { - const addr = "some_addr" - cfg := kafka.BrokersConfig{ - {Address: addr}, - } - assert.Equal(t, []string{addr}, kafka.GetBrokersAddresses(cfg)) -} - -func TestGetBrokersAddressesMultipleBrokerConfig(t *testing.T) { - const addr, addr2 = "some_addr", "some_other_addr" - cfg := kafka.BrokersConfig{ - {Address: addr}, - {Address: addr2}, - } - assert.Equal(t, []string{addr, addr2}, kafka.GetBrokersAddresses(cfg)) -} - -func TestUseBrokerConfigNoClowderKafkaConfig(t *testing.T) { - brokerCfg := kafka.BrokersConfig{{}} +func TestUseBrokerConfigNoKafkaConfig(t *testing.T) { + brokerCfg := kafka.MultiBrokerConfiguration{} loadedConfig := api.AppConfig{} output, _ := capture.StandardOutput(func() { - clowder.UseBrokerConfig(brokerCfg, &loadedConfig) + clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) }) assert.Contains(t, output, clowder.NoBrokerCfg) } -func TestUseBrokerConfigNoOriginalKafkaBrokers(t *testing.T) { - brokerCfg := kafka.BrokersConfig{} - loadedConfig := api.AppConfig{ - Kafka: &api.KafkaConfig{}, - } - - output, _ := capture.StandardOutput(func() { - clowder.UseBrokerConfig(brokerCfg, &loadedConfig) - }) - assert.Contains(t, output, clowder.NoOriginalBroker) -} - -func TestUseBrokerConfigNoClowderKafkaBrokers(t *testing.T) { - brokerCfg := kafka.BrokersConfig{{}} +func TestUseBrokerConfigNoKafkaBrokers(t *testing.T) { + brokerCfg := kafka.MultiBrokerConfiguration{} loadedConfig := api.AppConfig{ Kafka: &api.KafkaConfig{}, } output, _ := capture.StandardOutput(func() { - clowder.UseBrokerConfig(brokerCfg, &loadedConfig) + clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) }) assert.Contains(t, output, clowder.NoBrokerCfg) } -func TestUseBrokerConfigMultipleClowderKafkaBrokers(t *testing.T) { +func TestUseBrokerConfigMultipleKafkaBrokers(t *testing.T) { addr1 := "test_broker" addr2 := "test_broker_backup" port := 12345 - brokerCfg := kafka.BrokersConfig{{}} + brokerCfg := kafka.MultiBrokerConfiguration{} loadedConfig := api.AppConfig{ Kafka: &api.KafkaConfig{ Brokers: []api.BrokerConfig{ @@ -191,14 +150,13 @@ func TestUseBrokerConfigMultipleClowderKafkaBrokers(t *testing.T) { }, } - brokerCfg = clowder.UseBrokerConfig(brokerCfg, &loadedConfig) - assert.Equal(t, fmt.Sprintf("%s:%d", addr1, port), brokerCfg[0].Address) - assert.Equal(t, addr2, brokerCfg[1].Address) + clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) + assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr1, port), addr2}, brokerCfg.Addresses) } func TestUseBrokerConfigNoAuthNoPort(t *testing.T) { addr := "test_broker" - brokerCfg := kafka.BrokersConfig{{}} + brokerCfg := kafka.MultiBrokerConfiguration{} loadedConfig := api.AppConfig{ Kafka: &api.KafkaConfig{ Brokers: []api.BrokerConfig{ @@ -210,12 +168,12 @@ func TestUseBrokerConfigNoAuthNoPort(t *testing.T) { }, } - brokerCfg = clowder.UseBrokerConfig(brokerCfg, &loadedConfig) - assert.Equal(t, addr, brokerCfg[0].Address) + clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) + assert.Equal(t, []string{addr}, brokerCfg.Addresses) } func TestUseBrokerConfigNoAuth(t *testing.T) { - brokerCfg := kafka.BrokersConfig{{}} + brokerCfg := kafka.MultiBrokerConfiguration{} port := 12345 addr := "test_broker" loadedConfig := api.AppConfig{ @@ -229,12 +187,12 @@ func TestUseBrokerConfigNoAuth(t *testing.T) { }, } - brokerCfg = clowder.UseBrokerConfig(brokerCfg, &loadedConfig) - assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg[0].Address) + clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) + assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port)}, brokerCfg.Addresses) } func TestUseBrokerConfigAuthEnabledNoSasl(t *testing.T) { - brokerCfg := kafka.BrokersConfig{{}} + brokerCfg := kafka.MultiBrokerConfiguration{} port := 12345 addr := "test_broker" authType := api.BrokerConfigAuthtypeSasl @@ -251,15 +209,15 @@ func TestUseBrokerConfigAuthEnabledNoSasl(t *testing.T) { } output, _ := capture.StandardOutput(func() { - brokerCfg = clowder.UseBrokerConfig(brokerCfg, &loadedConfig) + clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) }) - assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg[0].Address) + assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port)}, brokerCfg.Addresses) assert.Contains(t, output, clowder.NoSaslCfg) } func TestUseBrokerConfigAuthEnabledWithSaslConfig(t *testing.T) { - brokerCfg := kafka.BrokersConfig{{}} + brokerCfg := kafka.MultiBrokerConfiguration{} port := 12345 addr := "test_broker" addr2 := "test_broker_backup" @@ -300,20 +258,19 @@ func TestUseBrokerConfigAuthEnabledWithSaslConfig(t *testing.T) { } output, _ := capture.StandardOutput(func() { - brokerCfg = clowder.UseBrokerConfig(brokerCfg, &loadedConfig) + clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) }) + assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port), fmt.Sprintf("%s:%d", addr2, port)}, brokerCfg.Addresses) assert.Contains(t, output, "kafka is configured to use authentication") - - assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg[0].Address) - assert.Equal(t, saslUsr, brokerCfg[0].SaslUsername) - assert.Equal(t, saslPwd, brokerCfg[0].SaslPassword) - assert.Equal(t, saslMechanism, brokerCfg[0].SaslMechanism) - assert.Equal(t, protocol, brokerCfg[0].SecurityProtocol) - - assert.Equal(t, fmt.Sprintf("%s:%d", addr2, port), brokerCfg[1].Address) - assert.Equal(t, saslUsr2, brokerCfg[1].SaslUsername) - assert.Equal(t, saslPwd, brokerCfg[1].SaslPassword) - assert.Equal(t, saslMechanism, brokerCfg[1].SaslMechanism) - assert.Equal(t, protocol, brokerCfg[1].SecurityProtocol) + assert.Equal(t, 2, len(brokerCfg.SASLConfigs)) + assert.Equal(t, saslUsr, brokerCfg.SASLConfigs[0].SaslUsername) + assert.Equal(t, saslPwd, brokerCfg.SASLConfigs[0].SaslPassword) + assert.Equal(t, saslMechanism, brokerCfg.SASLConfigs[0].SaslMechanism) + assert.Equal(t, protocol, brokerCfg.SASLConfigs[0].SecurityProtocol) + + assert.Equal(t, saslUsr2, brokerCfg.SASLConfigs[1].SaslUsername) + assert.Equal(t, saslPwd, brokerCfg.SASLConfigs[1].SaslPassword) + assert.Equal(t, saslMechanism, brokerCfg.SASLConfigs[1].SaslMechanism) + assert.Equal(t, protocol, brokerCfg.SASLConfigs[1].SecurityProtocol) } diff --git a/clowder/export_test.go b/clowder/export_test.go index 443f4a7..8556f2d 100644 --- a/clowder/export_test.go +++ b/clowder/export_test.go @@ -15,7 +15,6 @@ package clowder var ( - NoBrokerCfg = noBrokerConfig - NoOriginalBroker = noOriginalBroker - NoSaslCfg = noSaslConfig + NoBrokerCfg = noBrokerConfig + NoSaslCfg = noSaslConfig ) diff --git a/clowder/kafka.go b/clowder/kafka.go index a79e243..c5142fe 100644 --- a/clowder/kafka.go +++ b/clowder/kafka.go @@ -16,52 +16,28 @@ package clowder import ( "fmt" - "github.com/RedHatInsights/insights-operator-utils/kafka" api "github.com/redhatinsights/app-common-go/pkg/api/v1" ) // Common constants used for logging and error reporting const ( - noOriginalBroker = "warning: no original broker configuration found; aborting" - noBrokerConfig = "warning: no broker configurations found in clowder config" - noSaslConfig = "warning: SASL configuration is missing" - noTopicMapping = "warning: no kafka mapping found for topic %s" + noBrokerConfig = "warning: no broker configurations found in clowder config" + noSaslConfig = "warning: SASL configuration is missing" + noTopicMapping = "warning: no kafka mapping found for topic %s" ) // UseBrokerConfig tries to replace parts of the BrokerConfiguration with the values -// loaded by Clowder. It expects brokerCfg to already be initialized -func UseBrokerConfig(brokerCfg kafka.BrokersConfig, loadedConfig *api.AppConfig) kafka.BrokersConfig { - numBrokerConfigs := len(brokerCfg) - if numBrokerConfigs == 0 { - // if original brokers config is totally empty, do nothing. - // this shouldn't happen, but we need to control this scenario - // to avoid panics. - fmt.Println(noOriginalBroker) - return brokerCfg - } +// loaded by Clowder +func UseBrokerConfig(brokerCfg *kafka.MultiBrokerConfiguration, loadedConfig *api.AppConfig) { if loadedConfig.Kafka != nil && len(loadedConfig.Kafka.Brokers) > 0 { - numClowderBrokers := len(loadedConfig.Kafka.Brokers) - // if original config has fewer brokers than clowder's, we append additional - // brokerConfiguration items with topic, clientId, and group from existing - // items, and the rest will be filled with data from clowder's brokers. - // When appending, it's most probable that a new slice is returned due to - // original capacity not being enough, which is why the brokerCfg slice is - // returned - for len(brokerCfg) < numClowderBrokers { - brokerCfg = append(brokerCfg, &kafka.BrokerConfiguration{ - Topic: (brokerCfg)[numBrokerConfigs-1].Topic, - ClientID: (brokerCfg)[numBrokerConfigs-1].ClientID, - Group: (brokerCfg)[numBrokerConfigs-1].Group, - // Since this will have data from Clowder's loadedConfig, always enable - Enabled: true, - }) - } + brokerCfg.Addresses = make([]string, len(loadedConfig.Kafka.Brokers)) + brokerCfg.SASLConfigs = make([]kafka.SASLConfiguration, len(loadedConfig.Kafka.Brokers)) for i, broker := range loadedConfig.Kafka.Brokers { if broker.Port != nil { - (brokerCfg)[i].Address = fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port) + brokerCfg.Addresses[i] = fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port) } else { - (brokerCfg)[i].Address = broker.Hostname + brokerCfg.Addresses[i] = broker.Hostname } // SSL config if broker.Authtype != nil { @@ -69,13 +45,13 @@ func UseBrokerConfig(brokerCfg kafka.BrokersConfig, loadedConfig *api.AppConfig) if broker.Sasl != nil { // we are trusting that these values are set and // dereferencing the pointers without any check... - brokerCfg[i].SaslUsername = *broker.Sasl.Username - brokerCfg[i].SaslPassword = *broker.Sasl.Password - brokerCfg[i].SaslMechanism = *broker.Sasl.SaslMechanism - brokerCfg[i].SecurityProtocol = *broker.SecurityProtocol + brokerCfg.SASLConfigs[i].SaslUsername = *broker.Sasl.Username + brokerCfg.SASLConfigs[i].SaslPassword = *broker.Sasl.Password + brokerCfg.SASLConfigs[i].SaslMechanism = *broker.Sasl.SaslMechanism + brokerCfg.SASLConfigs[i].SecurityProtocol = *broker.SecurityProtocol if caPath, err := loadedConfig.KafkaCa(broker); err == nil { - brokerCfg[i].CertPath = caPath + brokerCfg.SASLConfigs[i].CertPath = caPath } } else { fmt.Println(noSaslConfig) @@ -85,17 +61,25 @@ func UseBrokerConfig(brokerCfg kafka.BrokersConfig, loadedConfig *api.AppConfig) } else { fmt.Println(noBrokerConfig) } - return brokerCfg } -// UseClowderTopics tries to replace the configured topic's name with the -// corresponding topic name loaded by Clowder, if any -func UseClowderTopics(brokersCfg kafka.BrokersConfig, kafkaTopics map[string]api.TopicConfig) { - for _, cfg := range brokersCfg { +// UseClowderTopics tries to replace the configured topic with the corresponding +// topic loaded by Clowder +func UseClowderTopics(brokerCfg interface{}, kafkaTopics map[string]api.TopicConfig) { + switch cfg := brokerCfg.(type) { + case *kafka.SingleBrokerConfiguration: + if clowderTopic, ok := kafkaTopics[cfg.Topic]; ok { + cfg.Topic = clowderTopic.Name + } else { + fmt.Printf(noTopicMapping, cfg.Topic) + } + case *kafka.MultiBrokerConfiguration: if clowderTopic, ok := kafkaTopics[cfg.Topic]; ok { cfg.Topic = clowderTopic.Name } else { fmt.Printf(noTopicMapping, cfg.Topic) } + default: + fmt.Printf("Unknown Broker configuration type") } } diff --git a/kafka/configuration.go b/kafka/configuration.go index 8a3265d..2b18885 100644 --- a/kafka/configuration.go +++ b/kafka/configuration.go @@ -39,8 +39,8 @@ type SASLConfiguration struct { SaslPassword string `mapstructure:"sasl_password" toml:"sasl_password"` } -// BrokerConfiguration represents configuration of a single-instance Kafka broker -type BrokerConfiguration struct { +// SingleBrokerConfiguration represents configuration of a single-instance Kafka broker +type SingleBrokerConfiguration struct { Address string `mapstructure:"address" toml:"address"` SecurityProtocol string `mapstructure:"security_protocol" toml:"security_protocol"` CertPath string `mapstructure:"cert_path" toml:"cert_path"` @@ -54,21 +54,21 @@ type BrokerConfiguration struct { Enabled bool `mapstructure:"enabled" toml:"enabled"` } -// BrokersConfig represents configuration of Kafka broker with -// multiple instances running on different hosts (kafka cluster) -type BrokersConfig []*BrokerConfiguration - -// GetBrokersAddresses returns array of addresses of the configured brokers -func GetBrokersAddresses(brokersCfg BrokersConfig) []string { - addresses := make([]string, len(brokersCfg)) - for i, cfg := range brokersCfg { - addresses[i] = cfg.Address - } - return addresses +// MultiBrokerConfiguration represents configuration of Kafka broker with +// multiple instances running on different hosts +type MultiBrokerConfiguration struct { + Addresses []string `mapstructure:"addresses" toml:"addresses"` + SecurityProtocol string `mapstructure:"security_protocol" toml:"security_protocol"` + SASLConfigs []SASLConfiguration `mapstructure:"sasl_configs" toml:"sasl_configs"` + Topic string `mapstructure:"topic" toml:"topic"` + Timeout time.Duration `mapstructure:"timeout" toml:"timeout"` + Group string `mapstructure:"group" toml:"group"` + ClientID string `mapstructure:"client_id" toml:"client_id"` + Enabled bool `mapstructure:"enabled" toml:"enabled"` } // SaramaConfigFromBrokerConfig returns a Config struct from broker.Configuration parameters -func SaramaConfigFromBrokerConfig(cfg *BrokerConfiguration) (*sarama.Config, error) { +func SaramaConfigFromBrokerConfig(cfg *SingleBrokerConfiguration) (*sarama.Config, error) { saramaConfig := sarama.NewConfig() saramaConfig.Version = sarama.V0_10_2_0 diff --git a/kafka/configuration_test.go b/kafka/configuration_test.go index ea2bb5e..2a6b111 100644 --- a/kafka/configuration_test.go +++ b/kafka/configuration_test.go @@ -15,23 +15,22 @@ package kafka_test import ( - "testing" - "time" - "github.com/RedHatInsights/insights-operator-utils/kafka" "github.com/RedHatInsights/insights-operator-utils/tests/helpers" + "testing" + "time" "github.com/Shopify/sarama" "github.com/stretchr/testify/assert" ) func TestSaramaConfigFromBrokerConfig(t *testing.T) { - cfg := kafka.BrokerConfiguration{} + cfg := kafka.SingleBrokerConfiguration{} saramaConfig, err := kafka.SaramaConfigFromBrokerConfig(&cfg) helpers.FailOnError(t, err) assert.Equal(t, sarama.V0_10_2_0, saramaConfig.Version) - cfg = kafka.BrokerConfiguration{ + cfg = kafka.SingleBrokerConfiguration{ Timeout: time.Second, } saramaConfig, err = kafka.SaramaConfigFromBrokerConfig(&cfg) @@ -42,7 +41,7 @@ func TestSaramaConfigFromBrokerConfig(t *testing.T) { assert.Equal(t, time.Second, saramaConfig.Net.WriteTimeout) assert.Equal(t, "sarama", saramaConfig.ClientID) // default value - cfg = kafka.BrokerConfiguration{ + cfg = kafka.SingleBrokerConfiguration{ SecurityProtocol: "SSL", } @@ -51,7 +50,7 @@ func TestSaramaConfigFromBrokerConfig(t *testing.T) { assert.Equal(t, sarama.V0_10_2_0, saramaConfig.Version) assert.True(t, saramaConfig.Net.TLS.Enable) - cfg = kafka.BrokerConfiguration{ + cfg = kafka.SingleBrokerConfiguration{ SecurityProtocol: "SASL_SSL", SaslMechanism: "PLAIN", SaslUsername: "username", @@ -81,7 +80,7 @@ func TestSaramaConfigFromBrokerConfig(t *testing.T) { } func TestBadConfiguration(t *testing.T) { - cfg := kafka.BrokerConfiguration{ + cfg := kafka.SingleBrokerConfiguration{ SecurityProtocol: "SSL", CertPath: "missing_path", }