Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

webhook: support elasticquota enable update resource key #2323

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/features/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ const (
// to belong to the users and will not be preempted back.
ElasticQuotaGuaranteeUsage featuregate.Feature = "ElasticQuotaGuaranteeUsage"

// ElasticQuotaEnableUpdateResourceKey allows to update resource key in standard operation
// when delete resource type: from child to parent
// when add resource type: from parent to child
ElasticQuotaEnableUpdateResourceKey featuregate.Feature = "ElasticQuotaEnableUpdateResourceKey"

// DisableDefaultQuota disable default quota.
DisableDefaultQuota featuregate.Feature = "DisableDefaultQuota"

Expand All @@ -94,6 +99,7 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
MultiQuotaTree: {Default: false, PreRelease: featuregate.Alpha},
ElasticQuotaIgnorePodOverhead: {Default: false, PreRelease: featuregate.Alpha},
ElasticQuotaGuaranteeUsage: {Default: false, PreRelease: featuregate.Alpha},
ElasticQuotaEnableUpdateResourceKey: {Default: false, PreRelease: featuregate.Alpha},
DisableDefaultQuota: {Default: false, PreRelease: featuregate.Alpha},
SupportParentQuotaSubmitPod: {Default: false, PreRelease: featuregate.Alpha},
EnableQuotaAdmission: {Default: false, PreRelease: featuregate.Alpha},
Expand Down
24 changes: 16 additions & 8 deletions pkg/scheduler/plugins/elasticquota/core/group_quota_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,24 +563,33 @@ func (gqm *GroupQuotaManager) updateOneGroupSharedWeightNoLock(quotaInfo *QuotaI
gqm.runtimeQuotaCalculatorMap[quotaInfo.ParentName].updateOneGroupSharedWeight(quotaInfo)
}

// updateResourceKeyNoLock based on quotaInfo.CalculateInfo.Max of self
// Note: RootQuotaName need to be updated as allResourceKeys
func (gqm *GroupQuotaManager) updateResourceKeyNoLock() {
// collect all dimensions
resourceKeys := make(map[v1.ResourceName]struct{})
allResourceKeys := make(map[v1.ResourceName]struct{})
for quotaName, quotaInfo := range gqm.quotaInfoMap {
if quotaName == extension.DefaultQuotaName || quotaName == extension.SystemQuotaName {
if quotaName == extension.RootQuotaName || quotaName == extension.DefaultQuotaName || quotaName == extension.SystemQuotaName {
continue
}
resourceKeys := make(map[v1.ResourceName]struct{})
for resName := range quotaInfo.CalculateInfo.Max {
allResourceKeys[resName] = struct{}{}
resourceKeys[resName] = struct{}{}
}
}

if !reflect.DeepEqual(resourceKeys, gqm.resourceKeys) {
gqm.resourceKeys = resourceKeys
for _, runtimeQuotaCalculator := range gqm.runtimeQuotaCalculatorMap {
// update right now
if runtimeQuotaCalculator, ok := gqm.runtimeQuotaCalculatorMap[quotaName]; ok && runtimeQuotaCalculator != nil && !reflect.DeepEqual(resourceKeys, runtimeQuotaCalculator.resourceKeys) {
runtimeQuotaCalculator.updateResourceKeys(resourceKeys)
}
}

if !reflect.DeepEqual(allResourceKeys, gqm.resourceKeys) {
gqm.resourceKeys = allResourceKeys
}
// in case RootQuota-resourceKey is nil
if runtimeQuotaCalculator, ok := gqm.runtimeQuotaCalculatorMap[extension.RootQuotaName]; ok && runtimeQuotaCalculator != nil && !reflect.DeepEqual(allResourceKeys, runtimeQuotaCalculator.resourceKeys) {
runtimeQuotaCalculator.updateResourceKeys(allResourceKeys)
}
}

func (gqm *GroupQuotaManager) GetAllQuotaNames() map[string]struct{} {
Expand Down Expand Up @@ -1019,7 +1028,6 @@ func (gqm *GroupQuotaManager) updateQuotaInternalNoLock(newQuotaInfo, oldQuotaIn

// update resource keys
gqm.updateResourceKeyNoLock()

oldMin := v1.ResourceList{}
if oldQuotaInfo != nil {
oldMin = oldQuotaInfo.CalculateInfo.Min
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ import (
)

const (
GigaByte = 1024 * 1048576
GigaByte = 1024 * 1048576
ExtendedResourceKeyXCPU = "x-cpu"
)

func TestGroupQuotaManager_QuotaAdd(t *testing.T) {
Expand Down Expand Up @@ -151,6 +152,7 @@ func TestGroupQuotaManager_UpdateQuota(t *testing.T) {
}

func TestGroupQuotaManager_UpdateQuotaInternalAndRequest(t *testing.T) {
// add resource to node
gqm := NewGroupQuotaManagerForTest()
deltaRes := createResourceList(96, 160*GigaByte)
gqm.UpdateClusterTotalResource(deltaRes)
Expand All @@ -160,23 +162,61 @@ func TestGroupQuotaManager_UpdateQuotaInternalAndRequest(t *testing.T) {

AddQuotaToManager(t, gqm, "test1", extension.RootQuotaName, 96, 160*GigaByte, 50, 80*GigaByte, true, false)

// test1 request[120, 290] runtime == maxQuota
// request[120, 290] > maxQuota, runtime == maxQuota
request := createResourceList(120, 290*GigaByte)
gqm.updateGroupDeltaRequestNoLock("test1", request, request, 0)
runtime := gqm.RefreshRuntime("test1")
assert.Equal(t, deltaRes, runtime)
expectCurrentRuntime := deltaRes
assert.Equal(t, expectCurrentRuntime, runtime)

// update resourceKey
quota1 := CreateQuota("test1", extension.RootQuotaName, 64, 100*GigaByte, 60, 90*GigaByte, true, false)
quota1.Labels[extension.LabelQuotaIsParent] = "false"
err := gqm.UpdateQuota(quota1, false)
assert.Nil(t, err)
quotaInfo := gqm.GetQuotaInfoByName("test1")
assert.Equal(t, createResourceList(64, 100*GigaByte), quotaInfo.CalculateInfo.Max)
runtime = gqm.RefreshRuntime("test1")
expectCurrentRuntime = createResourceList(64, 100*GigaByte)
assert.Equal(t, expectCurrentRuntime, runtime)

// added max ExtendedResourceKeyXCPU without node resource added
// runtime.ExtendedResourceKeyXCPU = 0
request[ExtendedResourceKeyXCPU] = *resource.NewQuantity(80, resource.DecimalSI)
gqm.updateGroupDeltaRequestNoLock("test1", request, request, 0)
xCPUQuantity := resource.NewQuantity(100, resource.DecimalSI)
quota1.Spec.Max[ExtendedResourceKeyXCPU] = *xCPUQuantity
maxJson, err := json.Marshal(quota1.Spec.Max)
assert.Nil(t, err)
quota1.Annotations[extension.AnnotationSharedWeight] = string(maxJson)
gqm.UpdateQuota(quota1, false)
quotaInfo = gqm.quotaInfoMap["test1"]
assert.True(t, quotaInfo != nil)
assert.Equal(t, *xCPUQuantity, quotaInfo.CalculateInfo.Max[ExtendedResourceKeyXCPU])
runtime = gqm.RefreshRuntime("test1")
assert.Equal(t, createResourceList(64, 100*GigaByte), runtime)
}
expectCurrentRuntime[ExtendedResourceKeyXCPU] = resource.Quantity{Format: resource.DecimalSI}
assert.Equal(t, expectCurrentRuntime, runtime)

// add ExtendedResourceKeyXCPU to node resource
deltaRes[ExtendedResourceKeyXCPU] = *xCPUQuantity
gqm.UpdateClusterTotalResource(deltaRes)
runtime = gqm.RefreshRuntime("test1")
expectCurrentRuntime[ExtendedResourceKeyXCPU] = *resource.NewQuantity(80, resource.DecimalSI)
assert.Equal(t, expectCurrentRuntime, runtime)

// delete max ExtendedResourceKeyXCPU
delete(quota1.Spec.Max, ExtendedResourceKeyXCPU)
maxJson, err = json.Marshal(quota1.Spec.Max)
assert.Nil(t, err)
quota1.Annotations[extension.AnnotationSharedWeight] = string(maxJson)
gqm.UpdateQuota(quota1, false)
quotaInfo = gqm.quotaInfoMap["test1"]
assert.True(t, quotaInfo != nil)
assert.Equal(t, resource.Quantity{}, quotaInfo.CalculateInfo.Max[ExtendedResourceKeyXCPU])
runtime = gqm.RefreshRuntime("test1")
delete(expectCurrentRuntime, ExtendedResourceKeyXCPU)
assert.Equal(t, expectCurrentRuntime, runtime)
}
func TestGroupQuotaManager_DeleteOneGroup(t *testing.T) {
gqm := NewGroupQuotaManagerForTest()
gqm.UpdateClusterTotalResource(createResourceList(1000, 1000*GigaByte))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ func (qt *quotaTree) iterationForRedistribution(totalRes, totalSharedWeight int6
// if totalSharedWeight is not larger than 0, no need to iterate anymore.
return
}

needAdjustQuotaNodes := make([]*quotaNode, 0)
toPartitionResource, needAdjustTotalSharedWeight := int64(0), int64(0)
for _, node := range nodes {
Expand Down
152 changes: 132 additions & 20 deletions pkg/scheduler/plugins/elasticquota/core/runtime_quota_calculator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ import (
"github.com/koordinator-sh/koordinator/apis/thirdparty/scheduler-plugins/pkg/apis/scheduling/v1alpha1"
)

const (
TestNode1 = "node1"
TestNode2 = "node2"
TestNode3 = "node3"
TestNode4 = "node4"
)

func TestQuotaInfo_GetLimitRequest(t *testing.T) {
max := createResourceList(100, 10000)
req := createResourceList(1000, 1000)
Expand Down Expand Up @@ -129,29 +136,134 @@ func createElasticQuota() *v1alpha1.ElasticQuota {
return eQ
}

func TestRuntimeQuotaCalculator_Iteration4AdjustQuota(t *testing.T) {
qtw := NewRuntimeQuotaCalculator("testTreeName")
resourceKey := make(map[corev1.ResourceName]struct{})
cpu := corev1.ResourceCPU
resourceKey[cpu] = struct{}{}
qtw.updateResourceKeys(resourceKey)
qtw.quotaTree[cpu].insert("node1", 40, 5, 10, 0, true)
qtw.quotaTree[cpu].insert("node2", 60, 20, 15, 0, true)
qtw.quotaTree[cpu].insert("node3", 50, 40, 20, 0, true)
qtw.quotaTree[cpu].insert("node4", 80, 70, 15, 0, true)
qtw.totalResource = corev1.ResourceList{}
qtw.totalResource[corev1.ResourceCPU] = *resource.NewMilliQuantity(100, resource.DecimalSI)
qtw.calculateRuntimeNoLock()
if qtw.globalRuntimeVersion == 0 {
t.Error("error")
func TestRuntimeQuotaCalculator_IterationAdjustQuota(t *testing.T) {
type quotaNodeInfo = struct {
groupName string
sharedWeight int64
request int64
min int64
guarantee int64
allowLentResource bool
}
if qtw.quotaTree[cpu].quotaNodes["node1"].runtimeQuota != 5 ||
qtw.quotaTree[cpu].quotaNodes["node2"].runtimeQuota != 20 ||
qtw.quotaTree[cpu].quotaNodes["node3"].runtimeQuota != 35 ||
qtw.quotaTree[cpu].quotaNodes["node4"].runtimeQuota != 40 {
t.Error("error")
node1 := &quotaNodeInfo{
groupName: TestNode1,
sharedWeight: 40,
request: 5,
min: 10,
guarantee: 0,
allowLentResource: true,
}
node2 := &quotaNodeInfo{
groupName: TestNode2,
sharedWeight: 60,
request: 20,
min: 15,
guarantee: 0,
allowLentResource: true,
}
node3 := &quotaNodeInfo{
groupName: TestNode3,
sharedWeight: 50,
request: 40,
min: 20,
guarantee: 0,
allowLentResource: true,
}
node4 := &quotaNodeInfo{
groupName: TestNode4,
sharedWeight: 80,
request: 70,
min: 15,
guarantee: 0,
allowLentResource: true,
}
node4_1 := &quotaNodeInfo{
groupName: TestNode4,
sharedWeight: 0,
request: 70,
min: 15,
guarantee: 0,
allowLentResource: true,
}
node4_2 := &quotaNodeInfo{
groupName: TestNode4,
sharedWeight: 0,
request: 70,
min: 15,
guarantee: 45,
allowLentResource: true,
}

testCases := []struct {
name string
totalResource corev1.ResourceList
nodes []*quotaNodeInfo
expectedRuntimeMp map[string]map[corev1.ResourceName]int64
}{
{
name: "case1-no-guarantee",
totalResource: corev1.ResourceList{
corev1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI),
},
nodes: []*quotaNodeInfo{node1, node2, node3, node4},
expectedRuntimeMp: map[string]map[corev1.ResourceName]int64{
TestNode1: {corev1.ResourceCPU: 5},
TestNode2: {corev1.ResourceCPU: 20},
TestNode3: {corev1.ResourceCPU: 35},
TestNode4: {corev1.ResourceCPU: 40},
},
},
{
name: "case2-node4.sharedWeight=0",
totalResource: corev1.ResourceList{
corev1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI),
},
nodes: []*quotaNodeInfo{node1, node2, node3, node4_1},
expectedRuntimeMp: map[string]map[corev1.ResourceName]int64{
TestNode1: {corev1.ResourceCPU: 5},
TestNode2: {corev1.ResourceCPU: 20},
TestNode3: {corev1.ResourceCPU: 40},
TestNode4: {corev1.ResourceCPU: 15},
},
},
{
name: "case3-node4.guarantee>min",
totalResource: corev1.ResourceList{
corev1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI),
},
nodes: []*quotaNodeInfo{node1, node2, node3, node4_2},
expectedRuntimeMp: map[string]map[corev1.ResourceName]int64{
TestNode1: {corev1.ResourceCPU: 5},
TestNode2: {corev1.ResourceCPU: 20},
TestNode3: {corev1.ResourceCPU: 30},
TestNode4: {corev1.ResourceCPU: 45},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
qtw := NewRuntimeQuotaCalculator("testTreeName")
resourceKey := make(map[corev1.ResourceName]struct{})
for key, value := range tc.totalResource {
if !value.IsZero() {
resourceKey[key] = struct{}{}
}
}
qtw.updateResourceKeys(resourceKey)
qtw.totalResource = tc.totalResource
for _, node := range tc.nodes {
for resKey := range resourceKey {
qtw.quotaTree[resKey].insert(node.groupName, node.sharedWeight, node.request, node.min, node.guarantee, node.allowLentResource)
}
}
qtw.calculateRuntimeNoLock()
for node, rq := range tc.expectedRuntimeMp {
for resKey, q := range rq {
assert.Equal(t, q, qtw.quotaTree[resKey].quotaNodes[node].runtimeQuota)
}
}
})
}
}

func createQuotaInfoWithRes(name string, max, min corev1.ResourceList) *QuotaInfo {
Expand Down
41 changes: 39 additions & 2 deletions pkg/webhook/elasticquota/quota_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (qt *quotaTopology) ValidDeleteQuota(quota *v1alpha1.ElasticQuota) error {
return nil
}

// fillQuotaDefaultInformation fills quota with default information if not configure
// fillQuotaDefaultInformation fills quota with default information if not be configured
func (qt *quotaTopology) fillQuotaDefaultInformation(quota *v1alpha1.ElasticQuota) error {
if quota.Name == extension.RootQuotaName {
return nil
Expand Down Expand Up @@ -235,8 +235,20 @@ func (qt *quotaTopology) fillQuotaDefaultInformation(quota *v1alpha1.ElasticQuot
if sharedWeight, exist := quota.Annotations[extension.AnnotationSharedWeight]; !exist || len(sharedWeight) == 0 {
quota.Annotations[extension.AnnotationSharedWeight] = string(maxQuota)
klog.V(5).Infof("fill quota %v sharedWeight as max", quota.Name)
} else {
sharedWeightRL := make(corev1.ResourceList)
err = json.Unmarshal([]byte(sharedWeight), &sharedWeightRL)
if err != nil {
return fmt.Errorf("fillDefaultQuotaInfo unmarshal sharedWeight failed:%v", err)
}
if fixedSharedWeight(sharedWeightRL, quota.Spec.Max) {
fixedSharedWeightRL, err := json.Marshal(&sharedWeightRL)
if err != nil {
return fmt.Errorf("fillDefaultQuotaInfo marshal fixedSharedWeight max failed:%v", err)
}
quota.Annotations[extension.AnnotationSharedWeight] = string(fixedSharedWeightRL)
}
}

return nil
}

Expand Down Expand Up @@ -286,3 +298,28 @@ func (qt *quotaTopology) getQuotaInfo(name, namespace string) *QuotaInfo {
}
return nil
}

// fixedSharedWeight keep keys in sharedWeight and maxQuota same
// if key in maxQuota not included in sharedWeight, add key/value in sharedWeight
// if key in sharedWeight not included in maxQuota, delete key/value in sharedWeight
// if fixed, return true
func fixedSharedWeight(sharedWeight, maxQuota corev1.ResourceList) bool {
fixed := false
for key, value := range maxQuota {
if _, ok := sharedWeight[key]; !ok {
sharedWeight[key] = value
fixed = true
}
}
toDeleted := make([]corev1.ResourceName, 0)
for key := range sharedWeight {
if _, ok := maxQuota[key]; !ok {
toDeleted = append(toDeleted, key)
}
}
for _, key := range toDeleted {
fixed = true
delete(sharedWeight, key)
}
return fixed
}
Loading
Loading