Skip to content

Commit

Permalink
fix(): Avoid unnecessary reconcile cycles for slice gw (#416)
Browse files Browse the repository at this point in the history
Signed-off-by: Bharath Horatti <[email protected]>
  • Loading branch information
bharath-avesha authored Jan 16, 2025
1 parent 2fd8592 commit 0294add
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 65 deletions.
73 changes: 70 additions & 3 deletions controllers/slicegateway/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -188,6 +189,7 @@ func (r *SliceGwReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
if err := r.reconcileGatewayHeadlessService(ctx, sliceGw); err != nil {
return ctrl.Result{}, err
}

//create an endpoint if not exists
requeue, res, err := r.reconcileGatewayEndpoint(ctx, sliceGw)
if requeue {
Expand Down Expand Up @@ -279,7 +281,7 @@ func (r *SliceGwReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}

return ctrl.Result{
RequeueAfter: controllers.ReconcileInterval,
RequeueAfter: controllers.SliceGatewayReconcileInterval,
}, nil
}

Expand Down Expand Up @@ -357,6 +359,8 @@ func (r *SliceGwReconciler) findSliceGwObjectsToReconcile(ctx context.Context, p

podType := getPodType(podLabels)

r.Log.Info("Invoke SliceGw reconciler", "pod type update", podType)

sliceGwList := &kubeslicev1beta1.SliceGatewayList{}
var err error

Expand All @@ -381,6 +385,15 @@ func (r *SliceGwReconciler) findSliceGwObjectsToReconcile(ctx context.Context, p
if err != nil {
return []reconcile.Request{}
}
case "slicegateway":
sliceGwName, found := podLabels[controllers.SliceGatewaySelectorLabelKey]
if !found {
return []reconcile.Request{}
}
sliceGwList, err = r.findObjectsForSliceGwUpdate(sliceGwName)
if err != nil {
return []reconcile.Request{}
}
default:
return []reconcile.Request{}
}
Expand Down Expand Up @@ -414,9 +427,24 @@ func (r *SliceGwReconciler) sliceGwObjectsToReconcileForNodeRestart(ctx context.
return requests
}

// findObjectsForSliceGwUpdate looks up the SliceGateway object named sliceGwName in the
// control plane namespace and returns it wrapped in a single-item SliceGatewayList.
// When the lookup fails, the returned list is nil and the error from Get is passed back as is.
func (r *SliceGwReconciler) findObjectsForSliceGwUpdate(sliceGwName string) (*kubeslicev1beta1.SliceGatewayList, error) {
	key := types.NamespacedName{
		Namespace: controllers.ControlPlaneNamespace,
		Name:      sliceGwName,
	}

	var gw kubeslicev1beta1.SliceGateway
	if err := r.Get(context.Background(), key, &gw); err != nil {
		return nil, err
	}

	// The callers work on lists, so hand back the single object as a one-element list.
	return &kubeslicev1beta1.SliceGatewayList{
		Items: []kubeslicev1beta1.SliceGateway{gw},
	}, nil
}

func (r *SliceGwReconciler) findObjectsForSliceRouterUpdate(sliceName string) (*kubeslicev1beta1.SliceGatewayList, error) {
sliceGwList := &kubeslicev1beta1.SliceGatewayList{}
listOpts := []client.ListOption{
client.InNamespace(controllers.ControlPlaneNamespace),
client.MatchingLabels(map[string]string{controllers.ApplicationNamespaceSelectorLabelKey: sliceName}),
}
err := r.List(context.Background(), sliceGwList, listOpts...)
Expand Down Expand Up @@ -479,8 +507,15 @@ func (r *SliceGwReconciler) SetupWithManager(mgr ctrl.Manager) error {
return err
}

// Check for updates to slice gw pods
labelSelector.MatchLabels = map[string]string{webhook.PodInjectLabelKey: "slicegateway"}
slicegwPredicate, err := predicate.LabelSelectorPredicate(labelSelector)
if err != nil {
return err
}

sliceGwUpdPredicate := predicate.Or(
slicerouterPredicate, netopPredicate, nsmgrPredicate, nsmfwdPredicate,
slicerouterPredicate, netopPredicate, nsmgrPredicate, nsmfwdPredicate, slicegwPredicate,
)

// The slice gateway reconciler needs to be invoked whenever there is an update to the
Expand All @@ -507,7 +542,39 @@ func (r *SliceGwReconciler) SetupWithManager(mgr ctrl.Manager) error {
).
Watches(&corev1.Node{},
handler.EnqueueRequestsFromMapFunc(r.sliceGwObjectsToReconcileForNodeRestart),
builder.WithPredicates(nodePredicate),
builder.WithPredicates(nodePredicate, predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
return true
},
DeleteFunc: func(e event.DeleteEvent) bool {
return true
},
UpdateFunc: func(e event.UpdateEvent) bool {
	// Let a node update through only when the node-type selector label is newly
	// added with the value "gateway"; every other node update is filtered out.
	oldObj, ok := e.ObjectOld.(*corev1.Node)
	if !ok {
		// The old object is not a Node: do not filter, let the event through.
		return true
	}
	newObj, ok := e.ObjectNew.(*corev1.Node)
	if !ok {
		// The reconcile would be triggered by the Create func
		return false
	}
	// Indexing a nil map is safe in Go and reports the key as absent, so no nil
	// guard is needed. The earlier guard on the old labels dropped the event when
	// a node that had no labels at all gained the gateway label.
	if _, hadLabel := oldObj.ObjectMeta.Labels[controllers.NodeTypeSelectorLabelKey]; hadLabel {
		// The label was already present before the update: nothing was added.
		return false
	}
	// A missing key yields "", which never equals "gateway".
	return newObj.ObjectMeta.Labels[controllers.NodeTypeSelectorLabelKey] == "gateway"
},
GenericFunc: func(e event.GenericEvent) bool {
return false
},
}),
).
Complete(r)
}
Expand Down
43 changes: 18 additions & 25 deletions controllers/slicegateway/slicegateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,17 +700,29 @@ func (r *SliceGwReconciler) ReconcileGwPodStatus(ctx context.Context, slicegatew

if toUpdate {
log.Info("gwPodsInfo", "gwPodsInfo", gwPodsInfo)
slicegateway.Status.GatewayPodStatus = gwPodsInfo
slicegateway.Status.ConnectionContextUpdatedOn = 0
err := r.Status().Update(ctx, slicegateway)
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
err := r.Get(ctx, types.NamespacedName{Namespace: controllers.ControlPlaneNamespace, Name: slicegateway.Name}, slicegateway)
if err != nil {
log.Error(err, "Failed to get SliceGateway")
return err
}
slicegateway.Status.GatewayPodStatus = gwPodsInfo
slicegateway.Status.ConnectionContextUpdatedOn = 0
err = r.Status().Update(ctx, slicegateway)
if err != nil {
debugLog.Error(err, "Failed to update SliceGateway status for gateway status")
return err
}
return nil
})
if err != nil {
debugLog.Error(err, "error while update", "Failed to update SliceGateway status for gateway status")
return ctrl.Result{}, err, true
log.Error(err, "Failed to update SliceGateway status for gw pods")
return ctrl.Result{RequeueAfter: 30 * time.Second}, nil, true
}
toReconcile = true
}
if toReconcile {
return ctrl.Result{}, err, true
return ctrl.Result{}, nil, true
}
return ctrl.Result{}, nil, false
}
Expand Down Expand Up @@ -745,25 +757,6 @@ func (r *SliceGwReconciler) SendConnectionContextAndQosToGwPod(ctx context.Conte
log.Error(err, "Failed to send qos to gateway")
return ctrl.Result{RequeueAfter: 10 * time.Second}, err, true
}

err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
err := r.Get(ctx, req.NamespacedName, slicegateway)
if err != nil {
log.Error(err, "Failed to get SliceGateway")
return err
}
slicegateway.Status.ConnectionContextUpdatedOn = time.Now().Unix()
err = r.Status().Update(ctx, slicegateway)
if err != nil {
log.Error(err, "Failed to update SliceGateway status for conn ctx update in retry loop")
return err
}
return nil
})
if err != nil {
log.Error(err, "Failed to update SliceGateway status for conn ctx update")
return ctrl.Result{}, err, true
}
}
return ctrl.Result{}, nil, false
}
Expand Down
5 changes: 5 additions & 0 deletions controllers/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ var (
ImagePullSecretName = utils.GetEnvOrDefault("IMAGE_PULL_SECRET_NAME", "kubeslice-nexus")

ReconcileInterval = 10 * time.Second
// SliceGatewayReconcileInterval is the periodic reconcile interval for slicegateway CRs. The slicegateway CRD
// reconciler is set up to be event-driven: in addition to being triggered by updates to the slicegateway CR objects,
// it is also invoked whenever important pods such as the slice router, the slice gw pods, and other critical infra
// pods restart. Hence, it does not really need an aggressive periodic reconcile interval.
SliceGatewayReconcileInterval = 120 * time.Second
)

const (
Expand Down
113 changes: 79 additions & 34 deletions pkg/hub/controllers/slicegateway_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
kubeslicev1beta1 "github.com/kubeslice/worker-operator/api/v1beta1"
"github.com/kubeslice/worker-operator/controllers"
ossEvents "github.com/kubeslice/worker-operator/events"
hubutils "github.com/kubeslice/worker-operator/pkg/hub"
"github.com/kubeslice/worker-operator/pkg/logger"
"github.com/kubeslice/worker-operator/pkg/utils"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -94,47 +95,91 @@ func (r *SliceGwReconciler) Reconcile(ctx context.Context, req reconcile.Request
} else {
utils.RecordEvent(ctx, r.EventRecorder, sliceGw, nil, ossEvents.EventSliceGWCreated, sliceGWController)
}
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
sliceGwRef := client.ObjectKey{
Name: sliceGwName,
Namespace: ControlPlaneNamespace,
}
err := r.MeshClient.Get(ctx, sliceGwRef, meshSliceGw)
if err != nil {
return err
}
meshSliceGw.Status.Config = kubeslicev1beta1.SliceGatewayConfig{
SliceName: sliceGw.Spec.SliceName,
SliceGatewayID: sliceGw.Spec.LocalGatewayConfig.GatewayName,
SliceGatewaySubnet: sliceGw.Spec.LocalGatewayConfig.GatewaySubnet,
SliceGatewayRemoteSubnet: sliceGw.Spec.RemoteGatewayConfig.GatewaySubnet,
SliceGatewayHostType: sliceGw.Spec.GatewayHostType,
SliceGatewayRemoteNodeIPs: sliceGw.Spec.RemoteGatewayConfig.NodeIps,
SliceGatewayRemoteNodePorts: sliceGw.Spec.RemoteGatewayConfig.NodePorts,
SliceGatewayRemoteClusterID: sliceGw.Spec.RemoteGatewayConfig.ClusterName,
SliceGatewayRemoteGatewayID: sliceGw.Spec.RemoteGatewayConfig.GatewayName,
SliceGatewayLocalVpnIP: sliceGw.Spec.LocalGatewayConfig.VpnIp,
SliceGatewayRemoteVpnIP: sliceGw.Spec.RemoteGatewayConfig.VpnIp,
SliceGatewayName: strconv.Itoa(sliceGw.Spec.GatewayNumber),
SliceGatewayIntermediateDeployments: meshSliceGw.Status.Config.SliceGatewayIntermediateDeployments,
SliceGatewayConnectivityType: sliceGw.Spec.GatewayConnectivityType,
SliceGatewayProtocol: sliceGw.Spec.GatewayProtocol,
SliceGatewayServerLBIPs: sliceGw.Spec.RemoteGatewayConfig.LoadBalancerIps,

// Update the slicegateway CR on the worker cluster only if something has changed.
toUpdate := false
sliceGwRef := client.ObjectKey{
Name: sliceGwName,
Namespace: ControlPlaneNamespace,
}
err = r.MeshClient.Get(ctx, sliceGwRef, meshSliceGw)
if err != nil {
return reconcile.Result{}, err
}
// First check all the static fields.
if meshSliceGw.Status.Config.SliceGatewayID != sliceGw.Spec.LocalGatewayConfig.GatewayName ||
meshSliceGw.Status.Config.SliceGatewaySubnet != sliceGw.Spec.LocalGatewayConfig.GatewaySubnet ||
meshSliceGw.Status.Config.SliceGatewayRemoteSubnet != sliceGw.Spec.RemoteGatewayConfig.GatewaySubnet ||
meshSliceGw.Status.Config.SliceGatewayHostType != sliceGw.Spec.GatewayHostType ||
meshSliceGw.Status.Config.SliceGatewayRemoteClusterID != sliceGw.Spec.RemoteGatewayConfig.ClusterName ||
meshSliceGw.Status.Config.SliceGatewayRemoteGatewayID != sliceGw.Spec.RemoteGatewayConfig.GatewayName ||
meshSliceGw.Status.Config.SliceGatewayName != strconv.Itoa(sliceGw.Spec.GatewayNumber) ||
meshSliceGw.Status.Config.SliceGatewayConnectivityType != sliceGw.Spec.GatewayConnectivityType ||
meshSliceGw.Status.Config.SliceGatewayProtocol != sliceGw.Spec.GatewayProtocol {
toUpdate = true
}
// If no change in static fields, check the dynamic fields
if !toUpdate {
// For client type, check the following fields
if meshSliceGw.Status.Config.SliceGatewayHostType == "Client" {
if !hubutils.ListEqual(meshSliceGw.Status.Config.SliceGatewayRemoteNodeIPs, sliceGw.Spec.RemoteGatewayConfig.NodeIps) {
log.Info("Update from hub: Node IPs changed", "SliceGw", sliceGw.Name, "NodeIPs", sliceGw.Spec.RemoteGatewayConfig.NodeIps)
toUpdate = true
}
// Next check node port
if !toUpdate {
if !hubutils.ListEqual(meshSliceGw.Status.Config.SliceGatewayNodePorts, sliceGw.Spec.RemoteGatewayConfig.NodePorts) {
log.Info("Update from hub: NodePort numbers changed", "SliceGw", sliceGw.Name, "NodePorts", sliceGw.Spec.RemoteGatewayConfig.NodePorts)
toUpdate = true
}
}
}
// Nothing dynamic to check for the server type
}

err = r.MeshClient.Status().Update(ctx, meshSliceGw)
if toUpdate {
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
sliceGwRef := client.ObjectKey{
Name: sliceGwName,
Namespace: ControlPlaneNamespace,
}
err := r.MeshClient.Get(ctx, sliceGwRef, meshSliceGw)
if err != nil {
return err
}
meshSliceGw.Status.Config = kubeslicev1beta1.SliceGatewayConfig{
SliceName: sliceGw.Spec.SliceName,
SliceGatewayID: sliceGw.Spec.LocalGatewayConfig.GatewayName,
SliceGatewaySubnet: sliceGw.Spec.LocalGatewayConfig.GatewaySubnet,
SliceGatewayRemoteSubnet: sliceGw.Spec.RemoteGatewayConfig.GatewaySubnet,
SliceGatewayHostType: sliceGw.Spec.GatewayHostType,
SliceGatewayRemoteNodeIPs: sliceGw.Spec.RemoteGatewayConfig.NodeIps,
SliceGatewayRemoteNodePorts: sliceGw.Spec.RemoteGatewayConfig.NodePorts,
SliceGatewayRemoteClusterID: sliceGw.Spec.RemoteGatewayConfig.ClusterName,
SliceGatewayRemoteGatewayID: sliceGw.Spec.RemoteGatewayConfig.GatewayName,
SliceGatewayLocalVpnIP: sliceGw.Spec.LocalGatewayConfig.VpnIp,
SliceGatewayRemoteVpnIP: sliceGw.Spec.RemoteGatewayConfig.VpnIp,
SliceGatewayName: strconv.Itoa(sliceGw.Spec.GatewayNumber),
SliceGatewayIntermediateDeployments: meshSliceGw.Status.Config.SliceGatewayIntermediateDeployments,
SliceGatewayConnectivityType: sliceGw.Spec.GatewayConnectivityType,
SliceGatewayProtocol: sliceGw.Spec.GatewayProtocol,
SliceGatewayServerLBIPs: sliceGw.Spec.RemoteGatewayConfig.LoadBalancerIps,
}

err = r.MeshClient.Status().Update(ctx, meshSliceGw)
if err != nil {
return err
}
utils.RecordEvent(ctx, r.EventRecorder, sliceGw, nil, ossEvents.EventSliceGWUpdated, sliceGWController)
return nil
})
if err != nil {
utils.RecordEvent(ctx, r.EventRecorder, sliceGw, nil, ossEvents.EventSliceGWUpdateFailed, sliceGWController)
log.Error(err, "unable to update sliceGw status in spoke cluster", "sliceGw", sliceGwName)
return err
} else {
utils.RecordEvent(ctx, r.EventRecorder, sliceGw, nil, ossEvents.EventSliceGWUpdated, sliceGWController)
return reconcile.Result{}, err
}
return nil
})
if err != nil {
return reconcile.Result{}, err
}

return reconcile.Result{}, nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/hub/hubclient/hubclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"context"
"fmt"
"os"
"reflect"

apierrors "k8s.io/apimachinery/pkg/api/errors"

Expand All @@ -38,6 +37,7 @@ import (
hubv1alpha1 "github.com/kubeslice/apis/pkg/controller/v1alpha1"
spokev1alpha1 "github.com/kubeslice/apis/pkg/worker/v1alpha1"
kubeslicev1beta1 "github.com/kubeslice/worker-operator/api/v1beta1"
hubutils "github.com/kubeslice/worker-operator/pkg/hub"
"github.com/kubeslice/worker-operator/pkg/logger"
"github.com/kubeslice/worker-operator/pkg/monitoring"
)
Expand Down Expand Up @@ -168,8 +168,8 @@ func (hubClient *HubClientConfig) UpdateNodePortForSliceGwServer(ctx context.Con
return err
}

if reflect.DeepEqual(sliceGw.Spec.LocalGatewayConfig.NodePorts, sliceGwNodePorts) {
// No update needed
// If the node ports on the controller cluster and the worker are the same, no need to update
if hubutils.ListEqual(sliceGw.Spec.LocalGatewayConfig.NodePorts, sliceGwNodePorts) {
return nil
}

Expand Down
Loading

0 comments on commit 0294add

Please sign in to comment.