Skip to content

Commit

Permalink
Address cache bug issue
Browse files Browse the repository at this point in the history
Signed-off-by: Per Goncalves da Silva <[email protected]>
  • Loading branch information
Per Goncalves da Silva committed Dec 12, 2024
1 parent 8e39847 commit 61a0e72
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 17 deletions.
16 changes: 13 additions & 3 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
Namespace: csv.Namespace,
Labels: csv.Labels,
Annotations: csv.Annotations,
UID: csv.UID,
},
Spec: v1alpha1.ClusterServiceVersionSpec{
CustomResourceDefinitions: csv.Spec.CustomResourceDefinitions,
Expand Down Expand Up @@ -745,10 +746,15 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
// Namespace sync for resolving subscriptions
namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod()).Core().V1().Namespaces()
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[any](
workqueue.NewTypedItemExponentialFailureRateLimiter[any](1*time.Second, 30*time.Second),
workqueue.TypedRateLimitingQueueConfig[any]{
Name: "resolve",
})
//op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
// workqueue.TypedRateLimitingQueueConfig[any]{
// Name: "resolve",
// })
namespaceQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithLogger(op.logger),
Expand Down Expand Up @@ -1313,6 +1319,9 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
// from users/admins. Resyncing the namespace again is unlikely to resolve
// not-satisfiable error
if _, ok := err.(solver.NotSatisfiable); ok {
if err := o.ResyncInformers(); err != nil {
logger.WithError(err).Infof("error resyncing informers")
}
logger.WithError(err).Debug("resolution failed")
_, updateErr := o.updateSubscriptionStatuses(
o.setSubsCond(subs, v1alpha1.SubscriptionCondition{
Expand All @@ -1325,7 +1334,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
logger.WithError(updateErr).Debug("failed to update subs conditions")
return updateErr
}
return nil
return err
}

_, updateErr := o.updateSubscriptionStatuses(
Expand Down Expand Up @@ -1736,7 +1745,8 @@ func (o *Operator) setSubsCond(subs []*v1alpha1.Subscription, cond v1alpha1.Subs

for _, sub := range subs {
subCond := sub.Status.GetCondition(cond.Type)
if subCond.Equals(cond) {

if subCond.Type == cond.Type && subCond.Status == cond.Status && subCond.Reason == cond.Reason {
continue
}
sub.Status.LastUpdated = lastUpdated
Expand Down
37 changes: 24 additions & 13 deletions pkg/controller/registry/resolver/source_csvs.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ func (csp *csvSourceProvider) Sources(namespaces ...string) map[cache.SourceKey]
listSubscriptions: func(ctx context.Context) (*v1alpha1.SubscriptionList, error) {
return csp.client.OperatorsV1alpha1().Subscriptions(namespace).List(ctx, metav1.ListOptions{})
},
//getCSV: func(ctx context.Context, namespace string, name string) (*v1alpha1.ClusterServiceVersion, error) {
// return csp.client.OperatorsV1alpha1().ClusterServiceVersions(namespace).Get(ctx, name, metav1.GetOptions{})
//},
}
break // first ns is assumed to be the target ns, todo: make explicit
}
Expand All @@ -54,6 +57,7 @@ type csvSource struct {
logger logrus.StdLogger

listSubscriptions func(context.Context) (*v1alpha1.SubscriptionList, error)
// getCSV func(ctx context.Context, namespace string, name string) (*v1alpha1.ClusterServiceVersion, error)
}

func (s *csvSource) Snapshot(ctx context.Context) (*cache.Snapshot, error) {
Expand Down Expand Up @@ -93,19 +97,26 @@ func (s *csvSource) Snapshot(ctx context.Context) (*cache.Snapshot, error) {
continue
}

if cachedSubscription, ok := csvSubscriptions[csv]; !ok || cachedSubscription == nil {
// we might be in an incoherent state, so let's check with live clients to make sure
realSubscriptions, err := s.listSubscriptions(ctx)
if err != nil {
return nil, fmt.Errorf("failed to list subscriptions: %w", err)
}
for _, realSubscription := range realSubscriptions.Items {
if realSubscription.Status.InstalledCSV == csv.Name {
// oops, live cluster state is coherent
return nil, fmt.Errorf("lister caches incoherent for CSV %s/%s - found owning Subscription %s/%s", csv.Namespace, csv.Name, realSubscription.Namespace, realSubscription.Name)
}
}
}
//if cachedSubscription, ok := csvSubscriptions[csv]; !ok || cachedSubscription == nil {
// // we might be in an incoherent state, so let's check with live clients to make sure
// realSubscriptions, err := s.listSubscriptions(ctx)
// if err != nil {
// return nil, fmt.Errorf("failed to list subscriptions: %w", err)
// }
// for _, realSubscription := range realSubscriptions.Items {
// if realSubscription.Status.InstalledCSV == csv.Name {
// // oops, live cluster state is coherent
// return nil, fmt.Errorf("lister caches incoherent for CSV %s/%s - found owning Subscription %s/%s", csv.Namespace, csv.Name, realSubscription.Namespace, realSubscription.Name)
// }
// }
// realCsv, err := s.getCSV(ctx, csv.Namespace, csv.Name)
// if err != nil {
// return nil, fmt.Errorf("lister caches might be incoherent for CSV %s/%s: %w", csv.Namespace, csv.Name, err)
// }
// if realCsv.GetUID() != csv.GetUID() {
// return nil, fmt.Errorf("lister caches incoherent for CSV %s/%s: differing UIDs (%s != %s)", csv.Namespace, csv.Name, csv.UID)
// }
//}

if failForwardEnabled {
replacementChainEndsInFailure, err := isReplacementChainThatEndsInFailure(csv, ReplacementMapping(csvs))
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/registry/resolver/step_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versio
}

func (r *OperatorStepResolver) ResolveSteps(namespace string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
subs, err := r.listSubscriptions(namespace)
subs, err := r.subLister.Subscriptions(namespace).List(labels.Everything())
if err != nil {
return nil, nil, nil, err
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/lib/queueinformer/queueinformer_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type Operator interface {
// RunInformers starts the Operator's underlying Informers.
RunInformers(ctx context.Context)

ResyncInformers() error

// Run starts the Operator and its underlying Informers.
Run(ctx context.Context)
}
Expand Down Expand Up @@ -197,6 +199,17 @@ func (o *operator) Run(ctx context.Context) {
})
}

func (o *operator) ResyncInformers() error {
o.mu.Lock()
defer o.mu.Unlock()
for _, informer := range o.informers {
if err := informer.GetStore().Resync(); err != nil {
return err
}
}
return nil
}

func (o *operator) start(ctx context.Context) error {
defer close(o.ready)

Expand Down

0 comments on commit 61a0e72

Please sign in to comment.