diff --git a/internal/envoygateway/config/config.go b/internal/envoygateway/config/config.go index c842c184e4c..af05dac0753 100644 --- a/internal/envoygateway/config/config.go +++ b/internal/envoygateway/config/config.go @@ -7,6 +7,7 @@ package config import ( "errors" + "sync" egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1" "github.com/envoyproxy/gateway/api/v1alpha1/validation" @@ -37,19 +38,22 @@ type Server struct { // Logger is the logr implementation used by Envoy Gateway. Logger logging.Logger // Elected chan is used to signal what a leader is elected - Elected chan struct{} + Elected *sync.WaitGroup } // New returns a Server with default parameters. func New() (*Server, error) { - return &Server{ + server := &Server{ EnvoyGateway: egv1a1.DefaultEnvoyGateway(), Namespace: env.Lookup("ENVOY_GATEWAY_NAMESPACE", DefaultNamespace), DNSDomain: env.Lookup("KUBERNETES_CLUSTER_DOMAIN", DefaultDNSDomain), // the default logger Logger: logging.DefaultLogger(egv1a1.LogLevelInfo), - Elected: make(chan struct{}), - }, nil + Elected: &sync.WaitGroup{}, + } + // Block the tasks that are waiting for the leader to be elected + server.Elected.Add(1) + return server, nil } // Validate validates a Server config. diff --git a/internal/infrastructure/runner/runner.go b/internal/infrastructure/runner/runner.go index 6896a6e5a16..3344ca0d349 100644 --- a/internal/infrastructure/runner/runner.go +++ b/internal/infrastructure/runner/runner.go @@ -72,12 +72,8 @@ func (r *Runner) Start(ctx context.Context) (err error) { if r.EnvoyGateway.Provider.Type == egv1a1.ProviderTypeKubernetes && !ptr.Deref(r.EnvoyGateway.Provider.Kubernetes.LeaderElection.Disable, false) { go func() { - select { - case <-ctx.Done(): - return - case <-r.Elected: - initInfra() - } + r.Elected.Wait() + initInfra() }() return } diff --git a/internal/message/types.go b/internal/message/types.go index 3e3923e6cb2..2eee7f90345 100644 --- a/internal/message/types.go +++ b/internal/message/types.go @@ -75,12 +75,13 @@ func (p *ProviderResources) Close() { // GatewayAPIStatuses contains gateway API resources statuses type GatewayAPIStatuses struct { - GatewayStatuses watchable.Map[types.NamespacedName, *gwapiv1.GatewayStatus] - HTTPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1.HTTPRouteStatus] - GRPCRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1.GRPCRouteStatus] - TLSRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.TLSRouteStatus] - TCPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.TCPRouteStatus] - UDPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.UDPRouteStatus] + GatewayClassStatuses watchable.Map[types.NamespacedName, *gwapiv1.GatewayClassStatus] + GatewayStatuses watchable.Map[types.NamespacedName, *gwapiv1.GatewayStatus] + HTTPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1.HTTPRouteStatus] + GRPCRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1.GRPCRouteStatus] + TLSRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.TLSRouteStatus] + TCPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.TCPRouteStatus] + UDPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.UDPRouteStatus] } func (s *GatewayAPIStatuses) Close() { diff --git a/internal/provider/kubernetes/controller.go b/internal/provider/kubernetes/controller.go index f71ebee9520..bcb6fa8772c 100644 --- a/internal/provider/kubernetes/controller.go +++ b/internal/provider/kubernetes/controller.go @@ -129,13 +129,21 @@ func newGatewayAPIController(mgr manager.Manager, cfg *config.Server, su Updater } r.log.Info("created gatewayapi controller") - // Subscribe to status updates - r.subscribeAndUpdateStatus(ctx, cfg.EnvoyGateway.EnvoyGatewaySpec.ExtensionManager != nil) - // Watch resources if err := r.watchResources(ctx, mgr, c); err != nil { return fmt.Errorf("error watching resources: %w", err) } + + // When leader election is enabled, only subscribe to status updates upon acquiring leadership. + if cfg.EnvoyGateway.Provider.Type == egv1a1.ProviderTypeKubernetes && + !ptr.Deref(cfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.Disable, false) { + go func() { + cfg.Elected.Wait() + r.subscribeAndUpdateStatus(ctx, cfg.EnvoyGateway.EnvoyGatewaySpec.ExtensionManager != nil) + }() + } else { + r.subscribeAndUpdateStatus(ctx, cfg.EnvoyGateway.EnvoyGatewaySpec.ExtensionManager != nil) + } return nil } @@ -199,9 +207,12 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques if managedGC.Spec.ParametersRef != nil && managedGC.DeletionTimestamp == nil { if err := r.processGatewayClassParamsRef(ctx, managedGC, resourceMappings, gwcResource); err != nil { msg := fmt.Sprintf("%s: %v", status.MsgGatewayClassInvalidParams, err) - if err := r.updateStatusForGatewayClass(ctx, managedGC, false, string(gwapiv1.GatewayClassReasonInvalidParameters), msg); err != nil { - r.log.Error(err, "unable to update GatewayClass status") - } + gc := status.SetGatewayClassAccepted( + managedGC.DeepCopy(), + false, + string(gwapiv1.GatewayClassReasonInvalidParameters), + msg) + r.resources.GatewayClassStatuses.Store(utils.NamespacedName(gc), &gc.Status) r.log.Error(err, "failed to process parametersRef for gatewayclass", "name", managedGC.Name) return reconcile.Result{}, err } @@ -293,11 +304,12 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques // process envoy gateway secret refs r.processEnvoyProxySecretRef(ctx, gwcResource) - - if err := r.updateStatusForGatewayClass(ctx, managedGC, true, string(gwapiv1.GatewayClassReasonAccepted), status.MsgValidGatewayClass); err != nil { - r.log.Error(err, "unable to update GatewayClass status") - return reconcile.Result{}, err - } + gc := status.SetGatewayClassAccepted( + managedGC.DeepCopy(), + true, + string(gwapiv1.GatewayClassReasonAccepted), + status.MsgValidGatewayClass) + r.resources.GatewayClassStatuses.Store(utils.NamespacedName(gc), &gc.Status) if len(gwcResource.Gateways) == 0 { r.log.Info("No gateways found for accepted gatewayclass") diff --git a/internal/provider/kubernetes/kubernetes.go b/internal/provider/kubernetes/kubernetes.go index 4fdbc329dd0..56f96e70a18 100644 --- a/internal/provider/kubernetes/kubernetes.go +++ b/internal/provider/kubernetes/kubernetes.go @@ -36,40 +36,40 @@ type Provider struct { } // New creates a new Provider from the provided EnvoyGateway. -func New(cfg *rest.Config, svr *ec.Server, resources *message.ProviderResources) (*Provider, error) { +func New(restCfg *rest.Config, svrCfg *ec.Server, resources *message.ProviderResources) (*Provider, error) { // TODO: Decide which mgr opts should be exposed through envoygateway.provider.kubernetes API. mgrOpts := manager.Options{ Scheme: envoygateway.GetScheme(), - Logger: svr.Logger.Logger, + Logger: svrCfg.Logger.Logger, HealthProbeBindAddress: ":8081", LeaderElectionID: "5b9825d2.gateway.envoyproxy.io", - LeaderElectionNamespace: svr.Namespace, + LeaderElectionNamespace: svrCfg.Namespace, } log.SetLogger(mgrOpts.Logger) klog.SetLogger(mgrOpts.Logger) - if !ptr.Deref(svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.Disable, false) { + if !ptr.Deref(svrCfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.Disable, false) { mgrOpts.LeaderElection = true - if svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.LeaseDuration != nil { - ld, err := time.ParseDuration(string(*svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.LeaseDuration)) + if svrCfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.LeaseDuration != nil { + ld, err := time.ParseDuration(string(*svrCfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.LeaseDuration)) if err != nil { return nil, err } mgrOpts.LeaseDuration = ptr.To(ld) } - if svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.RetryPeriod != nil { - rp, err := time.ParseDuration(string(*svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.RetryPeriod)) + if svrCfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.RetryPeriod != nil { + rp, err := time.ParseDuration(string(*svrCfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.RetryPeriod)) if err != nil { return nil, err } mgrOpts.RetryPeriod = ptr.To(rp) } - if svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.RenewDeadline != nil { - rd, err := time.ParseDuration(string(*svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.RenewDeadline)) + if svrCfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.RenewDeadline != nil { + rd, err := time.ParseDuration(string(*svrCfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.RenewDeadline)) if err != nil { return nil, err } @@ -78,13 +78,13 @@ func New(cfg *rest.Config, svr *ec.Server, resources *message.ProviderResources) mgrOpts.Controller = config.Controller{NeedLeaderElection: ptr.To(false)} } - if svr.EnvoyGateway.NamespaceMode() { + if svrCfg.EnvoyGateway.NamespaceMode() { mgrOpts.Cache.DefaultNamespaces = make(map[string]cache.Config) - for _, watchNS := range svr.EnvoyGateway.Provider.Kubernetes.Watch.Namespaces { + for _, watchNS := range svrCfg.EnvoyGateway.Provider.Kubernetes.Watch.Namespaces { mgrOpts.Cache.DefaultNamespaces[watchNS] = cache.Config{} } } - mgr, err := ctrl.NewManager(cfg, mgrOpts) + mgr, err := ctrl.NewManager(restCfg, mgrOpts) if err != nil { return nil, fmt.Errorf("failed to create manager: %w", err) } @@ -95,7 +95,7 @@ func New(cfg *rest.Config, svr *ec.Server, resources *message.ProviderResources) } // Create and register the controllers with the manager. - if err := newGatewayAPIController(mgr, svr, updateHandler.Writer(), resources); err != nil { + if err := newGatewayAPIController(mgr, svrCfg, updateHandler.Writer(), resources); err != nil { return nil, fmt.Errorf("failted to create gatewayapi controller: %w", err) } @@ -109,11 +109,10 @@ func New(cfg *rest.Config, svr *ec.Server, resources *message.ProviderResources) return nil, fmt.Errorf("unable to set up ready check: %w", err) } - // Emit elected & continue with envoyObjects of infra resources + // Emit elected & continue with the tasks that require leadership. go func() { <-mgr.Elected() - // WARN: DO NOT CLOSE IT - svr.Elected <- struct{}{} + svrCfg.Elected.Done() }() return &Provider{ diff --git a/internal/provider/kubernetes/predicates.go b/internal/provider/kubernetes/predicates.go index d25ec2fb7d4..16bb9361b04 100644 --- a/internal/provider/kubernetes/predicates.go +++ b/internal/provider/kubernetes/predicates.go @@ -294,7 +294,7 @@ func (r *gatewayAPIReconciler) validateServiceForReconcile(obj client.Object) bo // Check if the Service belongs to a Gateway, if so, update the Gateway status. gtw := r.findOwningGateway(ctx, labels) if gtw != nil { - r.updateStatusForGateway(ctx, gtw) + r.updateGatewayStatus(gtw) return false } @@ -528,7 +528,7 @@ func (r *gatewayAPIReconciler) validateObjectForReconcile(obj client.Object) boo // Check if the obj belongs to a Gateway, if so, update the Gateway status. gtw := r.findOwningGateway(ctx, labels) if gtw != nil { - r.updateStatusForGateway(ctx, gtw) + r.updateGatewayStatus(gtw) return false } } @@ -636,12 +636,32 @@ func (r *gatewayAPIReconciler) updateStatusForGatewaysUnderGatewayClass(ctx cont } for _, gateway := range gateways.Items { - r.updateStatusForGateway(ctx, &gateway) + r.updateGatewayStatus(&gateway) } return nil } +// updateGatewayStatus triggers a status update for the Gateway. +func (r *gatewayAPIReconciler) updateGatewayStatus(gateway *gwapiv1.Gateway) { + gwName := utils.NamespacedName(gateway) + status := &gateway.Status + // Use the existing status if it exists to avoid losing the status calculated by the Gateway API translator. + if existing, ok := r.resources.GatewayStatuses.Load(gwName); ok { + status = existing + } + + // Since the status does not reflect the actual changed status, we need to delete it first + // to prevent it from being considered unchanged. This ensures that subscribers receive the update event. + r.resources.GatewayStatuses.Delete(gwName) + // The status that is stored in the GatewayStatuses GatewayStatuses is solely used to trigger the status updater + // and does not reflect the real changed status. + // + // The status updater will check the Envoy Proxy service to get the addresses of the Gateway, + // and check the Envoy Proxy Deployment/DaemonSet to get the status of the Gateway workload. + r.resources.GatewayStatuses.Store(gwName, status) +} + func (r *gatewayAPIReconciler) handleNode(obj client.Object) bool { ctx := context.Background() node, ok := obj.(*corev1.Node) diff --git a/internal/provider/kubernetes/predicates_test.go b/internal/provider/kubernetes/predicates_test.go index d8abf845f4d..8ff155f46f4 100644 --- a/internal/provider/kubernetes/predicates_test.go +++ b/internal/provider/kubernetes/predicates_test.go @@ -26,6 +26,7 @@ import ( "github.com/envoyproxy/gateway/internal/gatewayapi/resource" "github.com/envoyproxy/gateway/internal/infrastructure/kubernetes/proxy" "github.com/envoyproxy/gateway/internal/logging" + "github.com/envoyproxy/gateway/internal/message" "github.com/envoyproxy/gateway/internal/provider/kubernetes/test" ) @@ -854,6 +855,7 @@ func TestValidateServiceForReconcile(t *testing.T) { classController: egv1a1.GatewayControllerName, log: logger, mergeGateways: sets.New[string]("test-mg"), + resources: &message.ProviderResources{}, grpcRouteCRDExists: true, tcpRouteCRDExists: true, udpRouteCRDExists: true, @@ -972,6 +974,7 @@ func TestValidateObjectForReconcile(t *testing.T) { classController: egv1a1.GatewayControllerName, log: logger, mergeGateways: sets.New[string]("test-mg"), + resources: &message.ProviderResources{}, } for _, tc := range testCases { diff --git a/internal/provider/kubernetes/status.go b/internal/provider/kubernetes/status.go index a59eb82f75a..d9ff03f9b66 100644 --- a/internal/provider/kubernetes/status.go +++ b/internal/provider/kubernetes/status.go @@ -10,7 +10,6 @@ import ( "fmt" "reflect" - kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" @@ -28,6 +27,35 @@ import ( // subscribeAndUpdateStatus subscribes to gateway API object status updates and // writes it into the Kubernetes API Server. func (r *gatewayAPIReconciler) subscribeAndUpdateStatus(ctx context.Context, extensionManagerEnabled bool) { + // GatewayClass object status updater + go func() { + message.HandleSubscription( + message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: "gatewayclass-status"}, + r.resources.GatewayClassStatuses.Subscribe(ctx), + func(update message.Update[types.NamespacedName, *gwapiv1.GatewayClassStatus], errChan chan error) { + // skip delete updates. + if update.Delete { + return + } + + r.statusUpdater.Send(Update{ + NamespacedName: update.Key, + Resource: new(gwapiv1.GatewayClass), + Mutator: MutatorFunc(func(obj client.Object) client.Object { + gc, ok := obj.(*gwapiv1.GatewayClass) + if !ok { + panic(fmt.Sprintf("unsupported object type %T", obj)) + } + gcCopy := gc.DeepCopy() + gcCopy.Status = *update.Value + return gcCopy + }), + }) + }, + ) + r.log.Info("gatewayclass status subscriber shutting down") + }() + // Gateway object status updater go func() { message.HandleSubscription( @@ -564,34 +592,3 @@ func (r *gatewayAPIReconciler) updateStatusForGateway(ctx context.Context, gtw * }), }) } - -func (r *gatewayAPIReconciler) updateStatusForGatewayClass( - ctx context.Context, - gc *gwapiv1.GatewayClass, - accepted bool, - reason, - msg string, -) error { - if r.statusUpdater != nil { - r.statusUpdater.Send(Update{ - NamespacedName: types.NamespacedName{Name: gc.Name}, - Resource: &gwapiv1.GatewayClass{}, - Mutator: MutatorFunc(func(obj client.Object) client.Object { - gc, ok := obj.(*gwapiv1.GatewayClass) - if !ok { - panic(fmt.Sprintf("unsupported object type %T", obj)) - } - - return status.SetGatewayClassAccepted(gc.DeepCopy(), accepted, reason, msg) - }), - }) - } else { - // this branch makes testing easier by not going through the status.Updater. - duplicate := status.SetGatewayClassAccepted(gc.DeepCopy(), accepted, reason, msg) - - if err := r.client.Status().Update(ctx, duplicate); err != nil && !kerrors.IsNotFound(err) { - return fmt.Errorf("error updating status of gatewayclass %s: %w", duplicate.Name, err) - } - } - return nil -} diff --git a/release-notes/current.yaml b/release-notes/current.yaml index cfe869fa217..a1aa214cdb9 100644 --- a/release-notes/current.yaml +++ b/release-notes/current.yaml @@ -17,6 +17,7 @@ bug fixes: | Fixed BackendTLSPolicy didn't support using port name as the sectionName in the targetRefs Fixed reference grant from EnvoyExtensionPolicy to referenced ext-proc backend not respected Fixed BackendTrafficPolicy not applying to Gateway Route when Route has a Request Timeout defined + Fixed proxies connected to the secondary EG were not receiving xDS configuration # Enhancements that improve performance. performance improvements: |