From 1017f1c84b89b6d7c7a9195bedbf07169d2e4529 Mon Sep 17 00:00:00 2001 From: luomingmeng Date: Fri, 8 Nov 2024 15:04:38 +0800 Subject: [PATCH 1/5] chore(qrm): refactor cpu/memory read-only state --- .../qrm-plugins/cpu/dynamicpolicy/policy.go | 21 +----------- .../cpu/dynamicpolicy/policy_test.go | 9 ----- .../cpu/dynamicpolicy/state/state.go | 27 +++++++++++++++ .../cpu/dynamicpolicy/state/state_test.go | 9 +++++ .../qrm-plugins/cpu/nativepolicy/policy.go | 21 +----------- .../cpu/nativepolicy/policy_test.go | 9 ----- .../memory/dynamicpolicy/policy.go | 21 +----------- .../memory/dynamicpolicy/policy_test.go | 10 ------ .../memory/dynamicpolicy/state/state.go | 27 +++++++++++++++ .../memory/dynamicpolicy/state/state_test.go | 33 +++++++++++++++++++ 10 files changed, 99 insertions(+), 88 deletions(-) create mode 100644 pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state_test.go diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go index 4458f4712..7662ba48e 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go @@ -71,23 +71,6 @@ const ( healthCheckTolerationTimes = 3 ) -var ( - readonlyStateLock sync.RWMutex - readonlyState state.ReadonlyState -) - -// GetReadonlyState returns state.ReadonlyState to provides a way -// to obtain the running states of the plugin -func GetReadonlyState() (state.ReadonlyState, error) { - readonlyStateLock.RLock() - defer readonlyStateLock.RUnlock() - - if readonlyState == nil { - return nil, fmt.Errorf("readonlyState isn't set") - } - return readonlyState, nil -} - // DynamicPolicy is the policy that's used by default; // it will consider the dynamic running information to calculate // and adjust resource requirements and configurations @@ -150,9 +133,7 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration return false, agent.ComponentStub{}, fmt.Errorf("NewCheckpointState failed with error: %v", stateErr) } - readonlyStateLock.Lock() - readonlyState = stateImpl - readonlyStateLock.Unlock() + state.SetReadonlyState(stateImpl) wrappedEmitter := agentCtx.EmitterPool.GetDefaultMetricsEmitter().WithTags(agentName, metrics.MetricTag{ Key: util.QRMPluginPolicyTagName, diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go index 42c599a5f..51eafa946 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go @@ -4614,15 +4614,6 @@ func entriesMatch(entries1, entries2 state.PodEntries) (bool, error) { return true, nil } -func TestGetReadonlyState(t *testing.T) { - t.Parallel() - - as := require.New(t) - readonlyState, err := GetReadonlyState() - as.NotNil(err) - as.Nil(readonlyState) -} - func TestClearResidualState(t *testing.T) { t.Parallel() diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state.go index 22befed02..1ad80bb47 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state.go @@ -19,6 +19,7 @@ package state import ( "encoding/json" "fmt" + "sync" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" @@ -562,3 +563,29 @@ type ReadonlyState interface { } type GenerateMachineStateFromPodEntriesFunc func(topology *machine.CPUTopology, podEntries PodEntries) (NUMANodeMap, error) + +var ( + readonlyStateLock sync.RWMutex + readonlyState ReadonlyState +) + +// GetReadonlyState returns state.ReadonlyState to provides a way +// to obtain the running states of the plugin +func GetReadonlyState() (ReadonlyState, error) { + readonlyStateLock.RLock() + defer readonlyStateLock.RUnlock() + + if readonlyState == nil { + return nil, fmt.Errorf("readonlyState isn't set") + } + return readonlyState, nil +} + +// SetReadonlyState sets state.ReadonlyState to be used by the plugin +// this function should only be called once during initialization +func SetReadonlyState(state ReadonlyState) { + readonlyStateLock.Lock() + defer readonlyStateLock.Unlock() + + readonlyState = state +} diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_test.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_test.go index 711e38efa..9b6334774 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_test.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_test.go @@ -3233,3 +3233,12 @@ func TestGetAvailableCPUQuantity(t *testing.T) { cpuQuantity := int(math.Ceil(nodeState.GetAvailableCPUQuantity(machine.NewCPUSet()))) require.Equal(t, 15, cpuQuantity) } + +func TestGetReadonlyState(t *testing.T) { + t.Parallel() + + as := require.New(t) + state, err := GetReadonlyState() + as.NotNil(err) + as.Nil(state) +} diff --git a/pkg/agent/qrm-plugins/cpu/nativepolicy/policy.go b/pkg/agent/qrm-plugins/cpu/nativepolicy/policy.go index 823d28af5..5d9dbc38b 100644 --- a/pkg/agent/qrm-plugins/cpu/nativepolicy/policy.go +++ b/pkg/agent/qrm-plugins/cpu/nativepolicy/policy.go @@ -55,23 +55,6 @@ const ( maxResidualTime = 5 * time.Minute ) -var ( - readonlyStateLock sync.RWMutex - readonlyState state.ReadonlyState -) - -// GetReadonlyState returns state.ReadonlyState to provides a way -// to obtain the running states of the plugin -func GetReadonlyState() (state.ReadonlyState, error) { - readonlyStateLock.RLock() - defer readonlyStateLock.RUnlock() - - if readonlyState == nil { - return nil, fmt.Errorf("readonlyState isn't setted") - } - return readonlyState, nil -} - // NativePolicy is a policy compatible with Kubernetes native semantics and is used in topology-aware scheduling scenarios. type NativePolicy struct { sync.RWMutex @@ -115,9 +98,7 @@ func NewNativePolicy(agentCtx *agent.GenericContext, conf *config.Configuration, return false, agent.ComponentStub{}, fmt.Errorf("NewCheckpointState failed with error: %v", stateErr) } - readonlyStateLock.Lock() - readonlyState = stateImpl - readonlyStateLock.Unlock() + state.SetReadonlyState(stateImpl) wrappedEmitter := agentCtx.EmitterPool.GetDefaultMetricsEmitter().WithTags(agentName, metrics.MetricTag{ Key: util.QRMPluginPolicyTagName, diff --git a/pkg/agent/qrm-plugins/cpu/nativepolicy/policy_test.go b/pkg/agent/qrm-plugins/cpu/nativepolicy/policy_test.go index 5271b1f00..318f8dbe3 100644 --- a/pkg/agent/qrm-plugins/cpu/nativepolicy/policy_test.go +++ b/pkg/agent/qrm-plugins/cpu/nativepolicy/policy_test.go @@ -390,15 +390,6 @@ func TestAllocateForPod(t *testing.T) { _ = os.RemoveAll(tmpDir) } -func TestGetReadonlyState(t *testing.T) { - t.Parallel() - - as := require.New(t) - readonlyState, err := GetReadonlyState() - as.NotNil(err) - as.Nil(readonlyState) -} - func TestClearResidualState(t *testing.T) { t.Parallel() diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go index 0aeb097e7..62939f779 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go @@ -90,23 +90,6 @@ const ( movePagesWorkLimit = 2 ) -var ( - readonlyStateLock sync.RWMutex - readonlyState state.ReadonlyState -) - -// GetReadonlyState returns state.ReadonlyState to provides a way -// to obtain the running states of the plugin -func GetReadonlyState() (state.ReadonlyState, error) { - readonlyStateLock.RLock() - defer readonlyStateLock.RUnlock() - - if readonlyState == nil { - return nil, fmt.Errorf("readonlyState isn't setted") - } - return readonlyState, nil -} - type DynamicPolicy struct { sync.RWMutex @@ -193,9 +176,7 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration general.Infof("empty ExtraControlKnobConfigFile, initialize empty extraControlKnobConfigs") } - readonlyStateLock.Lock() - readonlyState = stateImpl - readonlyStateLock.Unlock() + state.SetReadonlyState(stateImpl) wrappedEmitter := agentCtx.EmitterPool.GetDefaultMetricsEmitter().WithTags(agentName, metrics.MetricTag{ Key: util.QRMPluginPolicyTagName, diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go index 77f1ef8f0..7cd3485ac 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go @@ -1953,16 +1953,6 @@ func TestGetResourcesAllocation(t *testing.T) { }, resp5.PodResources[req.PodUid].ContainerResources[testName].ResourceAllocation[string(v1.ResourceMemory)]) } -func TestGetReadonlyState(t *testing.T) { - t.Parallel() - - as := require.New(t) - readonlyState, err := GetReadonlyState() - if readonlyState == nil { - as.NotNil(err) - } -} - func TestGenerateResourcesMachineStateFromPodEntries(t *testing.T) { t.Parallel() diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state.go index 5914f9c5c..c66f0b0f6 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state.go @@ -19,6 +19,7 @@ package state import ( "encoding/json" "fmt" + "sync" info "github.com/google/cadvisor/info/v1" v1 "k8s.io/api/core/v1" @@ -384,3 +385,29 @@ type State interface { writer ReadonlyState } + +var ( + readonlyStateLock sync.RWMutex + readonlyState ReadonlyState +) + +// GetReadonlyState returns state.ReadonlyState to provides a way +// to obtain the running states of the plugin +func GetReadonlyState() (ReadonlyState, error) { + readonlyStateLock.RLock() + defer readonlyStateLock.RUnlock() + + if readonlyState == nil { + return nil, fmt.Errorf("readonlyState isn't setted") + } + return readonlyState, nil +} + +// SetReadonlyState sets state.ReadonlyState to be used by the plugin +// this function should only be called once during initialization +func SetReadonlyState(state ReadonlyState) { + readonlyStateLock.Lock() + defer readonlyStateLock.Unlock() + + readonlyState = state +} diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state_test.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state_test.go new file mode 100644 index 000000000..727e5b7be --- /dev/null +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state_test.go @@ -0,0 +1,33 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package state + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestGetReadonlyState(t *testing.T) { + t.Parallel() + + as := require.New(t) + state, err := GetReadonlyState() + if state == nil { + as.NotNil(err) + } +} From 0a151ab4f7cf2f742138b3db9dcedebfaa27fd3f Mon Sep 17 00:00:00 2001 From: luomingmeng Date: Fri, 8 Nov 2024 16:05:19 +0800 Subject: [PATCH 2/5] feat(qrm): support numa binding manager --- .../qrm-plugins/cpu/dynamicpolicy/policy.go | 14 + .../shared_numa_binding_manager.go | 35 ++ .../qrm-plugins/util/allocation/manager.go | 124 ++++++ .../calculator/backtracking_calculator.go | 378 ++++++++++++++++++ .../backtracking_calculator_test.go | 281 +++++++++++++ .../util/numabinding/calculator/calculator.go | 57 +++ .../calculator/dryrun_calculator.go | 66 +++ .../calculator/greedy_calculator.go | 105 +++++ .../calculator/greedy_calculator_test.go | 177 ++++++++ .../util/numabinding/calculator/util.go | 97 +++++ .../qrm-plugins/util/numabinding/manager.go | 142 +++++++ pkg/agent/qrm-plugins/util/state/manager.go | 118 ++++++ 12 files changed, 1594 insertions(+) create mode 100644 pkg/agent/qrm-plugins/cpu/dynamicpolicy/shared_numa_binding_manager.go create mode 100644 pkg/agent/qrm-plugins/util/allocation/manager.go create mode 100644 pkg/agent/qrm-plugins/util/numabinding/calculator/backtracking_calculator.go create mode 100644 pkg/agent/qrm-plugins/util/numabinding/calculator/backtracking_calculator_test.go create mode 100644 pkg/agent/qrm-plugins/util/numabinding/calculator/calculator.go create mode 100644 pkg/agent/qrm-plugins/util/numabinding/calculator/dryrun_calculator.go create mode 100644 pkg/agent/qrm-plugins/util/numabinding/calculator/greedy_calculator.go create mode 100644 pkg/agent/qrm-plugins/util/numabinding/calculator/greedy_calculator_test.go create mode 100644 pkg/agent/qrm-plugins/util/numabinding/calculator/util.go create mode 100644 pkg/agent/qrm-plugins/util/numabinding/manager.go create mode 100644 pkg/agent/qrm-plugins/util/state/manager.go diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go index 7662ba48e..6b017730c 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go @@ -44,6 +44,7 @@ import ( "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/validator" cpuutil "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/util" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/numabinding" "github.com/kubewharf/katalyst-core/pkg/agent/utilcomponent/periodicalhandler" "github.com/kubewharf/katalyst-core/pkg/config" dynamicconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic" @@ -116,6 +117,8 @@ type DynamicPolicy struct { transitionPeriod time.Duration cpuNUMAHintPreferPolicy string cpuNUMAHintPreferLowThreshold float64 + + sharedNUNMABindingManager numabinding.Manager } func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration, @@ -217,6 +220,11 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration return false, agent.ComponentStub{}, fmt.Errorf("dynamic policy initReclaimPool failed with error: %v", err) } + policyImplement.sharedNUNMABindingManager, err = numabinding.NewSharedNUMABindingManager(conf, wrappedEmitter, agentCtx.MetaServer, NewCPUAllocationUpdater(stateImpl)) + if err != nil { + return false, agent.ComponentStub{}, err + } + err = agentCtx.MetaServer.ConfigurationManager.AddConfigWatcher(crd.AdminQoSConfigurationGVR) if err != nil { return false, nil, err @@ -328,6 +336,7 @@ func (p *DynamicPolicy) Start() (err error) { return } go p.advisorMonitor.Run(p.stopCh) + go p.sharedNUNMABindingManager.Run(p.stopCh) go wait.BackoffUntil(func() { p.serveForAdvisor(p.stopCh) }, wait.NewExponentialBackoffManager( 800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 0, &clock.RealClock{}), true, p.stopCh) @@ -676,6 +685,11 @@ func (p *DynamicPolicy) GetTopologyHints(ctx context.Context, if p.hintHandlers[qosLevel] == nil { return nil, fmt.Errorf("katalyst QoS level: %s is not supported yet", qosLevel) } + + if p.sharedNUNMABindingManager.IsProcessing() { + return nil, fmt.Errorf("numa binding manager is processing") + } + return p.hintHandlers[qosLevel](ctx, req) } diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/shared_numa_binding_manager.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/shared_numa_binding_manager.go new file mode 100644 index 000000000..d551c0a83 --- /dev/null +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/shared_numa_binding_manager.go @@ -0,0 +1,35 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dynamicpolicy + +import ( + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/allocation" +) + +type cpuAllocationUpdater struct { + state state.State +} + +func NewCPUAllocationUpdater(state state.State) allocation.Updater { + return &cpuAllocationUpdater{state: state} +} + +func (s *cpuAllocationUpdater) UpdateAllocation(m allocation.PodAllocations) error { + // todo + return nil +} diff --git a/pkg/agent/qrm-plugins/util/allocation/manager.go b/pkg/agent/qrm-plugins/util/allocation/manager.go new file mode 100644 index 000000000..b3591e7c1 --- /dev/null +++ b/pkg/agent/qrm-plugins/util/allocation/manager.go @@ -0,0 +1,124 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package allocation + +import ( + "context" + "fmt" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/state" + "github.com/kubewharf/katalyst-core/pkg/metaserver" + "github.com/kubewharf/katalyst-core/pkg/util/general" + "github.com/kubewharf/katalyst-core/pkg/util/native" +) + +type PodAllocations map[string]*Allocation + +func (p *PodAllocations) Clone() PodAllocations { + a := make(PodAllocations) + for uid, allocation := range *p { + a[uid] = allocation.Clone() + } + return a +} + +type Request struct { + CPUMilli int64 + Memory int64 +} + +type Allocation struct { + types.NamespacedName + Request + BindingNUMA int +} + +func (n *Allocation) Clone() *Allocation { + if n == nil { + return nil + } + return &Allocation{ + BindingNUMA: n.BindingNUMA, + NamespacedName: n.NamespacedName, + Request: Request{ + CPUMilli: n.CPUMilli, + Memory: n.Memory, + }, + } +} + +func (n *Allocation) String() string { + if n == nil { + return "" + } + return fmt.Sprintf("%s/%d/%d/%d", n.NamespacedName, n.BindingNUMA, n.CPUMilli, n.Memory) +} + +type Updater interface { + UpdateAllocation(PodAllocations) error +} + +func GetPodAllocations(ctx context.Context, metaServer *metaserver.MetaServer, memoryState state.ReadonlyState) (PodAllocations, error) { + allocations := make(PodAllocations) + podEntries := memoryState.GetPodResourceEntries()[v1.ResourceMemory] + for uid, podEntry := range podEntries { + for _, container := range podEntry { + if !container.CheckMainContainer() || !container.CheckShared() { + continue + } + + bindingNUMA := -1 + if container.CheckNUMABinding() { + bindingNUMA = container.NumaAllocationResult.ToSliceNoSortInt()[0] + } + + pod, err := metaServer.GetPod(ctx, uid) + if err != nil { + general.Errorf("get pod %s failed: %v", uid, err) + return nil, err + } + + if !native.PodIsActive(pod) { + continue + } + + allocations[uid] = &Allocation{ + NamespacedName: types.NamespacedName{ + Namespace: pod.Namespace, + Name: pod.Name, + }, + BindingNUMA: bindingNUMA, + Request: getPodRequest(pod), + } + } + } + + return allocations, nil +} + +func getPodRequest(pod *v1.Pod) Request { + req := native.SumUpPodRequestResources(pod) + cpuRequest := native.CPUQuantityGetter()(req) + memoryRequest := native.MemoryQuantityGetter()(req) + return Request{ + CPUMilli: cpuRequest.MilliValue(), + Memory: memoryRequest.Value(), + } +} diff --git a/pkg/agent/qrm-plugins/util/numabinding/calculator/backtracking_calculator.go b/pkg/agent/qrm-plugins/util/numabinding/calculator/backtracking_calculator.go new file mode 100644 index 000000000..1c2714f63 --- /dev/null +++ b/pkg/agent/qrm-plugins/util/numabinding/calculator/backtracking_calculator.go @@ -0,0 +1,378 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package calculator + +import ( + "context" + "sync" + "time" + + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/allocation" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/state" + coreconfig "github.com/kubewharf/katalyst-core/pkg/config" + "github.com/kubewharf/katalyst-core/pkg/metaserver" + "github.com/kubewharf/katalyst-core/pkg/metrics" + "github.com/kubewharf/katalyst-core/pkg/util/bitmask" + "github.com/kubewharf/katalyst-core/pkg/util/general" + "github.com/kubewharf/katalyst-core/pkg/util/machine" +) + +const ( + calculatorNameBackTracking = "backTracking" +) + +const ( + metricNameAsyncCalculateTimeCost = "async_calculate_time_cost" +) + +type backTrackingCalculator struct { + emitter metrics.MetricEmitter + metaServer *metaserver.MetaServer + reservedCPUs machine.CPUSet + maxNUMANum int + numaNodes []int + + simpleCalculator NUMABindingCalculator + + mux sync.RWMutex + lastOptimalResult allocation.PodAllocations +} + +func NewBackTrackingCalculator( + _ *coreconfig.Configuration, + emitter metrics.MetricEmitter, + metaServer *metaserver.MetaServer, + reservedCPUs machine.CPUSet, +) NUMABindingCalculator { + return &backTrackingCalculator{ + emitter: emitter, + metaServer: metaServer, + // todo: use numa num from conf + maxNUMANum: 1, + numaNodes: metaServer.CPUDetails.NUMANodes().ToSliceInt(), + reservedCPUs: reservedCPUs, + simpleCalculator: NewGreedyCalculator(), + } +} + +func (b *backTrackingCalculator) Name() string { + return calculatorNameBackTracking +} + +func (b *backTrackingCalculator) Run(ctx context.Context) { + wait.UntilWithContext(ctx, b.sync, 30*time.Second) +} + +func (b *backTrackingCalculator) getLastOptimalResult() allocation.PodAllocations { + b.mux.RLock() + defer b.mux.RUnlock() + return b.lastOptimalResult +} + +func (b *backTrackingCalculator) CalculateNUMABindingResult(current allocation.PodAllocations, + numaAllocatable state.NUMAResource, +) (allocation.PodAllocations, bool, error) { + lastOptimalResult := b.getLastOptimalResult() + if lastOptimalResult == nil { + return b.simpleCalculator.CalculateNUMABindingResult(current, numaAllocatable) + } + + result := current.Clone() + // if the pod is not in last optimal result, we should use a simple calculator + for podUID, alloc := range result { + if alloc.BindingNUMA != -1 { + continue + } + if optimalResult, ok := lastOptimalResult[podUID]; ok { + alloc.BindingNUMA = optimalResult.BindingNUMA + } + } + + return b.simpleCalculator.CalculateNUMABindingResult(result, numaAllocatable) +} + +func (b *backTrackingCalculator) asyncCalculateNUMABindingResult(current allocation.PodAllocations, + numaAllocatable state.NUMAResource, +) (allocation.PodAllocations, bool, error) { + begin := time.Now() + defer func() { + costs := time.Since(begin) + general.InfoS("async calculate numa result", "calculator", b.Name(), "costs", costs) + _ = b.emitter.StoreInt64(metricNameAsyncCalculateTimeCost, costs.Microseconds(), metrics.MetricTypeNameRaw, + metrics.MetricTag{Key: "calculator", Val: b.Name()}) + }() + + allNUMABindingResults, resultsIndex, err := b.getAllNUMABindingResults(current, numaAllocatable) + if err != nil { + general.Errorf("get numa allocation failed: %v", err) + return nil, false, err + } + + optimalResults := b.mergeNUMABindingResults(allNUMABindingResults, resultsIndex, current, numaAllocatable) + podAllocation := current.Clone() + for i, result := range optimalResults { + uid := resultsIndex[i] + pod := podAllocation[uid] + if pod.BindingNUMA != -1 { + continue + } + if result.numaNodeAffinity.Count() == 1 { + pod.BindingNUMA = result.numaNodeAffinity.GetBits()[0] + } + } + return podAllocation, len(optimalResults) > 0, nil +} + +func (b *backTrackingCalculator) sync(ctx context.Context) { + cpuState, memoryState, err := state.GetCPUMemoryReadonlyState() + if err != nil { + return + } + + podAllocation, err := allocation.GetPodAllocations(ctx, b.metaServer, memoryState) + if err != nil { + general.Errorf("get numa allocation failed: %v", err) + return + } + + numaAllocatable, err := state.GetSharedNUMAAllocatable( + b.metaServer.CPUDetails.NUMANodes().ToSliceNoSortInt(), + b.reservedCPUs, + cpuState, memoryState) + if err != nil { + general.Errorf("get numa allocation failed: %v", err) + return + } + + podAllocation, _, err = b.asyncCalculateNUMABindingResult(podAllocation, numaAllocatable) + if err != nil { + general.Errorf("calculate numa allocation failed: %v", err) + return + } + + b.mux.Lock() + b.lastOptimalResult = podAllocation + b.mux.Unlock() +} + +func (b *backTrackingCalculator) getAllNUMABindingResults(podAllocation allocation.PodAllocations, + numaAllocatable state.NUMAResource, +) ([][]numaBindingResult, map[int]string, error) { + numaBindingNUMAs := b.getNUMABindingNUMAs(podAllocation) + resultsIndex := make(map[int]string) + allNUMABindingResults := make([][]numaBindingResult, 0, len(podAllocation)) + for podUID, alloc := range podAllocation { + resultsIndex[len(allNUMABindingResults)] = podUID + results, err := getNUMABindingResults(alloc, b.numaNodes, numaAllocatable, b.maxNUMANum, numaBindingNUMAs) + if err != nil { + general.Errorf("get numa allocation for %s failed: %v", alloc.String(), err) + return nil, nil, err + } + allNUMABindingResults = append(allNUMABindingResults, results) + } + return allNUMABindingResults, resultsIndex, nil +} + +func (b *backTrackingCalculator) mergeNUMABindingResults(results [][]numaBindingResult, index map[int]string, + podAllocation allocation.PodAllocations, numaAllocatable state.NUMAResource, +) []numaBindingResult { + var optimalResults []numaBindingResult + optimalNUMACount := 0 + minNonNUMABindingCount := 0 + withAllPodNUMABindingResult := false + iterateAllNUMABindingResults(results, func(results []numaBindingResult) { + current := numaAllocatable.Clone() + if b.maxNUMANum > 1 { + maxNUMACount := 1 + maxNUMANodeAffinity, _ := bitmask.NewBitMask(1) + for _, result := range results { + if result.numaNodeAffinity.Count() > maxNUMACount { + maxNUMACount = result.numaNodeAffinity.Count() + maxNUMANodeAffinity = result.numaNodeAffinity + } + } + + if maxNUMACount > 1 { + if withAllPodNUMABindingResult { + return + } + + totalAllocatable := state.Resource{} + for i := 0; i < maxNUMACount; i++ { + totalAllocatable.AddResource(numaAllocatable[i]) + } + + nonNUMABindingCount := 0 + for i, result := range results { + if bitmask.And(result.numaNodeAffinity, maxNUMANodeAffinity).Count() != 0 { + if !totalAllocatable.IsSatisfied(podAllocation[index[i]]) { + return + } + totalAllocatable.SubAllocation(podAllocation[index[i]]) + nonNUMABindingCount += 1 + } else { + numaID := result.numaNodeAffinity.GetBits()[0] + if !current[numaID].IsSatisfied(podAllocation[index[i]]) { + return + } + current[numaID].SubAllocation(podAllocation[index[i]]) + } + } + optimalNUMACount = len(b.numaNodes) - maxNUMACount + if nonNUMABindingCount < minNonNUMABindingCount { + minNonNUMABindingCount = nonNUMABindingCount + optimalResults = deepCopyNUMABindingResults(results) + } + return + } + } + + numaMask := bitmask.NewEmptyBitMask() + for i, result := range results { + numaID := result.numaNodeAffinity.GetBits()[0] + if !current[numaID].IsSatisfied(podAllocation[index[i]]) { + return + } + numaMask.Or(result.numaNodeAffinity) + current[numaID].SubAllocation(podAllocation[index[i]]) + } + numaCount := numaMask.Count() + if !withAllPodNUMABindingResult || numaCount < optimalNUMACount { + optimalNUMACount = numaCount + optimalResults = deepCopyNUMABindingResults(results) + withAllPodNUMABindingResult = true + } + }) + return optimalResults +} + +func (b *backTrackingCalculator) getNUMABindingNUMAs(podAllocation allocation.PodAllocations) sets.Int { + numaSet := sets.NewInt() + for _, alloc := range podAllocation { + if alloc.BindingNUMA != -1 { + numaSet.Insert(alloc.BindingNUMA) + } + } + return numaSet +} + +// numaBindingResult is a struct containing the numaNodeAffinity for a pod +type numaBindingResult struct { + numaNodeAffinity bitmask.BitMask +} + +func deepCopyNUMABindingResults(results []numaBindingResult) []numaBindingResult { + c := make([]numaBindingResult, 0, len(results)) + for _, result := range results { + c = append(c, result) + } + return c +} + +func getNUMABindingResults(allocation *allocation.Allocation, numaNodes []int, + numaAllocatable state.NUMAResource, maxNUMANum int, numaBindingNUMAs sets.Int, +) ([]numaBindingResult, error) { + numaBindingResults := make([]numaBindingResult, 0, len(numaNodes)) + for _, n := range numaNodes { + m, _ := bitmask.NewBitMask(n) + if !numaAllocatable[n].IsSatisfied(allocation) { + continue + } + + if allocation.BindingNUMA != -1 && allocation.BindingNUMA != n { + continue + } + + numaBindingResults = append(numaBindingResults, numaBindingResult{ + numaNodeAffinity: m, + }) + } + + if allocation.BindingNUMA != -1 { + return numaBindingResults, nil + } + + maxNUMANum = general.Min(maxNUMANum, len(numaNodes)) + if maxNUMANum > 1 { + for i := 2; i < maxNUMANum; i++ { + m := bitmask.NewEmptyBitMask() + totalResource := state.Resource{} + for j := 0; j < i; j++ { + err := m.Add(j) + if err != nil { + return nil, err + } + totalResource.AddResource(numaAllocatable[j]) + } + + if numaBindingNUMAs.Intersection(sets.NewInt(m.GetBits()...)).Len() > 0 { + continue + } + + if !totalResource.IsSatisfied(allocation) { + continue + } + + numaBindingResults = append(numaBindingResults, numaBindingResult{ + numaNodeAffinity: m, + }) + } + } + return numaBindingResults, nil +} + +// Iterate over all permutations of hints in 'allNUMABindingResults [][]numaBindingResult'. +// +// This procedure is implemented as a recursive function over the set of results +// in 'allNUMABindingResults[i]'. It applies the function 'callback' to each +// permutation as it is found. It is the equivalent of: +// +// for i := 0; i < len(allNUMABindingResults[0]); i++ +// +// for j := 0; j < len(allNUMABindingResults[1]); j++ +// for k := 0; k < len(allNUMABindingResults[2]); k++ +// ... +// for z := 0; z < len(allNUMABindingResults[-1]); z++ +// permutation := []numaBindingResult{ +// allNUMABindingResults[0][i], +// allNUMABindingResults[1][j], +// allNUMABindingResults[2][k], +// ... +// allNUMABindingResults[-1][z] +// } +// callback(permutation) +func iterateAllNUMABindingResults(allNUMABindingResults [][]numaBindingResult, callback func([]numaBindingResult)) { + // Internal helper function to accumulate the permutation before calling the callback. + var iterate func(i int, accum []numaBindingResult) + iterate = func(i int, accum []numaBindingResult) { + // Base case: we have looped through all providers and have a full permutation. + if i == len(allNUMABindingResults) { + callback(accum) + return + } + + // Loop through all hints for provider 'i', and recurse to build the + // the permutation of this hint with all hints from providers 'i++'. + for j := range allNUMABindingResults[i] { + iterate(i+1, append(accum, allNUMABindingResults[i][j])) + } + } + iterate(0, []numaBindingResult{}) +} diff --git a/pkg/agent/qrm-plugins/util/numabinding/calculator/backtracking_calculator_test.go b/pkg/agent/qrm-plugins/util/numabinding/calculator/backtracking_calculator_test.go new file mode 100644 index 000000000..f025a3e26 --- /dev/null +++ b/pkg/agent/qrm-plugins/util/numabinding/calculator/backtracking_calculator_test.go @@ -0,0 +1,281 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package calculator + +import ( + "reflect" + "testing" + + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/allocation" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/state" + "github.com/kubewharf/katalyst-core/pkg/util/bitmask" +) + +func newBitMask(bits ...int) bitmask.BitMask { + mask, _ := bitmask.NewBitMask(bits...) + return mask +} + +func Test_getNUMABindingResults(t *testing.T) { + t.Parallel() + type args struct { + allocation *allocation.Allocation + numaNodes []int + numaAllocatable state.NUMAResource + maxNUMANum int + numaBindingNUMAs sets.Int + } + tests := []struct { + name string + args args + want []numaBindingResult + wantErr bool + }{ + { + name: "test1", + args: args{ + allocation: &allocation.Allocation{ + NamespacedName: types.NamespacedName{ + Name: "pod1", + Namespace: "default", + }, + Request: allocation.Request{ + CPUMilli: 1000, + Memory: 1000, + }, + BindingNUMA: -1, + }, + numaAllocatable: state.NUMAResource{ + 0: &state.Resource{ + CPU: 5, + Memory: 10000, + }, + 1: &state.Resource{ + CPU: 5, + Memory: 10000, + }, + 2: &state.Resource{ + CPU: 5, + Memory: 10000, + }, + 3: &state.Resource{ + CPU: 5, + Memory: 10000, + }, + }, + maxNUMANum: 1, + numaNodes: []int{ + 0, 1, 2, 3, + }, + numaBindingNUMAs: sets.NewInt(), + }, + want: []numaBindingResult{ + { + numaNodeAffinity: newBitMask(0), + }, + { + numaNodeAffinity: newBitMask(1), + }, + { + numaNodeAffinity: newBitMask(2), + }, + { + numaNodeAffinity: newBitMask(3), + }, + }, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + got, err := getNUMABindingResults(tt.args.allocation, tt.args.numaNodes, tt.args.numaAllocatable, tt.args.maxNUMANum, tt.args.numaBindingNUMAs) + if (err != nil) != tt.wantErr { + t.Errorf("getNUMABindingResults() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("getNUMABindingResults() got = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_backTrackingCalculator_asyncCalculateNUMABindingResult(t *testing.T) { + t.Parallel() + type fields struct { + maxNUMANum int + numaNodes []int + } + type args struct { + current allocation.PodAllocations + numaAllocatable state.NUMAResource + } + tests := []struct { + name string + fields fields + args args + want allocation.PodAllocations + want1 bool + wantErr bool + }{ + { + name: "test1", + fields: fields{ + maxNUMANum: 1, + numaNodes: []int{0, 1, 2, 3}, + }, + args: args{ + current: allocation.PodAllocations{ + "pod1": &allocation.Allocation{ + NamespacedName: types.NamespacedName{ + Name: "pod1", + Namespace: "default", + }, + Request: allocation.Request{ + CPUMilli: 1000, + Memory: 1000, + }, + BindingNUMA: -1, + }, + "pod2": &allocation.Allocation{ + NamespacedName: types.NamespacedName{ + Name: "pod2", + Namespace: "default", + }, + Request: allocation.Request{ + CPUMilli: 2000, + Memory: 1000, + }, + BindingNUMA: -1, + }, + "pod3": &allocation.Allocation{ + NamespacedName: types.NamespacedName{ + Name: "pod3", + Namespace: "default", + }, + Request: allocation.Request{ + CPUMilli: 3000, + Memory: 1000, + }, + BindingNUMA: -1, + }, + "pod4": &allocation.Allocation{ + NamespacedName: types.NamespacedName{ + Name: "pod4", + Namespace: "default", + }, + Request: allocation.Request{ + CPUMilli: 4000, + Memory: 1000, + }, + BindingNUMA: -1, + }, + }, + numaAllocatable: state.NUMAResource{ + 0: &state.Resource{ + CPU: 5, + Memory: 10000, + }, + 1: &state.Resource{ + CPU: 5, + Memory: 10000, + }, + 2: &state.Resource{ + CPU: 5, + Memory: 10000, + }, + 3: &state.Resource{ + CPU: 5, + Memory: 10000, + }, + }, + }, + want: allocation.PodAllocations{ + "pod1": &allocation.Allocation{ + NamespacedName: types.NamespacedName{ + Name: "pod1", + Namespace: "default", + }, + Request: allocation.Request{ + CPUMilli: 1000, + Memory: 1000, + }, + BindingNUMA: 0, + }, + "pod2": &allocation.Allocation{ + NamespacedName: types.NamespacedName{ + Name: "pod2", + Namespace: "default", + }, + Request: allocation.Request{ + CPUMilli: 2000, + Memory: 1000, + }, + BindingNUMA: 1, + }, + "pod3": &allocation.Allocation{ + NamespacedName: types.NamespacedName{ + Name: "pod3", + Namespace: "default", + }, + Request: allocation.Request{ + CPUMilli: 3000, + Memory: 1000, + }, + BindingNUMA: 1, + }, + "pod4": &allocation.Allocation{ + NamespacedName: types.NamespacedName{ + Name: "pod4", + Namespace: "default", + }, + Request: allocation.Request{ + CPUMilli: 4000, + Memory: 1000, + }, + BindingNUMA: 0, + }, + }, + want1: true, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + b := &backTrackingCalculator{ + maxNUMANum: tt.fields.maxNUMANum, + numaNodes: tt.fields.numaNodes, + } + got, got1, err := b.asyncCalculateNUMABindingResult(tt.args.current, tt.args.numaAllocatable) + if (err != nil) != tt.wantErr { + t.Errorf("asyncCalculateNUMABindingResult() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("asyncCalculateNUMABindingResult() got = %v, want %v", got, tt.want) + } + if got1 != tt.want1 { + t.Errorf("asyncCalculateNUMABindingResult() got1 = %v, want %v", got1, tt.want1) + } + }) + } +} diff --git a/pkg/agent/qrm-plugins/util/numabinding/calculator/calculator.go b/pkg/agent/qrm-plugins/util/numabinding/calculator/calculator.go new file mode 100644 index 000000000..17727dd70 --- /dev/null +++ b/pkg/agent/qrm-plugins/util/numabinding/calculator/calculator.go @@ -0,0 +1,57 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package calculator + +import ( + "context" + "sync" + + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/allocation" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/state" + coreconfig "github.com/kubewharf/katalyst-core/pkg/config" + "github.com/kubewharf/katalyst-core/pkg/metaserver" + "github.com/kubewharf/katalyst-core/pkg/metrics" + "github.com/kubewharf/katalyst-core/pkg/util/machine" +) + +var ( + numaBindingCalculatorOnce sync.Once + numaBindingCalculator NUMABindingCalculator +) + +type NUMABindingCalculator interface { + Name() string + Run(ctx context.Context) + CalculateNUMABindingResult(current allocation.PodAllocations, + numaAllocatable state.NUMAResource) (allocation.PodAllocations, bool, error) +} + +func GetOrInitNUMABindingCalculator( + conf *coreconfig.Configuration, + emitter metrics.MetricEmitter, + metaServer *metaserver.MetaServer, + reservedCPUs machine.CPUSet, +) NUMABindingCalculator { + numaBindingCalculatorOnce.Do(func() { + calculators := make([]NUMABindingCalculator, 0) + calculators = append(calculators, NewGreedyCalculator()) + calculators = append(calculators, NewBackTrackingCalculator(conf, emitter, metaServer, reservedCPUs)) + numaBindingCalculator = NewDryRunCalculator(emitter, calculators...) + numaBindingCalculator.Run(context.Background()) + }) + return numaBindingCalculator +} diff --git a/pkg/agent/qrm-plugins/util/numabinding/calculator/dryrun_calculator.go b/pkg/agent/qrm-plugins/util/numabinding/calculator/dryrun_calculator.go new file mode 100644 index 000000000..860805d88 --- /dev/null +++ b/pkg/agent/qrm-plugins/util/numabinding/calculator/dryrun_calculator.go @@ -0,0 +1,66 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package calculator + +import ( + "context" + + "github.com/kubewharf/katalyst-core/pkg/metrics" + "github.com/kubewharf/katalyst-core/pkg/util/general" + + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/allocation" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/state" +) + +const ( + CalculatorNameDryRun = "dryRun" +) + +type dryRunCalculator struct { + calculators []NUMABindingCalculator + emitter metrics.MetricEmitter +} + +func NewDryRunCalculator(emitter metrics.MetricEmitter, calculators ...NUMABindingCalculator) NUMABindingCalculator { + withLoggingCalculators := make([]NUMABindingCalculator, 0, len(calculators)) + for _, calculator := range calculators { + withLoggingCalculators = append(withLoggingCalculators, WithExecutionTimeLogging(calculator, emitter)) + } + return &dryRunCalculator{ + calculators: withLoggingCalculators, + emitter: emitter, + } +} + +func (d *dryRunCalculator) Run(ctx context.Context) { + for _, calc := range d.calculators { + go calc.Run(ctx) + } +} + +func (d *dryRunCalculator) CalculateNUMABindingResult(current allocation.PodAllocations, numaAllocatable state.NUMAResource) (allocation.PodAllocations, bool, error) { + for _, calc := range d.calculators { + result, success, err := calc.CalculateNUMABindingResult(current, numaAllocatable) + general.Infof("dry run calculator %s result: %v, success: %v, err: %v", calc.Name(), result, success, err) + CheckAllNUMABindingResult(d.emitter, calc.Name(), success, result) + } + return current, true, nil +} + +func (d *dryRunCalculator) Name() string { + return CalculatorNameDryRun +} diff --git a/pkg/agent/qrm-plugins/util/numabinding/calculator/greedy_calculator.go b/pkg/agent/qrm-plugins/util/numabinding/calculator/greedy_calculator.go new file mode 100644 index 000000000..ae3b8c0f5 --- /dev/null +++ b/pkg/agent/qrm-plugins/util/numabinding/calculator/greedy_calculator.go @@ -0,0 +1,105 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package calculator + +import ( + "context" + "sort" + + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/allocation" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/state" +) + +const ( + calculatorNameGreedy = "greedy" +) + +type greedyCalculator struct{} + +func NewGreedyCalculator() NUMABindingCalculator { + return &greedyCalculator{} +} + +func (g greedyCalculator) Name() string { + return calculatorNameGreedy +} + +func (g greedyCalculator) Run(_ context.Context) {} + +func (g greedyCalculator) CalculateNUMABindingResult(current allocation.PodAllocations, + numaAllocatable state.NUMAResource, +) (allocation.PodAllocations, bool, error) { + sortedPods := make([]string, 0, len(current)) + result := current.Clone() + numaAvailable := numaAllocatable.Clone() + for uid, pod := range current { + numaBinding := pod.BindingNUMA + if numaBinding == -1 { + sortedPods = append(sortedPods, uid) + continue + } + + numaAvailable[numaBinding].SubAllocation(pod) + } + + // sort pods by cpu and memory request + sort.SliceStable(sortedPods, func(i, j int) bool { + if current[sortedPods[i]].CPUMilli != current[sortedPods[j]].CPUMilli { + return current[sortedPods[i]].CPUMilli > current[sortedPods[j]].CPUMilli + } + + return current[sortedPods[i]].Memory > current[sortedPods[j]].Memory + }) + + // sort available numa by cpu available and memory available + // and the more available, the more preferred + availableNUMAs := make([]int, 0, len(numaAvailable)) + for numaID := range numaAvailable { + availableNUMAs = append(availableNUMAs, numaID) + } + sort.SliceStable(availableNUMAs, func(i, j int) bool { + availableI := numaAvailable[availableNUMAs[i]] + availableJ := numaAvailable[availableNUMAs[j]] + if availableI.CPU != availableJ.CPU { + return availableI.CPU > availableJ.CPU + } + + if availableI.Memory != availableJ.Memory { + return availableI.Memory > availableJ.Memory + } + + return availableNUMAs[i] < availableNUMAs[j] + }) + + success := true + for _, uid := range sortedPods { + podSuccess := false + for _, numaID := range availableNUMAs { + if numaAvailable[numaID].IsSatisfied(current[uid]) { + result[uid].BindingNUMA = numaID + numaAvailable[numaID].SubAllocation(current[uid]) + podSuccess = true + break + } + } + if !podSuccess { + success = false + } + } + + return result, success, nil +} diff --git a/pkg/agent/qrm-plugins/util/numabinding/calculator/greedy_calculator_test.go b/pkg/agent/qrm-plugins/util/numabinding/calculator/greedy_calculator_test.go new file mode 100644 index 000000000..11ca7608b --- /dev/null +++ b/pkg/agent/qrm-plugins/util/numabinding/calculator/greedy_calculator_test.go @@ -0,0 +1,177 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package calculator + +import ( + "reflect" + "testing" + + "k8s.io/apimachinery/pkg/types" + + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/allocation" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/state" +) + +func Test_greedyCalculator_CalculateNUMABindingResult(t *testing.T) { + t.Parallel() + type args struct { + current allocation.PodAllocations + numaAllocatable state.NUMAResource + } + tests := []struct { + name string + args args + want allocation.PodAllocations + want1 bool + wantErr bool + }{ + { + name: "test1", + args: args{ + current: allocation.PodAllocations{ + "pod1": &allocation.Allocation{ + NamespacedName: types.NamespacedName{ + Name: "pod1", + Namespace: "default", + }, + Request: allocation.Request{ + CPUMilli: 1000, + Memory: 1000, + }, + BindingNUMA: -1, + }, + "pod2": &allocation.Allocation{ + NamespacedName: types.NamespacedName{ + Name: "pod2", + Namespace: "default", + }, + Request: allocation.Request{ + CPUMilli: 2000, + Memory: 1000, + }, + BindingNUMA: -1, + }, + "pod3": &allocation.Allocation{ + NamespacedName: types.NamespacedName{ + Name: "pod3", + Namespace: "default", + }, + Request: allocation.Request{ + CPUMilli: 3000, + Memory: 1000, + }, + BindingNUMA: -1, + }, + "pod4": &allocation.Allocation{ + NamespacedName: types.NamespacedName{ + Name: "pod4", + Namespace: "default", + }, + Request: allocation.Request{ + CPUMilli: 4000, + Memory: 1000, + }, + BindingNUMA: -1, + }, + }, + numaAllocatable: state.NUMAResource{ + 0: &state.Resource{ + CPU: 5, + Memory: 10000, + }, + 1: &state.Resource{ + CPU: 5, + Memory: 10000, + }, + 2: &state.Resource{ + CPU: 5, + Memory: 10000, + }, + 3: &state.Resource{ + CPU: 5, + Memory: 10000, + }, + }, + }, + want: allocation.PodAllocations{ + "pod1": &allocation.Allocation{ + NamespacedName: types.NamespacedName{ + Name: "pod1", + Namespace: "default", + }, + Request: allocation.Request{ + CPUMilli: 1000, + Memory: 1000, + }, + BindingNUMA: 0, + }, + "pod2": &allocation.Allocation{ + NamespacedName: types.NamespacedName{ + Name: "pod2", + Namespace: "default", + }, + Request: allocation.Request{ + CPUMilli: 2000, + Memory: 1000, + }, + BindingNUMA: 1, + }, + "pod3": &allocation.Allocation{ + NamespacedName: types.NamespacedName{ + Name: "pod3", + Namespace: "default", + }, + Request: allocation.Request{ + CPUMilli: 3000, + Memory: 1000, + }, + BindingNUMA: 1, + }, + "pod4": &allocation.Allocation{ + NamespacedName: types.NamespacedName{ + Name: "pod4", + Namespace: "default", + }, + Request: allocation.Request{ + CPUMilli: 4000, + Memory: 1000, + }, + BindingNUMA: 0, + }, + }, + want1: true, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + g := greedyCalculator{} + got, got1, err := g.CalculateNUMABindingResult(tt.args.current, tt.args.numaAllocatable) + if (err != nil) != tt.wantErr { + t.Errorf("CalculateNUMABindingResult() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("CalculateNUMABindingResult() got = %v, want %v", got, tt.want) + } + if got1 != tt.want1 { + t.Errorf("CalculateNUMABindingResult() got1 = %v, want %v", got1, tt.want1) + } + }) + } +} diff --git a/pkg/agent/qrm-plugins/util/numabinding/calculator/util.go b/pkg/agent/qrm-plugins/util/numabinding/calculator/util.go new file mode 100644 index 000000000..950ab5927 --- /dev/null +++ b/pkg/agent/qrm-plugins/util/numabinding/calculator/util.go @@ -0,0 +1,97 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package calculator + +import ( + "strconv" + "time" + + "k8s.io/apimachinery/pkg/types" + + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/allocation" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/state" + "github.com/kubewharf/katalyst-core/pkg/metrics" + "github.com/kubewharf/katalyst-core/pkg/util/general" +) + +const ( + metricsNameTryNUMABindingPodFailed = "try_numa_binding_pod_failed" + metricNameTryNUMABindingNodeFailed = "try_numa_binding_node_failed" + metricNameTryNUMABindingNodeSuccess = "try_numa_binding_node_success" + + metricNameCalculateTimeCost = "calculate_time_cost" +) + +func CheckAllNUMABindingResult(emitter metrics.MetricEmitter, calculator string, success bool, result allocation.PodAllocations) bool { + unSuccessPods := make(map[string]types.NamespacedName) + for podUID, alloc := range result { + if alloc.BindingNUMA == -1 { + unSuccessPods[podUID] = alloc.NamespacedName + } + } + + if len(unSuccessPods) > 0 || !success { + general.Errorf("check calculator %s result failed, unSuccessPods: %v", calculator, unSuccessPods) + for _, nn := range unSuccessPods { + _ = emitter.StoreInt64(metricsNameTryNUMABindingPodFailed, 1, metrics.MetricTypeNameRaw, + metrics.MetricTag{Key: "calculator", Val: calculator}, + metrics.MetricTag{Key: "podName", Val: nn.Name}, + metrics.MetricTag{Key: "podNamespace", Val: nn.Namespace}, + metrics.MetricTag{Key: "success", Val: strconv.FormatBool(success)}) + } + _ = emitter.StoreInt64(metricNameTryNUMABindingNodeFailed, 1, metrics.MetricTypeNameRaw, + metrics.MetricTag{Key: "calculator", Val: calculator}, + metrics.MetricTag{Key: "success", Val: strconv.FormatBool(success)}) + return false + } + _ = emitter.StoreInt64(metricNameTryNUMABindingNodeSuccess, 1, metrics.MetricTypeNameRaw, + metrics.MetricTag{Key: "calculator", Val: calculator}) + return true +} + +type withExecutionTimeLogging struct { + emitter metrics.MetricEmitter + NUMABindingCalculator +} + +func (w *withExecutionTimeLogging) CalculateNUMABindingResult(current allocation.PodAllocations, + numaAllocatable state.NUMAResource, +) (allocation.PodAllocations, bool, error) { + begin := time.Now() + defer func() { + costs := time.Since(begin) + general.InfoS("finished calculate numa result", "calculator", w.Name(), "costs", costs) + _ = w.emitter.StoreInt64(metricNameCalculateTimeCost, costs.Microseconds(), metrics.MetricTypeNameRaw, + metrics.MetricTag{Key: "calculator", Val: w.Name()}) + }() + + result, success, err := w.NUMABindingCalculator.CalculateNUMABindingResult(current, numaAllocatable) + if err != nil { + return nil, false, err + } + + general.InfoS("calculate numa result", "calculator", w.Name(), "result", result, "success", success) + + return result, success, nil +} + +func WithExecutionTimeLogging(calculator NUMABindingCalculator, emitter metrics.MetricEmitter) NUMABindingCalculator { + return &withExecutionTimeLogging{ + emitter: emitter, + NUMABindingCalculator: calculator, + } +} diff --git a/pkg/agent/qrm-plugins/util/numabinding/manager.go b/pkg/agent/qrm-plugins/util/numabinding/manager.go new file mode 100644 index 000000000..c79ce2a22 --- /dev/null +++ b/pkg/agent/qrm-plugins/util/numabinding/manager.go @@ -0,0 +1,142 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package numabinding + +import ( + "context" + "fmt" + "strconv" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + + cpuutil "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/util" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/allocation" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/numabinding/calculator" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/state" + coreconfig "github.com/kubewharf/katalyst-core/pkg/config" + "github.com/kubewharf/katalyst-core/pkg/metaserver" + "github.com/kubewharf/katalyst-core/pkg/metrics" + "github.com/kubewharf/katalyst-core/pkg/util/general" + "github.com/kubewharf/katalyst-core/pkg/util/machine" +) + +const ( + metricsNameNUMABindingCalculatePodCount = "numa_binding_calculate_pod_count" +) + +type Manager interface { + IsProcessing() bool + Run(stopCh <-chan struct{}) +} + +type managerImpl struct { + allocationUpdater allocation.Updater + numaBindingCalculator calculator.NUMABindingCalculator + + reservedCPUs machine.CPUSet + + conf *coreconfig.Configuration + emitter metrics.MetricEmitter + metaServer *metaserver.MetaServer +} + +func (m *managerImpl) IsProcessing() bool { + // TODO implement me + return false +} + +func (m *managerImpl) Run(stopCh <-chan struct{}) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go wait.JitterUntilWithContext(ctx, m.sync, 30*time.Second, 0.3, true) + <-stopCh +} + +func NewSharedNUMABindingManager( + conf *coreconfig.Configuration, + emitter metrics.MetricEmitter, + metaServer *metaserver.MetaServer, + updater allocation.Updater, +) (Manager, error) { + reservedCPUs, reserveErr := cpuutil.GetCoresReservedForSystem(conf, metaServer, metaServer.KatalystMachineInfo, metaServer.CPUDetails.CPUs().Clone()) + if reserveErr != nil { + return nil, fmt.Errorf("GetCoresReservedForSystem for reservedCPUsNum: %d failed with error: %v", + conf.ReservedCPUCores, reserveErr) + } + + numaBindingCalculator := calculator.GetOrInitNUMABindingCalculator(conf, emitter, metaServer, reservedCPUs) + return &managerImpl{ + conf: conf, + emitter: emitter, + metaServer: metaServer, + reservedCPUs: reservedCPUs, + allocationUpdater: updater, + numaBindingCalculator: calculator.WithExecutionTimeLogging(numaBindingCalculator, emitter), + }, nil +} + +func (m *managerImpl) sync(ctx context.Context) { + cpuState, memoryState, err := state.GetCPUMemoryReadonlyState() + if err != nil { + general.Errorf("get cpu/memory state failed: %v", err) + return + } + + numaAllocation, err := allocation.GetPodAllocations(ctx, m.metaServer, memoryState) + if err != nil { + general.Errorf("get numa allocation failed: %v", err) + return + } + + numaAllocatable, err := state.GetSharedNUMAAllocatable( + m.metaServer.CPUDetails.NUMANodes().ToSliceNoSortInt(), + m.reservedCPUs, + cpuState, memoryState) + if err != nil { + general.Errorf("get numa allocation failed: %v", err) + return + } + + general.InfoS("state summary", + "numaAllocation", numaAllocation, + "numaAllocatable", numaAllocatable, + "podCount", len(numaAllocation), + "numaCount", len(numaAllocatable)) + + _ = m.emitter.StoreInt64(metricsNameNUMABindingCalculatePodCount, int64(len(numaAllocation)), + metrics.MetricTypeNameRaw, metrics.MetricTag{ + Key: "numaCount", + Val: strconv.Itoa(len(numaAllocatable)), + }) + + result, success, err := m.numaBindingCalculator.CalculateNUMABindingResult(numaAllocation, numaAllocatable) + if err != nil { + general.Errorf("calculate result failed: %v", err) + return + } + + if !calculator.CheckAllNUMABindingResult(m.emitter, m.numaBindingCalculator.Name(), success, result) { + return + } + + err = m.allocationUpdater.UpdateAllocation(result) + if err != nil { + general.Errorf("update numa allocation failed: %v", err) + return + } +} diff --git a/pkg/agent/qrm-plugins/util/state/manager.go b/pkg/agent/qrm-plugins/util/state/manager.go new file mode 100644 index 000000000..55ffe2364 --- /dev/null +++ b/pkg/agent/qrm-plugins/util/state/manager.go @@ -0,0 +1,118 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package state + +import ( + "fmt" + + v1 "k8s.io/api/core/v1" + + cpustate "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state" + memorystate "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/state" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/allocation" + "github.com/kubewharf/katalyst-core/pkg/util/machine" +) + +type Resource struct { + CPU int64 + Memory int64 +} + +type NUMAResource map[int]*Resource + +func (c NUMAResource) Clone() NUMAResource { + n := make(NUMAResource, len(c)) + for k, v := range c { + n[k] = v.Clone() + } + return n +} + +func (c *Resource) Clone() *Resource { + return &Resource{ + CPU: c.CPU, + Memory: c.Memory, + } +} + +func (c *Resource) String() string { + if c == nil { + return "" + } + return fmt.Sprintf("%d/%d", c.CPU, c.Memory) +} + +func (c *Resource) AddResource(request *Resource) { + if request == nil || c == nil { + return + } + c.CPU += request.CPU + c.Memory += request.Memory +} + +func (c *Resource) SubAllocation(request *allocation.Allocation) { + if request == nil || c == nil { + return + } + c.CPU -= request.CPUMilli / 1000 + c.Memory -= request.Memory +} + +func (c *Resource) IsSatisfied(request *allocation.Allocation) bool { + if request == nil || c == nil { + return false + } + return c.CPU >= request.CPUMilli/1000 && c.Memory >= request.Memory +} + +func GetCPUMemoryReadonlyState() (cpustate.ReadonlyState, memorystate.ReadonlyState, error) { + cpuState, err := cpustate.GetReadonlyState() + if err != nil { + return nil, nil, err + } + memoryState, err := memorystate.GetReadonlyState() + if err != nil { + return nil, nil, err + } + return cpuState, memoryState, nil +} + +func GetSharedNUMAAllocatable( + numaNodes []int, + reservedCPUs machine.CPUSet, + cpuState cpustate.ReadonlyState, + memoryState memorystate.ReadonlyState, +) (NUMAResource, error) { + cpuNUMAState := cpuState.GetMachineState() + memoryNUMAState := memoryState.GetMachineState()[v1.ResourceMemory] + + numaAllocatable := make(NUMAResource) + for _, numaID := range numaNodes { + sharedCPUNUMAAllocatable := int64(cpuNUMAState[numaID].GetAvailableCPUSet(reservedCPUs).Size()) + // TODO: currently we don't consider dedicated_cores with numa_binding and without numa exclusive + // pod's request memory + SharedMemoryNUMAAllocatable := int64(0) + if !memoryNUMAState[numaID].HasDedicatedNUMABindingAndNUMAExclusivePods() { + SharedMemoryNUMAAllocatable = int64(memoryNUMAState[numaID].Allocatable) + } + numaAllocatable[numaID] = &Resource{ + CPU: sharedCPUNUMAAllocatable, + Memory: SharedMemoryNUMAAllocatable, + } + } + return numaAllocatable, nil +} From 477b0791cc5d7d52f216017abdd363d7124f7b10 Mon Sep 17 00:00:00 2001 From: luomingmeng Date: Thu, 21 Nov 2024 16:24:00 -0800 Subject: [PATCH 3/5] chore(qrm): refactor back tracking calculator to only consider per-pod one numa and stop iterating only when the one satisfied solution is found --- .../calculator/backtracking_calculator.go | 164 ++++++------------ .../backtracking_calculator_test.go | 22 +-- 2 files changed, 59 insertions(+), 127 deletions(-) diff --git a/pkg/agent/qrm-plugins/util/numabinding/calculator/backtracking_calculator.go b/pkg/agent/qrm-plugins/util/numabinding/calculator/backtracking_calculator.go index 1c2714f63..de5dacbc7 100644 --- a/pkg/agent/qrm-plugins/util/numabinding/calculator/backtracking_calculator.go +++ b/pkg/agent/qrm-plugins/util/numabinding/calculator/backtracking_calculator.go @@ -21,7 +21,6 @@ import ( "sync" "time" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/allocation" @@ -43,11 +42,11 @@ const ( ) type backTrackingCalculator struct { - emitter metrics.MetricEmitter - metaServer *metaserver.MetaServer - reservedCPUs machine.CPUSet - maxNUMANum int - numaNodes []int + emitter metrics.MetricEmitter + metaServer *metaserver.MetaServer + numaNodes []int + reservedCPUs machine.CPUSet + maxIterateTime time.Duration simpleCalculator NUMABindingCalculator @@ -62,12 +61,11 @@ func NewBackTrackingCalculator( reservedCPUs machine.CPUSet, ) NUMABindingCalculator { return &backTrackingCalculator{ - emitter: emitter, - metaServer: metaServer, - // todo: use numa num from conf - maxNUMANum: 1, + emitter: emitter, + metaServer: metaServer, numaNodes: metaServer.CPUDetails.NUMANodes().ToSliceInt(), reservedCPUs: reservedCPUs, + maxIterateTime: 30 * time.Second, simpleCalculator: NewGreedyCalculator(), } } @@ -175,12 +173,11 @@ func (b *backTrackingCalculator) sync(ctx context.Context) { func (b *backTrackingCalculator) getAllNUMABindingResults(podAllocation allocation.PodAllocations, numaAllocatable state.NUMAResource, ) ([][]numaBindingResult, map[int]string, error) { - numaBindingNUMAs := b.getNUMABindingNUMAs(podAllocation) resultsIndex := make(map[int]string) allNUMABindingResults := make([][]numaBindingResult, 0, len(podAllocation)) for podUID, alloc := range podAllocation { resultsIndex[len(allNUMABindingResults)] = podUID - results, err := getNUMABindingResults(alloc, b.numaNodes, numaAllocatable, b.maxNUMANum, numaBindingNUMAs) + results, err := getNUMABindingResults(alloc, b.numaNodes, numaAllocatable) if err != nil { general.Errorf("get numa allocation for %s failed: %v", alloc.String(), err) return nil, nil, err @@ -194,83 +191,48 @@ func (b *backTrackingCalculator) mergeNUMABindingResults(results [][]numaBinding podAllocation allocation.PodAllocations, numaAllocatable state.NUMAResource, ) []numaBindingResult { var optimalResults []numaBindingResult - optimalNUMACount := 0 - minNonNUMABindingCount := 0 - withAllPodNUMABindingResult := false - iterateAllNUMABindingResults(results, func(results []numaBindingResult) { - current := numaAllocatable.Clone() - if b.maxNUMANum > 1 { - maxNUMACount := 1 - maxNUMANodeAffinity, _ := bitmask.NewBitMask(1) - for _, result := range results { - if result.numaNodeAffinity.Count() > maxNUMACount { - maxNUMACount = result.numaNodeAffinity.Count() - maxNUMANodeAffinity = result.numaNodeAffinity - } - } - - if maxNUMACount > 1 { - if withAllPodNUMABindingResult { - return - } - - totalAllocatable := state.Resource{} - for i := 0; i < maxNUMACount; i++ { - totalAllocatable.AddResource(numaAllocatable[i]) - } - - nonNUMABindingCount := 0 - for i, result := range results { - if bitmask.And(result.numaNodeAffinity, maxNUMANodeAffinity).Count() != 0 { - if !totalAllocatable.IsSatisfied(podAllocation[index[i]]) { - return - } - totalAllocatable.SubAllocation(podAllocation[index[i]]) - nonNUMABindingCount += 1 - } else { - numaID := result.numaNodeAffinity.GetBits()[0] - if !current[numaID].IsSatisfied(podAllocation[index[i]]) { - return - } - current[numaID].SubAllocation(podAllocation[index[i]]) - } - } - optimalNUMACount = len(b.numaNodes) - maxNUMACount - if nonNUMABindingCount < minNonNUMABindingCount { - minNonNUMABindingCount = nonNUMABindingCount - optimalResults = deepCopyNUMABindingResults(results) - } - return + optimalNotSatisfiedPodCount := len(results) + iterateCount := int64(0) + begin := time.Now() + iterateAllNUMABindingResults(results, func(results []numaBindingResult) bool { + iterateCount++ + if iterateCount > 10000 { + iterateCount = 0 + d := time.Since(begin) + if d > b.maxIterateTime { + general.Infof("iterate all numa binding results %d times, costs %v", iterateCount, d) + return false } } - numaMask := bitmask.NewEmptyBitMask() + var currentResults []numaBindingResult + current := numaAllocatable.Clone() + notSatisfiedPodCount := 0 for i, result := range results { numaID := result.numaNodeAffinity.GetBits()[0] if !current[numaID].IsSatisfied(podAllocation[index[i]]) { - return + notSatisfiedPodCount += 1 + if notSatisfiedPodCount > optimalNotSatisfiedPodCount { + return true + } + continue } - numaMask.Or(result.numaNodeAffinity) current[numaID].SubAllocation(podAllocation[index[i]]) + currentResults = append(currentResults, result) } - numaCount := numaMask.Count() - if !withAllPodNUMABindingResult || numaCount < optimalNUMACount { - optimalNUMACount = numaCount - optimalResults = deepCopyNUMABindingResults(results) - withAllPodNUMABindingResult = true + + if notSatisfiedPodCount < optimalNotSatisfiedPodCount { + optimalResults = deepCopyNUMABindingResults(currentResults) + optimalNotSatisfiedPodCount = notSatisfiedPodCount } - }) - return optimalResults -} -func (b *backTrackingCalculator) getNUMABindingNUMAs(podAllocation allocation.PodAllocations) sets.Int { - numaSet := sets.NewInt() - for _, alloc := range podAllocation { - if alloc.BindingNUMA != -1 { - numaSet.Insert(alloc.BindingNUMA) + if notSatisfiedPodCount == 0 { + return false } - } - return numaSet + return true + }) + + return optimalResults } // numaBindingResult is a struct containing the numaNodeAffinity for a pod @@ -287,7 +249,7 @@ func deepCopyNUMABindingResults(results []numaBindingResult) []numaBindingResult } func getNUMABindingResults(allocation *allocation.Allocation, numaNodes []int, - numaAllocatable state.NUMAResource, maxNUMANum int, numaBindingNUMAs sets.Int, + numaAllocatable state.NUMAResource, ) ([]numaBindingResult, error) { numaBindingResults := make([]numaBindingResult, 0, len(numaNodes)) for _, n := range numaNodes { @@ -305,36 +267,6 @@ func getNUMABindingResults(allocation *allocation.Allocation, numaNodes []int, }) } - if allocation.BindingNUMA != -1 { - return numaBindingResults, nil - } - - maxNUMANum = general.Min(maxNUMANum, len(numaNodes)) - if maxNUMANum > 1 { - for i := 2; i < maxNUMANum; i++ { - m := bitmask.NewEmptyBitMask() - totalResource := state.Resource{} - for j := 0; j < i; j++ { - err := m.Add(j) - if err != nil { - return nil, err - } - totalResource.AddResource(numaAllocatable[j]) - } - - if numaBindingNUMAs.Intersection(sets.NewInt(m.GetBits()...)).Len() > 0 { - continue - } - - if !totalResource.IsSatisfied(allocation) { - continue - } - - numaBindingResults = append(numaBindingResults, numaBindingResult{ - numaNodeAffinity: m, - }) - } - } return numaBindingResults, nil } @@ -358,21 +290,25 @@ func getNUMABindingResults(allocation *allocation.Allocation, numaNodes []int, // allNUMABindingResults[-1][z] // } // callback(permutation) -func iterateAllNUMABindingResults(allNUMABindingResults [][]numaBindingResult, callback func([]numaBindingResult)) { +func iterateAllNUMABindingResults(allNUMABindingResults [][]numaBindingResult, callback func([]numaBindingResult) bool) { // Internal helper function to accumulate the permutation before calling the callback. - var iterate func(i int, accum []numaBindingResult) - iterate = func(i int, accum []numaBindingResult) { + var iterate func(i int, accum []numaBindingResult) bool + iterate = func(i int, accum []numaBindingResult) bool { // Base case: we have looped through all providers and have a full permutation. if i == len(allNUMABindingResults) { - callback(accum) - return + return callback(accum) } // Loop through all hints for provider 'i', and recurse to build the // the permutation of this hint with all hints from providers 'i++'. for j := range allNUMABindingResults[i] { - iterate(i+1, append(accum, allNUMABindingResults[i][j])) + con := iterate(i+1, append(accum, allNUMABindingResults[i][j])) + if !con { + return false + } } + return true } + iterate(0, []numaBindingResult{}) } diff --git a/pkg/agent/qrm-plugins/util/numabinding/calculator/backtracking_calculator_test.go b/pkg/agent/qrm-plugins/util/numabinding/calculator/backtracking_calculator_test.go index f025a3e26..a51fb709e 100644 --- a/pkg/agent/qrm-plugins/util/numabinding/calculator/backtracking_calculator_test.go +++ b/pkg/agent/qrm-plugins/util/numabinding/calculator/backtracking_calculator_test.go @@ -17,11 +17,11 @@ limitations under the License. package calculator import ( + "github.com/kubewharf/katalyst-core/pkg/metrics" "reflect" "testing" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/allocation" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/state" @@ -36,11 +36,9 @@ func newBitMask(bits ...int) bitmask.BitMask { func Test_getNUMABindingResults(t *testing.T) { t.Parallel() type args struct { - allocation *allocation.Allocation - numaNodes []int - numaAllocatable state.NUMAResource - maxNUMANum int - numaBindingNUMAs sets.Int + allocation *allocation.Allocation + numaNodes []int + numaAllocatable state.NUMAResource } tests := []struct { name string @@ -80,11 +78,9 @@ func Test_getNUMABindingResults(t *testing.T) { Memory: 10000, }, }, - maxNUMANum: 1, numaNodes: []int{ 0, 1, 2, 3, }, - numaBindingNUMAs: sets.NewInt(), }, want: []numaBindingResult{ { @@ -106,7 +102,7 @@ func Test_getNUMABindingResults(t *testing.T) { tt := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() - got, err := getNUMABindingResults(tt.args.allocation, tt.args.numaNodes, tt.args.numaAllocatable, tt.args.maxNUMANum, tt.args.numaBindingNUMAs) + got, err := getNUMABindingResults(tt.args.allocation, tt.args.numaNodes, tt.args.numaAllocatable) if (err != nil) != tt.wantErr { t.Errorf("getNUMABindingResults() error = %v, wantErr %v", err, tt.wantErr) return @@ -229,7 +225,7 @@ func Test_backTrackingCalculator_asyncCalculateNUMABindingResult(t *testing.T) { CPUMilli: 2000, Memory: 1000, }, - BindingNUMA: 1, + BindingNUMA: 0, }, "pod3": &allocation.Allocation{ NamespacedName: types.NamespacedName{ @@ -251,7 +247,7 @@ func Test_backTrackingCalculator_asyncCalculateNUMABindingResult(t *testing.T) { CPUMilli: 4000, Memory: 1000, }, - BindingNUMA: 0, + BindingNUMA: 2, }, }, want1: true, @@ -262,8 +258,8 @@ func Test_backTrackingCalculator_asyncCalculateNUMABindingResult(t *testing.T) { t.Run(tt.name, func(t *testing.T) { t.Parallel() b := &backTrackingCalculator{ - maxNUMANum: tt.fields.maxNUMANum, - numaNodes: tt.fields.numaNodes, + emitter: metrics.DummyMetrics{}, + numaNodes: tt.fields.numaNodes, } got, got1, err := b.asyncCalculateNUMABindingResult(tt.args.current, tt.args.numaAllocatable) if (err != nil) != tt.wantErr { From 191f2c9a9789633a7fdb8192f030cb8abd08ea50 Mon Sep 17 00:00:00 2001 From: luomingmeng Date: Thu, 21 Nov 2024 16:30:53 -0800 Subject: [PATCH 4/5] chore(qrm): add random calculator --- .../util/numabinding/calculator/calculator.go | 1 + .../calculator/random_calculator.go | 102 ++++++++++++++++++ 2 files changed, 103 insertions(+) create mode 100644 pkg/agent/qrm-plugins/util/numabinding/calculator/random_calculator.go diff --git a/pkg/agent/qrm-plugins/util/numabinding/calculator/calculator.go b/pkg/agent/qrm-plugins/util/numabinding/calculator/calculator.go index 17727dd70..2d9a28d2a 100644 --- a/pkg/agent/qrm-plugins/util/numabinding/calculator/calculator.go +++ b/pkg/agent/qrm-plugins/util/numabinding/calculator/calculator.go @@ -50,6 +50,7 @@ func GetOrInitNUMABindingCalculator( calculators := make([]NUMABindingCalculator, 0) calculators = append(calculators, NewGreedyCalculator()) calculators = append(calculators, NewBackTrackingCalculator(conf, emitter, metaServer, reservedCPUs)) + calculators = append(calculators, NewRandomCalculator(10000)) numaBindingCalculator = NewDryRunCalculator(emitter, calculators...) numaBindingCalculator.Run(context.Background()) }) diff --git a/pkg/agent/qrm-plugins/util/numabinding/calculator/random_calculator.go b/pkg/agent/qrm-plugins/util/numabinding/calculator/random_calculator.go new file mode 100644 index 000000000..393c1a0e2 --- /dev/null +++ b/pkg/agent/qrm-plugins/util/numabinding/calculator/random_calculator.go @@ -0,0 +1,102 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package calculator + +import ( + "context" + "math" + "math/rand" + + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/allocation" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/state" +) + +const ( + calculatorNameRandom = "random" +) + +type randomCalculator struct { + numOfRandomTime int +} + +func NewRandomCalculator(numOfRandomTime int) NUMABindingCalculator { + return &randomCalculator{ + numOfRandomTime: numOfRandomTime, + } +} + +func (g *randomCalculator) Name() string { + return calculatorNameRandom +} + +func (g *randomCalculator) Run(_ context.Context) {} + +func (g *randomCalculator) CalculateNUMABindingResult(current allocation.PodAllocations, + numaAllocatable state.NUMAResource, +) (allocation.PodAllocations, bool, error) { + sortedPods := make([]string, 0, len(current)) + numaAvailable := numaAllocatable.Clone() + for uid, pod := range current { + numaBinding := pod.BindingNUMA + if numaBinding == -1 { + sortedPods = append(sortedPods, uid) + continue + } + + numaAvailable[numaBinding].SubAllocation(pod) + } + + var ( + optimalResult allocation.PodAllocations + optimalFailedCount = math.MaxInt + ) + + for i := 0; i < g.numOfRandomTime; i++ { + result := current.Clone() + numaAvailable := numaAllocatable.Clone() + rand.Shuffle(len(sortedPods), func(i, j int) { + sortedPods[i], sortedPods[j] = sortedPods[j], sortedPods[i] + }) + + failedCount := 0 + for _, uid := range sortedPods { + podSuccess := false + for numaID := range numaAvailable { + if numaAvailable[numaID].IsSatisfied(current[uid]) { + result[uid].BindingNUMA = numaID + numaAvailable[numaID].SubAllocation(current[uid]) + podSuccess = true + break + } + } + if !podSuccess { + failedCount += 1 + } + } + + if failedCount < optimalFailedCount { + optimalFailedCount = failedCount + optimalResult = result.Clone() + } + + if optimalFailedCount == 0 { + break + } + } + + return optimalResult, optimalFailedCount == 0, nil +} From 7cd8c6826985c398f57258b9e7a318cf9c0feeca Mon Sep 17 00:00:00 2001 From: luomingmeng Date: Thu, 21 Nov 2024 17:28:23 -0800 Subject: [PATCH 5/5] chore(qrm): include checkAllNUMABindingResult to WithCheckAndExecutionTimeLogging --- .../util/numabinding/calculator/dryrun_calculator.go | 10 +++------- .../qrm-plugins/util/numabinding/calculator/util.go | 12 ++++++------ pkg/agent/qrm-plugins/util/numabinding/manager.go | 4 ++-- 3 files changed, 11 insertions(+), 15 deletions(-) diff --git a/pkg/agent/qrm-plugins/util/numabinding/calculator/dryrun_calculator.go b/pkg/agent/qrm-plugins/util/numabinding/calculator/dryrun_calculator.go index 860805d88..6aa3aa10b 100644 --- a/pkg/agent/qrm-plugins/util/numabinding/calculator/dryrun_calculator.go +++ b/pkg/agent/qrm-plugins/util/numabinding/calculator/dryrun_calculator.go @@ -19,11 +19,9 @@ package calculator import ( "context" - "github.com/kubewharf/katalyst-core/pkg/metrics" - "github.com/kubewharf/katalyst-core/pkg/util/general" - "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/allocation" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/state" + "github.com/kubewharf/katalyst-core/pkg/metrics" ) const ( @@ -38,7 +36,7 @@ type dryRunCalculator struct { func NewDryRunCalculator(emitter metrics.MetricEmitter, calculators ...NUMABindingCalculator) NUMABindingCalculator { withLoggingCalculators := make([]NUMABindingCalculator, 0, len(calculators)) for _, calculator := range calculators { - withLoggingCalculators = append(withLoggingCalculators, WithExecutionTimeLogging(calculator, emitter)) + withLoggingCalculators = append(withLoggingCalculators, WithCheckAndExecutionTimeLogging(calculator, emitter)) } return &dryRunCalculator{ calculators: withLoggingCalculators, @@ -54,9 +52,7 @@ func (d *dryRunCalculator) Run(ctx context.Context) { func (d *dryRunCalculator) CalculateNUMABindingResult(current allocation.PodAllocations, numaAllocatable state.NUMAResource) (allocation.PodAllocations, bool, error) { for _, calc := range d.calculators { - result, success, err := calc.CalculateNUMABindingResult(current, numaAllocatable) - general.Infof("dry run calculator %s result: %v, success: %v, err: %v", calc.Name(), result, success, err) - CheckAllNUMABindingResult(d.emitter, calc.Name(), success, result) + _, _, _ = calc.CalculateNUMABindingResult(current, numaAllocatable) } return current, true, nil } diff --git a/pkg/agent/qrm-plugins/util/numabinding/calculator/util.go b/pkg/agent/qrm-plugins/util/numabinding/calculator/util.go index 950ab5927..e56a6f39a 100644 --- a/pkg/agent/qrm-plugins/util/numabinding/calculator/util.go +++ b/pkg/agent/qrm-plugins/util/numabinding/calculator/util.go @@ -36,7 +36,7 @@ const ( metricNameCalculateTimeCost = "calculate_time_cost" ) -func CheckAllNUMABindingResult(emitter metrics.MetricEmitter, calculator string, success bool, result allocation.PodAllocations) bool { +func checkAllNUMABindingResult(emitter metrics.MetricEmitter, calculator string, success bool, result allocation.PodAllocations) bool { unSuccessPods := make(map[string]types.NamespacedName) for podUID, alloc := range result { if alloc.BindingNUMA == -1 { @@ -63,12 +63,12 @@ func CheckAllNUMABindingResult(emitter metrics.MetricEmitter, calculator string, return true } -type withExecutionTimeLogging struct { +type withCheckAndExecutionTimeLogging struct { emitter metrics.MetricEmitter NUMABindingCalculator } -func (w *withExecutionTimeLogging) CalculateNUMABindingResult(current allocation.PodAllocations, +func (w *withCheckAndExecutionTimeLogging) CalculateNUMABindingResult(current allocation.PodAllocations, numaAllocatable state.NUMAResource, ) (allocation.PodAllocations, bool, error) { begin := time.Now() @@ -86,11 +86,11 @@ func (w *withExecutionTimeLogging) CalculateNUMABindingResult(current allocation general.InfoS("calculate numa result", "calculator", w.Name(), "result", result, "success", success) - return result, success, nil + return result, checkAllNUMABindingResult(w.emitter, w.Name(), success, result), nil } -func WithExecutionTimeLogging(calculator NUMABindingCalculator, emitter metrics.MetricEmitter) NUMABindingCalculator { - return &withExecutionTimeLogging{ +func WithCheckAndExecutionTimeLogging(calculator NUMABindingCalculator, emitter metrics.MetricEmitter) NUMABindingCalculator { + return &withCheckAndExecutionTimeLogging{ emitter: emitter, NUMABindingCalculator: calculator, } diff --git a/pkg/agent/qrm-plugins/util/numabinding/manager.go b/pkg/agent/qrm-plugins/util/numabinding/manager.go index c79ce2a22..cfe59aeda 100644 --- a/pkg/agent/qrm-plugins/util/numabinding/manager.go +++ b/pkg/agent/qrm-plugins/util/numabinding/manager.go @@ -86,7 +86,7 @@ func NewSharedNUMABindingManager( metaServer: metaServer, reservedCPUs: reservedCPUs, allocationUpdater: updater, - numaBindingCalculator: calculator.WithExecutionTimeLogging(numaBindingCalculator, emitter), + numaBindingCalculator: calculator.WithCheckAndExecutionTimeLogging(numaBindingCalculator, emitter), }, nil } @@ -130,7 +130,7 @@ func (m *managerImpl) sync(ctx context.Context) { return } - if !calculator.CheckAllNUMABindingResult(m.emitter, m.numaBindingCalculator.Name(), success, result) { + if !success { return }