Skip to content

Commit

Permalink
fix deleteAvailableLimit bug (openkruise#1481)
Browse files Browse the repository at this point in the history
* fix deleteAvailableLimit bug

Signed-off-by: liheng.zms <[email protected]>

* add clonesets scale ut

Signed-off-by: liheng.zms <[email protected]>

---------

Signed-off-by: liheng.zms <[email protected]>
  • Loading branch information
zmberg authored Jan 3, 2024
1 parent fa7a1da commit f4e238f
Show file tree
Hide file tree
Showing 4 changed files with 280 additions and 35 deletions.
18 changes: 10 additions & 8 deletions pkg/controller/cloneset/sync/cloneset_scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,16 @@ func (r *realControl) Scale(
if podsToDelete := util.DiffPods(podsSpecifiedToDelete, podsInPreDelete); len(podsToDelete) > 0 {
newPodsToDelete, oldPodsToDelete := clonesetutils.GroupUpdateAndNotUpdatePods(podsToDelete, updateRevision)
klog.V(3).Infof("CloneSet %s try to delete pods specified. Delete ready limit: %d. New Pods: %v, old Pods: %v.",
controllerKey, diffRes.deleteReadyLimit, util.GetPodNames(newPodsToDelete).List(), util.GetPodNames(oldPodsToDelete).List())
controllerKey, diffRes.deleteAvailableLimit, util.GetPodNames(newPodsToDelete).List(), util.GetPodNames(oldPodsToDelete).List())

podsCanDelete := make([]*v1.Pod, 0, len(podsToDelete))
for _, pod := range podsToDelete {
if !isPodReady(coreControl, pod) {
// Determine pod available, since deleteAvailableLimit is also based on the pod available calculation
if !IsPodAvailable(coreControl, pod, updateCS.Spec.MinReadySeconds) {
podsCanDelete = append(podsCanDelete, pod)
} else if diffRes.deleteReadyLimit > 0 {
} else if diffRes.deleteAvailableLimit > 0 {
podsCanDelete = append(podsCanDelete, pod)
diffRes.deleteReadyLimit--
diffRes.deleteAvailableLimit--
}
}

Expand All @@ -136,16 +137,17 @@ func (r *realControl) Scale(
}

klog.V(3).Infof("CloneSet %s begin to scale in %d pods including %d (current rev), delete ready limit: %d",
controllerKey, diffRes.scaleDownNum, diffRes.scaleDownNumOldRevision, diffRes.deleteReadyLimit)
controllerKey, diffRes.scaleDownNum, diffRes.scaleDownNumOldRevision, diffRes.deleteAvailableLimit)

podsPreparingToDelete := r.choosePodsToDelete(updateCS, diffRes.scaleDownNum, diffRes.scaleDownNumOldRevision, notUpdatedPods, updatedPods)
podsToDelete := make([]*v1.Pod, 0, len(podsPreparingToDelete))
for _, pod := range podsPreparingToDelete {
if !isPodReady(coreControl, pod) {
// Determine pod available, since deleteAvailableLimit is also based on the pod available calculation
if !IsPodAvailable(coreControl, pod, updateCS.Spec.MinReadySeconds) {
podsToDelete = append(podsToDelete, pod)
} else if diffRes.deleteReadyLimit > 0 {
} else if diffRes.deleteAvailableLimit > 0 {
podsToDelete = append(podsToDelete, pod)
diffRes.deleteReadyLimit--
diffRes.deleteAvailableLimit--
}
}

Expand Down
247 changes: 247 additions & 0 deletions pkg/controller/cloneset/sync/cloneset_scale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ package sync

import (
"context"
"fmt"
"reflect"
"sort"
"testing"
"time"

appspub "github.com/openkruise/kruise/apis/apps/pub"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
Expand All @@ -32,12 +34,29 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
corev1 "k8s.io/kubernetes/pkg/apis/core/v1"
kubecontroller "k8s.io/kubernetes/pkg/controller"
utilpointer "k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

var (
kscheme *runtime.Scheme
)

func init() {
kscheme = runtime.NewScheme()
utilruntime.Must(appsv1alpha1.AddToScheme(scheme.Scheme))
utilruntime.Must(corev1.AddToScheme(kscheme))
}

func newFakeControl() *realControl {
return &realControl{
Client: fake.NewClientBuilder().Build(),
Expand Down Expand Up @@ -527,3 +546,231 @@ func TestGetOrGenAvailableIDs(t *testing.T) {
t.Fatalf("expected got random id, but actually %v", id)
}
}

func TestScale(t *testing.T) {
cases := []struct {
name string
getCloneSets func() [2]*appsv1alpha1.CloneSet
getRevisions func() [2]string
getPods func() []*v1.Pod
expectedPodsLen int
expectedModified bool
}{
{
name: "cloneSet(replicas=3,maxUnavailable=20%,partition=nil,maxSurge=nil,minReadySeconds=9999), pods=5, and scale replicas 5 -> 3",
getCloneSets: func() [2]*appsv1alpha1.CloneSet {
obj := &appsv1alpha1.CloneSet{
ObjectMeta: metav1.ObjectMeta{
Name: "sample",
},
Spec: appsv1alpha1.CloneSetSpec{
Replicas: utilpointer.Int32(3),
MinReadySeconds: 9999,
UpdateStrategy: appsv1alpha1.CloneSetUpdateStrategy{
MaxUnavailable: &intstr.IntOrString{
Type: intstr.String,
StrVal: "20%",
},
},
},
}
return [2]*appsv1alpha1.CloneSet{obj.DeepCopy(), obj.DeepCopy()}
},
getRevisions: func() [2]string {
return [2]string{"sample-b976d4544", "sample-b976d4544"}
},
getPods: func() []*v1.Pod {
t := time.Now().Add(-time.Second * 10)
obj := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "sample",
Labels: map[string]string{
apps.ControllerRevisionHashLabelKey: "sample-b976d4544",
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "main",
Image: "sample:v1",
},
},
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Time{Time: t},
},
},
},
}
return generatePods(obj, 5)
},
expectedPodsLen: 3,
expectedModified: true,
},
{
name: "cloneSet(replicas=3,maxUnavailable=20%,partition=nil,maxSurge=nil,minReadySeconds=0), specified delete pod-0, pods=5, and scale replicas 5 -> 3",
getCloneSets: func() [2]*appsv1alpha1.CloneSet {
obj := &appsv1alpha1.CloneSet{
ObjectMeta: metav1.ObjectMeta{
Name: "sample",
},
Spec: appsv1alpha1.CloneSetSpec{
Replicas: utilpointer.Int32(3),
UpdateStrategy: appsv1alpha1.CloneSetUpdateStrategy{
MaxUnavailable: &intstr.IntOrString{
Type: intstr.String,
StrVal: "20%",
},
},
},
}
return [2]*appsv1alpha1.CloneSet{obj.DeepCopy(), obj.DeepCopy()}
},
getRevisions: func() [2]string {
return [2]string{"sample-b976d4544", "sample-b976d4544"}
},
getPods: func() []*v1.Pod {
t := time.Now().Add(-time.Second * 10)
obj := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "sample",
Labels: map[string]string{
apps.ControllerRevisionHashLabelKey: "sample-b976d4544",
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "main",
Image: "sample:v1",
},
},
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Time{Time: t},
},
},
},
}
pods := generatePods(obj, 5)
pods[0].Labels[appsv1alpha1.SpecifiedDeleteKey] = "true"
return pods
},
expectedPodsLen: 4,
expectedModified: true,
},
{
name: "cloneSet(replicas=3,maxUnavailable=20%,partition=nil,maxSurge=nil,minReadySeconds=0), pods=5, and scale replicas 5 -> 3",
getCloneSets: func() [2]*appsv1alpha1.CloneSet {
obj := &appsv1alpha1.CloneSet{
ObjectMeta: metav1.ObjectMeta{
Name: "sample",
},
Spec: appsv1alpha1.CloneSetSpec{
Replicas: utilpointer.Int32(3),
UpdateStrategy: appsv1alpha1.CloneSetUpdateStrategy{
MaxUnavailable: &intstr.IntOrString{
Type: intstr.String,
StrVal: "20%",
},
},
},
}
return [2]*appsv1alpha1.CloneSet{obj.DeepCopy(), obj.DeepCopy()}
},
getRevisions: func() [2]string {
return [2]string{"sample-b976d4544", "sample-b976d4544"}
},
getPods: func() []*v1.Pod {
t := time.Now().Add(-time.Second * 10)
obj := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "sample",
Labels: map[string]string{
apps.ControllerRevisionHashLabelKey: "sample-b976d4544",
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "main",
Image: "sample:v1",
},
},
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Time{Time: t},
},
},
},
}
return generatePods(obj, 5)
},
expectedPodsLen: 3,
expectedModified: true,
},
}

for _, cs := range cases {
t.Run(cs.name, func(t *testing.T) {
fClient := fake.NewClientBuilder().WithScheme(kscheme).Build()
pods := cs.getPods()
for _, pod := range pods {
err := fClient.Create(context.TODO(), pod)
if err != nil {
t.Fatalf(err.Error())
}
}
rControl := &realControl{
Client: fClient,
recorder: record.NewFakeRecorder(10),
}
modified, err := rControl.Scale(cs.getCloneSets()[0], cs.getCloneSets()[1], cs.getRevisions()[0], cs.getRevisions()[1], pods, nil)
if err != nil {
t.Fatalf(err.Error())
}
if cs.expectedModified != modified {
t.Fatalf("expect(%v), but get(%v)", cs.expectedModified, modified)
}
podList := &v1.PodList{}
err = fClient.List(context.TODO(), podList, &client.ListOptions{})
if err != nil {
t.Fatalf(err.Error())
}
actives := 0
for _, pod := range podList.Items {
if kubecontroller.IsPodActive(&pod) {
actives++
}
}
if actives != cs.expectedPodsLen {
t.Fatalf("expect(%v), but get(%v)", cs.expectedPodsLen, actives)
}
})
}
}

func generatePods(base *v1.Pod, replicas int) []*v1.Pod {
objs := make([]*v1.Pod, 0, replicas)
for i := 0; i < replicas; i++ {
obj := base.DeepCopy()
obj.Name = fmt.Sprintf("%s-%d", base.Name, i)
objs = append(objs, obj)
}
return objs
}
10 changes: 3 additions & 7 deletions pkg/controller/cloneset/sync/cloneset_sync_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ type expectationDiffs struct {
// scaleUpLimit is the limit number of creating Pods when scaling up
// it is limited by scaleStrategy.maxUnavailable
scaleUpLimit int
// deleteReadyLimit is the limit number of ready Pods that can be deleted
// deleteAvailableLimit is the limit number of ready Pods that can be deleted
// it is limited by UpdateStrategy.maxUnavailable
deleteReadyLimit int
deleteAvailableLimit int

// useSurge is the number that temporarily expect to be above the desired replicas
useSurge int
Expand Down Expand Up @@ -253,7 +253,7 @@ func calculateDiffsWithExpectation(cs *appsv1alpha1.CloneSet, pods []*v1.Pod, cu
res.scaleDownNumOldRevision = integer.IntMax(currentTotalOldCount-toDeleteOldRevisionCount-expectedTotalOldCount, 0)
}
if toDeleteNewRevisionCount > 0 || toDeleteOldRevisionCount > 0 || res.scaleDownNum > 0 {
res.deleteReadyLimit = integer.IntMax(maxUnavailable+(len(pods)-replicas)-totalUnavailable, 0)
res.deleteAvailableLimit = integer.IntMax(maxUnavailable+(len(pods)-replicas)-totalUnavailable, 0)
}

// The consistency between scale and update will be guaranteed by syncCloneSet and expectations
Expand Down Expand Up @@ -281,10 +281,6 @@ func isSpecifiedDelete(cs *appsv1alpha1.CloneSet, pod *v1.Pod) bool {
return false
}

func isPodReady(coreControl clonesetcore.Control, pod *v1.Pod) bool {
return IsPodAvailable(coreControl, pod, 0)
}

func IsPodAvailable(coreControl clonesetcore.Control, pod *v1.Pod, minReadySeconds int32) bool {
state := lifecycle.GetPodLifecycleState(pod)
if state != "" && state != appspub.LifecycleStateNormal {
Expand Down
Loading

0 comments on commit f4e238f

Please sign in to comment.