Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: trigger a resync to avoid missing any status updates #4878

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions internal/provider/kubernetes/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/discovery"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand Down Expand Up @@ -85,7 +86,7 @@
// newGatewayAPIController
func newGatewayAPIController(mgr manager.Manager, cfg *config.Server, su Updater,
resources *message.ProviderResources,
) error {
) (resync func(), err error) {
ctx := context.Background()

// Gather additional resources to watch from registered extensions
Expand Down Expand Up @@ -123,9 +124,26 @@
// controller-runtime doesn't allow running a controller with the same name more than once,
// see https://github.com/kubernetes-sigs/controller-runtime/blob/2b941650bce159006c88bd3ca0d132c7bc40e947/pkg/controller/name.go#L29
name := fmt.Sprintf("gatewayapi-%d", time.Now().Unix())
c, err := controller.New(name, mgr, controller.Options{Reconciler: r, SkipNameValidation: skipNameValidation()})

queue := workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[reconcile.Request](),
workqueue.TypedRateLimitingQueueConfig[reconcile.Request]{Name: name})

c, err := controller.New(
name,
mgr,
controller.Options{
Reconciler: r,
SkipNameValidation: skipNameValidation(),
NewQueue: func(
_ string,
_ workqueue.TypedRateLimiter[reconcile.Request],
) workqueue.TypedRateLimitingInterface[reconcile.Request] {
return queue
},
})
if err != nil {
return fmt.Errorf("error creating controller: %w", err)
return nil, fmt.Errorf("error creating controller: %w", err)

Check warning on line 146 in internal/provider/kubernetes/controller.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/kubernetes/controller.go#L146

Added line #L146 was not covered by tests
}
r.log.Info("created gatewayapi controller")

Expand All @@ -134,9 +152,13 @@

// Watch resources
if err := r.watchResources(ctx, mgr, c); err != nil {
return fmt.Errorf("error watching resources: %w", err)
return nil, fmt.Errorf("error watching resources: %w", err)

Check warning on line 155 in internal/provider/kubernetes/controller.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/kubernetes/controller.go#L155

Added line #L155 was not covered by tests
}
return nil

return func() {
// Trigger a reconciliation to update the status of all resources.
queue.AddRateLimited(reconcile.Request{})
}, nil
}

func byNamespaceSelectorEnabled(eg *egv1a1.EnvoyGateway) bool {
Expand Down
6 changes: 5 additions & 1 deletion internal/provider/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,14 @@ func New(cfg *rest.Config, svr *ec.Server, resources *message.ProviderResources)
}

// Create and register the controllers with the manager.
if err := newGatewayAPIController(mgr, svr, updateHandler.Writer(), resources); err != nil {
var resync func()
if resync, err = newGatewayAPIController(mgr, svr, updateHandler.Writer(), resources); err != nil {
return nil, fmt.Errorf("failted to create gatewayapi controller: %w", err)
}

// Trigger a reconciliation after updateHandler is started to ensure we don't miss any updates.
updateHandler.addPostStart(resync)

// Add health check health probes.
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
return nil, fmt.Errorf("unable to set up health check: %w", err)
Expand Down
38 changes: 25 additions & 13 deletions internal/provider/kubernetes/status_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import (
"context"
"sync"
"time"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -57,21 +56,22 @@
type UpdateHandler struct {
log logr.Logger
client client.Client
sendUpdates chan struct{}
updateChannel chan Update
wg *sync.WaitGroup
postStart func()
}

func NewUpdateHandler(log logr.Logger, client client.Client) *UpdateHandler {
u := &UpdateHandler{
return &UpdateHandler{
log: log,
client: client,
sendUpdates: make(chan struct{}),
updateChannel: make(chan Update, 1000),
wg: new(sync.WaitGroup),
}
}

u.wg.Add(1)

return u
// addPostStart registers postStart as the handler's post-start callback,
// replacing any callback that was set earlier. The handler invokes it, when
// non-nil, right after it closes sendUpdates to let Updaters start sending.
func (u *UpdateHandler) addPostStart(postStart func()) {
u.postStart = postStart
}

func (u *UpdateHandler) apply(update Update) {
Expand Down Expand Up @@ -135,7 +135,12 @@
defer u.log.Info("stopped status update handler")

// Enable Updaters to start sending updates to this handler.
u.wg.Done()
close(u.sendUpdates)

// Trigger a reconciliation to ensure we don't miss any updates.
if u.postStart != nil {
u.postStart()
}

for {
select {
Expand All @@ -153,8 +158,8 @@
// Writer retrieves the interface that should be used to write to the UpdateHandler.
func (u *UpdateHandler) Writer() Updater {
return &UpdateWriter{
enabled: u.sendUpdates,
updateChannel: u.updateChannel,
wg: u.wg,
}
}

Expand All @@ -165,15 +170,22 @@

// UpdateWriter takes status updates and sends these to the UpdateHandler via a channel.
type UpdateWriter struct {
enabled <-chan struct{}
updateChannel chan<- Update
wg *sync.WaitGroup
}

// Send sends the given Update off to the update channel for writing by the UpdateHandler.
func (u *UpdateWriter) Send(update Update) {
// Wait until updater is ready
u.wg.Wait()
u.updateChannel <- update
// Send must not block while the status updater is not running: the updater is only started on the
// Envoy Gateway instance that is the leader, so on a non-leader instance nothing drains the update channel.
// Until the updater signals that it is ready by closing the enabled channel, updates passed to Send are dropped.
// This prevents status updates from blocking the reconciliation loop.
// The status updates will be handled by the leader.
select {
case <-u.enabled:
u.updateChannel <- update
default:

Check warning on line 187 in internal/provider/kubernetes/status_updater.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/kubernetes/status_updater.go#L187

Added line #L187 was not covered by tests
}
}

// isStatusEqual checks if two objects have equivalent status.
Expand Down
11 changes: 6 additions & 5 deletions release-notes/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ security updates: |

# New features or capabilities added in this release.
new features: |
Added support for trusted CIDRs in the ClientIPDetectionSettings API
Added support for sending attributes to external processor in EnvoyExtensionPolicy API
Added support for trusted CIDRs in the ClientIPDetectionSettings API.
Added support for sending attributes to external processor in EnvoyExtensionPolicy API.

# Fixes for bugs identified in previous versions.
bug fixes: |
Fixed BackendTLSPolicy didn't support using port name as the sectionName in the targetRefs
Fixed reference grant from EnvoyExtensionPolicy to referenced ext-proc backend not respected
Fixed BackendTLSPolicy not supporting a port name as the sectionName in targetRefs.
Fixed reference grants from EnvoyExtensionPolicy to the referenced ext-proc backend not being respected.
Fixed Envoy proxies connected to the secondary gateway not receiving configuration.

# Enhancements that improve performance.
performance improvements: |
Expand All @@ -30,4 +31,4 @@ deprecations: |

# Other notable changes not covered by the above sections.
Other changes: |
[SecurityPolicy] Modify the JWT Provider Issuer validation constraint
[SecurityPolicy] Modify the JWT Provider Issuer validation constraint.
Loading