From 36939dc4f93f237f031364c7dc32f4c0dbf56545 Mon Sep 17 00:00:00 2001 From: Arko Dasgupta Date: Thu, 28 Nov 2024 20:31:52 -0800 Subject: [PATCH] use a waitGroup instead of an enabled channel in the status updater (#4809) use a waitGroup instead of a channel in the status updater * use a waitGroup to synchronize to the `Send` method that the status updater is enabled and ready for updates Signed-off-by: Arko Dasgupta --- .../provider/kubernetes/status_updater.go | 64 ++++++------------- 1 file changed, 18 insertions(+), 46 deletions(-) diff --git a/internal/provider/kubernetes/status_updater.go b/internal/provider/kubernetes/status_updater.go index ee5cbce59d2..1bafe23668b 100644 --- a/internal/provider/kubernetes/status_updater.go +++ b/internal/provider/kubernetes/status_updater.go @@ -7,7 +7,7 @@ package kubernetes import ( "context" - "errors" + "sync" "time" "github.com/go-logr/logr" @@ -57,26 +57,21 @@ func (m MutatorFunc) Mutate(old client.Object) client.Object { type UpdateHandler struct { log logr.Logger client client.Client - sendUpdates chan struct{} updateChannel chan Update - writer *UpdateWriter + wg *sync.WaitGroup } func NewUpdateHandler(log logr.Logger, client client.Client) *UpdateHandler { - sendUpdates := make(chan struct{}) - updateChannel := make(chan Update, 100) - return &UpdateHandler{ + u := &UpdateHandler{ log: log, client: client, - sendUpdates: sendUpdates, - updateChannel: updateChannel, - writer: &UpdateWriter{ - log: log, - enabled: sendUpdates, - updateChannel: updateChannel, - eventsBeforeEnabled: make(chan Update, 1000), - }, + updateChannel: make(chan Update, 1000), + wg: new(sync.WaitGroup), } + + u.wg.Add(1) + + return u } func (u *UpdateHandler) apply(update Update) { @@ -140,8 +135,7 @@ func (u *UpdateHandler) Start(ctx context.Context) error { defer u.log.Info("stopped status update handler") // Enable Updaters to start sending updates to this handler. - close(u.sendUpdates) - u.writer.handleEventsReceivedBeforeEnabled() + u.wg.Done() for { select { @@ -158,7 +152,10 @@ func (u *UpdateHandler) Start(ctx context.Context) error { // Writer retrieves the interface that should be used to write to the UpdateHandler. func (u *UpdateHandler) Writer() Updater { - return u.writer + return &UpdateWriter{ + updateChannel: u.updateChannel, + wg: u.wg, + } } // Updater describes an interface to send status updates somewhere. @@ -168,40 +165,15 @@ type Updater interface { // UpdateWriter takes status updates and sends these to the UpdateHandler via a channel. type UpdateWriter struct { - log logr.Logger - enabled <-chan struct{} updateChannel chan<- Update - // a temporary buffer to store events received before the Updater is enabled. - // These events will be sent to the update channel once the Updater is enabled. - eventsBeforeEnabled chan Update + wg *sync.WaitGroup } // Send sends the given Update off to the update channel for writing by the UpdateHandler. func (u *UpdateWriter) Send(update Update) { - // Non-blocking receive to see if we should pass along update. - select { - case <-u.enabled: - u.updateChannel <- update - default: - if len(u.eventsBeforeEnabled) < cap(u.eventsBeforeEnabled) { - u.log.Info("received a status update while disabled, storing for later", "event", update.NamespacedName) - u.eventsBeforeEnabled <- update - } else { - // If the buffer is full, drop the event to avoid blocking the sender. - u.log.Error(errors.New("dropping status update, buffer full"), "event", update.NamespacedName) - } - } -} - -// handleEventsReceivedBeforeEnabled sends the events received before the Updater was enabled to the update channel. -func (u *UpdateWriter) handleEventsReceivedBeforeEnabled() { - go func() { - for e := range u.eventsBeforeEnabled { - u.log.Info("sending stored status update", "event", e.NamespacedName) - u.updateChannel <- e - } - close(u.eventsBeforeEnabled) - }() + // Wait until updater is ready + u.wg.Wait() + u.updateChannel <- update } // isStatusEqual checks if two objects have equivalent status.