From f7efe1448ce9f51e379d4a99dd78f6a62c1a4eba Mon Sep 17 00:00:00 2001 From: morvencao Date: Wed, 31 Jul 2024 06:25:46 +0000 Subject: [PATCH] add GC for cloudevents work client. Signed-off-by: morvencao --- pkg/addonmanager/cloudevents/manager.go | 27 +- .../metadata/metadatainformer/informer.go | 215 +++++++++++ .../metadata/metadatainformer/interface.go | 53 +++ .../metadata/metadatalister/interface.go | 40 ++ .../metadata/metadatalister/lister.go | 91 +++++ .../client-go/metadata/metadatalister/shim.go | 87 +++++ vendor/modules.txt | 3 + .../work/garbagecollector/garbagecollector.go | 354 ++++++++++++++++++ 8 files changed, 869 insertions(+), 1 deletion(-) create mode 100644 vendor/k8s.io/client-go/metadata/metadatainformer/informer.go create mode 100644 vendor/k8s.io/client-go/metadata/metadatainformer/interface.go create mode 100644 vendor/k8s.io/client-go/metadata/metadatalister/interface.go create mode 100644 vendor/k8s.io/client-go/metadata/metadatalister/lister.go create mode 100644 vendor/k8s.io/client-go/metadata/metadatalister/shim.go create mode 100644 vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/garbagecollector/garbagecollector.go diff --git a/pkg/addonmanager/cloudevents/manager.go b/pkg/addonmanager/cloudevents/manager.go index 5bc2915e..2f0e1687 100644 --- a/pkg/addonmanager/cloudevents/manager.go +++ b/pkg/addonmanager/cloudevents/manager.go @@ -6,10 +6,12 @@ import ( "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/metadata" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" addonv1alpha1 "open-cluster-management.io/api/addon/v1alpha1" @@ -25,6 +27,7 @@ import ( "open-cluster-management.io/sdk-go/pkg/cloudevents/constants" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" cloudeventswork "open-cluster-management.io/sdk-go/pkg/cloudevents/work" + "open-cluster-management.io/sdk-go/pkg/cloudevents/work/garbagecollector" "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec" "open-cluster-management.io/sdk-go/pkg/cloudevents/work/store" ) @@ -49,6 +52,7 @@ func (a *cloudeventsAddonManager) Start(ctx context.Context) error { // ManifestWork client that implements the ManifestWorkInterface and ManifestWork informer based on different // driver configuration. // Refer to Event Based Manifestwork proposal in enhancements repo to get more details. + var clientHolder *cloudeventswork.ClientHolder var workClient workclientset.Interface var watcherStore *store.SourceInformerWatcherStore var err error @@ -67,7 +71,7 @@ func (a *cloudeventsAddonManager) Start(ctx context.Context) error { return err } - clientHolder, err := cloudeventswork.NewClientHolderBuilder(clientConfig). + clientHolder, err = cloudeventswork.NewClientHolderBuilder(clientConfig). WithClientID(a.options.CloudEventsClientID). WithSourceID(a.options.SourceID). WithCodecs(codec.NewManifestBundleCodec()). 
@@ -104,6 +108,11 @@ func (a *cloudeventsAddonManager) Start(ctx context.Context) error { return err } + metadataClient, err := metadata.NewForConfig(config) + if err != nil { + return err + } + dynamicClient, err := dynamic.NewForConfig(config) if err != nil { return err @@ -178,6 +187,22 @@ func (a *cloudeventsAddonManager) Start(ctx context.Context) error { return err } + // Start the garbage collector for work clients with cloudevents driver + if a.options.WorkDriver == constants.ConfigTypeGRPC || a.options.WorkDriver == constants.ConfigTypeMQTT { + if len(addonNames) == 0 { + return fmt.Errorf("no addon names provided") + } + addonName := addonNames[0] + listOptions := &metav1.ListOptions{ + FieldSelector: fmt.Sprintf("metadata.name=%s", addonName), + } + ownerGVRFilters := map[schema.GroupVersionResource]*metav1.ListOptions{ + addonv1alpha1.SchemeGroupVersion.WithResource("managedclusteraddons"): listOptions, + } + garbageCollector := garbagecollector.NewGarbageCollector(clientHolder, workInformers, metadataClient, ownerGVRFilters) + go garbageCollector.Run(ctx, 1) + } + err = a.StartWithInformers(ctx, workClient, workInformers, kubeInformers, addonInformers, clusterInformers, dynamicInformers) if err != nil { return err diff --git a/vendor/k8s.io/client-go/metadata/metadatainformer/informer.go b/vendor/k8s.io/client-go/metadata/metadatainformer/informer.go new file mode 100644 index 00000000..ff3537e9 --- /dev/null +++ b/vendor/k8s.io/client-go/metadata/metadatainformer/informer.go @@ -0,0 +1,215 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metadatainformer + +import ( + "context" + "sync" + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/informers" + "k8s.io/client-go/metadata" + "k8s.io/client-go/metadata/metadatalister" + "k8s.io/client-go/tools/cache" +) + +// SharedInformerOption defines the functional option type for metadataSharedInformerFactory. +type SharedInformerOption func(*metadataSharedInformerFactory) *metadataSharedInformerFactory + +// WithTransform sets a transform on all informers. +func WithTransform(transform cache.TransformFunc) SharedInformerOption { + return func(factory *metadataSharedInformerFactory) *metadataSharedInformerFactory { + factory.transform = transform + return factory + } +} + +// NewSharedInformerFactory constructs a new instance of metadataSharedInformerFactory for all namespaces. +func NewSharedInformerFactory(client metadata.Interface, defaultResync time.Duration) SharedInformerFactory { + return NewFilteredSharedInformerFactory(client, defaultResync, metav1.NamespaceAll, nil) +} + +// NewFilteredSharedInformerFactory constructs a new instance of metadataSharedInformerFactory. +// Listers obtained via this factory will be subject to the same filters as specified here. 
+func NewFilteredSharedInformerFactory(client metadata.Interface, defaultResync time.Duration, namespace string, tweakListOptions TweakListOptionsFunc) SharedInformerFactory { + return &metadataSharedInformerFactory{ + client: client, + defaultResync: defaultResync, + namespace: namespace, + informers: map[schema.GroupVersionResource]informers.GenericInformer{}, + startedInformers: make(map[schema.GroupVersionResource]bool), + tweakListOptions: tweakListOptions, + } +} + +// NewSharedInformerFactoryWithOptions constructs a new instance of metadataSharedInformerFactory with additional options. +func NewSharedInformerFactoryWithOptions(client metadata.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory { + factory := &metadataSharedInformerFactory{ + client: client, + namespace: v1.NamespaceAll, + defaultResync: defaultResync, + informers: map[schema.GroupVersionResource]informers.GenericInformer{}, + startedInformers: make(map[schema.GroupVersionResource]bool), + } + + // Apply all options + for _, opt := range options { + factory = opt(factory) + } + + return factory +} + +type metadataSharedInformerFactory struct { + client metadata.Interface + defaultResync time.Duration + namespace string + transform cache.TransformFunc + + lock sync.Mutex + informers map[schema.GroupVersionResource]informers.GenericInformer + // startedInformers is used for tracking which informers have been started. + // This allows Start() to be called multiple times safely. + startedInformers map[schema.GroupVersionResource]bool + tweakListOptions TweakListOptionsFunc + // wg tracks how many goroutines were started. + wg sync.WaitGroup + // shuttingDown is true when Shutdown has been called. It may still be running + // because it needs to wait for goroutines. + shuttingDown bool +} + +var _ SharedInformerFactory = &metadataSharedInformerFactory{} + +func (f *metadataSharedInformerFactory) ForResource(gvr schema.GroupVersionResource) informers.GenericInformer { + f.lock.Lock() + defer f.lock.Unlock() + + key := gvr + informer, exists := f.informers[key] + if exists { + return informer + } + + informer = NewFilteredMetadataInformer(f.client, gvr, f.namespace, f.defaultResync, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) + informer.Informer().SetTransform(f.transform) + f.informers[key] = informer + + return informer +} + +// Start initializes all requested informers. +func (f *metadataSharedInformerFactory) Start(stopCh <-chan struct{}) { + f.lock.Lock() + defer f.lock.Unlock() + + if f.shuttingDown { + return + } + + for informerType, informer := range f.informers { + if !f.startedInformers[informerType] { + f.wg.Add(1) + // We need a new variable in each loop iteration, + // otherwise the goroutine would use the loop variable + // and that keeps changing. + informer := informer.Informer() + go func() { + defer f.wg.Done() + informer.Run(stopCh) + }() + f.startedInformers[informerType] = true + } + } +} + +// WaitForCacheSync waits for all started informers' cache were synced. 
+func (f *metadataSharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool { + informers := func() map[schema.GroupVersionResource]cache.SharedIndexInformer { + f.lock.Lock() + defer f.lock.Unlock() + + informers := map[schema.GroupVersionResource]cache.SharedIndexInformer{} + for informerType, informer := range f.informers { + if f.startedInformers[informerType] { + informers[informerType] = informer.Informer() + } + } + return informers + }() + + res := map[schema.GroupVersionResource]bool{} + for informType, informer := range informers { + res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced) + } + return res +} + +func (f *metadataSharedInformerFactory) Shutdown() { + // Will return immediately if there is nothing to wait for. + defer f.wg.Wait() + + f.lock.Lock() + defer f.lock.Unlock() + f.shuttingDown = true +} + +// NewFilteredMetadataInformer constructs a new informer for a metadata type. +func NewFilteredMetadataInformer(client metadata.Interface, gvr schema.GroupVersionResource, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions TweakListOptionsFunc) informers.GenericInformer { + return &metadataInformer{ + gvr: gvr, + informer: cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.Resource(gvr).Namespace(namespace).List(context.TODO(), options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.Resource(gvr).Namespace(namespace).Watch(context.TODO(), options) + }, + }, + &metav1.PartialObjectMetadata{}, + resyncPeriod, + indexers, + ), + } +} + +type metadataInformer struct { + informer cache.SharedIndexInformer + gvr schema.GroupVersionResource +} + +var _ informers.GenericInformer = &metadataInformer{} + +func (d *metadataInformer) Informer() cache.SharedIndexInformer { + return d.informer +} + +func (d *metadataInformer) Lister() cache.GenericLister { + return metadatalister.NewRuntimeObjectShim(metadatalister.New(d.informer.GetIndexer(), d.gvr)) +} diff --git a/vendor/k8s.io/client-go/metadata/metadatainformer/interface.go b/vendor/k8s.io/client-go/metadata/metadatainformer/interface.go new file mode 100644 index 00000000..9f61706c --- /dev/null +++ b/vendor/k8s.io/client-go/metadata/metadatainformer/interface.go @@ -0,0 +1,53 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metadatainformer + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/informers" +) + +// SharedInformerFactory provides access to a shared informer and lister for dynamic client +type SharedInformerFactory interface { + // Start initializes all requested informers. They are handled in goroutines + // which run until the stop channel gets closed. 
+ Start(stopCh <-chan struct{}) + + // ForResource gives generic access to a shared informer of the matching type. + ForResource(gvr schema.GroupVersionResource) informers.GenericInformer + + // WaitForCacheSync blocks until all started informers' caches were synced + // or the stop channel gets closed. + WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool + + // Shutdown marks a factory as shutting down. At that point no new + // informers can be started anymore and Start will return without + // doing anything. + // + // In addition, Shutdown blocks until all goroutines have terminated. For that + // to happen, the close channel(s) that they were started with must be closed, + // either before Shutdown gets called or while it is waiting. + // + // Shutdown may be called multiple times, even concurrently. All such calls will + // block until all goroutines have terminated. + Shutdown() +} + +// TweakListOptionsFunc defines the signature of a helper function +// that wants to provide more listing options to API +type TweakListOptionsFunc func(*metav1.ListOptions) diff --git a/vendor/k8s.io/client-go/metadata/metadatalister/interface.go b/vendor/k8s.io/client-go/metadata/metadatalister/interface.go new file mode 100644 index 00000000..bb354858 --- /dev/null +++ b/vendor/k8s.io/client-go/metadata/metadatalister/interface.go @@ -0,0 +1,40 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metadatalister + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" +) + +// Lister helps list resources. +type Lister interface { + // List lists all resources in the indexer. + List(selector labels.Selector) (ret []*metav1.PartialObjectMetadata, err error) + // Get retrieves a resource from the indexer with the given name + Get(name string) (*metav1.PartialObjectMetadata, error) + // Namespace returns an object that can list and get resources in a given namespace. + Namespace(namespace string) NamespaceLister +} + +// NamespaceLister helps list and get resources. +type NamespaceLister interface { + // List lists all resources in the indexer for a given namespace. + List(selector labels.Selector) (ret []*metav1.PartialObjectMetadata, err error) + // Get retrieves a resource from the indexer for a given namespace and name. + Get(name string) (*metav1.PartialObjectMetadata, error) +} diff --git a/vendor/k8s.io/client-go/metadata/metadatalister/lister.go b/vendor/k8s.io/client-go/metadata/metadatalister/lister.go new file mode 100644 index 00000000..faeccc0f --- /dev/null +++ b/vendor/k8s.io/client-go/metadata/metadatalister/lister.go @@ -0,0 +1,91 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metadatalister + +import ( + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/tools/cache" +) + +var _ Lister = &metadataLister{} +var _ NamespaceLister = &metadataNamespaceLister{} + +// metadataLister implements the Lister interface. +type metadataLister struct { + indexer cache.Indexer + gvr schema.GroupVersionResource +} + +// New returns a new Lister. +func New(indexer cache.Indexer, gvr schema.GroupVersionResource) Lister { + return &metadataLister{indexer: indexer, gvr: gvr} +} + +// List lists all resources in the indexer. +func (l *metadataLister) List(selector labels.Selector) (ret []*metav1.PartialObjectMetadata, err error) { + err = cache.ListAll(l.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*metav1.PartialObjectMetadata)) + }) + return ret, err +} + +// Get retrieves a resource from the indexer with the given name +func (l *metadataLister) Get(name string) (*metav1.PartialObjectMetadata, error) { + obj, exists, err := l.indexer.GetByKey(name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(l.gvr.GroupResource(), name) + } + return obj.(*metav1.PartialObjectMetadata), nil +} + +// Namespace returns an object that can list and get resources from a given namespace. +func (l *metadataLister) Namespace(namespace string) NamespaceLister { + return &metadataNamespaceLister{indexer: l.indexer, namespace: namespace, gvr: l.gvr} +} + +// metadataNamespaceLister implements the NamespaceLister interface. +type metadataNamespaceLister struct { + indexer cache.Indexer + namespace string + gvr schema.GroupVersionResource +} + +// List lists all resources in the indexer for a given namespace. +func (l *metadataNamespaceLister) List(selector labels.Selector) (ret []*metav1.PartialObjectMetadata, err error) { + err = cache.ListAllByNamespace(l.indexer, l.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*metav1.PartialObjectMetadata)) + }) + return ret, err +} + +// Get retrieves a resource from the indexer for a given namespace and name. +func (l *metadataNamespaceLister) Get(name string) (*metav1.PartialObjectMetadata, error) { + obj, exists, err := l.indexer.GetByKey(l.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(l.gvr.GroupResource(), name) + } + return obj.(*metav1.PartialObjectMetadata), nil +} diff --git a/vendor/k8s.io/client-go/metadata/metadatalister/shim.go b/vendor/k8s.io/client-go/metadata/metadatalister/shim.go new file mode 100644 index 00000000..f31c6072 --- /dev/null +++ b/vendor/k8s.io/client-go/metadata/metadatalister/shim.go @@ -0,0 +1,87 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metadatalister + +import ( + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/cache" +) + +var _ cache.GenericLister = &metadataListerShim{} +var _ cache.GenericNamespaceLister = &metadataNamespaceListerShim{} + +// metadataListerShim implements the cache.GenericLister interface. +type metadataListerShim struct { + lister Lister +} + +// NewRuntimeObjectShim returns a new shim for Lister. +// It wraps Lister so that it implements cache.GenericLister interface +func NewRuntimeObjectShim(lister Lister) cache.GenericLister { + return &metadataListerShim{lister: lister} +} + +// List will return all objects across namespaces +func (s *metadataListerShim) List(selector labels.Selector) (ret []runtime.Object, err error) { + objs, err := s.lister.List(selector) + if err != nil { + return nil, err + } + + ret = make([]runtime.Object, len(objs)) + for index, obj := range objs { + ret[index] = obj + } + return ret, err +} + +// Get will attempt to retrieve assuming that name==key +func (s *metadataListerShim) Get(name string) (runtime.Object, error) { + return s.lister.Get(name) +} + +func (s *metadataListerShim) ByNamespace(namespace string) cache.GenericNamespaceLister { + return &metadataNamespaceListerShim{ + namespaceLister: s.lister.Namespace(namespace), + } +} + +// metadataNamespaceListerShim implements the NamespaceLister interface. 
+// It wraps NamespaceLister so that it implements cache.GenericNamespaceLister interface +type metadataNamespaceListerShim struct { + namespaceLister NamespaceLister +} + +// List will return all objects in this namespace +func (ns *metadataNamespaceListerShim) List(selector labels.Selector) (ret []runtime.Object, err error) { + objs, err := ns.namespaceLister.List(selector) + if err != nil { + return nil, err + } + + ret = make([]runtime.Object, len(objs)) + for index, obj := range objs { + ret[index] = obj + } + return ret, err +} + +// Get will attempt to retrieve by namespace and name +func (ns *metadataNamespaceListerShim) Get(name string) (runtime.Object, error) { + return ns.namespaceLister.Get(name) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index f8139c9b..468a1243 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1244,6 +1244,8 @@ k8s.io/client-go/listers/storage/v1alpha1 k8s.io/client-go/listers/storage/v1beta1 k8s.io/client-go/listers/storagemigration/v1alpha1 k8s.io/client-go/metadata +k8s.io/client-go/metadata/metadatainformer +k8s.io/client-go/metadata/metadatalister k8s.io/client-go/openapi k8s.io/client-go/openapi/cached k8s.io/client-go/pkg/apis/clientauthentication @@ -1433,6 +1435,7 @@ open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/client open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/lister open-cluster-management.io/sdk-go/pkg/cloudevents/work/common +open-cluster-management.io/sdk-go/pkg/cloudevents/work/garbagecollector open-cluster-management.io/sdk-go/pkg/cloudevents/work/internal open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/garbagecollector/garbagecollector.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/garbagecollector/garbagecollector.go new file mode 100644 index 00000000..15c6983f --- /dev/null +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/garbagecollector/garbagecollector.go @@ -0,0 +1,354 @@ +package garbagecollector + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/metadata" + "k8s.io/client-go/metadata/metadatainformer" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + + "k8s.io/apimachinery/pkg/util/wait" + + workv1 "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1" + workv1informers "open-cluster-management.io/api/client/work/informers/externalversions/work/v1" + workapiv1 "open-cluster-management.io/api/work/v1" + cloudeventswork "open-cluster-management.io/sdk-go/pkg/cloudevents/work" +) + +const ( + manifestWorkByOwner = "manifestWorkByOwner" +) + +// monitor watches resource changes and enqueues the changes for processing. +type monitor struct { + cache.Controller + cache.SharedIndexInformer +} + +type monitors map[schema.GroupVersionResource]*monitor + +// dependent is a struct that holds the owner UID and the dependent manifestwork namespaced name. 
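+// Instances of dependent are enqueued to the attemptToDelete queue when a watched owner resource is deleted.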
+type dependent struct {
+	ownerUID       types.UID
+	namespacedName types.NamespacedName
+}
+
+// The GarbageCollector controller monitors manifestworks and their associated owner resources,
+// managing the relationship between them and deleting a manifestwork once all of its owner resources are removed.
+// It currently supports only the background deletion policy; foreground and orphan policies are not supported.
+// To prevent overwhelming the API server, the garbage collector operates with rate limiting.
+// It is designed to run alongside the cloudevents source work client, e.g. each addon controller
+// utilizing the cloudevents driver should be accompanied by its own garbage collector.
+type GarbageCollector struct {
+	// workClient from the cloudevents client builder
+	workClient workv1.WorkV1Interface
+	// workIndexer to index manifestworks by their owner resources
+	workIndexer cache.Indexer
+	// workInformer from the cloudevents client builder
+	workInformer workv1informers.ManifestWorkInformer
+	// metadataClient to operate on the owner resources
+	metadataClient metadata.Interface
+	// owner resource and filter pairs
+	ownerGVRFilters map[schema.GroupVersionResource]*metav1.ListOptions
+	// each monitor lists/watches a resource (including manifestworks)
+	monitors monitors
+	// the garbage collector attempts to delete the items in the attemptToDelete queue when the time is ripe.
+	attemptToDelete workqueue.RateLimitingInterface
+}
+
+// NewGarbageCollector creates a new garbage collector instance.
+func NewGarbageCollector(
+	workClientHolder *cloudeventswork.ClientHolder,
+	workInformer workv1informers.ManifestWorkInformer,
+	metadataClient metadata.Interface,
+	ownerGVRFilters map[schema.GroupVersionResource]*metav1.ListOptions) *GarbageCollector {
+
+	workClient := workClientHolder.WorkInterface().WorkV1()
+	if err := workInformer.Informer().AddIndexers(cache.Indexers{
+		manifestWorkByOwner: indexManifestWorkByOwner,
+	}); err != nil {
+		utilruntime.HandleError(fmt.Errorf("failed to add indexers: %v", err))
+	}
+
+	return &GarbageCollector{
+		workClient:      workClient,
+		workIndexer:     workInformer.Informer().GetIndexer(),
+		workInformer:    workInformer,
+		metadataClient:  metadataClient,
+		ownerGVRFilters: ownerGVRFilters,
+		attemptToDelete: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_delete"),
+	}
+}
+
+// Run starts the garbage collector monitors and workers.
+func (gc *GarbageCollector) Run(ctx context.Context, workers int) {
+	defer utilruntime.HandleCrash()
+	defer gc.attemptToDelete.ShutDown()
+
+	logger := klog.FromContext(ctx)
+	logger.Info("Starting garbage collector")
+	defer logger.Info("Shutting down garbage collector")
+
+	// start monitors
+	if err := gc.startMonitors(ctx, logger); err != nil {
+		logger.Error(err, "Failed to start monitors")
+		return
+	}
+
+	// wait for the controller caches to sync
+	if !cache.WaitForNamedCacheSync("garbage collector", ctx.Done(), func() bool {
+		return gc.hasSynced(logger)
+	}) {
+		return
+	}
+	logger.Info("All resource monitors have synced, proceeding to collect garbage")
+
+	// run gc workers to process the attemptToDelete queue.
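+	// Each worker drains the rate-limited attemptToDelete queue; wait.UntilWithContext keeps it running until the context is cancelled.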
+	for i := 0; i < workers; i++ {
+		go wait.UntilWithContext(ctx, gc.runAttemptToDeleteWorker, 1*time.Second)
+	}
+
+	<-ctx.Done()
+}
+
+// startMonitors starts the monitors that list/watch resources (including manifestworks).
+func (gc *GarbageCollector) startMonitors(ctx context.Context, logger klog.Logger) error {
+	logger.Info("Starting monitors")
+	gc.monitors = make(monitors)
+	// add a monitor for manifestworks
+	gc.monitors[workapiv1.SchemeGroupVersion.WithResource("manifestworks")] = &monitor{
+		Controller:          gc.workInformer.Informer().GetController(),
+		SharedIndexInformer: gc.workInformer.Informer(),
+	}
+
+	// add monitors for the owner resources
+	for gvr, listOptions := range gc.ownerGVRFilters {
+		monitor, err := gc.monitorFor(logger, gvr, listOptions)
+		if err != nil {
+			return err
+		}
+		gc.monitors[gvr] = monitor
+	}
+
+	// start monitors
+	started := 0
+	for _, monitor := range gc.monitors {
+		go monitor.Controller.Run(ctx.Done())
+		go monitor.SharedIndexInformer.Run(ctx.Done())
+		started++
+	}
+
+	logger.V(4).Info("Started monitors", "started", started, "total", len(gc.monitors))
+	return nil
+}
+
+// monitorFor creates a monitor for an owner resource.
+func (gc *GarbageCollector) monitorFor(logger klog.Logger, gvr schema.GroupVersionResource, listOptions *metav1.ListOptions) (*monitor, error) {
+	handlers := cache.ResourceEventHandlerFuncs{
+		// TODO: Handle the case where the owner resource is deleted
+		// while the garbage collector is restarting.
+		AddFunc:    func(obj interface{}) {},
+		UpdateFunc: func(oldObj, newObj interface{}) {},
+		DeleteFunc: func(obj interface{}) {
+			// the delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
+			if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
+				obj = deletedFinalStateUnknown.Obj
+			}
+			accessor, err := meta.Accessor(obj)
+			if err != nil {
+				utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
+				return
+			}
+
+			ownerUID := accessor.GetUID()
+			objs, err := gc.workIndexer.ByIndex(manifestWorkByOwner, string(ownerUID))
+			if err != nil {
+				utilruntime.HandleError(fmt.Errorf("failed to get manifestwork by owner UID index: %v", err))
+				return
+			}
+
+			for _, o := range objs {
+				manifestWork, ok := o.(*workapiv1.ManifestWork)
+				if !ok {
+					utilruntime.HandleError(fmt.Errorf("expected a *ManifestWork, got %v", o))
+					continue
+				}
+				namespacedName := types.NamespacedName{Namespace: manifestWork.Namespace, Name: manifestWork.Name}
+				logger.V(4).Info("enqueue manifestWork because of owner deletion", "manifestwork", namespacedName, "owner UID", ownerUID)
+				gc.attemptToDelete.Add(&dependent{ownerUID: ownerUID, namespacedName: namespacedName})
+			}
+		},
+	}
+
+	// create an informer for the owner resource with the given GVR and listOptions.
+	informer := metadatainformer.NewFilteredMetadataInformer(gc.metadataClient, gvr,
+		metav1.NamespaceAll, 10*time.Minute,
+		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
+		func(options *metav1.ListOptions) {
+			if listOptions != nil {
+				options.FieldSelector = listOptions.FieldSelector
+				options.LabelSelector = listOptions.LabelSelector
+			}
+		})
+	if _, err := informer.Informer().AddEventHandlerWithResyncPeriod(handlers, 0); err != nil {
+		return nil, err
+	}
+	return &monitor{
+		Controller:          informer.Informer().GetController(),
+		SharedIndexInformer: informer.Informer(),
+	}, nil
+}
+
+// hasSynced returns true if any monitors exist AND all those monitors'
+// controllers' HasSynced functions return true.
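+// Run uses it with cache.WaitForNamedCacheSync before starting the delete workers.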
+func (gc *GarbageCollector) hasSynced(logger klog.Logger) bool {
+	if len(gc.monitors) == 0 {
+		logger.V(4).Info("garbage collector monitors are not synced: no monitors")
+		return false
+	}
+
+	for resource, monitor := range gc.monitors {
+		if !monitor.Controller.HasSynced() {
+			logger.V(4).Info("garbage collector monitor is not yet synced", "resource", resource)
+			return false
+		}
+	}
+
+	return true
+}
+
+// runAttemptToDeleteWorker starts a worker to process the attemptToDelete queue.
+func (gc *GarbageCollector) runAttemptToDeleteWorker(ctx context.Context) {
+	for gc.processAttemptToDeleteWorker(ctx) {
+	}
+}
+
+func (gc *GarbageCollector) processAttemptToDeleteWorker(ctx context.Context) bool {
+	item, quit := gc.attemptToDelete.Get()
+	if quit {
+		return false
+	}
+	defer gc.attemptToDelete.Done(item)
+
+	action := gc.attemptToDeleteWorker(ctx, item)
+	switch action {
+	case forgetItem:
+		gc.attemptToDelete.Forget(item)
+	case requeueItem:
+		gc.attemptToDelete.AddRateLimited(item)
+	}
+
+	return true
+}
+
+type workQueueItemAction int
+
+const (
+	requeueItem workQueueItemAction = iota
+	forgetItem
+)
+
+func (gc *GarbageCollector) attemptToDeleteWorker(ctx context.Context, item interface{}) workQueueItemAction {
+	dep, ok := item.(*dependent)
+	if !ok {
+		utilruntime.HandleError(fmt.Errorf("expected a *dependent, got %v", item))
+		return forgetItem
+	}
+
+	logger := klog.FromContext(ctx)
+	logger.V(4).Info("Attempting to delete manifestwork", "ownerUID", dep.ownerUID, "namespacedName", dep.namespacedName)
+
+	latest, err := gc.workClient.ManifestWorks(dep.namespacedName.Namespace).Get(ctx, dep.namespacedName.Name, metav1.GetOptions{})
+	if err != nil {
+		if errors.IsNotFound(err) {
+			logger.V(4).Info("Manifestwork not found, skipping", "manifestwork", dep.namespacedName)
+			return forgetItem
+		}
+		return requeueItem
+	}
+
+	ownerReferences := latest.GetOwnerReferences()
+	found := false
+	for _, owner := range ownerReferences {
+		if owner.UID == dep.ownerUID {
+			found = true
+			break
+		}
+	}
+
+	if found {
+		// if the deleted owner reference is the only owner reference, then delete the manifestwork
+		if len(ownerReferences) == 1 {
+			logger.V(4).Info("All owner references are deleted for manifestwork, deleting the manifestwork itself", "manifestwork", dep.namespacedName)
+			if err := gc.workClient.ManifestWorks(dep.namespacedName.Namespace).Delete(ctx, dep.namespacedName.Name, metav1.DeleteOptions{}); err != nil {
+				return requeueItem
+			}
+			return forgetItem
+		}
+
+		// remove the owner reference from the manifestwork
+		logger.V(4).Info("Removing owner reference from manifestwork", "owner", dep.ownerUID, "manifestwork", dep.namespacedName)
+		jmp, err := generateDeleteOwnerRefJSONMergePatch(latest, dep.ownerUID)
+		if err != nil {
+			logger.Error(err, "Failed to generate JSON merge patch")
+			return requeueItem
+		}
+		if _, err = gc.workClient.ManifestWorks(dep.namespacedName.Namespace).Patch(ctx, dep.namespacedName.Name, types.MergePatchType, jmp, metav1.PatchOptions{}); err != nil {
+			logger.Error(err, "Failed to patch manifestwork with JSON merge patch")
+			return requeueItem
+		}
+		logger.V(4).Info("Successfully removed owner reference from manifestwork", "owner", dep.ownerUID, "manifestwork", dep.namespacedName)
+	}
+
+	return forgetItem
+}
+
+func indexManifestWorkByOwner(obj interface{}) ([]string, error) {
+	manifestWork, ok := obj.(*workapiv1.ManifestWork)
+	if !ok {
+		return []string{}, fmt.Errorf("obj %T is not a ManifestWork", obj)
+	}
+
+	var ownerKeys []string
+	for _, ownerRef := range manifestWork.GetOwnerReferences() {
+		ownerKeys = append(ownerKeys, string(ownerRef.UID))
+	}
+
+	return ownerKeys, nil
+}
+
+// objectForOwnerRefsPatch defines the object struct for the owner references patch operation.
+type objectForOwnerRefsPatch struct {
+	ObjectMetaForOwnerRefsPatch `json:"metadata"`
+}
+
+// ObjectMetaForOwnerRefsPatch defines the object meta struct for the owner references patch operation.
+type ObjectMetaForOwnerRefsPatch struct {
+	ResourceVersion string                  `json:"resourceVersion"`
+	OwnerReferences []metav1.OwnerReference `json:"ownerReferences"`
+}
+
+// generateDeleteOwnerRefJSONMergePatch returns a JSON merge patch that removes the owner references matching ownerUID.
+func generateDeleteOwnerRefJSONMergePatch(obj metav1.Object, ownerUID types.UID) ([]byte, error) {
+	expectedObjectMeta := objectForOwnerRefsPatch{}
+	expectedObjectMeta.ResourceVersion = obj.GetResourceVersion()
+	refs := obj.GetOwnerReferences()
+	for _, ref := range refs {
+		if ref.UID != ownerUID {
+			expectedObjectMeta.OwnerReferences = append(expectedObjectMeta.OwnerReferences, ref)
+		}
+	}
+	return json.Marshal(expectedObjectMeta)
+}
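
For illustration, a minimal standalone sketch of the JSON merge patch that generateDeleteOwnerRefJSONMergePatch produces when one of two owner references is removed. The addon names, UIDs, and resourceVersion below are invented for the example, and the payload structs are redeclared locally (lowercased) because the originals are unexported.

package main

import (
	"encoding/json"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
)

// Local copies of the patch payload structs, redeclared here only so the sketch is self-contained.
type objectMetaForOwnerRefsPatch struct {
	ResourceVersion string                  `json:"resourceVersion"`
	OwnerReferences []metav1.OwnerReference `json:"ownerReferences"`
}

type objectForOwnerRefsPatch struct {
	objectMetaForOwnerRefsPatch `json:"metadata"`
}

func main() {
	// Hypothetical manifestwork metadata with two owners; assume the owner with
	// UID "owner-1" was just deleted.
	work := metav1.ObjectMeta{
		ResourceVersion: "42",
		OwnerReferences: []metav1.OwnerReference{
			{APIVersion: "addon.open-cluster-management.io/v1alpha1", Kind: "ManagedClusterAddOn", Name: "addon-a", UID: types.UID("owner-1")},
			{APIVersion: "addon.open-cluster-management.io/v1alpha1", Kind: "ManagedClusterAddOn", Name: "addon-b", UID: types.UID("owner-2")},
		},
	}
	deletedOwner := types.UID("owner-1")

	// Keep only the owner references that do not match the deleted owner and pin
	// the resourceVersion, mirroring generateDeleteOwnerRefJSONMergePatch.
	patch := objectForOwnerRefsPatch{}
	patch.ResourceVersion = work.ResourceVersion
	for _, ref := range work.OwnerReferences {
		if ref.UID != deletedOwner {
			patch.OwnerReferences = append(patch.OwnerReferences, ref)
		}
	}

	out, _ := json.MarshalIndent(patch, "", "  ")
	// Prints something like: {"metadata":{"resourceVersion":"42","ownerReferences":[<addon-b reference>]}}
	fmt.Println(string(out))
}

Pinning metadata.resourceVersion in the merge patch makes the update fail with a conflict if the manifestwork changed between the Get and the Patch; in that case the worker returns requeueItem and the item is retried through the rate-limited queue.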