diff --git a/internal/message/types.go b/internal/message/types.go index 3e3923e6cb2..2eee7f90345 100644 --- a/internal/message/types.go +++ b/internal/message/types.go @@ -75,12 +75,13 @@ func (p *ProviderResources) Close() { // GatewayAPIStatuses contains gateway API resources statuses type GatewayAPIStatuses struct { - GatewayStatuses watchable.Map[types.NamespacedName, *gwapiv1.GatewayStatus] - HTTPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1.HTTPRouteStatus] - GRPCRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1.GRPCRouteStatus] - TLSRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.TLSRouteStatus] - TCPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.TCPRouteStatus] - UDPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.UDPRouteStatus] + GatewayClassStatuses watchable.Map[types.NamespacedName, *gwapiv1.GatewayClassStatus] + GatewayStatuses watchable.Map[types.NamespacedName, *gwapiv1.GatewayStatus] + HTTPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1.HTTPRouteStatus] + GRPCRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1.GRPCRouteStatus] + TLSRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.TLSRouteStatus] + TCPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.TCPRouteStatus] + UDPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.UDPRouteStatus] } func (s *GatewayAPIStatuses) Close() { diff --git a/internal/provider/kubernetes/controller.go b/internal/provider/kubernetes/controller.go index f71ebee9520..6803de62d5d 100644 --- a/internal/provider/kubernetes/controller.go +++ b/internal/provider/kubernetes/controller.go @@ -129,13 +129,25 @@ func newGatewayAPIController(mgr manager.Manager, cfg *config.Server, su Updater } r.log.Info("created gatewayapi controller") - // Subscribe to status updates - r.subscribeAndUpdateStatus(ctx, cfg.EnvoyGateway.EnvoyGatewaySpec.ExtensionManager != nil) - // Watch resources if err := r.watchResources(ctx, mgr, c); err != nil { return fmt.Errorf("error watching resources: %w", err) } + + // When leader election is enabled, only subscribe to status updates upon acquiring leadership. + if cfg.EnvoyGateway.Provider.Type == egv1a1.ProviderTypeKubernetes && + !ptr.Deref(cfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.Disable, false) { + go func() { + select { + case <-ctx.Done(): + return + case <-mgr.Elected(): + r.subscribeAndUpdateStatus(ctx, cfg.EnvoyGateway.EnvoyGatewaySpec.ExtensionManager != nil) + } + }() + } else { + r.subscribeAndUpdateStatus(ctx, cfg.EnvoyGateway.EnvoyGatewaySpec.ExtensionManager != nil) + } return nil } @@ -199,9 +211,12 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques if managedGC.Spec.ParametersRef != nil && managedGC.DeletionTimestamp == nil { if err := r.processGatewayClassParamsRef(ctx, managedGC, resourceMappings, gwcResource); err != nil { msg := fmt.Sprintf("%s: %v", status.MsgGatewayClassInvalidParams, err) - if err := r.updateStatusForGatewayClass(ctx, managedGC, false, string(gwapiv1.GatewayClassReasonInvalidParameters), msg); err != nil { - r.log.Error(err, "unable to update GatewayClass status") - } + gc := status.SetGatewayClassAccepted( + managedGC.DeepCopy(), + false, + string(gwapiv1.GatewayClassReasonInvalidParameters), + msg) + r.resources.GatewayClassStatuses.Store(utils.NamespacedName(gc), &gc.Status) r.log.Error(err, "failed to process parametersRef for gatewayclass", "name", managedGC.Name) return reconcile.Result{}, err } @@ -293,11 +308,12 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques // process envoy gateway secret refs r.processEnvoyProxySecretRef(ctx, gwcResource) - - if err := r.updateStatusForGatewayClass(ctx, managedGC, true, string(gwapiv1.GatewayClassReasonAccepted), status.MsgValidGatewayClass); err != nil { - r.log.Error(err, "unable to update GatewayClass status") - return reconcile.Result{}, err - } + gc := status.SetGatewayClassAccepted( + managedGC.DeepCopy(), + true, + string(gwapiv1.GatewayClassReasonAccepted), + status.MsgValidGatewayClass) + r.resources.GatewayClassStatuses.Store(utils.NamespacedName(gc), &gc.Status) if len(gwcResource.Gateways) == 0 { r.log.Info("No gateways found for accepted gatewayclass") diff --git a/internal/provider/kubernetes/predicates.go b/internal/provider/kubernetes/predicates.go index d25ec2fb7d4..16bb9361b04 100644 --- a/internal/provider/kubernetes/predicates.go +++ b/internal/provider/kubernetes/predicates.go @@ -294,7 +294,7 @@ func (r *gatewayAPIReconciler) validateServiceForReconcile(obj client.Object) bo // Check if the Service belongs to a Gateway, if so, update the Gateway status. gtw := r.findOwningGateway(ctx, labels) if gtw != nil { - r.updateStatusForGateway(ctx, gtw) + r.updateGatewayStatus(gtw) return false } @@ -528,7 +528,7 @@ func (r *gatewayAPIReconciler) validateObjectForReconcile(obj client.Object) boo // Check if the obj belongs to a Gateway, if so, update the Gateway status. gtw := r.findOwningGateway(ctx, labels) if gtw != nil { - r.updateStatusForGateway(ctx, gtw) + r.updateGatewayStatus(gtw) return false } } @@ -636,12 +636,32 @@ func (r *gatewayAPIReconciler) updateStatusForGatewaysUnderGatewayClass(ctx cont } for _, gateway := range gateways.Items { - r.updateStatusForGateway(ctx, &gateway) + r.updateGatewayStatus(&gateway) } return nil } +// updateGatewayStatus triggers a status update for the Gateway. +func (r *gatewayAPIReconciler) updateGatewayStatus(gateway *gwapiv1.Gateway) { + gwName := utils.NamespacedName(gateway) + status := &gateway.Status + // Use the existing status if it exists to avoid losing the status calculated by the Gateway API translator. + if existing, ok := r.resources.GatewayStatuses.Load(gwName); ok { + status = existing + } + + // Since the status does not reflect the actual changed status, we need to delete it first + // to prevent it from being considered unchanged. This ensures that subscribers receive the update event. + r.resources.GatewayStatuses.Delete(gwName) + // The status that is stored in the GatewayStatuses GatewayStatuses is solely used to trigger the status updater + // and does not reflect the real changed status. + // + // The status updater will check the Envoy Proxy service to get the addresses of the Gateway, + // and check the Envoy Proxy Deployment/DaemonSet to get the status of the Gateway workload. + r.resources.GatewayStatuses.Store(gwName, status) +} + func (r *gatewayAPIReconciler) handleNode(obj client.Object) bool { ctx := context.Background() node, ok := obj.(*corev1.Node) diff --git a/internal/provider/kubernetes/status.go b/internal/provider/kubernetes/status.go index a59eb82f75a..d9ff03f9b66 100644 --- a/internal/provider/kubernetes/status.go +++ b/internal/provider/kubernetes/status.go @@ -10,7 +10,6 @@ import ( "fmt" "reflect" - kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" @@ -28,6 +27,35 @@ import ( // subscribeAndUpdateStatus subscribes to gateway API object status updates and // writes it into the Kubernetes API Server. func (r *gatewayAPIReconciler) subscribeAndUpdateStatus(ctx context.Context, extensionManagerEnabled bool) { + // GatewayClass object status updater + go func() { + message.HandleSubscription( + message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: "gatewayclass-status"}, + r.resources.GatewayClassStatuses.Subscribe(ctx), + func(update message.Update[types.NamespacedName, *gwapiv1.GatewayClassStatus], errChan chan error) { + // skip delete updates. + if update.Delete { + return + } + + r.statusUpdater.Send(Update{ + NamespacedName: update.Key, + Resource: new(gwapiv1.GatewayClass), + Mutator: MutatorFunc(func(obj client.Object) client.Object { + gc, ok := obj.(*gwapiv1.GatewayClass) + if !ok { + panic(fmt.Sprintf("unsupported object type %T", obj)) + } + gcCopy := gc.DeepCopy() + gcCopy.Status = *update.Value + return gcCopy + }), + }) + }, + ) + r.log.Info("gatewayclass status subscriber shutting down") + }() + // Gateway object status updater go func() { message.HandleSubscription( @@ -564,34 +592,3 @@ func (r *gatewayAPIReconciler) updateStatusForGateway(ctx context.Context, gtw * }), }) } - -func (r *gatewayAPIReconciler) updateStatusForGatewayClass( - ctx context.Context, - gc *gwapiv1.GatewayClass, - accepted bool, - reason, - msg string, -) error { - if r.statusUpdater != nil { - r.statusUpdater.Send(Update{ - NamespacedName: types.NamespacedName{Name: gc.Name}, - Resource: &gwapiv1.GatewayClass{}, - Mutator: MutatorFunc(func(obj client.Object) client.Object { - gc, ok := obj.(*gwapiv1.GatewayClass) - if !ok { - panic(fmt.Sprintf("unsupported object type %T", obj)) - } - - return status.SetGatewayClassAccepted(gc.DeepCopy(), accepted, reason, msg) - }), - }) - } else { - // this branch makes testing easier by not going through the status.Updater. - duplicate := status.SetGatewayClassAccepted(gc.DeepCopy(), accepted, reason, msg) - - if err := r.client.Status().Update(ctx, duplicate); err != nil && !kerrors.IsNotFound(err) { - return fmt.Errorf("error updating status of gatewayclass %s: %w", duplicate.Name, err) - } - } - return nil -}