Skip to content

Commit

Permalink
feat(controller): optimize memory with queue when archiving is rate-l…
Browse files Browse the repository at this point in the history
…imited. Fixes #13418 (#13419)

Signed-off-by: chenrui <[email protected]>
Signed-off-by: Alan Clucas <[email protected]>
Signed-off-by: chenrui <[email protected]>
Co-authored-by: chenrui <[email protected]>
Co-authored-by: Alan Clucas <[email protected]>
  • Loading branch information
3 people authored Aug 15, 2024
1 parent c0f879c commit c143e3e
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 5 deletions.
7 changes: 5 additions & 2 deletions cmd/workflow-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func NewRootCommand() *cobra.Command {
workflowTTLWorkers int // --workflow-ttl-workers
podCleanupWorkers int // --pod-cleanup-workers
cronWorkflowWorkers int // --cron-workflow-workers
workflowArchiveWorkers int // --workflow-archive-workers
burst int
qps float32
namespaced bool // --namespaced
Expand Down Expand Up @@ -117,7 +118,8 @@ func NewRootCommand() *cobra.Command {
if leaderElectionOff == "true" {
log.Info("Leader election is turned off. Running in single-instance mode")
log.WithField("id", "single-instance").Info("starting leading")
go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers)

go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers, workflowArchiveWorkers)
go wfController.RunPrometheusServer(ctx, false)
} else {
nodeID, ok := os.LookupEnv("LEADER_ELECTION_IDENTITY")
Expand Down Expand Up @@ -147,7 +149,7 @@ func NewRootCommand() *cobra.Command {
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
dummyCancel()
go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers)
go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers, workflowArchiveWorkers)
go wfController.RunPrometheusServer(ctx, false)
},
OnStoppedLeading: func() {
Expand Down Expand Up @@ -185,6 +187,7 @@ func NewRootCommand() *cobra.Command {
command.Flags().IntVar(&workflowTTLWorkers, "workflow-ttl-workers", 4, "Number of workflow TTL workers")
command.Flags().IntVar(&podCleanupWorkers, "pod-cleanup-workers", 4, "Number of pod cleanup workers")
command.Flags().IntVar(&cronWorkflowWorkers, "cron-workflow-workers", 8, "Number of cron workflow workers")
command.Flags().IntVar(&workflowArchiveWorkers, "workflow-archive-workers", 8, "Number of workflow archive workers")
command.Flags().IntVar(&burst, "burst", 30, "Maximum burst for throttle.")
command.Flags().Float32Var(&qps, "qps", 20.0, "Queries per second")
command.Flags().BoolVar(&namespaced, "namespaced", false, "run workflow-controller as namespaced mode")
Expand Down
46 changes: 43 additions & 3 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ type WorkflowController struct {
configMapInformer cache.SharedIndexInformer
wfQueue workqueue.RateLimitingInterface
podCleanupQueue workqueue.RateLimitingInterface // pods to be deleted or labelled depend on GC strategy
wfArchiveQueue workqueue.RateLimitingInterface
throttler sync.Throttler
workflowKeyLock syncpkg.KeyLock // used to lock workflows for exclusive modification or access
session db.Session
Expand Down Expand Up @@ -236,6 +237,7 @@ func NewWorkflowController(ctx context.Context, restConfig *rest.Config, kubecli
wfc.wfQueue = wfc.metrics.RateLimiterWithBusyWorkers(ctx, &fixedItemIntervalRateLimiter{}, "workflow_queue")
wfc.throttler = wfc.newThrottler()
wfc.podCleanupQueue = wfc.metrics.RateLimiterWithBusyWorkers(ctx, workqueue.DefaultControllerRateLimiter(), "pod_cleanup_queue")
wfc.wfArchiveQueue = wfc.metrics.RateLimiterWithBusyWorkers(ctx, workqueue.DefaultControllerRateLimiter(), "workflow_archive_queue")

return &wfc, nil
}
Expand Down Expand Up @@ -278,7 +280,7 @@ var indexers = cache.Indexers{
}

// Run starts a Workflow resource controller
func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers int) {
func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers, wfArchiveWorkers int) {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

// init DB after leader election (if enabled)
Expand All @@ -298,6 +300,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
log.WithField("workflowWorkers", wfWorkers).
WithField("workflowTtlWorkers", workflowTTLWorkers).
WithField("podCleanup", podCleanupWorkers).
WithField("workflowArchive", wfArchiveWorkers).
Info("Current Worker Numbers")

wfc.wfInformer = util.NewWorkflowInformer(wfc.dynamicInterface, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.tweakListRequestListOptions, wfc.tweakWatchRequestListOptions, indexers)
Expand Down Expand Up @@ -363,6 +366,9 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
for i := 0; i < wfWorkers; i++ {
go wait.Until(wfc.runWorker, time.Second, ctx.Done())
}
for i := 0; i < wfArchiveWorkers; i++ {
go wait.Until(wfc.runArchiveWorker, time.Second, ctx.Done())
}
if cacheGCPeriod != 0 {
go wait.JitterUntilWithContext(ctx, wfc.syncAllCacheForGC, cacheGCPeriod, 0.0, true)
}
Expand Down Expand Up @@ -798,6 +804,14 @@ func (wfc *WorkflowController) runWorker() {
}
}

func (wfc *WorkflowController) runArchiveWorker() {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

ctx := context.Background()
for wfc.processNextArchiveItem(ctx) {
}
}

// processNextItem is the worker logic for handling workflow updates
func (wfc *WorkflowController) processNextItem(ctx context.Context) bool {
key, quit := wfc.wfQueue.Get()
Expand Down Expand Up @@ -881,6 +895,26 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool {
return true
}

func (wfc *WorkflowController) processNextArchiveItem(ctx context.Context) bool {
key, quit := wfc.wfArchiveQueue.Get()
if quit {
return false
}
defer wfc.wfArchiveQueue.Done(key)

obj, exists, err := wfc.wfInformer.GetIndexer().GetByKey(key.(string))
if err != nil {
log.WithFields(log.Fields{"key": key, "error": err}).Error("Failed to get workflow from informer")
return true
}
if !exists {
return true
}

wfc.archiveWorkflow(ctx, obj)
return true
}

func (wfc *WorkflowController) getWorkflowByKey(key string) (interface{}, bool) {
obj, exists, err := wfc.wfInformer.GetIndexer().GetByKey(key)
if err != nil {
Expand Down Expand Up @@ -1090,10 +1124,16 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context)
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
wfc.archiveWorkflow(ctx, obj)
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
wfc.wfArchiveQueue.Add(key)
}
},
UpdateFunc: func(_, obj interface{}) {
wfc.archiveWorkflow(ctx, obj)
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
wfc.wfArchiveQueue.Add(key)
}
},
},
})
Expand Down

0 comments on commit c143e3e

Please sign in to comment.