Skip to content

Commit

Permalink
Fixed MSK ephem mode
Browse files Browse the repository at this point in the history
  • Loading branch information
psav committed Nov 13, 2023
1 parent 012b1da commit 395562c
Showing 1 changed file with 77 additions and 1 deletion.
78 changes: 77 additions & 1 deletion controllers/cloud.redhat.com/providers/kafka/msk.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,24 @@ import (
"strings"

crd "github.com/RedHatInsights/clowder/apis/cloud.redhat.com/v1alpha1"
rc "github.com/RedHatInsights/rhc-osdk-utils/resourceCache"
"github.com/RedHatInsights/rhc-osdk-utils/utils"

"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/clowderconfig"
"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/config"
"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/errors"
"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/providers"
core "k8s.io/api/core/v1"
apiextensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/types"
)

// KafkaManagedSecret is the resource ident for the MSK user secret object.
var KafkaManagedSecret = rc.NewMultiResourceIdent(ProvName, "kafka_managed_secret", &core.Secret{})

// KafkaConnectSecret is the resource ident for a MSK Connect secret object.
var KafkaConnectSecret = rc.NewMultiResourceIdent(ProvName, "kafka_connect_secret", &core.Secret{})

type mskProvider struct {
providers.Provider
}
Expand All @@ -28,6 +37,8 @@ func NewMSK(p *providers.Provider) (providers.ClowderProvider, error) {
CyndiConfigMap,
KafkaTopic,
KafkaConnect,
KafkaManagedSecret,
KafkaConnectSecret,
)
return &mskProvider{Provider: *p}, nil
}
Expand All @@ -36,7 +47,72 @@ func (s *mskProvider) EnvProvide() error {
s.Config = &config.AppConfig{
Kafka: &config.KafkaConfig{},
}
return s.configureBrokers()

if err := s.configureBrokers(); err != nil {
return err
}

namespaceList, err := s.Env.GetNamespacesInEnv(s.Ctx, s.Client)
if err != nil {
return err
}

for _, namespace := range namespaceList {
if err := s.copyManagedSecret(namespace); err != nil {
return err
}
if err := s.copyConnectSecret(namespace); err != nil {
return err
}
}

return nil
}

func (s *mskProvider) copyManagedSecret(namespace string) error {
srcSecretRef := types.NamespacedName{
Name: s.Env.Spec.Providers.Kafka.ManagedSecretRef.Name,
Namespace: s.Env.Status.TargetNamespace,
}
dstSecretRef := types.NamespacedName{
Name: srcSecretRef.Name,
Namespace: namespace,
}
sec, err := utils.CopySecret(s.Ctx, s.Client, srcSecretRef, dstSecretRef)

if err != nil {
return err
}

if err = s.Cache.Create(KafkaManagedSecret, dstSecretRef, sec); err != nil {
s.Log.Error(err, "Failed to add managed secret to cache")
return err
}
return nil
}

func (s *mskProvider) copyConnectSecret(namespace string) error {
secName := s.getConnectClusterUserName()

srcSecretRef := types.NamespacedName{
Name: secName,
Namespace: s.Env.Spec.Providers.Kafka.ManagedSecretRef.Namespace,
}
dstSecretRef := types.NamespacedName{
Name: secName,
Namespace: namespace,
}

sec, err := utils.CopySecret(s.Ctx, s.Client, srcSecretRef, dstSecretRef)
if err != nil {
return err
}

if err = s.Cache.Create(KafkaConnectSecret, dstSecretRef, sec); err != nil {
s.Log.Error(err, "Failed to add managed secret to cache")
return err
}
return nil
}

func (s *mskProvider) Provide(app *crd.ClowdApp) error {
Expand Down

0 comments on commit 395562c

Please sign in to comment.