From 02972e4fda097969663eea08002e523236bac581 Mon Sep 17 00:00:00 2001 From: Papa Bakary Camara Date: Mon, 8 Jan 2024 13:29:36 +0100 Subject: [PATCH] Refactor MultiBrokerConfiguration into a slice of BrokerConfiguration objects --- clowder/clowder_test.go | 108 +++++++++++++++++++++--------------- clowder/export_test.go | 5 +- clowder/kafka.go | 69 ++++++++++++++--------- kafka/configuration.go | 28 +++++----- kafka/configuration_test.go | 10 ++-- 5 files changed, 128 insertions(+), 92 deletions(-) diff --git a/clowder/clowder_test.go b/clowder/clowder_test.go index bc0680d..9fbe2b5 100644 --- a/clowder/clowder_test.go +++ b/clowder/clowder_test.go @@ -54,8 +54,10 @@ func TestUseDBConfig(t *testing.T) { func TestUseClowderTopicsTopicFound(t *testing.T) { originalTopicName := "topic1" clowderTopicName := "NewTopicName" - brokerCfg := kafka.SingleBrokerConfiguration{ - Topic: originalTopicName, + brokerCfg := kafka.BrokersConfig{ + { + Topic: originalTopicName, + }, } kafkaTopics := map[string]api.TopicConfig{ originalTopicName: { @@ -66,15 +68,17 @@ func TestUseClowderTopicsTopicFound(t *testing.T) { }, } - clowder.UseClowderTopics(&brokerCfg, kafkaTopics) - assert.Equal(t, clowderTopicName, brokerCfg.Topic, "Clowder topic name was not used") + clowder.UseClowderTopics(brokerCfg, kafkaTopics) + assert.Equal(t, clowderTopicName, brokerCfg[0].Topic, "Clowder topic name was not used") } func TestUseClowderTopicsTopicFoundMultiBrokers(t *testing.T) { originalTopicName := "topic1" clowderTopicName := "NewTopicName" - brokerCfg := kafka.MultiBrokerConfiguration{ - Topic: originalTopicName, + brokerCfg := kafka.BrokersConfig{ + { + Topic: originalTopicName, + }, } kafkaTopics := map[string]api.TopicConfig{ originalTopicName: { @@ -85,15 +89,17 @@ func TestUseClowderTopicsTopicFoundMultiBrokers(t *testing.T) { }, } - clowder.UseClowderTopics(&brokerCfg, kafkaTopics) - assert.Equal(t, clowderTopicName, brokerCfg.Topic, "Clowder topic name was not used") + clowder.UseClowderTopics(brokerCfg, kafkaTopics) + assert.Equal(t, clowderTopicName, brokerCfg[0].Topic, "Clowder topic name was not used") } func TestUseClowderTopicsTopicNotFound(t *testing.T) { originalTopicName := "topic1" - brokerCfg := kafka.SingleBrokerConfiguration{ - Topic: originalTopicName, + brokerCfg := kafka.BrokersConfig{ + { + Topic: originalTopicName, + }, } kafkaTopics := map[string]api.TopicConfig{ "topic2": { @@ -102,39 +108,51 @@ func TestUseClowderTopicsTopicNotFound(t *testing.T) { } output, _ := capture.StandardOutput(func() { - clowder.UseClowderTopics(&brokerCfg, kafkaTopics) + clowder.UseClowderTopics(brokerCfg, kafkaTopics) }) - assert.Equal(t, originalTopicName, brokerCfg.Topic, "topic name should not change") + assert.Equal(t, originalTopicName, brokerCfg[0].Topic, "topic name should not change") assert.Contains(t, output, "warning: no kafka mapping found for topic topic1") } -func TestUseBrokerConfigNoKafkaConfig(t *testing.T) { - brokerCfg := kafka.MultiBrokerConfiguration{} +func TestUseBrokerConfigNoClowderKafkaConfig(t *testing.T) { + brokerCfg := kafka.BrokersConfig{{}} loadedConfig := api.AppConfig{} output, _ := capture.StandardOutput(func() { - clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) + clowder.UseBrokerConfig(brokerCfg, &loadedConfig) }) assert.Contains(t, output, clowder.NoBrokerCfg) } -func TestUseBrokerConfigNoKafkaBrokers(t *testing.T) { - brokerCfg := kafka.MultiBrokerConfiguration{} +func TestUseBrokerConfigNoOriginalKafkaBrokers(t *testing.T) { + brokerCfg := kafka.BrokersConfig{} loadedConfig := api.AppConfig{ Kafka: &api.KafkaConfig{}, } output, _ := capture.StandardOutput(func() { - clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) + clowder.UseBrokerConfig(brokerCfg, &loadedConfig) + }) + assert.Contains(t, output, clowder.NoOriginalBroker) +} + +func TestUseBrokerConfigNoClowderKafkaBrokers(t *testing.T) { + brokerCfg := kafka.BrokersConfig{{}} + loadedConfig := api.AppConfig{ + Kafka: &api.KafkaConfig{}, + } + + output, _ := capture.StandardOutput(func() { + clowder.UseBrokerConfig(brokerCfg, &loadedConfig) }) assert.Contains(t, output, clowder.NoBrokerCfg) } -func TestUseBrokerConfigMultipleKafkaBrokers(t *testing.T) { +func TestUseBrokerConfigMultipleClowderKafkaBrokers(t *testing.T) { addr1 := "test_broker" addr2 := "test_broker_backup" port := 12345 - brokerCfg := kafka.MultiBrokerConfiguration{} + brokerCfg := kafka.BrokersConfig{{}} loadedConfig := api.AppConfig{ Kafka: &api.KafkaConfig{ Brokers: []api.BrokerConfig{ @@ -150,13 +168,14 @@ func TestUseBrokerConfigMultipleKafkaBrokers(t *testing.T) { }, } - clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) - assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr1, port), addr2}, brokerCfg.Addresses) + brokerCfg = clowder.UseBrokerConfig(brokerCfg, &loadedConfig) + assert.Equal(t, fmt.Sprintf("%s:%d", addr1, port), brokerCfg[0].Address) + assert.Equal(t, addr2, brokerCfg[1].Address) } func TestUseBrokerConfigNoAuthNoPort(t *testing.T) { addr := "test_broker" - brokerCfg := kafka.MultiBrokerConfiguration{} + brokerCfg := kafka.BrokersConfig{{}} loadedConfig := api.AppConfig{ Kafka: &api.KafkaConfig{ Brokers: []api.BrokerConfig{ @@ -168,12 +187,12 @@ func TestUseBrokerConfigNoAuthNoPort(t *testing.T) { }, } - clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) - assert.Equal(t, []string{addr}, brokerCfg.Addresses) + brokerCfg = clowder.UseBrokerConfig(brokerCfg, &loadedConfig) + assert.Equal(t, addr, brokerCfg[0].Address) } func TestUseBrokerConfigNoAuth(t *testing.T) { - brokerCfg := kafka.MultiBrokerConfiguration{} + brokerCfg := kafka.BrokersConfig{{}} port := 12345 addr := "test_broker" loadedConfig := api.AppConfig{ @@ -187,12 +206,12 @@ func TestUseBrokerConfigNoAuth(t *testing.T) { }, } - clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) - assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port)}, brokerCfg.Addresses) + brokerCfg = clowder.UseBrokerConfig(brokerCfg, &loadedConfig) + assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg[0].Address) } func TestUseBrokerConfigAuthEnabledNoSasl(t *testing.T) { - brokerCfg := kafka.MultiBrokerConfiguration{} + brokerCfg := kafka.BrokersConfig{{}} port := 12345 addr := "test_broker" authType := api.BrokerConfigAuthtypeSasl @@ -209,15 +228,15 @@ func TestUseBrokerConfigAuthEnabledNoSasl(t *testing.T) { } output, _ := capture.StandardOutput(func() { - clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) + brokerCfg = clowder.UseBrokerConfig(brokerCfg, &loadedConfig) }) - assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port)}, brokerCfg.Addresses) + assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg[0].Address) assert.Contains(t, output, clowder.NoSaslCfg) } func TestUseBrokerConfigAuthEnabledWithSaslConfig(t *testing.T) { - brokerCfg := kafka.MultiBrokerConfiguration{} + brokerCfg := kafka.BrokersConfig{{}} port := 12345 addr := "test_broker" addr2 := "test_broker_backup" @@ -258,19 +277,20 @@ func TestUseBrokerConfigAuthEnabledWithSaslConfig(t *testing.T) { } output, _ := capture.StandardOutput(func() { - clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) + brokerCfg = clowder.UseBrokerConfig(brokerCfg, &loadedConfig) }) - assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port), fmt.Sprintf("%s:%d", addr2, port)}, brokerCfg.Addresses) assert.Contains(t, output, "kafka is configured to use authentication") - assert.Equal(t, 2, len(brokerCfg.SASLConfigs)) - assert.Equal(t, saslUsr, brokerCfg.SASLConfigs[0].SaslUsername) - assert.Equal(t, saslPwd, brokerCfg.SASLConfigs[0].SaslPassword) - assert.Equal(t, saslMechanism, brokerCfg.SASLConfigs[0].SaslMechanism) - assert.Equal(t, protocol, brokerCfg.SASLConfigs[0].SecurityProtocol) - - assert.Equal(t, saslUsr2, brokerCfg.SASLConfigs[1].SaslUsername) - assert.Equal(t, saslPwd, brokerCfg.SASLConfigs[1].SaslPassword) - assert.Equal(t, saslMechanism, brokerCfg.SASLConfigs[1].SaslMechanism) - assert.Equal(t, protocol, brokerCfg.SASLConfigs[1].SecurityProtocol) + + assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg[0].Address) + assert.Equal(t, saslUsr, brokerCfg[0].SaslUsername) + assert.Equal(t, saslPwd, brokerCfg[0].SaslPassword) + assert.Equal(t, saslMechanism, brokerCfg[0].SaslMechanism) + assert.Equal(t, protocol, brokerCfg[0].SecurityProtocol) + + assert.Equal(t, fmt.Sprintf("%s:%d", addr2, port), brokerCfg[1].Address) + assert.Equal(t, saslUsr2, brokerCfg[1].SaslUsername) + assert.Equal(t, saslPwd, brokerCfg[1].SaslPassword) + assert.Equal(t, saslMechanism, brokerCfg[1].SaslMechanism) + assert.Equal(t, protocol, brokerCfg[1].SecurityProtocol) } diff --git a/clowder/export_test.go b/clowder/export_test.go index 8556f2d..443f4a7 100644 --- a/clowder/export_test.go +++ b/clowder/export_test.go @@ -15,6 +15,7 @@ package clowder var ( - NoBrokerCfg = noBrokerConfig - NoSaslCfg = noSaslConfig + NoBrokerCfg = noBrokerConfig + NoOriginalBroker = noOriginalBroker + NoSaslCfg = noSaslConfig ) diff --git a/clowder/kafka.go b/clowder/kafka.go index c5142fe..4aca93f 100644 --- a/clowder/kafka.go +++ b/clowder/kafka.go @@ -22,22 +22,45 @@ import ( // Common constants used for logging and error reporting const ( - noBrokerConfig = "warning: no broker configurations found in clowder config" - noSaslConfig = "warning: SASL configuration is missing" - noTopicMapping = "warning: no kafka mapping found for topic %s" + noOriginalBroker = "warning: no original broker configuration found; aborting" + noBrokerConfig = "warning: no broker configurations found in clowder config" + noSaslConfig = "warning: SASL configuration is missing" + noTopicMapping = "warning: no kafka mapping found for topic %s" ) // UseBrokerConfig tries to replace parts of the BrokerConfiguration with the values -// loaded by Clowder -func UseBrokerConfig(brokerCfg *kafka.MultiBrokerConfiguration, loadedConfig *api.AppConfig) { +// loaded by Clowder. It expects brokerCfg to already be initialized +func UseBrokerConfig(brokerCfg kafka.BrokersConfig, loadedConfig *api.AppConfig) kafka.BrokersConfig { + numBrokerConfigs := len(brokerCfg) + if numBrokerConfigs == 0 { + // if original brokers config is totally empty, do nothing. + // this shouldn't happen, but we need to control this scenario + // to avoid panics. + fmt.Println(noOriginalBroker) + return brokerCfg + } if loadedConfig.Kafka != nil && len(loadedConfig.Kafka.Brokers) > 0 { - brokerCfg.Addresses = make([]string, len(loadedConfig.Kafka.Brokers)) - brokerCfg.SASLConfigs = make([]kafka.SASLConfiguration, len(loadedConfig.Kafka.Brokers)) + numClowderBrokers := len(loadedConfig.Kafka.Brokers) + // if original config has fewer brokers than clowder's, we append additional + // brokerConfiguration items with topic, clientId, and group from existing + // items, and the rest will be filled with data from clowder's brokers. + // When appending, it's most probable that a new slice is returned due to + // original capacity not being enough, which is why the brokerCfg slice is + // returned + for len(brokerCfg) < numClowderBrokers { + brokerCfg = append(brokerCfg, &kafka.BrokerConfiguration{ + Topic: (brokerCfg)[numBrokerConfigs-1].Topic, + ClientID: (brokerCfg)[numBrokerConfigs-1].ClientID, + Group: (brokerCfg)[numBrokerConfigs-1].Group, + // Since this will have data from Clowder's loadedConfig, always enable + Enabled: true, + }) + } for i, broker := range loadedConfig.Kafka.Brokers { if broker.Port != nil { - brokerCfg.Addresses[i] = fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port) + (brokerCfg)[i].Address = fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port) } else { - brokerCfg.Addresses[i] = broker.Hostname + (brokerCfg)[i].Address = broker.Hostname } // SSL config if broker.Authtype != nil { @@ -45,13 +68,13 @@ func UseBrokerConfig(brokerCfg *kafka.MultiBrokerConfiguration, loadedConfig *ap if broker.Sasl != nil { // we are trusting that these values are set and // dereferencing the pointers without any check... - brokerCfg.SASLConfigs[i].SaslUsername = *broker.Sasl.Username - brokerCfg.SASLConfigs[i].SaslPassword = *broker.Sasl.Password - brokerCfg.SASLConfigs[i].SaslMechanism = *broker.Sasl.SaslMechanism - brokerCfg.SASLConfigs[i].SecurityProtocol = *broker.SecurityProtocol + brokerCfg[i].SaslUsername = *broker.Sasl.Username + brokerCfg[i].SaslPassword = *broker.Sasl.Password + brokerCfg[i].SaslMechanism = *broker.Sasl.SaslMechanism + brokerCfg[i].SecurityProtocol = *broker.SecurityProtocol if caPath, err := loadedConfig.KafkaCa(broker); err == nil { - brokerCfg.SASLConfigs[i].CertPath = caPath + brokerCfg[i].CertPath = caPath } } else { fmt.Println(noSaslConfig) @@ -61,25 +84,17 @@ func UseBrokerConfig(brokerCfg *kafka.MultiBrokerConfiguration, loadedConfig *ap } else { fmt.Println(noBrokerConfig) } + return brokerCfg } -// UseClowderTopics tries to replace the configured topic with the corresponding -// topic loaded by Clowder -func UseClowderTopics(brokerCfg interface{}, kafkaTopics map[string]api.TopicConfig) { - switch cfg := brokerCfg.(type) { - case *kafka.SingleBrokerConfiguration: - if clowderTopic, ok := kafkaTopics[cfg.Topic]; ok { - cfg.Topic = clowderTopic.Name - } else { - fmt.Printf(noTopicMapping, cfg.Topic) - } - case *kafka.MultiBrokerConfiguration: +// UseClowderTopics tries to replace the configured topic's name with the +// corresponding topic name loaded by Clowder, if any +func UseClowderTopics(brokersCfg kafka.BrokersConfig, kafkaTopics map[string]api.TopicConfig) { + for _, cfg := range brokersCfg { if clowderTopic, ok := kafkaTopics[cfg.Topic]; ok { cfg.Topic = clowderTopic.Name } else { fmt.Printf(noTopicMapping, cfg.Topic) } - default: - fmt.Printf("Unknown Broker configuration type") } } diff --git a/kafka/configuration.go b/kafka/configuration.go index 2b18885..b7d8438 100644 --- a/kafka/configuration.go +++ b/kafka/configuration.go @@ -39,8 +39,8 @@ type SASLConfiguration struct { SaslPassword string `mapstructure:"sasl_password" toml:"sasl_password"` } -// SingleBrokerConfiguration represents configuration of a single-instance Kafka broker -type SingleBrokerConfiguration struct { +// BrokerConfiguration represents configuration of a single-instance Kafka broker +type BrokerConfiguration struct { Address string `mapstructure:"address" toml:"address"` SecurityProtocol string `mapstructure:"security_protocol" toml:"security_protocol"` CertPath string `mapstructure:"cert_path" toml:"cert_path"` @@ -54,21 +54,21 @@ type SingleBrokerConfiguration struct { Enabled bool `mapstructure:"enabled" toml:"enabled"` } -// MultiBrokerConfiguration represents configuration of Kafka broker with -// multiple instances running on different hosts -type MultiBrokerConfiguration struct { - Addresses []string `mapstructure:"addresses" toml:"addresses"` - SecurityProtocol string `mapstructure:"security_protocol" toml:"security_protocol"` - SASLConfigs []SASLConfiguration `mapstructure:"sasl_configs" toml:"sasl_configs"` - Topic string `mapstructure:"topic" toml:"topic"` - Timeout time.Duration `mapstructure:"timeout" toml:"timeout"` - Group string `mapstructure:"group" toml:"group"` - ClientID string `mapstructure:"client_id" toml:"client_id"` - Enabled bool `mapstructure:"enabled" toml:"enabled"` +// BrokersConfig represents configuration of Kafka broker with +// multiple instances running on different hosts (kafka cluster) +type BrokersConfig []*BrokerConfiguration + +// GetBrokersAddresses returns array of addresses of the configured brokers +func GetBrokersAddresses(brokersCfg BrokersConfig) []string { + addresses := make([]string, len(brokersCfg), len(brokersCfg)) + for i, cfg := range brokersCfg { + addresses[i] = cfg.Address + } + return addresses } // SaramaConfigFromBrokerConfig returns a Config struct from broker.Configuration parameters -func SaramaConfigFromBrokerConfig(cfg *SingleBrokerConfiguration) (*sarama.Config, error) { +func SaramaConfigFromBrokerConfig(cfg *BrokerConfiguration) (*sarama.Config, error) { saramaConfig := sarama.NewConfig() saramaConfig.Version = sarama.V0_10_2_0 diff --git a/kafka/configuration_test.go b/kafka/configuration_test.go index 2a6b111..82954ea 100644 --- a/kafka/configuration_test.go +++ b/kafka/configuration_test.go @@ -25,12 +25,12 @@ import ( ) func TestSaramaConfigFromBrokerConfig(t *testing.T) { - cfg := kafka.SingleBrokerConfiguration{} + cfg := kafka.BrokerConfiguration{} saramaConfig, err := kafka.SaramaConfigFromBrokerConfig(&cfg) helpers.FailOnError(t, err) assert.Equal(t, sarama.V0_10_2_0, saramaConfig.Version) - cfg = kafka.SingleBrokerConfiguration{ + cfg = kafka.BrokerConfiguration{ Timeout: time.Second, } saramaConfig, err = kafka.SaramaConfigFromBrokerConfig(&cfg) @@ -41,7 +41,7 @@ func TestSaramaConfigFromBrokerConfig(t *testing.T) { assert.Equal(t, time.Second, saramaConfig.Net.WriteTimeout) assert.Equal(t, "sarama", saramaConfig.ClientID) // default value - cfg = kafka.SingleBrokerConfiguration{ + cfg = kafka.BrokerConfiguration{ SecurityProtocol: "SSL", } @@ -50,7 +50,7 @@ func TestSaramaConfigFromBrokerConfig(t *testing.T) { assert.Equal(t, sarama.V0_10_2_0, saramaConfig.Version) assert.True(t, saramaConfig.Net.TLS.Enable) - cfg = kafka.SingleBrokerConfiguration{ + cfg = kafka.BrokerConfiguration{ SecurityProtocol: "SASL_SSL", SaslMechanism: "PLAIN", SaslUsername: "username", @@ -80,7 +80,7 @@ func TestSaramaConfigFromBrokerConfig(t *testing.T) { } func TestBadConfiguration(t *testing.T) { - cfg := kafka.SingleBrokerConfiguration{ + cfg := kafka.BrokerConfiguration{ SecurityProtocol: "SSL", CertPath: "missing_path", }