diff --git a/pkg/cli/podgroup/podgroup.go b/pkg/cli/podgroup/podgroup.go new file mode 100644 index 0000000000..ad9ea2c779 --- /dev/null +++ b/pkg/cli/podgroup/podgroup.go @@ -0,0 +1,26 @@ +package podgroup + +import "volcano.sh/apis/pkg/apis/scheduling/v1beta1" + +type PodGroupStatistics struct { + Inqueue int + Pending int + Running int + Unknown int + Completed int +} + +func (pgStats *PodGroupStatistics) StatPodGroupCountsForQueue(pg *v1beta1.PodGroup) { + switch pg.Status.Phase { + case v1beta1.PodGroupInqueue: + pgStats.Inqueue++ + case v1beta1.PodGroupPending: + pgStats.Pending++ + case v1beta1.PodGroupRunning: + pgStats.Running++ + case v1beta1.PodGroupUnknown: + pgStats.Unknown++ + case v1beta1.PodGroupCompleted: + pgStats.Completed++ + } +} diff --git a/pkg/cli/queue/get.go b/pkg/cli/queue/get.go index 057bffda1e..3253ec8bd1 100644 --- a/pkg/cli/queue/get.go +++ b/pkg/cli/queue/get.go @@ -28,6 +28,7 @@ import ( "volcano.sh/apis/pkg/apis/scheduling/v1beta1" "volcano.sh/apis/pkg/client/clientset/versioned" + "volcano.sh/volcano/pkg/cli/podgroup" ) type getFlags struct { @@ -63,21 +64,37 @@ func GetQueue(ctx context.Context) error { return err } - PrintQueue(queue, os.Stdout) + // Although the featuregate called CustomResourceFieldSelectors is enabled by default after v1.31, there are still + // users using k8s versions lower than v1.31. Therefore we can only get all the podgroups from kube-apiserver + // and then filtering them. + pgList, err := queueClient.SchedulingV1beta1().PodGroups("").List(ctx, metav1.ListOptions{}) + if err != nil { + return fmt.Errorf("failed to list podgroup for queue %s with err: %v", getQueueFlags.Name, err) + } + + pgStats := &podgroup.PodGroupStatistics{} + for _, pg := range pgList.Items { + if pg.Spec.Queue == getQueueFlags.Name { + pgStats.StatPodGroupCountsForQueue(&pg) + } + } + + PrintQueue(queue, pgStats, os.Stdout) return nil } // PrintQueue prints queue information. -func PrintQueue(queue *v1beta1.Queue, writer io.Writer) { - _, err := fmt.Fprintf(writer, "%-25s%-8s%-8s%-8s%-8s%-8s%-8s\n", - Name, Weight, State, Inqueue, Pending, Running, Unknown) +func PrintQueue(queue *v1beta1.Queue, pgStats *podgroup.PodGroupStatistics, writer io.Writer) { + _, err := fmt.Fprintf(writer, "%-25s%-8s%-8s%-8s%-8s%-8s%-8s%-8s\n", + Name, Weight, State, Inqueue, Pending, Running, Unknown, Completed) if err != nil { fmt.Printf("Failed to print queue command result: %s.\n", err) } - _, err = fmt.Fprintf(writer, "%-25s%-8d%-8s%-8d%-8d%-8d%-8d\n", - queue.Name, queue.Spec.Weight, queue.Status.State, queue.Status.Inqueue, - queue.Status.Pending, queue.Status.Running, queue.Status.Unknown) + + _, err = fmt.Fprintf(writer, "%-25s%-8d%-8s%-8d%-8d%-8d%-8d%-8d\n", + queue.Name, queue.Spec.Weight, queue.Status.State, pgStats.Inqueue, + pgStats.Pending, pgStats.Running, pgStats.Unknown, pgStats.Completed) if err != nil { fmt.Printf("Failed to print queue command result: %s.\n", err) } diff --git a/pkg/cli/queue/list.go b/pkg/cli/queue/list.go index b6ceba2f0c..1814c9e7ed 100644 --- a/pkg/cli/queue/list.go +++ b/pkg/cli/queue/list.go @@ -28,6 +28,7 @@ import ( "volcano.sh/apis/pkg/apis/scheduling/v1beta1" "volcano.sh/apis/pkg/client/clientset/versioned" + "volcano.sh/volcano/pkg/cli/podgroup" ) type listFlags struct { @@ -53,6 +54,9 @@ const ( // Inqueue status of queue Inqueue string = "Inqueue" + // Completed status of the queue + Completed string = "Completed" + // State is state of queue State string = "State" ) @@ -81,22 +85,41 @@ func ListQueue(ctx context.Context) error { fmt.Printf("No resources found\n") return nil } - PrintQueues(queues, os.Stdout) + + // Although the featuregate called CustomResourceFieldSelectors is enabled by default after v1.31, there are still + // users using k8s versions lower than v1.31. Therefore we can only get all the podgroups from kube-apiserver + // and then filtering them. + pgList, err := jobClient.SchedulingV1beta1().PodGroups("").List(ctx, metav1.ListOptions{}) + if err != nil { + return fmt.Errorf("failed to list podgroups with err: %v", err) + } + + queueStats := make(map[string]*podgroup.PodGroupStatistics, len(queues.Items)) + for _, queue := range queues.Items { + queueStats[queue.Name] = &podgroup.PodGroupStatistics{} + } + + for _, pg := range pgList.Items { + queueStats[pg.Spec.Queue].StatPodGroupCountsForQueue(&pg) + } + + PrintQueues(queues, queueStats, os.Stdout) return nil } // PrintQueues prints queue information. -func PrintQueues(queues *v1beta1.QueueList, writer io.Writer) { - _, err := fmt.Fprintf(writer, "%-25s%-8s%-8s%-8s%-8s%-8s%-8s\n", - Name, Weight, State, Inqueue, Pending, Running, Unknown) +func PrintQueues(queues *v1beta1.QueueList, queueStats map[string]*podgroup.PodGroupStatistics, writer io.Writer) { + _, err := fmt.Fprintf(writer, "%-25s%-8s%-8s%-8s%-8s%-8s%-8s%-8s\n", + Name, Weight, State, Inqueue, Pending, Running, Unknown, Completed) if err != nil { fmt.Printf("Failed to print queue command result: %s.\n", err) } + for _, queue := range queues.Items { - _, err = fmt.Fprintf(writer, "%-25s%-8d%-8s%-8d%-8d%-8d%-8d\n", - queue.Name, queue.Spec.Weight, queue.Status.State, queue.Status.Inqueue, - queue.Status.Pending, queue.Status.Running, queue.Status.Unknown) + _, err = fmt.Fprintf(writer, "%-25s%-8d%-8s%-8d%-8d%-8d%-8d%-8d\n", + queue.Name, queue.Spec.Weight, queue.Status.State, queueStats[queue.Name].Inqueue, queueStats[queue.Name].Pending, + queueStats[queue.Name].Running, queueStats[queue.Name].Unknown, queueStats[queue.Name].Completed) if err != nil { fmt.Printf("Failed to print queue command result: %s.\n", err) } diff --git a/pkg/controllers/metrics/queue.go b/pkg/controllers/metrics/queue.go new file mode 100644 index 0000000000..eadd453c4d --- /dev/null +++ b/pkg/controllers/metrics/queue.go @@ -0,0 +1,93 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "volcano.sh/apis/pkg/apis/scheduling/v1beta1" + "volcano.sh/volcano/pkg/scheduler/metrics" +) + +var ( + queuePodGroupInqueue = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: metrics.VolcanoNamespace, + Name: "queue_pod_group_inqueue_count", + Help: "The number of Inqueue PodGroup in this queue", + }, []string{"queue_name"}, + ) + + queuePodGroupPending = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: metrics.VolcanoNamespace, + Name: "queue_pod_group_pending_count", + Help: "The number of Pending PodGroup in this queue", + }, []string{"queue_name"}, + ) + + queuePodGroupRunning = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: metrics.VolcanoNamespace, + Name: "queue_pod_group_running_count", + Help: "The number of Running PodGroup in this queue", + }, []string{"queue_name"}, + ) + + queuePodGroupUnknown = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: metrics.VolcanoNamespace, + Name: "queue_pod_group_unknown_count", + Help: "The number of Unknown PodGroup in this queue", + }, []string{"queue_name"}, + ) + + queuePodGroupCompleted = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: metrics.VolcanoNamespace, + Name: "queue_pod_group_completed_count", + Help: "The number of Completed PodGroup in this queue", + }, []string{"queue_name"}, + ) +) + +// UpdateQueuePodGroupInqueueCount records the number of Inqueue PodGroup in this queue +func UpdateQueuePodGroupInqueueCount(queueName string, count int32) { + queuePodGroupInqueue.WithLabelValues(queueName).Set(float64(count)) +} + +// UpdateQueuePodGroupPendingCount records the number of Pending PodGroup in this queue +func UpdateQueuePodGroupPendingCount(queueName string, count int32) { + queuePodGroupPending.WithLabelValues(queueName).Set(float64(count)) +} + +// UpdateQueuePodGroupRunningCount records the number of Running PodGroup in this queue +func UpdateQueuePodGroupRunningCount(queueName string, count int32) { + queuePodGroupRunning.WithLabelValues(queueName).Set(float64(count)) +} + +// UpdateQueuePodGroupUnknownCount records the number of Unknown PodGroup in this queue +func UpdateQueuePodGroupUnknownCount(queueName string, count int32) { + queuePodGroupUnknown.WithLabelValues(queueName).Set(float64(count)) +} + +// UpdateQueuePodGroupCompletedCount records the number of Completed PodGroup in this queue +func UpdateQueuePodGroupCompletedCount(queueName string, count int32) { + queuePodGroupCompleted.WithLabelValues(queueName).Set(float64(count)) +} + +// DeleteQueueMetrics delete all metrics related to the queue +func DeleteQueueMetrics(queueName string) { + queuePodGroupInqueue.DeleteLabelValues(queueName) + queuePodGroupPending.DeleteLabelValues(queueName) + queuePodGroupRunning.DeleteLabelValues(queueName) + queuePodGroupUnknown.DeleteLabelValues(queueName) + queuePodGroupCompleted.DeleteLabelValues(queueName) +} + +func UpdateQueueMetrics(queueName string, queueStatus *v1beta1.QueueStatus) { + UpdateQueuePodGroupPendingCount(queueName, queueStatus.Pending) + UpdateQueuePodGroupRunningCount(queueName, queueStatus.Running) + UpdateQueuePodGroupUnknownCount(queueName, queueStatus.Unknown) + UpdateQueuePodGroupInqueueCount(queueName, queueStatus.Inqueue) + UpdateQueuePodGroupCompletedCount(queueName, queueStatus.Completed) +} diff --git a/pkg/controllers/queue/queue_controller_action.go b/pkg/controllers/queue/queue_controller_action.go index 60e16b7d09..9b718bb7d7 100644 --- a/pkg/controllers/queue/queue_controller_action.go +++ b/pkg/controllers/queue/queue_controller_action.go @@ -23,7 +23,6 @@ import ( "strings" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -34,7 +33,9 @@ import ( "volcano.sh/apis/pkg/apis/bus/v1alpha1" busv1alpha1 "volcano.sh/apis/pkg/apis/bus/v1alpha1" schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" + v1beta1apply "volcano.sh/apis/pkg/client/applyconfiguration/scheduling/v1beta1" "volcano.sh/volcano/pkg/controllers/apis" + "volcano.sh/volcano/pkg/controllers/metrics" "volcano.sh/volcano/pkg/controllers/queue/state" ) @@ -55,7 +56,7 @@ func (c *queuecontroller) syncQueue(queue *schedulingv1beta1.Queue, updateStateF } podGroups := c.getPodGroups(queue.Name) - queueStatus := schedulingv1beta1.QueueStatus{} + newQueue := queue.DeepCopy() for _, pgKey := range podGroups { // Ignore error here, tt can not occur. @@ -76,38 +77,31 @@ func (c *queuecontroller) syncQueue(queue *schedulingv1beta1.Queue, updateStateF switch pg.Status.Phase { case schedulingv1beta1.PodGroupPending: - queueStatus.Pending++ + newQueue.Status.Pending++ case schedulingv1beta1.PodGroupRunning: - queueStatus.Running++ + newQueue.Status.Running++ case schedulingv1beta1.PodGroupUnknown: - queueStatus.Unknown++ + newQueue.Status.Unknown++ case schedulingv1beta1.PodGroupInqueue: - queueStatus.Inqueue++ + newQueue.Status.Inqueue++ + case schedulingv1beta1.PodGroupCompleted: + newQueue.Status.Completed++ } } - if updateStateFn != nil { - updateStateFn(&queueStatus, podGroups) - } else { - queueStatus.State = queue.Status.State - } + // Update the metrics + metrics.UpdateQueueMetrics(queue.Name, &newQueue.Status) - queueStatus.Allocated = queue.Status.Allocated.DeepCopy() - // queue.status.allocated will be updated after every session close in volcano scheduler, we should not depend on it because session may be time-consuming, - // and queue.status.allocated can't be updated timely. We initialize queue.status.allocated and update it here explicitly - // to avoid update queue err because update will fail when queue.status.allocated is nil. - if queueStatus.Allocated == nil { - queueStatus.Allocated = v1.ResourceList{} + if updateStateFn != nil { + updateStateFn(&newQueue.Status, podGroups) } - newQueue := queue.DeepCopy() - // ignore update when status does not change - if !equality.Semantic.DeepEqual(queueStatus, queue.Status) { - newQueue.Status = queueStatus - var err error - newQueue, err = c.vcClient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{}) - if err != nil { - klog.Errorf("Failed to update status of Queue %s: %v.", newQueue.Name, err) + // ignore update when state does not change + if newQueue.Status.State != queue.Status.State { + queueStatusApply := v1beta1apply.QueueStatus().WithState(newQueue.Status.State).WithAllocated(newQueue.Status.Allocated) + queueApply := v1beta1apply.Queue(queue.Name).WithStatus(queueStatusApply) + if _, err := c.vcClient.SchedulingV1beta1().Queues().ApplyStatus(context.TODO(), queueApply, metav1.ApplyOptions{FieldManager: controllerName}); err != nil { + klog.Errorf("Update queue state from %s to %s failed for %v", queue.Status.State, newQueue.Status.State, err) return err } } @@ -126,37 +120,19 @@ func (c *queuecontroller) openQueue(queue *schedulingv1beta1.Queue, updateStateF } newQueue := queue.DeepCopy() - newQueue.Status.State = schedulingv1beta1.QueueStateOpen + if updateStateFn != nil { + updateStateFn(&newQueue.Status, nil) + } if queue.Status.State != newQueue.Status.State { - if _, err := c.vcClient.SchedulingV1beta1().Queues().Update(context.TODO(), newQueue, metav1.UpdateOptions{}); err != nil { + queueStatusApply := v1beta1apply.QueueStatus().WithState(newQueue.Status.State).WithAllocated(newQueue.Status.Allocated) + queueApply := v1beta1apply.Queue(queue.Name).WithStatus(queueStatusApply) + if _, err := c.vcClient.SchedulingV1beta1().Queues().ApplyStatus(context.TODO(), queueApply, metav1.ApplyOptions{FieldManager: controllerName}); err != nil { c.recorder.Event(newQueue, v1.EventTypeWarning, string(v1alpha1.OpenQueueAction), - fmt.Sprintf("Open queue failed for %v", err)) - return err - } - - c.recorder.Event(newQueue, v1.EventTypeNormal, string(v1alpha1.OpenQueueAction), "Open queue succeed") - - q, err := c.vcClient.SchedulingV1beta1().Queues().Get(context.TODO(), newQueue.Name, metav1.GetOptions{}) - if err != nil { + fmt.Sprintf("Update queue status from %s to %s failed for %v", + queue.Status.State, newQueue.Status.State, err)) return err } - - newQueue = q.DeepCopy() - if updateStateFn != nil { - updateStateFn(&newQueue.Status, nil) - } else { - return fmt.Errorf("internal error, update state function should be provided") - } - - if queue.Status.State != newQueue.Status.State { - if _, err := c.vcClient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{}); err != nil { - c.recorder.Event(newQueue, v1.EventTypeWarning, string(v1alpha1.OpenQueueAction), - fmt.Sprintf("Update queue status from %s to %s failed for %v", - queue.Status.State, newQueue.Status.State, err)) - return err - } - } } _, err := c.updateQueueAnnotation(queue, ClosedByParentAnnotationKey, ClosedByParentAnnotationFalseValue) @@ -173,41 +149,21 @@ func (c *queuecontroller) closeQueue(queue *schedulingv1beta1.Queue, updateState } } + podGroups := c.getPodGroups(queue.Name) newQueue := queue.DeepCopy() - newQueue.Status.State = schedulingv1beta1.QueueStateClosed - - if queue.Status.State != newQueue.Status.State { - if _, err := c.vcClient.SchedulingV1beta1().Queues().Update(context.TODO(), newQueue, metav1.UpdateOptions{}); err != nil { - c.recorder.Event(newQueue, v1.EventTypeWarning, string(v1alpha1.CloseQueueAction), - fmt.Sprintf("Close queue failed for %v", err)) - return err - } - - c.recorder.Event(newQueue, v1.EventTypeNormal, string(v1alpha1.CloseQueueAction), "Close queue succeed") - } else { - return nil - } - - q, err := c.vcClient.SchedulingV1beta1().Queues().Get(context.TODO(), newQueue.Name, metav1.GetOptions{}) - if err != nil { - return err - } - - newQueue = q.DeepCopy() - podGroups := c.getPodGroups(newQueue.Name) if updateStateFn != nil { updateStateFn(&newQueue.Status, podGroups) - } else { - return fmt.Errorf("internal error, update state function should be provided") } if queue.Status.State != newQueue.Status.State { - if _, err := c.vcClient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{}); err != nil { + queueStatusApply := v1beta1apply.QueueStatus().WithState(newQueue.Status.State).WithAllocated(newQueue.Status.Allocated) + queueApply := v1beta1apply.Queue(queue.Name).WithStatus(queueStatusApply) + if _, err := c.vcClient.SchedulingV1beta1().Queues().ApplyStatus(context.TODO(), queueApply, metav1.ApplyOptions{FieldManager: controllerName}); err != nil { c.recorder.Event(newQueue, v1.EventTypeWarning, string(v1alpha1.CloseQueueAction), - fmt.Sprintf("Update queue status from %s to %s failed for %v", - queue.Status.State, newQueue.Status.State, err)) + fmt.Sprintf("Close queue failed for %v", err)) return err } + c.recorder.Event(newQueue, v1.EventTypeNormal, string(v1alpha1.CloseQueueAction), "Close queue succeed") } return nil diff --git a/pkg/controllers/queue/queue_controller_handler.go b/pkg/controllers/queue/queue_controller_handler.go index e5dc6aa979..1abae7c0e0 100644 --- a/pkg/controllers/queue/queue_controller_handler.go +++ b/pkg/controllers/queue/queue_controller_handler.go @@ -23,6 +23,7 @@ import ( busv1alpha1 "volcano.sh/apis/pkg/apis/bus/v1alpha1" schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" "volcano.sh/volcano/pkg/controllers/apis" + "volcano.sh/volcano/pkg/controllers/metrics" ) func (c *queuecontroller) enqueue(req *apis.Request) { @@ -57,6 +58,7 @@ func (c *queuecontroller) deleteQueue(obj interface{}) { } } + metrics.DeleteQueueMetrics(queue.Name) c.pgMutex.Lock() defer c.pgMutex.Unlock() delete(c.podGroups, queue.Name) diff --git a/pkg/controllers/queue/queue_controller_test.go b/pkg/controllers/queue/queue_controller_test.go index b10d8168a7..81c2f0be29 100644 --- a/pkg/controllers/queue/queue_controller_test.go +++ b/pkg/controllers/queue/queue_controller_test.go @@ -22,15 +22,19 @@ import ( "reflect" "testing" + "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kubeclient "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/cache" + "volcano.sh/apis/pkg/apis/bus/v1alpha1" schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" vcclient "volcano.sh/apis/pkg/client/clientset/versioned/fake" informerfactory "volcano.sh/apis/pkg/client/informers/externalversions" "volcano.sh/volcano/pkg/controllers/framework" + "volcano.sh/volcano/pkg/controllers/queue/state" ) func newFakeController() *queuecontroller { @@ -241,138 +245,94 @@ func TestUpdatePodGroup(t *testing.T) { } func TestSyncQueue(t *testing.T) { - namespace := "c1" - testCases := []struct { - Name string - pgsInCache []*schedulingv1beta1.PodGroup - pgsInInformer []*schedulingv1beta1.PodGroup - queue *schedulingv1beta1.Queue - ExpectStatus schedulingv1beta1.QueueStatus + Name string + queue *schedulingv1beta1.Queue + action v1alpha1.Action + updateStatusFnFactory func(queue *schedulingv1beta1.Queue) state.UpdateQueueStatusFn + ExpectStatus schedulingv1beta1.QueueStatus }{ { - Name: "syncQueue", - pgsInCache: []*schedulingv1beta1.PodGroup{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "pg1", - Namespace: namespace, - }, - Spec: schedulingv1beta1.PodGroupSpec{ - Queue: "c1", - }, - Status: schedulingv1beta1.PodGroupStatus{ - Phase: schedulingv1beta1.PodGroupPending, - }, - }, - }, - pgsInInformer: []*schedulingv1beta1.PodGroup{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "pg1", - Namespace: namespace, - }, - Spec: schedulingv1beta1.PodGroupSpec{ - Queue: "c1", - }, - Status: schedulingv1beta1.PodGroupStatus{ - Phase: schedulingv1beta1.PodGroupPending, - }, - }, - }, + Name: "From empty state to open", queue: &schedulingv1beta1.Queue{ ObjectMeta: metav1.ObjectMeta{ Name: "c1", }, - Spec: schedulingv1beta1.QueueSpec{ - Weight: 1, + Status: schedulingv1beta1.QueueStatus{ + State: "", }, }, ExpectStatus: schedulingv1beta1.QueueStatus{ - Pending: 1, - Reservation: schedulingv1beta1.Reservation{}, - Allocated: v1.ResourceList{}, + State: schedulingv1beta1.QueueStateOpen, }, + updateStatusFnFactory: func(queue *schedulingv1beta1.Queue) state.UpdateQueueStatusFn { + return func(status *schedulingv1beta1.QueueStatus, podGroupList []string) { + if len(queue.Status.State) == 0 { + status.State = schedulingv1beta1.QueueStateOpen + } + } + }, + action: v1alpha1.OpenQueueAction, }, { - Name: "syncQueueHandlingNotFoundPg", - pgsInCache: []*schedulingv1beta1.PodGroup{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "pg1", - Namespace: namespace, - }, - Spec: schedulingv1beta1.PodGroupSpec{ - Queue: "c2", - }, - Status: schedulingv1beta1.PodGroupStatus{ - Phase: schedulingv1beta1.PodGroupPending, - }, + Name: "From open to close", + queue: &schedulingv1beta1.Queue{ + ObjectMeta: metav1.ObjectMeta{ + Name: "c2", }, - { - ObjectMeta: metav1.ObjectMeta{ - Name: "pg2", - Namespace: namespace, - }, - Spec: schedulingv1beta1.PodGroupSpec{ - Queue: "c2", - }, - Status: schedulingv1beta1.PodGroupStatus{ - Phase: schedulingv1beta1.PodGroupPending, - }, + Status: schedulingv1beta1.QueueStatus{ + State: schedulingv1beta1.QueueStateOpen, }, }, - pgsInInformer: []*schedulingv1beta1.PodGroup{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "pg2", - Namespace: namespace, - }, - Spec: schedulingv1beta1.PodGroupSpec{ - Queue: "c2", - }, - Status: schedulingv1beta1.PodGroupStatus{ - Phase: schedulingv1beta1.PodGroupPending, - }, - }, + ExpectStatus: schedulingv1beta1.QueueStatus{ + State: schedulingv1beta1.QueueStateClosed, + }, + updateStatusFnFactory: func(queue *schedulingv1beta1.Queue) state.UpdateQueueStatusFn { + return func(status *schedulingv1beta1.QueueStatus, podGroupList []string) { + status.State = schedulingv1beta1.QueueStateClosed + } }, + action: v1alpha1.CloseQueueAction, + }, + { + Name: "Updated state succeeded but keep allocated unchanged", queue: &schedulingv1beta1.Queue{ ObjectMeta: metav1.ObjectMeta{ - Name: "c2", + Name: "c3", }, - Spec: schedulingv1beta1.QueueSpec{ - Weight: 1, + Status: schedulingv1beta1.QueueStatus{ + State: schedulingv1beta1.QueueStateUnknown, + Allocated: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("1"), + }, }, }, ExpectStatus: schedulingv1beta1.QueueStatus{ - Pending: 1, - Reservation: schedulingv1beta1.Reservation{}, - Allocated: v1.ResourceList{}, + State: schedulingv1beta1.QueueStateOpen, + Allocated: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("1"), + }, }, + updateStatusFnFactory: func(queue *schedulingv1beta1.Queue) state.UpdateQueueStatusFn { + return func(status *schedulingv1beta1.QueueStatus, podGroupList []string) { + status.State = schedulingv1beta1.QueueStateOpen + } + }, + action: v1alpha1.OpenQueueAction, }, } for i, testcase := range testCases { c := newFakeController() - for j := range testcase.pgsInCache { - key, _ := cache.MetaNamespaceKeyFunc(testcase.pgsInCache[j]) - if _, ok := c.podGroups[testcase.pgsInCache[j].Spec.Queue]; !ok { - c.podGroups[testcase.pgsInCache[j].Spec.Queue] = make(map[string]struct{}) - } - c.podGroups[testcase.pgsInCache[j].Spec.Queue][key] = struct{}{} - } - - for j := range testcase.pgsInInformer { - c.pgInformer.Informer().GetIndexer().Add(testcase.pgsInInformer[j]) - } - - c.queueInformer.Informer().GetIndexer().Add(testcase.queue) - c.vcClient.SchedulingV1beta1().Queues().Create(context.TODO(), testcase.queue, metav1.CreateOptions{}) + _, err := c.vcClient.SchedulingV1beta1().Queues().Create(context.TODO(), testcase.queue, metav1.CreateOptions{}) + assert.NoError(t, err) - err := c.syncQueue(testcase.queue, nil) + updateStatusFn := testcase.updateStatusFnFactory(testcase.queue) + err = c.syncQueue(testcase.queue, testcase.action, updateStatusFn) + assert.NoError(t, err) - item, _ := c.vcClient.SchedulingV1beta1().Queues().Get(context.TODO(), testcase.queue.Name, metav1.GetOptions{}) + item, err := c.vcClient.SchedulingV1beta1().Queues().Get(context.TODO(), testcase.queue.Name, metav1.GetOptions{}) if err != nil && !reflect.DeepEqual(testcase.ExpectStatus, item.Status) { t.Errorf("case %d (%s): expected: %v, got %v ", i, testcase.Name, testcase.ExpectStatus, item.Status) } diff --git a/pkg/controllers/queue/queue_controller_util.go b/pkg/controllers/queue/queue_controller_util.go index d54d56a475..d3cf3c4a6c 100644 --- a/pkg/controllers/queue/queue_controller_util.go +++ b/pkg/controllers/queue/queue_controller_util.go @@ -22,6 +22,10 @@ import ( schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" ) +const ( + controllerName = "queue-controller" +) + type patchOperation struct { Op string `json:"op"` Path string `json:"path"` diff --git a/pkg/scheduler/metrics/queue.go b/pkg/scheduler/metrics/queue.go index ce2504d0f5..8d2d4675ff 100644 --- a/pkg/scheduler/metrics/queue.go +++ b/pkg/scheduler/metrics/queue.go @@ -93,38 +93,6 @@ var ( Help: "If one queue is overused", }, []string{"queue_name"}, ) - - queuePodGroupInqueue = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Subsystem: VolcanoNamespace, - Name: "queue_pod_group_inqueue_count", - Help: "The number of Inqueue PodGroup in this queue", - }, []string{"queue_name"}, - ) - - queuePodGroupPending = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Subsystem: VolcanoNamespace, - Name: "queue_pod_group_pending_count", - Help: "The number of Pending PodGroup in this queue", - }, []string{"queue_name"}, - ) - - queuePodGroupRunning = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Subsystem: VolcanoNamespace, - Name: "queue_pod_group_running_count", - Help: "The number of Running PodGroup in this queue", - }, []string{"queue_name"}, - ) - - queuePodGroupUnknown = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Subsystem: VolcanoNamespace, - Name: "queue_pod_group_unknown_count", - Help: "The number of Unknown PodGroup in this queue", - }, []string{"queue_name"}, - ) ) // UpdateQueueAllocated records allocated resources for one queue @@ -166,26 +134,6 @@ func UpdateQueueOverused(queueName string, overused bool) { queueOverused.WithLabelValues(queueName).Set(value) } -// UpdateQueuePodGroupInqueueCount records the number of Inqueue PodGroup in this queue -func UpdateQueuePodGroupInqueueCount(queueName string, count int32) { - queuePodGroupInqueue.WithLabelValues(queueName).Set(float64(count)) -} - -// UpdateQueuePodGroupPendingCount records the number of Pending PodGroup in this queue -func UpdateQueuePodGroupPendingCount(queueName string, count int32) { - queuePodGroupPending.WithLabelValues(queueName).Set(float64(count)) -} - -// UpdateQueuePodGroupRunningCount records the number of Running PodGroup in this queue -func UpdateQueuePodGroupRunningCount(queueName string, count int32) { - queuePodGroupRunning.WithLabelValues(queueName).Set(float64(count)) -} - -// UpdateQueuePodGroupUnknownCount records the number of Unknown PodGroup in this queue -func UpdateQueuePodGroupUnknownCount(queueName string, count int32) { - queuePodGroupUnknown.WithLabelValues(queueName).Set(float64(count)) -} - // DeleteQueueMetrics delete all metrics related to the queue func DeleteQueueMetrics(queueName string) { queueAllocatedMilliCPU.DeleteLabelValues(queueName) @@ -197,8 +145,4 @@ func DeleteQueueMetrics(queueName string) { queueShare.DeleteLabelValues(queueName) queueWeight.DeleteLabelValues(queueName) queueOverused.DeleteLabelValues(queueName) - queuePodGroupInqueue.DeleteLabelValues(queueName) - queuePodGroupPending.DeleteLabelValues(queueName) - queuePodGroupRunning.DeleteLabelValues(queueName) - queuePodGroupUnknown.DeleteLabelValues(queueName) } diff --git a/pkg/scheduler/plugins/capacity/capacity.go b/pkg/scheduler/plugins/capacity/capacity.go index 32cc3d6e3b..9c661588c0 100644 --- a/pkg/scheduler/plugins/capacity/capacity.go +++ b/pkg/scheduler/plugins/capacity/capacity.go @@ -375,10 +375,6 @@ func (cp *capacityPlugin) buildQueueAttrs(ssn *framework.Session) { metrics.UpdateQueueDeserved(attr.name, attr.deserved.MilliCPU, attr.deserved.Memory) metrics.UpdateQueueAllocated(attr.name, attr.allocated.MilliCPU, attr.allocated.Memory) metrics.UpdateQueueRequest(attr.name, attr.request.MilliCPU, attr.request.Memory) - metrics.UpdateQueuePodGroupInqueueCount(attr.name, queue.Queue.Status.Inqueue) - metrics.UpdateQueuePodGroupPendingCount(attr.name, queue.Queue.Status.Pending) - metrics.UpdateQueuePodGroupRunningCount(attr.name, queue.Queue.Status.Running) - metrics.UpdateQueuePodGroupUnknownCount(attr.name, queue.Queue.Status.Unknown) continue } deservedCPU, deservedMem := 0.0, 0.0 @@ -389,10 +385,6 @@ func (cp *capacityPlugin) buildQueueAttrs(ssn *framework.Session) { metrics.UpdateQueueDeserved(queueInfo.Name, deservedCPU, deservedMem) metrics.UpdateQueueAllocated(queueInfo.Name, 0, 0) metrics.UpdateQueueRequest(queueInfo.Name, 0, 0) - metrics.UpdateQueuePodGroupInqueueCount(queueInfo.Name, 0) - metrics.UpdateQueuePodGroupPendingCount(queueInfo.Name, 0) - metrics.UpdateQueuePodGroupRunningCount(queueInfo.Name, 0) - metrics.UpdateQueuePodGroupUnknownCount(queueInfo.Name, 0) } ssn.AddQueueOrderFn(cp.Name(), func(l, r interface{}) int { @@ -509,15 +501,10 @@ func (cp *capacityPlugin) buildHierarchicalQueueAttrs(ssn *framework.Session) bo // Record metrics for queueID := range ssn.Queues { - queue := ssn.Queues[queueID] attr := cp.queueOpts[queueID] metrics.UpdateQueueDeserved(attr.name, attr.deserved.MilliCPU, attr.deserved.Memory) metrics.UpdateQueueAllocated(attr.name, attr.allocated.MilliCPU, attr.allocated.Memory) metrics.UpdateQueueRequest(attr.name, attr.request.MilliCPU, attr.request.Memory) - metrics.UpdateQueuePodGroupInqueueCount(attr.name, queue.Queue.Status.Inqueue) - metrics.UpdateQueuePodGroupPendingCount(attr.name, queue.Queue.Status.Pending) - metrics.UpdateQueuePodGroupRunningCount(attr.name, queue.Queue.Status.Running) - metrics.UpdateQueuePodGroupUnknownCount(attr.name, queue.Queue.Status.Unknown) } ssn.AddQueueOrderFn(cp.Name(), func(l, r interface{}) int { diff --git a/pkg/scheduler/plugins/proportion/proportion.go b/pkg/scheduler/plugins/proportion/proportion.go index b0081ee478..948487c331 100644 --- a/pkg/scheduler/plugins/proportion/proportion.go +++ b/pkg/scheduler/plugins/proportion/proportion.go @@ -167,19 +167,10 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { metrics.UpdateQueueAllocated(attr.name, attr.allocated.MilliCPU, attr.allocated.Memory) metrics.UpdateQueueRequest(attr.name, attr.request.MilliCPU, attr.request.Memory) metrics.UpdateQueueWeight(attr.name, attr.weight) - queue := ssn.Queues[attr.queueID] - metrics.UpdateQueuePodGroupInqueueCount(attr.name, queue.Queue.Status.Inqueue) - metrics.UpdateQueuePodGroupPendingCount(attr.name, queue.Queue.Status.Pending) - metrics.UpdateQueuePodGroupRunningCount(attr.name, queue.Queue.Status.Running) - metrics.UpdateQueuePodGroupUnknownCount(attr.name, queue.Queue.Status.Unknown) continue } metrics.UpdateQueueAllocated(queueInfo.Name, 0, 0) metrics.UpdateQueueRequest(queueInfo.Name, 0, 0) - metrics.UpdateQueuePodGroupInqueueCount(queueInfo.Name, 0) - metrics.UpdateQueuePodGroupPendingCount(queueInfo.Name, 0) - metrics.UpdateQueuePodGroupRunningCount(queueInfo.Name, 0) - metrics.UpdateQueuePodGroupUnknownCount(queueInfo.Name, 0) } remaining := pp.totalResource.Clone() diff --git a/test/e2e/jobseq/queue_job_status.go b/test/e2e/jobseq/queue_job_status.go index 53bdac30c9..534b7608e1 100644 --- a/test/e2e/jobseq/queue_job_status.go +++ b/test/e2e/jobseq/queue_job_status.go @@ -26,7 +26,6 @@ import ( . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" watchtools "k8s.io/client-go/tools/watch" @@ -77,17 +76,15 @@ var _ = Describe("Queue Job Status Transition", func() { By("Verify queue have pod groups inqueue") err := e2eutil.WaitQueueStatus(func() (bool, error) { - queue, err := ctx.Vcclient.SchedulingV1beta1().Queues().Get(context.TODO(), q1, metav1.GetOptions{}) - Expect(err).NotTo(HaveOccurred(), "Get queue %s failed", q1) - return queue.Status.Inqueue > 0, nil + pgStats := e2eutil.GetPodGroupStatistics(ctx, ctx.Namespace, q1) + return pgStats.Inqueue > 0, nil }) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue inqueue") By("Verify queue have pod groups running") err = e2eutil.WaitQueueStatus(func() (bool, error) { - queue, err := ctx.Vcclient.SchedulingV1beta1().Queues().Get(context.TODO(), q1, metav1.GetOptions{}) - Expect(err).NotTo(HaveOccurred(), "Get queue %s failed", q1) - return queue.Status.Running > 0, nil + pgStats := e2eutil.GetPodGroupStatistics(ctx, ctx.Namespace, q1) + return pgStats.Running > 0, nil }) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") }) @@ -134,9 +131,8 @@ var _ = Describe("Queue Job Status Transition", func() { By("Verify queue have pod groups running") err := e2eutil.WaitQueueStatus(func() (bool, error) { - queue, err := ctx.Vcclient.SchedulingV1beta1().Queues().Get(context.TODO(), q1, metav1.GetOptions{}) - Expect(err).NotTo(HaveOccurred(), "Get queue %s failed", q1) - return queue.Status.Running > 0, nil + pgStats := e2eutil.GetPodGroupStatistics(ctx, ctx.Namespace, q1) + return pgStats.Running > 0, nil }) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") @@ -150,9 +146,8 @@ var _ = Describe("Queue Job Status Transition", func() { By("Verify queue have pod groups Pending") err = e2eutil.WaitQueueStatus(func() (bool, error) { - queue, err := ctx.Vcclient.SchedulingV1beta1().Queues().Get(context.TODO(), q1, metav1.GetOptions{}) - Expect(err).NotTo(HaveOccurred(), "Get queue %s failed", q1) - return queue.Status.Pending > 0, nil + pgStats := e2eutil.GetPodGroupStatistics(ctx, ctx.Namespace, q1) + return pgStats.Pending > 0, nil }) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue Pending") }) @@ -195,9 +190,8 @@ var _ = Describe("Queue Job Status Transition", func() { By("Verify queue have pod groups running") err := e2eutil.WaitQueueStatus(func() (bool, error) { - queue, err := ctx.Vcclient.SchedulingV1beta1().Queues().Get(context.TODO(), q1, metav1.GetOptions{}) - Expect(err).NotTo(HaveOccurred(), "Get queue %s failed", q1) - return queue.Status.Running > 0, nil + pgStats := e2eutil.GetPodGroupStatistics(ctx, ctx.Namespace, q1) + return pgStats.Running > 0, nil }) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") @@ -220,11 +214,9 @@ var _ = Describe("Queue Job Status Transition", func() { } By("Verify queue have pod groups unknown") - fieldSelector := fields.OneTermEqualSelector("metadata.name", q1).String() w := &cache.ListWatch{ WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) { - options.FieldSelector = fieldSelector - return ctx.Vcclient.SchedulingV1beta1().Queues().Watch(context.TODO(), options) + return ctx.Vcclient.SchedulingV1beta1().PodGroups(podNamespace).Watch(context.TODO(), options) }, } wctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), 5*time.Minute) @@ -232,8 +224,8 @@ var _ = Describe("Queue Job Status Transition", func() { _, err = watchtools.Until(wctx, clusterPods.ResourceVersion, w, func(event watch.Event) (bool, error) { switch t := event.Object.(type) { - case *v1beta1.Queue: - if t.Status.Unknown > 0 { + case *v1beta1.PodGroup: + if t.Status.Phase == v1beta1.PodGroupUnknown { return true, nil } } diff --git a/test/e2e/schedulingaction/reclaim.go b/test/e2e/schedulingaction/reclaim.go index fe1c478529..5179230036 100644 --- a/test/e2e/schedulingaction/reclaim.go +++ b/test/e2e/schedulingaction/reclaim.go @@ -70,14 +70,25 @@ var _ = Describe("Reclaim E2E Test", func() { queue, err := ctx.Vcclient.SchedulingV1beta1().Queues().Get(context.TODO(), queue, metav1.GetOptions{}) Expect(err).NotTo(HaveOccurred(), "Get queue %s failed", queue) switch status { - case "Running": - return queue.Status.Running == num, nil case "Open": return queue.Status.State == schedulingv1beta1.QueueStateOpen, nil + default: + return false, nil + } + }) + return err + } + + CheckPodGroupStatistics := func(ctx *e2eutil.TestContext, status string, num int, queue string) error { + err := e2eutil.WaitQueueStatus(func() (bool, error) { + pgStats := e2eutil.GetPodGroupStatistics(ctx, ctx.Namespace, queue) + switch status { + case "Running": + return pgStats.Running == num, nil case "Pending": - return queue.Status.Pending == num, nil + return pgStats.Pending == num, nil case "Inqueue": - return queue.Status.Inqueue == num, nil + return pgStats.Inqueue == num, nil default: return false, nil } @@ -117,13 +128,13 @@ var _ = Describe("Reclaim E2E Test", func() { By("Make sure all job running") - err = WaitQueueStatus(ctx, "Running", 1, q1) + err = CheckPodGroupStatistics(ctx, "Running", 1, q1) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") - err = WaitQueueStatus(ctx, "Running", 1, q2) + err = CheckPodGroupStatistics(ctx, "Running", 1, q2) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") - err = WaitQueueStatus(ctx, "Running", 1, q3) + err = CheckPodGroupStatistics(ctx, "Running", 1, q3) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") }) @@ -176,10 +187,10 @@ var _ = Describe("Reclaim E2E Test", func() { Expect(err).NotTo(HaveOccurred(), "Get %s pod failed", j3) By("Make sure q1 q2 with job running in it.") - err = WaitQueueStatus(ctx, "Running", 1, q1) + err = CheckPodGroupStatistics(ctx, "Running", 1, q1) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") - err = WaitQueueStatus(ctx, "Running", 1, q2) + err = CheckPodGroupStatistics(ctx, "Running", 1, q2) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") for _, pod := range job3pods.Items { @@ -188,7 +199,7 @@ var _ = Describe("Reclaim E2E Test", func() { } By("Q3 pending when we delete it.") - err = WaitQueueStatus(ctx, "Pending", 1, q3) + err = CheckPodGroupStatistics(ctx, "Pending", 1, q3) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue pending") }) @@ -223,10 +234,10 @@ var _ = Describe("Reclaim E2E Test", func() { By("Make sure all job running") - err = WaitQueueStatus(ctx, "Running", 1, q1) + err = CheckPodGroupStatistics(ctx, "Running", 1, q1) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") - err = WaitQueueStatus(ctx, "Running", 1, q2) + err = CheckPodGroupStatistics(ctx, "Running", 1, q2) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") }) @@ -265,10 +276,10 @@ var _ = Describe("Reclaim E2E Test", func() { By("Make sure all job running") - err = WaitQueueStatus(ctx, "Running", 1, q1) + err = CheckPodGroupStatistics(ctx, "Running", 1, q1) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") - err = WaitQueueStatus(ctx, "Running", 1, q2) + err = CheckPodGroupStatistics(ctx, "Running", 1, q2) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") }) @@ -306,16 +317,16 @@ var _ = Describe("Reclaim E2E Test", func() { time.Sleep(10 * time.Second) By("Make sure all job running") - err = WaitQueueStatus(ctx, "Running", 1, q1) + err = CheckPodGroupStatistics(ctx, "Running", 1, q1) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") - err = WaitQueueStatus(ctx, "Running", 1, q2) + err = CheckPodGroupStatistics(ctx, "Running", 1, q2) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") - err = WaitQueueStatus(ctx, "Running", 1, q3) + err = CheckPodGroupStatistics(ctx, "Running", 1, q3) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") - err = WaitQueueStatus(ctx, "Inqueue", 1, q3) + err = CheckPodGroupStatistics(ctx, "Inqueue", 1, q3) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue Inqueue") }) @@ -352,14 +363,14 @@ var _ = Describe("Reclaim E2E Test", func() { time.Sleep(10 * time.Second) By("Make sure all job running") - err = WaitQueueStatus(ctx, "Running", 1, q1) + err = CheckPodGroupStatistics(ctx, "Running", 1, q1) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") - err = WaitQueueStatus(ctx, "Running", 1, q2) + err = CheckPodGroupStatistics(ctx, "Running", 1, q2) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") // TODO: it is a bug : the job status is pending but podgroup status is running - err = WaitQueueStatus(ctx, "Running", 1, q3) + err = CheckPodGroupStatistics(ctx, "Running", 1, q3) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue Running") }) @@ -412,13 +423,13 @@ var _ = Describe("Reclaim E2E Test", func() { time.Sleep(10 * time.Second) By("Make sure all job running") - err = WaitQueueStatus(ctx, "Running", 1, q1) + err = CheckPodGroupStatistics(ctx, "Running", 1, q1) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") - err = WaitQueueStatus(ctx, "Running", 1, q2) + err = CheckPodGroupStatistics(ctx, "Running", 1, q2) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") - err = WaitQueueStatus(ctx, "Inqueue", 1, q3) + err = CheckPodGroupStatistics(ctx, "Inqueue", 1, q3) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue Inqueue") }) @@ -457,13 +468,13 @@ var _ = Describe("Reclaim E2E Test", func() { By("Make sure all job running") - err = WaitQueueStatus(ctx, "Running", 1, q1) + err = CheckPodGroupStatistics(ctx, "Running", 1, q1) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") - err = WaitQueueStatus(ctx, "Running", 1, q2) + err = CheckPodGroupStatistics(ctx, "Running", 1, q2) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") - err = WaitQueueStatus(ctx, "Running", 1, q3) + err = CheckPodGroupStatistics(ctx, "Running", 1, q3) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") }) @@ -514,10 +525,10 @@ var _ = Describe("Reclaim E2E Test", func() { err = e2eutil.WaitJobReady(ctx, job2) Expect(err).NotTo(HaveOccurred()) - err = WaitQueueStatus(ctx, "Running", 1, q1) + err = CheckPodGroupStatistics(ctx, "Running", 1, q1) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue1 running") - err = WaitQueueStatus(ctx, "Running", 1, q2) + err = CheckPodGroupStatistics(ctx, "Running", 1, q2) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue2 running") By("Create coming jobs") @@ -530,10 +541,10 @@ var _ = Describe("Reclaim E2E Test", func() { By("Make sure all job running") - err = WaitQueueStatus(ctx, "Running", 1, q3) + err = CheckPodGroupStatistics(ctx, "Running", 1, q3) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue3 running") - err = WaitQueueStatus(ctx, "Running", 1, q4) + err = CheckPodGroupStatistics(ctx, "Running", 1, q4) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue4 running") }) @@ -619,10 +630,10 @@ var _ = Describe("Reclaim E2E Test", func() { err = e2eutil.WaitJobReady(ctx, job2) Expect(err).NotTo(HaveOccurred()) - err = WaitQueueStatus(ctx, "Running", 1, q1) + err = CheckPodGroupStatistics(ctx, "Running", 1, q1) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue1 running") - err = WaitQueueStatus(ctx, "Running", 1, q2) + err = CheckPodGroupStatistics(ctx, "Running", 1, q2) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue2 running") By("Create coming jobs") @@ -641,10 +652,10 @@ var _ = Describe("Reclaim E2E Test", func() { By("Make sure all job running") - err = WaitQueueStatus(ctx, "Running", 1, q3) + err = CheckPodGroupStatistics(ctx, "Running", 1, q3) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue3 running") - err = WaitQueueStatus(ctx, "Running", 3, q4) + err = CheckPodGroupStatistics(ctx, "Running", 3, q4) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue4 running") }) @@ -800,9 +811,8 @@ var _ = Describe("Reclaim E2E Test", func() { Expect(err).NotTo(HaveOccurred()) err = e2eutil.WaitQueueStatus(func() (bool, error) { - queue, err := ctx.Vcclient.SchedulingV1beta1().Queues().Get(context.TODO(), q1, metav1.GetOptions{}) - Expect(err).NotTo(HaveOccurred()) - return queue.Status.Running == 1, nil + pgStats := e2eutil.GetPodGroupStatistics(ctx, ctx.Namespace, q1) + return pgStats.Running == 1, nil }) Expect(err).NotTo(HaveOccurred()) @@ -842,9 +852,8 @@ var _ = Describe("Reclaim E2E Test", func() { err = e2eutil.WaitJobStatePending(ctx, job3) Expect(err).NotTo(HaveOccurred()) err = e2eutil.WaitQueueStatus(func() (bool, error) { - queue, err := ctx.Vcclient.SchedulingV1beta1().Queues().Get(context.TODO(), q1, metav1.GetOptions{}) - Expect(err).NotTo(HaveOccurred()) - return queue.Status.Pending == 1, nil + pgStats := e2eutil.GetPodGroupStatistics(ctx, ctx.Namespace, q1) + return pgStats.Pending == 1, nil }) Expect(err).NotTo(HaveOccurred()) }) diff --git a/test/e2e/util/podgroup.go b/test/e2e/util/podgroup.go index 1055307b55..1452018aab 100644 --- a/test/e2e/util/podgroup.go +++ b/test/e2e/util/podgroup.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" + "volcano.sh/volcano/pkg/cli/podgroup" ) // CreatePodGroup creates a PodGroup with the specified name in the namespace @@ -90,3 +91,15 @@ func PodGroupIsReady(ctx *TestContext, namespace string) (bool, error) { return false, fmt.Errorf("pod group phase is Pending") } + +func GetPodGroupStatistics(ctx *TestContext, namespace, queue string) *podgroup.PodGroupStatistics { + pgList, err := ctx.Vcclient.SchedulingV1beta1().PodGroups(namespace).List(context.TODO(), metav1.ListOptions{}) + Expect(err).NotTo(HaveOccurred(), "List podgroups failed") + pgStats := &podgroup.PodGroupStatistics{} + for _, pg := range pgList.Items { + if pg.Spec.Queue == queue { + pgStats.StatPodGroupCountsForQueue(&pg) + } + } + return pgStats +}