add task status named ReleasingFailed #2943

Closed · wants to merge 6 commits
259 changes: 256 additions & 3 deletions pkg/scheduler/actions/allocate/allocate_test.go
@@ -19,15 +19,19 @@ package allocate
import (
"context"
"fmt"
"reflect"
"testing"

"github.com/agiledragon/gomonkey/v2"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
"reflect"
"testing"
"time"
"volcano.sh/volcano/pkg/scheduler/plugins/binpack"
"volcano.sh/volcano/pkg/scheduler/plugins/conformance"
"volcano.sh/volcano/pkg/scheduler/plugins/nodeorder"
"volcano.sh/volcano/pkg/scheduler/plugins/overcommit"

"volcano.sh/volcano/pkg/scheduler/plugins/gang"
"volcano.sh/volcano/pkg/scheduler/plugins/priority"
@@ -492,3 +496,252 @@ func TestAllocateWithDynamicPVC(t *testing.T) {
})
}
}

func TestAllocatedWithTerminatingPod(t *testing.T) {
var tmp *cache.SchedulerCache
patches := gomonkey.ApplyMethod(reflect.TypeOf(tmp), "AddBindTask", func(scCache *cache.SchedulerCache, task *api.TaskInfo) error {
scCache.Binder.Bind(nil, []*api.TaskInfo{task})
return nil
})
defer patches.Reset()
patchUpdateQueueStatus := gomonkey.ApplyMethod(reflect.TypeOf(tmp), "UpdateQueueStatus", func(scCache *cache.SchedulerCache, queue *api.QueueInfo) error {
return nil
})
defer patchUpdateQueueStatus.Reset()
options.ServerOpts = &options.ServerOption{
MinNodesToFind: 100,
MinPercentageOfNodesToFind: 5,
PercentageOfNodesToFind: 100,
}
framework.RegisterPluginBuilder("priority", priority.New)
framework.RegisterPluginBuilder("gang", gang.New)
framework.RegisterPluginBuilder("conformance", conformance.New)
framework.RegisterPluginBuilder("overcommit", overcommit.New)
framework.RegisterPluginBuilder("drf", drf.New)
framework.RegisterPluginBuilder("proportion", proportion.New)
framework.RegisterPluginBuilder("nodeorder", nodeorder.New)
framework.RegisterPluginBuilder("binpack", binpack.New)
// case1 reproduces the issue:
// p2 has to wait for p1's release to complete and cannot be scheduled onto n2;
// p2 stays Pending as long as p1 remains in the Releasing state.
case1Pod1 := util.BuildPod("c1", "p1", "n1", v1.PodRunning, util.BuildResourceListWithGPU("2", "4G", "3"), "pg1", make(map[string]string), make(map[string]string))
case1Pod1.CreationTimestamp = metav1.Time{Time: time.Now().Add(-20 * time.Minute)}
case1Pod1.DeletionTimestamp = &metav1.Time{Time: time.Now()}
case1Pod2 := util.BuildPod("c1", "p2", "", v1.PodPending, util.BuildResourceListWithGPU("2", "4G", "2"), "pg2", make(map[string]string), make(map[string]string))

// case2 verifies the fix:
// p3's release has failed (ReleasingFailed), and p4 will be scheduled onto n4.

case2Pod3 := util.BuildPod("c2", "p3", "n3", v1.PodRunning, util.BuildResourceListWithGPU("2", "4G", "3"), "pg3", make(map[string]string), make(map[string]string))
case2Pod3.CreationTimestamp = metav1.Time{Time: time.Now().Add(-20 * time.Minute)}
case2Pod3.DeletionTimestamp = &metav1.Time{Time: time.Now().Add(-10 * time.Minute)}
case2Pod4 := util.BuildPod("c2", "p4", "", v1.PodPending, util.BuildResourceListWithGPU("2", "4G", "2"), "pg4", make(map[string]string), make(map[string]string))
tests := []struct {
name string
podGroups []*schedulingv1.PodGroup
pods []*v1.Pod
nodes []*v1.Node
queues []*schedulingv1.Queue
expected map[string]string
}{
{
name: "two Jobs with one Releasing Pod and two nodes",
podGroups: []*schedulingv1.PodGroup{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pg1",
Namespace: "c1",
},
Spec: schedulingv1.PodGroupSpec{
Queue: "c1",
},
Status: schedulingv1.PodGroupStatus{
Phase: schedulingv1.PodGroupRunning,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "pg2",
Namespace: "c1",
},
Spec: schedulingv1.PodGroupSpec{
Queue: "c1",
},
Status: schedulingv1.PodGroupStatus{
Phase: schedulingv1.PodGroupInqueue,
},
},
},
pods: []*v1.Pod{
case1Pod1,
case1Pod2,
},
nodes: []*v1.Node{
util.BuildNode("n1", util.BuildResourceListWithGPU("20", "400Gi", "4"), make(map[string]string)),
util.BuildNode("n2", util.BuildResourceListWithGPU("20", "400Gi", "8"), make(map[string]string)),
},
queues: []*schedulingv1.Queue{
{
ObjectMeta: metav1.ObjectMeta{
Name: "c1",
},
Spec: schedulingv1.QueueSpec{
Weight: 1,
},
},
},
expected: map[string]string{},
},

{
name: "two Jobs with one ReleasingFailed Pod and two nodes",
podGroups: []*schedulingv1.PodGroup{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pg3",
Namespace: "c2",
},
Spec: schedulingv1.PodGroupSpec{
Queue: "c2",
},
Status: schedulingv1.PodGroupStatus{
Phase: schedulingv1.PodGroupRunning,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "pg4",
Namespace: "c2",
},
Spec: schedulingv1.PodGroupSpec{
Queue: "c2",
},
Status: schedulingv1.PodGroupStatus{
Phase: schedulingv1.PodGroupInqueue,
},
},
},
pods: []*v1.Pod{
case2Pod3,
case2Pod4,
},
nodes: []*v1.Node{
util.BuildNode("n3", util.BuildResourceListWithGPU("20", "400Gi", "4"), make(map[string]string)),
util.BuildNode("n4", util.BuildResourceListWithGPU("20", "400Gi", "8"), make(map[string]string)),
},
queues: []*schedulingv1.Queue{
{
ObjectMeta: metav1.ObjectMeta{
Name: "c2",
},
Spec: schedulingv1.QueueSpec{
Weight: 1,
},
},
},
expected: map[string]string{
"c2/p4": "n4",
},
},
}
allocate := New()
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
binder := &util.FakeBinder{
Binds: map[string]string{},
Channel: make(chan string),
}
schedulerCache := &cache.SchedulerCache{
Nodes: make(map[string]*api.NodeInfo),
Jobs: make(map[api.JobID]*api.JobInfo),
Queues: make(map[api.QueueID]*api.QueueInfo),
Binder: binder,
StatusUpdater: &util.FakeStatusUpdater{},
VolumeBinder: &util.FakeVolumeBinder{},

Recorder: record.NewFakeRecorder(100),
}

for _, node := range test.nodes {
schedulerCache.AddNode(node)
}
for _, pod := range test.pods {
schedulerCache.AddPod(pod)
}

for _, ss := range test.podGroups {
schedulerCache.AddPodGroupV1beta1(ss)
}

for _, q := range test.queues {
schedulerCache.AddQueueV1beta1(q)
}

trueValue := true
ssn := framework.OpenSession(schedulerCache, []conf.Tier{
{
Plugins: []conf.PluginOption{
{
Name: "gang",
EnabledJobPipelined: &trueValue,
EnabledJobOrder: &trueValue,
EnabledJobReady: &trueValue,
EnabledPredicate: &trueValue,
},
{
Name: "priority",
EnabledJobPipelined: &trueValue,
EnabledJobOrder: &trueValue,
EnabledJobReady: &trueValue,
EnabledPredicate: &trueValue,
},
{
Name: "conformance",
EnabledJobPipelined: &trueValue,
EnabledJobOrder: &trueValue,
EnabledJobReady: &trueValue,
EnabledPredicate: &trueValue,
},

{
Name: "overcommit",
EnabledJobPipelined: &trueValue,
EnabledJobOrder: &trueValue,
EnabledJobReady: &trueValue,
EnabledPredicate: &trueValue,
},
{
Name: "drf",
EnabledJobPipelined: &trueValue,
EnabledJobOrder: &trueValue,
EnabledJobReady: &trueValue,
EnabledPredicate: &trueValue,
},
{
Name: "proportion",
EnabledJobPipelined: &trueValue,
EnabledJobOrder: &trueValue,
EnabledJobReady: &trueValue,
EnabledPredicate: &trueValue,
},
{
Name: "binpack",
EnabledJobPipelined: &trueValue,
EnabledJobOrder: &trueValue,
EnabledJobReady: &trueValue,
EnabledNodeOrder: &trueValue,
EnabledPredicate: &trueValue,
},
},
},
}, nil)
defer framework.CloseSession(ssn)

allocate.Execute(ssn)

if !reflect.DeepEqual(test.expected, binder.Binds) {
t.Errorf("expected: %v, got: %v", test.expected, binder.Binds)
}
})
}
}
19 changes: 15 additions & 4 deletions pkg/scheduler/api/helpers.go
@@ -18,9 +18,9 @@ package api

import (
"fmt"

v1 "k8s.io/api/core/v1"
clientcache "k8s.io/client-go/tools/cache"
"time"
)

// PodKey returns the string key of a pod.
@@ -33,16 +33,27 @@ func PodKey(pod *v1.Pod) TaskID {
}

func getTaskStatus(pod *v1.Pod) TaskStatus {
var waitTime int64 = 30
if pod.Spec.TerminationGracePeriodSeconds != nil {
waitTime = *pod.Spec.TerminationGracePeriodSeconds
}
waitTime += 5
switch pod.Status.Phase {
case v1.PodRunning:
if pod.DeletionTimestamp != nil {
if pod.DeletionTimestamp != nil &&
time.Now().Unix()-pod.DeletionTimestamp.Unix() <= waitTime {
return Releasing
} else if pod.DeletionTimestamp != nil {
return ReleasingFailed
}

return Running
case v1.PodPending:
if pod.DeletionTimestamp != nil {
if pod.DeletionTimestamp != nil &&
time.Now().Unix()-pod.DeletionTimestamp.Unix() <= waitTime {
return Releasing
} else if pod.DeletionTimestamp != nil {
return ReleasingFailed
}

if len(pod.Spec.NodeName) == 0 {
@@ -63,7 +74,7 @@ func getTaskStatus(pod *v1.Pod) TaskStatus {
// AllocatedStatus checks whether the tasks has AllocatedStatus
func AllocatedStatus(status TaskStatus) bool {
switch status {
case Bound, Binding, Running, Allocated:
case Bound, Binding, Running, Allocated, ReleasingFailed:
return true
default:
return false
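As a reading aid for the hunk above, here is a minimal, self-contained sketch of the classification rule, assuming the default 30-second grace period plus the 5-second buffer used in getTaskStatus; the classifyDeletedPod helper and its string return values are illustrative only and not part of this PR.

```go
package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// classifyDeletedPod is a hypothetical helper mirroring the rule in getTaskStatus:
// a deleted pod stays "Releasing" for TerminationGracePeriodSeconds (default 30s)
// plus a 5-second buffer; once that window has passed, the deletion is assumed
// to have failed and the pod is classified as "ReleasingFailed".
func classifyDeletedPod(pod *v1.Pod, now time.Time) string {
	if pod.DeletionTimestamp == nil {
		return "Running"
	}
	var waitTime int64 = 30
	if pod.Spec.TerminationGracePeriodSeconds != nil {
		waitTime = *pod.Spec.TerminationGracePeriodSeconds
	}
	waitTime += 5
	if now.Unix()-pod.DeletionTimestamp.Unix() <= waitTime {
		return "Releasing"
	}
	return "ReleasingFailed"
}

func main() {
	pod := &v1.Pod{}
	pod.DeletionTimestamp = &metav1.Time{Time: time.Now().Add(-10 * time.Minute)}
	// Deleted 10 minutes ago with the default 30s grace period: well past the
	// 35-second window, so the pod is classified as ReleasingFailed.
	fmt.Println(classifyDeletedPod(pod, time.Now()))
}
```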
1 change: 1 addition & 0 deletions pkg/scheduler/api/job_info.go
@@ -678,6 +678,7 @@ func (ji *JobInfo) ReadyTaskNum() int32 {
occupied += len(ji.TaskStatusIndex[Bound])
occupied += len(ji.TaskStatusIndex[Binding])
occupied += len(ji.TaskStatusIndex[Running])
occupied += len(ji.TaskStatusIndex[ReleasingFailed])
occupied += len(ji.TaskStatusIndex[Allocated])
occupied += len(ji.TaskStatusIndex[Succeeded])

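To make the effect of this one-line change concrete, a small illustrative sketch follows; the TaskStatus strings and the occupiedTaskNum helper are stand-ins for the scheduler's real types, used only to show that a task stuck in ReleasingFailed keeps counting toward a job's ready-task total (and therefore toward gang readiness).

```go
package main

import "fmt"

// TaskStatus is a stand-in for the scheduler's status enum, for illustration only.
type TaskStatus string

// occupiedTaskNum restates ReadyTaskNum's counting rule: statuses that hold (or
// are about to hold) node resources are treated as occupied, and with this PR
// ReleasingFailed joins that set.
func occupiedTaskNum(statusIndex map[TaskStatus]int) int {
	occupied := 0
	for _, s := range []TaskStatus{"Bound", "Binding", "Running", "ReleasingFailed", "Allocated", "Succeeded"} {
		occupied += statusIndex[s]
	}
	return occupied
}

func main() {
	// One Running task plus one task stuck in ReleasingFailed still count as two
	// occupied tasks, so a gang with MinMember=2 is not considered broken.
	fmt.Println(occupiedTaskNum(map[TaskStatus]int{"Running": 1, "ReleasingFailed": 1})) // 2
}
```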
5 changes: 5 additions & 0 deletions pkg/scheduler/api/types.go
@@ -45,6 +45,9 @@ const (
// Releasing means a task/pod is deleted.
Releasing

// ReleasingFailed means that deleting a task/pod has failed (the pod is still present after its deletion grace period has expired).
ReleasingFailed

// Succeeded means that all containers in the pod have voluntarily terminated
// with a container exit code of 0, and the system is not going to restart any of these containers.
Succeeded
@@ -73,6 +76,8 @@ func (ts TaskStatus) String() string {
return "Running"
case Releasing:
return "Releasing"
case ReleasingFailed:
return "ReleasingFailed"
case Succeeded:
return "Succeeded"
case Failed:
3 changes: 3 additions & 0 deletions pkg/scheduler/framework/session.go
@@ -201,6 +201,9 @@ func updateQueueStatus(ssn *Session) {
for _, runningTask := range job.TaskStatusIndex[api.Running] {
allocatedResources[job.Queue].Add(runningTask.Resreq)
}
for _, releasingFailedTask := range job.TaskStatusIndex[api.ReleasingFailed] {
allocatedResources[job.Queue].Add(releasingFailedTask.Resreq)
}
}

// update queue status
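A short sketch of the queue-accounting effect of the loop added above, using a simplified Resource type in place of the scheduler's api.Resource (an assumption made for illustration): resources requested by ReleasingFailed tasks keep contributing to the queue's allocated total, exactly as Running tasks do.

```go
package main

import "fmt"

// Resource is a simplified stand-in for api.Resource, used only to illustrate
// the change to updateQueueStatus.
type Resource struct {
	MilliCPU float64
	Memory   float64
}

// Add accumulates another resource request into the receiver.
func (r *Resource) Add(other Resource) {
	r.MilliCPU += other.MilliCPU
	r.Memory += other.Memory
}

func main() {
	allocated := &Resource{}
	running := []Resource{{MilliCPU: 2000, Memory: 4e9}}
	releasingFailed := []Resource{{MilliCPU: 2000, Memory: 4e9}}

	// Before this change only Running tasks were summed; now tasks stuck in
	// ReleasingFailed are added as well, so the queue's allocated figure does not
	// under-count resources that are still physically held on a node.
	for _, r := range running {
		allocated.Add(r)
	}
	for _, r := range releasingFailed {
		allocated.Add(r)
	}
	fmt.Printf("allocated: %+v\n", allocated)
}
```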