Skip to content

Commit

Permalink
fix: decouple gateway status updates from the reconciler (envoyproxy#…
Browse files Browse the repository at this point in the history
…4767)

* decoup gateway status update

Signed-off-by: Huabing Zhao <[email protected]>

* decoup gatewayclass status update

Signed-off-by: Huabing Zhao <[email protected]>

* fix test

Signed-off-by: Huabing Zhao <[email protected]>

* add comment

Signed-off-by: Huabing Zhao <[email protected]>

* fix test

Signed-off-by: Huabing Zhao <[email protected]>

* fix test

Signed-off-by: Huabing Zhao <[email protected]>

* revert gateway api runner

Signed-off-by: Huabing Zhao <[email protected]>

* update address and programming status

Signed-off-by: Huabing Zhao <[email protected]>

* Revert "update address and programming status"

This reverts commit bf3d07e.

* avoid overriding the gateway status from Gateway API translator

Signed-off-by: Huabing Zhao <[email protected]>

* minor wording

Signed-off-by: Huabing Zhao <[email protected]>

* minor wording

Signed-off-by: Huabing Zhao <[email protected]>

* only subscribe to status updates upon acquiring leadership

Signed-off-by: Huabing Zhao <[email protected]>

* fix lint

Signed-off-by: Huabing Zhao <[email protected]>

* minor wording

Signed-off-by: Huabing Zhao <[email protected]>

* address comment

Signed-off-by: Huabing Zhao <[email protected]>

* address comment

Signed-off-by: Huabing Zhao <[email protected]>

* minor wording

Signed-off-by: Huabing Zhao <[email protected]>

* fix lint

Signed-off-by: Huabing Zhao <[email protected]>

* minor change

Signed-off-by: Huabing Zhao <[email protected]>

* release note

Signed-off-by: Huabing Zhao <[email protected]>

---------

Signed-off-by: Huabing Zhao <[email protected]>
(cherry picked from commit c1ff135)
Signed-off-by: Huabing Zhao <[email protected]>
  • Loading branch information
zhaohuabing committed Dec 13, 2024
1 parent fe15b47 commit be93e32
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 79 deletions.
12 changes: 8 additions & 4 deletions internal/envoygateway/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package config

import (
"errors"
"sync"

egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1"
"github.com/envoyproxy/gateway/api/v1alpha1/validation"
Expand Down Expand Up @@ -37,19 +38,22 @@ type Server struct {
// Logger is the logr implementation used by Envoy Gateway.
Logger logging.Logger
// Elected chan is used to signal what a leader is elected
Elected chan struct{}
Elected *sync.WaitGroup
}

// New returns a Server with default parameters.
func New() (*Server, error) {
return &Server{
server := &Server{
EnvoyGateway: egv1a1.DefaultEnvoyGateway(),
Namespace: env.Lookup("ENVOY_GATEWAY_NAMESPACE", DefaultNamespace),
DNSDomain: env.Lookup("KUBERNETES_CLUSTER_DOMAIN", DefaultDNSDomain),
// the default logger
Logger: logging.DefaultLogger(egv1a1.LogLevelInfo),
Elected: make(chan struct{}),
}, nil
Elected: &sync.WaitGroup{},
}
// Block the tasks that are waiting for the leader to be elected
server.Elected.Add(1)
return server, nil
}

// Validate validates a Server config.
Expand Down
8 changes: 2 additions & 6 deletions internal/infrastructure/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,8 @@ func (r *Runner) Start(ctx context.Context) (err error) {
if r.EnvoyGateway.Provider.Type == egv1a1.ProviderTypeKubernetes &&
!ptr.Deref(r.EnvoyGateway.Provider.Kubernetes.LeaderElection.Disable, false) {
go func() {
select {
case <-ctx.Done():
return
case <-r.Elected:
initInfra()
}
r.Elected.Wait()
initInfra()
}()
return
}
Expand Down
13 changes: 7 additions & 6 deletions internal/message/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,13 @@ func (p *ProviderResources) Close() {

// GatewayAPIStatuses contains gateway API resources statuses
type GatewayAPIStatuses struct {
GatewayStatuses watchable.Map[types.NamespacedName, *gwapiv1.GatewayStatus]
HTTPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1.HTTPRouteStatus]
GRPCRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1.GRPCRouteStatus]
TLSRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.TLSRouteStatus]
TCPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.TCPRouteStatus]
UDPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.UDPRouteStatus]
GatewayClassStatuses watchable.Map[types.NamespacedName, *gwapiv1.GatewayClassStatus]
GatewayStatuses watchable.Map[types.NamespacedName, *gwapiv1.GatewayStatus]
HTTPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1.HTTPRouteStatus]
GRPCRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1.GRPCRouteStatus]
TLSRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.TLSRouteStatus]
TCPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.TCPRouteStatus]
UDPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.UDPRouteStatus]
}

func (s *GatewayAPIStatuses) Close() {
Expand Down
34 changes: 23 additions & 11 deletions internal/provider/kubernetes/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,21 @@ func newGatewayAPIController(mgr manager.Manager, cfg *config.Server, su Updater
}
r.log.Info("created gatewayapi controller")

// Subscribe to status updates
r.subscribeAndUpdateStatus(ctx, cfg.EnvoyGateway.EnvoyGatewaySpec.ExtensionManager != nil)

// Watch resources
if err := r.watchResources(ctx, mgr, c); err != nil {
return fmt.Errorf("error watching resources: %w", err)
}

// When leader election is enabled, only subscribe to status updates upon acquiring leadership.
if cfg.EnvoyGateway.Provider.Type == egv1a1.ProviderTypeKubernetes &&
!ptr.Deref(cfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.Disable, false) {
go func() {
cfg.Elected.Wait()
r.subscribeAndUpdateStatus(ctx, cfg.EnvoyGateway.EnvoyGatewaySpec.ExtensionManager != nil)
}()
} else {
r.subscribeAndUpdateStatus(ctx, cfg.EnvoyGateway.EnvoyGatewaySpec.ExtensionManager != nil)
}
return nil
}

Expand Down Expand Up @@ -199,9 +207,12 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques
if managedGC.Spec.ParametersRef != nil && managedGC.DeletionTimestamp == nil {
if err := r.processGatewayClassParamsRef(ctx, managedGC, resourceMappings, gwcResource); err != nil {
msg := fmt.Sprintf("%s: %v", status.MsgGatewayClassInvalidParams, err)
if err := r.updateStatusForGatewayClass(ctx, managedGC, false, string(gwapiv1.GatewayClassReasonInvalidParameters), msg); err != nil {
r.log.Error(err, "unable to update GatewayClass status")
}
gc := status.SetGatewayClassAccepted(
managedGC.DeepCopy(),
false,
string(gwapiv1.GatewayClassReasonInvalidParameters),
msg)
r.resources.GatewayClassStatuses.Store(utils.NamespacedName(gc), &gc.Status)
r.log.Error(err, "failed to process parametersRef for gatewayclass", "name", managedGC.Name)
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -293,11 +304,12 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques

// process envoy gateway secret refs
r.processEnvoyProxySecretRef(ctx, gwcResource)

if err := r.updateStatusForGatewayClass(ctx, managedGC, true, string(gwapiv1.GatewayClassReasonAccepted), status.MsgValidGatewayClass); err != nil {
r.log.Error(err, "unable to update GatewayClass status")
return reconcile.Result{}, err
}
gc := status.SetGatewayClassAccepted(
managedGC.DeepCopy(),
true,
string(gwapiv1.GatewayClassReasonAccepted),
status.MsgValidGatewayClass)
r.resources.GatewayClassStatuses.Store(utils.NamespacedName(gc), &gc.Status)

if len(gwcResource.Gateways) == 0 {
r.log.Info("No gateways found for accepted gatewayclass")
Expand Down
33 changes: 16 additions & 17 deletions internal/provider/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,40 +36,40 @@ type Provider struct {
}

// New creates a new Provider from the provided EnvoyGateway.
func New(cfg *rest.Config, svr *ec.Server, resources *message.ProviderResources) (*Provider, error) {
func New(restCfg *rest.Config, svrCfg *ec.Server, resources *message.ProviderResources) (*Provider, error) {
// TODO: Decide which mgr opts should be exposed through envoygateway.provider.kubernetes API.

mgrOpts := manager.Options{
Scheme: envoygateway.GetScheme(),
Logger: svr.Logger.Logger,
Logger: svrCfg.Logger.Logger,
HealthProbeBindAddress: ":8081",
LeaderElectionID: "5b9825d2.gateway.envoyproxy.io",
LeaderElectionNamespace: svr.Namespace,
LeaderElectionNamespace: svrCfg.Namespace,
}

log.SetLogger(mgrOpts.Logger)
klog.SetLogger(mgrOpts.Logger)

if !ptr.Deref(svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.Disable, false) {
if !ptr.Deref(svrCfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.Disable, false) {
mgrOpts.LeaderElection = true
if svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.LeaseDuration != nil {
ld, err := time.ParseDuration(string(*svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.LeaseDuration))
if svrCfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.LeaseDuration != nil {
ld, err := time.ParseDuration(string(*svrCfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.LeaseDuration))
if err != nil {
return nil, err
}
mgrOpts.LeaseDuration = ptr.To(ld)
}

if svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.RetryPeriod != nil {
rp, err := time.ParseDuration(string(*svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.RetryPeriod))
if svrCfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.RetryPeriod != nil {
rp, err := time.ParseDuration(string(*svrCfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.RetryPeriod))
if err != nil {
return nil, err
}
mgrOpts.RetryPeriod = ptr.To(rp)
}

if svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.RenewDeadline != nil {
rd, err := time.ParseDuration(string(*svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.RenewDeadline))
if svrCfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.RenewDeadline != nil {
rd, err := time.ParseDuration(string(*svrCfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.RenewDeadline))
if err != nil {
return nil, err
}
Expand All @@ -78,13 +78,13 @@ func New(cfg *rest.Config, svr *ec.Server, resources *message.ProviderResources)
mgrOpts.Controller = config.Controller{NeedLeaderElection: ptr.To(false)}
}

if svr.EnvoyGateway.NamespaceMode() {
if svrCfg.EnvoyGateway.NamespaceMode() {
mgrOpts.Cache.DefaultNamespaces = make(map[string]cache.Config)
for _, watchNS := range svr.EnvoyGateway.Provider.Kubernetes.Watch.Namespaces {
for _, watchNS := range svrCfg.EnvoyGateway.Provider.Kubernetes.Watch.Namespaces {
mgrOpts.Cache.DefaultNamespaces[watchNS] = cache.Config{}
}
}
mgr, err := ctrl.NewManager(cfg, mgrOpts)
mgr, err := ctrl.NewManager(restCfg, mgrOpts)
if err != nil {
return nil, fmt.Errorf("failed to create manager: %w", err)
}
Expand All @@ -95,7 +95,7 @@ func New(cfg *rest.Config, svr *ec.Server, resources *message.ProviderResources)
}

// Create and register the controllers with the manager.
if err := newGatewayAPIController(mgr, svr, updateHandler.Writer(), resources); err != nil {
if err := newGatewayAPIController(mgr, svrCfg, updateHandler.Writer(), resources); err != nil {
return nil, fmt.Errorf("failted to create gatewayapi controller: %w", err)
}

Expand All @@ -109,11 +109,10 @@ func New(cfg *rest.Config, svr *ec.Server, resources *message.ProviderResources)
return nil, fmt.Errorf("unable to set up ready check: %w", err)
}

// Emit elected & continue with envoyObjects of infra resources
// Emit elected & continue with the tasks that require leadership.
go func() {
<-mgr.Elected()
// WARN: DO NOT CLOSE IT
svr.Elected <- struct{}{}
svrCfg.Elected.Done()
}()

return &Provider{
Expand Down
26 changes: 23 additions & 3 deletions internal/provider/kubernetes/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (r *gatewayAPIReconciler) validateServiceForReconcile(obj client.Object) bo
// Check if the Service belongs to a Gateway, if so, update the Gateway status.
gtw := r.findOwningGateway(ctx, labels)
if gtw != nil {
r.updateStatusForGateway(ctx, gtw)
r.updateGatewayStatus(gtw)
return false
}

Expand Down Expand Up @@ -528,7 +528,7 @@ func (r *gatewayAPIReconciler) validateObjectForReconcile(obj client.Object) boo
// Check if the obj belongs to a Gateway, if so, update the Gateway status.
gtw := r.findOwningGateway(ctx, labels)
if gtw != nil {
r.updateStatusForGateway(ctx, gtw)
r.updateGatewayStatus(gtw)
return false
}
}
Expand Down Expand Up @@ -636,12 +636,32 @@ func (r *gatewayAPIReconciler) updateStatusForGatewaysUnderGatewayClass(ctx cont
}

for _, gateway := range gateways.Items {
r.updateStatusForGateway(ctx, &gateway)
r.updateGatewayStatus(&gateway)
}

return nil
}

// updateGatewayStatus triggers a status update for the Gateway.
func (r *gatewayAPIReconciler) updateGatewayStatus(gateway *gwapiv1.Gateway) {
gwName := utils.NamespacedName(gateway)
status := &gateway.Status
// Use the existing status if it exists to avoid losing the status calculated by the Gateway API translator.
if existing, ok := r.resources.GatewayStatuses.Load(gwName); ok {
status = existing
}

// Since the status does not reflect the actual changed status, we need to delete it first
// to prevent it from being considered unchanged. This ensures that subscribers receive the update event.
r.resources.GatewayStatuses.Delete(gwName)
// The status that is stored in the GatewayStatuses GatewayStatuses is solely used to trigger the status updater
// and does not reflect the real changed status.
//
// The status updater will check the Envoy Proxy service to get the addresses of the Gateway,
// and check the Envoy Proxy Deployment/DaemonSet to get the status of the Gateway workload.
r.resources.GatewayStatuses.Store(gwName, status)
}

func (r *gatewayAPIReconciler) handleNode(obj client.Object) bool {
ctx := context.Background()
node, ok := obj.(*corev1.Node)
Expand Down
3 changes: 3 additions & 0 deletions internal/provider/kubernetes/predicates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/envoyproxy/gateway/internal/gatewayapi/resource"
"github.com/envoyproxy/gateway/internal/infrastructure/kubernetes/proxy"
"github.com/envoyproxy/gateway/internal/logging"
"github.com/envoyproxy/gateway/internal/message"
"github.com/envoyproxy/gateway/internal/provider/kubernetes/test"
)

Expand Down Expand Up @@ -854,6 +855,7 @@ func TestValidateServiceForReconcile(t *testing.T) {
classController: egv1a1.GatewayControllerName,
log: logger,
mergeGateways: sets.New[string]("test-mg"),
resources: &message.ProviderResources{},
grpcRouteCRDExists: true,
tcpRouteCRDExists: true,
udpRouteCRDExists: true,
Expand Down Expand Up @@ -972,6 +974,7 @@ func TestValidateObjectForReconcile(t *testing.T) {
classController: egv1a1.GatewayControllerName,
log: logger,
mergeGateways: sets.New[string]("test-mg"),
resources: &message.ProviderResources{},
}

for _, tc := range testCases {
Expand Down
61 changes: 29 additions & 32 deletions internal/provider/kubernetes/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"fmt"
"reflect"

kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -28,6 +27,35 @@ import (
// subscribeAndUpdateStatus subscribes to gateway API object status updates and
// writes it into the Kubernetes API Server.
func (r *gatewayAPIReconciler) subscribeAndUpdateStatus(ctx context.Context, extensionManagerEnabled bool) {
// GatewayClass object status updater
go func() {
message.HandleSubscription(
message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: "gatewayclass-status"},
r.resources.GatewayClassStatuses.Subscribe(ctx),
func(update message.Update[types.NamespacedName, *gwapiv1.GatewayClassStatus], errChan chan error) {
// skip delete updates.
if update.Delete {
return
}

r.statusUpdater.Send(Update{
NamespacedName: update.Key,
Resource: new(gwapiv1.GatewayClass),
Mutator: MutatorFunc(func(obj client.Object) client.Object {
gc, ok := obj.(*gwapiv1.GatewayClass)
if !ok {
panic(fmt.Sprintf("unsupported object type %T", obj))
}
gcCopy := gc.DeepCopy()
gcCopy.Status = *update.Value
return gcCopy
}),
})
},
)
r.log.Info("gatewayclass status subscriber shutting down")
}()

// Gateway object status updater
go func() {
message.HandleSubscription(
Expand Down Expand Up @@ -564,34 +592,3 @@ func (r *gatewayAPIReconciler) updateStatusForGateway(ctx context.Context, gtw *
}),
})
}

func (r *gatewayAPIReconciler) updateStatusForGatewayClass(
ctx context.Context,
gc *gwapiv1.GatewayClass,
accepted bool,
reason,
msg string,
) error {
if r.statusUpdater != nil {
r.statusUpdater.Send(Update{
NamespacedName: types.NamespacedName{Name: gc.Name},
Resource: &gwapiv1.GatewayClass{},
Mutator: MutatorFunc(func(obj client.Object) client.Object {
gc, ok := obj.(*gwapiv1.GatewayClass)
if !ok {
panic(fmt.Sprintf("unsupported object type %T", obj))
}

return status.SetGatewayClassAccepted(gc.DeepCopy(), accepted, reason, msg)
}),
})
} else {
// this branch makes testing easier by not going through the status.Updater.
duplicate := status.SetGatewayClassAccepted(gc.DeepCopy(), accepted, reason, msg)

if err := r.client.Status().Update(ctx, duplicate); err != nil && !kerrors.IsNotFound(err) {
return fmt.Errorf("error updating status of gatewayclass %s: %w", duplicate.Name, err)
}
}
return nil
}
1 change: 1 addition & 0 deletions release-notes/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ bug fixes: |
Fixed BackendTLSPolicy didn't support using port name as the sectionName in the targetRefs
Fixed reference grant from EnvoyExtensionPolicy to referenced ext-proc backend not respected
Fixed BackendTrafficPolicy not applying to Gateway Route when Route has a Request Timeout defined
Fixed proxies connected to the secondary EG were not receiving xDS configuration
# Enhancements that improve performance.
performance improvements: |
Expand Down

0 comments on commit be93e32

Please sign in to comment.