Skip to content

Commit

Permalink
refactor: remove unnecessary AddEventHandler error handling (#12917)
Browse files Browse the repository at this point in the history
Signed-off-by: Anton Gilgur <[email protected]>
  • Loading branch information
Anton Gilgur authored Apr 11, 2024
1 parent d62aa26 commit 87a2041
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 63 deletions.
73 changes: 22 additions & 51 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,37 +292,17 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
wfc.wfInformer = util.NewWorkflowInformer(wfc.dynamicInterface, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.tweakListRequestListOptions, wfc.tweakWatchRequestListOptions, indexers)
wfc.wftmplInformer = informer.NewTolerantWorkflowTemplateInformer(wfc.dynamicInterface, workflowTemplateResyncPeriod, wfc.managedNamespace)

var err error
wfc.wfTaskSetInformer, err = wfc.newWorkflowTaskSetInformer()
if err != nil {
log.Fatal(err)
}

wfc.artGCTaskInformer, err = wfc.newArtGCTaskInformer()
if err != nil {
log.Fatal(err)
}

wfc.taskResultInformer, err = wfc.newWorkflowTaskResultInformer()
if err != nil {
log.Fatal(err)
}

err = wfc.addWorkflowInformerHandlers(ctx)
if err != nil {
log.Fatal(err)
}

wfc.podInformer, err = wfc.newPodInformer(ctx)
wfc.wfTaskSetInformer = wfc.newWorkflowTaskSetInformer()
wfc.artGCTaskInformer = wfc.newArtGCTaskInformer()
wfc.taskResultInformer = wfc.newWorkflowTaskResultInformer()
err := wfc.addWorkflowInformerHandlers(ctx)
if err != nil {
log.Fatal(err)
}
wfc.podInformer = wfc.newPodInformer(ctx)
wfc.updateEstimatorFactory()

wfc.configMapInformer, err = wfc.newConfigMapInformer()
if err != nil {
log.Fatal(err)
}
wfc.configMapInformer = wfc.newConfigMapInformer()

// Create Synchronization Manager
wfc.createSynchronizationManager(ctx)
Expand Down Expand Up @@ -1219,14 +1199,15 @@ func (wfc *WorkflowController) newWorkflowPodWatch(ctx context.Context) *cache.L
return &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}

func (wfc *WorkflowController) newPodInformer(ctx context.Context) (cache.SharedIndexInformer, error) {
func (wfc *WorkflowController) newPodInformer(ctx context.Context) cache.SharedIndexInformer {
source := wfc.newWorkflowPodWatch(ctx)
informer := cache.NewSharedIndexInformer(source, &apiv1.Pod{}, podResyncPeriod, cache.Indexers{
indexes.WorkflowIndex: indexes.MetaWorkflowIndexFunc,
indexes.NodeIDIndex: indexes.MetaNodeIDIndexFunc,
indexes.PodPhaseIndex: indexes.PodPhaseIndexFunc,
})
_, err := informer.AddEventHandler(
//nolint:errcheck // the error only happens if the informer was stopped, and it hasn't even started (https://github.com/kubernetes/client-go/blob/46588f2726fa3e25b1704d6418190f424f95a990/tools/cache/shared_informer.go#L580)
informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
err := wfc.enqueueWfFromPodLabel(obj)
Expand Down Expand Up @@ -1264,21 +1245,19 @@ func (wfc *WorkflowController) newPodInformer(ctx context.Context) (cache.Shared
},
},
)
if err != nil {
return nil, err
}
return informer, nil
return informer
}

func (wfc *WorkflowController) newConfigMapInformer() (cache.SharedIndexInformer, error) {
func (wfc *WorkflowController) newConfigMapInformer() cache.SharedIndexInformer {
indexInformer := v1.NewFilteredConfigMapInformer(wfc.kubeclientset, wfc.GetManagedNamespace(), 20*time.Minute, cache.Indexers{
indexes.ConfigMapLabelsIndex: indexes.ConfigMapIndexFunc,
}, func(opts *metav1.ListOptions) {
opts.LabelSelector = common.LabelKeyConfigMapType
})
log.WithField("executorPlugins", wfc.executorPlugins != nil).Info("Plugins")
if wfc.executorPlugins != nil {
_, err := indexInformer.AddEventHandler(cache.FilteringResourceEventHandler{
//nolint:errcheck // the error only happens if the informer was stopped, and it hasn't even started (https://github.com/kubernetes/client-go/blob/46588f2726fa3e25b1704d6418190f424f95a990/tools/cache/shared_informer.go#L580)
indexInformer.AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
cm, err := meta.Accessor(obj)
if err != nil {
Expand Down Expand Up @@ -1329,12 +1308,8 @@ func (wfc *WorkflowController) newConfigMapInformer() (cache.SharedIndexInformer
},
},
})
if err != nil {
return nil, err
}

}
return indexInformer, nil
return indexInformer
}

// call this func whenever the configuration changes, or when the workflow informer changes
Expand Down Expand Up @@ -1459,7 +1434,7 @@ func (wfc *WorkflowController) syncPodPhaseMetrics() {
}
}

func (wfc *WorkflowController) newWorkflowTaskSetInformer() (wfextvv1alpha1.WorkflowTaskSetInformer, error) {
func (wfc *WorkflowController) newWorkflowTaskSetInformer() wfextvv1alpha1.WorkflowTaskSetInformer {
informer := externalversions.NewSharedInformerFactoryWithOptions(
wfc.wfclientset,
workflowTaskSetResyncPeriod,
Expand All @@ -1468,7 +1443,8 @@ func (wfc *WorkflowController) newWorkflowTaskSetInformer() (wfextvv1alpha1.Work
r := util.InstanceIDRequirement(wfc.Config.InstanceID)
x.LabelSelector = r.String()
})).Argoproj().V1alpha1().WorkflowTaskSets()
_, err := informer.Informer().AddEventHandler(
//nolint:errcheck // the error only happens if the informer was stopped, and it hasn't even started (https://github.com/kubernetes/client-go/blob/46588f2726fa3e25b1704d6418190f424f95a990/tools/cache/shared_informer.go#L580)
informer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
Expand All @@ -1477,13 +1453,10 @@ func (wfc *WorkflowController) newWorkflowTaskSetInformer() (wfextvv1alpha1.Work
}
},
})
if err != nil {
return nil, err
}
return informer, nil
return informer
}

func (wfc *WorkflowController) newArtGCTaskInformer() (wfextvv1alpha1.WorkflowArtifactGCTaskInformer, error) {
func (wfc *WorkflowController) newArtGCTaskInformer() wfextvv1alpha1.WorkflowArtifactGCTaskInformer {
informer := externalversions.NewSharedInformerFactoryWithOptions(
wfc.wfclientset,
workflowTaskSetResyncPeriod,
Expand All @@ -1492,7 +1465,8 @@ func (wfc *WorkflowController) newArtGCTaskInformer() (wfextvv1alpha1.WorkflowAr
r := util.InstanceIDRequirement(wfc.Config.InstanceID)
x.LabelSelector = r.String()
})).Argoproj().V1alpha1().WorkflowArtifactGCTasks()
_, err := informer.Informer().AddEventHandler(
//nolint:errcheck // the error only happens if the informer was stopped, and it hasn't even started (https://github.com/kubernetes/client-go/blob/46588f2726fa3e25b1704d6418190f424f95a990/tools/cache/shared_informer.go#L580)
informer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
Expand All @@ -1501,8 +1475,5 @@ func (wfc *WorkflowController) newArtGCTaskInformer() (wfextvv1alpha1.WorkflowAr
}
},
})
if err != nil {
return nil, err
}
return informer, nil
return informer
}
6 changes: 3 additions & 3 deletions workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,11 +318,11 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl
wfc.wfInformer = util.NewWorkflowInformer(dynamicClient, "", 0, wfc.tweakListRequestListOptions, wfc.tweakWatchRequestListOptions, indexers)
wfc.wfTaskSetInformer = informerFactory.Argoproj().V1alpha1().WorkflowTaskSets()
wfc.artGCTaskInformer = informerFactory.Argoproj().V1alpha1().WorkflowArtifactGCTasks()
wfc.taskResultInformer, _ = wfc.newWorkflowTaskResultInformer()
wfc.taskResultInformer = wfc.newWorkflowTaskResultInformer()
wfc.wftmplInformer = informerFactory.Argoproj().V1alpha1().WorkflowTemplates()
_ = wfc.addWorkflowInformerHandlers(ctx)
wfc.podInformer, _ = wfc.newPodInformer(ctx)
wfc.configMapInformer, _ = wfc.newConfigMapInformer()
wfc.podInformer = wfc.newPodInformer(ctx)
wfc.configMapInformer = wfc.newConfigMapInformer()
wfc.createSynchronizationManager(ctx)
_ = wfc.initManagers(ctx)

Expand Down
11 changes: 4 additions & 7 deletions workflow/controller/taskresult.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/argoproj/argo-workflows/v3/workflow/controller/indexes"
)

func (wfc *WorkflowController) newWorkflowTaskResultInformer() (cache.SharedIndexInformer, error) {
func (wfc *WorkflowController) newWorkflowTaskResultInformer() cache.SharedIndexInformer {
labelSelector := labels.NewSelector().
Add(*workflowReq).
Add(wfc.instanceIDReq()).
Expand All @@ -36,7 +36,8 @@ func (wfc *WorkflowController) newWorkflowTaskResultInformer() (cache.SharedInde
options.ResourceVersion = ""
},
)
_, err := informer.AddEventHandler(
//nolint:errcheck // the error only happens if the informer was stopped, and it hasn't even started (https://github.com/kubernetes/client-go/blob/46588f2726fa3e25b1704d6418190f424f95a990/tools/cache/shared_informer.go#L580)
informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(new interface{}) {
result := new.(*wfv1.WorkflowTaskResult)
Expand All @@ -49,14 +50,10 @@ func (wfc *WorkflowController) newWorkflowTaskResultInformer() (cache.SharedInde
wfc.wfQueue.AddRateLimited(result.Namespace + "/" + workflow)
},
})
if err != nil {
return nil, err
}
return informer, nil
return informer
}

func (woc *wfOperationCtx) taskResultReconciliation() {

objs, _ := woc.controller.taskResultInformer.GetIndexer().ByIndex(indexes.WorkflowIndex, woc.wf.Namespace+"/"+woc.wf.Name)
woc.log.WithField("numObjs", len(objs)).Info("Task-result reconciliation")
for _, obj := range objs {
Expand Down
2 changes: 0 additions & 2 deletions workflow/gccontroller/gc_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ type Controller struct {

// NewController returns a new workflow ttl controller
func NewController(wfClientset wfclientset.Interface, wfInformer cache.SharedIndexInformer, metrics *metrics.Metrics, retentionPolicy *config.RetentionPolicy) *Controller {

orderedQueue := map[wfv1.WorkflowPhase]*gcHeap{
wfv1.WorkflowFailed: NewHeap(),
wfv1.WorkflowError: NewHeap(),
Expand Down Expand Up @@ -180,7 +179,6 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool {

// enqueueWF conditionally queues a workflow to the ttl queue if it is within the deletion period
func (c *Controller) enqueueWF(obj interface{}) {

un, ok := obj.(*unstructured.Unstructured)
if !ok {
log.Warnf("'%v' is not an unstructured", obj)
Expand Down

0 comments on commit 87a2041

Please sign in to comment.