Skip to content

Commit

Permalink
feat: Drop unnecessary listing for the sake of watch reinitialization
Browse files Browse the repository at this point in the history
Signed-off-by: Antoni Zawodny <[email protected]>
  • Loading branch information
tosi3k committed Jul 25, 2024
1 parent 6b2984e commit b4fa2a7
Showing 1 changed file with 6 additions and 15 deletions.
21 changes: 6 additions & 15 deletions pkg/cache/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourc
}

func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, resourceVersion string) {
timeoutSeconds := int64(c.watchResyncTimeout.Seconds())
kube.RetryUntilSucceed(ctx, watchResourcesRetryTimeout, fmt.Sprintf("watch %s on %s", api.GroupKind, c.config.Host), c.log, func() (err error) {
defer func() {
if r := recover(); r != nil {
Expand All @@ -622,6 +623,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo

w, err := watchutil.NewRetryWatcher(resourceVersion, &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.TimeoutSeconds = &timeoutSeconds
res, err := resClient.Watch(ctx, options)
if errors.IsNotFound(err) {
c.stopWatching(api.GroupKind, ns)
Expand All @@ -633,30 +635,17 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
return err
}

defer func() {
w.Stop()
resourceVersion = ""
}()

var watchResyncTimeoutCh <-chan time.Time
if c.watchResyncTimeout > 0 {
shouldResync := time.NewTimer(c.watchResyncTimeout)
defer shouldResync.Stop()
watchResyncTimeoutCh = shouldResync.C
}
defer w.Stop()

for {
select {
// stop watching when parent context got cancelled
case <-ctx.Done():
return nil

// re-synchronize API state and restart watch periodically
case <-watchResyncTimeoutCh:
return fmt.Errorf("Resyncing %s on %s due to timeout", api.GroupKind, c.config.Host)

// re-synchronize API state and restart watch if retry watcher failed to continue watching using provided resource version
case <-w.Done():
resourceVersion = ""
return fmt.Errorf("Watch %s on %s has closed", api.GroupKind, c.config.Host)

case event, ok := <-w.ResultChan():
Expand All @@ -666,8 +655,10 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo

obj, ok := event.Object.(*unstructured.Unstructured)
if !ok {
resourceVersion = ""
return fmt.Errorf("Failed to convert to *unstructured.Unstructured: %v", event.Object)
}
resourceVersion = obj.GetResourceVersion()

c.processEvent(event.Type, obj)
if kube.IsCRD(obj) {
Expand Down

0 comments on commit b4fa2a7

Please sign in to comment.