diff --git a/clowder/clowder_test.go b/clowder/clowder_test.go index d82f0aa..bc0680d 100644 --- a/clowder/clowder_test.go +++ b/clowder/clowder_test.go @@ -54,7 +54,26 @@ func TestUseDBConfig(t *testing.T) { func TestUseClowderTopicsTopicFound(t *testing.T) { originalTopicName := "topic1" clowderTopicName := "NewTopicName" - brokerCfg := kafka.BrokerConfiguration{ + brokerCfg := kafka.SingleBrokerConfiguration{ + Topic: originalTopicName, + } + kafkaTopics := map[string]api.TopicConfig{ + originalTopicName: { + Name: clowderTopicName, + }, + "topic2": { + Name: "AnotherTopicName", + }, + } + + clowder.UseClowderTopics(&brokerCfg, kafkaTopics) + assert.Equal(t, clowderTopicName, brokerCfg.Topic, "Clowder topic name was not used") +} + +func TestUseClowderTopicsTopicFoundMultiBrokers(t *testing.T) { + originalTopicName := "topic1" + clowderTopicName := "NewTopicName" + brokerCfg := kafka.MultiBrokerConfiguration{ Topic: originalTopicName, } kafkaTopics := map[string]api.TopicConfig{ @@ -73,7 +92,7 @@ func TestUseClowderTopicsTopicFound(t *testing.T) { func TestUseClowderTopicsTopicNotFound(t *testing.T) { originalTopicName := "topic1" - brokerCfg := kafka.BrokerConfiguration{ + brokerCfg := kafka.SingleBrokerConfiguration{ Topic: originalTopicName, } kafkaTopics := map[string]api.TopicConfig{ @@ -90,7 +109,7 @@ func TestUseClowderTopicsTopicNotFound(t *testing.T) { } func TestUseBrokerConfigNoKafkaConfig(t *testing.T) { - brokerCfg := kafka.BrokerConfiguration{} + brokerCfg := kafka.MultiBrokerConfiguration{} loadedConfig := api.AppConfig{} output, _ := capture.StandardOutput(func() { @@ -100,7 +119,7 @@ func TestUseBrokerConfigNoKafkaConfig(t *testing.T) { } func TestUseBrokerConfigNoKafkaBrokers(t *testing.T) { - brokerCfg := kafka.BrokerConfiguration{} + brokerCfg := kafka.MultiBrokerConfiguration{} loadedConfig := api.AppConfig{ Kafka: &api.KafkaConfig{}, } @@ -111,9 +130,33 @@ func TestUseBrokerConfigNoKafkaBrokers(t *testing.T) { assert.Contains(t, output, clowder.NoBrokerCfg) } +func TestUseBrokerConfigMultipleKafkaBrokers(t *testing.T) { + addr1 := "test_broker" + addr2 := "test_broker_backup" + port := 12345 + brokerCfg := kafka.MultiBrokerConfiguration{} + loadedConfig := api.AppConfig{ + Kafka: &api.KafkaConfig{ + Brokers: []api.BrokerConfig{ + { + Hostname: addr1, + Port: &port, + }, + { + Hostname: addr2, + Port: nil, + }, + }, + }, + } + + clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) + assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr1, port), addr2}, brokerCfg.Addresses) +} + func TestUseBrokerConfigNoAuthNoPort(t *testing.T) { addr := "test_broker" - brokerCfg := kafka.BrokerConfiguration{} + brokerCfg := kafka.MultiBrokerConfiguration{} loadedConfig := api.AppConfig{ Kafka: &api.KafkaConfig{ Brokers: []api.BrokerConfig{ @@ -126,11 +169,11 @@ func TestUseBrokerConfigNoAuthNoPort(t *testing.T) { } clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) - assert.Equal(t, addr, brokerCfg.Address) + assert.Equal(t, []string{addr}, brokerCfg.Addresses) } func TestUseBrokerConfigNoAuth(t *testing.T) { - brokerCfg := kafka.BrokerConfiguration{} + brokerCfg := kafka.MultiBrokerConfiguration{} port := 12345 addr := "test_broker" loadedConfig := api.AppConfig{ @@ -145,11 +188,11 @@ func TestUseBrokerConfigNoAuth(t *testing.T) { } clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) - assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg.Address) + assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port)}, brokerCfg.Addresses) } func TestUseBrokerConfigAuthEnabledNoSasl(t *testing.T) { - brokerCfg := kafka.BrokerConfiguration{} + brokerCfg := kafka.MultiBrokerConfiguration{} port := 12345 addr := "test_broker" authType := api.BrokerConfigAuthtypeSasl @@ -169,15 +212,17 @@ func TestUseBrokerConfigAuthEnabledNoSasl(t *testing.T) { clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) }) - assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg.Address) + assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port)}, brokerCfg.Addresses) assert.Contains(t, output, clowder.NoSaslCfg) } func TestUseBrokerConfigAuthEnabledWithSaslConfig(t *testing.T) { - brokerCfg := kafka.BrokerConfiguration{} + brokerCfg := kafka.MultiBrokerConfiguration{} port := 12345 addr := "test_broker" + addr2 := "test_broker_backup" saslUsr := "user" + saslUsr2 := "user2" saslPwd := "pwd" saslMechanism := "sasl" protocol := "tls" @@ -197,6 +242,17 @@ func TestUseBrokerConfigAuthEnabledWithSaslConfig(t *testing.T) { }, SecurityProtocol: &protocol, }, + { + Hostname: addr2, + Port: &port, + Authtype: &authType, + Sasl: &api.KafkaSASLConfig{ + Password: &saslPwd, + Username: &saslUsr2, + SaslMechanism: &saslMechanism, + }, + SecurityProtocol: &protocol, + }, }, }, } @@ -205,10 +261,16 @@ func TestUseBrokerConfigAuthEnabledWithSaslConfig(t *testing.T) { clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) }) - assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg.Address) + assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port), fmt.Sprintf("%s:%d", addr2, port)}, brokerCfg.Addresses) assert.Contains(t, output, "kafka is configured to use authentication") - assert.Equal(t, saslUsr, brokerCfg.SaslUsername) - assert.Equal(t, saslPwd, brokerCfg.SaslPassword) - assert.Equal(t, saslMechanism, brokerCfg.SaslMechanism) - assert.Equal(t, protocol, brokerCfg.SecurityProtocol) + assert.Equal(t, 2, len(brokerCfg.SASLConfigs)) + assert.Equal(t, saslUsr, brokerCfg.SASLConfigs[0].SaslUsername) + assert.Equal(t, saslPwd, brokerCfg.SASLConfigs[0].SaslPassword) + assert.Equal(t, saslMechanism, brokerCfg.SASLConfigs[0].SaslMechanism) + assert.Equal(t, protocol, brokerCfg.SASLConfigs[0].SecurityProtocol) + + assert.Equal(t, saslUsr2, brokerCfg.SASLConfigs[1].SaslUsername) + assert.Equal(t, saslPwd, brokerCfg.SASLConfigs[1].SaslPassword) + assert.Equal(t, saslMechanism, brokerCfg.SASLConfigs[1].SaslMechanism) + assert.Equal(t, protocol, brokerCfg.SASLConfigs[1].SecurityProtocol) } diff --git a/clowder/kafka.go b/clowder/kafka.go index 8a2bfe6..c5142fe 100644 --- a/clowder/kafka.go +++ b/clowder/kafka.go @@ -29,33 +29,33 @@ const ( // UseBrokerConfig tries to replace parts of the BrokerConfiguration with the values // loaded by Clowder -func UseBrokerConfig(brokerCfg *kafka.BrokerConfiguration, loadedConfig *api.AppConfig) { - // make sure broker(s) are configured in Clowder +func UseBrokerConfig(brokerCfg *kafka.MultiBrokerConfiguration, loadedConfig *api.AppConfig) { if loadedConfig.Kafka != nil && len(loadedConfig.Kafka.Brokers) > 0 { - broker := loadedConfig.Kafka.Brokers[0] - // port can be empty in api, so taking it into account - if broker.Port != nil { - brokerCfg.Address = fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port) - } else { - brokerCfg.Address = broker.Hostname - } - - // SSL config - if broker.Authtype != nil { - fmt.Println("kafka is configured to use authentication") - if broker.Sasl != nil { - // we are trusting that these values are set and - // dereferencing the pointers without any check... - brokerCfg.SaslUsername = *broker.Sasl.Username - brokerCfg.SaslPassword = *broker.Sasl.Password - brokerCfg.SaslMechanism = *broker.Sasl.SaslMechanism - brokerCfg.SecurityProtocol = *broker.SecurityProtocol + brokerCfg.Addresses = make([]string, len(loadedConfig.Kafka.Brokers)) + brokerCfg.SASLConfigs = make([]kafka.SASLConfiguration, len(loadedConfig.Kafka.Brokers)) + for i, broker := range loadedConfig.Kafka.Brokers { + if broker.Port != nil { + brokerCfg.Addresses[i] = fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port) + } else { + brokerCfg.Addresses[i] = broker.Hostname + } + // SSL config + if broker.Authtype != nil { + fmt.Println("kafka is configured to use authentication") + if broker.Sasl != nil { + // we are trusting that these values are set and + // dereferencing the pointers without any check... + brokerCfg.SASLConfigs[i].SaslUsername = *broker.Sasl.Username + brokerCfg.SASLConfigs[i].SaslPassword = *broker.Sasl.Password + brokerCfg.SASLConfigs[i].SaslMechanism = *broker.Sasl.SaslMechanism + brokerCfg.SASLConfigs[i].SecurityProtocol = *broker.SecurityProtocol - if caPath, err := loadedConfig.KafkaCa(broker); err == nil { - brokerCfg.CertPath = caPath + if caPath, err := loadedConfig.KafkaCa(broker); err == nil { + brokerCfg.SASLConfigs[i].CertPath = caPath + } + } else { + fmt.Println(noSaslConfig) } - } else { - fmt.Println(noSaslConfig) } } } else { @@ -65,11 +65,21 @@ func UseBrokerConfig(brokerCfg *kafka.BrokerConfiguration, loadedConfig *api.App // UseClowderTopics tries to replace the configured topic with the corresponding // topic loaded by Clowder -func UseClowderTopics(configuration *kafka.BrokerConfiguration, kafkaTopics map[string]api.TopicConfig) { - // Get the correct topic name from clowder mapping if available - if clowderTopic, ok := kafkaTopics[configuration.Topic]; ok { - configuration.Topic = clowderTopic.Name - } else { - fmt.Printf(noTopicMapping, configuration.Topic) +func UseClowderTopics(brokerCfg interface{}, kafkaTopics map[string]api.TopicConfig) { + switch cfg := brokerCfg.(type) { + case *kafka.SingleBrokerConfiguration: + if clowderTopic, ok := kafkaTopics[cfg.Topic]; ok { + cfg.Topic = clowderTopic.Name + } else { + fmt.Printf(noTopicMapping, cfg.Topic) + } + case *kafka.MultiBrokerConfiguration: + if clowderTopic, ok := kafkaTopics[cfg.Topic]; ok { + cfg.Topic = clowderTopic.Name + } else { + fmt.Printf(noTopicMapping, cfg.Topic) + } + default: + fmt.Printf("Unknown Broker configuration type") } } diff --git a/kafka/configuration.go b/kafka/configuration.go index 70c9a32..2b18885 100644 --- a/kafka/configuration.go +++ b/kafka/configuration.go @@ -29,8 +29,18 @@ import ( "github.com/rs/zerolog/log" ) -// BrokerConfiguration represents configuration of Kafka broker -type BrokerConfiguration struct { +// SASLConfiguration represents configuration of SASL authentication for +// a given Kafka broker +type SASLConfiguration struct { + SecurityProtocol string `mapstructure:"security_protocol" toml:"security_protocol"` + CertPath string `mapstructure:"cert_path" toml:"cert_path"` + SaslMechanism string `mapstructure:"sasl_mechanism" toml:"sasl_mechanism"` + SaslUsername string `mapstructure:"sasl_username" toml:"sasl_username"` + SaslPassword string `mapstructure:"sasl_password" toml:"sasl_password"` +} + +// SingleBrokerConfiguration represents configuration of a single-instance Kafka broker +type SingleBrokerConfiguration struct { Address string `mapstructure:"address" toml:"address"` SecurityProtocol string `mapstructure:"security_protocol" toml:"security_protocol"` CertPath string `mapstructure:"cert_path" toml:"cert_path"` @@ -44,8 +54,21 @@ type BrokerConfiguration struct { Enabled bool `mapstructure:"enabled" toml:"enabled"` } +// MultiBrokerConfiguration represents configuration of Kafka broker with +// multiple instances running on different hosts +type MultiBrokerConfiguration struct { + Addresses []string `mapstructure:"addresses" toml:"addresses"` + SecurityProtocol string `mapstructure:"security_protocol" toml:"security_protocol"` + SASLConfigs []SASLConfiguration `mapstructure:"sasl_configs" toml:"sasl_configs"` + Topic string `mapstructure:"topic" toml:"topic"` + Timeout time.Duration `mapstructure:"timeout" toml:"timeout"` + Group string `mapstructure:"group" toml:"group"` + ClientID string `mapstructure:"client_id" toml:"client_id"` + Enabled bool `mapstructure:"enabled" toml:"enabled"` +} + // SaramaConfigFromBrokerConfig returns a Config struct from broker.Configuration parameters -func SaramaConfigFromBrokerConfig(cfg *BrokerConfiguration) (*sarama.Config, error) { +func SaramaConfigFromBrokerConfig(cfg *SingleBrokerConfiguration) (*sarama.Config, error) { saramaConfig := sarama.NewConfig() saramaConfig.Version = sarama.V0_10_2_0 diff --git a/kafka/configuration_test.go b/kafka/configuration_test.go index 82954ea..2a6b111 100644 --- a/kafka/configuration_test.go +++ b/kafka/configuration_test.go @@ -25,12 +25,12 @@ import ( ) func TestSaramaConfigFromBrokerConfig(t *testing.T) { - cfg := kafka.BrokerConfiguration{} + cfg := kafka.SingleBrokerConfiguration{} saramaConfig, err := kafka.SaramaConfigFromBrokerConfig(&cfg) helpers.FailOnError(t, err) assert.Equal(t, sarama.V0_10_2_0, saramaConfig.Version) - cfg = kafka.BrokerConfiguration{ + cfg = kafka.SingleBrokerConfiguration{ Timeout: time.Second, } saramaConfig, err = kafka.SaramaConfigFromBrokerConfig(&cfg) @@ -41,7 +41,7 @@ func TestSaramaConfigFromBrokerConfig(t *testing.T) { assert.Equal(t, time.Second, saramaConfig.Net.WriteTimeout) assert.Equal(t, "sarama", saramaConfig.ClientID) // default value - cfg = kafka.BrokerConfiguration{ + cfg = kafka.SingleBrokerConfiguration{ SecurityProtocol: "SSL", } @@ -50,7 +50,7 @@ func TestSaramaConfigFromBrokerConfig(t *testing.T) { assert.Equal(t, sarama.V0_10_2_0, saramaConfig.Version) assert.True(t, saramaConfig.Net.TLS.Enable) - cfg = kafka.BrokerConfiguration{ + cfg = kafka.SingleBrokerConfiguration{ SecurityProtocol: "SASL_SSL", SaslMechanism: "PLAIN", SaslUsername: "username", @@ -80,7 +80,7 @@ func TestSaramaConfigFromBrokerConfig(t *testing.T) { } func TestBadConfiguration(t *testing.T) { - cfg := kafka.BrokerConfiguration{ + cfg := kafka.SingleBrokerConfiguration{ SecurityProtocol: "SSL", CertPath: "missing_path", }