refactor: adder interface, rename data client (open-policy-agent#2991)
Signed-off-by: Alex Pana <[email protected]>
Co-authored-by: Sertaç Özercan <[email protected]>
2 people authored and Mattes83 committed Oct 25, 2023
1 parent 798cce5 commit bf2d1f2
Showing 18 changed files with 151 additions and 229 deletions.
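In short: controller Adders no longer carry a dependency named after OPA; the constraint-framework client is now called CFClient, and the no-op injector methods that existed only to satisfy a fat interface are removed. An illustrative before/after sketch (not the actual Gatekeeper source; field and method names are taken from this diff):

package sketch

import (
	constraintclient "github.com/open-policy-agent/frameworks/constraint/pkg/client"
)

// Before: the dependency was named after OPA.
type AdderBefore struct {
	Opa *constraintclient.Client
}

func (a *AdderBefore) InjectOpa(o *constraintclient.Client) { a.Opa = o }

// After: the same dependency, renamed to reflect that it is the
// constraint-framework data client rather than "OPA" itself.
type AdderAfter struct {
	CFClient *constraintclient.Client
}

func (a *AdderAfter) InjectCFClient(c *constraintclient.Client) { a.CFClient = c }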
3 changes: 1 addition & 2 deletions main.go
@@ -410,7 +410,6 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, sw *watch.Controlle
cfArgs = append(cfArgs, constraintclient.Driver(k8sDriver))
}

-// initialize OPA
driver, err := rego.New(args...)
if err != nil {
setupLog.Error(err, "unable to set up Driver")
@@ -478,7 +477,7 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, sw *watch.Controlle
}

opts := controller.Dependencies{
-Opa: client,
+CFClient: client,
WatchManger: wm,
SyncEventsCh: events,
CacheMgr: cm,
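The renamed dependency is the client from the constraint-framework library, built on top of a Rego driver and handed to the controllers under the new CFClient name. A minimal construction sketch under stated assumptions — the option and constructor names come from this diff, while the import paths for the rego driver and target packages are assumed because they are cut off in this excerpt:

package main

import (
	"fmt"

	constraintclient "github.com/open-policy-agent/frameworks/constraint/pkg/client"
	"github.com/open-policy-agent/frameworks/constraint/pkg/client/drivers/rego" // assumed import path
	"github.com/open-policy-agent/gatekeeper/v3/pkg/target"                      // assumed import path
)

// newCFClient sketches how the renamed dependency is built: a Rego evaluation
// driver wrapped by the constraint-framework client, targeting Kubernetes
// validation data.
func newCFClient() (*constraintclient.Client, error) {
	driver, err := rego.New(rego.Tracing(false))
	if err != nil {
		return nil, fmt.Errorf("unable to set up Driver: %w", err)
	}
	return constraintclient.NewClient(
		constraintclient.Targets(&target.K8sValidationTarget{}),
		constraintclient.Driver(driver),
	)
}

func main() {
	client, err := newCFClient()
	if err != nil {
		panic(err)
	}
	_ = client // passed to the controllers as Dependencies.CFClient in this commit
}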
14 changes: 0 additions & 14 deletions pkg/controller/config/config_controller.go
@@ -19,15 +19,11 @@ import (
"context"
"fmt"

-constraintclient "github.com/open-policy-agent/frameworks/constraint/pkg/client"
-"github.com/open-policy-agent/frameworks/constraint/pkg/externaldata"
configv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/config/v1alpha1"
cm "github.com/open-policy-agent/gatekeeper/v3/pkg/cachemanager"
"github.com/open-policy-agent/gatekeeper/v3/pkg/cachemanager/aggregator"
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process"
"github.com/open-policy-agent/gatekeeper/v3/pkg/expansion"
"github.com/open-policy-agent/gatekeeper/v3/pkg/keys"
"github.com/open-policy-agent/gatekeeper/v3/pkg/mutation"
"github.com/open-policy-agent/gatekeeper/v3/pkg/readiness"
"github.com/open-policy-agent/gatekeeper/v3/pkg/watch"
"k8s.io/apimachinery/pkg/api/errors"
@@ -66,10 +62,6 @@ func (a *Adder) Add(mgr manager.Manager) error {
return add(mgr, r)
}

-func (a *Adder) InjectOpa(_ *constraintclient.Client) {}
-
-func (a *Adder) InjectWatchManager(_ *watch.Manager) {}
-
func (a *Adder) InjectControllerSwitch(cs *watch.ControllerSwitch) {
a.ControllerSwitch = cs
}
@@ -78,12 +70,6 @@ func (a *Adder) InjectTracker(t *readiness.Tracker) {
a.Tracker = t
}

-func (a *Adder) InjectMutationSystem(mutationSystem *mutation.System) {}
-
-func (a *Adder) InjectExpansionSystem(expansionSystem *expansion.System) {}
-
-func (a *Adder) InjectProviderCache(providerCache *externaldata.ProviderCache) {}
-
func (a *Adder) InjectCacheManager(cm *cm.CacheManager) {
a.CacheManager = cm
}
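The no-op injectors above could be deleted because an Adder now only implements the injectors it actually uses. One common way to support that without forcing stub methods is optional-interface type assertions in the bootstrap code; the sketch below shows the pattern only and is hypothetical, not Gatekeeper's actual controller-registration code (import paths are the ones visible in this diff):

package sketch

import (
	constraintclient "github.com/open-policy-agent/frameworks/constraint/pkg/client"
	"github.com/open-policy-agent/gatekeeper/v3/pkg/readiness"
)

// Optional injector interfaces: an Adder opts in by implementing them.
type CFClientInjector interface {
	InjectCFClient(*constraintclient.Client)
}

type TrackerInjector interface {
	InjectTracker(*readiness.Tracker)
}

// inject hands each dependency only to Adders that ask for it, so controllers
// that never touch the constraint-framework client need no stub methods.
func inject(adder interface{}, client *constraintclient.Client, tracker *readiness.Tracker) {
	if i, ok := adder.(CFClientInjector); ok {
		i.InjectCFClient(client)
	}
	if i, ok := adder.(TrackerInjector); ok {
		i.InjectTracker(tracker)
	}
}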
64 changes: 32 additions & 32 deletions pkg/controller/config/config_controller_test.go
@@ -121,7 +121,7 @@ func TestReconcile(t *testing.T) {
mgr, wm := setupManager(t)
c := testclient.NewRetryClient(mgr.GetClient())

opaClient := &fakes.FakeCfClient{}
dataClient := &fakes.FakeCfClient{}

cs := watch.NewSwitch()
tracker, err := readiness.SetupTracker(mgr, false, false, false)
@@ -137,7 +137,7 @@ func TestReconcile(t *testing.T) {
events)
require.NoError(t, err)
cacheManager, err := cachemanager.NewCacheManager(&cachemanager.Config{
-CfClient: opaClient,
+CfClient: dataClient,
SyncMetricsCache: syncMetricsCache,
Tracker: tracker,
ProcessExcluder: processExcluder,
@@ -282,7 +282,7 @@ func TestReconcile(t *testing.T) {
require.NoError(t, cacheManager.AddObject(ctx, fooPod))

// fooPod should be namespace excluded, hence not added to the cache
-require.False(t, opaClient.Contains(map[fakes.CfDataKey]interface{}{{Gvk: fooPod.GroupVersionKind(), Key: "default"}: struct{}{}}))
+require.False(t, dataClient.Contains(map[fakes.CfDataKey]interface{}{{Gvk: fooPod.GroupVersionKind(), Key: "default"}: struct{}{}}))

cs.Stop()
}
@@ -401,20 +401,20 @@ func TestConfig_DeleteSyncResources(t *testing.T) {
}, timeout).Should(gomega.BeTrue())
}

-func setupController(ctx context.Context, mgr manager.Manager, wm *watch.Manager, tracker *readiness.Tracker, events chan event.GenericEvent, reader client.Reader, useFakeOpa bool) (cachemanager.CFDataClient, error) {
-// initialize OPA
-var opaClient cachemanager.CFDataClient
-if useFakeOpa {
-opaClient = &fakes.FakeCfClient{}
+func setupController(ctx context.Context, mgr manager.Manager, wm *watch.Manager, tracker *readiness.Tracker, events chan event.GenericEvent, reader client.Reader, useFakeClient bool) (cachemanager.CFDataClient, error) {
+// initialize constraint framework data client
+var client cachemanager.CFDataClient
+if useFakeClient {
+client = &fakes.FakeCfClient{}
} else {
driver, err := rego.New(rego.Tracing(true))
if err != nil {
return nil, fmt.Errorf("unable to set up Driver: %w", err)
}

-opaClient, err = constraintclient.NewClient(constraintclient.Targets(&target.K8sValidationTarget{}), constraintclient.Driver(driver))
+client, err = constraintclient.NewClient(constraintclient.Targets(&target.K8sValidationTarget{}), constraintclient.Driver(driver))
if err != nil {
-return nil, fmt.Errorf("unable to set up OPA backend client: %w", err)
+return nil, fmt.Errorf("unable to set up constraint framework data client: %w", err)
}
}

@@ -430,7 +430,7 @@ func setupController(ctx context.Context, mgr manager.Manager, wm *watch.Manager
return nil, fmt.Errorf("cannot create registrar: %w", err)
}
cacheManager, err := cachemanager.NewCacheManager(&cachemanager.Config{
-CfClient: opaClient,
+CfClient: client,
SyncMetricsCache: syncMetricsCache,
Tracker: tracker,
ProcessExcluder: processExcluder,
@@ -461,10 +461,10 @@ func setupController(ctx context.Context, mgr manager.Manager, wm *watch.Manager
if err != nil {
return nil, fmt.Errorf("registering sync controller: %w", err)
}
-return opaClient, nil
+return client, nil
}

-// Verify the Opa cache is populated based on the config resource.
+// Verify the constraint framework cache is populated based on the config resource.
func TestConfig_CacheContents(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
@@ -506,10 +506,10 @@ func TestConfig_CacheContents(t *testing.T) {
require.NoError(t, err)

events := make(chan event.GenericEvent, 1024)
-opa, err := setupController(ctx, mgr, wm, tracker, events, c, true)
+dataClient, err := setupController(ctx, mgr, wm, tracker, events, c, true)
require.NoError(t, err, "failed to set up controller")

-opaClient, ok := opa.(*fakes.FakeCfClient)
+fakeClient, ok := dataClient.(*fakes.FakeCfClient)
require.True(t, ok)

testutils.StartManager(ctx, t, mgr)
@@ -521,12 +521,12 @@ func TestConfig_CacheContents(t *testing.T) {
expected := map[fakes.CfDataKey]interface{}{
{Gvk: nsGVK, Key: "default"}: nil,
cmKey: nil,
-// kube-system namespace is being excluded, it should not be in opa cache
+// kube-system namespace is being excluded, it should not be in the cache
}
g.Eventually(func() bool {
-return opaClient.Contains(expected)
-}, 10*time.Second).Should(gomega.BeTrue(), "checking initial opa cache contents")
-require.True(t, opaClient.HasGVK(nsGVK), "want opaClient.HasGVK(nsGVK) to be true but got false")
+return fakeClient.Contains(expected)
+}, 10*time.Second).Should(gomega.BeTrue(), "checking initial cache contents")
+require.True(t, fakeClient.HasGVK(nsGVK), "want fakeClient.HasGVK(nsGVK) to be true but got false")

// Reconfigure to drop the namespace watches
config = configFor([]schema.GroupVersionKind{configMapGVK})
@@ -538,7 +538,7 @@

// Expect namespaces to go away from cache
g.Eventually(func() bool {
-return opaClient.HasGVK(nsGVK)
+return fakeClient.HasGVK(nsGVK)
}, 10*time.Second).Should(gomega.BeFalse())

// Expect our configMap to return at some point
@@ -547,25 +547,25 @@
cmKey: nil,
}
g.Eventually(func() bool {
-return opaClient.Contains(expected)
+return fakeClient.Contains(expected)
}, 10*time.Second).Should(gomega.BeTrue(), "waiting for ConfigMap to repopulate in cache")

expected = map[fakes.CfDataKey]interface{}{
cm2Key: nil,
}
g.Eventually(func() bool {
-return !opaClient.Contains(expected)
-}, 10*time.Second).Should(gomega.BeTrue(), "kube-system namespace is excluded. kube-system/config-test-2 should not be in opa cache")
+return !fakeClient.Contains(expected)
+}, 10*time.Second).Should(gomega.BeTrue(), "kube-system namespace is excluded. kube-system/config-test-2 should not be in the cache")

-// Delete the config resource - expect opa to empty out.
-if opaClient.Len() == 0 {
+// Delete the config resource - expect cache to empty out.
+if fakeClient.Len() == 0 {
t.Fatal("sanity")
}
require.NoError(t, c.Delete(ctx, config), "deleting Config resource")

// The cache will be cleared out.
g.Eventually(func() int {
-return opaClient.Len()
+return fakeClient.Len()
}, 10*time.Second).Should(gomega.BeZero(), "waiting for cache to empty")
}

@@ -590,7 +590,7 @@ func TestConfig_Retries(t *testing.T) {
mgr, wm := setupManager(t)
c := testclient.NewRetryClient(mgr.GetClient())

opaClient := &fakes.FakeCfClient{}
dataClient := &fakes.FakeCfClient{}
cs := watch.NewSwitch()
tracker, err := readiness.SetupTracker(mgr, false, false, false)
if err != nil {
@@ -606,7 +606,7 @@
events)
require.NoError(t, err)
cacheManager, err := cachemanager.NewCacheManager(&cachemanager.Config{
-CfClient: opaClient,
+CfClient: dataClient,
SyncMetricsCache: syncMetricsCache,
Tracker: tracker,
ProcessExcluder: processExcluder,
@@ -679,8 +679,8 @@ func TestConfig_Retries(t *testing.T) {
cmKey: nil,
}
g.Eventually(func() bool {
-return opaClient.Contains(expected)
-}, 10*time.Second).Should(gomega.BeTrue(), "checking initial opa cache contents")
+return dataClient.Contains(expected)
+}, 10*time.Second).Should(gomega.BeTrue(), "checking initial cache contents")

fi.SetFailures("ConfigMapList", 2)

@@ -697,8 +697,8 @@

// Despite the transient error, we expect the cache to eventually be repopulated.
g.Eventually(func() bool {
-return opaClient.Contains(expected)
-}, 10*time.Second).Should(gomega.BeTrue(), "checking final opa cache contents")
+return dataClient.Contains(expected)
+}, 10*time.Second).Should(gomega.BeTrue(), "checking final cache contents")
}

// configFor returns a config resource that watches the requested set of resources.
28 changes: 11 additions & 17 deletions pkg/controller/constraint/constraint_controller.go
@@ -27,10 +27,8 @@ import (
constraintstatusv1beta1 "github.com/open-policy-agent/gatekeeper/v3/apis/status/v1beta1"
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process"
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller/constraintstatus"
"github.com/open-policy-agent/gatekeeper/v3/pkg/expansion"
"github.com/open-policy-agent/gatekeeper/v3/pkg/logging"
"github.com/open-policy-agent/gatekeeper/v3/pkg/metrics"
"github.com/open-policy-agent/gatekeeper/v3/pkg/mutation"
"github.com/open-policy-agent/gatekeeper/v3/pkg/operations"
"github.com/open-policy-agent/gatekeeper/v3/pkg/readiness"
"github.com/open-policy-agent/gatekeeper/v3/pkg/util"
@@ -59,7 +57,7 @@
)

type Adder struct {
-Opa *constraintclient.Client
+CFClient *constraintclient.Client
ConstraintsCache *ConstraintsCache
WatchManager *watch.Manager
ControllerSwitch *watch.ControllerSwitch
@@ -74,8 +72,8 @@ type Adder struct {
IfWatching func(schema.GroupVersionKind, func() error) (bool, error)
}

-func (a *Adder) InjectOpa(o *constraintclient.Client) {
-a.Opa = o
+func (a *Adder) InjectCFClient(c *constraintclient.Client) {
+a.CFClient = c
}

func (a *Adder) InjectWatchManager(w *watch.Manager) {
@@ -90,10 +88,6 @@ func (a *Adder) InjectTracker(t *readiness.Tracker) {
a.Tracker = t
}

-func (a *Adder) InjectMutationSystem(mutationSystem *mutation.System) {}
-
-func (a *Adder) InjectExpansionSystem(expansionSystem *expansion.System) {}
-
// Add creates a new Constraint Controller and adds it to the Manager. The Manager will set fields on the Controller
// and Start it when the Manager is Started.
func (a *Adder) Add(mgr manager.Manager) error {
@@ -106,7 +100,7 @@ func (a *Adder) Add(mgr manager.Manager) error {
return err
}

-r := newReconciler(mgr, a.Opa, a.ControllerSwitch, reporter, a.ConstraintsCache, a.Tracker)
+r := newReconciler(mgr, a.CFClient, a.ControllerSwitch, reporter, a.ConstraintsCache, a.Tracker)
if a.GetPod != nil {
r.getPod = a.GetPod
}
@@ -129,7 +123,7 @@ type tags struct {
// newReconciler returns a new reconcile.Reconciler.
func newReconciler(
mgr manager.Manager,
-opa *constraintclient.Client,
+cfClient *constraintclient.Client,
cs *watch.ControllerSwitch,
reporter StatsReporter,
constraintsCache *ConstraintsCache,
@@ -143,7 +137,7 @@ func newReconciler(

cs: cs,
scheme: mgr.GetScheme(),
-opa: opa,
+cfClient: cfClient,
log: log,
reporter: reporter,
constraintsCache: constraintsCache,
@@ -196,7 +190,7 @@ type ReconcileConstraint struct {

cs *watch.ControllerSwitch
scheme *runtime.Scheme
-opa *constraintclient.Client
+cfClient *constraintclient.Client
log logr.Logger
reporter StatsReporter
constraintsCache *ConstraintsCache
@@ -291,7 +285,7 @@ func (r *ReconcileConstraint) Reconcile(ctx context.Context, request reconcile.R
status.Status.ConstraintUID = instance.GetUID()
status.Status.ObservedGeneration = instance.GetGeneration()
status.Status.Errors = nil
-if c, err := r.opa.GetConstraint(instance); err != nil || !constraints.SemanticEqual(instance, c) {
+if c, err := r.cfClient.GetConstraint(instance); err != nil || !constraints.SemanticEqual(instance, c) {
if err := r.cacheConstraint(ctx, instance); err != nil {
r.constraintsCache.addConstraintKey(constraintKey, tags{
enforcementAction: enforcementAction,
@@ -320,7 +314,7 @@ func (r *ReconcileConstraint) Reconcile(ctx context.Context, request reconcile.R
reportMetrics = true
} else {
r.log.Info("handling constraint delete", "instance", instance)
-if _, err := r.opa.RemoveConstraint(ctx, instance); err != nil {
+if _, err := r.cfClient.RemoveConstraint(ctx, instance); err != nil {
if errors.Is(err, constraintclient.ErrMissingConstraint) {
return reconcile.Result{}, err
}
@@ -414,9 +408,9 @@ func (r *ReconcileConstraint) cacheConstraint(ctx context.Context, instance *uns
t := r.tracker.For(instance.GroupVersionKind())

obj := instance.DeepCopy()
-// Remove the status field since we do not need it for OPA
+// Remove the status field since we do not need it
unstructured.RemoveNestedField(obj.Object, "status")
-_, err := r.opa.AddConstraint(ctx, obj)
+_, err := r.cfClient.AddConstraint(ctx, obj)
if err != nil {
t.TryCancelExpect(obj)
return err
@@ -44,7 +44,7 @@ import (
var log = logf.Log.WithName("controller").WithValues(logging.Process, "constraint_status_controller")

type Adder struct {
-Opa *constraintclient.Client
+CFClient *constraintclient.Client
WatchManager *watch.Manager
ControllerSwitch *watch.ControllerSwitch
Events <-chan event.GenericEvent