diff --git a/cmd/workflow-controller/main.go b/cmd/workflow-controller/main.go index 151246ece078..a5edb978f374 100644 --- a/cmd/workflow-controller/main.go +++ b/cmd/workflow-controller/main.go @@ -58,6 +58,7 @@ func NewRootCommand() *cobra.Command { workflowTTLWorkers int // --workflow-ttl-workers podCleanupWorkers int // --pod-cleanup-workers cronWorkflowWorkers int // --cron-workflow-workers + workflowArchiveWorkers int // --workflow-archive-workers burst int qps float32 namespaced bool // --namespaced @@ -117,7 +118,8 @@ func NewRootCommand() *cobra.Command { if leaderElectionOff == "true" { log.Info("Leader election is turned off. Running in single-instance mode") log.WithField("id", "single-instance").Info("starting leading") - go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers) + + go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers, workflowArchiveWorkers) go wfController.RunPrometheusServer(ctx, false) } else { nodeID, ok := os.LookupEnv("LEADER_ELECTION_IDENTITY") @@ -147,7 +149,7 @@ func NewRootCommand() *cobra.Command { Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: func(ctx context.Context) { dummyCancel() - go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers) + go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers, workflowArchiveWorkers) go wfController.RunPrometheusServer(ctx, false) }, OnStoppedLeading: func() { @@ -185,6 +187,7 @@ func NewRootCommand() *cobra.Command { command.Flags().IntVar(&workflowTTLWorkers, "workflow-ttl-workers", 4, "Number of workflow TTL workers") command.Flags().IntVar(&podCleanupWorkers, "pod-cleanup-workers", 4, "Number of pod cleanup workers") command.Flags().IntVar(&cronWorkflowWorkers, "cron-workflow-workers", 8, "Number of cron workflow workers") + command.Flags().IntVar(&workflowArchiveWorkers, "workflow-archive-workers", 8, "Number of workflow archive workers") command.Flags().IntVar(&burst, "burst", 30, "Maximum burst for throttle.") command.Flags().Float32Var(&qps, "qps", 20.0, "Queries per second") command.Flags().BoolVar(&namespaced, "namespaced", false, "run workflow-controller as namespaced mode") diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index d030cf1906cd..121342fd2a53 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -126,6 +126,7 @@ type WorkflowController struct { configMapInformer cache.SharedIndexInformer wfQueue workqueue.RateLimitingInterface podCleanupQueue workqueue.RateLimitingInterface // pods to be deleted or labelled depend on GC strategy + wfArchiveQueue workqueue.RateLimitingInterface throttler sync.Throttler workflowKeyLock syncpkg.KeyLock // used to lock workflows for exclusive modification or access session db.Session @@ -236,6 +237,7 @@ func NewWorkflowController(ctx context.Context, restConfig *rest.Config, kubecli wfc.wfQueue = wfc.metrics.RateLimiterWithBusyWorkers(ctx, &fixedItemIntervalRateLimiter{}, "workflow_queue") wfc.throttler = wfc.newThrottler() wfc.podCleanupQueue = wfc.metrics.RateLimiterWithBusyWorkers(ctx, workqueue.DefaultControllerRateLimiter(), "pod_cleanup_queue") + wfc.wfArchiveQueue = wfc.metrics.RateLimiterWithBusyWorkers(ctx, workqueue.DefaultControllerRateLimiter(), "workflow_archive_queue") return &wfc, nil } @@ -278,7 +280,7 @@ var indexers = cache.Indexers{ } // Run starts a Workflow resource controller -func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers int) { +func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers, wfArchiveWorkers int) { defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...) // init DB after leader election (if enabled) @@ -298,6 +300,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo log.WithField("workflowWorkers", wfWorkers). WithField("workflowTtlWorkers", workflowTTLWorkers). WithField("podCleanup", podCleanupWorkers). + WithField("workflowArchive", wfArchiveWorkers). Info("Current Worker Numbers") wfc.wfInformer = util.NewWorkflowInformer(wfc.dynamicInterface, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.tweakListRequestListOptions, wfc.tweakWatchRequestListOptions, indexers) @@ -363,6 +366,9 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo for i := 0; i < wfWorkers; i++ { go wait.Until(wfc.runWorker, time.Second, ctx.Done()) } + for i := 0; i < wfArchiveWorkers; i++ { + go wait.Until(wfc.runArchiveWorker, time.Second, ctx.Done()) + } if cacheGCPeriod != 0 { go wait.JitterUntilWithContext(ctx, wfc.syncAllCacheForGC, cacheGCPeriod, 0.0, true) } @@ -798,6 +804,14 @@ func (wfc *WorkflowController) runWorker() { } } +func (wfc *WorkflowController) runArchiveWorker() { + defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...) + + ctx := context.Background() + for wfc.processNextArchiveItem(ctx) { + } +} + // processNextItem is the worker logic for handling workflow updates func (wfc *WorkflowController) processNextItem(ctx context.Context) bool { key, quit := wfc.wfQueue.Get() @@ -881,6 +895,26 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool { return true } +func (wfc *WorkflowController) processNextArchiveItem(ctx context.Context) bool { + key, quit := wfc.wfArchiveQueue.Get() + if quit { + return false + } + defer wfc.wfArchiveQueue.Done(key) + + obj, exists, err := wfc.wfInformer.GetIndexer().GetByKey(key.(string)) + if err != nil { + log.WithFields(log.Fields{"key": key, "error": err}).Error("Failed to get workflow from informer") + return true + } + if !exists { + return true + } + + wfc.archiveWorkflow(ctx, obj) + return true +} + func (wfc *WorkflowController) getWorkflowByKey(key string) (interface{}, bool) { obj, exists, err := wfc.wfInformer.GetIndexer().GetByKey(key) if err != nil { @@ -1090,10 +1124,16 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) }, Handler: cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - wfc.archiveWorkflow(ctx, obj) + key, err := cache.MetaNamespaceKeyFunc(obj) + if err == nil { + wfc.wfArchiveQueue.Add(key) + } }, UpdateFunc: func(_, obj interface{}) { - wfc.archiveWorkflow(ctx, obj) + key, err := cache.MetaNamespaceKeyFunc(obj) + if err == nil { + wfc.wfArchiveQueue.Add(key) + } }, }, })