Skip to content

Commit

Permalink
chore: remove hardcoded timeouts for bind and filter stages
Browse files Browse the repository at this point in the history
  • Loading branch information
marwanad committed Jun 23, 2024
1 parent 6dfde78 commit ac8e619
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 16 deletions.
6 changes: 3 additions & 3 deletions charts/multicluster-scheduler/templates/cm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ data:
plugins:
multiPoint:
enabled:
- name: proxy
- name: Proxy
filter:
enabled:
- name: proxy
- name: Proxy
candidate-scheduler-config: |
apiVersion: kubescheduler.config.k8s.io/v1
kind: KubeSchedulerConfiguration
Expand All @@ -34,4 +34,4 @@ data:
plugins:
multiPoint:
enabled:
- name: candidate
- name: Candidate
1 change: 1 addition & 0 deletions cmd/scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package main
import (
"os"

_ "admiralty.io/multicluster-scheduler/pkg/apis/config/scheme" // for plugin config
"admiralty.io/multicluster-scheduler/pkg/scheduler_plugins/candidate"
"admiralty.io/multicluster-scheduler/pkg/scheduler_plugins/proxy"
"k8s.io/component-base/cli"
Expand Down
27 changes: 19 additions & 8 deletions pkg/scheduler_plugins/candidate/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package candidate

import (
"context"
"fmt"
"time"

v1 "k8s.io/api/core/v1"
Expand All @@ -28,23 +29,26 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
"sigs.k8s.io/controller-runtime/pkg/client/config"
crConfig "sigs.k8s.io/controller-runtime/pkg/client/config"

"admiralty.io/multicluster-scheduler/pkg/apis/config"
"admiralty.io/multicluster-scheduler/pkg/common"
"admiralty.io/multicluster-scheduler/pkg/generated/clientset/versioned"
)

type Plugin struct {
handle framework.Handle
client versioned.Interface

preBindWaitDuration time.Duration
}

var _ framework.PreFilterPlugin = &Plugin{}
var _ framework.ReservePlugin = &Plugin{}
var _ framework.PreBindPlugin = &Plugin{}

// Name is the name of the plugin used in the plugin registry and configurations.
const Name = "candidate"
const Name = "Candidate"

// Name returns name of the plugin. It is used in logs, etc.
func (pl *Plugin) Name() string {
Expand Down Expand Up @@ -75,10 +79,8 @@ func (pl *Plugin) Reserve(ctx context.Context, state *framework.CycleState, p *v
func (pl *Plugin) Unreserve(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) {
}

const waitDuration = 30 * time.Second

func (pl *Plugin) PreBind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
ctx, cancel := context.WithTimeout(ctx, waitDuration)
ctx, cancel := context.WithTimeout(ctx, pl.preBindWaitDuration)
defer cancel()

// wait until pod is allowed, if ever
Expand Down Expand Up @@ -111,9 +113,18 @@ func (pl *Plugin) isAllowed(ctx context.Context, p *v1.Pod) (bool, error) {
}

// New initializes a new plugin and returns it.
func New(_ runtime.Object, h framework.Handle) (framework.Plugin, error) {
cfg := config.GetConfigOrDie()
func New(obj runtime.Object, h framework.Handle) (framework.Plugin, error) {
args, ok := obj.(*config.CandidateArgs)
if !ok {
return nil, fmt.Errorf("expected args to be of type ProxyArgs, got %T", obj)
}

cfg := crConfig.GetConfigOrDie()
client, err := versioned.NewForConfig(cfg)
utilruntime.Must(err)
return &Plugin{handle: h, client: client}, nil
return &Plugin{
handle: h,
client: client,
preBindWaitDuration: time.Duration(args.PreBindWaitDurationSeconds) * time.Second,
}, nil
}
17 changes: 12 additions & 5 deletions pkg/scheduler_plugins/proxy/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"

"admiralty.io/multicluster-scheduler/pkg/apis/config"
"admiralty.io/multicluster-scheduler/pkg/apis/multicluster/v1alpha1"
"admiralty.io/multicluster-scheduler/pkg/common"
agentconfig "admiralty.io/multicluster-scheduler/pkg/config/agent"
Expand All @@ -45,6 +46,8 @@ type Plugin struct {
targets map[string]*versioned.Clientset
targetNamespaces map[string]string

filterWaitDuration time.Duration

failedNodeNamesByPodUID map[types.UID]map[string]bool
mx sync.RWMutex
}
Expand All @@ -56,7 +59,7 @@ var _ framework.PreBindPlugin = &Plugin{}
var _ framework.PostBindPlugin = &Plugin{}

// Name is the name of the plugin used in the plugin registry and configurations.
const Name = "proxy"
const Name = "Proxy"

// Name returns name of the plugin. It is used in logs, etc.
func (pl *Plugin) Name() string {
Expand Down Expand Up @@ -95,8 +98,6 @@ func (pl *Plugin) allowCandidate(ctx context.Context, c *v1alpha1.PodChaperon, c
return err
}

const filterWaitDuration = 30 * time.Second // TODO configure

func (pl *Plugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
if nodeInfo.Node().Labels[common.LabelAndTaintKeyVirtualKubeletProvider] != common.VirtualKubeletProviderName {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, "")
Expand All @@ -117,7 +118,7 @@ func (pl *Plugin) Filter(ctx context.Context, state *framework.CycleState, pod *

targetClusterName := virtualNodeNameToClusterName(nodeInfo.Node().Name)

ctx, cancel := context.WithTimeout(ctx, filterWaitDuration)
ctx, cancel := context.WithTimeout(ctx, pl.filterWaitDuration)
defer cancel()

var isReserved, isUnschedulable bool
Expand Down Expand Up @@ -287,7 +288,12 @@ func (pl *Plugin) PostBind(ctx context.Context, state *framework.CycleState, p *
}

// New initializes a new plugin and returns it.
func New(_ runtime.Object, h framework.Handle) (framework.Plugin, error) {
func New(obj runtime.Object, h framework.Handle) (framework.Plugin, error) {
args, ok := obj.(*config.ProxyArgs)
if !ok {
return nil, fmt.Errorf("expected args to be of type ProxyArgs, got %T", obj)
}

agentCfg := agentconfig.NewFromCRD(context.Background())
n := len(agentCfg.Targets)
targets := make(map[string]*versioned.Clientset, n)
Expand All @@ -305,6 +311,7 @@ func New(_ runtime.Object, h framework.Handle) (framework.Plugin, error) {
clusterName: os.Getenv("CLUSTER_NAME"),
targets: targets,
targetNamespaces: targetNamespaces,
filterWaitDuration: time.Duration(args.FilterWaitDurationSeconds) * time.Second,
failedNodeNamesByPodUID: map[types.UID]map[string]bool{},
}, nil
}

0 comments on commit ac8e619

Please sign in to comment.