diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index 3f662effc..288eb49ab 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -605,6 +605,7 @@ func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourc } func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, resourceVersion string) { + timeoutSeconds := int64(c.watchResyncTimeout.Seconds()) kube.RetryUntilSucceed(ctx, watchResourcesRetryTimeout, fmt.Sprintf("watch %s on %s", api.GroupKind, c.config.Host), c.log, func() (err error) { defer func() { if r := recover(); r != nil { @@ -622,6 +623,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo w, err := watchutil.NewRetryWatcher(resourceVersion, &cache.ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.TimeoutSeconds = &timeoutSeconds res, err := resClient.Watch(ctx, options) if errors.IsNotFound(err) { c.stopWatching(api.GroupKind, ns) @@ -633,17 +635,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo return err } - defer func() { - w.Stop() - resourceVersion = "" - }() - - var watchResyncTimeoutCh <-chan time.Time - if c.watchResyncTimeout > 0 { - shouldResync := time.NewTimer(c.watchResyncTimeout) - defer shouldResync.Stop() - watchResyncTimeoutCh = shouldResync.C - } + defer w.Stop() for { select { @@ -651,12 +643,9 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo case <-ctx.Done(): return nil - // re-synchronize API state and restart watch periodically - case <-watchResyncTimeoutCh: - return fmt.Errorf("Resyncing %s on %s due to timeout", api.GroupKind, c.config.Host) - // re-synchronize API state and restart watch if retry watcher failed to continue watching using provided resource version case <-w.Done(): + resourceVersion = "" return fmt.Errorf("Watch %s on %s has closed", api.GroupKind, c.config.Host) case event, ok := <-w.ResultChan(): @@ -666,8 +655,10 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo obj, ok := event.Object.(*unstructured.Unstructured) if !ok { + resourceVersion = "" return fmt.Errorf("Failed to convert to *unstructured.Unstructured: %v", event.Object) } + resourceVersion = obj.GetResourceVersion() c.processEvent(event.Type, obj) if kube.IsCRD(obj) {