Skip to content

Commit

Permalink
improve cal podgroup min and add test case
Browse files Browse the repository at this point in the history
Signed-off-by: lowang-bh <[email protected]>
  • Loading branch information
lowang-bh committed Aug 15, 2023
1 parent fce3a1f commit 7942a33
Show file tree
Hide file tree
Showing 3 changed files with 343 additions and 20 deletions.
21 changes: 1 addition & 20 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@ import (
"context"
"fmt"
"reflect"
"sort"
"sync"
"sync/atomic"
"time"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
quotav1 "k8s.io/apiserver/pkg/quota/v1"
"k8s.io/klog/v2"

batch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
Expand All @@ -38,7 +36,6 @@ import (
"volcano.sh/volcano/pkg/controllers/apis"
jobhelpers "volcano.sh/volcano/pkg/controllers/job/helpers"
"volcano.sh/volcano/pkg/controllers/job/state"
"volcano.sh/volcano/pkg/controllers/util"
)

var calMutex sync.Mutex
Expand Down Expand Up @@ -784,23 +781,7 @@ func (cc *jobcontroller) calcPGMinResources(job *batch.Job) *v1.ResourceList {
tasksPriority = append(tasksPriority, tp)
}

sort.Sort(tasksPriority)

minReq := v1.ResourceList{}
podCnt := int32(0)
for _, task := range tasksPriority {
for i := int32(0); i < task.Replicas; i++ {
if podCnt >= job.Spec.MinAvailable {
break
}

podCnt++
pod := &v1.Pod{
Spec: task.Template.Spec,
}
minReq = quotav1.Add(minReq, *util.GetPodQuotaUsage(pod))
}
}
minReq := tasksPriority.CalcPGMinResources(job.Spec.MinAvailable)

return &minReq
}
Expand Down
65 changes: 65 additions & 0 deletions pkg/controllers/job/job_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ package job

import (
"fmt"
"sort"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
quotav1 "k8s.io/apiserver/pkg/quota/v1"
"k8s.io/klog/v2"

batch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
Expand All @@ -30,6 +32,7 @@ import (
schedulingv2 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/volcano/pkg/controllers/apis"
jobhelpers "volcano.sh/volcano/pkg/controllers/job/helpers"
"volcano.sh/volcano/pkg/controllers/util"
)

// MakePodName append podname,jobname,taskName and index and returns the string.
Expand Down Expand Up @@ -255,3 +258,65 @@ func isControlledBy(obj metav1.Object, gvk schema.GroupVersionKind) bool {
}
return false
}

// CalcPGMinResources sums up all task's min available; if not enough, then fill up to jobMinAvailable via task's replicas
func (p TasksPriority) CalcPGMinResources(jobMinAvailable int32) v1.ResourceList {
sort.Sort(p)
minReq := v1.ResourceList{}
podCnt := int32(0)

// 1. first sum up those tasks whose MinAvailable is set
for _, task := range p {
if task.MinAvailable == nil { // actually, all task's min available is set by webhook
continue
}

validReplics := *task.MinAvailable
if left := jobMinAvailable - podCnt; left < validReplics {
validReplics = left
}
minReq = quotav1.Add(minReq, calTaskRequests(&v1.Pod{Spec: task.Template.Spec}, validReplics))
podCnt += validReplics
if podCnt >= jobMinAvailable {
break
}
}

if podCnt >= jobMinAvailable {
return minReq
}

// 2. fill up the count of pod to jobMinAvailable with tasks whose replicas is not used up, higher priority first
leftCnt := jobMinAvailable - podCnt
for _, task := range p {
left := task.Replicas
if task.MinAvailable != nil {
if *task.MinAvailable == task.Replicas {
continue
} else {
left = task.Replicas - *task.MinAvailable
}
}

if leftCnt >= left {
minReq = quotav1.Add(minReq, calTaskRequests(&v1.Pod{Spec: task.Template.Spec}, left))
leftCnt -= left
} else {
minReq = quotav1.Add(minReq, calTaskRequests(&v1.Pod{Spec: task.Template.Spec}, leftCnt))
leftCnt = 0
}
if leftCnt <= 0 {
break
}
}
return minReq
}

// calTaskRequests returns requests resource with validReplica replicas
func calTaskRequests(pod *v1.Pod, validReplica int32) v1.ResourceList {
minReq := v1.ResourceList{}
for i := int32(0); i < validReplica; i++ {
minReq = quotav1.Add(minReq, *util.GetPodQuotaUsage(pod))
}
return minReq
}
Loading

0 comments on commit 7942a33

Please sign in to comment.