Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: decouple gateway status updates from the reconciler #4767

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
8c84649
decoup gateway status update
zhaohuabing Nov 22, 2024
8c9d33e
decoup gatewayclass status update
zhaohuabing Nov 22, 2024
fc97316
fix test
zhaohuabing Nov 23, 2024
bccc70c
add comment
zhaohuabing Nov 23, 2024
7a4c51e
fix test
zhaohuabing Nov 23, 2024
e406088
fix test
zhaohuabing Nov 23, 2024
163cf5c
revert gateway api runner
zhaohuabing Nov 26, 2024
bf3d07e
update address and programming status
zhaohuabing Nov 26, 2024
0e3aec5
Merge remote-tracking branch 'origin/main' into fix-gateway-status-bl…
zhaohuabing Nov 26, 2024
c993159
Revert "update address and programming status"
zhaohuabing Nov 26, 2024
f8f2e68
avoid overriding the gateway status from Gateway API translator
zhaohuabing Dec 11, 2024
97d522a
Merge remote-tracking branch 'origin/main' into fix-gateway-status-bl…
zhaohuabing Dec 11, 2024
272348a
minor wording
zhaohuabing Dec 11, 2024
0393223
Merge remote-tracking branch 'origin/main' into fix-gateway-status-bl…
zhaohuabing Dec 11, 2024
3d20090
minor wording
zhaohuabing Dec 11, 2024
c7b7f44
only subscribe to status updates upon acquiring leadership
zhaohuabing Dec 11, 2024
be6df9a
fix lint
zhaohuabing Dec 11, 2024
5448863
Merge remote-tracking branch 'origin/main' into fix-gateway-status-bl…
zhaohuabing Dec 11, 2024
3fc0ec4
Merge branch 'main' into fix-gateway-status-blocking
zhaohuabing Dec 11, 2024
9aa5c7c
minor wording
zhaohuabing Dec 11, 2024
75dcfbd
address comment
zhaohuabing Dec 12, 2024
ba06d2b
address comment
zhaohuabing Dec 12, 2024
d77aa7d
minor wording
zhaohuabing Dec 12, 2024
99a2910
fix lint
zhaohuabing Dec 12, 2024
6ec2c6d
Merge branch 'main' into fix-gateway-status-blocking
zhaohuabing Dec 12, 2024
f36b72f
minor change
zhaohuabing Dec 12, 2024
fb00fb0
release note
zhaohuabing Dec 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions internal/message/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,13 @@ func (p *ProviderResources) Close() {

// GatewayAPIStatuses contains gateway API resources statuses
type GatewayAPIStatuses struct {
GatewayStatuses watchable.Map[types.NamespacedName, *gwapiv1.GatewayStatus]
HTTPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1.HTTPRouteStatus]
GRPCRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1.GRPCRouteStatus]
TLSRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.TLSRouteStatus]
TCPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.TCPRouteStatus]
UDPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.UDPRouteStatus]
GatewayClassStatuses watchable.Map[types.NamespacedName, *gwapiv1.GatewayClassStatus]
GatewayStatuses watchable.Map[types.NamespacedName, *gwapiv1.GatewayStatus]
HTTPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1.HTTPRouteStatus]
GRPCRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1.GRPCRouteStatus]
TLSRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.TLSRouteStatus]
TCPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.TCPRouteStatus]
UDPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.UDPRouteStatus]
}

func (s *GatewayAPIStatuses) Close() {
Expand Down
38 changes: 27 additions & 11 deletions internal/provider/kubernetes/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,25 @@
}
r.log.Info("created gatewayapi controller")

// Subscribe to status updates
r.subscribeAndUpdateStatus(ctx, cfg.EnvoyGateway.EnvoyGatewaySpec.ExtensionManager != nil)

// Watch resources
if err := r.watchResources(ctx, mgr, c); err != nil {
return fmt.Errorf("error watching resources: %w", err)
}

// When leader election is enabled, only subscribe to status updates upon acquiring leadership.
if cfg.EnvoyGateway.Provider.Type == egv1a1.ProviderTypeKubernetes &&
!ptr.Deref(cfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.Disable, false) {
go func() {
select {
case <-ctx.Done():
return

Check warning on line 143 in internal/provider/kubernetes/controller.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/kubernetes/controller.go#L142-L143

Added lines #L142 - L143 were not covered by tests
case <-mgr.Elected():
zhaohuabing marked this conversation as resolved.
Show resolved Hide resolved
r.subscribeAndUpdateStatus(ctx, cfg.EnvoyGateway.EnvoyGatewaySpec.ExtensionManager != nil)
}
}()
} else {
r.subscribeAndUpdateStatus(ctx, cfg.EnvoyGateway.EnvoyGatewaySpec.ExtensionManager != nil)
}

Check warning on line 150 in internal/provider/kubernetes/controller.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/kubernetes/controller.go#L148-L150

Added lines #L148 - L150 were not covered by tests
return nil
}

Expand Down Expand Up @@ -199,9 +211,12 @@
if managedGC.Spec.ParametersRef != nil && managedGC.DeletionTimestamp == nil {
if err := r.processGatewayClassParamsRef(ctx, managedGC, resourceMappings, gwcResource); err != nil {
msg := fmt.Sprintf("%s: %v", status.MsgGatewayClassInvalidParams, err)
if err := r.updateStatusForGatewayClass(ctx, managedGC, false, string(gwapiv1.GatewayClassReasonInvalidParameters), msg); err != nil {
r.log.Error(err, "unable to update GatewayClass status")
}
gc := status.SetGatewayClassAccepted(
managedGC.DeepCopy(),
false,
string(gwapiv1.GatewayClassReasonInvalidParameters),
msg)
r.resources.GatewayClassStatuses.Store(utils.NamespacedName(gc), &gc.Status)

Check warning on line 219 in internal/provider/kubernetes/controller.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/kubernetes/controller.go#L214-L219

Added lines #L214 - L219 were not covered by tests
r.log.Error(err, "failed to process parametersRef for gatewayclass", "name", managedGC.Name)
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -293,11 +308,12 @@

// process envoy gateway secret refs
r.processEnvoyProxySecretRef(ctx, gwcResource)

if err := r.updateStatusForGatewayClass(ctx, managedGC, true, string(gwapiv1.GatewayClassReasonAccepted), status.MsgValidGatewayClass); err != nil {
r.log.Error(err, "unable to update GatewayClass status")
return reconcile.Result{}, err
}
gc := status.SetGatewayClassAccepted(
managedGC.DeepCopy(),
true,
string(gwapiv1.GatewayClassReasonAccepted),
status.MsgValidGatewayClass)
r.resources.GatewayClassStatuses.Store(utils.NamespacedName(gc), &gc.Status)

if len(gwcResource.Gateways) == 0 {
r.log.Info("No gateways found for accepted gatewayclass")
Expand Down
26 changes: 23 additions & 3 deletions internal/provider/kubernetes/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@
// Check if the Service belongs to a Gateway, if so, update the Gateway status.
gtw := r.findOwningGateway(ctx, labels)
if gtw != nil {
r.updateStatusForGateway(ctx, gtw)
r.updateGatewayStatus(gtw)
zhaohuabing marked this conversation as resolved.
Show resolved Hide resolved
return false
}

Expand Down Expand Up @@ -528,7 +528,7 @@
// Check if the obj belongs to a Gateway, if so, update the Gateway status.
gtw := r.findOwningGateway(ctx, labels)
if gtw != nil {
r.updateStatusForGateway(ctx, gtw)
r.updateGatewayStatus(gtw)
return false
}
}
Expand Down Expand Up @@ -636,12 +636,32 @@
}

for _, gateway := range gateways.Items {
r.updateStatusForGateway(ctx, &gateway)
r.updateGatewayStatus(&gateway)

Check warning on line 639 in internal/provider/kubernetes/predicates.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/kubernetes/predicates.go#L639

Added line #L639 was not covered by tests
}

return nil
}

// updateGatewayStatus triggers a status update for the Gateway.
func (r *gatewayAPIReconciler) updateGatewayStatus(gateway *gwapiv1.Gateway) {
gwName := utils.NamespacedName(gateway)
status := &gateway.Status
// Use the existing status if it exists to avoid losing the status calculated by the Gateway API translator.
if existing, ok := r.resources.GatewayStatuses.Load(gwName); ok {
status = existing
}

// Since the status does not reflect the actual changed status, we need to delete it first
// to prevent it from being considered unchanged. This ensures that subscribers receive the update event.
r.resources.GatewayStatuses.Delete(gwName)
// The status that is stored in the GatewayStatuses GatewayStatuses is solely used to trigger the status updater
// and does not reflect the real changed status.
//
// The status updater will check the Envoy Proxy service to get the addresses of the Gateway,
// and check the Envoy Proxy Deployment/DaemonSet to get the status of the Gateway workload.
r.resources.GatewayStatuses.Store(gwName, status)
}

func (r *gatewayAPIReconciler) handleNode(obj client.Object) bool {
ctx := context.Background()
node, ok := obj.(*corev1.Node)
Expand Down
3 changes: 3 additions & 0 deletions internal/provider/kubernetes/predicates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/envoyproxy/gateway/internal/gatewayapi/resource"
"github.com/envoyproxy/gateway/internal/infrastructure/kubernetes/proxy"
"github.com/envoyproxy/gateway/internal/logging"
"github.com/envoyproxy/gateway/internal/message"
"github.com/envoyproxy/gateway/internal/provider/kubernetes/test"
)

Expand Down Expand Up @@ -854,6 +855,7 @@ func TestValidateServiceForReconcile(t *testing.T) {
classController: egv1a1.GatewayControllerName,
log: logger,
mergeGateways: sets.New[string]("test-mg"),
resources: &message.ProviderResources{},
grpcRouteCRDExists: true,
tcpRouteCRDExists: true,
udpRouteCRDExists: true,
Expand Down Expand Up @@ -972,6 +974,7 @@ func TestValidateObjectForReconcile(t *testing.T) {
classController: egv1a1.GatewayControllerName,
log: logger,
mergeGateways: sets.New[string]("test-mg"),
resources: &message.ProviderResources{},
}

for _, tc := range testCases {
Expand Down
61 changes: 29 additions & 32 deletions internal/provider/kubernetes/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
"fmt"
"reflect"

kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -28,6 +27,35 @@
// subscribeAndUpdateStatus subscribes to gateway API object status updates and
// writes it into the Kubernetes API Server.
func (r *gatewayAPIReconciler) subscribeAndUpdateStatus(ctx context.Context, extensionManagerEnabled bool) {
// GatewayClass object status updater
go func() {
message.HandleSubscription(
message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: "gatewayclass-status"},
r.resources.GatewayClassStatuses.Subscribe(ctx),
func(update message.Update[types.NamespacedName, *gwapiv1.GatewayClassStatus], errChan chan error) {
// skip delete updates.
if update.Delete {
return
}

Check warning on line 39 in internal/provider/kubernetes/status.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/kubernetes/status.go#L38-L39

Added lines #L38 - L39 were not covered by tests

r.statusUpdater.Send(Update{
NamespacedName: update.Key,
Resource: new(gwapiv1.GatewayClass),
Mutator: MutatorFunc(func(obj client.Object) client.Object {
gc, ok := obj.(*gwapiv1.GatewayClass)
if !ok {
panic(fmt.Sprintf("unsupported object type %T", obj))

Check warning on line 47 in internal/provider/kubernetes/status.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/kubernetes/status.go#L47

Added line #L47 was not covered by tests
}
gcCopy := gc.DeepCopy()
gcCopy.Status = *update.Value
return gcCopy
}),
})
},
)
r.log.Info("gatewayclass status subscriber shutting down")

Check warning on line 56 in internal/provider/kubernetes/status.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/kubernetes/status.go#L56

Added line #L56 was not covered by tests
}()

// Gateway object status updater
go func() {
message.HandleSubscription(
Expand Down Expand Up @@ -564,34 +592,3 @@
}),
})
}

zhaohuabing marked this conversation as resolved.
Show resolved Hide resolved
func (r *gatewayAPIReconciler) updateStatusForGatewayClass(
ctx context.Context,
gc *gwapiv1.GatewayClass,
accepted bool,
reason,
msg string,
) error {
if r.statusUpdater != nil {
r.statusUpdater.Send(Update{
NamespacedName: types.NamespacedName{Name: gc.Name},
Resource: &gwapiv1.GatewayClass{},
Mutator: MutatorFunc(func(obj client.Object) client.Object {
gc, ok := obj.(*gwapiv1.GatewayClass)
if !ok {
panic(fmt.Sprintf("unsupported object type %T", obj))
}

return status.SetGatewayClassAccepted(gc.DeepCopy(), accepted, reason, msg)
}),
})
} else {
// this branch makes testing easier by not going through the status.Updater.
duplicate := status.SetGatewayClassAccepted(gc.DeepCopy(), accepted, reason, msg)

if err := r.client.Status().Update(ctx, duplicate); err != nil && !kerrors.IsNotFound(err) {
return fmt.Errorf("error updating status of gatewayclass %s: %w", duplicate.Name, err)
}
}
return nil
}
Loading