Skip to content

Commit

Permalink
feat: support multiple sync sources (open-policy-agent#2852)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Pana <[email protected]>
  • Loading branch information
acpana authored and salaxander committed Sep 13, 2023
1 parent b22fda4 commit 0e46b19
Show file tree
Hide file tree
Showing 23 changed files with 1,823 additions and 837 deletions.
35 changes: 32 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
mutationsv1beta1 "github.com/open-policy-agent/gatekeeper/v3/apis/mutations/v1beta1"
statusv1beta1 "github.com/open-policy-agent/gatekeeper/v3/apis/status/v1beta1"
"github.com/open-policy-agent/gatekeeper/v3/pkg/audit"
"github.com/open-policy-agent/gatekeeper/v3/pkg/cachemanager"
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller"
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process"
"github.com/open-policy-agent/gatekeeper/v3/pkg/expansion"
Expand All @@ -52,6 +53,7 @@ import (
"github.com/open-policy-agent/gatekeeper/v3/pkg/operations"
"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub"
"github.com/open-policy-agent/gatekeeper/v3/pkg/readiness"
"github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil"
"github.com/open-policy-agent/gatekeeper/v3/pkg/target"
"github.com/open-policy-agent/gatekeeper/v3/pkg/upgrade"
"github.com/open-policy-agent/gatekeeper/v3/pkg/util"
Expand All @@ -69,6 +71,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/certwatcher"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/healthz"
crzap "sigs.k8s.io/controller-runtime/pkg/log/zap"
crWebhook "sigs.k8s.io/controller-runtime/pkg/webhook"
Expand Down Expand Up @@ -448,17 +451,43 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, sw *watch.Controlle

// Setup all Controllers
setupLog.Info("setting up controllers")
watchSet := watch.NewSet()

// Events ch will be used to receive events from dynamic watches registered
// via the registrar below.
events := make(chan event.GenericEvent, 1024)
reg, err := wm.NewRegistrar(
cachemanager.RegistrarName,
events)
if err != nil {
setupLog.Error(err, "unable to set up watch registrar for cache manager")
return err
}

syncMetricsCache := syncutil.NewMetricsCache()
cm, err := cachemanager.NewCacheManager(&cachemanager.Config{
CfClient: client,
SyncMetricsCache: syncMetricsCache,
Tracker: tracker,
ProcessExcluder: processExcluder,
Registrar: reg,
Reader: mgr.GetCache(),
})
if err != nil {
setupLog.Error(err, "unable to create cache manager")
return err
}

opts := controller.Dependencies{
Opa: client,
WatchManger: wm,
SyncEventsCh: events,
CacheMgr: cm,
ControllerSwitch: sw,
Tracker: tracker,
ProcessExcluder: processExcluder,
MutationSystem: mutationSystem,
ExpansionSystem: expansionSystem,
ProviderCache: providerCache,
WatchSet: watchSet,
PubsubSystem: pubsubSystem,
}

Expand All @@ -483,7 +512,7 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, sw *watch.Controlle

if operations.IsAssigned(operations.Audit) {
setupLog.Info("setting up audit")
auditCache := audit.NewAuditCacheLister(mgr.GetCache(), watchSet)
auditCache := audit.NewAuditCacheLister(mgr.GetCache(), cm)
auditDeps := audit.Dependencies{
Client: client,
ProcessExcluder: processExcluder,
Expand Down
21 changes: 14 additions & 7 deletions pkg/audit/audit_cache_lister.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,17 @@ package audit
import (
"context"

"github.com/open-policy-agent/gatekeeper/v3/pkg/watch"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// NewAuditCacheLister instantiates a new AuditCache which will read objects in
// watched from auditCache.
func NewAuditCacheLister(auditCache client.Reader, watched *watch.Set) *CacheLister {
func NewAuditCacheLister(auditCache client.Reader, lister WatchIterator) *CacheLister {
return &CacheLister{
auditCache: auditCache,
watched: watched,
auditCache: auditCache,
watchIterator: lister,
}
}

Expand All @@ -25,14 +24,22 @@ type CacheLister struct {
// Caution: only to be read from while watched is locked, such as through
// DoForEach.
auditCache client.Reader
// watched is the set of objects watched by the audit cache.
watched *watch.Set
// watchIterator is a delegate like CacheManager that we can use to query a watched set of GKVs.
// Passing our logic as a callback to a watched.Set allows us to take actions while
// holding the lock on the watched.Set. This prevents us from querying the API server
// for kinds that aren't currently being watched by the CacheManager.
watchIterator WatchIterator
}

// wraps DoForEach from a watch.Set.
type WatchIterator interface {
DoForEach(listFunc func(gvk schema.GroupVersionKind) error) error
}

// ListObjects lists all objects from the audit cache.
func (l *CacheLister) ListObjects(ctx context.Context) ([]unstructured.Unstructured, error) {
var objs []unstructured.Unstructured
err := l.watched.DoForEach(func(gvk schema.GroupVersionKind) error {
err := l.watchIterator.DoForEach(func(gvk schema.GroupVersionKind) error {
gvkObjects, err := listObjects(ctx, l.auditCache, gvk)
if err != nil {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (b *GVKAgreggator) IsPresent(gvk schema.GroupVersionKind) bool {
return found
}

// Remove deletes the any associations that Key k has in the GVKAggregator.
// Remove deletes any associations that Key k has in the GVKAggregator.
// For any GVK in the association k --> [GVKs], we also delete any associations
// between the GVK and the Key k stored in the reverse map.
func (b *GVKAgreggator) Remove(k Key) error {
Expand Down Expand Up @@ -98,6 +98,31 @@ func (b *GVKAgreggator) Upsert(k Key, gvks []schema.GroupVersionKind) error {
return nil
}

// List returnes the gvk set for a given Key.
func (b *GVKAgreggator) List(k Key) map[schema.GroupVersionKind]struct{} {
b.mu.RLock()
defer b.mu.RUnlock()

v := b.store[k]
cpy := make(map[schema.GroupVersionKind]struct{}, len(v))
for key, value := range v {
cpy[key] = value
}
return cpy
}

// GVKs returns a list of all of the schema.GroupVersionKind that are aggregated.
func (b *GVKAgreggator) GVKs() []schema.GroupVersionKind {
b.mu.RLock()
defer b.mu.RUnlock()

allGVKs := []schema.GroupVersionKind{}
for gvk := range b.reverseStore {
allGVKs = append(allGVKs, gvk)
}
return allGVKs
}

func (b *GVKAgreggator) pruneReverseStore(gvks map[schema.GroupVersionKind]struct{}, k Key) error {
for gvk := range gvks {
keySet, found := b.reverseStore[gvk]
Expand Down
File renamed without changes.
Loading

0 comments on commit 0e46b19

Please sign in to comment.