Skip to content

Commit

Permalink
fix: apply feedback
Browse files Browse the repository at this point in the history
Signed-off-by: william.vanhevelingen <[email protected]>
  • Loading branch information
blkperl committed Dec 3, 2024
1 parent b42b58f commit 0c9632f
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,13 +355,13 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
for i := 0; i < podCleanupWorkers; i++ {
go wait.UntilWithContext(ctx, wfc.runPodCleanup, time.Second)
}
go wfc.workflowGarbageCollector(ctx, ctx.Done())
go wfc.archivedWorkflowGarbageCollector(ctx, ctx.Done())
go wfc.workflowGarbageCollector(ctx)
go wfc.archivedWorkflowGarbageCollector(ctx)

go wfc.runGCcontroller(ctx, workflowTTLWorkers)
go wfc.runCronController(ctx, cronWorkflowWorkers)

go wait.Until(func() { wfc.syncManager.CheckWorkflowExistence(ctx) }, workflowExistenceCheckPeriod, ctx.Done())
go wait.UntilWithContext(ctx, wfc.syncManager.CheckWorkflowExistence, workflowExistenceCheckPeriod)

for i := 0; i < wfWorkers; i++ {
go wait.UntilWithContext(ctx, wfc.runWorker, time.Second)
Expand Down Expand Up @@ -687,15 +687,15 @@ func (wfc *WorkflowController) signalContainers(ctx context.Context, namespace s
return time.Duration(*pod.Spec.TerminationGracePeriodSeconds) * time.Second, nil
}

func (wfc *WorkflowController) workflowGarbageCollector(ctx context.Context, stopCh <-chan struct{}) {
func (wfc *WorkflowController) workflowGarbageCollector(ctx context.Context) {
defer runtimeutil.HandleCrashWithContext(ctx, runtimeutil.PanicHandlers...)

periodicity := env.LookupEnvDurationOr("WORKFLOW_GC_PERIOD", 5*time.Minute)
log.WithField("periodicity", periodicity).Info("Performing periodic GC")
ticker := time.NewTicker(periodicity)
for {
select {
case <-stopCh:
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
Expand Down Expand Up @@ -772,7 +772,7 @@ func (wfc *WorkflowController) deleteOffloadedNodesForWorkflow(uid string, versi
return nil
}

func (wfc *WorkflowController) archivedWorkflowGarbageCollector(ctx context.Context, stopCh <-chan struct{}) {
func (wfc *WorkflowController) archivedWorkflowGarbageCollector(ctx context.Context) {
defer runtimeutil.HandleCrashWithContext(ctx, runtimeutil.PanicHandlers...)

periodicity := env.LookupEnvDurationOr("ARCHIVED_WORKFLOW_GC_PERIOD", 24*time.Hour)
Expand All @@ -794,7 +794,7 @@ func (wfc *WorkflowController) archivedWorkflowGarbageCollector(ctx context.Cont
defer ticker.Stop()
for {
select {
case <-stopCh:
case <-ctx.Done():
return
case <-ticker.C:
log.Info("Performing archived workflow GC")
Expand Down

0 comments on commit 0c9632f

Please sign in to comment.