diff --git a/clowder/clowder_test.go b/clowder/clowder_test.go new file mode 100644 index 0000000..d82f0aa --- /dev/null +++ b/clowder/clowder_test.go @@ -0,0 +1,214 @@ +// Copyright 2024 Red Hat, Inc +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clowder_test + +import ( + "fmt" + "github.com/RedHatInsights/insights-operator-utils/clowder" + "github.com/RedHatInsights/insights-operator-utils/kafka" + "github.com/RedHatInsights/insights-operator-utils/postgres" + api "github.com/redhatinsights/app-common-go/pkg/api/v1" + "github.com/stretchr/testify/assert" + "github.com/tisnik/go-capture" + "testing" +) + +func TestUseDBConfig(t *testing.T) { + cfg := postgres.StorageConfiguration{} + expected := postgres.StorageConfiguration{ + PGUsername: "username", + PGPassword: "password", + PGHost: "hostname", + PGPort: 1234, + PGDBName: "dbname", + } + loadedCfg := api.AppConfig{ + Database: &api.DatabaseConfig{ + AdminPassword: "adminpw", + AdminUsername: "admin", + Hostname: "hostname", + Name: "dbname", + Password: "password", + Port: 1234, + RdsCa: nil, + Username: "username", + }, + } + + clowder.UseDBConfig(&cfg, &loadedCfg) + assert.Equal(t, expected, cfg, "Clowder database config was not used") +} + +func TestUseClowderTopicsTopicFound(t *testing.T) { + originalTopicName := "topic1" + clowderTopicName := "NewTopicName" + brokerCfg := kafka.BrokerConfiguration{ + Topic: originalTopicName, + } + kafkaTopics := map[string]api.TopicConfig{ + originalTopicName: { + Name: clowderTopicName, + }, + "topic2": { + Name: "AnotherTopicName", + }, + } + + clowder.UseClowderTopics(&brokerCfg, kafkaTopics) + assert.Equal(t, clowderTopicName, brokerCfg.Topic, "Clowder topic name was not used") +} + +func TestUseClowderTopicsTopicNotFound(t *testing.T) { + originalTopicName := "topic1" + + brokerCfg := kafka.BrokerConfiguration{ + Topic: originalTopicName, + } + kafkaTopics := map[string]api.TopicConfig{ + "topic2": { + Name: "AnotherTopicName", + }, + } + + output, _ := capture.StandardOutput(func() { + clowder.UseClowderTopics(&brokerCfg, kafkaTopics) + }) + assert.Equal(t, originalTopicName, brokerCfg.Topic, "topic name should not change") + assert.Contains(t, output, "warning: no kafka mapping found for topic topic1") +} + +func TestUseBrokerConfigNoKafkaConfig(t *testing.T) { + brokerCfg := kafka.BrokerConfiguration{} + loadedConfig := api.AppConfig{} + + output, _ := capture.StandardOutput(func() { + clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) + }) + assert.Contains(t, output, clowder.NoBrokerCfg) +} + +func TestUseBrokerConfigNoKafkaBrokers(t *testing.T) { + brokerCfg := kafka.BrokerConfiguration{} + loadedConfig := api.AppConfig{ + Kafka: &api.KafkaConfig{}, + } + + output, _ := capture.StandardOutput(func() { + clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) + }) + assert.Contains(t, output, clowder.NoBrokerCfg) +} + +func TestUseBrokerConfigNoAuthNoPort(t *testing.T) { + addr := "test_broker" + brokerCfg := kafka.BrokerConfiguration{} + loadedConfig := api.AppConfig{ + Kafka: &api.KafkaConfig{ + Brokers: []api.BrokerConfig{ + { + Hostname: addr, + Port: nil, + }, + }, + }, + } + + clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) + assert.Equal(t, addr, brokerCfg.Address) +} + +func TestUseBrokerConfigNoAuth(t *testing.T) { + brokerCfg := kafka.BrokerConfiguration{} + port := 12345 + addr := "test_broker" + loadedConfig := api.AppConfig{ + Kafka: &api.KafkaConfig{ + Brokers: []api.BrokerConfig{ + { + Hostname: addr, + Port: &port, + }, + }, + }, + } + + clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) + assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg.Address) +} + +func TestUseBrokerConfigAuthEnabledNoSasl(t *testing.T) { + brokerCfg := kafka.BrokerConfiguration{} + port := 12345 + addr := "test_broker" + authType := api.BrokerConfigAuthtypeSasl + loadedConfig := api.AppConfig{ + Kafka: &api.KafkaConfig{ + Brokers: []api.BrokerConfig{ + { + Hostname: addr, + Port: &port, + Authtype: &authType, + }, + }, + }, + } + + output, _ := capture.StandardOutput(func() { + clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) + }) + + assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg.Address) + assert.Contains(t, output, clowder.NoSaslCfg) +} + +func TestUseBrokerConfigAuthEnabledWithSaslConfig(t *testing.T) { + brokerCfg := kafka.BrokerConfiguration{} + port := 12345 + addr := "test_broker" + saslUsr := "user" + saslPwd := "pwd" + saslMechanism := "sasl" + protocol := "tls" + + authType := api.BrokerConfigAuthtypeSasl + loadedConfig := api.AppConfig{ + Kafka: &api.KafkaConfig{ + Brokers: []api.BrokerConfig{ + { + Hostname: addr, + Port: &port, + Authtype: &authType, + Sasl: &api.KafkaSASLConfig{ + Password: &saslPwd, + Username: &saslUsr, + SaslMechanism: &saslMechanism, + }, + SecurityProtocol: &protocol, + }, + }, + }, + } + + output, _ := capture.StandardOutput(func() { + clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) + }) + + assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg.Address) + assert.Contains(t, output, "kafka is configured to use authentication") + assert.Equal(t, saslUsr, brokerCfg.SaslUsername) + assert.Equal(t, saslPwd, brokerCfg.SaslPassword) + assert.Equal(t, saslMechanism, brokerCfg.SaslMechanism) + assert.Equal(t, protocol, brokerCfg.SecurityProtocol) +} diff --git a/clowder/export_test.go b/clowder/export_test.go new file mode 100644 index 0000000..8556f2d --- /dev/null +++ b/clowder/export_test.go @@ -0,0 +1,20 @@ +// Copyright 2024 Red Hat, Inc +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clowder + +var ( + NoBrokerCfg = noBrokerConfig + NoSaslCfg = noSaslConfig +) diff --git a/clowder/kafka.go b/clowder/kafka.go new file mode 100644 index 0000000..8a2bfe6 --- /dev/null +++ b/clowder/kafka.go @@ -0,0 +1,75 @@ +// Copyright 2024 Red Hat, Inc +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clowder + +import ( + "fmt" + "github.com/RedHatInsights/insights-operator-utils/kafka" + api "github.com/redhatinsights/app-common-go/pkg/api/v1" +) + +// Common constants used for logging and error reporting +const ( + noBrokerConfig = "warning: no broker configurations found in clowder config" + noSaslConfig = "warning: SASL configuration is missing" + noTopicMapping = "warning: no kafka mapping found for topic %s" +) + +// UseBrokerConfig tries to replace parts of the BrokerConfiguration with the values +// loaded by Clowder +func UseBrokerConfig(brokerCfg *kafka.BrokerConfiguration, loadedConfig *api.AppConfig) { + // make sure broker(s) are configured in Clowder + if loadedConfig.Kafka != nil && len(loadedConfig.Kafka.Brokers) > 0 { + broker := loadedConfig.Kafka.Brokers[0] + // port can be empty in api, so taking it into account + if broker.Port != nil { + brokerCfg.Address = fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port) + } else { + brokerCfg.Address = broker.Hostname + } + + // SSL config + if broker.Authtype != nil { + fmt.Println("kafka is configured to use authentication") + if broker.Sasl != nil { + // we are trusting that these values are set and + // dereferencing the pointers without any check... + brokerCfg.SaslUsername = *broker.Sasl.Username + brokerCfg.SaslPassword = *broker.Sasl.Password + brokerCfg.SaslMechanism = *broker.Sasl.SaslMechanism + brokerCfg.SecurityProtocol = *broker.SecurityProtocol + + if caPath, err := loadedConfig.KafkaCa(broker); err == nil { + brokerCfg.CertPath = caPath + } + } else { + fmt.Println(noSaslConfig) + } + } + } else { + fmt.Println(noBrokerConfig) + } +} + +// UseClowderTopics tries to replace the configured topic with the corresponding +// topic loaded by Clowder +func UseClowderTopics(configuration *kafka.BrokerConfiguration, kafkaTopics map[string]api.TopicConfig) { + // Get the correct topic name from clowder mapping if available + if clowderTopic, ok := kafkaTopics[configuration.Topic]; ok { + configuration.Topic = clowderTopic.Name + } else { + fmt.Printf(noTopicMapping, configuration.Topic) + } +} diff --git a/clowder/storage.go b/clowder/storage.go new file mode 100644 index 0000000..ec35738 --- /dev/null +++ b/clowder/storage.go @@ -0,0 +1,30 @@ +// Copyright 2024 Red Hat, Inc +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clowder + +import ( + "github.com/RedHatInsights/insights-operator-utils/postgres" + api "github.com/redhatinsights/app-common-go/pkg/api/v1" +) + +// UseDBConfig tries to replace the StorageConfiguration parameters with the +// values loaded by Clowder +func UseDBConfig(storageCfg *postgres.StorageConfiguration, loadedConfig *api.AppConfig) { + storageCfg.PGDBName = loadedConfig.Database.Name + storageCfg.PGHost = loadedConfig.Database.Hostname + storageCfg.PGPort = loadedConfig.Database.Port + storageCfg.PGUsername = loadedConfig.Database.Username + storageCfg.PGPassword = loadedConfig.Database.Password +} diff --git a/go.mod b/go.mod index f319a8d..f954cfc 100644 --- a/go.mod +++ b/go.mod @@ -20,10 +20,13 @@ require ( github.com/mozillazg/request v0.8.0 github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_model v0.5.0 + github.com/redhatinsights/app-common-go v1.6.7 github.com/redis/go-redis/v9 v9.3.1 github.com/rs/zerolog v1.31.0 github.com/stretchr/testify v1.8.4 + github.com/tisnik/go-capture v1.0.1 github.com/verdverm/frisby v0.0.0-20170604211311-b16556248a9a + github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c gopkg.in/h2non/gock.v1 v1.1.2 ) @@ -56,6 +59,7 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect github.com/segmentio/kafka-go v0.4.10 // indirect + github.com/xdg/stringprep v1.0.0 // indirect golang.org/x/crypto v0.17.0 // indirect golang.org/x/net v0.17.0 // indirect golang.org/x/sys v0.15.0 // indirect diff --git a/go.sum b/go.sum index a245d91..bbbc325 100644 --- a/go.sum +++ b/go.sum @@ -555,6 +555,8 @@ github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40T github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ= github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/redhatinsights/app-common-go v1.6.7 h1:cXWW0F6ZW53RLRr54gn7Azo9CLTysYOmFDR0D0Qd0Fs= +github.com/redhatinsights/app-common-go v1.6.7/go.mod h1:6gzRyg8ZyejwMCksukeAhh2ZXOB3uHSmBsbP06fG2PQ= github.com/redis/go-redis/v9 v9.3.1 h1:KqdY8U+3X6z+iACvumCNxnoluToB+9Me+TvyFa21Mds= github.com/redis/go-redis/v9 v9.3.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= @@ -618,6 +620,7 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= +github.com/tisnik/go-capture v1.0.1 h1:o4zZpOlC01qCifeh0fj4SoUkt8UHFassn1+blmFN3BQ= github.com/tisnik/go-capture v1.0.1/go.mod h1:NArgKXuvcG6gOW2SQoPGKy6TuiKBttQ2ZV0/zC4zVaY= github.com/tj/go-gracefully v0.0.0-20141227061038-005c1d102f1b/go.mod h1:uqlTeGUUfRdQvlQGkv+DYe3lLST3DionEwMA9YAYibY= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= diff --git a/kafka/configuration.go b/kafka/configuration.go new file mode 100644 index 0000000..70c9a32 --- /dev/null +++ b/kafka/configuration.go @@ -0,0 +1,93 @@ +/* +Copyright © 2020, 2023 Red Hat, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package kafka contains data types, interfaces, and methods related to +// Kafka that can be used to configure brokers, as well as consume/produce +// messages. +package kafka + +import ( + "crypto/sha512" + "strings" + "time" + + tlsutils "github.com/RedHatInsights/insights-operator-utils/tls" + "github.com/Shopify/sarama" + "github.com/rs/zerolog/log" +) + +// BrokerConfiguration represents configuration of Kafka broker +type BrokerConfiguration struct { + Address string `mapstructure:"address" toml:"address"` + SecurityProtocol string `mapstructure:"security_protocol" toml:"security_protocol"` + CertPath string `mapstructure:"cert_path" toml:"cert_path"` + SaslMechanism string `mapstructure:"sasl_mechanism" toml:"sasl_mechanism"` + SaslUsername string `mapstructure:"sasl_username" toml:"sasl_username"` + SaslPassword string `mapstructure:"sasl_password" toml:"sasl_password"` + Topic string `mapstructure:"topic" toml:"topic"` + Timeout time.Duration `mapstructure:"timeout" toml:"timeout"` + Group string `mapstructure:"group" toml:"group"` + ClientID string `mapstructure:"client_id" toml:"client_id"` + Enabled bool `mapstructure:"enabled" toml:"enabled"` +} + +// SaramaConfigFromBrokerConfig returns a Config struct from broker.Configuration parameters +func SaramaConfigFromBrokerConfig(cfg *BrokerConfiguration) (*sarama.Config, error) { + saramaConfig := sarama.NewConfig() + saramaConfig.Version = sarama.V0_10_2_0 + + if cfg.Timeout > 0 { + saramaConfig.Net.DialTimeout = cfg.Timeout + saramaConfig.Net.ReadTimeout = cfg.Timeout + saramaConfig.Net.WriteTimeout = cfg.Timeout + } + + if strings.Contains(cfg.SecurityProtocol, "SSL") { + saramaConfig.Net.TLS.Enable = true + } + + if strings.EqualFold(cfg.SecurityProtocol, "SSL") && cfg.CertPath != "" { + tlsConfig, err := tlsutils.NewTLSConfig(cfg.CertPath) + if err != nil { + log.Error().Msgf("Unable to load TLS config for %s cert", cfg.CertPath) + return nil, err + } + saramaConfig.Net.TLS.Config = tlsConfig + } else if strings.HasPrefix(cfg.SecurityProtocol, "SASL_") { + log.Info().Msg("Configuring SASL authentication") + saramaConfig.Net.SASL.Enable = true + saramaConfig.Net.SASL.User = cfg.SaslUsername + saramaConfig.Net.SASL.Password = cfg.SaslPassword + saramaConfig.Net.SASL.Mechanism = sarama.SASLMechanism(cfg.SaslMechanism) + + if strings.EqualFold(cfg.SaslMechanism, sarama.SASLTypeSCRAMSHA512) { + log.Info().Msg("Configuring SCRAM-SHA512") + saramaConfig.Net.SASL.Handshake = true + saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + return &SCRAMClient{HashGeneratorFcn: sha512.New} + } + } + } + + // ClientID is fully optional, but by setting it, we can get rid of some warning messages in logs + if cfg.ClientID != "" { + // if not set, the "sarama" will be used instead + saramaConfig.ClientID = cfg.ClientID + } + + // now the config structure is filled-in + return saramaConfig, nil +} diff --git a/kafka/configuration_test.go b/kafka/configuration_test.go new file mode 100644 index 0000000..82954ea --- /dev/null +++ b/kafka/configuration_test.go @@ -0,0 +1,91 @@ +// Copyright 2022, 2023 Red Hat, Inc +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka_test + +import ( + "github.com/RedHatInsights/insights-operator-utils/kafka" + "github.com/RedHatInsights/insights-operator-utils/tests/helpers" + "testing" + "time" + + "github.com/Shopify/sarama" + "github.com/stretchr/testify/assert" +) + +func TestSaramaConfigFromBrokerConfig(t *testing.T) { + cfg := kafka.BrokerConfiguration{} + saramaConfig, err := kafka.SaramaConfigFromBrokerConfig(&cfg) + helpers.FailOnError(t, err) + assert.Equal(t, sarama.V0_10_2_0, saramaConfig.Version) + + cfg = kafka.BrokerConfiguration{ + Timeout: time.Second, + } + saramaConfig, err = kafka.SaramaConfigFromBrokerConfig(&cfg) + helpers.FailOnError(t, err) + assert.Equal(t, sarama.V0_10_2_0, saramaConfig.Version) + assert.Equal(t, time.Second, saramaConfig.Net.DialTimeout) + assert.Equal(t, time.Second, saramaConfig.Net.ReadTimeout) + assert.Equal(t, time.Second, saramaConfig.Net.WriteTimeout) + assert.Equal(t, "sarama", saramaConfig.ClientID) // default value + + cfg = kafka.BrokerConfiguration{ + SecurityProtocol: "SSL", + } + + saramaConfig, err = kafka.SaramaConfigFromBrokerConfig(&cfg) + helpers.FailOnError(t, err) + assert.Equal(t, sarama.V0_10_2_0, saramaConfig.Version) + assert.True(t, saramaConfig.Net.TLS.Enable) + + cfg = kafka.BrokerConfiguration{ + SecurityProtocol: "SASL_SSL", + SaslMechanism: "PLAIN", + SaslUsername: "username", + SaslPassword: "password", + ClientID: "foobarbaz", + } + saramaConfig, err = kafka.SaramaConfigFromBrokerConfig(&cfg) + helpers.FailOnError(t, err) + assert.Equal(t, sarama.V0_10_2_0, saramaConfig.Version) + assert.True(t, saramaConfig.Net.TLS.Enable) + assert.True(t, saramaConfig.Net.SASL.Enable) + assert.Equal(t, sarama.SASLMechanism("PLAIN"), saramaConfig.Net.SASL.Mechanism) + assert.Equal(t, "username", saramaConfig.Net.SASL.User) + assert.Equal(t, "password", saramaConfig.Net.SASL.Password) + assert.Equal(t, "foobarbaz", saramaConfig.ClientID) + + cfg.SaslMechanism = "SCRAM-SHA-512" + saramaConfig, err = kafka.SaramaConfigFromBrokerConfig(&cfg) + helpers.FailOnError(t, err) + assert.Equal(t, sarama.V0_10_2_0, saramaConfig.Version) + assert.True(t, saramaConfig.Net.TLS.Enable) + assert.True(t, saramaConfig.Net.SASL.Enable) + assert.Equal(t, sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512), saramaConfig.Net.SASL.Mechanism) + assert.Equal(t, "username", saramaConfig.Net.SASL.User) + assert.Equal(t, "password", saramaConfig.Net.SASL.Password) + assert.Equal(t, "foobarbaz", saramaConfig.ClientID) +} + +func TestBadConfiguration(t *testing.T) { + cfg := kafka.BrokerConfiguration{ + SecurityProtocol: "SSL", + CertPath: "missing_path", + } + + saramaCfg, err := kafka.SaramaConfigFromBrokerConfig(&cfg) + assert.Error(t, err) + assert.Nil(t, saramaCfg) +} diff --git a/kafka/scram_client.go b/kafka/scram_client.go new file mode 100644 index 0000000..dc3755f --- /dev/null +++ b/kafka/scram_client.go @@ -0,0 +1,46 @@ +// Copyright 2024 Red Hat, Inc +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import "github.com/xdg/scram" + +// SCRAMClient implementation for the SCRAM authentication +type SCRAMClient struct { + *scram.Client + *scram.ClientConversation + scram.HashGeneratorFcn +} + +// Begin prepares the client for the SCRAM exchange +func (x *SCRAMClient) Begin(userName, password, authzID string) (err error) { + x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID) + if err != nil { + return err + } + x.ClientConversation = x.Client.NewConversation() + return nil +} + +// Step steps client through the SCRAM exchange +func (x *SCRAMClient) Step(challenge string) (response string, err error) { + response, err = x.ClientConversation.Step(challenge) + return +} + +// Done should return true when the SCRAM conversation +// is over. +func (x *SCRAMClient) Done() bool { + return x.ClientConversation.Done() +} diff --git a/postgres/config.go b/postgres/config.go new file mode 100644 index 0000000..14bfd3d --- /dev/null +++ b/postgres/config.go @@ -0,0 +1,27 @@ +// Copyright 2024 Red Hat, Inc +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +// StorageConfiguration represents common configuration of data storage +type StorageConfiguration struct { + Driver string `mapstructure:"db_driver" toml:"db_driver"` + PGUsername string `mapstructure:"pg_username" toml:"pg_username"` + PGPassword string `mapstructure:"pg_password" toml:"pg_password"` + PGHost string `mapstructure:"pg_host" toml:"pg_host"` + PGPort int `mapstructure:"pg_port" toml:"pg_port"` + PGDBName string `mapstructure:"pg_db_name" toml:"pg_db_name"` + PGParams string `mapstructure:"pg_params" toml:"pg_params"` + LogSQLQueries bool `mapstructure:"log_sql_queries" toml:"log_sql_queries"` +}