Skip to content

Commit

Permalink
fix Filter and Allocate in mgpu device
Browse files Browse the repository at this point in the history
Signed-off-by: jiangchenxi.530 <[email protected]>
  • Loading branch information
jiangchenxi.530 committed Oct 16, 2023
1 parent 4698ac2 commit b8fa22b
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 47 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module volcano.sh/volcano

go 1.20
go 1.19

require (
github.com/agiledragon/gomonkey/v2 v2.1.0
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/api/devices/nvidia/mgpu/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ const (
NotNeedMultipleGPU = -3
// GPUTypeMGPU is GPUTypeMGPU
GPUTypeMGPU = "mgpu"
// GPUTypePGPU is GPUTypePGPU
GPUTypePGPU = "nvidia"
// GPUTypeNvidiaGPU is GPUTypeNvidiaGPU
GPUTypeNvidiaGPU = "nvidia"
// DefaultComputePolicy is DefaultComputePolicy
DefaultComputePolicy = "fixed-share"
// NativeBurstSharePolicy is NativeBurstSharePolicy
Expand Down
62 changes: 33 additions & 29 deletions pkg/scheduler/api/devices/nvidia/mgpu/device_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,18 @@ func NewGPUDevices(name string, node *v1.Node) *GPUDevices {
if node == nil {
return nil
}
value, ok := node.Labels[VKELabelNodeResourceType]
if !ok || value == GPUTypeNvidiaGPU {
return nil
}
rater := getRater()
devices := decodeNodeDevices(name, node, rater)

return devices
return decodeNodeDevices(name, node, rater)
}

// AddResource adds the pod to GPU pool if it is assigned
func (gs *GPUDevices) AddResource(pod *v1.Pod) {
if !isSharedMGPUPod(pod) {
if !isSharedMGPUPod(pod) || (pod.Status.Phase != v1.PodRunning) {
return
}
klog.V(3).Infof("Start to add pod %s/%s", pod.Namespace, pod.Name)
Expand All @@ -105,21 +108,20 @@ func (gs *GPUDevices) SubResource(pod *v1.Pod) {
}
klog.Infof("Start to forget pod %s/%s", pod.Namespace, pod.Name)
podName := getPodNamespaceName(pod)
option := NewGPUOptionFromPod(pod, VKEResourceMGPUCore, VKEResourceMGPUMemory)
//option, ok := gs.Pod2OptionMap[podName]
//if !ok {
// return
//}
option, ok := gs.Pod2OptionMap[podName]
if !ok {
return
}
if option.Allocated != nil && option.Allocated[0] == nil {
return
}
if klog.V(3).Enabled() {
klog.Infof("Cancel pod %s/%s option %+v on %+v", pod.Namespace, pod.Name, option, gs.GPUs.ToString())
}
gs.GPUs.Cancel(pod, option)
//if klog.V(3).Enabled() {
klog.Infof("After Cancel, Current GPU allocation of node %s: %+v", gs.Name, gs.GPUs.ToString())
//}
if klog.V(3).Enabled() {
klog.Infof("After Cancel, Current GPU allocation of node %s: %+v", gs.Name, gs.GPUs.ToString())
}
delete(gs.Pod2OptionMap, podName)
}

Expand Down Expand Up @@ -154,23 +156,8 @@ func (gs *GPUDevices) Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) err
if klog.V(5).Enabled() {
klog.Infof("GPU Devices: %+v\n", gs.GPUs.ToString())
}
var (
req GPURequest
cmReq ContainerMultiGPURequest
)
req, cmReq = NewGPURequest(pod, VKEResourceMGPUCore, VKEResourceMGPUMemory)
// Check container specific number whether exceed the node's GPU number
gpuCountList, _ := ExtraMultipleGPUCountList(pod)
if len(gpuCountList) > 0 {
for i, count := range gpuCountList {
if count > len(gs.GPUs) {
return fmt.Errorf("request multiple GPU count %d is exceed the allocatable GPU number, container index: %d", count, i+1)
}
}
}

// Achieve the option of GPU for pod's containers
option, err := gs.GPUs.Trade(gs.Rater, req, pod, cmReq)
option, err := tradeForResourceOption(pod, gs)
if err != nil {
return err
}
Expand All @@ -196,7 +183,11 @@ func (gs *GPUDevices) Release(kubeClient kubernetes.Interface, pod *v1.Pod) erro
podName := GetPodNamespaceName(pod)
option, ok := gs.Pod2OptionMap[podName]
if !ok {
return fmt.Errorf("")
// remove patched GPU annotations of the pod
if err := removePatchGPUInfo(kubeClient, pod); err != nil {
return fmt.Errorf("remove patched mgpu annotations failed: %v", err)
}
return fmt.Errorf("not found pod option from cache when ReleaseToPod")
}
if klog.V(3).Enabled() {
klog.Infof("Cancel pod %s/%s option %+v on %+v", pod.Namespace, pod.Name, option, gs.GPUs.ToString())
Expand Down Expand Up @@ -289,7 +280,7 @@ func (g GPUs) Transact(pod *v1.Pod, option *GPUOption) error {
// judge whether the only needed card can satisfy the request of container i
if !g[option.Allocated[i][0]].CanAllocate(option.Request[i]) {
g.cancelAdded(addedDict, pod, option)
klog.Errorf("Fail to trade option %+v on %+v because the GPU's residual memory or core can't satisfy the container", option, g.ToString())
//klog.Errorf("Fail to trade option %+v on %+v because the GPU's residual memory or core can't satisfy the container", option, g.ToString())
return fmt.Errorf("can't trade option %+v on %+v because the GPU's residual memory or core can't satisfy the container", option, g)
}
g[option.Allocated[i][0]].Add(option.Request[i])
Expand Down Expand Up @@ -676,6 +667,19 @@ func patchGPUInfo(kubeClient kubernetes.Interface, pod *v1.Pod, option *GPUOptio
return Patch(kubeClient, pod, podCopy)
}

func removePatchGPUInfo(kubeClient kubernetes.Interface, pod *v1.Pod) error {
podCopy := pod.DeepCopy()
if podCopy.Annotations == nil {
podCopy.Annotations = make(map[string]string)
}
for _, c := range podCopy.Spec.Containers {
delete(podCopy.Annotations, fmt.Sprintf(VKEAnnotationMGPUContainer, c.Name))
}
delete(podCopy.Annotations, VKEAnnotationMGPUAssumed)

return Patch(kubeClient, pod, podCopy)
}

// IsMultipleGPUPod return true when there is one pod required mgpu-core, mgpu-mem and multiple gpu count.
func IsMultipleGPUPod(pod *v1.Pod) bool {
gpuCountList, err := ExtraMultipleGPUCountList(pod)
Expand Down
43 changes: 28 additions & 15 deletions pkg/scheduler/api/devices/nvidia/mgpu/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,17 @@ func checkMGPUResourceInPod(pod *v1.Pod) bool {
}

func getRater() Rater {
//var rater Rater
//switch GlobalConfig.Policy {
//case Binpack:
// rater = &GPUBinpack{}
//case Spread:
// rater = &GPUSpread{}
//default:
// klog.Errorf("priority algorithm is not supported: %s", GlobalConfig.Policy)
// return nil
//}
return &GPUBinpack{}
var rater Rater
switch GlobalConfig.Policy {
case Binpack:
rater = &GPUBinpack{}
case Spread:
rater = &GPUSpread{}
default:
klog.Errorf("priority algorithm is not supported: %s", GlobalConfig.Policy)
return nil
}
return rater
}

func decodeNodeDevices(name string, node *v1.Node, rater Rater) *GPUDevices {
Expand Down Expand Up @@ -102,7 +102,7 @@ func decodeNodeDevices(name string, node *v1.Node, rater Rater) *GPUDevices {
})
}
default:
klog.Errorf("invalid resource value of %s", VKELabelNodeResourceType)
klog.Errorf("invalid resource value of %s on node %s", VKELabelNodeResourceType, node.Name)
return nil
}
}
Expand All @@ -123,17 +123,30 @@ func checkNodeMGPUSharingPredicate(pod *v1.Pod, gssnap *GPUDevices) (bool, error
if !isFullCardGPUPod(pod) && getPodComputePolicy(pod) != gs.NodePolicy {
return false, fmt.Errorf("compute policy not match normal mgpu")
}
_, err := tradeForResourceOption(pod, gs)
if err != nil {
return false, err
}

return true, nil
}

func tradeForResourceOption(pod *v1.Pod, gs *GPUDevices) (option *GPUOption, err error) {
var (
req GPURequest
cmReq ContainerMultiGPURequest
)
req, cmReq = NewGPURequest(pod, VKEResourceMGPUCore, VKEResourceMGPUMemory)
// Check container specific number whether exceed the node's GPU number
gpuCountList, _ := ExtraMultipleGPUCountList(pod)
if len(gpuCountList) > 0 {
for i, count := range gpuCountList {
if count > len(gs.GPUs) {
return false, fmt.Errorf("request multiple GPU count %d is exceed the allocatable GPU number, container index: %d", count, i+1)
return nil, fmt.Errorf("request multiple GPU count %d is exceed the allocatable GPU number, container index: %d", count, i+1)
}
}
}

return true, nil
return gs.GPUs.Trade(gs.Rater, req, pod, cmReq)
}

func getPodNamespaceName(pod *v1.Pod) string {
Expand Down

0 comments on commit b8fa22b

Please sign in to comment.