diff --git a/pkg/scheduler/actions/allocate/allocate_test.go b/pkg/scheduler/actions/allocate/allocate_test.go
index fa36d32e38..3e3cd903bd 100644
--- a/pkg/scheduler/actions/allocate/allocate_test.go
+++ b/pkg/scheduler/actions/allocate/allocate_test.go
@@ -19,15 +19,19 @@ package allocate
 import (
 	"context"
 	"fmt"
-	"reflect"
-	"testing"
-
 	"github.com/agiledragon/gomonkey/v2"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes/fake"
 	"k8s.io/client-go/tools/record"
+	"reflect"
+	"testing"
+	"time"
 
+	"volcano.sh/volcano/pkg/scheduler/plugins/binpack"
+	"volcano.sh/volcano/pkg/scheduler/plugins/conformance"
 	"volcano.sh/volcano/pkg/scheduler/plugins/gang"
+	"volcano.sh/volcano/pkg/scheduler/plugins/nodeorder"
+	"volcano.sh/volcano/pkg/scheduler/plugins/overcommit"
 	"volcano.sh/volcano/pkg/scheduler/plugins/priority"
@@ -492,3 +496,252 @@ func TestAllocateWithDynamicPVC(t *testing.T) {
 		})
 	}
 }
+
+func TestAllocatedWithTerminatingPod(t *testing.T) {
+	var tmp *cache.SchedulerCache
+	patches := gomonkey.ApplyMethod(reflect.TypeOf(tmp), "AddBindTask", func(scCache *cache.SchedulerCache, task *api.TaskInfo) error {
+		scCache.Binder.Bind(nil, []*api.TaskInfo{task})
+		return nil
+	})
+	defer patches.Reset()
+	patchUpdateQueueStatus := gomonkey.ApplyMethod(reflect.TypeOf(tmp), "UpdateQueueStatus", func(scCache *cache.SchedulerCache, queue *api.QueueInfo) error {
+		return nil
+	})
+	defer patchUpdateQueueStatus.Reset()
+	options.ServerOpts = &options.ServerOption{
+		MinNodesToFind:             100,
+		MinPercentageOfNodesToFind: 5,
+		PercentageOfNodesToFind:    100,
+	}
+	framework.RegisterPluginBuilder("priority", priority.New)
+	framework.RegisterPluginBuilder("gang", gang.New)
+	framework.RegisterPluginBuilder("conformance", conformance.New)
+	framework.RegisterPluginBuilder("overcommit", overcommit.New)
+	framework.RegisterPluginBuilder("drf", drf.New)
+	framework.RegisterPluginBuilder("proportion", proportion.New)
+	framework.RegisterPluginBuilder("nodeorder", nodeorder.New)
+	framework.RegisterPluginBuilder("binpack", binpack.New)
+	// case1 reproduces the problem:
+	// p2 has to wait until p1 has finished releasing and cannot be scheduled on n2;
+	// as long as p1 stays in Releasing, p2 remains Pending.
+	case1Pod1 := util.BuildPod("c1", "p1", "n1", v1.PodRunning, util.BuildResourceListWithGPU("2", "4G", "3"), "pg1", make(map[string]string), make(map[string]string))
+	case1Pod1.CreationTimestamp = metav1.Time{Time: time.Now().Add(-20 * time.Minute)}
+	case1Pod1.DeletionTimestamp = &metav1.Time{Time: time.Now()}
+	case1Pod2 := util.BuildPod("c1", "p2", "", v1.PodPending, util.BuildResourceListWithGPU("2", "4G", "2"), "pg2", make(map[string]string), make(map[string]string))
+
+	// case2 verifies the fix:
+	// p3's deletion has exceeded its grace period (ReleasingFailed), so p4 can be scheduled on n4.
+
+	case2Pod3 := util.BuildPod("c2", "p3", "n3", v1.PodRunning, util.BuildResourceListWithGPU("2", "4G", "3"), "pg3", make(map[string]string), make(map[string]string))
+	case2Pod3.CreationTimestamp = metav1.Time{Time: time.Now().Add(-20 * time.Minute)}
+	case2Pod3.DeletionTimestamp = &metav1.Time{Time: time.Now().Add(-10 * time.Minute)}
+	case2Pod4 := util.BuildPod("c2", "p4", "", v1.PodPending, util.BuildResourceListWithGPU("2", "4G", "2"), "pg4", make(map[string]string), make(map[string]string))
+	tests := []struct {
+		name      string
+		podGroups []*schedulingv1.PodGroup
+		pods      []*v1.Pod
+		nodes     []*v1.Node
+		queues    []*schedulingv1.Queue
+		expected  map[string]string
+	}{
+		{
+			name: "two Jobs with one Releasing Pod and two nodes",
+			podGroups: []*schedulingv1.PodGroup{
+				{
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      "pg1",
+						Namespace: "c1",
+					},
+					Spec: schedulingv1.PodGroupSpec{
+						Queue: "c1",
+					},
+					Status: schedulingv1.PodGroupStatus{
+						Phase: schedulingv1.PodGroupRunning,
+					},
+				},
+				{
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      "pg2",
+						Namespace: "c1",
+					},
+					Spec: schedulingv1.PodGroupSpec{
+						Queue: "c1",
+					},
+					Status: schedulingv1.PodGroupStatus{
+						Phase: schedulingv1.PodGroupInqueue,
+					},
+				},
+			},
+			pods: []*v1.Pod{
+				case1Pod1,
+				case1Pod2,
+			},
+			nodes: []*v1.Node{
+				util.BuildNode("n1", util.BuildResourceListWithGPU("20", "400Gi", "4"), make(map[string]string)),
+				util.BuildNode("n2", util.BuildResourceListWithGPU("20", "400Gi", "8"), make(map[string]string)),
+			},
+			queues: []*schedulingv1.Queue{
+				{
+					ObjectMeta: metav1.ObjectMeta{
+						Name: "c1",
+					},
+					Spec: schedulingv1.QueueSpec{
+						Weight: 1,
+					},
+				},
+			},
+			expected: map[string]string{},
+		},
+
+		{
+			name: "two Jobs with one ReleasingFailed Pod and two nodes",
+			podGroups: []*schedulingv1.PodGroup{
+				{
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      "pg3",
+						Namespace: "c2",
+					},
+					Spec: schedulingv1.PodGroupSpec{
+						Queue: "c2",
+					},
+					Status: schedulingv1.PodGroupStatus{
+						Phase: schedulingv1.PodGroupRunning,
+					},
+				},
+				{
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      "pg4",
+						Namespace: "c2",
+					},
+					Spec: schedulingv1.PodGroupSpec{
+						Queue: "c2",
+					},
+					Status: schedulingv1.PodGroupStatus{
+						Phase: schedulingv1.PodGroupInqueue,
+					},
+				},
+			},
+			pods: []*v1.Pod{
+				case2Pod3,
+				case2Pod4,
+			},
+			nodes: []*v1.Node{
+				util.BuildNode("n3", util.BuildResourceListWithGPU("20", "400Gi", "4"), make(map[string]string)),
+				util.BuildNode("n4", util.BuildResourceListWithGPU("20", "400Gi", "8"), make(map[string]string)),
+			},
+			queues: []*schedulingv1.Queue{
+				{
+					ObjectMeta: metav1.ObjectMeta{
+						Name: "c2",
+					},
+					Spec: schedulingv1.QueueSpec{
+						Weight: 1,
+					},
+				},
+			},
+			expected: map[string]string{
+				"c2/p4": "n4",
+			},
+		},
+	}
+	allocate := New()
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			binder := &util.FakeBinder{
+				Binds:   map[string]string{},
+				Channel: make(chan string),
+			}
+			schedulerCache := &cache.SchedulerCache{
+				Nodes:         make(map[string]*api.NodeInfo),
+				Jobs:          make(map[api.JobID]*api.JobInfo),
+				Queues:        make(map[api.QueueID]*api.QueueInfo),
+				Binder:        binder,
+				StatusUpdater: &util.FakeStatusUpdater{},
+				VolumeBinder:  &util.FakeVolumeBinder{},
+
+				Recorder: record.NewFakeRecorder(100),
+			}
+
+			for _, node := range test.nodes {
+				schedulerCache.AddNode(node)
+			}
+			for _, pod := range test.pods {
+				schedulerCache.AddPod(pod)
+			}
+
+			for _, ss := range test.podGroups {
+				schedulerCache.AddPodGroupV1beta1(ss)
+			}
+
+			for _, q := range test.queues {
+				schedulerCache.AddQueueV1beta1(q)
+			}
+
+			trueValue := true
+			ssn := framework.OpenSession(schedulerCache, []conf.Tier{
+				{
+					Plugins: []conf.PluginOption{
+						{
+							Name:                "gang",
+							EnabledJobPipelined: &trueValue,
+							EnabledJobOrder:     &trueValue,
+							EnabledJobReady:     &trueValue,
+							EnabledPredicate:    &trueValue,
+						},
+						{
+							Name:                "priority",
+							EnabledJobPipelined: &trueValue,
+							EnabledJobOrder:     &trueValue,
+							EnabledJobReady:     &trueValue,
+							EnabledPredicate:    &trueValue,
+						},
+						{
+							Name:                "conformance",
+							EnabledJobPipelined: &trueValue,
+							EnabledJobOrder:     &trueValue,
+							EnabledJobReady:     &trueValue,
+							EnabledPredicate:    &trueValue,
+						},
+
+						{
+							Name:                "overcommit",
+							EnabledJobPipelined: &trueValue,
+							EnabledJobOrder:     &trueValue,
+							EnabledJobReady:     &trueValue,
+							EnabledPredicate:    &trueValue,
+						},
+						{
+							Name:                "drf",
+							EnabledJobPipelined: &trueValue,
+							EnabledJobOrder:     &trueValue,
+							EnabledJobReady:     &trueValue,
+							EnabledPredicate:    &trueValue,
+						},
+						{
+							Name:                "proportion",
+							EnabledJobPipelined: &trueValue,
+							EnabledJobOrder:     &trueValue,
+							EnabledJobReady:     &trueValue,
+							EnabledPredicate:    &trueValue,
+						},
+						{
+							Name:                "binpack",
+							EnabledJobPipelined: &trueValue,
+							EnabledJobOrder:     &trueValue,
+							EnabledJobReady:     &trueValue,
+							EnabledNodeOrder:    &trueValue,
+							EnabledPredicate:    &trueValue,
+						},
+					},
+				},
+			}, nil)
+			defer framework.CloseSession(ssn)
+
+			allocate.Execute(ssn)
+
+			if !reflect.DeepEqual(test.expected, binder.Binds) {
+				t.Errorf("expected: %v, got %v", test.expected, binder.Binds)
+			}
+		})
+	}
+}
diff --git a/pkg/scheduler/api/helpers.go b/pkg/scheduler/api/helpers.go
index 7509da2981..a53bfff1fb 100644
--- a/pkg/scheduler/api/helpers.go
+++ b/pkg/scheduler/api/helpers.go
@@ -18,9 +18,9 @@ package api
 
 import (
 	"fmt"
-
 	v1 "k8s.io/api/core/v1"
 	clientcache "k8s.io/client-go/tools/cache"
+	"time"
 )
 
 // PodKey returns the string key of a pod.
@@ -33,16 +33,27 @@ func PodKey(pod *v1.Pod) TaskID {
 }
 
 func getTaskStatus(pod *v1.Pod) TaskStatus {
+	var waitTime int64 = 30
+	if pod.Spec.TerminationGracePeriodSeconds != nil {
+		waitTime = *pod.Spec.TerminationGracePeriodSeconds
+	}
+	waitTime += 5 // termination grace period (default 30s) plus a small buffer before the deletion is treated as failed
 	switch pod.Status.Phase {
 	case v1.PodRunning:
-		if pod.DeletionTimestamp != nil {
+		if pod.DeletionTimestamp != nil &&
+			time.Now().Unix()-pod.DeletionTimestamp.Unix() <= waitTime {
 			return Releasing
+		} else if pod.DeletionTimestamp != nil {
+			return ReleasingFailed
 		}
 
 		return Running
 	case v1.PodPending:
-		if pod.DeletionTimestamp != nil {
+		if pod.DeletionTimestamp != nil &&
+			time.Now().Unix()-pod.DeletionTimestamp.Unix() <= waitTime {
 			return Releasing
+		} else if pod.DeletionTimestamp != nil {
+			return ReleasingFailed
 		}
 
 		if len(pod.Spec.NodeName) == 0 {
@@ -63,7 +74,7 @@
 // AllocatedStatus checks whether the tasks has AllocatedStatus
 func AllocatedStatus(status TaskStatus) bool {
 	switch status {
-	case Bound, Binding, Running, Allocated:
+	case Bound, Binding, Running, Allocated, ReleasingFailed:
 		return true
 	default:
 		return false
diff --git a/pkg/scheduler/api/job_info.go b/pkg/scheduler/api/job_info.go
index b0b04b980b..d1ba5627f5 100644
--- a/pkg/scheduler/api/job_info.go
+++ b/pkg/scheduler/api/job_info.go
@@ -678,6 +678,7 @@ func (ji *JobInfo) ReadyTaskNum() int32 {
 	occupied += len(ji.TaskStatusIndex[Bound])
 	occupied += len(ji.TaskStatusIndex[Binding])
 	occupied += len(ji.TaskStatusIndex[Running])
+	occupied += len(ji.TaskStatusIndex[ReleasingFailed])
 	occupied += len(ji.TaskStatusIndex[Allocated])
 	occupied += len(ji.TaskStatusIndex[Succeeded])
 
diff --git a/pkg/scheduler/api/types.go b/pkg/scheduler/api/types.go
index 699cf8a918..d81562d815 100644
--- a/pkg/scheduler/api/types.go
+++ b/pkg/scheduler/api/types.go
@@ -45,6 +45,9 @@ const (
 	// Releasing means a task/pod is deleted.
 	Releasing
 
+	// ReleasingFailed means a task/pod has been deleted but did not terminate within its grace period.
+	ReleasingFailed
+
 	// Succeeded means that all containers in the pod have voluntarily terminated
 	// with a container exit code of 0, and the system is not going to restart any of these containers.
 	Succeeded
@@ -73,6 +76,8 @@ func (ts TaskStatus) String() string {
 		return "Running"
 	case Releasing:
 		return "Releasing"
+	case ReleasingFailed:
+		return "ReleasingFailed"
 	case Succeeded:
 		return "Succeeded"
 	case Failed:
diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go
index d95174e7cf..e2129783a9 100644
--- a/pkg/scheduler/framework/session.go
+++ b/pkg/scheduler/framework/session.go
@@ -201,6 +201,9 @@ func updateQueueStatus(ssn *Session) {
 		for _, runningTask := range job.TaskStatusIndex[api.Running] {
 			allocatedResources[job.Queue].Add(runningTask.Resreq)
 		}
+		for _, releasingFailedTask := range job.TaskStatusIndex[api.ReleasingFailed] {
+			allocatedResources[job.Queue].Add(releasingFailedTask.Resreq)
+		}
 	}
 
 	// update queue status