Skip to content

Commit

Permalink
Cadence packaged provisioning was refactored
Browse files Browse the repository at this point in the history
  • Loading branch information
testisnullus committed Mar 4, 2024
1 parent 5d2261a commit 65e399a
Show file tree
Hide file tree
Showing 16 changed files with 548 additions and 670 deletions.
23 changes: 8 additions & 15 deletions .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,14 @@
"filename": "apis/clusters/v1beta1/cadence_types.go",
"hashed_secret": "a242f4a16b957f7ff99eb24e189e94d270d2348b",
"is_verified": false,
"line_number": 291
"line_number": 292
},
{
"type": "Secret Keyword",
"filename": "apis/clusters/v1beta1/cadence_types.go",
"hashed_secret": "a57ce131bd944bdf8ba2f2f93e179dc416ed0315",
"is_verified": false,
"line_number": 300
"line_number": 301
}
],
"apis/clusters/v1beta1/cassandra_types.go": [
Expand Down Expand Up @@ -208,7 +208,7 @@
"filename": "apis/clusters/v1beta1/cassandra_webhook.go",
"hashed_secret": "e0a46b27231f798fe22dc4d5d82b5feeb5dcf085",
"is_verified": false,
"line_number": 236
"line_number": 232
}
],
"apis/clusters/v1beta1/kafka_types.go": [
Expand Down Expand Up @@ -360,7 +360,7 @@
"filename": "apis/clusters/v1beta1/zz_generated.deepcopy.go",
"hashed_secret": "44e17306b837162269a410204daaa5ecee4ec22c",
"is_verified": false,
"line_number": 665
"line_number": 676
}
],
"apis/kafkamanagement/v1beta1/kafkauser_types.go": [
Expand Down Expand Up @@ -499,16 +499,9 @@
{
"type": "Secret Keyword",
"filename": "controllers/clusters/cadence_controller.go",
"hashed_secret": "bcf196cdeea4d7ed8b04dcbbd40111eb5e9abeac",
"is_verified": false,
"line_number": 644
},
{
"type": "Secret Keyword",
"filename": "controllers/clusters/cadence_controller.go",
"hashed_secret": "192d703e91a60432ce06bfe26adfd12f5c7b931f",
"hashed_secret": "5ffe533b830f08a0326348a9160afafc8ada44db",
"is_verified": false,
"line_number": 677
"line_number": 750
}
],
"controllers/clusters/datatest/kafka_v1beta1.yaml": [
Expand All @@ -535,14 +528,14 @@
"filename": "controllers/clusters/helpers.go",
"hashed_secret": "5ffe533b830f08a0326348a9160afafc8ada44db",
"is_verified": false,
"line_number": 119
"line_number": 122
},
{
"type": "Secret Keyword",
"filename": "controllers/clusters/helpers.go",
"hashed_secret": "5baa61e4c9b93f3f0682250b6cf8331b7ee68fd8",
"is_verified": false,
"line_number": 124
"line_number": 127
}
],
"controllers/clusters/kafkaconnect_controller_test.go": [
Expand Down
22 changes: 16 additions & 6 deletions apis/clusters/v1beta1/cadence_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@ type AWSArchival struct {
}

type PackagedProvisioning struct {
UseAdvancedVisibility bool `json:"useAdvancedVisibility"`
BundledKafkaSpec *BundledKafkaSpec `json:"bundledKafkaSpec,omitempty"`
BundledOpenSearchSpec *BundledOpenSearchSpec `json:"bundledOpenSearchSpec,omitempty"`
BundledCassandraSpec *BundledCassandraSpec `json:"bundledCassandraSpec"`
UseAdvancedVisibility bool `json:"useAdvancedVisibility"`

// +kubebuilder:validation:Enum=Developer;Production-Starter;Production-Small
SolutionSize string `json:"solutionSize"`
}

type SharedProvisioning struct {
Expand All @@ -131,8 +131,9 @@ type AdvancedVisibility struct {
type CadenceStatus struct {
GenericStatus `json:",inline"`

DataCentres []*CadenceDataCentreStatus `json:"dataCentres,omitempty"`
TargetSecondaryCadence []*CadenceDependencyTarget `json:"targetSecondaryCadence,omitempty"`
PackagedProvisioningClusterRefs []*Reference `json:"packagedProvisioningClusterRefs,omitempty"`
DataCentres []*CadenceDataCentreStatus `json:"dataCentres,omitempty"`
TargetSecondaryCadence []*CadenceDependencyTarget `json:"targetSecondaryCadence,omitempty"`
}

type CadenceDataCentreStatus struct {
Expand Down Expand Up @@ -580,3 +581,12 @@ func (cdc *CadenceDataCentreStatus) Equals(o *CadenceDataCentreStatus) bool {
cdc.PrivateLink.Equal(o.PrivateLink) &&
nodesEqual(cdc.Nodes, o.Nodes)
}

func (c *CadenceSpec) CalculateNodeSize(cloudProvider, solution, app string) string {
if appSizes, ok := models.SolutionSizesMap[cloudProvider]; ok {
if solutionMap, ok := appSizes[app]; ok {
return solutionMap[solution]
}
}
return ""
}
149 changes: 9 additions & 140 deletions apis/clusters/v1beta1/cadence_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package v1beta1
import (
"context"
"fmt"
"net"
"regexp"

"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -129,11 +128,6 @@ func (cv *cadenceValidator) ValidateCreate(ctx context.Context, obj runtime.Obje
}
}

err = c.Spec.validatePackagedProvisioningCreation()
if err != nil {
return err
}

for _, dc := range c.Spec.DataCentres {
err = dc.GenericDataCentreSpec.validateCreation()
if err != nil {
Expand All @@ -147,6 +141,14 @@ func (cv *cadenceValidator) ValidateCreate(ctx context.Context, obj runtime.Obje
if dc.CloudProvider != models.AWSVPC && dc.PrivateLink != nil {
return models.ErrPrivateLinkSupportedOnlyForAWS
}

if len(c.Spec.PackagedProvisioning) != 0 {
err = validateNetwork(dc.Network)
if err != nil {
return err
}
}

}

for _, rs := range c.Spec.ResizeSettings {
Expand Down Expand Up @@ -348,24 +350,7 @@ func (cs *CadenceSpec) validatePackagedProvisioning(old []*PackagedProvisioning)
}

for i, pp := range cs.PackagedProvisioning {
if pp.UseAdvancedVisibility {
if pp.BundledKafkaSpec == nil || pp.BundledOpenSearchSpec == nil {
return fmt.Errorf("BundledKafkaSpec and BundledOpenSearchSpec structs must not be empty because UseAdvancedVisibility is set to true")
}
if *pp.BundledKafkaSpec != *old[i].BundledKafkaSpec {
return models.ErrImmutablePackagedProvisioning
}
if *pp.BundledOpenSearchSpec != *old[i].BundledOpenSearchSpec {
return models.ErrImmutablePackagedProvisioning
}
} else {
if pp.BundledKafkaSpec != nil || pp.BundledOpenSearchSpec != nil {
return fmt.Errorf("BundledKafkaSpec and BundledOpenSearchSpec structs must be empty because UseAdvancedVisibility is set to false")
}
}

if *pp.BundledCassandraSpec != *old[i].BundledCassandraSpec ||
pp.UseAdvancedVisibility != old[i].UseAdvancedVisibility {
if *pp != *old[i] {
return models.ErrImmutablePackagedProvisioning
}
}
Expand Down Expand Up @@ -506,119 +491,3 @@ func (sp *StandardProvisioning) validate() error {

return nil
}

func (b *BundledKafkaSpec) validate() error {
networkMatched, err := regexp.Match(models.PeerSubnetsRegExp, []byte(b.Network))
if !networkMatched || err != nil {
return fmt.Errorf("the provided CIDR: %s must contain four dot separated parts and form the Private IP address. All bits in the host part of the CIDR must be 0. Suffix must be between 16-28. %v", b.Network, err)
}

err = validateReplicationFactor(models.KafkaReplicationFactors, b.ReplicationFactor)
if err != nil {
return err
}

if ((b.NodesNumber*b.ReplicationFactor)/b.ReplicationFactor)%b.ReplicationFactor != 0 {
return fmt.Errorf("kafka: number of nodes must be a multiple of replication factor: %v", b.ReplicationFactor)
}

return nil
}

func (c *BundledCassandraSpec) validate() error {
networkMatched, err := regexp.Match(models.PeerSubnetsRegExp, []byte(c.Network))
if !networkMatched || err != nil {
return fmt.Errorf("the provided CIDR: %s must contain four dot separated parts and form the Private IP address. All bits in the host part of the CIDR must be 0. Suffix must be between 16-28. %v", c.Network, err)
}

err = validateReplicationFactor(models.CassandraReplicationFactors, c.ReplicationFactor)
if err != nil {
return err
}

if ((c.NodesNumber*c.ReplicationFactor)/c.ReplicationFactor)%c.ReplicationFactor != 0 {
return fmt.Errorf("cassandra: number of nodes must be a multiple of replication factor: %v", c.ReplicationFactor)
}

return nil
}

func (o *BundledOpenSearchSpec) validate() error {
networkMatched, err := regexp.Match(models.PeerSubnetsRegExp, []byte(o.Network))
if !networkMatched || err != nil {
return fmt.Errorf("the provided CIDR: %s must contain four dot separated parts and form the Private IP address. All bits in the host part of the CIDR must be 0. Suffix must be between 16-28. %v", o.Network, err)
}

err = validateOpenSearchNumberOfRacks(o.NumberOfRacks)
if err != nil {
return err
}

return nil
}

func (cs *CadenceSpec) validatePackagedProvisioningCreation() error {
for _, dc := range cs.DataCentres {
for _, pp := range cs.PackagedProvisioning {
if (pp.UseAdvancedVisibility && pp.BundledKafkaSpec == nil) || (pp.UseAdvancedVisibility && pp.BundledOpenSearchSpec == nil) {
return fmt.Errorf("BundledKafkaSpec and BundledOpenSearchSpec structs must not be empty because UseAdvancedVisibility is set to true")
}

if pp.BundledKafkaSpec != nil {
err := pp.BundledKafkaSpec.validate()
if err != nil {
return err
}

err = dc.validateNetwork(pp.BundledKafkaSpec.Network)
if err != nil {
return err
}
}

if pp.BundledCassandraSpec != nil {
err := pp.BundledCassandraSpec.validate()
if err != nil {
return err
}

err = dc.validateNetwork(pp.BundledCassandraSpec.Network)
if err != nil {
return err
}
}

if pp.BundledOpenSearchSpec != nil {
err := pp.BundledOpenSearchSpec.validate()
if err != nil {
return err
}

err = dc.validateNetwork(pp.BundledOpenSearchSpec.Network)
if err != nil {
return err
}
}
}
}

return nil
}

func (cdc *CadenceDataCentre) validateNetwork(network string) error {
_, ipnet, err := net.ParseCIDR(cdc.Network)
if err != nil {
return err
}

ip, _, err := net.ParseCIDR(network)
if err != nil {
return err
}

if ipnet.Contains(ip) {
return fmt.Errorf("cluster network %s overlaps with network %s", cdc.Network, network)
}

return nil
}
4 changes: 0 additions & 4 deletions apis/clusters/v1beta1/cassandra_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,6 @@ func (cv *cassandraValidator) ValidateUpdate(ctx context.Context, old runtime.Ob
return models.ErrTypeAssertion
}

if oldCluster.Spec.BundledUseOnly && c.Generation != oldCluster.Generation {
return models.ErrBundledUseOnlyResourceUpdateIsNotSupported
}

if oldCluster.Spec.RestoreFrom != nil {
return nil
}
Expand Down
4 changes: 0 additions & 4 deletions apis/clusters/v1beta1/kafka_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,6 @@ func (kv *kafkaValidator) ValidateUpdate(ctx context.Context, old runtime.Object
return fmt.Errorf("cannot assert object %v to Kafka", old.GetObjectKind())
}

if oldKafka.Spec.BundledUseOnly && k.Generation != oldKafka.Generation {
return models.ErrBundledUseOnlyResourceUpdateIsNotSupported
}

err := k.Spec.validateUpdate(&oldKafka.Spec)
if err != nil {
return fmt.Errorf("cannot update, error: %v", err)
Expand Down
4 changes: 0 additions & 4 deletions apis/clusters/v1beta1/opensearch_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,6 @@ func (osv *openSearchValidator) ValidateUpdate(ctx context.Context, old runtime.

oldCluster := old.(*OpenSearch)

if oldCluster.Spec.BundledUseOnly && !oldCluster.Spec.IsEqual(os.Spec) {
return models.ErrBundledUseOnlyResourceUpdateIsNotSupported
}

if oldCluster.Spec.RestoreFrom != nil {
return nil
}
Expand Down
19 changes: 19 additions & 0 deletions apis/clusters/v1beta1/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"context"
"errors"
"fmt"
"net"
"regexp"
"strconv"
"strings"

k8sappsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -361,3 +363,20 @@ func (s *GenericDataCentreSpec) validateCloudProviderSettings() error {
func (s *GenericDataCentreSpec) hasCloudProviderSettings() bool {
return s.AWSSettings != nil || s.GCPSettings != nil && s.AzureSettings != nil
}

func validateNetwork(network string) error {
ip, _, err := net.ParseCIDR(network)
if err != nil {
return err
}

ipParts := strings.Split(ip.String(), ".")
secondOctet, err := strconv.Atoi(ipParts[1])
if err != nil {
return err
}
if secondOctet > 251 || secondOctet < 1 {
return models.ErrInvalidCIDR
}
return nil
}
Loading

0 comments on commit 65e399a

Please sign in to comment.