diff --git a/cmd/agent/app/agent.go b/cmd/agent/app/agent.go index bf8335592daf..c5a746701b22 100644 --- a/cmd/agent/app/agent.go +++ b/cmd/agent/app/agent.go @@ -381,6 +381,9 @@ func startServiceExportController(ctx controllerscontext.Context) (bool, error) ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout, } + if err := mcs.IndexField(ctx.Mgr); err != nil { + return false, err + } serviceExportController.RunWorkQueue() if err := serviceExportController.SetupWithManager(ctx.Mgr); err != nil { return false, err diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index e7068b00567f..bc4c1ca9a8c6 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -467,6 +467,9 @@ func startServiceExportController(ctx controllerscontext.Context) (enabled bool, ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet, ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout, } + if err = mcs.IndexField(ctx.Mgr); err != nil { + return false, err + } serviceExportController.RunWorkQueue() if err := serviceExportController.SetupWithManager(ctx.Mgr); err != nil { return false, err diff --git a/pkg/controllers/mcs/common.go b/pkg/controllers/mcs/common.go new file mode 100644 index 000000000000..e30a63a5e790 --- /dev/null +++ b/pkg/controllers/mcs/common.go @@ -0,0 +1,47 @@ +/* +Copyright 2025 The Karmada Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mcs + +import ( + "context" + + utilerrors "k8s.io/apimachinery/pkg/util/errors" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" + "github.com/karmada-io/karmada/pkg/util" +) + +const ( + workKeyIndex = "workSpec.suspendDispatching" +) + +// IndexField registers Indexer functions to controller manager. +func IndexField(mgr controllerruntime.Manager) error { + workIndexerFunc := func(obj client.Object) []string { + work, ok := obj.(*workv1alpha1.Work) + if !ok { + return nil + } + return util.GetWorkSuspendDispatching(&work.Spec) + } + + return utilerrors.NewAggregate([]error{ + mgr.GetFieldIndexer().IndexField(context.TODO(), &workv1alpha1.Work{}, workKeyIndex, workIndexerFunc), + }) +} diff --git a/pkg/controllers/mcs/service_export_controller.go b/pkg/controllers/mcs/service_export_controller.go index 16a682edb797..2a1155722de4 100644 --- a/pkg/controllers/mcs/service_export_controller.go +++ b/pkg/controllers/mcs/service_export_controller.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -151,7 +152,9 @@ func (c *ServiceExportController) RunWorkQueue() { func (c *ServiceExportController) enqueueReportedEpsServiceExport() { workList := &workv1alpha1.WorkList{} err := wait.PollUntilContextCancel(context.TODO(), 1*time.Second, true, func(ctx context.Context) (done bool, err error) { - err = c.List(ctx, workList, client.MatchingLabels{util.PropagationInstruction: util.PropagationInstructionSuppressed}) + err = c.List(ctx, workList, client.MatchingFields{ + workKeyIndex: "true", + }) if err != nil { klog.Errorf("Failed to list collected EndpointSlices Work from member clusters: %v", err) return false, nil @@ -428,10 +431,10 @@ func (c *ServiceExportController) removeOrphanWork(ctx context.Context, endpoint if err := c.List(ctx, collectedEpsWorkList, &client.ListOptions{ Namespace: names.GenerateExecutionSpaceName(serviceExportKey.Cluster), LabelSelector: labels.SelectorFromSet(labels.Set{ - util.PropagationInstruction: util.PropagationInstructionSuppressed, - util.ServiceNamespaceLabel: serviceExportKey.Namespace, - util.ServiceNameLabel: serviceExportKey.Name, + util.ServiceNamespaceLabel: serviceExportKey.Namespace, + util.ServiceNameLabel: serviceExportKey.Name, }), + FieldSelector: fields.OneTermEqualSelector(workKeyIndex, "true"), }); err != nil { klog.Errorf("Failed to list endpointslice work with serviceExport(%s/%s) under namespace %s: %v", serviceExportKey.Namespace, serviceExportKey.Name, names.GenerateExecutionSpaceName(serviceExportKey.Cluster), err) @@ -495,7 +498,8 @@ func reportEndpointSlice(ctx context.Context, c client.Client, endpointSlice *un return err } - if err := ctrlutil.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice); err != nil { + // indicate the Work should be not propagated since it's collected resource. + if err := ctrlutil.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice, ctrlutil.WithSuspendDispatching(true)); err != nil { return err } @@ -518,10 +522,8 @@ func getEndpointSliceWorkMeta(ctx context.Context, c client.Client, ns string, w Namespace: ns, Finalizers: []string{util.EndpointSliceControllerFinalizer}, Labels: map[string]string{ - util.ServiceNamespaceLabel: endpointSlice.GetNamespace(), - util.ServiceNameLabel: endpointSlice.GetLabels()[discoveryv1.LabelServiceName], - // indicate the Work should be not propagated since it's collected resource. - util.PropagationInstruction: util.PropagationInstructionSuppressed, + util.ServiceNamespaceLabel: endpointSlice.GetNamespace(), + util.ServiceNameLabel: endpointSlice.GetLabels()[discoveryv1.LabelServiceName], util.EndpointSliceWorkManagedByLabel: util.ServiceExportKind, }, } diff --git a/pkg/controllers/multiclusterservice/endpointslice_collect_controller.go b/pkg/controllers/multiclusterservice/endpointslice_collect_controller.go index 8f398166b9df..5a98a0d44720 100644 --- a/pkg/controllers/multiclusterservice/endpointslice_collect_controller.go +++ b/pkg/controllers/multiclusterservice/endpointslice_collect_controller.go @@ -386,7 +386,8 @@ func reportEndpointSlice(ctx context.Context, c client.Client, endpointSlice *un return err } - if err := ctrlutil.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice); err != nil { + // indicate the Work should be not propagated since it's collected resource. + if err := ctrlutil.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice, ctrlutil.WithSuspendDispatching(true)); err != nil { klog.Errorf("Failed to create or update work(%s/%s), Error: %v", workMeta.Namespace, workMeta.Name, err) return err } @@ -408,9 +409,7 @@ func getEndpointSliceWorkMeta(ctx context.Context, c client.Client, ns string, w ls := map[string]string{ util.MultiClusterServiceNamespaceLabel: endpointSlice.GetNamespace(), util.MultiClusterServiceNameLabel: endpointSlice.GetLabels()[discoveryv1.LabelServiceName], - // indicate the Work should be not propagated since it's collected resource. - util.PropagationInstruction: util.PropagationInstructionSuppressed, - util.EndpointSliceWorkManagedByLabel: util.MultiClusterServiceKind, + util.EndpointSliceWorkManagedByLabel: util.MultiClusterServiceKind, } if existWork.Labels == nil || (err != nil && apierrors.IsNotFound(err)) { workMeta := metav1.ObjectMeta{Name: workName, Namespace: ns, Labels: ls} diff --git a/pkg/controllers/multiclusterservice/endpointslice_collect_controller_test.go b/pkg/controllers/multiclusterservice/endpointslice_collect_controller_test.go index 011f8e4ac4d8..86b9782e1a08 100644 --- a/pkg/controllers/multiclusterservice/endpointslice_collect_controller_test.go +++ b/pkg/controllers/multiclusterservice/endpointslice_collect_controller_test.go @@ -144,7 +144,6 @@ func TestGetEndpointSliceWorkMeta(t *testing.T) { Labels: map[string]string{ util.MultiClusterServiceNamespaceLabel: "default", util.MultiClusterServiceNameLabel: "test-service", - util.PropagationInstruction: util.PropagationInstructionSuppressed, util.EndpointSliceWorkManagedByLabel: util.MultiClusterServiceKind, }, }, @@ -159,7 +158,6 @@ func TestGetEndpointSliceWorkMeta(t *testing.T) { Labels: map[string]string{ util.MultiClusterServiceNamespaceLabel: "default", util.MultiClusterServiceNameLabel: "test-service", - util.PropagationInstruction: util.PropagationInstructionSuppressed, util.EndpointSliceWorkManagedByLabel: "ExistingController.MultiClusterService", }, Finalizers: []string{util.MCSEndpointSliceDispatchControllerFinalizer}, diff --git a/pkg/controllers/multiclusterservice/mcs_controller.go b/pkg/controllers/multiclusterservice/mcs_controller.go index a3558b6e4508..a6686e1f1ca4 100644 --- a/pkg/controllers/multiclusterservice/mcs_controller.go +++ b/pkg/controllers/multiclusterservice/mcs_controller.go @@ -299,7 +299,6 @@ func (c *MCSController) propagateMultiClusterService(ctx context.Context, mcs *n Labels: map[string]string{ // We add this id in mutating webhook, let's just use it networkingv1alpha1.MultiClusterServicePermanentIDLabel: util.GetLabelValue(mcs.Labels, networkingv1alpha1.MultiClusterServicePermanentIDLabel), - util.PropagationInstruction: util.PropagationInstructionSuppressed, util.MultiClusterServiceNamespaceLabel: mcs.Namespace, util.MultiClusterServiceNameLabel: mcs.Name, }, @@ -310,7 +309,7 @@ func (c *MCSController) propagateMultiClusterService(ctx context.Context, mcs *n klog.Errorf("Failed to convert MultiClusterService(%s/%s) to unstructured object, err is %v", mcs.Namespace, mcs.Name, err) return err } - if err = ctrlutil.CreateOrUpdateWork(ctx, c, workMeta, mcsObj); err != nil { + if err = ctrlutil.CreateOrUpdateWork(ctx, c, workMeta, mcsObj, ctrlutil.WithSuspendDispatching(true)); err != nil { klog.Errorf("Failed to create or update MultiClusterService(%s/%s) work in the given member cluster %s, err is %v", mcs.Namespace, mcs.Name, clusterName, err) return err diff --git a/pkg/util/constants.go b/pkg/util/constants.go index 03bb37af92f0..8a53688396f7 100644 --- a/pkg/util/constants.go +++ b/pkg/util/constants.go @@ -46,7 +46,7 @@ const ( // // Note: This instruction is intended to set on Work objects to indicate the Work should be ignored by // execution controller. The instruction maybe deprecated once we extend the Work API and no other scenario want this. - PropagationInstruction = "propagation.karmada.io/instruction" + PropagationInstruction = "propagation.karmada.io/instruction" // deprecated // FederatedResourceQuotaNamespaceLabel is added to Work to specify associated FederatedResourceQuota's namespace. FederatedResourceQuotaNamespaceLabel = "federatedresourcequota.karmada.io/namespace" diff --git a/pkg/util/helper/predicate.go b/pkg/util/helper/predicate.go index 433256b66929..9aa6e2e1a409 100644 --- a/pkg/util/helper/predicate.go +++ b/pkg/util/helper/predicate.go @@ -84,6 +84,11 @@ func NewPredicateForServiceExportController(mgr controllerruntime.Manager) predi return false } + if IsWorkSuspendDispatching(obj) { + klog.V(5).Infof("Ignored Work(%s/%s) %s event as dispatching is suspended.", obj.Namespace, obj.Name, eventType) + return false + } + clusterName, err := names.GetClusterName(obj.GetNamespace()) if err != nil { klog.Errorf("Failed to get member cluster name for work %s/%s", obj.GetNamespace(), obj.GetName()) @@ -176,6 +181,11 @@ func NewPredicateForServiceExportControllerOnAgent(curClusterName string) predic return false } + if IsWorkSuspendDispatching(obj) { + klog.V(5).Infof("Ignored Work(%s/%s) %s event as dispatching is suspended.", obj.Namespace, obj.Name, eventType) + return false + } + clusterName, err := names.GetClusterName(obj.GetNamespace()) if err != nil { klog.Errorf("Failed to get member cluster name for work %s/%s", obj.GetNamespace(), obj.GetName()) diff --git a/pkg/util/work.go b/pkg/util/work.go new file mode 100644 index 000000000000..cd8bcccca7ca --- /dev/null +++ b/pkg/util/work.go @@ -0,0 +1,30 @@ +/* +Copyright 2025 The Karmada Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "strconv" + + "k8s.io/utils/ptr" + + workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" +) + +// GetWorkSuspendDispatching will get suspendDispatching field from work spec +func GetWorkSuspendDispatching(spec *workv1alpha1.WorkSpec) []string { + return []string{strconv.FormatBool(ptr.Deref(spec.SuspendDispatching, false))} +}