Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add syncset controller, feat: syncset readiness #3030

Merged
merged 47 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
e34eed0
add syncset controller
acpana Oct 2, 2023
3d43814
add syncset readiness, observe config
acpana Oct 3, 2023
0dc69bd
reconcile data expectations
acpana Oct 3, 2023
46dfde2
enable syncset crd
acpana Oct 3, 2023
686af5d
interpret registrar err
acpana Oct 5, 2023
7e94cdd
review: move EM, rename to EP
acpana Oct 5, 2023
31979ac
review: remove waitgroup
acpana Oct 6, 2023
b290867
smoll fix for test
acpana Oct 7, 2023
49d652e
Merge branch 'master' into acpana/syncsets-rt-r
acpana Oct 9, 2023
2192d65
use logging constant
acpana Oct 9, 2023
592e243
add TryCancelData
acpana Oct 9, 2023
8da842c
fix lock
acpana Oct 9, 2023
50f7abd
review: rm comments, mgr starts pruner
acpana Oct 11, 2023
7296196
review: move interface, IsUniversal
acpana Oct 11, 2023
6a260a3
review: explicitly remove source
acpana Oct 11, 2023
a1e153f
Merge branch 'master' into acpana/syncsets-rt-r
acpana Oct 12, 2023
689b252
limit name to 63 char
acpana Oct 13, 2023
4708b4d
review: comments, naming, HasBalidationOperation
acpana Oct 13, 2023
9937605
only add syncset controller for validation
acpana Oct 13, 2023
05af5c9
flesh out errorList
acpana Oct 13, 2023
dda78b6
TryCancel all gvks if universal
acpana Oct 16, 2023
5e48091
review feedback
acpana Oct 19, 2023
24f9237
review feedback
acpana Oct 26, 2023
53a7625
review feedback
acpana Oct 28, 2023
c8537e2
use fake client builder
acpana Oct 29, 2023
7cd0440
use fake client builder in readiness pkg too
acpana Oct 29, 2023
23d9ef7
Merge branch 'master' into acpana/syncsets-rt-r
acpana Oct 30, 2023
c4ea7cb
review feedback
acpana Nov 2, 2023
b660943
review: remove Test_ExpectationsMgr_DeletedSyncSets
acpana Nov 2, 2023
79dce07
refactor: agg err, restore agg
acpana Nov 2, 2023
09842ff
revert: restore agg
acpana Nov 3, 2023
3d07778
revert: infer podGVK
acpana Nov 3, 2023
ccf2160
add RemoveGVKErr
acpana Nov 3, 2023
449b39a
account for dangling watches
acpana Nov 4, 2023
d0ebd6a
better handling for dangling watches
acpana Nov 6, 2023
eeba2bf
check err nullability, naming
acpana Nov 7, 2023
034e826
use remove set instead of forceWipe
acpana Nov 7, 2023
854306e
set instead of book for dangling watches
acpana Nov 9, 2023
d7bb9b5
general error watch dangling
acpana Nov 9, 2023
7faecbc
review feedback
acpana Nov 10, 2023
03565fa
refactor: use tt
acpana Nov 15, 2023
3766d66
fix: general err dangling watches
acpana Nov 15, 2023
352ae72
rf: add set directly
acpana Nov 16, 2023
a216503
Merge branch 'master' into acpana/syncsets-rt-r
acpana Nov 17, 2023
a280fb7
Apply suggestions from code review
acpana Nov 22, 2023
c87e934
review suggestions
acpana Nov 22, 2023
430deea
Merge branch 'master' into acpana/syncsets-rt-r
acpana Nov 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions apis/config/v1alpha1/config_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package v1alpha1
import (
"github.com/open-policy-agent/gatekeeper/v3/pkg/wildcard"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)

// ConfigSpec defines the desired state of Config.
Expand Down Expand Up @@ -62,6 +63,14 @@ type SyncOnlyEntry struct {
Kind string `json:"kind,omitempty"`
}

func (e *SyncOnlyEntry) ToGroupVersionKind() schema.GroupVersionKind {
return schema.GroupVersionKind{
Group: e.Group,
Version: e.Version,
Kind: e.Kind,
}
}

type MatchEntry struct {
Processes []string `json:"processes,omitempty"`
ExcludedNamespaces []wildcard.Wildcard `json:"excludedNamespaces,omitempty"`
Expand Down
9 changes: 9 additions & 0 deletions apis/syncset/v1alpha1/syncset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v1alpha1

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)

type SyncSetSpec struct {
Expand All @@ -14,6 +15,14 @@ type GVKEntry struct {
Kind string `json:"kind,omitempty"`
}

func (e *GVKEntry) ToGroupVersionKind() schema.GroupVersionKind {
return schema.GroupVersionKind{
Group: e.Group,
Version: e.Version,
Kind: e.Kind,
}
}

// +kubebuilder:resource:scope=Cluster
// +kubebuilder:object:root=true

Expand Down
8 changes: 7 additions & 1 deletion config/crd/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# It should be run by config/default
resources:
- bases/config.gatekeeper.sh_configs.yaml
#- bases/syncset.gatekeeper.sh_syncsets.yaml
- bases/syncset.gatekeeper.sh_syncsets.yaml
- bases/status.gatekeeper.sh_constraintpodstatuses.yaml
- bases/status.gatekeeper.sh_constrainttemplatepodstatuses.yaml
- bases/status.gatekeeper.sh_mutatorpodstatuses.yaml
Expand Down Expand Up @@ -50,6 +50,12 @@ patchesJson6902:
kind: CustomResourceDefinition
name: assignimage.mutations.gatekeeper.sh
path: patches/max_name_size.yaml
- target:
group: apiextensions.k8s.io
version: v1
kind: CustomResourceDefinition
name: syncsets.syncset.gatekeeper.sh
path: patches/max_name_size.yaml

patchesStrategicMerge:
#- patches/max_name_size_for_modifyset.yaml
Expand Down
7 changes: 7 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/open-policy-agent/gatekeeper/v3/pkg/operations"
"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub"
"github.com/open-policy-agent/gatekeeper/v3/pkg/readiness"
"github.com/open-policy-agent/gatekeeper/v3/pkg/readiness/pruner"
"github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil"
"github.com/open-policy-agent/gatekeeper/v3/pkg/target"
"github.com/open-policy-agent/gatekeeper/v3/pkg/upgrade"
Expand Down Expand Up @@ -486,6 +487,12 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, sw *watch.Controlle
return err
}

err = mgr.Add(pruner.NewExpectationsPruner(cm, tracker))
if err != nil {
setupLog.Error(err, "adding expectations pruner to manager")
return err
}

opts := controller.Dependencies{
CFClient: client,
WatchManger: wm,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.10.0
labels:
gatekeeper.sh/system: "yes"
name: syncsets.syncset.gatekeeper.sh
spec:
group: syncset.gatekeeper.sh
names:
kind: SyncSet
listKind: SyncSetList
plural: syncsets
singular: syncset
preserveUnknownFields: false
scope: Cluster
versions:
- name: v1alpha1
schema:
openAPIV3Schema:
description: SyncSet is the Schema for the SyncSet API.
properties:
apiVersion:
description: 'APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
type: string
kind:
description: 'Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
metadata:
properties:
name:
maxLength: 63
type: string
type: object
spec:
properties:
gvks:
items:
properties:
group:
type: string
kind:
type: string
version:
type: string
type: object
type: array
type: object
type: object
served: true
storage: true
53 changes: 53 additions & 0 deletions manifest_staging/deploy/gatekeeper.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3367,6 +3367,59 @@ spec:
served: true
storage: true
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.10.0
labels:
gatekeeper.sh/system: "yes"
name: syncsets.syncset.gatekeeper.sh
spec:
group: syncset.gatekeeper.sh
names:
kind: SyncSet
listKind: SyncSetList
plural: syncsets
singular: syncset
preserveUnknownFields: false
scope: Cluster
versions:
- name: v1alpha1
schema:
openAPIV3Schema:
description: SyncSet is the Schema for the SyncSet API.
acpana marked this conversation as resolved.
Show resolved Hide resolved
properties:
apiVersion:
description: 'APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
type: string
kind:
description: 'Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
metadata:
properties:
name:
maxLength: 63
type: string
type: object
spec:
properties:
gvks:
items:
properties:
group:
type: string
kind:
type: string
version:
type: string
type: object
type: array
type: object
type: object
served: true
storage: true
---
apiVersion: v1
kind: ServiceAccount
metadata:
Expand Down
62 changes: 50 additions & 12 deletions pkg/cachemanager/cachemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cachemanager

import (
"context"
"errors"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -128,8 +129,21 @@ func (c *CacheManager) UpsertSource(ctx context.Context, sourceKey aggregator.Ke
// may become unreferenced and need to be deleted; this will be handled async
// in the manageCache loop.

if err := c.replaceWatchSet(ctx); err != nil {
return fmt.Errorf("error watching new gvks: %w", err)
err := c.replaceWatchSet(ctx)
acpana marked this conversation as resolved.
Show resolved Hide resolved
if general, failedGVKs := interpretErr(err, newGVKs); len(failedGVKs) > 0 {
var gvksToTryCancel []schema.GroupVersionKind
if general {
// if the err is general, assume all gvks need TryCancel because of some
// WatchManager internal error and we don't want to block readiness.
gvksToTryCancel = c.gvksToSync.GVKs()
} else {
gvksToTryCancel = failedGVKs
}

for _, g := range gvksToTryCancel {
c.tracker.TryCancelData(g)
}
return fmt.Errorf("error establishing watches: %w", err)
}

return nil
Expand All @@ -140,11 +154,7 @@ func (c *CacheManager) UpsertSource(ctx context.Context, sourceKey aggregator.Ke
func (c *CacheManager) replaceWatchSet(ctx context.Context) error {
newWatchSet := watch.NewSet()
newWatchSet.Add(c.gvksToSync.GVKs()...)

diff := c.watchedSet.Difference(newWatchSet)
c.removeStaleExpectations(diff)

c.gvksToDeleteFromCache.AddSet(diff)
c.gvksToDeleteFromCache.AddSet(c.watchedSet.Difference(newWatchSet))

var innerError error
c.watchedSet.Replace(newWatchSet, func() {
Expand All @@ -158,11 +168,31 @@ func (c *CacheManager) replaceWatchSet(ctx context.Context) error {
return innerError
}

// removeStaleExpectations stops tracking data for any resources that are no longer watched.
func (c *CacheManager) removeStaleExpectations(stale *watch.Set) {
for _, gvk := range stale.Items() {
c.tracker.CancelData(gvk)
// interpret if the err received is general or whether it is specific to the provided GVKs.
acpana marked this conversation as resolved.
Show resolved Hide resolved
func interpretErr(e error, gvks []schema.GroupVersionKind) (bool, []schema.GroupVersionKind) {
if e == nil {
return false, nil
}

var f watch.WatchesError
if !errors.As(e, &f) || f.HasGeneralErr() {
return true, nil
}

failedGvks := watch.NewSet()
failedGvks.Add(f.FailingGVKs()...)
sourceGVKSet := watch.NewSet()
sourceGVKSet.Add(gvks...)

common := failedGvks.Intersection(sourceGVKSet)
if common.Size() > 0 {
return false, common.Items()
}

// this error is not about the gvks in this request
// but we still log it for visibility
log.Info("encountered unrelated error when replacing watch set", "error", e)
acpana marked this conversation as resolved.
Show resolved Hide resolved
return false, nil
}

// RemoveSource removes the watches of the GVKs for a given aggregator.Key. Callers are responsible for retrying on error.
Expand All @@ -174,7 +204,8 @@ func (c *CacheManager) RemoveSource(ctx context.Context, sourceKey aggregator.Ke
return fmt.Errorf("internal error removing source: %w", err)
}

if err := c.replaceWatchSet(ctx); err != nil {
err := c.replaceWatchSet(ctx)
if general, _ := interpretErr(err, []schema.GroupVersionKind{}); general {
acpana marked this conversation as resolved.
Show resolved Hide resolved
acpana marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("error removing watches for source %v: %w", sourceKey, err)
}

Expand Down Expand Up @@ -208,6 +239,13 @@ func (c *CacheManager) DoForEach(fn func(gvk schema.GroupVersionKind) error) err
return err
}

func (c *CacheManager) WatchedGVKs() []schema.GroupVersionKind {
c.mu.RLock()
defer c.mu.RUnlock()

return c.watchedSet.Items()
}

func (c *CacheManager) watchesGVK(gvk schema.GroupVersionKind) bool {
c.mu.RLock()
defer c.mu.RUnlock()
Expand Down
Loading