From 11057977cd09672fbe725183be913da4aecbb145 Mon Sep 17 00:00:00 2001 From: tengu-alt Date: Tue, 19 Dec 2023 17:04:23 +0200 Subject: [PATCH] clusterreference for clusterresources were implemented --- .../awsendpointserviceprincipal_types.go | 6 +- .../awsendpointserviceprincipal_webhook.go | 6 +- .../awssecuritygroupfirewallrule_webhook.go | 5 + .../v1beta1/awsvpcpeering_types.go | 10 +- .../v1beta1/awsvpcpeering_webhook.go | 5 +- .../v1beta1/azurevnetpeering_types.go | 2 +- .../v1beta1/azurevnetpeering_webhook.go | 5 +- .../v1beta1/clusterbackup_types.go | 5 +- .../v1beta1/clusterbackup_webhook.go | 10 +- .../clusternetworkfirewallrule_webhook.go | 5 + .../v1beta1/exclusionwindow_types.go | 22 +- .../v1beta1/exclusionwindow_webhook.go | 7 +- .../v1beta1/gcpvpcpeering_types.go | 2 +- .../v1beta1/gcpvpcpeering_webhook.go | 5 +- .../v1beta1/maintenanceevents_types.go | 1 - .../v1beta1/opensearchegressrules_types.go | 12 +- .../v1beta1/opensearchegressrules_webhook.go | 7 +- apis/clusterresources/v1beta1/structs.go | 30 ++- .../v1beta1/zz_generated.deepcopy.go | 107 +++++++--- apis/clusters/v1beta1/cadence_types.go | 15 ++ apis/clusters/v1beta1/cassandra_types.go | 40 +++- apis/clusters/v1beta1/kafka_types.go | 12 ++ apis/clusters/v1beta1/kafkaconnect_types.go | 34 ++- apis/clusters/v1beta1/kafkaconnect_webhook.go | 13 +- apis/clusters/v1beta1/opensearch_types.go | 19 +- apis/clusters/v1beta1/postgresql_types.go | 22 +- apis/clusters/v1beta1/redis_types.go | 19 +- apis/clusters/v1beta1/structs.go | 2 + apis/clusters/v1beta1/zookeeper_types.go | 16 ++ .../clusters/v1beta1/zz_generated.deepcopy.go | 16 +- ...ustr.com_awsendpointserviceprincipals.yaml | 14 +- ...str.com_awssecuritygroupfirewallrules.yaml | 14 +- ...ources.instaclustr.com_awsvpcpeerings.yaml | 14 +- ...ces.instaclustr.com_azurevnetpeerings.yaml | 14 +- ...ources.instaclustr.com_clusterbackups.yaml | 18 +- ...lustr.com_clusternetworkfirewallrules.yaml | 14 +- ...rces.instaclustr.com_exclusionwindows.yaml | 14 +- ...ources.instaclustr.com_gcpvpcpeerings.yaml | 14 +- ...ces.instaclustr.com_maintenanceevents.yaml | 3 - ...instaclustr.com_opensearchegressrules.yaml | 14 +- .../clusters.instaclustr.com_cadences.yaml | 2 + .../clusters.instaclustr.com_cassandras.yaml | 14 +- ...lusters.instaclustr.com_kafkaconnects.yaml | 16 +- .../clusters.instaclustr.com_kafkas.yaml | 2 + ...clusters.instaclustr.com_opensearches.yaml | 2 + .../clusters.instaclustr.com_postgresqls.yaml | 2 + .../bases/clusters.instaclustr.com_redis.yaml | 2 + .../clusters.instaclustr.com_zookeepers.yaml | 2 + ...s_v1beta1_awsendpointserviceprincipal.yaml | 4 + ..._v1beta1_awssecuritygroupfirewallrule.yaml | 7 +- ...lusterresources_v1beta1_awsvpcpeering.yaml | 6 +- ...terresources_v1beta1_azurevnetpeering.yaml | 6 +- ...lusterresources_v1beta1_clusterbackup.yaml | 7 +- ...es_v1beta1_clusternetworkfirewallrule.yaml | 8 +- ...sterresources_v1beta1_exclusionwindow.yaml | 6 +- ...lusterresources_v1beta1_gcpvpcpeering.yaml | 6 +- ...erresources_v1beta1_maintenanceevents.yaml | 1 - ...sources_v1beta1_opensearchegressrules.yaml | 8 +- .../samples/clusters_v1beta1_cassandra.yaml | 12 +- config/samples/clusters_v1beta1_kafka.yaml | 85 +------- .../clusters_v1beta1_kafkaconnect.yaml | 8 +- .../samples/clusters_v1beta1_zookeeper.yaml | 2 +- .../awsendpointserviceprincipal_controller.go | 25 ++- ...awssecuritygroupfirewallrule_controller.go | 47 +++-- .../awsvpcpeering_controller.go | 40 +++- .../awsvpcpeering_controller_test.go | 2 +- .../azurevnetpeering_controller.go | 38 +++- 
.../azurevnetpeering_controller_test.go | 2 +- .../clusterbackup_controller.go | 55 +++-- .../clusternetworkfirewallrule_controller.go | 50 +++-- .../exclusionwindow_controller.go | 49 +++-- .../gcpvpcpeering_controller.go | 34 ++- .../gcpvpcpeering_controller_test.go | 2 +- controllers/clusterresources/helpers.go | 83 ++++++++ .../opensearchegressrules_controller.go | 27 ++- controllers/clusters/cassandra_controller.go | 18 +- .../clusters/kafkaconnect_controller.go | 41 +++- controllers/clusters/user.go | 7 +- pkg/instaclustr/client.go | 193 +++++++++++++++++- pkg/instaclustr/interfaces.go | 9 +- pkg/instaclustr/mock/client.go | 34 ++- pkg/models/errors.go | 3 + pkg/models/operator.go | 2 + 83 files changed, 1236 insertions(+), 326 deletions(-) diff --git a/apis/clusterresources/v1beta1/awsendpointserviceprincipal_types.go b/apis/clusterresources/v1beta1/awsendpointserviceprincipal_types.go index fb1df7c94..bdf1220db 100644 --- a/apis/clusterresources/v1beta1/awsendpointserviceprincipal_types.go +++ b/apis/clusterresources/v1beta1/awsendpointserviceprincipal_types.go @@ -24,7 +24,8 @@ import ( // AWSEndpointServicePrincipalSpec defines the desired state of AWSEndpointServicePrincipal type AWSEndpointServicePrincipalSpec struct { // The ID of the cluster data center - ClusterDataCenterID string `json:"clusterDataCenterId"` + ClusterDataCenterID string `json:"clusterDataCenterId,omitempty"` + ClusterRef *ClusterRef `json:"clusterRef,omitempty"` // The Instaclustr ID of the AWS endpoint service EndPointServiceID string `json:"endPointServiceId,omitempty"` @@ -36,7 +37,8 @@ type AWSEndpointServicePrincipalSpec struct { // AWSEndpointServicePrincipalStatus defines the observed state of AWSEndpointServicePrincipal type AWSEndpointServicePrincipalStatus struct { // The Instaclustr ID of the IAM Principal ARN - ID string `json:"id,omitempty"` + ID string `json:"id,omitempty"` + CDCID string `json:"cdcId,omitempty"` // The Instaclustr ID of the AWS endpoint service EndPointServiceID string `json:"endPointServiceId,omitempty"` diff --git a/apis/clusterresources/v1beta1/awsendpointserviceprincipal_webhook.go b/apis/clusterresources/v1beta1/awsendpointserviceprincipal_webhook.go index cf4e61851..beb80939c 100644 --- a/apis/clusterresources/v1beta1/awsendpointserviceprincipal_webhook.go +++ b/apis/clusterresources/v1beta1/awsendpointserviceprincipal_webhook.go @@ -45,9 +45,9 @@ var principalArnPattern, _ = regexp.Compile(`^arn:aws:iam::[0-9]{12}:(root$|user func (r *AWSEndpointServicePrincipal) ValidateCreate() error { awsendpointserviceprincipallog.Info("validate create", "name", r.Name) - if r.Spec.ClusterDataCenterID == "" || - r.Spec.PrincipalARN == "" { - return fmt.Errorf("spec.clusterDataCenterId and spec.principalArn should be filled") + if (r.Spec.ClusterDataCenterID == "" && r.Spec.ClusterRef == nil) || + (r.Spec.ClusterDataCenterID != "" && r.Spec.ClusterRef != nil) { + return fmt.Errorf("only one of the following fields should be specified: dataCentreId, clusterRef") } if !principalArnPattern.MatchString(r.Spec.PrincipalARN) { diff --git a/apis/clusterresources/v1beta1/awssecuritygroupfirewallrule_webhook.go b/apis/clusterresources/v1beta1/awssecuritygroupfirewallrule_webhook.go index fdae84208..50626e7b7 100644 --- a/apis/clusterresources/v1beta1/awssecuritygroupfirewallrule_webhook.go +++ b/apis/clusterresources/v1beta1/awssecuritygroupfirewallrule_webhook.go @@ -65,6 +65,11 @@ func (r *AWSSecurityGroupFirewallRule) ValidateCreate() error { r.Spec.Type, models.BundleTypes) } + if 
(r.Spec.ClusterID == "" && r.Spec.ClusterRef == nil) || + (r.Spec.ClusterID != "" && r.Spec.ClusterRef != nil) { + return fmt.Errorf("only one of dataCenter ID and cluster reference fields should be specified") + } + return nil } diff --git a/apis/clusterresources/v1beta1/awsvpcpeering_types.go b/apis/clusterresources/v1beta1/awsvpcpeering_types.go index 2f657794b..556b173ae 100644 --- a/apis/clusterresources/v1beta1/awsvpcpeering_types.go +++ b/apis/clusterresources/v1beta1/awsvpcpeering_types.go @@ -29,7 +29,7 @@ import ( // AWSVPCPeeringSpec defines the desired state of AWSVPCPeering type AWSVPCPeeringSpec struct { - VPCPeeringSpec `json:",inline"` + PeeringSpec `json:",inline"` PeerAWSAccountID string `json:"peerAwsAccountId"` PeerVPCID string `json:"peerVpcId"` PeerRegion string `json:"peerRegion,omitempty"` @@ -128,9 +128,11 @@ func (aws *AWSVPCPeeringSpec) Validate(availableRegions []string) error { return fmt.Errorf("VPC ID must begin with 'vpc-' and fit pattern: %s. %v", models.PeerVPCIDRegExp, err) } - dataCentreIDMatched, err := regexp.Match(models.UUIDStringRegExp, []byte(aws.DataCentreID)) - if !dataCentreIDMatched || err != nil { - return fmt.Errorf("data centre ID is a UUID formated string. It must fit the pattern: %s. %v", models.UUIDStringRegExp, err) + if aws.DataCentreID != "" { + dataCentreIDMatched, err := regexp.Match(models.UUIDStringRegExp, []byte(aws.DataCentreID)) + if !dataCentreIDMatched || err != nil { + return fmt.Errorf("data centre ID is a UUID formated string. It must fit the pattern: %s. %v", models.UUIDStringRegExp, err) + } } if !validation.Contains(aws.PeerRegion, availableRegions) { diff --git a/apis/clusterresources/v1beta1/awsvpcpeering_webhook.go b/apis/clusterresources/v1beta1/awsvpcpeering_webhook.go index edda260c7..017fa5aff 100644 --- a/apis/clusterresources/v1beta1/awsvpcpeering_webhook.go +++ b/apis/clusterresources/v1beta1/awsvpcpeering_webhook.go @@ -67,8 +67,9 @@ func (r *AWSVPCPeering) ValidateCreate() error { return fmt.Errorf("peer AWS Account Region is empty") } - if r.Spec.DataCentreID == "" { - return fmt.Errorf("dataCentre ID is empty") + if (r.Spec.DataCentreID == "" && r.Spec.ClusterRef == nil) || + (r.Spec.DataCentreID != "" && r.Spec.ClusterRef != nil) { + return fmt.Errorf("only one of the following fields should be specified: dataCentreId, clusterRef") } if r.Spec.PeerSubnets == nil { diff --git a/apis/clusterresources/v1beta1/azurevnetpeering_types.go b/apis/clusterresources/v1beta1/azurevnetpeering_types.go index fca2a4263..a568b1a52 100644 --- a/apis/clusterresources/v1beta1/azurevnetpeering_types.go +++ b/apis/clusterresources/v1beta1/azurevnetpeering_types.go @@ -28,7 +28,7 @@ import ( // AzureVNetPeeringSpec defines the desired state of AzureVNetPeering type AzureVNetPeeringSpec struct { - VPCPeeringSpec `json:",inline"` + PeeringSpec `json:",inline"` PeerResourceGroup string `json:"peerResourceGroup"` PeerSubscriptionID string `json:"peerSubscriptionId"` PeerADObjectID string `json:"peerAdObjectId,omitempty"` diff --git a/apis/clusterresources/v1beta1/azurevnetpeering_webhook.go b/apis/clusterresources/v1beta1/azurevnetpeering_webhook.go index 39ab9c7e3..4de7dad0d 100644 --- a/apis/clusterresources/v1beta1/azurevnetpeering_webhook.go +++ b/apis/clusterresources/v1beta1/azurevnetpeering_webhook.go @@ -71,8 +71,9 @@ func (r *AzureVNetPeering) ValidateCreate() error { return fmt.Errorf("peer Subscription ID is empty") } - if r.Spec.DataCentreID == "" { - return fmt.Errorf("dataCentre ID is empty") + if (r.Spec.DataCentreID 
== "" && r.Spec.ClusterRef == nil) || + (r.Spec.DataCentreID != "" && r.Spec.ClusterRef != nil) { + return fmt.Errorf("only one of the following fields should be specified: dataCentreId, clusterRef") } if r.Spec.PeerSubnets == nil { diff --git a/apis/clusterresources/v1beta1/clusterbackup_types.go b/apis/clusterresources/v1beta1/clusterbackup_types.go index 9364a3a4f..eb48ffce4 100644 --- a/apis/clusterresources/v1beta1/clusterbackup_types.go +++ b/apis/clusterresources/v1beta1/clusterbackup_types.go @@ -27,12 +27,13 @@ import ( // ClusterBackupSpec defines the desired state of ClusterBackup type ClusterBackupSpec struct { - ClusterID string `json:"clusterId"` - ClusterKind string `json:"clusterKind"` + ClusterID string `json:"clusterId,omitempty"` + ClusterRef *ClusterRef `json:"clusterRef,omitempty"` } // ClusterBackupStatus defines the observed state of ClusterBackup type ClusterBackupStatus struct { + ClusterID string `json:"clusterId,omitempty"` OperationStatus string `json:"operationStatus,omitempty"` Progress string `json:"progress,omitempty"` Start int `json:"start,omitempty"` diff --git a/apis/clusterresources/v1beta1/clusterbackup_webhook.go b/apis/clusterresources/v1beta1/clusterbackup_webhook.go index e969452d0..5f0e5ce26 100644 --- a/apis/clusterresources/v1beta1/clusterbackup_webhook.go +++ b/apis/clusterresources/v1beta1/clusterbackup_webhook.go @@ -54,11 +54,19 @@ var _ webhook.Validator = &ClusterBackup{} func (r *ClusterBackup) ValidateCreate() error { clusterbackuplog.Info("validate create", "name", r.Name) - _, ok := models.ClusterKindsMap[r.Spec.ClusterKind] + _, ok := models.ClusterKindsMap[r.Spec.ClusterRef.ClusterKind] if !ok { return models.ErrUnsupportedBackupClusterKind } + if r.Spec.ClusterRef.Name != "" && r.Spec.ClusterRef.Namespace == "" { + return models.ErrEmptyNamespace + } + + if r.Spec.ClusterRef.Namespace != "" && r.Spec.ClusterRef.Name == "" { + return models.ErrEmptyName + } + return nil } diff --git a/apis/clusterresources/v1beta1/clusternetworkfirewallrule_webhook.go b/apis/clusterresources/v1beta1/clusternetworkfirewallrule_webhook.go index 9e679949e..88af77d3b 100644 --- a/apis/clusterresources/v1beta1/clusternetworkfirewallrule_webhook.go +++ b/apis/clusterresources/v1beta1/clusternetworkfirewallrule_webhook.go @@ -65,6 +65,11 @@ func (fr *ClusterNetworkFirewallRule) ValidateCreate() error { fr.Spec.Type, models.BundleTypes) } + if (fr.Spec.ClusterID == "" && fr.Spec.ClusterRef == nil) || + (fr.Spec.ClusterID != "" && fr.Spec.ClusterRef != nil) { + return fmt.Errorf("only one of the following fields should be specified: clusterId, clusterRef") + } + return nil } diff --git a/apis/clusterresources/v1beta1/exclusionwindow_types.go b/apis/clusterresources/v1beta1/exclusionwindow_types.go index 90e7f9c41..1ec98e160 100644 --- a/apis/clusterresources/v1beta1/exclusionwindow_types.go +++ b/apis/clusterresources/v1beta1/exclusionwindow_types.go @@ -23,8 +23,9 @@ import ( // ExclusionWindowSpec defines the desired state of ExclusionWindow type ExclusionWindowSpec struct { - ClusterID string `json:"clusterId"` - DayOfWeek string `json:"dayOfWeek"` + ClusterID string `json:"clusterId,omitempty"` + ClusterRef *ClusterRef `json:"clusterRef,omitempty"` + DayOfWeek string `json:"dayOfWeek"` //+kubebuilder:validation:Minimum:=0 //+kubebuilder:validation:Maximum:=23 StartHour int32 `json:"startHour"` @@ -36,7 +37,8 @@ type ExclusionWindowSpec struct { // ExclusionWindowStatus defines the observed state of ExclusionWindow type ExclusionWindowStatus struct { - ID 
string `json:"id"` + ClusterID string `json:"clusterId,omitempty"` + ID string `json:"id"` } //+kubebuilder:object:root=true @@ -70,3 +72,17 @@ func (r *ExclusionWindow) NewPatch() client.Patch { old := r.DeepCopy() return client.MergeFrom(old) } + +func (e *ExclusionWindowSpec) validateUpdate(old ExclusionWindowSpec) bool { + if e.DayOfWeek != old.DayOfWeek || + e.ClusterID != old.ClusterID || + e.DurationInHours != old.DurationInHours || + e.StartHour != old.StartHour || + (e.ClusterRef != nil && old.ClusterRef == nil) || + (e.ClusterRef == nil && old.ClusterRef != nil) || + (e.ClusterRef != nil && *e.ClusterRef != *old.ClusterRef) { + return false + } + + return true +} diff --git a/apis/clusterresources/v1beta1/exclusionwindow_webhook.go b/apis/clusterresources/v1beta1/exclusionwindow_webhook.go index c3f86dbab..3891ac6db 100644 --- a/apis/clusterresources/v1beta1/exclusionwindow_webhook.go +++ b/apis/clusterresources/v1beta1/exclusionwindow_webhook.go @@ -55,6 +55,11 @@ var _ webhook.Validator = &ExclusionWindow{} func (r *ExclusionWindow) ValidateCreate() error { exclusionwindowlog.Info("validate create", "name", r.Name) + if (r.Spec.ClusterID == "" && r.Spec.ClusterRef == nil) || + (r.Spec.ClusterID != "" && r.Spec.ClusterRef != nil) { + return fmt.Errorf("only one of the following fields should be specified: clusterId, clusterRef") + } + if !validation.Contains(r.Spec.DayOfWeek, models.DaysOfWeek) { return fmt.Errorf("%v, available values: %v", models.ErrIncorrectDayOfWeek, models.DaysOfWeek) @@ -67,7 +72,7 @@ func (r *ExclusionWindow) ValidateUpdate(old runtime.Object) error { exclusionwindowlog.Info("validate update", "name", r.Name) oldWindow := old.(*ExclusionWindow) - if r.Spec != oldWindow.Spec { + if !r.Spec.validateUpdate(oldWindow.Spec) { return models.ErrImmutableSpec } diff --git a/apis/clusterresources/v1beta1/gcpvpcpeering_types.go b/apis/clusterresources/v1beta1/gcpvpcpeering_types.go index fbd76ee24..00532066c 100644 --- a/apis/clusterresources/v1beta1/gcpvpcpeering_types.go +++ b/apis/clusterresources/v1beta1/gcpvpcpeering_types.go @@ -28,7 +28,7 @@ import ( // GCPVPCPeeringSpec defines the desired state of GCPVPCPeering type GCPVPCPeeringSpec struct { - VPCPeeringSpec `json:",inline"` + PeeringSpec `json:",inline"` PeerVPCNetworkName string `json:"peerVpcNetworkName"` PeerProjectID string `json:"peerProjectId"` } diff --git a/apis/clusterresources/v1beta1/gcpvpcpeering_webhook.go b/apis/clusterresources/v1beta1/gcpvpcpeering_webhook.go index 72c5d2ce3..b50b3595b 100644 --- a/apis/clusterresources/v1beta1/gcpvpcpeering_webhook.go +++ b/apis/clusterresources/v1beta1/gcpvpcpeering_webhook.go @@ -67,8 +67,9 @@ func (r *GCPVPCPeering) ValidateCreate() error { return fmt.Errorf("peer Project ID is empty") } - if r.Spec.DataCentreID == "" { - return fmt.Errorf("dataCentre ID is empty") + if (r.Spec.DataCentreID == "" && r.Spec.ClusterRef == nil) || + (r.Spec.DataCentreID != "" && r.Spec.ClusterRef != nil) { + return fmt.Errorf("only one of the following fields should be specified: dataCentreId, clusterRef") } if r.Spec.PeerSubnets == nil { diff --git a/apis/clusterresources/v1beta1/maintenanceevents_types.go b/apis/clusterresources/v1beta1/maintenanceevents_types.go index 0bc6ede24..f4552fbe7 100644 --- a/apis/clusterresources/v1beta1/maintenanceevents_types.go +++ b/apis/clusterresources/v1beta1/maintenanceevents_types.go @@ -23,7 +23,6 @@ import ( // MaintenanceEventsSpec defines the desired state of MaintenanceEvents type MaintenanceEventsSpec struct { - ClusterID 
string `json:"clusterId"` MaintenanceEventsReschedules []*MaintenanceEventReschedule `json:"maintenanceEventsReschedule"` } diff --git a/apis/clusterresources/v1beta1/opensearchegressrules_types.go b/apis/clusterresources/v1beta1/opensearchegressrules_types.go index 6c8e59270..79d06b069 100644 --- a/apis/clusterresources/v1beta1/opensearchegressrules_types.go +++ b/apis/clusterresources/v1beta1/opensearchegressrules_types.go @@ -22,14 +22,16 @@ import ( ) type OpenSearchEgressRulesSpec struct { - ClusterID string `json:"clusterId"` - OpenSearchBindingID string `json:"openSearchBindingId"` - Source string `json:"source"` - Type string `json:"type,omitempty"` + ClusterID string `json:"clusterId,omitempty"` + ClusterRef *ClusterRef `json:"clusterRef,omitempty"` + OpenSearchBindingID string `json:"openSearchBindingId"` + Source string `json:"source"` + Type string `json:"type,omitempty"` } type OpenSearchEgressRulesStatus struct { - ID string `json:"id,omitempty"` + ID string `json:"id,omitempty"` + ClusterID string `json:"clusterId,omitempty"` } //+kubebuilder:object:root=true diff --git a/apis/clusterresources/v1beta1/opensearchegressrules_webhook.go b/apis/clusterresources/v1beta1/opensearchegressrules_webhook.go index f505e6fe4..3313f6419 100644 --- a/apis/clusterresources/v1beta1/opensearchegressrules_webhook.go +++ b/apis/clusterresources/v1beta1/opensearchegressrules_webhook.go @@ -65,6 +65,11 @@ func (r *OpenSearchEgressRules) ValidateCreate() error { return fmt.Errorf("the type should be equal to one of the options: %q , got: %q", destinationTypes, r.Spec.Type) } + if (r.Spec.ClusterID == "" && r.Spec.ClusterRef == nil) || + (r.Spec.ClusterID != "" && r.Spec.ClusterRef != nil) { + return fmt.Errorf("only one of the following fields should be specified: clusterId, clusterRef") + } + return nil } @@ -78,7 +83,7 @@ func (r *OpenSearchEgressRules) ValidateUpdate(old runtime.Object) error { return r.ValidateCreate() } - if r.Spec != oldRules.Spec { + if r.Spec != oldRules.Spec && r.Generation != oldRules.Generation { return models.ErrImmutableSpec } diff --git a/apis/clusterresources/v1beta1/structs.go b/apis/clusterresources/v1beta1/structs.go index 583412c86..7f4632506 100644 --- a/apis/clusterresources/v1beta1/structs.go +++ b/apis/clusterresources/v1beta1/structs.go @@ -20,11 +20,14 @@ import ( "encoding/json" "github.com/instaclustr/operator/pkg/apiextensions" + + "k8s.io/apimachinery/pkg/types" ) -type VPCPeeringSpec struct { - DataCentreID string `json:"cdcId"` - PeerSubnets []string `json:"peerSubnets"` +type PeeringSpec struct { + DataCentreID string `json:"cdcId,omitempty"` + PeerSubnets []string `json:"peerSubnets"` + ClusterRef *ClusterRef `json:"clusterRef,omitempty"` } type PeeringStatus struct { @@ -32,6 +35,7 @@ type PeeringStatus struct { StatusCode string `json:"statusCode,omitempty"` Name string `json:"name,omitempty"` FailureReason string `json:"failureReason,omitempty"` + CDCID string `json:"cdcId,omitempty"` } type PatchRequest struct { @@ -41,14 +45,16 @@ type PatchRequest struct { } type FirewallRuleSpec struct { - ClusterID string `json:"clusterId"` - Type string `json:"type"` + ClusterID string `json:"clusterId,omitempty"` + Type string `json:"type"` + ClusterRef *ClusterRef `json:"clusterRef,omitempty"` } type FirewallRuleStatus struct { ID string `json:"id,omitempty"` DeferredReason string `json:"deferredReason,omitempty"` Status string `json:"status,omitempty"` + ClusterID string `json:"clusterId,omitempty"` } type immutablePeeringFields struct { @@ -57,3 
+63,17 @@ type immutablePeeringFields struct { // +kubebuilder:object:generate:=false type SecretReference = apiextensions.ObjectReference + +type ClusterRef struct { + Name string `json:"name,omitempty"` + Namespace string `json:"namespace,omitempty"` + ClusterKind string `json:"clusterKind,omitempty"` + CDCName string `json:"cdcName,omitempty"` +} + +func (r *ClusterRef) AsNamespacedName() types.NamespacedName { + return types.NamespacedName{ + Name: r.Name, + Namespace: r.Namespace, + } +} diff --git a/apis/clusterresources/v1beta1/zz_generated.deepcopy.go b/apis/clusterresources/v1beta1/zz_generated.deepcopy.go index 3d858a773..b89deb205 100644 --- a/apis/clusterresources/v1beta1/zz_generated.deepcopy.go +++ b/apis/clusterresources/v1beta1/zz_generated.deepcopy.go @@ -121,7 +121,7 @@ func (in *AWSEndpointServicePrincipal) DeepCopyInto(out *AWSEndpointServicePrinc *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) - out.Spec = in.Spec + in.Spec.DeepCopyInto(&out.Spec) out.Status = in.Status } @@ -178,6 +178,11 @@ func (in *AWSEndpointServicePrincipalList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *AWSEndpointServicePrincipalSpec) DeepCopyInto(out *AWSEndpointServicePrincipalSpec) { *out = *in + if in.ClusterRef != nil { + in, out := &in.ClusterRef, &out.ClusterRef + *out = new(ClusterRef) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AWSEndpointServicePrincipalSpec. @@ -210,7 +215,7 @@ func (in *AWSSecurityGroupFirewallRule) DeepCopyInto(out *AWSSecurityGroupFirewa *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) - out.Spec = in.Spec + in.Spec.DeepCopyInto(&out.Spec) out.Status = in.Status } @@ -267,7 +272,7 @@ func (in *AWSSecurityGroupFirewallRuleList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *AWSSecurityGroupFirewallRuleSpec) DeepCopyInto(out *AWSSecurityGroupFirewallRuleSpec) { *out = *in - out.FirewallRuleSpec = in.FirewallRuleSpec + in.FirewallRuleSpec.DeepCopyInto(&out.FirewallRuleSpec) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AWSSecurityGroupFirewallRuleSpec. @@ -358,7 +363,7 @@ func (in *AWSVPCPeeringList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *AWSVPCPeeringSpec) DeepCopyInto(out *AWSVPCPeeringSpec) { *out = *in - in.VPCPeeringSpec.DeepCopyInto(&out.VPCPeeringSpec) + in.PeeringSpec.DeepCopyInto(&out.PeeringSpec) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AWSVPCPeeringSpec. @@ -449,7 +454,7 @@ func (in *AzureVNetPeeringList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *AzureVNetPeeringSpec) DeepCopyInto(out *AzureVNetPeeringSpec) { *out = *in - in.VPCPeeringSpec.DeepCopyInto(&out.VPCPeeringSpec) + in.PeeringSpec.DeepCopyInto(&out.PeeringSpec) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AzureVNetPeeringSpec. 
@@ -584,7 +589,7 @@ func (in *ClusterBackup) DeepCopyInto(out *ClusterBackup) { *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) - out.Spec = in.Spec + in.Spec.DeepCopyInto(&out.Spec) out.Status = in.Status } @@ -641,6 +646,11 @@ func (in *ClusterBackupList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ClusterBackupSpec) DeepCopyInto(out *ClusterBackupSpec) { *out = *in + if in.ClusterRef != nil { + in, out := &in.ClusterRef, &out.ClusterRef + *out = new(ClusterRef) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterBackupSpec. @@ -689,7 +699,7 @@ func (in *ClusterNetworkFirewallRule) DeepCopyInto(out *ClusterNetworkFirewallRu *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) - out.Spec = in.Spec + in.Spec.DeepCopyInto(&out.Spec) out.Status = in.Status } @@ -746,7 +756,7 @@ func (in *ClusterNetworkFirewallRuleList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ClusterNetworkFirewallRuleSpec) DeepCopyInto(out *ClusterNetworkFirewallRuleSpec) { *out = *in - out.FirewallRuleSpec = in.FirewallRuleSpec + in.FirewallRuleSpec.DeepCopyInto(&out.FirewallRuleSpec) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterNetworkFirewallRuleSpec. @@ -775,6 +785,21 @@ func (in *ClusterNetworkFirewallRuleStatus) DeepCopy() *ClusterNetworkFirewallRu return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ClusterRef) DeepCopyInto(out *ClusterRef) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterRef. +func (in *ClusterRef) DeepCopy() *ClusterRef { + if in == nil { + return nil + } + out := new(ClusterRef) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ClusteredMaintenanceEventStatus) DeepCopyInto(out *ClusteredMaintenanceEventStatus) { *out = *in @@ -828,7 +853,7 @@ func (in *ExclusionWindow) DeepCopyInto(out *ExclusionWindow) { *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) - out.Spec = in.Spec + in.Spec.DeepCopyInto(&out.Spec) out.Status = in.Status } @@ -885,6 +910,11 @@ func (in *ExclusionWindowList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ExclusionWindowSpec) DeepCopyInto(out *ExclusionWindowSpec) { *out = *in + if in.ClusterRef != nil { + in, out := &in.ClusterRef, &out.ClusterRef + *out = new(ClusterRef) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExclusionWindowSpec. @@ -915,6 +945,11 @@ func (in *ExclusionWindowStatus) DeepCopy() *ExclusionWindowStatus { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *FirewallRuleSpec) DeepCopyInto(out *FirewallRuleSpec) { *out = *in + if in.ClusterRef != nil { + in, out := &in.ClusterRef, &out.ClusterRef + *out = new(ClusterRef) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FirewallRuleSpec. @@ -1004,7 +1039,7 @@ func (in *GCPVPCPeeringList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *GCPVPCPeeringSpec) DeepCopyInto(out *GCPVPCPeeringSpec) { *out = *in - in.VPCPeeringSpec.DeepCopyInto(&out.VPCPeeringSpec) + in.PeeringSpec.DeepCopyInto(&out.PeeringSpec) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GCPVPCPeeringSpec. @@ -1342,7 +1377,7 @@ func (in *OpenSearchEgressRules) DeepCopyInto(out *OpenSearchEgressRules) { *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) - out.Spec = in.Spec + in.Spec.DeepCopyInto(&out.Spec) out.Status = in.Status } @@ -1399,6 +1434,11 @@ func (in *OpenSearchEgressRulesList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *OpenSearchEgressRulesSpec) DeepCopyInto(out *OpenSearchEgressRulesSpec) { *out = *in + if in.ClusterRef != nil { + in, out := &in.ClusterRef, &out.ClusterRef + *out = new(ClusterRef) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OpenSearchEgressRulesSpec. @@ -1562,6 +1602,31 @@ func (in *PatchRequest) DeepCopy() *PatchRequest { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PeeringSpec) DeepCopyInto(out *PeeringSpec) { + *out = *in + if in.PeerSubnets != nil { + in, out := &in.PeerSubnets, &out.PeerSubnets + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.ClusterRef != nil { + in, out := &in.ClusterRef, &out.ClusterRef + *out = new(ClusterRef) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PeeringSpec. +func (in *PeeringSpec) DeepCopy() *PeeringSpec { + if in == nil { + return nil + } + out := new(PeeringSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PeeringStatus) DeepCopyInto(out *PeeringStatus) { *out = *in @@ -1778,23 +1843,3 @@ func (in *RedisUserStatus) DeepCopy() *RedisUserStatus { in.DeepCopyInto(out) return out } - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *VPCPeeringSpec) DeepCopyInto(out *VPCPeeringSpec) { - *out = *in - if in.PeerSubnets != nil { - in, out := &in.PeerSubnets, &out.PeerSubnets - *out = make([]string, len(*in)) - copy(*out, *in) - } -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new VPCPeeringSpec. 
-func (in *VPCPeeringSpec) DeepCopy() *VPCPeeringSpec { - if in == nil { - return nil - } - out := new(VPCPeeringSpec) - in.DeepCopyInto(out) - return out -} diff --git a/apis/clusters/v1beta1/cadence_types.go b/apis/clusters/v1beta1/cadence_types.go index 4c311e6a8..967e6546c 100644 --- a/apis/clusters/v1beta1/cadence_types.go +++ b/apis/clusters/v1beta1/cadence_types.go @@ -172,6 +172,21 @@ func (c *Cadence) NewPatch() client.Patch { return client.MergeFrom(old) } +func (c *Cadence) GetDataCentreID(cdcName string) string { + if cdcName == "" { + return c.Status.DataCentres[0].ID + } + for _, cdc := range c.Status.DataCentres { + if cdc.Name == cdcName { + return cdc.ID + } + } + return "" +} +func (c *Cadence) GetClusterID() string { + return c.Status.ID +} + func (cs *CadenceSpec) ToInstAPI(ctx context.Context, k8sClient client.Client) (*models.CadenceCluster, error) { awsArchival, err := cs.ArchivalToInstAPI(ctx, k8sClient) if err != nil { diff --git a/apis/clusters/v1beta1/cassandra_types.go b/apis/clusters/v1beta1/cassandra_types.go index 12669c15d..8705a6a5e 100644 --- a/apis/clusters/v1beta1/cassandra_types.go +++ b/apis/clusters/v1beta1/cassandra_types.go @@ -85,10 +85,11 @@ type CassandraDataCentre struct { type DebeziumCassandraSpec struct { // KafkaVPCType with only VPC_PEERED supported - KafkaVPCType string `json:"kafkaVpcType"` - KafkaTopicPrefix string `json:"kafkaTopicPrefix"` - KafkaDataCentreID string `json:"kafkaCdcId"` - Version string `json:"version"` + KafkaVPCType string `json:"kafkaVpcType"` + KafkaTopicPrefix string `json:"kafkaTopicPrefix"` + KafkaDataCentreID string `json:"kafkaCdcId,omitempty"` + ClusterRef *clusterresourcesv1beta1.ClusterRef `json:"clusterRef,omitempty"` + Version string `json:"version"` } func (d *CassandraDataCentre) DebeziumToInstAPI() []*models.Debezium { @@ -104,14 +105,16 @@ func (d *CassandraDataCentre) DebeziumToInstAPI() []*models.Debezium { return instDebezium } -func (d *CassandraDataCentre) DebeziumEquals(other *CassandraDataCentre) bool { - if len(d.Debezium) != len(other.Debezium) { +func (d *CassandraDataCentre) DebeziumEquals(new *CassandraDataCentre) bool { + if len(d.Debezium) != len(new.Debezium) { return false } - for _, old := range d.Debezium { - for _, new := range other.Debezium { - if old != new { + for _, oldDbz := range d.Debezium { + for _, newDbz := range new.Debezium { + if newDbz.Version != oldDbz.Version || + newDbz.KafkaTopicPrefix != oldDbz.KafkaTopicPrefix || + newDbz.KafkaVPCType != oldDbz.KafkaVPCType { return false } } @@ -191,8 +194,11 @@ func (c *Cassandra) NewBackupSpec(startTimestamp int) *clusterresourcesv1beta1.C Finalizers: []string{models.DeletionFinalizer}, }, Spec: clusterresourcesv1beta1.ClusterBackupSpec{ - ClusterID: c.Status.ID, - ClusterKind: models.CassandraClusterKind, + ClusterRef: &clusterresourcesv1beta1.ClusterRef{ + Name: c.Name, + Namespace: c.Namespace, + ClusterKind: models.CassandraClusterKind, + }, }, } } @@ -535,6 +541,18 @@ func (c *Cassandra) GetClusterID() string { return c.Status.ID } +func (c *Cassandra) GetDataCentreID(cdcName string) string { + if cdcName == "" { + return c.Status.DataCentres[0].ID + } + for _, cdc := range c.Status.DataCentres { + if cdc.Name == cdcName { + return cdc.ID + } + } + return "" +} + func (c *Cassandra) SetClusterID(id string) { c.Status.ID = id } diff --git a/apis/clusters/v1beta1/kafka_types.go b/apis/clusters/v1beta1/kafka_types.go index dfdb09061..0dcbd7319 100644 --- a/apis/clusters/v1beta1/kafka_types.go +++ 
b/apis/clusters/v1beta1/kafka_types.go @@ -484,6 +484,18 @@ func (k *Kafka) GetClusterID() string { return k.Status.ID } +func (k *Kafka) GetDataCentreID(cdcName string) string { + if cdcName == "" { + return k.Status.DataCentres[0].ID + } + for _, cdc := range k.Status.DataCentres { + if cdc.Name == cdcName { + return cdc.ID + } + } + return "" +} + func (k *Kafka) SetClusterID(id string) { k.Status.ID = id } diff --git a/apis/clusters/v1beta1/kafkaconnect_types.go b/apis/clusters/v1beta1/kafkaconnect_types.go index 3a9018c94..fcb500fc2 100644 --- a/apis/clusters/v1beta1/kafkaconnect_types.go +++ b/apis/clusters/v1beta1/kafkaconnect_types.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "sigs.k8s.io/controller-runtime/pkg/client" + clusterresource "github.com/instaclustr/operator/apis/clusterresources/v1beta1" "github.com/instaclustr/operator/pkg/models" ) @@ -49,7 +50,8 @@ type ExternalCluster struct { } type ManagedCluster struct { - TargetKafkaClusterID string `json:"targetKafkaClusterId"` + TargetKafkaClusterID string `json:"targetKafkaClusterId,omitempty"` + ClusterRef *clusterresource.ClusterRef `json:"clusterRef,omitempty"` // Available options are KAFKA_VPC, VPC_PEERED, SEPARATE_VPC KafkaConnectVPCType string `json:"kafkaConnectVpcType"` @@ -118,7 +120,8 @@ type KafkaConnectSpec struct { // KafkaConnectStatus defines the observed state of KafkaConnect type KafkaConnectStatus struct { - ClusterStatus `json:",inline"` + ClusterStatus `json:",inline"` + TargetKafkaClusterID string `json:"targetKafkaClusterId,omitempty"` } //+kubebuilder:object:root=true @@ -156,6 +159,21 @@ func (k *KafkaConnect) NewPatch() client.Patch { return client.MergeFrom(old) } +func (k *KafkaConnect) GetDataCentreID(cdcName string) string { + if cdcName == "" { + return k.Status.DataCentres[0].ID + } + for _, cdc := range k.Status.DataCentres { + if cdc.Name == cdcName { + return cdc.ID + } + } + return "" +} +func (k *KafkaConnect) GetClusterID() string { + return k.Status.ID +} + func init() { SchemeBuilder.Register(&KafkaConnect{}, &KafkaConnectList{}) } @@ -287,9 +305,13 @@ func validateImmutableExternalClusterFields(new, old *TargetCluster) error { } func validateImmutableManagedClusterFields(new, old *TargetCluster) error { - for _, index := range new.ManagedCluster { - for _, elem := range old.ManagedCluster { - if *index != *elem { + for _, nm := range new.ManagedCluster { + for _, om := range old.ManagedCluster { + if nm.TargetKafkaClusterID != om.TargetKafkaClusterID || + nm.KafkaConnectVPCType != om.KafkaConnectVPCType || + (nm.ClusterRef != nil && om.ClusterRef == nil) || + (nm.ClusterRef == nil && om.ClusterRef != nil) || + (nm.ClusterRef != nil && *nm.ClusterRef != *om.ClusterRef) { return models.ErrImmutableManagedCluster } } @@ -507,7 +529,7 @@ func (tc *TargetCluster) AreManagedClustersEqual(mClusters []*ManagedCluster) bo for i, mCluster := range mClusters { cluster := tc.ManagedCluster[i] - if *mCluster != *cluster { + if mCluster.KafkaConnectVPCType != cluster.KafkaConnectVPCType { return false } } diff --git a/apis/clusters/v1beta1/kafkaconnect_webhook.go b/apis/clusters/v1beta1/kafkaconnect_webhook.go index 595a7d4b8..1b215f4bf 100644 --- a/apis/clusters/v1beta1/kafkaconnect_webhook.go +++ b/apis/clusters/v1beta1/kafkaconnect_webhook.go @@ -123,9 +123,16 @@ func (kcv *kafkaConnectValidator) ValidateCreate(ctx context.Context, obj runtim return fmt.Errorf("externalCluster array size must be between 0 and 1") } for _, mc := range tc.ManagedCluster { - clusterIDMatched, 
err := regexp.Match(models.UUIDStringRegExp, []byte(mc.TargetKafkaClusterID)) - if !clusterIDMatched || err != nil { - return fmt.Errorf("cluster ID is a UUID formated string. It must fit the pattern: %s, %v", models.UUIDStringRegExp, err) + if (mc.TargetKafkaClusterID == "" && mc.ClusterRef == nil) || + (mc.TargetKafkaClusterID != "" && mc.ClusterRef != nil) { + return fmt.Errorf("only one of dataCenter ID and cluster reference fields should be specified") + } + + if mc.TargetKafkaClusterID != "" { + clusterIDMatched, err := regexp.Match(models.UUIDStringRegExp, []byte(mc.TargetKafkaClusterID)) + if !clusterIDMatched || err != nil { + return fmt.Errorf("cluster ID is a UUID formated string. It must fit the pattern: %s, %v", models.UUIDStringRegExp, err) + } } if !validation.Contains(mc.KafkaConnectVPCType, models.KafkaConnectVPCTypes) { diff --git a/apis/clusters/v1beta1/opensearch_types.go b/apis/clusters/v1beta1/opensearch_types.go index 7a4a6a9e6..27ec1229e 100644 --- a/apis/clusters/v1beta1/opensearch_types.go +++ b/apis/clusters/v1beta1/opensearch_types.go @@ -584,8 +584,11 @@ func (os *OpenSearch) NewBackupSpec(startTimestamp int) *clusterresourcesv1beta1 Finalizers: []string{models.DeletionFinalizer}, }, Spec: clusterresourcesv1beta1.ClusterBackupSpec{ - ClusterID: os.Status.ID, - ClusterKind: models.OsClusterKind, + ClusterRef: &clusterresourcesv1beta1.ClusterRef{ + Name: os.Name, + Namespace: os.Namespace, + ClusterKind: models.OsClusterKind, + }, }, } } @@ -628,6 +631,18 @@ func (oss *OpenSearch) GetClusterID() string { return oss.Status.ID } +func (oss *OpenSearch) GetDataCentreID(cdcName string) string { + if cdcName == "" { + return oss.Status.DataCentres[0].ID + } + for _, cdc := range oss.Status.DataCentres { + if cdc.Name == cdcName { + return cdc.ID + } + } + return "" +} + func (oss *OpenSearch) SetClusterID(id string) { oss.Status.ID = id } diff --git a/apis/clusters/v1beta1/postgresql_types.go b/apis/clusters/v1beta1/postgresql_types.go index 02f8c8e33..aef5a67d5 100644 --- a/apis/clusters/v1beta1/postgresql_types.go +++ b/apis/clusters/v1beta1/postgresql_types.go @@ -162,8 +162,11 @@ func (pg *PostgreSQL) NewBackupSpec(startTimestamp int) *clusterresourcesv1beta1 Finalizers: []string{models.DeletionFinalizer}, }, Spec: clusterresourcesv1beta1.ClusterBackupSpec{ - ClusterID: pg.Status.ID, - ClusterKind: models.PgClusterKind, + ClusterRef: &clusterresourcesv1beta1.ClusterRef{ + Name: pg.Name, + Namespace: pg.Namespace, + ClusterKind: models.PgClusterKind, + }, }, } } @@ -184,6 +187,21 @@ func (pg *PostgreSQL) RestoreInfoToInstAPI(restoreData *PgRestoreFrom) any { return iRestore } +func (pg *PostgreSQL) GetDataCentreID(cdcName string) string { + if cdcName == "" { + return pg.Status.DataCentres[0].ID + } + for _, cdc := range pg.Status.DataCentres { + if cdc.Name == cdcName { + return cdc.ID + } + } + return "" +} +func (pg *PostgreSQL) GetClusterID() string { + return pg.Status.ID +} + func (pgs *PgSpec) HasRestore() bool { if pgs.PgRestoreFrom != nil && pgs.PgRestoreFrom.ClusterID != "" { return true diff --git a/apis/clusters/v1beta1/redis_types.go b/apis/clusters/v1beta1/redis_types.go index a7cf677ef..6fdc7d3d0 100644 --- a/apis/clusters/v1beta1/redis_types.go +++ b/apis/clusters/v1beta1/redis_types.go @@ -146,8 +146,11 @@ func (r *Redis) NewBackupSpec(startTimestamp int) *clusterresourcesv1beta1.Clust Finalizers: []string{models.DeletionFinalizer}, }, Spec: clusterresourcesv1beta1.ClusterBackupSpec{ - ClusterID: r.Status.ID, - ClusterKind: models.RedisClusterKind, 
+ ClusterRef: &clusterresourcesv1beta1.ClusterRef{ + Name: r.Name, + Namespace: r.Namespace, + ClusterKind: models.RedisClusterKind, + }, }, } } @@ -488,6 +491,18 @@ func (r *Redis) GetClusterID() string { return r.Status.ID } +func (r *Redis) GetDataCentreID(cdcName string) string { + if cdcName == "" { + return r.Status.DataCentres[0].ID + } + for _, cdc := range r.Status.DataCentres { + if cdc.Name == cdcName { + return cdc.ID + } + } + return "" +} + func (r *Redis) SetClusterID(id string) { r.Status.ID = id } diff --git a/apis/clusters/v1beta1/structs.go b/apis/clusters/v1beta1/structs.go index ff199d9cd..118b9d346 100644 --- a/apis/clusters/v1beta1/structs.go +++ b/apis/clusters/v1beta1/structs.go @@ -44,6 +44,7 @@ type DataCentre struct { } type DataCentreStatus struct { + Name string `json:"name,omitempty"` ID string `json:"id,omitempty"` Status string `json:"status,omitempty"` Nodes []*Node `json:"nodes,omitempty"` @@ -568,6 +569,7 @@ func areClusteredMaintenanceEventStatusEqual(a, b *clusterresource.MaintenanceEv func (cs *ClusterStatus) DCFromInstAPI(iDC models.DataCentre) *DataCentreStatus { return &DataCentreStatus{ + Name: iDC.Name, ID: iDC.ID, Status: iDC.Status, Nodes: cs.NodesFromInstAPI(iDC.Nodes), diff --git a/apis/clusters/v1beta1/zookeeper_types.go b/apis/clusters/v1beta1/zookeeper_types.go index 259de5100..d8e23028e 100644 --- a/apis/clusters/v1beta1/zookeeper_types.go +++ b/apis/clusters/v1beta1/zookeeper_types.go @@ -102,6 +102,22 @@ func (z *Zookeeper) FromInstAPI(iData []byte) (*Zookeeper, error) { }, nil } +func (z *Zookeeper) GetDataCentreID(cdcName string) string { + if cdcName == "" { + return z.Status.DataCentres[0].ID + } + for _, cdc := range z.Status.DataCentres { + if cdc.Name == cdcName { + return cdc.ID + } + } + return "" +} + +func (z *Zookeeper) GetClusterID() string { + return z.Status.ID +} + func (zs *ZookeeperSpec) FromInstAPI(iZook *models.ZookeeperCluster) ZookeeperSpec { return ZookeeperSpec{ Cluster: Cluster{ diff --git a/apis/clusters/v1beta1/zz_generated.deepcopy.go b/apis/clusters/v1beta1/zz_generated.deepcopy.go index 1b6578ab6..471506b19 100644 --- a/apis/clusters/v1beta1/zz_generated.deepcopy.go +++ b/apis/clusters/v1beta1/zz_generated.deepcopy.go @@ -388,7 +388,9 @@ func (in *CassandraDataCentre) DeepCopyInto(out *CassandraDataCentre) { if in.Debezium != nil { in, out := &in.Debezium, &out.Debezium *out = make([]DebeziumCassandraSpec, len(*in)) - copy(*out, *in) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } } } @@ -824,6 +826,11 @@ func (in *DataCentreStatus) DeepCopy() *DataCentreStatus { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DebeziumCassandraSpec) DeepCopyInto(out *DebeziumCassandraSpec) { *out = *in + if in.ClusterRef != nil { + in, out := &in.ClusterRef, &out.ClusterRef + *out = new(clusterresourcesv1beta1.ClusterRef) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DebeziumCassandraSpec. @@ -1337,6 +1344,11 @@ func (in *Kraft) DeepCopy() *Kraft { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *ManagedCluster) DeepCopyInto(out *ManagedCluster) { *out = *in + if in.ClusterRef != nil { + in, out := &in.ClusterRef, &out.ClusterRef + *out = new(clusterresourcesv1beta1.ClusterRef) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ManagedCluster. @@ -2474,7 +2486,7 @@ func (in *TargetCluster) DeepCopyInto(out *TargetCluster) { if (*in)[i] != nil { in, out := &(*in)[i], &(*out)[i] *out = new(ManagedCluster) - **out = **in + (*in).DeepCopyInto(*out) } } } diff --git a/config/crd/bases/clusterresources.instaclustr.com_awsendpointserviceprincipals.yaml b/config/crd/bases/clusterresources.instaclustr.com_awsendpointserviceprincipals.yaml index 80476e1eb..451a95043 100644 --- a/config/crd/bases/clusterresources.instaclustr.com_awsendpointserviceprincipals.yaml +++ b/config/crd/bases/clusterresources.instaclustr.com_awsendpointserviceprincipals.yaml @@ -50,6 +50,17 @@ spec: clusterDataCenterId: description: The ID of the cluster data center type: string + clusterRef: + properties: + cdcName: + type: string + clusterKind: + type: string + name: + type: string + namespace: + type: string + type: object endPointServiceId: description: The Instaclustr ID of the AWS endpoint service type: string @@ -57,13 +68,14 @@ spec: description: The IAM Principal ARN type: string required: - - clusterDataCenterId - principalArn type: object status: description: AWSEndpointServicePrincipalStatus defines the observed state of AWSEndpointServicePrincipal properties: + cdcId: + type: string endPointServiceId: description: The Instaclustr ID of the AWS endpoint service type: string diff --git a/config/crd/bases/clusterresources.instaclustr.com_awssecuritygroupfirewallrules.yaml b/config/crd/bases/clusterresources.instaclustr.com_awssecuritygroupfirewallrules.yaml index d89b9a200..0207dc837 100644 --- a/config/crd/bases/clusterresources.instaclustr.com_awssecuritygroupfirewallrules.yaml +++ b/config/crd/bases/clusterresources.instaclustr.com_awssecuritygroupfirewallrules.yaml @@ -49,12 +49,22 @@ spec: properties: clusterId: type: string + clusterRef: + properties: + cdcName: + type: string + clusterKind: + type: string + name: + type: string + namespace: + type: string + type: object securityGroupId: type: string type: type: string required: - - clusterId - securityGroupId - type type: object @@ -62,6 +72,8 @@ spec: description: AWSSecurityGroupFirewallRuleStatus defines the observed state of AWSSecurityGroupFirewallRule properties: + clusterId: + type: string deferredReason: type: string id: diff --git a/config/crd/bases/clusterresources.instaclustr.com_awsvpcpeerings.yaml b/config/crd/bases/clusterresources.instaclustr.com_awsvpcpeerings.yaml index 59792b97e..3962c8022 100644 --- a/config/crd/bases/clusterresources.instaclustr.com_awsvpcpeerings.yaml +++ b/config/crd/bases/clusterresources.instaclustr.com_awsvpcpeerings.yaml @@ -47,6 +47,17 @@ spec: properties: cdcId: type: string + clusterRef: + properties: + cdcName: + type: string + clusterKind: + type: string + name: + type: string + namespace: + type: string + type: object peerAwsAccountId: type: string peerRegion: @@ -58,7 +69,6 @@ spec: peerVpcId: type: string required: - - cdcId - peerAwsAccountId - peerSubnets - peerVpcId @@ -66,6 +76,8 @@ spec: status: description: AWSVPCPeeringStatus defines the observed state of AWSVPCPeering properties: + cdcId: + type: string failureReason: type: string id: diff --git a/config/crd/bases/clusterresources.instaclustr.com_azurevnetpeerings.yaml 
b/config/crd/bases/clusterresources.instaclustr.com_azurevnetpeerings.yaml index 3e0e6a599..b09f8a01c 100644 --- a/config/crd/bases/clusterresources.instaclustr.com_azurevnetpeerings.yaml +++ b/config/crd/bases/clusterresources.instaclustr.com_azurevnetpeerings.yaml @@ -47,6 +47,17 @@ spec: properties: cdcId: type: string + clusterRef: + properties: + cdcName: + type: string + clusterKind: + type: string + name: + type: string + namespace: + type: string + type: object peerAdObjectId: type: string peerResourceGroup: @@ -60,7 +71,6 @@ spec: peerVirtualNetworkName: type: string required: - - cdcId - peerResourceGroup - peerSubnets - peerSubscriptionId @@ -69,6 +79,8 @@ spec: status: description: AzureVNetPeeringStatus defines the observed state of AzureVNetPeering properties: + cdcId: + type: string failureReason: type: string id: diff --git a/config/crd/bases/clusterresources.instaclustr.com_clusterbackups.yaml b/config/crd/bases/clusterresources.instaclustr.com_clusterbackups.yaml index e8469c39d..9d3445387 100644 --- a/config/crd/bases/clusterresources.instaclustr.com_clusterbackups.yaml +++ b/config/crd/bases/clusterresources.instaclustr.com_clusterbackups.yaml @@ -47,15 +47,23 @@ spec: properties: clusterId: type: string - clusterKind: - type: string - required: - - clusterId - - clusterKind + clusterRef: + properties: + cdcName: + type: string + clusterKind: + type: string + name: + type: string + namespace: + type: string + type: object type: object status: description: ClusterBackupStatus defines the observed state of ClusterBackup properties: + clusterId: + type: string end: type: integer operationStatus: diff --git a/config/crd/bases/clusterresources.instaclustr.com_clusternetworkfirewallrules.yaml b/config/crd/bases/clusterresources.instaclustr.com_clusternetworkfirewallrules.yaml index 5a681c77c..1d3179a4b 100644 --- a/config/crd/bases/clusterresources.instaclustr.com_clusternetworkfirewallrules.yaml +++ b/config/crd/bases/clusterresources.instaclustr.com_clusternetworkfirewallrules.yaml @@ -49,12 +49,22 @@ spec: properties: clusterId: type: string + clusterRef: + properties: + cdcName: + type: string + clusterKind: + type: string + name: + type: string + namespace: + type: string + type: object network: type: string type: type: string required: - - clusterId - network - type type: object @@ -62,6 +72,8 @@ spec: description: ClusterNetworkFirewallRuleStatus defines the observed state of ClusterNetworkFirewallRule properties: + clusterId: + type: string deferredReason: type: string id: diff --git a/config/crd/bases/clusterresources.instaclustr.com_exclusionwindows.yaml b/config/crd/bases/clusterresources.instaclustr.com_exclusionwindows.yaml index f6bbc8c97..849283ef8 100644 --- a/config/crd/bases/clusterresources.instaclustr.com_exclusionwindows.yaml +++ b/config/crd/bases/clusterresources.instaclustr.com_exclusionwindows.yaml @@ -44,6 +44,17 @@ spec: properties: clusterId: type: string + clusterRef: + properties: + cdcName: + type: string + clusterKind: + type: string + name: + type: string + namespace: + type: string + type: object dayOfWeek: type: string durationInHours: @@ -56,7 +67,6 @@ spec: minimum: 0 type: integer required: - - clusterId - dayOfWeek - durationInHours - startHour @@ -64,6 +74,8 @@ spec: status: description: ExclusionWindowStatus defines the observed state of ExclusionWindow properties: + clusterId: + type: string id: type: string required: diff --git a/config/crd/bases/clusterresources.instaclustr.com_gcpvpcpeerings.yaml 
b/config/crd/bases/clusterresources.instaclustr.com_gcpvpcpeerings.yaml index 7e4044eb7..e6443ae97 100644 --- a/config/crd/bases/clusterresources.instaclustr.com_gcpvpcpeerings.yaml +++ b/config/crd/bases/clusterresources.instaclustr.com_gcpvpcpeerings.yaml @@ -47,6 +47,17 @@ spec: properties: cdcId: type: string + clusterRef: + properties: + cdcName: + type: string + clusterKind: + type: string + name: + type: string + namespace: + type: string + type: object peerProjectId: type: string peerSubnets: @@ -56,7 +67,6 @@ spec: peerVpcNetworkName: type: string required: - - cdcId - peerProjectId - peerSubnets - peerVpcNetworkName @@ -64,6 +74,8 @@ spec: status: description: GCPVPCPeeringStatus defines the observed state of GCPVPCPeering properties: + cdcId: + type: string failureReason: type: string id: diff --git a/config/crd/bases/clusterresources.instaclustr.com_maintenanceevents.yaml b/config/crd/bases/clusterresources.instaclustr.com_maintenanceevents.yaml index e7a5ce71c..f1eac4d71 100644 --- a/config/crd/bases/clusterresources.instaclustr.com_maintenanceevents.yaml +++ b/config/crd/bases/clusterresources.instaclustr.com_maintenanceevents.yaml @@ -35,8 +35,6 @@ spec: spec: description: MaintenanceEventsSpec defines the desired state of MaintenanceEvents properties: - clusterId: - type: string maintenanceEventsReschedule: items: properties: @@ -50,7 +48,6 @@ spec: type: object type: array required: - - clusterId - maintenanceEventsReschedule type: object status: diff --git a/config/crd/bases/clusterresources.instaclustr.com_opensearchegressrules.yaml b/config/crd/bases/clusterresources.instaclustr.com_opensearchegressrules.yaml index 10a1a7b61..a1017e92f 100644 --- a/config/crd/bases/clusterresources.instaclustr.com_opensearchegressrules.yaml +++ b/config/crd/bases/clusterresources.instaclustr.com_opensearchegressrules.yaml @@ -44,6 +44,17 @@ spec: properties: clusterId: type: string + clusterRef: + properties: + cdcName: + type: string + clusterKind: + type: string + name: + type: string + namespace: + type: string + type: object openSearchBindingId: type: string source: @@ -51,12 +62,13 @@ spec: type: type: string required: - - clusterId - openSearchBindingId - source type: object status: properties: + clusterId: + type: string id: type: string type: object diff --git a/config/crd/bases/clusters.instaclustr.com_cadences.yaml b/config/crd/bases/clusters.instaclustr.com_cadences.yaml index e29b12a9c..80e094b4d 100644 --- a/config/crd/bases/clusters.instaclustr.com_cadences.yaml +++ b/config/crd/bases/clusters.instaclustr.com_cadences.yaml @@ -355,6 +355,8 @@ spec: type: string id: type: string + name: + type: string nodeNumber: type: integer nodes: diff --git a/config/crd/bases/clusters.instaclustr.com_cassandras.yaml b/config/crd/bases/clusters.instaclustr.com_cassandras.yaml index 0b015aaba..e393d6da2 100644 --- a/config/crd/bases/clusters.instaclustr.com_cassandras.yaml +++ b/config/crd/bases/clusters.instaclustr.com_cassandras.yaml @@ -77,6 +77,17 @@ spec: Cassandra to the Cassandra cluster items: properties: + clusterRef: + properties: + cdcName: + type: string + clusterKind: + type: string + name: + type: string + namespace: + type: string + type: object kafkaCdcId: type: string kafkaTopicPrefix: @@ -87,7 +98,6 @@ spec: version: type: string required: - - kafkaCdcId - kafkaTopicPrefix - kafkaVpcType - version @@ -302,6 +312,8 @@ spec: type: string id: type: string + name: + type: string nodeNumber: type: integer nodes: diff --git 
a/config/crd/bases/clusters.instaclustr.com_kafkaconnects.yaml b/config/crd/bases/clusters.instaclustr.com_kafkaconnects.yaml index 466d0fcec..4d7245ad1 100644 --- a/config/crd/bases/clusters.instaclustr.com_kafkaconnects.yaml +++ b/config/crd/bases/clusters.instaclustr.com_kafkaconnects.yaml @@ -265,6 +265,17 @@ spec: Cannot be provided if targeting an external cluster. items: properties: + clusterRef: + properties: + cdcName: + type: string + clusterKind: + type: string + name: + type: string + namespace: + type: string + type: object kafkaConnectVpcType: description: Available options are KAFKA_VPC, VPC_PEERED, SEPARATE_VPC @@ -273,7 +284,6 @@ spec: type: string required: - kafkaConnectVpcType - - targetKafkaClusterId type: object type: array type: object @@ -311,6 +321,8 @@ spec: type: string id: type: string + name: + type: string nodeNumber: type: integer nodes: @@ -504,6 +516,8 @@ spec: type: object state: type: string + targetKafkaClusterId: + type: string twoFactorDeleteEnabled: type: boolean type: object diff --git a/config/crd/bases/clusters.instaclustr.com_kafkas.yaml b/config/crd/bases/clusters.instaclustr.com_kafkas.yaml index f3c57780b..025541c8a 100644 --- a/config/crd/bases/clusters.instaclustr.com_kafkas.yaml +++ b/config/crd/bases/clusters.instaclustr.com_kafkas.yaml @@ -329,6 +329,8 @@ spec: type: string id: type: string + name: + type: string nodeNumber: type: integer nodes: diff --git a/config/crd/bases/clusters.instaclustr.com_opensearches.yaml b/config/crd/bases/clusters.instaclustr.com_opensearches.yaml index 1143a75b8..ed677bd58 100644 --- a/config/crd/bases/clusters.instaclustr.com_opensearches.yaml +++ b/config/crd/bases/clusters.instaclustr.com_opensearches.yaml @@ -291,6 +291,8 @@ spec: type: string id: type: string + name: + type: string nodeNumber: type: integer nodes: diff --git a/config/crd/bases/clusters.instaclustr.com_postgresqls.yaml b/config/crd/bases/clusters.instaclustr.com_postgresqls.yaml index 54d55add3..f65b1fd7d 100644 --- a/config/crd/bases/clusters.instaclustr.com_postgresqls.yaml +++ b/config/crd/bases/clusters.instaclustr.com_postgresqls.yaml @@ -282,6 +282,8 @@ spec: type: string id: type: string + name: + type: string nodeNumber: type: integer nodes: diff --git a/config/crd/bases/clusters.instaclustr.com_redis.yaml b/config/crd/bases/clusters.instaclustr.com_redis.yaml index 9fae3c6a4..069ca7989 100644 --- a/config/crd/bases/clusters.instaclustr.com_redis.yaml +++ b/config/crd/bases/clusters.instaclustr.com_redis.yaml @@ -283,6 +283,8 @@ spec: type: string id: type: string + name: + type: string nodeNumber: type: integer nodes: diff --git a/config/crd/bases/clusters.instaclustr.com_zookeepers.yaml b/config/crd/bases/clusters.instaclustr.com_zookeepers.yaml index ca3b9ebde..07c7cec53 100644 --- a/config/crd/bases/clusters.instaclustr.com_zookeepers.yaml +++ b/config/crd/bases/clusters.instaclustr.com_zookeepers.yaml @@ -148,6 +148,8 @@ spec: type: string id: type: string + name: + type: string nodeNumber: type: integer nodes: diff --git a/config/samples/clusterresources_v1beta1_awsendpointserviceprincipal.yaml b/config/samples/clusterresources_v1beta1_awsendpointserviceprincipal.yaml index 2da67d844..5ccf94768 100644 --- a/config/samples/clusterresources_v1beta1_awsendpointserviceprincipal.yaml +++ b/config/samples/clusterresources_v1beta1_awsendpointserviceprincipal.yaml @@ -4,5 +4,9 @@ metadata: name: awsendpointserviceprincipal-sample-1 spec: clusterDataCenterId: "1a2c55c7-ab88-4914-94eb-2f618809795c" + clusterRef: + name: redis-sample 
+ namespace: default + clusterKind: Redis # endPointServiceId: "249efcdb-6538-4563-bf7a-4a2f5f90de96" principalArn: "arn:aws:iam::152668027680:role/aws-principal-test-1" \ No newline at end of file diff --git a/config/samples/clusterresources_v1beta1_awssecuritygroupfirewallrule.yaml b/config/samples/clusterresources_v1beta1_awssecuritygroupfirewallrule.yaml index d90f56e8c..1ceb84a18 100644 --- a/config/samples/clusterresources_v1beta1_awssecuritygroupfirewallrule.yaml +++ b/config/samples/clusterresources_v1beta1_awssecuritygroupfirewallrule.yaml @@ -11,4 +11,9 @@ metadata: spec: securityGroupId: sg-0d681e2d0fe0f0a39 clusterId: ef924204-3139-43e9-8e03-c29278e6eccd - type: POSTGRESQL + type: REDIS + clusterRef: + name: redis-sample + namespace: default + clusterKind: Redis + diff --git a/config/samples/clusterresources_v1beta1_awsvpcpeering.yaml b/config/samples/clusterresources_v1beta1_awsvpcpeering.yaml index b0112bdd7..7c8c79c5c 100644 --- a/config/samples/clusterresources_v1beta1_awsvpcpeering.yaml +++ b/config/samples/clusterresources_v1beta1_awsvpcpeering.yaml @@ -15,4 +15,8 @@ spec: - "192.168.0.0/16" peerVpcId: "vpc-87241ae1" peerRegion: "US_EAST_1" - cdcId: "85b26d7e-f8ff-4ce6-9fd1-b0d25e6659a9" \ No newline at end of file +# cdcId: b815c694-2f99-4b69-a315-19b958cbb52c + clusterRef: + name: kafka + namespace: default + clusterKind: Kafka \ No newline at end of file diff --git a/config/samples/clusterresources_v1beta1_azurevnetpeering.yaml b/config/samples/clusterresources_v1beta1_azurevnetpeering.yaml index 69330698e..29ec2aec5 100644 --- a/config/samples/clusterresources_v1beta1_azurevnetpeering.yaml +++ b/config/samples/clusterresources_v1beta1_azurevnetpeering.yaml @@ -3,7 +3,11 @@ kind: AzureVNetPeering metadata: name: azurevnetpeering-sample spec: - cdcId: f8581465-098c-4576-9e52-ea8308a27d8a + cdcId: a8ffb0e9-ed1c-4e56-8275-031316fdf4d4 +# clusterRef: +# name: redis-sample +# namespace: default +# clusterKind: Redis peerResourceGroup: rnd peerSubnets: - 10.224.0.0/16 diff --git a/config/samples/clusterresources_v1beta1_clusterbackup.yaml b/config/samples/clusterresources_v1beta1_clusterbackup.yaml index 5c1afa170..f409f2f48 100644 --- a/config/samples/clusterresources_v1beta1_clusterbackup.yaml +++ b/config/samples/clusterresources_v1beta1_clusterbackup.yaml @@ -3,5 +3,8 @@ kind: ClusterBackup metadata: name: clusterbackup-sample spec: - clusterId: 2ae611cf-ac91-4325-941c-a35c043f9c34 - clusterKind: PostgreSQL \ No newline at end of file + clusterId: 5b3e5e6c-1bd5-4d18-97e7-c4bafa749fc8 + clusterRef: +# name: redis-sample +# namespace: default + clusterKind: Redis diff --git a/config/samples/clusterresources_v1beta1_clusternetworkfirewallrule.yaml b/config/samples/clusterresources_v1beta1_clusternetworkfirewallrule.yaml index b475ddf1f..239215097 100644 --- a/config/samples/clusterresources_v1beta1_clusternetworkfirewallrule.yaml +++ b/config/samples/clusterresources_v1beta1_clusternetworkfirewallrule.yaml @@ -10,5 +10,9 @@ metadata: name: clusternetworkfirewallrule-sample spec: network: 62.212.64.19/32 - clusterId: 944cfe6b-441f-4c5a-865b-42fd40c7d816 - type: KAFKA +# clusterId: 59ff1cc6-9b1c-4755-85e3-9ef4d892a96e + type: KAFKA_CONNECT + clusterRef: + name: kafkaconnect-sample + namespace: default + clusterKind: KafkaConnect diff --git a/config/samples/clusterresources_v1beta1_exclusionwindow.yaml b/config/samples/clusterresources_v1beta1_exclusionwindow.yaml index 56abb7e9a..9fa3253c6 100644 --- a/config/samples/clusterresources_v1beta1_exclusionwindow.yaml +++ 
b/config/samples/clusterresources_v1beta1_exclusionwindow.yaml @@ -3,7 +3,11 @@ kind: ExclusionWindow metadata: name: exclusionwindow-sample spec: - clusterId: "4b453851-9002-475a-a603-f8fb1e0ae7df" + clusterId: 9b5ba158-12f5-4681-a279-df5c371b417c +# clusterRef: +# name: kafka +# namespace: default +# clusterKind: Kafka dayOfWeek: "MONDAY" startHour: 10 durationInHours: 40 diff --git a/config/samples/clusterresources_v1beta1_gcpvpcpeering.yaml b/config/samples/clusterresources_v1beta1_gcpvpcpeering.yaml index 7b6cd3318..7b38a1065 100644 --- a/config/samples/clusterresources_v1beta1_gcpvpcpeering.yaml +++ b/config/samples/clusterresources_v1beta1_gcpvpcpeering.yaml @@ -3,7 +3,11 @@ kind: GCPVPCPeering metadata: name: gcpvpcpeering-sample spec: - cdcId: ab974700-1ba9-4fcd-8399-3dc83fc2a3c3 + cdcId: 095f8410-ee09-473a-afcd-69c74271a750 + clusterRef: + name: redis-sample + namespace: default + clusterKind: Redis peerProjectId: netapp-hcl-seclab peerSubnets: - 192.168.0.0/16 diff --git a/config/samples/clusterresources_v1beta1_maintenanceevents.yaml b/config/samples/clusterresources_v1beta1_maintenanceevents.yaml index 307734950..1078a5128 100644 --- a/config/samples/clusterresources_v1beta1_maintenanceevents.yaml +++ b/config/samples/clusterresources_v1beta1_maintenanceevents.yaml @@ -3,7 +3,6 @@ kind: MaintenanceEvents metadata: name: maintenanceevents-sample spec: - clusterId: "9cf09a53-a09e-450a-ba7d-e98b3c724911" maintenanceEventsReschedule: - scheduledStartTime: "2023-11-09T04:30:00Z" maintenanceEventId: "0d25b466-bc22-44a8-b15d-8f92e815cb6e" diff --git a/config/samples/clusterresources_v1beta1_opensearchegressrules.yaml b/config/samples/clusterresources_v1beta1_opensearchegressrules.yaml index 90690e8b3..4414da57f 100644 --- a/config/samples/clusterresources_v1beta1_opensearchegressrules.yaml +++ b/config/samples/clusterresources_v1beta1_opensearchegressrules.yaml @@ -9,7 +9,11 @@ metadata: app.kubernetes.io/created-by: operator name: opensearchegressrules-sample spec: - clusterId: "5425d1ed-7a6b-44da-965e-6f085e85b6d0" - openSearchBindingId: "i9dAa4sBKX0G_sCfbpSF" +# clusterId: "5425d1ed-7a6b-44da-965e-6f085e85b6d0" + clusterRef: + name: opensearch-sample + namespace: default + clusterKind: OpenSearch + openSearchBindingId: "rJLtJYwBbvQX8Rgdz9SR" source: "NOTIFICATIONS" type: "SLACK" diff --git a/config/samples/clusters_v1beta1_cassandra.yaml b/config/samples/clusters_v1beta1_cassandra.yaml index 784b5eb55..97e8daeac 100644 --- a/config/samples/clusters_v1beta1_cassandra.yaml +++ b/config/samples/clusters_v1beta1_cassandra.yaml @@ -10,10 +10,14 @@ spec: - name: "AWS_cassandra" region: "US_EAST_1" debezium: - - kafkaVpcType: "VPC_PEERED" - kafkaTopicPrefix: "test" - kafkaCdcId: "5134aed3-7b98-4ebd-95d0-2e181bdb073b" - version: "2.0.1" +# - kafkaVpcType: "VPC_PEERED" +# kafkaTopicPrefix: "test" +# kafkaCdcId: "5134aed3-7b98-4ebd-95d0-2e181bdb073b" +# clusterRef: +# name: kafka +# namespace: default +# clusterKind: Kafka +# version: "2.0.1" cloudProvider: "AWS_VPC" continuousBackup: false nodesNumber: 2 diff --git a/config/samples/clusters_v1beta1_kafka.yaml b/config/samples/clusters_v1beta1_kafka.yaml index afea89747..c7527894f 100644 --- a/config/samples/clusters_v1beta1_kafka.yaml +++ b/config/samples/clusters_v1beta1_kafka.yaml @@ -2,11 +2,9 @@ apiVersion: clusters.instaclustr.com/v1beta1 kind: Kafka metadata: name: kafka -# name: kafka-2 spec: - name: "bohdan-kafka-test" -# name: "bohdan-kafka-test-2" - version: "3.3.1" + name: "example-Kafka" + version: "3.5.1" pciCompliance: false 
replicationFactor: 3 partitionsNumber: 3 @@ -16,7 +14,7 @@ spec: privateNetworkCluster: false slaTier: "NON_PRODUCTION" # bundledUseOnly: true - clientBrokerAuthWithMtls: true +# clientBrokerAuthWithMtls: true # dedicatedZookeeper: # - nodeSize: "KDZ-DEV-t4g.small-30" # nodesNumber: 3 @@ -45,7 +43,7 @@ spec: tags: tag: "oneTag" tag2: "twoTags" - nodeSize: "KFK-DEV-t4g.small-5" + nodeSize: "KFK-DEV-t4g.medium-80" # nodeSize: "KFK-PRD-r6g.large-250" # nodeSize: "KFK-DEV-t4g.medium-80" network: "10.0.0.0/16" @@ -60,78 +58,9 @@ spec: # dedicatedZookeeper: # - nodeSize: "KDZ-DEV-t4g.small-30" # nodesNumber: 3 - userRefs: - - name: kafkauser-sample - namespace: default - resizeSettings: - - notifySupportContacts: false - concurrency: 1 ---- -apiVersion: clusters.instaclustr.com/v1beta1 -kind: Kafka -metadata: -# name: kafka - name: kafka-2 -spec: -# name: "bohdan-kafka-test" - name: "bohdan-kafka-test-2" - version: "3.3.1" - pciCompliance: false - replicationFactor: 3 - partitionsNumber: 3 - allowDeleteTopics: true - autoCreateTopics: true - clientToClusterEncryption: false - privateNetworkCluster: false - slaTier: "NON_PRODUCTION" - # bundledUseOnly: true - clientBrokerAuthWithMtls: true - # dedicatedZookeeper: - # - nodeSize: "KDZ-DEV-t4g.small-30" - # nodesNumber: 3 - # twoFactorDelete: - # - email: "asdfadfsdsf" - # phone: "ddsafasdf" - # karapaceSchemaRegistry: - # - version: "3.2.0" - # schemaRegistry: - # - version: "5.0.0" - # karapaceRestProxy: - # - integrateRestProxyWithSchemaRegistry: true - # version: "3.2.0" - # kraft: - # - controllerNodeCount: 3 - # restProxy: - # - integrateRestProxyWithSchemaRegistry: false - # schemaRegistryPassword: "asdfasdf" - # schemaRegistryServerUrl: "schemaRegistryServerUrl" - # "useLocalSchemaRegistry": true - # version: "5.0.0" - dataCentres: - - name: "AWS_VPC_US_EAST_1" - nodesNumber: 3 - cloudProvider: "AWS_VPC" - tags: - tag: "oneTag" - tag2: "twoTags" - nodeSize: "KFK-DEV-t4g.small-5" - # nodeSize: "KFK-PRD-r6g.large-250" - # nodeSize: "KFK-DEV-t4g.medium-80" - network: "10.0.0.0/16" - region: "US_EAST_1" - # accountName: "Custrom" - # cloudProviderSettings: - # - customVirtualNetworkId: "vpc-12345678" - # diskEncryptionKey: "123e4567-e89b-12d3-a456-426614174000" - # resourceGroup: "asdfadfsdfas" - # privateLink: - # - advertisedHostname: "kafka-example-test.com" - # dedicatedZookeeper: - # - nodeSize: "KDZ-DEV-t4g.small-30" - # nodesNumber: 3 - userRefs: - - name: kafkauser-sample - namespace: default +# userRefs: +# - name: kafkauser-sample +# namespace: default resizeSettings: - notifySupportContacts: false concurrency: 1 \ No newline at end of file diff --git a/config/samples/clusters_v1beta1_kafkaconnect.yaml b/config/samples/clusters_v1beta1_kafkaconnect.yaml index 0b6a840a0..681369028 100644 --- a/config/samples/clusters_v1beta1_kafkaconnect.yaml +++ b/config/samples/clusters_v1beta1_kafkaconnect.yaml @@ -16,10 +16,14 @@ spec: network: "10.15.0.0/16" region: "US_EAST_1" name: "Username-KC" - version: "3.1.2" + version: "3.5.1" privateNetworkCluster: false slaTier: "NON_PRODUCTION" targetCluster: - managedCluster: - - targetKafkaClusterId: "34dfc53c-c8c1-4be8-bd2f-cfdb77ec7349" +# - targetKafkaClusterId: 9b5ba158-12f5-4681-a279-df5c371b417c + - clusterRef: + name: kafka + namespace: default + clusterKind: Kafka kafkaConnectVpcType: "KAFKA_VPC" diff --git a/config/samples/clusters_v1beta1_zookeeper.yaml b/config/samples/clusters_v1beta1_zookeeper.yaml index 4fb8adbe2..9e434cdc7 100644 --- a/config/samples/clusters_v1beta1_zookeeper.yaml +++ 
b/config/samples/clusters_v1beta1_zookeeper.yaml @@ -15,4 +15,4 @@ spec: name: "Username-zookeeper" privateNetworkCluster: false slaTier: "NON_PRODUCTION" - version: "3.7.1" + version: "3.8.2" diff --git a/controllers/clusterresources/awsendpointserviceprincipal_controller.go b/controllers/clusterresources/awsendpointserviceprincipal_controller.go index 85f58b4b9..3f5c9b783 100644 --- a/controllers/clusterresources/awsendpointserviceprincipal_controller.go +++ b/controllers/clusterresources/awsendpointserviceprincipal_controller.go @@ -87,7 +87,29 @@ func (r *AWSEndpointServicePrincipalReconciler) Reconcile(ctx context.Context, r } func (r *AWSEndpointServicePrincipalReconciler) handleCreate(ctx context.Context, l logr.Logger, principal *clusterresourcesv1beta1.AWSEndpointServicePrincipal) (ctrl.Result, error) { - b, err := r.API.CreateAWSEndpointServicePrincipal(principal.Spec) + var cdcID string + var err error + if principal.Spec.ClusterRef != nil { + cdcID, err = GetDataCentreID(r.Client, ctx, principal.Spec.ClusterRef) + if err != nil { + l.Error(err, "Cannot get CDCID", + "Cluster reference", principal.Spec.ClusterRef, + ) + return ctrl.Result{}, err + } + l.Info( + "Creating AWS Endpoint Service Principal resource from the cluster reference", + "cluster reference", principal.Spec.ClusterRef, + ) + } else { + cdcID = principal.Spec.ClusterDataCenterID + l.Info( + "Creating AWS Endpoint Service Principal resource", + "principal", principal.Spec, + ) + } + + b, err := r.API.CreateAWSEndpointServicePrincipal(principal.Spec, cdcID) if err != nil { l.Error(err, "failed to create an AWS endpoint service principal resource on Instaclustr") r.EventRecorder.Eventf(principal, models.Warning, models.CreationFailed, @@ -108,6 +130,7 @@ func (r *AWSEndpointServicePrincipalReconciler) handleCreate(ctx context.Context return ctrl.Result{}, err } + principal.Status.CDCID = cdcID err = r.Status().Patch(ctx, principal, patch) if err != nil { l.Error(err, "failed to patch an AWS endpoint service principal resource status with its ID") diff --git a/controllers/clusterresources/awssecuritygroupfirewallrule_controller.go b/controllers/clusterresources/awssecuritygroupfirewallrule_controller.go index 742228611..0818463f1 100644 --- a/controllers/clusterresources/awssecuritygroupfirewallrule_controller.go +++ b/controllers/clusterresources/awssecuritygroupfirewallrule_controller.go @@ -82,7 +82,7 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) Reconcile(ctx context.Context, return r.handleDeleteFirewallRule(ctx, firewallRule, &l) case models.GenericEvent: l.Info("AWS security group firewall rule event isn't handled", - "cluster ID", firewallRule.Spec.ClusterID, + "cluster ID", firewallRule.Status.ClusterID, "type", firewallRule.Spec.Type, "request", req, "event", firewallRule.Annotations[models.ResourceStateAnnotation]) @@ -98,15 +98,32 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) handleCreateFirewallRule( l *logr.Logger, ) (ctrl.Result, error) { if firewallRule.Status.ID == "" { - l.Info( - "Creating AWS security group firewall rule", - "cluster ID", firewallRule.Spec.ClusterID, - "type", firewallRule.Spec.Type, - ) - + var clusterID string + var err error + if firewallRule.Spec.ClusterRef != nil { + clusterID, err = GetClusterID(r.Client, ctx, firewallRule.Spec.ClusterRef) + if err != nil { + l.Error(err, "Cannot get CDCID", + "Cluster reference", firewallRule.Spec.ClusterRef, + ) + return ctrl.Result{}, err + } + l.Info( + "Creating AWS security group firewall rule from the cluster reference", 
+ "cluster reference", firewallRule.Spec.ClusterRef, + "cluster ID", clusterID, + ) + } else { + clusterID = firewallRule.Spec.ClusterID + l.Info( + "Creating AWS security group firewall rule", + "cluster ID", clusterID, + "type", firewallRule.Spec.Type, + ) + } patch := firewallRule.NewPatch() - firewallRuleStatus, err := r.API.CreateFirewallRule(instaclustr.AWSSecurityGroupFirewallRuleEndpoint, &firewallRule.Spec) + firewallRuleStatus, err := r.API.CreateAWSSecurityGroupFirewallRule(&firewallRule.Spec, clusterID) if err != nil { l.Error( err, "Cannot create AWS security group firewall rule", @@ -142,7 +159,7 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) handleCreateFirewallRule( err = r.Patch(ctx, firewallRule, patch) if err != nil { l.Error(err, "Cannot patch AWS security group firewall rule", - "cluster ID", firewallRule.Spec.ClusterID, + "cluster ID", firewallRule.Status.ClusterID, "type", firewallRule.Spec.Type, ) r.EventRecorder.Eventf( @@ -155,7 +172,7 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) handleCreateFirewallRule( l.Info( "AWS security group firewall rule resource has been created", - "cluster ID", firewallRule.Spec.ClusterID, + "cluster ID", firewallRule.Status.ClusterID, "type", firewallRule.Spec.Type, ) } @@ -189,7 +206,7 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) handleDeleteFirewallRule( err := r.Patch(ctx, firewallRule, patch) if err != nil { l.Error(err, "Cannot patch AWS security group firewall rule metadata", - "cluster ID", firewallRule.Spec.ClusterID, + "cluster ID", firewallRule.Status.ClusterID, "type", firewallRule.Spec.Type, ) @@ -205,7 +222,7 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) handleDeleteFirewallRule( if err != nil && !errors.Is(err, instaclustr.NotFound) { l.Error( err, "Cannot get AWS security group firewall rule status from the Instaclustr API", - "cluster ID", firewallRule.Spec.ClusterID, + "cluster ID", firewallRule.Status.ClusterID, "type", firewallRule.Spec.Type, ) @@ -222,7 +239,7 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) handleDeleteFirewallRule( if err != nil { l.Error(err, "Cannot delete AWS security group firewall rule", "rule ID", firewallRule.Status.ID, - "cluster ID", firewallRule.Spec.ClusterID, + "cluster ID", firewallRule.Status.ClusterID, "type", firewallRule.Spec.Type, ) @@ -245,7 +262,7 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) handleDeleteFirewallRule( err = r.Patch(ctx, firewallRule, patch) if err != nil { l.Error(err, "Cannot patch AWS security group firewall rule metadata", - "cluster ID", firewallRule.Spec.ClusterID, + "cluster ID", firewallRule.Status.ClusterID, "type", firewallRule.Spec.Type, "status", firewallRule.Status, ) @@ -259,7 +276,7 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) handleDeleteFirewallRule( } l.Info("AWS security group firewall rule has been deleted", - "cluster ID", firewallRule.Spec.ClusterID, + "cluster ID", firewallRule.Status.ClusterID, "type", firewallRule.Spec.Type, "status", firewallRule.Status, ) diff --git a/controllers/clusterresources/awsvpcpeering_controller.go b/controllers/clusterresources/awsvpcpeering_controller.go index 246b23c84..9d6d20d5d 100644 --- a/controllers/clusterresources/awsvpcpeering_controller.go +++ b/controllers/clusterresources/awsvpcpeering_controller.go @@ -97,14 +97,35 @@ func (r *AWSVPCPeeringReconciler) handleCreatePeering( l logr.Logger, ) (ctrl.Result, error) { if aws.Status.ID == "" { - l.Info( - "Creating AWS VPC Peering resource", - "AWS Account ID", aws.Spec.PeerAWSAccountID, - "VPC ID", 
aws.Spec.PeerVPCID, - "Region", aws.Spec.PeerRegion, - ) + var cdcID string + var err error + if aws.Spec.ClusterRef != nil { + cdcID, err = GetDataCentreID(r.Client, ctx, aws.Spec.ClusterRef) + if err != nil { + l.Error(err, "Cannot get CDCID", + "Cluster reference", aws.Spec.ClusterRef, + ) + return ctrl.Result{}, err + } + l.Info( + "Creating AWS VPC Peering resource from the cluster reference", + "cluster reference", aws.Spec.ClusterRef, + "cdcID ID", cdcID, + "AWS Account ID", aws.Spec.PeerAWSAccountID, + "VPC ID", aws.Spec.PeerVPCID, + "Region", aws.Spec.PeerRegion, + ) + } else { + cdcID = aws.Spec.DataCentreID + l.Info( + "Creating AWS VPC Peering resource", + "AWS Account ID", aws.Spec.PeerAWSAccountID, + "VPC ID", aws.Spec.PeerVPCID, + "Region", aws.Spec.PeerRegion, + ) + } - awsStatus, err := r.API.CreatePeering(instaclustr.AWSPeeringEndpoint, &aws.Spec) + awsStatus, err := r.API.CreateAWSVPCPeering(&aws.Spec, cdcID) if err != nil { l.Error( err, "cannot create AWS VPC Peering resource", @@ -126,6 +147,7 @@ func (r *AWSVPCPeeringReconciler) handleCreatePeering( patch := aws.NewPatch() aws.Status.PeeringStatus = *awsStatus + aws.Status.CDCID = cdcID err = r.Status().Patch(ctx, aws, patch) if err != nil { l.Error(err, "cannot patch AWS VPC Peering resource status", @@ -287,7 +309,7 @@ func (r *AWSVPCPeeringReconciler) handleUpdatePeering( "AWS Account ID", aws.Spec.PeerAWSAccountID, "VPC ID", aws.Spec.PeerVPCID, "Region", aws.Spec.PeerRegion, - "AWS VPC Peering Data Centre ID", aws.Spec.DataCentreID, + "AWS VPC Peering Data Centre ID", aws.Status.CDCID, "AWS VPC Peering Status", aws.Status.PeeringStatus, ) @@ -366,7 +388,7 @@ func (r *AWSVPCPeeringReconciler) handleDeletePeering( "AWS VPC Peering ID", aws.Status.ID, "VPC ID", aws.Spec.PeerVPCID, "Region", aws.Spec.PeerRegion, - "AWS VPC Peering Data Centre ID", aws.Spec.DataCentreID, + "AWS VPC Peering Data Centre ID", aws.Status.CDCID, "AWS VPC Peering Status", aws.Status.PeeringStatus, ) diff --git a/controllers/clusterresources/awsvpcpeering_controller_test.go b/controllers/clusterresources/awsvpcpeering_controller_test.go index cb5a9447a..af0c8e4eb 100644 --- a/controllers/clusterresources/awsvpcpeering_controller_test.go +++ b/controllers/clusterresources/awsvpcpeering_controller_test.go @@ -33,7 +33,7 @@ import ( var _ = Describe("Successful creation of a AWS VPC Peering resource", Ordered, func() { awsVPCPeeringSpec := v1beta1.AWSVPCPeeringSpec{ - VPCPeeringSpec: v1beta1.VPCPeeringSpec{ + PeeringSpec: v1beta1.PeeringSpec{ DataCentreID: "375e4d1c-2f77-4d02-a6f2-1af617ff2ab2", PeerSubnets: []string{"172.31.0.0/16", "192.168.0.0/16"}, }, diff --git a/controllers/clusterresources/azurevnetpeering_controller.go b/controllers/clusterresources/azurevnetpeering_controller.go index 0a21a88b4..233a22675 100644 --- a/controllers/clusterresources/azurevnetpeering_controller.go +++ b/controllers/clusterresources/azurevnetpeering_controller.go @@ -98,15 +98,36 @@ func (r *AzureVNetPeeringReconciler) handleCreatePeering( l logr.Logger, ) (ctrl.Result, error) { if azure.Status.ID == "" { - l.Info( - "Creating Azure VNet Peering resource", - "Azure Subscription ID", azure.Spec.PeerSubscriptionID, - "AD Object ID", azure.Spec.PeerADObjectID, - "Resource Group", azure.Spec.PeerResourceGroup, - "Vnet Name", azure.Spec.PeerVirtualNetworkName, - ) + var cdcID string + var err error + if azure.Spec.ClusterRef != nil { + cdcID, err = GetDataCentreID(r.Client, ctx, azure.Spec.ClusterRef) + if err != nil { + l.Error(err, "Cannot get CDCID", + 
"Cluster reference", azure.Spec.ClusterRef, + ) + return ctrl.Result{}, err + } + l.Info( + "Creating Azure VNet Peering resource from the cluster reference", + "cluster reference", azure.Spec.ClusterRef, + "Azure Subscription ID", azure.Spec.PeerSubscriptionID, + "AD Object ID", azure.Spec.PeerADObjectID, + "Resource Group", azure.Spec.PeerResourceGroup, + "Vnet Name", azure.Spec.PeerVirtualNetworkName, + ) + } else { + cdcID = azure.Spec.DataCentreID + l.Info( + "Creating Azure VNet Peering resource", + "Azure Subscription ID", azure.Spec.PeerSubscriptionID, + "AD Object ID", azure.Spec.PeerADObjectID, + "Resource Group", azure.Spec.PeerResourceGroup, + "Vnet Name", azure.Spec.PeerVirtualNetworkName, + ) + } - azureStatus, err := r.API.CreatePeering(instaclustr.AzurePeeringEndpoint, &azure.Spec) + azureStatus, err := r.API.CreateAzureVNetPeering(&azure.Spec, cdcID) if err != nil { l.Error( err, "cannot create Azure VNet Peering resource", @@ -128,6 +149,7 @@ func (r *AzureVNetPeeringReconciler) handleCreatePeering( patch := azure.NewPatch() azure.Status.PeeringStatus = *azureStatus + azure.Status.CDCID = cdcID err = r.Status().Patch(ctx, azure, patch) if err != nil { l.Error(err, "cannot patch Azure VNet Peering resource status", diff --git a/controllers/clusterresources/azurevnetpeering_controller_test.go b/controllers/clusterresources/azurevnetpeering_controller_test.go index aa332b01d..7fbc59764 100644 --- a/controllers/clusterresources/azurevnetpeering_controller_test.go +++ b/controllers/clusterresources/azurevnetpeering_controller_test.go @@ -33,7 +33,7 @@ import ( var _ = Describe("AzureVNetPeering controller", Ordered, func() { azureVNetPeeringSpec := v1beta1.AzureVNetPeeringSpec{ - VPCPeeringSpec: v1beta1.VPCPeeringSpec{ + PeeringSpec: v1beta1.PeeringSpec{ DataCentreID: "375e4d1c-2f77-4d02-a6f2-1af617ff2ab2", PeerSubnets: []string{"172.31.0.0/16", "192.168.0.0/16"}, }, diff --git a/controllers/clusterresources/clusterbackup_controller.go b/controllers/clusterresources/clusterbackup_controller.go index db42d76d2..88ae20cb0 100644 --- a/controllers/clusterresources/clusterbackup_controller.go +++ b/controllers/clusterresources/clusterbackup_controller.go @@ -77,13 +77,40 @@ func (r *ClusterBackupReconciler) Reconcile(ctx context.Context, req ctrl.Reques return ctrl.Result{}, err } + var clusterID string + if backup.Spec.ClusterRef.Name != "" { + clusterID, err = GetClusterID(r.Client, ctx, backup.Spec.ClusterRef) + if err != nil { + logger.Error(err, "Cannot get ClusterID", + "Cluster reference", backup.Spec.ClusterRef, + ) + return ctrl.Result{}, err + } + } else { + clusterID = backup.Spec.ClusterID + } + patch := backup.NewPatch() + backup.Status.ClusterID = clusterID + err = r.Status().Patch(ctx, backup, patch) + if err != nil { + logger.Error(err, "Cannot patch cluster backup resource status", + "backup name", backup.Name, + ) + + r.EventRecorder.Eventf( + backup, models.Warning, models.PatchFailed, + "Resource status patch is failed. 
Reason: %v", + err, + ) + return ctrl.Result{}, err + } - if backup.Labels[models.ClusterIDLabel] != backup.Spec.ClusterID { + if backup.Labels[models.ClusterIDLabel] != backup.Status.ClusterID { if backup.Labels == nil { - backup.Labels = map[string]string{models.ClusterIDLabel: backup.Spec.ClusterID} + backup.Labels = map[string]string{models.ClusterIDLabel: backup.Status.ClusterID} } else { - backup.Labels[models.ClusterIDLabel] = backup.Spec.ClusterID + backup.Labels[models.ClusterIDLabel] = backup.Status.ClusterID } err = r.Patch(ctx, backup, patch) if err != nil { @@ -100,11 +127,11 @@ func (r *ClusterBackupReconciler) Reconcile(ctx context.Context, req ctrl.Reques } } - backupsList, err := r.listClusterBackups(ctx, backup.Spec.ClusterID, backup.Namespace) + backupsList, err := r.listClusterBackups(ctx, backup.Status.ClusterID, backup.Namespace) if err != nil { logger.Error(err, "Cannot get cluster backups", "backup name", backup.Name, - "cluster ID", backup.Spec.ClusterID, + "cluster ID", backup.Status.ClusterID, ) r.EventRecorder.Eventf( @@ -115,16 +142,16 @@ func (r *ClusterBackupReconciler) Reconcile(ctx context.Context, req ctrl.Reques return ctrl.Result{}, err } - clusterKind := models.ClusterKindsMap[backup.Spec.ClusterKind] - if backup.Spec.ClusterKind == models.PgClusterKind { + clusterKind := models.ClusterKindsMap[backup.Spec.ClusterRef.ClusterKind] + if backup.Spec.ClusterRef.ClusterKind == models.PgClusterKind { clusterKind = models.PgAppKind } - iBackup, err := r.API.GetClusterBackups(backup.Spec.ClusterID, clusterKind) + iBackup, err := r.API.GetClusterBackups(backup.Status.ClusterID, clusterKind) if err != nil { logger.Error(err, "Cannot get cluster backups from Instaclustr", "backup name", backup.Name, - "cluster ID", backup.Spec.ClusterID, + "cluster ID", backup.Status.ClusterID, ) r.EventRecorder.Eventf( @@ -135,14 +162,14 @@ func (r *ClusterBackupReconciler) Reconcile(ctx context.Context, req ctrl.Reques return ctrl.Result{}, err } - iBackupEvents := iBackup.GetBackupEvents(backup.Spec.ClusterKind) + iBackupEvents := iBackup.GetBackupEvents(backup.Spec.ClusterRef.ClusterKind) if len(iBackupEvents) < len(backupsList.Items) { - err = r.API.TriggerClusterBackup(backup.Spec.ClusterID, models.ClusterKindsMap[backup.Spec.ClusterKind]) + err = r.API.TriggerClusterBackup(backup.Status.ClusterID, models.ClusterKindsMap[backup.Spec.ClusterRef.ClusterKind]) if err != nil { logger.Error(err, "Cannot trigger cluster backup", "backup name", backup.Name, - "cluster ID", backup.Spec.ClusterID, + "cluster ID", backup.Status.ClusterID, ) r.EventRecorder.Eventf( @@ -158,7 +185,7 @@ func (r *ClusterBackupReconciler) Reconcile(ctx context.Context, req ctrl.Reques "Resource creation request is sent", ) logger.Info("New cluster backup request was sent", - "cluster ID", backup.Spec.ClusterID, + "cluster ID", backup.Status.ClusterID, ) } @@ -216,7 +243,7 @@ func (r *ClusterBackupReconciler) Reconcile(ctx context.Context, req ctrl.Reques logger.Info("Cluster backup resource was reconciled", "backup name", backup.Name, - "cluster ID", backup.Spec.ClusterID, + "cluster ID", backup.Status.ClusterID, ) return ctrl.Result{}, nil diff --git a/controllers/clusterresources/clusternetworkfirewallrule_controller.go b/controllers/clusterresources/clusternetworkfirewallrule_controller.go index fa145717c..287cec570 100644 --- a/controllers/clusterresources/clusternetworkfirewallrule_controller.go +++ b/controllers/clusterresources/clusternetworkfirewallrule_controller.go @@ -88,7 +88,7 @@ func (r 
*ClusterNetworkFirewallRuleReconciler) Reconcile(ctx context.Context, re return r.HandleDeleteFirewallRule(ctx, firewallRule, &l) case models.GenericEvent: l.Info("Cluster network firewall rule event isn't handled", - "cluster ID", firewallRule.Spec.ClusterID, + "cluster ID", firewallRule.Status.ClusterID, "type", firewallRule.Spec.Type, "request", req, "event", firewallRule.Annotations[models.ResourceStateAnnotation]) @@ -104,15 +104,32 @@ func (r *ClusterNetworkFirewallRuleReconciler) HandleCreateFirewallRule( l *logr.Logger, ) (ctrl.Result, error) { if firewallRule.Status.ID == "" { - l.Info( - "Creating cluster network firewall rule", - "cluster ID", firewallRule.Spec.ClusterID, - "type", firewallRule.Spec.Type, - ) + var clusterID string + var err error + if firewallRule.Spec.ClusterRef != nil { + clusterID, err = GetClusterID(r.Client, ctx, firewallRule.Spec.ClusterRef) + if err != nil { + l.Error(err, "Cannot get cluster ID", + "Cluster reference", firewallRule.Spec.ClusterRef, + ) + return ctrl.Result{}, err + } + l.Info( + "Creating cluster network firewall rule from the cluster reference", + "cluster reference", firewallRule.Spec.ClusterRef, + "cluster ID", clusterID, + ) + } else { + clusterID = firewallRule.Spec.ClusterID + l.Info( + "Creating cluster network firewall rule", + "cluster ID", clusterID, + "type", firewallRule.Spec.Type, + ) + } patch := firewallRule.NewPatch() - - firewallRuleStatus, err := r.API.CreateFirewallRule(instaclustr.ClusterNetworkFirewallRuleEndpoint, &firewallRule.Spec) + firewallRuleStatus, err := r.API.CreateClusterNetworkFirewallRule(&firewallRule.Spec, clusterID) if err != nil { l.Error( err, "Cannot create cluster network firewall rule", @@ -132,6 +149,7 @@ func (r *ClusterNetworkFirewallRuleReconciler) HandleCreateFirewallRule( ) firewallRule.Status.FirewallRuleStatus = *firewallRuleStatus + firewallRule.Status.ClusterID = clusterID err = r.Status().Patch(ctx, firewallRule, patch) if err != nil { @@ -150,7 +168,7 @@ func (r *ClusterNetworkFirewallRuleReconciler) HandleCreateFirewallRule( err = r.Patch(ctx, firewallRule, patch) if err != nil { l.Error(err, "Cannot patch cluster network firewall rule", - "cluster ID", firewallRule.Spec.ClusterID, + "cluster ID", firewallRule.Status.ClusterID, "type", firewallRule.Spec.Type, ) r.EventRecorder.Eventf( @@ -163,7 +181,7 @@ func (r *ClusterNetworkFirewallRuleReconciler) HandleCreateFirewallRule( l.Info( "Cluster network firewall rule resource has been created", - "cluster ID", firewallRule.Spec.ClusterID, + "cluster ID", firewallRule.Status.ClusterID, "type", firewallRule.Spec.Type, ) } @@ -194,7 +212,7 @@ func (r *ClusterNetworkFirewallRuleReconciler) HandleUpdateFirewallRule( l *logr.Logger, ) (ctrl.Result, error) { l.Info("Cluster network firewall rule update is not implemented", - "firewall rule ID", firewallRule.Spec.ClusterID, + "firewall rule ID", firewallRule.Status.ClusterID, "type", firewallRule.Spec.Type, ) @@ -210,7 +228,7 @@ func (r *ClusterNetworkFirewallRuleReconciler) HandleDeleteFirewallRule( err := r.Patch(ctx, firewallRule, patch) if err != nil { l.Error(err, "Cannot patch cluster network firewall rule metadata", - "cluster ID", firewallRule.Spec.ClusterID, + "cluster ID", firewallRule.Status.ClusterID, "type", firewallRule.Spec.Type, ) r.EventRecorder.Eventf( @@ -225,7 +243,7 @@ func (r *ClusterNetworkFirewallRuleReconciler) HandleDeleteFirewallRule( if err != nil && !errors.Is(err, instaclustr.NotFound) { l.Error( err, "Cannot get cluster network firewall rule status from the 
Instaclustr API", - "cluster ID", firewallRule.Spec.ClusterID, + "cluster ID", firewallRule.Status.ClusterID, "type", firewallRule.Spec.Type, ) r.EventRecorder.Eventf( @@ -241,7 +259,7 @@ func (r *ClusterNetworkFirewallRuleReconciler) HandleDeleteFirewallRule( if err != nil { l.Error(err, "Cannot delete cluster network firewall rule", "rule ID", firewallRule.Status.ID, - "cluster ID", firewallRule.Spec.ClusterID, + "cluster ID", firewallRule.Status.ClusterID, "type", firewallRule.Spec.Type, ) r.EventRecorder.Eventf( @@ -264,7 +282,7 @@ func (r *ClusterNetworkFirewallRuleReconciler) HandleDeleteFirewallRule( err = r.Patch(ctx, firewallRule, patch) if err != nil { l.Error(err, "Cannot patch cluster network firewall rule metadata", - "cluster ID", firewallRule.Spec.ClusterID, + "cluster ID", firewallRule.Status.ClusterID, "type", firewallRule.Spec.Type, "status", firewallRule.Status, ) @@ -277,7 +295,7 @@ func (r *ClusterNetworkFirewallRuleReconciler) HandleDeleteFirewallRule( } l.Info("Cluster network firewall rule has been deleted", - "cluster ID", firewallRule.Spec.ClusterID, + "cluster ID", firewallRule.Status.ClusterID, "type", firewallRule.Spec.Type, "status", firewallRule.Status, ) diff --git a/controllers/clusterresources/exclusionwindow_controller.go b/controllers/clusterresources/exclusionwindow_controller.go index c46b07f7e..411d75af0 100644 --- a/controllers/clusterresources/exclusionwindow_controller.go +++ b/controllers/clusterresources/exclusionwindow_controller.go @@ -82,7 +82,7 @@ func (r *ExclusionWindowReconciler) Reconcile(ctx context.Context, req ctrl.Requ return r.handleDeleteWindow(ctx, ew, l) default: l.Info("event isn't handled", - "Cluster ID", ew.Spec.ClusterID, + "Cluster ID", ew.Status.ClusterID, "Exclusion Window Spec", ew.Spec, "Request", req, "event", ew.Annotations[models.ResourceStateAnnotation]) @@ -96,13 +96,31 @@ func (r *ExclusionWindowReconciler) handleCreateWindow( l logr.Logger, ) (ctrl.Result, error) { if ew.Status.ID == "" { - l.Info( - "Creating Exclusion Window resource", - "Cluster ID", ew.Spec.ClusterID, - "Exclusion Window Spec", ew.Spec, - ) + var clusterID string + var err error + if ew.Spec.ClusterRef != nil { + clusterID, err = GetClusterID(r.Client, ctx, ew.Spec.ClusterRef) + if err != nil { + l.Error(err, "Cannot get ClusterID", + "Cluster reference", ew.Spec.ClusterRef, + ) + return ctrl.Result{}, err + } + l.Info( + "Creating Exclusion Window resource from the cluster reference", + "Cluster ID", ew.Status.ClusterID, + "Exclusion Window Spec", ew.Spec, + ) + } else { + clusterID = ew.Spec.ClusterID + l.Info( + "Creating Exclusion Window resource", + "Cluster ID", ew.Status.ClusterID, + "Exclusion Window Spec", ew.Spec, + ) + } - id, err := r.API.CreateExclusionWindow(ew.Spec.ClusterID, &ew.Spec) + id, err := r.API.CreateExclusionWindow(clusterID, &ew.Spec) if err != nil { l.Error( err, "cannot create Exclusion Window resource", @@ -123,10 +141,11 @@ func (r *ExclusionWindowReconciler) handleCreateWindow( patch := ew.NewPatch() ew.Status.ID = id + ew.Status.ClusterID = clusterID err = r.Status().Patch(ctx, ew, patch) if err != nil { l.Error(err, "cannot patch Exclusion Window resource status after creation", - "Cluster ID", ew.Spec.ClusterID, + "Cluster ID", ew.Status.ClusterID, "Exclusion Window Spec", ew.Spec, "Exclusion Window metadata", ew.ObjectMeta, ) @@ -143,7 +162,7 @@ func (r *ExclusionWindowReconciler) handleCreateWindow( err = r.Patch(ctx, ew, patch) if err != nil { l.Error(err, "cannot patch Exclusion Window resource metadata 
with created event", - "Cluster ID", ew.Spec.ClusterID, + "Cluster ID", ew.Status.ClusterID, "Exclusion Window Spec", ew.Spec, "Exclusion Window metadata", ew.ObjectMeta, ) @@ -157,7 +176,7 @@ func (r *ExclusionWindowReconciler) handleCreateWindow( l.Info( "Exclusion Window resource was created", - "Cluster ID", ew.Spec.ClusterID, + "Cluster ID", ew.Status.ClusterID, "Exclusion Window Spec", ew.Spec, ) } @@ -174,7 +193,7 @@ func (r *ExclusionWindowReconciler) handleDeleteWindow( if err != nil && !errors.Is(err, instaclustr.NotFound) { l.Error( err, "cannot get Exclusion Window status from the Instaclustr API", - "Cluster ID", ew.Spec.ClusterID, + "Cluster ID", ew.Status.ClusterID, "Exclusion Window Spec", ew.Spec, ) r.EventRecorder.Eventf( @@ -187,9 +206,9 @@ func (r *ExclusionWindowReconciler) handleDeleteWindow( if status != "" { err = r.API.DeleteExclusionWindow(ew.Status.ID) - if err != nil { + if err != nil && !errors.Is(err, instaclustr.NotFound) { l.Error(err, "cannot delete Exclusion Window resource", - "Cluster ID", ew.Spec.ClusterID, + "Cluster ID", ew.Status.ClusterID, "Exclusion Window Spec", ew.Spec, "Exclusion Window metadata", ew.ObjectMeta, ) @@ -212,7 +231,7 @@ func (r *ExclusionWindowReconciler) handleDeleteWindow( err = r.Patch(ctx, ew, patch) if err != nil { l.Error(err, "cannot patch Exclusion Window resource metadata with deleted event", - "Cluster ID", ew.Spec.ClusterID, + "Cluster ID", ew.Status.ClusterID, "Exclusion Window Spec", ew.Spec, "Exclusion Window metadata", ew.ObjectMeta, ) @@ -225,7 +244,7 @@ func (r *ExclusionWindowReconciler) handleDeleteWindow( } l.Info("Exclusion Window has been deleted", - "Cluster ID", ew.Spec.ClusterID, + "Cluster ID", ew.Status.ClusterID, "Exclusion Window Spec", ew.Spec, "Exclusion Window Status", ew.Status, ) diff --git a/controllers/clusterresources/gcpvpcpeering_controller.go b/controllers/clusterresources/gcpvpcpeering_controller.go index e8c16c5cc..61b5b7eab 100644 --- a/controllers/clusterresources/gcpvpcpeering_controller.go +++ b/controllers/clusterresources/gcpvpcpeering_controller.go @@ -96,13 +96,32 @@ func (r *GCPVPCPeeringReconciler) handleCreateCluster( l logr.Logger, ) (ctrl.Result, error) { if gcp.Status.ID == "" { - l.Info( - "Creating GCP VPC Peering resource", - "project ID", gcp.Spec.PeerProjectID, - "network name", gcp.Spec.PeerVPCNetworkName, - ) + var cdcID string + var err error + if gcp.Spec.ClusterRef != nil { + cdcID, err = GetDataCentreID(r.Client, ctx, gcp.Spec.ClusterRef) + if err != nil { + l.Error(err, "Cannot get CDCID", + "Cluster reference", gcp.Spec.ClusterRef, + ) + return ctrl.Result{}, err + } + l.Info( + "Creating GCP VPC Peering resource from the cluster reference", + "cluster reference", gcp.Spec.ClusterRef, + "project ID", gcp.Spec.PeerProjectID, + "network name", gcp.Spec.PeerVPCNetworkName, + ) + } else { + cdcID = gcp.Spec.DataCentreID + l.Info( + "Creating GCP VPC Peering resource", + "project ID", gcp.Spec.PeerProjectID, + "network name", gcp.Spec.PeerVPCNetworkName, + ) + } - gcpStatus, err := r.API.CreatePeering(instaclustr.GCPPeeringEndpoint, &gcp.Spec) + gcpStatus, err := r.API.CreateGCPVPCPeering(&gcp.Spec, cdcID) if err != nil { l.Error( err, "Cannot create GCP VPC Peering resource", @@ -124,6 +143,7 @@ func (r *GCPVPCPeeringReconciler) handleCreateCluster( patch := gcp.NewPatch() gcp.Status.PeeringStatus = *gcpStatus + gcp.Status.CDCID = cdcID err = r.Status().Patch(ctx, gcp, patch) if err != nil { l.Error(err, "Cannot patch GCP VPC Peering resource status", @@ -260,7 +280,7 
@@ func (r *GCPVPCPeeringReconciler) handleDeleteCluster( "id", gcp.Status.ID, "project ID", gcp.Spec.PeerProjectID, "network name", gcp.Spec.PeerVPCNetworkName, - "data centre ID", gcp.Spec.DataCentreID, + "data centre ID", gcp.Status.CDCID, "status", gcp.Status.PeeringStatus, ) diff --git a/controllers/clusterresources/gcpvpcpeering_controller_test.go b/controllers/clusterresources/gcpvpcpeering_controller_test.go index 46c72c020..5afe6b469 100644 --- a/controllers/clusterresources/gcpvpcpeering_controller_test.go +++ b/controllers/clusterresources/gcpvpcpeering_controller_test.go @@ -33,7 +33,7 @@ import ( var _ = Describe("Successful creation of a GCP VPC Peering resource", Ordered, func() { gcpVPCPeeringSpec := v1beta1.GCPVPCPeeringSpec{ - VPCPeeringSpec: v1beta1.VPCPeeringSpec{ + PeeringSpec: v1beta1.PeeringSpec{ DataCentreID: "375e4d1c-2f77-4d02-a6f2-1af617ff2ab2", PeerSubnets: []string{"172.31.0.0/16", "192.168.0.0/16"}, }, diff --git a/controllers/clusterresources/helpers.go b/controllers/clusterresources/helpers.go index 9a764ac86..bbd41807e 100644 --- a/controllers/clusterresources/helpers.go +++ b/controllers/clusterresources/helpers.go @@ -17,10 +17,16 @@ limitations under the License. package clusterresources import ( + "context" + + "k8s.io/apimachinery/pkg/types" "k8s.io/utils/strings/slices" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/instaclustr/operator/apis/clusterresources/v1beta1" + clustersv1beta1 "github.com/instaclustr/operator/apis/clusters/v1beta1" "github.com/instaclustr/operator/pkg/instaclustr" + "github.com/instaclustr/operator/pkg/models" ) func areFirewallRuleStatusesEqual(a, b *v1beta1.FirewallRuleStatus) bool { @@ -94,3 +100,80 @@ func subnetsEqual(subnets1, subnets2 []string) bool { return true } + +type ClusterIDProvider interface { + client.Object + GetClusterID() string + GetDataCentreID(cdcName string) string +} + +func GetDataCentreID(cl client.Client, ctx context.Context, ref *v1beta1.ClusterRef) (string, error) { + var obj ClusterIDProvider + ns := types.NamespacedName{ + Namespace: ref.Namespace, + Name: ref.Name, + } + + switch ref.ClusterKind { + case models.RedisClusterKind: + obj = &clustersv1beta1.Redis{} + case models.OpenSearchKind: + obj = &clustersv1beta1.OpenSearch{} + case models.KafkaKind: + obj = &clustersv1beta1.Kafka{} + case models.CassandraKind: + obj = &clustersv1beta1.Cassandra{} + case models.PgClusterKind: + obj = &clustersv1beta1.PostgreSQL{} + case models.ZookeeperClusterKind: + obj = &clustersv1beta1.Zookeeper{} + case models.CadenceClusterKind: + obj = &clustersv1beta1.Cadence{} + case models.KafkaConnectClusterKind: + obj = &clustersv1beta1.KafkaConnect{} + default: + return "", models.ErrUnsupportedClusterKind + } + err := cl.Get(ctx, ns, obj) + if err != nil { + return "", err + } + + return obj.GetDataCentreID(ref.CDCName), nil +} + +func GetClusterID(cl client.Client, ctx context.Context, ref *v1beta1.ClusterRef) (string, error) { + var obj ClusterIDProvider + ns := types.NamespacedName{ + Namespace: ref.Namespace, + Name: ref.Name, + } + + switch ref.ClusterKind { + case models.RedisClusterKind: + obj = &clustersv1beta1.Redis{} + case models.OpenSearchKind: + obj = &clustersv1beta1.OpenSearch{} + case models.KafkaKind: + obj = &clustersv1beta1.Kafka{} + case models.CassandraKind: + obj = &clustersv1beta1.Cassandra{} + case models.PgClusterKind: + obj = &clustersv1beta1.PostgreSQL{} + case models.ZookeeperClusterKind: + obj = &clustersv1beta1.Zookeeper{} + case models.CadenceClusterKind: + obj = 
&clustersv1beta1.Cadence{} + case models.KafkaConnectClusterKind: + obj = &clustersv1beta1.KafkaConnect{} + default: + return "", models.ErrUnsupportedClusterKind + } + + err := cl.Get(ctx, ns, obj) + if err != nil { + return "", err + } + + return obj.GetClusterID(), nil +} diff --git a/controllers/clusterresources/opensearchegressrules_controller.go b/controllers/clusterresources/opensearchegressrules_controller.go index 697ef82ee..e357fca6a 100644 --- a/controllers/clusterresources/opensearchegressrules_controller.go +++ b/controllers/clusterresources/opensearchegressrules_controller.go @@ -104,7 +104,32 @@ func (r *OpenSearchEgressRulesReconciler) handleCreate(ctx context.Context, l lo patch := rule.NewPatch() if rule.Status.ID == "" { - rule.Status.ID = fmt.Sprintf("%s~%s~%s", rule.Spec.ClusterID, rule.Spec.Source, rule.Spec.OpenSearchBindingID) + var clusterID string + var err error + if rule.Spec.ClusterRef != nil { + clusterID, err = GetClusterID(r.Client, ctx, rule.Spec.ClusterRef) + if err != nil { + l.Error(err, "Cannot get cluster ID", + "Cluster reference", rule.Spec.ClusterRef, + ) + return err + } + l.Info( + "Creating OpenSearch Egress rule from the cluster reference", + "cluster reference", rule.Spec.ClusterRef, + "cluster ID", clusterID, + ) + } else { + clusterID = rule.Spec.ClusterID + l.Info( + "Creating OpenSearch Egress rule", + "cluster ID", clusterID, + "type", rule.Spec.Type, + ) + } + + rule.Status.ID = fmt.Sprintf("%s~%s~%s", clusterID, rule.Spec.Source, rule.Spec.OpenSearchBindingID) + rule.Status.ClusterID = clusterID } _, err := r.API.GetOpenSearchEgressRule(rule.Status.ID) diff --git a/controllers/clusters/cassandra_controller.go b/controllers/clusters/cassandra_controller.go index a1230b7b7..967b4ea15 100644 --- a/controllers/clusters/cassandra_controller.go +++ b/controllers/clusters/cassandra_controller.go @@ -38,6 +38,7 @@ import ( clusterresourcesv1beta1 "github.com/instaclustr/operator/apis/clusterresources/v1beta1" "github.com/instaclustr/operator/apis/clusters/v1beta1" + "github.com/instaclustr/operator/controllers/clusterresources" "github.com/instaclustr/operator/pkg/exposeservice" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" @@ -155,8 +156,23 @@ func (r *CassandraReconciler) handleCreateCluster( "cluster name", c.Spec.Name, "data centres", c.Spec.DataCentres, ) + iCassandraSpec := c.Spec.ToInstAPI() + for i, dc := range c.Spec.DataCentres { + for j, debezium := range dc.Debezium { + if debezium.ClusterRef != nil { + cdcID, err := clusterresources.GetDataCentreID(r.Client, ctx, debezium.ClusterRef) + if err != nil { + l.Error(err, "Cannot get cluster ID", + "Cluster reference", debezium.ClusterRef, + ) + return ctrl.Result{}, err + } + iCassandraSpec.DataCentres[i].Debezium[j].KafkaDataCentreID = cdcID + } + } + } - id, err = r.API.CreateCluster(instaclustr.CassandraEndpoint, c.Spec.ToInstAPI()) + id, err = r.API.CreateCluster(instaclustr.CassandraEndpoint, iCassandraSpec) if err != nil { l.Error( err, "Cannot create cluster", diff --git a/controllers/clusters/kafkaconnect_controller.go b/controllers/clusters/kafkaconnect_controller.go index 423688fdd..e3772da95 100644 --- a/controllers/clusters/kafkaconnect_controller.go +++ b/controllers/clusters/kafkaconnect_controller.go @@ -35,6 +35,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/instaclustr/operator/apis/clusters/v1beta1" + "github.com/instaclustr/operator/controllers/clusterresources" 
"github.com/instaclustr/operator/pkg/exposeservice" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" @@ -102,14 +103,41 @@ func (r *KafkaConnectReconciler) handleCreateCluster(ctx context.Context, kc *v1 l = l.WithName("Creation Event") if kc.Status.ID == "" { - l.Info("Creating Kafka Connect cluster", - "cluster name", kc.Spec.Name, - "data centres", kc.Spec.DataCentres) - - patch := kc.NewPatch() var err error + iKafkaConnectSpec := kc.Spec.ToInstAPI() + var targetClusterID string + + for i, targetCluster := range kc.Spec.TargetCluster { + for j, managedCluster := range targetCluster.ManagedCluster { + if managedCluster.ClusterRef != nil { + targetClusterID, err = clusterresources.GetClusterID(r.Client, ctx, managedCluster.ClusterRef) + if err != nil { + l.Error(err, "Cannot get cluster ID", + "Cluster reference", managedCluster.ClusterRef, + ) + return ctrl.Result{}, err + } + + iKafkaConnectSpec.TargetCluster[i].ManagedCluster[j].TargetKafkaClusterID = targetClusterID + l.Info( + "Creating KafkaConnect cluster from cluster reference", + "cluster reference", managedCluster.ClusterRef, + "cluster ID", targetClusterID, + ) + } else { + targetClusterID = managedCluster.TargetKafkaClusterID + l.Info( + "Creating Kafka Connect cluster", + "cluster name", kc.Spec.Name, + "cluster ID", targetClusterID, + "data centres", kc.Spec.DataCentres, + ) + } + } + } - kc.Status.ID, err = r.API.CreateCluster(instaclustr.KafkaConnectEndpoint, kc.Spec.ToInstAPI()) + patch := kc.NewPatch() + kc.Status.ID, err = r.API.CreateCluster(instaclustr.KafkaConnectEndpoint, iKafkaConnectSpec) if err != nil { l.Error(err, "cannot create Kafka Connect in Instaclustr", "Kafka Connect manifest", kc.Spec) r.EventRecorder.Eventf( @@ -119,6 +147,7 @@ func (r *KafkaConnectReconciler) handleCreateCluster(ctx context.Context, kc *v1 ) return reconcile.Result{}, err } + kc.Status.TargetKafkaClusterID = targetClusterID r.EventRecorder.Eventf( kc, models.Normal, models.Created, diff --git a/controllers/clusters/user.go b/controllers/clusters/user.go index d4eb3e7b2..5cfad598b 100644 --- a/controllers/clusters/user.go +++ b/controllers/clusters/user.go @@ -16,13 +16,14 @@ type userObject interface { SetClusterEvents(events map[string]string) } -type clusterObject interface { +type ClusterObject interface { Object GetUserRefs() v1beta1.References SetUserRefs(refs v1beta1.References) GetAvailableUsers() v1beta1.References SetAvailableUsers(users v1beta1.References) GetClusterID() string + GetDataCentreID(cdcName string) string SetClusterID(id string) } @@ -37,7 +38,7 @@ func handleUsersChanges( ctx context.Context, c client.Client, userFactory userResourceFactory, - cluster clusterObject, + cluster ClusterObject, ) error { l := log.FromContext(ctx).V(1) @@ -107,7 +108,7 @@ func detachUsers( ctx context.Context, c client.Client, userFactory userResourceFactory, - cluster clusterObject, + cluster ClusterObject, ) error { l := log.FromContext(ctx).V(1) diff --git a/pkg/instaclustr/client.go b/pkg/instaclustr/client.go index 371c0a220..80a0ba746 100644 --- a/pkg/instaclustr/client.go +++ b/pkg/instaclustr/client.go @@ -598,14 +598,74 @@ func (c *Client) GetPeeringStatus(peerID, return &peeringStatus, nil } -func (c *Client) CreatePeering(url string, peeringSpec any) (*clusterresourcesv1beta1.PeeringStatus, error) { +func (c *Client) CreateAzureVNetPeering(peeringSpec *clusterresourcesv1beta1.AzureVNetPeeringSpec, cdcId string) (*clusterresourcesv1beta1.PeeringStatus, error) { + payload := 
&struct { + PeerSubnets []string `json:"peerSubnets"` + PeerResourceGroup string `json:"peerResourceGroup"` + PeerSubscriptionID string `json:"peerSubscriptionId"` + PeerADObjectID string `json:"peerAdObjectId,omitempty"` + PeerVirtualNetworkName string `json:"peerVirtualNetworkName"` + CDCID string `json:"cdcId"` + }{ + PeerSubnets: peeringSpec.PeerSubnets, + PeerResourceGroup: peeringSpec.PeerResourceGroup, + PeerADObjectID: peeringSpec.PeerADObjectID, + PeerSubscriptionID: peeringSpec.PeerSubscriptionID, + PeerVirtualNetworkName: peeringSpec.PeerVirtualNetworkName, + CDCID: cdcId, + } - jsonDataCreate, err := json.Marshal(peeringSpec) + jsonDataCreate, err := json.Marshal(payload) if err != nil { return nil, err } - url = c.serverHostname + url + url := c.serverHostname + AzurePeeringEndpoint + resp, err := c.DoRequest(url, http.MethodPost, jsonDataCreate) + if err != nil { + return nil, err + } + + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusAccepted { + return nil, fmt.Errorf("status code: %d, message: %s", resp.StatusCode, body) + } + + var creationResponse *clusterresourcesv1beta1.PeeringStatus + err = json.Unmarshal(body, &creationResponse) + if err != nil { + return nil, err + } + + return creationResponse, nil +} + +func (c *Client) CreateAWSVPCPeering(peeringSpec *clusterresourcesv1beta1.AWSVPCPeeringSpec, cdcId string) (*clusterresourcesv1beta1.PeeringStatus, error) { + payload := &struct { + PeerSubnets []string `json:"peerSubnets"` + PeerAWSAccountID string `json:"peerAwsAccountId"` + PeerVPCID string `json:"peerVpcId"` + PeerRegion string `json:"peerRegion,omitempty"` + CDCID string `json:"cdcId"` + }{ + PeerSubnets: peeringSpec.PeerSubnets, + PeerAWSAccountID: peeringSpec.PeerAWSAccountID, + PeerVPCID: peeringSpec.PeerVPCID, + PeerRegion: peeringSpec.PeerRegion, + CDCID: cdcId, + } + + jsonDataCreate, err := json.Marshal(payload) + if err != nil { + return nil, err + } + + url := c.serverHostname + AWSPeeringEndpoint resp, err := c.DoRequest(url, http.MethodPost, jsonDataCreate) if err != nil { return nil, err @@ -630,6 +690,45 @@ func (c *Client) CreatePeering(url string, peeringSpec any) (*clusterresourcesv1 return creationResponse, nil } +func (c *Client) CreateGCPVPCPeering(peeringSpec *clusterresourcesv1beta1.GCPVPCPeeringSpec, cdcId string) (*clusterresourcesv1beta1.PeeringStatus, error) { + payload := &struct { + PeerSubnets []string `json:"peerSubnets"` + PeerVPCNetworkName string `json:"peerVpcNetworkName"` + PeerProjectID string `json:"peerProjectId"` + CDCID string `json:"cdcId"` + }{ + PeerSubnets: peeringSpec.PeerSubnets, + PeerVPCNetworkName: peeringSpec.PeerVPCNetworkName, + PeerProjectID: peeringSpec.PeerProjectID, + CDCID: cdcId, + } + + jsonDataCreate, err := json.Marshal(payload) + if err != nil { + return nil, err + } + + url := c.serverHostname + GCPPeeringEndpoint + resp, err := c.DoRequest(url, http.MethodPost, jsonDataCreate) + if err != nil { + return nil, err + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + if resp.StatusCode != http.StatusAccepted { + return nil, fmt.Errorf("status code: %d, message: %s", resp.StatusCode, body) + } + var creationResponse *clusterresourcesv1beta1.PeeringStatus + err = json.Unmarshal(body, &creationResponse) + if err != nil { + return nil, err + } + return creationResponse, nil +} + func (c *Client) UpdatePeering(peerID, peeringEndpoint string, peerSpec any, @@ 
-718,16 +817,66 @@ func (c *Client) GetFirewallRuleStatus( return firewallRuleStatus, nil } -func (c *Client) CreateFirewallRule( - url string, - firewallRuleSpec any, +func (c *Client) CreateClusterNetworkFirewallRule( + firewallRuleSpec *clusterresourcesv1beta1.ClusterNetworkFirewallRuleSpec, + clusterID string, ) (*clusterresourcesv1beta1.FirewallRuleStatus, error) { - jsonFirewallRule, err := json.Marshal(firewallRuleSpec) + payload := &struct { + ClusterID string `json:"clusterId"` + Type string `json:"type"` + Network string `json:"network"` + }{ + ClusterID: clusterID, + Type: firewallRuleSpec.Type, + Network: firewallRuleSpec.Network, + } + + jsonFirewallRule, err := json.Marshal(payload) if err != nil { return nil, err } - url = c.serverHostname + url + url := c.serverHostname + ClusterNetworkFirewallRuleEndpoint + resp, err := c.DoRequest(url, http.MethodPost, jsonFirewallRule) + if err != nil { + return nil, err + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + if resp.StatusCode != http.StatusAccepted { + return nil, fmt.Errorf("status code: %d, message: %s", resp.StatusCode, body) + } + var creationResponse *clusterresourcesv1beta1.FirewallRuleStatus + err = json.Unmarshal(body, &creationResponse) + if err != nil { + return nil, err + } + return creationResponse, nil +} + +func (c *Client) CreateAWSSecurityGroupFirewallRule( + firewallRuleSpec *clusterresourcesv1beta1.AWSSecurityGroupFirewallRuleSpec, + clusterID string, +) (*clusterresourcesv1beta1.FirewallRuleStatus, error) { + payload := &struct { + SecurityGroupID string `json:"securityGroupId"` + ClusterID string `json:"clusterId,omitempty"` + Type string `json:"type"` + }{ + SecurityGroupID: firewallRuleSpec.SecurityGroupID, + ClusterID: clusterID, + Type: firewallRuleSpec.Type, + } + + jsonFirewallRule, err := json.Marshal(payload) + if err != nil { + return nil, err + } + + url := c.serverHostname + AWSSecurityGroupFirewallRuleEndpoint resp, err := c.DoRequest(url, http.MethodPost, jsonFirewallRule) if err != nil { return nil, err @@ -2220,10 +2369,20 @@ func (c *Client) GetAWSEndpointServicePrincipal(principalID string) (*models.AWS return &principal, nil } -func (c *Client) CreateAWSEndpointServicePrincipal(spec any) ([]byte, error) { +func (c *Client) CreateAWSEndpointServicePrincipal(spec clusterresourcesv1beta1.AWSEndpointServicePrincipalSpec, CDCID string) ([]byte, error) { + payload := &struct { + ClusterDataCenterID string `json:"clusterDataCenterId"` + EndPointServiceID string `json:"endPointServiceId,omitempty"` + PrincipalARN string `json:"principalArn"` + }{ + ClusterDataCenterID: CDCID, + EndPointServiceID: spec.EndPointServiceID, + PrincipalARN: spec.PrincipalARN, + } + url := c.serverHostname + AWSEndpointServicePrincipalEndpoint - b, err := json.Marshal(spec) + b, err := json.Marshal(payload) if err != nil { return nil, err } @@ -2330,10 +2489,22 @@ func (c *Client) GetAWSVPCPeering(peerID string) (*models.AWSVPCPeering, error) } func (c *Client) CreateOpenSearchEgressRules(rule *clusterresourcesv1beta1.OpenSearchEgressRules) (string, error) { + payload := &struct { + ClusterID string `json:"clusterId,omitempty"` + OpenSearchBindingID string `json:"openSearchBindingId"` + Source string `json:"source"` + Type string `json:"type,omitempty"` + }{ + ClusterID: rule.Status.ClusterID, + OpenSearchBindingID: rule.Spec.OpenSearchBindingID, + Source: rule.Spec.Source, + Type: rule.Spec.Type, + } + url := c.serverHostname + OpenSearchEgressRulesEndpoint 
 	status := &clusterresourcesv1beta1.OpenSearchEgressRulesStatus{}
-	b, err := json.Marshal(rule.Spec)
+	b, err := json.Marshal(payload)
 	if err != nil {
 		return "", err
 	}
diff --git a/pkg/instaclustr/interfaces.go b/pkg/instaclustr/interfaces.go
index bd6611015..0f63517fe 100644
--- a/pkg/instaclustr/interfaces.go
+++ b/pkg/instaclustr/interfaces.go
@@ -37,9 +37,12 @@ type API interface {
 	GetAWSVPCPeering(peerID string) (*models.AWSVPCPeering, error)
 	UpdatePeering(peerID, peeringEndpoint string, peerSpec any) error
 	DeletePeering(peerID, peeringEndpoint string) error
-	CreatePeering(url string, peeringSpec any) (*clusterresourcesv1beta1.PeeringStatus, error)
+	CreateAzureVNetPeering(peeringSpec *clusterresourcesv1beta1.AzureVNetPeeringSpec, cdcId string) (*clusterresourcesv1beta1.PeeringStatus, error)
+	CreateGCPVPCPeering(peeringSpec *clusterresourcesv1beta1.GCPVPCPeeringSpec, cdcId string) (*clusterresourcesv1beta1.PeeringStatus, error)
+	CreateAWSVPCPeering(peeringSpec *clusterresourcesv1beta1.AWSVPCPeeringSpec, cdcId string) (*clusterresourcesv1beta1.PeeringStatus, error)
 	GetFirewallRuleStatus(firewallRuleID string, firewallRuleEndpoint string) (*clusterresourcesv1beta1.FirewallRuleStatus, error)
-	CreateFirewallRule(url string, firewallRuleSpec any) (*clusterresourcesv1beta1.FirewallRuleStatus, error)
+	CreateAWSSecurityGroupFirewallRule(firewallRuleSpec *clusterresourcesv1beta1.AWSSecurityGroupFirewallRuleSpec, clusterID string) (*clusterresourcesv1beta1.FirewallRuleStatus, error)
+	CreateClusterNetworkFirewallRule(firewallRuleSpec *clusterresourcesv1beta1.ClusterNetworkFirewallRuleSpec, clusterID string) (*clusterresourcesv1beta1.FirewallRuleStatus, error)
 	DeleteFirewallRule(firewallRuleID string, firewallRuleEndpoint string) error
 	CreateKafkaUser(url string, kafkaUser *models.KafkaUser) (*kafkamanagementv1beta1.KafkaUserStatus, error)
 	UpdateKafkaUser(kafkaUserID string, kafkaUserSpec *models.KafkaUser) error
@@ -104,7 +107,7 @@ type API interface {
 	CreateOpenSearchEgressRules(rule *clusterresourcesv1beta1.OpenSearchEgressRules) (string, error)
 	GetOpenSearchEgressRule(id string) (*clusterresourcesv1beta1.OpenSearchEgressRulesStatus, error)
 	DeleteOpenSearchEgressRule(id string) error
-	CreateAWSEndpointServicePrincipal(spec any) ([]byte, error)
+	CreateAWSEndpointServicePrincipal(spec clusterresourcesv1beta1.AWSEndpointServicePrincipalSpec, CDCID string) ([]byte, error)
 	DeleteAWSEndpointServicePrincipal(principalID string) error
 	GetResizeOperationsByClusterDataCentreID(cdcID string) ([]*v1beta1.ResizeOperation, error)
 }
diff --git a/pkg/instaclustr/mock/client.go b/pkg/instaclustr/mock/client.go
index ee0c57b8b..ac4ff867b 100644
--- a/pkg/instaclustr/mock/client.go
+++ b/pkg/instaclustr/mock/client.go
@@ -79,7 +79,27 @@ func (c *mockClient) DeletePeering(peerID, peeringEndpoint string) error {
 	panic("DeletePeering: is not implemented")
 }
 
-func (c *mockClient) CreatePeering(url string, peeringSpec any) (*clusterresourcesv1beta1.PeeringStatus, error) {
+func (c *mockClient) CreateAzureVNetPeering(peeringSpec *clusterresourcesv1beta1.AzureVNetPeeringSpec, cdcId string) (*clusterresourcesv1beta1.PeeringStatus, error) {
+	ps := &clusterresourcesv1beta1.PeeringStatus{
+		ID:            StatusID,
+		Name:          "name",
+		StatusCode:    "statusCode",
+		FailureReason: "failureReason",
+	}
+	return ps, nil
+}
+
+func (c *mockClient) CreateAWSVPCPeering(peeringSpec *clusterresourcesv1beta1.AWSVPCPeeringSpec, cdcId string) (*clusterresourcesv1beta1.PeeringStatus, error) {
+	ps := &clusterresourcesv1beta1.PeeringStatus{
+		ID:            StatusID,
+		Name:          "name",
+		StatusCode:    "statusCode",
+		FailureReason: "failureReason",
+	}
+	return ps, nil
+}
+
+func (c *mockClient) CreateGCPVPCPeering(peeringSpec *clusterresourcesv1beta1.GCPVPCPeeringSpec, cdcId string) (*clusterresourcesv1beta1.PeeringStatus, error) {
 	ps := &clusterresourcesv1beta1.PeeringStatus{
 		ID:            StatusID,
 		Name:          "name",
@@ -98,7 +118,15 @@ func (c *mockClient) GetFirewallRuleStatus(firewallRuleID string, firewallRuleEn
 	return fwRule, nil
 }
 
-func (c *mockClient) CreateFirewallRule(url string, firewallRuleSpec any) (*clusterresourcesv1beta1.FirewallRuleStatus, error) {
+func (c *mockClient) CreateAWSSecurityGroupFirewallRule(firewallRuleSpec *clusterresourcesv1beta1.AWSSecurityGroupFirewallRuleSpec, clusterID string) (*clusterresourcesv1beta1.FirewallRuleStatus, error) {
+	fwRule := &clusterresourcesv1beta1.FirewallRuleStatus{
+		ID:             StatusID,
+		Status:         "OK",
+		DeferredReason: "NO",
+	}
+	return fwRule, nil
+}
+func (c *mockClient) CreateClusterNetworkFirewallRule(firewallRuleSpec *clusterresourcesv1beta1.ClusterNetworkFirewallRuleSpec, clusterID string) (*clusterresourcesv1beta1.FirewallRuleStatus, error) {
 	fwRule := &clusterresourcesv1beta1.FirewallRuleStatus{
 		ID:             StatusID,
 		Status:         "OK",
@@ -390,7 +418,7 @@ func (c *mockClient) UpdateClusterSettings(clusterID string, settings *models.Cl
 	panic("UpdateClusterSettings: is not implemented")
 }
 
-func (c *mockClient) CreateAWSEndpointServicePrincipal(spec any) ([]byte, error) {
+func (c *mockClient) CreateAWSEndpointServicePrincipal(spec clusterresourcesv1beta1.AWSEndpointServicePrincipalSpec, CDCID string) ([]byte, error) {
 	panic("CreateAWSEndpointServicePrincipal: is not implemented")
 }
 
diff --git a/pkg/models/errors.go b/pkg/models/errors.go
index a86d5ca0f..1b46d0f61 100644
--- a/pkg/models/errors.go
+++ b/pkg/models/errors.go
@@ -63,11 +63,14 @@ var (
 	ErrPrivateLinkSupportedOnlyForAWS             = errors.New("private link is supported only for an AWS cloud provider")
 	ErrImmutableSpec                              = errors.New("resource specification is immutable")
 	ErrUnsupportedBackupClusterKind               = errors.New("backups for provided cluster kind are not supported")
+	ErrUnsupportedClusterKind                     = errors.New("provided cluster kind is not supported")
 	ErrExposeServiceNotCreatedYet                 = errors.New("expose service is not created yet")
 	ErrExposeServiceEndpointsNotCreatedYet        = errors.New("expose service endpoints is not created yet")
 	ErrOnlySingleConcurrentResizeAvailable        = errors.New("only single concurrent resize is allowed")
 	ErrBundledUseOnlyResourceUpdateIsNotSupported = errors.New("updating of bundled use resource is not supported")
 	ErrDebeziumImmutable                          = errors.New("debezium array is immutable")
+	ErrEmptyNamespace                             = errors.New("namespace field is empty")
+	ErrEmptyName                                  = errors.New("name field is empty")
 	ErrCreateClusterWithMultiDC                   = errors.New("Multiple data center is still not supported. Please create a cluster with one data centre and add a second one when the cluster is in the running state")
 	ErrOnPremicesWithMultiDC                      = errors.New("on-premises cluster can be provisioned with only one data centre")
 )
diff --git a/pkg/models/operator.go b/pkg/models/operator.go
index fe862c369..7670fb612 100644
--- a/pkg/models/operator.go
+++ b/pkg/models/operator.go
@@ -64,6 +64,8 @@ const (
 	PgClusterKind                  = "PostgreSQL"
 	RedisClusterKind               = "Redis"
 	OsClusterKind                  = "OpenSearch"
+	CadenceClusterKind             = "Cadence"
+	KafkaConnectClusterKind        = "KafkaConnect"
 	CassandraClusterKind           = "Cassandra"
 	ZookeeperClusterKind           = "Zookeeper"
 	ClusterNetworkFirewallRuleKind = "ClusterNetworkFirewallRule"
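
Note on the interface change above: the generic CreatePeering(url, any) and CreateFirewallRule(url, any) entry points are replaced by typed, per-resource methods that receive the spec plus the target cluster/CDC ID, so callers no longer build URLs or pass untyped payloads. Below is a minimal sketch of caller-side code against the new API interface, assuming the module path github.com/instaclustr/operator and the package names implied by the paths in this patch; the helper functions are illustrative only, not the operator's actual controller code.

package example

import (
	clusterresourcesv1beta1 "github.com/instaclustr/operator/apis/clusterresources/v1beta1"
	"github.com/instaclustr/operator/pkg/instaclustr"
	"github.com/instaclustr/operator/pkg/models"
)

// createClusterNetworkFirewallRule shows the caller side of the new typed API:
// the typed spec and the cluster ID are passed directly, with no URL building
// and no any-typed payload.
func createClusterNetworkFirewallRule(
	api instaclustr.API,
	spec *clusterresourcesv1beta1.ClusterNetworkFirewallRuleSpec,
	clusterID string,
) (*clusterresourcesv1beta1.FirewallRuleStatus, error) {
	return api.CreateClusterNetworkFirewallRule(spec, clusterID)
}

// validateClusterKind is a hypothetical use of the new kind constants together
// with ErrUnsupportedClusterKind when resolving a cluster reference; the set of
// accepted kinds here is an illustrative subset, not the operator's real list.
func validateClusterKind(kind string) error {
	switch kind {
	case models.CassandraClusterKind,
		models.CadenceClusterKind,
		models.KafkaConnectClusterKind,
		models.ZookeeperClusterKind:
		return nil
	default:
		return models.ErrUnsupportedClusterKind
	}
}

Compared with the old any-typed signatures, passing the wrong spec type or a malformed endpoint now fails at compile time rather than at request time, and the mock client can return per-resource canned statuses as shown in the client.go hunk above.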