From 1e7f04a5a9488120c96c0abb5105c690425d153f Mon Sep 17 00:00:00 2001
From: "jiangchenxi.530"
Date: Wed, 18 Oct 2023 23:08:01 +0800
Subject: [PATCH] add deepcopy to Devices interface and remove allocate in
 mgpu device

Signed-off-by: jiangchenxi.530
---
 .../devices/nvidia/gpushare/device_info.go |   6 +-
 .../api/devices/nvidia/mgpu/device_info.go | 111 ++++++++++++++----
 .../api/devices/nvidia/mgpu/utils.go       |  11 +-
 .../api/devices/nvidia/vgpu/device_info.go |   6 +-
 pkg/scheduler/api/node_info.go             |   2 +-
 pkg/scheduler/api/shared_device_pool.go    |   4 +-
 pkg/scheduler/framework/session.go         |   4 +-
 .../plugins/predicates/predicates.go       |   2 +-
 8 files changed, 108 insertions(+), 38 deletions(-)

diff --git a/pkg/scheduler/api/devices/nvidia/gpushare/device_info.go b/pkg/scheduler/api/devices/nvidia/gpushare/device_info.go
index 3604698364..589363011b 100644
--- a/pkg/scheduler/api/devices/nvidia/gpushare/device_info.go
+++ b/pkg/scheduler/api/devices/nvidia/gpushare/device_info.go
@@ -91,6 +91,10 @@ func NewGPUDevices(name string, node *v1.Node) *GPUDevices {
 	return &gpudevices
 }
 
+func (gs *GPUDevices) DeepCopy() interface{} {
+	return gs
+}
+
 // GetIgnoredDevices return device names which wish vc-scheduler to ignore
 func (gs *GPUDevices) GetIgnoredDevices() []string {
 	return []string{""}
@@ -148,7 +152,7 @@ func (gs *GPUDevices) Release(kubeClient kubernetes.Interface, pod *v1.Pod) erro
 	return nil
 }
 
-func (gs *GPUDevices) FilterNode(pod *v1.Pod) (int, string, error) {
+func (gs *GPUDevices) FilterNode(kubeClient kubernetes.Interface, pod *v1.Pod) (int, string, error) {
 	klog.V(4).Infoln("DeviceSharing:Into FitInPod", pod.Name)
 	if GpuSharingEnable {
 		fit, err := checkNodeGPUSharingPredicate(pod, gs)
diff --git a/pkg/scheduler/api/devices/nvidia/mgpu/device_info.go b/pkg/scheduler/api/devices/nvidia/mgpu/device_info.go
index 78490d251c..ec6fb7fde2 100644
--- a/pkg/scheduler/api/devices/nvidia/mgpu/device_info.go
+++ b/pkg/scheduler/api/devices/nvidia/mgpu/device_info.go
@@ -11,6 +11,7 @@ import (
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/klog/v2"
 	"strconv"
+	"sync"
 	"time"
 	"volcano.sh/volcano/pkg/scheduler/api/devices"
 )
@@ -48,6 +49,7 @@ type GPUDevices struct {
 	NodePolicy string
 	// Pod2OptionMap is the map for pod and it's option
 	Pod2OptionMap map[string]*GPUOption
+	lock          sync.RWMutex
 	GPUs          GPUs
 	CoreName      v1.ResourceName
 	MemName       v1.ResourceName
@@ -80,6 +82,65 @@ func NewGPUDevices(name string, node *v1.Node) *GPUDevices {
 	return decodeNodeDevices(name, node, rater)
 }
 
+func (gs *GPUDevices) DeepCopy() interface{} {
+	if gs == nil {
+		return nil
+	}
+
+	newPod2OptionMap := make(map[string]*GPUOption)
+	if gs.Pod2OptionMap != nil {
+		for k, v := range gs.Pod2OptionMap {
+			newAllocated := make([][]int, len(v.Request))
+			if len(v.Allocated) > 0 {
+				for i := range v.Allocated {
+					newAllocated[i] = make([]int, len(v.Allocated[i]))
+					for j := range v.Allocated[i] {
+						newAllocated[i][j] = v.Allocated[i][j]
+					}
+				}
+
+			}
+
+			newOpt := &GPUOption{
+				Request:   v.Request,
+				CMRequest: v.CMRequest,
+				Allocated: newAllocated,
+				Score:     v.Score,
+			}
+			newPod2OptionMap[k] = newOpt
+		}
+	}
+
+	newGPUs := make([]*GPUDevice, 0)
+	for _, g := range gs.GPUs {
+		newMulti := make(map[string]int)
+		for k, v := range g.MultiUsedContainers {
+			newMulti[k] = v
+		}
+		newGPUDevice := &GPUDevice{
+			Index:               g.Index,
+			CoreAvailable:       g.CoreAvailable,
+			MemoryAvailable:     g.MemoryAvailable,
+			CoreTotal:           g.CoreTotal,
+			MemoryTotal:         g.MemoryTotal,
+			ContainerCount:      g.ContainerCount,
+			MGPUInstanceCount:   g.MGPUInstanceCount,
+			MultiUsedContainers: newMulti,
+		}
+		newGPUs = append(newGPUs, newGPUDevice)
+	}
+
+	return &GPUDevices{
+		Name:          gs.Name,
+		Rater:         gs.Rater,
+		NodePolicy:    gs.NodePolicy,
+		Pod2OptionMap: newPod2OptionMap,
+		GPUs:          newGPUs,
+		CoreName:      VKEResourceMGPUCore,
+		MemName:       VKEResourceMGPUMemory,
+	}
+}
+
 // AddResource adds the pod to GPU pool if it is assigned
 func (gs *GPUDevices) AddResource(pod *v1.Pod) {
 	if !isSharedMGPUPod(pod) || (pod.Status.Phase != v1.PodRunning) {
@@ -87,6 +148,10 @@ func (gs *GPUDevices) AddResource(pod *v1.Pod) {
 	}
 	klog.V(3).Infof("Start to add pod %s/%s", pod.Namespace, pod.Name)
 	podName := getPodNamespaceName(pod)
+
+	gs.lock.Lock()
+	defer gs.lock.Unlock()
+
 	if _, ok := gs.Pod2OptionMap[podName]; !ok {
 		option := NewGPUOptionFromPod(pod, VKEResourceMGPUCore, VKEResourceMGPUMemory)
 		if option.Allocated != nil && option.Allocated[0] == nil {
@@ -108,6 +173,10 @@ func (gs *GPUDevices) SubResource(pod *v1.Pod) {
 	}
 	klog.Infof("Start to forget pod %s/%s", pod.Namespace, pod.Name)
 	podName := getPodNamespaceName(pod)
+
+	gs.lock.Lock()
+	defer gs.lock.Unlock()
+
 	option, ok := gs.Pod2OptionMap[podName]
 	if !ok {
 		return
@@ -131,49 +200,39 @@ func (gs *GPUDevices) HasDeviceRequest(pod *v1.Pod) bool {
 }
 
 // FilterNode checks if the 'pod' fit in current node
-func (gs *GPUDevices) FilterNode(pod *v1.Pod) (int, string, error) {
+func (gs *GPUDevices) FilterNode(kubeClient kubernetes.Interface, pod *v1.Pod) (int, string, error) {
 	klog.Infoln("MGPU DeviceSharing: Into FitInPod", pod.Name)
 	if MGPUEnable {
+		gs.lock.Lock()
+		defer gs.lock.Unlock()
+
 		fit, err := checkNodeMGPUSharingPredicate(pod, gs)
 		if err != nil || !fit {
 			klog.Errorln("deviceSharing err=", err.Error())
-			return devices.Unschedulable, fmt.Sprintf("mgpuDevice %s", err.Error()), err
+			return devices.Unschedulable, "MGPU DeviceSharing FilterNode check node failed", err
 		}
-	}
-	klog.V(3).Infoln("vke.volcengine.com mGPU DeviceSharing: FitInPod succeed", pod.Name)
-	return devices.Success, "", nil
-}
-// Allocate action in predicate
-func (gs *GPUDevices) Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) error {
-	klog.Infoln("MGPU DeviceSharing:Into AllocateToPod", pod.Name)
-	if MGPUEnable {
-		fit, err := checkNodeMGPUSharingPredicate(pod, gs)
-		if err != nil || !fit {
-			klog.Errorln("DeviceSharing err=", err.Error())
-			return err
-		}
-		if klog.V(5).Enabled() {
-			klog.Infof("GPU Devices: %+v\n", gs.GPUs.ToString())
-		}
 		// Achieve the option of GPU for pod's containers
 		option, err := tradeForResourceOption(pod, gs)
 		if err != nil {
-			return err
+			return devices.Unschedulable, "MGPU DeviceSharing FilterNode trade for option failed", err
 		}
 		if err := gs.Add(pod, option); err != nil {
-			return fmt.Errorf("add pod to node allocator failed: %v", err)
+			return devices.Unschedulable, "MGPU DeviceSharing FilterNode add pod to node allocator failed", err
 		}
 		// patch GPU annotations and labels to the pod
 		if err := patchGPUInfo(kubeClient, pod, option); err != nil {
-			return fmt.Errorf("patch mgpu annotations and labels to pod failed: %v", err)
+			return devices.Unschedulable, "MGPU DeviceSharing FilterNode patch mgpu annotations and labels to pod failed", err
 		}
 		klog.V(3).Infoln("DeviceSharing:Allocate Success")
 	}
-	if klog.V(5).Enabled() {
-		klog.Infof("Allocated %s successfully: %s: %+v\n", pod.Name, gs.Name, gs.GPUs.ToString())
-	}
+	klog.V(3).Infoln("vke.volcengine.com mGPU DeviceSharing: FitInPod succeed", pod.Name)
+	return devices.Success, "", nil
+}
+
+// Allocate action in predicate
+func (gs *GPUDevices) Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) error {
+	return nil
 }
 
@@ -181,6 +240,10 @@ func (gs *GPUDevices) Release(kubeClient kubernetes.Interface, pod *v1.Pod) erro
 	klog.Infoln("MGPU DeviceSharing:Into ReleaseToPod", pod.Name)
 	podName := GetPodNamespaceName(pod)
+
+	gs.lock.Lock()
+	defer gs.lock.Unlock()
+
 	option, ok := gs.Pod2OptionMap[podName]
 	if !ok {
 		// remove patched GPU annotations of the pod
diff --git a/pkg/scheduler/api/devices/nvidia/mgpu/utils.go b/pkg/scheduler/api/devices/nvidia/mgpu/utils.go
index 2fd9e0cf89..076dc426c2 100644
--- a/pkg/scheduler/api/devices/nvidia/mgpu/utils.go
+++ b/pkg/scheduler/api/devices/nvidia/mgpu/utils.go
@@ -110,12 +110,11 @@ func decodeNodeDevices(name string, node *v1.Node, rater Rater) *GPUDevices {
 	return gs
 }
 
-func checkNodeMGPUSharingPredicate(pod *v1.Pod, gssnap *GPUDevices) (bool, error) {
+func checkNodeMGPUSharingPredicate(pod *v1.Pod, gs *GPUDevices) (bool, error) {
 	if !isSharedMGPUPod(pod) {
 		return true, nil
 	}
 
-	gs := getMGPUDevicesSnapShot(gssnap)
 	if isMultipleGPUPod(pod) &&
 		(getPodComputePolicy(pod) != NativeBurstSharePolicy || gs.NodePolicy != NativeBurstSharePolicy) {
 		return false, fmt.Errorf("compute policy not match multiple mgpu")
@@ -123,10 +122,10 @@ func checkNodeMGPUSharingPredicate(pod *v1.Pod, gssnap *GPUDevices) (bool, error
 	if !isFullCardGPUPod(pod) && getPodComputePolicy(pod) != gs.NodePolicy {
 		return false, fmt.Errorf("compute policy not match normal mgpu")
 	}
-	_, err := tradeForResourceOption(pod, gs)
-	if err != nil {
-		return false, err
-	}
+	//_, err := tradeForResourceOption(pod, gs)
+	//if err != nil {
+	//	return false, err
+	//}
 	return true, nil
 }
 
diff --git a/pkg/scheduler/api/devices/nvidia/vgpu/device_info.go b/pkg/scheduler/api/devices/nvidia/vgpu/device_info.go
index 2da54485b8..cf5d768787 100644
--- a/pkg/scheduler/api/devices/nvidia/vgpu/device_info.go
+++ b/pkg/scheduler/api/devices/nvidia/vgpu/device_info.go
@@ -115,6 +115,10 @@ func NewGPUDevices(name string, node *v1.Node) *GPUDevices {
 	return nodedevices
 }
 
+func (gs *GPUDevices) DeepCopy() interface{} {
+	return gs
+}
+
 func (gs *GPUDevices) GetIgnoredDevices() []string {
 	return []string{VolcanoVGPUMemory, VolcanoVGPUMemoryPercentage, VolcanoVGPUCores}
 }
@@ -179,7 +183,7 @@ func (gs *GPUDevices) Release(kubeClient kubernetes.Interface, pod *v1.Pod) erro
 	return nil
 }
 
-func (gs *GPUDevices) FilterNode(pod *v1.Pod) (int, string, error) {
+func (gs *GPUDevices) FilterNode(kubeClient kubernetes.Interface, pod *v1.Pod) (int, string, error) {
 	klog.V(5).Infoln("4pdvgpu DeviceSharing starts filtering pods", pod.Name)
 	if VGPUEnable {
 		fit, _, err := checkNodeGPUSharingPredicate(pod, gs, true)
diff --git a/pkg/scheduler/api/node_info.go b/pkg/scheduler/api/node_info.go
index 2120f1fb5f..b21ac3e17a 100644
--- a/pkg/scheduler/api/node_info.go
+++ b/pkg/scheduler/api/node_info.go
@@ -561,7 +561,7 @@ func (ni *NodeInfo) CloneImageSummary() map[string]*k8sframework.ImageStateSumma
 func (ni *NodeInfo) CloneOthers() map[string]interface{} {
 	others := make(map[string]interface{})
 	for k, v := range ni.Others {
-		others[k] = v
+		others[k] = v.(Devices).DeepCopy()
 	}
 	return others
 }
diff --git a/pkg/scheduler/api/shared_device_pool.go b/pkg/scheduler/api/shared_device_pool.go
index 7446693c9b..0378b3abdb 100644
--- a/pkg/scheduler/api/shared_device_pool.go
+++ b/pkg/scheduler/api/shared_device_pool.go
@@ -57,7 +57,7 @@ type Devices interface {
 	// preemption would not change anything. Plugins should return Unschedulable if it is possible
 	// that the pod can get scheduled with preemption.
 	// The accompanying status message should explain why the pod is unschedulable.
-	FilterNode(pod *v1.Pod) (int, string, error)
+	FilterNode(kubeClient kubernetes.Interface, pod *v1.Pod) (int, string, error)
 	//Allocate action in predicate
 	Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) error
 	//Release action in predicate
@@ -68,6 +68,8 @@ type Devices interface {
 
 	//used for debug and monitor
 	GetStatus() string
+
+	DeepCopy() interface{}
 }
 
 // make sure GPUDevices implements Devices interface
diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go
index 10feb5657e..17e58caf9f 100644
--- a/pkg/scheduler/framework/session.go
+++ b/pkg/scheduler/framework/session.go
@@ -18,8 +18,6 @@ package framework
 
 import (
 	"fmt"
-	"reflect"
-
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
@@ -30,7 +28,7 @@ import (
 	"k8s.io/client-go/tools/record"
 	"k8s.io/klog/v2"
 	k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"
-
+	"reflect"
 	"volcano.sh/apis/pkg/apis/scheduling"
 	schedulingscheme "volcano.sh/apis/pkg/apis/scheduling/scheme"
 	vcv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
diff --git a/pkg/scheduler/plugins/predicates/predicates.go b/pkg/scheduler/plugins/predicates/predicates.go
index e811461c69..ffc0675cc9 100644
--- a/pkg/scheduler/plugins/predicates/predicates.go
+++ b/pkg/scheduler/plugins/predicates/predicates.go
@@ -574,7 +574,7 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
 
 		for _, val := range api.RegisteredDevices {
 			if devices, ok := node.Others[val].(api.Devices); ok {
-				code, msg, err := devices.FilterNode(task.Pod)
+				code, msg, err := devices.FilterNode(ssn.KubeClient(), task.Pod)
 				filterNodeStatus := &api.Status{
 					Code:   code,
 					Reason: msg,
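
--

Reviewer note on the two behavioral changes above, followed by a minimal, self-contained sketch. The sketch is not part of this patch: the toyDevices type and the trimmed Devices interface are hypothetical stand-ins used only to illustrate the new DeepCopy contract. With this patch, NodeInfo.CloneOthers deep-copies every Devices value instead of sharing pointers between snapshots, and the mgpu allocation logic (tradeForResourceOption, Add, patchGPUInfo) now runs inside FilterNode, which is why the interface's FilterNode gains a kubeClient parameter and mgpu's Allocate becomes a no-op. The sketch shows what the DeepCopy contract buys: mutations on a cloned snapshot no longer leak into the original.

package main

import "fmt"

// Devices is trimmed to the one method this sketch needs; the real interface
// in pkg/scheduler/api/shared_device_pool.go also declares FilterNode,
// Allocate, Release, GetIgnoredDevices, and GetStatus.
type Devices interface {
	DeepCopy() interface{}
}

// toyDevices is a hypothetical stand-in for the gpushare/mgpu/vgpu GPUDevices
// implementations.
type toyDevices struct {
	pods map[string]int // pod name -> allocated GPU share
}

// DeepCopy copies the map so mutations on the clone never reach the original,
// which is what the mgpu implementation in this patch does for Pod2OptionMap
// and GPUs. (Note the gpushare and vgpu implementations currently return the
// receiver unchanged, i.e. a shallow copy.)
func (t *toyDevices) DeepCopy() interface{} {
	cp := &toyDevices{pods: make(map[string]int, len(t.pods))}
	for k, v := range t.pods {
		cp.pods[k] = v
	}
	return cp
}

func main() {
	// others plays the role of NodeInfo.Others.
	others := map[string]interface{}{
		"toy": &toyDevices{pods: map[string]int{"pod-a": 50}},
	}

	// CloneOthers-style loop, as changed in pkg/scheduler/api/node_info.go.
	clone := make(map[string]interface{})
	for k, v := range others {
		clone[k] = v.(Devices).DeepCopy()
	}

	// Mutating the clone leaves the original snapshot untouched.
	clone["toy"].(*toyDevices).pods["pod-b"] = 30
	fmt.Println(len(others["toy"].(*toyDevices).pods)) // 1
	fmt.Println(len(clone["toy"].(*toyDevices).pods))  // 2
}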