Skip to content

Commit

Permalink
Add clowder-related common functions
Browse files Browse the repository at this point in the history
  • Loading branch information
epapbak committed Jan 3, 2024
1 parent ba59812 commit 3d7a077
Show file tree
Hide file tree
Showing 8 changed files with 290 additions and 2 deletions.
193 changes: 193 additions & 0 deletions clowder/clowder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package clowder_test

import (
"github.com/RedHatInsights/insights-operator-utils/clowder"
"github.com/RedHatInsights/insights-operator-utils/kafka"
"github.com/RedHatInsights/insights-operator-utils/postgres"
api "github.com/redhatinsights/app-common-go/pkg/api/v1"
"github.com/stretchr/testify/assert"
"github.com/tisnik/go-capture"
"testing"
)

func TestUseDBConfig(t *testing.T) {
cfg := postgres.StorageConfiguration{}
expected := postgres.StorageConfiguration{
PGUsername: "username",
PGPassword: "password",
PGHost: "hostname",
PGPort: 1234,
PGDBName: "dbname",
}
loadedCfg := api.AppConfig{
Database: &api.DatabaseConfig{
AdminPassword: "adminpw",
AdminUsername: "admin",
Hostname: "hostname",
Name: "dbname",
Password: "password",
Port: 1234,
RdsCa: nil,
Username: "username",
},
}

clowder.UseDBConfig(&cfg, &loadedCfg)
assert.Equal(t, expected, cfg, "Clowder database config was not used")
}

func TestUseClowderTopicsTopicFound(t *testing.T) {
originalTopicName := "topic1"
clowderTopicName := "NewTopicName"
brokerCfg := kafka.BrokerConfiguration{
Topic: originalTopicName,
}
kafkaTopics := map[string]api.TopicConfig{
originalTopicName: {
Name: clowderTopicName,
},
"topic2": {
Name: "AnotherTopicName",
},
}

clowder.UseClowderTopics(&brokerCfg, kafkaTopics)
assert.Equal(t, clowderTopicName, brokerCfg.Topic, "Clowder topic name was not used")
}

func TestUseClowderTopicsTopicNotFound(t *testing.T) {
originalTopicName := "topic1"

brokerCfg := kafka.BrokerConfiguration{
Topic: originalTopicName,
}
kafkaTopics := map[string]api.TopicConfig{
"topic2": {
Name: "AnotherTopicName",
},
}

output, _ := capture.StandardOutput(func() {
clowder.UseClowderTopics(&brokerCfg, kafkaTopics)
})
assert.Equal(t, originalTopicName, brokerCfg.Topic, "topic name should not change")
assert.Contains(t, output, "warning: no kafka mapping found for topic topic1")
}

func TestUseBrokerConfigNoKafkaConfig(t *testing.T) {
brokerCfg := kafka.BrokerConfiguration{}
loadedConfig := api.AppConfig{}

output, _ := capture.StandardOutput(func() {
clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
})
assert.Contains(t, output, clowder.NoBrokerCfg)
}

func TestUseBrokerConfigNoKafkaBrokers(t *testing.T) {
brokerCfg := kafka.BrokerConfiguration{}
loadedConfig := api.AppConfig{
Kafka: &api.KafkaConfig{},
}

output, _ := capture.StandardOutput(func() {
clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
})
assert.Contains(t, output, clowder.NoBrokerCfg)
}

func TestUseBrokerConfigNoAuthNoPort(t *testing.T) {
brokerCfg := kafka.BrokerConfiguration{}
loadedConfig := api.AppConfig{
Kafka: &api.KafkaConfig{
Brokers: []api.BrokerConfig{
{
Hostname: "test_broker",
Port: nil,
},
},
},
}

clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
assert.Equal(t, "test_broker", brokerCfg.Address)
}

func TestUseBrokerConfigNoAuth(t *testing.T) {
brokerCfg := kafka.BrokerConfiguration{}
port := 12345
loadedConfig := api.AppConfig{
Kafka: &api.KafkaConfig{
Brokers: []api.BrokerConfig{
{
Hostname: "test_broker",
Port: &port,
},
},
},
}

clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
assert.Equal(t, "test_broker:12345", brokerCfg.Address)
}

func TestUseBrokerConfigAuthEnabledNoSasl(t *testing.T) {
brokerCfg := kafka.BrokerConfiguration{}
port := 12345
authType := api.BrokerConfigAuthtypeSasl
loadedConfig := api.AppConfig{
Kafka: &api.KafkaConfig{
Brokers: []api.BrokerConfig{
{
Hostname: "test_broker",
Port: &port,
Authtype: &authType,
},
},
},
}

output, _ := capture.StandardOutput(func() {
clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
})

assert.Equal(t, "test_broker:12345", brokerCfg.Address)
assert.Contains(t, output, clowder.NoSaslCfg)
}

func TestUseBrokerConfigAuthEnabledWithSaslConfig(t *testing.T) {
brokerCfg := kafka.BrokerConfiguration{}
port := 12345
saslCfg := "user_pwd"
protocol := "tls"

authType := api.BrokerConfigAuthtypeSasl
loadedConfig := api.AppConfig{
Kafka: &api.KafkaConfig{
Brokers: []api.BrokerConfig{
{
Hostname: "test_broker",
Port: &port,
Authtype: &authType,
Sasl: &api.KafkaSASLConfig{
Password: &saslCfg,
Username: &saslCfg,
SaslMechanism: &saslCfg,
},
SecurityProtocol: &protocol,
},
},
},
}

output, _ := capture.StandardOutput(func() {
clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
})

assert.Equal(t, "test_broker:12345", brokerCfg.Address)
assert.Contains(t, output, "kafka is configured to use authentication")
assert.Equal(t, saslCfg, brokerCfg.SaslUsername)
assert.Equal(t, saslCfg, brokerCfg.SaslPassword)
assert.Equal(t, saslCfg, brokerCfg.SaslMechanism)
assert.Equal(t, protocol, brokerCfg.SecurityProtocol)
}
6 changes: 6 additions & 0 deletions clowder/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package clowder

var (
NoBrokerCfg = noBrokerConfig
NoSaslCfg = noSaslConfig
)
56 changes: 56 additions & 0 deletions clowder/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package clowder

import (
"fmt"
"github.com/RedHatInsights/insights-operator-utils/kafka"
api "github.com/redhatinsights/app-common-go/pkg/api/v1"
)

// Common constants used for logging and error reporting
const (
noBrokerConfig = "warning: no broker configurations found in clowder config"
noSaslConfig = "warning: SASL configuration is missing"
noTopicMapping = "warning: no kafka mapping found for topic %s"
)

func UseBrokerConfig(brokerCfg *kafka.BrokerConfiguration, loadedConfig *api.AppConfig) {

Check failure on line 16 in clowder/kafka.go

View workflow job for this annotation

GitHub Actions / Linters for Go 1.18

exported function UseBrokerConfig should have comment or be unexported

Check failure on line 16 in clowder/kafka.go

View workflow job for this annotation

GitHub Actions / Linters for Go 1.19

exported function UseBrokerConfig should have comment or be unexported

Check failure on line 16 in clowder/kafka.go

View workflow job for this annotation

GitHub Actions / Linters for Go 1.20

exported function UseBrokerConfig should have comment or be unexported

Check failure on line 16 in clowder/kafka.go

View workflow job for this annotation

GitHub Actions / Linters for Go 1.21

exported function UseBrokerConfig should have comment or be unexported
// make sure broker(s) are configured in Clowder
if loadedConfig.Kafka != nil && len(loadedConfig.Kafka.Brokers) > 0 {
broker := loadedConfig.Kafka.Brokers[0]
// port can be empty in api, so taking it into account
if broker.Port != nil {
brokerCfg.Address = fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port)
} else {
brokerCfg.Address = broker.Hostname
}

// SSL config
if broker.Authtype != nil {
fmt.Println("kafka is configured to use authentication")
if broker.Sasl != nil {
// we are trusting that these values are set and
// dereferencing the pointers without any check...
brokerCfg.SaslUsername = *broker.Sasl.Username
brokerCfg.SaslPassword = *broker.Sasl.Password
brokerCfg.SaslMechanism = *broker.Sasl.SaslMechanism
brokerCfg.SecurityProtocol = *broker.SecurityProtocol

if caPath, err := loadedConfig.KafkaCa(broker); err == nil {
brokerCfg.CertPath = caPath
}
} else {
fmt.Println(noSaslConfig)
}
}
} else {
fmt.Println(noBrokerConfig)
}
}
func UseClowderTopics(configuration *kafka.BrokerConfiguration, kafkaTopics map[string]api.TopicConfig) {

Check failure on line 49 in clowder/kafka.go

View workflow job for this annotation

GitHub Actions / Linters for Go 1.18

exported function UseClowderTopics should have comment or be unexported

Check failure on line 49 in clowder/kafka.go

View workflow job for this annotation

GitHub Actions / Linters for Go 1.19

exported function UseClowderTopics should have comment or be unexported

Check failure on line 49 in clowder/kafka.go

View workflow job for this annotation

GitHub Actions / Linters for Go 1.20

exported function UseClowderTopics should have comment or be unexported

Check failure on line 49 in clowder/kafka.go

View workflow job for this annotation

GitHub Actions / Linters for Go 1.21

exported function UseClowderTopics should have comment or be unexported
// Get the correct topic name from clowder mapping if available
if clowderTopic, ok := kafkaTopics[configuration.Topic]; ok {
configuration.Topic = clowderTopic.Name
} else {
fmt.Printf(noTopicMapping, configuration.Topic)
}
}
14 changes: 14 additions & 0 deletions clowder/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package clowder

import (
"github.com/RedHatInsights/insights-operator-utils/postgres"
api "github.com/redhatinsights/app-common-go/pkg/api/v1"
)

func UseDBConfig(storageCfg *postgres.StorageConfiguration, loadedConfig *api.AppConfig) {

Check failure on line 8 in clowder/storage.go

View workflow job for this annotation

GitHub Actions / Linters for Go 1.18

exported function UseDBConfig should have comment or be unexported

Check failure on line 8 in clowder/storage.go

View workflow job for this annotation

GitHub Actions / Linters for Go 1.19

exported function UseDBConfig should have comment or be unexported

Check failure on line 8 in clowder/storage.go

View workflow job for this annotation

GitHub Actions / Linters for Go 1.20

exported function UseDBConfig should have comment or be unexported

Check failure on line 8 in clowder/storage.go

View workflow job for this annotation

GitHub Actions / Linters for Go 1.21

exported function UseDBConfig should have comment or be unexported
storageCfg.PGDBName = loadedConfig.Database.Name
storageCfg.PGHost = loadedConfig.Database.Hostname
storageCfg.PGPort = loadedConfig.Database.Port
storageCfg.PGUsername = loadedConfig.Database.Username
storageCfg.PGPassword = loadedConfig.Database.Password
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ require (
github.com/mozillazg/request v0.8.0
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/client_model v0.5.0
github.com/redhatinsights/app-common-go v1.6.7
github.com/redis/go-redis/v9 v9.3.1
github.com/rs/zerolog v1.31.0
github.com/stretchr/testify v1.8.4
github.com/tisnik/go-capture v1.0.1
github.com/verdverm/frisby v0.0.0-20170604211311-b16556248a9a
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
gopkg.in/h2non/gock.v1 v1.1.2
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,8 @@ github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40T
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redhatinsights/app-common-go v1.6.7 h1:cXWW0F6ZW53RLRr54gn7Azo9CLTysYOmFDR0D0Qd0Fs=
github.com/redhatinsights/app-common-go v1.6.7/go.mod h1:6gzRyg8ZyejwMCksukeAhh2ZXOB3uHSmBsbP06fG2PQ=
github.com/redis/go-redis/v9 v9.3.1 h1:KqdY8U+3X6z+iACvumCNxnoluToB+9Me+TvyFa21Mds=
github.com/redis/go-redis/v9 v9.3.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
Expand Down Expand Up @@ -618,6 +620,7 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tisnik/go-capture v1.0.1 h1:o4zZpOlC01qCifeh0fj4SoUkt8UHFassn1+blmFN3BQ=
github.com/tisnik/go-capture v1.0.1/go.mod h1:NArgKXuvcG6gOW2SQoPGKy6TuiKBttQ2ZV0/zC4zVaY=
github.com/tj/go-gracefully v0.0.0-20141227061038-005c1d102f1b/go.mod h1:uqlTeGUUfRdQvlQGkv+DYe3lLST3DionEwMA9YAYibY=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
Expand Down
5 changes: 3 additions & 2 deletions kafka/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

// Package broker contains data types, interfaces, and methods related to
// brokers that can be used to consume input messages by aggegator.
// Package kafka contains data types, interfaces, and methods related to
// Kafka that can be used to configure brokers, as well as consume/produce
// messages.
package kafka

import (
Expand Down
13 changes: 13 additions & 0 deletions postgres/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package postgres

// StorageConfiguration represents common configuration of data storage
type StorageConfiguration struct {
Driver string `mapstructure:"db_driver" toml:"db_driver"`
PGUsername string `mapstructure:"pg_username" toml:"pg_username"`
PGPassword string `mapstructure:"pg_password" toml:"pg_password"`
PGHost string `mapstructure:"pg_host" toml:"pg_host"`
PGPort int `mapstructure:"pg_port" toml:"pg_port"`
PGDBName string `mapstructure:"pg_db_name" toml:"pg_db_name"`
PGParams string `mapstructure:"pg_params" toml:"pg_params"`
LogSQLQueries bool `mapstructure:"log_sql_queries" toml:"log_sql_queries"`
}

0 comments on commit 3d7a077

Please sign in to comment.