diff --git a/controllers/slicegateway/reconciler.go b/controllers/slicegateway/reconciler.go index 817688c72..c96ae4807 100644 --- a/controllers/slicegateway/reconciler.go +++ b/controllers/slicegateway/reconciler.go @@ -40,6 +40,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -188,6 +189,7 @@ func (r *SliceGwReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct if err := r.reconcileGatewayHeadlessService(ctx, sliceGw); err != nil { return ctrl.Result{}, err } + //create an endpoint if not exists requeue, res, err := r.reconcileGatewayEndpoint(ctx, sliceGw) if requeue { @@ -279,7 +281,7 @@ func (r *SliceGwReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct } return ctrl.Result{ - RequeueAfter: controllers.ReconcileInterval, + RequeueAfter: controllers.SliceGatewayReconcileInterval, }, nil } @@ -357,6 +359,8 @@ func (r *SliceGwReconciler) findSliceGwObjectsToReconcile(ctx context.Context, p podType := getPodType(podLabels) + r.Log.Info("Invoke SliceGw reconciler", "pod type update", podType) + sliceGwList := &kubeslicev1beta1.SliceGatewayList{} var err error @@ -381,6 +385,15 @@ func (r *SliceGwReconciler) findSliceGwObjectsToReconcile(ctx context.Context, p if err != nil { return []reconcile.Request{} } + case "slicegateway": + sliceGwName, found := podLabels[controllers.SliceGatewaySelectorLabelKey] + if !found { + return []reconcile.Request{} + } + sliceGwList, err = r.findObjectsForSliceGwUpdate(sliceGwName) + if err != nil { + return []reconcile.Request{} + } default: return []reconcile.Request{} } @@ -414,9 +427,24 @@ func (r *SliceGwReconciler) sliceGwObjectsToReconcileForNodeRestart(ctx context. return requests } +func (r *SliceGwReconciler) findObjectsForSliceGwUpdate(sliceGwName string) (*kubeslicev1beta1.SliceGatewayList, error) { + sliceGwList := &kubeslicev1beta1.SliceGatewayList{} + sliceGw := kubeslicev1beta1.SliceGateway{} + + err := r.Get(context.Background(), types.NamespacedName{Name: sliceGwName, Namespace: controllers.ControlPlaneNamespace}, &sliceGw) + if err != nil { + return nil, err + } + + sliceGwList.Items = []kubeslicev1beta1.SliceGateway{sliceGw} + + return sliceGwList, nil +} + func (r *SliceGwReconciler) findObjectsForSliceRouterUpdate(sliceName string) (*kubeslicev1beta1.SliceGatewayList, error) { sliceGwList := &kubeslicev1beta1.SliceGatewayList{} listOpts := []client.ListOption{ + client.InNamespace(controllers.ControlPlaneNamespace), client.MatchingLabels(map[string]string{controllers.ApplicationNamespaceSelectorLabelKey: sliceName}), } err := r.List(context.Background(), sliceGwList, listOpts...) @@ -479,8 +507,15 @@ func (r *SliceGwReconciler) SetupWithManager(mgr ctrl.Manager) error { return err } + // Check for updates to slice gw pods + labelSelector.MatchLabels = map[string]string{webhook.PodInjectLabelKey: "slicegateway"} + slicegwPredicate, err := predicate.LabelSelectorPredicate(labelSelector) + if err != nil { + return err + } + sliceGwUpdPredicate := predicate.Or( - slicerouterPredicate, netopPredicate, nsmgrPredicate, nsmfwdPredicate, + slicerouterPredicate, netopPredicate, nsmgrPredicate, nsmfwdPredicate, slicegwPredicate, ) // The slice gateway reconciler needs to be invoked whenever there is an update to the @@ -507,7 +542,39 @@ func (r *SliceGwReconciler) SetupWithManager(mgr ctrl.Manager) error { ). Watches(&corev1.Node{}, handler.EnqueueRequestsFromMapFunc(r.sliceGwObjectsToReconcileForNodeRestart), - builder.WithPredicates(nodePredicate), + builder.WithPredicates(nodePredicate, predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { + return true + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return true + }, + UpdateFunc: func(e event.UpdateEvent) bool { + oldObj, ok := e.ObjectOld.(*corev1.Node) + if !ok { + return true + } + newObj, ok := e.ObjectNew.(*corev1.Node) + if !ok { + // The reconcile would be triggered by the Create func + return false + } + if oldObj.ObjectMeta.Labels != nil { + nodelabel, ok := oldObj.ObjectMeta.Labels[controllers.NodeTypeSelectorLabelKey] + if !ok { + // Check if the label was added to the node + nodelabel, ok = newObj.ObjectMeta.Labels[controllers.NodeTypeSelectorLabelKey] + if ok && nodelabel == "gateway" { + return true + } + } + } + return false + }, + GenericFunc: func(e event.GenericEvent) bool { + return false + }, + }), ). Complete(r) } diff --git a/controllers/slicegateway/slicegateway.go b/controllers/slicegateway/slicegateway.go index a1019cd7d..432fd0963 100644 --- a/controllers/slicegateway/slicegateway.go +++ b/controllers/slicegateway/slicegateway.go @@ -700,17 +700,29 @@ func (r *SliceGwReconciler) ReconcileGwPodStatus(ctx context.Context, slicegatew if toUpdate { log.Info("gwPodsInfo", "gwPodsInfo", gwPodsInfo) - slicegateway.Status.GatewayPodStatus = gwPodsInfo - slicegateway.Status.ConnectionContextUpdatedOn = 0 - err := r.Status().Update(ctx, slicegateway) + err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + err := r.Get(ctx, types.NamespacedName{Namespace: controllers.ControlPlaneNamespace, Name: slicegateway.Name}, slicegateway) + if err != nil { + log.Error(err, "Failed to get SliceGateway") + return err + } + slicegateway.Status.GatewayPodStatus = gwPodsInfo + slicegateway.Status.ConnectionContextUpdatedOn = 0 + err = r.Status().Update(ctx, slicegateway) + if err != nil { + debugLog.Error(err, "Failed to update SliceGateway status for gateway status") + return err + } + return nil + }) if err != nil { - debugLog.Error(err, "error while update", "Failed to update SliceGateway status for gateway status") - return ctrl.Result{}, err, true + log.Error(err, "Failed to update SliceGateway status for gw pods") + return ctrl.Result{RequeueAfter: 30 * time.Second}, nil, true } toReconcile = true } if toReconcile { - return ctrl.Result{}, err, true + return ctrl.Result{}, nil, true } return ctrl.Result{}, nil, false } @@ -745,25 +757,6 @@ func (r *SliceGwReconciler) SendConnectionContextAndQosToGwPod(ctx context.Conte log.Error(err, "Failed to send qos to gateway") return ctrl.Result{RequeueAfter: 10 * time.Second}, err, true } - - err = retry.RetryOnConflict(retry.DefaultRetry, func() error { - err := r.Get(ctx, req.NamespacedName, slicegateway) - if err != nil { - log.Error(err, "Failed to get SliceGateway") - return err - } - slicegateway.Status.ConnectionContextUpdatedOn = time.Now().Unix() - err = r.Status().Update(ctx, slicegateway) - if err != nil { - log.Error(err, "Failed to update SliceGateway status for conn ctx update in retry loop") - return err - } - return nil - }) - if err != nil { - log.Error(err, "Failed to update SliceGateway status for conn ctx update") - return ctrl.Result{}, err, true - } } return ctrl.Result{}, nil, false } diff --git a/controllers/vars.go b/controllers/vars.go index 9e7b07af8..a97f6470d 100644 --- a/controllers/vars.go +++ b/controllers/vars.go @@ -37,6 +37,11 @@ var ( ImagePullSecretName = utils.GetEnvOrDefault("IMAGE_PULL_SECRET_NAME", "kubeslice-nexus") ReconcileInterval = 10 * time.Second + // This value is the periodic reconcile interval for slicegateway CRs. The slicegateway CRD reconciler is set up to + // be event driven. In addition to being triggered due to updates to the slicegateway CR objects, it is also invoked + // in an event driven manner whenever important pods like the slice router, slice gw pods, and other critical infra + // pods restart. Hence, it does not really need an aggressive periodic reconcile interval. + SliceGatewayReconcileInterval = 120 * time.Second ) const ( diff --git a/pkg/hub/controllers/slicegateway_controller.go b/pkg/hub/controllers/slicegateway_controller.go index 4e0e4982b..d5ff04ee3 100644 --- a/pkg/hub/controllers/slicegateway_controller.go +++ b/pkg/hub/controllers/slicegateway_controller.go @@ -30,6 +30,7 @@ import ( kubeslicev1beta1 "github.com/kubeslice/worker-operator/api/v1beta1" "github.com/kubeslice/worker-operator/controllers" ossEvents "github.com/kubeslice/worker-operator/events" + hubutils "github.com/kubeslice/worker-operator/pkg/hub" "github.com/kubeslice/worker-operator/pkg/logger" "github.com/kubeslice/worker-operator/pkg/utils" corev1 "k8s.io/api/core/v1" @@ -94,47 +95,91 @@ func (r *SliceGwReconciler) Reconcile(ctx context.Context, req reconcile.Request } else { utils.RecordEvent(ctx, r.EventRecorder, sliceGw, nil, ossEvents.EventSliceGWCreated, sliceGWController) } - err = retry.RetryOnConflict(retry.DefaultRetry, func() error { - sliceGwRef := client.ObjectKey{ - Name: sliceGwName, - Namespace: ControlPlaneNamespace, - } - err := r.MeshClient.Get(ctx, sliceGwRef, meshSliceGw) - if err != nil { - return err - } - meshSliceGw.Status.Config = kubeslicev1beta1.SliceGatewayConfig{ - SliceName: sliceGw.Spec.SliceName, - SliceGatewayID: sliceGw.Spec.LocalGatewayConfig.GatewayName, - SliceGatewaySubnet: sliceGw.Spec.LocalGatewayConfig.GatewaySubnet, - SliceGatewayRemoteSubnet: sliceGw.Spec.RemoteGatewayConfig.GatewaySubnet, - SliceGatewayHostType: sliceGw.Spec.GatewayHostType, - SliceGatewayRemoteNodeIPs: sliceGw.Spec.RemoteGatewayConfig.NodeIps, - SliceGatewayRemoteNodePorts: sliceGw.Spec.RemoteGatewayConfig.NodePorts, - SliceGatewayRemoteClusterID: sliceGw.Spec.RemoteGatewayConfig.ClusterName, - SliceGatewayRemoteGatewayID: sliceGw.Spec.RemoteGatewayConfig.GatewayName, - SliceGatewayLocalVpnIP: sliceGw.Spec.LocalGatewayConfig.VpnIp, - SliceGatewayRemoteVpnIP: sliceGw.Spec.RemoteGatewayConfig.VpnIp, - SliceGatewayName: strconv.Itoa(sliceGw.Spec.GatewayNumber), - SliceGatewayIntermediateDeployments: meshSliceGw.Status.Config.SliceGatewayIntermediateDeployments, - SliceGatewayConnectivityType: sliceGw.Spec.GatewayConnectivityType, - SliceGatewayProtocol: sliceGw.Spec.GatewayProtocol, - SliceGatewayServerLBIPs: sliceGw.Spec.RemoteGatewayConfig.LoadBalancerIps, + + // Update the slicegateway CR on the worker cluster only if something has changed. + toUpdate := false + sliceGwRef := client.ObjectKey{ + Name: sliceGwName, + Namespace: ControlPlaneNamespace, + } + err = r.MeshClient.Get(ctx, sliceGwRef, meshSliceGw) + if err != nil { + return reconcile.Result{}, err + } + // First check all the static fields. + if meshSliceGw.Status.Config.SliceGatewayID != sliceGw.Spec.LocalGatewayConfig.GatewayName || + meshSliceGw.Status.Config.SliceGatewaySubnet != sliceGw.Spec.LocalGatewayConfig.GatewaySubnet || + meshSliceGw.Status.Config.SliceGatewayRemoteSubnet != sliceGw.Spec.RemoteGatewayConfig.GatewaySubnet || + meshSliceGw.Status.Config.SliceGatewayHostType != sliceGw.Spec.GatewayHostType || + meshSliceGw.Status.Config.SliceGatewayRemoteClusterID != sliceGw.Spec.RemoteGatewayConfig.ClusterName || + meshSliceGw.Status.Config.SliceGatewayRemoteGatewayID != sliceGw.Spec.RemoteGatewayConfig.GatewayName || + meshSliceGw.Status.Config.SliceGatewayName != strconv.Itoa(sliceGw.Spec.GatewayNumber) || + meshSliceGw.Status.Config.SliceGatewayConnectivityType != sliceGw.Spec.GatewayConnectivityType || + meshSliceGw.Status.Config.SliceGatewayProtocol != sliceGw.Spec.GatewayProtocol { + toUpdate = true + } + // If no change in static fields, check the dynamic fields + if !toUpdate { + // For client type, check the following fields + if meshSliceGw.Status.Config.SliceGatewayHostType == "Client" { + if !hubutils.ListEqual(meshSliceGw.Status.Config.SliceGatewayRemoteNodeIPs, sliceGw.Spec.RemoteGatewayConfig.NodeIps) { + log.Info("Update from hub: Node IPs changed", "SliceGw", sliceGw.Name, "NodeIPs", sliceGw.Spec.RemoteGatewayConfig.NodeIps) + toUpdate = true + } + // Next check node port + if !toUpdate { + if !hubutils.ListEqual(meshSliceGw.Status.Config.SliceGatewayNodePorts, sliceGw.Spec.RemoteGatewayConfig.NodePorts) { + log.Info("Update from hub: NodePort numbers changed", "SliceGw", sliceGw.Name, "NodePorts", sliceGw.Spec.RemoteGatewayConfig.NodePorts) + toUpdate = true + } + } } + // Nothing dynamic to check for the server type + } - err = r.MeshClient.Status().Update(ctx, meshSliceGw) + if toUpdate { + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + sliceGwRef := client.ObjectKey{ + Name: sliceGwName, + Namespace: ControlPlaneNamespace, + } + err := r.MeshClient.Get(ctx, sliceGwRef, meshSliceGw) + if err != nil { + return err + } + meshSliceGw.Status.Config = kubeslicev1beta1.SliceGatewayConfig{ + SliceName: sliceGw.Spec.SliceName, + SliceGatewayID: sliceGw.Spec.LocalGatewayConfig.GatewayName, + SliceGatewaySubnet: sliceGw.Spec.LocalGatewayConfig.GatewaySubnet, + SliceGatewayRemoteSubnet: sliceGw.Spec.RemoteGatewayConfig.GatewaySubnet, + SliceGatewayHostType: sliceGw.Spec.GatewayHostType, + SliceGatewayRemoteNodeIPs: sliceGw.Spec.RemoteGatewayConfig.NodeIps, + SliceGatewayRemoteNodePorts: sliceGw.Spec.RemoteGatewayConfig.NodePorts, + SliceGatewayRemoteClusterID: sliceGw.Spec.RemoteGatewayConfig.ClusterName, + SliceGatewayRemoteGatewayID: sliceGw.Spec.RemoteGatewayConfig.GatewayName, + SliceGatewayLocalVpnIP: sliceGw.Spec.LocalGatewayConfig.VpnIp, + SliceGatewayRemoteVpnIP: sliceGw.Spec.RemoteGatewayConfig.VpnIp, + SliceGatewayName: strconv.Itoa(sliceGw.Spec.GatewayNumber), + SliceGatewayIntermediateDeployments: meshSliceGw.Status.Config.SliceGatewayIntermediateDeployments, + SliceGatewayConnectivityType: sliceGw.Spec.GatewayConnectivityType, + SliceGatewayProtocol: sliceGw.Spec.GatewayProtocol, + SliceGatewayServerLBIPs: sliceGw.Spec.RemoteGatewayConfig.LoadBalancerIps, + } + + err = r.MeshClient.Status().Update(ctx, meshSliceGw) + if err != nil { + return err + } + utils.RecordEvent(ctx, r.EventRecorder, sliceGw, nil, ossEvents.EventSliceGWUpdated, sliceGWController) + return nil + }) if err != nil { utils.RecordEvent(ctx, r.EventRecorder, sliceGw, nil, ossEvents.EventSliceGWUpdateFailed, sliceGWController) log.Error(err, "unable to update sliceGw status in spoke cluster", "sliceGw", sliceGwName) - return err - } else { - utils.RecordEvent(ctx, r.EventRecorder, sliceGw, nil, ossEvents.EventSliceGWUpdated, sliceGWController) + return reconcile.Result{}, err } - return nil - }) - if err != nil { - return reconcile.Result{}, err } + return reconcile.Result{}, nil } diff --git a/pkg/hub/hubclient/hubclient.go b/pkg/hub/hubclient/hubclient.go index 597038c93..1c01da909 100644 --- a/pkg/hub/hubclient/hubclient.go +++ b/pkg/hub/hubclient/hubclient.go @@ -22,7 +22,6 @@ import ( "context" "fmt" "os" - "reflect" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -38,6 +37,7 @@ import ( hubv1alpha1 "github.com/kubeslice/apis/pkg/controller/v1alpha1" spokev1alpha1 "github.com/kubeslice/apis/pkg/worker/v1alpha1" kubeslicev1beta1 "github.com/kubeslice/worker-operator/api/v1beta1" + hubutils "github.com/kubeslice/worker-operator/pkg/hub" "github.com/kubeslice/worker-operator/pkg/logger" "github.com/kubeslice/worker-operator/pkg/monitoring" ) @@ -168,8 +168,8 @@ func (hubClient *HubClientConfig) UpdateNodePortForSliceGwServer(ctx context.Con return err } - if reflect.DeepEqual(sliceGw.Spec.LocalGatewayConfig.NodePorts, sliceGwNodePorts) { - // No update needed + // If the node ports on the controller cluster and the worker are the same, no need to update + if hubutils.ListEqual(sliceGw.Spec.LocalGatewayConfig.NodePorts, sliceGwNodePorts) { return nil } diff --git a/pkg/hub/utils.go b/pkg/hub/utils.go new file mode 100644 index 000000000..8335ecd34 --- /dev/null +++ b/pkg/hub/utils.go @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2025 Avesha, Inc. All rights reserved. + * + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hubutils + +func ListContains[T comparable](li []T, v T) bool { + for _, val := range li { + if val == v { + return true + } + } + return false +} + +func ListEqual[T comparable](l1, l2 []T) bool { + if len(l1) != len(l2) { + return false + } + for _, val := range l1 { + if !ListContains(l2, val) { + return false + } + } + + return true +}