Skip to content

Commit

Permalink
[CCXDEV-12032] Add common Kafka configuration and clowder-related uti…
Browse files Browse the repository at this point in the history
…lities (RedHatInsights#393)

* Add Kafka package with common broker configuration functions

* Add clowder-related common functions

* Linting and license header

* Small refactor of clowder/clowder_test.go
  • Loading branch information
epapbak authored Jan 4, 2024
1 parent 958d754 commit e37a6f7
Show file tree
Hide file tree
Showing 10 changed files with 603 additions and 0 deletions.
214 changes: 214 additions & 0 deletions clowder/clowder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
// Copyright 2024 Red Hat, Inc
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clowder_test

import (
"fmt"
"github.com/RedHatInsights/insights-operator-utils/clowder"
"github.com/RedHatInsights/insights-operator-utils/kafka"
"github.com/RedHatInsights/insights-operator-utils/postgres"
api "github.com/redhatinsights/app-common-go/pkg/api/v1"
"github.com/stretchr/testify/assert"
"github.com/tisnik/go-capture"
"testing"
)

func TestUseDBConfig(t *testing.T) {
cfg := postgres.StorageConfiguration{}
expected := postgres.StorageConfiguration{
PGUsername: "username",
PGPassword: "password",
PGHost: "hostname",
PGPort: 1234,
PGDBName: "dbname",
}
loadedCfg := api.AppConfig{
Database: &api.DatabaseConfig{
AdminPassword: "adminpw",
AdminUsername: "admin",
Hostname: "hostname",
Name: "dbname",
Password: "password",
Port: 1234,
RdsCa: nil,
Username: "username",
},
}

clowder.UseDBConfig(&cfg, &loadedCfg)
assert.Equal(t, expected, cfg, "Clowder database config was not used")
}

func TestUseClowderTopicsTopicFound(t *testing.T) {
originalTopicName := "topic1"
clowderTopicName := "NewTopicName"
brokerCfg := kafka.BrokerConfiguration{
Topic: originalTopicName,
}
kafkaTopics := map[string]api.TopicConfig{
originalTopicName: {
Name: clowderTopicName,
},
"topic2": {
Name: "AnotherTopicName",
},
}

clowder.UseClowderTopics(&brokerCfg, kafkaTopics)
assert.Equal(t, clowderTopicName, brokerCfg.Topic, "Clowder topic name was not used")
}

func TestUseClowderTopicsTopicNotFound(t *testing.T) {
originalTopicName := "topic1"

brokerCfg := kafka.BrokerConfiguration{
Topic: originalTopicName,
}
kafkaTopics := map[string]api.TopicConfig{
"topic2": {
Name: "AnotherTopicName",
},
}

output, _ := capture.StandardOutput(func() {
clowder.UseClowderTopics(&brokerCfg, kafkaTopics)
})
assert.Equal(t, originalTopicName, brokerCfg.Topic, "topic name should not change")
assert.Contains(t, output, "warning: no kafka mapping found for topic topic1")
}

func TestUseBrokerConfigNoKafkaConfig(t *testing.T) {
brokerCfg := kafka.BrokerConfiguration{}
loadedConfig := api.AppConfig{}

output, _ := capture.StandardOutput(func() {
clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
})
assert.Contains(t, output, clowder.NoBrokerCfg)
}

func TestUseBrokerConfigNoKafkaBrokers(t *testing.T) {
brokerCfg := kafka.BrokerConfiguration{}
loadedConfig := api.AppConfig{
Kafka: &api.KafkaConfig{},
}

output, _ := capture.StandardOutput(func() {
clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
})
assert.Contains(t, output, clowder.NoBrokerCfg)
}

func TestUseBrokerConfigNoAuthNoPort(t *testing.T) {
addr := "test_broker"
brokerCfg := kafka.BrokerConfiguration{}
loadedConfig := api.AppConfig{
Kafka: &api.KafkaConfig{
Brokers: []api.BrokerConfig{
{
Hostname: addr,
Port: nil,
},
},
},
}

clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
assert.Equal(t, addr, brokerCfg.Address)
}

func TestUseBrokerConfigNoAuth(t *testing.T) {
brokerCfg := kafka.BrokerConfiguration{}
port := 12345
addr := "test_broker"
loadedConfig := api.AppConfig{
Kafka: &api.KafkaConfig{
Brokers: []api.BrokerConfig{
{
Hostname: addr,
Port: &port,
},
},
},
}

clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg.Address)
}

func TestUseBrokerConfigAuthEnabledNoSasl(t *testing.T) {
brokerCfg := kafka.BrokerConfiguration{}
port := 12345
addr := "test_broker"
authType := api.BrokerConfigAuthtypeSasl
loadedConfig := api.AppConfig{
Kafka: &api.KafkaConfig{
Brokers: []api.BrokerConfig{
{
Hostname: addr,
Port: &port,
Authtype: &authType,
},
},
},
}

output, _ := capture.StandardOutput(func() {
clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
})

assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg.Address)
assert.Contains(t, output, clowder.NoSaslCfg)
}

func TestUseBrokerConfigAuthEnabledWithSaslConfig(t *testing.T) {
brokerCfg := kafka.BrokerConfiguration{}
port := 12345
addr := "test_broker"
saslUsr := "user"
saslPwd := "pwd"
saslMechanism := "sasl"
protocol := "tls"

authType := api.BrokerConfigAuthtypeSasl
loadedConfig := api.AppConfig{
Kafka: &api.KafkaConfig{
Brokers: []api.BrokerConfig{
{
Hostname: addr,
Port: &port,
Authtype: &authType,
Sasl: &api.KafkaSASLConfig{
Password: &saslPwd,
Username: &saslUsr,
SaslMechanism: &saslMechanism,
},
SecurityProtocol: &protocol,
},
},
},
}

output, _ := capture.StandardOutput(func() {
clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
})

assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg.Address)
assert.Contains(t, output, "kafka is configured to use authentication")
assert.Equal(t, saslUsr, brokerCfg.SaslUsername)
assert.Equal(t, saslPwd, brokerCfg.SaslPassword)
assert.Equal(t, saslMechanism, brokerCfg.SaslMechanism)
assert.Equal(t, protocol, brokerCfg.SecurityProtocol)
}
20 changes: 20 additions & 0 deletions clowder/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright 2024 Red Hat, Inc
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clowder

var (
NoBrokerCfg = noBrokerConfig
NoSaslCfg = noSaslConfig
)
75 changes: 75 additions & 0 deletions clowder/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2024 Red Hat, Inc
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clowder

import (
"fmt"
"github.com/RedHatInsights/insights-operator-utils/kafka"
api "github.com/redhatinsights/app-common-go/pkg/api/v1"
)

// Common constants used for logging and error reporting
const (
noBrokerConfig = "warning: no broker configurations found in clowder config"
noSaslConfig = "warning: SASL configuration is missing"
noTopicMapping = "warning: no kafka mapping found for topic %s"
)

// UseBrokerConfig tries to replace parts of the BrokerConfiguration with the values
// loaded by Clowder
func UseBrokerConfig(brokerCfg *kafka.BrokerConfiguration, loadedConfig *api.AppConfig) {
// make sure broker(s) are configured in Clowder
if loadedConfig.Kafka != nil && len(loadedConfig.Kafka.Brokers) > 0 {
broker := loadedConfig.Kafka.Brokers[0]
// port can be empty in api, so taking it into account
if broker.Port != nil {
brokerCfg.Address = fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port)
} else {
brokerCfg.Address = broker.Hostname
}

// SSL config
if broker.Authtype != nil {
fmt.Println("kafka is configured to use authentication")
if broker.Sasl != nil {
// we are trusting that these values are set and
// dereferencing the pointers without any check...
brokerCfg.SaslUsername = *broker.Sasl.Username
brokerCfg.SaslPassword = *broker.Sasl.Password
brokerCfg.SaslMechanism = *broker.Sasl.SaslMechanism
brokerCfg.SecurityProtocol = *broker.SecurityProtocol

if caPath, err := loadedConfig.KafkaCa(broker); err == nil {
brokerCfg.CertPath = caPath
}
} else {
fmt.Println(noSaslConfig)
}
}
} else {
fmt.Println(noBrokerConfig)
}
}

// UseClowderTopics tries to replace the configured topic with the corresponding
// topic loaded by Clowder
func UseClowderTopics(configuration *kafka.BrokerConfiguration, kafkaTopics map[string]api.TopicConfig) {
// Get the correct topic name from clowder mapping if available
if clowderTopic, ok := kafkaTopics[configuration.Topic]; ok {
configuration.Topic = clowderTopic.Name
} else {
fmt.Printf(noTopicMapping, configuration.Topic)
}
}
30 changes: 30 additions & 0 deletions clowder/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2024 Red Hat, Inc
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clowder

import (
"github.com/RedHatInsights/insights-operator-utils/postgres"
api "github.com/redhatinsights/app-common-go/pkg/api/v1"
)

// UseDBConfig tries to replace the StorageConfiguration parameters with the
// values loaded by Clowder
func UseDBConfig(storageCfg *postgres.StorageConfiguration, loadedConfig *api.AppConfig) {
storageCfg.PGDBName = loadedConfig.Database.Name
storageCfg.PGHost = loadedConfig.Database.Hostname
storageCfg.PGPort = loadedConfig.Database.Port
storageCfg.PGUsername = loadedConfig.Database.Username
storageCfg.PGPassword = loadedConfig.Database.Password
}
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ require (
github.com/mozillazg/request v0.8.0
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/client_model v0.5.0
github.com/redhatinsights/app-common-go v1.6.7
github.com/redis/go-redis/v9 v9.3.1
github.com/rs/zerolog v1.31.0
github.com/stretchr/testify v1.8.4
github.com/tisnik/go-capture v1.0.1
github.com/verdverm/frisby v0.0.0-20170604211311-b16556248a9a
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
gopkg.in/h2non/gock.v1 v1.1.2
)

Expand Down Expand Up @@ -56,6 +59,7 @@ require (
github.com/prometheus/procfs v0.12.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect
github.com/segmentio/kafka-go v0.4.10 // indirect
github.com/xdg/stringprep v1.0.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.15.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,8 @@ github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40T
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redhatinsights/app-common-go v1.6.7 h1:cXWW0F6ZW53RLRr54gn7Azo9CLTysYOmFDR0D0Qd0Fs=
github.com/redhatinsights/app-common-go v1.6.7/go.mod h1:6gzRyg8ZyejwMCksukeAhh2ZXOB3uHSmBsbP06fG2PQ=
github.com/redis/go-redis/v9 v9.3.1 h1:KqdY8U+3X6z+iACvumCNxnoluToB+9Me+TvyFa21Mds=
github.com/redis/go-redis/v9 v9.3.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
Expand Down Expand Up @@ -618,6 +620,7 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tisnik/go-capture v1.0.1 h1:o4zZpOlC01qCifeh0fj4SoUkt8UHFassn1+blmFN3BQ=
github.com/tisnik/go-capture v1.0.1/go.mod h1:NArgKXuvcG6gOW2SQoPGKy6TuiKBttQ2ZV0/zC4zVaY=
github.com/tj/go-gracefully v0.0.0-20141227061038-005c1d102f1b/go.mod h1:uqlTeGUUfRdQvlQGkv+DYe3lLST3DionEwMA9YAYibY=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
Expand Down
Loading

0 comments on commit e37a6f7

Please sign in to comment.