Skip to content

Commit

Permalink
Refactor MultiBrokerConfiguration into a slice of BrokerConfiguration…
Browse files Browse the repository at this point in the history
… objects
  • Loading branch information
epapbak committed Jan 8, 2024
1 parent 55508e3 commit 02972e4
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 92 deletions.
108 changes: 64 additions & 44 deletions clowder/clowder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ func TestUseDBConfig(t *testing.T) {
func TestUseClowderTopicsTopicFound(t *testing.T) {
originalTopicName := "topic1"
clowderTopicName := "NewTopicName"
brokerCfg := kafka.SingleBrokerConfiguration{
Topic: originalTopicName,
brokerCfg := kafka.BrokersConfig{
{
Topic: originalTopicName,
},
}
kafkaTopics := map[string]api.TopicConfig{
originalTopicName: {
Expand All @@ -66,15 +68,17 @@ func TestUseClowderTopicsTopicFound(t *testing.T) {
},
}

clowder.UseClowderTopics(&brokerCfg, kafkaTopics)
assert.Equal(t, clowderTopicName, brokerCfg.Topic, "Clowder topic name was not used")
clowder.UseClowderTopics(brokerCfg, kafkaTopics)
assert.Equal(t, clowderTopicName, brokerCfg[0].Topic, "Clowder topic name was not used")
}

func TestUseClowderTopicsTopicFoundMultiBrokers(t *testing.T) {
originalTopicName := "topic1"
clowderTopicName := "NewTopicName"
brokerCfg := kafka.MultiBrokerConfiguration{
Topic: originalTopicName,
brokerCfg := kafka.BrokersConfig{
{
Topic: originalTopicName,
},
}
kafkaTopics := map[string]api.TopicConfig{
originalTopicName: {
Expand All @@ -85,15 +89,17 @@ func TestUseClowderTopicsTopicFoundMultiBrokers(t *testing.T) {
},
}

clowder.UseClowderTopics(&brokerCfg, kafkaTopics)
assert.Equal(t, clowderTopicName, brokerCfg.Topic, "Clowder topic name was not used")
clowder.UseClowderTopics(brokerCfg, kafkaTopics)
assert.Equal(t, clowderTopicName, brokerCfg[0].Topic, "Clowder topic name was not used")
}

func TestUseClowderTopicsTopicNotFound(t *testing.T) {
originalTopicName := "topic1"

brokerCfg := kafka.SingleBrokerConfiguration{
Topic: originalTopicName,
brokerCfg := kafka.BrokersConfig{
{
Topic: originalTopicName,
},
}
kafkaTopics := map[string]api.TopicConfig{
"topic2": {
Expand All @@ -102,39 +108,51 @@ func TestUseClowderTopicsTopicNotFound(t *testing.T) {
}

output, _ := capture.StandardOutput(func() {
clowder.UseClowderTopics(&brokerCfg, kafkaTopics)
clowder.UseClowderTopics(brokerCfg, kafkaTopics)
})
assert.Equal(t, originalTopicName, brokerCfg.Topic, "topic name should not change")
assert.Equal(t, originalTopicName, brokerCfg[0].Topic, "topic name should not change")
assert.Contains(t, output, "warning: no kafka mapping found for topic topic1")
}

func TestUseBrokerConfigNoKafkaConfig(t *testing.T) {
brokerCfg := kafka.MultiBrokerConfiguration{}
func TestUseBrokerConfigNoClowderKafkaConfig(t *testing.T) {
brokerCfg := kafka.BrokersConfig{{}}
loadedConfig := api.AppConfig{}

output, _ := capture.StandardOutput(func() {
clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
clowder.UseBrokerConfig(brokerCfg, &loadedConfig)
})
assert.Contains(t, output, clowder.NoBrokerCfg)
}

func TestUseBrokerConfigNoKafkaBrokers(t *testing.T) {
brokerCfg := kafka.MultiBrokerConfiguration{}
func TestUseBrokerConfigNoOriginalKafkaBrokers(t *testing.T) {
brokerCfg := kafka.BrokersConfig{}
loadedConfig := api.AppConfig{
Kafka: &api.KafkaConfig{},
}

output, _ := capture.StandardOutput(func() {
clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
clowder.UseBrokerConfig(brokerCfg, &loadedConfig)
})
assert.Contains(t, output, clowder.NoOriginalBroker)
}

func TestUseBrokerConfigNoClowderKafkaBrokers(t *testing.T) {
brokerCfg := kafka.BrokersConfig{{}}
loadedConfig := api.AppConfig{
Kafka: &api.KafkaConfig{},
}

output, _ := capture.StandardOutput(func() {
clowder.UseBrokerConfig(brokerCfg, &loadedConfig)
})
assert.Contains(t, output, clowder.NoBrokerCfg)
}

func TestUseBrokerConfigMultipleKafkaBrokers(t *testing.T) {
func TestUseBrokerConfigMultipleClowderKafkaBrokers(t *testing.T) {
addr1 := "test_broker"
addr2 := "test_broker_backup"
port := 12345
brokerCfg := kafka.MultiBrokerConfiguration{}
brokerCfg := kafka.BrokersConfig{{}}
loadedConfig := api.AppConfig{
Kafka: &api.KafkaConfig{
Brokers: []api.BrokerConfig{
Expand All @@ -150,13 +168,14 @@ func TestUseBrokerConfigMultipleKafkaBrokers(t *testing.T) {
},
}

clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr1, port), addr2}, brokerCfg.Addresses)
brokerCfg = clowder.UseBrokerConfig(brokerCfg, &loadedConfig)
assert.Equal(t, fmt.Sprintf("%s:%d", addr1, port), brokerCfg[0].Address)
assert.Equal(t, addr2, brokerCfg[1].Address)
}

func TestUseBrokerConfigNoAuthNoPort(t *testing.T) {
addr := "test_broker"
brokerCfg := kafka.MultiBrokerConfiguration{}
brokerCfg := kafka.BrokersConfig{{}}
loadedConfig := api.AppConfig{
Kafka: &api.KafkaConfig{
Brokers: []api.BrokerConfig{
Expand All @@ -168,12 +187,12 @@ func TestUseBrokerConfigNoAuthNoPort(t *testing.T) {
},
}

clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
assert.Equal(t, []string{addr}, brokerCfg.Addresses)
brokerCfg = clowder.UseBrokerConfig(brokerCfg, &loadedConfig)
assert.Equal(t, addr, brokerCfg[0].Address)
}

func TestUseBrokerConfigNoAuth(t *testing.T) {
brokerCfg := kafka.MultiBrokerConfiguration{}
brokerCfg := kafka.BrokersConfig{{}}
port := 12345
addr := "test_broker"
loadedConfig := api.AppConfig{
Expand All @@ -187,12 +206,12 @@ func TestUseBrokerConfigNoAuth(t *testing.T) {
},
}

clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port)}, brokerCfg.Addresses)
brokerCfg = clowder.UseBrokerConfig(brokerCfg, &loadedConfig)
assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg[0].Address)
}

func TestUseBrokerConfigAuthEnabledNoSasl(t *testing.T) {
brokerCfg := kafka.MultiBrokerConfiguration{}
brokerCfg := kafka.BrokersConfig{{}}
port := 12345
addr := "test_broker"
authType := api.BrokerConfigAuthtypeSasl
Expand All @@ -209,15 +228,15 @@ func TestUseBrokerConfigAuthEnabledNoSasl(t *testing.T) {
}

output, _ := capture.StandardOutput(func() {
clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
brokerCfg = clowder.UseBrokerConfig(brokerCfg, &loadedConfig)
})

assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port)}, brokerCfg.Addresses)
assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg[0].Address)
assert.Contains(t, output, clowder.NoSaslCfg)
}

func TestUseBrokerConfigAuthEnabledWithSaslConfig(t *testing.T) {
brokerCfg := kafka.MultiBrokerConfiguration{}
brokerCfg := kafka.BrokersConfig{{}}
port := 12345
addr := "test_broker"
addr2 := "test_broker_backup"
Expand Down Expand Up @@ -258,19 +277,20 @@ func TestUseBrokerConfigAuthEnabledWithSaslConfig(t *testing.T) {
}

output, _ := capture.StandardOutput(func() {
clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
brokerCfg = clowder.UseBrokerConfig(brokerCfg, &loadedConfig)
})

assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port), fmt.Sprintf("%s:%d", addr2, port)}, brokerCfg.Addresses)
assert.Contains(t, output, "kafka is configured to use authentication")
assert.Equal(t, 2, len(brokerCfg.SASLConfigs))
assert.Equal(t, saslUsr, brokerCfg.SASLConfigs[0].SaslUsername)
assert.Equal(t, saslPwd, brokerCfg.SASLConfigs[0].SaslPassword)
assert.Equal(t, saslMechanism, brokerCfg.SASLConfigs[0].SaslMechanism)
assert.Equal(t, protocol, brokerCfg.SASLConfigs[0].SecurityProtocol)

assert.Equal(t, saslUsr2, brokerCfg.SASLConfigs[1].SaslUsername)
assert.Equal(t, saslPwd, brokerCfg.SASLConfigs[1].SaslPassword)
assert.Equal(t, saslMechanism, brokerCfg.SASLConfigs[1].SaslMechanism)
assert.Equal(t, protocol, brokerCfg.SASLConfigs[1].SecurityProtocol)

assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg[0].Address)
assert.Equal(t, saslUsr, brokerCfg[0].SaslUsername)
assert.Equal(t, saslPwd, brokerCfg[0].SaslPassword)
assert.Equal(t, saslMechanism, brokerCfg[0].SaslMechanism)
assert.Equal(t, protocol, brokerCfg[0].SecurityProtocol)

assert.Equal(t, fmt.Sprintf("%s:%d", addr2, port), brokerCfg[1].Address)
assert.Equal(t, saslUsr2, brokerCfg[1].SaslUsername)
assert.Equal(t, saslPwd, brokerCfg[1].SaslPassword)
assert.Equal(t, saslMechanism, brokerCfg[1].SaslMechanism)
assert.Equal(t, protocol, brokerCfg[1].SecurityProtocol)
}
5 changes: 3 additions & 2 deletions clowder/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package clowder

var (
NoBrokerCfg = noBrokerConfig
NoSaslCfg = noSaslConfig
NoBrokerCfg = noBrokerConfig
NoOriginalBroker = noOriginalBroker
NoSaslCfg = noSaslConfig
)
69 changes: 42 additions & 27 deletions clowder/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,59 @@ import (

// Common constants used for logging and error reporting
const (
noBrokerConfig = "warning: no broker configurations found in clowder config"
noSaslConfig = "warning: SASL configuration is missing"
noTopicMapping = "warning: no kafka mapping found for topic %s"
noOriginalBroker = "warning: no original broker configuration found; aborting"
noBrokerConfig = "warning: no broker configurations found in clowder config"
noSaslConfig = "warning: SASL configuration is missing"
noTopicMapping = "warning: no kafka mapping found for topic %s"
)

// UseBrokerConfig tries to replace parts of the BrokerConfiguration with the values
// loaded by Clowder
func UseBrokerConfig(brokerCfg *kafka.MultiBrokerConfiguration, loadedConfig *api.AppConfig) {
// loaded by Clowder. It expects brokerCfg to already be initialized
func UseBrokerConfig(brokerCfg kafka.BrokersConfig, loadedConfig *api.AppConfig) kafka.BrokersConfig {
numBrokerConfigs := len(brokerCfg)
if numBrokerConfigs == 0 {
// if original brokers config is totally empty, do nothing.
// this shouldn't happen, but we need to control this scenario
// to avoid panics.
fmt.Println(noOriginalBroker)
return brokerCfg
}
if loadedConfig.Kafka != nil && len(loadedConfig.Kafka.Brokers) > 0 {
brokerCfg.Addresses = make([]string, len(loadedConfig.Kafka.Brokers))
brokerCfg.SASLConfigs = make([]kafka.SASLConfiguration, len(loadedConfig.Kafka.Brokers))
numClowderBrokers := len(loadedConfig.Kafka.Brokers)
// if original config has fewer brokers than clowder's, we append additional
// brokerConfiguration items with topic, clientId, and group from existing
// items, and the rest will be filled with data from clowder's brokers.
// When appending, it's most probable that a new slice is returned due to
// original capacity not being enough, which is why the brokerCfg slice is
// returned
for len(brokerCfg) < numClowderBrokers {
brokerCfg = append(brokerCfg, &kafka.BrokerConfiguration{
Topic: (brokerCfg)[numBrokerConfigs-1].Topic,
ClientID: (brokerCfg)[numBrokerConfigs-1].ClientID,
Group: (brokerCfg)[numBrokerConfigs-1].Group,
// Since this will have data from Clowder's loadedConfig, always enable
Enabled: true,
})
}
for i, broker := range loadedConfig.Kafka.Brokers {
if broker.Port != nil {
brokerCfg.Addresses[i] = fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port)
(brokerCfg)[i].Address = fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port)
} else {
brokerCfg.Addresses[i] = broker.Hostname
(brokerCfg)[i].Address = broker.Hostname
}
// SSL config
if broker.Authtype != nil {
fmt.Println("kafka is configured to use authentication")
if broker.Sasl != nil {
// we are trusting that these values are set and
// dereferencing the pointers without any check...
brokerCfg.SASLConfigs[i].SaslUsername = *broker.Sasl.Username
brokerCfg.SASLConfigs[i].SaslPassword = *broker.Sasl.Password
brokerCfg.SASLConfigs[i].SaslMechanism = *broker.Sasl.SaslMechanism
brokerCfg.SASLConfigs[i].SecurityProtocol = *broker.SecurityProtocol
brokerCfg[i].SaslUsername = *broker.Sasl.Username
brokerCfg[i].SaslPassword = *broker.Sasl.Password
brokerCfg[i].SaslMechanism = *broker.Sasl.SaslMechanism
brokerCfg[i].SecurityProtocol = *broker.SecurityProtocol

if caPath, err := loadedConfig.KafkaCa(broker); err == nil {
brokerCfg.SASLConfigs[i].CertPath = caPath
brokerCfg[i].CertPath = caPath
}
} else {
fmt.Println(noSaslConfig)
Expand All @@ -61,25 +84,17 @@ func UseBrokerConfig(brokerCfg *kafka.MultiBrokerConfiguration, loadedConfig *ap
} else {
fmt.Println(noBrokerConfig)
}
return brokerCfg
}

// UseClowderTopics tries to replace the configured topic with the corresponding
// topic loaded by Clowder
func UseClowderTopics(brokerCfg interface{}, kafkaTopics map[string]api.TopicConfig) {
switch cfg := brokerCfg.(type) {
case *kafka.SingleBrokerConfiguration:
if clowderTopic, ok := kafkaTopics[cfg.Topic]; ok {
cfg.Topic = clowderTopic.Name
} else {
fmt.Printf(noTopicMapping, cfg.Topic)
}
case *kafka.MultiBrokerConfiguration:
// UseClowderTopics tries to replace the configured topic's name with the
// corresponding topic name loaded by Clowder, if any
func UseClowderTopics(brokersCfg kafka.BrokersConfig, kafkaTopics map[string]api.TopicConfig) {
for _, cfg := range brokersCfg {
if clowderTopic, ok := kafkaTopics[cfg.Topic]; ok {
cfg.Topic = clowderTopic.Name
} else {
fmt.Printf(noTopicMapping, cfg.Topic)
}
default:
fmt.Printf("Unknown Broker configuration type")
}
}
28 changes: 14 additions & 14 deletions kafka/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ type SASLConfiguration struct {
SaslPassword string `mapstructure:"sasl_password" toml:"sasl_password"`
}

// SingleBrokerConfiguration represents configuration of a single-instance Kafka broker
type SingleBrokerConfiguration struct {
// BrokerConfiguration represents configuration of a single-instance Kafka broker
type BrokerConfiguration struct {
Address string `mapstructure:"address" toml:"address"`
SecurityProtocol string `mapstructure:"security_protocol" toml:"security_protocol"`
CertPath string `mapstructure:"cert_path" toml:"cert_path"`
Expand All @@ -54,21 +54,21 @@ type SingleBrokerConfiguration struct {
Enabled bool `mapstructure:"enabled" toml:"enabled"`
}

// MultiBrokerConfiguration represents configuration of Kafka broker with
// multiple instances running on different hosts
type MultiBrokerConfiguration struct {
Addresses []string `mapstructure:"addresses" toml:"addresses"`
SecurityProtocol string `mapstructure:"security_protocol" toml:"security_protocol"`
SASLConfigs []SASLConfiguration `mapstructure:"sasl_configs" toml:"sasl_configs"`
Topic string `mapstructure:"topic" toml:"topic"`
Timeout time.Duration `mapstructure:"timeout" toml:"timeout"`
Group string `mapstructure:"group" toml:"group"`
ClientID string `mapstructure:"client_id" toml:"client_id"`
Enabled bool `mapstructure:"enabled" toml:"enabled"`
// BrokersConfig represents configuration of Kafka broker with
// multiple instances running on different hosts (kafka cluster)
type BrokersConfig []*BrokerConfiguration

// GetBrokersAddresses returns array of addresses of the configured brokers
func GetBrokersAddresses(brokersCfg BrokersConfig) []string {
addresses := make([]string, len(brokersCfg), len(brokersCfg))
for i, cfg := range brokersCfg {
addresses[i] = cfg.Address
}
return addresses
}

// SaramaConfigFromBrokerConfig returns a Config struct from broker.Configuration parameters
func SaramaConfigFromBrokerConfig(cfg *SingleBrokerConfiguration) (*sarama.Config, error) {
func SaramaConfigFromBrokerConfig(cfg *BrokerConfiguration) (*sarama.Config, error) {
saramaConfig := sarama.NewConfig()
saramaConfig.Version = sarama.V0_10_2_0

Expand Down
Loading

0 comments on commit 02972e4

Please sign in to comment.