*: detect when all objects are labelled, restart
When all of the k8s objects that need labels have them, we are good to
exit the process. The next Pod that starts up will detect that all labels
are present and be able to filter informers going forward.

Signed-off-by: Steve Kuznetsov <[email protected]>
stevekuznetsov committed Sep 11, 2023
1 parent c0c61fe commit d706d6b
Showing 25 changed files with 487 additions and 292 deletions.
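
To make the mechanism in the commit message concrete, here is a minimal, self-contained Go sketch (hypothetical names, not the operator's actual code) of the completion-tracking pattern the diff wires into labelObjects: each labeller marks its resource done once every object it watches carries the label, and the process exits when all tracked resources report done so the next Pod can start with filtered informers.

package main

import (
	"fmt"
	"os"
	"sync"
)

// completionTracker mirrors the complete-map-plus-lock pattern in the diff:
// every resource that still needs labelling registers itself up front, and
// each labeller calls markDone once it sees no unlabelled objects remain.
type completionTracker struct {
	mu       sync.Mutex
	complete map[string]bool // resource name -> fully labelled?
}

func newCompletionTracker(resources ...string) *completionTracker {
	t := &completionTracker{complete: map[string]bool{}}
	for _, r := range resources {
		t.complete[r] = false
	}
	return t
}

// markDone records that one resource is fully labelled and reports whether
// every registered resource is now done.
func (t *completionTracker) markDone(resource string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.complete[resource] = true
	for _, done := range t.complete {
		if !done {
			return false
		}
	}
	return true
}

func main() {
	tracker := newCompletionTracker("roles", "rolebindings", "services")

	// Each labeller would call this from its sync handler after listing its
	// informer cache and finding nothing left to label.
	for _, resource := range []string{"roles", "rolebindings", "services"} {
		if tracker.markDone(resource) {
			// The real controller uses logrus.Fatal here so the Pod restarts
			// and its successor can filter informers from the start.
			fmt.Println("every object is labelled, exiting so the next Pod can filter informers")
			os.Exit(0)
		}
	}
}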
24 changes: 22 additions & 2 deletions pkg/controller/operators/catalog/operator.go
@@ -382,18 +382,31 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
op.lister.RbacV1().RegisterRoleLister(metav1.NamespaceAll, roleInformer.Lister())
sharedIndexInformers = append(sharedIndexInformers, roleInformer.Informer())

labelObjects := func(gvr schema.GroupVersionResource, informer cache.SharedIndexInformer, sync queueinformer.LegacySyncHandler) error {
complete := map[schema.GroupVersionResource]bool{}
completeLock := &sync.RWMutex{}

labelObjects := func(gvr schema.GroupVersionResource, informer cache.SharedIndexInformer, sync func(done func() bool) queueinformer.LegacySyncHandler) error {
if canFilter {
return nil
}
complete[gvr] = false
op.k8sLabelQueueSets[gvr] = workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{
Name: gvr.String(),
})
queueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithLogger(op.logger),
queueinformer.WithInformer(informer),
queueinformer.WithSyncer(sync.ToSyncer()),
queueinformer.WithSyncer(sync(func() bool {
completeLock.Lock()
complete[gvr] = true
allDone := true
for _, done := range complete {
allDone = allDone && done
}
completeLock.Unlock()
return allDone
}).ToSyncer()),
)
if err != nil {
return err
@@ -409,6 +422,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
rolesgvk := rbacv1.SchemeGroupVersion.WithResource("roles")
if err := labelObjects(rolesgvk, roleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.Role, *rbacv1applyconfigurations.RoleApplyConfiguration](
ctx, op.logger, labeller.Filter(rolesgvk),
roleInformer.Lister().List,
rbacv1applyconfigurations.Role,
func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.Role, error) {
return op.opClient.KubernetesInterface().RbacV1().Roles(namespace).Apply(ctx, cfg, opts)
@@ -425,6 +439,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
rolebindingsgvk := rbacv1.SchemeGroupVersion.WithResource("rolebindings")
if err := labelObjects(rolebindingsgvk, roleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.RoleBinding, *rbacv1applyconfigurations.RoleBindingApplyConfiguration](
ctx, op.logger, labeller.Filter(rolebindingsgvk),
roleBindingInformer.Lister().List,
rbacv1applyconfigurations.RoleBinding,
func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleBindingApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.RoleBinding, error) {
return op.opClient.KubernetesInterface().RbacV1().RoleBindings(namespace).Apply(ctx, cfg, opts)
@@ -441,6 +456,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
serviceaccountsgvk := corev1.SchemeGroupVersion.WithResource("serviceaccounts")
if err := labelObjects(serviceaccountsgvk, serviceAccountInformer.Informer(), labeller.ObjectLabeler[*corev1.ServiceAccount, *corev1applyconfigurations.ServiceAccountApplyConfiguration](
ctx, op.logger, labeller.Filter(serviceaccountsgvk),
serviceAccountInformer.Lister().List,
corev1applyconfigurations.ServiceAccount,
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceAccountApplyConfiguration, opts metav1.ApplyOptions) (*corev1.ServiceAccount, error) {
return op.opClient.KubernetesInterface().CoreV1().ServiceAccounts(namespace).Apply(ctx, cfg, opts)
@@ -457,6 +473,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
servicesgvk := corev1.SchemeGroupVersion.WithResource("services")
if err := labelObjects(servicesgvk, serviceInformer.Informer(), labeller.ObjectLabeler[*corev1.Service, *corev1applyconfigurations.ServiceApplyConfiguration](
ctx, op.logger, labeller.Filter(servicesgvk),
serviceInformer.Lister().List,
corev1applyconfigurations.Service,
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Service, error) {
return op.opClient.KubernetesInterface().CoreV1().Services(namespace).Apply(ctx, cfg, opts)
@@ -482,6 +499,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
podsgvk := corev1.SchemeGroupVersion.WithResource("pods")
if err := labelObjects(podsgvk, csPodInformer.Informer(), labeller.ObjectLabeler[*corev1.Pod, *corev1applyconfigurations.PodApplyConfiguration](
ctx, op.logger, labeller.Filter(podsgvk),
csPodInformer.Lister().List,
corev1applyconfigurations.Pod,
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.PodApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Pod, error) {
return op.opClient.KubernetesInterface().CoreV1().Pods(namespace).Apply(ctx, cfg, opts)
@@ -519,6 +537,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
ctx, op.logger, labeller.JobFilter(func(namespace, name string) (metav1.Object, error) {
return configMapInformer.Lister().ConfigMaps(namespace).Get(name)
}),
jobInformer.Lister().List,
batchv1applyconfigurations.Job,
func(namespace string, ctx context.Context, cfg *batchv1applyconfigurations.JobApplyConfiguration, opts metav1.ApplyOptions) (*batchv1.Job, error) {
return op.opClient.KubernetesInterface().BatchV1().Jobs(namespace).Apply(ctx, cfg, opts)
@@ -594,6 +613,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
customresourcedefinitionsgvk := apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions")
if err := labelObjects(customresourcedefinitionsgvk, crdInformer, labeller.ObjectPatchLabeler(
ctx, op.logger, labeller.Filter(customresourcedefinitionsgvk),
crdLister.List,
op.opClient.ApiextensionsInterface().ApiextensionsV1().CustomResourceDefinitions().Patch,
)); err != nil {
return nil, err
3 changes: 2 additions & 1 deletion pkg/controller/operators/labeller/filters.go
@@ -71,7 +71,7 @@ var filters = map[schema.GroupVersionResource]func(metav1.Object) bool{

func Validate(ctx context.Context, logger *logrus.Logger, metadataClient metadata.Interface) (bool, error) {
okLock := sync.Mutex{}
var ok bool
ok := true
g, ctx := errgroup.WithContext(ctx)
allFilters := map[schema.GroupVersionResource]func(metav1.Object) bool{}
for gvr, filter := range filters {
@@ -108,5 +108,6 @@ func Validate(ctx context.Context, logger *logrus.Logger, metadataClient metadat
if err := g.Wait(); err != nil {
return false, err
}
logger.WithField("canFilter", ok).Info("detected ability to filter informers")
return ok, nil
}
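
The filters.go change above flips the shared flag from its zero value to true. A small illustrative sketch (hypothetical names, not the operator's code) of why that matters: each per-resource goroutine can only pull the flag toward false, so an accumulator that starts false could never report success.

package main

import (
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

// checkAllLabelled mirrors the shape of Validate: one goroutine per resource,
// and each goroutine only ever sets the shared flag to false. The flag must
// therefore start as true, or the function could never return true.
func checkAllLabelled(unlabelledCounts map[string]int) (bool, error) {
	var mu sync.Mutex
	ok := true // the zero value (false) could never become true again

	var g errgroup.Group
	for resource, count := range unlabelledCounts {
		resource, count := resource, count // per-iteration copies for the closure
		g.Go(func() error {
			// a real check would list this resource via the metadata client
			if count > 0 {
				mu.Lock()
				ok = false
				mu.Unlock()
				fmt.Printf("%s still has %d unlabelled objects\n", resource, count)
			}
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return false, err
	}
	return ok, nil
}

func main() {
	canFilter, _ := checkAllLabelled(map[string]int{"roles": 0, "rolebindings": 0, "services": 2})
	fmt.Println("canFilter:", canFilter) // false until every count reaches zero
}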
161 changes: 101 additions & 60 deletions pkg/controller/operators/labeller/labels.go
@@ -10,6 +10,7 @@ import (
"github.com/sirupsen/logrus"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"

@@ -40,28 +41,49 @@ func ObjectLabeler[T metav1.Object, A ApplyConfig[A]](
ctx context.Context,
logger *logrus.Logger,
check func(metav1.Object) bool,
list func(options labels.Selector) ([]T, error),
applyConfigFor func(name, namespace string) A,
apply func(namespace string, ctx context.Context, cfg A, opts metav1.ApplyOptions) (T, error),
) queueinformer.LegacySyncHandler {
return func(obj interface{}) error {
cast, ok := obj.(T)
if !ok {
err := fmt.Errorf("wrong type %T, expected %T: %#v", obj, new(T), obj)
logger.WithError(err).Error("casting failed")
return fmt.Errorf("casting failed: %w", err)
}
) func(done func() bool) queueinformer.LegacySyncHandler {
return func(done func() bool) queueinformer.LegacySyncHandler {
return func(obj interface{}) error {
cast, ok := obj.(T)
if !ok {
err := fmt.Errorf("wrong type %T, expected %T: %#v", obj, new(T), obj)
logger.WithError(err).Error("casting failed")
return fmt.Errorf("casting failed: %w", err)
}

if !check(cast) || hasLabel(cast) {
return nil
}
if !check(cast) || hasLabel(cast) {
// if the object we're processing does not need us to label it, it's possible that every object that requires
// the label already has it; in which case we should exit the process, so the Pod that succeeds us can filter
// the informers used to drive the controller and stop having to track extraneous objects
items, err := list(labels.Everything())
if err != nil {
logger.WithError(err).Warn("failed to list all objects to check for labelling completion")
return nil
}
gvrFullyLabelled := true
for _, item := range items {
gvrFullyLabelled = gvrFullyLabelled && (!check(item) || hasLabel(item))
}
if gvrFullyLabelled {
allObjectsLabelled := done()
if allObjectsLabelled {
logrus.Fatal("detected that every object is labelled, exiting...")
}
}
return nil
}

cfg := applyConfigFor(cast.GetName(), cast.GetNamespace())
cfg.WithLabels(map[string]string{
install.OLMManagedLabelKey: install.OLMManagedLabelValue,
})
cfg := applyConfigFor(cast.GetName(), cast.GetNamespace())
cfg.WithLabels(map[string]string{
install.OLMManagedLabelKey: install.OLMManagedLabelValue,
})

_, err := apply(cast.GetNamespace(), ctx, cfg, metav1.ApplyOptions{})
return err
_, err := apply(cast.GetNamespace(), ctx, cfg, metav1.ApplyOptions{})
return err
}
}
}

@@ -71,58 +93,77 @@ func ObjectPatchLabeler(
ctx context.Context,
logger *logrus.Logger,
check func(metav1.Object) bool,
list func(selector labels.Selector) (ret []*metav1.PartialObjectMetadata, err error),
patch func(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *apiextensionsv1.CustomResourceDefinition, err error),
) func(
obj interface{},
) error {
return func(obj interface{}) error {
cast, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
if !ok {
err := fmt.Errorf("wrong type %T, expected %T: %#v", obj, new(*apiextensionsv1.CustomResourceDefinition), obj)
logger.WithError(err).Error("casting failed")
return fmt.Errorf("casting failed: %w", err)
}
) func(done func() bool) queueinformer.LegacySyncHandler {
return func(done func() bool) queueinformer.LegacySyncHandler {
return func(obj interface{}) error {
cast, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
if !ok {
err := fmt.Errorf("wrong type %T, expected %T: %#v", obj, new(*apiextensionsv1.CustomResourceDefinition), obj)
logger.WithError(err).Error("casting failed")
return fmt.Errorf("casting failed: %w", err)
}

if !check(cast) || hasLabel(cast) {
return nil
}
if !check(cast) || hasLabel(cast) {
// if the object we're processing does not need us to label it, it's possible that every object that requires
// the label already has it; in which case we should exit the process, so the Pod that succeeds us can filter
// the informers used to drive the controller and stop having to track extraneous objects
items, err := list(labels.Everything())
if err != nil {
logger.WithError(err).Warn("failed to list all objects to check for labelling completion")
return nil
}
gvrFullyLabelled := true
for _, item := range items {
gvrFullyLabelled = gvrFullyLabelled && (!check(item) || hasLabel(item))
}
if gvrFullyLabelled {
allObjectsLabelled := done()
if allObjectsLabelled {
logrus.Fatal("detected that every object is labelled, exiting...")
}
}
return nil
}

uid := cast.GetUID()
rv := cast.GetResourceVersion()
uid := cast.GetUID()
rv := cast.GetResourceVersion()

// to ensure they appear in the patch as preconditions
previous := cast.DeepCopy()
previous.SetUID("")
previous.SetResourceVersion("")
// to ensure they appear in the patch as preconditions
previous := cast.DeepCopy()
previous.SetUID("")
previous.SetResourceVersion("")

oldData, err := json.Marshal(previous)
if err != nil {
return fmt.Errorf("failed to Marshal old data for %s/%s: %w", previous.GetNamespace(), previous.GetName(), err)
}
oldData, err := json.Marshal(previous)
if err != nil {
return fmt.Errorf("failed to Marshal old data for %s/%s: %w", previous.GetNamespace(), previous.GetName(), err)
}

// to ensure they appear in the patch as preconditions
updated := cast.DeepCopy()
updated.SetUID(uid)
updated.SetResourceVersion(rv)
labels := updated.GetLabels()
if labels == nil {
labels = map[string]string{}
}
labels[install.OLMManagedLabelKey] = install.OLMManagedLabelValue
updated.SetLabels(labels)
// to ensure they appear in the patch as preconditions
updated := cast.DeepCopy()
updated.SetUID(uid)
updated.SetResourceVersion(rv)
labels := updated.GetLabels()
if labels == nil {
labels = map[string]string{}
}
labels[install.OLMManagedLabelKey] = install.OLMManagedLabelValue
updated.SetLabels(labels)

newData, err := json.Marshal(updated)
if err != nil {
return fmt.Errorf("failed to Marshal old data for %s/%s: %w", updated.GetNamespace(), updated.GetName(), err)
}
newData, err := json.Marshal(updated)
if err != nil {
return fmt.Errorf("failed to Marshal old data for %s/%s: %w", updated.GetNamespace(), updated.GetName(), err)
}

patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return fmt.Errorf("failed to create patch for %s/%s: %w", cast.GetNamespace(), cast.GetName(), err)
}
patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return fmt.Errorf("failed to create patch for %s/%s: %w", cast.GetNamespace(), cast.GetName(), err)
}

_, err = patch(ctx, cast.GetName(), types.MergePatchType, patchBytes, metav1.PatchOptions{})
return err
_, err = patch(ctx, cast.GetName(), types.MergePatchType, patchBytes, metav1.PatchOptions{})
return err
}
}
}

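The completion check added to both ObjectLabeler and ObjectPatchLabeler is the same loop: list everything from the informer cache and confirm each object either falls outside the labeller's scope or already carries the label. A generic sketch of that check (hypothetical helper; the label key is an assumption, the real one lives in the install package):

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const managedLabel = "olm.managed" // assumption: stands in for install.OLMManagedLabelKey

func hasLabel(obj metav1.Object) bool {
	value, ok := obj.GetLabels()[managedLabel]
	return ok && value == "true"
}

// fullyLabelled reports whether every listed object is either out of scope
// (check returns false) or already labelled; it mirrors the gvrFullyLabelled
// loop added in labels.go.
func fullyLabelled[T metav1.Object](items []T, check func(metav1.Object) bool) bool {
	for _, item := range items {
		if check(item) && !hasLabel(item) {
			return false
		}
	}
	return true
}

func main() {
	objects := []*metav1.PartialObjectMetadata{
		{ObjectMeta: metav1.ObjectMeta{Name: "a", Labels: map[string]string{managedLabel: "true"}}},
		{ObjectMeta: metav1.ObjectMeta{Name: "b"}}, // unlabelled, still in scope
	}
	inScope := func(metav1.Object) bool { return true }
	fmt.Println(fullyLabelled(objects, inScope)) // false until "b" is labelled
}
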
26 changes: 22 additions & 4 deletions pkg/controller/operators/olm/operator.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/labeller"
@@ -449,18 +450,31 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
}
}

labelObjects := func(gvr schema.GroupVersionResource, informer cache.SharedIndexInformer, sync queueinformer.LegacySyncHandler) error {
complete := map[schema.GroupVersionResource]bool{}
completeLock := &sync.RWMutex{}

labelObjects := func(gvr schema.GroupVersionResource, informer cache.SharedIndexInformer, sync func(done func() bool) queueinformer.LegacySyncHandler) error {
if canFilter {
return nil
}
complete[gvr] = false
op.k8sLabelQueueSets[gvr] = workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{
Name: gvr.String(),
})
queueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithLogger(op.logger),
queueinformer.WithInformer(informer),
queueinformer.WithSyncer(sync.ToSyncer()),
queueinformer.WithSyncer(sync(func() bool {
completeLock.Lock()
complete[gvr] = true
allDone := true
for _, done := range complete {
allDone = allDone && done
}
completeLock.Unlock()
return allDone
}).ToSyncer()),
)
if err != nil {
return err
@@ -476,6 +490,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
deploymentsgvk := appsv1.SchemeGroupVersion.WithResource("deployments")
if err := labelObjects(deploymentsgvk, informersByNamespace[metav1.NamespaceAll].DeploymentInformer.Informer(), labeller.ObjectLabeler[*appsv1.Deployment, *appsv1applyconfigurations.DeploymentApplyConfiguration](
ctx, op.logger, labeller.Filter(deploymentsgvk),
informersByNamespace[metav1.NamespaceAll].DeploymentInformer.Lister().List,
appsv1applyconfigurations.Deployment,
func(namespace string, ctx context.Context, cfg *appsv1applyconfigurations.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (*appsv1.Deployment, error) {
return op.opClient.KubernetesInterface().AppsV1().Deployments(namespace).Apply(ctx, cfg, opts)
@@ -548,6 +563,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
clusterrolesgvk := rbacv1.SchemeGroupVersion.WithResource("clusterroles")
if err := labelObjects(clusterrolesgvk, clusterRoleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.ClusterRole, *rbacv1applyconfigurations.ClusterRoleApplyConfiguration](
ctx, op.logger, labeller.Filter(clusterrolesgvk),
clusterRoleInformer.Lister().List,
func(name, _ string) *rbacv1applyconfigurations.ClusterRoleApplyConfiguration {
return rbacv1applyconfigurations.ClusterRole(name)
},
@@ -577,6 +593,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
clusterrolebindingssgvk := rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings")
if err := labelObjects(clusterrolebindingssgvk, clusterRoleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.ClusterRoleBinding, *rbacv1applyconfigurations.ClusterRoleBindingApplyConfiguration](
ctx, op.logger, labeller.Filter(clusterrolebindingssgvk),
clusterRoleBindingInformer.Lister().List,
func(name, _ string) *rbacv1applyconfigurations.ClusterRoleBindingApplyConfiguration {
return rbacv1applyconfigurations.ClusterRoleBinding(name)
},
@@ -587,8 +604,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
return nil, err
}

// register namespace queueinformer
namespaceInformer := k8sInformerFactory.Core().V1().Namespaces()
// register namespace queueinformer using a new informer factory - since namespaces won't have the labels
// that other k8s objects will
namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), config.resyncPeriod()).Core().V1().Namespaces()
informersByNamespace[metav1.NamespaceAll].NamespaceInformer = namespaceInformer
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
op.nsQueueSet = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resolver")
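
The namespace-informer change above pulls namespaces out of the shared factory because they never receive the olm.managed label. A hedged sketch of that split using standard client-go factory options; only the unfiltered namespace factory appears in this diff, so the selector string and the filtered-factory half are assumptions about what the successor Pod's filtering looks like.

package main

import (
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
)

// buildInformerFactories shows the split: most informers can come from a
// factory filtered to the managed label once every object carries it, while
// namespaces, which are never labelled, use a separate unfiltered factory.
func buildInformerFactories(client kubernetes.Interface, resync time.Duration) (filtered, unfiltered informers.SharedInformerFactory) {
	filtered = informers.NewSharedInformerFactoryWithOptions(
		client,
		resync,
		informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
			// hypothetical selector; the real key/value live in the install package
			opts.LabelSelector = "olm.managed=true"
		}),
	)
	unfiltered = informers.NewSharedInformerFactory(client, resync)
	return filtered, unfiltered
}

func main() {
	// wiring a real client is omitted; in the operator this would come from
	// op.opClient.KubernetesInterface() and config.resyncPeriod()
	var client kubernetes.Interface
	filtered, unfiltered := buildInformerFactories(client, 15*time.Minute)
	_ = filtered.Core().V1().Services()     // label-filtered informer
	_ = unfiltered.Core().V1().Namespaces() // namespaces won't carry the label
}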