Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OCPBUGS-17157: *: label k8s objects we own #3020

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/coreos/go-semver v0.3.0
github.com/davecgh/go-spew v1.1.1
github.com/distribution/distribution v2.7.1+incompatible
github.com/evanphx/json-patch v5.6.0+incompatible
github.com/fsnotify/fsnotify v1.6.0
github.com/ghodss/yaml v1.0.0
github.com/go-air/gini v1.0.4
Expand Down Expand Up @@ -95,7 +96,6 @@ require (
github.com/docker/go-metrics v0.0.1 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/emicklei/go-restful/v3 v3.10.2 // indirect
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d // indirect
github.com/fatih/color v1.13.0 // indirect
Expand Down
119 changes: 119 additions & 0 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@ import (
"sync"
"time"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/internal/alongside"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/labeller"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/validatingroundtripper"
errorwrap "github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/connectivity"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
Expand All @@ -32,6 +35,9 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apimachinery/pkg/util/yaml"
batchv1applyconfigurations "k8s.io/client-go/applyconfigurations/batch/v1"
corev1applyconfigurations "k8s.io/client-go/applyconfigurations/core/v1"
rbacv1applyconfigurations "k8s.io/client-go/applyconfigurations/rbac/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
"k8s.io/client-go/metadata"
Expand Down Expand Up @@ -103,6 +109,7 @@ type Operator struct {
client versioned.Interface
dynamicClient dynamic.Interface
lister operatorlister.OperatorLister
k8sLabelQueueSets map[schema.GroupVersionResource]workqueue.RateLimitingInterface
catsrcQueueSet *queueinformer.ResourceQueueSet
subQueueSet *queueinformer.ResourceQueueSet
ipQueueSet *queueinformer.ResourceQueueSet
Expand Down Expand Up @@ -191,6 +198,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
lister: lister,
namespace: operatorNamespace,
recorder: eventRecorder,
k8sLabelQueueSets: map[schema.GroupVersionResource]workqueue.RateLimitingInterface{},
catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(),
subQueueSet: queueinformer.NewEmptyResourceQueueSet(),
ipQueueSet: queueinformer.NewEmptyResourceQueueSet(),
Expand Down Expand Up @@ -363,21 +371,84 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
op.lister.RbacV1().RegisterRoleLister(metav1.NamespaceAll, roleInformer.Lister())
sharedIndexInformers = append(sharedIndexInformers, roleInformer.Informer())

labelObjects := func(gvr schema.GroupVersionResource, informer cache.SharedIndexInformer, sync queueinformer.LegacySyncHandler) error {
op.k8sLabelQueueSets[gvr] = workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{
Name: gvr.String(),
})
queueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithLogger(op.logger),
queueinformer.WithInformer(informer),
queueinformer.WithSyncer(sync.ToSyncer()),
)
if err != nil {
return err
}

if err := op.RegisterQueueInformer(queueInformer); err != nil {
return err
}

return nil
}

if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("roles"), roleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.Role, *rbacv1applyconfigurations.RoleApplyConfiguration](
ctx, op.logger, labeller.HasOLMOwnerRef,
rbacv1applyconfigurations.Role,
func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.Role, error) {
return op.opClient.KubernetesInterface().RbacV1().Roles(namespace).Apply(ctx, cfg, opts)
},
)); err != nil {
return nil, err
}

// Wire RoleBindings
roleBindingInformer := k8sInformerFactory.Rbac().V1().RoleBindings()
op.lister.RbacV1().RegisterRoleBindingLister(metav1.NamespaceAll, roleBindingInformer.Lister())
sharedIndexInformers = append(sharedIndexInformers, roleBindingInformer.Informer())

if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("rolebindings"), roleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.RoleBinding, *rbacv1applyconfigurations.RoleBindingApplyConfiguration](
ctx, op.logger, labeller.HasOLMOwnerRef,
rbacv1applyconfigurations.RoleBinding,
func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleBindingApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.RoleBinding, error) {
return op.opClient.KubernetesInterface().RbacV1().RoleBindings(namespace).Apply(ctx, cfg, opts)
},
)); err != nil {
return nil, err
}

// Wire ServiceAccounts
serviceAccountInformer := k8sInformerFactory.Core().V1().ServiceAccounts()
op.lister.CoreV1().RegisterServiceAccountLister(metav1.NamespaceAll, serviceAccountInformer.Lister())
sharedIndexInformers = append(sharedIndexInformers, serviceAccountInformer.Informer())

if err := labelObjects(corev1.SchemeGroupVersion.WithResource("serviceaccounts"), serviceAccountInformer.Informer(), labeller.ObjectLabeler[*corev1.ServiceAccount, *corev1applyconfigurations.ServiceAccountApplyConfiguration](
ctx, op.logger, func(object metav1.Object) bool {
return labeller.HasOLMOwnerRef(object) || labeller.HasOLMLabel(object)
},
corev1applyconfigurations.ServiceAccount,
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceAccountApplyConfiguration, opts metav1.ApplyOptions) (*corev1.ServiceAccount, error) {
return op.opClient.KubernetesInterface().CoreV1().ServiceAccounts(namespace).Apply(ctx, cfg, opts)
},
)); err != nil {
return nil, err
}

// Wire Services
serviceInformer := k8sInformerFactory.Core().V1().Services()
op.lister.CoreV1().RegisterServiceLister(metav1.NamespaceAll, serviceInformer.Lister())
sharedIndexInformers = append(sharedIndexInformers, serviceInformer.Informer())

if err := labelObjects(corev1.SchemeGroupVersion.WithResource("services"), serviceInformer.Informer(), labeller.ObjectLabeler[*corev1.Service, *corev1applyconfigurations.ServiceApplyConfiguration](
ctx, op.logger, labeller.HasOLMOwnerRef,
corev1applyconfigurations.Service,
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Service, error) {
return op.opClient.KubernetesInterface().CoreV1().Services(namespace).Apply(ctx, cfg, opts)
},
)); err != nil {
return nil, err
}

// Wire Pods for CatalogSource
catsrcReq, err := labels.NewRequirement(reconciler.CatalogSourceLabelKey, selection.Exists, nil)
if err != nil {
Expand All @@ -392,6 +463,19 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
op.lister.CoreV1().RegisterPodLister(metav1.NamespaceAll, csPodInformer.Lister())
sharedIndexInformers = append(sharedIndexInformers, csPodInformer.Informer())

if err := labelObjects(corev1.SchemeGroupVersion.WithResource("pods"), csPodInformer.Informer(), labeller.ObjectLabeler[*corev1.Pod, *corev1applyconfigurations.PodApplyConfiguration](
ctx, op.logger, func(object metav1.Object) bool {
_, ok := object.GetLabels()[reconciler.CatalogSourceLabelKey]
return ok
},
corev1applyconfigurations.Pod,
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.PodApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Pod, error) {
return op.opClient.KubernetesInterface().CoreV1().Pods(namespace).Apply(ctx, cfg, opts)
},
)); err != nil {
return nil, err
}

// Wire Pods for BundleUnpack job
buReq, err := labels.NewRequirement(bundle.BundleUnpackPodLabel, selection.Exists, nil)
if err != nil {
Expand All @@ -416,6 +500,27 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
jobInformer := k8sInformerFactory.Batch().V1().Jobs()
sharedIndexInformers = append(sharedIndexInformers, jobInformer.Informer())

if err := labelObjects(batchv1.SchemeGroupVersion.WithResource("jobs"), jobInformer.Informer(), labeller.ObjectLabeler[*batchv1.Job, *batchv1applyconfigurations.JobApplyConfiguration](
ctx, op.logger, func(object metav1.Object) bool {
for _, ownerRef := range object.GetOwnerReferences() {
if ownerRef.APIVersion == corev1.SchemeGroupVersion.String() && ownerRef.Kind == "ConfigMap" {
cm, err := configMapInformer.Lister().ConfigMaps(object.GetNamespace()).Get(ownerRef.Name)
if err != nil {
return false
}
return labeller.HasOLMOwnerRef(cm)
}
}
return false
},
batchv1applyconfigurations.Job,
func(namespace string, ctx context.Context, cfg *batchv1applyconfigurations.JobApplyConfiguration, opts metav1.ApplyOptions) (*batchv1.Job, error) {
return op.opClient.KubernetesInterface().BatchV1().Jobs(namespace).Apply(ctx, cfg, opts)
},
)); err != nil {
return nil, err
}

// Generate and register QueueInformers for k8s resources
k8sSyncer := queueinformer.LegacySyncHandler(op.syncObject).ToSyncerWithDelete(op.handleDeletion)
for _, informer := range sharedIndexInformers {
Expand Down Expand Up @@ -480,6 +585,20 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
return nil, err
}

if err := labelObjects(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"), crdInformer, labeller.ObjectPatchLabeler(
ctx, op.logger, func(object metav1.Object) bool {
for key := range object.GetAnnotations() {
if strings.HasPrefix(key, alongside.AnnotationPrefix) {
return true
}
}
return false
},
op.opClient.ApiextensionsInterface().ApiextensionsV1().CustomResourceDefinitions().Patch,
)); err != nil {
return nil, err
}

// Namespace sync for resolving subscriptions
namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod()).Core().V1().Namespaces()
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/operators/internal/alongside/alongside.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

const (
prefix = "operatorframework.io/installed-alongside-"
AnnotationPrefix = "operatorframework.io/installed-alongside-"
)

// NamespacedName is a reference to an object by namespace and name.
Expand All @@ -33,7 +33,7 @@ type Annotator struct{}
func (a Annotator) FromObject(o Annotatable) []NamespacedName {
var result []NamespacedName
for k, v := range o.GetAnnotations() {
if !strings.HasPrefix(k, prefix) {
if !strings.HasPrefix(k, AnnotationPrefix) {
continue
}
tokens := strings.Split(v, "/")
Expand All @@ -55,7 +55,7 @@ func (a Annotator) ToObject(o Annotatable, nns []NamespacedName) {
annotations := o.GetAnnotations()

for key := range annotations {
if strings.HasPrefix(key, prefix) {
if strings.HasPrefix(key, AnnotationPrefix) {
delete(annotations, key)
}
}
Expand All @@ -82,5 +82,5 @@ func key(n NamespacedName) string {
hasher.Write([]byte(n.Namespace))
hasher.Write([]byte{'/'})
hasher.Write([]byte(n.Name))
return fmt.Sprintf("%s%x", prefix, hasher.Sum64())
return fmt.Sprintf("%s%x", AnnotationPrefix, hasher.Sum64())
}
4 changes: 2 additions & 2 deletions pkg/controller/operators/internal/alongside/alongside_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestAnnotatorFromObject(t *testing.T) {
NamespacedNames []NamespacedName
}{
{
Name: "annotation without prefix ignored",
Name: "annotation without AnnotationPrefix ignored",
Object: TestAnnotatable{
"foo": "namespace/name",
},
Expand Down Expand Up @@ -66,7 +66,7 @@ func TestAnnotatorToObject(t *testing.T) {
},
},
{
Name: "annotation without prefix ignored",
Name: "annotation without AnnotationPrefix ignored",
Object: TestAnnotatable{
"operatorframework.io/something-else": "",
},
Expand Down
Loading
Loading