Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Arvindthiru committed Oct 29, 2024
1 parent 902717e commit e5124cf
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 61 deletions.
79 changes: 32 additions & 47 deletions pkg/scheduler/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,13 +296,13 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p
bound, scheduled, obsolete, unscheduled, dangling, deleting := classifyBindings(policy, bindings, clusters)

// Remove scheduler CRB cleanup finalizer on all deleting bindings.
if err := f.removeFinalizer(ctx, deleting); err != nil {
if err := f.updateBindings(ctx, deleting, removeFinalizerAndUpdate); err != nil {
klog.ErrorS(err, "Failed to remove finalizers from deleting bindings", "clusterSchedulingPolicySnapshot", policyRef)
return ctrl.Result{}, err
}

// Mark all dangling bindings as unscheduled.
if err := f.markAsUnscheduledFor(ctx, dangling); err != nil {
if err := f.updateBindings(ctx, dangling, markUnscheduledForAndUpdate); err != nil {
klog.ErrorS(err, "Failed to mark dangling bindings as unscheduled", "clusterSchedulingPolicySnapshot", policyRef)
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -356,59 +356,44 @@ func (f *framework) collectBindings(ctx context.Context, crpName string) ([]plac
return bindingList.Items, nil
}

// markAsUnscheduledFor marks a list of bindings as unscheduled.
func (f *framework) markAsUnscheduledFor(ctx context.Context, bindings []*placementv1beta1.ClusterResourceBinding) error {
// issue all the update requests in parallel
errs, cctx := errgroup.WithContext(ctx)
for _, binding := range bindings {
unscheduledBinding := binding
errs.Go(func() error {
return retry.OnError(retry.DefaultBackoff,
func(err error) bool {
return apierrors.IsServiceUnavailable(err) || apierrors.IsServerTimeout(err) || apierrors.IsConflict(err)
},
func() error {
// Remember the previous unscheduledBinding state so that we might be able to revert this change if this
// cluster is being selected again before the resources are removed from it. Need to do a get and set if
// we add more annotations to the binding.
unscheduledBinding.SetAnnotations(map[string]string{placementv1beta1.PreviousBindingStateAnnotation: string(unscheduledBinding.Spec.State)})
// Mark the unscheduledBinding as unscheduled which can conflict with the rollout controller which also changes the state of a
// unscheduledBinding from "scheduled" to "bound".
unscheduledBinding.Spec.State = placementv1beta1.BindingStateUnscheduled
err := f.client.Update(cctx, unscheduledBinding, &client.UpdateOptions{})
klog.V(2).InfoS("Marking binding as unscheduled", "clusterResourceBinding", klog.KObj(unscheduledBinding), "error", err)
// We will just retry for conflict errors since the scheduler holds the truth here.
if apierrors.IsConflict(err) {
// get the binding again to make sure we have the latest version to update again.
if getErr := f.client.Get(cctx, client.ObjectKeyFromObject(unscheduledBinding), unscheduledBinding); getErr != nil {
return getErr
}
}
return err
})
})
}
return errs.Wait()
// markUnscheduledForAndUpdate marks a binding as unscheduled and updates it on the hub cluster.
//
// It records the binding's current state in the PreviousBindingStateAnnotation annotation, sets
// Spec.State to BindingStateUnscheduled, and issues an Update through hubClient. The error returned
// by the Update call (nil on success) is returned unchanged so that callers can classify it
// (e.g. conflict, server timeout) and decide whether to retry.
//
// The function may be invoked more than once on the same in-memory object: updateBindings retries it
// on service-unavailable and server-timeout errors without re-fetching the binding. To keep such a
// retry from overwriting the recorded previous state with "Unscheduled", the annotation and state are
// only touched when the binding is not already in the unscheduled state.
var markUnscheduledForAndUpdate = func(ctx context.Context, hubClient client.Client, binding *placementv1beta1.ClusterResourceBinding) error {
	if binding.Spec.State != placementv1beta1.BindingStateUnscheduled {
		// Remember the previous unscheduledBinding state so that we might be able to revert this change if this
		// cluster is being selected again before the resources are removed from it. Need to do a get and set if
		// we add more annotations to the binding, as SetAnnotations replaces the whole annotation map.
		binding.SetAnnotations(map[string]string{placementv1beta1.PreviousBindingStateAnnotation: string(binding.Spec.State)})
		// Mark the unscheduledBinding as unscheduled which can conflict with the rollout controller which also changes the state of a
		// unscheduledBinding from "scheduled" to "bound".
		binding.Spec.State = placementv1beta1.BindingStateUnscheduled
	}
	err := hubClient.Update(ctx, binding, &client.UpdateOptions{})
	// Logged on every attempt; the "error" field is nil when the update succeeded.
	klog.V(2).InfoS("Marking binding as unscheduled", "clusterResourceBinding", klog.KObj(binding), "error", err)
	return err
}

// removeFinalizerAndUpdate strips the scheduler CRB cleanup finalizer from the given
// ClusterResourceBinding and then writes the binding back through hubClient.
//
// The error from the Update call (nil on success) is returned as is.
var removeFinalizerAndUpdate = func(ctx context.Context, hubClient client.Client, binding *placementv1beta1.ClusterResourceBinding) error {
	controllerutil.RemoveFinalizer(binding, placementv1beta1.SchedulerCRBCleanupFinalizer)

	updateErr := hubClient.Update(ctx, binding, &client.UpdateOptions{})
	// The outcome is logged regardless of success; the "error" field carries the update result.
	klog.V(2).InfoS("Remove scheduler CRB cleanup finalizer", "clusterResourceBinding", klog.KObj(binding), "error", updateErr)
	return updateErr
}

// removeFinalizer removes scheduler CRB cleanup finalizer from ClusterResourceBindings.
func (f *framework) removeFinalizer(ctx context.Context, bindings []*placementv1beta1.ClusterResourceBinding) error {
func (f *framework) updateBindings(ctx context.Context, bindings []*placementv1beta1.ClusterResourceBinding, updateFn func(ctx context.Context, client client.Client, binding *placementv1beta1.ClusterResourceBinding) error) error {
// issue all the update requests in parallel
errs, cctx := errgroup.WithContext(ctx)
for _, binding := range bindings {
deletingBinding := binding
updateBinding := binding
errs.Go(func() error {
return retry.OnError(retry.DefaultBackoff,
func(err error) bool {
return apierrors.IsServiceUnavailable(err) || apierrors.IsServerTimeout(err) || apierrors.IsConflict(err)
},
func() error {
controllerutil.RemoveFinalizer(deletingBinding, placementv1beta1.SchedulerCRBCleanupFinalizer)
err := f.client.Update(cctx, deletingBinding, &client.UpdateOptions{})
err := updateFn(cctx, f.client, updateBinding)
// We will retry on conflicts.
if apierrors.IsConflict(err) {
// get the binding again to make sure we have the latest version to update again.
if getErr := f.client.Get(cctx, client.ObjectKeyFromObject(deletingBinding), deletingBinding); getErr != nil {
if getErr := f.client.Get(cctx, client.ObjectKeyFromObject(updateBinding), updateBinding); getErr != nil {
return getErr
}
}
Expand Down Expand Up @@ -693,7 +678,7 @@ func (f *framework) manipulateBindings(
//
// This is set to happen after new bindings are created and old bindings are updated, to
// avoid interruptions (deselected then reselected) in a best effort manner.
if err := f.markAsUnscheduledFor(ctx, toDelete); err != nil {
if err := f.updateBindings(ctx, toDelete, markUnscheduledForAndUpdate); err != nil {
klog.ErrorS(err, "Failed to mark bindings as unschedulable", "clusterSchedulingPolicySnapshot", policyRef)
return err
}
Expand Down Expand Up @@ -846,7 +831,7 @@ func (f *framework) runSchedulingCycleForPickNPlacementType(
klog.V(2).InfoS("Downscaling is needed", "clusterSchedulingPolicySnapshot", policyRef, "downscaleCount", downscaleCount)

// Mark all obsolete bindings as unscheduled first.
if err := f.markAsUnscheduledFor(ctx, obsolete); err != nil {
if err := f.updateBindings(ctx, obsolete, markUnscheduledForAndUpdate); err != nil {
klog.ErrorS(err, "Failed to mark obsolete bindings as unscheduled", "clusterSchedulingPolicySnapshot", policyRef)
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -1023,10 +1008,10 @@ func (f *framework) downscale(ctx context.Context, scheduled, bound []*placement
bindingsToDelete = append(bindingsToDelete, sortedScheduled[i])
}

return sortedScheduled[count:], bound, f.markAsUnscheduledFor(ctx, bindingsToDelete)
return sortedScheduled[count:], bound, f.updateBindings(ctx, bindingsToDelete, markUnscheduledForAndUpdate)
case count == len(scheduled):
// Trim all scheduled bindings.
return nil, bound, f.markAsUnscheduledFor(ctx, scheduled)
return nil, bound, f.updateBindings(ctx, scheduled, markUnscheduledForAndUpdate)
case count < len(scheduled)+len(bound):
// Trim all scheduled bindings and part of bound bindings.
bindingsToDelete := make([]*placementv1beta1.ClusterResourceBinding, 0, count)
Expand All @@ -1047,13 +1032,13 @@ func (f *framework) downscale(ctx context.Context, scheduled, bound []*placement
bindingsToDelete = append(bindingsToDelete, sortedBound[i])
}

return nil, sortedBound[left:], f.markAsUnscheduledFor(ctx, bindingsToDelete)
return nil, sortedBound[left:], f.updateBindings(ctx, bindingsToDelete, markUnscheduledForAndUpdate)
case count == len(scheduled)+len(bound):
// Trim all scheduled and bound bindings.
bindingsToDelete := make([]*placementv1beta1.ClusterResourceBinding, 0, count)
bindingsToDelete = append(bindingsToDelete, scheduled...)
bindingsToDelete = append(bindingsToDelete, bound...)
return nil, nil, f.markAsUnscheduledFor(ctx, bindingsToDelete)
return nil, nil, f.updateBindings(ctx, bindingsToDelete, markUnscheduledForAndUpdate)
default:
// Normally this branch will never run, as an earlier check has guaranteed that
// count <= len(scheduled) + len(bound).
Expand Down
99 changes: 94 additions & 5 deletions pkg/scheduler/framework/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ var (
lessFuncFilteredCluster = func(filtered1, filtered2 *filteredClusterWithStatus) bool {
return filtered1.cluster.Name < filtered2.cluster.Name
}
lessFuncBinding = func(binding1, binding2 *placementv1beta1.ClusterResourceBinding) bool {
return binding1.Name < binding2.Name
}
)

// A few utilities for generating a large number of objects.
Expand Down Expand Up @@ -380,8 +383,8 @@ func TestClassifyBindings(t *testing.T) {
}
}

// TestMarkAsUnscheduledFor tests the markAsUnscheduledFor method.
func TestMarkAsUnscheduledFor(t *testing.T) {
// TestUpdateBindingsMarkAsUnscheduledForAndUpdate tests the updateBindings method when it is passed the markUnscheduledForAndUpdate update function.
func TestUpdateBindingsMarkAsUnscheduledForAndUpdate(t *testing.T) {
boundBinding := placementv1beta1.ClusterResourceBinding{
ObjectMeta: metav1.ObjectMeta{
Name: bindingName,
Expand All @@ -408,10 +411,10 @@ func TestMarkAsUnscheduledFor(t *testing.T) {
f := &framework{
client: fakeClient,
}
// call markAsUnscheduledFor
// call markAsUnscheduledForAndUpdate
ctx := context.Background()
if err := f.markAsUnscheduledFor(ctx, []*placementv1beta1.ClusterResourceBinding{&boundBinding, &scheduledBinding}); err != nil {
t.Fatalf("markAsUnscheduledFor() = %v, want no error", err)
if err := f.updateBindings(ctx, []*placementv1beta1.ClusterResourceBinding{&boundBinding, &scheduledBinding}, markUnscheduledForAndUpdate); err != nil {
t.Fatalf("updateBindings() = %v, want no error", err)
}
// check if the boundBinding has been updated
if err := fakeClient.Get(ctx, types.NamespacedName{Name: bindingName}, &boundBinding); err != nil {
Expand Down Expand Up @@ -451,6 +454,92 @@ func TestMarkAsUnscheduledFor(t *testing.T) {
}
}

// TestUpdateBindingRemoveFinalizerAndUpdate tests the updateBindings method when it is passed the
// removeFinalizerAndUpdate update function: every binding handed to updateBindings is expected to
// lose the scheduler CRB cleanup finalizer while keeping its original state.
func TestUpdateBindingRemoveFinalizerAndUpdate(t *testing.T) {
	boundBinding := placementv1beta1.ClusterResourceBinding{
		ObjectMeta: metav1.ObjectMeta{
			Name:       bindingName,
			Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer},
		},
		Spec: placementv1beta1.ResourceBindingSpec{
			State: placementv1beta1.BindingStateBound,
		},
	}
	scheduledBinding := placementv1beta1.ClusterResourceBinding{
		ObjectMeta: metav1.ObjectMeta{
			Name:       altBindingName,
			Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer},
		},
		Spec: placementv1beta1.ResourceBindingSpec{
			State: placementv1beta1.BindingStateScheduled,
		},
	}
	unScheduledBinding := placementv1beta1.ClusterResourceBinding{
		ObjectMeta: metav1.ObjectMeta{
			Name:       anotherBindingName,
			Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer},
		},
		Spec: placementv1beta1.ResourceBindingSpec{
			State: placementv1beta1.BindingStateUnscheduled,
		},
	}

	// setup fake client with bindings
	fakeClient := fake.NewClientBuilder().
		WithScheme(scheme.Scheme).
		WithObjects(&boundBinding, &scheduledBinding, &unScheduledBinding).
		Build()
	// Construct framework manually instead of using NewFramework() to avoid mocking the controller manager.
	f := &framework{
		client: fakeClient,
	}
	// call updateBindings with removeFinalizerAndUpdate
	ctx := context.Background()
	if err := f.updateBindings(ctx, []*placementv1beta1.ClusterResourceBinding{&boundBinding, &scheduledBinding, &unScheduledBinding}, removeFinalizerAndUpdate); err != nil {
		t.Fatalf("updateBindings() = %v, want no error", err)
	}

	var clusterResourceBindingList placementv1beta1.ClusterResourceBindingList
	if err := f.client.List(ctx, &clusterResourceBindingList); err != nil {
		t.Fatalf("List cluster resource boundBindings returned %v, want no error", err)
	}

	// Allocate with zero length and pre-sized capacity: appending to a slice created with a
	// non-zero length would leave that many leading nil pointers in got, which breaks both the
	// name-based sort and the comparison below.
	got := make([]*placementv1beta1.ClusterResourceBinding, 0, len(clusterResourceBindingList.Items))
	for i := range clusterResourceBindingList.Items {
		got = append(got, &clusterResourceBindingList.Items[i])
	}

	// Same bindings as created above, minus the finalizer.
	want := []*placementv1beta1.ClusterResourceBinding{
		{
			ObjectMeta: metav1.ObjectMeta{
				Name: bindingName,
			},
			Spec: placementv1beta1.ResourceBindingSpec{
				State: placementv1beta1.BindingStateBound,
			},
		},
		{
			ObjectMeta: metav1.ObjectMeta{
				Name: altBindingName,
			},
			Spec: placementv1beta1.ResourceBindingSpec{
				State: placementv1beta1.BindingStateScheduled,
			},
		},
		{
			ObjectMeta: metav1.ObjectMeta{
				Name: anotherBindingName,
			},
			Spec: placementv1beta1.ResourceBindingSpec{
				State: placementv1beta1.BindingStateUnscheduled,
			},
		},
	}

	// List order is not relied upon; both slices are sorted by binding name before comparing.
	if diff := cmp.Diff(got, want, ignoreTypeMetaAPIVersionKindFields, ignoreObjectMetaResourceVersionField, cmpopts.SortSlices(lessFuncBinding)); diff != "" {
		t.Errorf("diff (-got, +want): %s", diff)
	}
}

// TestRunPreFilterPlugins tests the runPreFilterPlugins method.
func TestRunPreFilterPlugins(t *testing.T) {
dummyPreFilterPluginNameA := fmt.Sprintf(dummyAllPurposePluginNameFormat, 0)
Expand Down
15 changes: 6 additions & 9 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,15 +283,7 @@ func (s *Scheduler) cleanUpAllBindingsFor(ctx context.Context, crp *fleetv1beta1
}

// Remove scheduler CRB cleanup finalizer from deleting bindings.
for idx := range bindingList.Items {
binding := &bindingList.Items[idx]
controllerutil.RemoveFinalizer(binding, fleetv1beta1.SchedulerCRBCleanupFinalizer)
if err := s.client.Update(ctx, binding); err != nil {
klog.ErrorS(err, "Failed to remove scheduler reconcile finalizer from cluster resource binding", "clusterResourceBinding", klog.KObj(binding))
return controller.NewUpdateIgnoreConflictError(err)
}
}

//
// Note that once a CRP has been marked for deletion, it will no longer enter the scheduling cycle,
// so any cleanup finalizer has to be removed here.
//
Expand All @@ -300,6 +292,11 @@ func (s *Scheduler) cleanUpAllBindingsFor(ctx context.Context, crp *fleetv1beta1
// run the deletion.
for idx := range bindingList.Items {
binding := &bindingList.Items[idx]
controllerutil.RemoveFinalizer(binding, fleetv1beta1.SchedulerCRBCleanupFinalizer)
if err := s.client.Update(ctx, binding); err != nil {
klog.ErrorS(err, "Failed to remove scheduler reconcile finalizer from cluster resource binding", "clusterResourceBinding", klog.KObj(binding))
return controller.NewUpdateIgnoreConflictError(err)
}
// Delete the binding if it has not been marked for deletion yet.
if binding.DeletionTimestamp == nil {
if err := s.client.Delete(ctx, binding); err != nil && !errors.IsNotFound(err) {
Expand Down

0 comments on commit e5124cf

Please sign in to comment.