Skip to content

Commit

Permalink
use group-min-member annotation to initialize PodGroup
Browse files Browse the repository at this point in the history
Signed-off-by: sceneryback <[email protected]>
  • Loading branch information
jingyp authored and sceneryback committed Feb 12, 2025
1 parent bff75bb commit 05dd3e0
Show file tree
Hide file tree
Showing 8 changed files with 276 additions and 80 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ require (
sigs.k8s.io/controller-runtime v0.13.0
sigs.k8s.io/yaml v1.4.0
stathat.com/c/consistent v1.0.0
volcano.sh/apis v1.10.0-alpha.0.0.20241210014034-bf27f4e986d0
volcano.sh/apis v1.11.1-0.20250211082520-7f8222e881d9
)

require (
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -510,5 +510,5 @@ sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=
stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c=
stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0=
volcano.sh/apis v1.10.0-alpha.0.0.20241210014034-bf27f4e986d0 h1:qcQNg8mEsXU+7YYX6hff9JT+jDj2RJB4aEGwOoWwjBY=
volcano.sh/apis v1.10.0-alpha.0.0.20241210014034-bf27f4e986d0/go.mod h1:FOdmG++9+8lgENJ9XXDh+O3Jcb9YVRnlMSpgIh3NSVI=
volcano.sh/apis v1.11.1-0.20250211082520-7f8222e881d9 h1:FaXN5C42er0oqvmyviJ6QSQcs1uTUJ8/Txz0AI4QkAI=
volcano.sh/apis v1.11.1-0.20250211082520-7f8222e881d9/go.mod h1:FOdmG++9+8lgENJ9XXDh+O3Jcb9YVRnlMSpgIh3NSVI=
20 changes: 5 additions & 15 deletions pkg/controllers/job/job_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,10 +327,10 @@ func (p TasksPriority) CalcFirstCountResources(count int32) v1.ResourceList {

for _, task := range p {
if count <= task.Replicas {
minReq = quotav1.Add(minReq, calTaskRequests(&v1.Pod{Spec: task.Template.Spec}, count))
minReq = quotav1.Add(minReq, util.CalTaskRequests(&v1.Pod{Spec: task.Template.Spec}, count))
break
} else {
minReq = quotav1.Add(minReq, calTaskRequests(&v1.Pod{Spec: task.Template.Spec}, task.Replicas))
minReq = quotav1.Add(minReq, util.CalTaskRequests(&v1.Pod{Spec: task.Template.Spec}, task.Replicas))
count -= task.Replicas
}
}
Expand All @@ -353,7 +353,7 @@ func (p TasksPriority) CalcPGMinResources(jobMinAvailable int32) v1.ResourceList
if left := jobMinAvailable - podCnt; left < validReplics {
validReplics = left
}
minReq = quotav1.Add(minReq, calTaskRequests(&v1.Pod{Spec: task.Template.Spec}, validReplics))
minReq = quotav1.Add(minReq, util.CalTaskRequests(&v1.Pod{Spec: task.Template.Spec}, validReplics))
podCnt += validReplics
if podCnt >= jobMinAvailable {
break
Expand All @@ -377,10 +377,10 @@ func (p TasksPriority) CalcPGMinResources(jobMinAvailable int32) v1.ResourceList
}

if leftCnt >= left {
minReq = quotav1.Add(minReq, calTaskRequests(&v1.Pod{Spec: task.Template.Spec}, left))
minReq = quotav1.Add(minReq, util.CalTaskRequests(&v1.Pod{Spec: task.Template.Spec}, left))
leftCnt -= left
} else {
minReq = quotav1.Add(minReq, calTaskRequests(&v1.Pod{Spec: task.Template.Spec}, leftCnt))
minReq = quotav1.Add(minReq, util.CalTaskRequests(&v1.Pod{Spec: task.Template.Spec}, leftCnt))
leftCnt = 0
}
if leftCnt <= 0 {
Expand All @@ -390,16 +390,6 @@ func (p TasksPriority) CalcPGMinResources(jobMinAvailable int32) v1.ResourceList
return minReq
}

// calTaskRequests returns requests resource with validReplica replicas
func calTaskRequests(pod *v1.Pod, validReplica int32) v1.ResourceList {
minReq := v1.ResourceList{}
usage := *util.GetPodQuotaUsage(pod)
for i := int32(0); i < validReplica; i++ {
minReq = quotav1.Add(minReq, usage)
}
return minReq
}

// isInternalEvent checks if the event is an internal event
func isInternalEvent(event v1alpha1.Event) bool {
switch event {
Expand Down
3 changes: 2 additions & 1 deletion pkg/controllers/podgroup/pg_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ func (pg *pgcontroller) processNextReq() bool {

// normal pod use volcano
klog.V(4).Infof("Try to create podgroup for pod %s/%s", pod.Namespace, pod.Name)
if err := pg.createNormalPodPGIfNotExist(pod); err != nil {
minMember := pg.getMinMemberFromUpperRes(pod)
if err := pg.createNormalPodPGIfNotExist(pod, minMember); err != nil {
klog.Errorf("Failed to handle Pod <%s/%s>: %v", pod.Namespace, pod.Name, err)
pg.queue.AddRateLimited(req)
return true
Expand Down
39 changes: 35 additions & 4 deletions pkg/controllers/podgroup/pg_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package podgroup
import (
"context"
"encoding/json"
"math"
"slices"
"strconv"
"strings"

appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -98,7 +100,8 @@ func (pg *pgcontroller) addReplicaSet(obj interface{}) {
klog.V(4).Infof("Pod %s field SchedulerName is not matched", klog.KObj(&pod))
return
}
err := pg.createNormalPodPGIfNotExist(&pod)
minMember := pg.getMinMemberFromUpperRes(&pod)
err := pg.createNormalPodPGIfNotExist(&pod, minMember)
if err != nil {
klog.Errorf("Failed to create PodGroup for pod %s: %v", klog.KObj(&pod), err)
}
Expand Down Expand Up @@ -177,6 +180,33 @@ func (pg *pgcontroller) getAnnotationsFromUpperRes(kind string, name string, nam
}
}

func (pg *pgcontroller) getMinMemberFromUpperRes(pod *v1.Pod) int32 {
minMember := int32(1)

for _, reference := range pod.OwnerReferences {
// Currently we assume the group-min-member annotation will be specified only once
if reference.Kind != "" && reference.Name != "" {
annotations := pg.getAnnotationsFromUpperRes(reference.Kind, reference.Name, pod.Namespace)
if minMemberAnno, ok := annotations[scheduling.VolcanoGroupMinMemberAnnotationKey]; ok {
minMemberFromAnno, err := strconv.Atoi(minMemberAnno)
if err != nil {
klog.Infof("Failed to convert minMemberAnnotation of Pod owners <%s/%s> into number, skipping",
pod.Namespace, pod.Name)
return minMember
}
if minMemberFromAnno < math.MinInt32 || minMemberFromAnno > math.MaxInt32 {
klog.Infof("minMember %d exceeds bounds of int32, skipping", minMember)
return minMember
}
minMember = int32(minMemberFromAnno)
break
}
}
}

return minMember
}

// Inherit annotations from upper resources.
func (pg *pgcontroller) inheritUpperAnnotations(pod *v1.Pod, obj *scheduling.PodGroup) {
if pg.inheritOwnerAnnotations {
Expand All @@ -193,7 +223,7 @@ func (pg *pgcontroller) inheritUpperAnnotations(pod *v1.Pod, obj *scheduling.Pod
}
}

func (pg *pgcontroller) createNormalPodPGIfNotExist(pod *v1.Pod) error {
func (pg *pgcontroller) createNormalPodPGIfNotExist(pod *v1.Pod, minMember int32) error {
pgName := helpers.GeneratePodgroupName(pod)

if _, err := pg.pgLister.PodGroups(pod.Namespace).Get(pgName); err != nil {
Expand All @@ -203,6 +233,7 @@ func (pg *pgcontroller) createNormalPodPGIfNotExist(pod *v1.Pod) error {
return err
}

minResources := util.CalTaskRequests(pod, minMember)
obj := &scheduling.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: pod.Namespace,
Expand All @@ -212,9 +243,9 @@ func (pg *pgcontroller) createNormalPodPGIfNotExist(pod *v1.Pod) error {
Labels: map[string]string{},
},
Spec: scheduling.PodGroupSpec{
MinMember: 1,
MinMember: minMember,
PriorityClassName: pod.Spec.PriorityClassName,
MinResources: util.GetPodQuotaUsage(pod),
MinResources: &minResources,
},
Status: scheduling.PodGroupStatus{
Phase: scheduling.PodGroupPending,
Expand Down
Loading

0 comments on commit 05dd3e0

Please sign in to comment.