diff --git a/api/v1alpha1/clusterextension_types.go b/api/v1alpha1/clusterextension_types.go index fb8903a34..589ca734d 100644 --- a/api/v1alpha1/clusterextension_types.go +++ b/api/v1alpha1/clusterextension_types.go @@ -414,6 +414,7 @@ const ( // TODO(user): add more Types, here and into init() TypeInstalled = "Installed" TypeResolved = "Resolved" + TypeHealthy = "Healthy" // TypeDeprecated is a rollup condition that is present when // any of the deprecated conditions are present. @@ -438,6 +439,8 @@ const ( ReasonErrorGettingReleaseState = "ErrorGettingReleaseState" + ReasonUnverifiable = "Unverifiable" + CRDUpgradeSafetyPolicyEnabled CRDUpgradeSafetyPolicy = "Enabled" CRDUpgradeSafetyPolicyDisabled CRDUpgradeSafetyPolicy = "Disabled" ) @@ -452,6 +455,7 @@ func init() { TypeChannelDeprecated, TypeBundleDeprecated, TypeUnpacked, + TypeHealthy, ) // TODO(user): add Reasons from above conditionsets.ConditionReasons = append(conditionsets.ConditionReasons, @@ -465,6 +469,7 @@ func init() { ReasonUnpackSuccess, ReasonUnpackFailed, ReasonErrorGettingReleaseState, + ReasonUnverifiable, ) } diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 7b8ad62f6..95813cf6c 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -249,6 +249,17 @@ func main() { Preflights: preflights, } + cm := contentmanager.NewManager(clientRestConfigMapper, mgr.GetConfig(), mgr.GetRESTMapper()) + err = clusterExtensionFinalizers.Register(controllers.ClusterExtensionCleanupContentManagerCacheFinalizer, finalizerFunc(func(ctx context.Context, obj client.Object) (crfinalizer.Result, error) { + ext := obj.(*ocv1alpha1.ClusterExtension) + err := cm.Delete(ext) + return crfinalizer.Result{}, err + })) + if err != nil { + setupLog.Error(err, "unable to register content manager cleanup finalizer") + os.Exit(1) + } + if err = (&controllers.ClusterExtensionReconciler{ Client: cl, Resolver: resolver, @@ -256,7 +267,7 @@ func main() { Applier: applier, InstalledBundleGetter: &controllers.DefaultInstalledBundleGetter{ActionClientGetter: acg}, Finalizers: clusterExtensionFinalizers, - Watcher: contentmanager.New(clientRestConfigMapper, mgr.GetConfig(), mgr.GetRESTMapper()), + Manager: cm, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "ClusterExtension") os.Exit(1) diff --git a/config/samples/olm_v1alpha1_clusterextension.yaml b/config/samples/olm_v1alpha1_clusterextension.yaml index fdd117d58..19bbf06b0 100644 --- a/config/samples/olm_v1alpha1_clusterextension.yaml +++ b/config/samples/olm_v1alpha1_clusterextension.yaml @@ -36,10 +36,10 @@ rules: # Manage ArgoCD CRDs - apiGroups: [apiextensions.k8s.io] resources: [customresourcedefinitions] - verbs: [create] + verbs: [create, list, watch] - apiGroups: [apiextensions.k8s.io] resources: [customresourcedefinitions] - verbs: [get, list, watch, update, patch, delete] + verbs: [get, update, patch, delete] resourceNames: - appprojects.argoproj.io - argocds.argoproj.io diff --git a/internal/applier/helm.go b/internal/applier/helm.go index 6bdc88ae2..ac43726f1 100644 --- a/internal/applier/helm.go +++ b/internal/applier/helm.go @@ -1,18 +1,23 @@ package applier import ( + "bytes" "context" "errors" "fmt" + "io" "io/fs" "strings" "helm.sh/helm/v3/pkg/action" "helm.sh/helm/v3/pkg/chart" "helm.sh/helm/v3/pkg/chartutil" + "helm.sh/helm/v3/pkg/postrender" "helm.sh/helm/v3/pkg/release" "helm.sh/helm/v3/pkg/storage/driver" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + apimachyaml 
"k8s.io/apimachinery/pkg/util/yaml" "sigs.k8s.io/controller-runtime/pkg/client" helmclient "github.com/operator-framework/helm-operator-plugins/pkg/client" @@ -51,7 +56,7 @@ type Helm struct { Preflights []Preflight } -func (h *Helm) Apply(ctx context.Context, contentFS fs.FS, ext *ocv1alpha1.ClusterExtension, labels map[string]string) ([]client.Object, string, error) { +func (h *Helm) Apply(ctx context.Context, contentFS fs.FS, ext *ocv1alpha1.ClusterExtension, objectLabels map[string]string, storageLabels map[string]string) ([]client.Object, string, error) { chrt, err := convert.RegistryV1ToHelmChart(ctx, contentFS, ext.Spec.Install.Namespace, []string{corev1.NamespaceAll}) if err != nil { return nil, "", err @@ -63,7 +68,11 @@ func (h *Helm) Apply(ctx context.Context, contentFS fs.FS, ext *ocv1alpha1.Clust return nil, "", err } - rel, desiredRel, state, err := h.getReleaseState(ac, ext, chrt, values, labels) + post := &postrenderer{ + labels: objectLabels, + } + + rel, desiredRel, state, err := h.getReleaseState(ac, ext, chrt, values, post) if err != nil { return nil, "", err } @@ -94,18 +103,18 @@ func (h *Helm) Apply(ctx context.Context, contentFS fs.FS, ext *ocv1alpha1.Clust case StateNeedsInstall: rel, err = ac.Install(ext.GetName(), ext.Spec.Install.Namespace, chrt, values, func(install *action.Install) error { install.CreateNamespace = false - install.Labels = labels + install.Labels = storageLabels return nil - }) + }, helmclient.AppendInstallPostRenderer(post)) if err != nil { return nil, state, err } case StateNeedsUpgrade: rel, err = ac.Upgrade(ext.GetName(), ext.Spec.Install.Namespace, chrt, values, func(upgrade *action.Upgrade) error { upgrade.MaxHistory = maxHelmReleaseHistory - upgrade.Labels = labels + upgrade.Labels = storageLabels return nil - }) + }, helmclient.AppendUpgradePostRenderer(post)) if err != nil { return nil, state, err } @@ -125,7 +134,7 @@ func (h *Helm) Apply(ctx context.Context, contentFS fs.FS, ext *ocv1alpha1.Clust return relObjects, state, nil } -func (h *Helm) getReleaseState(cl helmclient.ActionInterface, ext *ocv1alpha1.ClusterExtension, chrt *chart.Chart, values chartutil.Values, labels map[string]string) (*release.Release, *release.Release, string, error) { +func (h *Helm) getReleaseState(cl helmclient.ActionInterface, ext *ocv1alpha1.ClusterExtension, chrt *chart.Chart, values chartutil.Values, post postrender.PostRenderer) (*release.Release, *release.Release, string, error) { currentRelease, err := cl.Get(ext.GetName()) if err != nil && !errors.Is(err, driver.ErrReleaseNotFound) { return nil, nil, StateError, err @@ -138,9 +147,8 @@ func (h *Helm) getReleaseState(cl helmclient.ActionInterface, ext *ocv1alpha1.Cl desiredRelease, err := cl.Install(ext.GetName(), ext.Spec.Install.Namespace, chrt, values, func(i *action.Install) error { i.DryRun = true i.DryRunOption = "server" - i.Labels = labels return nil - }) + }, helmclient.AppendInstallPostRenderer(post)) if err != nil { return nil, nil, StateError, err } @@ -150,9 +158,8 @@ func (h *Helm) getReleaseState(cl helmclient.ActionInterface, ext *ocv1alpha1.Cl upgrade.MaxHistory = maxHelmReleaseHistory upgrade.DryRun = true upgrade.DryRunOption = "server" - upgrade.Labels = labels return nil - }) + }, helmclient.AppendUpgradePostRenderer(post)) if err != nil { return currentRelease, nil, StateError, err } @@ -164,3 +171,33 @@ func (h *Helm) getReleaseState(cl helmclient.ActionInterface, ext *ocv1alpha1.Cl } return currentRelease, desiredRelease, relState, nil } + +type postrenderer struct { + 
labels map[string]string + cascade postrender.PostRenderer +} + +func (p *postrenderer) Run(renderedManifests *bytes.Buffer) (*bytes.Buffer, error) { + var buf bytes.Buffer + dec := apimachyaml.NewYAMLOrJSONDecoder(renderedManifests, 1024) + for { + obj := unstructured.Unstructured{} + err := dec.Decode(&obj) + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return nil, err + } + obj.SetLabels(util.MergeMaps(obj.GetLabels(), p.labels)) + b, err := obj.MarshalJSON() + if err != nil { + return nil, err + } + buf.Write(b) + } + if p.cascade != nil { + return p.cascade.Run(&buf) + } + return &buf, nil +} diff --git a/internal/contentmanager/cache/cache.go b/internal/contentmanager/cache/cache.go new file mode 100644 index 000000000..f56cfd575 --- /dev/null +++ b/internal/contentmanager/cache/cache.go @@ -0,0 +1,264 @@ +package cache + +import ( + "context" + "errors" + "fmt" + "io" + "slices" + "strings" + "sync" + "time" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +type Watcher interface { + Watch(source.Source) error +} + +// Cache is a storage mechanism for keeping track of +// managed content sources +type Cache interface { + io.Closer + // Watch establishes watches for all provided client.Objects. + // Subsequent calls to Watch will result in watches that are no + // longer necessary being stopped and removed, and new watches being + // created + Watch(context.Context, Watcher, ...client.Object) error +} + +// CloserSyncingSource is a wrapper of the controller-runtime +// source.SyncingSource that includes methods for: +// - Closing the source, stopping its interaction with the Kubernetes API server and reaction to events +type CloserSyncingSource interface { + source.SyncingSource + io.Closer +} + +type sourcerer interface { + // Source returns a CloserSyncingSource for the provided + // GroupVersionKind. If the CloserSyncingSource encounters an + // error after having initially synced, it should requeue the + // provided client.Object and call the provided callback function + Source(schema.GroupVersionKind, client.Object, func(context.Context)) (CloserSyncingSource, error) +} + +type cache struct { + sources map[schema.GroupVersionKind]CloserSyncingSource + sourcerer sourcerer + owner client.Object + syncTimeout time.Duration + mu sync.Mutex +} + +func NewCache(sourcerer sourcerer, owner client.Object, syncTimeout time.Duration) Cache { + return &cache{ + sources: make(map[schema.GroupVersionKind]CloserSyncingSource), + sourcerer: sourcerer, + owner: owner, + syncTimeout: syncTimeout, + } +} + +var _ Cache = (*cache)(nil) + +func (c *cache) Watch(ctx context.Context, watcher Watcher, objs ...client.Object) error { + c.mu.Lock() + defer c.mu.Unlock() + gvkSet, err := gvksForObjects(objs...) + if err != nil { + return fmt.Errorf("getting set of GVKs for managed objects: %w", err) + } + + if err := c.removeStaleSources(gvkSet); err != nil { + return fmt.Errorf("removing stale sources: %w", err) + } + return c.startNewSources(ctx, gvkSet, watcher) +} + +func (c *cache) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + + errs := []error{} + for _, source := range c.sources { + if err := source.Close(); err != nil { + errs = append(errs, err) + } + } + + slices.SortFunc(errs, func(a, b error) int { + return strings.Compare(a.Error(), b.Error()) + }) + + return errors.Join(errs...)
+} + +func (c *cache) startNewSources(ctx context.Context, gvks sets.Set[schema.GroupVersionKind], watcher Watcher) error { + cacheGvks := c.getCacheGVKs() + gvksToCreate := gvks.Difference(cacheGvks) + + type startResult struct { + source CloserSyncingSource + gvk schema.GroupVersionKind + err error + } + startResults := make(chan startResult) + wg := sync.WaitGroup{} + for _, gvk := range gvksToCreate.UnsortedList() { + wg.Add(1) + go func() { + defer wg.Done() + source, err := c.startNewSource(ctx, gvk, watcher) + startResults <- startResult{ + source: source, + gvk: gvk, + err: err, + } + }() + } + go func() { + wg.Wait() + close(startResults) + }() + + sourcesErrors := []error{} + for result := range startResults { + if result.err != nil { + sourcesErrors = append(sourcesErrors, result.err) + continue + } + + err := c.addSource(result.gvk, result.source) + if err != nil { + // If we made it here then there is a logic error in + // calculating the diffs between what is currently being + // watched by the cache + panic(err) + } + } + + slices.SortFunc(sourcesErrors, func(a, b error) int { + return strings.Compare(a.Error(), b.Error()) + }) + + return errors.Join(sourcesErrors...) +} + +func (c *cache) startNewSource(ctx context.Context, gvk schema.GroupVersionKind, watcher Watcher) (CloserSyncingSource, error) { + s, err := c.sourcerer.Source(gvk, c.owner, func(ctx context.Context) { + // this callback function ensures that we remove the source from the + // cache if it encounters an error after it initially synced successfully + c.mu.Lock() + defer c.mu.Unlock() + err := c.removeSource(gvk) + if err != nil { + logr := log.FromContext(ctx) + logr.Error(err, "managed content cache postSyncError removing source failed", "gvk", gvk) + } + }) + if err != nil { + return nil, fmt.Errorf("getting source: %w", err) + } + + err = watcher.Watch(s) + if err != nil { + return nil, fmt.Errorf("establishing watch for GVK %q: %w", gvk, err) + } + + syncCtx, syncCancel := context.WithTimeout(ctx, c.syncTimeout) + defer syncCancel() + err = s.WaitForSync(syncCtx) + if err != nil { + return nil, fmt.Errorf("waiting for sync: %w", err) + } + + return s, nil +} + +func (c *cache) addSource(gvk schema.GroupVersionKind, source CloserSyncingSource) error { + if _, ok := c.sources[gvk]; !ok { + c.sources[gvk] = source + return nil + } + return errors.New("source already exists") +} + +func (c *cache) removeStaleSources(gvks sets.Set[schema.GroupVersionKind]) error { + cacheGvks := c.getCacheGVKs() + removeErrs := []error{} + gvksToRemove := cacheGvks.Difference(gvks) + for _, gvk := range gvksToRemove.UnsortedList() { + err := c.removeSource(gvk) + if err != nil { + removeErrs = append(removeErrs, err) + } + } + + slices.SortFunc(removeErrs, func(a, b error) int { + return strings.Compare(a.Error(), b.Error()) + }) + + return errors.Join(removeErrs...) +} + +func (c *cache) removeSource(gvk schema.GroupVersionKind) error { + if source, ok := c.sources[gvk]; ok { + err := source.Close() + if err != nil { + return fmt.Errorf("closing source for GVK %q: %w", gvk, err) + } + } + delete(c.sources, gvk) + return nil +} + +func (c *cache) getCacheGVKs() sets.Set[schema.GroupVersionKind] { + cacheGvks := sets.New[schema.GroupVersionKind]() + for gvk := range c.sources { + cacheGvks.Insert(gvk) + } + return cacheGvks +} + +// gvksForObjects builds a sets.Set of GroupVersionKinds for +// the provided client.Objects. 
It returns an error if: +// - There is no Kind set on the client.Object +// - There is no Version set on the client.Object +// +// An empty Group is assumed to be the "core" Kubernetes +// API group. +func gvksForObjects(objs ...client.Object) (sets.Set[schema.GroupVersionKind], error) { + gvkSet := sets.New[schema.GroupVersionKind]() + for _, obj := range objs { + gvk := obj.GetObjectKind().GroupVersionKind() + + // If the Kind or Version is not set in an object's GroupVersionKind + // attempting to add it to the runtime.Scheme will result in a panic. + // To avoid panics, we are doing the validation and returning early + // with an error if any objects are provided with a missing Kind or Version + // field + if gvk.Kind == "" { + return nil, fmt.Errorf( + "adding %s to set; object Kind is not defined", + obj.GetName(), + ) + } + + if gvk.Version == "" { + return nil, fmt.Errorf( + "adding %s to set; object Version is not defined", + obj.GetName(), + ) + } + + gvkSet.Insert(gvk) + } + + return gvkSet, nil +} diff --git a/internal/contentmanager/cache/cache_test.go b/internal/contentmanager/cache/cache_test.go new file mode 100644 index 000000000..edf5c0e7a --- /dev/null +++ b/internal/contentmanager/cache/cache_test.go @@ -0,0 +1,202 @@ +package cache + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + + ocv1alpha1 "github.com/operator-framework/operator-controller/api/v1alpha1" +) + +type mockWatcher struct { + err error +} + +var _ Watcher = (*mockWatcher)(nil) + +func (mw *mockWatcher) Watch(source.Source) error { + return mw.err +} + +type mockSourcerer struct { + err error + source CloserSyncingSource +} + +var _ sourcerer = (*mockSourcerer)(nil) + +func (ms *mockSourcerer) Source(_ schema.GroupVersionKind, _ client.Object, _ func(context.Context)) (CloserSyncingSource, error) { + if ms.err != nil { + return nil, ms.err + } + return ms.source, nil +} + +type mockSource struct { + err error +} + +var _ CloserSyncingSource = (*mockSource)(nil) + +func (ms *mockSource) Start(_ context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + return ms.err +} + +func (ms *mockSource) WaitForSync(ctx context.Context) error { + return ms.err +} + +func (ms *mockSource) Close() error { + return ms.err +} + +func TestCacheWatch(t *testing.T) { + c := NewCache( + &mockSourcerer{ + source: &mockSource{}, + }, + &ocv1alpha1.ClusterExtension{}, + time.Second, + ) + + pod := &corev1.Pod{} + podGvk := corev1.SchemeGroupVersion.WithKind("Pod") + pod.SetGroupVersionKind(podGvk) + + require.NoError(t, c.Watch(context.Background(), &mockWatcher{}, pod)) + require.Contains(t, c.(*cache).sources, podGvk, "sources", c.(*cache).sources) +} + +func TestCacheWatchInvalidGVK(t *testing.T) { + c := NewCache( + &mockSourcerer{ + source: &mockSource{}, + }, + &ocv1alpha1.ClusterExtension{}, + time.Second, + ) + + pod := &corev1.Pod{} + require.Error(t, c.Watch(context.Background(), &mockWatcher{}, pod), "should fail on invalid GVK") +} + +func TestCacheWatchSourcererError(t *testing.T) { + c := NewCache( + &mockSourcerer{ + err: errors.New("error"), + }, + &ocv1alpha1.ClusterExtension{}, + time.Second, + ) + + pod := &corev1.Pod{} + podGvk := corev1.SchemeGroupVersion.WithKind("Pod") + 
pod.SetGroupVersionKind(podGvk) + require.Error(t, c.Watch(context.Background(), &mockWatcher{}, pod), "should fail when sourcerer returns an error") +} + +func TestCacheWatchWatcherError(t *testing.T) { + c := NewCache( + &mockSourcerer{ + source: &mockSource{}, + }, + &ocv1alpha1.ClusterExtension{}, + time.Second, + ) + + pod := &corev1.Pod{} + podGvk := corev1.SchemeGroupVersion.WithKind("Pod") + pod.SetGroupVersionKind(podGvk) + require.Error(t, c.Watch(context.Background(), &mockWatcher{err: errors.New("error")}, pod), "should fail when watcher returns an error") +} + +func TestCacheWatchSourceWaitForSyncError(t *testing.T) { + c := NewCache( + &mockSourcerer{ + source: &mockSource{ + err: errors.New("error"), + }, + }, + &ocv1alpha1.ClusterExtension{}, + time.Second, + ) + + pod := &corev1.Pod{} + podGvk := corev1.SchemeGroupVersion.WithKind("Pod") + pod.SetGroupVersionKind(podGvk) + require.Error(t, c.Watch(context.Background(), &mockWatcher{}, pod), "should fail when source fails to sync") + require.NotContains(t, c.(*cache).sources, podGvk, "should not contain source entry in mapping") +} + +func TestCacheWatchExistingSourceNotPanic(t *testing.T) { + c := NewCache( + &mockSourcerer{ + source: &mockSource{}, + }, + &ocv1alpha1.ClusterExtension{}, + time.Second, + ) + + pod := &corev1.Pod{} + podGvk := corev1.SchemeGroupVersion.WithKind("Pod") + pod.SetGroupVersionKind(podGvk) + require.NoError(t, c.(*cache).addSource(podGvk, &mockSource{})) + + // In this case, a panic means there is a logic error somewhere in the + // cache.Watch() method. It should never hit the condition where it panics + // as it should never attempt to create a new source for one that already exists. + require.NotPanics(t, func() { _ = c.Watch(context.Background(), &mockWatcher{}, pod) }, "should never panic") +} + +func TestCacheWatchRemovesStaleSources(t *testing.T) { + c := NewCache( + &mockSourcerer{ + source: &mockSource{}, + }, + &ocv1alpha1.ClusterExtension{}, + time.Second, + ) + + pod := &corev1.Pod{} + podGvk := corev1.SchemeGroupVersion.WithKind("Pod") + pod.SetGroupVersionKind(podGvk) + + require.NoError(t, c.Watch(context.Background(), &mockWatcher{}, pod)) + require.Contains(t, c.(*cache).sources, podGvk) + + secret := &corev1.Secret{} + secretGvk := corev1.SchemeGroupVersion.WithKind("Secret") + secret.SetGroupVersionKind(secretGvk) + require.NoError(t, c.Watch(context.Background(), &mockWatcher{}, secret)) + require.Contains(t, c.(*cache).sources, secretGvk) + require.NotContains(t, c.(*cache).sources, podGvk) +} + +func TestCacheWatchRemovingStaleSourcesError(t *testing.T) { + c := NewCache( + &mockSourcerer{ + source: &mockSource{}, + }, + &ocv1alpha1.ClusterExtension{}, + time.Second, + ) + + podGvk := corev1.SchemeGroupVersion.WithKind("Pod") + require.NoError(t, c.(*cache).addSource(podGvk, &mockSource{ + err: errors.New("error"), + })) + + secret := &corev1.Secret{} + secretGvk := corev1.SchemeGroupVersion.WithKind("Secret") + secret.SetGroupVersionKind(secretGvk) + require.Error(t, c.Watch(context.Background(), &mockWatcher{}, secret)) +} diff --git a/internal/contentmanager/contentmanager.go b/internal/contentmanager/contentmanager.go index 444cf9f74..13fc92f9b 100644 --- a/internal/contentmanager/contentmanager.go +++ b/internal/contentmanager/contentmanager.go @@ -2,138 +2,108 @@ package contentmanager import ( "context" - "errors" "fmt" "sync" + "time" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - 
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/rest" - "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/source" "github.com/operator-framework/operator-controller/api/v1alpha1" + cmcache "github.com/operator-framework/operator-controller/internal/contentmanager/cache" oclabels "github.com/operator-framework/operator-controller/internal/labels" ) -type Watcher interface { - // Watch will establish watches for resources owned by a ClusterExtension - Watch(context.Context, controller.Controller, *v1alpha1.ClusterExtension, []client.Object) error - // Unwatch will remove watches for a ClusterExtension - Unwatch(*v1alpha1.ClusterExtension) +// Manager is a utility to manage content caches belonging +// to ClusterExtensions +type Manager interface { + // Get returns a managed content cache for the provided + // ClusterExtension if one exists. If one does not exist, + // a new Cache is created and returned + Get(context.Context, *v1alpha1.ClusterExtension) (cmcache.Cache, error) + // Delete will stop and remove a managed content cache + // for the provided ClusterExtension if one exists. + Delete(*v1alpha1.ClusterExtension) error } type RestConfigMapper func(context.Context, client.Object, *rest.Config) (*rest.Config, error) -type extensionCacheData struct { - Cache cache.Cache - Cancel context.CancelFunc +// managerImpl is an implementation of the Manager interface +type managerImpl struct { + rcm RestConfigMapper + baseCfg *rest.Config + caches map[string]cmcache.Cache + mapper meta.RESTMapper + mu *sync.Mutex + syncTimeout time.Duration + resyncPeriod time.Duration } -type instance struct { - rcm RestConfigMapper - baseCfg *rest.Config - extensionCaches map[string]extensionCacheData - mapper meta.RESTMapper - mu *sync.Mutex -} +type ManagerOption func(*managerImpl) -// New creates a new ContentManager object -func New(rcm RestConfigMapper, cfg *rest.Config, mapper meta.RESTMapper) Watcher { - return &instance{ - rcm: rcm, - baseCfg: cfg, - extensionCaches: make(map[string]extensionCacheData), - mapper: mapper, - mu: &sync.Mutex{}, +// WithSyncTimeout configures the time spent waiting +// for a managed content source to sync +func WithSyncTimeout(t time.Duration) ManagerOption { + return func(m *managerImpl) { + m.syncTimeout = t } } -// buildScheme builds a runtime.Scheme based on the provided client.Objects, -// with all GroupVersionKinds mapping to the unstructured.Unstructured type -// (unstructured.UnstructuredList for list kinds). -// -// If a provided client.Object does not set a Version or Kind field in its -// GroupVersionKind, an error will be returned. 
-func buildScheme(objs []client.Object) (*runtime.Scheme, error) { - scheme := runtime.NewScheme() - // The ClusterExtension types must be added to the scheme since its - // going to be used to establish watches that trigger reconciliation - // of the owning ClusterExtension - if err := v1alpha1.AddToScheme(scheme); err != nil { - return nil, fmt.Errorf("adding operator controller APIs to scheme: %w", err) +// WithResyncPeriod configures the frequency +// a managed content source attempts to resync +func WithResyncPeriod(t time.Duration) ManagerOption { + return func(m *managerImpl) { + m.resyncPeriod = t } +} - for _, obj := range objs { - gvk := obj.GetObjectKind().GroupVersionKind() - - // If the Kind or Version is not set in an object's GroupVersionKind - // attempting to add it to the runtime.Scheme will result in a panic. - // To avoid panics, we are doing the validation and returning early - // with an error if any objects are provided with a missing Kind or Version - // field - if gvk.Kind == "" { - return nil, fmt.Errorf( - "adding %s to scheme; object Kind is not defined", - obj.GetName(), - ) - } - - if gvk.Version == "" { - return nil, fmt.Errorf( - "adding %s to scheme; object Version is not defined", - obj.GetName(), - ) - } +// NewManager creates a new Manager +func NewManager(rcm RestConfigMapper, cfg *rest.Config, mapper meta.RESTMapper, opts ...ManagerOption) Manager { + m := &managerImpl{ + rcm: rcm, + baseCfg: cfg, + caches: make(map[string]cmcache.Cache), + mapper: mapper, + mu: &sync.Mutex{}, + syncTimeout: time.Second * 10, + resyncPeriod: time.Hour * 10, + } - listKind := gvk.Kind + "List" - - if !scheme.Recognizes(gvk) { - // Since we can't have a mapping to every possible Go type in existence - // based on the GVK we need to use the unstructured types for mapping - u := &unstructured.Unstructured{} - u.SetGroupVersionKind(gvk) - ul := &unstructured.UnstructuredList{} - ul.SetGroupVersionKind(gvk.GroupVersion().WithKind(listKind)) - - scheme.AddKnownTypeWithName(gvk, u) - scheme.AddKnownTypeWithName(gvk.GroupVersion().WithKind(listKind), ul) - // Adding the common meta schemas to the scheme for the GroupVersion - // is necessary to ensure the scheme is aware of the different operations - // that can be performed against the resources in this GroupVersion - metav1.AddToGroupVersion(scheme, gvk.GroupVersion()) - } + for _, opt := range opts { + opt(m) } - return scheme, nil + return m } -// Watch configures a controller-runtime cache.Cache and establishes watches for the provided resources. -// It utilizes the provided ClusterExtension to set a DefaultLabelSelector on the cache.Cache -// to ensure it is only caching and reacting to content that belongs to the ClusterExtension. -// For each client.Object provided, a new source.Kind is created and used in a call to the Watch() method -// of the provided controller.Controller to establish new watches for the managed resources. -func (i *instance) Watch(ctx context.Context, ctrl controller.Controller, ce *v1alpha1.ClusterExtension, objs []client.Object) error { - if len(objs) == 0 || ce == nil || ctrl == nil { - return nil +// Get returns a Cache for the provided ClusterExtension. +// If a cache does not already exist, a new one will be created. +// If a nil ClusterExtension is provided this function will panic. 
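+//
+// A minimal usage sketch (illustrative only, not part of this change; the
+// identity RestConfigMapper and the restCfg, restMapper, and ce values are
+// assumptions for the example):
+//
+//	m := NewManager(
+//		func(_ context.Context, _ client.Object, c *rest.Config) (*rest.Config, error) { return c, nil },
+//		restCfg,
+//		restMapper,
+//		WithSyncTimeout(30*time.Second),
+//		WithResyncPeriod(time.Hour),
+//	)
+//	c, err := m.Get(ctx, ce) // reuses the existing cache for ce, or creates one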
+func (i *managerImpl) Get(ctx context.Context, ce *v1alpha1.ClusterExtension) (cmcache.Cache, error) { + if ce == nil { + panic("nil ClusterExtension provided") + } + + i.mu.Lock() + defer i.mu.Unlock() + cache, ok := i.caches[ce.Name] + if ok { + return cache, nil } cfg, err := i.rcm(ctx, ce, i.baseCfg) if err != nil { - return fmt.Errorf("getting rest.Config for ClusterExtension %q: %w", ce.Name, err) + return nil, fmt.Errorf("getting rest.Config: %w", err) } - scheme, err := buildScheme(objs) + dynamicClient, err := dynamic.NewForConfig(cfg) if err != nil { - return fmt.Errorf("building scheme for ClusterExtension %q: %w", ce.GetName(), err) + return nil, fmt.Errorf("getting dynamic client: %w", err) } tgtLabels := labels.Set{ @@ -141,83 +111,37 @@ func (i *instance) Watch(ctx context.Context, ctrl controller.Controller, ce *v1 oclabels.OwnerNameKey: ce.GetName(), } - c, err := cache.New(cfg, cache.Options{ - Scheme: scheme, - DefaultLabelSelector: tgtLabels.AsSelector(), - }) - if err != nil { - return fmt.Errorf("creating cache for ClusterExtension %q: %w", ce.Name, err) + dynamicSourcerer := &dynamicSourcerer{ + // Due to the limitation outlined in the dynamic informer source, + // related to reusing an informer factory, we return a new informer + // factory every time to ensure we are not attempting to configure or + // start an already started informer + informerFactoryCreateFunc: func() dynamicinformer.DynamicSharedInformerFactory { + return dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, time.Hour*10, metav1.NamespaceAll, func(lo *metav1.ListOptions) { + lo.LabelSelector = tgtLabels.AsSelector().String() + }) + }, + mapper: i.mapper, } + cache = cmcache.NewCache(dynamicSourcerer, ce, i.syncTimeout) + i.caches[ce.Name] = cache + return cache, nil +} - for _, obj := range objs { - err = ctrl.Watch( - source.Kind( - c, - obj, - handler.TypedEnqueueRequestForOwner[client.Object]( - scheme, - i.mapper, - ce, - handler.OnlyControllerOwner(), - ), - predicate.Funcs{ - CreateFunc: func(tce event.TypedCreateEvent[client.Object]) bool { return false }, - UpdateFunc: func(tue event.TypedUpdateEvent[client.Object]) bool { return true }, - DeleteFunc: func(tde event.TypedDeleteEvent[client.Object]) bool { return true }, - GenericFunc: func(tge event.TypedGenericEvent[client.Object]) bool { return true }, - }, - ), - ) - if err != nil { - return fmt.Errorf("creating watch for ClusterExtension %q managed resource %s: %w", ce.Name, obj.GetObjectKind().GroupVersionKind(), err) - } +// Delete stops and removes the Cache for the provided ClusterExtension +func (i *managerImpl) Delete(ce *v1alpha1.ClusterExtension) error { + if ce == nil { + panic("nil ClusterExtension provided") } - // TODO: Instead of stopping the existing cache and replacing it every time - // we should stop the informers that are no longer required - // and create any new ones as necessary. To keep the initial pass - // simple, we are going to keep this as is and optimize in a follow-up. - // Doing this in a follow-up gives us the opportunity to verify that this functions - // as expected when wired up in the ClusterExtension reconciler before going too deep - // in optimizations. 
i.mu.Lock() - if extCache, ok := i.extensionCaches[ce.GetName()]; ok { - extCache.Cancel() - } - - cacheCtx, cancel := context.WithCancel(context.Background()) - i.extensionCaches[ce.Name] = extensionCacheData{ - Cache: c, - Cancel: cancel, - } - i.mu.Unlock() - - go func() { - err := c.Start(cacheCtx) + defer i.mu.Unlock() + if cache, ok := i.caches[ce.Name]; ok { + err := cache.Close() if err != nil { - i.Unwatch(ce) + return fmt.Errorf("closing cache: %w", err) } - }() - - if !c.WaitForCacheSync(cacheCtx) { - i.Unwatch(ce) - return errors.New("cache could not sync, it has been stopped and removed") + delete(i.caches, ce.Name) } - return nil } - -// Unwatch will stop the cache for the provided ClusterExtension -// stopping any watches on managed content -func (i *instance) Unwatch(ce *v1alpha1.ClusterExtension) { - if ce == nil { - return - } - - i.mu.Lock() - if extCache, ok := i.extensionCaches[ce.GetName()]; ok { - extCache.Cancel() - delete(i.extensionCaches, ce.GetName()) - } - i.mu.Unlock() -} diff --git a/internal/contentmanager/contentmanager_test.go b/internal/contentmanager/contentmanager_test.go deleted file mode 100644 index 6716b8ba5..000000000 --- a/internal/contentmanager/contentmanager_test.go +++ /dev/null @@ -1,208 +0,0 @@ -package contentmanager - -import ( - "context" - "errors" - "fmt" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/rest" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - - "github.com/operator-framework/operator-controller/api/v1alpha1" -) - -func TestWatch(t *testing.T) { - tests := []struct { - name string - rcm RestConfigMapper - config *rest.Config - ce *v1alpha1.ClusterExtension - objs []client.Object - wantErr bool - }{ - { - name: "Valid cluster extension valid managed content should pass", - rcm: func(_ context.Context, _ client.Object, cfg *rest.Config) (*rest.Config, error) { - return cfg, nil - }, - config: &rest.Config{}, - ce: &v1alpha1.ClusterExtension{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-cluster-extension", - }, - }, - objs: []client.Object{ - &corev1.Pod{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "v1", - Kind: "Pod", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "webserver", - }, - }, - }, - wantErr: false, - }, - { - name: "Fail when the rest config mapper returns an error", - rcm: func(_ context.Context, _ client.Object, cfg *rest.Config) (*rest.Config, error) { - return nil, errors.New("failed getting rest config") - }, - config: &rest.Config{}, - ce: &v1alpha1.ClusterExtension{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-cluster-extension", - }, - }, - objs: []client.Object{ - &corev1.Pod{}, - }, - wantErr: true, - }, - { - name: "Should return an error when buildScheme() fails", - rcm: func(_ context.Context, _ client.Object, cfg *rest.Config) (*rest.Config, error) { - return cfg, nil - }, - config: &rest.Config{}, - ce: &v1alpha1.ClusterExtension{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-cluster-extension", - }, - }, - objs: []client.Object{ - &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "webserver", - }, - }, - }, - wantErr: true, - }, - } - - for i, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - mgr, _ := manager.New(tc.config, 
manager.Options{}) - ctrl, err := controller.New(fmt.Sprintf("test-controller-%v", i), mgr, controller.Options{ - Reconciler: reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) { - return reconcile.Result{}, nil - }), - }) - require.NoError(t, err) - - instance := New(tc.rcm, tc.config, mgr.GetRESTMapper()) - got := instance.Watch(context.Background(), ctrl, tc.ce, tc.objs) - assert.Equal(t, got != nil, tc.wantErr) - }) - } -} - -func TestBuildScheme(t *testing.T) { - type validation struct { - gvks []schema.GroupVersionKind - valid bool - } - - testcases := []struct { - name string - objects []client.Object - wantErr bool - want validation - }{ - { - name: "Gvk is not defined", - objects: []client.Object{&corev1.Pod{}}, - wantErr: true, - want: validation{ - gvks: []schema.GroupVersionKind{}, - valid: false, - }, - }, - { - name: "Check objects added in scheme", - objects: []client.Object{ - &appsv1.Deployment{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "apps/v1", - Kind: "Deployment", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "webserver", - }, - }, - }, - wantErr: false, - want: validation{ - gvks: []schema.GroupVersionKind{ - appsv1.SchemeGroupVersion.WithKind("Deployment"), - }, - valid: true, - }, - }, - { - name: "Check object not defined in scheme", - objects: []client.Object{ - &corev1.Pod{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "v1", - Kind: "Pod", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "webserver", - }, - }, - }, - wantErr: false, - want: validation{ - gvks: []schema.GroupVersionKind{ - corev1.SchemeGroupVersion.WithKind("Secret"), - }, - valid: false, - }, - }, - { - name: "Check if empty Group is valid", - objects: []client.Object{ - &corev1.Pod{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "v1", - Kind: "Pod", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "webserver", - }, - }, - }, - wantErr: false, - want: validation{ - gvks: []schema.GroupVersionKind{ - corev1.SchemeGroupVersion.WithKind("Pod"), - }, - valid: true, - }, - }, - } - - for _, tc := range testcases { - t.Run(tc.name, func(t *testing.T) { - scheme, err := buildScheme(tc.objects) - require.Equal(t, err != nil, tc.wantErr) - for _, gvk := range tc.want.gvks { - got := scheme.Recognizes(gvk) - assert.Equal(t, got, tc.want.valid) - } - }) - } -} diff --git a/internal/contentmanager/source/dynamicsource.go b/internal/contentmanager/source/dynamicsource.go new file mode 100644 index 000000000..1f539871f --- /dev/null +++ b/internal/contentmanager/source/dynamicsource.go @@ -0,0 +1,195 @@ +package source + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/dynamic/dynamicinformer" + cgocache "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + source "github.com/operator-framework/operator-controller/internal/contentmanager/source/internal" +) + +type DynamicSourceConfig struct { + // DynamicInformerFactory is the dynamicinformer.DynamicSharedInformerFactory + // that is used to generate the informers configured on this sources startup. + // If you use a dynamicinformer.DynamicSharedInformerFactory that you've + // used previously, it must not have been used to start a new informer for + // the same GVR. 
You cannot start or configure an informer that has already + // been started, even after it has been stopped. Reusing an informer factory + // may result in attempting to configure and start an informer that has + // already been started. + DynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory + // GVR is the GroupVersionResource that this source is responsible + // for creating and configuring an informer for + GVR schema.GroupVersionResource + // Owner is the client.Object that owns the managed content that this + // source will be creating an informer to react to events for. This + // field is used to attempt to requeue the owning client.Object for + // reconciliation on a watch error after a previously successful sync + Owner client.Object + // Handler is the handler.EventHandler that is used to react to events + // received by the configured source + Handler handler.EventHandler + // Predicates are the predicate.Predicate functions used to determine + // if a triggered event should be reacted to + Predicates []predicate.Predicate + // OnPostSyncError is the callback function that is used when the source + // initially synced successfully and later encountered an error + OnPostSyncError func(context.Context) +} + +func NewDynamicSource(cfg DynamicSourceConfig) *dynamicInformerSource { + return &dynamicInformerSource{ + cfg: cfg, + erroredChan: make(chan struct{}), + syncedChan: make(chan struct{}), + startedChan: make(chan struct{}), + } +} + +// dynamicInformerSource is an implementation of the +// CloserSyncingSource interface. It is used +// to create an informer, using the provided dynamic informer +// factory, for the configured GroupVersionResource. +// +// The informer is configured with a WatchEventErrorHandler that +// stops the informer, and if it had previously synced successfully +// it attempts to requeue the provided Owner for reconciliation and +// calls the provided OnPostSyncError function. +type dynamicInformerSource struct { + cfg DynamicSourceConfig + informerCancel context.CancelFunc + informerCtx context.Context + startedChan chan struct{} + syncedChan chan struct{} + erroredChan chan struct{} + errOnce sync.Once + err error +} + +func (dis *dynamicInformerSource) String() string { + return fmt.Sprintf("contentmanager.DynamicInformerSource - GVR: %s, Owner Type: %T, Owner Name: %s", dis.cfg.GVR, dis.cfg.Owner, dis.cfg.Owner.GetName()) +} + +func (dis *dynamicInformerSource) Start(ctx context.Context, q workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + // Close the startedChan to signal that this + // source has been started. Subsequent calls + // to Start will attempt to close a closed channel + // and panic.
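+ //
+ // Illustrative aside (not part of this change): the once-only guard
+ // relies on the Go runtime panicking when a closed channel is closed again:
+ //
+ //	ch := make(chan struct{})
+ //	close(ch) // first close: signals "started"
+ //	close(ch) // second close: panics with "close of closed channel"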
+ close(dis.startedChan) + + dis.informerCtx, dis.informerCancel = context.WithCancel(ctx) + gInf := dis.cfg.DynamicInformerFactory.ForResource(dis.cfg.GVR) + eventHandler := source.NewEventHandler(dis.informerCtx, q, dis.cfg.Handler, dis.cfg.Predicates) + + // If we encounter an error during the watch we will: + // - Capture the error + // - cancel the informer + // requeuing of the ClusterExtension should happen by the + // WaitForSync function returning an error + // Only if we have successfully synced in the past should we + // requeue the ClusterExtension + sharedIndexInf := gInf.Informer() + err := sharedIndexInf.SetWatchErrorHandler(func(r *cgocache.Reflector, err error) { + dis.errOnce.Do(func() { + dis.err = err + close(dis.erroredChan) + }) + + if dis.hasSynced() { + // We won't be able to update the ClusterExtension status + // conditions so instead force a requeue if we + // have previously synced and then errored + defer q.Add(reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: dis.cfg.Owner.GetName(), + }, + }) + dis.cfg.OnPostSyncError(dis.informerCtx) + } + dis.informerCancel() + cgocache.DefaultWatchErrorHandler(r, err) + }) + if err != nil { + return fmt.Errorf("setting WatchErrorHandler: %w", err) + } + + _, err = sharedIndexInf.AddEventHandler(eventHandler.HandlerFuncs()) + if err != nil { + return fmt.Errorf("adding event handler: %w", err) + } + + go sharedIndexInf.Run(dis.informerCtx.Done()) + + go func() { + syncOnce := sync.OnceFunc(func() { + dis.syncedChan <- struct{}{} + close(dis.syncedChan) + }) + + _ = wait.PollUntilContextCancel(dis.informerCtx, time.Second, true, func(_ context.Context) (bool, error) { + if sharedIndexInf.HasSynced() { + syncOnce() + return true, nil + } + return false, nil + }) + }() + + return nil +} + +func (dis *dynamicInformerSource) WaitForSync(ctx context.Context) error { + if !dis.hasStarted() { + return fmt.Errorf("not yet started") + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-dis.erroredChan: + return dis.err + case <-dis.informerCtx.Done(): + return dis.informerCtx.Err() + case <-dis.syncedChan: + return nil + } +} + +func (dis *dynamicInformerSource) Close() error { + if !dis.hasStarted() { + return errors.New("source has not yet started") + } + dis.informerCancel() + return nil +} + +func (dis *dynamicInformerSource) hasSynced() bool { + select { + case <-dis.syncedChan: + return true + default: + return false + } +} + +func (dis *dynamicInformerSource) hasStarted() bool { + select { + case <-dis.startedChan: + return true + default: + return false + } +} diff --git a/internal/contentmanager/source/dynamicsource_test.go b/internal/contentmanager/source/dynamicsource_test.go new file mode 100644 index 000000000..61cbab8ca --- /dev/null +++ b/internal/contentmanager/source/dynamicsource_test.go @@ -0,0 +1,112 @@ +package source + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic/dynamicinformer" + dynamicfake "k8s.io/client-go/dynamic/fake" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + "github.com/operator-framework/operator-controller/api/v1alpha1" +) + +func TestDynamicInformerSourceCloseBeforeStartErrors(t *testing.T) { + dis := NewDynamicSource(DynamicSourceConfig{}) + require.Error(t, dis.Close(), "calling close before start should error") +} + +func 
TestDynamicInformerSourceWaitForSyncTimeout(t *testing.T) { + dis := NewDynamicSource(DynamicSourceConfig{}) + close(dis.startedChan) + dis.informerCtx = context.Background() + timeout, cancel := context.WithTimeout(context.TODO(), time.Millisecond*10) + defer cancel() + require.Error(t, dis.WaitForSync(timeout), "should error on timeout") +} + +func TestDynamicInformerSourceWaitForSyncInformerContextClosed(t *testing.T) { + dis := NewDynamicSource(DynamicSourceConfig{}) + close(dis.startedChan) + timeout, cancel := context.WithTimeout(context.TODO(), time.Millisecond*10) + defer cancel() + dis.informerCtx = timeout + require.Error(t, dis.WaitForSync(context.Background()), "should error on informer context closed") +} + +func TestDynamicInformerSourceWaitForSyncErrorChannel(t *testing.T) { + dis := NewDynamicSource(DynamicSourceConfig{}) + close(dis.startedChan) + dis.informerCtx = context.Background() + go func() { + time.Sleep(time.Millisecond * 10) + dis.err = errors.New("error") + close(dis.erroredChan) + }() + require.Error(t, dis.WaitForSync(context.Background()), "should error on receiving error from channel") +} + +func TestDynamicInformerSourceWaitForSyncAlreadyErrored(t *testing.T) { + dis := NewDynamicSource(DynamicSourceConfig{}) + close(dis.startedChan) + dis.informerCtx = context.Background() + dis.err = errors.New("error") + close(dis.erroredChan) + require.Error(t, dis.WaitForSync(context.Background()), "should error since there is already a sync error") +} + +func TestDynamicInformerSourceWaitForSyncAlreadySynced(t *testing.T) { + dis := NewDynamicSource(DynamicSourceConfig{}) + close(dis.startedChan) + close(dis.syncedChan) + dis.informerCtx = context.Background() + require.NoError(t, dis.WaitForSync(context.Background()), "should not error if already synced") +} + +func TestDynamicInformerSourceWaitForSyncSyncedChannel(t *testing.T) { + dis := NewDynamicSource(DynamicSourceConfig{}) + close(dis.startedChan) + dis.informerCtx = context.Background() + go func() { + time.Sleep(time.Millisecond * 10) + close(dis.syncedChan) + }() + require.NoError(t, dis.WaitForSync(context.Background()), "should not error on receiving struct from syncedChannel") +} + +func TestDynamicInformerSourceWaitForSyncNotStarted(t *testing.T) { + dis := NewDynamicSource(DynamicSourceConfig{}) + require.Error(t, dis.WaitForSync(context.Background()), "should error if not started") +} + +func TestDynamicInformerSourceStartAlreadyStarted(t *testing.T) { + dis := NewDynamicSource(DynamicSourceConfig{}) + close(dis.startedChan) + require.Panics(t, func() { _ = dis.Start(context.Background(), nil) }, "should return an error if already started") +} + +func TestDynamicInformerSourceStart(t *testing.T) { + fakeDynamicClient := dynamicfake.NewSimpleDynamicClient(runtime.NewScheme()) + infFact := dynamicinformer.NewDynamicSharedInformerFactory(fakeDynamicClient, time.Minute) + dis := NewDynamicSource(DynamicSourceConfig{ + DynamicInformerFactory: infFact, + GVR: schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "pods", + }, + Owner: &v1alpha1.ClusterExtension{}, + Handler: handler.Funcs{}, + Predicates: []predicate.Predicate{}, + OnPostSyncError: func(ctx context.Context) {}, + }) + + require.NoError(t, dis.Start(context.Background(), nil)) + require.NoError(t, dis.Close()) +} diff --git a/internal/contentmanager/source/internal/eventhandler.go b/internal/contentmanager/source/internal/eventhandler.go new file mode 100644 index 000000000..54625af09 --- /dev/null +++ 
b/internal/contentmanager/source/internal/eventhandler.go @@ -0,0 +1,193 @@ +package source + +// NOTE: To reduce the amount of custom code that needs to be written to facilitate +// the creation of a custom source implementation, the code in this file is copied from +// https://github.com/kubernetes-sigs/controller-runtime/blob/release-0.18/pkg/internal/source/event_handler.go +// +// Any modifications to this code will be noted below: +// - The "k8s.io/client-go/tools/cache" package was aliased to "cgocache". +// All references to this package were updated to use the new alias. +// - The "logf" aliased import was updated from "sigs.k8s.io/controller-runtime/pkg/internal/log" +// to "sigs.k8s.io/controller-runtime/pkg/log" +// - The logger for the event handler is now generated with "logf.Log.WithName("source").WithName("EventHandler")" +// instead of "logf.RuntimeLog.WithName("source").WithName("EventHandler")" +// +// All modifications are licensed under the Apache 2.0 license. +// +// In the future, we may consider making a contribution to the +// controller-runtime project that exports this more generic +// event handler functionality. If that happens, this code will be removed +// and we will instead import the appropriate logic from the controller-runtime +// project. + +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import ( + "context" + "fmt" + + cgocache "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +var log = logf.Log.WithName("source").WithName("EventHandler") + +// NewEventHandler creates a new EventHandler. +func NewEventHandler[object client.Object, request comparable]( + ctx context.Context, + queue workqueue.TypedRateLimitingInterface[request], + handler handler.TypedEventHandler[object, request], + predicates []predicate.TypedPredicate[object]) *EventHandler[object, request] { + return &EventHandler[object, request]{ + ctx: ctx, + handler: handler, + queue: queue, + predicates: predicates, + } +} + +// EventHandler adapts a handler.EventHandler interface to a cache.ResourceEventHandler interface. +type EventHandler[object client.Object, request comparable] struct { + // ctx stores the context that created the event handler + // that is used to propagate cancellation signals to each handler function. 
+ ctx context.Context + + handler handler.TypedEventHandler[object, request] + queue workqueue.TypedRateLimitingInterface[request] + predicates []predicate.TypedPredicate[object] +} + +// HandlerFuncs converts EventHandler to a ResourceEventHandlerFuncs +// TODO: switch to ResourceEventHandlerDetailedFuncs with client-go 1.27 +func (e *EventHandler[object, request]) HandlerFuncs() cgocache.ResourceEventHandlerFuncs { + return cgocache.ResourceEventHandlerFuncs{ + AddFunc: e.OnAdd, + UpdateFunc: e.OnUpdate, + DeleteFunc: e.OnDelete, + } +} + +// OnAdd creates CreateEvent and calls Create on EventHandler. +func (e *EventHandler[object, request]) OnAdd(obj interface{}) { + c := event.TypedCreateEvent[object]{} + + // Pull Object out of the object + if o, ok := obj.(object); ok { + c.Object = o + } else { + log.Error(nil, "OnAdd missing Object", + "object", obj, "type", fmt.Sprintf("%T", obj)) + return + } + + for _, p := range e.predicates { + if !p.Create(c) { + return + } + } + + // Invoke create handler + ctx, cancel := context.WithCancel(e.ctx) + defer cancel() + e.handler.Create(ctx, c, e.queue) +} + +// OnUpdate creates UpdateEvent and calls Update on EventHandler. +func (e *EventHandler[object, request]) OnUpdate(oldObj, newObj interface{}) { + u := event.TypedUpdateEvent[object]{} + + if o, ok := oldObj.(object); ok { + u.ObjectOld = o + } else { + log.Error(nil, "OnUpdate missing ObjectOld", + "object", oldObj, "type", fmt.Sprintf("%T", oldObj)) + return + } + + // Pull Object out of the object + if o, ok := newObj.(object); ok { + u.ObjectNew = o + } else { + log.Error(nil, "OnUpdate missing ObjectNew", + "object", newObj, "type", fmt.Sprintf("%T", newObj)) + return + } + + for _, p := range e.predicates { + if !p.Update(u) { + return + } + } + + // Invoke update handler + ctx, cancel := context.WithCancel(e.ctx) + defer cancel() + e.handler.Update(ctx, u, e.queue) +} + +// OnDelete creates DeleteEvent and calls Delete on EventHandler. +func (e *EventHandler[object, request]) OnDelete(obj interface{}) { + d := event.TypedDeleteEvent[object]{} + + // Deal with tombstone events by pulling the object out. Tombstone events wrap the object in a + // DeleteFinalStateUnknown struct, so the object needs to be pulled out. + // Copied from sample-controller + // This should never happen if we aren't missing events, which we have concluded that we are not + // and made decisions off of this belief. Maybe this shouldn't be here? + var ok bool + if _, ok = obj.(client.Object); !ok { + // If the object doesn't have Metadata, assume it is a tombstone object of type DeletedFinalStateUnknown + tombstone, ok := obj.(cgocache.DeletedFinalStateUnknown) + if !ok { + log.Error(nil, "Error decoding objects. 
Expected cache.DeletedFinalStateUnknown", + "type", fmt.Sprintf("%T", obj), + "object", obj) + return + } + + // Set DeleteStateUnknown to true + d.DeleteStateUnknown = true + + // Set obj to the tombstone obj + obj = tombstone.Obj + } + + // Pull Object out of the object + if o, ok := obj.(object); ok { + d.Object = o + } else { + log.Error(nil, "OnDelete missing Object", + "object", obj, "type", fmt.Sprintf("%T", obj)) + return + } + + for _, p := range e.predicates { + if !p.Delete(d) { + return + } + } + + // Invoke delete handler + ctx, cancel := context.WithCancel(e.ctx) + defer cancel() + e.handler.Delete(ctx, d, e.queue) +} diff --git a/internal/contentmanager/sourcerer.go b/internal/contentmanager/sourcerer.go new file mode 100644 index 000000000..a946cb7ba --- /dev/null +++ b/internal/contentmanager/sourcerer.go @@ -0,0 +1,99 @@ +package contentmanager + +import ( + "context" + "fmt" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic/dynamicinformer" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + "github.com/operator-framework/operator-controller/api/v1alpha1" + "github.com/operator-framework/operator-controller/internal/contentmanager/cache" + "github.com/operator-framework/operator-controller/internal/contentmanager/source" +) + +type dynamicSourcerer struct { + informerFactoryCreateFunc func() dynamicinformer.DynamicSharedInformerFactory + mapper meta.RESTMapper +} + +func (ds *dynamicSourcerer) Source(gvk schema.GroupVersionKind, owner client.Object, onPostSyncError func(context.Context)) (cache.CloserSyncingSource, error) { + scheme, err := buildScheme(gvk) + if err != nil { + return nil, fmt.Errorf("building scheme: %w", err) + } + + restMapping, err := ds.mapper.RESTMapping(gvk.GroupKind(), gvk.Version) + if err != nil { + return nil, fmt.Errorf("getting resource mapping for GVK %q: %w", gvk, err) + } + + s := source.NewDynamicSource(source.DynamicSourceConfig{ + GVR: restMapping.Resource, + Owner: owner, + Handler: handler.EnqueueRequestForOwner(scheme, ds.mapper, owner, handler.OnlyControllerOwner()), + Predicates: []predicate.Predicate{ + predicate.Funcs{ + CreateFunc: func(tce event.TypedCreateEvent[client.Object]) bool { return false }, + UpdateFunc: func(tue event.TypedUpdateEvent[client.Object]) bool { return true }, + DeleteFunc: func(tde event.TypedDeleteEvent[client.Object]) bool { return true }, + GenericFunc: func(tge event.TypedGenericEvent[client.Object]) bool { return true }, + }, + }, + DynamicInformerFactory: ds.informerFactoryCreateFunc(), + OnPostSyncError: onPostSyncError, + }) + return s, nil +} + +// buildScheme builds a runtime.Scheme based on the provided GroupVersionKinds, +// with all GroupVersionKinds mapping to the unstructured.Unstructured type +// (unstructured.UnstructuredList for list kinds). +// +// It is assumed all GroupVersionKinds are valid, which means: +// - The Kind is set +// - The Version is set +// +// Invalid GroupVersionKinds will result in a panic. 
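+//
+// Sketch of the expected behavior (illustrative; the apps/v1 Deployment
+// GVK is only an example):
+//
+//	gvk := schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "Deployment"}
+//	scheme, _ := buildScheme(gvk)
+//	scheme.Recognizes(gvk)                                           // true, backed by *unstructured.Unstructured
+//	scheme.Recognizes(gvk.GroupVersion().WithKind("DeploymentList")) // true, backed by *unstructured.UnstructuredList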
+func buildScheme(gvks ...schema.GroupVersionKind) (*runtime.Scheme, error) { + scheme := runtime.NewScheme() + // The ClusterExtension types must be added to the scheme since it's + // going to be used to establish watches that trigger reconciliation + // of the owning ClusterExtension + if err := v1alpha1.AddToScheme(scheme); err != nil { + return nil, fmt.Errorf("adding operator controller APIs to scheme: %w", err) + } + + for _, gvk := range gvks { + if !scheme.Recognizes(gvk) { + // Since we can't have a mapping to every possible Go type in existence + // based on the GVK we need to use the unstructured types for mapping + u := &unstructured.Unstructured{} + u.SetGroupVersionKind(gvk) + scheme.AddKnownTypeWithName(gvk, u) + + // Adding the common meta schemas to the scheme for the GroupVersion + // is necessary to ensure the scheme is aware of the different operations + // that can be performed against the resources in this GroupVersion + metav1.AddToGroupVersion(scheme, gvk.GroupVersion()) + } + + listGVK := gvk + listGVK.Kind = listGVK.Kind + "List" + if !scheme.Recognizes(listGVK) { + ul := &unstructured.UnstructuredList{} + ul.SetGroupVersionKind(listGVK) + scheme.AddKnownTypeWithName(listGVK, ul) + } + } + + return scheme, nil +} diff --git a/internal/controllers/clusterextension_controller.go b/internal/controllers/clusterextension_controller.go index e6c006aea..685bfd35f 100644 --- a/internal/controllers/clusterextension_controller.go +++ b/internal/controllers/clusterextension_controller.go @@ -48,7 +48,6 @@ import ( "github.com/operator-framework/operator-registry/alpha/declcfg" ocv1alpha1 "github.com/operator-framework/operator-controller/api/v1alpha1" - "github.com/operator-framework/operator-controller/internal/applier" "github.com/operator-framework/operator-controller/internal/bundleutil" "github.com/operator-framework/operator-controller/internal/conditionsets" "github.com/operator-framework/operator-controller/internal/contentmanager" @@ -58,7 +57,8 @@ import ( ) const ( - ClusterExtensionCleanupUnpackCacheFinalizer = "olm.operatorframework.io/cleanup-unpack-cache" + ClusterExtensionCleanupUnpackCacheFinalizer = "olm.operatorframework.io/cleanup-unpack-cache" + ClusterExtensionCleanupContentManagerCacheFinalizer = "olm.operatorframework.io/cleanup-contentmanager-cache" ) // ClusterExtensionReconciler reconciles a ClusterExtension object @@ -67,7 +67,7 @@ type ClusterExtensionReconciler struct { Resolver resolve.Resolver Unpacker rukpaksource.Unpacker Applier Applier - Watcher contentmanager.Watcher + Manager contentmanager.Manager controller crcontroller.Controller cache cache.Cache InstalledBundleGetter InstalledBundleGetter @@ -75,7 +75,10 @@ type ClusterExtensionReconciler struct { } type Applier interface { - Apply(context.Context, fs.FS, *ocv1alpha1.ClusterExtension, map[string]string) ([]client.Object, string, error) + // Apply applies the content in the provided fs.FS using the configuration of the provided ClusterExtension. + // It also takes a map[string]string that is applied as labels to every applied resource, and another + // map[string]string that is used to create a unique identifier for the stored reference to the created resources.
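+ //
+ // Hypothetical call site (objLbls and storeLbls mirror the reconciler change
+ // below; applier, ctx, contentFS, and ext are assumed to be in scope):
+ //
+ //	objs, state, err := applier.Apply(ctx, contentFS, ext,
+ //		objLbls,   // applied to every rendered object via the post-renderer
+ //		storeLbls, // recorded on the stored Helm release
+ //	)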
diff --git a/internal/controllers/clusterextension_controller.go b/internal/controllers/clusterextension_controller.go
index e6c006aea..685bfd35f 100644
--- a/internal/controllers/clusterextension_controller.go
+++ b/internal/controllers/clusterextension_controller.go
@@ -48,7 +48,6 @@ import (
 	"github.com/operator-framework/operator-registry/alpha/declcfg"
 
 	ocv1alpha1 "github.com/operator-framework/operator-controller/api/v1alpha1"
-	"github.com/operator-framework/operator-controller/internal/applier"
 	"github.com/operator-framework/operator-controller/internal/bundleutil"
 	"github.com/operator-framework/operator-controller/internal/conditionsets"
 	"github.com/operator-framework/operator-controller/internal/contentmanager"
@@ -58,7 +57,8 @@ import (
 )
 
 const (
-	ClusterExtensionCleanupUnpackCacheFinalizer = "olm.operatorframework.io/cleanup-unpack-cache"
+	ClusterExtensionCleanupUnpackCacheFinalizer         = "olm.operatorframework.io/cleanup-unpack-cache"
+	ClusterExtensionCleanupContentManagerCacheFinalizer = "olm.operatorframework.io/cleanup-contentmanager-cache"
 )
 
 // ClusterExtensionReconciler reconciles a ClusterExtension object
@@ -67,7 +67,7 @@ type ClusterExtensionReconciler struct {
 	Resolver              resolve.Resolver
 	Unpacker              rukpaksource.Unpacker
 	Applier               Applier
-	Watcher               contentmanager.Watcher
+	Manager               contentmanager.Manager
 	controller            crcontroller.Controller
 	cache                 cache.Cache
 	InstalledBundleGetter InstalledBundleGetter
@@ -75,7 +75,10 @@ type ClusterExtensionReconciler struct {
 }
 
 type Applier interface {
-	Apply(context.Context, fs.FS, *ocv1alpha1.ClusterExtension, map[string]string) ([]client.Object, string, error)
+	// Apply applies the content in the provided fs.FS using the configuration of the provided ClusterExtension.
+	// It also takes two map[string]string arguments: the first is applied to every created resource as labels,
+	// and the second is used to build a unique identifier for the stored reference to the created resources.
+	Apply(context.Context, fs.FS, *ocv1alpha1.ClusterExtension, map[string]string, map[string]string) ([]client.Object, string, error)
 }
 
 type InstalledBundleGetter interface {
@@ -268,9 +271,12 @@ func (r *ClusterExtensionReconciler) reconcile(ctx context.Context, ext *ocv1alp
 		return ctrl.Result{}, fmt.Errorf("unexpected unpack status: %v", unpackResult.Message)
 	}
 
-	lbls := map[string]string{
-		labels.OwnerKindKey: ocv1alpha1.ClusterExtensionKind,
-		labels.OwnerNameKey: ext.GetName(),
+	objLbls := map[string]string{
+		labels.OwnerKindKey: ocv1alpha1.ClusterExtensionKind,
+		labels.OwnerNameKey: ext.GetName(),
+	}
+
+	storeLbls := map[string]string{
 		labels.BundleNameKey:    resolvedBundle.Name,
 		labels.PackageNameKey:   resolvedBundle.Package,
 		labels.BundleVersionKey: resolvedBundleVersion.String(),
@@ -286,30 +292,50 @@ func (r *ClusterExtensionReconciler) reconcile(ctx context.Context, ext *ocv1alp
 	// to ensure exponential backoff can occur:
 	// - Permission errors (it is not possible to watch changes to permissions.
 	//   The only way to eventually recover from permission errors is to keep retrying).
-	managedObjs, state, err := r.Applier.Apply(ctx, unpackResult.Bundle, ext, lbls)
+	managedObjs, _, err := r.Applier.Apply(ctx, unpackResult.Bundle, ext, objLbls, storeLbls)
 	if err != nil {
 		setInstalledStatusConditionFailed(ext, err.Error())
 		return ctrl.Result{}, err
 	}
 
-	// Only attempt to watch resources if we are
-	// installing / upgrading. Otherwise we may restart
-	// watches that have already been established
-	if state != applier.StateUnchanged {
-		l.V(1).Info("watching managed objects")
-		if err := r.Watcher.Watch(ctx, r.controller, ext, managedObjs); err != nil {
-			setInstallStatus(ext, nil)
-			setInstalledStatusConditionFailed(ext, fmt.Sprintf("%s:%v", ocv1alpha1.ReasonInstallationFailed, err))
-			return ctrl.Result{}, err
-		}
-	}
-
 	installStatus := &ocv1alpha1.ClusterExtensionInstallStatus{
 		Bundle: bundleutil.MetadataFor(resolvedBundle.Name, *resolvedBundleVersion),
 	}
 	setInstallStatus(ext, installStatus)
 	setInstalledStatusConditionSuccess(ext, fmt.Sprintf("Installed bundle %s successfully", resolvedBundle.Image))
 
+	l.V(1).Info("watching managed objects")
+	cache, err := r.Manager.Get(ctx, ext)
+	if err != nil {
+		// If we fail to get the cache, set the Healthy condition to
+		// "Unknown". We can't know the health of resources we can't monitor.
+		apimeta.SetStatusCondition(&ext.Status.Conditions, metav1.Condition{
+			Type:               ocv1alpha1.TypeHealthy,
+			Reason:             ocv1alpha1.ReasonUnverifiable,
+			Status:             metav1.ConditionUnknown,
+			Message:            err.Error(),
+			ObservedGeneration: ext.Generation,
+		})
+		return ctrl.Result{}, err
+	}
+
+	if err := cache.Watch(ctx, r.controller, managedObjs...); err != nil {
+		// If we fail to establish watches, set the Healthy condition to
+		// "Unknown". We can't know the health of resources we can't monitor.
+		apimeta.SetStatusCondition(&ext.Status.Conditions, metav1.Condition{
+			Type:               ocv1alpha1.TypeHealthy,
+			Reason:             ocv1alpha1.ReasonUnverifiable,
+			Status:             metav1.ConditionUnknown,
+			Message:            err.Error(),
+			ObservedGeneration: ext.Generation,
+		})
+		return ctrl.Result{}, err
+	}
+
+	// If we have successfully established the watches, remove the "Healthy"
+	// condition. It should be interpreted as "Unknown" when not present.
+	apimeta.RemoveStatusCondition(&ext.Status.Conditions, ocv1alpha1.TypeHealthy)
+
 	return ctrl.Result{}, nil
 }
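Since a successful pass removes the Healthy condition entirely, readers of ClusterExtension status should treat its absence as "Unknown" rather than healthy. A hypothetical helper (illustrative only, not part of the patch; it assumes the same apimeta, metav1, and ocv1alpha1 imports as the controller above) that encodes this rule:

// healthyStatus reports the effective Healthy status of a ClusterExtension,
// treating an absent condition as Unknown per the comment in reconcile above.
func healthyStatus(conditions []metav1.Condition) metav1.ConditionStatus {
	cond := apimeta.FindStatusCondition(conditions, ocv1alpha1.TypeHealthy)
	if cond == nil {
		return metav1.ConditionUnknown
	}
	return cond.Status
}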
diff --git a/internal/controllers/clusterextension_controller_test.go b/internal/controllers/clusterextension_controller_test.go
index b5d241b3e..a6637ff66 100644
--- a/internal/controllers/clusterextension_controller_test.go
+++ b/internal/controllers/clusterextension_controller_test.go
@@ -495,7 +495,7 @@ func TestClusterExtensionInstallationFailedApplierFails(t *testing.T) {
 	require.NoError(t, cl.DeleteAllOf(ctx, &ocv1alpha1.ClusterExtension{}))
 }
 
-func TestClusterExtensionInstallationFailedWatcherFailed(t *testing.T) {
+func TestClusterExtensionManagerFailed(t *testing.T) {
 	cl, reconciler := newClientAndReconciler(t)
 	reconciler.Unpacker = &MockUnpacker{
 		result: &source.Result{
@@ -550,8 +550,8 @@
 	reconciler.Applier = &MockApplier{
 		objs: []client.Object{},
 	}
-	reconciler.Watcher = &MockWatcher{
-		err: errors.New("watcher fail"),
+	reconciler.Manager = &MockManagedContentCacheManager{
+		err: errors.New("manager fail"),
 	}
 	res, err := reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: extKey})
 	require.Equal(t, ctrl.Result{}, res)
@@ -562,7 +562,7 @@
 	t.Log("By checking the status fields")
 	require.Equal(t, &ocv1alpha1.BundleMetadata{Name: "prometheus.v1.0.0", Version: "1.0.0"}, clusterExtension.Status.Resolution.Bundle)
-	require.Empty(t, clusterExtension.Status.Install)
+	require.Equal(t, &ocv1alpha1.BundleMetadata{Name: "prometheus.v1.0.0", Version: "1.0.0"}, clusterExtension.Status.Install.Bundle)
 
 	t.Log("By checking the expected resolution conditions")
 	resolvedCond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeResolved)
@@ -580,8 +580,114 @@
 	t.Log("By checking the expected installed conditions")
 	installedCond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeInstalled)
 	require.NotNil(t, installedCond)
-	require.Equal(t, metav1.ConditionFalse, installedCond.Status)
-	require.Equal(t, ocv1alpha1.ReasonInstallationFailed, installedCond.Reason)
+	require.Equal(t, metav1.ConditionTrue, installedCond.Status)
+	require.Equal(t, ocv1alpha1.ReasonSuccess, installedCond.Reason)
+
+	t.Log("By checking the expected healthy conditions")
+	managedCond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeHealthy)
+	require.NotNil(t, managedCond)
+	require.Equal(t, metav1.ConditionUnknown, managedCond.Status)
+	require.Equal(t, ocv1alpha1.ReasonUnverifiable, managedCond.Reason)
+
+	require.NoError(t, cl.DeleteAllOf(ctx, &ocv1alpha1.ClusterExtension{}))
+}
+
+func TestClusterExtensionManagedContentCacheWatchFail(t *testing.T) {
+	cl, reconciler := newClientAndReconciler(t)
+	reconciler.Unpacker = &MockUnpacker{
+		result: &source.Result{
+			State:  source.StateUnpacked,
+			Bundle: fstest.MapFS{},
+		},
+	}
+
+	ctx := context.Background()
+	extKey := types.NamespacedName{Name: fmt.Sprintf("cluster-extension-test-%s", rand.String(8))}
+
+	t.Log("When the cluster extension specifies a channel with a version that exists")
+	t.Log("By initializing cluster state")
+	pkgName := "prometheus"
+	pkgVer := "1.0.0"
+	pkgChan := "beta"
+	installNamespace := fmt.Sprintf("test-ns-%s", rand.String(8))
+	serviceAccount := fmt.Sprintf("test-sa-%s", rand.String(8))
+
+	clusterExtension := &ocv1alpha1.ClusterExtension{
+		ObjectMeta: metav1.ObjectMeta{Name: extKey.Name},
+		Spec: ocv1alpha1.ClusterExtensionSpec{
+			Source: ocv1alpha1.SourceConfig{
+				SourceType: ocv1alpha1.SourceTypeCatalog,
+
+				Catalog: &ocv1alpha1.CatalogSource{
+					PackageName: pkgName,
+					Version:     pkgVer,
+					Channel:     pkgChan,
+				},
+			},
+			Install: ocv1alpha1.ClusterExtensionInstallConfig{
+				Namespace: installNamespace,
+				ServiceAccount: ocv1alpha1.ServiceAccountReference{
+					Name: serviceAccount,
+				},
+			},
+		},
+	}
+	err := cl.Create(ctx, clusterExtension)
+	require.NoError(t, err)
+
+	t.Log("It sets resolution success status")
+	t.Log("By running reconcile")
+	reconciler.Resolver = resolve.Func(func(_ context.Context, _ *ocv1alpha1.ClusterExtension, _ *ocv1alpha1.BundleMetadata) (*declcfg.Bundle, *bsemver.Version, *declcfg.Deprecation, error) {
+		v := bsemver.MustParse("1.0.0")
+		return &declcfg.Bundle{
+			Name:    "prometheus.v1.0.0",
+			Package: "prometheus",
+			Image:   "quay.io/operatorhubio/prometheus@fake1.0.0",
+		}, &v, nil, nil
+	})
+	reconciler.Applier = &MockApplier{
+		objs: []client.Object{},
+	}
+	reconciler.Manager = &MockManagedContentCacheManager{
+		cache: &MockManagedContentCache{
+			err: errors.New("watch error"),
+		},
+	}
+	res, err := reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: extKey})
+	require.Equal(t, ctrl.Result{}, res)
+	require.Error(t, err)
+
+	t.Log("By fetching updated cluster extension after reconcile")
+	require.NoError(t, cl.Get(ctx, extKey, clusterExtension))
+
+	t.Log("By checking the status fields")
+	require.Equal(t, &ocv1alpha1.BundleMetadata{Name: "prometheus.v1.0.0", Version: "1.0.0"}, clusterExtension.Status.Resolution.Bundle)
+	require.Equal(t, &ocv1alpha1.BundleMetadata{Name: "prometheus.v1.0.0", Version: "1.0.0"}, clusterExtension.Status.Install.Bundle)
+
+	t.Log("By checking the expected resolution conditions")
+	resolvedCond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeResolved)
+	require.NotNil(t, resolvedCond)
+	require.Equal(t, metav1.ConditionTrue, resolvedCond.Status)
+	require.Equal(t, ocv1alpha1.ReasonSuccess, resolvedCond.Reason)
+	require.Equal(t, "resolved to \"quay.io/operatorhubio/prometheus@fake1.0.0\"", resolvedCond.Message)
+
+	t.Log("By checking the expected unpacked conditions")
+	unpackedCond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeUnpacked)
+	require.NotNil(t, unpackedCond)
+	require.Equal(t, metav1.ConditionTrue, unpackedCond.Status)
+	require.Equal(t, ocv1alpha1.ReasonUnpackSuccess, unpackedCond.Reason)
+
+	t.Log("By checking the expected installed conditions")
+	installedCond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeInstalled)
+	require.NotNil(t, installedCond)
+	require.Equal(t, metav1.ConditionTrue, installedCond.Status)
+	require.Equal(t, ocv1alpha1.ReasonSuccess, installedCond.Reason)
+
+	t.Log("By checking the expected healthy conditions")
+	managedCond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeHealthy)
+	require.NotNil(t, managedCond)
+	require.Equal(t, metav1.ConditionUnknown, managedCond.Status)
+	require.Equal(t, ocv1alpha1.ReasonUnverifiable, managedCond.Reason)
 
 	require.NoError(t, cl.DeleteAllOf(ctx, &ocv1alpha1.ClusterExtension{}))
 }
@@ -641,7 +747,9 @@ func TestClusterExtensionInstallationSucceeds(t *testing.T) {
 	reconciler.Applier = &MockApplier{
 		objs: []client.Object{},
 	}
-	reconciler.Watcher = &MockWatcher{}
+	reconciler.Manager = &MockManagedContentCacheManager{
+		cache: &MockManagedContentCache{},
+	}
 	res, err := reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: extKey})
 	require.Equal(t, ctrl.Result{}, res)
 	require.NoError(t, err)
diff --git a/internal/controllers/suite_test.go b/internal/controllers/suite_test.go
index f45b89a52..9fa70e457 100644
--- a/internal/controllers/suite_test.go
+++ b/internal/controllers/suite_test.go
@@ -30,7 +30,6 @@ import (
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/client-go/rest"
 	"sigs.k8s.io/controller-runtime/pkg/client"
-	"sigs.k8s.io/controller-runtime/pkg/controller"
 	"sigs.k8s.io/controller-runtime/pkg/envtest"
 	crfinalizer "sigs.k8s.io/controller-runtime/pkg/finalizer"
@@ -38,6 +37,7 @@ import (
 
 	ocv1alpha1 "github.com/operator-framework/operator-controller/api/v1alpha1"
 	"github.com/operator-framework/operator-controller/internal/contentmanager"
+	cmcache "github.com/operator-framework/operator-controller/internal/contentmanager/cache"
 	"github.com/operator-framework/operator-controller/internal/controllers"
 	"github.com/operator-framework/operator-controller/internal/rukpak/source"
 )
@@ -92,7 +92,7 @@ type MockApplier struct {
 	state string
 }
 
-func (m *MockApplier) Apply(_ context.Context, _ fs.FS, _ *ocv1alpha1.ClusterExtension, _ map[string]string) ([]client.Object, string, error) {
+func (m *MockApplier) Apply(_ context.Context, _ fs.FS, _ *ocv1alpha1.ClusterExtension, _ map[string]string, _ map[string]string) ([]client.Object, string, error) {
 	if m.err != nil {
 		return nil, m.state, m.err
 	}
@@ -100,17 +100,43 @@ func (m *MockApplier) Apply(_ context.Context, _ fs.FS, _ *ocv1alpha1.ClusterExt
 	return m.objs, m.state, nil
 }
 
-var _ contentmanager.Watcher = (*MockWatcher)(nil)
+var _ contentmanager.Manager = (*MockManagedContentCacheManager)(nil)
 
-type MockWatcher struct {
-	err error
+type MockManagedContentCacheManager struct {
+	err   error
+	cache cmcache.Cache
 }
 
-func (m *MockWatcher) Watch(_ context.Context, _ controller.Controller, _ *ocv1alpha1.ClusterExtension, _ []client.Object) error {
+func (m *MockManagedContentCacheManager) Get(_ context.Context, _ *ocv1alpha1.ClusterExtension) (cmcache.Cache, error) {
+	if m.err != nil {
+		return nil, m.err
+	}
+	return m.cache, nil
+}
+
+func (m *MockManagedContentCacheManager) Delete(_ *ocv1alpha1.ClusterExtension) error {
 	return m.err
 }
 
-func (m *MockWatcher) Unwatch(_ *ocv1alpha1.ClusterExtension) {}
+type MockManagedContentCache struct {
+	err error
+}
+
+var _ cmcache.Cache = (*MockManagedContentCache)(nil)
+
+func (m *MockManagedContentCache) Close() error {
+	if m.err != nil {
+		return m.err
+	}
+	return nil
+}
+
+func (m *MockManagedContentCache) Watch(_ context.Context, _ cmcache.Watcher, _ ...client.Object) error {
+	if m.err != nil {
+		return m.err
+	}
+	return nil
+}
 
 func newClientAndReconciler(t *testing.T) (client.Client, *controllers.ClusterExtensionReconciler) {
 	cl := newClient(t)
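The "var _ contentmanager.Manager = (*MockManagedContentCacheManager)(nil)" lines above are compile-time interface checks: the build fails if a mock ever drifts from the Manager or Cache contract. A self-contained sketch of the pattern, using invented stand-in types (not the project's real interfaces):

package main

import (
	"errors"
	"fmt"
)

// Closer is a stand-in interface for illustration.
type Closer interface {
	Close() error
}

// mockCloser is a stand-in implementation of the kind used in tests.
type mockCloser struct{ err error }

func (m *mockCloser) Close() error { return m.err }

// Compile-time check: this declaration stops compiling if *mockCloser
// ever stops satisfying Closer, without allocating anything at runtime.
var _ Closer = (*mockCloser)(nil)

func main() {
	var c Closer = &mockCloser{err: errors.New("boom")}
	fmt.Println(c.Close()) // boom
}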
diff --git a/test/e2e/cluster_extension_install_test.go b/test/e2e/cluster_extension_install_test.go
index 3f7954424..a20554a48 100644
--- a/test/e2e/cluster_extension_install_test.go
+++ b/test/e2e/cluster_extension_install_test.go
@@ -29,7 +29,6 @@ import (
 	catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1"
 
 	ocv1alpha1 "github.com/operator-framework/operator-controller/api/v1alpha1"
-	"github.com/operator-framework/operator-controller/internal/conditionsets"
 )
 
 const (
@@ -250,7 +249,6 @@ func TestClusterExtensionInstallRegistry(t *testing.T) {
 	t.Log("By eventually reporting a successful resolution and bundle path")
 	require.EventuallyWithT(t, func(ct *assert.CollectT) {
 		assert.NoError(ct, c.Get(context.Background(), types.NamespacedName{Name: clusterExtension.Name}, clusterExtension))
-		assert.Len(ct, clusterExtension.Status.Conditions, len(conditionsets.ConditionTypes))
 		cond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeResolved)
 		if !assert.NotNil(ct, cond) {
 			return
@@ -321,7 +319,6 @@ func TestClusterExtensionInstallRegistryMultipleBundles(t *testing.T) {
 	t.Log("By eventually reporting a failed resolution with multiple bundles")
 	require.EventuallyWithT(t, func(ct *assert.CollectT) {
 		assert.NoError(ct, c.Get(context.Background(), types.NamespacedName{Name: clusterExtension.Name}, clusterExtension))
-		assert.Len(ct, clusterExtension.Status.Conditions, len(conditionsets.ConditionTypes))
 		cond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeResolved)
 		if !assert.NotNil(ct, cond) {
 			return
@@ -573,7 +570,6 @@ func TestClusterExtensionInstallReResolvesWhenCatalogIsPatched(t *testing.T) {
 	t.Log("By reporting a successful resolution and bundle path")
 	require.EventuallyWithT(t, func(ct *assert.CollectT) {
 		assert.NoError(ct, c.Get(context.Background(), types.NamespacedName{Name: clusterExtension.Name}, clusterExtension))
-		assert.Len(ct, clusterExtension.Status.Conditions, len(conditionsets.ConditionTypes))
 		cond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeResolved)
 		if !assert.NotNil(ct, cond) {
 			return
@@ -608,7 +604,6 @@
 	t.Log("By eventually reporting a successful resolution and bundle path")
 	require.EventuallyWithT(t, func(ct *assert.CollectT) {
 		assert.NoError(ct, c.Get(context.Background(), types.NamespacedName{Name: clusterExtension.Name}, clusterExtension))
-		assert.Len(ct, clusterExtension.Status.Conditions, len(conditionsets.ConditionTypes))
 		cond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeResolved)
 		if !assert.NotNil(ct, cond) {
 			return
@@ -675,7 +670,6 @@ func TestClusterExtensionInstallReResolvesWhenNewCatalog(t *testing.T) {
 	t.Log("By reporting a successful resolution and bundle path")
 	require.EventuallyWithT(t, func(ct *assert.CollectT) {
 		assert.NoError(ct, c.Get(context.Background(), types.NamespacedName{Name: clusterExtension.Name}, clusterExtension))
-		assert.Len(ct, clusterExtension.Status.Conditions, len(conditionsets.ConditionTypes))
 		cond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeResolved)
 		if !assert.NotNil(ct, cond) {
 			return
@@ -710,7 +704,6 @@
 	t.Log("By eventually reporting a successful resolution and bundle path")
 	require.EventuallyWithT(t, func(ct *assert.CollectT) {
 		assert.NoError(ct, c.Get(context.Background(), types.NamespacedName{Name: clusterExtension.Name}, clusterExtension))
-		assert.Len(ct, clusterExtension.Status.Conditions, len(conditionsets.ConditionTypes))
 		cond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeResolved)
 		if !assert.NotNil(ct, cond) {
 			return
@@ -759,7 +752,6 @@ func TestClusterExtensionInstallReResolvesWhenManagedContentChanged(t *testing.T
 	t.Log("By reporting a successful installation")
 	require.EventuallyWithT(t, func(ct *assert.CollectT) {
 		assert.NoError(ct, c.Get(context.Background(), types.NamespacedName{Name: clusterExtension.Name}, clusterExtension))
-		assert.Len(ct, clusterExtension.Status.Conditions, len(conditionsets.ConditionTypes))
 		cond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeInstalled)
 		if !assert.NotNil(ct, cond) {
 			return
@@ -832,7 +824,6 @@ func TestClusterExtensionRecoversFromInitialInstallFailedWhenFailureFixed(t *tes
 	t.Log("By eventually reporting a successful resolution and bundle path")
 	require.EventuallyWithT(t, func(ct *assert.CollectT) {
 		assert.NoError(ct, c.Get(context.Background(), types.NamespacedName{Name: clusterExtension.Name}, clusterExtension))
-		assert.Len(ct, clusterExtension.Status.Conditions, len(conditionsets.ConditionTypes))
 		cond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeResolved)
 		if !assert.NotNil(ct, cond) {
 			return