Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support multiple sync sources #2852

Merged
merged 60 commits into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
11bb964
use cm
acpana Jun 15, 2023
5314889
replace process excluder in cm
acpana Jun 15, 2023
c382961
nit: dont pass target
acpana Jun 15, 2023
ba56451
use cm.WipeData
acpana Jun 21, 2023
b746411
inject cm
acpana Jun 21, 2023
a455772
move cm, make syncc submodule
acpana Jun 21, 2023
4750a94
introduce cm config
acpana Jun 21, 2023
c3745a9
refactor: make cm watch aware
acpana Jun 23, 2023
8e70e79
review: push all cache mgmt in bckgr
acpana Jul 5, 2023
1cd478a
add cacheManager as a runnable
acpana Jul 5, 2023
9eb8f35
rename makeUpdates, comments
acpana Jul 6, 2023
7fbb1c4
pass reader for cm
acpana Jul 6, 2023
ba4981d
review: consumer defines source, others
acpana Jul 6, 2023
cf11ef1
lint fixes & more
acpana Jul 6, 2023
21b92e9
after origin/master rebase
acpana Jul 6, 2023
29d747f
spli tests into unit, e2e
acpana Jul 11, 2023
6e83480
mediate cm funcs
acpana Jul 11, 2023
33ceeea
review: filteredClient is cm, replay in bckg, watch gvks asap
acpana Jul 11, 2023
30aa4cf
review: update watch set on RemoveSource
acpana Jul 24, 2023
ead18f2
use a set to record failing gvks
acpana Jul 24, 2023
a42ef43
rework the filtered point
acpana Jul 24, 2023
12f39da
review: naming, docs, comments, polish
acpana Jul 25, 2023
b67121b
review: gate gvks to list differently
acpana Jul 25, 2023
80210a6
make the reply goroutine resilient
acpana Jul 26, 2023
1f58357
test replay resiliency in cm
acpana Jul 26, 2023
da64747
review: always replace watch set
acpana Jul 26, 2023
f32b9bf
refactor: move cachemanager to its own pkg
acpana Jul 26, 2023
f835a0e
refactor: move OpaDataClient in cachemanager
acpana Jul 26, 2023
f2f41d7
refactor: no mediator interface
acpana Jul 27, 2023
f8684dd
refactor: move parser package per feedback
acpana Jul 27, 2023
71d77d4
review: use anon funcs, naming, comments
acpana Jul 27, 2023
cb24e09
fix lint
acpana Jul 27, 2023
e62bea1
review: dont use a buffered, sentinel ch
acpana Jul 27, 2023
ba4dbda
review: comments, renaming, read locks
acpana Jul 28, 2023
b78d392
fix: return cpy
acpana Jul 28, 2023
a5ffd53
review: UpsertSource, short circuits
acpana Jul 31, 2023
848bc87
review: pass in stop ch
acpana Jul 31, 2023
fe408f3
add accessors to metrics cache
acpana Aug 3, 2023
f010d3b
review: table tests
acpana Aug 3, 2023
86425fc
add a concurrent test
acpana Aug 3, 2023
a563e9e
review: add a test for instance updates
acpana Aug 5, 2023
3863184
review, test: use struct for test resources, use maps for failures
acpana Aug 7, 2023
db5a0cf
review: better test cleanup
acpana Aug 7, 2023
a799d60
review, test: add jiiter, remove sources, shorten test
acpana Aug 8, 2023
734c0ef
review: move fi, var names
acpana Aug 8, 2023
fbeb772
review: export KeyFor
acpana Aug 9, 2023
2b0ff42
review: docstring for FailureInjector
acpana Aug 9, 2023
7f2d02e
Merge branch 'master' into acpana/cmt-replay-6-r-clean
acpana Aug 9, 2023
f27e91f
review: use assert in go funcs
acpana Aug 10, 2023
605866f
review, refactor: clean up dep injection in config_c
acpana Aug 10, 2023
4c3cbf8
add logging to test controller mgr
acpana Aug 10, 2023
d1f146f
review: move FakeCfClient to pkg/fakes
acpana Aug 10, 2023
a6c65be
review: make audit use cm
acpana Aug 17, 2023
06a4ee6
review: naming nits
acpana Aug 18, 2023
505de58
review: naming, comments
acpana Aug 22, 2023
21f7d57
review: use read locks
acpana Aug 23, 2023
10e62bf
use failure injector in config_c test
acpana Aug 23, 2023
a37cfc8
Merge branch 'master' into acpana/cmt-replay-6-r-clean
acpana Aug 25, 2023
0637f6e
remove stale expectations
acpana Aug 29, 2023
6ca3fa5
fix flaky test
acpana Aug 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
mutationsv1beta1 "github.com/open-policy-agent/gatekeeper/v3/apis/mutations/v1beta1"
statusv1beta1 "github.com/open-policy-agent/gatekeeper/v3/apis/status/v1beta1"
"github.com/open-policy-agent/gatekeeper/v3/pkg/audit"
"github.com/open-policy-agent/gatekeeper/v3/pkg/cachemanager"
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller"
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process"
"github.com/open-policy-agent/gatekeeper/v3/pkg/expansion"
Expand All @@ -52,6 +53,7 @@ import (
"github.com/open-policy-agent/gatekeeper/v3/pkg/operations"
"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub"
"github.com/open-policy-agent/gatekeeper/v3/pkg/readiness"
"github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil"
"github.com/open-policy-agent/gatekeeper/v3/pkg/target"
"github.com/open-policy-agent/gatekeeper/v3/pkg/upgrade"
"github.com/open-policy-agent/gatekeeper/v3/pkg/util"
Expand All @@ -69,6 +71,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/certwatcher"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/healthz"
crzap "sigs.k8s.io/controller-runtime/pkg/log/zap"
crWebhook "sigs.k8s.io/controller-runtime/pkg/webhook"
Expand Down Expand Up @@ -447,17 +450,43 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, sw *watch.Controlle

// Setup all Controllers
setupLog.Info("setting up controllers")
watchSet := watch.NewSet()

// Events ch will be used to receive events from dynamic watches registered
// via the registrar below.
events := make(chan event.GenericEvent, 1024)
w, err := wm.NewRegistrar(
acpana marked this conversation as resolved.
Show resolved Hide resolved
cachemanager.RegistrarName,
events)
if err != nil {
setupLog.Error(err, "unable to set up watch registrar for cache manager")
return err
}

syncMetricsCache := syncutil.NewMetricsCache()
cm, err := cachemanager.NewCacheManager(&cachemanager.Config{
CfClient: client,
SyncMetricsCache: syncMetricsCache,
Tracker: tracker,
ProcessExcluder: processExcluder,
Registrar: w,
Reader: mgr.GetCache(),
})
if err != nil {
setupLog.Error(err, "unable to create cache manager")
return err
}

opts := controller.Dependencies{
Opa: client,
WatchManger: wm,
SyncEventsCh: events,
CacheMgr: cm,
ControllerSwitch: sw,
Tracker: tracker,
ProcessExcluder: processExcluder,
MutationSystem: mutationSystem,
ExpansionSystem: expansionSystem,
ProviderCache: providerCache,
WatchSet: watchSet,
PubsubSystem: pubsubSystem,
}

Expand All @@ -482,7 +511,7 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, sw *watch.Controlle

if operations.IsAssigned(operations.Audit) {
setupLog.Info("setting up audit")
auditCache := audit.NewAuditCacheLister(mgr.GetCache(), watchSet)
auditCache := audit.NewAuditCacheLister(mgr.GetCache(), cm)
auditDeps := audit.Dependencies{
Client: client,
ProcessExcluder: processExcluder,
Expand Down
16 changes: 10 additions & 6 deletions pkg/audit/audit_cache_lister.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,17 @@ package audit
import (
"context"

"github.com/open-policy-agent/gatekeeper/v3/pkg/watch"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// NewAuditCacheLister instantiates a new AuditCache which will read objects in
// watched from auditCache.
func NewAuditCacheLister(auditCache client.Reader, watched *watch.Set) *CacheLister {
func NewAuditCacheLister(auditCache client.Reader, lister WatchIterator) *CacheLister {
return &CacheLister{
auditCache: auditCache,
watched: watched,
lister: lister,
acpana marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -25,14 +24,19 @@ type CacheLister struct {
// Caution: only to be read from while watched is locked, such as through
// DoForEach.
auditCache client.Reader
// watched is the set of objects watched by the audit cache.
watched *watch.Set
// lister is a delegate like cachemanager that we can use to query a watched set of GKVs.
acpana marked this conversation as resolved.
Show resolved Hide resolved
lister WatchIterator
}

// wraps DoForEach from a watch.Set.
type WatchIterator interface {
DoForEach(listFunc func(gvk schema.GroupVersionKind) error) error
}

// ListObjects lists all objects from the audit cache.
func (l *CacheLister) ListObjects(ctx context.Context) ([]unstructured.Unstructured, error) {
var objs []unstructured.Unstructured
err := l.watched.DoForEach(func(gvk schema.GroupVersionKind) error {
err := l.lister.DoForEach(func(gvk schema.GroupVersionKind) error {
gvkObjects, err := listObjects(ctx, l.auditCache, gvk)
if err != nil {
return err
Expand Down
37 changes: 18 additions & 19 deletions pkg/cachemanager/cachemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

const RegistrarName = "cachemanager"

var (
log = logf.Log.WithName("cache-manager")
backoff = wait.Backoff{
Expand All @@ -37,7 +39,6 @@ type Config struct {
Tracker *readiness.Tracker
ProcessExcluder *process.Excluder
Registrar *watch.Registrar
WatchedSet *watch.Set
GVKAggregator *aggregator.GVKAgreggator
Reader client.Reader
}
Expand Down Expand Up @@ -67,9 +68,6 @@ type CFDataClient interface {
}

func NewCacheManager(config *Config) (*CacheManager, error) {
if config.WatchedSet == nil {
return nil, fmt.Errorf("watchedSet must be non-nil")
}
if config.Registrar == nil {
return nil, fmt.Errorf("registrar must be non-nil")
}
Expand All @@ -93,7 +91,7 @@ func NewCacheManager(config *Config) (*CacheManager, error) {
tracker: config.Tracker,
processExcluder: config.ProcessExcluder,
registrar: config.Registrar,
watchedSet: config.WatchedSet,
watchedSet: watch.NewSet(),
reader: config.Reader,
gvksToSync: config.GVKAggregator,
backgroundManagementTicker: *time.NewTicker(3 * time.Second),
Expand All @@ -111,8 +109,7 @@ func (c *CacheManager) Start(ctx context.Context) error {
}

// UpsertSource adjusts the watched set of gvks according to the newGVKs passed in
// for a given sourceKey.
// Callers are responsible for retrying on error.
// for a given sourceKey. Callers are responsible for retrying on error.
func (c *CacheManager) UpsertSource(ctx context.Context, sourceKey aggregator.Key, newGVKs []schema.GroupVersionKind) error {
c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -139,8 +136,7 @@ func (c *CacheManager) UpsertSource(ctx context.Context, sourceKey aggregator.Ke
}

// replaceWatchSet looks at the gvksToSync and makes changes to the registrar's watch set.
// assumes caller has lock.
// On error, actual watch state may not align with intended watch state.
// Assumes caller has lock. On error, actual watch state may not align with intended watch state.
func (c *CacheManager) replaceWatchSet(ctx context.Context) error {
newWatchSet := watch.NewSet()
newWatchSet.Add(c.gvksToSync.GVKs()...)
Expand All @@ -159,8 +155,7 @@ func (c *CacheManager) replaceWatchSet(ctx context.Context) error {
return innerError
}

// RemoveSource removes the watches of the GVKs for a given aggregator.Key.
// Callers are responsible for retrying on error.
// RemoveSource removes the watches of the GVKs for a given aggregator.Key. Callers are responsible for retrying on error.
func (c *CacheManager) RemoveSource(ctx context.Context, sourceKey aggregator.Key) error {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down Expand Up @@ -193,6 +188,15 @@ func (c *CacheManager) ExcludeProcesses(newExcluder *process.Excluder) {
c.excluderChanged = true
}

// DoForEach runs fn function for each GVK that is being watched by the cache manager.
acpana marked this conversation as resolved.
Show resolved Hide resolved
func (c *CacheManager) DoForEach(fn func(gvk schema.GroupVersionKind) error) error {
c.mu.Lock()
acpana marked this conversation as resolved.
Show resolved Hide resolved
defer c.mu.Unlock()

err := c.watchedSet.DoForEach(fn)
return err
}

func (c *CacheManager) watchesGVK(gvk schema.GroupVersionKind) bool {
c.mu.RLock()
defer c.mu.RUnlock()
Expand Down Expand Up @@ -241,12 +245,8 @@ func (c *CacheManager) AddObject(ctx context.Context, instance *unstructured.Uns
}

func (c *CacheManager) RemoveObject(ctx context.Context, instance *unstructured.Unstructured) error {
gvk := instance.GroupVersionKind()

if c.watchesGVK(gvk) {
if _, err := c.cfClient.RemoveData(ctx, instance); err != nil {
return err
}
if _, err := c.cfClient.RemoveData(ctx, instance); err != nil {
return err
}

// only delete from metrics map if the data removal was successful
Expand Down Expand Up @@ -383,8 +383,7 @@ func (c *CacheManager) replayGVKs(ctx context.Context, gvksToRelist []schema.Gro

// wipeCacheIfNeeded performs a cache wipe if there are any gvks needing to be removed
// from the cache or if the excluder has changed. It also marks which gvks need to be
// re listed again in the cf data cache after the wipe.
// assumes the caller has lock.
// re listed again in the cf data cache after the wipe. Assumes the caller has lock.
func (c *CacheManager) wipeCacheIfNeeded(ctx context.Context) {
// remove any gvks not needing to be synced anymore
// or re evaluate all if the excluder changed.
Expand Down
Loading