diff --git a/cmd/onmetal-controller-manager/main.go b/cmd/onmetal-controller-manager/main.go index 69a332777..06eda2df5 100644 --- a/cmd/onmetal-controller-manager/main.go +++ b/cmd/onmetal-controller-manager/main.go @@ -29,7 +29,7 @@ import ( networkingclient "github.com/onmetal/onmetal-api/internal/client/networking" storageclient "github.com/onmetal/onmetal-api/internal/client/storage" computecontrollers "github.com/onmetal/onmetal-api/internal/controllers/compute" - "github.com/onmetal/onmetal-api/internal/controllers/compute/scheduler" + computescheduler "github.com/onmetal/onmetal-api/internal/controllers/compute/scheduler" corecontrollers "github.com/onmetal/onmetal-api/internal/controllers/core" certificateonmetal "github.com/onmetal/onmetal-api/internal/controllers/core/certificate/onmetal" quotacontrollergeneric "github.com/onmetal/onmetal-api/internal/controllers/core/quota/generic" @@ -37,6 +37,7 @@ import ( ipamcontrollers "github.com/onmetal/onmetal-api/internal/controllers/ipam" networkingcontrollers "github.com/onmetal/onmetal-api/internal/controllers/networking" storagecontrollers "github.com/onmetal/onmetal-api/internal/controllers/storage" + storagescheduler "github.com/onmetal/onmetal-api/internal/controllers/storage/scheduler" quotaevaluatoronmetal "github.com/onmetal/onmetal-api/internal/quota/evaluator/onmetal" "github.com/onmetal/onmetal-api/utils/quota" "k8s.io/utils/lru" @@ -73,11 +74,11 @@ const ( machineClassController = "machineclass" // storage controllers - bucketScheduler = "bucketscheduler" - bucketClassController = "bucketclass" - volumeReleaseController = "volumerelease" - volumeScheduler = "volumescheduler" - volumeClassController = "volumeclass" + bucketScheduler = "bucketscheduler" + bucketClassController = "bucketclass" + volumeReleaseController = "volumerelease" + volumeSchedulerController = "volumescheduler" + volumeClassController = "volumeclass" // ipam controllers prefixController = "prefix" @@ -137,7 +138,7 @@ func main() { bucketScheduler, bucketClassController, volumeReleaseController, - volumeScheduler, + volumeSchedulerController, volumeClassController, // ipam controllers @@ -211,7 +212,7 @@ func main() { } if controllers.Enabled(machineSchedulerController) { - schedulerCache := scheduler.NewCache(mgr.GetLogger(), scheduler.DefaultCacheStrategy) + schedulerCache := computescheduler.NewCache(mgr.GetLogger(), computescheduler.DefaultCacheStrategy) if err := mgr.Add(schedulerCache); err != nil { setupLog.Error(err, "unable to create cache", "controller", "MachineSchedulerCache") os.Exit(1) @@ -220,7 +221,7 @@ func main() { if err := (&computecontrollers.MachineScheduler{ Client: mgr.GetClient(), EventRecorder: mgr.GetEventRecorderFor("machine-scheduler"), - Cache: scheduler.NewCache(logger, scheduler.DefaultCacheStrategy), + Cache: schedulerCache, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "MachineScheduler") os.Exit(1) @@ -270,10 +271,17 @@ func main() { } } - if controllers.Enabled(volumeScheduler) { + if controllers.Enabled(volumeSchedulerController) { + schedulerCache := storagescheduler.NewCache(mgr.GetLogger(), storagescheduler.DefaultCacheStrategy) + if err := mgr.Add(schedulerCache); err != nil { + setupLog.Error(err, "unable to create cache", "controller", "VolumeSchedulerCache") + os.Exit(1) + } + if err := (&storagecontrollers.VolumeScheduler{ EventRecorder: mgr.GetEventRecorderFor("volume-scheduler"), Client: mgr.GetClient(), + Cache: schedulerCache, 
}).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "VolumeScheduler") os.Exit(1) @@ -578,14 +586,14 @@ func main() { } } - if controllers.AnyEnabled(volumeScheduler) { + if controllers.AnyEnabled(volumeSchedulerController) { if err := storageclient.SetupVolumeSpecVolumePoolRefNameFieldIndexer(ctx, mgr.GetFieldIndexer()); err != nil { setupLog.Error(err, "unable to setup field indexer", "field", storageclient.VolumeSpecVolumePoolRefNameField) os.Exit(1) } } - if controllers.AnyEnabled(volumeScheduler) { + if controllers.AnyEnabled(volumeSchedulerController) { if err := storageclient.SetupVolumePoolAvailableVolumeClassesFieldIndexer(ctx, mgr.GetFieldIndexer()); err != nil { setupLog.Error(err, "unable to setup field indexer", "field", storageclient.VolumePoolAvailableVolumeClassesField) os.Exit(1) diff --git a/internal/controllers/storage/scheduler/cache.go b/internal/controllers/storage/scheduler/cache.go new file mode 100644 index 000000000..4dadc5d74 --- /dev/null +++ b/internal/controllers/storage/scheduler/cache.go @@ -0,0 +1,456 @@ +// Copyright 2023 OnMetal authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduler + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/go-logr/logr" + corev1alpha1 "github.com/onmetal/onmetal-api/api/core/v1alpha1" + "github.com/onmetal/onmetal-api/api/storage/v1alpha1" + "golang.org/x/exp/maps" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" +) + +type CacheStrategy interface { + Key(instance *v1alpha1.Volume) (types.UID, error) + ContainerKey(instance *v1alpha1.Volume) string +} + +type defaultCacheStrategy struct{} + +var DefaultCacheStrategy CacheStrategy = defaultCacheStrategy{} + +func (defaultCacheStrategy) Key(instance *v1alpha1.Volume) (types.UID, error) { + uid := instance.GetUID() + if uid == "" { + return "", fmt.Errorf("instance has no UID") + } + return uid, nil +} + +func (defaultCacheStrategy) ContainerKey(instance *v1alpha1.Volume) string { + if instance.Spec.VolumePoolRef == nil { + return "" + } + return instance.Spec.VolumePoolRef.Name +} + +type InstanceInfo struct { + instance *v1alpha1.Volume +} + +type ContainerInfo struct { + node *v1alpha1.VolumePool + instances map[types.UID]*InstanceInfo +} + +func newNodeInfo() *ContainerInfo { + return &ContainerInfo{ + instances: make(map[types.UID]*InstanceInfo), + } +} + +func (n *ContainerInfo) Node() *v1alpha1.VolumePool { + return n.node +} + +func (n *ContainerInfo) MaxAllocatable(className string) resource.Quantity { + var assigned = resource.NewQuantity(0, resource.BinarySI) + for _, instance := range n.instances { + if instance.instance.Spec.VolumeClassRef != nil && instance.instance.Spec.VolumeClassRef.Name == className { + assigned.Add(*instance.instance.Spec.Resources.Storage()) + } + } + allocatable, ok := 
n.node.Status.Allocatable[corev1alpha1.ClassCountFor(corev1alpha1.ClassTypeVolumeClass, className)] + if !ok { + return *resource.NewQuantity(0, resource.BinarySI) + } + allocatable.Sub(*assigned) + + return allocatable +} + +func (n *ContainerInfo) NumInstances() int { + return len(n.instances) +} + +func (n *ContainerInfo) shallowCopy() *ContainerInfo { + return &ContainerInfo{ + node: n.node, + instances: maps.Clone(n.instances), + } +} + +type instanceState struct { + instance *v1alpha1.Volume + bindingFinished bool +} + +func NewCache(log logr.Logger, strategy CacheStrategy) *Cache { + return &Cache{ + log: log, + assumedInstances: sets.New[types.UID](), + instanceStates: make(map[types.UID]*instanceState), + nodes: make(map[string]*ContainerInfo), + strategy: strategy, + } +} + +type Cache struct { + mu sync.RWMutex + + log logr.Logger + + assumedInstances sets.Set[types.UID] + instanceStates map[types.UID]*instanceState + nodes map[string]*ContainerInfo + + strategy CacheStrategy +} + +type Snapshot struct { + cache *Cache + + nodes map[string]*ContainerInfo + nodesList []*ContainerInfo +} + +func (s *Snapshot) Update() { + s.cache.mu.RLock() + defer s.cache.mu.RUnlock() + + s.nodes = make(map[string]*ContainerInfo, len(s.cache.nodes)) + s.nodesList = make([]*ContainerInfo, 0, len(s.cache.nodes)) + for key, node := range s.cache.nodes { + if node.node == nil { + continue + } + + node := node.shallowCopy() + s.nodes[key] = node + s.nodesList = append(s.nodesList, node) + } +} + +func (s *Snapshot) NumNodes() int { + return len(s.nodesList) +} + +func (s *Snapshot) ListNodes() []*ContainerInfo { + return s.nodesList +} + +func (s *Snapshot) GetNode(name string) (*ContainerInfo, error) { + node, ok := s.nodes[name] + if !ok { + return nil, fmt.Errorf("node %q not found", name) + } + return node, nil +} + +func (c *Cache) Snapshot() *Snapshot { + snapshot := &Snapshot{cache: c} + snapshot.Update() + return snapshot +} + +func (c *Cache) IsAssumedInstance(instance *v1alpha1.Volume) (bool, error) { + key, err := c.strategy.Key(instance) + if err != nil { + return false, err + } + + c.mu.RLock() + defer c.mu.RUnlock() + return c.assumedInstances.Has(key), nil +} + +func (c *Cache) AssumeInstance(instance *v1alpha1.Volume) error { + log := c.log.WithValues("Instance", klog.KObj(instance)) + key, err := c.strategy.Key(instance) + if err != nil { + return err + } + log = log.WithValues("InstanceKey", key) + + c.mu.Lock() + defer c.mu.Unlock() + + if _, ok := c.instanceStates[key]; ok { + return fmt.Errorf("instance %s(%v) is in the cache, so can't be assumed", key, klog.KObj(instance)) + } + + c.addInstance(log, key, instance, true) + return nil +} + +func (c *Cache) ForgetInstance(instance *v1alpha1.Volume) error { + log := c.log.WithValues("Instance", klog.KObj(instance)) + key, err := c.strategy.Key(instance) + if err != nil { + return err + } + log = log.WithValues("InstanceKey", key) + + c.mu.Lock() + defer c.mu.Unlock() + + currState, ok := c.instanceStates[key] + if ok { + oldContainerKey := c.strategy.ContainerKey(currState.instance) + newContainerKey := c.strategy.ContainerKey(instance) + if oldContainerKey != newContainerKey { + return fmt.Errorf("instance %s(%v) was assumed on container %s but assigned to %s", key, klog.KObj(instance), newContainerKey, oldContainerKey) + } + } + + if ok && c.assumedInstances.Has(key) { + c.removeInstance(log, key, instance) + return nil + } + return fmt.Errorf("instance %s(%v) wasn't assumed so cannot be forgotten", key, klog.KObj(instance)) +} + +func (c *Cache) FinishBinding(instance
*v1alpha1.Volume) error { + log := c.log.WithValues("Instance", klog.KObj(instance)) + key, err := c.strategy.Key(instance) + if err != nil { + return err + } + log = log.WithValues("InstanceKey", key) + + c.mu.RLock() + defer c.mu.RUnlock() + + log.V(5).Info("Finished binding for instance, can be expired") + currState, ok := c.instanceStates[key] + if ok && c.assumedInstances.Has(key) { + currState.bindingFinished = true + } + return nil +} + +func (c *Cache) AddContainer(node *v1alpha1.VolumePool) { + c.mu.Lock() + defer c.mu.Unlock() + + n, ok := c.nodes[node.Name] + if !ok { + n = newNodeInfo() + c.nodes[node.Name] = n + } + n.node = node +} + +func (c *Cache) UpdateContainer(_, newNode *v1alpha1.VolumePool) { + c.mu.Lock() + defer c.mu.Unlock() + + n, ok := c.nodes[newNode.Name] + if !ok { + n = newNodeInfo() + c.nodes[newNode.Name] = n + } + n.node = newNode +} + +func (c *Cache) RemoveContainer(node *v1alpha1.VolumePool) error { + c.mu.Lock() + defer c.mu.Unlock() + + n, ok := c.nodes[node.Name] + if !ok { + return fmt.Errorf("node %s not found", node.Name) + } + + n.node = nil + if len(n.instances) == 0 { + delete(c.nodes, node.Name) + } + return nil +} + +func (c *Cache) AddInstance(instance *v1alpha1.Volume) error { + log := c.log.WithValues("Instance", klog.KObj(instance)) + key, err := c.strategy.Key(instance) + if err != nil { + return err + } + log = log.WithValues("InstanceKey", key) + + c.mu.Lock() + defer c.mu.Unlock() + + currState, ok := c.instanceStates[key] + switch { + case ok && c.assumedInstances.Has(key): + // The instance was previously assumed, but now we have actual knowledge. + c.updateInstance(log, key, currState.instance, instance) + oldContainerKey := c.strategy.ContainerKey(currState.instance) + newContainerKey := c.strategy.ContainerKey(instance) + if oldContainerKey != newContainerKey { + log.Info("Instance was added to a different container than assumed", + "AssumedContainer", oldContainerKey, + "ActualContainer", newContainerKey, + ) + } + return nil + case !ok: + // Instance was expired, add it back to the cache. + c.addInstance(log, key, instance, false) + return nil + default: + return fmt.Errorf("instance %s(%s) was already in added state", key, klog.KObj(instance)) + } +} + +func (c *Cache) UpdateInstance(oldInstance, newInstance *v1alpha1.Volume) error { + log := c.log.WithValues("Instance", klog.KObj(oldInstance)) + key, err := c.strategy.Key(oldInstance) + if err != nil { + return err + } + log = log.WithValues("InstanceKey", key) + + c.mu.Lock() + defer c.mu.Unlock() + + currState, ok := c.instanceStates[key] + if !ok { + return fmt.Errorf("instance %s is not present in the cache and thus cannot be updated", key) + } + + if c.assumedInstances.Has(key) { + // An assumed instance won't have an Update / Remove event. It needs to have an Add event + // before an Update event, in which case the state would change from assumed to added. + return fmt.Errorf("assumed instance %s should not be updated", key) + } + + oldContainerKey := c.strategy.ContainerKey(currState.instance) + newContainerKey := c.strategy.ContainerKey(newInstance) + if oldContainerKey != newContainerKey { + // In this case, the scheduler cache is corrupted, and we cannot handle this correctly in any way - panic to + // signal abnormal exit. 
+ err := fmt.Errorf("instance %s updated on container %s which is different than the container %s it was previously added to", + key, oldContainerKey, newContainerKey) + panic(err) + } + c.updateInstance(log, key, oldInstance, newInstance) + return nil +} + +func (c *Cache) RemoveInstance(instance *v1alpha1.Volume) error { + log := c.log.WithValues("Instance", klog.KObj(instance)) + key, err := c.strategy.Key(instance) + if err != nil { + return err + } + log = log.WithValues("InstanceKey", key) + + c.mu.Lock() + defer c.mu.Unlock() + + currState, ok := c.instanceStates[key] + if !ok { + return fmt.Errorf("instance %s not found", key) + } + + oldContainerKey := c.strategy.ContainerKey(currState.instance) + newContainerKey := c.strategy.ContainerKey(instance) + if oldContainerKey != newContainerKey { + // In this case, the scheduler cache is corrupted, and we cannot handle this correctly in any way - panic to + // signal abnormal exit. + err := fmt.Errorf("instance %s updated on container %s which is different than the container %s it was previously added to", + key, oldContainerKey, newContainerKey) + panic(err) + } + c.removeInstance(log, key, instance) + return nil +} + +func (c *Cache) updateInstance(log logr.Logger, key types.UID, oldInstance, newInstance *v1alpha1.Volume) { + c.removeInstance(log, key, oldInstance) + c.addInstance(log, key, newInstance, false) +} + +func (c *Cache) addInstance(_ logr.Logger, key types.UID, instance *v1alpha1.Volume, assume bool) { + containerKey := c.strategy.ContainerKey(instance) + n, ok := c.nodes[containerKey] + if !ok { + n = newNodeInfo() + c.nodes[containerKey] = n + } + n.instances[key] = &InstanceInfo{instance: instance} + is := &instanceState{ + instance: instance, + } + c.instanceStates[key] = is + if assume { + c.assumedInstances.Insert(key) + } +} + +func (c *Cache) removeInstance(log logr.Logger, key types.UID, instance *v1alpha1.Volume) { + containerKey := c.strategy.ContainerKey(instance) + n, ok := c.nodes[containerKey] + if !ok { + err := fmt.Errorf("container %s not found when trying to remove instance %s", containerKey, key) + log.Error(err, "Container not found") + } else { + delete(n.instances, key) + if len(n.instances) == 0 && n.node == nil { + // Garbage collect container if it's not used anymore. 
+ delete(c.nodes, containerKey) + } + } + + c.assumedInstances.Delete(key) + delete(c.instanceStates, key) +} + +func (c *Cache) cleanupAssumedInstances() { + log := c.log + + c.mu.Lock() + defer c.mu.Unlock() + + for key := range c.assumedInstances { + log := log.WithValues("InstanceKey", key) + is, ok := c.instanceStates[key] + if !ok { + err := fmt.Errorf("instance key %s is assumed but no state recorded, potential logical error", key) + panic(err) + } + + if !is.bindingFinished { + log.V(5).Info("Won't expire cache for an instance where binding is still in progress") + continue + } + + log.V(5).Info("Removing expired instance") + c.removeInstance(log, key, is.instance) + } +} + +func (c *Cache) Start(ctx context.Context) error { + wait.UntilWithContext(ctx, func(ctx context.Context) { + c.cleanupAssumedInstances() + }, 1*time.Second) + return nil +} diff --git a/internal/controllers/storage/suite_test.go b/internal/controllers/storage/suite_test.go index 882b22c2d..2d7af7e41 100644 --- a/internal/controllers/storage/suite_test.go +++ b/internal/controllers/storage/suite_test.go @@ -24,10 +24,14 @@ import ( "github.com/onmetal/controller-utils/buildutils" computev1alpha1 "github.com/onmetal/onmetal-api/api/compute/v1alpha1" + corev1alpha1 "github.com/onmetal/onmetal-api/api/core/v1alpha1" computeclient "github.com/onmetal/onmetal-api/internal/client/compute" storageclient "github.com/onmetal/onmetal-api/internal/client/storage" + "github.com/onmetal/onmetal-api/internal/controllers/storage/scheduler" utilsenvtest "github.com/onmetal/onmetal-api/utils/envtest" "github.com/onmetal/onmetal-api/utils/envtest/apiserver" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/record" "k8s.io/utils/lru" ctrl "sigs.k8s.io/controller-runtime" @@ -35,6 +39,7 @@ import ( storagev1alpha1 "github.com/onmetal/onmetal-api/api/storage/v1alpha1" + . "github.com/onmetal/onmetal-api/utils/testing" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" "k8s.io/client-go/kubernetes/scheme" @@ -138,6 +143,9 @@ var _ = BeforeSuite(func() { Expect(storageclient.SetupBucketPoolAvailableBucketClassesFieldIndexer(ctx, k8sManager.GetFieldIndexer())).To(Succeed()) Expect(storageclient.SetupBucketSpecBucketPoolRefNameFieldIndexer(ctx, k8sManager.GetFieldIndexer())).To(Succeed()) + schedulerCache := scheduler.NewCache(k8sManager.GetLogger(), scheduler.DefaultCacheStrategy) + Expect(k8sManager.Add(schedulerCache)).To(Succeed()) + // register reconciler here Expect((&VolumeReleaseReconciler{ Client: k8sManager.GetClient(), @@ -158,6 +166,7 @@ var _ = BeforeSuite(func() { Expect((&VolumeScheduler{ Client: k8sManager.GetClient(), EventRecorder: &record.FakeRecorder{}, + Cache: schedulerCache, }).SetupWithManager(k8sManager)).To(Succeed()) Expect((&BucketClassReconciler{ @@ -175,3 +184,17 @@ var _ = BeforeSuite(func() { Expect(k8sManager.Start(ctx)).To(Succeed(), "failed to start manager") }() }) + +func SetupVolumeClass() *storagev1alpha1.VolumeClass { + return SetupObjectStruct[*storagev1alpha1.VolumeClass](&k8sClient, func(volumeClass *storagev1alpha1.VolumeClass) { + *volumeClass = storagev1alpha1.VolumeClass{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "volume-class-", + }, + Capabilities: corev1alpha1.ResourceList{ + corev1alpha1.ResourceTPS: resource.MustParse("250Mi"), + corev1alpha1.ResourceIOPS: resource.MustParse("15000"), + }, + } + }) +} diff --git a/internal/controllers/storage/volume_scheduler.go b/internal/controllers/storage/volume_scheduler.go index b19ddbbbe..cde2a8616 100644 --- a/internal/controllers/storage/volume_scheduler.go +++ b/internal/controllers/storage/volume_scheduler.go @@ -17,27 +17,37 @@ package storage import ( "context" "fmt" - "math/rand" "github.com/go-logr/logr" "github.com/onmetal/onmetal-api/api/common/v1alpha1" + corev1alpha1 "github.com/onmetal/onmetal-api/api/core/v1alpha1" storagev1alpha1 "github.com/onmetal/onmetal-api/api/storage/v1alpha1" storageclient "github.com/onmetal/onmetal-api/internal/client/storage" + "github.com/onmetal/onmetal-api/internal/controllers/storage/scheduler" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/predicate" ) +const ( + outOfCapacity = "OutOfCapacity" +) + type VolumeScheduler struct { record.EventRecorder client.Client + + Cache *scheduler.Cache + snapshot *scheduler.Snapshot } //+kubebuilder:rbac:groups="",resources=events,verbs=create;patch @@ -54,127 +64,263 @@ func (s *VolumeScheduler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl return ctrl.Result{}, client.IgnoreNotFound(err) } - if !volume.DeletionTimestamp.IsZero() { - log.Info("Volume is already deleting") + if s.skipSchedule(log, volume) { return ctrl.Result{}, nil } - if volume.Spec.VolumePoolRef != nil { - log.Info("Volume is already assigned", "VolumePoolRef", volume.Spec.VolumePoolRef) - return ctrl.Result{}, nil + + return s.reconcileExists(ctx, log, volume) +} + +func (s *VolumeScheduler) skipSchedule(log logr.Logger, volume *storagev1alpha1.Volume) bool { + if !volume.DeletionTimestamp.IsZero() { + 
log.V(1).Info("Skipping scheduling for instance", "Reason", "Deleting") + return true + } + + if volume.Spec.VolumeClassRef == nil { + log.V(1).Info("Skipping scheduling for instance", "Reason", "No VolumeClassRef") + return true + } + + isAssumed, err := s.Cache.IsAssumedInstance(volume) + if err != nil { + log.Error(err, "Error checking whether volume has been assumed") + return false + } + + log.V(1).Info("Skipping scheduling for instance", "Reason", "Assumed") + return isAssumed +} + +func (s *VolumeScheduler) matchesLabels(ctx context.Context, pool *scheduler.ContainerInfo, volume *storagev1alpha1.Volume) bool { + nodeLabels := labels.Set(pool.Node().Labels) + volumePoolSelector := labels.SelectorFromSet(volume.Spec.VolumePoolSelector) + + return volumePoolSelector.Matches(nodeLabels) +} + +func (s *VolumeScheduler) tolerateTaints(ctx context.Context, pool *scheduler.ContainerInfo, volume *storagev1alpha1.Volume) bool { + return v1alpha1.TolerateTaints(volume.Spec.Tolerations, pool.Node().Spec.Taints) +} + +func (s *VolumeScheduler) fitsPool(ctx context.Context, pool *scheduler.ContainerInfo, volume *storagev1alpha1.Volume) bool { + volumeClassName := volume.Spec.VolumeClassRef.Name + + allocatable, ok := pool.Node().Status.Allocatable[corev1alpha1.ClassCountFor(corev1alpha1.ClassTypeVolumeClass, volumeClassName)] + if !ok { + return false } - return s.schedule(ctx, log, volume) + + return allocatable.Cmp(*volume.Spec.Resources.Storage()) > 0 } -func (s *VolumeScheduler) schedule(ctx context.Context, log logr.Logger, volume *storagev1alpha1.Volume) (ctrl.Result, error) { - log.Info("Scheduling volume") - list := &storagev1alpha1.VolumePoolList{} - if err := s.List(ctx, list, - client.MatchingFields{storageclient.VolumePoolAvailableVolumeClassesField: volume.Spec.VolumeClassRef.Name}, - client.MatchingLabels(volume.Spec.VolumePoolSelector), - ); err != nil { - return ctrl.Result{}, fmt.Errorf("error listing volume pools: %w", err) +func (s *VolumeScheduler) updateSnapshot() { + if s.snapshot == nil { + s.snapshot = s.Cache.Snapshot() + } else { + s.snapshot.Update() } +} - var available []storagev1alpha1.VolumePool - for _, volumePool := range list.Items { - if volumePool.DeletionTimestamp.IsZero() { - available = append(available, volumePool) +func (s *VolumeScheduler) assume(assumed *storagev1alpha1.Volume, nodeName string) error { + assumed.Spec.VolumePoolRef = &corev1.LocalObjectReference{Name: nodeName} + if err := s.Cache.AssumeInstance(assumed); err != nil { + return err + } + return nil +} + +func (s *VolumeScheduler) bindingCycle(ctx context.Context, log logr.Logger, assumedInstance *storagev1alpha1.Volume) error { + if err := s.bind(ctx, log, assumedInstance); err != nil { + return fmt.Errorf("error binding: %w", err) + } + return nil +} + +func (s *VolumeScheduler) bind(ctx context.Context, log logr.Logger, assumed *storagev1alpha1.Volume) error { + defer func() { + if err := s.Cache.FinishBinding(assumed); err != nil { + log.Error(err, "Error finishing cache binding") } + }() + + nonAssumed := assumed.DeepCopy() + nonAssumed.Spec.VolumePoolRef = nil + + if err := s.Patch(ctx, assumed, client.MergeFrom(nonAssumed)); err != nil { + return fmt.Errorf("error patching instance: %w", err) } - if len(available) == 0 { - log.Info("No volume pool available for volume class", "VolumeClass", volume.Spec.VolumeClassRef.Name) - s.Eventf(volume, corev1.EventTypeNormal, "CannotSchedule", "No VolumePoolRef found for VolumeClass %s", volume.Spec.VolumeClassRef.Name) + return nil +} + +func 
(s *VolumeScheduler) reconcileExists(ctx context.Context, log logr.Logger, volume *storagev1alpha1.Volume) (ctrl.Result, error) { + s.updateSnapshot() + + nodes := s.snapshot.ListNodes() + if len(nodes) == 0 { + s.EventRecorder.Event(volume, corev1.EventTypeNormal, outOfCapacity, "No nodes available to schedule volume on") + return ctrl.Result{}, nil + } - // Filter volume pools by checking if the volume tolerates all the taints of a volume pool - var filtered []storagev1alpha1.VolumePool - for _, pool := range available { - if v1alpha1.TolerateTaints(volume.Spec.Tolerations, pool.Spec.Taints) { - filtered = append(filtered, pool) + var filteredNodes []*scheduler.ContainerInfo + for _, node := range nodes { + if !s.tolerateTaints(ctx, node, volume) { + log.Info("node filtered", "reason", "taints do not match") + continue + } + if !s.matchesLabels(ctx, node, volume) { + log.Info("node filtered", "reason", "labels do not match") + continue } + if !s.fitsPool(ctx, node, volume) { + log.Info("node filtered", "reason", "resources do not match") + continue + } + + filteredNodes = append(filteredNodes, node) } - if len(filtered) == 0 { - log.Info("No volume pool tolerated by the volume", "Tolerations", volume.Spec.Tolerations) - s.Eventf(volume, corev1.EventTypeNormal, "CannotSchedule", "No VolumePoolRef tolerated by %s", &volume.Spec.Tolerations) + + if len(filteredNodes) == 0 { + s.EventRecorder.Event(volume, corev1.EventTypeNormal, outOfCapacity, "No nodes available after filtering to schedule volume on") return ctrl.Result{}, nil } - available = filtered - // Get a random pool to distribute evenly. - // TODO: Instead of random distribution, try to come up w/ metrics that include usage of each pool to - // avoid unfortunate random distribution of items. - pool := available[rand.Intn(len(available))] - log = log.WithValues("VolumePoolRef", pool.Name) - base := volume.DeepCopy() - volume.Spec.VolumePoolRef = &corev1.LocalObjectReference{Name: pool.Name} - log.Info("Patching volume") - if err := s.Patch(ctx, volume, client.MergeFrom(base)); err != nil { - return ctrl.Result{}, fmt.Errorf("error scheduling volume on pool: %w", err) + maxAllocatableNode := filteredNodes[0] + for _, node := range filteredNodes[1:] { + current := node.MaxAllocatable(volume.Spec.VolumeClassRef.Name) + if current.Cmp(maxAllocatableNode.MaxAllocatable(volume.Spec.VolumeClassRef.Name)) == 1 { + maxAllocatableNode = node + } + } + log.V(1).Info("Determined node to schedule on", "NodeName", maxAllocatableNode.Node().Name, "Instances", maxAllocatableNode.NumInstances(), "Allocatable", maxAllocatableNode.MaxAllocatable(volume.Spec.VolumeClassRef.Name)) + + log.V(1).Info("Assuming volume to be on node") + if err := s.assume(volume, maxAllocatableNode.Node().Name); err != nil { + return ctrl.Result{}, err + } - log.Info("Successfully assigned volume") + log.V(1).Info("Running binding asynchronously") + go func() { + if err := s.bindingCycle(ctx, log, volume); err != nil { + if err := s.Cache.ForgetInstance(volume); err != nil { + log.Error(err, "Error forgetting instance") + } + } + }() return ctrl.Result{}, nil } -func filterVolume(volume *storagev1alpha1.Volume) bool { - return volume.DeletionTimestamp.IsZero() && - volume.Spec.VolumePoolRef == nil && - volume.Spec.VolumeClassRef != nil -} +func (s *VolumeScheduler) enqueueUnscheduledVolumes(ctx context.Context, queue workqueue.RateLimitingInterface) { + log := ctrl.LoggerFrom(ctx) + volumeList := &storagev1alpha1.VolumeList{} + if err := s.List(ctx, volumeList,
client.MatchingFields{storageclient.VolumeSpecVolumePoolRefNameField: ""}); err != nil { + log.Error(fmt.Errorf("could not list volumes w/o volume pool: %w", err), "Error listing unscheduled volumes") + return + } -func (s *VolumeScheduler) enqueueRequestsByVolumePool() handler.EventHandler { - return handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, object client.Object) []ctrl.Request { - pool := object.(*storagev1alpha1.VolumePool) - log := ctrl.LoggerFrom(ctx) - if !pool.DeletionTimestamp.IsZero() { - return nil + for _, volume := range volumeList.Items { + if !volume.DeletionTimestamp.IsZero() { + continue } - - list := &storagev1alpha1.VolumeList{} - if err := s.List(ctx, list, client.MatchingFields{storageclient.VolumeSpecVolumePoolRefNameField: ""}); err != nil { - log.Error(err, "error listing unscheduled volumes") - return nil + if volume.Spec.VolumePoolRef != nil { + continue } + queue.Add(ctrl.Request{NamespacedName: client.ObjectKeyFromObject(&volume)}) + } +} - availableClassNames := sets.NewString() - for _, availableVolumeClass := range pool.Status.AvailableVolumeClasses { - availableClassNames.Insert(availableVolumeClass.Name) - } +func (s *VolumeScheduler) isVolumeAssigned() predicate.Predicate { + return predicate.NewPredicateFuncs(func(obj client.Object) bool { + volume := obj.(*storagev1alpha1.Volume) + return volume.Spec.VolumePoolRef != nil + }) +} + +func (s *VolumeScheduler) isVolumeNotAssigned() predicate.Predicate { + return predicate.NewPredicateFuncs(func(obj client.Object) bool { + volume := obj.(*storagev1alpha1.Volume) + return volume.Spec.VolumePoolRef == nil + }) +} - var requests []ctrl.Request - for _, volume := range list.Items { - if !filterVolume(&volume) { - continue +func (s *VolumeScheduler) handleVolume() handler.EventHandler { + return handler.Funcs{ + CreateFunc: func(ctx context.Context, evt event.CreateEvent, queue workqueue.RateLimitingInterface) { + volume := evt.Object.(*storagev1alpha1.Volume) + log := ctrl.LoggerFrom(ctx) + + if err := s.Cache.AddInstance(volume); err != nil { + log.Error(err, "Error adding volume to cache") } + }, + UpdateFunc: func(ctx context.Context, evt event.UpdateEvent, queue workqueue.RateLimitingInterface) { + log := ctrl.LoggerFrom(ctx) - if !availableClassNames.Has(volume.Spec.VolumeClassRef.Name) { - continue + oldInstance := evt.ObjectOld.(*storagev1alpha1.Volume) + newInstance := evt.ObjectNew.(*storagev1alpha1.Volume) + if err := s.Cache.UpdateInstance(oldInstance, newInstance); err != nil { + log.Error(err, "Error updating volume in cache") } + }, + DeleteFunc: func(ctx context.Context, evt event.DeleteEvent, queue workqueue.RateLimitingInterface) { + log := ctrl.LoggerFrom(ctx) - if !labels.SelectorFromSet(volume.Spec.VolumePoolSelector).Matches(labels.Set(pool.Labels)) { - continue + instance := evt.Object.(*storagev1alpha1.Volume) + if err := s.Cache.RemoveInstance(instance); err != nil { + log.Error(err, "Error removing volume from cache") + } + }, + } +} - requests = append(requests, ctrl.Request{NamespacedName: client.ObjectKeyFromObject(&volume)}) - } - return requests - }) +func (s *VolumeScheduler) handleVolumePool() handler.EventHandler { + return handler.Funcs{ + CreateFunc: func(ctx context.Context, evt event.CreateEvent, queue workqueue.RateLimitingInterface) { + pool := evt.Object.(*storagev1alpha1.VolumePool) + s.Cache.AddContainer(pool) + s.enqueueUnscheduledVolumes(ctx, queue) + }, + UpdateFunc: func(ctx context.Context, evt event.UpdateEvent, queue workqueue.RateLimitingInterface) { +
oldPool := evt.ObjectOld.(*storagev1alpha1.VolumePool) + newPool := evt.ObjectNew.(*storagev1alpha1.VolumePool) + s.Cache.UpdateContainer(oldPool, newPool) + s.enqueueUnscheduledVolumes(ctx, queue) + }, + DeleteFunc: func(ctx context.Context, evt event.DeleteEvent, queue workqueue.RateLimitingInterface) { + log := ctrl.LoggerFrom(ctx) + + pool := evt.Object.(*storagev1alpha1.VolumePool) + if err := s.Cache.RemoveContainer(pool); err != nil { + log.Error(err, "Error removing volume pool from cache") + } + }, + } } func (s *VolumeScheduler) SetupWithManager(mgr manager.Manager) error { return ctrl.NewControllerManagedBy(mgr). Named("volume-scheduler"). + WithOptions(controller.Options{ + // Only a single concurrent reconcile since it is serialized on the scheduling algorithm's node fitting. + MaxConcurrentReconciles: 1, + }). + // Enqueue unscheduled volumes. For(&storagev1alpha1.Volume{}, - builder.WithPredicates(predicate.NewPredicateFuncs(func(object client.Object) bool { - volume := object.(*storagev1alpha1.Volume) - return filterVolume(volume) - })), + builder.WithPredicates( + s.isVolumeNotAssigned(), + ), + ). + Watches( + &storagev1alpha1.Volume{}, + s.handleVolume(), + builder.WithPredicates( + s.isVolumeAssigned(), + ), ). // Enqueue unscheduled volumes if a volume pool w/ required volume classes becomes available. Watches( &storagev1alpha1.VolumePool{}, - s.enqueueRequestsByVolumePool(), + s.handleVolumePool(), ). Complete(s) } diff --git a/internal/controllers/storage/volume_scheduler_test.go b/internal/controllers/storage/volume_scheduler_test.go index 72a15877f..52ebacbee 100644 --- a/internal/controllers/storage/volume_scheduler_test.go +++ b/internal/controllers/storage/volume_scheduler_test.go @@ -15,6 +15,9 @@ package storage import ( + "fmt" + "math" + corev1alpha1 "github.com/onmetal/onmetal-api/api/core/v1alpha1" . "github.com/onmetal/onmetal-api/utils/testing" . "github.com/onsi/ginkgo/v2" @@ -31,6 +34,7 @@ import ( var _ = Describe("VolumeScheduler", func() { ns := SetupNamespace(&k8sClient) + volumeClass := SetupVolumeClass() BeforeEach(func(ctx SpecContext) { By("waiting for the cached client to report no volume pools") @@ -52,10 +56,12 @@ var _ = Describe("VolumeScheduler", func() { Expect(k8sClient.Create(ctx, volumePool)).To(Succeed(), "failed to create volume pool") By("patching the volume pool status to contain a volume class") - volumePoolBase := volumePool.DeepCopy() - volumePool.Status.AvailableVolumeClasses = []corev1.LocalObjectReference{{Name: "my-volumeclass"}} - Expect(k8sClient.Status().Patch(ctx, volumePool, client.MergeFrom(volumePoolBase))). 
- To(Succeed(), "failed to patch volume pool status") + Eventually(UpdateStatus(volumePool, func() { + volumePool.Status.AvailableVolumeClasses = []corev1.LocalObjectReference{{Name: volumeClass.Name}} + volumePool.Status.Allocatable = corev1alpha1.ResourceList{ + corev1alpha1.ClassCountFor(corev1alpha1.ClassTypeVolumeClass, volumeClass.Name): resource.MustParse("100Gi"), + } + })).Should(Succeed()) By("creating a volume w/ the requested volume class") volume := &storagev1alpha1.Volume{ @@ -64,7 +70,7 @@ var _ = Describe("VolumeScheduler", func() { GenerateName: "test-volume-", }, Spec: storagev1alpha1.VolumeSpec{ - VolumeClassRef: &corev1.LocalObjectReference{Name: "my-volumeclass"}, + VolumeClassRef: &corev1.LocalObjectReference{Name: volumeClass.Name}, Resources: corev1alpha1.ResourceList{ corev1alpha1.ResourceStorage: resource.MustParse("1Gi"), }, @@ -88,7 +94,7 @@ var _ = Describe("VolumeScheduler", func() { GenerateName: "test-volume-", }, Spec: storagev1alpha1.VolumeSpec{ - VolumeClassRef: &corev1.LocalObjectReference{Name: "my-volumeclass"}, + VolumeClassRef: &corev1.LocalObjectReference{Name: volumeClass.Name}, Resources: corev1alpha1.ResourceList{ corev1alpha1.ResourceStorage: resource.MustParse("1Gi"), }, @@ -112,10 +118,12 @@ var _ = Describe("VolumeScheduler", func() { Expect(k8sClient.Create(ctx, volumePool)).To(Succeed(), "failed to create volume pool") By("patching the volume pool status to contain a volume class") - volumePoolBase := volumePool.DeepCopy() - volumePool.Status.AvailableVolumeClasses = []corev1.LocalObjectReference{{Name: "my-volumeclass"}} - Expect(k8sClient.Status().Patch(ctx, volumePool, client.MergeFrom(volumePoolBase))). - To(Succeed(), "failed to patch volume pool status") + Eventually(UpdateStatus(volumePool, func() { + volumePool.Status.AvailableVolumeClasses = []corev1.LocalObjectReference{{Name: volumeClass.Name}} + volumePool.Status.Allocatable = corev1alpha1.ResourceList{ + corev1alpha1.ClassCountFor(corev1alpha1.ClassTypeVolumeClass, volumeClass.Name): resource.MustParse("100Gi"), + } + })).Should(Succeed()) By("waiting for the volume to be scheduled onto the volume pool") Eventually(func() *corev1.LocalObjectReference { @@ -134,10 +142,12 @@ var _ = Describe("VolumeScheduler", func() { Expect(k8sClient.Create(ctx, volumePoolNoMatchingLabels)).To(Succeed(), "failed to create volume pool") By("patching the volume pool status to contain a volume class") - volumePoolNoMatchingLabelsBase := volumePoolNoMatchingLabels.DeepCopy() - volumePoolNoMatchingLabels.Status.AvailableVolumeClasses = []corev1.LocalObjectReference{{Name: "my-volumeclass"}} - Expect(k8sClient.Status().Patch(ctx, volumePoolNoMatchingLabels, client.MergeFrom(volumePoolNoMatchingLabelsBase))). 
- To(Succeed(), "failed to patch volume pool status") + Eventually(UpdateStatus(volumePoolNoMatchingLabels, func() { + volumePoolNoMatchingLabels.Status.AvailableVolumeClasses = []corev1.LocalObjectReference{{Name: volumeClass.Name}} + volumePoolNoMatchingLabels.Status.Allocatable = corev1alpha1.ResourceList{ + corev1alpha1.ClassCountFor(corev1alpha1.ClassTypeVolumeClass, volumeClass.Name): resource.MustParse("100Gi"), + } + })).Should(Succeed()) By("creating a volume pool w/ matching labels") volumePoolMatchingLabels := &storagev1alpha1.VolumePool{ @@ -151,10 +161,12 @@ var _ = Describe("VolumeScheduler", func() { Expect(k8sClient.Create(ctx, volumePoolMatchingLabels)).To(Succeed(), "failed to create volume pool") By("patching the volume pool status to contain a volume class") - volumePoolMatchingLabelsBase := volumePoolMatchingLabels.DeepCopy() - volumePoolMatchingLabels.Status.AvailableVolumeClasses = []corev1.LocalObjectReference{{Name: "my-volumeclass"}} - Expect(k8sClient.Status().Patch(ctx, volumePoolMatchingLabels, client.MergeFrom(volumePoolMatchingLabelsBase))). - To(Succeed(), "failed to patch volume pool status") + Eventually(UpdateStatus(volumePoolMatchingLabels, func() { + volumePoolMatchingLabels.Status.AvailableVolumeClasses = []corev1.LocalObjectReference{{Name: volumeClass.Name}} + volumePoolMatchingLabels.Status.Allocatable = corev1alpha1.ResourceList{ + corev1alpha1.ClassCountFor(corev1alpha1.ClassTypeVolumeClass, volumeClass.Name): resource.MustParse("100Gi"), + } + })).Should(Succeed()) By("creating a volume w/ the requested volume class") volume := &storagev1alpha1.Volume{ @@ -166,7 +178,7 @@ var _ = Describe("VolumeScheduler", func() { VolumePoolSelector: map[string]string{ "foo": "bar", }, - VolumeClassRef: &corev1.LocalObjectReference{Name: "my-volumeclass"}, + VolumeClassRef: &corev1.LocalObjectReference{Name: volumeClass.Name}, Resources: corev1alpha1.ResourceList{ corev1alpha1.ResourceStorage: resource.MustParse("1Gi"), }, @@ -205,10 +217,12 @@ var _ = Describe("VolumeScheduler", func() { Expect(k8sClient.Create(ctx, taintedVolumePool)).To(Succeed(), "failed to create the volume pool") By("patching the volume pool status to contain a volume class") - volumePoolBase := taintedVolumePool.DeepCopy() - taintedVolumePool.Status.AvailableVolumeClasses = []corev1.LocalObjectReference{{Name: "my-volumeclass"}} - Expect(k8sClient.Status().Patch(ctx, taintedVolumePool, client.MergeFrom(volumePoolBase))). 
- To(Succeed(), "failed to patch the volume pool status") + Eventually(UpdateStatus(taintedVolumePool, func() { + taintedVolumePool.Status.AvailableVolumeClasses = []corev1.LocalObjectReference{{Name: volumeClass.Name}} + taintedVolumePool.Status.Allocatable = corev1alpha1.ResourceList{ + corev1alpha1.ClassCountFor(corev1alpha1.ClassTypeVolumeClass, volumeClass.Name): resource.MustParse("100Gi"), + } + })).Should(Succeed()) By("creating a volume") volume := &storagev1alpha1.Volume{ @@ -217,7 +231,7 @@ var _ = Describe("VolumeScheduler", func() { GenerateName: "test-volume-", }, Spec: storagev1alpha1.VolumeSpec{ - VolumeClassRef: &corev1.LocalObjectReference{Name: "my-volumeclass"}, + VolumeClassRef: &corev1.LocalObjectReference{Name: volumeClass.Name}, Resources: corev1alpha1.ResourceList{ corev1alpha1.ResourceStorage: resource.MustParse("1Gi"), }, @@ -263,4 +277,202 @@ var _ = Describe("VolumeScheduler", func() { g.Expect(volume.Spec.VolumePoolRef).To(Equal(&corev1.LocalObjectReference{Name: taintedVolumePool.Name})) }).Should(Succeed()) }) + + It("should schedule volume on pool with most allocatable resources", func(ctx SpecContext) { + By("creating a volume pool") + volumePool := &storagev1alpha1.VolumePool{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-pool-", + }, + } + Expect(k8sClient.Create(ctx, volumePool)).To(Succeed(), "failed to create volume pool") + + By("patching the volume pool status to contain a volume class") + Eventually(UpdateStatus(volumePool, func() { + volumePool.Status.AvailableVolumeClasses = []corev1.LocalObjectReference{{Name: volumeClass.Name}} + volumePool.Status.Allocatable = corev1alpha1.ResourceList{ + corev1alpha1.ClassCountFor(corev1alpha1.ClassTypeVolumeClass, volumeClass.Name): resource.MustParse("100Gi"), + } + })).Should(Succeed()) + + By("creating a second volume pool") + secondVolumePool := &storagev1alpha1.VolumePool{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "second-test-pool-", + }, + } + Expect(k8sClient.Create(ctx, secondVolumePool)).To(Succeed(), "failed to create the second volume pool") + + By("creating a second volume class") + secondVolumeClass := &storagev1alpha1.VolumeClass{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "second-volume-class-", + }, + Capabilities: corev1alpha1.ResourceList{ + corev1alpha1.ResourceTPS: resource.MustParse("50Mi"), + corev1alpha1.ResourceIOPS: resource.MustParse("5000"), + }, + } + Expect(k8sClient.Create(ctx, secondVolumeClass)).To(Succeed(), "failed to create second volume class") + + By("patching the second volume pool status to contain a both volume classes") + Eventually(UpdateStatus(secondVolumePool, func() { + secondVolumePool.Status.AvailableVolumeClasses = []corev1.LocalObjectReference{ + {Name: volumeClass.Name}, + {Name: secondVolumeClass.Name}, + } + secondVolumePool.Status.Allocatable = corev1alpha1.ResourceList{ + corev1alpha1.ClassCountFor(corev1alpha1.ClassTypeVolumeClass, volumeClass.Name): resource.MustParse("50Gi"), + corev1alpha1.ClassCountFor(corev1alpha1.ClassTypeVolumeClass, secondVolumeClass.Name): resource.MustParse("200Gi"), + } + })).Should(Succeed()) + + By("creating a volume") + volume := &storagev1alpha1.Volume{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns.Name, + GenerateName: "test-volume-", + }, + Spec: storagev1alpha1.VolumeSpec{ + VolumeClassRef: &corev1.LocalObjectReference{ + Name: volumeClass.Name, + }, + Resources: corev1alpha1.ResourceList{ + corev1alpha1.ResourceStorage: resource.MustParse("5Gi"), + }, + }, + } + Expect(k8sClient.Create(ctx, 
volume)).To(Succeed(), "failed to create the volume") + + By("checking that the volume is scheduled onto the volume pool") + Eventually(Object(volume)).Should(SatisfyAll( + HaveField("Spec.VolumePoolRef.Name", Equal(volumePool.Name)), + )) + }) + + It("should schedule volumes evenly on pools", func(ctx SpecContext) { + By("creating a volume pool") + volumePool := &storagev1alpha1.VolumePool{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-pool-", + }, + } + Expect(k8sClient.Create(ctx, volumePool)).To(Succeed(), "failed to create volume pool") + + By("patching the volume pool status to contain a volume class") + Eventually(UpdateStatus(volumePool, func() { + volumePool.Status.AvailableVolumeClasses = []corev1.LocalObjectReference{{Name: volumeClass.Name}} + volumePool.Status.Allocatable = corev1alpha1.ResourceList{ + corev1alpha1.ClassCountFor(corev1alpha1.ClassTypeVolumeClass, volumeClass.Name): resource.MustParse("100Gi"), + } + })).Should(Succeed()) + + By("creating a second volume pool") + secondVolumePool := &storagev1alpha1.VolumePool{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "second-test-pool-", + }, + } + Expect(k8sClient.Create(ctx, secondVolumePool)).To(Succeed(), "failed to create the second volume pool") + + By("patching the second volume pool status to contain a both volume classes") + Eventually(UpdateStatus(secondVolumePool, func() { + secondVolumePool.Status.AvailableVolumeClasses = []corev1.LocalObjectReference{ + {Name: volumeClass.Name}, + } + secondVolumePool.Status.Allocatable = corev1alpha1.ResourceList{ + corev1alpha1.ClassCountFor(corev1alpha1.ClassTypeVolumeClass, volumeClass.Name): resource.MustParse("100Gi"), + } + })).Should(Succeed()) + + By("creating volumes") + var volumes []*storagev1alpha1.Volume + for i := 0; i < 50; i++ { + volume := &storagev1alpha1.Volume{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns.Name, + GenerateName: fmt.Sprintf("test-volume-%d-", i), + }, + Spec: storagev1alpha1.VolumeSpec{ + VolumeClassRef: &corev1.LocalObjectReference{ + Name: volumeClass.Name, + }, + Resources: corev1alpha1.ResourceList{ + corev1alpha1.ResourceStorage: resource.MustParse("1Gi"), + }, + }, + } + Expect(k8sClient.Create(ctx, volume)).To(Succeed(), "failed to create the volume") + volumes = append(volumes, volume) + } + + By("checking that every volume is scheduled onto a volume pool") + var numInstancesPool1, numInstancesPool2 int64 + for i := 0; i < 50; i++ { + Eventually(Object(volumes[i])).Should(SatisfyAll( + HaveField("Spec.VolumePoolRef", Not(BeNil())), + )) + + switch volumes[i].Spec.VolumePoolRef.Name { + case volumePool.Name: + numInstancesPool1++ + case secondVolumePool.Name: + numInstancesPool2++ + } + } + + By("checking that volume are roughly distributed") + Expect(math.Abs(float64(numInstancesPool1 - numInstancesPool2))).To(BeNumerically("<", 5)) + }) + + It("should schedule a volumes once the capacity is sufficient", func(ctx SpecContext) { + By("creating a volume pool") + volumePool := &storagev1alpha1.VolumePool{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-pool-", + }, + } + Expect(k8sClient.Create(ctx, volumePool)).To(Succeed(), "failed to create volume pool") + By("patching the volume pool status to contain a volume class") + Eventually(UpdateStatus(volumePool, func() { + volumePool.Status.AvailableVolumeClasses = []corev1.LocalObjectReference{{Name: volumeClass.Name}} + volumePool.Status.Allocatable = corev1alpha1.ResourceList{ + corev1alpha1.ClassCountFor(corev1alpha1.ClassTypeVolumeClass, volumeClass.Name): 
resource.MustParse("5Gi"), + } + })).Should(Succeed()) + + By("creating a volume") + volume := &storagev1alpha1.Volume{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns.Name, + GenerateName: "test-volume-", + }, + Spec: storagev1alpha1.VolumeSpec{ + VolumeClassRef: &corev1.LocalObjectReference{ + Name: volumeClass.Name, + }, + Resources: corev1alpha1.ResourceList{ + corev1alpha1.ResourceStorage: resource.MustParse("10Gi"), + }, + }, + } + Expect(k8sClient.Create(ctx, volume)).To(Succeed(), "failed to create the volume") + + By("checking that the volume is scheduled onto the volume pool") + Consistently(Object(volume)).Should(SatisfyAll( + HaveField("Spec.VolumePoolRef", BeNil()), + )) + + By("patching the volume pool status to contain a volume class") + Eventually(UpdateStatus(volumePool, func() { + volumePool.Status.Allocatable = corev1alpha1.ResourceList{ + corev1alpha1.ClassCountFor(corev1alpha1.ClassTypeVolumeClass, volumeClass.Name): resource.MustParse("20Gi"), + } + })).Should(Succeed()) + + By("checking that the volume is scheduled onto the volume pool") + Eventually(Object(volume)).Should(SatisfyAll( + HaveField("Spec.VolumePoolRef", Equal(&corev1.LocalObjectReference{Name: volumePool.Name})), + )) + }) })