Skip to content

Commit

Permalink
support for multiple broker addresses when using clowder config
Browse files Browse the repository at this point in the history
  • Loading branch information
epapbak committed Jan 4, 2024
1 parent e37a6f7 commit 08ccfe1
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 54 deletions.
94 changes: 78 additions & 16 deletions clowder/clowder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,26 @@ func TestUseDBConfig(t *testing.T) {
func TestUseClowderTopicsTopicFound(t *testing.T) {
originalTopicName := "topic1"
clowderTopicName := "NewTopicName"
brokerCfg := kafka.BrokerConfiguration{
brokerCfg := kafka.SingleBrokerConfiguration{
Topic: originalTopicName,
}
kafkaTopics := map[string]api.TopicConfig{
originalTopicName: {
Name: clowderTopicName,
},
"topic2": {
Name: "AnotherTopicName",
},
}

clowder.UseClowderTopics(&brokerCfg, kafkaTopics)
assert.Equal(t, clowderTopicName, brokerCfg.Topic, "Clowder topic name was not used")
}

func TestUseClowderTopicsTopicFoundMultiBrokers(t *testing.T) {
originalTopicName := "topic1"
clowderTopicName := "NewTopicName"
brokerCfg := kafka.MultiBrokerConfiguration{
Topic: originalTopicName,
}
kafkaTopics := map[string]api.TopicConfig{
Expand All @@ -73,7 +92,7 @@ func TestUseClowderTopicsTopicFound(t *testing.T) {
func TestUseClowderTopicsTopicNotFound(t *testing.T) {
originalTopicName := "topic1"

brokerCfg := kafka.BrokerConfiguration{
brokerCfg := kafka.SingleBrokerConfiguration{
Topic: originalTopicName,
}
kafkaTopics := map[string]api.TopicConfig{
Expand All @@ -90,7 +109,7 @@ func TestUseClowderTopicsTopicNotFound(t *testing.T) {
}

func TestUseBrokerConfigNoKafkaConfig(t *testing.T) {
brokerCfg := kafka.BrokerConfiguration{}
brokerCfg := kafka.MultiBrokerConfiguration{}
loadedConfig := api.AppConfig{}

output, _ := capture.StandardOutput(func() {
Expand All @@ -100,7 +119,7 @@ func TestUseBrokerConfigNoKafkaConfig(t *testing.T) {
}

func TestUseBrokerConfigNoKafkaBrokers(t *testing.T) {
brokerCfg := kafka.BrokerConfiguration{}
brokerCfg := kafka.MultiBrokerConfiguration{}
loadedConfig := api.AppConfig{
Kafka: &api.KafkaConfig{},
}
Expand All @@ -111,9 +130,33 @@ func TestUseBrokerConfigNoKafkaBrokers(t *testing.T) {
assert.Contains(t, output, clowder.NoBrokerCfg)
}

func TestUseBrokerConfigMultipleKafkaBrokers(t *testing.T) {
addr1 := "test_broker"
addr2 := "test_broker_backup"
port := 12345
brokerCfg := kafka.MultiBrokerConfiguration{}
loadedConfig := api.AppConfig{
Kafka: &api.KafkaConfig{
Brokers: []api.BrokerConfig{
{
Hostname: addr1,
Port: &port,
},
{
Hostname: addr2,
Port: nil,
},
},
},
}

clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr1, port), addr2}, brokerCfg.Addresses)
}

func TestUseBrokerConfigNoAuthNoPort(t *testing.T) {
addr := "test_broker"
brokerCfg := kafka.BrokerConfiguration{}
brokerCfg := kafka.MultiBrokerConfiguration{}
loadedConfig := api.AppConfig{
Kafka: &api.KafkaConfig{
Brokers: []api.BrokerConfig{
Expand All @@ -126,11 +169,11 @@ func TestUseBrokerConfigNoAuthNoPort(t *testing.T) {
}

clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
assert.Equal(t, addr, brokerCfg.Address)
assert.Equal(t, []string{addr}, brokerCfg.Addresses)
}

func TestUseBrokerConfigNoAuth(t *testing.T) {
brokerCfg := kafka.BrokerConfiguration{}
brokerCfg := kafka.MultiBrokerConfiguration{}
port := 12345
addr := "test_broker"
loadedConfig := api.AppConfig{
Expand All @@ -145,11 +188,11 @@ func TestUseBrokerConfigNoAuth(t *testing.T) {
}

clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg.Address)
assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port)}, brokerCfg.Addresses)
}

func TestUseBrokerConfigAuthEnabledNoSasl(t *testing.T) {
brokerCfg := kafka.BrokerConfiguration{}
brokerCfg := kafka.MultiBrokerConfiguration{}
port := 12345
addr := "test_broker"
authType := api.BrokerConfigAuthtypeSasl
Expand All @@ -169,15 +212,17 @@ func TestUseBrokerConfigAuthEnabledNoSasl(t *testing.T) {
clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
})

assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg.Address)
assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port)}, brokerCfg.Addresses)
assert.Contains(t, output, clowder.NoSaslCfg)
}

func TestUseBrokerConfigAuthEnabledWithSaslConfig(t *testing.T) {
brokerCfg := kafka.BrokerConfiguration{}
brokerCfg := kafka.MultiBrokerConfiguration{}
port := 12345
addr := "test_broker"
addr2 := "test_broker_backup"
saslUsr := "user"
saslUsr2 := "user2"
saslPwd := "pwd"
saslMechanism := "sasl"
protocol := "tls"
Expand All @@ -197,6 +242,17 @@ func TestUseBrokerConfigAuthEnabledWithSaslConfig(t *testing.T) {
},
SecurityProtocol: &protocol,
},
{
Hostname: addr2,
Port: &port,
Authtype: &authType,
Sasl: &api.KafkaSASLConfig{
Password: &saslPwd,
Username: &saslUsr2,
SaslMechanism: &saslMechanism,
},
SecurityProtocol: &protocol,
},
},
},
}
Expand All @@ -205,10 +261,16 @@ func TestUseBrokerConfigAuthEnabledWithSaslConfig(t *testing.T) {
clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
})

assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg.Address)
assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port), fmt.Sprintf("%s:%d", addr2, port)}, brokerCfg.Addresses)
assert.Contains(t, output, "kafka is configured to use authentication")
assert.Equal(t, saslUsr, brokerCfg.SaslUsername)
assert.Equal(t, saslPwd, brokerCfg.SaslPassword)
assert.Equal(t, saslMechanism, brokerCfg.SaslMechanism)
assert.Equal(t, protocol, brokerCfg.SecurityProtocol)
assert.Equal(t, 2, len(brokerCfg.SASLConfigs))
assert.Equal(t, saslUsr, brokerCfg.SASLConfigs[0].SaslUsername)
assert.Equal(t, saslPwd, brokerCfg.SASLConfigs[0].SaslPassword)
assert.Equal(t, saslMechanism, brokerCfg.SASLConfigs[0].SaslMechanism)
assert.Equal(t, protocol, brokerCfg.SASLConfigs[0].SecurityProtocol)

assert.Equal(t, saslUsr2, brokerCfg.SASLConfigs[1].SaslUsername)
assert.Equal(t, saslPwd, brokerCfg.SASLConfigs[1].SaslPassword)
assert.Equal(t, saslMechanism, brokerCfg.SASLConfigs[1].SaslMechanism)
assert.Equal(t, protocol, brokerCfg.SASLConfigs[1].SecurityProtocol)
}
70 changes: 40 additions & 30 deletions clowder/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,33 +29,33 @@ const (

// UseBrokerConfig tries to replace parts of the BrokerConfiguration with the values
// loaded by Clowder
func UseBrokerConfig(brokerCfg *kafka.BrokerConfiguration, loadedConfig *api.AppConfig) {
// make sure broker(s) are configured in Clowder
func UseBrokerConfig(brokerCfg *kafka.MultiBrokerConfiguration, loadedConfig *api.AppConfig) {
if loadedConfig.Kafka != nil && len(loadedConfig.Kafka.Brokers) > 0 {
broker := loadedConfig.Kafka.Brokers[0]
// port can be empty in api, so taking it into account
if broker.Port != nil {
brokerCfg.Address = fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port)
} else {
brokerCfg.Address = broker.Hostname
}

// SSL config
if broker.Authtype != nil {
fmt.Println("kafka is configured to use authentication")
if broker.Sasl != nil {
// we are trusting that these values are set and
// dereferencing the pointers without any check...
brokerCfg.SaslUsername = *broker.Sasl.Username
brokerCfg.SaslPassword = *broker.Sasl.Password
brokerCfg.SaslMechanism = *broker.Sasl.SaslMechanism
brokerCfg.SecurityProtocol = *broker.SecurityProtocol
brokerCfg.Addresses = make([]string, len(loadedConfig.Kafka.Brokers))
brokerCfg.SASLConfigs = make([]kafka.SASLConfiguration, len(loadedConfig.Kafka.Brokers))
for i, broker := range loadedConfig.Kafka.Brokers {
if broker.Port != nil {
brokerCfg.Addresses[i] = fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port)
} else {
brokerCfg.Addresses[i] = broker.Hostname
}
// SSL config
if broker.Authtype != nil {
fmt.Println("kafka is configured to use authentication")
if broker.Sasl != nil {
// we are trusting that these values are set and
// dereferencing the pointers without any check...
brokerCfg.SASLConfigs[i].SaslUsername = *broker.Sasl.Username
brokerCfg.SASLConfigs[i].SaslPassword = *broker.Sasl.Password
brokerCfg.SASLConfigs[i].SaslMechanism = *broker.Sasl.SaslMechanism
brokerCfg.SASLConfigs[i].SecurityProtocol = *broker.SecurityProtocol

if caPath, err := loadedConfig.KafkaCa(broker); err == nil {
brokerCfg.CertPath = caPath
if caPath, err := loadedConfig.KafkaCa(broker); err == nil {
brokerCfg.SASLConfigs[i].CertPath = caPath
}
} else {
fmt.Println(noSaslConfig)
}
} else {
fmt.Println(noSaslConfig)
}
}
} else {
Expand All @@ -65,11 +65,21 @@ func UseBrokerConfig(brokerCfg *kafka.BrokerConfiguration, loadedConfig *api.App

// UseClowderTopics tries to replace the configured topic with the corresponding
// topic loaded by Clowder
func UseClowderTopics(configuration *kafka.BrokerConfiguration, kafkaTopics map[string]api.TopicConfig) {
// Get the correct topic name from clowder mapping if available
if clowderTopic, ok := kafkaTopics[configuration.Topic]; ok {
configuration.Topic = clowderTopic.Name
} else {
fmt.Printf(noTopicMapping, configuration.Topic)
func UseClowderTopics(brokerCfg interface{}, kafkaTopics map[string]api.TopicConfig) {
switch cfg := brokerCfg.(type) {
case *kafka.SingleBrokerConfiguration:
if clowderTopic, ok := kafkaTopics[cfg.Topic]; ok {
cfg.Topic = clowderTopic.Name
} else {
fmt.Printf(noTopicMapping, cfg.Topic)
}
case *kafka.MultiBrokerConfiguration:
if clowderTopic, ok := kafkaTopics[cfg.Topic]; ok {
cfg.Topic = clowderTopic.Name
} else {
fmt.Printf(noTopicMapping, cfg.Topic)
}
default:
fmt.Printf("Unknown Broker configuration type")
}
}
29 changes: 26 additions & 3 deletions kafka/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,18 @@ import (
"github.com/rs/zerolog/log"
)

// BrokerConfiguration represents configuration of Kafka broker
type BrokerConfiguration struct {
// SASLConfiguration represents configuration of SASL authentication for
// a given Kafka broker
type SASLConfiguration struct {
SecurityProtocol string `mapstructure:"security_protocol" toml:"security_protocol"`
CertPath string `mapstructure:"cert_path" toml:"cert_path"`
SaslMechanism string `mapstructure:"sasl_mechanism" toml:"sasl_mechanism"`
SaslUsername string `mapstructure:"sasl_username" toml:"sasl_username"`
SaslPassword string `mapstructure:"sasl_password" toml:"sasl_password"`
}

// SingleBrokerConfiguration represents configuration of a single-instance Kafka broker
type SingleBrokerConfiguration struct {
Address string `mapstructure:"address" toml:"address"`
SecurityProtocol string `mapstructure:"security_protocol" toml:"security_protocol"`
CertPath string `mapstructure:"cert_path" toml:"cert_path"`
Expand All @@ -44,8 +54,21 @@ type BrokerConfiguration struct {
Enabled bool `mapstructure:"enabled" toml:"enabled"`
}

// MultiBrokerConfiguration represents configuration of Kafka broker with
// multiple instances running on different hosts
type MultiBrokerConfiguration struct {
Addresses []string `mapstructure:"addresses" toml:"addresses"`
SecurityProtocol string `mapstructure:"security_protocol" toml:"security_protocol"`
SASLConfigs []SASLConfiguration `mapstructure:"sasl_configs" toml:"sasl_configs"`
Topic string `mapstructure:"topic" toml:"topic"`
Timeout time.Duration `mapstructure:"timeout" toml:"timeout"`
Group string `mapstructure:"group" toml:"group"`
ClientID string `mapstructure:"client_id" toml:"client_id"`
Enabled bool `mapstructure:"enabled" toml:"enabled"`
}

// SaramaConfigFromBrokerConfig returns a Config struct from broker.Configuration parameters
func SaramaConfigFromBrokerConfig(cfg *BrokerConfiguration) (*sarama.Config, error) {
func SaramaConfigFromBrokerConfig(cfg *SingleBrokerConfiguration) (*sarama.Config, error) {
saramaConfig := sarama.NewConfig()
saramaConfig.Version = sarama.V0_10_2_0

Expand Down
10 changes: 5 additions & 5 deletions kafka/configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ import (
)

func TestSaramaConfigFromBrokerConfig(t *testing.T) {
cfg := kafka.BrokerConfiguration{}
cfg := kafka.SingleBrokerConfiguration{}
saramaConfig, err := kafka.SaramaConfigFromBrokerConfig(&cfg)
helpers.FailOnError(t, err)
assert.Equal(t, sarama.V0_10_2_0, saramaConfig.Version)

cfg = kafka.BrokerConfiguration{
cfg = kafka.SingleBrokerConfiguration{
Timeout: time.Second,
}
saramaConfig, err = kafka.SaramaConfigFromBrokerConfig(&cfg)
Expand All @@ -41,7 +41,7 @@ func TestSaramaConfigFromBrokerConfig(t *testing.T) {
assert.Equal(t, time.Second, saramaConfig.Net.WriteTimeout)
assert.Equal(t, "sarama", saramaConfig.ClientID) // default value

cfg = kafka.BrokerConfiguration{
cfg = kafka.SingleBrokerConfiguration{
SecurityProtocol: "SSL",
}

Expand All @@ -50,7 +50,7 @@ func TestSaramaConfigFromBrokerConfig(t *testing.T) {
assert.Equal(t, sarama.V0_10_2_0, saramaConfig.Version)
assert.True(t, saramaConfig.Net.TLS.Enable)

cfg = kafka.BrokerConfiguration{
cfg = kafka.SingleBrokerConfiguration{
SecurityProtocol: "SASL_SSL",
SaslMechanism: "PLAIN",
SaslUsername: "username",
Expand Down Expand Up @@ -80,7 +80,7 @@ func TestSaramaConfigFromBrokerConfig(t *testing.T) {
}

func TestBadConfiguration(t *testing.T) {
cfg := kafka.BrokerConfiguration{
cfg := kafka.SingleBrokerConfiguration{
SecurityProtocol: "SSL",
CertPath: "missing_path",
}
Expand Down

0 comments on commit 08ccfe1

Please sign in to comment.