Skip to content

Commit

Permalink
Handle Kafka Broker addresses as string instead of slice
Browse files Browse the repository at this point in the history
  • Loading branch information
epapbak committed Feb 13, 2024
1 parent 68803c2 commit 1c89253
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 11 deletions.
10 changes: 5 additions & 5 deletions clowder/clowder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func TestUseBrokerConfigMultipleKafkaBrokers(t *testing.T) {
}

clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr1, port), addr2}, brokerCfg.Addresses)
assert.Equal(t, fmt.Sprintf("%s:%d,%s", addr1, port, addr2), brokerCfg.Addresses)
}

func TestUseBrokerConfigNoAuthNoPort(t *testing.T) {
Expand All @@ -170,7 +170,7 @@ func TestUseBrokerConfigNoAuthNoPort(t *testing.T) {
}

clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
assert.Equal(t, []string{addr}, brokerCfg.Addresses)
assert.Equal(t, addr, brokerCfg.Addresses)
}

func TestUseBrokerConfigNoAuth(t *testing.T) {
Expand All @@ -189,7 +189,7 @@ func TestUseBrokerConfigNoAuth(t *testing.T) {
}

clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port)}, brokerCfg.Addresses)
assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg.Addresses)
}

func TestUseBrokerConfigAuthEnabledNoSasl(t *testing.T) {
Expand All @@ -213,7 +213,7 @@ func TestUseBrokerConfigAuthEnabledNoSasl(t *testing.T) {
clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
})

assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port)}, brokerCfg.Addresses)
assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg.Addresses)
assert.Contains(t, output, clowder.NoSaslCfg)
}

Expand Down Expand Up @@ -262,7 +262,7 @@ func TestUseBrokerConfigAuthEnabledWithSaslConfig(t *testing.T) {
clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
})

assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port), fmt.Sprintf("%s:%d", addr2, port)}, brokerCfg.Addresses)
assert.Equal(t, fmt.Sprintf("%s:%d,%s:%d", addr, port, addr2, port), brokerCfg.Addresses)
assert.Contains(t, output, "kafka is configured to use authentication")
assert.Equal(t, saslUsr, brokerCfg.SaslUsername)
assert.Equal(t, saslPwd, brokerCfg.SaslPassword)
Expand Down
11 changes: 6 additions & 5 deletions clowder/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package clowder

import (
"fmt"

"github.com/RedHatInsights/insights-operator-utils/kafka"
api "github.com/redhatinsights/app-common-go/pkg/api/v1"
)
Expand All @@ -32,14 +31,16 @@ const (
// loaded by Clowder
func UseBrokerConfig(brokerCfg *kafka.BrokerConfiguration, loadedConfig *api.AppConfig) {
if loadedConfig.Kafka != nil && len(loadedConfig.Kafka.Brokers) > 0 {
brokerCfg.Addresses = make([]string, len(loadedConfig.Kafka.Brokers))
for i, broker := range loadedConfig.Kafka.Brokers {
brokerCfg.Addresses = ""
for _, broker := range loadedConfig.Kafka.Brokers {
if broker.Port != nil {
brokerCfg.Addresses[i] = fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port)
brokerCfg.Addresses += fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port) + ","
} else {
brokerCfg.Addresses[i] = broker.Hostname
brokerCfg.Addresses += broker.Hostname + ","
}
}
// remove the extra comma
brokerCfg.Addresses = brokerCfg.Addresses[:len(brokerCfg.Addresses)-1]
// SSL config
clowderCfg := loadedConfig.Kafka.Brokers[0]
if clowderCfg.Authtype != nil {
Expand Down
4 changes: 3 additions & 1 deletion kafka/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ import (

// BrokerConfiguration represents configuration of a single-instance Kafka broker
type BrokerConfiguration struct {
Addresses []string `mapstructure:"addresses" toml:"addresses"`
// Viper does not unmarshall automagically to a slice.
// Handling a string is easier and nicer than all the code required to do so
Addresses string `mapstructure:"addresses" toml:"addresses"`
SecurityProtocol string `mapstructure:"security_protocol" toml:"security_protocol"`
CertPath string `mapstructure:"cert_path" toml:"cert_path"`
SaslMechanism string `mapstructure:"sasl_mechanism" toml:"sasl_mechanism"`
Expand Down

0 comments on commit 1c89253

Please sign in to comment.