add deepcopy to Devices interface and remove allocate in mgpu device
Signed-off-by: jiangchenxi.530 <[email protected]>
jiangchenxi.530 committed Oct 18, 2023
1 parent b8fa22b commit 1e7f04a
Showing 8 changed files with 108 additions and 38 deletions.
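
For orientation, here is a minimal sketch of the shape of the change (not part of the commit; the interface is trimmed, and the real Devices interface in pkg/scheduler/api/shared_device_pool.go declares more methods such as GetIgnoredDevices and HasDeviceRequest). The gpushare and vgpu devices satisfy the new DeepCopy by returning the receiver itself, while mgpu performs a field-wise copy; NodeInfo cloning then copies each device entry instead of sharing it:

package api

import (
    v1 "k8s.io/api/core/v1"
    "k8s.io/client-go/kubernetes"
)

// Trimmed sketch of the Devices interface after this commit.
type Devices interface {
    FilterNode(kubeClient kubernetes.Interface, pod *v1.Pod) (int, string, error)
    Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) error
    Release(kubeClient kubernetes.Interface, pod *v1.Pod) error
    GetStatus() string
    DeepCopy() interface{}
}

// cloneOthers mirrors what NodeInfo.CloneOthers does below: every device
// entry is deep-copied so the cloned node state does not alias the
// original's mutable maps and slices.
func cloneOthers(others map[string]interface{}) map[string]interface{} {
    out := make(map[string]interface{}, len(others))
    for k, v := range others {
        out[k] = v.(Devices).DeepCopy()
    }
    return out
}
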
6 changes: 5 additions & 1 deletion pkg/scheduler/api/devices/nvidia/gpushare/device_info.go
@@ -91,6 +91,10 @@ func NewGPUDevices(name string, node *v1.Node) *GPUDevices {
return &gpudevices
}

func (gs *GPUDevices) DeepCopy() interface{} {
return gs
}

// GetIgnoredDevices return device names which wish vc-scheduler to ignore
func (gs *GPUDevices) GetIgnoredDevices() []string {
return []string{""}
@@ -148,7 +152,7 @@ func (gs *GPUDevices) Release(kubeClient kubernetes.Interface, pod *v1.Pod) error
return nil
}

func (gs *GPUDevices) FilterNode(pod *v1.Pod) (int, string, error) {
func (gs *GPUDevices) FilterNode(kubeClient kubernetes.Interface, pod *v1.Pod) (int, string, error) {
klog.V(4).Infoln("DeviceSharing:Into FitInPod", pod.Name)
if GpuSharingEnable {
fit, err := checkNodeGPUSharingPredicate(pod, gs)
111 changes: 87 additions & 24 deletions pkg/scheduler/api/devices/nvidia/mgpu/device_info.go
@@ -11,6 +11,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"strconv"
"sync"
"time"
"volcano.sh/volcano/pkg/scheduler/api/devices"
)
@@ -48,6 +49,7 @@ type GPUDevices struct {
NodePolicy string
// Pod2OptionMap is the map for pod and it's option
Pod2OptionMap map[string]*GPUOption
lock sync.RWMutex
GPUs GPUs
CoreName v1.ResourceName
MemName v1.ResourceName
@@ -80,13 +82,76 @@ func NewGPUDevices(name string, node *v1.Node) *GPUDevices {
return decodeNodeDevices(name, node, rater)
}

func (gs *GPUDevices) DeepCopy() interface{} {
if gs == nil {
return nil
}

newPod2OptionMap := make(map[string]*GPUOption)
if gs.Pod2OptionMap != nil {
for k, v := range gs.Pod2OptionMap {
newAllocated := make([][]int, len(v.Request))
if len(v.Allocated) > 0 && len(v.Allocated[0]) > 0 {
for i := range v.Allocated {
newAllocated[i] = make([]int, len(v.Allocated[i]))
for j := range v.Allocated[i] {
newAllocated[i][j] = v.Allocated[i][j]
}
}

}

newOpt := &GPUOption{
Request: v.Request,
CMRequest: v.CMRequest,
Allocated: newAllocated,
Score: v.Score,
}
newPod2OptionMap[k] = newOpt
}
}

newGPUs := make([]*GPUDevice, 0)
for _, g := range gs.GPUs {
newMulti := make(map[string]int)
for k, v := range g.MultiUsedContainers {
newMulti[k] = v
}
newGPUDevice := &GPUDevice{
Index: g.Index,
CoreAvailable: g.CoreAvailable,
MemoryAvailable: g.MemoryAvailable,
CoreTotal: g.CoreTotal,
MemoryTotal: g.MemoryTotal,
ContainerCount: g.ContainerCount,
MGPUInstanceCount: g.MGPUInstanceCount,
MultiUsedContainers: newMulti,
}
newGPUs = append(newGPUs, newGPUDevice)
}

return &GPUDevices{
Name: gs.Name,
Rater: gs.Rater,
NodePolicy: gs.NodePolicy,
Pod2OptionMap: newPod2OptionMap,
GPUs: newGPUs,
CoreName: VKEResourceMGPUCore,
MemName: VKEResourceMGPUMemory,
}
}
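
A quick usage sketch of the new mgpu DeepCopy (hypothetical; assumed to live in the mgpu package, with the pod key and resource numbers made up for illustration). Mutating the clone must not leak back into the scheduler's cached GPUDevices, which is what the field-wise copy above provides:

// exampleDeepCopyIsIndependent is an illustration, not part of the commit.
func exampleDeepCopyIsIndependent(node *v1.Node) {
    orig := NewGPUDevices("node-1", node)
    if orig == nil {
        return // the node exposes no mGPU resources
    }
    cp := orig.DeepCopy().(*GPUDevices)

    // Changes to the clone's bookkeeping leave the original untouched,
    // because Pod2OptionMap and every GPUDevice are copied field by field.
    cp.Pod2OptionMap["default/pod-a"] = &GPUOption{}
    if len(cp.GPUs) > 0 {
        cp.GPUs[0].CoreAvailable -= 10
    }
}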

// AddResource adds the pod to GPU pool if it is assigned
func (gs *GPUDevices) AddResource(pod *v1.Pod) {
if !isSharedMGPUPod(pod) || (pod.Status.Phase != v1.PodRunning) {
return
}
klog.V(3).Infof("Start to add pod %s/%s", pod.Namespace, pod.Name)
podName := getPodNamespaceName(pod)

gs.lock.Lock()
defer gs.lock.Unlock()

if _, ok := gs.Pod2OptionMap[podName]; !ok {
option := NewGPUOptionFromPod(pod, VKEResourceMGPUCore, VKEResourceMGPUMemory)
if option.Allocated != nil && option.Allocated[0] == nil {
@@ -108,6 +173,10 @@ func (gs *GPUDevices) SubResource(pod *v1.Pod) {
}
klog.Infof("Start to forget pod %s/%s", pod.Namespace, pod.Name)
podName := getPodNamespaceName(pod)

gs.lock.Lock()
defer gs.lock.Unlock()

option, ok := gs.Pod2OptionMap[podName]
if !ok {
return
@@ -131,56 +200,50 @@ func (gs *GPUDevices) HasDeviceRequest(pod *v1.Pod) bool {
}

// FilterNode checks if the 'pod' fit in current node
func (gs *GPUDevices) FilterNode(pod *v1.Pod) (int, string, error) {
func (gs *GPUDevices) FilterNode(kubeClient kubernetes.Interface, pod *v1.Pod) (int, string, error) {
klog.Infoln("MGPU DeviceSharing: Into FitInPod", pod.Name)
if MGPUEnable {
gs.lock.Lock()
defer gs.lock.Unlock()

fit, err := checkNodeMGPUSharingPredicate(pod, gs)
if err != nil || !fit {
klog.Errorln("deviceSharing err=", err.Error())
return devices.Unschedulable, fmt.Sprintf("mgpuDevice %s", err.Error()), err
return devices.Unschedulable, "MGPU DeviceSharing FilterNode check node failed", err
}
}
klog.V(3).Infoln("vke.volcengine.com mGPU DeviceSharing: FitInPod succeed", pod.Name)
return devices.Success, "", nil
}

// Allocate action in predicate
func (gs *GPUDevices) Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) error {
klog.Infoln("MGPU DeviceSharing:Into AllocateToPod", pod.Name)
if MGPUEnable {
fit, err := checkNodeMGPUSharingPredicate(pod, gs)
if err != nil || !fit {
klog.Errorln("DeviceSharing err=", err.Error())
return err
}
if klog.V(5).Enabled() {
klog.Infof("GPU Devices: %+v\n", gs.GPUs.ToString())
}
// Achieve the option of GPU for pod's containers
option, err := tradeForResourceOption(pod, gs)
if err != nil {
return err
return devices.Unschedulable, "MGPU DeviceSharing FilterNode trade for option failed", err
}
if err := gs.Add(pod, option); err != nil {
return fmt.Errorf("add pod to node allocator failed: %v", err)
return devices.Unschedulable, "MGPU DeviceSharing FilterNode add pod to node allocator failed", err
}

// patch GPU annotations and labels to the pod
if err := patchGPUInfo(kubeClient, pod, option); err != nil {
return fmt.Errorf("patch mgpu annotations and labels to pod failed: %v", err)
return devices.Unschedulable, "MGPU DeviceSharing FilterNode patch mgpu annotations and labels to pod failed", err
}
klog.V(3).Infoln("DeviceSharing:Allocate Success")
}
if klog.V(5).Enabled() {
klog.Infof("Allocated %s successfully: %s: %+v\n", pod.Name, gs.Name, gs.GPUs.ToString())
}
klog.V(3).Infoln("vke.volcengine.com mGPU DeviceSharing: FitInPod succeed", pod.Name)
return devices.Success, "", nil
}

// Allocate action in predicate
func (gs *GPUDevices) Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) error {
return nil
}

// Release action in predicate
func (gs *GPUDevices) Release(kubeClient kubernetes.Interface, pod *v1.Pod) error {
klog.Infoln("MGPU DeviceSharing:Into ReleaseToPod", pod.Name)
podName := GetPodNamespaceName(pod)

gs.lock.Lock()
defer gs.lock.Unlock()

option, ok := gs.Pod2OptionMap[podName]
if !ok {
// remove patched GPU annotations of the pod
11 changes: 5 additions & 6 deletions pkg/scheduler/api/devices/nvidia/mgpu/utils.go
@@ -110,23 +110,22 @@ func decodeNodeDevices(name string, node *v1.Node, rater Rater) *GPUDevices {
return gs
}

func checkNodeMGPUSharingPredicate(pod *v1.Pod, gssnap *GPUDevices) (bool, error) {
func checkNodeMGPUSharingPredicate(pod *v1.Pod, gs *GPUDevices) (bool, error) {
if !isSharedMGPUPod(pod) {
return true, nil
}

gs := getMGPUDevicesSnapShot(gssnap)
if isMultipleGPUPod(pod) && (getPodComputePolicy(pod) != NativeBurstSharePolicy ||
gs.NodePolicy != NativeBurstSharePolicy) {
return false, fmt.Errorf("compute policy not match multiple mgpu")
}
if !isFullCardGPUPod(pod) && getPodComputePolicy(pod) != gs.NodePolicy {
return false, fmt.Errorf("compute policy not match normal mgpu")
}
_, err := tradeForResourceOption(pod, gs)
if err != nil {
return false, err
}
//_, err := tradeForResourceOption(pod, gs)
//if err != nil {
// return false, err
//}

return true, nil
}
6 changes: 5 additions & 1 deletion pkg/scheduler/api/devices/nvidia/vgpu/device_info.go
@@ -115,6 +115,10 @@ func NewGPUDevices(name string, node *v1.Node) *GPUDevices {
return nodedevices
}

func (gs *GPUDevices) DeepCopy() interface{} {
return gs
}

func (gs *GPUDevices) GetIgnoredDevices() []string {
return []string{VolcanoVGPUMemory, VolcanoVGPUMemoryPercentage, VolcanoVGPUCores}
}
@@ -179,7 +183,7 @@ func (gs *GPUDevices) Release(kubeClient kubernetes.Interface, pod *v1.Pod) error
return nil
}

func (gs *GPUDevices) FilterNode(pod *v1.Pod) (int, string, error) {
func (gs *GPUDevices) FilterNode(kubeClient kubernetes.Interface, pod *v1.Pod) (int, string, error) {
klog.V(5).Infoln("4pdvgpu DeviceSharing starts filtering pods", pod.Name)
if VGPUEnable {
fit, _, err := checkNodeGPUSharingPredicate(pod, gs, true)
2 changes: 1 addition & 1 deletion pkg/scheduler/api/node_info.go
@@ -561,7 +561,7 @@ func (ni *NodeInfo) CloneImageSummary() map[string]*k8sframework.ImageStateSummary
func (ni *NodeInfo) CloneOthers() map[string]interface{} {
others := make(map[string]interface{})
for k, v := range ni.Others {
others[k] = v
others[k] = v.(Devices).DeepCopy()
}
return others
}
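
One caveat with the line above: the unchecked type assertion panics if an Others entry does not implement Devices (the predicates plugin below uses the comma-ok form for exactly that reason). A defensive variant would look like this sketch, which is an assumption and not what the commit does:

// cloneOthersSafe is a sketch of a comma-ok variant; not part of the commit.
func cloneOthersSafe(others map[string]interface{}) map[string]interface{} {
    out := make(map[string]interface{}, len(others))
    for k, v := range others {
        if d, ok := v.(Devices); ok {
            out[k] = d.DeepCopy()
        } else {
            out[k] = v // keep non-device entries as-is
        }
    }
    return out
}
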
4 changes: 3 additions & 1 deletion pkg/scheduler/api/shared_device_pool.go
@@ -57,7 +57,7 @@ type Devices interface {
// preemption would not change anything. Plugins should return Unschedulable if it is possible
// that the pod can get scheduled with preemption.
// The accompanying status message should explain why the pod is unschedulable.
FilterNode(pod *v1.Pod) (int, string, error)
FilterNode(kubeClient kubernetes.Interface, pod *v1.Pod) (int, string, error)
//Allocate action in predicate
Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) error
//Release action in predicate
@@ -68,6 +68,8 @@

//used for debug and monitor
GetStatus() string

DeepCopy() interface{}
}

// make sure GPUDevices implements Devices interface
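
That check is typically a compile-time assertion of the following form (a sketch; the gpushare import path is taken from this commit's file list, and the other device packages would get the same treatment):

package api

import "volcano.sh/volcano/pkg/scheduler/api/devices/nvidia/gpushare"

// Fails to compile if *gpushare.GPUDevices stops satisfying Devices,
// e.g. if it were missing the new DeepCopy method.
var _ Devices = (*gpushare.GPUDevices)(nil)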
4 changes: 1 addition & 3 deletions pkg/scheduler/framework/session.go
@@ -18,8 +18,6 @@ package framework

import (
"fmt"
"reflect"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@@ -30,7 +28,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"

"reflect"
"volcano.sh/apis/pkg/apis/scheduling"
schedulingscheme "volcano.sh/apis/pkg/apis/scheduling/scheme"
vcv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/predicates/predicates.go
@@ -574,7 +574,7 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {

for _, val := range api.RegisteredDevices {
if devices, ok := node.Others[val].(api.Devices); ok {
code, msg, err := devices.FilterNode(task.Pod)
code, msg, err := devices.FilterNode(ssn.KubeClient(), task.Pod)
filterNodeStatus := &api.Status{
Code: code,
Reason: msg,
