From 0e46b192e3d6f1f952abade10533987863311e36 Mon Sep 17 00:00:00 2001 From: alex <8968914+acpana@users.noreply.github.com> Date: Wed, 30 Aug 2023 15:58:47 -0700 Subject: [PATCH] feat: support multiple sync sources (#2852) Signed-off-by: Alex Pana <8968914+acpana@users.noreply.github.com> --- main.go | 35 +- pkg/audit/audit_cache_lister.go | 21 +- .../aggregator/aggregator.go | 27 +- .../aggregator/aggregator_test.go | 0 pkg/cachemanager/cachemanager.go | 411 +++++++++++++ pkg/cachemanager/cachemanager_test.go | 539 ++++++++++++++++++ .../cachemanager_integration_test.go | 369 ++++++++++++ .../parser/syncannotationreader.go | 0 .../parser/syncannotationreader_test.go | 0 pkg/controller/config/config_controller.go | 241 +------- .../config/config_controller_suite_test.go | 9 +- .../config/config_controller_test.go | 484 +++++++--------- pkg/controller/config/fakes_test.go | 35 -- pkg/controller/controller.go | 46 +- pkg/controller/sync/sync_controller.go | 15 +- .../{opadataclient.go => fakecfdataclient.go} | 51 +- pkg/fakes/reader.go | 60 ++ pkg/readiness/ready_tracker_test.go | 30 +- pkg/syncutil/cachemanager/cachemanager.go | 82 --- .../cachemanager/cachemanager_test.go | 111 ---- pkg/syncutil/opadataclient.go | 69 --- pkg/syncutil/stats_reporter.go | 22 + test/testutils/manager.go | 3 + 23 files changed, 1823 insertions(+), 837 deletions(-) rename pkg/{syncutil => cachemanager}/aggregator/aggregator.go (86%) rename pkg/{syncutil => cachemanager}/aggregator/aggregator_test.go (100%) create mode 100644 pkg/cachemanager/cachemanager.go create mode 100644 pkg/cachemanager/cachemanager_test.go create mode 100644 pkg/cachemanager/cachemanager_test/cachemanager_integration_test.go rename pkg/{syncutil => cachemanager}/parser/syncannotationreader.go (100%) rename pkg/{syncutil => cachemanager}/parser/syncannotationreader_test.go (100%) delete mode 100644 pkg/controller/config/fakes_test.go rename pkg/fakes/{opadataclient.go => fakecfdataclient.go} (62%) create mode 100644 pkg/fakes/reader.go delete mode 100644 pkg/syncutil/cachemanager/cachemanager.go delete mode 100644 pkg/syncutil/cachemanager/cachemanager_test.go delete mode 100644 pkg/syncutil/opadataclient.go diff --git a/main.go b/main.go index e2f3859dbe6..238ae49dc2c 100644 --- a/main.go +++ b/main.go @@ -43,6 +43,7 @@ import ( mutationsv1beta1 "github.com/open-policy-agent/gatekeeper/v3/apis/mutations/v1beta1" statusv1beta1 "github.com/open-policy-agent/gatekeeper/v3/apis/status/v1beta1" "github.com/open-policy-agent/gatekeeper/v3/pkg/audit" + "github.com/open-policy-agent/gatekeeper/v3/pkg/cachemanager" "github.com/open-policy-agent/gatekeeper/v3/pkg/controller" "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process" "github.com/open-policy-agent/gatekeeper/v3/pkg/expansion" @@ -52,6 +53,7 @@ import ( "github.com/open-policy-agent/gatekeeper/v3/pkg/operations" "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub" "github.com/open-policy-agent/gatekeeper/v3/pkg/readiness" + "github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil" "github.com/open-policy-agent/gatekeeper/v3/pkg/target" "github.com/open-policy-agent/gatekeeper/v3/pkg/upgrade" "github.com/open-policy-agent/gatekeeper/v3/pkg/util" @@ -69,6 +71,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/certwatcher" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/healthz" crzap "sigs.k8s.io/controller-runtime/pkg/log/zap" crWebhook 
"sigs.k8s.io/controller-runtime/pkg/webhook" @@ -448,17 +451,43 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, sw *watch.Controlle // Setup all Controllers setupLog.Info("setting up controllers") - watchSet := watch.NewSet() + + // Events ch will be used to receive events from dynamic watches registered + // via the registrar below. + events := make(chan event.GenericEvent, 1024) + reg, err := wm.NewRegistrar( + cachemanager.RegistrarName, + events) + if err != nil { + setupLog.Error(err, "unable to set up watch registrar for cache manager") + return err + } + + syncMetricsCache := syncutil.NewMetricsCache() + cm, err := cachemanager.NewCacheManager(&cachemanager.Config{ + CfClient: client, + SyncMetricsCache: syncMetricsCache, + Tracker: tracker, + ProcessExcluder: processExcluder, + Registrar: reg, + Reader: mgr.GetCache(), + }) + if err != nil { + setupLog.Error(err, "unable to create cache manager") + return err + } + opts := controller.Dependencies{ Opa: client, WatchManger: wm, + SyncEventsCh: events, + CacheMgr: cm, ControllerSwitch: sw, Tracker: tracker, ProcessExcluder: processExcluder, MutationSystem: mutationSystem, ExpansionSystem: expansionSystem, ProviderCache: providerCache, - WatchSet: watchSet, PubsubSystem: pubsubSystem, } @@ -483,7 +512,7 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, sw *watch.Controlle if operations.IsAssigned(operations.Audit) { setupLog.Info("setting up audit") - auditCache := audit.NewAuditCacheLister(mgr.GetCache(), watchSet) + auditCache := audit.NewAuditCacheLister(mgr.GetCache(), cm) auditDeps := audit.Dependencies{ Client: client, ProcessExcluder: processExcluder, diff --git a/pkg/audit/audit_cache_lister.go b/pkg/audit/audit_cache_lister.go index 299cb00cd26..89220128e37 100644 --- a/pkg/audit/audit_cache_lister.go +++ b/pkg/audit/audit_cache_lister.go @@ -3,7 +3,6 @@ package audit import ( "context" - "github.com/open-policy-agent/gatekeeper/v3/pkg/watch" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/controller-runtime/pkg/client" @@ -11,10 +10,10 @@ import ( // NewAuditCacheLister instantiates a new AuditCache which will read objects in // watched from auditCache. -func NewAuditCacheLister(auditCache client.Reader, watched *watch.Set) *CacheLister { +func NewAuditCacheLister(auditCache client.Reader, lister WatchIterator) *CacheLister { return &CacheLister{ - auditCache: auditCache, - watched: watched, + auditCache: auditCache, + watchIterator: lister, } } @@ -25,14 +24,22 @@ type CacheLister struct { // Caution: only to be read from while watched is locked, such as through // DoForEach. auditCache client.Reader - // watched is the set of objects watched by the audit cache. - watched *watch.Set + // watchIterator is a delegate like CacheManager that we can use to query a watched set of GKVs. + // Passing our logic as a callback to a watched.Set allows us to take actions while + // holding the lock on the watched.Set. This prevents us from querying the API server + // for kinds that aren't currently being watched by the CacheManager. + watchIterator WatchIterator +} + +// wraps DoForEach from a watch.Set. +type WatchIterator interface { + DoForEach(listFunc func(gvk schema.GroupVersionKind) error) error } // ListObjects lists all objects from the audit cache. 
func (l *CacheLister) ListObjects(ctx context.Context) ([]unstructured.Unstructured, error) { var objs []unstructured.Unstructured - err := l.watched.DoForEach(func(gvk schema.GroupVersionKind) error { + err := l.watchIterator.DoForEach(func(gvk schema.GroupVersionKind) error { gvkObjects, err := listObjects(ctx, l.auditCache, gvk) if err != nil { return err diff --git a/pkg/syncutil/aggregator/aggregator.go b/pkg/cachemanager/aggregator/aggregator.go similarity index 86% rename from pkg/syncutil/aggregator/aggregator.go rename to pkg/cachemanager/aggregator/aggregator.go index 07eb91c9dcd..5b0b78aec63 100644 --- a/pkg/syncutil/aggregator/aggregator.go +++ b/pkg/cachemanager/aggregator/aggregator.go @@ -46,7 +46,7 @@ func (b *GVKAgreggator) IsPresent(gvk schema.GroupVersionKind) bool { return found } -// Remove deletes the any associations that Key k has in the GVKAggregator. +// Remove deletes any associations that Key k has in the GVKAggregator. // For any GVK in the association k --> [GVKs], we also delete any associations // between the GVK and the Key k stored in the reverse map. func (b *GVKAgreggator) Remove(k Key) error { @@ -98,6 +98,31 @@ func (b *GVKAgreggator) Upsert(k Key, gvks []schema.GroupVersionKind) error { return nil } +// List returnes the gvk set for a given Key. +func (b *GVKAgreggator) List(k Key) map[schema.GroupVersionKind]struct{} { + b.mu.RLock() + defer b.mu.RUnlock() + + v := b.store[k] + cpy := make(map[schema.GroupVersionKind]struct{}, len(v)) + for key, value := range v { + cpy[key] = value + } + return cpy +} + +// GVKs returns a list of all of the schema.GroupVersionKind that are aggregated. +func (b *GVKAgreggator) GVKs() []schema.GroupVersionKind { + b.mu.RLock() + defer b.mu.RUnlock() + + allGVKs := []schema.GroupVersionKind{} + for gvk := range b.reverseStore { + allGVKs = append(allGVKs, gvk) + } + return allGVKs +} + func (b *GVKAgreggator) pruneReverseStore(gvks map[schema.GroupVersionKind]struct{}, k Key) error { for gvk := range gvks { keySet, found := b.reverseStore[gvk] diff --git a/pkg/syncutil/aggregator/aggregator_test.go b/pkg/cachemanager/aggregator/aggregator_test.go similarity index 100% rename from pkg/syncutil/aggregator/aggregator_test.go rename to pkg/cachemanager/aggregator/aggregator_test.go diff --git a/pkg/cachemanager/cachemanager.go b/pkg/cachemanager/cachemanager.go new file mode 100644 index 00000000000..5af1e2c4b5a --- /dev/null +++ b/pkg/cachemanager/cachemanager.go @@ -0,0 +1,411 @@ +package cachemanager + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/open-policy-agent/frameworks/constraint/pkg/types" + "github.com/open-policy-agent/gatekeeper/v3/pkg/cachemanager/aggregator" + "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process" + "github.com/open-policy-agent/gatekeeper/v3/pkg/metrics" + "github.com/open-policy-agent/gatekeeper/v3/pkg/readiness" + "github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil" + "github.com/open-policy-agent/gatekeeper/v3/pkg/target" + "github.com/open-policy-agent/gatekeeper/v3/pkg/watch" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" + "sigs.k8s.io/controller-runtime/pkg/client" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +const RegistrarName = "cachemanager" + +var ( + log = logf.Log.WithName("cache-manager") + backoff = wait.Backoff{ + Duration: time.Second, + Factor: 2, + Jitter: 0.1, + Steps: 3, + } +) + +type Config struct { + CfClient CFDataClient + 
SyncMetricsCache *syncutil.MetricsCache + Tracker *readiness.Tracker + ProcessExcluder *process.Excluder + Registrar *watch.Registrar + GVKAggregator *aggregator.GVKAgreggator + Reader client.Reader +} + +type CacheManager struct { + watchedSet *watch.Set + processExcluder *process.Excluder + gvksToSync *aggregator.GVKAgreggator + needToList bool + gvksToDeleteFromCache *watch.Set + excluderChanged bool + // mu guards access to any of the fields above + mu sync.RWMutex + + cfClient CFDataClient + syncMetricsCache *syncutil.MetricsCache + tracker *readiness.Tracker + registrar *watch.Registrar + backgroundManagementTicker time.Ticker + reader client.Reader +} + +// CFDataClient is an interface for caching data. +type CFDataClient interface { + AddData(ctx context.Context, data interface{}) (*types.Responses, error) + RemoveData(ctx context.Context, data interface{}) (*types.Responses, error) +} + +func NewCacheManager(config *Config) (*CacheManager, error) { + if config.Registrar == nil { + return nil, fmt.Errorf("registrar must be non-nil") + } + if config.ProcessExcluder == nil { + return nil, fmt.Errorf("processExcluder must be non-nil") + } + if config.Tracker == nil { + return nil, fmt.Errorf("tracker must be non-nil") + } + if config.Reader == nil { + return nil, fmt.Errorf("reader must be non-nil") + } + + if config.GVKAggregator == nil { + config.GVKAggregator = aggregator.NewGVKAggregator() + } + + cm := &CacheManager{ + cfClient: config.CfClient, + syncMetricsCache: config.SyncMetricsCache, + tracker: config.Tracker, + processExcluder: config.ProcessExcluder, + registrar: config.Registrar, + watchedSet: watch.NewSet(), + reader: config.Reader, + gvksToSync: config.GVKAggregator, + backgroundManagementTicker: *time.NewTicker(3 * time.Second), + gvksToDeleteFromCache: watch.NewSet(), + } + + return cm, nil +} + +func (c *CacheManager) Start(ctx context.Context) error { + go c.manageCache(ctx) + + <-ctx.Done() + return nil +} + +// UpsertSource adjusts the watched set of gvks according to the newGVKs passed in +// for a given sourceKey. Callers are responsible for retrying on error. +func (c *CacheManager) UpsertSource(ctx context.Context, sourceKey aggregator.Key, newGVKs []schema.GroupVersionKind) error { + c.mu.Lock() + defer c.mu.Unlock() + + if len(newGVKs) > 0 { + if err := c.gvksToSync.Upsert(sourceKey, newGVKs); err != nil { + return fmt.Errorf("internal error adding source: %w", err) + } + } else { + if err := c.gvksToSync.Remove(sourceKey); err != nil { + return fmt.Errorf("internal error removing source: %w", err) + } + } + + // as a result of upserting the new gvks for the source key, some gvks + // may become unreferenced and need to be deleted; this will be handled async + // in the manageCache loop. + + if err := c.replaceWatchSet(ctx); err != nil { + return fmt.Errorf("error watching new gvks: %w", err) + } + + return nil +} + +// replaceWatchSet looks at the gvksToSync and makes changes to the registrar's watch set. +// Assumes caller has lock. On error, actual watch state may not align with intended watch state. +func (c *CacheManager) replaceWatchSet(ctx context.Context) error { + newWatchSet := watch.NewSet() + newWatchSet.Add(c.gvksToSync.GVKs()...) 
+ + diff := c.watchedSet.Difference(newWatchSet) + c.removeStaleExpectations(diff) + + c.gvksToDeleteFromCache.AddSet(diff) + + var innerError error + c.watchedSet.Replace(newWatchSet, func() { + // *Note the following steps are not transactional with respect to admission control + + // Important: dynamic watches update must happen *after* updating our watchSet. + // Otherwise, the sync controller will drop events for the newly watched kinds. + innerError = c.registrar.ReplaceWatch(ctx, newWatchSet.Items()) + }) + + return innerError +} + +// removeStaleExpectations stops tracking data for any resources that are no longer watched. +func (c *CacheManager) removeStaleExpectations(stale *watch.Set) { + for _, gvk := range stale.Items() { + c.tracker.CancelData(gvk) + } +} + +// RemoveSource removes the watches of the GVKs for a given aggregator.Key. Callers are responsible for retrying on error. +func (c *CacheManager) RemoveSource(ctx context.Context, sourceKey aggregator.Key) error { + c.mu.Lock() + defer c.mu.Unlock() + + if err := c.gvksToSync.Remove(sourceKey); err != nil { + return fmt.Errorf("internal error removing source: %w", err) + } + + if err := c.replaceWatchSet(ctx); err != nil { + return fmt.Errorf("error removing watches for source %v: %w", sourceKey, err) + } + + return nil +} + +// ExcludeProcesses swaps the current process excluder with the new *process.Excluder. +// It's a no-op if the two excluders are equal. +func (c *CacheManager) ExcludeProcesses(newExcluder *process.Excluder) { + c.mu.Lock() + defer c.mu.Unlock() + + if c.processExcluder.Equals(newExcluder) { + return + } + + c.processExcluder.Replace(newExcluder) + // there is a new excluder which means we need to schedule a wipe for any + // previously watched GVKs to be re-added to get a chance to be evaluated + // for this new process excluder. + c.excluderChanged = true +} + +// DoForEach runs fn for each GVK that is being watched by the cache manager. +// This is handy when we want to take actions while holding the lock on the watched.Set. 
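A minimal sketch (not part of this patch) of how a caller drives the source-key API defined above. The "syncset" source name, the key ID, and the example package are assumptions; UpsertSource, RemoveSource, and aggregator.Key are as defined in this file, and, per UpsertSource, an empty GVK slice behaves like removing the source.

package example

import (
	"context"

	"github.com/open-policy-agent/gatekeeper/v3/pkg/cachemanager"
	"github.com/open-policy-agent/gatekeeper/v3/pkg/cachemanager/aggregator"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// registerGVKs upserts the GVKs a hypothetical controller wants synced under its
// own source key; passing an empty or nil slice drops the source instead.
func registerGVKs(ctx context.Context, cm *cachemanager.CacheManager, gvks []schema.GroupVersionKind) error {
	key := aggregator.Key{Source: "syncset", ID: "default/example"} // hypothetical source
	return cm.UpsertSource(ctx, key, gvks)
}

// dropSource stops syncing on behalf of that source. GVKs still referenced by
// other sources stay watched; the rest are wiped from the data cache and
// re-listed by the background management loop.
func dropSource(ctx context.Context, cm *cachemanager.CacheManager) error {
	return cm.RemoveSource(ctx, aggregator.Key{Source: "syncset", ID: "default/example"})
}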
+func (c *CacheManager) DoForEach(fn func(gvk schema.GroupVersionKind) error) error { + c.mu.RLock() + defer c.mu.RUnlock() + + err := c.watchedSet.DoForEach(fn) + return err +} + +func (c *CacheManager) watchesGVK(gvk schema.GroupVersionKind) bool { + c.mu.RLock() + defer c.mu.RUnlock() + + return c.watchedSet.Contains(gvk) +} + +func (c *CacheManager) AddObject(ctx context.Context, instance *unstructured.Unstructured) error { + gvk := instance.GroupVersionKind() + + isNamespaceExcluded, err := c.processExcluder.IsNamespaceExcluded(process.Sync, instance) + if err != nil { + return fmt.Errorf("error while excluding namespaces for gvk: %+v: %w", gvk, err) + } + + if isNamespaceExcluded { + c.tracker.ForData(instance.GroupVersionKind()).CancelExpect(instance) + return nil + } + + syncKey := syncutil.GetKeyForSyncMetrics(instance.GetNamespace(), instance.GetName()) + if c.watchesGVK(gvk) { + _, err = c.cfClient.AddData(ctx, instance) + if err != nil { + c.syncMetricsCache.AddObject( + syncKey, + syncutil.Tags{ + Kind: instance.GetKind(), + Status: metrics.ErrorStatus, + }, + ) + + return err + } + + c.syncMetricsCache.AddObject(syncKey, syncutil.Tags{ + Kind: instance.GetKind(), + Status: metrics.ActiveStatus, + }) + c.syncMetricsCache.AddKind(instance.GetKind()) + } + + c.tracker.ForData(instance.GroupVersionKind()).Observe(instance) + + return nil +} + +func (c *CacheManager) RemoveObject(ctx context.Context, instance *unstructured.Unstructured) error { + if _, err := c.cfClient.RemoveData(ctx, instance); err != nil { + return err + } + + // only delete from metrics map if the data removal was successful + c.syncMetricsCache.DeleteObject(syncutil.GetKeyForSyncMetrics(instance.GetNamespace(), instance.GetName())) + c.tracker.ForData(instance.GroupVersionKind()).CancelExpect(instance) + + return nil +} + +func (c *CacheManager) wipeData(ctx context.Context) error { + if _, err := c.cfClient.RemoveData(ctx, target.WipeData()); err != nil { + return err + } + + // reset sync cache before sending the metric + c.syncMetricsCache.ResetCache() + c.syncMetricsCache.ReportSync() + + return nil +} + +func (c *CacheManager) ReportSyncMetrics() { + c.syncMetricsCache.ReportSync() +} + +func (c *CacheManager) syncGVK(ctx context.Context, gvk schema.GroupVersionKind) error { + u := &unstructured.UnstructuredList{} + u.SetGroupVersionKind(schema.GroupVersionKind{ + Group: gvk.Group, + Version: gvk.Version, + Kind: gvk.Kind + "List", + }) + + var err error + func() { + c.mu.RLock() + defer c.mu.RUnlock() + + // only call List if we are still watching the gvk. + if c.watchedSet.Contains(gvk) { + err = c.reader.List(ctx, u) + } + }() + + if err != nil { + return fmt.Errorf("listing data for %+v: %w", gvk, err) + } + + for i := range u.Items { + if err := c.AddObject(ctx, &u.Items[i]); err != nil { + return fmt.Errorf("adding data for %+v: %w", gvk, err) + } + } + + return nil +} + +func (c *CacheManager) manageCache(ctx context.Context) { + // relistStopChan is used to stop any list operations still in progress + relistStopChan := make(chan struct{}) + + for { + select { + case <-ctx.Done(): + return + case <-c.backgroundManagementTicker.C: + func() { + c.mu.Lock() + defer c.mu.Unlock() + + c.wipeCacheIfNeeded(ctx) + + if !c.needToList { + // this means that there are no changes needed + // such that any gvks need to be relisted. + // any in flight goroutines can finish relisiting. 
+ return + } + + // otherwise, spin up new goroutines to relist gvks as there has been a wipe + + // stop any goroutines that were relisting before + // as we may no longer be interested in those gvks + close(relistStopChan) + + // assume all gvks need to be relisted + // and while under lock, make a copy of + // all gvks so we can pass it in the goroutine + // without needing to read lock this data + gvksToRelist := c.gvksToSync.GVKs() + + // clean state + c.needToList = false + relistStopChan = make(chan struct{}) + + go c.replayGVKs(ctx, gvksToRelist, relistStopChan) + }() + } + } +} + +func (c *CacheManager) replayGVKs(ctx context.Context, gvksToRelist []schema.GroupVersionKind, stopCh <-chan struct{}) { + gvksSet := watch.NewSet() + gvksSet.Add(gvksToRelist...) + + for gvksSet.Size() != 0 { + gvkItems := gvksSet.Items() + + for _, gvk := range gvkItems { + select { + case <-ctx.Done(): + return + case <-stopCh: + return + default: + operation := func() (bool, error) { + if err := c.syncGVK(ctx, gvk); err != nil { + return false, err + } + return true, nil + } + + if err := wait.ExponentialBackoff(backoff, operation); err != nil { + log.Error(err, "internal: error listings gvk cache data", "gvk", gvk) + } else { + gvksSet.Remove(gvk) + } + } + } + + c.ReportSyncMetrics() + } +} + +// wipeCacheIfNeeded performs a cache wipe if there are any gvks needing to be removed +// from the cache or if the excluder has changed. It also marks which gvks need to be +// re listed again in the cf data cache after the wipe. Assumes the caller has lock. +func (c *CacheManager) wipeCacheIfNeeded(ctx context.Context) { + // remove any gvks not needing to be synced anymore + // or re evaluate all if the excluder changed. + if c.gvksToDeleteFromCache.Size() > 0 || c.excluderChanged { + if err := c.wipeData(ctx); err != nil { + log.Error(err, "internal: error wiping cache") + return + } + + c.gvksToDeleteFromCache = watch.NewSet() + c.excluderChanged = false + c.needToList = true + } +} diff --git a/pkg/cachemanager/cachemanager_test.go b/pkg/cachemanager/cachemanager_test.go new file mode 100644 index 00000000000..3e7fa9bbeec --- /dev/null +++ b/pkg/cachemanager/cachemanager_test.go @@ -0,0 +1,539 @@ +package cachemanager + +import ( + "context" + "testing" + + configv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/config/v1alpha1" + "github.com/open-policy-agent/gatekeeper/v3/pkg/cachemanager/aggregator" + "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process" + "github.com/open-policy-agent/gatekeeper/v3/pkg/fakes" + "github.com/open-policy-agent/gatekeeper/v3/pkg/metrics" + "github.com/open-policy-agent/gatekeeper/v3/pkg/readiness" + "github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil" + "github.com/open-policy-agent/gatekeeper/v3/pkg/watch" + "github.com/open-policy-agent/gatekeeper/v3/pkg/wildcard" + testclient "github.com/open-policy-agent/gatekeeper/v3/test/clients" + "github.com/open-policy-agent/gatekeeper/v3/test/testutils" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/event" +) + +var cfg *rest.Config + +func TestMain(m *testing.M) { + testutils.StartControlPlane(m, &cfg, 2) +} + +func makeCacheManager(t *testing.T) (*CacheManager, context.Context) { + mgr, wm := testutils.SetupManager(t, cfg) + c := testclient.NewRetryClient(mgr.GetClient()) + + ctx, cancelFunc := 
context.WithCancel(context.Background()) + + cfClient := &fakes.FakeCfClient{} + tracker, err := readiness.SetupTracker(mgr, false, false, false) + require.NoError(t, err) + processExcluder := process.Get() + processExcluder.Add([]configv1alpha1.MatchEntry{ + { + ExcludedNamespaces: []wildcard.Wildcard{"kube-system"}, + Processes: []string{"sync"}, + }, + }) + events := make(chan event.GenericEvent, 1024) + reg, err := wm.NewRegistrar( + "test-cache-manager", + events) + require.NoError(t, err) + + cacheManager, err := NewCacheManager(&Config{ + CfClient: cfClient, + SyncMetricsCache: syncutil.NewMetricsCache(), + Tracker: tracker, + ProcessExcluder: processExcluder, + Registrar: reg, + Reader: c, + GVKAggregator: aggregator.NewGVKAggregator(), + }) + require.NoError(t, err) + + t.Cleanup(func() { + cancelFunc() + }) + + testutils.StartManager(ctx, t, mgr) + + return cacheManager, ctx +} + +func TestCacheManager_wipeCacheIfNeeded(t *testing.T) { + configMapGVK := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + dataClientForTest := func() CFDataClient { + cfdc := &fakes.FakeCfClient{} + + cm := unstructuredFor(configMapGVK, "config-test-1") + _, err := cfdc.AddData(context.Background(), cm) + + require.NoError(t, err, "adding ConfigMap config-test-1 in cfClient") + + return cfdc + } + + tcs := []struct { + name string + cm *CacheManager + expectedData map[fakes.CfDataKey]interface{} + }{ + { + name: "wipe cache if there are gvks to remove", + cm: &CacheManager{ + cfClient: dataClientForTest(), + gvksToDeleteFromCache: func() *watch.Set { + gvksToDelete := watch.NewSet() + gvksToDelete.Add(configMapGVK) + return gvksToDelete + }(), + syncMetricsCache: syncutil.NewMetricsCache(), + }, + expectedData: map[fakes.CfDataKey]interface{}{}, + }, + { + name: "wipe cache if there are excluder changes", + cm: &CacheManager{ + cfClient: dataClientForTest(), + excluderChanged: true, + syncMetricsCache: syncutil.NewMetricsCache(), + gvksToDeleteFromCache: watch.NewSet(), + }, + expectedData: map[fakes.CfDataKey]interface{}{}, + }, + { + name: "don't wipe cache if no excluder changes or no gvks to delete", + cm: &CacheManager{ + cfClient: dataClientForTest(), + syncMetricsCache: syncutil.NewMetricsCache(), + gvksToDeleteFromCache: watch.NewSet(), + }, + expectedData: map[fakes.CfDataKey]interface{}{{Gvk: configMapGVK, Key: "default/config-test-1"}: nil}, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + cfClient, ok := tc.cm.cfClient.(*fakes.FakeCfClient) + require.True(t, ok) + + tc.cm.wipeCacheIfNeeded(context.Background()) + require.True(t, cfClient.Contains(tc.expectedData)) + }) + } +} + +// TestCacheManager_AddObject tests that we can add objects in the cache. 
+func TestCacheManager_AddObject(t *testing.T) { + pod := fakes.Pod( + fakes.WithNamespace("test-ns"), + fakes.WithName("test-name"), + ) + unstructuredPod, err := runtime.DefaultUnstructuredConverter.ToUnstructured(pod) + require.NoError(t, err) + + mgr, _ := testutils.SetupManager(t, cfg) + + tcs := []struct { + name string + cm *CacheManager + expectSyncMetric bool + expectedMetricStatus metrics.Status + expectedData map[fakes.CfDataKey]interface{} + }{ + { + name: "AddObject happy path", + cm: &CacheManager{ + cfClient: &fakes.FakeCfClient{}, + watchedSet: func() *watch.Set { + ws := watch.NewSet() + ws.Add(pod.GroupVersionKind()) + + return ws + }(), + tracker: readiness.NewTracker(mgr.GetAPIReader(), false, false, false), + syncMetricsCache: syncutil.NewMetricsCache(), + processExcluder: process.Get(), + }, + expectedData: map[fakes.CfDataKey]interface{}{{Gvk: pod.GroupVersionKind(), Key: "test-ns/test-name"}: nil}, + expectSyncMetric: true, + expectedMetricStatus: metrics.ActiveStatus, + }, + { + name: "AddObject has no effect if GVK is not watched", + cm: &CacheManager{ + cfClient: &fakes.FakeCfClient{}, + watchedSet: watch.NewSet(), + tracker: readiness.NewTracker(mgr.GetAPIReader(), false, false, false), + syncMetricsCache: syncutil.NewMetricsCache(), + processExcluder: process.Get(), + }, + expectedData: map[fakes.CfDataKey]interface{}{}, + expectSyncMetric: false, + }, + { + name: "AddObject has no effect if GVK is process excluded", + cm: &CacheManager{ + cfClient: &fakes.FakeCfClient{}, + watchedSet: func() *watch.Set { + ws := watch.NewSet() + ws.Add(pod.GroupVersionKind()) + + return ws + }(), + tracker: readiness.NewTracker(mgr.GetAPIReader(), false, false, false), + syncMetricsCache: syncutil.NewMetricsCache(), + processExcluder: func() *process.Excluder { + processExcluder := process.New() + processExcluder.Add([]configv1alpha1.MatchEntry{ + { + ExcludedNamespaces: []wildcard.Wildcard{"test-ns"}, + Processes: []string{"sync"}, + }, + }) + return processExcluder + }(), + }, + expectedData: map[fakes.CfDataKey]interface{}{}, + expectSyncMetric: false, + }, + { + name: "AddObject sets metrics on error from cfdataclient", + cm: &CacheManager{ + cfClient: func() CFDataClient { + c := &fakes.FakeCfClient{} + c.SetErroring(true) + return c + }(), + watchedSet: func() *watch.Set { + ws := watch.NewSet() + ws.Add(pod.GroupVersionKind()) + + return ws + }(), + tracker: readiness.NewTracker(mgr.GetAPIReader(), false, false, false), + syncMetricsCache: syncutil.NewMetricsCache(), + processExcluder: process.Get(), + }, + expectedData: map[fakes.CfDataKey]interface{}{}, + expectSyncMetric: true, + expectedMetricStatus: metrics.ErrorStatus, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + err := tc.cm.AddObject(context.Background(), &unstructured.Unstructured{Object: unstructuredPod}) + if tc.expectedMetricStatus == metrics.ActiveStatus { + require.NoError(t, err) + } + + assertExpecations(t, tc.cm, &unstructured.Unstructured{Object: unstructuredPod}, tc.expectedData, tc.expectSyncMetric, &tc.expectedMetricStatus) + }) + } +} + +func assertExpecations(t *testing.T, cm *CacheManager, instance *unstructured.Unstructured, expectedData map[fakes.CfDataKey]interface{}, expectSyncMetric bool, expectedMetricStatus *metrics.Status) { + t.Helper() + + cfClient, ok := cm.cfClient.(*fakes.FakeCfClient) + require.True(t, ok) + + require.True(t, cfClient.Contains(expectedData)) + + syncKey := syncutil.GetKeyForSyncMetrics(instance.GetNamespace(), instance.GetName()) + + 
require.Equal(t, expectSyncMetric, cm.syncMetricsCache.HasObject(syncKey)) + + if expectSyncMetric { + require.Equal(t, *expectedMetricStatus, cm.syncMetricsCache.GetTags(syncKey).Status) + } +} + +// TestCacheManager_RemoveObject tests that we can remove objects from the cache. +func TestCacheManager_RemoveObject(t *testing.T) { + pod := fakes.Pod( + fakes.WithNamespace("test-ns"), + fakes.WithName("test-name"), + ) + unstructuredPod, err := runtime.DefaultUnstructuredConverter.ToUnstructured(pod) + require.NoError(t, err) + + mgr, _ := testutils.SetupManager(t, cfg) + tracker := readiness.NewTracker(mgr.GetAPIReader(), false, false, false) + makeDataClient := func() *fakes.FakeCfClient { + c := &fakes.FakeCfClient{} + _, err := c.AddData(context.Background(), &unstructured.Unstructured{Object: unstructuredPod}) + require.NoError(t, err) + + return c + } + + tcs := []struct { + name string + cm *CacheManager + expectSyncMetric bool + expectedData map[fakes.CfDataKey]interface{} + }{ + { + name: "RemoveObject happy path", + cm: &CacheManager{ + cfClient: makeDataClient(), + watchedSet: func() *watch.Set { + ws := watch.NewSet() + ws.Add(pod.GroupVersionKind()) + + return ws + }(), + tracker: tracker, + syncMetricsCache: syncutil.NewMetricsCache(), + processExcluder: process.Get(), + }, + expectedData: map[fakes.CfDataKey]interface{}{}, + expectSyncMetric: false, + }, + { + name: "RemoveObject succeeds even if GVK is not watched", + cm: &CacheManager{ + cfClient: makeDataClient(), + watchedSet: watch.NewSet(), + tracker: tracker, + syncMetricsCache: syncutil.NewMetricsCache(), + processExcluder: process.Get(), + }, + expectedData: map[fakes.CfDataKey]interface{}{}, + expectSyncMetric: false, + }, + { + name: "RemoveObject succeeds even if process excluded", + cm: &CacheManager{ + cfClient: makeDataClient(), + watchedSet: func() *watch.Set { + ws := watch.NewSet() + ws.Add(pod.GroupVersionKind()) + + return ws + }(), + tracker: tracker, + syncMetricsCache: syncutil.NewMetricsCache(), + processExcluder: func() *process.Excluder { + processExcluder := process.New() + processExcluder.Add([]configv1alpha1.MatchEntry{ + { + ExcludedNamespaces: []wildcard.Wildcard{"test-ns"}, + Processes: []string{"sync"}, + }, + }) + return processExcluder + }(), + }, + expectedData: map[fakes.CfDataKey]interface{}{}, + expectSyncMetric: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + require.NoError(t, tc.cm.RemoveObject(context.Background(), &unstructured.Unstructured{Object: unstructuredPod})) + + assertExpecations(t, tc.cm, &unstructured.Unstructured{Object: unstructuredPod}, tc.expectedData, tc.expectSyncMetric, nil) + }) + } +} + +// TestCacheManager_UpsertSource tests that we can modify the gvk aggregator and watched set when adding a new source. 
+func TestCacheManager_UpsertSource(t *testing.T) { + configMapGVK := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + podGVK := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"} + sourceA := aggregator.Key{Source: "a", ID: "source"} + sourceB := aggregator.Key{Source: "b", ID: "source"} + + type sourcesAndGvk struct { + source aggregator.Key + gvks []schema.GroupVersionKind + } + + tcs := []struct { + name string + sourcesAndGvks []sourcesAndGvk + expectedGVKs []schema.GroupVersionKind + }{ + { + name: "add one source", + sourcesAndGvks: []sourcesAndGvk{ + { + source: sourceA, + gvks: []schema.GroupVersionKind{configMapGVK}, + }, + }, + expectedGVKs: []schema.GroupVersionKind{configMapGVK}, + }, + { + name: "overwrite source", + sourcesAndGvks: []sourcesAndGvk{ + { + source: sourceA, + gvks: []schema.GroupVersionKind{configMapGVK}, + }, + { + source: sourceA, + gvks: []schema.GroupVersionKind{podGVK}, + }, + }, + expectedGVKs: []schema.GroupVersionKind{podGVK}, + }, + { + name: "remove source by not specifying any gvk", + sourcesAndGvks: []sourcesAndGvk{ + { + source: sourceA, + gvks: []schema.GroupVersionKind{configMapGVK}, + }, + { + source: sourceA, + gvks: []schema.GroupVersionKind{}, + }, + }, + expectedGVKs: []schema.GroupVersionKind{}, + }, + { + name: "add two disjoing sources", + sourcesAndGvks: []sourcesAndGvk{ + { + source: sourceA, + gvks: []schema.GroupVersionKind{configMapGVK}, + }, + { + source: sourceB, + gvks: []schema.GroupVersionKind{podGVK}, + }, + }, + expectedGVKs: []schema.GroupVersionKind{configMapGVK, podGVK}, + }, + { + name: "add two sources with overlapping gvks", + sourcesAndGvks: []sourcesAndGvk{ + { + source: sourceA, + gvks: []schema.GroupVersionKind{configMapGVK, podGVK}, + }, + { + source: sourceB, + gvks: []schema.GroupVersionKind{podGVK}, + }, + }, + expectedGVKs: []schema.GroupVersionKind{configMapGVK, podGVK}, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + cacheManager, ctx := makeCacheManager(t) + + for _, sourceAndGVK := range tc.sourcesAndGvks { + require.NoError(t, cacheManager.UpsertSource(ctx, sourceAndGVK.source, sourceAndGVK.gvks)) + } + + require.ElementsMatch(t, cacheManager.watchedSet.Items(), tc.expectedGVKs) + require.ElementsMatch(t, cacheManager.gvksToSync.GVKs(), tc.expectedGVKs) + }) + } +} + +// TestCacheManager_RemoveSource tests that we can modify the gvk aggregator when removing a source. 
+func TestCacheManager_RemoveSource(t *testing.T) { + configMapGVK := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + podGVK := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"} + sourceA := aggregator.Key{Source: "a", ID: "source"} + sourceB := aggregator.Key{Source: "b", ID: "source"} + + tcs := []struct { + name string + seed func(c *CacheManager) + sourcesToRemove []aggregator.Key + expectedGVKs []schema.GroupVersionKind + }{ + { + name: "remove disjoint source", + seed: func(c *CacheManager) { + require.NoError(t, c.gvksToSync.Upsert(sourceA, []schema.GroupVersionKind{podGVK})) + require.NoError(t, c.gvksToSync.Upsert(sourceB, []schema.GroupVersionKind{configMapGVK})) + }, + sourcesToRemove: []aggregator.Key{sourceB}, + expectedGVKs: []schema.GroupVersionKind{podGVK}, + }, + { + name: "remove overlapping source", + seed: func(c *CacheManager) { + require.NoError(t, c.gvksToSync.Upsert(sourceA, []schema.GroupVersionKind{podGVK})) + require.NoError(t, c.gvksToSync.Upsert(sourceB, []schema.GroupVersionKind{podGVK})) + }, + sourcesToRemove: []aggregator.Key{sourceB}, + expectedGVKs: []schema.GroupVersionKind{podGVK}, + }, + { + name: "remove non existing source", + seed: func(c *CacheManager) { + require.NoError(t, c.gvksToSync.Upsert(sourceA, []schema.GroupVersionKind{podGVK})) + }, + sourcesToRemove: []aggregator.Key{sourceB}, + expectedGVKs: []schema.GroupVersionKind{podGVK}, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + cm, ctx := makeCacheManager(t) + tc.seed(cm) + + for _, source := range tc.sourcesToRemove { + require.NoError(t, cm.RemoveSource(ctx, source)) + } + + require.ElementsMatch(t, cm.gvksToSync.GVKs(), tc.expectedGVKs) + }) + } + cacheManager, ctx := makeCacheManager(t) + + // seed the gvk aggregator + require.NoError(t, cacheManager.gvksToSync.Upsert(sourceA, []schema.GroupVersionKind{podGVK})) + require.NoError(t, cacheManager.gvksToSync.Upsert(sourceB, []schema.GroupVersionKind{podGVK, configMapGVK})) + + // removing a source that is not the only one referencing a gvk ... + require.NoError(t, cacheManager.RemoveSource(ctx, sourceB)) + // ... 
should not remove any gvks that are still referenced by other sources + require.True(t, cacheManager.gvksToSync.IsPresent(podGVK)) + require.False(t, cacheManager.gvksToSync.IsPresent(configMapGVK)) + + require.NoError(t, cacheManager.RemoveSource(ctx, sourceA)) + require.False(t, cacheManager.gvksToSync.IsPresent(podGVK)) +} + +func unstructuredFor(gvk schema.GroupVersionKind, name string) *unstructured.Unstructured { + u := &unstructured.Unstructured{} + u.SetGroupVersionKind(gvk) + u.SetName(name) + u.SetNamespace("default") + if gvk.Kind == "Pod" { + u.Object["spec"] = map[string]interface{}{ + "containers": []map[string]interface{}{ + { + "name": "foo-container", + "image": "foo-image", + }, + }, + } + } + return u +} diff --git a/pkg/cachemanager/cachemanager_test/cachemanager_integration_test.go b/pkg/cachemanager/cachemanager_test/cachemanager_integration_test.go new file mode 100644 index 00000000000..7618c62bf45 --- /dev/null +++ b/pkg/cachemanager/cachemanager_test/cachemanager_integration_test.go @@ -0,0 +1,369 @@ +package cachemanager_test + +import ( + "context" + "fmt" + "math/rand" + "sync" + "testing" + "time" + + configv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/config/v1alpha1" + "github.com/open-policy-agent/gatekeeper/v3/pkg/cachemanager" + "github.com/open-policy-agent/gatekeeper/v3/pkg/cachemanager/aggregator" + "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process" + syncc "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/sync" + "github.com/open-policy-agent/gatekeeper/v3/pkg/fakes" + "github.com/open-policy-agent/gatekeeper/v3/pkg/readiness" + "github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil" + "github.com/open-policy-agent/gatekeeper/v3/pkg/watch" + "github.com/open-policy-agent/gatekeeper/v3/pkg/wildcard" + testclient "github.com/open-policy-agent/gatekeeper/v3/test/clients" + "github.com/open-policy-agent/gatekeeper/v3/test/testutils" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +const ( + eventuallyTimeout = 10 * time.Second + eventuallyTicker = 2 * time.Second + + jitterUpperBound = 100 +) + +var cfg *rest.Config + +var ( + configMapGVK = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + podGVK = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"} + + cm1Name = "config-test-1" + cm2Name = "config-test-2" + + pod1Name = "pod-test-1" +) + +func TestMain(m *testing.M) { + testutils.StartControlPlane(m, &cfg, 3) +} + +// TestCacheManager_replay_retries tests that we can retry GVKs that error out in the replay goroutine. +func TestCacheManager_replay_retries(t *testing.T) { + mgr, wm := testutils.SetupManager(t, cfg) + c := testclient.NewRetryClient(mgr.GetClient()) + + fi := fakes.NewFailureInjector() + reader := fakes.SpyReader{ + Reader: c, + ListFunc: func(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { + // return as many syntenthic failures as there are registered for this kind + if fi.CheckFailures(list.GetObjectKind().GroupVersionKind().Kind) { + return fmt.Errorf("synthetic failure") + } + + return c.List(ctx, list, opts...) 
+ }, + } + + testResources, ctx := makeTestResources(t, mgr, wm, reader) + cacheManager := testResources.CacheManager + dataStore := testResources.CFDataClient + + cfClient, ok := dataStore.(*fakes.FakeCfClient) + require.True(t, ok) + + cm := unstructuredFor(configMapGVK, cm1Name) + require.NoError(t, c.Create(ctx, cm), fmt.Sprintf("creating ConfigMap %s", cm1Name)) + t.Cleanup(func() { + assert.NoError(t, deleteResource(ctx, c, cm), fmt.Sprintf("deleting resource %s", cm1Name)) + }) + cmKey, err := fakes.KeyFor(cm) + require.NoError(t, err) + + pod := unstructuredFor(podGVK, pod1Name) + require.NoError(t, c.Create(ctx, pod), fmt.Sprintf("creating Pod %s", pod1Name)) + t.Cleanup(func() { + assert.NoError(t, deleteResource(ctx, c, pod), fmt.Sprintf("deleting resource %s", pod1Name)) + }) + podKey, err := fakes.KeyFor(pod) + require.NoError(t, err) + + syncSourceOne := aggregator.Key{Source: "source_a", ID: "ID_a"} + require.NoError(t, cacheManager.UpsertSource(ctx, syncSourceOne, []schema.GroupVersionKind{configMapGVK, podGVK})) + + expected := map[fakes.CfDataKey]interface{}{ + cmKey: nil, + podKey: nil, + } + + require.Eventually(t, expectedCheck(cfClient, expected), eventuallyTimeout, eventuallyTicker) + + fi.SetFailures("ConfigMapList", 5) + + // this call should schedule a cache wipe and a replay for the configMapGVK + require.NoError(t, cacheManager.UpsertSource(ctx, syncSourceOne, []schema.GroupVersionKind{configMapGVK})) + + expected2 := map[fakes.CfDataKey]interface{}{ + cmKey: nil, + } + require.Eventually(t, expectedCheck(cfClient, expected2), eventuallyTimeout, eventuallyTicker) +} + +// TestCacheManager_concurrent makes sure that we can add and remove multiple sources +// from separate go routines and changes to the underlying cache are reflected. 
+func TestCacheManager_concurrent(t *testing.T) { + r := rand.New(rand.NewSource(12345)) // #nosec G404: Using weak random number generator for determinism between calls + + mgr, wm := testutils.SetupManager(t, cfg) + c := testclient.NewRetryClient(mgr.GetClient()) + testResources, ctx := makeTestResources(t, mgr, wm, c) + + cacheManager := testResources.CacheManager + dataStore := testResources.CFDataClient + agg := testResources.GVKAgreggator + + // Create configMaps to test for + cm := unstructuredFor(configMapGVK, cm1Name) + require.NoError(t, c.Create(ctx, cm), fmt.Sprintf("creating ConfigMap %s", cm1Name)) + t.Cleanup(func() { + assert.NoError(t, deleteResource(ctx, c, cm), fmt.Sprintf("deleting resource %s", cm1Name)) + }) + cmKey, err := fakes.KeyFor(cm) + require.NoError(t, err) + + cm2 := unstructuredFor(configMapGVK, cm2Name) + require.NoError(t, c.Create(ctx, cm2), fmt.Sprintf("creating ConfigMap %s", cm2Name)) + t.Cleanup(func() { + assert.NoError(t, deleteResource(ctx, c, cm2), fmt.Sprintf("deleting resource %s", cm2Name)) + }) + cm2Key, err := fakes.KeyFor(cm2) + require.NoError(t, err) + + pod := unstructuredFor(podGVK, pod1Name) + require.NoError(t, c.Create(ctx, pod), fmt.Sprintf("creating Pod %s", pod1Name)) + t.Cleanup(func() { + assert.NoError(t, deleteResource(ctx, c, pod), fmt.Sprintf("deleting resource %s", pod1Name)) + }) + podKey, err := fakes.KeyFor(pod) + require.NoError(t, err) + + cfClient, ok := dataStore.(*fakes.FakeCfClient) + require.True(t, ok) + + syncSourceOne := aggregator.Key{Source: "source_a", ID: "ID_a"} + syncSourceTwo := aggregator.Key{Source: "source_b", ID: "ID_b"} + + wg := &sync.WaitGroup{} + + // simulate a churn-y concurrent access by swapping the gvks for the sync sources repeatedly + // and removing sync sources, all from different go routines. 
+ for i := 1; i < 100; i++ { + wg.Add(3) + + // add some jitter between go func calls + time.Sleep(time.Duration(r.Intn(jitterUpperBound)) * time.Millisecond) + go func() { + defer wg.Done() + + assert.NoError(t, cacheManager.UpsertSource(ctx, syncSourceOne, []schema.GroupVersionKind{configMapGVK})) + assert.NoError(t, cacheManager.UpsertSource(ctx, syncSourceTwo, []schema.GroupVersionKind{podGVK})) + }() + + time.Sleep(time.Duration(r.Intn(jitterUpperBound)) * time.Millisecond) + go func() { + defer wg.Done() + + assert.NoError(t, cacheManager.UpsertSource(ctx, syncSourceOne, []schema.GroupVersionKind{podGVK})) + assert.NoError(t, cacheManager.UpsertSource(ctx, syncSourceTwo, []schema.GroupVersionKind{configMapGVK})) + }() + + time.Sleep(time.Duration(r.Intn(jitterUpperBound)) * time.Millisecond) + go func() { + defer wg.Done() + + assert.NoError(t, cacheManager.RemoveSource(ctx, syncSourceTwo)) + assert.NoError(t, cacheManager.RemoveSource(ctx, syncSourceOne)) + }() + } + + wg.Wait() + + // final upsert for determinism + require.NoError(t, cacheManager.UpsertSource(ctx, syncSourceOne, []schema.GroupVersionKind{configMapGVK})) + require.NoError(t, cacheManager.UpsertSource(ctx, syncSourceTwo, []schema.GroupVersionKind{podGVK})) + + expected := map[fakes.CfDataKey]interface{}{ + cmKey: nil, + cm2Key: nil, + podKey: nil, + } + + require.Eventually(t, expectedCheck(cfClient, expected), eventuallyTimeout, eventuallyTicker) + // now assert that the gvkAggregator looks as expected + agg.IsPresent(configMapGVK) + gvks := agg.List(syncSourceOne) + require.Len(t, gvks, 1) + _, foundConfigMap := gvks[configMapGVK] + require.True(t, foundConfigMap) + gvks = agg.List(syncSourceTwo) + require.Len(t, gvks, 1) + _, foundPod := gvks[podGVK] + require.True(t, foundPod) + + // do a final remove and expect the cache to clear + require.NoError(t, cacheManager.RemoveSource(ctx, syncSourceOne)) + require.NoError(t, cacheManager.RemoveSource(ctx, syncSourceTwo)) + + require.Eventually(t, expectedCheck(cfClient, map[fakes.CfDataKey]interface{}{}), eventuallyTimeout, eventuallyTicker) + require.True(t, len(agg.GVKs()) == 0) +} + +// TestCacheManager_instance_updates tests that cache manager wires up dependencies correctly +// such that updates to an instance of a watched gvks are reconciled in the sync_controller. 
+func TestCacheManager_instance_updates(t *testing.T) { + mgr, wm := testutils.SetupManager(t, cfg) + c := testclient.NewRetryClient(mgr.GetClient()) + + testResources, ctx := makeTestResources(t, mgr, wm, c) + + cacheManager := testResources.CacheManager + dataStore := testResources.CFDataClient + + cfClient, ok := dataStore.(*fakes.FakeCfClient) + require.True(t, ok) + + cm := unstructuredFor(configMapGVK, cm1Name) + require.NoError(t, c.Create(ctx, cm), fmt.Sprintf("creating ConfigMap %s", cm1Name)) + t.Cleanup(func() { + assert.NoError(t, deleteResource(ctx, c, cm), fmt.Sprintf("deleting resource %s", cm1Name)) + }) + cmKey, err := fakes.KeyFor(cm) + require.NoError(t, err) + + syncSourceOne := aggregator.Key{Source: "source_a", ID: "ID_a"} + require.NoError(t, cacheManager.UpsertSource(ctx, syncSourceOne, []schema.GroupVersionKind{configMapGVK})) + + expected := map[fakes.CfDataKey]interface{}{ + cmKey: nil, + } + + require.Eventually(t, expectedCheck(cfClient, expected), eventuallyTimeout, eventuallyTicker) + + cmUpdate := unstructuredFor(configMapGVK, cm1Name) + cmUpdate.SetLabels(map[string]string{"testlabel": "test"}) // trigger an instance update + require.NoError(t, c.Update(ctx, cmUpdate)) + + require.Eventually(t, func() bool { + instance := cfClient.GetData(cmKey) + unInstance, ok := instance.(*unstructured.Unstructured) + require.True(t, ok) + + value, found, err := unstructured.NestedString(unInstance.Object, "metadata", "labels", "testlabel") + require.NoError(t, err) + + return found && "test" == value + }, eventuallyTimeout, eventuallyTicker) +} + +func deleteResource(ctx context.Context, c client.Client, resounce *unstructured.Unstructured) error { + err := c.Delete(ctx, resounce) + if apierrors.IsNotFound(err) { + // resource does not exist, this is good + return nil + } + + return err +} + +func expectedCheck(cfClient *fakes.FakeCfClient, expected map[fakes.CfDataKey]interface{}) func() bool { + return func() bool { + if cfClient.Len() != len(expected) { + return false + } + if cfClient.Contains(expected) { + return true + } + + return false + } +} + +func unstructuredFor(gvk schema.GroupVersionKind, name string) *unstructured.Unstructured { + u := &unstructured.Unstructured{} + u.SetGroupVersionKind(gvk) + u.SetName(name) + u.SetNamespace("default") + if gvk.Kind == "Pod" { + u.Object["spec"] = map[string]interface{}{ + "containers": []map[string]interface{}{ + { + "name": "foo-container", + "image": "foo-image", + }, + }, + } + } + return u +} + +type testResources struct { + *cachemanager.CacheManager + cachemanager.CFDataClient + *aggregator.GVKAgreggator +} + +func makeTestResources(t *testing.T, mgr manager.Manager, wm *watch.Manager, reader client.Reader) (testResources, context.Context) { + ctx, cancelFunc := context.WithCancel(context.Background()) + t.Cleanup(func() { + cancelFunc() + }) + + cfClient := &fakes.FakeCfClient{} + tracker, err := readiness.SetupTracker(mgr, false, false, false) + require.NoError(t, err) + processExcluder := process.Get() + processExcluder.Add([]configv1alpha1.MatchEntry{ + { + ExcludedNamespaces: []wildcard.Wildcard{"kube-system"}, + Processes: []string{"sync"}, + }, + }) + events := make(chan event.GenericEvent, 1024) + reg, err := wm.NewRegistrar( + "test-cache-manager", + events) + require.NoError(t, err) + + aggregator := aggregator.NewGVKAggregator() + config := &cachemanager.Config{ + CfClient: cfClient, + SyncMetricsCache: syncutil.NewMetricsCache(), + Tracker: tracker, + ProcessExcluder: processExcluder, + Registrar: 
reg, + Reader: reader, + GVKAggregator: aggregator, + } + cacheManager, err := cachemanager.NewCacheManager(config) + require.NoError(t, err) + + syncAdder := syncc.Adder{ + Events: events, + CacheManager: cacheManager, + } + require.NoError(t, syncAdder.Add(mgr), "registering sync controller") + go func() { + assert.NoError(t, cacheManager.Start(ctx)) + }() + + testutils.StartManager(ctx, t, mgr) + + return testResources{cacheManager, cfClient, aggregator}, ctx +} diff --git a/pkg/syncutil/parser/syncannotationreader.go b/pkg/cachemanager/parser/syncannotationreader.go similarity index 100% rename from pkg/syncutil/parser/syncannotationreader.go rename to pkg/cachemanager/parser/syncannotationreader.go diff --git a/pkg/syncutil/parser/syncannotationreader_test.go b/pkg/cachemanager/parser/syncannotationreader_test.go similarity index 100% rename from pkg/syncutil/parser/syncannotationreader_test.go rename to pkg/cachemanager/parser/syncannotationreader_test.go diff --git a/pkg/controller/config/config_controller.go b/pkg/controller/config/config_controller.go index 406ab6f5b02..4d3788d3222 100644 --- a/pkg/controller/config/config_controller.go +++ b/pkg/controller/config/config_controller.go @@ -22,24 +22,19 @@ import ( constraintclient "github.com/open-policy-agent/frameworks/constraint/pkg/client" "github.com/open-policy-agent/frameworks/constraint/pkg/externaldata" configv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/config/v1alpha1" + cm "github.com/open-policy-agent/gatekeeper/v3/pkg/cachemanager" + "github.com/open-policy-agent/gatekeeper/v3/pkg/cachemanager/aggregator" "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process" - syncc "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/sync" "github.com/open-policy-agent/gatekeeper/v3/pkg/expansion" "github.com/open-policy-agent/gatekeeper/v3/pkg/keys" - "github.com/open-policy-agent/gatekeeper/v3/pkg/metrics" "github.com/open-policy-agent/gatekeeper/v3/pkg/mutation" "github.com/open-policy-agent/gatekeeper/v3/pkg/readiness" - syncutil "github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil" - cm "github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil/cachemanager" - "github.com/open-policy-agent/gatekeeper/v3/pkg/target" "github.com/open-policy-agent/gatekeeper/v3/pkg/watch" "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -55,21 +50,15 @@ const ( var log = logf.Log.WithName("controller").WithValues("kind", "Config") type Adder struct { - Opa *constraintclient.Client - WatchManager *watch.Manager ControllerSwitch *watch.ControllerSwitch Tracker *readiness.Tracker - ProcessExcluder *process.Excluder - WatchSet *watch.Set + CacheManager *cm.CacheManager } // Add creates a new ConfigController and adds it to the Manager with default RBAC. The Manager will set fields on the Controller // and Start it when the Manager is Started. func (a *Adder) Add(mgr manager.Manager) error { - // Events will be used to receive events from dynamic watches registered - // via the registrar below. 
- events := make(chan event.GenericEvent, 1024) - r, err := newReconciler(mgr, a.Opa, a.WatchManager, a.ControllerSwitch, a.Tracker, a.ProcessExcluder, events, a.WatchSet, events) + r, err := newReconciler(mgr, a.CacheManager, a.ControllerSwitch, a.Tracker) if err != nil { return err } @@ -77,13 +66,9 @@ func (a *Adder) Add(mgr manager.Manager) error { return add(mgr, r) } -func (a *Adder) InjectOpa(o *constraintclient.Client) { - a.Opa = o -} +func (a *Adder) InjectOpa(_ *constraintclient.Client) {} -func (a *Adder) InjectWatchManager(wm *watch.Manager) { - a.WatchManager = wm -} +func (a *Adder) InjectWatchManager(_ *watch.Manager) {} func (a *Adder) InjectControllerSwitch(cs *watch.ControllerSwitch) { a.ControllerSwitch = cs @@ -93,60 +78,30 @@ func (a *Adder) InjectTracker(t *readiness.Tracker) { a.Tracker = t } -func (a *Adder) InjectProcessExcluder(m *process.Excluder) { - a.ProcessExcluder = m -} - func (a *Adder) InjectMutationSystem(mutationSystem *mutation.System) {} func (a *Adder) InjectExpansionSystem(expansionSystem *expansion.System) {} func (a *Adder) InjectProviderCache(providerCache *externaldata.ProviderCache) {} -func (a *Adder) InjectWatchSet(watchSet *watch.Set) { - a.WatchSet = watchSet +func (a *Adder) InjectCacheManager(cm *cm.CacheManager) { + a.CacheManager = cm } -// newReconciler returns a new reconcile.Reconciler -// events is the channel from which sync controller will receive the events -// regEvents is the channel registered by Registrar to put the events in -// events and regEvents point to same event channel except for testing. -func newReconciler(mgr manager.Manager, opa syncutil.OpaDataClient, wm *watch.Manager, cs *watch.ControllerSwitch, tracker *readiness.Tracker, processExcluder *process.Excluder, events <-chan event.GenericEvent, watchSet *watch.Set, regEvents chan<- event.GenericEvent) (*ReconcileConfig, error) { - filteredOpa := syncutil.NewFilteredOpaDataClient(opa, watchSet) - syncMetricsCache := syncutil.NewMetricsCache() - cm := cm.NewCacheManager(opa, syncMetricsCache, tracker, processExcluder) - - syncAdder := syncc.Adder{ - Events: events, - CacheManager: cm, - } - // Create subordinate controller - we will feed it events dynamically via watch - if err := syncAdder.Add(mgr); err != nil { - return nil, fmt.Errorf("registering sync controller: %w", err) - } - - if watchSet == nil { - return nil, fmt.Errorf("watchSet must be non-nil") +// newReconciler returns a new reconcile.Reconciler. 
+func newReconciler(mgr manager.Manager, cm *cm.CacheManager, cs *watch.ControllerSwitch, tracker *readiness.Tracker) (*ReconcileConfig, error) { + if cm == nil { + return nil, fmt.Errorf("cacheManager must be non-nil") } - w, err := wm.NewRegistrar( - ctrlName, - regEvents) - if err != nil { - return nil, err - } return &ReconcileConfig{ - reader: mgr.GetCache(), - writer: mgr.GetClient(), - statusClient: mgr.GetClient(), - scheme: mgr.GetScheme(), - opa: filteredOpa, - cs: cs, - watcher: w, - watched: watchSet, - syncMetricsCache: syncMetricsCache, - tracker: tracker, - processExcluder: processExcluder, + reader: mgr.GetCache(), + writer: mgr.GetClient(), + statusClient: mgr.GetClient(), + scheme: mgr.GetScheme(), + cs: cs, + cacheManager: cm, + tracker: tracker, }, nil } @@ -175,18 +130,11 @@ type ReconcileConfig struct { writer client.Writer statusClient client.StatusClient - scheme *runtime.Scheme - opa syncutil.OpaDataClient - syncMetricsCache *syncutil.MetricsCache - cs *watch.ControllerSwitch - watcher *watch.Registrar + scheme *runtime.Scheme + cacheManager *cm.CacheManager + cs *watch.ControllerSwitch - watched *watch.Set - - needsReplay *watch.Set - needsWipe bool - tracker *readiness.Tracker - processExcluder *process.Excluder + tracker *readiness.Tracker } // +kubebuilder:rbac:groups=*,resources=*,verbs=get;list;watch @@ -237,15 +185,15 @@ func (r *ReconcileConfig) Reconcile(ctx context.Context, request reconcile.Reque } } - newSyncOnly := watch.NewSet() newExcluder := process.New() var statsEnabled bool // If the config is being deleted the user is saying they don't want to // sync anything + gvksToSync := []schema.GroupVersionKind{} if exists && instance.GetDeletionTimestamp().IsZero() { for _, entry := range instance.Spec.Sync.SyncOnly { gvk := schema.GroupVersionKind{Group: entry.Group, Version: entry.Version, Kind: entry.Kind} - newSyncOnly.Add(gvk) + gvksToSync = append(gvksToSync, gvk) } newExcluder.Add(instance.Spec.Match) @@ -261,146 +209,15 @@ func (r *ReconcileConfig) Reconcile(ctx context.Context, request reconcile.Reque r.tracker.DisableStats() } - // Remove expectations for resources we no longer watch. - diff := r.watched.Difference(newSyncOnly) - r.removeStaleExpectations(diff) - - // If the watch set has not changed, we're done here. - if r.watched.Equals(newSyncOnly) && r.processExcluder.Equals(newExcluder) { - // ...unless we have pending wipe / replay operations from a previous reconcile. - if !(r.needsWipe || r.needsReplay != nil) { - return reconcile.Result{}, nil - } - - // If we reach here, the watch set hasn't changed since last reconcile, but we - // have unfinished wipe/replay business from the last change. - } else { - // The watch set _has_ changed, so recalculate the replay set. - r.needsReplay = nil - r.needsWipe = true - } - - // --- Start watching the new set --- - - // This must happen first - signals to the opa client in the sync controller - // to drop events from no-longer-watched resources that may be in its queue. - if r.needsReplay == nil { - r.needsReplay = r.watched.Intersection(newSyncOnly) - } - - // Wipe all data to avoid stale state if needed. Happens once per watch-set-change. 
- if err := r.wipeCacheIfNeeded(ctx); err != nil { - return reconcile.Result{}, fmt.Errorf("wiping opa data cache: %w", err) - } - - r.watched.Replace(newSyncOnly, func() { - // swapping with the new excluder - r.processExcluder.Replace(newExcluder) - - // *Note the following steps are not transactional with respect to admission control* - - // Important: dynamic watches update must happen *after* updating our watchSet. - // Otherwise, the sync controller will drop events for the newly watched kinds. - // Defer error handling so object re-sync happens even if the watch is hard - // errored due to a missing GVK in the watch set. - err = r.watcher.ReplaceWatch(ctx, newSyncOnly.Items()) - }) - if err != nil { - return reconcile.Result{}, err - } - - // Replay cached data for any resources that were previously watched and still in the watch set. - // This is necessary because we wipe their data from Opa above. - // TODO(OREN): Improve later by selectively removing subtrees of data instead of a full wipe. - if err := r.replayData(ctx); err != nil { - return reconcile.Result{}, fmt.Errorf("replaying data: %w", err) + r.cacheManager.ExcludeProcesses(newExcluder) + configSourceKey := aggregator.Key{Source: "config", ID: request.NamespacedName.String()} + if err := r.cacheManager.UpsertSource(ctx, configSourceKey, gvksToSync); err != nil { + return reconcile.Result{Requeue: true}, fmt.Errorf("config-controller: error establishing watches for new syncOny: %w", err) } return reconcile.Result{}, nil } -func (r *ReconcileConfig) wipeCacheIfNeeded(ctx context.Context) error { - if r.needsWipe { - if _, err := r.opa.RemoveData(ctx, target.WipeData()); err != nil { - return err - } - - // reset sync cache before sending the metric - r.syncMetricsCache.ResetCache() - r.syncMetricsCache.ReportSync() - - r.needsWipe = false - } - return nil -} - -// replayData replays all watched and cached data into Opa following a config set change. -// In the future we can rework this to avoid the full opa data cache wipe. -func (r *ReconcileConfig) replayData(ctx context.Context) error { - if r.needsReplay == nil { - return nil - } - for _, gvk := range r.needsReplay.Items() { - u := &unstructured.UnstructuredList{} - u.SetGroupVersionKind(schema.GroupVersionKind{ - Group: gvk.Group, - Version: gvk.Version, - Kind: gvk.Kind + "List", - }) - err := r.reader.List(ctx, u) - if err != nil { - return fmt.Errorf("replaying data for %+v: %w", gvk, err) - } - - defer r.syncMetricsCache.ReportSync() - - for i := range u.Items { - syncKey := syncutil.GetKeyForSyncMetrics(u.Items[i].GetNamespace(), u.Items[i].GetName()) - - isExcludedNamespace, err := r.skipExcludedNamespace(&u.Items[i]) - if err != nil { - log.Error(err, "error while excluding namespaces") - } - - if isExcludedNamespace { - continue - } - - if _, err := r.opa.AddData(ctx, &u.Items[i]); err != nil { - r.syncMetricsCache.AddObject(syncKey, syncutil.Tags{ - Kind: u.Items[i].GetKind(), - Status: metrics.ErrorStatus, - }) - return fmt.Errorf("adding data for %+v: %w", gvk, err) - } - - r.syncMetricsCache.AddObject(syncKey, syncutil.Tags{ - Kind: u.Items[i].GetKind(), - Status: metrics.ActiveStatus, - }) - } - r.needsReplay.Remove(gvk) - } - r.needsReplay = nil - return nil -} - -// removeStaleExpectations stops tracking data for any resources that are no longer watched. 
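
The Reconcile changes above are the core of this patch: instead of keeping its own watch set and wipe/replay bookkeeping, the config controller now hands its syncOnly GVKs to the CacheManager under an aggregator key. Below is a minimal sketch of that pattern; only `UpsertSource` and `aggregator.Key` as they appear in this patch are used, while the second source key and the GVK values are illustrative assumptions (only the `"config"` source exists in this patch).

```go
package example

import (
	"context"

	"github.com/open-policy-agent/gatekeeper/v3/pkg/cachemanager"
	"github.com/open-policy-agent/gatekeeper/v3/pkg/cachemanager/aggregator"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// registerSources sketches the aggregator-key pattern: several callers
// ("sources") feed GVK lists into one CacheManager, which is then responsible
// for the watches and cached data of all of them combined.
func registerSources(ctx context.Context, cm *cachemanager.CacheManager) error {
	podGVK := schema.GroupVersionKind{Version: "v1", Kind: "Pod"}
	configMapGVK := schema.GroupVersionKind{Version: "v1", Kind: "ConfigMap"}

	// Each source identifies itself with an aggregator.Key{Source, ID};
	// the config controller above uses Source: "config".
	configKey := aggregator.Key{Source: "config", ID: "gatekeeper-system/config"}
	otherKey := aggregator.Key{Source: "example", ID: "another-source"} // hypothetical source

	// A source declares the full set of GVKs it wants synced on every upsert.
	if err := cm.UpsertSource(ctx, configKey, []schema.GroupVersionKind{podGVK, configMapGVK}); err != nil {
		return err
	}
	if err := cm.UpsertSource(ctx, otherKey, []schema.GroupVersionKind{configMapGVK}); err != nil {
		return err
	}

	// Upserting an empty list (what the config controller does when the Config
	// is deleted) tells the CacheManager this source no longer needs anything.
	return cm.UpsertSource(ctx, configKey, []schema.GroupVersionKind{})
}
```
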
-func (r *ReconcileConfig) removeStaleExpectations(stale *watch.Set) { - for _, gvk := range stale.Items() { - r.tracker.CancelData(gvk) - } -} - -func (r *ReconcileConfig) skipExcludedNamespace(obj *unstructured.Unstructured) (bool, error) { - isNamespaceExcluded, err := r.processExcluder.IsNamespaceExcluded(process.Sync, obj) - if err != nil { - return false, err - } - - return isNamespaceExcluded, err -} - func containsString(s string, items []string) bool { for _, item := range items { if item == s { diff --git a/pkg/controller/config/config_controller_suite_test.go b/pkg/controller/config/config_controller_suite_test.go index a10be3452be..3932f427015 100644 --- a/pkg/controller/config/config_controller_suite_test.go +++ b/pkg/controller/config/config_controller_suite_test.go @@ -20,6 +20,7 @@ import ( stdlog "log" "os" "path/filepath" + "sync" "testing" "github.com/open-policy-agent/gatekeeper/v3/apis" @@ -62,12 +63,12 @@ func TestMain(m *testing.M) { // SetupTestReconcile returns a reconcile.Reconcile implementation that delegates to inner and // writes the request to requests after Reconcile is finished. -func SetupTestReconcile(inner reconcile.Reconciler) (reconcile.Reconciler, chan reconcile.Request) { - requests := make(chan reconcile.Request) +func SetupTestReconcile(inner reconcile.Reconciler) (reconcile.Reconciler, *sync.Map) { + var requests sync.Map fn := reconcile.Func(func(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { result, err := inner.Reconcile(ctx, req) - requests <- req + requests.Store(req, struct{}{}) return result, err }) - return fn, requests + return fn, &requests } diff --git a/pkg/controller/config/config_controller_test.go b/pkg/controller/config/config_controller_test.go index c9274362e1b..7e85c13f8ae 100644 --- a/pkg/controller/config/config_controller_test.go +++ b/pkg/controller/config/config_controller_test.go @@ -17,7 +17,6 @@ package config import ( "fmt" - gosync "sync" "testing" "time" @@ -26,17 +25,23 @@ import ( constraintclient "github.com/open-policy-agent/frameworks/constraint/pkg/client" "github.com/open-policy-agent/frameworks/constraint/pkg/client/drivers/rego" configv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/config/v1alpha1" + "github.com/open-policy-agent/gatekeeper/v3/pkg/cachemanager" "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process" + syncc "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/sync" "github.com/open-policy-agent/gatekeeper/v3/pkg/fakes" "github.com/open-policy-agent/gatekeeper/v3/pkg/readiness" + "github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil" "github.com/open-policy-agent/gatekeeper/v3/pkg/target" "github.com/open-policy-agent/gatekeeper/v3/pkg/watch" "github.com/open-policy-agent/gatekeeper/v3/pkg/wildcard" testclient "github.com/open-policy-agent/gatekeeper/v3/test/clients" "github.com/open-policy-agent/gatekeeper/v3/test/testutils" "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "golang.org/x/net/context" corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -53,11 +58,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" ) -var expectedRequest = reconcile.Request{NamespacedName: types.NamespacedName{ - Name: "config", - Namespace: "gatekeeper-system", -}} - const timeout = time.Second * 20 // setupManager 
sets up a controller-runtime manager with registered watch manager. @@ -89,7 +89,11 @@ func setupManager(t *testing.T) (manager.Manager, *watch.Manager) { } func TestReconcile(t *testing.T) { + ctx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() + g := gomega.NewGomegaWithT(t) + instance := &configv1alpha1.Config{ ObjectMeta: metav1.ObjectMeta{ Name: "config", @@ -114,22 +118,10 @@ func TestReconcile(t *testing.T) { }, }, } - - // Set up the Manager and Controller. Wrap the Controller Reconcile function so it writes each request to a - // channel when it is finished. mgr, wm := setupManager(t) c := testclient.NewRetryClient(mgr.GetClient()) - // initialize OPA - driver, err := rego.New(rego.Tracing(true)) - if err != nil { - t.Fatalf("unable to set up Driver: %v", err) - } - - opaClient, err := constraintclient.NewClient(constraintclient.Targets(&target.K8sValidationTarget{}), constraintclient.Driver(driver)) - if err != nil { - t.Fatalf("unable to set up OPA client: %s", err) - } + opaClient := &fakes.FakeCfClient{} cs := watch.NewSwitch() tracker, err := readiness.SetupTracker(mgr, false, false, false) @@ -139,25 +131,34 @@ func TestReconcile(t *testing.T) { processExcluder := process.Get() processExcluder.Add(instance.Spec.Match) events := make(chan event.GenericEvent, 1024) - watchSet := watch.NewSet() - rec, _ := newReconciler(mgr, opaClient, wm, cs, tracker, processExcluder, events, watchSet, events) + syncMetricsCache := syncutil.NewMetricsCache() + reg, err := wm.NewRegistrar( + cachemanager.RegistrarName, + events) + require.NoError(t, err) + cacheManager, err := cachemanager.NewCacheManager(&cachemanager.Config{ + CfClient: opaClient, + SyncMetricsCache: syncMetricsCache, + Tracker: tracker, + ProcessExcluder: processExcluder, + Registrar: reg, + Reader: c, + }) + require.NoError(t, err) + + // start the cache manager + go func() { + assert.NoError(t, cacheManager.Start(ctx)) + }() + + rec, err := newReconciler(mgr, cacheManager, cs, tracker) + require.NoError(t, err) + // Wrap the Controller Reconcile function so it writes each request to a map when it is finished reconciling. 
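
The setup just above is the wiring pattern the updated tests follow: a registrar publishes dynamic watch events into a shared channel, the CacheManager is built from a `cachemanager.Config`, and it must be running before sources are upserted. A condensed sketch under those assumptions follows; the helper name and parameter list are illustrative, the constructors and field names are the ones shown in this patch.

```go
package example

import (
	"context"

	"github.com/open-policy-agent/gatekeeper/v3/pkg/cachemanager"
	"github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process"
	"github.com/open-policy-agent/gatekeeper/v3/pkg/readiness"
	"github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil"
	"github.com/open-policy-agent/gatekeeper/v3/pkg/watch"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

// newStartedCacheManager condenses the steps used above:
// registrar -> cachemanager.Config -> NewCacheManager -> Start.
func newStartedCacheManager(
	ctx context.Context,
	mgr manager.Manager,
	wm *watch.Manager,
	tracker *readiness.Tracker,
	excluder *process.Excluder,
	cfClient cachemanager.CFDataClient,
) (*cachemanager.CacheManager, chan event.GenericEvent, error) {
	// Dynamic watches registered by the CacheManager publish into this channel;
	// the sync controller consumes it.
	events := make(chan event.GenericEvent, 1024)

	reg, err := wm.NewRegistrar(cachemanager.RegistrarName, events)
	if err != nil {
		return nil, nil, err
	}

	cm, err := cachemanager.NewCacheManager(&cachemanager.Config{
		CfClient:         cfClient,
		SyncMetricsCache: syncutil.NewMetricsCache(),
		Tracker:          tracker,
		ProcessExcluder:  excluder,
		Registrar:        reg,
		Reader:           mgr.GetCache(),
	})
	if err != nil {
		return nil, nil, err
	}

	// Tests that construct the CacheManager directly start it themselves;
	// otherwise the controller-runtime manager runs it via m.Add (see
	// controller.go later in this patch).
	go func() { _ = cm.Start(ctx) }()

	return cm, events, nil
}
```
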
recFn, requests := SetupTestReconcile(rec) - err = add(mgr, recFn) - if err != nil { - t.Fatal(err) - } + require.NoError(t, add(mgr, recFn)) - ctx, cancelFunc := context.WithCancel(context.Background()) testutils.StartManager(ctx, t, mgr) - once := gosync.Once{} - testMgrStopped := func() { - once.Do(func() { - cancelFunc() - }) - } - - defer testMgrStopped() // Create the Config object and expect the Reconcile to be created err = c.Create(ctx, instance) @@ -172,10 +173,20 @@ func TestReconcile(t *testing.T) { t.Fatal(err) } }() - g.Eventually(requests, timeout).Should(gomega.Receive(gomega.Equal(expectedRequest))) + g.Eventually(func() bool { + expectedReq := reconcile.Request{NamespacedName: types.NamespacedName{ + Name: "config", + Namespace: "gatekeeper-system", + }} + _, ok := requests.Load(expectedReq) + return ok + }).WithTimeout(timeout).Should(gomega.BeTrue()) + + g.Eventually(func() int { + return len(wm.GetManagedGVK()) + }).WithTimeout(timeout).ShouldNot(gomega.Equal(0)) gvks := wm.GetManagedGVK() - g.Eventually(len(gvks), timeout).ShouldNot(gomega.Equal(0)) wantGVKs := []schema.GroupVersionKind{ {Group: "", Version: "v1", Kind: "Namespace"}, @@ -251,7 +262,28 @@ func TestReconcile(t *testing.T) { t.Fatal(err) } - testMgrStopped() + fooNs := &v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + } + require.NoError(t, c.Create(ctx, fooNs)) + fooPod.Object["spec"] = map[string]interface{}{ + "containers": []map[string]interface{}{ + { + "name": "foo-container", + "image": "foo-image", + }, + }, + } + + // directly call cacheManager to avoid any race condition + // between adding the pod and the sync_controller calling AddObject + require.NoError(t, cacheManager.AddObject(ctx, fooPod)) + + // fooPod should be namespace excluded, hence not added to the cache + require.False(t, opaClient.Contains(map[fakes.CfDataKey]interface{}{{Gvk: fooPod.GroupVersionKind(), Key: "default"}: struct{}{}})) + cs.Stop() } @@ -281,7 +313,10 @@ func TestConfig_DeleteSyncResources(t *testing.T) { }, }, } - ctx := context.Background() + + ctx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() + err := c.Create(ctx, instance) if err != nil { t.Fatal(err) @@ -324,20 +359,12 @@ func TestConfig_DeleteSyncResources(t *testing.T) { events := make(chan event.GenericEvent, 1024) // set up controller and add it to the manager - err = setupController(mgr, wm, tracker, events) - if err != nil { - t.Fatal(err) - } + _, err = setupController(ctx, mgr, wm, tracker, events, c, false) + require.NoError(t, err, "failed to set up controller") // start manager that will start tracker and controller - ctx, cancelFunc := context.WithCancel(context.Background()) testutils.StartManager(ctx, t, mgr) - once := gosync.Once{} - defer func() { - once.Do(func() { - cancelFunc() - }) - }() + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"} // get the object tracker for the synconly pod resource @@ -374,35 +401,77 @@ func TestConfig_DeleteSyncResources(t *testing.T) { }, timeout).Should(gomega.BeTrue()) } -func setupController(mgr manager.Manager, wm *watch.Manager, tracker *readiness.Tracker, events <-chan event.GenericEvent) error { +func setupController(ctx context.Context, mgr manager.Manager, wm *watch.Manager, tracker *readiness.Tracker, events chan event.GenericEvent, reader client.Reader, useFakeOpa bool) (cachemanager.CFDataClient, error) { // initialize OPA - driver, err := rego.New(rego.Tracing(true)) - if err != nil { - return fmt.Errorf("unable to set up 
Driver: %w", err) - } + var opaClient cachemanager.CFDataClient + if useFakeOpa { + opaClient = &fakes.FakeCfClient{} + } else { + driver, err := rego.New(rego.Tracing(true)) + if err != nil { + return nil, fmt.Errorf("unable to set up Driver: %w", err) + } - opaClient, err := constraintclient.NewClient(constraintclient.Targets(&target.K8sValidationTarget{}), constraintclient.Driver(driver)) - if err != nil { - return fmt.Errorf("unable to set up OPA backend client: %w", err) + opaClient, err = constraintclient.NewClient(constraintclient.Targets(&target.K8sValidationTarget{}), constraintclient.Driver(driver)) + if err != nil { + return nil, fmt.Errorf("unable to set up OPA backend client: %w", err) + } } // ControllerSwitch will be used to disable controllers during our teardown process, // avoiding conflicts in finalizer cleanup. cs := watch.NewSwitch() - processExcluder := process.Get() + syncMetricsCache := syncutil.NewMetricsCache() + reg, err := wm.NewRegistrar( + cachemanager.RegistrarName, + events) + if err != nil { + return nil, fmt.Errorf("cannot create registrar: %w", err) + } + cacheManager, err := cachemanager.NewCacheManager(&cachemanager.Config{ + CfClient: opaClient, + SyncMetricsCache: syncMetricsCache, + Tracker: tracker, + ProcessExcluder: processExcluder, + Registrar: reg, + Reader: reader, + }) + if err != nil { + return nil, fmt.Errorf("error creating cache manager: %w", err) + } + go func() { + _ = cacheManager.Start(ctx) + }() - watchSet := watch.NewSet() - rec, _ := newReconciler(mgr, opaClient, wm, cs, tracker, processExcluder, events, watchSet, nil) + rec, err := newReconciler(mgr, cacheManager, cs, tracker) + if err != nil { + return nil, fmt.Errorf("creating reconciler: %w", err) + } err = add(mgr, rec) if err != nil { - return fmt.Errorf("adding reconciler to manager: %w", err) + return nil, fmt.Errorf("adding reconciler to manager: %w", err) } - return nil + + syncAdder := syncc.Adder{ + Events: events, + CacheManager: cacheManager, + } + err = syncAdder.Add(mgr) + if err != nil { + return nil, fmt.Errorf("registering sync controller: %w", err) + } + return opaClient, nil } // Verify the Opa cache is populated based on the config resource. func TestConfig_CacheContents(t *testing.T) { + ctx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() + + // Setup the Manager and Controller. + mgr, wm := setupManager(t) + c := testclient.NewRetryClient(mgr.GetClient()) g := gomega.NewGomegaWithT(t) nsGVK := schema.GroupVersionKind{ Group: "", @@ -414,113 +483,58 @@ func TestConfig_CacheContents(t *testing.T) { Version: "v1", Kind: "ConfigMap", } - instance := configFor([]schema.GroupVersionKind{ - nsGVK, - configMapGVK, + // Create a configMap to test for + cm := unstructuredFor(configMapGVK, "config-test-1") + cm.SetNamespace("default") + require.NoError(t, c.Create(ctx, cm), "creating configMap config-test-1") + t.Cleanup(func() { + assert.NoError(t, deleteResource(ctx, c, cm), "deleting configMap config-test-1") }) + cmKey, err := fakes.KeyFor(cm) + require.NoError(t, err) - // Setup the Manager and Controller. 
- mgr, wm := setupManager(t) - c := testclient.NewRetryClient(mgr.GetClient()) + cm2 := unstructuredFor(configMapGVK, "config-test-2") + cm2.SetNamespace("kube-system") + require.NoError(t, c.Create(ctx, cm2), "creating configMap config-test-2") + t.Cleanup(func() { + assert.NoError(t, deleteResource(ctx, c, cm2), "deleting configMap config-test-2") + }) + cm2Key, err := fakes.KeyFor(cm2) + require.NoError(t, err) - opaClient := &fakes.FakeOpa{} - cs := watch.NewSwitch() tracker, err := readiness.SetupTracker(mgr, false, false, false) - if err != nil { - t.Fatal(err) - } - processExcluder := process.Get() - processExcluder.Add(instance.Spec.Match) + require.NoError(t, err) events := make(chan event.GenericEvent, 1024) - watchSet := watch.NewSet() - rec, _ := newReconciler(mgr, opaClient, wm, cs, tracker, processExcluder, events, watchSet, events) - err = add(mgr, rec) - if err != nil { - t.Fatal(err) - } + opa, err := setupController(ctx, mgr, wm, tracker, events, c, true) + require.NoError(t, err, "failed to set up controller") - ctx, cancelFunc := context.WithCancel(context.Background()) - testutils.StartManager(ctx, t, mgr) - once := gosync.Once{} - testMgrStopped := func() { - once.Do(func() { - cancelFunc() - }) - } + opaClient, ok := opa.(*fakes.FakeCfClient) + require.True(t, ok) - defer testMgrStopped() + testutils.StartManager(ctx, t, mgr) // Create the Config object and expect the Reconcile to be created - ctx = context.Background() - - instance = configFor([]schema.GroupVersionKind{nsGVK, configMapGVK}) + config := configFor([]schema.GroupVersionKind{nsGVK, configMapGVK}) + require.NoError(t, c.Create(ctx, config), "creating Config config") - // Since we're reusing instance between tests, we must wait for it to be fully - // deleted. We also can't reuse the same instance without introducing - // flakiness as client.Client methods modify their input. - g.Eventually(ensureDeleted(ctx, c, instance), timeout). - ShouldNot(gomega.HaveOccurred()) - g.Eventually(ensureCreated(ctx, c, instance), timeout). 
- ShouldNot(gomega.HaveOccurred()) - - t.Cleanup(func() { - err = c.Delete(ctx, instance) - if !apierrors.IsNotFound(err) { - t.Errorf("got Delete(instance) error %v, want IsNotFound", err) - } - }) - - // Create a configMap to test for - cm := unstructuredFor(configMapGVK, "config-test-1") - cm.SetNamespace("default") - err = c.Create(ctx, cm) - if err != nil { - t.Fatalf("creating configMap config-test-1: %v", err) - } - - cm2 := unstructuredFor(configMapGVK, "config-test-2") - cm2.SetNamespace("kube-system") - err = c.Create(ctx, cm2) - if err != nil { - t.Fatalf("creating configMap config-test-2: %v", err) - } - - defer func() { - err = c.Delete(ctx, cm) - if err != nil { - t.Fatal(err) - } - err = c.Delete(ctx, cm2) - if err != nil { - t.Fatal(err) - } - }() - - expected := map[fakes.OpaKey]interface{}{ - {Gvk: nsGVK, Key: "default"}: nil, - {Gvk: configMapGVK, Key: "default/config-test-1"}: nil, + expected := map[fakes.CfDataKey]interface{}{ + {Gvk: nsGVK, Key: "default"}: nil, + cmKey: nil, // kube-system namespace is being excluded, it should not be in opa cache } g.Eventually(func() bool { return opaClient.Contains(expected) }, 10*time.Second).Should(gomega.BeTrue(), "checking initial opa cache contents") - - // Sanity - if !opaClient.HasGVK(nsGVK) { - t.Fatal("want opaClient.HasGVK(nsGVK) to be true but got false") - } + require.True(t, opaClient.HasGVK(nsGVK), "want opaClient.HasGVK(nsGVK) to be true but got false") // Reconfigure to drop the namespace watches - instance = configFor([]schema.GroupVersionKind{configMapGVK}) - forUpdate := instance.DeepCopy() - _, err = controllerutil.CreateOrUpdate(ctx, c, forUpdate, func() error { - forUpdate.Spec = instance.Spec - return nil - }) - if err != nil { - t.Fatalf("updating Config resource: %v", err) - } + config = configFor([]schema.GroupVersionKind{configMapGVK}) + configUpdate := config.DeepCopy() + + require.NoError(t, c.Get(ctx, client.ObjectKeyFromObject(configUpdate), configUpdate)) + configUpdate.Spec = config.Spec + require.NoError(t, c.Update(ctx, configUpdate), "updating Config config") // Expect namespaces to go away from cache g.Eventually(func() bool { @@ -529,21 +543,15 @@ func TestConfig_CacheContents(t *testing.T) { // Expect our configMap to return at some point // TODO: In the future it will remain instead of having to repopulate. - expected = map[fakes.OpaKey]interface{}{ - { - Gvk: configMapGVK, - Key: "default/config-test-1", - }: nil, + expected = map[fakes.CfDataKey]interface{}{ + cmKey: nil, } g.Eventually(func() bool { return opaClient.Contains(expected) }, 10*time.Second).Should(gomega.BeTrue(), "waiting for ConfigMap to repopulate in cache") - expected = map[fakes.OpaKey]interface{}{ - { - Gvk: configMapGVK, - Key: "kube-system/config-test-2", - }: nil, + expected = map[fakes.CfDataKey]interface{}{ + cm2Key: nil, } g.Eventually(func() bool { return !opaClient.Contains(expected) @@ -553,10 +561,7 @@ func TestConfig_CacheContents(t *testing.T) { if opaClient.Len() == 0 { t.Fatal("sanity") } - err = c.Delete(ctx, instance) - if err != nil { - t.Fatalf("deleting Config resource: %v", err) - } + require.NoError(t, c.Delete(ctx, config), "deleting Config resource") // The cache will be cleared out. 
g.Eventually(func() int { @@ -565,6 +570,9 @@ func TestConfig_CacheContents(t *testing.T) { } func TestConfig_Retries(t *testing.T) { + ctx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() + g := gomega.NewGomegaWithT(t) nsGVK := schema.GroupVersionKind{ Group: "", @@ -576,15 +584,13 @@ func TestConfig_Retries(t *testing.T) { Version: "v1", Kind: "ConfigMap", } - instance := configFor([]schema.GroupVersionKind{ - configMapGVK, - }) + instance := configFor([]schema.GroupVersionKind{nsGVK, configMapGVK}) // Setup the Manager and Controller. mgr, wm := setupManager(t) c := testclient.NewRetryClient(mgr.GetClient()) - opaClient := &fakes.FakeOpa{} + opaClient := &fakes.FakeCfClient{} cs := watch.NewSwitch() tracker, err := readiness.SetupTracker(mgr, false, false, false) if err != nil { @@ -594,44 +600,52 @@ func TestConfig_Retries(t *testing.T) { processExcluder.Add(instance.Spec.Match) events := make(chan event.GenericEvent, 1024) - watchSet := watch.NewSet() - rec, _ := newReconciler(mgr, opaClient, wm, cs, tracker, processExcluder, events, watchSet, events) + syncMetricsCache := syncutil.NewMetricsCache() + reg, err := wm.NewRegistrar( + cachemanager.RegistrarName, + events) + require.NoError(t, err) + cacheManager, err := cachemanager.NewCacheManager(&cachemanager.Config{ + CfClient: opaClient, + SyncMetricsCache: syncMetricsCache, + Tracker: tracker, + ProcessExcluder: processExcluder, + Registrar: reg, + Reader: c, + }) + require.NoError(t, err) + go func() { + assert.NoError(t, cacheManager.Start(ctx)) + }() + + rec, _ := newReconciler(mgr, cacheManager, cs, tracker) err = add(mgr, rec) if err != nil { t.Fatal(err) } + syncAdder := syncc.Adder{ + Events: events, + CacheManager: cacheManager, + } + require.NoError(t, syncAdder.Add(mgr), "registering sync controller") - // Use our special hookReader to inject controlled failures - failPlease := make(chan string, 1) - rec.reader = hookReader{ + // Use our special reader interceptor to inject controlled failures + fi := fakes.NewFailureInjector() + rec.reader = fakes.SpyReader{ Reader: mgr.GetCache(), ListFunc: func(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { - // Return an error the first go-around. - var failKind string - select { - case failKind = <-failPlease: - default: - } - if failKind != "" && list.GetObjectKind().GroupVersionKind().Kind == failKind { + // return as many syntenthic failures as there are registered for this kind + if fi.CheckFailures(list.GetObjectKind().GroupVersionKind().Kind) { return fmt.Errorf("synthetic failure") } + return mgr.GetCache().List(ctx, list, opts...) 
}, } - ctx, cancelFunc := context.WithCancel(context.Background()) testutils.StartManager(ctx, t, mgr) - once := gosync.Once{} - testMgrStopped := func() { - once.Do(func() { - cancelFunc() - }) - } - - defer testMgrStopped() // Create the Config object and expect the Reconcile to be created - ctx = context.Background() g.Eventually(func() error { return c.Create(ctx, instance.DeepCopy()) }, timeout).Should(gomega.BeNil()) @@ -658,28 +672,20 @@ func TestConfig_Retries(t *testing.T) { t.Error(err) } }() + cmKey, err := fakes.KeyFor(cm) + require.NoError(t, err) - expected := map[fakes.OpaKey]interface{}{ - {Gvk: configMapGVK, Key: "default/config-test-1"}: nil, + expected := map[fakes.CfDataKey]interface{}{ + cmKey: nil, } g.Eventually(func() bool { return opaClient.Contains(expected) }, 10*time.Second).Should(gomega.BeTrue(), "checking initial opa cache contents") - // Wipe the opa cache, we want to see it repopulate despite transient replay errors below. - _, err = opaClient.RemoveData(ctx, target.WipeData()) - if err != nil { - t.Fatalf("wiping opa cache: %v", err) - } - if opaClient.Contains(expected) { - t.Fatal("wipe failed") - } + fi.SetFailures("ConfigMapList", 2) - // Make List fail once for ConfigMaps as the replay occurs following the reconfig below. - failPlease <- "ConfigMapList" - - // Reconfigure to add a namespace watch. - instance = configFor([]schema.GroupVersionKind{nsGVK, configMapGVK}) + // Reconfigure to force an internal replay. + instance = configFor([]schema.GroupVersionKind{configMapGVK}) forUpdate := instance.DeepCopy() _, err = controllerutil.CreateOrUpdate(ctx, c, forUpdate, func() error { forUpdate.Spec = instance.Spec @@ -740,65 +746,15 @@ type testExpectations interface { IsExpecting(gvk schema.GroupVersionKind, nsName types.NamespacedName) bool } -// ensureDeleted -// -// This package uses the same API server process across multiple test functions. -// The residual state from a previous test function can cause flakes. -// -// To ensure a clean slate, we must verify that any previously applied Config object -// has been fully removed before applying our new object. -func ensureDeleted(ctx context.Context, c client.Client, toDelete client.Object) func() error { - gvk := toDelete.GetObjectKind().GroupVersionKind() - key := client.ObjectKeyFromObject(toDelete) - - return func() error { - u := &unstructured.Unstructured{} - u.SetGroupVersionKind(gvk) - - err := c.Get(ctx, key, u) - if apierrors.IsNotFound(err) { - return nil - } else if err != nil { - return err - } - - if !u.GetDeletionTimestamp().IsZero() { - return fmt.Errorf("waiting for deletion: %v %v", gvk, key) - } - - err = c.Delete(ctx, u) - if err != nil { - return fmt.Errorf("deleting %v %v: %w", gvk, key, err) - } - - return fmt.Errorf("queued %v %v for deletion", gvk, key) +func deleteResource(ctx context.Context, c client.Client, resounce *unstructured.Unstructured) error { + if ctx.Err() != nil { + ctx = context.Background() } -} - -// ensureCreated attempts to create toCreate in Client c as toCreate existed when ensureCreated was called. -func ensureCreated(ctx context.Context, c client.Client, toCreate client.Object) func() error { - gvk := toCreate.GetObjectKind().GroupVersionKind() - key := client.ObjectKeyFromObject(toCreate) - - // As ensureCreated returns a closure, it is possible that the value toCreate will be modified after ensureCreated - // is called but before the closure is called. 
Creating a copy here ensures the object to be created is consistent - // with the way it existed when ensureCreated was called. - toCreateCopy := toCreate.DeepCopyObject() - - return func() error { - instance, ok := toCreateCopy.(client.Object) - if !ok { - return fmt.Errorf("instance was %T which is not a client.Object", instance) - } - - err := c.Create(ctx, instance) - if apierrors.IsAlreadyExists(err) { - return fmt.Errorf("a copy of %v %v already exists - run ensureDeleted to ensure a fresh copy exists for testing", - gvk, key) - } else if err != nil { - return fmt.Errorf("creating %v %v: %w", gvk, key, err) - } - + err := c.Delete(ctx, resounce) + if apierrors.IsNotFound(err) { + // resource does not exist, this is good return nil } + + return err } diff --git a/pkg/controller/config/fakes_test.go b/pkg/controller/config/fakes_test.go deleted file mode 100644 index 3c209d4e3d4..00000000000 --- a/pkg/controller/config/fakes_test.go +++ /dev/null @@ -1,35 +0,0 @@ -/* - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package config - -import ( - "context" - - "sigs.k8s.io/controller-runtime/pkg/client" -) - -// hookReader is a client.Reader with overrideable methods. -type hookReader struct { - client.Reader - ListFunc func(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error -} - -func (r hookReader) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { - if r.ListFunc != nil { - return r.ListFunc(ctx, list, opts...) - } - return r.Reader.List(ctx, list, opts...) 
-} diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 2fec2fcda8e..97f4b1760bc 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -18,12 +18,15 @@ package controller import ( "context" "flag" + "fmt" "os" "sync" constraintclient "github.com/open-policy-agent/frameworks/constraint/pkg/client" "github.com/open-policy-agent/frameworks/constraint/pkg/externaldata" + cm "github.com/open-policy-agent/gatekeeper/v3/pkg/cachemanager" "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process" + syncc "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/sync" "github.com/open-policy-agent/gatekeeper/v3/pkg/expansion" "github.com/open-policy-agent/gatekeeper/v3/pkg/fakes" "github.com/open-policy-agent/gatekeeper/v3/pkg/mutation" @@ -37,6 +40,7 @@ import ( "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/manager" ) @@ -57,18 +61,14 @@ type GetPodInjector interface { InjectGetPod(func(context.Context) (*corev1.Pod, error)) } -type GetProcessExcluderInjector interface { - InjectProcessExcluder(processExcluder *process.Excluder) -} - -type WatchSetInjector interface { - InjectWatchSet(watchSet *watch.Set) -} - type PubsubInjector interface { InjectPubsubSystem(pubsubSystem *pubsub.System) } +type CacheManagerInjector interface { + InjectCacheManager(cm *cm.CacheManager) +} + // Injectors is a list of adder structs that need injection. We can convert this // to an interface once we create controllers for things like data sync. var Injectors []Injector @@ -87,8 +87,9 @@ type Dependencies struct { MutationSystem *mutation.System ExpansionSystem *expansion.System ProviderCache *externaldata.ProviderCache - WatchSet *watch.Set PubsubSystem *pubsub.System + SyncEventsCh chan event.GenericEvent + CacheMgr *cm.CacheManager } type defaultPodGetter struct { @@ -160,6 +161,22 @@ func AddToManager(m manager.Manager, deps *Dependencies) error { } deps.GetPod = fakePodGetter } + + // Adding the CacheManager as a runnable; + // manager will start CacheManager. 
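
Context for the comment above: `m.Add` only requires that the value implement controller-runtime's `Runnable` interface, which the new CacheManager satisfies with a blocking `Start` method; that is also why tests that bypass `AddToManager` run `cacheManager.Start(ctx)` in a goroutine themselves. Quoted for reference (as defined in `sigs.k8s.io/controller-runtime/pkg/manager`):

```go
package example

import "context"

// Runnable is the contract manager.Add expects of anything it is asked to run.
// Start is expected to block until the context is cancelled.
type Runnable interface {
	Start(ctx context.Context) error
}
```
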
+ if err := m.Add(deps.CacheMgr); err != nil { + return fmt.Errorf("error adding cache manager as a runnable: %w", err) + } + + syncAdder := syncc.Adder{ + Events: deps.SyncEventsCh, + CacheManager: deps.CacheMgr, + } + // Create subordinate controller - we will feed it events dynamically via watch + if err := syncAdder.Add(m); err != nil { + return fmt.Errorf("registering sync controller: %w", err) + } + for _, a := range Injectors { a.InjectOpa(deps.Opa) a.InjectWatchManager(deps.WatchManger) @@ -171,15 +188,14 @@ func AddToManager(m manager.Manager, deps *Dependencies) error { if a2, ok := a.(GetPodInjector); ok { a2.InjectGetPod(deps.GetPod) } - if a2, ok := a.(GetProcessExcluderInjector); ok { - a2.InjectProcessExcluder(deps.ProcessExcluder) - } - if a2, ok := a.(WatchSetInjector); ok { - a2.InjectWatchSet(deps.WatchSet) - } if a2, ok := a.(PubsubInjector); ok { a2.InjectPubsubSystem(deps.PubsubSystem) } + if a2, ok := a.(CacheManagerInjector); ok { + // this is used by the config controller to sync + a2.InjectCacheManager(deps.CacheMgr) + } + if err := a.Add(m); err != nil { return err } diff --git a/pkg/controller/sync/sync_controller.go b/pkg/controller/sync/sync_controller.go index 7dd5630bead..a74655c2135 100644 --- a/pkg/controller/sync/sync_controller.go +++ b/pkg/controller/sync/sync_controller.go @@ -20,10 +20,10 @@ import ( "time" "github.com/go-logr/logr" + "github.com/open-policy-agent/gatekeeper/v3/pkg/cachemanager" "github.com/open-policy-agent/gatekeeper/v3/pkg/logging" "github.com/open-policy-agent/gatekeeper/v3/pkg/operations" "github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil" - cm "github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil/cachemanager" "github.com/open-policy-agent/gatekeeper/v3/pkg/util" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -41,7 +41,7 @@ import ( var log = logf.Log.WithName("controller").WithValues("metaKind", "Sync") type Adder struct { - CacheManager *cm.CacheManager + CacheManager *cachemanager.CacheManager Events <-chan event.GenericEvent } @@ -65,14 +65,14 @@ func (a *Adder) Add(mgr manager.Manager) error { func newReconciler( mgr manager.Manager, reporter syncutil.Reporter, - cmt *cm.CacheManager, + cm *cachemanager.CacheManager, ) reconcile.Reconciler { return &ReconcileSync{ reader: mgr.GetCache(), scheme: mgr.GetScheme(), log: log, reporter: reporter, - cm: cmt, + cm: cm, } } @@ -103,7 +103,7 @@ type ReconcileSync struct { scheme *runtime.Scheme log logr.Logger reporter syncutil.Reporter - cm *cm.CacheManager + cm *cachemanager.CacheManager } // +kubebuilder:rbac:groups=constraints.gatekeeper.sh,resources=*,verbs=get;list;watch;create;update;patch;delete @@ -172,13 +172,10 @@ func (r *ReconcileSync) Reconcile(ctx context.Context, request reconcile.Request logging.ResourceName, instance.GetName(), ) + reportMetrics = true if err := r.cm.AddObject(ctx, instance); err != nil { - reportMetrics = true - return reconcile.Result{}, err } - reportMetrics = true - return reconcile.Result{}, nil } diff --git a/pkg/fakes/opadataclient.go b/pkg/fakes/fakecfdataclient.go similarity index 62% rename from pkg/fakes/opadataclient.go rename to pkg/fakes/fakecfdataclient.go index 76c117e3e94..ba81b4fe255 100644 --- a/pkg/fakes/opadataclient.go +++ b/pkg/fakes/fakecfdataclient.go @@ -19,43 +19,40 @@ import ( gosync "sync" constraintTypes "github.com/open-policy-agent/frameworks/constraint/pkg/types" - "github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil" 
"github.com/open-policy-agent/gatekeeper/v3/pkg/target" "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/controller-runtime/pkg/client" ) -type OpaKey struct { +type CfDataKey struct { Gvk schema.GroupVersionKind Key string } -// FakeOpa is an OpaDataClient for testing. -type FakeOpa struct { +// FakeCfClient is an CfDataClient for testing. +type FakeCfClient struct { mu gosync.Mutex - data map[OpaKey]interface{} + data map[CfDataKey]interface{} needsToError bool } -var _ syncutil.OpaDataClient = &FakeOpa{} - -// keyFor returns an opaKey for the provided resource. +// KeyFor returns a CfDataKey for the provided resource. // Returns error if the resource is not a runtime.Object w/ metadata. -func (f *FakeOpa) keyFor(obj interface{}) (OpaKey, error) { +func KeyFor(obj interface{}) (CfDataKey, error) { o, ok := obj.(client.Object) if !ok { - return OpaKey{}, fmt.Errorf("expected runtime.Object, got: %T", obj) + return CfDataKey{}, fmt.Errorf("expected runtime.Object, got: %T", obj) } gvk := o.GetObjectKind().GroupVersionKind() ns := o.GetNamespace() if ns == "" { - return OpaKey{Gvk: gvk, Key: o.GetName()}, nil + return CfDataKey{Gvk: gvk, Key: o.GetName()}, nil } - return OpaKey{Gvk: gvk, Key: fmt.Sprintf("%s/%s", ns, o.GetName())}, nil + return CfDataKey{Gvk: gvk, Key: fmt.Sprintf("%s/%s", ns, o.GetName())}, nil } -func (f *FakeOpa) AddData(ctx context.Context, data interface{}) (*constraintTypes.Responses, error) { +func (f *FakeCfClient) AddData(ctx context.Context, data interface{}) (*constraintTypes.Responses, error) { f.mu.Lock() defer f.mu.Unlock() @@ -63,20 +60,20 @@ func (f *FakeOpa) AddData(ctx context.Context, data interface{}) (*constraintTyp return nil, fmt.Errorf("test error") } - key, err := f.keyFor(data) + key, err := KeyFor(data) if err != nil { return nil, err } if f.data == nil { - f.data = make(map[OpaKey]interface{}) + f.data = make(map[CfDataKey]interface{}) } f.data[key] = data return &constraintTypes.Responses{}, nil } -func (f *FakeOpa) RemoveData(ctx context.Context, data interface{}) (*constraintTypes.Responses, error) { +func (f *FakeCfClient) RemoveData(ctx context.Context, data interface{}) (*constraintTypes.Responses, error) { f.mu.Lock() defer f.mu.Unlock() @@ -85,11 +82,11 @@ func (f *FakeOpa) RemoveData(ctx context.Context, data interface{}) (*constraint } if target.IsWipeData(data) { - f.data = make(map[OpaKey]interface{}) + f.data = make(map[CfDataKey]interface{}) return &constraintTypes.Responses{}, nil } - key, err := f.keyFor(data) + key, err := KeyFor(data) if err != nil { return nil, err } @@ -98,8 +95,18 @@ func (f *FakeOpa) RemoveData(ctx context.Context, data interface{}) (*constraint return &constraintTypes.Responses{}, nil } +// GetData returns data for a CfDataKey. It assumes that the +// key is present in the FakeCfClient. Also the data returned is not copied +// and it's meant only for assertions not modifications. +func (f *FakeCfClient) GetData(key CfDataKey) interface{} { + f.mu.Lock() + defer f.mu.Unlock() + + return f.data[key] +} + // Contains returns true if all expected resources are in the cache. -func (f *FakeOpa) Contains(expected map[OpaKey]interface{}) bool { +func (f *FakeCfClient) Contains(expected map[CfDataKey]interface{}) bool { f.mu.Lock() defer f.mu.Unlock() @@ -112,7 +119,7 @@ func (f *FakeOpa) Contains(expected map[OpaKey]interface{}) bool { } // HasGVK returns true if the cache has any data of the requested kind. 
-func (f *FakeOpa) HasGVK(gvk schema.GroupVersionKind) bool { +func (f *FakeCfClient) HasGVK(gvk schema.GroupVersionKind) bool { f.mu.Lock() defer f.mu.Unlock() @@ -125,14 +132,14 @@ func (f *FakeOpa) HasGVK(gvk schema.GroupVersionKind) bool { } // Len returns the number of items in the cache. -func (f *FakeOpa) Len() int { +func (f *FakeCfClient) Len() int { f.mu.Lock() defer f.mu.Unlock() return len(f.data) } // SetErroring will error out on AddObject or RemoveObject. -func (f *FakeOpa) SetErroring(enabled bool) { +func (f *FakeCfClient) SetErroring(enabled bool) { f.mu.Lock() defer f.mu.Unlock() f.needsToError = enabled diff --git a/pkg/fakes/reader.go b/pkg/fakes/reader.go new file mode 100644 index 00000000000..2acd81d19da --- /dev/null +++ b/pkg/fakes/reader.go @@ -0,0 +1,60 @@ +package fakes + +import ( + "context" + "sync" + + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type SpyReader struct { + client.Reader + ListFunc func(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error +} + +func (r SpyReader) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { + if r.ListFunc != nil { + return r.ListFunc(ctx, list, opts...) + } + return r.Reader.List(ctx, list, opts...) +} + +// FailureInjector can be used in combination with the SpyReader to simulate transient +// failures for network calls. +type FailureInjector struct { + mu sync.Mutex + failures map[string]int // registers GVK.Kind and how many times to fail +} + +func (f *FailureInjector) SetFailures(kind string, failures int) { + f.mu.Lock() + defer f.mu.Unlock() + + f.failures[kind] = failures +} + +// CheckFailures looks at the count of failures and returns true +// if there are still failures for the kind to consume, false otherwise. 
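
SpyReader and FailureInjector are composed in TestConfig_Retries above; a trimmed sketch of that composition is below (the wrapper function is illustrative, the fakes APIs are the ones added in this patch).

```go
package example

import (
	"context"
	"fmt"

	"github.com/open-policy-agent/gatekeeper/v3/pkg/fakes"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// flakyReader wraps a real reader so List fails a configurable number of
// times per kind, letting tests exercise retry/requeue paths.
func flakyReader(real client.Reader, fi *fakes.FailureInjector) client.Reader {
	return fakes.SpyReader{
		Reader: real,
		ListFunc: func(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
			// Consume one registered failure for this kind, if any remain.
			if fi.CheckFailures(list.GetObjectKind().GroupVersionKind().Kind) {
				return fmt.Errorf("synthetic failure")
			}
			return real.List(ctx, list, opts...)
		},
	}
}
```

A test arms the injector with, for example, `fi.SetFailures("ConfigMapList", 2)` before forcing a replay, then expects the cache to converge once the synthetic failures are consumed, as TestConfig_Retries does above.
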
+func (f *FailureInjector) CheckFailures(kind string) bool { + f.mu.Lock() + defer f.mu.Unlock() + + v, ok := f.failures[kind] + if !ok { + return false + } + + if v == 0 { + return false + } + + f.failures[kind] = v - 1 + + return true +} + +func NewFailureInjector() *FailureInjector { + return &FailureInjector{ + failures: make(map[string]int), + } +} diff --git a/pkg/readiness/ready_tracker_test.go b/pkg/readiness/ready_tracker_test.go index 7cd18d4abf0..3a14351c11c 100644 --- a/pkg/readiness/ready_tracker_test.go +++ b/pkg/readiness/ready_tracker_test.go @@ -30,6 +30,7 @@ import ( constraintclient "github.com/open-policy-agent/frameworks/constraint/pkg/client" "github.com/open-policy-agent/frameworks/constraint/pkg/client/drivers/rego" frameworksexternaldata "github.com/open-policy-agent/frameworks/constraint/pkg/externaldata" + "github.com/open-policy-agent/gatekeeper/v3/pkg/cachemanager" "github.com/open-policy-agent/gatekeeper/v3/pkg/controller" "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process" "github.com/open-policy-agent/gatekeeper/v3/pkg/expansion" @@ -37,6 +38,7 @@ import ( "github.com/open-policy-agent/gatekeeper/v3/pkg/mutation" mutationtypes "github.com/open-policy-agent/gatekeeper/v3/pkg/mutation/types" "github.com/open-policy-agent/gatekeeper/v3/pkg/readiness" + "github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil" "github.com/open-policy-agent/gatekeeper/v3/pkg/target" "github.com/open-policy-agent/gatekeeper/v3/pkg/util" "github.com/open-policy-agent/gatekeeper/v3/pkg/watch" @@ -49,6 +51,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/metrics" @@ -102,7 +105,7 @@ func setupOpa(t *testing.T) *constraintclient.Client { func setupController( mgr manager.Manager, wm *watch.Manager, - opa *constraintclient.Client, + cfClient *constraintclient.Client, mutationSystem *mutation.System, expansionSystem *expansion.System, providerCache *frameworksexternaldata.ProviderCache, @@ -125,9 +128,29 @@ func setupController( processExcluder := process.Get() + events := make(chan event.GenericEvent, 1024) + syncMetricsCache := syncutil.NewMetricsCache() + reg, err := wm.NewRegistrar( + cachemanager.RegistrarName, + events) + if err != nil { + return fmt.Errorf("setting up watch manager: %w", err) + } + cacheManager, err := cachemanager.NewCacheManager(&cachemanager.Config{ + CfClient: cfClient, + SyncMetricsCache: syncMetricsCache, + Tracker: tracker, + ProcessExcluder: processExcluder, + Registrar: reg, + Reader: mgr.GetCache(), + }) + if err != nil { + return fmt.Errorf("setting up cache manager: %w", err) + } + // Setup all Controllers opts := controller.Dependencies{ - Opa: opa, + Opa: cfClient, WatchManger: wm, ControllerSwitch: sw, Tracker: tracker, @@ -136,7 +159,8 @@ func setupController( MutationSystem: mutationSystem, ExpansionSystem: expansionSystem, ProviderCache: providerCache, - WatchSet: watch.NewSet(), + CacheMgr: cacheManager, + SyncEventsCh: events, } if err := controller.AddToManager(mgr, &opts); err != nil { return fmt.Errorf("registering controllers: %w", err) diff --git a/pkg/syncutil/cachemanager/cachemanager.go b/pkg/syncutil/cachemanager/cachemanager.go deleted file mode 100644 index 73e723860b5..00000000000 --- a/pkg/syncutil/cachemanager/cachemanager.go +++ 
/dev/null @@ -1,82 +0,0 @@ -package cachemanager - -import ( - "context" - "fmt" - - "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process" - "github.com/open-policy-agent/gatekeeper/v3/pkg/metrics" - "github.com/open-policy-agent/gatekeeper/v3/pkg/readiness" - "github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" -) - -type CacheManager struct { - opa syncutil.OpaDataClient - syncMetricsCache *syncutil.MetricsCache - tracker *readiness.Tracker - processExcluder *process.Excluder -} - -func NewCacheManager(opa syncutil.OpaDataClient, syncMetricsCache *syncutil.MetricsCache, tracker *readiness.Tracker, processExcluder *process.Excluder) *CacheManager { - return &CacheManager{ - opa: opa, - syncMetricsCache: syncMetricsCache, - tracker: tracker, - processExcluder: processExcluder, - } -} - -func (c *CacheManager) AddObject(ctx context.Context, instance *unstructured.Unstructured) error { - isNamespaceExcluded, err := c.processExcluder.IsNamespaceExcluded(process.Sync, instance) - if err != nil { - return fmt.Errorf("error while excluding namespaces: %w", err) - } - - // bail because it means we should not be - // syncing this gvk - if isNamespaceExcluded { - c.tracker.ForData(instance.GroupVersionKind()).CancelExpect(instance) - return nil - } - - syncKey := syncutil.GetKeyForSyncMetrics(instance.GetNamespace(), instance.GetName()) - _, err = c.opa.AddData(ctx, instance) - if err != nil { - c.syncMetricsCache.AddObject( - syncKey, - syncutil.Tags{ - Kind: instance.GetKind(), - Status: metrics.ErrorStatus, - }, - ) - - return err - } - - c.tracker.ForData(instance.GroupVersionKind()).Observe(instance) - - c.syncMetricsCache.AddObject(syncKey, syncutil.Tags{ - Kind: instance.GetKind(), - Status: metrics.ActiveStatus, - }) - c.syncMetricsCache.AddKind(instance.GetKind()) - - return err -} - -func (c *CacheManager) RemoveObject(ctx context.Context, instance *unstructured.Unstructured) error { - if _, err := c.opa.RemoveData(ctx, instance); err != nil { - return err - } - - // only delete from metrics map if the data removal was succcesful - c.syncMetricsCache.DeleteObject(syncutil.GetKeyForSyncMetrics(instance.GetNamespace(), instance.GetName())) - c.tracker.ForData(instance.GroupVersionKind()).CancelExpect(instance) - - return nil -} - -func (c *CacheManager) ReportSyncMetrics() { - c.syncMetricsCache.ReportSync() -} diff --git a/pkg/syncutil/cachemanager/cachemanager_test.go b/pkg/syncutil/cachemanager/cachemanager_test.go deleted file mode 100644 index 2a0038ccf3d..00000000000 --- a/pkg/syncutil/cachemanager/cachemanager_test.go +++ /dev/null @@ -1,111 +0,0 @@ -package cachemanager - -import ( - "context" - "testing" - - configv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/config/v1alpha1" - "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process" - "github.com/open-policy-agent/gatekeeper/v3/pkg/fakes" - "github.com/open-policy-agent/gatekeeper/v3/pkg/readiness" - "github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil" - "github.com/open-policy-agent/gatekeeper/v3/pkg/wildcard" - "github.com/open-policy-agent/gatekeeper/v3/test/testutils" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/rest" -) - -var cfg *rest.Config - -func TestMain(m *testing.M) { - testutils.StartControlPlane(m, &cfg, 3) -} - -// TestCacheManager_AddObject_RemoveObject tests 
that we can add/ remove objects in the cache. -func TestCacheManager_AddObject_RemoveObject(t *testing.T) { - mgr, _ := testutils.SetupManager(t, cfg) - opaClient := &fakes.FakeOpa{} - - tracker, err := readiness.SetupTracker(mgr, false, false, false) - assert.NoError(t, err) - - processExcluder := process.Get() - cm := NewCacheManager(opaClient, syncutil.NewMetricsCache(), tracker, processExcluder) - ctx := context.Background() - - pod := fakes.Pod( - fakes.WithNamespace("test-ns"), - fakes.WithName("test-name"), - ) - unstructuredPod, err := runtime.DefaultUnstructuredConverter.ToUnstructured(pod) - require.NoError(t, err) - - require.NoError(t, cm.AddObject(ctx, &unstructured.Unstructured{Object: unstructuredPod})) - - // test that pod is cache managed - require.True(t, opaClient.HasGVK(pod.GroupVersionKind())) - - // now remove the object and verify it's removed - require.NoError(t, cm.RemoveObject(ctx, &unstructured.Unstructured{Object: unstructuredPod})) - require.False(t, opaClient.HasGVK(pod.GroupVersionKind())) -} - -// TestCacheManager_processExclusion makes sure that we don't add objects that are process excluded. -func TestCacheManager_processExclusion(t *testing.T) { - mgr, _ := testutils.SetupManager(t, cfg) - opaClient := &fakes.FakeOpa{} - - tracker, err := readiness.SetupTracker(mgr, false, false, false) - assert.NoError(t, err) - - // exclude "test-ns-excluded" namespace - processExcluder := process.Get() - processExcluder.Add([]configv1alpha1.MatchEntry{ - { - ExcludedNamespaces: []wildcard.Wildcard{"test-ns-excluded"}, - Processes: []string{"sync"}, - }, - }) - - cm := NewCacheManager(opaClient, syncutil.NewMetricsCache(), tracker, processExcluder) - ctx := context.Background() - - pod := fakes.Pod( - fakes.WithNamespace("test-ns-excluded"), - fakes.WithName("test-name"), - ) - unstructuredPod, err := runtime.DefaultUnstructuredConverter.ToUnstructured(pod) - require.NoError(t, err) - require.NoError(t, cm.AddObject(ctx, &unstructured.Unstructured{Object: unstructuredPod})) - - // test that pod from excluded namespace is not cache managed - require.False(t, opaClient.HasGVK(pod.GroupVersionKind())) -} - -// TestCacheManager_errors tests that we cache manager responds to errors from the opa client. -func TestCacheManager_errors(t *testing.T) { - mgr, _ := testutils.SetupManager(t, cfg) - opaClient := &fakes.FakeOpa{} - opaClient.SetErroring(true) // AddObject, RemoveObject will error out now. - - tracker, err := readiness.SetupTracker(mgr, false, false, false) - assert.NoError(t, err) - - processExcluder := process.Get() - cm := NewCacheManager(opaClient, syncutil.NewMetricsCache(), tracker, processExcluder) - ctx := context.Background() - - pod := fakes.Pod( - fakes.WithNamespace("test-ns"), - fakes.WithName("test-name"), - ) - unstructuredPod, err := runtime.DefaultUnstructuredConverter.ToUnstructured(pod) - require.NoError(t, err) - - // test that cm bubbles up the errors - require.ErrorContains(t, cm.AddObject(ctx, &unstructured.Unstructured{Object: unstructuredPod}), "test error") - require.ErrorContains(t, cm.RemoveObject(ctx, &unstructured.Unstructured{Object: unstructuredPod}), "test error") -} diff --git a/pkg/syncutil/opadataclient.go b/pkg/syncutil/opadataclient.go deleted file mode 100644 index 63acd1ddfac..00000000000 --- a/pkg/syncutil/opadataclient.go +++ /dev/null @@ -1,69 +0,0 @@ -/* - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package syncutil - -import ( - "context" - - "github.com/open-policy-agent/frameworks/constraint/pkg/types" - "github.com/open-policy-agent/gatekeeper/v3/pkg/watch" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -// OpaDataClient is an interface for caching data. -type OpaDataClient interface { - AddData(ctx context.Context, data interface{}) (*types.Responses, error) - RemoveData(ctx context.Context, data interface{}) (*types.Responses, error) -} - -// FilteredDataClient is an OpaDataClient which drops any unwatched resources. -type FilteredDataClient struct { - watched *watch.Set - opa OpaDataClient -} - -func NewFilteredOpaDataClient(opa OpaDataClient, watchSet *watch.Set) *FilteredDataClient { - return &FilteredDataClient{ - watched: watchSet, - opa: opa, - } -} - -// AddData adds data to the opa cache if that data is currently being watched. -// Unwatched data is silently dropped with no error. -func (f *FilteredDataClient) AddData(ctx context.Context, data interface{}) (*types.Responses, error) { - if obj, ok := data.(client.Object); ok { - gvk := obj.GetObjectKind().GroupVersionKind() - if !f.watched.Contains(gvk) { - return &types.Responses{}, nil - } - } - - return f.opa.AddData(ctx, data) -} - -// RemoveData removes data from the opa cache if that data is currently being watched. -// Unwatched data is silently dropped with no error. -func (f *FilteredDataClient) RemoveData(ctx context.Context, data interface{}) (*types.Responses, error) { - if obj, ok := data.(client.Object); ok { - gvk := obj.GetObjectKind().GroupVersionKind() - if !f.watched.Contains(gvk) { - return &types.Responses{}, nil - } - } - - return f.opa.RemoveData(ctx, data) -} diff --git a/pkg/syncutil/stats_reporter.go b/pkg/syncutil/stats_reporter.go index adb5cf27cae..42a1b8f2f32 100644 --- a/pkg/syncutil/stats_reporter.go +++ b/pkg/syncutil/stats_reporter.go @@ -108,6 +108,28 @@ func (c *MetricsCache) DeleteObject(key string) { delete(c.Cache, key) } +func (c *MetricsCache) GetTags(key string) *Tags { + c.mux.RLock() + defer c.mux.RUnlock() + + cpy := &Tags{} + v, ok := c.Cache[key] + if ok { + cpy.Kind = v.Kind + cpy.Status = v.Status + } + + return cpy +} + +func (c *MetricsCache) HasObject(key string) bool { + c.mux.RLock() + defer c.mux.RUnlock() + + _, ok := c.Cache[key] + return ok +} + func (c *MetricsCache) ReportSync() { c.mux.RLock() defer c.mux.RUnlock() diff --git a/test/testutils/manager.go b/test/testutils/manager.go index 2e5641f3a5f..8bfb377ea06 100644 --- a/test/testutils/manager.go +++ b/test/testutils/manager.go @@ -8,7 +8,9 @@ import ( "github.com/open-policy-agent/gatekeeper/v3/pkg/watch" "github.com/prometheus/client_golang/prometheus" "k8s.io/client-go/rest" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/metrics" ) @@ -40,6 +42,7 @@ func StartManager(ctx context.Context, t *testing.T, mgr manager.Manager) { func SetupManager(t *testing.T, cfg *rest.Config) (manager.Manager, *watch.Manager) { t.Helper() + 
ctrl.SetLogger(zap.New(zap.UseDevMode(true))) metrics.Registry = prometheus.NewRegistry() mgr, err := manager.New(cfg, manager.Options{ MetricsBindAddress: "0",