Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Enhance usage plugin #3056

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1303,7 +1303,7 @@ func (sc *SchedulerCache) GetMetricsData() {
}
sc.Mutex.Unlock()

supportedPeriods := []string{"5m"}
supportedPeriods := []string{source.Period}
for node := range nodeUsageMap {
for _, period := range supportedPeriods {
nodeMetrics, err := client.NodeMetricsAvg(ctx, node, period)
Expand Down
16 changes: 9 additions & 7 deletions pkg/scheduler/metrics/source/metrics_client_prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func NewPrometheusMetricsClient(address string, conf map[string]string) (*Promet
return &PrometheusMetricsClient{address: address, conf: conf}, nil
}

var Period string
func (p *PrometheusMetricsClient) NodeMetricsAvg(ctx context.Context, nodeName string, period string) (*NodeMetrics, error) {
klog.V(4).Infof("Get node metrics from Prometheus: %s", p.address)
var client api.Client
Expand All @@ -66,18 +67,19 @@ func (p *PrometheusMetricsClient) NodeMetricsAvg(ctx context.Context, nodeName s
}
v1api := prometheusv1.NewAPI(client)
nodeMetrics := &NodeMetrics{}
for _, metric := range []string{promCPUUsageAvg, promMemUsageAvg} {
queryStr := fmt.Sprintf("%s_%s{instance=\"%s\"}", metric, period, nodeName)
klog.V(4).Infof("Query prometheus by %s", queryStr)
res, warnings, err := v1api.Query(ctx, queryStr, time.Now())
cpuQueryStr := fmt.Sprintf("avg_over_time((100 - (avg by (instance) (irate(node_cpu_seconds_total{mode=\"idle\",instance=\"%s\"}[30s])) * 100))[%s:30s])", nodeName, period)
memQueryStr := fmt.Sprintf("100*avg_over_time(((1-node_memory_MemAvailable_bytes{instance=\"%s\"}/node_memory_MemTotal_bytes{instance=\"%s\"}))[%s:30s])", nodeName, nodeName, period)

for _, metric := range []string{cpuQueryStr, memQueryStr} {
res, warnings, err := v1api.Query(ctx, metric, time.Now())
if err != nil {
klog.Errorf("Error querying Prometheus: %v", err)
}
if len(warnings) > 0 {
klog.V(3).Infof("Warning querying Prometheus: %v", warnings)
}
if res == nil || res.String() == "" {
klog.Warningf("Warning querying Prometheus: no data found for %s", queryStr)
klog.Warningf("Warning querying Prometheus: no data found for %s", metric)
continue
}
// plugin.usage only needs type pmodel.ValVector in Prometheus.rules
Expand All @@ -89,10 +91,10 @@ func (p *PrometheusMetricsClient) NodeMetricsAvg(ctx context.Context, nodeName s
rowValues := strings.Split(strings.TrimSpace(firstRowValVector), "=>")
value := strings.Split(strings.TrimSpace(rowValues[1]), " ")
switch metric {
case promCPUUsageAvg:
case cpuQueryStr:
cpuUsage, _ := strconv.ParseFloat(value[0], 64)
nodeMetrics.CPU = cpuUsage
case promMemUsageAvg:
case memQueryStr:
memUsage, _ := strconv.ParseFloat(value[0], 64)
nodeMetrics.Memory = memUsage
}
Expand Down
116 changes: 58 additions & 58 deletions pkg/scheduler/plugins/usage/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,17 @@ package usage

import (
"fmt"
"strconv"
"strings"

"k8s.io/klog/v2"
k8sFramework "k8s.io/kubernetes/pkg/scheduler/framework"

"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/framework"
)

const (
// PluginName indicates name of volcano scheduler plugin.
PluginName = "usage"
cpuUsageAvgPrefix = "CPUUsageAvg."
memUsageAvgPrefix = "MEMUsageAvg."
thresholdSection = "thresholds"
cpuUsageAvg5m = "5m"
PluginName = "usage"
thresholdSection = "thresholds"
)

/*
Expand All @@ -43,35 +37,55 @@ const (
- plugins:
- name: usage
arguments:
thresholds:
CPUUsageAvg.5m: 80
MEMUsageAvg.5m: 90
usage.weight: 10
type: average
thresholds:
cpu: 70
mem: 70
period: 10m
*/

type thresholdConfig struct {
cpuUsageAvg map[string]float64
memUsageAvg map[string]float64
}
const AVG string = "average"
const COMMON string = "common"
const MAX string = "max"

type usagePlugin struct {
pluginArguments framework.Arguments
weight int
threshold thresholdConfig
usageType string
cpuThresholds float64
memThresholds float64
period string
}

// New function returns usagePlugin object
func New(args framework.Arguments) framework.Plugin {
usageWeight := 1
args.GetInt(&usageWeight, "usage.weight")
config := thresholdConfig{
cpuUsageAvg: make(map[string]float64),
memUsageAvg: make(map[string]float64),
}
return &usagePlugin{
var plugin *usagePlugin = &usagePlugin{
pluginArguments: args,
weight: usageWeight,
threshold: config,
weight: 1,
usageType: AVG,
cpuThresholds: 80,
memThresholds: 80,
period: "10m",
}
args.GetInt(&plugin.weight, "usage.weight")

if averageStr, ok := args["type"]; ok {
if average, success := averageStr.(string); success {
plugin.usageType = average
} else {
klog.Warningf("usage parameter[%v] is wrong", args)
}
}

if periodStr, ok := args["period"]; ok {
if period, success := periodStr.(string); success {
plugin.period = period
} else {
klog.Warningf("usage parameter[%v] is wrong", args)
}
}
return plugin
}

func (up *usagePlugin) Name() string {
Expand All @@ -87,10 +101,9 @@ func (up *usagePlugin) OnSessionOpen(ssn *framework.Session) {
if klog.V(4).Enabled() {
for node := range ssn.Nodes {
usage := ssn.Nodes[node].ResourceUsage
klog.V(4).Infof("node:%v, cpu usage:%v, mem usage:%v", node, usage.CPUUsageAvg["5m"], usage.MEMUsageAvg["5m"])
klog.V(4).Infof("node:%v, cpu usage:%v, mem usage:%v", node, usage.CPUUsageAvg, usage.MEMUsageAvg)
}
}

argsValue, ok := up.pluginArguments[thresholdSection]
if ok {
args, ok := argsValue.(map[interface{}]interface{})
Expand All @@ -99,26 +112,13 @@ func (up *usagePlugin) OnSessionOpen(ssn *framework.Session) {
}
for k, v := range args {
key, _ := k.(string)
var val float64
switch a := v.(type) {
case string:
val, _ = strconv.ParseFloat(a, 64)
case int:
val = float64(a)
case float64:
val = a
default:
klog.V(4).Infof("The threshold %v is an unknown type", a)
value, _ := v.(int)
switch key {
case "cpu":
up.cpuThresholds = float64(value)
case "mem":
up.memThresholds = float64(value)
}
if strings.Contains(key, cpuUsageAvgPrefix) {
periodKey := strings.Replace(key, cpuUsageAvgPrefix, "", 1)
up.threshold.cpuUsageAvg[periodKey] = val
}
if strings.Contains(key, memUsageAvgPrefix) {
periodKey := strings.Replace(key, memUsageAvgPrefix, "", 1)
up.threshold.memUsageAvg[periodKey] = val
}
klog.V(4).Infof("Threshold config key: %s, value: %f", key, val)
}
} else {
klog.V(4).Infof("Threshold arguments :%v", argsValue)
Expand All @@ -127,28 +127,25 @@ func (up *usagePlugin) OnSessionOpen(ssn *framework.Session) {
predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
predicateStatus := make([]*api.Status, 0)
usageStatus := &api.Status{}
for period, value := range up.threshold.cpuUsageAvg {
klog.V(4).Infof("predicateFn cpuUsageAvg:%v", up.threshold.cpuUsageAvg)
if node.ResourceUsage.CPUUsageAvg[period] > value {
msg := fmt.Sprintf("Node %s cpu usage %f exceeds the threshold %f", node.Name, node.ResourceUsage.CPUUsageAvg[period], value)
if up.period != "" {
klog.V(4).Infof("predicateFn cpuUsageAvg:%v,predicateFn memUsageAvg:%v", up.cpuThresholds, up.memThresholds)
if node.ResourceUsage.CPUUsageAvg[up.period] > up.cpuThresholds {
msg := fmt.Sprintf("Node %s cpu usage %f exceeds the threshold %f", node.Name, node.ResourceUsage.CPUUsageAvg[up.period], up.cpuThresholds)
usageStatus.Code = api.Unschedulable
usageStatus.Reason = msg
predicateStatus = append(predicateStatus, usageStatus)
return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", up.Name(), msg)
}
}

for period, value := range up.threshold.memUsageAvg {
klog.V(4).Infof("predicateFn memUsageAvg:%v", up.threshold.memUsageAvg)
if node.ResourceUsage.MEMUsageAvg[period] > value {
msg := fmt.Sprintf("Node %s mem usage %f exceeds the threshold %f", node.Name, node.ResourceUsage.MEMUsageAvg[period], value)
}
if node.ResourceUsage.MEMUsageAvg[up.period] > up.memThresholds {
msg := fmt.Sprintf("Node %s mem usage %f exceeds the threshold %f", node.Name, node.ResourceUsage.MEMUsageAvg[up.period], up.memThresholds)
usageStatus.Code = api.Unschedulable
usageStatus.Reason = msg
predicateStatus = append(predicateStatus, usageStatus)
return predicateStatus, fmt.Errorf("plugin %s memory usage predicates failed %s", up.Name(), msg)

}
}

usageStatus.Code = api.Success
predicateStatus = append(predicateStatus, usageStatus)
klog.V(4).Infof("Usage plugin filter for task %s/%s on node %s pass.", task.Namespace, task.Name, node.Name)
Expand All @@ -157,7 +154,10 @@ func (up *usagePlugin) OnSessionOpen(ssn *framework.Session) {

nodeOrderFn := func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) {
score := 0.0
cpuUsage, exist := node.ResourceUsage.CPUUsageAvg[cpuUsageAvg5m]
if up.period == "" {
return 0, nil
}
cpuUsage, exist := node.ResourceUsage.CPUUsageAvg[up.period]
klog.V(4).Infof("Node %s cpu usage is %f.", node.Name, cpuUsage)
if !exist {
return 0, nil
Expand Down