From e03934b4ac22148c281941d3a780d0813fd7b1bf Mon Sep 17 00:00:00 2001 From: Shalini Bhaskara Date: Wed, 4 Sep 2024 15:08:23 -0700 Subject: [PATCH] WorkloadDomainIsolation: Static volume support on supervisor (#3017) --- .../commonco/k8sorchestrator/topology.go | 6 +- .../cnsregistervolume_controller.go | 148 ++++++++++++------ .../controller/cnsregistervolume/util.go | 45 +++++- 3 files changed, 144 insertions(+), 55 deletions(-) diff --git a/pkg/csi/service/common/commonco/k8sorchestrator/topology.go b/pkg/csi/service/common/commonco/k8sorchestrator/topology.go index 1de928eda1..952c1f41ea 100644 --- a/pkg/csi/service/common/commonco/k8sorchestrator/topology.go +++ b/pkg/csi/service/common/commonco/k8sorchestrator/topology.go @@ -1690,8 +1690,12 @@ func (volTopology *wcpControllerVolumeTopology) GetTopologyInfoFromNodes(ctx con // In VC 9.0, if StorageTopologyType is not set, all the zones the selected datastore // is accessible from will be added as node affinity terms on the PV even if the zones // are not associated with the namespace of the PVC. + // This code block runs for static as well as dynamic volume provisioning case. case "": - // This code block runs for static as well as dynamic volume provisioning case. + // TopoSegToDatastoresMap will be nil in case of static volume provisioning. + if params.TopoSegToDatastoresMap == nil { + params.TopoSegToDatastoresMap = make(map[string][]*cnsvsphere.DatastoreInfo) + } var selectedSegments []map[string]string for zone, clusters := range azClustersMap { if _, exists := params.TopoSegToDatastoresMap[zone]; !exists { diff --git a/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller.go b/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller.go index 4d11aae97f..1c302bd839 100644 --- a/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller.go +++ b/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller.go @@ -74,6 +74,9 @@ var ( topologyMgr commoncotypes.ControllerTopologyService clusterComputeResourceMoIds []string + // workloadDomainIsolationEnabled determines if the workload domain + // isolation feature is available on a supervisor cluster. + workloadDomainIsolationEnabled bool ) // Add creates a new CnsRegisterVolume Controller and adds it to the Manager, @@ -86,6 +89,9 @@ func Add(mgr manager.Manager, clusterFlavor cnstypes.CnsClusterFlavor, log.Debug("Not initializing the CnsRegisterVolume Controller as its a non-WCP CSI deployment") return nil } + workloadDomainIsolationEnabled = commonco.ContainerOrchestratorUtility.IsFSSEnabled(ctx, + common.WorkloadDomainIsolation) + var volumeInfoService cnsvolumeinfo.VolumeInfoService if clusterFlavor == cnstypes.CnsClusterFlavorWorkload { if commonco.ContainerOrchestratorUtility.IsFSSEnabled(ctx, common.TKGsHA) { @@ -216,7 +222,7 @@ func (r *ReconcileCnsRegisterVolume) Reconcile(ctx context.Context, timeout = backOffDuration[instance.Name] backOffDurationMapMutex.Unlock() - // If the CnsRegistereVolume instance is already registered, remove the + // If the CnsRegisterVolume instance is already registered, remove the // instance from the queue. 
if instance.Status.Registered { backOffDurationMapMutex.Lock() @@ -298,18 +304,20 @@ func (r *ReconcileCnsRegisterVolume) Reconcile(ctx context.Context, return reconcile.Result{RequeueAfter: timeout}, nil } - if syncer.IsPodVMOnStretchSupervisorFSSEnabled && len(clusterComputeResourceMoIds) > 1 { - azClustersMap := topologyMgr.GetAZClustersMap(ctx) - isAccessible := isDatastoreAccessibleToAZClusters(ctx, vc, azClustersMap, volume.DatastoreUrl) - if !isAccessible { - log.Errorf("Volume: %s present on datastore: %s is not accessible to any of the AZ clusters: %v", - volumeID, volume.DatastoreUrl, azClustersMap) - setInstanceError(ctx, r, instance, "Volume in the spec is not accessible to any of the AZ clusters") - _, err = common.DeleteVolumeUtil(ctx, r.volumeManager, volumeID, false) - if err != nil { - log.Errorf("Failed to untag CNS volume: %s with error: %+v", volumeID, err) + if syncer.IsPodVMOnStretchSupervisorFSSEnabled { + if workloadDomainIsolationEnabled || len(clusterComputeResourceMoIds) > 1 { + azClustersMap := topologyMgr.GetAZClustersMap(ctx) + isAccessible := isDatastoreAccessibleToAZClusters(ctx, vc, azClustersMap, volume.DatastoreUrl) + if !isAccessible { + log.Errorf("Volume: %s present on datastore: %s is not accessible to any of the AZ clusters: %v", + volumeID, volume.DatastoreUrl, azClustersMap) + setInstanceError(ctx, r, instance, "Volume in the spec is not accessible to any of the AZ clusters") + _, err = common.DeleteVolumeUtil(ctx, r.volumeManager, volumeID, false) + if err != nil { + log.Errorf("Failed to untag CNS volume: %s with error: %+v", volumeID, err) + } + return reconcile.Result{RequeueAfter: timeout}, nil } - return reconcile.Result{RequeueAfter: timeout}, nil } } else { // Verify if the volume is accessible to Supervisor cluster. @@ -338,39 +346,6 @@ func (r *ReconcileCnsRegisterVolume) Reconcile(ctx context.Context, return reconcile.Result{RequeueAfter: timeout}, nil } - if syncer.IsPodVMOnStretchSupervisorFSSEnabled && len(clusterComputeResourceMoIds) > 1 { - // Calculate accessible topology for the provisioned volume. - datastoreAccessibleTopology, err := topologyMgr.GetTopologyInfoFromNodes(ctx, - commoncotypes.WCPRetrieveTopologyInfoParams{ - DatastoreURL: volume.DatastoreUrl, - StorageTopologyType: "zonal", - TopologyRequirement: nil, - Vc: vc}) - if err != nil { - msg := fmt.Sprintf("failed to find volume topology. 
Error: %v", err) - log.Error(msg) - setInstanceError(ctx, r, instance, msg) - return reconcile.Result{RequeueAfter: timeout}, nil - } - matchExpressions := make([]v1.NodeSelectorRequirement, 0) - for key, value := range datastoreAccessibleTopology[0] { - matchExpressions = append(matchExpressions, v1.NodeSelectorRequirement{ - Key: key, - Operator: v1.NodeSelectorOpIn, - Values: []string{value}, - }) - } - pvNodeAffinity = &v1.VolumeNodeAffinity{ - Required: &v1.NodeSelector{ - NodeSelectorTerms: []v1.NodeSelectorTerm{ - { - MatchExpressions: matchExpressions, - }, - }, - }, - } - } - k8sclient, err := k8s.NewClient(ctx) if err != nil { log.Errorf("Failed to initialize K8S client when registering the CnsRegisterVolume "+ @@ -392,6 +367,77 @@ func (r *ReconcileCnsRegisterVolume) Reconcile(ctx context.Context, log.Infof("Volume with storagepolicyId: %s is mapping to K8S storage class: %s and assigned to namespace: %s", volume.StoragePolicyId, storageClassName, request.Namespace) + sc, err := k8sclient.StorageV1().StorageClasses().Get(ctx, storageClassName, metav1.GetOptions{}) + if err != nil { + msg := fmt.Sprintf("Failed to fetch StorageClass: %q with error: %+v", storageClassName, err) + log.Error(msg) + setInstanceError(ctx, r, instance, msg) + return reconcile.Result{RequeueAfter: timeout}, nil + } + + // Calculate accessible topology for the provisioned volume. + var datastoreAccessibleTopology []map[string]string + if syncer.IsPodVMOnStretchSupervisorFSSEnabled { + if workloadDomainIsolationEnabled { + datastoreAccessibleTopology, err = topologyMgr.GetTopologyInfoFromNodes(ctx, + commoncotypes.WCPRetrieveTopologyInfoParams{ + DatastoreURL: volume.DatastoreUrl, + StorageTopologyType: sc.Parameters["StorageTopologyType"], + TopologyRequirement: nil, + Vc: vc}) + } else if len(clusterComputeResourceMoIds) > 1 { + datastoreAccessibleTopology, err = topologyMgr.GetTopologyInfoFromNodes(ctx, + commoncotypes.WCPRetrieveTopologyInfoParams{ + DatastoreURL: volume.DatastoreUrl, + StorageTopologyType: "zonal", + TopologyRequirement: nil, + Vc: vc}) + } + if err != nil { + msg := fmt.Sprintf("failed to find volume topology. Error: %v", err) + log.Error(msg) + setInstanceError(ctx, r, instance, msg) + return reconcile.Result{RequeueAfter: timeout}, nil + } + + // Create node affinity terms from datastoreAccessibleTopology. + var terms []v1.NodeSelectorTerm + if workloadDomainIsolationEnabled { + for _, topologyTerms := range datastoreAccessibleTopology { + + var expressions []v1.NodeSelectorRequirement + for key, value := range topologyTerms { + expressions = append(expressions, v1.NodeSelectorRequirement{ + Key: key, + Operator: v1.NodeSelectorOpIn, + Values: []string{value}, + }) + } + terms = append(terms, v1.NodeSelectorTerm{ + MatchExpressions: expressions, + }) + } + } else { + matchExpressions := make([]v1.NodeSelectorRequirement, 0) + for key, value := range datastoreAccessibleTopology[0] { + matchExpressions = append(matchExpressions, v1.NodeSelectorRequirement{ + Key: key, + Operator: v1.NodeSelectorOpIn, + Values: []string{value}, + }) + } + terms = append(terms, v1.NodeSelectorTerm{ + MatchExpressions: matchExpressions, + }) + } + + pvNodeAffinity = &v1.VolumeNodeAffinity{ + Required: &v1.NodeSelector{ + NodeSelectorTerms: terms, + }, + } + } + capacityInMb := volume.BackingObjectDetails.GetCnsBackingObjectDetails().CapacityInMb accessMode := instance.Spec.AccessMode // Set accessMode to ReadWriteOnce if DiskURLPath is used for import. 
@@ -436,9 +482,15 @@ func (r *ReconcileCnsRegisterVolume) Reconcile(ctx context.Context, return reconcile.Result{RequeueAfter: timeout}, nil } // Create PVC mapping to above created PV. - log.Infof("Now creating pvc: %s", instance.Spec.PvcName) - pvcSpec := getPersistentVolumeClaimSpec(instance.Spec.PvcName, instance.Namespace, capacityInMb, - storageClassName, accessMode, pvName) + log.Infof("Creating PVC: %s", instance.Spec.PvcName) + pvcSpec, err := getPersistentVolumeClaimSpec(ctx, instance.Spec.PvcName, instance.Namespace, capacityInMb, + storageClassName, accessMode, pvName, datastoreAccessibleTopology) + if err != nil { + msg := fmt.Sprintf("Failed to create spec for PVC: %q. Error: %v", instance.Spec.PvcName, err) + log.Errorf(msg) + setInstanceError(ctx, r, instance, msg) + return reconcile.Result{RequeueAfter: timeout}, nil + } log.Debugf("PVC spec is: %+v", pvcSpec) pvc, err := k8sclient.CoreV1().PersistentVolumeClaims(instance.Namespace).Create(ctx, pvcSpec, metav1.CreateOptions{}) diff --git a/pkg/syncer/cnsoperator/controller/cnsregistervolume/util.go b/pkg/syncer/cnsoperator/controller/cnsregistervolume/util.go index 35fe71e576..0277716802 100644 --- a/pkg/syncer/cnsoperator/controller/cnsregistervolume/util.go +++ b/pkg/syncer/cnsoperator/controller/cnsregistervolume/util.go @@ -18,6 +18,7 @@ package cnsregistervolume import ( "context" + "encoding/json" "fmt" "os" "strconv" @@ -72,18 +73,30 @@ func isDatastoreAccessibleToAZClusters(ctx context.Context, vc *vsphere.VirtualC azClustersMap map[string][]string, datastoreURL string) bool { log := logger.GetLogger(ctx) for _, clusterIDs := range azClustersMap { + var found bool for _, clusterID := range clusterIDs { sharedDatastores, _, err := vsphere.GetCandidateDatastoresInCluster(ctx, vc, clusterID, false) if err != nil { log.Warnf("Failed to get candidate datastores for cluster: %s with err: %+v", clusterID, err) continue } + found = false for _, ds := range sharedDatastores { if ds.Info.Url == datastoreURL { log.Infof("Found datastoreUrl: %s is accessible to cluster: %s", datastoreURL, clusterID) - return true + found = true } } + // If datastoreURL was found in the list of datastores accessible to the + // cluster with clusterID, continue checking for the rest of the clusters + // in AZ. Otherwise, break and check the next AZ in azClustersMap. + if !found { + break + } + } + // datastoreURL was found in all the clusters with clusterIDs. + if found { + return true } } return false @@ -264,13 +277,33 @@ func getPersistentVolumeSpec(volumeName string, volumeID string, capacity int64, // getPersistentVolumeClaimSpec return the PersistentVolumeClaim spec with // specified storage class. 
-func getPersistentVolumeClaimSpec(name string, namespace string, capacity int64, - storageClassName string, accessMode v1.PersistentVolumeAccessMode, pvName string) *v1.PersistentVolumeClaim { +func getPersistentVolumeClaimSpec(ctx context.Context, name string, namespace string, capacity int64, + storageClassName string, accessMode v1.PersistentVolumeAccessMode, pvName string, + datastoreAccessibleTopology []map[string]string) (*v1.PersistentVolumeClaim, error) { + + log := logger.GetLogger(ctx) capacityInMb := strconv.FormatInt(capacity, 10) + "Mi" + var ( + segmentsArray []string + topoAnnotation = make(map[string]string) + ) + if datastoreAccessibleTopology != nil { + for _, topologyTerm := range datastoreAccessibleTopology { + jsonSegment, err := json.Marshal(topologyTerm) + if err != nil { + return nil, logger.LogNewErrorf(log, + "failed to marshal topology segment: %+v to json. Error: %+v", topologyTerm, err) + } + segmentsArray = append(segmentsArray, string(jsonSegment)) + } + topoAnnotation[common.AnnVolumeAccessibleTopology] = "[" + strings.Join(segmentsArray, ",") + "]" + } + claim := &v1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: namespace, + Name: name, + Namespace: namespace, + Annotations: topoAnnotation, }, Spec: v1.PersistentVolumeClaimSpec{ AccessModes: []v1.PersistentVolumeAccessMode{ @@ -285,7 +318,7 @@ func getPersistentVolumeClaimSpec(name string, namespace string, capacity int64, VolumeName: pvName, }, } - return claim + return claim, nil } // isPVCBound return true if the PVC is bound before timeout.
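
Reviewer note: with the workload-domain-isolation path enabled, `isDatastoreAccessibleToAZClusters` now treats a zone as accessible only when the datastore is visible to every cluster in that zone, and registration proceeds if any zone qualifies. Below is a minimal, self-contained sketch of that rule with the vCenter lookup replaced by an in-memory map; `datastoresByCluster`, `zoneHasDatastore`, and the cluster IDs/datastore URLs are illustrative stand-ins, not part of this patch.

```go
package main

import "fmt"

// datastoresByCluster stands in for what GetCandidateDatastoresInCluster
// returns per cluster; in the real controller this comes from vCenter.
var datastoresByCluster = map[string][]string{
	"domain-c1": {"ds:///vmfs/volumes/vsan-1/", "ds:///vmfs/volumes/local-1/"},
	"domain-c2": {"ds:///vmfs/volumes/vsan-1/", "ds:///vmfs/volumes/nfs-1/"},
	"domain-c3": {"ds:///vmfs/volumes/nfs-1/"},
}

// zoneHasDatastore mirrors the patched loop: a zone qualifies only if the
// datastore URL is visible to every one of its clusters; the first
// qualifying zone makes the volume acceptable for registration.
func zoneHasDatastore(azClusters map[string][]string, datastoreURL string) bool {
	for _, clusterIDs := range azClusters {
		allClustersSeeIt := len(clusterIDs) > 0
		for _, clusterID := range clusterIDs {
			visible := false
			for _, ds := range datastoresByCluster[clusterID] {
				if ds == datastoreURL {
					visible = true
					break
				}
			}
			if !visible {
				allClustersSeeIt = false
				break
			}
		}
		if allClustersSeeIt {
			return true
		}
	}
	return false
}

func main() {
	azClusters := map[string][]string{
		"zone-a": {"domain-c1", "domain-c2"}, // both clusters see vsan-1
		"zone-b": {"domain-c2", "domain-c3"}, // only domain-c2 sees vsan-1
	}
	fmt.Println(zoneHasDatastore(azClusters, "ds:///vmfs/volumes/vsan-1/"))  // true: all of zone-a sees it
	fmt.Println(zoneHasDatastore(azClusters, "ds:///vmfs/volumes/local-1/")) // false: no zone sees it from every cluster
}
```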
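
Also for reviewers: a standalone sketch of how the accessible-topology result is consumed twice, once as ORed NodeSelectorTerms on the statically provisioned PV and once as the JSON-array annotation that the updated `getPersistentVolumeClaimSpec` sets on the PVC. The zone label key, zone names, and the literal annotation key (standing in for `common.AnnVolumeAccessibleTopology`) are assumptions for illustration only.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"

	v1 "k8s.io/api/core/v1"
)

func main() {
	// Example result of GetTopologyInfoFromNodes: one segment map per zone the
	// datastore is reachable from. Label key and zone names are placeholders.
	datastoreAccessibleTopology := []map[string]string{
		{"topology.kubernetes.io/zone": "zone-a"},
		{"topology.kubernetes.io/zone": "zone-b"},
	}

	// PV side: one NodeSelectorTerm per zone. Kubernetes ORs the terms, so the
	// volume stays usable in any of the zones the datastore is accessible from.
	var terms []v1.NodeSelectorTerm
	for _, segment := range datastoreAccessibleTopology {
		var exprs []v1.NodeSelectorRequirement
		for key, value := range segment {
			exprs = append(exprs, v1.NodeSelectorRequirement{
				Key:      key,
				Operator: v1.NodeSelectorOpIn,
				Values:   []string{value},
			})
		}
		terms = append(terms, v1.NodeSelectorTerm{MatchExpressions: exprs})
	}
	pvNodeAffinity := &v1.VolumeNodeAffinity{
		Required: &v1.NodeSelector{NodeSelectorTerms: terms},
	}

	// PVC side: the same segments are JSON-encoded and joined into the value of
	// the accessible-topology annotation (key shown literally as an assumption).
	var segments []string
	for _, segment := range datastoreAccessibleTopology {
		b, err := json.Marshal(segment)
		if err != nil {
			panic(err)
		}
		segments = append(segments, string(b))
	}
	annotations := map[string]string{
		"csi.vsphere.volume-accessible-topology": "[" + strings.Join(segments, ",") + "]",
	}

	fmt.Printf("PV node affinity: %+v\n", pvNodeAffinity)
	fmt.Printf("PVC annotations:  %v\n", annotations)
}
```

The multi-term form is the reason this PR switches away from the single `datastoreAccessibleTopology[0]` term: a statically registered volume on a datastore shared by several zones should remain schedulable in any of them once workload domain isolation is enabled.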