Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: replace instruction annotation with work suspendDispatching field #6043

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,9 @@ func startServiceExportController(ctx controllerscontext.Context) (bool, error)
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout,
}
if err := mcs.IndexField(ctx.Mgr); err != nil {
return false, err
}
serviceExportController.RunWorkQueue()
if err := serviceExportController.SetupWithManager(ctx.Mgr); err != nil {
return false, err
Expand Down
3 changes: 3 additions & 0 deletions cmd/controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,9 @@ func startServiceExportController(ctx controllerscontext.Context) (enabled bool,
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
}
if err = mcs.IndexField(ctx.Mgr); err != nil {
return false, err
}
serviceExportController.RunWorkQueue()
if err := serviceExportController.SetupWithManager(ctx.Mgr); err != nil {
return false, err
Expand Down
47 changes: 47 additions & 0 deletions pkg/controllers/mcs/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
Copyright 2025 The Karmada Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package mcs

import (
"context"

utilerrors "k8s.io/apimachinery/pkg/util/errors"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
)

const (
workKeyIndex = "workSpec.suspendDispatching"
)

// IndexField registers Indexer functions to controller manager.
func IndexField(mgr controllerruntime.Manager) error {
workIndexerFunc := func(obj client.Object) []string {
work, ok := obj.(*workv1alpha1.Work)
if !ok {
return nil
}
return util.GetWorkSuspendDispatching(&work.Spec)
}

return utilerrors.NewAggregate([]error{
mgr.GetFieldIndexer().IndexField(context.TODO(), &workv1alpha1.Work{}, workKeyIndex, workIndexerFunc),
})
}
20 changes: 11 additions & 9 deletions pkg/controllers/mcs/service_export_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -151,7 +152,9 @@ func (c *ServiceExportController) RunWorkQueue() {
func (c *ServiceExportController) enqueueReportedEpsServiceExport() {
workList := &workv1alpha1.WorkList{}
err := wait.PollUntilContextCancel(context.TODO(), 1*time.Second, true, func(ctx context.Context) (done bool, err error) {
err = c.List(ctx, workList, client.MatchingLabels{util.PropagationInstruction: util.PropagationInstructionSuppressed})
err = c.List(ctx, workList, client.MatchingFields{
workKeyIndex: "true",
})
if err != nil {
klog.Errorf("Failed to list collected EndpointSlices Work from member clusters: %v", err)
return false, nil
Expand Down Expand Up @@ -428,10 +431,10 @@ func (c *ServiceExportController) removeOrphanWork(ctx context.Context, endpoint
if err := c.List(ctx, collectedEpsWorkList, &client.ListOptions{
Namespace: names.GenerateExecutionSpaceName(serviceExportKey.Cluster),
LabelSelector: labels.SelectorFromSet(labels.Set{
util.PropagationInstruction: util.PropagationInstructionSuppressed,
util.ServiceNamespaceLabel: serviceExportKey.Namespace,
util.ServiceNameLabel: serviceExportKey.Name,
util.ServiceNamespaceLabel: serviceExportKey.Namespace,
util.ServiceNameLabel: serviceExportKey.Name,
}),
FieldSelector: fields.OneTermEqualSelector(workKeyIndex, "true"),
}); err != nil {
klog.Errorf("Failed to list endpointslice work with serviceExport(%s/%s) under namespace %s: %v",
serviceExportKey.Namespace, serviceExportKey.Name, names.GenerateExecutionSpaceName(serviceExportKey.Cluster), err)
Expand Down Expand Up @@ -495,7 +498,8 @@ func reportEndpointSlice(ctx context.Context, c client.Client, endpointSlice *un
return err
}

if err := ctrlutil.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice); err != nil {
// indicate the Work should be not propagated since it's collected resource.
if err := ctrlutil.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice, ctrlutil.WithSuspendDispatching(true)); err != nil {
return err
}

Expand All @@ -518,10 +522,8 @@ func getEndpointSliceWorkMeta(ctx context.Context, c client.Client, ns string, w
Namespace: ns,
Finalizers: []string{util.EndpointSliceControllerFinalizer},
Labels: map[string]string{
util.ServiceNamespaceLabel: endpointSlice.GetNamespace(),
util.ServiceNameLabel: endpointSlice.GetLabels()[discoveryv1.LabelServiceName],
// indicate the Work should be not propagated since it's collected resource.
util.PropagationInstruction: util.PropagationInstructionSuppressed,
util.ServiceNamespaceLabel: endpointSlice.GetNamespace(),
util.ServiceNameLabel: endpointSlice.GetLabels()[discoveryv1.LabelServiceName],
util.EndpointSliceWorkManagedByLabel: util.ServiceExportKind,
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,8 @@ func reportEndpointSlice(ctx context.Context, c client.Client, endpointSlice *un
return err
}

if err := ctrlutil.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice); err != nil {
// indicate the Work should be not propagated since it's collected resource.
if err := ctrlutil.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice, ctrlutil.WithSuspendDispatching(true)); err != nil {
klog.Errorf("Failed to create or update work(%s/%s), Error: %v", workMeta.Namespace, workMeta.Name, err)
return err
}
Expand All @@ -408,9 +409,7 @@ func getEndpointSliceWorkMeta(ctx context.Context, c client.Client, ns string, w
ls := map[string]string{
util.MultiClusterServiceNamespaceLabel: endpointSlice.GetNamespace(),
util.MultiClusterServiceNameLabel: endpointSlice.GetLabels()[discoveryv1.LabelServiceName],
// indicate the Work should be not propagated since it's collected resource.
util.PropagationInstruction: util.PropagationInstructionSuppressed,
util.EndpointSliceWorkManagedByLabel: util.MultiClusterServiceKind,
util.EndpointSliceWorkManagedByLabel: util.MultiClusterServiceKind,
}
if existWork.Labels == nil || (err != nil && apierrors.IsNotFound(err)) {
workMeta := metav1.ObjectMeta{Name: workName, Namespace: ns, Labels: ls}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ func TestGetEndpointSliceWorkMeta(t *testing.T) {
Labels: map[string]string{
util.MultiClusterServiceNamespaceLabel: "default",
util.MultiClusterServiceNameLabel: "test-service",
util.PropagationInstruction: util.PropagationInstructionSuppressed,
util.EndpointSliceWorkManagedByLabel: util.MultiClusterServiceKind,
},
},
Expand All @@ -159,7 +158,6 @@ func TestGetEndpointSliceWorkMeta(t *testing.T) {
Labels: map[string]string{
util.MultiClusterServiceNamespaceLabel: "default",
util.MultiClusterServiceNameLabel: "test-service",
util.PropagationInstruction: util.PropagationInstructionSuppressed,
util.EndpointSliceWorkManagedByLabel: "ExistingController.MultiClusterService",
},
Finalizers: []string{util.MCSEndpointSliceDispatchControllerFinalizer},
Expand Down
3 changes: 1 addition & 2 deletions pkg/controllers/multiclusterservice/mcs_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,6 @@ func (c *MCSController) propagateMultiClusterService(ctx context.Context, mcs *n
Labels: map[string]string{
// We add this id in mutating webhook, let's just use it
networkingv1alpha1.MultiClusterServicePermanentIDLabel: util.GetLabelValue(mcs.Labels, networkingv1alpha1.MultiClusterServicePermanentIDLabel),
util.PropagationInstruction: util.PropagationInstructionSuppressed,
util.MultiClusterServiceNamespaceLabel: mcs.Namespace,
util.MultiClusterServiceNameLabel: mcs.Name,
},
Expand All @@ -310,7 +309,7 @@ func (c *MCSController) propagateMultiClusterService(ctx context.Context, mcs *n
klog.Errorf("Failed to convert MultiClusterService(%s/%s) to unstructured object, err is %v", mcs.Namespace, mcs.Name, err)
return err
}
if err = ctrlutil.CreateOrUpdateWork(ctx, c, workMeta, mcsObj); err != nil {
if err = ctrlutil.CreateOrUpdateWork(ctx, c, workMeta, mcsObj, ctrlutil.WithSuspendDispatching(true)); err != nil {
klog.Errorf("Failed to create or update MultiClusterService(%s/%s) work in the given member cluster %s, err is %v",
mcs.Namespace, mcs.Name, clusterName, err)
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ const (
//
// Note: This instruction is intended to set on Work objects to indicate the Work should be ignored by
// execution controller. The instruction maybe deprecated once we extend the Work API and no other scenario want this.
PropagationInstruction = "propagation.karmada.io/instruction"
PropagationInstruction = "propagation.karmada.io/instruction" // deprecated

// FederatedResourceQuotaNamespaceLabel is added to Work to specify associated FederatedResourceQuota's namespace.
FederatedResourceQuotaNamespaceLabel = "federatedresourcequota.karmada.io/namespace"
Expand Down
10 changes: 10 additions & 0 deletions pkg/util/helper/predicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ func NewPredicateForServiceExportController(mgr controllerruntime.Manager) predi
return false
}

if IsWorkSuspendDispatching(obj) {
klog.V(5).Infof("Ignored Work(%s/%s) %s event as dispatching is suspended.", obj.Namespace, obj.Name, eventType)
return false
}

clusterName, err := names.GetClusterName(obj.GetNamespace())
if err != nil {
klog.Errorf("Failed to get member cluster name for work %s/%s", obj.GetNamespace(), obj.GetName())
Expand Down Expand Up @@ -176,6 +181,11 @@ func NewPredicateForServiceExportControllerOnAgent(curClusterName string) predic
return false
}

if IsWorkSuspendDispatching(obj) {
klog.V(5).Infof("Ignored Work(%s/%s) %s event as dispatching is suspended.", obj.Namespace, obj.Name, eventType)
return false
}

clusterName, err := names.GetClusterName(obj.GetNamespace())
if err != nil {
klog.Errorf("Failed to get member cluster name for work %s/%s", obj.GetNamespace(), obj.GetName())
Expand Down
30 changes: 30 additions & 0 deletions pkg/util/work.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
Copyright 2025 The Karmada Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package util

import (
"strconv"

"k8s.io/utils/ptr"

workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
)

// GetWorkSuspendDispatching will get suspendDispatching field from work spec
func GetWorkSuspendDispatching(spec *workv1alpha1.WorkSpec) []string {
return []string{strconv.FormatBool(ptr.Deref(spec.SuspendDispatching, false))}
}