From 4721266aa948eaa0cf8b8285367635421909371e Mon Sep 17 00:00:00 2001 From: Papa Bakary Camara Date: Mon, 15 Jan 2024 10:54:51 +0100 Subject: [PATCH] Revert to simple BrokerConfiguration structure with slice of addresses --- clowder/clowder_test.go | 33 ++++++++++------------ clowder/kafka.go | 56 +++++++++++++++---------------------- kafka/configuration.go | 31 +++----------------- kafka/configuration_test.go | 10 +++---- 4 files changed, 46 insertions(+), 84 deletions(-) diff --git a/clowder/clowder_test.go b/clowder/clowder_test.go index bc0680d..ebb5b06 100644 --- a/clowder/clowder_test.go +++ b/clowder/clowder_test.go @@ -54,7 +54,7 @@ func TestUseDBConfig(t *testing.T) { func TestUseClowderTopicsTopicFound(t *testing.T) { originalTopicName := "topic1" clowderTopicName := "NewTopicName" - brokerCfg := kafka.SingleBrokerConfiguration{ + brokerCfg := kafka.BrokerConfiguration{ Topic: originalTopicName, } kafkaTopics := map[string]api.TopicConfig{ @@ -73,7 +73,7 @@ func TestUseClowderTopicsTopicFound(t *testing.T) { func TestUseClowderTopicsTopicFoundMultiBrokers(t *testing.T) { originalTopicName := "topic1" clowderTopicName := "NewTopicName" - brokerCfg := kafka.MultiBrokerConfiguration{ + brokerCfg := kafka.BrokerConfiguration{ Topic: originalTopicName, } kafkaTopics := map[string]api.TopicConfig{ @@ -92,7 +92,7 @@ func TestUseClowderTopicsTopicFoundMultiBrokers(t *testing.T) { func TestUseClowderTopicsTopicNotFound(t *testing.T) { originalTopicName := "topic1" - brokerCfg := kafka.SingleBrokerConfiguration{ + brokerCfg := kafka.BrokerConfiguration{ Topic: originalTopicName, } kafkaTopics := map[string]api.TopicConfig{ @@ -109,7 +109,7 @@ func TestUseClowderTopicsTopicNotFound(t *testing.T) { } func TestUseBrokerConfigNoKafkaConfig(t *testing.T) { - brokerCfg := kafka.MultiBrokerConfiguration{} + brokerCfg := kafka.BrokerConfiguration{} loadedConfig := api.AppConfig{} output, _ := capture.StandardOutput(func() { @@ -119,7 +119,7 @@ func TestUseBrokerConfigNoKafkaConfig(t *testing.T) { } func TestUseBrokerConfigNoKafkaBrokers(t *testing.T) { - brokerCfg := kafka.MultiBrokerConfiguration{} + brokerCfg := kafka.BrokerConfiguration{} loadedConfig := api.AppConfig{ Kafka: &api.KafkaConfig{}, } @@ -134,7 +134,7 @@ func TestUseBrokerConfigMultipleKafkaBrokers(t *testing.T) { addr1 := "test_broker" addr2 := "test_broker_backup" port := 12345 - brokerCfg := kafka.MultiBrokerConfiguration{} + brokerCfg := kafka.BrokerConfiguration{} loadedConfig := api.AppConfig{ Kafka: &api.KafkaConfig{ Brokers: []api.BrokerConfig{ @@ -156,7 +156,7 @@ func TestUseBrokerConfigMultipleKafkaBrokers(t *testing.T) { func TestUseBrokerConfigNoAuthNoPort(t *testing.T) { addr := "test_broker" - brokerCfg := kafka.MultiBrokerConfiguration{} + brokerCfg := kafka.BrokerConfiguration{} loadedConfig := api.AppConfig{ Kafka: &api.KafkaConfig{ Brokers: []api.BrokerConfig{ @@ -173,7 +173,7 @@ func TestUseBrokerConfigNoAuthNoPort(t *testing.T) { } func TestUseBrokerConfigNoAuth(t *testing.T) { - brokerCfg := kafka.MultiBrokerConfiguration{} + brokerCfg := kafka.BrokerConfiguration{} port := 12345 addr := "test_broker" loadedConfig := api.AppConfig{ @@ -192,7 +192,7 @@ func TestUseBrokerConfigNoAuth(t *testing.T) { } func TestUseBrokerConfigAuthEnabledNoSasl(t *testing.T) { - brokerCfg := kafka.MultiBrokerConfiguration{} + brokerCfg := kafka.BrokerConfiguration{} port := 12345 addr := "test_broker" authType := api.BrokerConfigAuthtypeSasl @@ -217,7 +217,7 @@ func TestUseBrokerConfigAuthEnabledNoSasl(t *testing.T) { } func TestUseBrokerConfigAuthEnabledWithSaslConfig(t *testing.T) { - brokerCfg := kafka.MultiBrokerConfiguration{} + brokerCfg := kafka.BrokerConfiguration{} port := 12345 addr := "test_broker" addr2 := "test_broker_backup" @@ -263,14 +263,9 @@ func TestUseBrokerConfigAuthEnabledWithSaslConfig(t *testing.T) { assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port), fmt.Sprintf("%s:%d", addr2, port)}, brokerCfg.Addresses) assert.Contains(t, output, "kafka is configured to use authentication") - assert.Equal(t, 2, len(brokerCfg.SASLConfigs)) - assert.Equal(t, saslUsr, brokerCfg.SASLConfigs[0].SaslUsername) - assert.Equal(t, saslPwd, brokerCfg.SASLConfigs[0].SaslPassword) - assert.Equal(t, saslMechanism, brokerCfg.SASLConfigs[0].SaslMechanism) - assert.Equal(t, protocol, brokerCfg.SASLConfigs[0].SecurityProtocol) + assert.Equal(t, saslUsr, brokerCfg.SaslUsername) + assert.Equal(t, saslPwd, brokerCfg.SaslPassword) + assert.Equal(t, saslMechanism, brokerCfg.SaslMechanism) + assert.Equal(t, protocol, brokerCfg.SecurityProtocol) - assert.Equal(t, saslUsr2, brokerCfg.SASLConfigs[1].SaslUsername) - assert.Equal(t, saslPwd, brokerCfg.SASLConfigs[1].SaslPassword) - assert.Equal(t, saslMechanism, brokerCfg.SASLConfigs[1].SaslMechanism) - assert.Equal(t, protocol, brokerCfg.SASLConfigs[1].SecurityProtocol) } diff --git a/clowder/kafka.go b/clowder/kafka.go index c5142fe..079d4d3 100644 --- a/clowder/kafka.go +++ b/clowder/kafka.go @@ -29,33 +29,33 @@ const ( // UseBrokerConfig tries to replace parts of the BrokerConfiguration with the values // loaded by Clowder -func UseBrokerConfig(brokerCfg *kafka.MultiBrokerConfiguration, loadedConfig *api.AppConfig) { +func UseBrokerConfig(brokerCfg *kafka.BrokerConfiguration, loadedConfig *api.AppConfig) { if loadedConfig.Kafka != nil && len(loadedConfig.Kafka.Brokers) > 0 { brokerCfg.Addresses = make([]string, len(loadedConfig.Kafka.Brokers)) - brokerCfg.SASLConfigs = make([]kafka.SASLConfiguration, len(loadedConfig.Kafka.Brokers)) for i, broker := range loadedConfig.Kafka.Brokers { if broker.Port != nil { brokerCfg.Addresses[i] = fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port) } else { brokerCfg.Addresses[i] = broker.Hostname } - // SSL config - if broker.Authtype != nil { - fmt.Println("kafka is configured to use authentication") - if broker.Sasl != nil { - // we are trusting that these values are set and - // dereferencing the pointers without any check... - brokerCfg.SASLConfigs[i].SaslUsername = *broker.Sasl.Username - brokerCfg.SASLConfigs[i].SaslPassword = *broker.Sasl.Password - brokerCfg.SASLConfigs[i].SaslMechanism = *broker.Sasl.SaslMechanism - brokerCfg.SASLConfigs[i].SecurityProtocol = *broker.SecurityProtocol + } + // SSL config + clowderCfg := loadedConfig.Kafka.Brokers[0] + if clowderCfg.Authtype != nil { + fmt.Println("kafka is configured to use authentication") + if clowderCfg.Sasl != nil { + // we are trusting that these values are set and + // dereferencing the pointers without any check... + brokerCfg.SaslUsername = *clowderCfg.Sasl.Username + brokerCfg.SaslPassword = *clowderCfg.Sasl.Password + brokerCfg.SaslMechanism = *clowderCfg.Sasl.SaslMechanism + brokerCfg.SecurityProtocol = *clowderCfg.SecurityProtocol - if caPath, err := loadedConfig.KafkaCa(broker); err == nil { - brokerCfg.SASLConfigs[i].CertPath = caPath - } - } else { - fmt.Println(noSaslConfig) + if caPath, err := loadedConfig.KafkaCa(clowderCfg); err == nil { + brokerCfg.CertPath = caPath } + } else { + fmt.Println(noSaslConfig) } } } else { @@ -65,21 +65,11 @@ func UseBrokerConfig(brokerCfg *kafka.MultiBrokerConfiguration, loadedConfig *ap // UseClowderTopics tries to replace the configured topic with the corresponding // topic loaded by Clowder -func UseClowderTopics(brokerCfg interface{}, kafkaTopics map[string]api.TopicConfig) { - switch cfg := brokerCfg.(type) { - case *kafka.SingleBrokerConfiguration: - if clowderTopic, ok := kafkaTopics[cfg.Topic]; ok { - cfg.Topic = clowderTopic.Name - } else { - fmt.Printf(noTopicMapping, cfg.Topic) - } - case *kafka.MultiBrokerConfiguration: - if clowderTopic, ok := kafkaTopics[cfg.Topic]; ok { - cfg.Topic = clowderTopic.Name - } else { - fmt.Printf(noTopicMapping, cfg.Topic) - } - default: - fmt.Printf("Unknown Broker configuration type") +func UseClowderTopics(brokerCfg *kafka.BrokerConfiguration, kafkaTopics map[string]api.TopicConfig) { + if clowderTopic, ok := kafkaTopics[brokerCfg.Topic]; ok { + brokerCfg.Topic = clowderTopic.Name + } else { + fmt.Printf(noTopicMapping, brokerCfg.Topic) } + } diff --git a/kafka/configuration.go b/kafka/configuration.go index 2b18885..a1b5cc8 100644 --- a/kafka/configuration.go +++ b/kafka/configuration.go @@ -29,19 +29,9 @@ import ( "github.com/rs/zerolog/log" ) -// SASLConfiguration represents configuration of SASL authentication for -// a given Kafka broker -type SASLConfiguration struct { - SecurityProtocol string `mapstructure:"security_protocol" toml:"security_protocol"` - CertPath string `mapstructure:"cert_path" toml:"cert_path"` - SaslMechanism string `mapstructure:"sasl_mechanism" toml:"sasl_mechanism"` - SaslUsername string `mapstructure:"sasl_username" toml:"sasl_username"` - SaslPassword string `mapstructure:"sasl_password" toml:"sasl_password"` -} - -// SingleBrokerConfiguration represents configuration of a single-instance Kafka broker -type SingleBrokerConfiguration struct { - Address string `mapstructure:"address" toml:"address"` +// BrokerConfiguration represents configuration of a single-instance Kafka broker +type BrokerConfiguration struct { + Addresses []string `mapstructure:"address" toml:"address"` SecurityProtocol string `mapstructure:"security_protocol" toml:"security_protocol"` CertPath string `mapstructure:"cert_path" toml:"cert_path"` SaslMechanism string `mapstructure:"sasl_mechanism" toml:"sasl_mechanism"` @@ -54,21 +44,8 @@ type SingleBrokerConfiguration struct { Enabled bool `mapstructure:"enabled" toml:"enabled"` } -// MultiBrokerConfiguration represents configuration of Kafka broker with -// multiple instances running on different hosts -type MultiBrokerConfiguration struct { - Addresses []string `mapstructure:"addresses" toml:"addresses"` - SecurityProtocol string `mapstructure:"security_protocol" toml:"security_protocol"` - SASLConfigs []SASLConfiguration `mapstructure:"sasl_configs" toml:"sasl_configs"` - Topic string `mapstructure:"topic" toml:"topic"` - Timeout time.Duration `mapstructure:"timeout" toml:"timeout"` - Group string `mapstructure:"group" toml:"group"` - ClientID string `mapstructure:"client_id" toml:"client_id"` - Enabled bool `mapstructure:"enabled" toml:"enabled"` -} - // SaramaConfigFromBrokerConfig returns a Config struct from broker.Configuration parameters -func SaramaConfigFromBrokerConfig(cfg *SingleBrokerConfiguration) (*sarama.Config, error) { +func SaramaConfigFromBrokerConfig(cfg *BrokerConfiguration) (*sarama.Config, error) { saramaConfig := sarama.NewConfig() saramaConfig.Version = sarama.V0_10_2_0 diff --git a/kafka/configuration_test.go b/kafka/configuration_test.go index 2a6b111..82954ea 100644 --- a/kafka/configuration_test.go +++ b/kafka/configuration_test.go @@ -25,12 +25,12 @@ import ( ) func TestSaramaConfigFromBrokerConfig(t *testing.T) { - cfg := kafka.SingleBrokerConfiguration{} + cfg := kafka.BrokerConfiguration{} saramaConfig, err := kafka.SaramaConfigFromBrokerConfig(&cfg) helpers.FailOnError(t, err) assert.Equal(t, sarama.V0_10_2_0, saramaConfig.Version) - cfg = kafka.SingleBrokerConfiguration{ + cfg = kafka.BrokerConfiguration{ Timeout: time.Second, } saramaConfig, err = kafka.SaramaConfigFromBrokerConfig(&cfg) @@ -41,7 +41,7 @@ func TestSaramaConfigFromBrokerConfig(t *testing.T) { assert.Equal(t, time.Second, saramaConfig.Net.WriteTimeout) assert.Equal(t, "sarama", saramaConfig.ClientID) // default value - cfg = kafka.SingleBrokerConfiguration{ + cfg = kafka.BrokerConfiguration{ SecurityProtocol: "SSL", } @@ -50,7 +50,7 @@ func TestSaramaConfigFromBrokerConfig(t *testing.T) { assert.Equal(t, sarama.V0_10_2_0, saramaConfig.Version) assert.True(t, saramaConfig.Net.TLS.Enable) - cfg = kafka.SingleBrokerConfiguration{ + cfg = kafka.BrokerConfiguration{ SecurityProtocol: "SASL_SSL", SaslMechanism: "PLAIN", SaslUsername: "username", @@ -80,7 +80,7 @@ func TestSaramaConfigFromBrokerConfig(t *testing.T) { } func TestBadConfiguration(t *testing.T) { - cfg := kafka.SingleBrokerConfiguration{ + cfg := kafka.BrokerConfiguration{ SecurityProtocol: "SSL", CertPath: "missing_path", }