feat: add, remove finalizer for CRB in Scheduler and scheduler watcher for CRB #924

Merged · 22 commits · Nov 5, 2024
6 changes: 6 additions & 0 deletions apis/placement/v1beta1/binding_types.go
@@ -10,6 +10,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
// SchedulerCRBCleanupFinalizer is a finalizer added to ClusterResourceBindings to ensure that the
// corresponding CRP name can still be looked up for deleting ClusterResourceBindings, so that a new scheduling cycle can be triggered.
SchedulerCRBCleanupFinalizer = fleetPrefix + "scheduler-crb-cleanup"
)

// +kubebuilder:object:root=true
// +kubebuilder:resource:scope=Cluster,categories={fleet,fleet-placement},shortName=rb
// +kubebuilder:subresource:status
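
For context, a minimal sketch (not part of this diff) of how a controller could attach this finalizer before acting on a binding, using controllerutil from controller-runtime. The helper name below is hypothetical; Fleet's actual call site may differ.

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

	placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
)

// ensureSchedulerCleanupFinalizer (hypothetical helper) adds the scheduler CRB cleanup finalizer
// if it is missing, so that a deleting binding stays visible to the scheduler until it has
// reacted to the deletion.
func ensureSchedulerCleanupFinalizer(ctx context.Context, c client.Client, binding *placementv1beta1.ClusterResourceBinding) error {
	if controllerutil.ContainsFinalizer(binding, placementv1beta1.SchedulerCRBCleanupFinalizer) {
		return nil // already present
	}
	controllerutil.AddFinalizer(binding, placementv1beta1.SchedulerCRBCleanupFinalizer)
	return c.Update(ctx, binding)
}
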
10 changes: 10 additions & 0 deletions cmd/hubagent/workload/setup.go
@@ -39,6 +39,7 @@ import (
"go.goms.io/fleet/pkg/scheduler/framework"
"go.goms.io/fleet/pkg/scheduler/profile"
"go.goms.io/fleet/pkg/scheduler/queue"
schedulercrbwatcher "go.goms.io/fleet/pkg/scheduler/watchers/clusterresourcebinding"
schedulercrpwatcher "go.goms.io/fleet/pkg/scheduler/watchers/clusterresourceplacement"
schedulercspswatcher "go.goms.io/fleet/pkg/scheduler/watchers/clusterschedulingpolicysnapshot"
"go.goms.io/fleet/pkg/scheduler/watchers/membercluster"
@@ -278,6 +279,15 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
return err
}

klog.Info("Setting up the clusterResourceBinding watcher for scheduler")
if err := (&schedulercrbwatcher.Reconciler{
Client: mgr.GetClient(),
SchedulerWorkQueue: defaultSchedulingQueue,
}).SetupWithManager(mgr); err != nil {
klog.ErrorS(err, "Unable to set up clusterResourceBinding watcher for scheduler")
return err
}

klog.Info("Setting up the memberCluster watcher for scheduler")
if err := (&membercluster.Reconciler{
Client: mgr.GetClient(),
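
The watcher package wired up above lives in pkg/scheduler/watchers/clusterresourcebinding and is not shown in this excerpt. As a rough, hypothetical sketch of the reconcile flow such a watcher typically implements (the real Reconciler may differ): when a binding is being deleted but still carries the scheduler cleanup finalizer, recover the owning CRP and push it onto the scheduler's work queue so a new scheduling cycle runs. The queue interface and CRP lookup here are assumptions, not Fleet's actual API.

import (
	"context"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

	placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
)

// crbWatcher is a simplified stand-in for the real Reconciler; enqueueCRP stands in for the
// SchedulerWorkQueue field seen in setup.go above.
type crbWatcher struct {
	client     client.Client
	enqueueCRP func(crpName string)
}

func (w *crbWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	var binding placementv1beta1.ClusterResourceBinding
	if err := w.client.Get(ctx, req.NamespacedName, &binding); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	// Only deleting bindings that still carry the scheduler cleanup finalizer matter here;
	// the scheduling cycle (framework.go below) is what eventually removes the finalizer.
	if binding.DeletionTimestamp.IsZero() ||
		!controllerutil.ContainsFinalizer(&binding, placementv1beta1.SchedulerCRBCleanupFinalizer) {
		return ctrl.Result{}, nil
	}
	// Assumption: the owning CRP name is recoverable from the binding's CRP tracking label.
	if crpName, ok := binding.Labels[placementv1beta1.CRPTrackingLabel]; ok {
		w.enqueueCRP(crpName)
	}
	return ctrl.Result{}, nil
}
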
73 changes: 50 additions & 23 deletions pkg/scheduler/framework/framework.go
@@ -24,6 +24,7 @@ import (
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1"
placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
@@ -287,15 +288,21 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p
// * dangling bindings, i.e., bindings that are associated with a cluster that is no longer
// in a normally operating state (the cluster has left the fleet, or is in the state of leaving),
// yet has not been marked as unscheduled by the scheduler; and
//
// * deleting bindings, i.e., bindings that have a deletionTimeStamp on them.
// Any deleted binding is also ignored.
// Note that bindings marked as unscheduled are ignored by the scheduler, as they
// are irrelevant to the scheduling cycle. However, we will reconcile them with the latest scheduling
// result so that we won't have an ever-increasing chain of flip-flop bindings.
bound, scheduled, obsolete, unscheduled, dangling := classifyBindings(policy, bindings, clusters)
bound, scheduled, obsolete, unscheduled, dangling, deleting := classifyBindings(policy, bindings, clusters)

// Remove scheduler CRB cleanup finalizer on all deleting bindings.
if err := f.updateBindings(ctx, deleting, removeFinalizerAndUpdate); err != nil {
klog.ErrorS(err, "Failed to remove finalizers from deleting bindings", "clusterSchedulingPolicySnapshot", policyRef)
return ctrl.Result{}, err
}

// Mark all dangling bindings as unscheduled.
if err := f.markAsUnscheduledFor(ctx, dangling); err != nil {
if err := f.updateBindings(ctx, dangling, markUnscheduledForAndUpdate); err != nil {
klog.ErrorS(err, "Failed to mark dangling bindings as unscheduled", "clusterSchedulingPolicySnapshot", policyRef)
return ctrl.Result{}, err
}
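
classifyBindings itself is not part of this diff. Assuming the new deleting bucket simply keys off the binding's deletion timestamp, the check is likely no more than the following sketch:

import placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"

// isDeleting (assumed check, not the actual classifyBindings code) reports whether a binding has
// been marked for deletion; such bindings go into the new deleting bucket so that
// RunSchedulingCycleFor can strip the scheduler CRB cleanup finalizer from them first.
func isDeleting(binding *placementv1beta1.ClusterResourceBinding) bool {
	return !binding.DeletionTimestamp.IsZero()
}
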
@@ -349,31 +356,51 @@ func (f *framework) collectBindings(ctx context.Context, crpName string) ([]plac
return bindingList.Items, nil
}

// markAsUnscheduledFor marks a list of bindings as unscheduled.
func (f *framework) markAsUnscheduledFor(ctx context.Context, bindings []*placementv1beta1.ClusterResourceBinding) error {
// markUnscheduledForAndUpdate marks a binding as unscheduled and updates it.
var markUnscheduledForAndUpdate = func(ctx context.Context, hubClient client.Client, binding *placementv1beta1.ClusterResourceBinding) error {
// Remember the previous binding state so that we might be able to revert this change if this
// cluster is being selected again before the resources are removed from it. Need to do a get and set if
// we add more annotations to the binding.
binding.SetAnnotations(map[string]string{placementv1beta1.PreviousBindingStateAnnotation: string(binding.Spec.State)})
// Mark the binding as unscheduled, which can conflict with the rollout controller that also changes the
// state of a binding from "scheduled" to "bound".
binding.Spec.State = placementv1beta1.BindingStateUnscheduled
err := hubClient.Update(ctx, binding, &client.UpdateOptions{})
if err == nil {
klog.V(2).InfoS("Marked binding as unscheduled", "clusterResourceBinding", klog.KObj(binding))
}
return err
}

// removeFinalizerAndUpdate removes scheduler CRB cleanup finalizer from ClusterResourceBinding and updates it.
var removeFinalizerAndUpdate = func(ctx context.Context, hubClient client.Client, binding *placementv1beta1.ClusterResourceBinding) error {
controllerutil.RemoveFinalizer(binding, placementv1beta1.SchedulerCRBCleanupFinalizer)
err := hubClient.Update(ctx, binding, &client.UpdateOptions{})
if err == nil {
klog.V(2).InfoS("Removed scheduler CRB cleanup finalizer", "clusterResourceBinding", klog.KObj(binding))
}
return err
}

// updateBindings iterates over bindings and updates them using the update function provided.
func (f *framework) updateBindings(ctx context.Context, bindings []*placementv1beta1.ClusterResourceBinding, updateFn func(ctx context.Context, client client.Client, binding *placementv1beta1.ClusterResourceBinding) error) error {
// issue all the update requests in parallel
errs, cctx := errgroup.WithContext(ctx)
for _, binding := range bindings {
unscheduledBinding := binding
updateBinding := binding
errs.Go(func() error {
return retry.OnError(retry.DefaultBackoff,
func(err error) bool {
return apierrors.IsServiceUnavailable(err) || apierrors.IsServerTimeout(err) || apierrors.IsConflict(err)
},
func() error {
// Remember the previous unscheduledBinding state so that we might be able to revert this change if this
// cluster is being selected again before the resources are removed from it. Need to do a get and set if
// we add more annotations to the binding.
unscheduledBinding.SetAnnotations(map[string]string{placementv1beta1.PreviousBindingStateAnnotation: string(unscheduledBinding.Spec.State)})
// Mark the unscheduledBinding as unscheduled which can conflict with the rollout controller which also changes the state of a
// unscheduledBinding from "scheduled" to "bound".
unscheduledBinding.Spec.State = placementv1beta1.BindingStateUnscheduled
err := f.client.Update(cctx, unscheduledBinding, &client.UpdateOptions{})
klog.V(2).InfoS("Marking binding as unscheduled", "clusterResourceBinding", klog.KObj(unscheduledBinding), "error", err)
// We will just retry for conflict errors since the scheduler holds the truth here.
err := updateFn(cctx, f.client, updateBinding)
// We will retry on conflicts.
if apierrors.IsConflict(err) {
// get the binding again to make sure we have the latest version to update again.
return f.client.Get(cctx, client.ObjectKeyFromObject(unscheduledBinding), unscheduledBinding)
if getErr := f.client.Get(cctx, client.ObjectKeyFromObject(updateBinding), updateBinding); getErr != nil {
return getErr
}
}
return err
})
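
The helpers above are plain functions over a client.Client, so they are easy to exercise in isolation. A minimal sketch of a unit test for removeFinalizerAndUpdate against the controller-runtime fake client (test wiring assumed; this PR's actual test changes are not shown in this excerpt):

import (
	"context"
	"testing"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

	placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
)

func TestRemoveFinalizerAndUpdate(t *testing.T) {
	scheme := runtime.NewScheme()
	if err := placementv1beta1.AddToScheme(scheme); err != nil {
		t.Fatalf("failed to add placement v1beta1 to scheme: %v", err)
	}
	// Seed a binding that already carries the scheduler CRB cleanup finalizer.
	binding := &placementv1beta1.ClusterResourceBinding{
		ObjectMeta: metav1.ObjectMeta{
			Name:       "binding-1",
			Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer},
		},
	}
	fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(binding).Build()

	if err := removeFinalizerAndUpdate(context.Background(), fakeClient, binding); err != nil {
		t.Fatalf("removeFinalizerAndUpdate() returned %v, want nil", err)
	}
	if controllerutil.ContainsFinalizer(binding, placementv1beta1.SchedulerCRBCleanupFinalizer) {
		t.Fatal("scheduler CRB cleanup finalizer was not removed")
	}
}
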
@@ -656,7 +683,7 @@ func (f *framework) manipulateBindings(
//
// This is set to happen after new bindings are created and old bindings are updated, to
// avoid interruptions (deselected then reselected) in a best effort manner.
if err := f.markAsUnscheduledFor(ctx, toDelete); err != nil {
if err := f.updateBindings(ctx, toDelete, markUnscheduledForAndUpdate); err != nil {
klog.ErrorS(err, "Failed to mark bindings as unschedulable", "clusterSchedulingPolicySnapshot", policyRef)
return err
}
@@ -809,7 +836,7 @@ func (f *framework) runSchedulingCycleForPickNPlacementType(
klog.V(2).InfoS("Downscaling is needed", "clusterSchedulingPolicySnapshot", policyRef, "downscaleCount", downscaleCount)

// Mark all obsolete bindings as unscheduled first.
if err := f.markAsUnscheduledFor(ctx, obsolete); err != nil {
if err := f.updateBindings(ctx, obsolete, markUnscheduledForAndUpdate); err != nil {
klog.ErrorS(err, "Failed to mark obsolete bindings as unscheduled", "clusterSchedulingPolicySnapshot", policyRef)
return ctrl.Result{}, err
}
@@ -986,10 +1013,10 @@ func (f *framework) downscale(ctx context.Context, scheduled, bound []*placement
bindingsToDelete = append(bindingsToDelete, sortedScheduled[i])
}

return sortedScheduled[count:], bound, f.markAsUnscheduledFor(ctx, bindingsToDelete)
return sortedScheduled[count:], bound, f.updateBindings(ctx, bindingsToDelete, markUnscheduledForAndUpdate)
case count == len(scheduled):
// Trim all scheduled bindings.
return nil, bound, f.markAsUnscheduledFor(ctx, scheduled)
return nil, bound, f.updateBindings(ctx, scheduled, markUnscheduledForAndUpdate)
case count < len(scheduled)+len(bound):
// Trim all scheduled bindings and part of bound bindings.
bindingsToDelete := make([]*placementv1beta1.ClusterResourceBinding, 0, count)
@@ -1010,13 +1037,13 @@ bindingsToDelete = append(bindingsToDelete, sortedBound[i])
bindingsToDelete = append(bindingsToDelete, sortedBound[i])
}

return nil, sortedBound[left:], f.markAsUnscheduledFor(ctx, bindingsToDelete)
return nil, sortedBound[left:], f.updateBindings(ctx, bindingsToDelete, markUnscheduledForAndUpdate)
case count == len(scheduled)+len(bound):
// Trim all scheduled and bound bindings.
bindingsToDelete := make([]*placementv1beta1.ClusterResourceBinding, 0, count)
bindingsToDelete = append(bindingsToDelete, scheduled...)
bindingsToDelete = append(bindingsToDelete, bound...)
return nil, nil, f.markAsUnscheduledFor(ctx, bindingsToDelete)
return nil, nil, f.updateBindings(ctx, bindingsToDelete, markUnscheduledForAndUpdate)
default:
// Normally this branch will never run, as an earlier check has guaranteed that
// count <= len(scheduled) + len(bound).