Skip to content

Commit

Permalink
Revert "Refactor MultiBrokerConfiguration into a slice of BrokerConfi…
Browse files Browse the repository at this point in the history
…guration objects"

This reverts commit ee63cd1.
  • Loading branch information
epapbak committed Jan 15, 2024
1 parent 6f13707 commit fec884b
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 156 deletions.
133 changes: 45 additions & 88 deletions clowder/clowder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@ package clowder_test

import (
"fmt"
"testing"

"github.com/RedHatInsights/insights-operator-utils/clowder"
"github.com/RedHatInsights/insights-operator-utils/kafka"
"github.com/RedHatInsights/insights-operator-utils/postgres"
api "github.com/redhatinsights/app-common-go/pkg/api/v1"
"github.com/stretchr/testify/assert"
"github.com/tisnik/go-capture"
"testing"
)

func TestUseDBConfig(t *testing.T) {
Expand Down Expand Up @@ -55,10 +54,8 @@ func TestUseDBConfig(t *testing.T) {
func TestUseClowderTopicsTopicFound(t *testing.T) {
originalTopicName := "topic1"
clowderTopicName := "NewTopicName"
brokerCfg := kafka.BrokersConfig{
{
Topic: originalTopicName,
},
brokerCfg := kafka.SingleBrokerConfiguration{
Topic: originalTopicName,
}
kafkaTopics := map[string]api.TopicConfig{
originalTopicName: {
Expand All @@ -69,17 +66,15 @@ func TestUseClowderTopicsTopicFound(t *testing.T) {
},
}

clowder.UseClowderTopics(brokerCfg, kafkaTopics)
assert.Equal(t, clowderTopicName, brokerCfg[0].Topic, "Clowder topic name was not used")
clowder.UseClowderTopics(&brokerCfg, kafkaTopics)
assert.Equal(t, clowderTopicName, brokerCfg.Topic, "Clowder topic name was not used")
}

func TestUseClowderTopicsTopicFoundMultiBrokers(t *testing.T) {
originalTopicName := "topic1"
clowderTopicName := "NewTopicName"
brokerCfg := kafka.BrokersConfig{
{
Topic: originalTopicName,
},
brokerCfg := kafka.MultiBrokerConfiguration{
Topic: originalTopicName,
}
kafkaTopics := map[string]api.TopicConfig{
originalTopicName: {
Expand All @@ -90,17 +85,15 @@ func TestUseClowderTopicsTopicFoundMultiBrokers(t *testing.T) {
},
}

clowder.UseClowderTopics(brokerCfg, kafkaTopics)
assert.Equal(t, clowderTopicName, brokerCfg[0].Topic, "Clowder topic name was not used")
clowder.UseClowderTopics(&brokerCfg, kafkaTopics)
assert.Equal(t, clowderTopicName, brokerCfg.Topic, "Clowder topic name was not used")
}

func TestUseClowderTopicsTopicNotFound(t *testing.T) {
originalTopicName := "topic1"

brokerCfg := kafka.BrokersConfig{
{
Topic: originalTopicName,
},
brokerCfg := kafka.SingleBrokerConfiguration{
Topic: originalTopicName,
}
kafkaTopics := map[string]api.TopicConfig{
"topic2": {
Expand All @@ -109,73 +102,39 @@ func TestUseClowderTopicsTopicNotFound(t *testing.T) {
}

output, _ := capture.StandardOutput(func() {
clowder.UseClowderTopics(brokerCfg, kafkaTopics)
clowder.UseClowderTopics(&brokerCfg, kafkaTopics)
})
assert.Equal(t, originalTopicName, brokerCfg[0].Topic, "topic name should not change")
assert.Equal(t, originalTopicName, brokerCfg.Topic, "topic name should not change")
assert.Contains(t, output, "warning: no kafka mapping found for topic topic1")
}

func TestGetBrokersAddressesNoBrokerConfig(t *testing.T) {
cfg := kafka.BrokersConfig{}
assert.Equal(t, []string{}, kafka.GetBrokersAddresses(cfg))
}

func TestGetBrokersAddressesSingleBrokerConfig(t *testing.T) {
const addr = "some_addr"
cfg := kafka.BrokersConfig{
{Address: addr},
}
assert.Equal(t, []string{addr}, kafka.GetBrokersAddresses(cfg))
}

func TestGetBrokersAddressesMultipleBrokerConfig(t *testing.T) {
const addr, addr2 = "some_addr", "some_other_addr"
cfg := kafka.BrokersConfig{
{Address: addr},
{Address: addr2},
}
assert.Equal(t, []string{addr, addr2}, kafka.GetBrokersAddresses(cfg))
}

func TestUseBrokerConfigNoClowderKafkaConfig(t *testing.T) {
brokerCfg := kafka.BrokersConfig{{}}
func TestUseBrokerConfigNoKafkaConfig(t *testing.T) {
brokerCfg := kafka.MultiBrokerConfiguration{}
loadedConfig := api.AppConfig{}

output, _ := capture.StandardOutput(func() {
clowder.UseBrokerConfig(brokerCfg, &loadedConfig)
clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
})
assert.Contains(t, output, clowder.NoBrokerCfg)
}

func TestUseBrokerConfigNoOriginalKafkaBrokers(t *testing.T) {
brokerCfg := kafka.BrokersConfig{}
loadedConfig := api.AppConfig{
Kafka: &api.KafkaConfig{},
}

output, _ := capture.StandardOutput(func() {
clowder.UseBrokerConfig(brokerCfg, &loadedConfig)
})
assert.Contains(t, output, clowder.NoOriginalBroker)
}

func TestUseBrokerConfigNoClowderKafkaBrokers(t *testing.T) {
brokerCfg := kafka.BrokersConfig{{}}
func TestUseBrokerConfigNoKafkaBrokers(t *testing.T) {
brokerCfg := kafka.MultiBrokerConfiguration{}
loadedConfig := api.AppConfig{
Kafka: &api.KafkaConfig{},
}

output, _ := capture.StandardOutput(func() {
clowder.UseBrokerConfig(brokerCfg, &loadedConfig)
clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
})
assert.Contains(t, output, clowder.NoBrokerCfg)
}

func TestUseBrokerConfigMultipleClowderKafkaBrokers(t *testing.T) {
func TestUseBrokerConfigMultipleKafkaBrokers(t *testing.T) {
addr1 := "test_broker"
addr2 := "test_broker_backup"
port := 12345
brokerCfg := kafka.BrokersConfig{{}}
brokerCfg := kafka.MultiBrokerConfiguration{}
loadedConfig := api.AppConfig{
Kafka: &api.KafkaConfig{
Brokers: []api.BrokerConfig{
Expand All @@ -191,14 +150,13 @@ func TestUseBrokerConfigMultipleClowderKafkaBrokers(t *testing.T) {
},
}

brokerCfg = clowder.UseBrokerConfig(brokerCfg, &loadedConfig)
assert.Equal(t, fmt.Sprintf("%s:%d", addr1, port), brokerCfg[0].Address)
assert.Equal(t, addr2, brokerCfg[1].Address)
clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr1, port), addr2}, brokerCfg.Addresses)
}

func TestUseBrokerConfigNoAuthNoPort(t *testing.T) {
addr := "test_broker"
brokerCfg := kafka.BrokersConfig{{}}
brokerCfg := kafka.MultiBrokerConfiguration{}
loadedConfig := api.AppConfig{
Kafka: &api.KafkaConfig{
Brokers: []api.BrokerConfig{
Expand All @@ -210,12 +168,12 @@ func TestUseBrokerConfigNoAuthNoPort(t *testing.T) {
},
}

brokerCfg = clowder.UseBrokerConfig(brokerCfg, &loadedConfig)
assert.Equal(t, addr, brokerCfg[0].Address)
clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
assert.Equal(t, []string{addr}, brokerCfg.Addresses)
}

func TestUseBrokerConfigNoAuth(t *testing.T) {
brokerCfg := kafka.BrokersConfig{{}}
brokerCfg := kafka.MultiBrokerConfiguration{}
port := 12345
addr := "test_broker"
loadedConfig := api.AppConfig{
Expand All @@ -229,12 +187,12 @@ func TestUseBrokerConfigNoAuth(t *testing.T) {
},
}

brokerCfg = clowder.UseBrokerConfig(brokerCfg, &loadedConfig)
assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg[0].Address)
clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port)}, brokerCfg.Addresses)
}

func TestUseBrokerConfigAuthEnabledNoSasl(t *testing.T) {
brokerCfg := kafka.BrokersConfig{{}}
brokerCfg := kafka.MultiBrokerConfiguration{}
port := 12345
addr := "test_broker"
authType := api.BrokerConfigAuthtypeSasl
Expand All @@ -251,15 +209,15 @@ func TestUseBrokerConfigAuthEnabledNoSasl(t *testing.T) {
}

output, _ := capture.StandardOutput(func() {
brokerCfg = clowder.UseBrokerConfig(brokerCfg, &loadedConfig)
clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
})

assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg[0].Address)
assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port)}, brokerCfg.Addresses)
assert.Contains(t, output, clowder.NoSaslCfg)
}

func TestUseBrokerConfigAuthEnabledWithSaslConfig(t *testing.T) {
brokerCfg := kafka.BrokersConfig{{}}
brokerCfg := kafka.MultiBrokerConfiguration{}
port := 12345
addr := "test_broker"
addr2 := "test_broker_backup"
Expand Down Expand Up @@ -300,20 +258,19 @@ func TestUseBrokerConfigAuthEnabledWithSaslConfig(t *testing.T) {
}

output, _ := capture.StandardOutput(func() {
brokerCfg = clowder.UseBrokerConfig(brokerCfg, &loadedConfig)
clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
})

assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port), fmt.Sprintf("%s:%d", addr2, port)}, brokerCfg.Addresses)
assert.Contains(t, output, "kafka is configured to use authentication")

assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg[0].Address)
assert.Equal(t, saslUsr, brokerCfg[0].SaslUsername)
assert.Equal(t, saslPwd, brokerCfg[0].SaslPassword)
assert.Equal(t, saslMechanism, brokerCfg[0].SaslMechanism)
assert.Equal(t, protocol, brokerCfg[0].SecurityProtocol)

assert.Equal(t, fmt.Sprintf("%s:%d", addr2, port), brokerCfg[1].Address)
assert.Equal(t, saslUsr2, brokerCfg[1].SaslUsername)
assert.Equal(t, saslPwd, brokerCfg[1].SaslPassword)
assert.Equal(t, saslMechanism, brokerCfg[1].SaslMechanism)
assert.Equal(t, protocol, brokerCfg[1].SecurityProtocol)
assert.Equal(t, 2, len(brokerCfg.SASLConfigs))
assert.Equal(t, saslUsr, brokerCfg.SASLConfigs[0].SaslUsername)
assert.Equal(t, saslPwd, brokerCfg.SASLConfigs[0].SaslPassword)
assert.Equal(t, saslMechanism, brokerCfg.SASLConfigs[0].SaslMechanism)
assert.Equal(t, protocol, brokerCfg.SASLConfigs[0].SecurityProtocol)

assert.Equal(t, saslUsr2, brokerCfg.SASLConfigs[1].SaslUsername)
assert.Equal(t, saslPwd, brokerCfg.SASLConfigs[1].SaslPassword)
assert.Equal(t, saslMechanism, brokerCfg.SASLConfigs[1].SaslMechanism)
assert.Equal(t, protocol, brokerCfg.SASLConfigs[1].SecurityProtocol)
}
5 changes: 2 additions & 3 deletions clowder/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package clowder

var (
NoBrokerCfg = noBrokerConfig
NoOriginalBroker = noOriginalBroker
NoSaslCfg = noSaslConfig
NoBrokerCfg = noBrokerConfig
NoSaslCfg = noSaslConfig
)
70 changes: 27 additions & 43 deletions clowder/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,66 +16,42 @@ package clowder

import (
"fmt"

"github.com/RedHatInsights/insights-operator-utils/kafka"
api "github.com/redhatinsights/app-common-go/pkg/api/v1"
)

// Common constants used for logging and error reporting
const (
noOriginalBroker = "warning: no original broker configuration found; aborting"
noBrokerConfig = "warning: no broker configurations found in clowder config"
noSaslConfig = "warning: SASL configuration is missing"
noTopicMapping = "warning: no kafka mapping found for topic %s"
noBrokerConfig = "warning: no broker configurations found in clowder config"
noSaslConfig = "warning: SASL configuration is missing"
noTopicMapping = "warning: no kafka mapping found for topic %s"
)

// UseBrokerConfig tries to replace parts of the BrokerConfiguration with the values
// loaded by Clowder. It expects brokerCfg to already be initialized
func UseBrokerConfig(brokerCfg kafka.BrokersConfig, loadedConfig *api.AppConfig) kafka.BrokersConfig {
numBrokerConfigs := len(brokerCfg)
if numBrokerConfigs == 0 {
// if original brokers config is totally empty, do nothing.
// this shouldn't happen, but we need to control this scenario
// to avoid panics.
fmt.Println(noOriginalBroker)
return brokerCfg
}
// loaded by Clowder
func UseBrokerConfig(brokerCfg *kafka.MultiBrokerConfiguration, loadedConfig *api.AppConfig) {
if loadedConfig.Kafka != nil && len(loadedConfig.Kafka.Brokers) > 0 {
numClowderBrokers := len(loadedConfig.Kafka.Brokers)
// if original config has fewer brokers than clowder's, we append additional
// brokerConfiguration items with topic, clientId, and group from existing
// items, and the rest will be filled with data from clowder's brokers.
// When appending, it's most probable that a new slice is returned due to
// original capacity not being enough, which is why the brokerCfg slice is
// returned
for len(brokerCfg) < numClowderBrokers {
brokerCfg = append(brokerCfg, &kafka.BrokerConfiguration{
Topic: (brokerCfg)[numBrokerConfigs-1].Topic,
ClientID: (brokerCfg)[numBrokerConfigs-1].ClientID,
Group: (brokerCfg)[numBrokerConfigs-1].Group,
// Since this will have data from Clowder's loadedConfig, always enable
Enabled: true,
})
}
brokerCfg.Addresses = make([]string, len(loadedConfig.Kafka.Brokers))
brokerCfg.SASLConfigs = make([]kafka.SASLConfiguration, len(loadedConfig.Kafka.Brokers))
for i, broker := range loadedConfig.Kafka.Brokers {
if broker.Port != nil {
(brokerCfg)[i].Address = fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port)
brokerCfg.Addresses[i] = fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port)
} else {
(brokerCfg)[i].Address = broker.Hostname
brokerCfg.Addresses[i] = broker.Hostname
}
// SSL config
if broker.Authtype != nil {
fmt.Println("kafka is configured to use authentication")
if broker.Sasl != nil {
// we are trusting that these values are set and
// dereferencing the pointers without any check...
brokerCfg[i].SaslUsername = *broker.Sasl.Username
brokerCfg[i].SaslPassword = *broker.Sasl.Password
brokerCfg[i].SaslMechanism = *broker.Sasl.SaslMechanism
brokerCfg[i].SecurityProtocol = *broker.SecurityProtocol
brokerCfg.SASLConfigs[i].SaslUsername = *broker.Sasl.Username
brokerCfg.SASLConfigs[i].SaslPassword = *broker.Sasl.Password
brokerCfg.SASLConfigs[i].SaslMechanism = *broker.Sasl.SaslMechanism
brokerCfg.SASLConfigs[i].SecurityProtocol = *broker.SecurityProtocol

if caPath, err := loadedConfig.KafkaCa(broker); err == nil {
brokerCfg[i].CertPath = caPath
brokerCfg.SASLConfigs[i].CertPath = caPath
}
} else {
fmt.Println(noSaslConfig)
Expand All @@ -85,17 +61,25 @@ func UseBrokerConfig(brokerCfg kafka.BrokersConfig, loadedConfig *api.AppConfig)
} else {
fmt.Println(noBrokerConfig)
}
return brokerCfg
}

// UseClowderTopics tries to replace the configured topic's name with the
// corresponding topic name loaded by Clowder, if any
func UseClowderTopics(brokersCfg kafka.BrokersConfig, kafkaTopics map[string]api.TopicConfig) {
for _, cfg := range brokersCfg {
// UseClowderTopics tries to replace the configured topic with the corresponding
// topic loaded by Clowder
func UseClowderTopics(brokerCfg interface{}, kafkaTopics map[string]api.TopicConfig) {
switch cfg := brokerCfg.(type) {
case *kafka.SingleBrokerConfiguration:
if clowderTopic, ok := kafkaTopics[cfg.Topic]; ok {
cfg.Topic = clowderTopic.Name
} else {
fmt.Printf(noTopicMapping, cfg.Topic)
}
case *kafka.MultiBrokerConfiguration:
if clowderTopic, ok := kafkaTopics[cfg.Topic]; ok {
cfg.Topic = clowderTopic.Name
} else {
fmt.Printf(noTopicMapping, cfg.Topic)
}
default:
fmt.Printf("Unknown Broker configuration type")
}
}
Loading

0 comments on commit fec884b

Please sign in to comment.