diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index e8be1504c509..977c27af74fb 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -355,13 +355,13 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo for i := 0; i < podCleanupWorkers; i++ { go wait.UntilWithContext(ctx, wfc.runPodCleanup, time.Second) } - go wfc.workflowGarbageCollector(ctx, ctx.Done()) - go wfc.archivedWorkflowGarbageCollector(ctx, ctx.Done()) + go wfc.workflowGarbageCollector(ctx) + go wfc.archivedWorkflowGarbageCollector(ctx) go wfc.runGCcontroller(ctx, workflowTTLWorkers) go wfc.runCronController(ctx, cronWorkflowWorkers) - go wait.Until(func() { wfc.syncManager.CheckWorkflowExistence(ctx) }, workflowExistenceCheckPeriod, ctx.Done()) + go wait.UntilWithContext(ctx, wfc.syncManager.CheckWorkflowExistence, workflowExistenceCheckPeriod) for i := 0; i < wfWorkers; i++ { go wait.UntilWithContext(ctx, wfc.runWorker, time.Second) @@ -687,7 +687,7 @@ func (wfc *WorkflowController) signalContainers(ctx context.Context, namespace s return time.Duration(*pod.Spec.TerminationGracePeriodSeconds) * time.Second, nil } -func (wfc *WorkflowController) workflowGarbageCollector(ctx context.Context, stopCh <-chan struct{}) { +func (wfc *WorkflowController) workflowGarbageCollector(ctx context.Context) { defer runtimeutil.HandleCrashWithContext(ctx, runtimeutil.PanicHandlers...) periodicity := env.LookupEnvDurationOr("WORKFLOW_GC_PERIOD", 5*time.Minute) @@ -695,7 +695,7 @@ func (wfc *WorkflowController) workflowGarbageCollector(ctx context.Context, sto ticker := time.NewTicker(periodicity) for { select { - case <-stopCh: + case <-ctx.Done(): ticker.Stop() return case <-ticker.C: @@ -772,7 +772,7 @@ func (wfc *WorkflowController) deleteOffloadedNodesForWorkflow(uid string, versi return nil } -func (wfc *WorkflowController) archivedWorkflowGarbageCollector(ctx context.Context, stopCh <-chan struct{}) { +func (wfc *WorkflowController) archivedWorkflowGarbageCollector(ctx context.Context) { defer runtimeutil.HandleCrashWithContext(ctx, runtimeutil.PanicHandlers...) periodicity := env.LookupEnvDurationOr("ARCHIVED_WORKFLOW_GC_PERIOD", 24*time.Hour) @@ -794,7 +794,7 @@ func (wfc *WorkflowController) archivedWorkflowGarbageCollector(ctx context.Cont defer ticker.Stop() for { select { - case <-stopCh: + case <-ctx.Done(): return case <-ticker.C: log.Info("Performing archived workflow GC")