Skip to content

Commit

Permalink
*: detect when all objects are labelled, restart
Browse files Browse the repository at this point in the history
When all of the k8s objects that need labels have them, we are good to
exit the process. The next Pod that starts up will detect that all labels
are present and be able to filter informers going forward.

Signed-off-by: Steve Kuznetsov <[email protected]>
  • Loading branch information
stevekuznetsov committed Sep 20, 2023
1 parent a20659f commit a219dac
Show file tree
Hide file tree
Showing 13 changed files with 460 additions and 127 deletions.
2 changes: 1 addition & 1 deletion cmd/olm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func main() {
olm.WithExternalClient(crClient),
olm.WithMetadataClient(metadataClient),
olm.WithOperatorClient(opClient),
olm.WithRestConfig(config),
olm.WithRestConfig(validatingConfig),
olm.WithConfigClient(versionedConfigClient),
olm.WithProtectedCopiedCSVNamespaces(*protectedCopiedCSVNamespaces),
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/install/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,11 @@ func (i *StrategyDeploymentInstaller) deploymentForSpec(name string, spec appsv1
dep.Spec.Template.SetAnnotations(annotations)

// Set custom labels before CSV owner labels
dep.SetLabels(specLabels)
if dep.Labels == nil {
dep.Labels = map[string]string{}
}
dep.Labels[OLMManagedLabelKey] = OLMManagedLabelValue
dep.SetLabels(specLabels)

ownerutil.AddNonBlockingOwner(dep, i.owner)
ownerutil.AddOwnerLabelsForKind(dep, i.owner, v1alpha1.ClusterServiceVersionKind)
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/install/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ func TestInstallStrategyDeploymentCheckInstallErrors(t *testing.T) {
dep.Spec.Template.SetAnnotations(map[string]string{"test": "annotation"})
dep.Spec.RevisionHistoryLimit = &revisionHistoryLimit
dep.SetLabels(labels.CloneAndAddLabel(dep.ObjectMeta.GetLabels(), DeploymentSpecHashLabelKey, HashDeploymentSpec(dep.Spec)))
dep.Labels[OLMManagedLabelKey] = OLMManagedLabelValue
dep.Status.Conditions = append(dep.Status.Conditions, appsv1.DeploymentCondition{
Type: appsv1.DeploymentAvailable,
Status: corev1.ConditionTrue,
Expand Down
40 changes: 36 additions & 4 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,10 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
ogQueueSet: queueinformer.NewEmptyResourceQueueSet(),
catalogSubscriberIndexer: map[string]cache.Indexer{},
serviceAccountQuerier: scoped.NewUserDefinedServiceAccountQuerier(logger, crClient),
clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient),
clientAttenuator: scoped.NewClientAttenuator(logger, validatingConfig, opClient),
installPlanTimeout: installPlanTimeout,
bundleUnpackTimeout: bundleUnpackTimeout,
clientFactory: clients.NewFactory(config),
clientFactory: clients.NewFactory(validatingConfig),
}
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
op.sourceInvalidator = resolver.SourceProviderFromRegistryClientProvider(op.sources, logger)
Expand Down Expand Up @@ -380,10 +380,22 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
op.lister.RbacV1().RegisterRoleLister(metav1.NamespaceAll, roleInformer.Lister())
sharedIndexInformers = append(sharedIndexInformers, roleInformer.Informer())

labelObjects := func(gvr schema.GroupVersionResource, informer cache.SharedIndexInformer, sync queueinformer.LegacySyncHandler) error {
complete := map[schema.GroupVersionResource][]bool{}
completeLock := &sync.Mutex{}

labelObjects := func(gvr schema.GroupVersionResource, informer cache.SharedIndexInformer, sync func(done func() bool) queueinformer.LegacySyncHandler) error {
if canFilter {
return nil
}
var idx int
if _, exists := complete[gvr]; exists {
idx = len(complete[gvr])
complete[gvr] = append(complete[gvr], false)
} else {
idx = 0
complete[gvr] = []bool{false}
}

queue := workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{
Name: gvr.String(),
})
Expand All @@ -392,7 +404,18 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
queueinformer.WithQueue(queue),
queueinformer.WithLogger(op.logger),
queueinformer.WithInformer(informer),
queueinformer.WithSyncer(sync.ToSyncer()),
queueinformer.WithSyncer(sync(func() bool {
completeLock.Lock()
complete[gvr][idx] = true
allDone := true
for _, items := range complete {
for _, done := range items {
allDone = allDone && done
}
}
completeLock.Unlock()
return allDone
}).ToSyncer()),
)
if err != nil {
return err
Expand All @@ -408,6 +431,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
rolesgvk := rbacv1.SchemeGroupVersion.WithResource("roles")
if err := labelObjects(rolesgvk, roleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.Role, *rbacv1applyconfigurations.RoleApplyConfiguration](
ctx, op.logger, labeller.Filter(rolesgvk),
roleInformer.Lister().List,
rbacv1applyconfigurations.Role,
func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.Role, error) {
return op.opClient.KubernetesInterface().RbacV1().Roles(namespace).Apply(ctx, cfg, opts)
Expand All @@ -420,6 +444,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
func(role *rbacv1.Role) (string, error) {
return resolver.PolicyRuleHashLabelValue(role.Rules)
},
roleInformer.Lister().List,
rbacv1applyconfigurations.Role,
func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.Role, error) {
return op.opClient.KubernetesInterface().RbacV1().Roles(namespace).Apply(ctx, cfg, opts)
Expand All @@ -436,6 +461,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
rolebindingsgvk := rbacv1.SchemeGroupVersion.WithResource("rolebindings")
if err := labelObjects(rolebindingsgvk, roleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.RoleBinding, *rbacv1applyconfigurations.RoleBindingApplyConfiguration](
ctx, op.logger, labeller.Filter(rolebindingsgvk),
roleBindingInformer.Lister().List,
rbacv1applyconfigurations.RoleBinding,
func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleBindingApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.RoleBinding, error) {
return op.opClient.KubernetesInterface().RbacV1().RoleBindings(namespace).Apply(ctx, cfg, opts)
Expand All @@ -448,6 +474,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
func(roleBinding *rbacv1.RoleBinding) (string, error) {
return resolver.RoleReferenceAndSubjectHashLabelValue(roleBinding.RoleRef, roleBinding.Subjects)
},
roleBindingInformer.Lister().List,
rbacv1applyconfigurations.RoleBinding,
func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleBindingApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.RoleBinding, error) {
return op.opClient.KubernetesInterface().RbacV1().RoleBindings(namespace).Apply(ctx, cfg, opts)
Expand All @@ -464,6 +491,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
serviceaccountsgvk := corev1.SchemeGroupVersion.WithResource("serviceaccounts")
if err := labelObjects(serviceaccountsgvk, serviceAccountInformer.Informer(), labeller.ObjectLabeler[*corev1.ServiceAccount, *corev1applyconfigurations.ServiceAccountApplyConfiguration](
ctx, op.logger, labeller.Filter(serviceaccountsgvk),
serviceAccountInformer.Lister().List,
corev1applyconfigurations.ServiceAccount,
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceAccountApplyConfiguration, opts metav1.ApplyOptions) (*corev1.ServiceAccount, error) {
return op.opClient.KubernetesInterface().CoreV1().ServiceAccounts(namespace).Apply(ctx, cfg, opts)
Expand All @@ -480,6 +508,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
servicesgvk := corev1.SchemeGroupVersion.WithResource("services")
if err := labelObjects(servicesgvk, serviceInformer.Informer(), labeller.ObjectLabeler[*corev1.Service, *corev1applyconfigurations.ServiceApplyConfiguration](
ctx, op.logger, labeller.Filter(servicesgvk),
serviceInformer.Lister().List,
corev1applyconfigurations.Service,
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Service, error) {
return op.opClient.KubernetesInterface().CoreV1().Services(namespace).Apply(ctx, cfg, opts)
Expand All @@ -505,6 +534,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
podsgvk := corev1.SchemeGroupVersion.WithResource("pods")
if err := labelObjects(podsgvk, csPodInformer.Informer(), labeller.ObjectLabeler[*corev1.Pod, *corev1applyconfigurations.PodApplyConfiguration](
ctx, op.logger, labeller.Filter(podsgvk),
csPodInformer.Lister().List,
corev1applyconfigurations.Pod,
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.PodApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Pod, error) {
return op.opClient.KubernetesInterface().CoreV1().Pods(namespace).Apply(ctx, cfg, opts)
Expand Down Expand Up @@ -542,6 +572,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
ctx, op.logger, labeller.JobFilter(func(namespace, name string) (metav1.Object, error) {
return configMapInformer.Lister().ConfigMaps(namespace).Get(name)
}),
jobInformer.Lister().List,
batchv1applyconfigurations.Job,
func(namespace string, ctx context.Context, cfg *batchv1applyconfigurations.JobApplyConfiguration, opts metav1.ApplyOptions) (*batchv1.Job, error) {
return op.opClient.KubernetesInterface().BatchV1().Jobs(namespace).Apply(ctx, cfg, opts)
Expand Down Expand Up @@ -617,6 +648,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
customresourcedefinitionsgvk := apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions")
if err := labelObjects(customresourcedefinitionsgvk, crdInformer, labeller.ObjectPatchLabeler(
ctx, op.logger, labeller.Filter(customresourcedefinitionsgvk),
crdLister.List,
op.opClient.ApiextensionsInterface().ApiextensionsV1().CustomResourceDefinitions().Patch,
)); err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/operators/labeller/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ var filters = map[schema.GroupVersionResource]func(metav1.Object) bool{

func Validate(ctx context.Context, logger *logrus.Logger, metadataClient metadata.Interface) (bool, error) {
okLock := sync.Mutex{}
var ok bool
ok := true
g, ctx := errgroup.WithContext(ctx)
allFilters := map[schema.GroupVersionResource]func(metav1.Object) bool{}
for gvr, filter := range filters {
Expand Down Expand Up @@ -124,5 +124,6 @@ func Validate(ctx context.Context, logger *logrus.Logger, metadataClient metadat
if err := g.Wait(); err != nil {
return false, err
}
logger.WithField("canFilter", ok).Info("detected ability to filter informers")
return ok, nil
}
163 changes: 103 additions & 60 deletions pkg/controller/operators/labeller/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"strings"

jsonpatch "github.com/evanphx/json-patch"
"github.com/sirupsen/logrus"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"

Expand Down Expand Up @@ -40,28 +42,50 @@ func ObjectLabeler[T metav1.Object, A ApplyConfig[A]](
ctx context.Context,
logger *logrus.Logger,
check func(metav1.Object) bool,
list func(options labels.Selector) ([]T, error),
applyConfigFor func(name, namespace string) A,
apply func(namespace string, ctx context.Context, cfg A, opts metav1.ApplyOptions) (T, error),
) queueinformer.LegacySyncHandler {
return func(obj interface{}) error {
cast, ok := obj.(T)
if !ok {
err := fmt.Errorf("wrong type %T, expected %T: %#v", obj, new(T), obj)
logger.WithError(err).Error("casting failed")
return fmt.Errorf("casting failed: %w", err)
}
) func(done func() bool) queueinformer.LegacySyncHandler {
return func(done func() bool) queueinformer.LegacySyncHandler {
return func(obj interface{}) error {
cast, ok := obj.(T)
if !ok {
err := fmt.Errorf("wrong type %T, expected %T: %#v", obj, new(T), obj)
logger.WithError(err).Error("casting failed")
return fmt.Errorf("casting failed: %w", err)
}

if !check(cast) || hasLabel(cast) {
return nil
}
if !check(cast) || hasLabel(cast) {
// if the object we're processing does not need us to label it, it's possible that every object that requires
// the label already has it; in which case we should exit the process, so the Pod that succeeds us can filter
// the informers used to drive the controller and stop having to track extraneous objects
items, err := list(labels.Everything())
if err != nil {
logger.WithError(err).Warn("failed to list all objects to check for labelling completion")
return nil
}
gvrFullyLabelled := true
for _, item := range items {
gvrFullyLabelled = gvrFullyLabelled && (!check(item) || hasLabel(item))
}
if gvrFullyLabelled {
allObjectsLabelled := done()
if allObjectsLabelled {
logrus.Info("detected that every object is labelled, exiting to re-start the process...")
os.Exit(0)
}
}
return nil
}

cfg := applyConfigFor(cast.GetName(), cast.GetNamespace())
cfg.WithLabels(map[string]string{
install.OLMManagedLabelKey: install.OLMManagedLabelValue,
})
cfg := applyConfigFor(cast.GetName(), cast.GetNamespace())
cfg.WithLabels(map[string]string{
install.OLMManagedLabelKey: install.OLMManagedLabelValue,
})

_, err := apply(cast.GetNamespace(), ctx, cfg, metav1.ApplyOptions{})
return err
_, err := apply(cast.GetNamespace(), ctx, cfg, metav1.ApplyOptions{})
return err
}
}
}

Expand All @@ -71,58 +95,77 @@ func ObjectPatchLabeler(
ctx context.Context,
logger *logrus.Logger,
check func(metav1.Object) bool,
list func(selector labels.Selector) (ret []*metav1.PartialObjectMetadata, err error),
patch func(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *apiextensionsv1.CustomResourceDefinition, err error),
) func(
obj interface{},
) error {
return func(obj interface{}) error {
cast, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
if !ok {
err := fmt.Errorf("wrong type %T, expected %T: %#v", obj, new(*apiextensionsv1.CustomResourceDefinition), obj)
logger.WithError(err).Error("casting failed")
return fmt.Errorf("casting failed: %w", err)
}
) func(done func() bool) queueinformer.LegacySyncHandler {
return func(done func() bool) queueinformer.LegacySyncHandler {
return func(obj interface{}) error {
cast, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
if !ok {
err := fmt.Errorf("wrong type %T, expected %T: %#v", obj, new(*apiextensionsv1.CustomResourceDefinition), obj)
logger.WithError(err).Error("casting failed")
return fmt.Errorf("casting failed: %w", err)
}

if !check(cast) || hasLabel(cast) {
return nil
}
if !check(cast) || hasLabel(cast) {
// if the object we're processing does not need us to label it, it's possible that every object that requires
// the label already has it; in which case we should exit the process, so the Pod that succeeds us can filter
// the informers used to drive the controller and stop having to track extraneous objects
items, err := list(labels.Everything())
if err != nil {
logger.WithError(err).Warn("failed to list all objects to check for labelling completion")
return nil
}
gvrFullyLabelled := true
for _, item := range items {
gvrFullyLabelled = gvrFullyLabelled && (!check(item) || hasLabel(item))
}
if gvrFullyLabelled {
allObjectsLabelled := done()
if allObjectsLabelled {
logrus.Fatal("detected that every object is labelled, exiting...")
}
}
return nil
}

uid := cast.GetUID()
rv := cast.GetResourceVersion()
uid := cast.GetUID()
rv := cast.GetResourceVersion()

// to ensure they appear in the patch as preconditions
previous := cast.DeepCopy()
previous.SetUID("")
previous.SetResourceVersion("")
// to ensure they appear in the patch as preconditions
previous := cast.DeepCopy()
previous.SetUID("")
previous.SetResourceVersion("")

oldData, err := json.Marshal(previous)
if err != nil {
return fmt.Errorf("failed to Marshal old data for %s/%s: %w", previous.GetNamespace(), previous.GetName(), err)
}
oldData, err := json.Marshal(previous)
if err != nil {
return fmt.Errorf("failed to Marshal old data for %s/%s: %w", previous.GetNamespace(), previous.GetName(), err)
}

// to ensure they appear in the patch as preconditions
updated := cast.DeepCopy()
updated.SetUID(uid)
updated.SetResourceVersion(rv)
labels := updated.GetLabels()
if labels == nil {
labels = map[string]string{}
}
labels[install.OLMManagedLabelKey] = install.OLMManagedLabelValue
updated.SetLabels(labels)
// to ensure they appear in the patch as preconditions
updated := cast.DeepCopy()
updated.SetUID(uid)
updated.SetResourceVersion(rv)
labels := updated.GetLabels()
if labels == nil {
labels = map[string]string{}
}
labels[install.OLMManagedLabelKey] = install.OLMManagedLabelValue
updated.SetLabels(labels)

newData, err := json.Marshal(updated)
if err != nil {
return fmt.Errorf("failed to Marshal old data for %s/%s: %w", updated.GetNamespace(), updated.GetName(), err)
}
newData, err := json.Marshal(updated)
if err != nil {
return fmt.Errorf("failed to Marshal old data for %s/%s: %w", updated.GetNamespace(), updated.GetName(), err)
}

patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return fmt.Errorf("failed to create patch for %s/%s: %w", cast.GetNamespace(), cast.GetName(), err)
}
patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return fmt.Errorf("failed to create patch for %s/%s: %w", cast.GetNamespace(), cast.GetName(), err)
}

_, err = patch(ctx, cast.GetName(), types.MergePatchType, patchBytes, metav1.PatchOptions{})
return err
_, err = patch(ctx, cast.GetName(), types.MergePatchType, patchBytes, metav1.PatchOptions{})
return err
}
}
}

Expand Down
Loading

0 comments on commit a219dac

Please sign in to comment.