From 3d7a07732fed32d8e39bdad3a3b69cbf686bd968 Mon Sep 17 00:00:00 2001
From: Papa Bakary Camara <pcamara@redhat.com>
Date: Wed, 3 Jan 2024 15:50:55 +0100
Subject: [PATCH] Add clowder-related common functions

---
 clowder/clowder_test.go | 193 ++++++++++++++++++++++++++++++++++++++++
 clowder/export_test.go  |   6 ++
 clowder/kafka.go        |  56 ++++++++++++
 clowder/storage.go      |  14 +++
 go.mod                  |   2 +
 go.sum                  |   3 +
 kafka/configuration.go  |   5 +-
 postgres/config.go      |  13 +++
 8 files changed, 290 insertions(+), 2 deletions(-)
 create mode 100644 clowder/clowder_test.go
 create mode 100644 clowder/export_test.go
 create mode 100644 clowder/kafka.go
 create mode 100644 clowder/storage.go
 create mode 100644 postgres/config.go

diff --git a/clowder/clowder_test.go b/clowder/clowder_test.go
new file mode 100644
index 0000000..b4c226c
--- /dev/null
+++ b/clowder/clowder_test.go
@@ -0,0 +1,193 @@
+package clowder_test
+
+import (
+	"github.com/RedHatInsights/insights-operator-utils/clowder"
+	"github.com/RedHatInsights/insights-operator-utils/kafka"
+	"github.com/RedHatInsights/insights-operator-utils/postgres"
+	api "github.com/redhatinsights/app-common-go/pkg/api/v1"
+	"github.com/stretchr/testify/assert"
+	"github.com/tisnik/go-capture"
+	"testing"
+)
+
+func TestUseDBConfig(t *testing.T) {
+	cfg := postgres.StorageConfiguration{}
+	expected := postgres.StorageConfiguration{
+		PGUsername: "username",
+		PGPassword: "password",
+		PGHost:     "hostname",
+		PGPort:     1234,
+		PGDBName:   "dbname",
+	}
+	loadedCfg := api.AppConfig{
+		Database: &api.DatabaseConfig{
+			AdminPassword: "adminpw",
+			AdminUsername: "admin",
+			Hostname:      "hostname",
+			Name:          "dbname",
+			Password:      "password",
+			Port:          1234,
+			RdsCa:         nil,
+			Username:      "username",
+		},
+	}
+
+	clowder.UseDBConfig(&cfg, &loadedCfg)
+	assert.Equal(t, expected, cfg, "Clowder database config was not used")
+}
+
+func TestUseClowderTopicsTopicFound(t *testing.T) {
+	originalTopicName := "topic1"
+	clowderTopicName := "NewTopicName"
+	brokerCfg := kafka.BrokerConfiguration{
+		Topic: originalTopicName,
+	}
+	kafkaTopics := map[string]api.TopicConfig{
+		originalTopicName: {
+			Name: clowderTopicName,
+		},
+		"topic2": {
+			Name: "AnotherTopicName",
+		},
+	}
+
+	clowder.UseClowderTopics(&brokerCfg, kafkaTopics)
+	assert.Equal(t, clowderTopicName, brokerCfg.Topic, "Clowder topic name was not used")
+}
+
+func TestUseClowderTopicsTopicNotFound(t *testing.T) {
+	originalTopicName := "topic1"
+
+	brokerCfg := kafka.BrokerConfiguration{
+		Topic: originalTopicName,
+	}
+	kafkaTopics := map[string]api.TopicConfig{
+		"topic2": {
+			Name: "AnotherTopicName",
+		},
+	}
+
+	output, _ := capture.StandardOutput(func() {
+		clowder.UseClowderTopics(&brokerCfg, kafkaTopics)
+	})
+	assert.Equal(t, originalTopicName, brokerCfg.Topic, "topic name should not change")
+	assert.Contains(t, output, "warning: no kafka mapping found for topic topic1")
+}
+
+func TestUseBrokerConfigNoKafkaConfig(t *testing.T) {
+	brokerCfg := kafka.BrokerConfiguration{}
+	loadedConfig := api.AppConfig{}
+
+	output, _ := capture.StandardOutput(func() {
+		clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
+	})
+	assert.Contains(t, output, clowder.NoBrokerCfg)
+}
+
+func TestUseBrokerConfigNoKafkaBrokers(t *testing.T) {
+	brokerCfg := kafka.BrokerConfiguration{}
+	loadedConfig := api.AppConfig{
+		Kafka: &api.KafkaConfig{},
+	}
+
+	output, _ := capture.StandardOutput(func() {
+		clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
+	})
+	assert.Contains(t, output, clowder.NoBrokerCfg)
+}
+
+func TestUseBrokerConfigNoAuthNoPort(t *testing.T) {
+	brokerCfg := kafka.BrokerConfiguration{}
+	loadedConfig := api.AppConfig{
+		Kafka: &api.KafkaConfig{
+			Brokers: []api.BrokerConfig{
+				{
+					Hostname: "test_broker",
+					Port:     nil,
+				},
+			},
+		},
+	}
+
+	clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
+	assert.Equal(t, "test_broker", brokerCfg.Address)
+}
+
+func TestUseBrokerConfigNoAuth(t *testing.T) {
+	brokerCfg := kafka.BrokerConfiguration{}
+	port := 12345
+	loadedConfig := api.AppConfig{
+		Kafka: &api.KafkaConfig{
+			Brokers: []api.BrokerConfig{
+				{
+					Hostname: "test_broker",
+					Port:     &port,
+				},
+			},
+		},
+	}
+
+	clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
+	assert.Equal(t, "test_broker:12345", brokerCfg.Address)
+}
+
+func TestUseBrokerConfigAuthEnabledNoSasl(t *testing.T) {
+	brokerCfg := kafka.BrokerConfiguration{}
+	port := 12345
+	authType := api.BrokerConfigAuthtypeSasl
+	loadedConfig := api.AppConfig{
+		Kafka: &api.KafkaConfig{
+			Brokers: []api.BrokerConfig{
+				{
+					Hostname: "test_broker",
+					Port:     &port,
+					Authtype: &authType,
+				},
+			},
+		},
+	}
+
+	output, _ := capture.StandardOutput(func() {
+		clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
+	})
+
+	assert.Equal(t, "test_broker:12345", brokerCfg.Address)
+	assert.Contains(t, output, clowder.NoSaslCfg)
+}
+
+func TestUseBrokerConfigAuthEnabledWithSaslConfig(t *testing.T) {
+	brokerCfg := kafka.BrokerConfiguration{}
+	port := 12345
+	saslCfg := "user_pwd"
+	protocol := "tls"
+
+	authType := api.BrokerConfigAuthtypeSasl
+	loadedConfig := api.AppConfig{
+		Kafka: &api.KafkaConfig{
+			Brokers: []api.BrokerConfig{
+				{
+					Hostname: "test_broker",
+					Port:     &port,
+					Authtype: &authType,
+					Sasl: &api.KafkaSASLConfig{
+						Password:      &saslCfg,
+						Username:      &saslCfg,
+						SaslMechanism: &saslCfg,
+					},
+					SecurityProtocol: &protocol,
+				},
+			},
+		},
+	}
+
+	output, _ := capture.StandardOutput(func() {
+		clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
+	})
+
+	assert.Equal(t, "test_broker:12345", brokerCfg.Address)
+	assert.Contains(t, output, "kafka is configured to use authentication")
+	assert.Equal(t, saslCfg, brokerCfg.SaslUsername)
+	assert.Equal(t, saslCfg, brokerCfg.SaslPassword)
+	assert.Equal(t, saslCfg, brokerCfg.SaslMechanism)
+	assert.Equal(t, protocol, brokerCfg.SecurityProtocol)
+}
diff --git a/clowder/export_test.go b/clowder/export_test.go
new file mode 100644
index 0000000..6dd3c81
--- /dev/null
+++ b/clowder/export_test.go
@@ -0,0 +1,6 @@
+package clowder
+
+var (
+	NoBrokerCfg = noBrokerConfig
+	NoSaslCfg   = noSaslConfig
+)
diff --git a/clowder/kafka.go b/clowder/kafka.go
new file mode 100644
index 0000000..35b63d0
--- /dev/null
+++ b/clowder/kafka.go
@@ -0,0 +1,56 @@
+package clowder
+
+import (
+	"fmt"
+	"github.com/RedHatInsights/insights-operator-utils/kafka"
+	api "github.com/redhatinsights/app-common-go/pkg/api/v1"
+)
+
+// Common constants used for logging and error reporting
+const (
+	noBrokerConfig = "warning: no broker configurations found in clowder config"
+	noSaslConfig   = "warning: SASL configuration is missing"
+	noTopicMapping = "warning: no kafka mapping found for topic %s"
+)
+
+func UseBrokerConfig(brokerCfg *kafka.BrokerConfiguration, loadedConfig *api.AppConfig) {
+	// make sure broker(s) are configured in Clowder
+	if loadedConfig.Kafka != nil && len(loadedConfig.Kafka.Brokers) > 0 {
+		broker := loadedConfig.Kafka.Brokers[0]
+		// port can be empty in api, so taking it into account
+		if broker.Port != nil {
+			brokerCfg.Address = fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port)
+		} else {
+			brokerCfg.Address = broker.Hostname
+		}
+
+		// SSL config
+		if broker.Authtype != nil {
+			fmt.Println("kafka is configured to use authentication")
+			if broker.Sasl != nil {
+				// we are trusting that these values are set and
+				// dereferencing the pointers without any check...
+				brokerCfg.SaslUsername = *broker.Sasl.Username
+				brokerCfg.SaslPassword = *broker.Sasl.Password
+				brokerCfg.SaslMechanism = *broker.Sasl.SaslMechanism
+				brokerCfg.SecurityProtocol = *broker.SecurityProtocol
+
+				if caPath, err := loadedConfig.KafkaCa(broker); err == nil {
+					brokerCfg.CertPath = caPath
+				}
+			} else {
+				fmt.Println(noSaslConfig)
+			}
+		}
+	} else {
+		fmt.Println(noBrokerConfig)
+	}
+}
+func UseClowderTopics(configuration *kafka.BrokerConfiguration, kafkaTopics map[string]api.TopicConfig) {
+	// Get the correct topic name from clowder mapping if available
+	if clowderTopic, ok := kafkaTopics[configuration.Topic]; ok {
+		configuration.Topic = clowderTopic.Name
+	} else {
+		fmt.Printf(noTopicMapping, configuration.Topic)
+	}
+}
diff --git a/clowder/storage.go b/clowder/storage.go
new file mode 100644
index 0000000..2c543cc
--- /dev/null
+++ b/clowder/storage.go
@@ -0,0 +1,14 @@
+package clowder
+
+import (
+	"github.com/RedHatInsights/insights-operator-utils/postgres"
+	api "github.com/redhatinsights/app-common-go/pkg/api/v1"
+)
+
+func UseDBConfig(storageCfg *postgres.StorageConfiguration, loadedConfig *api.AppConfig) {
+	storageCfg.PGDBName = loadedConfig.Database.Name
+	storageCfg.PGHost = loadedConfig.Database.Hostname
+	storageCfg.PGPort = loadedConfig.Database.Port
+	storageCfg.PGUsername = loadedConfig.Database.Username
+	storageCfg.PGPassword = loadedConfig.Database.Password
+}
diff --git a/go.mod b/go.mod
index ca70b8a..f954cfc 100644
--- a/go.mod
+++ b/go.mod
@@ -20,9 +20,11 @@ require (
 	github.com/mozillazg/request v0.8.0
 	github.com/prometheus/client_golang v1.18.0
 	github.com/prometheus/client_model v0.5.0
+	github.com/redhatinsights/app-common-go v1.6.7
 	github.com/redis/go-redis/v9 v9.3.1
 	github.com/rs/zerolog v1.31.0
 	github.com/stretchr/testify v1.8.4
+	github.com/tisnik/go-capture v1.0.1
 	github.com/verdverm/frisby v0.0.0-20170604211311-b16556248a9a
 	github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
 	gopkg.in/h2non/gock.v1 v1.1.2
diff --git a/go.sum b/go.sum
index a245d91..bbbc325 100644
--- a/go.sum
+++ b/go.sum
@@ -555,6 +555,8 @@ github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40T
 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
 github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ=
 github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
+github.com/redhatinsights/app-common-go v1.6.7 h1:cXWW0F6ZW53RLRr54gn7Azo9CLTysYOmFDR0D0Qd0Fs=
+github.com/redhatinsights/app-common-go v1.6.7/go.mod h1:6gzRyg8ZyejwMCksukeAhh2ZXOB3uHSmBsbP06fG2PQ=
 github.com/redis/go-redis/v9 v9.3.1 h1:KqdY8U+3X6z+iACvumCNxnoluToB+9Me+TvyFa21Mds=
 github.com/redis/go-redis/v9 v9.3.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
 github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
@@ -618,6 +620,7 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
 github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
 github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
 github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
+github.com/tisnik/go-capture v1.0.1 h1:o4zZpOlC01qCifeh0fj4SoUkt8UHFassn1+blmFN3BQ=
 github.com/tisnik/go-capture v1.0.1/go.mod h1:NArgKXuvcG6gOW2SQoPGKy6TuiKBttQ2ZV0/zC4zVaY=
 github.com/tj/go-gracefully v0.0.0-20141227061038-005c1d102f1b/go.mod h1:uqlTeGUUfRdQvlQGkv+DYe3lLST3DionEwMA9YAYibY=
 github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
diff --git a/kafka/configuration.go b/kafka/configuration.go
index 148c88b..70c9a32 100644
--- a/kafka/configuration.go
+++ b/kafka/configuration.go
@@ -14,8 +14,9 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-// Package broker contains data types, interfaces, and methods related to
-// brokers that can be used to consume input messages by aggegator.
+// Package kafka contains data types, interfaces, and methods related to
+// Kafka that can be used to configure brokers, as well as consume/produce
+// messages.
 package kafka
 
 import (
diff --git a/postgres/config.go b/postgres/config.go
new file mode 100644
index 0000000..5dbf956
--- /dev/null
+++ b/postgres/config.go
@@ -0,0 +1,13 @@
+package postgres
+
+// StorageConfiguration represents common configuration of data storage
+type StorageConfiguration struct {
+	Driver        string `mapstructure:"db_driver"       toml:"db_driver"`
+	PGUsername    string `mapstructure:"pg_username"     toml:"pg_username"`
+	PGPassword    string `mapstructure:"pg_password"     toml:"pg_password"`
+	PGHost        string `mapstructure:"pg_host"         toml:"pg_host"`
+	PGPort        int    `mapstructure:"pg_port"         toml:"pg_port"`
+	PGDBName      string `mapstructure:"pg_db_name"      toml:"pg_db_name"`
+	PGParams      string `mapstructure:"pg_params"       toml:"pg_params"`
+	LogSQLQueries bool   `mapstructure:"log_sql_queries" toml:"log_sql_queries"`
+}