diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 1c8764a081a8..e738f5f64504 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -292,37 +292,17 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo wfc.wfInformer = util.NewWorkflowInformer(wfc.dynamicInterface, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.tweakListRequestListOptions, wfc.tweakWatchRequestListOptions, indexers) wfc.wftmplInformer = informer.NewTolerantWorkflowTemplateInformer(wfc.dynamicInterface, workflowTemplateResyncPeriod, wfc.managedNamespace) - var err error - wfc.wfTaskSetInformer, err = wfc.newWorkflowTaskSetInformer() - if err != nil { - log.Fatal(err) - } - - wfc.artGCTaskInformer, err = wfc.newArtGCTaskInformer() - if err != nil { - log.Fatal(err) - } - - wfc.taskResultInformer, err = wfc.newWorkflowTaskResultInformer() - if err != nil { - log.Fatal(err) - } - - err = wfc.addWorkflowInformerHandlers(ctx) - if err != nil { - log.Fatal(err) - } - - wfc.podInformer, err = wfc.newPodInformer(ctx) + wfc.wfTaskSetInformer = wfc.newWorkflowTaskSetInformer() + wfc.artGCTaskInformer = wfc.newArtGCTaskInformer() + wfc.taskResultInformer = wfc.newWorkflowTaskResultInformer() + err := wfc.addWorkflowInformerHandlers(ctx) if err != nil { log.Fatal(err) } + wfc.podInformer = wfc.newPodInformer(ctx) wfc.updateEstimatorFactory() - wfc.configMapInformer, err = wfc.newConfigMapInformer() - if err != nil { - log.Fatal(err) - } + wfc.configMapInformer = wfc.newConfigMapInformer() // Create Synchronization Manager wfc.createSynchronizationManager(ctx) @@ -1219,14 +1199,15 @@ func (wfc *WorkflowController) newWorkflowPodWatch(ctx context.Context) *cache.L return &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc} } -func (wfc *WorkflowController) newPodInformer(ctx context.Context) (cache.SharedIndexInformer, error) { +func (wfc *WorkflowController) newPodInformer(ctx context.Context) cache.SharedIndexInformer { source := wfc.newWorkflowPodWatch(ctx) informer := cache.NewSharedIndexInformer(source, &apiv1.Pod{}, podResyncPeriod, cache.Indexers{ indexes.WorkflowIndex: indexes.MetaWorkflowIndexFunc, indexes.NodeIDIndex: indexes.MetaNodeIDIndexFunc, indexes.PodPhaseIndex: indexes.PodPhaseIndexFunc, }) - _, err := informer.AddEventHandler( + //nolint:errcheck // the error only happens if the informer was stopped, and it hasn't even started (https://github.com/kubernetes/client-go/blob/46588f2726fa3e25b1704d6418190f424f95a990/tools/cache/shared_informer.go#L580) + informer.AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { err := wfc.enqueueWfFromPodLabel(obj) @@ -1264,13 +1245,10 @@ func (wfc *WorkflowController) newPodInformer(ctx context.Context) (cache.Shared }, }, ) - if err != nil { - return nil, err - } - return informer, nil + return informer } -func (wfc *WorkflowController) newConfigMapInformer() (cache.SharedIndexInformer, error) { +func (wfc *WorkflowController) newConfigMapInformer() cache.SharedIndexInformer { indexInformer := v1.NewFilteredConfigMapInformer(wfc.kubeclientset, wfc.GetManagedNamespace(), 20*time.Minute, cache.Indexers{ indexes.ConfigMapLabelsIndex: indexes.ConfigMapIndexFunc, }, func(opts *metav1.ListOptions) { @@ -1278,7 +1256,8 @@ func (wfc *WorkflowController) newConfigMapInformer() (cache.SharedIndexInformer }) log.WithField("executorPlugins", wfc.executorPlugins != nil).Info("Plugins") if wfc.executorPlugins != nil { - _, err := indexInformer.AddEventHandler(cache.FilteringResourceEventHandler{ + //nolint:errcheck // the error only happens if the informer was stopped, and it hasn't even started (https://github.com/kubernetes/client-go/blob/46588f2726fa3e25b1704d6418190f424f95a990/tools/cache/shared_informer.go#L580) + indexInformer.AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: func(obj interface{}) bool { cm, err := meta.Accessor(obj) if err != nil { @@ -1329,12 +1308,8 @@ func (wfc *WorkflowController) newConfigMapInformer() (cache.SharedIndexInformer }, }, }) - if err != nil { - return nil, err - } - } - return indexInformer, nil + return indexInformer } // call this func whenever the configuration changes, or when the workflow informer changes @@ -1459,7 +1434,7 @@ func (wfc *WorkflowController) syncPodPhaseMetrics() { } } -func (wfc *WorkflowController) newWorkflowTaskSetInformer() (wfextvv1alpha1.WorkflowTaskSetInformer, error) { +func (wfc *WorkflowController) newWorkflowTaskSetInformer() wfextvv1alpha1.WorkflowTaskSetInformer { informer := externalversions.NewSharedInformerFactoryWithOptions( wfc.wfclientset, workflowTaskSetResyncPeriod, @@ -1468,7 +1443,8 @@ func (wfc *WorkflowController) newWorkflowTaskSetInformer() (wfextvv1alpha1.Work r := util.InstanceIDRequirement(wfc.Config.InstanceID) x.LabelSelector = r.String() })).Argoproj().V1alpha1().WorkflowTaskSets() - _, err := informer.Informer().AddEventHandler( + //nolint:errcheck // the error only happens if the informer was stopped, and it hasn't even started (https://github.com/kubernetes/client-go/blob/46588f2726fa3e25b1704d6418190f424f95a990/tools/cache/shared_informer.go#L580) + informer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ UpdateFunc: func(old, new interface{}) { key, err := cache.MetaNamespaceKeyFunc(new) @@ -1477,13 +1453,10 @@ func (wfc *WorkflowController) newWorkflowTaskSetInformer() (wfextvv1alpha1.Work } }, }) - if err != nil { - return nil, err - } - return informer, nil + return informer } -func (wfc *WorkflowController) newArtGCTaskInformer() (wfextvv1alpha1.WorkflowArtifactGCTaskInformer, error) { +func (wfc *WorkflowController) newArtGCTaskInformer() wfextvv1alpha1.WorkflowArtifactGCTaskInformer { informer := externalversions.NewSharedInformerFactoryWithOptions( wfc.wfclientset, workflowTaskSetResyncPeriod, @@ -1492,7 +1465,8 @@ func (wfc *WorkflowController) newArtGCTaskInformer() (wfextvv1alpha1.WorkflowAr r := util.InstanceIDRequirement(wfc.Config.InstanceID) x.LabelSelector = r.String() })).Argoproj().V1alpha1().WorkflowArtifactGCTasks() - _, err := informer.Informer().AddEventHandler( + //nolint:errcheck // the error only happens if the informer was stopped, and it hasn't even started (https://github.com/kubernetes/client-go/blob/46588f2726fa3e25b1704d6418190f424f95a990/tools/cache/shared_informer.go#L580) + informer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ UpdateFunc: func(old, new interface{}) { key, err := cache.MetaNamespaceKeyFunc(new) @@ -1501,8 +1475,5 @@ func (wfc *WorkflowController) newArtGCTaskInformer() (wfextvv1alpha1.WorkflowAr } }, }) - if err != nil { - return nil, err - } - return informer, nil + return informer } diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index 7522d368918a..0c4af5f4e6dd 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -318,11 +318,11 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl wfc.wfInformer = util.NewWorkflowInformer(dynamicClient, "", 0, wfc.tweakListRequestListOptions, wfc.tweakWatchRequestListOptions, indexers) wfc.wfTaskSetInformer = informerFactory.Argoproj().V1alpha1().WorkflowTaskSets() wfc.artGCTaskInformer = informerFactory.Argoproj().V1alpha1().WorkflowArtifactGCTasks() - wfc.taskResultInformer, _ = wfc.newWorkflowTaskResultInformer() + wfc.taskResultInformer = wfc.newWorkflowTaskResultInformer() wfc.wftmplInformer = informerFactory.Argoproj().V1alpha1().WorkflowTemplates() _ = wfc.addWorkflowInformerHandlers(ctx) - wfc.podInformer, _ = wfc.newPodInformer(ctx) - wfc.configMapInformer, _ = wfc.newConfigMapInformer() + wfc.podInformer = wfc.newPodInformer(ctx) + wfc.configMapInformer = wfc.newConfigMapInformer() wfc.createSynchronizationManager(ctx) _ = wfc.initManagers(ctx) diff --git a/workflow/controller/taskresult.go b/workflow/controller/taskresult.go index b751e07ef619..be4bed0db701 100644 --- a/workflow/controller/taskresult.go +++ b/workflow/controller/taskresult.go @@ -15,7 +15,7 @@ import ( "github.com/argoproj/argo-workflows/v3/workflow/controller/indexes" ) -func (wfc *WorkflowController) newWorkflowTaskResultInformer() (cache.SharedIndexInformer, error) { +func (wfc *WorkflowController) newWorkflowTaskResultInformer() cache.SharedIndexInformer { labelSelector := labels.NewSelector(). Add(*workflowReq). Add(wfc.instanceIDReq()). @@ -36,7 +36,8 @@ func (wfc *WorkflowController) newWorkflowTaskResultInformer() (cache.SharedInde options.ResourceVersion = "" }, ) - _, err := informer.AddEventHandler( + //nolint:errcheck // the error only happens if the informer was stopped, and it hasn't even started (https://github.com/kubernetes/client-go/blob/46588f2726fa3e25b1704d6418190f424f95a990/tools/cache/shared_informer.go#L580) + informer.AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(new interface{}) { result := new.(*wfv1.WorkflowTaskResult) @@ -49,14 +50,10 @@ func (wfc *WorkflowController) newWorkflowTaskResultInformer() (cache.SharedInde wfc.wfQueue.AddRateLimited(result.Namespace + "/" + workflow) }, }) - if err != nil { - return nil, err - } - return informer, nil + return informer } func (woc *wfOperationCtx) taskResultReconciliation() { - objs, _ := woc.controller.taskResultInformer.GetIndexer().ByIndex(indexes.WorkflowIndex, woc.wf.Namespace+"/"+woc.wf.Name) woc.log.WithField("numObjs", len(objs)).Info("Task-result reconciliation") for _, obj := range objs { diff --git a/workflow/gccontroller/gc_controller.go b/workflow/gccontroller/gc_controller.go index 05da97603671..a1ec95563af6 100644 --- a/workflow/gccontroller/gc_controller.go +++ b/workflow/gccontroller/gc_controller.go @@ -41,7 +41,6 @@ type Controller struct { // NewController returns a new workflow ttl controller func NewController(wfClientset wfclientset.Interface, wfInformer cache.SharedIndexInformer, metrics *metrics.Metrics, retentionPolicy *config.RetentionPolicy) *Controller { - orderedQueue := map[wfv1.WorkflowPhase]*gcHeap{ wfv1.WorkflowFailed: NewHeap(), wfv1.WorkflowError: NewHeap(), @@ -180,7 +179,6 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool { // enqueueWF conditionally queues a workflow to the ttl queue if it is within the deletion period func (c *Controller) enqueueWF(obj interface{}) { - un, ok := obj.(*unstructured.Unstructured) if !ok { log.Warnf("'%v' is not an unstructured", obj)