diff --git a/controllers/cloud.redhat.com/providers/kafka/msk.go b/controllers/cloud.redhat.com/providers/kafka/msk.go index d5384b22e..62fe6b5ef 100644 --- a/controllers/cloud.redhat.com/providers/kafka/msk.go +++ b/controllers/cloud.redhat.com/providers/kafka/msk.go @@ -6,6 +6,8 @@ import ( "strings" crd "github.com/RedHatInsights/clowder/apis/cloud.redhat.com/v1alpha1" + rc "github.com/RedHatInsights/rhc-osdk-utils/resourceCache" + "github.com/RedHatInsights/rhc-osdk-utils/utils" "github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/clowderconfig" "github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/config" @@ -13,8 +15,15 @@ import ( "github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/providers" core "k8s.io/api/core/v1" apiextensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + "k8s.io/apimachinery/pkg/types" ) +// KafkaManagedSecret is the resource ident for the MSK user secret object. +var KafkaManagedSecret = rc.NewMultiResourceIdent(ProvName, "kafka_managed_secret", &core.Secret{}) + +// KafkaConnectSecret is the resource ident for a MSK Connect secret object. +var KafkaConnectSecret = rc.NewMultiResourceIdent(ProvName, "kafka_connect_secret", &core.Secret{}) + type mskProvider struct { providers.Provider } @@ -28,6 +37,8 @@ func NewMSK(p *providers.Provider) (providers.ClowderProvider, error) { CyndiConfigMap, KafkaTopic, KafkaConnect, + KafkaManagedSecret, + KafkaConnectSecret, ) return &mskProvider{Provider: *p}, nil } @@ -36,7 +47,72 @@ func (s *mskProvider) EnvProvide() error { s.Config = &config.AppConfig{ Kafka: &config.KafkaConfig{}, } - return s.configureBrokers() + + if err := s.configureBrokers(); err != nil { + return err + } + + namespaceList, err := s.Env.GetNamespacesInEnv(s.Ctx, s.Client) + if err != nil { + return err + } + + for _, namespace := range namespaceList { + if err := s.copyManagedSecret(namespace); err != nil { + return err + } + if err := s.copyConnectSecret(namespace); err != nil { + return err + } + } + + return nil +} + +func (s *mskProvider) copyManagedSecret(namespace string) error { + srcSecretRef := types.NamespacedName{ + Name: s.Env.Spec.Providers.Kafka.ManagedSecretRef.Name, + Namespace: s.Env.Status.TargetNamespace, + } + dstSecretRef := types.NamespacedName{ + Name: srcSecretRef.Name, + Namespace: namespace, + } + sec, err := utils.CopySecret(s.Ctx, s.Client, srcSecretRef, dstSecretRef) + + if err != nil { + return err + } + + if err = s.Cache.Create(KafkaManagedSecret, dstSecretRef, sec); err != nil { + s.Log.Error(err, "Failed to add managed secret to cache") + return err + } + return nil +} + +func (s *mskProvider) copyConnectSecret(namespace string) error { + secName := s.getConnectClusterUserName() + + srcSecretRef := types.NamespacedName{ + Name: secName, + Namespace: s.Env.Spec.Providers.Kafka.ManagedSecretRef.Namespace, + } + dstSecretRef := types.NamespacedName{ + Name: secName, + Namespace: namespace, + } + + sec, err := utils.CopySecret(s.Ctx, s.Client, srcSecretRef, dstSecretRef) + if err != nil { + return err + } + + if err = s.Cache.Create(KafkaConnectSecret, dstSecretRef, sec); err != nil { + s.Log.Error(err, "Failed to add managed secret to cache") + return err + } + return nil } func (s *mskProvider) Provide(app *crd.ClowdApp) error {