Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support shared cores to numa binding inplace #718

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 15 additions & 20 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/validator"
cpuutil "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/util"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/numabinding"
"github.com/kubewharf/katalyst-core/pkg/agent/utilcomponent/periodicalhandler"
"github.com/kubewharf/katalyst-core/pkg/config"
dynamicconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic"
Expand Down Expand Up @@ -71,23 +72,6 @@ const (
healthCheckTolerationTimes = 3
)

// readonlyState is the package-level read-only view of the plugin state.
// Every access to it goes through readonlyStateLock.
var (
	readonlyStateLock sync.RWMutex
	readonlyState     state.ReadonlyState
)

// GetReadonlyState hands out the stored state.ReadonlyState so that callers
// can inspect the running state of the plugin. While nothing has been stored
// yet it returns a nil state together with an error.
func GetReadonlyState() (state.ReadonlyState, error) {
	readonlyStateLock.RLock()
	defer readonlyStateLock.RUnlock()

	if readonlyState != nil {
		return readonlyState, nil
	}
	return nil, fmt.Errorf("readonlyState isn't set")
}

// DynamicPolicy is the policy that's used by default;
// it will consider the dynamic running information to calculate
// and adjust resource requirements and configurations
Expand Down Expand Up @@ -133,6 +117,8 @@ type DynamicPolicy struct {
transitionPeriod time.Duration
cpuNUMAHintPreferPolicy string
cpuNUMAHintPreferLowThreshold float64

sharedNUNMABindingManager numabinding.Manager
}

func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration,
Expand All @@ -150,9 +136,7 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration
return false, agent.ComponentStub{}, fmt.Errorf("NewCheckpointState failed with error: %v", stateErr)
}

readonlyStateLock.Lock()
readonlyState = stateImpl
readonlyStateLock.Unlock()
state.SetReadonlyState(stateImpl)

wrappedEmitter := agentCtx.EmitterPool.GetDefaultMetricsEmitter().WithTags(agentName, metrics.MetricTag{
Key: util.QRMPluginPolicyTagName,
Expand Down Expand Up @@ -236,6 +220,11 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration
return false, agent.ComponentStub{}, fmt.Errorf("dynamic policy initReclaimPool failed with error: %v", err)
}

policyImplement.sharedNUNMABindingManager, err = numabinding.NewSharedNUMABindingManager(conf, wrappedEmitter, agentCtx.MetaServer, NewCPUAllocationUpdater(stateImpl))
if err != nil {
return false, agent.ComponentStub{}, err
}

err = agentCtx.MetaServer.ConfigurationManager.AddConfigWatcher(crd.AdminQoSConfigurationGVR)
if err != nil {
return false, nil, err
Expand Down Expand Up @@ -347,6 +336,7 @@ func (p *DynamicPolicy) Start() (err error) {
return
}
go p.advisorMonitor.Run(p.stopCh)
go p.sharedNUNMABindingManager.Run(p.stopCh)

go wait.BackoffUntil(func() { p.serveForAdvisor(p.stopCh) }, wait.NewExponentialBackoffManager(
800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 0, &clock.RealClock{}), true, p.stopCh)
Expand Down Expand Up @@ -695,6 +685,11 @@ func (p *DynamicPolicy) GetTopologyHints(ctx context.Context,
if p.hintHandlers[qosLevel] == nil {
return nil, fmt.Errorf("katalyst QoS level: %s is not supported yet", qosLevel)
}

if p.sharedNUNMABindingManager.IsProcessing() {
return nil, fmt.Errorf("numa binding manager is processing")
}

return p.hintHandlers[qosLevel](ctx, req)
}

Expand Down
9 changes: 0 additions & 9 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4614,15 +4614,6 @@ func entriesMatch(entries1, entries2 state.PodEntries) (bool, error) {
return true, nil
}

func TestGetReadonlyState(t *testing.T) {
t.Parallel()

as := require.New(t)
readonlyState, err := GetReadonlyState()
as.NotNil(err)
as.Nil(readonlyState)
}

func TestClearResidualState(t *testing.T) {
t.Parallel()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
Copyright 2022 The Katalyst Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dynamicpolicy

import (
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/allocation"
)

// cpuAllocationUpdater holds the CPU plugin state handed to
// NewCPUAllocationUpdater; it is returned from there as an allocation.Updater.
type cpuAllocationUpdater struct {
// state is the state.State supplied at construction time.
state state.State
}

// NewCPUAllocationUpdater wraps the given state in a cpuAllocationUpdater and
// returns it as an allocation.Updater.
func NewCPUAllocationUpdater(state state.State) allocation.Updater {
	updater := new(cpuAllocationUpdater)
	updater.state = state
	return updater
}

// UpdateAllocation is currently a placeholder: it ignores m, touches nothing
// and always returns nil.
func (s *cpuAllocationUpdater) UpdateAllocation(m allocation.PodAllocations) error {
// TODO: not implemented yet (presumably m should be applied to s.state — confirm).
return nil
}
27 changes: 27 additions & 0 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package state
import (
"encoding/json"
"fmt"
"sync"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -562,3 +563,29 @@ type ReadonlyState interface {
}

// GenerateMachineStateFromPodEntriesFunc is the signature of a function that
// takes a CPU topology and a set of pod entries and returns a NUMANodeMap or
// an error.
type GenerateMachineStateFromPodEntriesFunc func(topology *machine.CPUTopology, podEntries PodEntries) (NUMANodeMap, error)

// readonlyState is the package-level read-only view of the plugin state.
// Every access to it goes through readonlyStateLock.
var (
	readonlyStateLock sync.RWMutex
	readonlyState     ReadonlyState
)

// GetReadonlyState hands out the stored ReadonlyState so that callers can
// inspect the running state of the plugin. While nothing has been stored yet
// it returns a nil state together with an error.
func GetReadonlyState() (ReadonlyState, error) {
	readonlyStateLock.RLock()
	defer readonlyStateLock.RUnlock()

	if readonlyState != nil {
		return readonlyState, nil
	}
	return nil, fmt.Errorf("readonlyState isn't set")
}

// SetReadonlyState stores the ReadonlyState that GetReadonlyState hands out.
// It is meant to be called once during initialization; nothing enforces that,
// and a later call simply replaces the stored value.
func SetReadonlyState(state ReadonlyState) {
	readonlyStateLock.Lock()
	readonlyState = state
	readonlyStateLock.Unlock()
}
9 changes: 9 additions & 0 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3233,3 +3233,12 @@ func TestGetAvailableCPUQuantity(t *testing.T) {
cpuQuantity := int(math.Ceil(nodeState.GetAvailableCPUQuantity(machine.NewCPUSet())))
require.Equal(t, 15, cpuQuantity)
}

func TestGetReadonlyState(t *testing.T) {
t.Parallel()

as := require.New(t)
state, err := GetReadonlyState()
as.NotNil(err)
as.Nil(state)
}
21 changes: 1 addition & 20 deletions pkg/agent/qrm-plugins/cpu/nativepolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,6 @@ const (
maxResidualTime = 5 * time.Minute
)

// readonlyState is the package-level read-only view of the plugin state.
// Every access to it goes through readonlyStateLock.
var (
	readonlyStateLock sync.RWMutex
	readonlyState     state.ReadonlyState
)

// GetReadonlyState returns the stored state.ReadonlyState so that callers can
// inspect the running state of the plugin. While nothing has been stored yet
// it returns a nil state together with an error.
func GetReadonlyState() (state.ReadonlyState, error) {
	readonlyStateLock.RLock()
	defer readonlyStateLock.RUnlock()

	if readonlyState == nil {
		// Message corrected from "isn't setted" so it is grammatical and
		// matches the wording used by the CPU dynamic policy.
		return nil, fmt.Errorf("readonlyState isn't set")
	}
	return readonlyState, nil
}

// NativePolicy is a policy compatible with Kubernetes native semantics and is used in topology-aware scheduling scenarios.
type NativePolicy struct {
sync.RWMutex
Expand Down Expand Up @@ -115,9 +98,7 @@ func NewNativePolicy(agentCtx *agent.GenericContext, conf *config.Configuration,
return false, agent.ComponentStub{}, fmt.Errorf("NewCheckpointState failed with error: %v", stateErr)
}

readonlyStateLock.Lock()
readonlyState = stateImpl
readonlyStateLock.Unlock()
state.SetReadonlyState(stateImpl)

wrappedEmitter := agentCtx.EmitterPool.GetDefaultMetricsEmitter().WithTags(agentName, metrics.MetricTag{
Key: util.QRMPluginPolicyTagName,
Expand Down
9 changes: 0 additions & 9 deletions pkg/agent/qrm-plugins/cpu/nativepolicy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,15 +390,6 @@ func TestAllocateForPod(t *testing.T) {
_ = os.RemoveAll(tmpDir)
}

func TestGetReadonlyState(t *testing.T) {
t.Parallel()

as := require.New(t)
readonlyState, err := GetReadonlyState()
as.NotNil(err)
as.Nil(readonlyState)
}

func TestClearResidualState(t *testing.T) {
t.Parallel()

Expand Down
21 changes: 1 addition & 20 deletions pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,23 +90,6 @@ const (
movePagesWorkLimit = 2
)

// readonlyState is the package-level read-only view of the plugin state.
// Every access to it goes through readonlyStateLock.
var (
	readonlyStateLock sync.RWMutex
	readonlyState     state.ReadonlyState
)

// GetReadonlyState returns the stored state.ReadonlyState so that callers can
// inspect the running state of the plugin. While nothing has been stored yet
// it returns a nil state together with an error.
func GetReadonlyState() (state.ReadonlyState, error) {
	readonlyStateLock.RLock()
	defer readonlyStateLock.RUnlock()

	if readonlyState == nil {
		// Message corrected from "isn't setted" so it is grammatical and
		// matches the wording used by the CPU dynamic policy.
		return nil, fmt.Errorf("readonlyState isn't set")
	}
	return readonlyState, nil
}

type DynamicPolicy struct {
sync.RWMutex

Expand Down Expand Up @@ -193,9 +176,7 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration
general.Infof("empty ExtraControlKnobConfigFile, initialize empty extraControlKnobConfigs")
}

readonlyStateLock.Lock()
readonlyState = stateImpl
readonlyStateLock.Unlock()
state.SetReadonlyState(stateImpl)

wrappedEmitter := agentCtx.EmitterPool.GetDefaultMetricsEmitter().WithTags(agentName, metrics.MetricTag{
Key: util.QRMPluginPolicyTagName,
Expand Down
10 changes: 0 additions & 10 deletions pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1953,16 +1953,6 @@ func TestGetResourcesAllocation(t *testing.T) {
}, resp5.PodResources[req.PodUid].ContainerResources[testName].ResourceAllocation[string(v1.ResourceMemory)])
}

// TestGetReadonlyState checks that the two results of GetReadonlyState agree
// with each other: a nil state must come with an error, and a non-nil state
// must come without one. The state lives in a package-level variable, so the
// test accepts either outcome rather than assuming it is unset.
func TestGetReadonlyState(t *testing.T) {
	t.Parallel()

	as := require.New(t)
	readonlyState, err := GetReadonlyState()
	if readonlyState == nil {
		as.Error(err)
		return
	}
	// Previously unchecked: a stored state must never be returned alongside
	// an error.
	as.NoError(err)
}

func TestGenerateResourcesMachineStateFromPodEntries(t *testing.T) {
t.Parallel()

Expand Down
27 changes: 27 additions & 0 deletions pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package state
import (
"encoding/json"
"fmt"
"sync"

info "github.com/google/cadvisor/info/v1"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -384,3 +385,29 @@ type State interface {
writer
ReadonlyState
}

// readonlyState is the package-level read-only view of the plugin state.
// Every access to it goes through readonlyStateLock.
var (
	readonlyStateLock sync.RWMutex
	readonlyState     ReadonlyState
)

// GetReadonlyState returns the stored ReadonlyState so that callers can
// inspect the running state of the plugin. While nothing has been stored yet
// it returns a nil state together with an error.
func GetReadonlyState() (ReadonlyState, error) {
	readonlyStateLock.RLock()
	defer readonlyStateLock.RUnlock()

	if readonlyState == nil {
		// Message corrected from "isn't setted" so it is grammatical and
		// matches the wording used by the CPU state package.
		return nil, fmt.Errorf("readonlyState isn't set")
	}
	return readonlyState, nil
}

// SetReadonlyState stores the ReadonlyState that GetReadonlyState hands out.
// It is meant to be called once during initialization; nothing enforces that,
// and a later call simply replaces the stored value.
func SetReadonlyState(state ReadonlyState) {
	readonlyStateLock.Lock()
	readonlyState = state
	readonlyStateLock.Unlock()
}
33 changes: 33 additions & 0 deletions pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
Copyright 2022 The Katalyst Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package state

import (
"testing"

"github.com/stretchr/testify/require"
)

// TestGetReadonlyState checks that the two results of GetReadonlyState agree
// with each other: a nil state must come with an error, and a non-nil state
// must come without one. The state lives in a package-level variable, so the
// test accepts either outcome rather than assuming it is unset.
func TestGetReadonlyState(t *testing.T) {
	t.Parallel()

	as := require.New(t)
	state, err := GetReadonlyState()
	if state == nil {
		as.Error(err)
		return
	}
	// Previously unchecked: a stored state must never be returned alongside
	// an error.
	as.NoError(err)
}
Loading
Loading