From 71bccb9ec1872b062343dd2504360e7309db474e Mon Sep 17 00:00:00 2001 From: everettraven Date: Thu, 5 Sep 2024 09:52:13 -0400 Subject: [PATCH] (bugfix): rewrite the contentmanager implementation to fix bugs associated with insufficient permissions resulting in halting reconciliation of all ClusterExtension and informer sync errors not being reported via the ClusterExtension status conditions. Signed-off-by: everettraven --- api/v1alpha1/clusterextension_types.go | 5 + cmd/manager/main.go | 13 +- .../olm_v1alpha1_clusterextension.yaml | 4 +- internal/applier/helm.go | 59 +++- internal/contentmanager/cache/cache.go | 264 ++++++++++++++++++ internal/contentmanager/cache/cache_test.go | 202 ++++++++++++++ internal/contentmanager/contentmanager.go | 254 ++++++----------- .../contentmanager/contentmanager_test.go | 208 -------------- .../contentmanager/source/dynamicsource.go | 195 +++++++++++++ .../source/dynamicsource_test.go | 112 ++++++++ .../source/internal/eventhandler.go | 193 +++++++++++++ internal/contentmanager/sourcerer.go | 99 +++++++ .../clusterextension_controller.go | 66 +++-- .../clusterextension_controller_test.go | 122 +++++++- internal/controllers/suite_test.go | 40 ++- test/e2e/cluster_extension_install_test.go | 9 - 16 files changed, 1415 insertions(+), 430 deletions(-) create mode 100644 internal/contentmanager/cache/cache.go create mode 100644 internal/contentmanager/cache/cache_test.go delete mode 100644 internal/contentmanager/contentmanager_test.go create mode 100644 internal/contentmanager/source/dynamicsource.go create mode 100644 internal/contentmanager/source/dynamicsource_test.go create mode 100644 internal/contentmanager/source/internal/eventhandler.go create mode 100644 internal/contentmanager/sourcerer.go diff --git a/api/v1alpha1/clusterextension_types.go b/api/v1alpha1/clusterextension_types.go index fb8903a34..589ca734d 100644 --- a/api/v1alpha1/clusterextension_types.go +++ b/api/v1alpha1/clusterextension_types.go @@ -414,6 +414,7 @@ const ( // TODO(user): add more Types, here and into init() TypeInstalled = "Installed" TypeResolved = "Resolved" + TypeHealthy = "Healthy" // TypeDeprecated is a rollup condition that is present when // any of the deprecated conditions are present. 
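For reference, the new Healthy condition type added above is reported with the standard apimachinery condition helpers, paired with the ReasonUnverifiable reason introduced in the next hunk. A minimal sketch follows; the markHealthUnknown helper name is illustrative only and not part of this patch (the real call sites are in the ClusterExtension controller changes further below):

import (
	apimeta "k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	ocv1alpha1 "github.com/operator-framework/operator-controller/api/v1alpha1"
)

// markHealthUnknown (hypothetical helper) records that the health of managed
// content cannot currently be verified, e.g. because watches on it could not
// be established.
func markHealthUnknown(ext *ocv1alpha1.ClusterExtension, msg string) {
	apimeta.SetStatusCondition(&ext.Status.Conditions, metav1.Condition{
		Type:               ocv1alpha1.TypeHealthy,
		Reason:             ocv1alpha1.ReasonUnverifiable,
		Status:             metav1.ConditionUnknown,
		Message:            msg,
		ObservedGeneration: ext.Generation,
	})
}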
@@ -438,6 +439,8 @@ const ( ReasonErrorGettingReleaseState = "ErrorGettingReleaseState" + ReasonUnverifiable = "Unverifiable" + CRDUpgradeSafetyPolicyEnabled CRDUpgradeSafetyPolicy = "Enabled" CRDUpgradeSafetyPolicyDisabled CRDUpgradeSafetyPolicy = "Disabled" ) @@ -452,6 +455,7 @@ func init() { TypeChannelDeprecated, TypeBundleDeprecated, TypeUnpacked, + TypeHealthy, ) // TODO(user): add Reasons from above conditionsets.ConditionReasons = append(conditionsets.ConditionReasons, @@ -465,6 +469,7 @@ func init() { ReasonUnpackSuccess, ReasonUnpackFailed, ReasonErrorGettingReleaseState, + ReasonUnverifiable, ) } diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 7b8ad62f6..95813cf6c 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -249,6 +249,17 @@ func main() { Preflights: preflights, } + cm := contentmanager.NewManager(clientRestConfigMapper, mgr.GetConfig(), mgr.GetRESTMapper()) + err = clusterExtensionFinalizers.Register(controllers.ClusterExtensionCleanupContentManagerCacheFinalizer, finalizerFunc(func(ctx context.Context, obj client.Object) (crfinalizer.Result, error) { + ext := obj.(*ocv1alpha1.ClusterExtension) + err := cm.Delete(ext) + return crfinalizer.Result{}, err + })) + if err != nil { + setupLog.Error(err, "unable to register content manager cleanup finalizer") + os.Exit(1) + } + if err = (&controllers.ClusterExtensionReconciler{ Client: cl, Resolver: resolver, @@ -256,7 +267,7 @@ func main() { Applier: applier, InstalledBundleGetter: &controllers.DefaultInstalledBundleGetter{ActionClientGetter: acg}, Finalizers: clusterExtensionFinalizers, - Watcher: contentmanager.New(clientRestConfigMapper, mgr.GetConfig(), mgr.GetRESTMapper()), + Manager: cm, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "ClusterExtension") os.Exit(1) diff --git a/config/samples/olm_v1alpha1_clusterextension.yaml b/config/samples/olm_v1alpha1_clusterextension.yaml index fdd117d58..19bbf06b0 100644 --- a/config/samples/olm_v1alpha1_clusterextension.yaml +++ b/config/samples/olm_v1alpha1_clusterextension.yaml @@ -36,10 +36,10 @@ rules: # Manage ArgoCD CRDs - apiGroups: [apiextensions.k8s.io] resources: [customresourcedefinitions] - verbs: [create] + verbs: [create, list, watch] - apiGroups: [apiextensions.k8s.io] resources: [customresourcedefinitions] - verbs: [get, list, watch, update, patch, delete] + verbs: [get, update, patch, delete] resourceNames: - appprojects.argoproj.io - argocds.argoproj.io diff --git a/internal/applier/helm.go b/internal/applier/helm.go index 6bdc88ae2..ac43726f1 100644 --- a/internal/applier/helm.go +++ b/internal/applier/helm.go @@ -1,18 +1,23 @@ package applier import ( + "bytes" "context" "errors" "fmt" + "io" "io/fs" "strings" "helm.sh/helm/v3/pkg/action" "helm.sh/helm/v3/pkg/chart" "helm.sh/helm/v3/pkg/chartutil" + "helm.sh/helm/v3/pkg/postrender" "helm.sh/helm/v3/pkg/release" "helm.sh/helm/v3/pkg/storage/driver" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + apimachyaml "k8s.io/apimachinery/pkg/util/yaml" "sigs.k8s.io/controller-runtime/pkg/client" helmclient "github.com/operator-framework/helm-operator-plugins/pkg/client" @@ -51,7 +56,7 @@ type Helm struct { Preflights []Preflight } -func (h *Helm) Apply(ctx context.Context, contentFS fs.FS, ext *ocv1alpha1.ClusterExtension, labels map[string]string) ([]client.Object, string, error) { +func (h *Helm) Apply(ctx context.Context, contentFS fs.FS, ext *ocv1alpha1.ClusterExtension, objectLabels map[string]string, 
storageLabels map[string]string) ([]client.Object, string, error) { chrt, err := convert.RegistryV1ToHelmChart(ctx, contentFS, ext.Spec.Install.Namespace, []string{corev1.NamespaceAll}) if err != nil { return nil, "", err @@ -63,7 +68,11 @@ func (h *Helm) Apply(ctx context.Context, contentFS fs.FS, ext *ocv1alpha1.Clust return nil, "", err } - rel, desiredRel, state, err := h.getReleaseState(ac, ext, chrt, values, labels) + post := &postrenderer{ + labels: objectLabels, + } + + rel, desiredRel, state, err := h.getReleaseState(ac, ext, chrt, values, post) if err != nil { return nil, "", err } @@ -94,18 +103,18 @@ func (h *Helm) Apply(ctx context.Context, contentFS fs.FS, ext *ocv1alpha1.Clust case StateNeedsInstall: rel, err = ac.Install(ext.GetName(), ext.Spec.Install.Namespace, chrt, values, func(install *action.Install) error { install.CreateNamespace = false - install.Labels = labels + install.Labels = storageLabels return nil - }) + }, helmclient.AppendInstallPostRenderer(post)) if err != nil { return nil, state, err } case StateNeedsUpgrade: rel, err = ac.Upgrade(ext.GetName(), ext.Spec.Install.Namespace, chrt, values, func(upgrade *action.Upgrade) error { upgrade.MaxHistory = maxHelmReleaseHistory - upgrade.Labels = labels + upgrade.Labels = storageLabels return nil - }) + }, helmclient.AppendUpgradePostRenderer(post)) if err != nil { return nil, state, err } @@ -125,7 +134,7 @@ func (h *Helm) Apply(ctx context.Context, contentFS fs.FS, ext *ocv1alpha1.Clust return relObjects, state, nil } -func (h *Helm) getReleaseState(cl helmclient.ActionInterface, ext *ocv1alpha1.ClusterExtension, chrt *chart.Chart, values chartutil.Values, labels map[string]string) (*release.Release, *release.Release, string, error) { +func (h *Helm) getReleaseState(cl helmclient.ActionInterface, ext *ocv1alpha1.ClusterExtension, chrt *chart.Chart, values chartutil.Values, post postrender.PostRenderer) (*release.Release, *release.Release, string, error) { currentRelease, err := cl.Get(ext.GetName()) if err != nil && !errors.Is(err, driver.ErrReleaseNotFound) { return nil, nil, StateError, err @@ -138,9 +147,8 @@ func (h *Helm) getReleaseState(cl helmclient.ActionInterface, ext *ocv1alpha1.Cl desiredRelease, err := cl.Install(ext.GetName(), ext.Spec.Install.Namespace, chrt, values, func(i *action.Install) error { i.DryRun = true i.DryRunOption = "server" - i.Labels = labels return nil - }) + }, helmclient.AppendInstallPostRenderer(post)) if err != nil { return nil, nil, StateError, err } @@ -150,9 +158,8 @@ func (h *Helm) getReleaseState(cl helmclient.ActionInterface, ext *ocv1alpha1.Cl upgrade.MaxHistory = maxHelmReleaseHistory upgrade.DryRun = true upgrade.DryRunOption = "server" - upgrade.Labels = labels return nil - }) + }, helmclient.AppendUpgradePostRenderer(post)) if err != nil { return currentRelease, nil, StateError, err } @@ -164,3 +171,33 @@ func (h *Helm) getReleaseState(cl helmclient.ActionInterface, ext *ocv1alpha1.Cl } return currentRelease, desiredRelease, relState, nil } + +type postrenderer struct { + labels map[string]string + cascade postrender.PostRenderer +} + +func (p *postrenderer) Run(renderedManifests *bytes.Buffer) (*bytes.Buffer, error) { + var buf bytes.Buffer + dec := apimachyaml.NewYAMLOrJSONDecoder(renderedManifests, 1024) + for { + obj := unstructured.Unstructured{} + err := dec.Decode(&obj) + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return nil, err + } + obj.SetLabels(util.MergeMaps(obj.GetLabels(), p.labels)) + b, err := obj.MarshalJSON() + if err != nil { + 
return nil, err + } + buf.Write(b) + } + if p.cascade != nil { + return p.cascade.Run(&buf) + } + return &buf, nil +} diff --git a/internal/contentmanager/cache/cache.go b/internal/contentmanager/cache/cache.go new file mode 100644 index 000000000..f56cfd575 --- /dev/null +++ b/internal/contentmanager/cache/cache.go @@ -0,0 +1,264 @@ +package cache + +import ( + "context" + "errors" + "fmt" + "io" + "slices" + "strings" + "sync" + "time" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +type Watcher interface { + Watch(source.Source) error +} + +// Cache is a storage mechanism for keeping track of +// managed content sources +type Cache interface { + io.Closer + // Watch establishes watches for all provided client.Objects. + // Subsequent calls to Watch will result in no longer necessary + // watches being stopped and removed and new watches being + // created + Watch(context.Context, Watcher, ...client.Object) error +} + +// CloserSyncingSource is a wrapper of the controller-runtime +// source.SyncingSource that includes methods for: +// - Closing the source, stopping it's interaction with the Kubernetes API server and reaction to events +type CloserSyncingSource interface { + source.SyncingSource + io.Closer +} + +type sourcerer interface { + // Source returns a CloserSyncingSource for the provided + // GroupVersionKind. If the CloserSyncingSource encounters an + // error after having initially synced, it should requeue the + // provided client.Object and call the provided callback function + Source(schema.GroupVersionKind, client.Object, func(context.Context)) (CloserSyncingSource, error) +} + +type cache struct { + sources map[schema.GroupVersionKind]CloserSyncingSource + sourcerer sourcerer + owner client.Object + syncTimeout time.Duration + mu sync.Mutex +} + +func NewCache(sourcerer sourcerer, owner client.Object, syncTimeout time.Duration) Cache { + return &cache{ + sources: make(map[schema.GroupVersionKind]CloserSyncingSource), + sourcerer: sourcerer, + owner: owner, + syncTimeout: syncTimeout, + } +} + +var _ Cache = (*cache)(nil) + +func (c *cache) Watch(ctx context.Context, watcher Watcher, objs ...client.Object) error { + c.mu.Lock() + defer c.mu.Unlock() + gvkSet, err := gvksForObjects(objs...) + if err != nil { + return fmt.Errorf("getting set of GVKs for managed objects: %w", err) + } + + if err := c.removeStaleSources(gvkSet); err != nil { + return fmt.Errorf("removing stale sources: %w", err) + } + return c.startNewSources(ctx, gvkSet, watcher) +} + +func (c *cache) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + + errs := []error{} + for _, source := range c.sources { + if err := source.Close(); err != nil { + errs = append(errs, err) + } + } + + slices.SortFunc(errs, func(a, b error) int { + return strings.Compare(a.Error(), b.Error()) + }) + + return errors.Join(errs...) 
+} + +func (c *cache) startNewSources(ctx context.Context, gvks sets.Set[schema.GroupVersionKind], watcher Watcher) error { + cacheGvks := c.getCacheGVKs() + gvksToCreate := gvks.Difference(cacheGvks) + + type startResult struct { + source CloserSyncingSource + gvk schema.GroupVersionKind + err error + } + startResults := make(chan startResult) + wg := sync.WaitGroup{} + for _, gvk := range gvksToCreate.UnsortedList() { + wg.Add(1) + go func() { + defer wg.Done() + source, err := c.startNewSource(ctx, gvk, watcher) + startResults <- startResult{ + source: source, + gvk: gvk, + err: err, + } + }() + } + go func() { + wg.Wait() + close(startResults) + }() + + sourcesErrors := []error{} + for result := range startResults { + if result.err != nil { + sourcesErrors = append(sourcesErrors, result.err) + continue + } + + err := c.addSource(result.gvk, result.source) + if err != nil { + // If we made it here then there is a logic error in + // calculating the diffs between what is currently being + // watched by the cache + panic(err) + } + } + + slices.SortFunc(sourcesErrors, func(a, b error) int { + return strings.Compare(a.Error(), b.Error()) + }) + + return errors.Join(sourcesErrors...) +} + +func (c *cache) startNewSource(ctx context.Context, gvk schema.GroupVersionKind, watcher Watcher) (CloserSyncingSource, error) { + s, err := c.sourcerer.Source(gvk, c.owner, func(ctx context.Context) { + // this callback function ensures that we remove the source from the + // cache if it encounters an error after it initially synced successfully + c.mu.Lock() + defer c.mu.Unlock() + err := c.removeSource(gvk) + if err != nil { + logr := log.FromContext(ctx) + logr.Error(err, "managed content cache postSyncError removing source failed", "gvk", gvk) + } + }) + if err != nil { + return nil, fmt.Errorf("getting source: %w", err) + } + + err = watcher.Watch(s) + if err != nil { + return nil, fmt.Errorf("establishing watch for GVK %q: %w", gvk, err) + } + + syncCtx, syncCancel := context.WithTimeout(ctx, c.syncTimeout) + defer syncCancel() + err = s.WaitForSync(syncCtx) + if err != nil { + return nil, fmt.Errorf("waiting for sync: %w", err) + } + + return s, nil +} + +func (c *cache) addSource(gvk schema.GroupVersionKind, source CloserSyncingSource) error { + if _, ok := c.sources[gvk]; !ok { + c.sources[gvk] = source + return nil + } + return errors.New("source already exists") +} + +func (c *cache) removeStaleSources(gvks sets.Set[schema.GroupVersionKind]) error { + cacheGvks := c.getCacheGVKs() + removeErrs := []error{} + gvksToRemove := cacheGvks.Difference(gvks) + for _, gvk := range gvksToRemove.UnsortedList() { + err := c.removeSource(gvk) + if err != nil { + removeErrs = append(removeErrs, err) + } + } + + slices.SortFunc(removeErrs, func(a, b error) int { + return strings.Compare(a.Error(), b.Error()) + }) + + return errors.Join(removeErrs...) +} + +func (c *cache) removeSource(gvk schema.GroupVersionKind) error { + if source, ok := c.sources[gvk]; ok { + err := source.Close() + if err != nil { + return fmt.Errorf("closing source for GVK %q: %w", gvk, err) + } + } + delete(c.sources, gvk) + return nil +} + +func (c *cache) getCacheGVKs() sets.Set[schema.GroupVersionKind] { + cacheGvks := sets.New[schema.GroupVersionKind]() + for gvk := range c.sources { + cacheGvks.Insert(gvk) + } + return cacheGvks +} + +// gvksForObjects builds a sets.Set of GroupVersionKinds for +// the provided client.Objects. 
It returns an error if: +// - There is no Kind set on the client.Object +// - There is no Version set on the client.Object +// +// An empty Group is assumed to be the "core" Kubernetes +// API group. +func gvksForObjects(objs ...client.Object) (sets.Set[schema.GroupVersionKind], error) { + gvkSet := sets.New[schema.GroupVersionKind]() + for _, obj := range objs { + gvk := obj.GetObjectKind().GroupVersionKind() + + // If the Kind or Version is not set in an object's GroupVersionKind + // attempting to add it to the runtime.Scheme will result in a panic. + // To avoid panics, we are doing the validation and returning early + // with an error if any objects are provided with a missing Kind or Version + // field + if gvk.Kind == "" { + return nil, fmt.Errorf( + "adding %s to set; object Kind is not defined", + obj.GetName(), + ) + } + + if gvk.Version == "" { + return nil, fmt.Errorf( + "adding %s to set; object Version is not defined", + obj.GetName(), + ) + } + + gvkSet.Insert(gvk) + } + + return gvkSet, nil +} diff --git a/internal/contentmanager/cache/cache_test.go b/internal/contentmanager/cache/cache_test.go new file mode 100644 index 000000000..edf5c0e7a --- /dev/null +++ b/internal/contentmanager/cache/cache_test.go @@ -0,0 +1,202 @@ +package cache + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + + ocv1alpha1 "github.com/operator-framework/operator-controller/api/v1alpha1" +) + +type mockWatcher struct { + err error +} + +var _ Watcher = (*mockWatcher)(nil) + +func (mw *mockWatcher) Watch(source.Source) error { + return mw.err +} + +type mockSourcerer struct { + err error + source CloserSyncingSource +} + +var _ sourcerer = (*mockSourcerer)(nil) + +func (ms *mockSourcerer) Source(_ schema.GroupVersionKind, _ client.Object, _ func(context.Context)) (CloserSyncingSource, error) { + if ms.err != nil { + return nil, ms.err + } + return ms.source, nil +} + +type mockSource struct { + err error +} + +var _ CloserSyncingSource = (*mockSource)(nil) + +func (ms *mockSource) Start(_ context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + return ms.err +} + +func (ms *mockSource) WaitForSync(ctx context.Context) error { + return ms.err +} + +func (ms *mockSource) Close() error { + return ms.err +} + +func TestCacheWatch(t *testing.T) { + c := NewCache( + &mockSourcerer{ + source: &mockSource{}, + }, + &ocv1alpha1.ClusterExtension{}, + time.Second, + ) + + pod := &corev1.Pod{} + podGvk := corev1.SchemeGroupVersion.WithKind("Pod") + pod.SetGroupVersionKind(podGvk) + + require.NoError(t, c.Watch(context.Background(), &mockWatcher{}, pod)) + require.Contains(t, c.(*cache).sources, podGvk, "sources", c.(*cache).sources) +} + +func TestCacheWatchInvalidGVK(t *testing.T) { + c := NewCache( + &mockSourcerer{ + source: &mockSource{}, + }, + &ocv1alpha1.ClusterExtension{}, + time.Second, + ) + + pod := &corev1.Pod{} + require.Error(t, c.Watch(context.Background(), &mockWatcher{}, pod), "should fail on invalid GVK") +} + +func TestCacheWatchSourcererError(t *testing.T) { + c := NewCache( + &mockSourcerer{ + err: errors.New("error"), + }, + &ocv1alpha1.ClusterExtension{}, + time.Second, + ) + + pod := &corev1.Pod{} + podGvk := corev1.SchemeGroupVersion.WithKind("Pod") + 
pod.SetGroupVersionKind(podGvk) + require.Error(t, c.Watch(context.Background(), &mockWatcher{}, pod), "should fail when sourcerer returns an error") +} + +func TestCacheWatchWatcherError(t *testing.T) { + c := NewCache( + &mockSourcerer{ + source: &mockSource{}, + }, + &ocv1alpha1.ClusterExtension{}, + time.Second, + ) + + pod := &corev1.Pod{} + podGvk := corev1.SchemeGroupVersion.WithKind("Pod") + pod.SetGroupVersionKind(podGvk) + require.Error(t, c.Watch(context.Background(), &mockWatcher{err: errors.New("error")}, pod), "should fail when watcher returns an error") +} + +func TestCacheWatchSourceWaitForSyncError(t *testing.T) { + c := NewCache( + &mockSourcerer{ + source: &mockSource{ + err: errors.New("error"), + }, + }, + &ocv1alpha1.ClusterExtension{}, + time.Second, + ) + + pod := &corev1.Pod{} + podGvk := corev1.SchemeGroupVersion.WithKind("Pod") + pod.SetGroupVersionKind(podGvk) + require.Error(t, c.Watch(context.Background(), &mockWatcher{}, pod), "should fail when source fails to sync") + require.NotContains(t, c.(*cache).sources, podGvk, "should not contain source entry in mapping") +} + +func TestCacheWatchExistingSourceNotPanic(t *testing.T) { + c := NewCache( + &mockSourcerer{ + source: &mockSource{}, + }, + &ocv1alpha1.ClusterExtension{}, + time.Second, + ) + + pod := &corev1.Pod{} + podGvk := corev1.SchemeGroupVersion.WithKind("Pod") + pod.SetGroupVersionKind(podGvk) + require.NoError(t, c.(*cache).addSource(podGvk, &mockSource{})) + + // In this case, a panic means there is a logic error somewhere in the + // cache.Watch() method. It should never hit the condition where it panics + // as it should never attempt to create a new source for one that already exists. + require.NotPanics(t, func() { _ = c.Watch(context.Background(), &mockWatcher{}, pod) }, "should never panic") +} + +func TestCacheWatchRemovesStaleSources(t *testing.T) { + c := NewCache( + &mockSourcerer{ + source: &mockSource{}, + }, + &ocv1alpha1.ClusterExtension{}, + time.Second, + ) + + pod := &corev1.Pod{} + podGvk := corev1.SchemeGroupVersion.WithKind("Pod") + pod.SetGroupVersionKind(podGvk) + + require.NoError(t, c.Watch(context.Background(), &mockWatcher{}, pod)) + require.Contains(t, c.(*cache).sources, podGvk) + + secret := &corev1.Secret{} + secretGvk := corev1.SchemeGroupVersion.WithKind("Secret") + secret.SetGroupVersionKind(secretGvk) + require.NoError(t, c.Watch(context.Background(), &mockWatcher{}, secret)) + require.Contains(t, c.(*cache).sources, secretGvk) + require.NotContains(t, c.(*cache).sources, podGvk) +} + +func TestCacheWatchRemovingStaleSourcesError(t *testing.T) { + c := NewCache( + &mockSourcerer{ + source: &mockSource{}, + }, + &ocv1alpha1.ClusterExtension{}, + time.Second, + ) + + podGvk := corev1.SchemeGroupVersion.WithKind("Pod") + require.NoError(t, c.(*cache).addSource(podGvk, &mockSource{ + err: errors.New("error"), + })) + + secret := &corev1.Secret{} + secretGvk := corev1.SchemeGroupVersion.WithKind("Secret") + secret.SetGroupVersionKind(secretGvk) + require.Error(t, c.Watch(context.Background(), &mockWatcher{}, secret)) +} diff --git a/internal/contentmanager/contentmanager.go b/internal/contentmanager/contentmanager.go index 444cf9f74..13fc92f9b 100644 --- a/internal/contentmanager/contentmanager.go +++ b/internal/contentmanager/contentmanager.go @@ -2,138 +2,108 @@ package contentmanager import ( "context" - "errors" "fmt" "sync" + "time" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - 
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/rest" - "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/source" "github.com/operator-framework/operator-controller/api/v1alpha1" + cmcache "github.com/operator-framework/operator-controller/internal/contentmanager/cache" oclabels "github.com/operator-framework/operator-controller/internal/labels" ) -type Watcher interface { - // Watch will establish watches for resources owned by a ClusterExtension - Watch(context.Context, controller.Controller, *v1alpha1.ClusterExtension, []client.Object) error - // Unwatch will remove watches for a ClusterExtension - Unwatch(*v1alpha1.ClusterExtension) +// Manager is a utility to manage content caches belonging +// to ClusterExtensions +type Manager interface { + // Get returns a managed content cache for the provided + // ClusterExtension if one exists. If one does not exist, + // a new Cache is created and returned + Get(context.Context, *v1alpha1.ClusterExtension) (cmcache.Cache, error) + // Delete will stop and remove a managed content cache + // for the provided ClusterExtension if one exists. + Delete(*v1alpha1.ClusterExtension) error } type RestConfigMapper func(context.Context, client.Object, *rest.Config) (*rest.Config, error) -type extensionCacheData struct { - Cache cache.Cache - Cancel context.CancelFunc +// managerImpl is an implementation of the Manager interface +type managerImpl struct { + rcm RestConfigMapper + baseCfg *rest.Config + caches map[string]cmcache.Cache + mapper meta.RESTMapper + mu *sync.Mutex + syncTimeout time.Duration + resyncPeriod time.Duration } -type instance struct { - rcm RestConfigMapper - baseCfg *rest.Config - extensionCaches map[string]extensionCacheData - mapper meta.RESTMapper - mu *sync.Mutex -} +type ManagerOption func(*managerImpl) -// New creates a new ContentManager object -func New(rcm RestConfigMapper, cfg *rest.Config, mapper meta.RESTMapper) Watcher { - return &instance{ - rcm: rcm, - baseCfg: cfg, - extensionCaches: make(map[string]extensionCacheData), - mapper: mapper, - mu: &sync.Mutex{}, +// WithSyncTimeout configures the time spent waiting +// for a managed content source to sync +func WithSyncTimeout(t time.Duration) ManagerOption { + return func(m *managerImpl) { + m.syncTimeout = t } } -// buildScheme builds a runtime.Scheme based on the provided client.Objects, -// with all GroupVersionKinds mapping to the unstructured.Unstructured type -// (unstructured.UnstructuredList for list kinds). -// -// If a provided client.Object does not set a Version or Kind field in its -// GroupVersionKind, an error will be returned. 
-func buildScheme(objs []client.Object) (*runtime.Scheme, error) { - scheme := runtime.NewScheme() - // The ClusterExtension types must be added to the scheme since its - // going to be used to establish watches that trigger reconciliation - // of the owning ClusterExtension - if err := v1alpha1.AddToScheme(scheme); err != nil { - return nil, fmt.Errorf("adding operator controller APIs to scheme: %w", err) +// WithResyncPeriod configures the frequency +// a managed content source attempts to resync +func WithResyncPeriod(t time.Duration) ManagerOption { + return func(m *managerImpl) { + m.resyncPeriod = t } +} - for _, obj := range objs { - gvk := obj.GetObjectKind().GroupVersionKind() - - // If the Kind or Version is not set in an object's GroupVersionKind - // attempting to add it to the runtime.Scheme will result in a panic. - // To avoid panics, we are doing the validation and returning early - // with an error if any objects are provided with a missing Kind or Version - // field - if gvk.Kind == "" { - return nil, fmt.Errorf( - "adding %s to scheme; object Kind is not defined", - obj.GetName(), - ) - } - - if gvk.Version == "" { - return nil, fmt.Errorf( - "adding %s to scheme; object Version is not defined", - obj.GetName(), - ) - } +// NewManager creates a new Manager +func NewManager(rcm RestConfigMapper, cfg *rest.Config, mapper meta.RESTMapper, opts ...ManagerOption) Manager { + m := &managerImpl{ + rcm: rcm, + baseCfg: cfg, + caches: make(map[string]cmcache.Cache), + mapper: mapper, + mu: &sync.Mutex{}, + syncTimeout: time.Second * 10, + resyncPeriod: time.Hour * 10, + } - listKind := gvk.Kind + "List" - - if !scheme.Recognizes(gvk) { - // Since we can't have a mapping to every possible Go type in existence - // based on the GVK we need to use the unstructured types for mapping - u := &unstructured.Unstructured{} - u.SetGroupVersionKind(gvk) - ul := &unstructured.UnstructuredList{} - ul.SetGroupVersionKind(gvk.GroupVersion().WithKind(listKind)) - - scheme.AddKnownTypeWithName(gvk, u) - scheme.AddKnownTypeWithName(gvk.GroupVersion().WithKind(listKind), ul) - // Adding the common meta schemas to the scheme for the GroupVersion - // is necessary to ensure the scheme is aware of the different operations - // that can be performed against the resources in this GroupVersion - metav1.AddToGroupVersion(scheme, gvk.GroupVersion()) - } + for _, opt := range opts { + opt(m) } - return scheme, nil + return m } -// Watch configures a controller-runtime cache.Cache and establishes watches for the provided resources. -// It utilizes the provided ClusterExtension to set a DefaultLabelSelector on the cache.Cache -// to ensure it is only caching and reacting to content that belongs to the ClusterExtension. -// For each client.Object provided, a new source.Kind is created and used in a call to the Watch() method -// of the provided controller.Controller to establish new watches for the managed resources. -func (i *instance) Watch(ctx context.Context, ctrl controller.Controller, ce *v1alpha1.ClusterExtension, objs []client.Object) error { - if len(objs) == 0 || ce == nil || ctrl == nil { - return nil +// Get returns a Cache for the provided ClusterExtension. +// If a cache does not already exist, a new one will be created. +// If a nil ClusterExtension is provided this function will panic. 
+func (i *managerImpl) Get(ctx context.Context, ce *v1alpha1.ClusterExtension) (cmcache.Cache, error) { + if ce == nil { + panic("nil ClusterExtension provided") + } + + i.mu.Lock() + defer i.mu.Unlock() + cache, ok := i.caches[ce.Name] + if ok { + return cache, nil } cfg, err := i.rcm(ctx, ce, i.baseCfg) if err != nil { - return fmt.Errorf("getting rest.Config for ClusterExtension %q: %w", ce.Name, err) + return nil, fmt.Errorf("getting rest.Config: %w", err) } - scheme, err := buildScheme(objs) + dynamicClient, err := dynamic.NewForConfig(cfg) if err != nil { - return fmt.Errorf("building scheme for ClusterExtension %q: %w", ce.GetName(), err) + return nil, fmt.Errorf("getting dynamic client: %w", err) } tgtLabels := labels.Set{ @@ -141,83 +111,37 @@ func (i *instance) Watch(ctx context.Context, ctrl controller.Controller, ce *v1 oclabels.OwnerNameKey: ce.GetName(), } - c, err := cache.New(cfg, cache.Options{ - Scheme: scheme, - DefaultLabelSelector: tgtLabels.AsSelector(), - }) - if err != nil { - return fmt.Errorf("creating cache for ClusterExtension %q: %w", ce.Name, err) + dynamicSourcerer := &dynamicSourcerer{ + // Due to the limitation outlined in the dynamic informer source, + // related to reusing an informer factory, we return a new informer + // factory every time to ensure we are not attempting to configure or + // start an already started informer + informerFactoryCreateFunc: func() dynamicinformer.DynamicSharedInformerFactory { + return dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, time.Hour*10, metav1.NamespaceAll, func(lo *metav1.ListOptions) { + lo.LabelSelector = tgtLabels.AsSelector().String() + }) + }, + mapper: i.mapper, } + cache = cmcache.NewCache(dynamicSourcerer, ce, i.syncTimeout) + i.caches[ce.Name] = cache + return cache, nil +} - for _, obj := range objs { - err = ctrl.Watch( - source.Kind( - c, - obj, - handler.TypedEnqueueRequestForOwner[client.Object]( - scheme, - i.mapper, - ce, - handler.OnlyControllerOwner(), - ), - predicate.Funcs{ - CreateFunc: func(tce event.TypedCreateEvent[client.Object]) bool { return false }, - UpdateFunc: func(tue event.TypedUpdateEvent[client.Object]) bool { return true }, - DeleteFunc: func(tde event.TypedDeleteEvent[client.Object]) bool { return true }, - GenericFunc: func(tge event.TypedGenericEvent[client.Object]) bool { return true }, - }, - ), - ) - if err != nil { - return fmt.Errorf("creating watch for ClusterExtension %q managed resource %s: %w", ce.Name, obj.GetObjectKind().GroupVersionKind(), err) - } +// Delete stops and removes the Cache for the provided ClusterExtension +func (i *managerImpl) Delete(ce *v1alpha1.ClusterExtension) error { + if ce == nil { + panic("nil ClusterExtension provided") } - // TODO: Instead of stopping the existing cache and replacing it every time - // we should stop the informers that are no longer required - // and create any new ones as necessary. To keep the initial pass - // simple, we are going to keep this as is and optimize in a follow-up. - // Doing this in a follow-up gives us the opportunity to verify that this functions - // as expected when wired up in the ClusterExtension reconciler before going too deep - // in optimizations. 
i.mu.Lock() - if extCache, ok := i.extensionCaches[ce.GetName()]; ok { - extCache.Cancel() - } - - cacheCtx, cancel := context.WithCancel(context.Background()) - i.extensionCaches[ce.Name] = extensionCacheData{ - Cache: c, - Cancel: cancel, - } - i.mu.Unlock() - - go func() { - err := c.Start(cacheCtx) + defer i.mu.Unlock() + if cache, ok := i.caches[ce.Name]; ok { + err := cache.Close() if err != nil { - i.Unwatch(ce) + return fmt.Errorf("closing cache: %w", err) } - }() - - if !c.WaitForCacheSync(cacheCtx) { - i.Unwatch(ce) - return errors.New("cache could not sync, it has been stopped and removed") + delete(i.caches, ce.Name) } - return nil } - -// Unwatch will stop the cache for the provided ClusterExtension -// stopping any watches on managed content -func (i *instance) Unwatch(ce *v1alpha1.ClusterExtension) { - if ce == nil { - return - } - - i.mu.Lock() - if extCache, ok := i.extensionCaches[ce.GetName()]; ok { - extCache.Cancel() - delete(i.extensionCaches, ce.GetName()) - } - i.mu.Unlock() -} diff --git a/internal/contentmanager/contentmanager_test.go b/internal/contentmanager/contentmanager_test.go deleted file mode 100644 index 6716b8ba5..000000000 --- a/internal/contentmanager/contentmanager_test.go +++ /dev/null @@ -1,208 +0,0 @@ -package contentmanager - -import ( - "context" - "errors" - "fmt" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/rest" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - - "github.com/operator-framework/operator-controller/api/v1alpha1" -) - -func TestWatch(t *testing.T) { - tests := []struct { - name string - rcm RestConfigMapper - config *rest.Config - ce *v1alpha1.ClusterExtension - objs []client.Object - wantErr bool - }{ - { - name: "Valid cluster extension valid managed content should pass", - rcm: func(_ context.Context, _ client.Object, cfg *rest.Config) (*rest.Config, error) { - return cfg, nil - }, - config: &rest.Config{}, - ce: &v1alpha1.ClusterExtension{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-cluster-extension", - }, - }, - objs: []client.Object{ - &corev1.Pod{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "v1", - Kind: "Pod", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "webserver", - }, - }, - }, - wantErr: false, - }, - { - name: "Fail when the rest config mapper returns an error", - rcm: func(_ context.Context, _ client.Object, cfg *rest.Config) (*rest.Config, error) { - return nil, errors.New("failed getting rest config") - }, - config: &rest.Config{}, - ce: &v1alpha1.ClusterExtension{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-cluster-extension", - }, - }, - objs: []client.Object{ - &corev1.Pod{}, - }, - wantErr: true, - }, - { - name: "Should return an error when buildScheme() fails", - rcm: func(_ context.Context, _ client.Object, cfg *rest.Config) (*rest.Config, error) { - return cfg, nil - }, - config: &rest.Config{}, - ce: &v1alpha1.ClusterExtension{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-cluster-extension", - }, - }, - objs: []client.Object{ - &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "webserver", - }, - }, - }, - wantErr: true, - }, - } - - for i, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - mgr, _ := manager.New(tc.config, 
manager.Options{}) - ctrl, err := controller.New(fmt.Sprintf("test-controller-%v", i), mgr, controller.Options{ - Reconciler: reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) { - return reconcile.Result{}, nil - }), - }) - require.NoError(t, err) - - instance := New(tc.rcm, tc.config, mgr.GetRESTMapper()) - got := instance.Watch(context.Background(), ctrl, tc.ce, tc.objs) - assert.Equal(t, got != nil, tc.wantErr) - }) - } -} - -func TestBuildScheme(t *testing.T) { - type validation struct { - gvks []schema.GroupVersionKind - valid bool - } - - testcases := []struct { - name string - objects []client.Object - wantErr bool - want validation - }{ - { - name: "Gvk is not defined", - objects: []client.Object{&corev1.Pod{}}, - wantErr: true, - want: validation{ - gvks: []schema.GroupVersionKind{}, - valid: false, - }, - }, - { - name: "Check objects added in scheme", - objects: []client.Object{ - &appsv1.Deployment{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "apps/v1", - Kind: "Deployment", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "webserver", - }, - }, - }, - wantErr: false, - want: validation{ - gvks: []schema.GroupVersionKind{ - appsv1.SchemeGroupVersion.WithKind("Deployment"), - }, - valid: true, - }, - }, - { - name: "Check object not defined in scheme", - objects: []client.Object{ - &corev1.Pod{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "v1", - Kind: "Pod", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "webserver", - }, - }, - }, - wantErr: false, - want: validation{ - gvks: []schema.GroupVersionKind{ - corev1.SchemeGroupVersion.WithKind("Secret"), - }, - valid: false, - }, - }, - { - name: "Check if empty Group is valid", - objects: []client.Object{ - &corev1.Pod{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "v1", - Kind: "Pod", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "webserver", - }, - }, - }, - wantErr: false, - want: validation{ - gvks: []schema.GroupVersionKind{ - corev1.SchemeGroupVersion.WithKind("Pod"), - }, - valid: true, - }, - }, - } - - for _, tc := range testcases { - t.Run(tc.name, func(t *testing.T) { - scheme, err := buildScheme(tc.objects) - require.Equal(t, err != nil, tc.wantErr) - for _, gvk := range tc.want.gvks { - got := scheme.Recognizes(gvk) - assert.Equal(t, got, tc.want.valid) - } - }) - } -} diff --git a/internal/contentmanager/source/dynamicsource.go b/internal/contentmanager/source/dynamicsource.go new file mode 100644 index 000000000..1f539871f --- /dev/null +++ b/internal/contentmanager/source/dynamicsource.go @@ -0,0 +1,195 @@ +package source + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/dynamic/dynamicinformer" + cgocache "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + source "github.com/operator-framework/operator-controller/internal/contentmanager/source/internal" +) + +type DynamicSourceConfig struct { + // DynamicInformerFactory is the dynamicinformer.DynamicSharedInformerFactory + // that is used to generate the informers configured on this sources startup. + // If you use a dynamicinformer.DynamicSharedInformerFactory that you've + // used previously, it must not have been used to start a new informer for + // the same GVR. 
You can not start or configure an informer that has already + // been started, even after it has been stopped. Reusing an informer factory + // may result in attempting to configure and start an informer that has + // already been started. + DynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory + // GVR is the GroupVersionResource that this source is responsible + // for creating and configuring an informer for + GVR schema.GroupVersionResource + // Owner is the client.Object that owns the managed content that this + // source will be creating an informer to react to events for. This + // field is used to attempt to requeue the owning client.Object for + // reconciliation on a watch error after a previously successful sync + Owner client.Object + // Handler is the handler.EventHandler that is used to react to events + // received by the configured source + Handler handler.EventHandler + // Predicates are the predicate.Predicate functions used to determine + // if a triggered event should be reacted to + Predicates []predicate.Predicate + // OnPostSyncError is the callback function that is used when the source + // initially synced successfully and later encountered an error + OnPostSyncError func(context.Context) +} + +func NewDynamicSource(cfg DynamicSourceConfig) *dynamicInformerSource { + return &dynamicInformerSource{ + cfg: cfg, + erroredChan: make(chan struct{}), + syncedChan: make(chan struct{}), + startedChan: make(chan struct{}), + } +} + +// dynamicInformerSource is an implementation of the +// ReaderCloserSyncingSource interface. It is used +// to create an informer, using the provided dynamic informer +// factory, for the configured GroupVersionResource. +// +// The informer is configured with a WatchEventErrorHandler that +// stops the informer, and if it had previously synced successfully +// it attempts to requeue the provided Owner for reconciliation and +// calls the provided OnWatchError function. +type dynamicInformerSource struct { + cfg DynamicSourceConfig + informerCancel context.CancelFunc + informerCtx context.Context + startedChan chan struct{} + syncedChan chan struct{} + erroredChan chan struct{} + errOnce sync.Once + err error +} + +func (dis *dynamicInformerSource) String() string { + return fmt.Sprintf("contentmanager.DynamicInformerSource - GVR: %s, Owner Type: %T, Owner Name: %s", dis.cfg.GVR, dis.cfg.Owner, dis.cfg.Owner.GetName()) +} + +func (dis *dynamicInformerSource) Start(ctx context.Context, q workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + // Close the startedChan to signal that this + // source has been started. Subsequent calls + // to Start will attempt to close a closed channel + // and panic. 
+ close(dis.startedChan) + + dis.informerCtx, dis.informerCancel = context.WithCancel(ctx) + gInf := dis.cfg.DynamicInformerFactory.ForResource(dis.cfg.GVR) + eventHandler := source.NewEventHandler(dis.informerCtx, q, dis.cfg.Handler, dis.cfg.Predicates) + + // If we encounter an error during the watch we will: + // - Capture the error + // - cancel the informer + // requeuing of the ClusterExtension should happen by the + // WaitForSync function returning an error + // Only if we have successfully synced in the past should we + // requeue the ClusterExtension + sharedIndexInf := gInf.Informer() + err := sharedIndexInf.SetWatchErrorHandler(func(r *cgocache.Reflector, err error) { + dis.errOnce.Do(func() { + dis.err = err + close(dis.erroredChan) + }) + + if dis.hasSynced() { + // We won't be able to update the ClusterExtension status + // conditions so instead force a requeue if we + // have previously synced and then errored + defer q.Add(reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: dis.cfg.Owner.GetName(), + }, + }) + dis.cfg.OnPostSyncError(dis.informerCtx) + } + dis.informerCancel() + cgocache.DefaultWatchErrorHandler(r, err) + }) + if err != nil { + return fmt.Errorf("setting WatchErrorHandler: %w", err) + } + + _, err = sharedIndexInf.AddEventHandler(eventHandler.HandlerFuncs()) + if err != nil { + return fmt.Errorf("adding event handler: %w", err) + } + + go sharedIndexInf.Run(dis.informerCtx.Done()) + + go func() { + syncOnce := sync.OnceFunc(func() { + dis.syncedChan <- struct{}{} + close(dis.syncedChan) + }) + + _ = wait.PollUntilContextCancel(dis.informerCtx, time.Second, true, func(_ context.Context) (bool, error) { + if sharedIndexInf.HasSynced() { + syncOnce() + return true, nil + } + return false, nil + }) + }() + + return nil +} + +func (dis *dynamicInformerSource) WaitForSync(ctx context.Context) error { + if !dis.hasStarted() { + return fmt.Errorf("not yet started") + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-dis.erroredChan: + return dis.err + case <-dis.informerCtx.Done(): + return dis.informerCtx.Err() + case <-dis.syncedChan: + return nil + } +} + +func (dis *dynamicInformerSource) Close() error { + if !dis.hasStarted() { + return errors.New("source has not yet started") + } + dis.informerCancel() + return nil +} + +func (dis *dynamicInformerSource) hasSynced() bool { + select { + case <-dis.syncedChan: + return true + default: + return false + } +} + +func (dis *dynamicInformerSource) hasStarted() bool { + select { + case <-dis.startedChan: + return true + default: + return false + } +} diff --git a/internal/contentmanager/source/dynamicsource_test.go b/internal/contentmanager/source/dynamicsource_test.go new file mode 100644 index 000000000..61cbab8ca --- /dev/null +++ b/internal/contentmanager/source/dynamicsource_test.go @@ -0,0 +1,112 @@ +package source + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic/dynamicinformer" + dynamicfake "k8s.io/client-go/dynamic/fake" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + "github.com/operator-framework/operator-controller/api/v1alpha1" +) + +func TestDynamicInformerSourceCloseBeforeStartErrors(t *testing.T) { + dis := NewDynamicSource(DynamicSourceConfig{}) + require.Error(t, dis.Close(), "calling close before start should error") +} + +func 
TestDynamicInformerSourceWaitForSyncTimeout(t *testing.T) { + dis := NewDynamicSource(DynamicSourceConfig{}) + close(dis.startedChan) + dis.informerCtx = context.Background() + timeout, cancel := context.WithTimeout(context.TODO(), time.Millisecond*10) + defer cancel() + require.Error(t, dis.WaitForSync(timeout), "should error on timeout") +} + +func TestDynamicInformerSourceWaitForSyncInformerContextClosed(t *testing.T) { + dis := NewDynamicSource(DynamicSourceConfig{}) + close(dis.startedChan) + timeout, cancel := context.WithTimeout(context.TODO(), time.Millisecond*10) + defer cancel() + dis.informerCtx = timeout + require.Error(t, dis.WaitForSync(context.Background()), "should error on informer context closed") +} + +func TestDynamicInformerSourceWaitForSyncErrorChannel(t *testing.T) { + dis := NewDynamicSource(DynamicSourceConfig{}) + close(dis.startedChan) + dis.informerCtx = context.Background() + go func() { + time.Sleep(time.Millisecond * 10) + dis.err = errors.New("error") + close(dis.erroredChan) + }() + require.Error(t, dis.WaitForSync(context.Background()), "should error on receiving error from channel") +} + +func TestDynamicInformerSourceWaitForSyncAlreadyErrored(t *testing.T) { + dis := NewDynamicSource(DynamicSourceConfig{}) + close(dis.startedChan) + dis.informerCtx = context.Background() + dis.err = errors.New("error") + close(dis.erroredChan) + require.Error(t, dis.WaitForSync(context.Background()), "should error since there is already a sync error") +} + +func TestDynamicInformerSourceWaitForSyncAlreadySynced(t *testing.T) { + dis := NewDynamicSource(DynamicSourceConfig{}) + close(dis.startedChan) + close(dis.syncedChan) + dis.informerCtx = context.Background() + require.NoError(t, dis.WaitForSync(context.Background()), "should not error if already synced") +} + +func TestDynamicInformerSourceWaitForSyncSyncedChannel(t *testing.T) { + dis := NewDynamicSource(DynamicSourceConfig{}) + close(dis.startedChan) + dis.informerCtx = context.Background() + go func() { + time.Sleep(time.Millisecond * 10) + close(dis.syncedChan) + }() + require.NoError(t, dis.WaitForSync(context.Background()), "should not error on receiving struct from syncedChannel") +} + +func TestDynamicInformerSourceWaitForSyncNotStarted(t *testing.T) { + dis := NewDynamicSource(DynamicSourceConfig{}) + require.Error(t, dis.WaitForSync(context.Background()), "should error if not started") +} + +func TestDynamicInformerSourceStartAlreadyStarted(t *testing.T) { + dis := NewDynamicSource(DynamicSourceConfig{}) + close(dis.startedChan) + require.Panics(t, func() { _ = dis.Start(context.Background(), nil) }, "should return an error if already started") +} + +func TestDynamicInformerSourceStart(t *testing.T) { + fakeDynamicClient := dynamicfake.NewSimpleDynamicClient(runtime.NewScheme()) + infFact := dynamicinformer.NewDynamicSharedInformerFactory(fakeDynamicClient, time.Minute) + dis := NewDynamicSource(DynamicSourceConfig{ + DynamicInformerFactory: infFact, + GVR: schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "pods", + }, + Owner: &v1alpha1.ClusterExtension{}, + Handler: handler.Funcs{}, + Predicates: []predicate.Predicate{}, + OnPostSyncError: func(ctx context.Context) {}, + }) + + require.NoError(t, dis.Start(context.Background(), nil)) + require.NoError(t, dis.Close()) +} diff --git a/internal/contentmanager/source/internal/eventhandler.go b/internal/contentmanager/source/internal/eventhandler.go new file mode 100644 index 000000000..54625af09 --- /dev/null +++ 
b/internal/contentmanager/source/internal/eventhandler.go @@ -0,0 +1,193 @@ +package source + +// NOTE: To reduce the amount of custom code that needs to be written to facilitate +// the creation of a custom source implementation, the code in this file is copied from +// https://github.com/kubernetes-sigs/controller-runtime/blob/release-0.18/pkg/internal/source/event_handler.go +// +// Any modifications to this code will be noted below: +// - The "k8s.io/client-go/tools/cache" package was aliased to "cgocache". +// All references to this package were updated to use the new alias. +// - The "logf" aliased import was updated from "sigs.k8s.io/controller-runtime/pkg/internal/log" +// to "sigs.k8s.io/controller-runtime/pkg/log" +// - The logger for the event handler is now generated with "logf.Log.WithName("source").WithName("EventHandler")" +// instead of "logf.RuntimeLog.WithName("source").WithName("EventHandler")" +// +// All modifications are licensed under the Apache 2.0 license. +// +// In the future, we may consider making a contribution to the +// controller-runtime project that exports this more generic +// event handler functionality. If that happens, this code will be removed +// and we will instead import the appropriate logic from the controller-runtime +// project. + +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import ( + "context" + "fmt" + + cgocache "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +var log = logf.Log.WithName("source").WithName("EventHandler") + +// NewEventHandler creates a new EventHandler. +func NewEventHandler[object client.Object, request comparable]( + ctx context.Context, + queue workqueue.TypedRateLimitingInterface[request], + handler handler.TypedEventHandler[object, request], + predicates []predicate.TypedPredicate[object]) *EventHandler[object, request] { + return &EventHandler[object, request]{ + ctx: ctx, + handler: handler, + queue: queue, + predicates: predicates, + } +} + +// EventHandler adapts a handler.EventHandler interface to a cache.ResourceEventHandler interface. +type EventHandler[object client.Object, request comparable] struct { + // ctx stores the context that created the event handler + // that is used to propagate cancellation signals to each handler function. 
+ ctx context.Context + + handler handler.TypedEventHandler[object, request] + queue workqueue.TypedRateLimitingInterface[request] + predicates []predicate.TypedPredicate[object] +} + +// HandlerFuncs converts EventHandler to a ResourceEventHandlerFuncs +// TODO: switch to ResourceEventHandlerDetailedFuncs with client-go 1.27 +func (e *EventHandler[object, request]) HandlerFuncs() cgocache.ResourceEventHandlerFuncs { + return cgocache.ResourceEventHandlerFuncs{ + AddFunc: e.OnAdd, + UpdateFunc: e.OnUpdate, + DeleteFunc: e.OnDelete, + } +} + +// OnAdd creates CreateEvent and calls Create on EventHandler. +func (e *EventHandler[object, request]) OnAdd(obj interface{}) { + c := event.TypedCreateEvent[object]{} + + // Pull Object out of the object + if o, ok := obj.(object); ok { + c.Object = o + } else { + log.Error(nil, "OnAdd missing Object", + "object", obj, "type", fmt.Sprintf("%T", obj)) + return + } + + for _, p := range e.predicates { + if !p.Create(c) { + return + } + } + + // Invoke create handler + ctx, cancel := context.WithCancel(e.ctx) + defer cancel() + e.handler.Create(ctx, c, e.queue) +} + +// OnUpdate creates UpdateEvent and calls Update on EventHandler. +func (e *EventHandler[object, request]) OnUpdate(oldObj, newObj interface{}) { + u := event.TypedUpdateEvent[object]{} + + if o, ok := oldObj.(object); ok { + u.ObjectOld = o + } else { + log.Error(nil, "OnUpdate missing ObjectOld", + "object", oldObj, "type", fmt.Sprintf("%T", oldObj)) + return + } + + // Pull Object out of the object + if o, ok := newObj.(object); ok { + u.ObjectNew = o + } else { + log.Error(nil, "OnUpdate missing ObjectNew", + "object", newObj, "type", fmt.Sprintf("%T", newObj)) + return + } + + for _, p := range e.predicates { + if !p.Update(u) { + return + } + } + + // Invoke update handler + ctx, cancel := context.WithCancel(e.ctx) + defer cancel() + e.handler.Update(ctx, u, e.queue) +} + +// OnDelete creates DeleteEvent and calls Delete on EventHandler. +func (e *EventHandler[object, request]) OnDelete(obj interface{}) { + d := event.TypedDeleteEvent[object]{} + + // Deal with tombstone events by pulling the object out. Tombstone events wrap the object in a + // DeleteFinalStateUnknown struct, so the object needs to be pulled out. + // Copied from sample-controller + // This should never happen if we aren't missing events, which we have concluded that we are not + // and made decisions off of this belief. Maybe this shouldn't be here? + var ok bool + if _, ok = obj.(client.Object); !ok { + // If the object doesn't have Metadata, assume it is a tombstone object of type DeletedFinalStateUnknown + tombstone, ok := obj.(cgocache.DeletedFinalStateUnknown) + if !ok { + log.Error(nil, "Error decoding objects. 
Expected cache.DeletedFinalStateUnknown", + "type", fmt.Sprintf("%T", obj), + "object", obj) + return + } + + // Set DeleteStateUnknown to true + d.DeleteStateUnknown = true + + // Set obj to the tombstone obj + obj = tombstone.Obj + } + + // Pull Object out of the object + if o, ok := obj.(object); ok { + d.Object = o + } else { + log.Error(nil, "OnDelete missing Object", + "object", obj, "type", fmt.Sprintf("%T", obj)) + return + } + + for _, p := range e.predicates { + if !p.Delete(d) { + return + } + } + + // Invoke delete handler + ctx, cancel := context.WithCancel(e.ctx) + defer cancel() + e.handler.Delete(ctx, d, e.queue) +} diff --git a/internal/contentmanager/sourcerer.go b/internal/contentmanager/sourcerer.go new file mode 100644 index 000000000..a946cb7ba --- /dev/null +++ b/internal/contentmanager/sourcerer.go @@ -0,0 +1,99 @@ +package contentmanager + +import ( + "context" + "fmt" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic/dynamicinformer" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + "github.com/operator-framework/operator-controller/api/v1alpha1" + "github.com/operator-framework/operator-controller/internal/contentmanager/cache" + "github.com/operator-framework/operator-controller/internal/contentmanager/source" +) + +type dynamicSourcerer struct { + informerFactoryCreateFunc func() dynamicinformer.DynamicSharedInformerFactory + mapper meta.RESTMapper +} + +func (ds *dynamicSourcerer) Source(gvk schema.GroupVersionKind, owner client.Object, onPostSyncError func(context.Context)) (cache.CloserSyncingSource, error) { + scheme, err := buildScheme(gvk) + if err != nil { + return nil, fmt.Errorf("building scheme: %w", err) + } + + restMapping, err := ds.mapper.RESTMapping(gvk.GroupKind(), gvk.Version) + if err != nil { + return nil, fmt.Errorf("getting resource mapping for GVK %q: %w", gvk, err) + } + + s := source.NewDynamicSource(source.DynamicSourceConfig{ + GVR: restMapping.Resource, + Owner: owner, + Handler: handler.EnqueueRequestForOwner(scheme, ds.mapper, owner, handler.OnlyControllerOwner()), + Predicates: []predicate.Predicate{ + predicate.Funcs{ + CreateFunc: func(tce event.TypedCreateEvent[client.Object]) bool { return false }, + UpdateFunc: func(tue event.TypedUpdateEvent[client.Object]) bool { return true }, + DeleteFunc: func(tde event.TypedDeleteEvent[client.Object]) bool { return true }, + GenericFunc: func(tge event.TypedGenericEvent[client.Object]) bool { return true }, + }, + }, + DynamicInformerFactory: ds.informerFactoryCreateFunc(), + OnPostSyncError: onPostSyncError, + }) + return s, nil +} + +// buildScheme builds a runtime.Scheme based on the provided GroupVersionKinds, +// with all GroupVersionKinds mapping to the unstructured.Unstructured type +// (unstructured.UnstructuredList for list kinds). +// +// It is assumed all GroupVersionKinds are valid, which means: +// - The Kind is set +// - The Version is set +// +// Invalid GroupVersionKinds will result in a panic. 
+func buildScheme(gvks ...schema.GroupVersionKind) (*runtime.Scheme, error) { + scheme := runtime.NewScheme() + // The ClusterExtension types must be added to the scheme since it's + // going to be used to establish watches that trigger reconciliation + // of the owning ClusterExtension + if err := v1alpha1.AddToScheme(scheme); err != nil { + return nil, fmt.Errorf("adding operator controller APIs to scheme: %w", err) + } + + for _, gvk := range gvks { + if !scheme.Recognizes(gvk) { + // Since we can't have a mapping to every possible Go type in existence + // based on the GVK, we need to use the unstructured types for mapping + u := &unstructured.Unstructured{} + u.SetGroupVersionKind(gvk) + scheme.AddKnownTypeWithName(gvk, u) + + // Adding the common meta schemas to the scheme for the GroupVersion + // is necessary to ensure the scheme is aware of the different operations + // that can be performed against the resources in this GroupVersion + metav1.AddToGroupVersion(scheme, gvk.GroupVersion()) + } + + listGVK := gvk + listGVK.Kind = listGVK.Kind + "List" + if !scheme.Recognizes(listGVK) { + ul := &unstructured.UnstructuredList{} + ul.SetGroupVersionKind(listGVK) + scheme.AddKnownTypeWithName(listGVK, ul) + } + } + + return scheme, nil +} diff --git a/internal/controllers/clusterextension_controller.go b/internal/controllers/clusterextension_controller.go index e6c006aea..685bfd35f 100644 --- a/internal/controllers/clusterextension_controller.go +++ b/internal/controllers/clusterextension_controller.go @@ -48,7 +48,6 @@ import ( "github.com/operator-framework/operator-registry/alpha/declcfg" ocv1alpha1 "github.com/operator-framework/operator-controller/api/v1alpha1" - "github.com/operator-framework/operator-controller/internal/applier" "github.com/operator-framework/operator-controller/internal/bundleutil" "github.com/operator-framework/operator-controller/internal/conditionsets" "github.com/operator-framework/operator-controller/internal/contentmanager" @@ -58,7 +57,8 @@ import ( ) const ( - ClusterExtensionCleanupUnpackCacheFinalizer = "olm.operatorframework.io/cleanup-unpack-cache" + ClusterExtensionCleanupUnpackCacheFinalizer = "olm.operatorframework.io/cleanup-unpack-cache" + ClusterExtensionCleanupContentManagerCacheFinalizer = "olm.operatorframework.io/cleanup-contentmanager-cache" ) // ClusterExtensionReconciler reconciles a ClusterExtension object @@ -67,7 +67,7 @@ type ClusterExtensionReconciler struct { Resolver resolve.Resolver Unpacker rukpaksource.Unpacker Applier Applier - Watcher contentmanager.Watcher + Manager contentmanager.Manager controller crcontroller.Controller cache cache.Cache InstalledBundleGetter InstalledBundleGetter @@ -75,7 +75,10 @@ type ClusterExtensionReconciler struct { } type Applier interface { + // Apply applies the content in the provided fs.FS using the configuration of the provided ClusterExtension. + // It also takes two map[string]string parameters: the first is applied as labels to all applied resources, + // and the second is used to create a unique identifier for a stored reference to the resources created. - Apply(context.Context, fs.FS, *ocv1alpha1.ClusterExtension, map[string]string) ([]client.Object, string, error) 
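+ // It returns the objects that were applied, a string describing the resulting state, and an error if applying the content failed.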
+ Apply(context.Context, fs.FS, *ocv1alpha1.ClusterExtension, map[string]string, map[string]string) ([]client.Object, string, error) } type InstalledBundleGetter interface { @@ -268,9 +271,12 @@ func (r *ClusterExtensionReconciler) reconcile(ctx context.Context, ext *ocv1alp return ctrl.Result{}, fmt.Errorf("unexpected unpack status: %v", unpackResult.Message) } - lbls := map[string]string{ - labels.OwnerKindKey: ocv1alpha1.ClusterExtensionKind, - labels.OwnerNameKey: ext.GetName(), + objLbls := map[string]string{ + labels.OwnerKindKey: ocv1alpha1.ClusterExtensionKind, + labels.OwnerNameKey: ext.GetName(), + } + + storeLbls := map[string]string{ labels.BundleNameKey: resolvedBundle.Name, labels.PackageNameKey: resolvedBundle.Package, labels.BundleVersionKey: resolvedBundleVersion.String(), @@ -286,30 +292,50 @@ func (r *ClusterExtensionReconciler) reconcile(ctx context.Context, ext *ocv1alp // to ensure exponential backoff can occur: // - Permission errors (it is not possible to watch changes to permissions. // The only way to eventually recover from permission errors is to keep retrying). - managedObjs, state, err := r.Applier.Apply(ctx, unpackResult.Bundle, ext, lbls) + managedObjs, _, err := r.Applier.Apply(ctx, unpackResult.Bundle, ext, objLbls, storeLbls) if err != nil { setInstalledStatusConditionFailed(ext, err.Error()) return ctrl.Result{}, err } - // Only attempt to watch resources if we are - // installing / upgrading. Otherwise we may restart - // watches that have already been established - if state != applier.StateUnchanged { - l.V(1).Info("watching managed objects") - if err := r.Watcher.Watch(ctx, r.controller, ext, managedObjs); err != nil { - setInstallStatus(ext, nil) - setInstalledStatusConditionFailed(ext, fmt.Sprintf("%s:%v", ocv1alpha1.ReasonInstallationFailed, err)) - return ctrl.Result{}, err - } - } - installStatus := &ocv1alpha1.ClusterExtensionInstallStatus{ Bundle: bundleutil.MetadataFor(resolvedBundle.Name, *resolvedBundleVersion), } setInstallStatus(ext, installStatus) setInstalledStatusConditionSuccess(ext, fmt.Sprintf("Installed bundle %s successfully", resolvedBundle.Image)) + l.V(1).Info("watching managed objects") + cache, err := r.Manager.Get(ctx, ext) + if err != nil { + // If we fail to get the cache, set the Healthy condition to + // "Unknown". We can't know the health of resources we can't monitor + apimeta.SetStatusCondition(&ext.Status.Conditions, metav1.Condition{ + Type: ocv1alpha1.TypeHealthy, + Reason: ocv1alpha1.ReasonUnverifiable, + Status: metav1.ConditionUnknown, + Message: err.Error(), + ObservedGeneration: ext.Generation, + }) + return ctrl.Result{}, err + } + + if err := cache.Watch(ctx, r.controller, managedObjs...); err != nil { + // If we fail to establish watches, set the Healthy condition to + // "Unknown". We can't know the health of resources we can't monitor + apimeta.SetStatusCondition(&ext.Status.Conditions, metav1.Condition{ + Type: ocv1alpha1.TypeHealthy, + Reason: ocv1alpha1.ReasonUnverifiable, + Status: metav1.ConditionUnknown, + Message: err.Error(), + ObservedGeneration: ext.Generation, + }) + return ctrl.Result{}, err + } + + // If we have successfully established the watches, remove the "Healthy" condition. + // It should be interpreted as "Unknown" when not present. 
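+ // apimeta.RemoveStatusCondition is a no-op when the condition is not present, so this is safe on every successful reconcile.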
+ apimeta.RemoveStatusCondition(&ext.Status.Conditions, ocv1alpha1.TypeHealthy) + return ctrl.Result{}, nil } diff --git a/internal/controllers/clusterextension_controller_test.go b/internal/controllers/clusterextension_controller_test.go index b5d241b3e..a6637ff66 100644 --- a/internal/controllers/clusterextension_controller_test.go +++ b/internal/controllers/clusterextension_controller_test.go @@ -495,7 +495,7 @@ func TestClusterExtensionInstallationFailedApplierFails(t *testing.T) { require.NoError(t, cl.DeleteAllOf(ctx, &ocv1alpha1.ClusterExtension{})) } -func TestClusterExtensionInstallationFailedWatcherFailed(t *testing.T) { +func TestClusterExtensionManagerFailed(t *testing.T) { cl, reconciler := newClientAndReconciler(t) reconciler.Unpacker = &MockUnpacker{ result: &source.Result{ @@ -550,8 +550,8 @@ func TestClusterExtensionInstallationFailedWatcherFailed(t *testing.T) { reconciler.Applier = &MockApplier{ objs: []client.Object{}, } - reconciler.Watcher = &MockWatcher{ - err: errors.New("watcher fail"), + reconciler.Manager = &MockManagedContentCacheManager{ + err: errors.New("manager fail"), } res, err := reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: extKey}) require.Equal(t, ctrl.Result{}, res) @@ -562,7 +562,7 @@ func TestClusterExtensionInstallationFailedWatcherFailed(t *testing.T) { t.Log("By checking the status fields") require.Equal(t, &ocv1alpha1.BundleMetadata{Name: "prometheus.v1.0.0", Version: "1.0.0"}, clusterExtension.Status.Resolution.Bundle) - require.Empty(t, clusterExtension.Status.Install) + require.Equal(t, &ocv1alpha1.BundleMetadata{Name: "prometheus.v1.0.0", Version: "1.0.0"}, clusterExtension.Status.Install.Bundle) t.Log("By checking the expected resolution conditions") resolvedCond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeResolved) @@ -580,8 +580,114 @@ func TestClusterExtensionInstallationFailedWatcherFailed(t *testing.T) { t.Log("By checking the expected installed conditions") installedCond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeInstalled) require.NotNil(t, installedCond) - require.Equal(t, metav1.ConditionFalse, installedCond.Status) - require.Equal(t, ocv1alpha1.ReasonInstallationFailed, installedCond.Reason) + require.Equal(t, metav1.ConditionTrue, installedCond.Status) + require.Equal(t, ocv1alpha1.ReasonSuccess, installedCond.Reason) + + t.Log("By checking the expected healthy conditions") + managedCond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeHealthy) + require.NotNil(t, managedCond) + require.Equal(t, metav1.ConditionUnknown, managedCond.Status) + require.Equal(t, ocv1alpha1.ReasonUnverifiable, managedCond.Reason) + + require.NoError(t, cl.DeleteAllOf(ctx, &ocv1alpha1.ClusterExtension{})) +} + +func TestClusterExtensionManagedContentCacheWatchFail(t *testing.T) { + cl, reconciler := newClientAndReconciler(t) + reconciler.Unpacker = &MockUnpacker{ + result: &source.Result{ + State: source.StateUnpacked, + Bundle: fstest.MapFS{}, + }, + } + + ctx := context.Background() + extKey := types.NamespacedName{Name: fmt.Sprintf("cluster-extension-test-%s", rand.String(8))} + + t.Log("When the cluster extension specifies a channel with version that exist") + t.Log("By initializing cluster state") + pkgName := "prometheus" + pkgVer := "1.0.0" + pkgChan := "beta" + installNamespace := fmt.Sprintf("test-ns-%s", rand.String(8)) + serviceAccount := fmt.Sprintf("test-sa-%s", rand.String(8)) + + clusterExtension := 
&ocv1alpha1.ClusterExtension{ + ObjectMeta: metav1.ObjectMeta{Name: extKey.Name}, + Spec: ocv1alpha1.ClusterExtensionSpec{ + Source: ocv1alpha1.SourceConfig{ + SourceType: ocv1alpha1.SourceTypeCatalog, + + Catalog: &ocv1alpha1.CatalogSource{ + PackageName: pkgName, + Version: pkgVer, + Channel: pkgChan, + }, + }, + Install: ocv1alpha1.ClusterExtensionInstallConfig{ + Namespace: installNamespace, + ServiceAccount: ocv1alpha1.ServiceAccountReference{ + Name: serviceAccount, + }, + }, + }, + } + err := cl.Create(ctx, clusterExtension) + require.NoError(t, err) + + t.Log("It sets resolution success status") + t.Log("By running reconcile") + reconciler.Resolver = resolve.Func(func(_ context.Context, _ *ocv1alpha1.ClusterExtension, _ *ocv1alpha1.BundleMetadata) (*declcfg.Bundle, *bsemver.Version, *declcfg.Deprecation, error) { + v := bsemver.MustParse("1.0.0") + return &declcfg.Bundle{ + Name: "prometheus.v1.0.0", + Package: "prometheus", + Image: "quay.io/operatorhubio/prometheus@fake1.0.0", + }, &v, nil, nil + }) + reconciler.Applier = &MockApplier{ + objs: []client.Object{}, + } + reconciler.Manager = &MockManagedContentCacheManager{ + cache: &MockManagedContentCache{ + err: errors.New("watch error"), + }, + } + res, err := reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: extKey}) + require.Equal(t, ctrl.Result{}, res) + require.Error(t, err) + + t.Log("By fetching updated cluster extension after reconcile") + require.NoError(t, cl.Get(ctx, extKey, clusterExtension)) + + t.Log("By checking the status fields") + require.Equal(t, &ocv1alpha1.BundleMetadata{Name: "prometheus.v1.0.0", Version: "1.0.0"}, clusterExtension.Status.Resolution.Bundle) + require.Equal(t, &ocv1alpha1.BundleMetadata{Name: "prometheus.v1.0.0", Version: "1.0.0"}, clusterExtension.Status.Install.Bundle) + + t.Log("By checking the expected resolution conditions") + resolvedCond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeResolved) + require.NotNil(t, resolvedCond) + require.Equal(t, metav1.ConditionTrue, resolvedCond.Status) + require.Equal(t, ocv1alpha1.ReasonSuccess, resolvedCond.Reason) + require.Equal(t, "resolved to \"quay.io/operatorhubio/prometheus@fake1.0.0\"", resolvedCond.Message) + + t.Log("By checking the expected unpacked conditions") + unpackedCond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeUnpacked) + require.NotNil(t, unpackedCond) + require.Equal(t, metav1.ConditionTrue, unpackedCond.Status) + require.Equal(t, ocv1alpha1.ReasonUnpackSuccess, unpackedCond.Reason) + + t.Log("By checking the expected installed conditions") + installedCond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeInstalled) + require.NotNil(t, installedCond) + require.Equal(t, metav1.ConditionTrue, installedCond.Status) + require.Equal(t, ocv1alpha1.ReasonSuccess, installedCond.Reason) + + t.Log("By checking the expected healthy conditions") + managedCond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeHealthy) + require.NotNil(t, managedCond) + require.Equal(t, metav1.ConditionUnknown, managedCond.Status) + require.Equal(t, ocv1alpha1.ReasonUnverifiable, managedCond.Reason) require.NoError(t, cl.DeleteAllOf(ctx, &ocv1alpha1.ClusterExtension{})) } @@ -641,7 +747,9 @@ func TestClusterExtensionInstallationSucceeds(t *testing.T) { reconciler.Applier = &MockApplier{ objs: []client.Object{}, } - reconciler.Watcher = &MockWatcher{} + reconciler.Manager = &MockManagedContentCacheManager{ + cache: 
&MockManagedContentCache{}, + } res, err := reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: extKey}) require.Equal(t, ctrl.Result{}, res) require.NoError(t, err) diff --git a/internal/controllers/suite_test.go b/internal/controllers/suite_test.go index f45b89a52..9fa70e457 100644 --- a/internal/controllers/suite_test.go +++ b/internal/controllers/suite_test.go @@ -30,7 +30,6 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/envtest" crfinalizer "sigs.k8s.io/controller-runtime/pkg/finalizer" @@ -38,6 +37,7 @@ import ( ocv1alpha1 "github.com/operator-framework/operator-controller/api/v1alpha1" "github.com/operator-framework/operator-controller/internal/contentmanager" + cmcache "github.com/operator-framework/operator-controller/internal/contentmanager/cache" "github.com/operator-framework/operator-controller/internal/controllers" "github.com/operator-framework/operator-controller/internal/rukpak/source" ) @@ -92,7 +92,7 @@ type MockApplier struct { state string } -func (m *MockApplier) Apply(_ context.Context, _ fs.FS, _ *ocv1alpha1.ClusterExtension, _ map[string]string) ([]client.Object, string, error) { +func (m *MockApplier) Apply(_ context.Context, _ fs.FS, _ *ocv1alpha1.ClusterExtension, _ map[string]string, _ map[string]string) ([]client.Object, string, error) { if m.err != nil { return nil, m.state, m.err } @@ -100,17 +100,43 @@ func (m *MockApplier) Apply(_ context.Context, _ fs.FS, _ *ocv1alpha1.ClusterExt return m.objs, m.state, nil } -var _ contentmanager.Watcher = (*MockWatcher)(nil) +var _ contentmanager.Manager = (*MockManagedContentCacheManager)(nil) -type MockWatcher struct { - err error +type MockManagedContentCacheManager struct { + err error + cache cmcache.Cache } -func (m *MockWatcher) Watch(_ context.Context, _ controller.Controller, _ *ocv1alpha1.ClusterExtension, _ []client.Object) error { +func (m *MockManagedContentCacheManager) Get(_ context.Context, _ *ocv1alpha1.ClusterExtension) (cmcache.Cache, error) { + if m.err != nil { + return nil, m.err + } + return m.cache, nil +} + +func (m *MockManagedContentCacheManager) Delete(_ *ocv1alpha1.ClusterExtension) error { return m.err } -func (m *MockWatcher) Unwatch(_ *ocv1alpha1.ClusterExtension) {} +type MockManagedContentCache struct { + err error +} + +var _ cmcache.Cache = (*MockManagedContentCache)(nil) + +func (m *MockManagedContentCache) Close() error { + if m.err != nil { + return m.err + } + return nil +} + +func (m *MockManagedContentCache) Watch(_ context.Context, _ cmcache.Watcher, _ ...client.Object) error { + if m.err != nil { + return m.err + } + return nil +} func newClientAndReconciler(t *testing.T) (client.Client, *controllers.ClusterExtensionReconciler) { cl := newClient(t) diff --git a/test/e2e/cluster_extension_install_test.go b/test/e2e/cluster_extension_install_test.go index 3f7954424..a20554a48 100644 --- a/test/e2e/cluster_extension_install_test.go +++ b/test/e2e/cluster_extension_install_test.go @@ -29,7 +29,6 @@ import ( catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1" ocv1alpha1 "github.com/operator-framework/operator-controller/api/v1alpha1" - "github.com/operator-framework/operator-controller/internal/conditionsets" ) const ( @@ -250,7 +249,6 @@ func TestClusterExtensionInstallRegistry(t *testing.T) { t.Log("By eventually reporting a successful resolution and bundle path") require.EventuallyWithT(t, 
func(ct *assert.CollectT) { assert.NoError(ct, c.Get(context.Background(), types.NamespacedName{Name: clusterExtension.Name}, clusterExtension)) - assert.Len(ct, clusterExtension.Status.Conditions, len(conditionsets.ConditionTypes)) cond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeResolved) if !assert.NotNil(ct, cond) { return @@ -321,7 +319,6 @@ func TestClusterExtensionInstallRegistryMultipleBundles(t *testing.T) { t.Log("By eventually reporting a failed resolution with multiple bundles") require.EventuallyWithT(t, func(ct *assert.CollectT) { assert.NoError(ct, c.Get(context.Background(), types.NamespacedName{Name: clusterExtension.Name}, clusterExtension)) - assert.Len(ct, clusterExtension.Status.Conditions, len(conditionsets.ConditionTypes)) cond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeResolved) if !assert.NotNil(ct, cond) { return @@ -573,7 +570,6 @@ func TestClusterExtensionInstallReResolvesWhenCatalogIsPatched(t *testing.T) { t.Log("By reporting a successful resolution and bundle path") require.EventuallyWithT(t, func(ct *assert.CollectT) { assert.NoError(ct, c.Get(context.Background(), types.NamespacedName{Name: clusterExtension.Name}, clusterExtension)) - assert.Len(ct, clusterExtension.Status.Conditions, len(conditionsets.ConditionTypes)) cond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeResolved) if !assert.NotNil(ct, cond) { return @@ -608,7 +604,6 @@ func TestClusterExtensionInstallReResolvesWhenCatalogIsPatched(t *testing.T) { t.Log("By eventually reporting a successful resolution and bundle path") require.EventuallyWithT(t, func(ct *assert.CollectT) { assert.NoError(ct, c.Get(context.Background(), types.NamespacedName{Name: clusterExtension.Name}, clusterExtension)) - assert.Len(ct, clusterExtension.Status.Conditions, len(conditionsets.ConditionTypes)) cond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeResolved) if !assert.NotNil(ct, cond) { return @@ -675,7 +670,6 @@ func TestClusterExtensionInstallReResolvesWhenNewCatalog(t *testing.T) { t.Log("By reporting a successful resolution and bundle path") require.EventuallyWithT(t, func(ct *assert.CollectT) { assert.NoError(ct, c.Get(context.Background(), types.NamespacedName{Name: clusterExtension.Name}, clusterExtension)) - assert.Len(ct, clusterExtension.Status.Conditions, len(conditionsets.ConditionTypes)) cond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeResolved) if !assert.NotNil(ct, cond) { return @@ -710,7 +704,6 @@ func TestClusterExtensionInstallReResolvesWhenNewCatalog(t *testing.T) { t.Log("By eventually reporting a successful resolution and bundle path") require.EventuallyWithT(t, func(ct *assert.CollectT) { assert.NoError(ct, c.Get(context.Background(), types.NamespacedName{Name: clusterExtension.Name}, clusterExtension)) - assert.Len(ct, clusterExtension.Status.Conditions, len(conditionsets.ConditionTypes)) cond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeResolved) if !assert.NotNil(ct, cond) { return @@ -759,7 +752,6 @@ func TestClusterExtensionInstallReResolvesWhenManagedContentChanged(t *testing.T t.Log("By reporting a successful installation") require.EventuallyWithT(t, func(ct *assert.CollectT) { assert.NoError(ct, c.Get(context.Background(), types.NamespacedName{Name: clusterExtension.Name}, clusterExtension)) - assert.Len(ct, clusterExtension.Status.Conditions, 
len(conditionsets.ConditionTypes)) cond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeInstalled) if !assert.NotNil(ct, cond) { return @@ -832,7 +824,6 @@ func TestClusterExtensionRecoversFromInitialInstallFailedWhenFailureFixed(t *tes t.Log("By eventually reporting a successful resolution and bundle path") require.EventuallyWithT(t, func(ct *assert.CollectT) { assert.NoError(ct, c.Get(context.Background(), types.NamespacedName{Name: clusterExtension.Name}, clusterExtension)) - assert.Len(ct, clusterExtension.Status.Conditions, len(conditionsets.ConditionTypes)) cond := apimeta.FindStatusCondition(clusterExtension.Status.Conditions, ocv1alpha1.TypeResolved) if !assert.NotNil(ct, cond) { return