Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add, remove finalizer for CRB in Scheduler and scheduler watcher for CRB #924

Merged
merged 22 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions pkg/scheduler/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,18 +366,23 @@ var markUnscheduledForAndUpdate = func(ctx context.Context, hubClient client.Cli
// unscheduledBinding from "scheduled" to "bound".
binding.Spec.State = placementv1beta1.BindingStateUnscheduled
err := hubClient.Update(ctx, binding, &client.UpdateOptions{})
klog.V(2).InfoS("Marking binding as unscheduled", "clusterResourceBinding", klog.KObj(binding), "error", err)
if err == nil {
Arvindthiru marked this conversation as resolved.
Show resolved Hide resolved
klog.V(2).InfoS("Marked binding as unscheduled", "clusterResourceBinding", klog.KObj(binding))
}
return err
}

// removeFinalizerAndUpdate removes scheduler CRB cleanup finalizer from ClusterResourceBinding and updates it.
var removeFinalizerAndUpdate = func(ctx context.Context, hubClient client.Client, binding *placementv1beta1.ClusterResourceBinding) error {
controllerutil.RemoveFinalizer(binding, placementv1beta1.SchedulerCRBCleanupFinalizer)
err := hubClient.Update(ctx, binding, &client.UpdateOptions{})
klog.V(2).InfoS("Remove scheduler CRB cleanup finalizer", "clusterResourceBinding", klog.KObj(binding), "error", err)
if err == nil {
klog.V(2).InfoS("Removed scheduler CRB cleanup finalizer", "clusterResourceBinding", klog.KObj(binding))
}
return err
}

// updateBindings iterates over bindings and updates them using the update function provided.
func (f *framework) updateBindings(ctx context.Context, bindings []*placementv1beta1.ClusterResourceBinding, updateFn func(ctx context.Context, client client.Client, binding *placementv1beta1.ClusterResourceBinding) error) error {
// issue all the update requests in parallel
errs, cctx := errgroup.WithContext(ctx)
Expand Down
119 changes: 119 additions & 0 deletions pkg/scheduler/framework/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package framework

import (
"context"
"errors"
"fmt"
"log"
"os"
Expand All @@ -16,8 +17,10 @@ import (

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -383,6 +386,122 @@ func TestClassifyBindings(t *testing.T) {
}
}

func TestUpdateBindingsWithErrors(t *testing.T) {
boundBinding := placementv1beta1.ClusterResourceBinding{
ObjectMeta: metav1.ObjectMeta{
Name: bindingName,
},
Spec: placementv1beta1.ResourceBindingSpec{
State: placementv1beta1.BindingStateBound,
},
}
fakeClient := fake.NewClientBuilder().
WithScheme(scheme.Scheme).
Build()

var genericUpdateFn = func(ctx context.Context, hubClient client.Client, binding *placementv1beta1.ClusterResourceBinding) error {
binding.SetLabels(map[string]string{"test-key": "test-value"})
return hubClient.Update(ctx, binding, &client.UpdateOptions{})
}

testCases := []struct {
name string
bindings []*placementv1beta1.ClusterResourceBinding
customClient client.Client
wantErr error
}{
{
name: "service unavailable error on update, successful get & return return service unavailable",
bindings: []*placementv1beta1.ClusterResourceBinding{&boundBinding},
customClient: &errorClient{
Client: fakeClient,
// set large error retry count to force the return of update error.
errorForRetryCount: 1000000,
returnUpdateErr: "ServiceUnavailable",
},
wantErr: k8serrors.NewServiceUnavailable("service is unavailable"),
},
{
name: "service unavailable error on update, successful get & retry return nil",
bindings: []*placementv1beta1.ClusterResourceBinding{&boundBinding},
customClient: &errorClient{
Client: fakeClient,
errorForRetryCount: 1,
returnUpdateErr: "ServiceUnavailable",
},
wantErr: nil,
},
{
name: "server timeout error on update, successful get & retry return nil",
bindings: []*placementv1beta1.ClusterResourceBinding{&boundBinding},
customClient: &errorClient{
Client: fakeClient,
errorForRetryCount: 1,
returnUpdateErr: "ServerTimeout",
},
wantErr: nil,
},
{
name: "conflict error on update, get failed, return get error",
bindings: []*placementv1beta1.ClusterResourceBinding{&boundBinding},
customClient: &errorClient{
Client: fakeClient,
errorForRetryCount: 1,
returnUpdateErr: "Conflict",
returnGetErr: "GetError",
},
wantErr: errors.New("get error"),
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Construct framework manually instead of using NewFramework() to avoid mocking the controller manager.
f := &framework{
client: tc.customClient,
}
ctx := context.Background()
gotErr := f.updateBindings(ctx, tc.bindings, genericUpdateFn)
got, want := gotErr != nil, tc.wantErr != nil
if got != want {
t.Fatalf("updateBindings() = %v, want %v", gotErr, tc.wantErr)
}
if got && want && !strings.Contains(gotErr.Error(), tc.wantErr.Error()) {
t.Errorf("updateBindings() = %v, want %v", gotErr, tc.wantErr)
}
})
}
}

type errorClient struct {
client.Client
errorForRetryCount int
returnGetErr string
returnUpdateErr string
}

func (e *errorClient) Get(_ context.Context, _ client.ObjectKey, _ client.Object, _ ...client.GetOption) error {
if e.returnGetErr == "GetError" {
return errors.New("get error")
}
return nil
}

func (e *errorClient) Update(_ context.Context, _ client.Object, _ ...client.UpdateOption) error {
if e.returnUpdateErr == "ServiceUnavailable" && e.errorForRetryCount > 0 {
e.errorForRetryCount--
return k8serrors.NewServiceUnavailable("service is unavailable")
}
if e.returnUpdateErr == "ServerTimeout" && e.errorForRetryCount > 0 {
e.errorForRetryCount--
return k8serrors.NewServerTimeout(schema.GroupResource{Group: placementv1beta1.GroupVersion.Group, Resource: "clusterresourcebinding"}, "UPDATE", 0)
}
if e.returnUpdateErr == "Conflict" && e.errorForRetryCount > 0 {
e.errorForRetryCount--
return k8serrors.NewConflict(schema.GroupResource{Group: placementv1beta1.GroupVersion.Group, Resource: "clusterresourcebinding"}, "UPDATE", errors.New("conflict"))
}
return nil
}

// TestUpdateBindingsMarkAsUnscheduledForAndUpdate tests the updateBinding method by passing markUnscheduledForAndUpdate update function.
func TestUpdateBindingsMarkAsUnscheduledForAndUpdate(t *testing.T) {
Arvindthiru marked this conversation as resolved.
Show resolved Hide resolved
boundBinding := placementv1beta1.ClusterResourceBinding{
Expand Down
Loading