From bf823eb7823d803f575060ef6e6e6959fd4d8fc8 Mon Sep 17 00:00:00 2001 From: Bohdan Siryk Date: Mon, 4 Dec 2023 13:48:40 +0200 Subject: [PATCH] kafka user certificates were refactored --- PROJECT | 12 + .../v1beta1/kafkauser_types.go | 29 -- .../v1beta1/kafkauser_webhook.go | 28 +- .../v1beta1/usercertificate_types.go | 122 +++++ .../v1beta1/usercertificate_webhook.go | 79 ++++ .../v1beta1/webhook_suite_test.go | 3 + .../v1beta1/zz_generated.deepcopy.go | 161 +++++-- ...management.instaclustr.com_kafkausers.yaml | 31 -- ...ment.instaclustr.com_usercertificates.yaml | 142 ++++++ config/crd/kustomization.yaml | 3 + ...n_in_kafkamanagement_usercertificates.yaml | 7 + ...k_in_kafkamanagement_usercertificates.yaml | 16 + ...anagement_usercertificate_editor_role.yaml | 31 ++ ...anagement_usercertificate_viewer_role.yaml | 27 ++ config/rbac/role.yaml | 26 ++ config/samples/clusters_v1beta1_kafka.yaml | 81 +++- .../kafkamanagement_v1beta1_kafkauser.yaml | 19 - ...fkamanagement_v1beta1_usercertificate.yaml | 35 ++ config/webhook/manifests.yaml | 20 + .../cassandrauser_controller.go | 3 +- controllers/clusterresources/helpers.go | 25 -- .../opensearchuser_controller.go | 5 +- .../postgresqluser_controller.go | 5 +- .../clusterresources/redisuser_controller.go | 5 +- controllers/clusters/kafka_controller.go | 2 +- .../kafkamanagement/kafkauser_controller.go | 423 +----------------- controllers/kafkamanagement/suite_test.go | 4 + .../usercertificate_controller.go | 377 ++++++++++++++++ main.go | 13 + pkg/helpers/utils/user_creds_from_secret.go | 30 ++ pkg/instaclustr/client.go | 12 +- pkg/instaclustr/interfaces.go | 4 +- pkg/instaclustr/mock/client.go | 4 +- pkg/models/kafka_user_apv2.go | 7 + pkg/models/operator.go | 9 +- 35 files changed, 1210 insertions(+), 590 deletions(-) create mode 100644 apis/kafkamanagement/v1beta1/usercertificate_types.go create mode 100644 apis/kafkamanagement/v1beta1/usercertificate_webhook.go create mode 100644 config/crd/bases/kafkamanagement.instaclustr.com_usercertificates.yaml create mode 100644 config/crd/patches/cainjection_in_kafkamanagement_usercertificates.yaml create mode 100644 config/crd/patches/webhook_in_kafkamanagement_usercertificates.yaml create mode 100644 config/rbac/kafkamanagement_usercertificate_editor_role.yaml create mode 100644 config/rbac/kafkamanagement_usercertificate_viewer_role.yaml create mode 100644 config/samples/kafkamanagement_v1beta1_usercertificate.yaml create mode 100644 controllers/kafkamanagement/usercertificate_controller.go create mode 100644 pkg/helpers/utils/user_creds_from_secret.go diff --git a/PROJECT b/PROJECT index 87253fe87..865a17cd8 100644 --- a/PROJECT +++ b/PROJECT @@ -360,4 +360,16 @@ resources: defaulting: true validation: true webhookVersion: v1 +- api: + crdVersion: v1 + namespaced: true + controller: true + domain: instaclustr.com + group: kafkamanagement + kind: UserCertificate + path: github.com/instaclustr/operator/apis/kafkamanagement/v1beta1 + version: v1beta1 + webhooks: + validation: true + webhookVersion: v1 version: "3" diff --git a/apis/kafkamanagement/v1beta1/kafkauser_types.go b/apis/kafkamanagement/v1beta1/kafkauser_types.go index cc2586b6f..752991cb2 100644 --- a/apis/kafkamanagement/v1beta1/kafkauser_types.go +++ b/apis/kafkamanagement/v1beta1/kafkauser_types.go @@ -28,7 +28,6 @@ import ( // KafkaUserSpec defines the desired state of KafkaUser type KafkaUserSpec struct { SecretRef *v1beta1.SecretReference `json:"secretRef"` - CertificateRequests []*CertificateRequest `json:"certificateRequests,omitempty"` InitialPermissions string `json:"initialPermissions"` OverrideExistingUser bool `json:"overrideExistingUser,omitempty"` SASLSCRAMMechanism string `json:"saslScramMechanism"` @@ -40,25 +39,6 @@ type KafkaUserStatus struct { ClustersEvents map[string]string `json:"clustersEvents,omitempty"` } -type Certificate struct { - ID string `json:"id,omitempty"` - ExpiryDate string `json:"expiryDate,omitempty"` - SignedCertificate string `json:"signedCertificate,omitempty"` -} - -type CertificateRequest struct { - SecretName string `json:"secretName"` - SecretNamespace string `json:"secretNamespace"` - ClusterID string `json:"clusterId"` - CSR string `json:"csr,omitempty"` - ValidPeriod int `json:"validPeriod"` - CommonName string `json:"commonName,omitempty"` - Country string `json:"country,omitempty"` - Organization string `json:"organization,omitempty"` - OrganizationalUnit string `json:"organizationalUnit,omitempty"` - AutoRenew bool `json:"autoRenew"` -} - //+kubebuilder:object:root=true //+kubebuilder:subresource:status @@ -135,12 +115,3 @@ func (ks *KafkaUserSpec) ToInstAPI(clusterID string, username string, password s Username: username, } } - -func (cr *CertificateRequest) ToInstAPI(username string) *models.CertificateRequest { - return &models.CertificateRequest{ - ClusterID: cr.ClusterID, - CSR: cr.CSR, - KafkaUsername: username, - ValidPeriod: cr.ValidPeriod, - } -} diff --git a/apis/kafkamanagement/v1beta1/kafkauser_webhook.go b/apis/kafkamanagement/v1beta1/kafkauser_webhook.go index a0c726618..f1bde2420 100644 --- a/apis/kafkamanagement/v1beta1/kafkauser_webhook.go +++ b/apis/kafkamanagement/v1beta1/kafkauser_webhook.go @@ -55,35 +55,27 @@ func (r *KafkaUser) Default() { var _ webhook.Validator = &KafkaUser{} // ValidateCreate implements webhook.Validator so a webhook will be registered for the type -func (r *KafkaUser) ValidateCreate() error { - kafkauserlog.Info("validate create", "name", r.Name) - - if len(r.Spec.CertificateRequests) != 0 { - return models.ErrNotEmptyCSRs - } +func (ku *KafkaUser) ValidateCreate() error { + kafkauserlog.Info("validate create", "name", ku.Name) return nil } // ValidateUpdate implements webhook.Validator so a webhook will be registered for the type -func (r *KafkaUser) ValidateUpdate(old runtime.Object) error { - kafkauserlog.Info("validate update", "name", r.Name) - - for _, request := range r.Spec.CertificateRequests { - if request.CSR == "" { - if request.Organization == "" || request.OrganizationalUnit == "" || request.Country == "" || request.CommonName == "" { - return models.ErrEmptyCertGeneratingFields - } - } +func (ku *KafkaUser) ValidateUpdate(old runtime.Object) error { + kafkauserlog.Info("validate update", "name", ku.Name) + + oldUser := old.(*KafkaUser) + if *ku.Spec.SecretRef != *oldUser.Spec.SecretRef { + return models.ErrImmutableSecretRef } return nil } // ValidateDelete implements webhook.Validator so a webhook will be registered for the type -func (r *KafkaUser) ValidateDelete() error { - kafkauserlog.Info("validate delete", "name", r.Name) +func (ku *KafkaUser) ValidateDelete() error { + kafkauserlog.Info("validate delete", "name", ku.Name) - // TODO(user): fill in your validation logic upon object deletion. return nil } diff --git a/apis/kafkamanagement/v1beta1/usercertificate_types.go b/apis/kafkamanagement/v1beta1/usercertificate_types.go new file mode 100644 index 000000000..da1719aba --- /dev/null +++ b/apis/kafkamanagement/v1beta1/usercertificate_types.go @@ -0,0 +1,122 @@ +/* +Copyright 2022. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1beta1 + +import ( + "crypto/x509/pkix" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// UserCertificateSpec defines the desired state of UserCertificateSpec +type UserCertificateSpec struct { + // SecretRef references to the secret which stores pre-generated certificate request. + // +kubebuilder:validation:XValidation:message="Cannot be changed after it is set",rule="self == oldSelf" + SecretRef *FromSecret `json:"secretRef,omitempty"` + + // UserRef references to the KafkaUser resource to whom a certificate will be created. + // +kubebuilder:validation:XValidation:message="Cannot be changed after it is set",rule="self == oldSelf" + UserRef Reference `json:"userRef"` + + // ClusterRef references to the Kafka resource to whom a certificate will be created. + // +kubebuilder:validation:XValidation:message="Cannot be changed after it is set",rule="self == oldSelf" + ClusterRef Reference `json:"clusterRef"` + + // ValidPeriod is amount of month until a signed certificate is expired. + // +kubebuilder:validation:Min=3 + // +kubebuilder:validation:Max=120 + // +kubebuilder:validation:XValidation:message="Cannot be changed after it is set",rule="self == oldSelf" + ValidPeriod int `json:"validPeriod"` + + // CertificateRequestTemplate is a template for generating a CSR. + // +kubebuilder:validation:XValidation:message="Cannot be changed after it is set",rule="self == oldSelf" + CertificateRequestTemplate *CSRTemplate `json:"certificateRequestTemplate,omitempty"` +} + +type CSRTemplate struct { + Country string `json:"country"` + Organization string `json:"organization"` + OrganizationalUnit string `json:"organizationalUnit"` +} + +func (c *CSRTemplate) ToSubject() pkix.Name { + return pkix.Name{ + Country: []string{c.Country}, + Organization: []string{c.Organization}, + OrganizationalUnit: []string{c.OrganizationalUnit}, + } +} + +type FromSecret struct { + Reference `json:",inline"` + Key string `json:"key"` +} + +type Reference struct { + Name string `json:"name"` + Namespace string `json:"namespace"` +} + +func (r *Reference) AsNamespacedName() types.NamespacedName { + return types.NamespacedName{ + Name: r.Name, + Namespace: r.Namespace, + } +} + +// UserCertificateStatus defines the observed state of UserCertificateStatus +type UserCertificateStatus struct { + // CertID is a unique identifier of a certificate on Instaclustr. + CertID string `json:"certId,omitempty"` + + // ExpiryDate is a date when a signed certificate is expired. + ExpiryDate string `json:"expiryDate,omitempty"` + + // SignedCertSecretRef references to a secret which stores signed cert. + SignedCertSecretRef Reference `json:"signedCertSecretRef,omitempty"` +} + +//+kubebuilder:object:root=true +//+kubebuilder:subresource:status + +// UserCertificate is the Schema for the usercertificates API +type UserCertificate struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec UserCertificateSpec `json:"spec,omitempty"` + Status UserCertificateStatus `json:"status,omitempty"` +} + +func (csr *UserCertificate) NewPatch() client.Patch { + return client.MergeFrom(csr.DeepCopy()) +} + +//+kubebuilder:object:root=true + +// UserCertificateList contains a list of UserCertificate +type UserCertificateList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []UserCertificate `json:"items"` +} + +func init() { + SchemeBuilder.Register(&UserCertificate{}, &UserCertificateList{}) +} diff --git a/apis/kafkamanagement/v1beta1/usercertificate_webhook.go b/apis/kafkamanagement/v1beta1/usercertificate_webhook.go new file mode 100644 index 000000000..e49030d3e --- /dev/null +++ b/apis/kafkamanagement/v1beta1/usercertificate_webhook.go @@ -0,0 +1,79 @@ +/* +Copyright 2022. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1beta1 + +import ( + "errors" + + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/webhook" +) + +// log is for logging in this package. +var usercertificatelog = logf.Log.WithName("usercertificate-resource") + +func (r *UserCertificate) SetupWebhookWithManager(mgr ctrl.Manager) error { + return ctrl.NewWebhookManagedBy(mgr). + For(r). + Complete() +} + +//+kubebuilder:webhook:path=/validate-kafkamanagement-instaclustr-com-v1beta1-usercertificate,mutating=false,failurePolicy=fail,sideEffects=None,groups=kafkamanagement.instaclustr.com,resources=usercertificates,verbs=create;update,versions=v1beta1,name=vusercertificate.kb.io,admissionReviewVersions=v1 + +var _ webhook.Validator = &UserCertificate{} + +// ValidateCreate implements webhook.Validator so a webhook will be registered for the type +func (cert *UserCertificate) ValidateCreate() error { + usercertificatelog.Info("validate create", "name", cert.Name) + + if cert.Spec.SecretRef == nil && cert.Spec.CertificateRequestTemplate == nil { + return errors.New("one of the following fields should be set: spec.secretRef, spec.generateCSR") + } + + if cert.Spec.SecretRef != nil && cert.Spec.CertificateRequestTemplate != nil { + return errors.New("only one of the following fields can be set: spec.secretRef, spec.generateCSR") + } + + return nil +} + +// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type +func (cert *UserCertificate) ValidateUpdate(old runtime.Object) error { + usercertificatelog.Info("validate update", "name", cert.Name) + + oldCert := old.(*UserCertificate) + + if oldCert.Spec.SecretRef != nil && cert.Spec.SecretRef == nil { + return errors.New("spec.secretRef cannot be removed after it is set") + } + + if oldCert.Spec.CertificateRequestTemplate != nil && cert.Spec.CertificateRequestTemplate == nil { + return errors.New("spec.certificateRequestTemplate cannot be removed after it is set") + } + + return nil +} + +// ValidateDelete implements webhook.Validator so a webhook will be registered for the type +func (cert *UserCertificate) ValidateDelete() error { + usercertificatelog.Info("validate delete", "name", cert.Name) + + // TODO(user): fill in your validation logic upon object deletion. + return nil +} diff --git a/apis/kafkamanagement/v1beta1/webhook_suite_test.go b/apis/kafkamanagement/v1beta1/webhook_suite_test.go index e76029684..dfd087c3b 100644 --- a/apis/kafkamanagement/v1beta1/webhook_suite_test.go +++ b/apis/kafkamanagement/v1beta1/webhook_suite_test.go @@ -108,6 +108,9 @@ var _ = BeforeSuite(func() { err = (&KafkaUser{}).SetupWebhookWithManager(mgr) Expect(err).NotTo(HaveOccurred()) + err = (&UserCertificate{}).SetupWebhookWithManager(mgr) + Expect(err).NotTo(HaveOccurred()) + //+kubebuilder:scaffold:webhook go func() { diff --git a/apis/kafkamanagement/v1beta1/zz_generated.deepcopy.go b/apis/kafkamanagement/v1beta1/zz_generated.deepcopy.go index b0731c9d1..917eab1bd 100644 --- a/apis/kafkamanagement/v1beta1/zz_generated.deepcopy.go +++ b/apis/kafkamanagement/v1beta1/zz_generated.deepcopy.go @@ -42,61 +42,62 @@ func (in *ACL) DeepCopy() *ACL { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *Certificate) DeepCopyInto(out *Certificate) { +func (in *CSRTemplate) DeepCopyInto(out *CSRTemplate) { *out = *in } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Certificate. -func (in *Certificate) DeepCopy() *Certificate { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CSRTemplate. +func (in *CSRTemplate) DeepCopy() *CSRTemplate { if in == nil { return nil } - out := new(Certificate) + out := new(CSRTemplate) in.DeepCopyInto(out) return out } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *CertificateRequest) DeepCopyInto(out *CertificateRequest) { +func (in *Connector) DeepCopyInto(out *Connector) { *out = *in } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CertificateRequest. -func (in *CertificateRequest) DeepCopy() *CertificateRequest { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Connector. +func (in *Connector) DeepCopy() *Connector { if in == nil { return nil } - out := new(CertificateRequest) + out := new(Connector) in.DeepCopyInto(out) return out } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *Connector) DeepCopyInto(out *Connector) { +func (in *ExternalCluster) DeepCopyInto(out *ExternalCluster) { *out = *in } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Connector. -func (in *Connector) DeepCopy() *Connector { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExternalCluster. +func (in *ExternalCluster) DeepCopy() *ExternalCluster { if in == nil { return nil } - out := new(Connector) + out := new(ExternalCluster) in.DeepCopyInto(out) return out } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *ExternalCluster) DeepCopyInto(out *ExternalCluster) { +func (in *FromSecret) DeepCopyInto(out *FromSecret) { *out = *in + out.Reference = in.Reference } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExternalCluster. -func (in *ExternalCluster) DeepCopy() *ExternalCluster { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FromSecret. +func (in *FromSecret) DeepCopy() *FromSecret { if in == nil { return nil } - out := new(ExternalCluster) + out := new(FromSecret) in.DeepCopyInto(out) return out } @@ -262,17 +263,6 @@ func (in *KafkaUserSpec) DeepCopyInto(out *KafkaUserSpec) { *out = new(clusterresourcesv1beta1.SecretReference) **out = **in } - if in.CertificateRequests != nil { - in, out := &in.CertificateRequests, &out.CertificateRequests - *out = make([]*CertificateRequest, len(*in)) - for i := range *in { - if (*in)[i] != nil { - in, out := &(*in)[i], &(*out)[i] - *out = new(CertificateRequest) - **out = **in - } - } - } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaUserSpec. @@ -461,6 +451,21 @@ func (in *MirroredTopic) DeepCopy() *MirroredTopic { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Reference) DeepCopyInto(out *Reference) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Reference. +func (in *Reference) DeepCopy() *Reference { + if in == nil { + return nil + } + out := new(Reference) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SourceCluster) DeepCopyInto(out *SourceCluster) { *out = *in @@ -600,3 +605,105 @@ func (in *TopicStatus) DeepCopy() *TopicStatus { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UserCertificate) DeepCopyInto(out *UserCertificate) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + out.Status = in.Status +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UserCertificate. +func (in *UserCertificate) DeepCopy() *UserCertificate { + if in == nil { + return nil + } + out := new(UserCertificate) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *UserCertificate) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UserCertificateList) DeepCopyInto(out *UserCertificateList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]UserCertificate, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UserCertificateList. +func (in *UserCertificateList) DeepCopy() *UserCertificateList { + if in == nil { + return nil + } + out := new(UserCertificateList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *UserCertificateList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UserCertificateSpec) DeepCopyInto(out *UserCertificateSpec) { + *out = *in + if in.SecretRef != nil { + in, out := &in.SecretRef, &out.SecretRef + *out = new(FromSecret) + **out = **in + } + out.UserRef = in.UserRef + out.ClusterRef = in.ClusterRef + if in.CertificateRequestTemplate != nil { + in, out := &in.CertificateRequestTemplate, &out.CertificateRequestTemplate + *out = new(CSRTemplate) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UserCertificateSpec. +func (in *UserCertificateSpec) DeepCopy() *UserCertificateSpec { + if in == nil { + return nil + } + out := new(UserCertificateSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UserCertificateStatus) DeepCopyInto(out *UserCertificateStatus) { + *out = *in + out.SignedCertSecretRef = in.SignedCertSecretRef +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UserCertificateStatus. +func (in *UserCertificateStatus) DeepCopy() *UserCertificateStatus { + if in == nil { + return nil + } + out := new(UserCertificateStatus) + in.DeepCopyInto(out) + return out +} diff --git a/config/crd/bases/kafkamanagement.instaclustr.com_kafkausers.yaml b/config/crd/bases/kafkamanagement.instaclustr.com_kafkausers.yaml index 7cb6d462f..9205268e4 100644 --- a/config/crd/bases/kafkamanagement.instaclustr.com_kafkausers.yaml +++ b/config/crd/bases/kafkamanagement.instaclustr.com_kafkausers.yaml @@ -37,37 +37,6 @@ spec: properties: authMechanism: type: string - certificateRequests: - items: - properties: - autoRenew: - type: boolean - clusterId: - type: string - commonName: - type: string - country: - type: string - csr: - type: string - organization: - type: string - organizationalUnit: - type: string - secretName: - type: string - secretNamespace: - type: string - validPeriod: - type: integer - required: - - autoRenew - - clusterId - - secretName - - secretNamespace - - validPeriod - type: object - type: array initialPermissions: type: string overrideExistingUser: diff --git a/config/crd/bases/kafkamanagement.instaclustr.com_usercertificates.yaml b/config/crd/bases/kafkamanagement.instaclustr.com_usercertificates.yaml new file mode 100644 index 000000000..babee6068 --- /dev/null +++ b/config/crd/bases/kafkamanagement.instaclustr.com_usercertificates.yaml @@ -0,0 +1,142 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.9.2 + creationTimestamp: null + name: usercertificates.kafkamanagement.instaclustr.com +spec: + group: kafkamanagement.instaclustr.com + names: + kind: UserCertificate + listKind: UserCertificateList + plural: usercertificates + singular: usercertificate + scope: Namespaced + versions: + - name: v1beta1 + schema: + openAPIV3Schema: + description: UserCertificate is the Schema for the usercertificates API + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: UserCertificateSpec defines the desired state of UserCertificateSpec + properties: + certificateRequestTemplate: + description: CertificateRequestTemplate is a template for generating + a CSR. + properties: + country: + type: string + organization: + type: string + organizationalUnit: + type: string + required: + - country + - organization + - organizationalUnit + type: object + x-kubernetes-validations: + - message: Cannot be changed after it is set + rule: self == oldSelf + clusterRef: + description: ClusterRef references to the Kafka resource to whom a + certificate will be created. + properties: + name: + type: string + namespace: + type: string + required: + - name + - namespace + type: object + x-kubernetes-validations: + - message: Cannot be changed after it is set + rule: self == oldSelf + secretRef: + description: SecretRef references to the secret which stores pre-generated + certificate request. + properties: + key: + type: string + name: + type: string + namespace: + type: string + required: + - key + - name + - namespace + type: object + x-kubernetes-validations: + - message: Cannot be changed after it is set + rule: self == oldSelf + userRef: + description: UserRef references to the KafkaUser resource to whom + a certificate will be created. + properties: + name: + type: string + namespace: + type: string + required: + - name + - namespace + type: object + x-kubernetes-validations: + - message: Cannot be changed after it is set + rule: self == oldSelf + validPeriod: + description: ValidPeriod is amount of month until a signed certificate + is expired. + type: integer + x-kubernetes-validations: + - message: Cannot be changed after it is set + rule: self == oldSelf + required: + - clusterRef + - userRef + - validPeriod + type: object + status: + description: UserCertificateStatus defines the observed state of UserCertificateStatus + properties: + certId: + description: CertID is a unique identifier of a certificate on Instaclustr. + type: string + expiryDate: + description: ExpiryDate is a date when a signed certificate is expired. + type: string + signedCertSecretRef: + description: SignedCertSecretRef references to a secret which stores + signed cert. + properties: + name: + type: string + namespace: + type: string + required: + - name + - namespace + type: object + type: object + type: object + served: true + storage: true + subresources: + status: {} diff --git a/config/crd/kustomization.yaml b/config/crd/kustomization.yaml index 30c40a399..afdb16191 100644 --- a/config/crd/kustomization.yaml +++ b/config/crd/kustomization.yaml @@ -30,6 +30,7 @@ resources: - bases/clusterresources.instaclustr.com_exclusionwindows.yaml - bases/clusterresources.instaclustr.com_postgresqlusers.yaml - bases/clusterresources.instaclustr.com_opensearchegressrules.yaml +- bases/kafkamanagement.instaclustr.com_usercertificates.yaml #+kubebuilder:scaffold:crdkustomizeresource patchesStrategicMerge: @@ -62,6 +63,7 @@ patchesStrategicMerge: #- patches/webhook_in_exclusionwindows.yaml #- patches/webhook_in_postgresqlusers.yaml #- patches/webhook_in_opensearchegressrules.yaml +#- patches/webhook_in_usercertificates.yaml #+kubebuilder:scaffold:crdkustomizewebhookpatch # [CERTMANAGER] To enable cert-manager, uncomment all the sections with [CERTMANAGER] prefix. @@ -97,6 +99,7 @@ patchesStrategicMerge: #- patches/cainjection_in_exclusionwindows.yaml #- patches/cainjection_in_postgresqlusers.yaml #- patches/cainjection_in_opensearchegressrules.yaml +#- patches/cainjection_in_usercertificates.yaml #+kubebuilder:scaffold:crdkustomizecainjectionpatch # the following config is for teaching kustomize how to do kustomization for CRDs. diff --git a/config/crd/patches/cainjection_in_kafkamanagement_usercertificates.yaml b/config/crd/patches/cainjection_in_kafkamanagement_usercertificates.yaml new file mode 100644 index 000000000..0536c4ef6 --- /dev/null +++ b/config/crd/patches/cainjection_in_kafkamanagement_usercertificates.yaml @@ -0,0 +1,7 @@ +# The following patch adds a directive for certmanager to inject CA into the CRD +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + cert-manager.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME) + name: usercertificates.kafkamanagement.instaclustr.com diff --git a/config/crd/patches/webhook_in_kafkamanagement_usercertificates.yaml b/config/crd/patches/webhook_in_kafkamanagement_usercertificates.yaml new file mode 100644 index 000000000..42187d309 --- /dev/null +++ b/config/crd/patches/webhook_in_kafkamanagement_usercertificates.yaml @@ -0,0 +1,16 @@ +# The following patch enables a conversion webhook for the CRD +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: usercertificates.kafkamanagement.instaclustr.com +spec: + conversion: + strategy: Webhook + webhook: + clientConfig: + service: + namespace: system + name: webhook-service + path: /convert + conversionReviewVersions: + - v1 diff --git a/config/rbac/kafkamanagement_usercertificate_editor_role.yaml b/config/rbac/kafkamanagement_usercertificate_editor_role.yaml new file mode 100644 index 000000000..a05fa446d --- /dev/null +++ b/config/rbac/kafkamanagement_usercertificate_editor_role.yaml @@ -0,0 +1,31 @@ +# permissions for end users to edit usercertificates. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + labels: + app.kubernetes.io/name: clusterrole + app.kubernetes.io/instance: usercertificate-editor-role + app.kubernetes.io/component: rbac + app.kubernetes.io/created-by: operator + app.kubernetes.io/part-of: operator + app.kubernetes.io/managed-by: kustomize + name: usercertificate-editor-role +rules: +- apiGroups: + - kafkamanagement.instaclustr.com + resources: + - usercertificates + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - kafkamanagement.instaclustr.com + resources: + - usercertificates/status + verbs: + - get diff --git a/config/rbac/kafkamanagement_usercertificate_viewer_role.yaml b/config/rbac/kafkamanagement_usercertificate_viewer_role.yaml new file mode 100644 index 000000000..23d47d78c --- /dev/null +++ b/config/rbac/kafkamanagement_usercertificate_viewer_role.yaml @@ -0,0 +1,27 @@ +# permissions for end users to view usercertificates. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + labels: + app.kubernetes.io/name: clusterrole + app.kubernetes.io/instance: usercertificate-viewer-role + app.kubernetes.io/component: rbac + app.kubernetes.io/created-by: operator + app.kubernetes.io/part-of: operator + app.kubernetes.io/managed-by: kustomize + name: usercertificate-viewer-role +rules: +- apiGroups: + - kafkamanagement.instaclustr.com + resources: + - usercertificates + verbs: + - get + - list + - watch +- apiGroups: + - kafkamanagement.instaclustr.com + resources: + - usercertificates/status + verbs: + - get diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 167097051..b9a09ca93 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -862,3 +862,29 @@ rules: - patch - update - watch +- apiGroups: + - kafkamanagement.instaclustr.com + resources: + - usercertificates + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - kafkamanagement.instaclustr.com + resources: + - usercertificates/finalizers + verbs: + - update +- apiGroups: + - kafkamanagement.instaclustr.com + resources: + - usercertificates/status + verbs: + - get + - patch + - update diff --git a/config/samples/clusters_v1beta1_kafka.yaml b/config/samples/clusters_v1beta1_kafka.yaml index 6b6c632f4..afea89747 100644 --- a/config/samples/clusters_v1beta1_kafka.yaml +++ b/config/samples/clusters_v1beta1_kafka.yaml @@ -2,8 +2,10 @@ apiVersion: clusters.instaclustr.com/v1beta1 kind: Kafka metadata: name: kafka +# name: kafka-2 spec: - name: "Kafka-example" + name: "bohdan-kafka-test" +# name: "bohdan-kafka-test-2" version: "3.3.1" pciCompliance: false replicationFactor: 3 @@ -14,7 +16,7 @@ spec: privateNetworkCluster: false slaTier: "NON_PRODUCTION" # bundledUseOnly: true -# clientBrokerAuthWithMtls: true + clientBrokerAuthWithMtls: true # dedicatedZookeeper: # - nodeSize: "KDZ-DEV-t4g.small-30" # nodesNumber: 3 @@ -58,9 +60,78 @@ spec: # dedicatedZookeeper: # - nodeSize: "KDZ-DEV-t4g.small-30" # nodesNumber: 3 -# userRefs: -# - name: kafkauser-sample -# namespace: default + userRefs: + - name: kafkauser-sample + namespace: default + resizeSettings: + - notifySupportContacts: false + concurrency: 1 +--- +apiVersion: clusters.instaclustr.com/v1beta1 +kind: Kafka +metadata: +# name: kafka + name: kafka-2 +spec: +# name: "bohdan-kafka-test" + name: "bohdan-kafka-test-2" + version: "3.3.1" + pciCompliance: false + replicationFactor: 3 + partitionsNumber: 3 + allowDeleteTopics: true + autoCreateTopics: true + clientToClusterEncryption: false + privateNetworkCluster: false + slaTier: "NON_PRODUCTION" + # bundledUseOnly: true + clientBrokerAuthWithMtls: true + # dedicatedZookeeper: + # - nodeSize: "KDZ-DEV-t4g.small-30" + # nodesNumber: 3 + # twoFactorDelete: + # - email: "asdfadfsdsf" + # phone: "ddsafasdf" + # karapaceSchemaRegistry: + # - version: "3.2.0" + # schemaRegistry: + # - version: "5.0.0" + # karapaceRestProxy: + # - integrateRestProxyWithSchemaRegistry: true + # version: "3.2.0" + # kraft: + # - controllerNodeCount: 3 + # restProxy: + # - integrateRestProxyWithSchemaRegistry: false + # schemaRegistryPassword: "asdfasdf" + # schemaRegistryServerUrl: "schemaRegistryServerUrl" + # "useLocalSchemaRegistry": true + # version: "5.0.0" + dataCentres: + - name: "AWS_VPC_US_EAST_1" + nodesNumber: 3 + cloudProvider: "AWS_VPC" + tags: + tag: "oneTag" + tag2: "twoTags" + nodeSize: "KFK-DEV-t4g.small-5" + # nodeSize: "KFK-PRD-r6g.large-250" + # nodeSize: "KFK-DEV-t4g.medium-80" + network: "10.0.0.0/16" + region: "US_EAST_1" + # accountName: "Custrom" + # cloudProviderSettings: + # - customVirtualNetworkId: "vpc-12345678" + # diskEncryptionKey: "123e4567-e89b-12d3-a456-426614174000" + # resourceGroup: "asdfadfsdfas" + # privateLink: + # - advertisedHostname: "kafka-example-test.com" + # dedicatedZookeeper: + # - nodeSize: "KDZ-DEV-t4g.small-30" + # nodesNumber: 3 + userRefs: + - name: kafkauser-sample + namespace: default resizeSettings: - notifySupportContacts: false concurrency: 1 \ No newline at end of file diff --git a/config/samples/kafkamanagement_v1beta1_kafkauser.yaml b/config/samples/kafkamanagement_v1beta1_kafkauser.yaml index 74d632245..bd33fa354 100644 --- a/config/samples/kafkamanagement_v1beta1_kafkauser.yaml +++ b/config/samples/kafkamanagement_v1beta1_kafkauser.yaml @@ -14,25 +14,6 @@ spec: secretRef: name: "secret-test" namespace: "default" -# certificateRequests: -# - secretName: "cert-secret-test" -# secretNamespace: "default" -# clusterId: "ef324cb6-01a5-4d77-8867-9cf794312d43" -# commonName: "Sanch-two" -# country: "VN" -# organization: "Instaclustr" -# organizationalUnit: "IT" -# validPeriod: 6 -# autoRenew: true -# - secretName: "cert-secret-test-three" -# secretNamespace: "default" -# clusterId: "ef324cb6-01a5-4d77-8867-9cf794312d43" -# commonName: "Sanch-two" -# country: "VN" -# organization: "Instaclustr" -# organizationalUnit: "IT" -# validPeriod: 6 -# autoRenew: false initialPermissions: "standard" overrideExistingUser: true saslScramMechanism: "SCRAM-SHA-256" diff --git a/config/samples/kafkamanagement_v1beta1_usercertificate.yaml b/config/samples/kafkamanagement_v1beta1_usercertificate.yaml new file mode 100644 index 000000000..0ac7401e1 --- /dev/null +++ b/config/samples/kafkamanagement_v1beta1_usercertificate.yaml @@ -0,0 +1,35 @@ +# sample with providing own generated csr +apiVersion: kafkamanagement.instaclustr.com/v1beta1 +kind: UserCertificate +metadata: + name: user-cert-from-secret-sample +spec: + clusterRef: + name: kafka + namespace: default + userRef: + name: kafkauser-sample + namespace: default + secretRef: + name: csr-2 + namespace: default + key: ssl-CSR-2.csr + validPeriod: 3 +--- +# sample with generating csr in place +apiVersion: kafkamanagement.instaclustr.com/v1beta1 +kind: UserCertificate +metadata: + name: user-cert-from-template-sample +spec: + clusterRef: + name: kafka + namespace: default + userRef: + name: kafkauser-sample + namespace: default + certificateRequestTemplate: + country: Ukraine + organization: Instaclustr + organizationalUnit: IC + validPeriod: 3 \ No newline at end of file diff --git a/config/webhook/manifests.yaml b/config/webhook/manifests.yaml index 0e126171b..5f3224c53 100644 --- a/config/webhook/manifests.yaml +++ b/config/webhook/manifests.yaml @@ -932,3 +932,23 @@ webhooks: resources: - kafkausers sideEffects: None +- admissionReviewVersions: + - v1 + clientConfig: + service: + name: webhook-service + namespace: system + path: /validate-kafkamanagement-instaclustr-com-v1beta1-usercertificate + failurePolicy: Fail + name: vusercertificate.kb.io + rules: + - apiGroups: + - kafkamanagement.instaclustr.com + apiVersions: + - v1beta1 + operations: + - CREATE + - UPDATE + resources: + - usercertificates + sideEffects: None diff --git a/controllers/clusterresources/cassandrauser_controller.go b/controllers/clusterresources/cassandrauser_controller.go index 48c0184e7..a4ff4d065 100644 --- a/controllers/clusterresources/cassandrauser_controller.go +++ b/controllers/clusterresources/cassandrauser_controller.go @@ -31,6 +31,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" "github.com/instaclustr/operator/apis/clusterresources/v1beta1" + "github.com/instaclustr/operator/pkg/helpers/utils" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" "github.com/instaclustr/operator/pkg/ratelimiter" @@ -90,7 +91,7 @@ func (r *CassandraUserReconciler) Reconcile(ctx context.Context, req ctrl.Reques return ctrl.Result{}, err } - username, password, err := getUserCreds(s) + username, password, err := utils.GetUserCreds(s) if err != nil { l.Error(err, "Cannot get the Cassandra user credentials from the secret", "secret name", s.Name, diff --git a/controllers/clusterresources/helpers.go b/controllers/clusterresources/helpers.go index 799ae9bae..9a764ac86 100644 --- a/controllers/clusterresources/helpers.go +++ b/controllers/clusterresources/helpers.go @@ -17,14 +17,10 @@ limitations under the License. package clusterresources import ( - "strings" - - k8sCore "k8s.io/api/core/v1" "k8s.io/utils/strings/slices" "github.com/instaclustr/operator/apis/clusterresources/v1beta1" "github.com/instaclustr/operator/pkg/instaclustr" - "github.com/instaclustr/operator/pkg/models" ) func areFirewallRuleStatusesEqual(a, b *v1beta1.FirewallRuleStatus) bool { @@ -78,27 +74,6 @@ func CheckIfUserExistsOnInstaclustrAPI(username, clusterID, app string, api inst return slices.Contains(users, username), nil } -func getUserCreds(secret *k8sCore.Secret) (username, password string, err error) { - password = string(secret.Data[models.Password]) - username = string(secret.Data[models.Username]) - - if len(username) == 0 || len(password) == 0 { - return "", "", models.ErrMissingSecretKeys - } - - newLineSuffix := "\n" - - if strings.HasSuffix(username, newLineSuffix) { - username = strings.TrimRight(username, newLineSuffix) - } - - if strings.HasSuffix(password, newLineSuffix) { - password = strings.TrimRight(password, newLineSuffix) - } - - return username, password, nil -} - func subnetsEqual(subnets1, subnets2 []string) bool { if len(subnets1) != len(subnets2) { return false diff --git a/controllers/clusterresources/opensearchuser_controller.go b/controllers/clusterresources/opensearchuser_controller.go index 19a1dabf2..2037ada4f 100644 --- a/controllers/clusterresources/opensearchuser_controller.go +++ b/controllers/clusterresources/opensearchuser_controller.go @@ -33,6 +33,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" clusterresourcesv1beta1 "github.com/instaclustr/operator/apis/clusterresources/v1beta1" + "github.com/instaclustr/operator/pkg/helpers/utils" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" "github.com/instaclustr/operator/pkg/ratelimiter" @@ -197,7 +198,7 @@ func (r *OpenSearchUserReconciler) createUser( secret *k8sCore.Secret, logger logr.Logger, ) error { - username, password, err := getUserCreds(secret) + username, password, err := utils.GetUserCreds(secret) if err != nil { logger.Error(err, "Cannot get user's credentials during creating user on the cluster") r.EventRecorder.Eventf( @@ -267,7 +268,7 @@ func (r *OpenSearchUserReconciler) deleteUser( secret *k8sCore.Secret, logger logr.Logger, ) error { - username, _, err := getUserCreds(secret) + username, _, err := utils.GetUserCreds(secret) if err != nil { logger.Error(err, "Cannot get user's credentials during deleting") r.EventRecorder.Eventf( diff --git a/controllers/clusterresources/postgresqluser_controller.go b/controllers/clusterresources/postgresqluser_controller.go index 71ac7aaa0..dba31d4f3 100644 --- a/controllers/clusterresources/postgresqluser_controller.go +++ b/controllers/clusterresources/postgresqluser_controller.go @@ -35,6 +35,7 @@ import ( clusterresourcesv1beta1 "github.com/instaclustr/operator/apis/clusterresources/v1beta1" "github.com/instaclustr/operator/pkg/exposeservice" + "github.com/instaclustr/operator/pkg/helpers/utils" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" "github.com/instaclustr/operator/pkg/ratelimiter" @@ -90,7 +91,7 @@ func (r *PostgreSQLUserReconciler) Reconcile(ctx context.Context, req ctrl.Reque return ctrl.Result{}, err } - newUsername, newPassword, err := getUserCreds(s) + newUsername, newPassword, err := utils.GetUserCreds(s) if err != nil { l.Error(err, "Cannot get the PostgreSQL user credentials from the secret", "secret name", s.Name, @@ -373,7 +374,7 @@ func (r *PostgreSQLUserReconciler) getDefaultPostgreSQLUserCreds( return nil, "", fmt.Errorf("cannot get default PostgreSQL user secret, user reference: %v, err: %w", defaultUserSecretNamespacedName, err) } - defaultUsername, defaultPassword, err := getUserCreds(defaultUserSecret) + defaultUsername, defaultPassword, err := utils.GetUserCreds(defaultUserSecret) if err != nil { return nil, "", fmt.Errorf("cannot get default PostgreSQL user credentials, user reference: %v, err: %w", defaultUserSecretNamespacedName, err) } diff --git a/controllers/clusterresources/redisuser_controller.go b/controllers/clusterresources/redisuser_controller.go index 83d11f7f9..e697c57f3 100644 --- a/controllers/clusterresources/redisuser_controller.go +++ b/controllers/clusterresources/redisuser_controller.go @@ -34,6 +34,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" "github.com/instaclustr/operator/apis/clusterresources/v1beta1" + "github.com/instaclustr/operator/pkg/helpers/utils" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" "github.com/instaclustr/operator/pkg/ratelimiter" @@ -101,7 +102,7 @@ func (r *RedisUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( return ctrl.Result{}, err } - username, password, err := getUserCreds(secret) + username, password, err := utils.GetUserCreds(secret) if err != nil { l.Error(err, "Cannot get user credentials", "user", username) r.EventRecorder.Eventf(user, models.Warning, models.CreatingEvent, @@ -329,7 +330,7 @@ func (r *RedisUserReconciler) handleDeleteUser( s *k8sCore.Secret, u *v1beta1.RedisUser, ) error { - username, _, err := getUserCreds(s) + username, _, err := utils.GetUserCreds(s) if err != nil { l.Error(err, "Cannot get user credentials", "user", u.Name) r.EventRecorder.Eventf(u, models.Warning, models.CreatingEvent, diff --git a/controllers/clusters/kafka_controller.go b/controllers/clusters/kafka_controller.go index 9acd59b04..a7ec26e3a 100644 --- a/controllers/clusters/kafka_controller.go +++ b/controllers/clusters/kafka_controller.go @@ -182,7 +182,7 @@ func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, k *v1beta1.Ka "Cluster status check job is started", ) - if k.Spec.UserRefs != nil { + if k.Spec.UserRefs != nil && k.Status.AvailableUsers == nil { err = r.startUsersCreationJob(k) if err != nil { l.Error(err, "Failed to start user creation job") diff --git a/controllers/kafkamanagement/kafkauser_controller.go b/controllers/kafkamanagement/kafkauser_controller.go index c69c29388..647dff490 100644 --- a/controllers/kafkamanagement/kafkauser_controller.go +++ b/controllers/kafkamanagement/kafkauser_controller.go @@ -18,14 +18,7 @@ package kafkamanagement import ( "context" - "crypto/rand" - "crypto/rsa" - "crypto/x509" - "crypto/x509/pkix" - "encoding/pem" - "strings" - - "github.com/go-logr/logr" + v1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/fields" @@ -33,14 +26,11 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/source" "github.com/instaclustr/operator/apis/kafkamanagement/v1beta1" @@ -241,7 +231,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( ) r.EventRecorder.Eventf(user, models.Normal, models.Deleted, - "user has been deleted for a cluster, username: %s, clusterID: %s.", + "User has been deleted for a cluster, username: %s, clusterID: %s.", username, clusterID) continue @@ -396,17 +386,8 @@ func (r *KafkaUserReconciler) SetupWithManager(mgr ctrl.Manager) error { ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay, ), }). - For(&v1beta1.KafkaUser{}, builder.WithPredicates(predicate.Funcs{ - UpdateFunc: func(event event.UpdateEvent) bool { - newObj := event.ObjectNew.(*v1beta1.KafkaUser) - oldObj := event.ObjectOld.(*v1beta1.KafkaUser) - if newObj.Generation != event.ObjectOld.GetGeneration() { - r.handleCertificateEvent(newObj, oldObj.Spec.CertificateRequests) - } - - return true - }, - })).Owns(&v1.Secret{}). + For(&v1beta1.KafkaUser{}). + Owns(&v1.Secret{}). Watches( &source.Kind{Type: &v1.Secret{}}, handler.EnqueueRequestsFromMapFunc(r.findSecretObjects), @@ -471,399 +452,3 @@ func (r *KafkaUserReconciler) getKafkaUserCredsFromSecret( return string(username[:len(username)-1]), string(password[:len(password)-1]), nil } - -func (r *KafkaUserReconciler) getKafkaUserCertIDFromSecret( - ctx context.Context, - certRequest *v1beta1.CertificateRequest, -) (string, error) { - kafkaUserCertSecret := &v1.Secret{} - kafkaUserCertSecretNamespacedName := types.NamespacedName{ - Name: certRequest.SecretName, - Namespace: certRequest.SecretNamespace, - } - - err := r.Get(ctx, kafkaUserCertSecretNamespacedName, kafkaUserCertSecret) - if err != nil { - return "", err - } - - certID := kafkaUserCertSecret.Data["id"] - - if len(certID) == 0 { - return "", models.ErrMissingSecretKeys - } - - return string(certID), nil -} - -func (r *KafkaUserReconciler) GenerateCSR(certRequest *v1beta1.CertificateRequest) (string, error) { - keyBytes, err := rsa.GenerateKey(rand.Reader, 2048) - if err != nil { - return "", err - } - - subj := pkix.Name{ - CommonName: certRequest.CommonName, - Country: []string{certRequest.Country}, - Organization: []string{certRequest.Organization}, - OrganizationalUnit: []string{certRequest.OrganizationalUnit}, - } - - template := x509.CertificateRequest{ - Subject: subj, - SignatureAlgorithm: x509.SHA256WithRSA, - } - - csrBytes, err := x509.CreateCertificateRequest(rand.Reader, &template, keyBytes) - if err != nil { - return "", err - } - strBuf := strings.Builder{} - err = pem.Encode(&strBuf, &pem.Block{Type: "NEW CERTIFICATE REQUEST", Bytes: csrBytes}) - if err != nil { - return "", err - } - - return strBuf.String(), nil -} - -func (r *KafkaUserReconciler) UpdateCertSecret(ctx context.Context, secret *v1.Secret, certResp *v1beta1.Certificate) error { - secret.StringData = make(map[string]string) - - secret.StringData["id"] = certResp.ID - secret.StringData["expiryDate"] = certResp.ExpiryDate - secret.StringData["signedCertificate"] = certResp.SignedCertificate - - err := r.Update(ctx, secret) - if err != nil { - return err - } - - return nil -} - -func (r *KafkaUserReconciler) handleCertificateEvent( - newObj *v1beta1.KafkaUser, - oldCerts []*v1beta1.CertificateRequest, -) { - ctx := context.TODO() - l := log.FromContext(ctx) - - for _, oldCert := range oldCerts { - var exist bool - for _, newCert := range newObj.Spec.CertificateRequests { - if *oldCert == *newCert { - exist = true - break - } - } - - if exist { - continue - } - - err := r.handleDeleteCertificate(ctx, newObj, l, oldCert) - if err != nil { - l.Error(err, "Cannot delete Kafka user mTLS certificate", "user", oldCert) - r.EventRecorder.Eventf(newObj, models.Warning, models.CreatingEvent, - "Cannot delete mTlS certificate. Reason: %v", err) - } - } - - for _, newCert := range newObj.Spec.CertificateRequests { - var exist bool - for _, oldCert := range oldCerts { - if *newCert == *oldCert { - exist = true - break - } - } - - if exist { - if newCert.AutoRenew { - err := r.handleRenewCertificate(ctx, newObj, newCert, l) - if err != nil { - l.Error(err, "Cannot renew Kafka user mTLS certificate", "cert", newCert) - r.EventRecorder.Eventf(newObj, models.Warning, models.CreatingEvent, - "Cannot renew user mTLS certificate. Reason: %v", err) - } - } - continue - } - - err := r.handleCreateCertificate(ctx, newObj, l, newCert) - if err != nil { - l.Error(err, "Cannot create Kafka user mTLS certificate", "cert", newCert) - r.EventRecorder.Eventf(newObj, models.Warning, models.CreatingEvent, - "Cannot create user mTLS certificate. Reason: %v", err) - } - - oldCerts = append(oldCerts, newCert) - } -} - -func (r *KafkaUserReconciler) handleCreateCertificate(ctx context.Context, user *v1beta1.KafkaUser, l logr.Logger, certRequest *v1beta1.CertificateRequest) error { - username, _, err := r.getKafkaUserCredsFromSecret(user.Spec) - if err != nil { - l.Error( - err, "Cannot get Kafka user creds from secret", - "kafka user spec", user.Spec, - ) - r.EventRecorder.Eventf( - user, models.Warning, models.FetchFailed, - "Fetch user credentials from secret is failed. Reason: %v", - err, - ) - - return err - } - - var isCSRGenerated bool - - if certRequest.CSR == "" { - certRequest.CSR, err = r.GenerateCSR(certRequest) - if err != nil { - l.Error(err, "Cannot generate CSR for Kafka user certificate creation", - "user", user.Name, - ) - r.EventRecorder.Eventf( - user, models.Warning, models.GenerateFailed, - "Generate CSR is failed. Reason: %v", - err, - ) - - return err - } - isCSRGenerated = true - } - - certResponse, err := r.API.CreateKafkaUserCertificate(certRequest.ToInstAPI(username)) - if err != nil { - l.Error(err, "Cannot create Kafka user mTLS certificate", - "user", user.Name, - ) - r.EventRecorder.Eventf( - user, models.Warning, models.CreationFailed, - "Certificate creation is failed. Reason: %v", - err, - ) - - return err - } - - newSecret := user.NewCertificateSecret(certRequest.SecretName, certRequest.SecretNamespace) - err = r.Client.Create(ctx, newSecret) - if err != nil { - l.Error(err, "Cannot create Kafka user Cert Secret.", - "secret name", certRequest.SecretName, - ) - r.EventRecorder.Eventf( - user, models.Warning, models.CreationFailed, - "create user Cert Secret is failed. Reason: %v", - err, - ) - - return err - - } - - controllerutil.AddFinalizer(newSecret, models.DeletionFinalizer) - - err = r.UpdateCertSecret(ctx, newSecret, certResponse) - if err != nil { - l.Error(err, "Cannot update certificate secret", - "user", user.Name, - "secret", newSecret.Name, - ) - r.EventRecorder.Eventf( - user, models.Warning, models.UpdateFailed, - "Certificate secret update is failed. Reason: %v", - err, - ) - - return err - } - - l.Info("Kafka user mTLS certificate has been created", - "User ID", user.GetID(certRequest.ClusterID, username), - ) - - if isCSRGenerated { - certRequest.CSR = "" - } - - return nil -} - -func (r *KafkaUserReconciler) handleDeleteCertificate(ctx context.Context, user *v1beta1.KafkaUser, l logr.Logger, certRequest *v1beta1.CertificateRequest) error { - certID, err := r.getKafkaUserCertIDFromSecret(ctx, certRequest) - if err != nil { - l.Error( - err, "Cannot get Kafka user certificate ID from secret", - "kafka user certificate secret name", certRequest.SecretName, - "kafka user certificate secret namespace", certRequest.SecretNamespace, - ) - r.EventRecorder.Eventf( - user, models.Warning, models.FetchFailed, - "Fetch user certificate ID from secret is failed. Reason: %v", - err, - ) - - return err - } - - err = r.API.DeleteKafkaUserCertificate(certID) - if err != nil { - l.Error(err, "Cannot Delete Kafka user mTLS certificate", - "user", user.Name, - ) - r.EventRecorder.Eventf( - user, models.Warning, models.DeletionFailed, - "Certificate deletion is failed. Reason: %v", - err, - ) - - return err - } - - secret := &v1.Secret{} - certSecretNamespacedName := types.NamespacedName{ - Name: certRequest.SecretName, - Namespace: certRequest.SecretNamespace, - } - err = r.Client.Get(ctx, certSecretNamespacedName, secret) - if err != nil { - l.Error(err, "Cannot get Kafka user certificate secret.", - "kafka user certificate secret name", certRequest.SecretName, - "kafka user certificate secret namespace", certRequest.SecretNamespace, - ) - r.EventRecorder.Eventf( - user, models.Warning, models.FetchFailed, - "Fetch user certificate secret is failed. Reason: %v", - err, - ) - - return err - } - - controllerutil.RemoveFinalizer(secret, models.DeletionFinalizer) - err = r.Update(ctx, secret) - if err != nil { - l.Error(err, "Cannot remove finalizer from secret", "secret name", secret.Name) - - r.EventRecorder.Eventf(user, models.Warning, models.PatchFailed, - "Resource patch is failed. Reason: %v", err) - - return err - } - - err = r.Client.Delete(ctx, secret) - if err != nil && !k8serrors.IsNotFound(err) { - l.Error(err, "Cannot delete Kafka user certificate secret", - "kafka user certificate secret name", certRequest.SecretName, - "kafka user certificate secret namespace", certRequest.SecretNamespace, - ) - r.EventRecorder.Eventf( - user, models.Warning, models.DeletionFailed, - "Delete user certificate secret is failed. Reason: %v", - err, - ) - - return err - } - - l.Info("Kafka user mTLS certificate has been deleted", - "Certificate ID", certID, - ) - - return nil -} - -func (r *KafkaUserReconciler) handleRenewCertificate(ctx context.Context, user *v1beta1.KafkaUser, certRequest *v1beta1.CertificateRequest, l logr.Logger) error { - certID, err := r.getKafkaUserCertIDFromSecret(ctx, certRequest) - if err != nil { - l.Error( - err, "Cannot get Kafka user certificate ID from secret", - "kafka user certificate secret name", certRequest.SecretName, - "kafka user certificate secret namespace", certRequest.SecretNamespace, - ) - r.EventRecorder.Eventf( - user, models.Warning, models.FetchFailed, - "Fetch user certificate ID from secret is failed. Reason: %v", - err, - ) - - return err - } - - newCert, err := r.API.RenewKafkaUserCertificate(certID) - if err != nil { - l.Error(err, "Cannot Renew Kafka user mTLS certificate", - "user", user.Name, - ) - r.EventRecorder.Eventf( - user, models.Warning, models.DeletionFailed, - "Certificate renew is failed. Reason: %v", - err, - ) - - return err - } - - secret := &v1.Secret{} - certSecretNamespacedName := types.NamespacedName{ - Name: certRequest.SecretName, - Namespace: certRequest.SecretNamespace, - } - err = r.Client.Get(ctx, certSecretNamespacedName, secret) - if err != nil { - l.Error(err, "Cannot get Kafka user certificate secret.", - "kafka user certificate secret name", certRequest.SecretName, - "kafka user certificate secret namespace", certRequest.SecretNamespace, - ) - r.EventRecorder.Eventf( - user, models.Warning, models.FetchFailed, - "Fetch user certificate secret is failed. Reason: %v", - err, - ) - - return err - } - - err = r.UpdateCertSecret(ctx, secret, newCert) - if err != nil { - l.Error(err, "Cannot update certificate secret", - "user", user.Name, - "secret", secret.Name, - ) - r.EventRecorder.Eventf( - user, models.Warning, models.UpdateFailed, - "Certificate secret update is failed. Reason: %v", - err, - ) - - return err - } - - l.Info("Kafka user mTLS certificate has been renewed", - "Certificate ID", certID, - "New expiry date", newCert.ExpiryDate, - "New ID", newCert.ID, - ) - - err = r.API.DeleteKafkaUserCertificate(certID) - if err != nil { - l.Error(err, "Cannot Delete Kafka user mTLS certificate", - "user", user.Name, - ) - r.EventRecorder.Eventf( - user, models.Warning, models.DeletionFailed, - "Certificate deletion is failed. Reason: %v", - err, - ) - - return err - } - - return nil -} diff --git a/controllers/kafkamanagement/suite_test.go b/controllers/kafkamanagement/suite_test.go index 0c1451cbc..36bae7fc5 100644 --- a/controllers/kafkamanagement/suite_test.go +++ b/controllers/kafkamanagement/suite_test.go @@ -34,6 +34,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/instaclustr/operator/apis/kafkamanagement/v1beta1" + kafkamanagementv1beta1 "github.com/instaclustr/operator/apis/kafkamanagement/v1beta1" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" "github.com/instaclustr/operator/pkg/scheduler" @@ -80,6 +81,9 @@ var _ = BeforeSuite(func() { err = v1beta1.AddToScheme(scheme.Scheme) Expect(err).NotTo(HaveOccurred()) + err = kafkamanagementv1beta1.AddToScheme(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) + //+kubebuilder:scaffold:scheme k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) diff --git a/controllers/kafkamanagement/usercertificate_controller.go b/controllers/kafkamanagement/usercertificate_controller.go new file mode 100644 index 000000000..9a952a4a5 --- /dev/null +++ b/controllers/kafkamanagement/usercertificate_controller.go @@ -0,0 +1,377 @@ +/* +Copyright 2023. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kafkamanagement + +import ( + "bytes" + "context" + "crypto/rand" + "crypto/rsa" + "crypto/x509" + "encoding/pem" + "errors" + "fmt" + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/log" + + clustersv1beta1 "github.com/instaclustr/operator/apis/clusters/v1beta1" + kafkamanagementv1beta1 "github.com/instaclustr/operator/apis/kafkamanagement/v1beta1" + "github.com/instaclustr/operator/pkg/helpers/utils" + "github.com/instaclustr/operator/pkg/instaclustr" + "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/ratelimiter" +) + +// UserCertificateReconciler reconciles a CertificateSigningRequest object +type UserCertificateReconciler struct { + client.Client + Scheme *runtime.Scheme + API instaclustr.API + EventRecorder record.EventRecorder +} + +//+kubebuilder:rbac:groups=kafkamanagement.instaclustr.com,resources=usercertificates,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=kafkamanagement.instaclustr.com,resources=usercertificates/status,verbs=get;update;patch +//+kubebuilder:rbac:groups=kafkamanagement.instaclustr.com,resources=usercertificates/finalizers,verbs=update + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. +// +// For more details, check Reconcile and its Result here: +// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.14.4/pkg/reconcile +func (r *UserCertificateReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + l := log.FromContext(ctx) + + cert := &kafkamanagementv1beta1.UserCertificate{} + err := r.Get(ctx, req.NamespacedName, cert) + if err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } + + if cert.DeletionTimestamp != nil { + return ctrl.Result{}, r.handleDelete(ctx, cert) + } + + if cert.Status.CertID != "" { + return ctrl.Result{}, nil + } + + kafkaUser := &kafkamanagementv1beta1.KafkaUser{} + err = r.Get(ctx, cert.Spec.UserRef.AsNamespacedName(), kafkaUser) + if err != nil { + r.EventRecorder.Eventf(cert, models.Warning, models.CreationFailed, + "Failed to get KafkaUser resource. Reason: %v", err, + ) + return ctrl.Result{}, err + } + + kafkaCluster := &clustersv1beta1.Kafka{} + err = r.Get(ctx, cert.Spec.ClusterRef.AsNamespacedName(), kafkaCluster) + if err != nil { + r.EventRecorder.Eventf(cert, models.Warning, models.CreationFailed, + "Failed to get Kafka resource. Reason: %v", err, + ) + return ctrl.Result{}, err + } + + res, err := r.checkKafkaUserExistsOnCluster(kafkaCluster, kafkaUser) + if err != nil { + return res, err + } + + if cert.Spec.CertificateRequestTemplate != nil && cert.Spec.SecretRef == nil { + err = r.createCSR(ctx, cert, kafkaUser) + if err != nil { + r.EventRecorder.Eventf(cert, models.Warning, models.CreationFailed, + "Certificate signing request creation failed. Reason: %v", err, + ) + return ctrl.Result{}, err + } + } + + err = r.handleCreate(ctx, cert, kafkaCluster, kafkaUser) + if err != nil { + return ctrl.Result{}, err + } + + l.Info("Certificate has been created", + "certId", cert.Status.CertID, + "userRef", client.ObjectKeyFromObject(kafkaUser), + "clusterRef", client.ObjectKeyFromObject(kafkaCluster), + ) + r.EventRecorder.Event(cert, models.Normal, models.Created, "Certificate has been created") + + return ctrl.Result{}, nil +} + +func (r *UserCertificateReconciler) checkKafkaUserExistsOnCluster( + cluster *clustersv1beta1.Kafka, + user *kafkamanagementv1beta1.KafkaUser, +) (ctrl.Result, error) { + var userRefExists bool + for _, userRef := range cluster.Spec.UserRefs { + if userRef.Name == user.Name && userRef.Namespace == user.Namespace { + userRefExists = true + break + } + } + + event, clusterEventExists := user.Status.ClustersEvents[cluster.Status.ID] + + if !userRefExists || !clusterEventExists { + return ctrl.Result{}, fmt.Errorf("user %s/%s is not added to the cluster %s/%s", + user.Namespace, user.Name, cluster.Namespace, cluster.Name) + } + + if event == models.CreatingEvent { + // Waiting until user is created + return ctrl.Result{RequeueAfter: time.Second}, nil + } + + return ctrl.Result{}, nil +} + +func (r *UserCertificateReconciler) handleCreate( + ctx context.Context, + cert *kafkamanagementv1beta1.UserCertificate, + cluster *clustersv1beta1.Kafka, + user *kafkamanagementv1beta1.KafkaUser, +) error { + userSecret := &v1.Secret{} + err := r.Get(ctx, types.NamespacedName{ + Namespace: user.Spec.SecretRef.Namespace, + Name: user.Spec.SecretRef.Name, + }, userSecret) + if err != nil { + return err + } + + csrSecret := &v1.Secret{} + err = r.Get(ctx, cert.Spec.SecretRef.AsNamespacedName(), csrSecret) + if err != nil { + return err + } + + username, _, err := utils.GetUserCreds(userSecret) + if err != nil { + return err + } + + csrRaw := csrSecret.Data[cert.Spec.SecretRef.Key] + + err = r.validateCSR(csrRaw, username) + if err != nil { + return err + } + + signedCert, err := r.API.CreateKafkaUserCertificate(&models.CertificateRequest{ + ClusterID: cluster.Status.ID, + CSR: string(csrRaw), + KafkaUsername: username, + ValidPeriod: cert.Spec.ValidPeriod, + }) + if err != nil { + return err + } + + certSecret, err := r.createSignedCertificateSecret(ctx, cert, signedCert.SignedCertificate) + if err != nil { + return err + } + + patch := cert.NewPatch() + cert.Status = kafkamanagementv1beta1.UserCertificateStatus{ + CertID: signedCert.ID, + ExpiryDate: signedCert.ExpiryDate, + SignedCertSecretRef: kafkamanagementv1beta1.Reference{ + Namespace: certSecret.Namespace, + Name: certSecret.Name, + }, + } + err = r.Status().Patch(ctx, cert, patch) + if err != nil { + return err + } + + controllerutil.AddFinalizer(cert, models.DeletionFinalizer) + err = r.Patch(ctx, cert, patch) + if err != nil { + return err + } + + return nil +} + +func (r *UserCertificateReconciler) createSignedCertificateSecret( + ctx context.Context, + cert *kafkamanagementv1beta1.UserCertificate, + signedCert string, +) (*v1.Secret, error) { + secret := &v1.Secret{ + ObjectMeta: ctrl.ObjectMeta{ + Name: fmt.Sprintf("%s-signed-cert", cert.Name), + Namespace: cert.Namespace, + }, + StringData: map[string]string{ + models.SignedCertificateSecretKey: signedCert, + }, + } + + _ = controllerutil.SetControllerReference(cert, secret, r.Scheme) + + err := r.Create(ctx, secret) + if err != nil { + return nil, err + } + + return secret, nil +} + +func (r *UserCertificateReconciler) createCSR( + ctx context.Context, + csr *kafkamanagementv1beta1.UserCertificate, + user *kafkamanagementv1beta1.KafkaUser, +) error { + userSecret := &v1.Secret{} + err := r.Get(ctx, types.NamespacedName{ + Namespace: user.Spec.SecretRef.Namespace, + Name: user.Spec.SecretRef.Name, + }, userSecret) + if err != nil { + return err + } + + username, _, err := utils.GetUserCreds(userSecret) + if err != nil { + return err + } + + privateKey, err := rsa.GenerateKey(rand.Reader, 2048) + if err != nil { + return err + } + + template := x509.CertificateRequest{ + Subject: csr.Spec.CertificateRequestTemplate.ToSubject(), + SignatureAlgorithm: x509.SHA256WithRSA, + } + + template.Subject.CommonName = username + + csrBytes, err := x509.CreateCertificateRequest(rand.Reader, &template, privateKey) + if err != nil { + return err + } + + buf := &bytes.Buffer{} + err = pem.Encode(buf, &pem.Block{Type: models.CertificateRequestType, Bytes: csrBytes}) + if err != nil { + return err + } + + csrSecret := &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: csr.Name + "-csr", + Namespace: csr.Namespace, + }, + Data: map[string][]byte{ + models.CSRSecretKey: buf.Bytes(), + models.PrivateKeySecretKey: exportRSAPrivateKeyAsPEM(privateKey), + }, + } + + _ = controllerutil.SetControllerReference(csr, csrSecret, r.Scheme) + + err = r.Create(ctx, csrSecret) + if err != nil { + return err + } + + csr.Spec.SecretRef = &kafkamanagementv1beta1.FromSecret{ + Reference: kafkamanagementv1beta1.Reference{ + Name: csrSecret.Name, + Namespace: csrSecret.Namespace, + }, + Key: models.CSRSecretKey, + } + + return r.Update(ctx, csr) +} + +func exportRSAPrivateKeyAsPEM(privateKey *rsa.PrivateKey) []byte { + return pem.EncodeToMemory( + &pem.Block{ + Type: models.RSAPrivateKeyType, + Bytes: x509.MarshalPKCS1PrivateKey(privateKey), + }, + ) +} + +func (r *UserCertificateReconciler) validateCSR(raw []byte, username string) error { + block, _ := pem.Decode(raw) + + req, err := x509.ParseCertificateRequest(block.Bytes) + if err != nil { + return err + } + + if req.Subject.CommonName != username { + return errors.New("common name in the secret doesn't match username") + } + + return nil +} + +func (r *UserCertificateReconciler) handleDelete(ctx context.Context, cert *kafkamanagementv1beta1.UserCertificate) error { + l := log.FromContext(ctx) + + err := r.API.DeleteKafkaUserCertificate(cert.Status.CertID) + if err != nil && !errors.Is(err, instaclustr.NotFound) { + return err + } + + l.Info("The certificate has been deleted", + "certId", cert.Status.CertID, + ) + + patch := cert.NewPatch() + controllerutil.RemoveFinalizer(cert, models.DeletionFinalizer) + return r.Patch(ctx, cert, patch) +} + +// SetupWithManager sets up the controller with the Manager. +func (r *UserCertificateReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries( + ratelimiter.DefaultBaseDelay, + ratelimiter.DefaultMaxDelay, + ), + }). + For(&kafkamanagementv1beta1.UserCertificate{}). + Complete(r) +} diff --git a/main.go b/main.go index c29ce497b..218194d79 100644 --- a/main.go +++ b/main.go @@ -501,6 +501,19 @@ func main() { setupLog.Error(err, "unable to create webhook", "webhook", "OpenSearchEgressRules") os.Exit(1) } + if err = (&kafkamanagementcontrollers.UserCertificateReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + API: instaClient, + EventRecorder: eventRecorder, + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "UserCertificate") + os.Exit(1) + } + if err = (&kafkamanagementv1beta1.UserCertificate{}).SetupWebhookWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create webhook", "webhook", "UserCertificate") + os.Exit(1) + } //+kubebuilder:scaffold:builder if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { diff --git a/pkg/helpers/utils/user_creds_from_secret.go b/pkg/helpers/utils/user_creds_from_secret.go new file mode 100644 index 000000000..7e741be19 --- /dev/null +++ b/pkg/helpers/utils/user_creds_from_secret.go @@ -0,0 +1,30 @@ +package utils + +import ( + "strings" + + k8sCore "k8s.io/api/core/v1" + + "github.com/instaclustr/operator/pkg/models" +) + +func GetUserCreds(secret *k8sCore.Secret) (username, password string, err error) { + password = string(secret.Data[models.Password]) + username = string(secret.Data[models.Username]) + + if len(username) == 0 || len(password) == 0 { + return "", "", models.ErrMissingSecretKeys + } + + newLineSuffix := "\n" + + if strings.HasSuffix(username, newLineSuffix) { + username = strings.TrimRight(username, newLineSuffix) + } + + if strings.HasSuffix(password, newLineSuffix) { + password = strings.TrimRight(password, newLineSuffix) + } + + return username, password, nil +} diff --git a/pkg/instaclustr/client.go b/pkg/instaclustr/client.go index 61e65cd61..371c0a220 100644 --- a/pkg/instaclustr/client.go +++ b/pkg/instaclustr/client.go @@ -975,7 +975,7 @@ func (c *Client) DeleteKafkaUser(kafkaUserID, kafkaUserEndpoint string) error { return nil } -func (c *Client) CreateKafkaUserCertificate(certRequest *models.CertificateRequest) (*kafkamanagementv1beta1.Certificate, error) { +func (c *Client) CreateKafkaUserCertificate(certRequest *models.CertificateRequest) (*models.Certificate, error) { data, err := json.Marshal(certRequest) if err != nil { return nil, err @@ -997,7 +997,7 @@ func (c *Client) CreateKafkaUserCertificate(certRequest *models.CertificateReque return nil, fmt.Errorf("status code: %d, message: %s", resp.StatusCode, body) } - cert := &kafkamanagementv1beta1.Certificate{} + cert := &models.Certificate{} err = json.Unmarshal(body, cert) if err != nil { return nil, err @@ -1019,6 +1019,10 @@ func (c *Client) DeleteKafkaUserCertificate(certificateID string) error { return err } + if resp.StatusCode == http.StatusNotFound { + return NotFound + } + if resp.StatusCode != http.StatusNoContent { return fmt.Errorf("status code: %d, message: %s", resp.StatusCode, body) } @@ -1026,7 +1030,7 @@ func (c *Client) DeleteKafkaUserCertificate(certificateID string) error { return nil } -func (c *Client) RenewKafkaUserCertificate(certificateID string) (*kafkamanagementv1beta1.Certificate, error) { +func (c *Client) RenewKafkaUserCertificate(certificateID string) (*models.Certificate, error) { payload := &struct { CertificateID string `json:"certificateId"` }{ @@ -1050,7 +1054,7 @@ func (c *Client) RenewKafkaUserCertificate(certificateID string) (*kafkamanageme return nil, err } - cert := &kafkamanagementv1beta1.Certificate{} + cert := &models.Certificate{} err = json.Unmarshal(body, cert) if err != nil { return nil, err diff --git a/pkg/instaclustr/interfaces.go b/pkg/instaclustr/interfaces.go index e72383756..bd6611015 100644 --- a/pkg/instaclustr/interfaces.go +++ b/pkg/instaclustr/interfaces.go @@ -44,9 +44,9 @@ type API interface { CreateKafkaUser(url string, kafkaUser *models.KafkaUser) (*kafkamanagementv1beta1.KafkaUserStatus, error) UpdateKafkaUser(kafkaUserID string, kafkaUserSpec *models.KafkaUser) error DeleteKafkaUser(kafkaUserID, kafkaUserEndpoint string) error - CreateKafkaUserCertificate(certRequest *models.CertificateRequest) (*kafkamanagementv1beta1.Certificate, error) + CreateKafkaUserCertificate(certRequest *models.CertificateRequest) (*models.Certificate, error) DeleteKafkaUserCertificate(certificateID string) error - RenewKafkaUserCertificate(certificateID string) (*kafkamanagementv1beta1.Certificate, error) + RenewKafkaUserCertificate(certificateID string) (*models.Certificate, error) GetTopicStatus(id string) ([]byte, error) CreateKafkaTopic(url string, topic *kafkamanagementv1beta1.Topic) error DeleteKafkaTopic(url, id string) error diff --git a/pkg/instaclustr/mock/client.go b/pkg/instaclustr/mock/client.go index 836f25931..ee0c57b8b 100644 --- a/pkg/instaclustr/mock/client.go +++ b/pkg/instaclustr/mock/client.go @@ -119,7 +119,7 @@ func (c *mockClient) CreateKafkaUser(url string, kafkaUser *models.KafkaUser) (* panic("CreateKafkaUser: is not implemented") } -func (c *mockClient) CreateKafkaUserCertificate(certRequest *models.CertificateRequest) (*kafkamanagementv1beta1.Certificate, error) { +func (c *mockClient) CreateKafkaUserCertificate(certRequest *models.CertificateRequest) (*models.Certificate, error) { panic("CreateKafkaUserCertificate: is not implemented") } @@ -127,7 +127,7 @@ func (c *mockClient) DeleteKafkaUserCertificate(certificateID string) error { panic("DeleteKafkaUserCertificate: is not implemented") } -func (c *mockClient) RenewKafkaUserCertificate(certificateID string) (*kafkamanagementv1beta1.Certificate, error) { +func (c *mockClient) RenewKafkaUserCertificate(certificateID string) (*models.Certificate, error) { panic("RenewKafkaUserCertificate: is not implemented") } diff --git a/pkg/models/kafka_user_apv2.go b/pkg/models/kafka_user_apv2.go index 0aa688f7a..fbf885007 100644 --- a/pkg/models/kafka_user_apv2.go +++ b/pkg/models/kafka_user_apv2.go @@ -32,3 +32,10 @@ type CertificateRequest struct { KafkaUsername string `json:"kafkaUsername"` ValidPeriod int `json:"validPeriod"` } + +type Certificate struct { + ID string `json:"id"` + ClusterID string `json:"clusterId"` + ExpiryDate string `json:"expiryDate"` + SignedCertificate string `json:"signedCertificate"` +} diff --git a/pkg/models/operator.go b/pkg/models/operator.go index 55e0d9be3..fe862c369 100644 --- a/pkg/models/operator.go +++ b/pkg/models/operator.go @@ -130,7 +130,6 @@ const ( NotFound = "NotFound" CreationFailed = "CreationFailed" FetchFailed = "FetchFailed" - GenerateFailed = "GenerateFailed" ConversionFailed = "ConversionFailed" ValidationFailed = "ValidationFailed" UpdateFailed = "UpdateFailed" @@ -174,3 +173,11 @@ type Credentials struct { } var ClusterKindsMap = map[string]string{"PostgreSQL": "postgres", "Redis": "redis", "OpenSearch": "opensearch", "Cassandra": "cassandra"} + +const ( + CertificateRequestType = "CERTIFICATE REQUEST" + RSAPrivateKeyType = "RSA PRIVATE KEY" + SignedCertificateSecretKey = "signedCertificate" + CSRSecretKey = "csr" + PrivateKeySecretKey = "privateKey" +)