diff --git a/apis/placement/v1beta1/binding_types.go b/apis/placement/v1beta1/binding_types.go index 1beda3f9f..eb06fb28a 100644 --- a/apis/placement/v1beta1/binding_types.go +++ b/apis/placement/v1beta1/binding_types.go @@ -10,6 +10,12 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +const ( + // SchedulerCRBCleanupFinalizer is a finalizer added to ClusterResourceBindings to ensure we can look up the + // corresponding CRP name for deleting ClusterResourceBindings to trigger a new scheduling cycle. + SchedulerCRBCleanupFinalizer = fleetPrefix + "scheduler-crb-cleanup" +) + // +kubebuilder:object:root=true // +kubebuilder:resource:scope=Cluster,categories={fleet,fleet-placement},shortName=rb // +kubebuilder:subresource:status diff --git a/cmd/hubagent/workload/setup.go b/cmd/hubagent/workload/setup.go index 358ceabee..908d6ef4c 100644 --- a/cmd/hubagent/workload/setup.go +++ b/cmd/hubagent/workload/setup.go @@ -39,6 +39,7 @@ import ( "go.goms.io/fleet/pkg/scheduler/framework" "go.goms.io/fleet/pkg/scheduler/profile" "go.goms.io/fleet/pkg/scheduler/queue" + schedulercrbwatcher "go.goms.io/fleet/pkg/scheduler/watchers/clusterresourcebinding" schedulercrpwatcher "go.goms.io/fleet/pkg/scheduler/watchers/clusterresourceplacement" schedulercspswatcher "go.goms.io/fleet/pkg/scheduler/watchers/clusterschedulingpolicysnapshot" "go.goms.io/fleet/pkg/scheduler/watchers/membercluster" @@ -278,6 +279,15 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager, return err } + klog.Info("Setting up the clusterResourceBinding watcher for scheduler") + if err := (&schedulercrbwatcher.Reconciler{ + Client: mgr.GetClient(), + SchedulerWorkQueue: defaultSchedulingQueue, + }).SetupWithManager(mgr); err != nil { + klog.ErrorS(err, "Unable to set up clusterResourceBinding watcher for scheduler") + return err + } + klog.Info("Setting up the memberCluster watcher for scheduler") if err := (&membercluster.Reconciler{ Client: mgr.GetClient(), diff --git a/pkg/scheduler/framework/framework.go b/pkg/scheduler/framework/framework.go index f8c4c22d6..a4cf68380 100644 --- a/pkg/scheduler/framework/framework.go +++ b/pkg/scheduler/framework/framework.go @@ -24,6 +24,7 @@ import ( "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1" placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" @@ -287,15 +288,21 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p // * dangling bindings, i.e., bindings that are associated with a cluster that is no longer // in a normally operating state (the cluster has left the fleet, or is in the state of leaving), // yet has not been marked as unscheduled by the scheduler; and - // + // * deleting bindings, i.e., bindings that have a deletionTimeStamp on them. // Any deleted binding is also ignored. // Note that bindings marked as unscheduled are ignored by the scheduler, as they // are irrelevant to the scheduling cycle. However, we will reconcile them with the latest scheduling // result so that we won't have a ever increasing chain of flip flop bindings. - bound, scheduled, obsolete, unscheduled, dangling := classifyBindings(policy, bindings, clusters) + bound, scheduled, obsolete, unscheduled, dangling, deleting := classifyBindings(policy, bindings, clusters) + + // Remove scheduler CRB cleanup finalizer on all deleting bindings. 
+	if err := f.updateBindings(ctx, deleting, removeFinalizerAndUpdate); err != nil {
+		klog.ErrorS(err, "Failed to remove finalizers from deleting bindings", "clusterSchedulingPolicySnapshot", policyRef)
+		return ctrl.Result{}, err
+	}

	// Mark all dangling bindings as unscheduled.
-	if err := f.markAsUnscheduledFor(ctx, dangling); err != nil {
+	if err := f.updateBindings(ctx, dangling, markUnscheduledForAndUpdate); err != nil {
		klog.ErrorS(err, "Failed to mark dangling bindings as unscheduled", "clusterSchedulingPolicySnapshot", policyRef)
		return ctrl.Result{}, err
	}
@@ -349,31 +356,51 @@ func (f *framework) collectBindings(ctx context.Context, crpName string) ([]plac
	return bindingList.Items, nil
}

-// markAsUnscheduledFor marks a list of bindings as unscheduled.
-func (f *framework) markAsUnscheduledFor(ctx context.Context, bindings []*placementv1beta1.ClusterResourceBinding) error {
+// markUnscheduledForAndUpdate marks a binding as unscheduled and updates it.
+var markUnscheduledForAndUpdate = func(ctx context.Context, hubClient client.Client, binding *placementv1beta1.ClusterResourceBinding) error {
+	// Remember the previous binding state so that we might be able to revert this change if this
+	// cluster is being selected again before the resources are removed from it. Need to do a get and set if
+	// we add more annotations to the binding.
+	binding.SetAnnotations(map[string]string{placementv1beta1.PreviousBindingStateAnnotation: string(binding.Spec.State)})
+	// Mark the binding as unscheduled, which can conflict with the rollout controller, which also changes the state of a
+	// binding from "scheduled" to "bound".
+	binding.Spec.State = placementv1beta1.BindingStateUnscheduled
+	err := hubClient.Update(ctx, binding, &client.UpdateOptions{})
+	if err == nil {
+		klog.V(2).InfoS("Marked binding as unscheduled", "clusterResourceBinding", klog.KObj(binding))
+	}
+	return err
+}
+
+// removeFinalizerAndUpdate removes the scheduler CRB cleanup finalizer from a ClusterResourceBinding and updates it.
+var removeFinalizerAndUpdate = func(ctx context.Context, hubClient client.Client, binding *placementv1beta1.ClusterResourceBinding) error {
+	controllerutil.RemoveFinalizer(binding, placementv1beta1.SchedulerCRBCleanupFinalizer)
+	err := hubClient.Update(ctx, binding, &client.UpdateOptions{})
+	if err == nil {
+		klog.V(2).InfoS("Removed scheduler CRB cleanup finalizer", "clusterResourceBinding", klog.KObj(binding))
+	}
+	return err
+}
+
+// updateBindings iterates over bindings and updates them using the update function provided.
+func (f *framework) updateBindings(ctx context.Context, bindings []*placementv1beta1.ClusterResourceBinding, updateFn func(ctx context.Context, client client.Client, binding *placementv1beta1.ClusterResourceBinding) error) error {
	// issue all the update requests in parallel
	errs, cctx := errgroup.WithContext(ctx)
	for _, binding := range bindings {
-		unscheduledBinding := binding
+		updateBinding := binding
		errs.Go(func() error {
			return retry.OnError(retry.DefaultBackoff,
				func(err error) bool {
					return apierrors.IsServiceUnavailable(err) || apierrors.IsServerTimeout(err) || apierrors.IsConflict(err)
				},
				func() error {
-					// Remember the previous unscheduledBinding state so that we might be able to revert this change if this
-					// cluster is being selected again before the resources are removed from it. Need to do a get and set if
-					// we add more annotations to the binding.
- unscheduledBinding.SetAnnotations(map[string]string{placementv1beta1.PreviousBindingStateAnnotation: string(unscheduledBinding.Spec.State)}) - // Mark the unscheduledBinding as unscheduled which can conflict with the rollout controller which also changes the state of a - // unscheduledBinding from "scheduled" to "bound". - unscheduledBinding.Spec.State = placementv1beta1.BindingStateUnscheduled - err := f.client.Update(cctx, unscheduledBinding, &client.UpdateOptions{}) - klog.V(2).InfoS("Marking binding as unscheduled", "clusterResourceBinding", klog.KObj(unscheduledBinding), "error", err) - // We will just retry for conflict errors since the scheduler holds the truth here. + err := updateFn(cctx, f.client, updateBinding) + // We will retry on conflicts. if apierrors.IsConflict(err) { // get the binding again to make sure we have the latest version to update again. - return f.client.Get(cctx, client.ObjectKeyFromObject(unscheduledBinding), unscheduledBinding) + if getErr := f.client.Get(cctx, client.ObjectKeyFromObject(updateBinding), updateBinding); getErr != nil { + return getErr + } } return err }) @@ -656,7 +683,7 @@ func (f *framework) manipulateBindings( // // This is set to happen after new bindings are created and old bindings are updated, to // avoid interruptions (deselected then reselected) in a best effort manner. - if err := f.markAsUnscheduledFor(ctx, toDelete); err != nil { + if err := f.updateBindings(ctx, toDelete, markUnscheduledForAndUpdate); err != nil { klog.ErrorS(err, "Failed to mark bindings as unschedulable", "clusterSchedulingPolicySnapshot", policyRef) return err } @@ -809,7 +836,7 @@ func (f *framework) runSchedulingCycleForPickNPlacementType( klog.V(2).InfoS("Downscaling is needed", "clusterSchedulingPolicySnapshot", policyRef, "downscaleCount", downscaleCount) // Mark all obsolete bindings as unscheduled first. - if err := f.markAsUnscheduledFor(ctx, obsolete); err != nil { + if err := f.updateBindings(ctx, obsolete, markUnscheduledForAndUpdate); err != nil { klog.ErrorS(err, "Failed to mark obsolete bindings as unscheduled", "clusterSchedulingPolicySnapshot", policyRef) return ctrl.Result{}, err } @@ -986,10 +1013,10 @@ func (f *framework) downscale(ctx context.Context, scheduled, bound []*placement bindingsToDelete = append(bindingsToDelete, sortedScheduled[i]) } - return sortedScheduled[count:], bound, f.markAsUnscheduledFor(ctx, bindingsToDelete) + return sortedScheduled[count:], bound, f.updateBindings(ctx, bindingsToDelete, markUnscheduledForAndUpdate) case count == len(scheduled): // Trim all scheduled bindings. - return nil, bound, f.markAsUnscheduledFor(ctx, scheduled) + return nil, bound, f.updateBindings(ctx, scheduled, markUnscheduledForAndUpdate) case count < len(scheduled)+len(bound): // Trim all scheduled bindings and part of bound bindings. bindingsToDelete := make([]*placementv1beta1.ClusterResourceBinding, 0, count) @@ -1010,13 +1037,13 @@ func (f *framework) downscale(ctx context.Context, scheduled, bound []*placement bindingsToDelete = append(bindingsToDelete, sortedBound[i]) } - return nil, sortedBound[left:], f.markAsUnscheduledFor(ctx, bindingsToDelete) + return nil, sortedBound[left:], f.updateBindings(ctx, bindingsToDelete, markUnscheduledForAndUpdate) case count == len(scheduled)+len(bound): // Trim all scheduled and bound bindings. bindingsToDelete := make([]*placementv1beta1.ClusterResourceBinding, 0, count) bindingsToDelete = append(bindingsToDelete, scheduled...) bindingsToDelete = append(bindingsToDelete, bound...) 
- return nil, nil, f.markAsUnscheduledFor(ctx, bindingsToDelete) + return nil, nil, f.updateBindings(ctx, bindingsToDelete, markUnscheduledForAndUpdate) default: // Normally this branch will never run, as an earlier check has guaranteed that // count <= len(scheduled) + len(bound). diff --git a/pkg/scheduler/framework/framework_test.go b/pkg/scheduler/framework/framework_test.go index e1be90892..8a5842f97 100644 --- a/pkg/scheduler/framework/framework_test.go +++ b/pkg/scheduler/framework/framework_test.go @@ -7,6 +7,7 @@ package framework import ( "context" + "errors" "fmt" "log" "os" @@ -16,8 +17,10 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" @@ -65,6 +68,9 @@ var ( lessFuncFilteredCluster = func(filtered1, filtered2 *filteredClusterWithStatus) bool { return filtered1.cluster.Name < filtered2.cluster.Name } + lessFuncBinding = func(binding1, binding2 placementv1beta1.ClusterResourceBinding) bool { + return binding1.Name < binding2.Name + } ) // A few utilities for generating a large number of objects. @@ -352,8 +358,9 @@ func TestClassifyBindings(t *testing.T) { wantObsolete := []*placementv1beta1.ClusterResourceBinding{&obsoleteBinding} wantUnscheduled := []*placementv1beta1.ClusterResourceBinding{&unscheduledBinding} wantDangling := []*placementv1beta1.ClusterResourceBinding{&associatedWithLeavingClusterBinding, &assocaitedWithDisappearedClusterBinding} + wantDeleting := []*placementv1beta1.ClusterResourceBinding{&deletingBinding} - bound, scheduled, obsolete, unscheduled, dangling := classifyBindings(policy, bindings, clusters) + bound, scheduled, obsolete, unscheduled, dangling, deleting := classifyBindings(policy, bindings, clusters) if diff := cmp.Diff(bound, wantBound); diff != "" { t.Errorf("classifyBindings() bound diff (-got, +want): %s", diff) } @@ -373,10 +380,130 @@ func TestClassifyBindings(t *testing.T) { if diff := cmp.Diff(dangling, wantDangling); diff != "" { t.Errorf("classifyBindings() dangling diff (-got, +want) = %s", diff) } + + if diff := cmp.Diff(deleting, wantDeleting); diff != "" { + t.Errorf("classifyBIndings() deleting diff (-got, +want) = %s", diff) + } } -// TestMarkAsUnscheduledFor tests the markAsUnscheduledFor method. -func TestMarkAsUnscheduledFor(t *testing.T) { +func TestUpdateBindingsWithErrors(t *testing.T) { + binding := placementv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: bindingName, + }, + Spec: placementv1beta1.ResourceBindingSpec{ + State: placementv1beta1.BindingStateBound, + }, + } + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme.Scheme). 
+ Build() + + var genericUpdateFn = func(ctx context.Context, hubClient client.Client, binding *placementv1beta1.ClusterResourceBinding) error { + binding.SetLabels(map[string]string{"test-key": "test-value"}) + return hubClient.Update(ctx, binding, &client.UpdateOptions{}) + } + + testCases := []struct { + name string + bindings []*placementv1beta1.ClusterResourceBinding + customClient client.Client + wantErr error + }{ + { + name: "service unavailable error on update, successful get & retry update keeps returning service unavailable error", + bindings: []*placementv1beta1.ClusterResourceBinding{&binding}, + customClient: &errorClient{ + Client: fakeClient, + // set large error retry count to keep returning of update error. + errorForRetryCount: 1000000, + returnUpdateErr: "ServiceUnavailable", + }, + wantErr: k8serrors.NewServiceUnavailable("service is unavailable"), + }, + { + name: "service unavailable error on update, successful get & retry update returns nil", + bindings: []*placementv1beta1.ClusterResourceBinding{&binding}, + customClient: &errorClient{ + Client: fakeClient, + errorForRetryCount: 1, + returnUpdateErr: "ServiceUnavailable", + }, + wantErr: nil, + }, + { + name: "server timeout error on update, successful get & retry update returns nil", + bindings: []*placementv1beta1.ClusterResourceBinding{&binding}, + customClient: &errorClient{ + Client: fakeClient, + errorForRetryCount: 1, + returnUpdateErr: "ServerTimeout", + }, + wantErr: nil, + }, + { + name: "conflict error on update, get failed, retry update returns get error", + bindings: []*placementv1beta1.ClusterResourceBinding{&binding}, + customClient: &errorClient{ + Client: fakeClient, + errorForRetryCount: 1, + returnUpdateErr: "Conflict", + returnGetErr: "GetError", + }, + wantErr: errors.New("get error"), + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Construct framework manually instead of using NewFramework() to avoid mocking the controller manager. 
+ f := &framework{ + client: tc.customClient, + } + ctx := context.Background() + gotErr := f.updateBindings(ctx, tc.bindings, genericUpdateFn) + got, want := gotErr != nil, tc.wantErr != nil + if got != want { + t.Fatalf("updateBindings() = %v, want %v", gotErr, tc.wantErr) + } + if got && want && !strings.Contains(gotErr.Error(), tc.wantErr.Error()) { + t.Errorf("updateBindings() = %v, want %v", gotErr, tc.wantErr) + } + }) + } +} + +type errorClient struct { + client.Client + errorForRetryCount int + returnGetErr string + returnUpdateErr string +} + +func (e *errorClient) Get(_ context.Context, _ client.ObjectKey, _ client.Object, _ ...client.GetOption) error { + if e.returnGetErr == "GetError" { + return errors.New("get error") + } + return nil +} + +func (e *errorClient) Update(_ context.Context, _ client.Object, _ ...client.UpdateOption) error { + if e.returnUpdateErr == "ServiceUnavailable" && e.errorForRetryCount > 0 { + e.errorForRetryCount-- + return k8serrors.NewServiceUnavailable("service is unavailable") + } + if e.returnUpdateErr == "ServerTimeout" && e.errorForRetryCount > 0 { + e.errorForRetryCount-- + return k8serrors.NewServerTimeout(schema.GroupResource{Group: placementv1beta1.GroupVersion.Group, Resource: "clusterresourcebinding"}, "UPDATE", 0) + } + if e.returnUpdateErr == "Conflict" && e.errorForRetryCount > 0 { + e.errorForRetryCount-- + return k8serrors.NewConflict(schema.GroupResource{Group: placementv1beta1.GroupVersion.Group, Resource: "clusterresourcebinding"}, "UPDATE", errors.New("conflict")) + } + return nil +} + +// TestUpdateBindingsMarkAsUnscheduledForAndUpdate tests the updateBinding method by passing markUnscheduledForAndUpdate update function. +func TestUpdateBindingsMarkAsUnscheduledForAndUpdate(t *testing.T) { boundBinding := placementv1beta1.ClusterResourceBinding{ ObjectMeta: metav1.ObjectMeta{ Name: bindingName, @@ -403,10 +530,10 @@ func TestMarkAsUnscheduledFor(t *testing.T) { f := &framework{ client: fakeClient, } - // call markAsUnscheduledFor + // call markAsUnscheduledForAndUpdate ctx := context.Background() - if err := f.markAsUnscheduledFor(ctx, []*placementv1beta1.ClusterResourceBinding{&boundBinding, &scheduledBinding}); err != nil { - t.Fatalf("markAsUnscheduledFor() = %v, want no error", err) + if err := f.updateBindings(ctx, []*placementv1beta1.ClusterResourceBinding{&boundBinding, &scheduledBinding}, markUnscheduledForAndUpdate); err != nil { + t.Fatalf("updateBindings() = %v, want no error", err) } // check if the boundBinding has been updated if err := fakeClient.Get(ctx, types.NamespacedName{Name: bindingName}, &boundBinding); err != nil { @@ -446,6 +573,87 @@ func TestMarkAsUnscheduledFor(t *testing.T) { } } +func TestUpdateBindingRemoveFinalizerAndUpdate(t *testing.T) { + boundBinding := placementv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: bindingName, + Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer}, + }, + Spec: placementv1beta1.ResourceBindingSpec{ + State: placementv1beta1.BindingStateBound, + }, + } + scheduledBinding := placementv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: altBindingName, + Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer}, + }, + Spec: placementv1beta1.ResourceBindingSpec{ + State: placementv1beta1.BindingStateScheduled, + }, + } + unScheduledBinding := placementv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: anotherBindingName, + Finalizers: 
[]string{placementv1beta1.SchedulerCRBCleanupFinalizer},
+		},
+		Spec: placementv1beta1.ResourceBindingSpec{
+			State: placementv1beta1.BindingStateUnscheduled,
+		},
+	}
+
+	// setup fake client with bindings
+	fakeClient := fake.NewClientBuilder().
+		WithScheme(scheme.Scheme).
+		WithObjects(&boundBinding, &scheduledBinding, &unScheduledBinding).
+		Build()
+	// Construct framework manually instead of using NewFramework() to avoid mocking the controller manager.
+	f := &framework{
+		client: fakeClient,
+	}
+	// call updateBindings with removeFinalizerAndUpdate
+	ctx := context.Background()
+	if err := f.updateBindings(ctx, []*placementv1beta1.ClusterResourceBinding{&boundBinding, &scheduledBinding, &unScheduledBinding}, removeFinalizerAndUpdate); err != nil {
+		t.Fatalf("updateBindings() = %v, want no error", err)
+	}
+
+	var clusterResourceBindingList placementv1beta1.ClusterResourceBindingList
+	if err := f.client.List(ctx, &clusterResourceBindingList); err != nil {
+		t.Fatalf("List cluster resource bindings returned %v, want no error", err)
+	}
+
+	want := []placementv1beta1.ClusterResourceBinding{
+		{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: bindingName,
+			},
+			Spec: placementv1beta1.ResourceBindingSpec{
+				State: placementv1beta1.BindingStateBound,
+			},
+		},
+		{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: altBindingName,
+			},
+			Spec: placementv1beta1.ResourceBindingSpec{
+				State: placementv1beta1.BindingStateScheduled,
+			},
+		},
+		{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: anotherBindingName,
+			},
+			Spec: placementv1beta1.ResourceBindingSpec{
+				State: placementv1beta1.BindingStateUnscheduled,
+			},
+		},
+	}
+
+	if diff := cmp.Diff(clusterResourceBindingList.Items, want, ignoreTypeMetaAPIVersionKindFields, ignoreObjectMetaResourceVersionField, cmpopts.SortSlices(lessFuncBinding)); diff != "" {
+		t.Errorf("diff (-got, +want): %s", diff)
+	}
+}
+
 // TestRunPreFilterPlugins tests the runPreFilterPlugins method.
func TestRunPreFilterPlugins(t *testing.T) { dummyPreFilterPluginNameA := fmt.Sprintf(dummyAllPurposePluginNameFormat, 0) @@ -1274,6 +1482,7 @@ func TestCrossReferencePickedClustersAndDeDupBindings(t *testing.T) { Labels: map[string]string{ placementv1beta1.CRPTrackingLabel: crpName, }, + Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer}, }, Spec: placementv1beta1.ResourceBindingSpec{ State: placementv1beta1.BindingStateScheduled, @@ -1296,6 +1505,7 @@ func TestCrossReferencePickedClustersAndDeDupBindings(t *testing.T) { Labels: map[string]string{ placementv1beta1.CRPTrackingLabel: crpName, }, + Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer}, }, Spec: placementv1beta1.ResourceBindingSpec{ State: placementv1beta1.BindingStateScheduled, @@ -1318,6 +1528,7 @@ func TestCrossReferencePickedClustersAndDeDupBindings(t *testing.T) { Labels: map[string]string{ placementv1beta1.CRPTrackingLabel: crpName, }, + Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer}, }, Spec: placementv1beta1.ResourceBindingSpec{ State: placementv1beta1.BindingStateScheduled, @@ -1510,6 +1721,7 @@ func TestCrossReferencePickedClustersAndDeDupBindings(t *testing.T) { Labels: map[string]string{ placementv1beta1.CRPTrackingLabel: crpName, }, + Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer}, }, Spec: placementv1beta1.ResourceBindingSpec{ State: placementv1beta1.BindingStateScheduled, @@ -1619,6 +1831,7 @@ func TestCrossReferencePickedClustersAndDeDupBindings(t *testing.T) { Labels: map[string]string{ placementv1beta1.CRPTrackingLabel: crpName, }, + Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer}, }, Spec: placementv1beta1.ResourceBindingSpec{ State: placementv1beta1.BindingStateScheduled, @@ -1641,6 +1854,7 @@ func TestCrossReferencePickedClustersAndDeDupBindings(t *testing.T) { Labels: map[string]string{ placementv1beta1.CRPTrackingLabel: crpName, }, + Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer}, }, Spec: placementv1beta1.ResourceBindingSpec{ State: placementv1beta1.BindingStateScheduled, @@ -1663,6 +1877,7 @@ func TestCrossReferencePickedClustersAndDeDupBindings(t *testing.T) { Labels: map[string]string{ placementv1beta1.CRPTrackingLabel: crpName, }, + Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer}, }, Spec: placementv1beta1.ResourceBindingSpec{ State: placementv1beta1.BindingStateScheduled, @@ -1706,6 +1921,7 @@ func TestCrossReferencePickedClustersAndDeDupBindings(t *testing.T) { Labels: map[string]string{ placementv1beta1.CRPTrackingLabel: crpName, }, + Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer}, }, Spec: placementv1beta1.ResourceBindingSpec{ State: placementv1beta1.BindingStateScheduled, @@ -1728,6 +1944,7 @@ func TestCrossReferencePickedClustersAndDeDupBindings(t *testing.T) { Labels: map[string]string{ placementv1beta1.CRPTrackingLabel: crpName, }, + Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer}, }, Spec: placementv1beta1.ResourceBindingSpec{ State: placementv1beta1.BindingStateScheduled, @@ -1813,6 +2030,7 @@ func TestCrossReferencePickedClustersAndDeDupBindings(t *testing.T) { Labels: map[string]string{ placementv1beta1.CRPTrackingLabel: crpName, }, + Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer}, }, Spec: placementv1beta1.ResourceBindingSpec{ State: placementv1beta1.BindingStateScheduled, diff --git a/pkg/scheduler/framework/frameworkutils.go b/pkg/scheduler/framework/frameworkutils.go index 
cf5caeda8..d23be4782 100644
--- a/pkg/scheduler/framework/frameworkutils.go
+++ b/pkg/scheduler/framework/frameworkutils.go
@@ -29,16 +29,18 @@ import (
// - dangling bindings, i.e., bindings that are associated with a cluster that is no longer in
//   a normally operating state (the cluster has left the fleet, or is in the state of leaving),
//   yet has not been marked as unscheduled by the scheduler; and
-// - unscheduled bindings, i.e., bindings that are marked to be removed by the scheduler.
+// - unscheduled bindings, i.e., bindings that are marked to be removed by the scheduler; and
// - obsolete bindings, i.e., bindings that are no longer associated with the latest scheduling
-//   policy.
-func classifyBindings(policy *placementv1beta1.ClusterSchedulingPolicySnapshot, bindings []placementv1beta1.ClusterResourceBinding, clusters []clusterv1beta1.MemberCluster) (bound, scheduled, obsolete, unscheduled, dangling []*placementv1beta1.ClusterResourceBinding) {
+//   policy; and
+// - deleting bindings, i.e., bindings that have a deletionTimestamp on them.
+func classifyBindings(policy *placementv1beta1.ClusterSchedulingPolicySnapshot, bindings []placementv1beta1.ClusterResourceBinding, clusters []clusterv1beta1.MemberCluster) (bound, scheduled, obsolete, unscheduled, dangling, deleting []*placementv1beta1.ClusterResourceBinding) {
	// Pre-allocate arrays.
	bound = make([]*placementv1beta1.ClusterResourceBinding, 0, len(bindings))
	scheduled = make([]*placementv1beta1.ClusterResourceBinding, 0, len(bindings))
	obsolete = make([]*placementv1beta1.ClusterResourceBinding, 0, len(bindings))
	unscheduled = make([]*placementv1beta1.ClusterResourceBinding, 0, len(bindings))
	dangling = make([]*placementv1beta1.ClusterResourceBinding, 0, len(bindings))
+	deleting = make([]*placementv1beta1.ClusterResourceBinding, 0, len(bindings))

	// Build a map for clusters for quick loopup.
	clusterMap := make(map[string]clusterv1beta1.MemberCluster)
@@ -52,11 +54,8 @@ func classifyBindings(policy *placementv1beta1.ClusterSchedulingPolicySnapshot,
		switch {
		case !binding.DeletionTimestamp.IsZero():
-			// Ignore any binding that has been deleted.
-			//
-			// Note that the scheduler will not add any cleanup scheduler to a binding, as
-			// in normal operations bound and scheduled bindings will not be deleted, and
-			// unscheduled bindings are disregarded by the scheduler.
+			// We need to remove the scheduler CRB cleanup finalizer from deleting ClusterResourceBindings.
+			deleting = append(deleting, &binding)
		case binding.Spec.State == placementv1beta1.BindingStateUnscheduled:
			// we need to remember those bindings so that we will not create another one.
unscheduled = append(unscheduled, &binding)
@@ -83,7 +82,7 @@ func classifyBindings(policy *placementv1beta1.ClusterSchedulingPolicySnapshot,
		}
	}

-	return bound, scheduled, obsolete, unscheduled, dangling
+	return bound, scheduled, obsolete, unscheduled, dangling, deleting
}

// bindingWithPatch is a helper struct that includes a binding that needs to be patched and the
@@ -186,6 +185,7 @@ func crossReferencePickedClustersAndDeDupBindings(
				Labels: map[string]string{
					placementv1beta1.CRPTrackingLabel: crpName,
				},
+				Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer},
			},
			Spec: placementv1beta1.ResourceBindingSpec{
				State: placementv1beta1.BindingStateScheduled,
@@ -684,6 +684,7 @@ func crossReferenceValidTargetsWithBindings(
				Labels: map[string]string{
					placementv1beta1.CRPTrackingLabel: crpName,
				},
+				Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer},
			},
			Spec: placementv1beta1.ResourceBindingSpec{
				State: placementv1beta1.BindingStateScheduled,
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 471020de4..6766f0c80 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -282,7 +282,7 @@ func (s *Scheduler) cleanUpAllBindingsFor(ctx context.Context, crp *fleetv1beta1
		return controller.NewAPIServerError(false, err)
	}

-	// Remove the scheduler cleanup finalizer from all the bindings, and delete them.
+	// Remove the scheduler CRB cleanup finalizer from all bindings, and delete those not already terminating.
	//
	// Note that once a CRP has been marked for deletion, it will no longer enter the scheduling cycle,
	// so any cleanup finalizer has to be removed here.
@@ -291,16 +291,19 @@ func (s *Scheduler) cleanUpAllBindingsFor(ctx context.Context, crp *fleetv1beta1
	// the scheduler no longer marks them as deleting and waits for another controller to actually
	// run the deletion.
	for idx := range bindingList.Items {
-		binding := bindingList.Items[idx]
+		binding := &bindingList.Items[idx]
+		controllerutil.RemoveFinalizer(binding, fleetv1beta1.SchedulerCRBCleanupFinalizer)
+		if err := s.client.Update(ctx, binding); err != nil {
+			klog.ErrorS(err, "Failed to remove scheduler CRB cleanup finalizer from cluster resource binding", "clusterResourceBinding", klog.KObj(binding))
+			return controller.NewUpdateIgnoreConflictError(err)
+		}
		// Delete the binding if it has not been marked for deletion yet.
		if binding.DeletionTimestamp == nil {
-			if err := s.client.Delete(ctx, &binding); err != nil && !errors.IsNotFound(err) {
-				klog.ErrorS(err, "Failed to delete binding", "clusterResourceBinding", klog.KObj(&binding))
+			if err := s.client.Delete(ctx, binding); err != nil && !errors.IsNotFound(err) {
+				klog.ErrorS(err, "Failed to delete binding", "clusterResourceBinding", klog.KObj(binding))
				return controller.NewAPIServerError(false, err)
			}
		}
-
-		// Note that the scheduler will not add any cleanup finalizer to a binding.
	}

	// All bindings have been deleted; remove the scheduler cleanup finalizer from the CRP.
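For reference, a minimal, self-contained sketch of the cleanup flow that cleanUpAllBindingsFor follows after this change: strip the scheduler CRB cleanup finalizer first, then delete any binding that is not already terminating. The package name example and the helper cleanUpBindings are illustrative assumptions, not code from this diff.

package example

import (
	"context"

	"k8s.io/apimachinery/pkg/api/errors"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

	fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
)

// cleanUpBindings strips the scheduler CRB cleanup finalizer before issuing the
// delete, so a binding cannot get stuck in Terminating once its CRP is gone and
// the scheduler will no longer reconcile it.
func cleanUpBindings(ctx context.Context, c client.Client, bindings []fleetv1beta1.ClusterResourceBinding) error {
	for idx := range bindings {
		binding := &bindings[idx]
		// Drop the finalizer first; the update must succeed before deletion is attempted.
		controllerutil.RemoveFinalizer(binding, fleetv1beta1.SchedulerCRBCleanupFinalizer)
		if err := c.Update(ctx, binding); err != nil {
			return err
		}
		// Only delete bindings that are not already terminating.
		if binding.DeletionTimestamp == nil {
			if err := c.Delete(ctx, binding); err != nil && !errors.IsNotFound(err) {
				return err
			}
		}
	}
	return nil
}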
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 7373b3d25..bdf0109dd 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -70,6 +70,7 @@ func TestCleanUpAllBindingsFor(t *testing.T) { Labels: map[string]string{ fleetv1beta1.CRPTrackingLabel: crpName, }, + Finalizers: []string{fleetv1beta1.SchedulerCRBCleanupFinalizer}, }, }, { @@ -78,6 +79,7 @@ func TestCleanUpAllBindingsFor(t *testing.T) { Labels: map[string]string{ fleetv1beta1.CRPTrackingLabel: crpName, }, + Finalizers: []string{fleetv1beta1.SchedulerCRBCleanupFinalizer}, }, }, } diff --git a/pkg/scheduler/watchers/clusterresourcebinding/controller_integration_test.go b/pkg/scheduler/watchers/clusterresourcebinding/controller_integration_test.go new file mode 100644 index 000000000..bc2951185 --- /dev/null +++ b/pkg/scheduler/watchers/clusterresourcebinding/controller_integration_test.go @@ -0,0 +1,122 @@ +/* +Copyright (c) Microsoft Corporation. +Licensed under the MIT license. +*/ + +package clusterresourcebinding + +import ( + "fmt" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" +) + +const ( + eventuallyDuration = time.Second * 5 + eventuallyInterval = time.Millisecond * 250 + consistentlyDuration = time.Second + consistentlyInterval = time.Millisecond * 200 + + crbName = "test-crb" + crpName = "test-crp" + clusterName = "test-cluster" +) + +var ( + noKeyEnqueuedActual = func() error { + if queueLen := keyCollector.Len(); queueLen != 0 { + return fmt.Errorf("work queue is not empty: current len %d, want 0", queueLen) + } + return nil + } + + expectedKeySetEnqueuedActual = func() error { + if isAllPresent, absentKeys := keyCollector.IsPresent(crpName); !isAllPresent { + return fmt.Errorf("expected key(s) %v is not found", absentKeys) + } + + if queueLen := keyCollector.Len(); queueLen != 1 { + return fmt.Errorf("more than one key is enqueued: current len %d, want 1", queueLen) + } + + return nil + } +) + +// This container cannot be run in parallel since we are trying to access a common shared queue. +var _ = Describe("scheduler - cluster resource binding watcher", Ordered, func() { + BeforeAll(func() { + Eventually(noKeyEnqueuedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Workqueue is not empty") + Consistently(noKeyEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Workqueue is not empty") + }) + + Context("create, update & delete cluster resource binding", func() { + BeforeAll(func() { + affinityScore := int32(1) + topologyScore := int32(1) + crb := fleetv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: crbName, + Labels: map[string]string{ + fleetv1beta1.CRPTrackingLabel: crpName, + }, + Finalizers: []string{fleetv1beta1.SchedulerCRBCleanupFinalizer}, + }, + Spec: fleetv1beta1.ResourceBindingSpec{ + State: fleetv1beta1.BindingStateScheduled, + // Leave the associated resource snapshot name empty; it is up to another controller + // to fulfill this field. 
+ SchedulingPolicySnapshotName: "test-policy", + TargetCluster: clusterName, + ClusterDecision: fleetv1beta1.ClusterDecision{ + ClusterName: clusterName, + Selected: true, + ClusterScore: &fleetv1beta1.ClusterScore{ + AffinityScore: &affinityScore, + TopologySpreadScore: &topologyScore, + }, + Reason: "test-reason", + }, + }, + } + // Create cluster resource binding. + Expect(hubClient.Create(ctx, &crb)).Should(Succeed(), "Failed to create cluster resource binding") + }) + + It("should not enqueue the CRP name when CRB is created", func() { + Consistently(noKeyEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Workqueue is not empty") + }) + + It("update cluster resource binding", func() { + var crb fleetv1beta1.ClusterResourceBinding + Expect(hubClient.Get(ctx, client.ObjectKey{Name: crbName}, &crb)).Should(Succeed()) + crb.Spec.State = fleetv1beta1.BindingStateBound + Expect(hubClient.Update(ctx, &crb)).Should(Succeed()) + }) + + It("should not enqueue the CRP name when it CRB is updated", func() { + Consistently(noKeyEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Workqueue is not empty") + }) + + It("delete cluster resource binding", func() { + var crb fleetv1beta1.ClusterResourceBinding + Expect(hubClient.Get(ctx, client.ObjectKey{Name: crbName}, &crb)).Should(Succeed()) + Expect(hubClient.Delete(ctx, &crb)).Should(Succeed()) + }) + + It("should enqueue CRP name when CRB is deleted", func() { + Eventually(expectedKeySetEnqueuedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Workqueue is either empty or it contains more than one element") + Consistently(expectedKeySetEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Workqueue is either empty or it contains more than one element") + }) + + AfterAll(func() { + keyCollector.Reset() + }) + }) +}) diff --git a/pkg/scheduler/watchers/clusterresourcebinding/suite_test.go b/pkg/scheduler/watchers/clusterresourcebinding/suite_test.go new file mode 100644 index 000000000..a0c4a5f4c --- /dev/null +++ b/pkg/scheduler/watchers/clusterresourcebinding/suite_test.go @@ -0,0 +1,105 @@ +/* +Copyright (c) Microsoft Corporation. +Licensed under the MIT license. +*/ + +package clusterresourcebinding + +import ( + "context" + "path/filepath" + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/envtest" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + + fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" + "go.goms.io/fleet/pkg/scheduler/queue" + "go.goms.io/fleet/test/utils/keycollector" +) + +var ( + hubTestEnv *envtest.Environment + hubClient client.Client + ctx context.Context + cancel context.CancelFunc + keyCollector *keycollector.SchedulerWorkqueueKeyCollector +) + +func TestAPIs(t *testing.T) { + RegisterFailHandler(Fail) + + RunSpecs(t, "Scheduler Source Cluster Resource Binding Controller Suite") +} + +var _ = BeforeSuite(func() { + klog.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) + + ctx, cancel = context.WithCancel(context.TODO()) + + By("bootstrap the test environment") + + // Start the hub cluster. 
+ hubTestEnv = &envtest.Environment{ + CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "..", "config", "crd", "bases")}, + ErrorIfCRDPathMissing: true, + } + hubCfg, err := hubTestEnv.Start() + Expect(err).ToNot(HaveOccurred(), "Failed to start test environment") + Expect(hubCfg).ToNot(BeNil(), "Hub cluster configuration is nil") + + // Add custom APIs to the runtime scheme. + Expect(fleetv1beta1.AddToScheme(scheme.Scheme)).Should(Succeed()) + + // Set up a client for the hub cluster. + hubClient, err = client.New(hubCfg, client.Options{Scheme: scheme.Scheme}) + Expect(err).ToNot(HaveOccurred(), "Failed to create hub cluster client") + Expect(hubClient).ToNot(BeNil(), "Hub cluster client is nil") + + // Set up a controller manager and let it manage the hub cluster controller. + ctrlMgr, err := ctrl.NewManager(hubCfg, ctrl.Options{ + Scheme: scheme.Scheme, + Metrics: metricsserver.Options{ + BindAddress: "0", + }, + }) + Expect(err).NotTo(HaveOccurred(), "Failed to create controller manager") + + schedulerWorkQueue := queue.NewSimpleClusterResourcePlacementSchedulingQueue() + + // Create ClusterResourceBinding watcher reconciler. + reconciler := &Reconciler{ + Client: hubClient, + SchedulerWorkQueue: schedulerWorkQueue, + } + err = reconciler.SetupWithManager(ctrlMgr) + Expect(err).ToNot(HaveOccurred(), "Failed to set up controller with controller manager") + + // Start the key collector. + keyCollector = keycollector.NewSchedulerWorkqueueKeyCollector(schedulerWorkQueue) + go func() { + keyCollector.Run(ctx) + }() + + // Start the controller manager. + go func() { + defer GinkgoRecover() + err := ctrlMgr.Start(ctx) + Expect(err).ToNot(HaveOccurred(), "Failed to start controller manager") + }() +}) + +var _ = AfterSuite(func() { + defer klog.Flush() + cancel() + + By("tearing down the test environment") + Expect(hubTestEnv.Stop()).Should(Succeed(), "Failed to stop test environment") +}) diff --git a/pkg/scheduler/watchers/clusterresourcebinding/watcher.go b/pkg/scheduler/watchers/clusterresourcebinding/watcher.go new file mode 100644 index 000000000..8e2f6b900 --- /dev/null +++ b/pkg/scheduler/watchers/clusterresourcebinding/watcher.go @@ -0,0 +1,103 @@ +/* +Copyright (c) Microsoft Corporation. +Licensed under the MIT license. +*/ + +package clusterresourcebinding + +import ( + "context" + "fmt" + "time" + + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" + "go.goms.io/fleet/pkg/scheduler/queue" + "go.goms.io/fleet/pkg/utils/controller" +) + +// Reconciler reconciles the deletion of a ClusterResourceBinding. +type Reconciler struct { + // Client is the client the controller uses to access the hub cluster. + client.Client + // SchedulerWorkQueue is the workqueue in use by the scheduler. + SchedulerWorkQueue queue.ClusterResourcePlacementSchedulingQueueWriter +} + +// Reconcile reconciles the CRB. 
+func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + crbRef := klog.KRef("", req.Name) + startTime := time.Now() + klog.V(2).InfoS("Scheduler source reconciliation starts", "clusterResourceBinding", crbRef) + defer func() { + latency := time.Since(startTime).Milliseconds() + klog.V(2).InfoS("Scheduler source reconciliation ends", "clusterResourceBinding", crbRef, "latency", latency) + }() + + // Retrieve the CRB. + crb := &fleetv1beta1.ClusterResourceBinding{} + if err := r.Client.Get(ctx, req.NamespacedName, crb); err != nil { + klog.ErrorS(err, "Failed to get cluster resource binding", "clusterResourceBinding", crbRef) + return ctrl.Result{}, controller.NewAPIServerError(true, client.IgnoreNotFound(err)) + } + + // Check if the CRB has been deleted and has the scheduler CRB cleanup finalizer. + if crb.DeletionTimestamp != nil && controllerutil.ContainsFinalizer(crb, fleetv1beta1.SchedulerCRBCleanupFinalizer) { + // The CRB has been deleted and still has the scheduler CRB cleanup finalizer; enqueue it's corresponding CRP + // for the scheduler to process. + crpName, exist := crb.GetLabels()[fleetv1beta1.CRPTrackingLabel] + if !exist { + err := controller.NewUnexpectedBehaviorError(fmt.Errorf("clusterResourceBinding %s doesn't have CRP tracking label", crb.Name)) + klog.ErrorS(err, "Failed to enqueue CRP name for CRB") + // error cannot be retried. + return ctrl.Result{}, nil + } + r.SchedulerWorkQueue.AddRateLimited(queue.ClusterResourcePlacementKey(crpName)) + } + + // No action is needed for the scheduler to take in other cases. + return ctrl.Result{}, nil +} + +// SetupWithManager sets up the controller with the manager. +func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { + customPredicate := predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { + // Ignore creation events. + return false + }, + DeleteFunc: func(e event.DeleteEvent) bool { + // Ignore deletion events (events emitted when the object is actually removed + // from storage). + return false + }, + UpdateFunc: func(e event.UpdateEvent) bool { + // Check if the update event is valid. + if e.ObjectOld == nil || e.ObjectNew == nil { + err := controller.NewUnexpectedBehaviorError(fmt.Errorf("update event is invalid")) + klog.ErrorS(err, "Failed to process update event") + return false + } + + // Check if the deletion timestamp has been set. + oldDeletionTimestamp := e.ObjectOld.GetDeletionTimestamp() + newDeletionTimestamp := e.ObjectNew.GetDeletionTimestamp() + if oldDeletionTimestamp == nil && newDeletionTimestamp != nil { + return true + } + + return false + }, + } + + return ctrl.NewControllerManagedBy(mgr). + For(&fleetv1beta1.ClusterResourceBinding{}). + WithEventFilter(customPredicate). 
+ Complete(r) +} diff --git a/test/scheduler/actuals_test.go b/test/scheduler/actuals_test.go index d9831f994..de7e1a6f4 100644 --- a/test/scheduler/actuals_test.go +++ b/test/scheduler/actuals_test.go @@ -109,6 +109,7 @@ func scheduledBindingsCreatedOrUpdatedForClustersActual(clusters []string, score Labels: map[string]string{ placementv1beta1.CRPTrackingLabel: crpName, }, + Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer}, }, Spec: placementv1beta1.ResourceBindingSpec{ State: placementv1beta1.BindingStateScheduled, @@ -169,6 +170,7 @@ func boundBindingsCreatedOrUpdatedForClustersActual(clusters []string, scoreByCl Labels: map[string]string{ placementv1beta1.CRPTrackingLabel: crpName, }, + Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer}, }, Spec: placementv1beta1.ResourceBindingSpec{ State: placementv1beta1.BindingStateBound, @@ -229,6 +231,7 @@ func unscheduledBindingsCreatedOrUpdatedForClustersActual(clusters []string, sco Labels: map[string]string{ placementv1beta1.CRPTrackingLabel: crpName, }, + Finalizers: []string{placementv1beta1.SchedulerCRBCleanupFinalizer}, }, Spec: placementv1beta1.ResourceBindingSpec{ State: placementv1beta1.BindingStateUnscheduled, diff --git a/test/scheduler/suite_test.go b/test/scheduler/suite_test.go index d1168e11c..1b2b20102 100644 --- a/test/scheduler/suite_test.go +++ b/test/scheduler/suite_test.go @@ -38,6 +38,7 @@ import ( "go.goms.io/fleet/pkg/scheduler" "go.goms.io/fleet/pkg/scheduler/clustereligibilitychecker" "go.goms.io/fleet/pkg/scheduler/queue" + "go.goms.io/fleet/pkg/scheduler/watchers/clusterresourcebinding" "go.goms.io/fleet/pkg/scheduler/watchers/clusterresourceplacement" "go.goms.io/fleet/pkg/scheduler/watchers/clusterschedulingpolicysnapshot" "go.goms.io/fleet/pkg/scheduler/watchers/membercluster" @@ -584,6 +585,13 @@ func beforeSuiteForProcess1() []byte { err = memberClusterWatcher.SetupWithManager(ctrlMgr) Expect(err).NotTo(HaveOccurred(), "Failed to set up member cluster watcher with controller manager") + clusterResourceBindingWatcher := clusterresourcebinding.Reconciler{ + Client: hubClient, + SchedulerWorkQueue: schedulerWorkQueue, + } + err = clusterResourceBindingWatcher.SetupWithManager(ctrlMgr) + Expect(err).NotTo(HaveOccurred(), "Failed to set up cluster resource binding watcher with controller manager") + // Set up the scheduler. fw := buildSchedulerFramework(ctrlMgr, clusterEligibilityChecker) sched := scheduler.NewScheduler(defaultSchedulerName, fw, schedulerWorkQueue, ctrlMgr, 3)
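To recap how the new clusterresourcebinding watcher feeds the scheduler, here is a minimal sketch of its two decisions: the predicate only passes the update that sets a deletionTimestamp, and the reconciler maps a deleting binding that still carries the scheduler CRB cleanup finalizer back to its CRP via the CRP tracking label. The package name example and the helpers isDeletionEvent and crpKeyFor are illustrative assumptions, not code from this diff.

package example

import (
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/event"

	fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
)

// isDeletionEvent mirrors the watcher predicate above: only the update that sets
// the deletionTimestamp is interesting; creations, ordinary updates, and the final
// removal from storage are ignored.
func isDeletionEvent(e event.UpdateEvent) bool {
	if e.ObjectOld == nil || e.ObjectNew == nil {
		return false
	}
	return e.ObjectOld.GetDeletionTimestamp() == nil && e.ObjectNew.GetDeletionTimestamp() != nil
}

// crpKeyFor mirrors the Reconcile logic above: a deleting binding that still carries
// the scheduler CRB cleanup finalizer maps back to its owning CRP via the CRP
// tracking label, so that CRP can be requeued for a fresh scheduling cycle.
func crpKeyFor(binding *fleetv1beta1.ClusterResourceBinding) (string, bool) {
	if binding.DeletionTimestamp == nil || !controllerutil.ContainsFinalizer(binding, fleetv1beta1.SchedulerCRBCleanupFinalizer) {
		return "", false
	}
	crpName, ok := binding.GetLabels()[fleetv1beta1.CRPTrackingLabel]
	return crpName, ok
}

Filtering in the predicate keeps the reconciler from running on every binding create or update, so the scheduler work queue only receives a CRP key when one of its bindings actually starts terminating.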