diff --git a/Makefile b/Makefile index e30d92706e8e..6e8123a16e3d 100644 --- a/Makefile +++ b/Makefile @@ -297,11 +297,15 @@ $(GOPATH)/bin/goimports: go install golang.org/x/tools/cmd/goimports@v0.1.6 pkg/apis/workflow/v1alpha1/generated.proto: $(GOPATH)/bin/go-to-protobuf $(PROTO_BINARIES) $(TYPES) $(GOPATH)/src/github.com/gogo/protobuf + # These files are generated on a v3/ folder by the tool. Link them to the root folder + [ -e ./v3 ] || ln -s . v3 $(GOPATH)/bin/go-to-protobuf \ --go-header-file=./hack/custom-boilerplate.go.txt \ --packages=github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1 \ --apimachinery-packages=+k8s.io/apimachinery/pkg/util/intstr,+k8s.io/apimachinery/pkg/api/resource,k8s.io/apimachinery/pkg/runtime/schema,+k8s.io/apimachinery/pkg/runtime,k8s.io/apimachinery/pkg/apis/meta/v1,k8s.io/api/core/v1,k8s.io/api/policy/v1beta1 \ --proto-import $(GOPATH)/src + # Delete the link + [ -e ./v3 ] && rm -rf v3 touch pkg/apis/workflow/v1alpha1/generated.proto # this target will also create a .pb.go and a .pb.gw.go file, but in Make 3 we cannot use _grouped target_, instead we must choose @@ -499,20 +503,28 @@ clean: # swagger pkg/apis/workflow/v1alpha1/openapi_generated.go: $(GOPATH)/bin/openapi-gen $(TYPES) + # These files are generated on a v3/ folder by the tool. Link them to the root folder + [ -e ./v3 ] || ln -s . v3 $(GOPATH)/bin/openapi-gen \ --go-header-file ./hack/custom-boilerplate.go.txt \ --input-dirs github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1 \ --output-package github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1 \ --report-filename pkg/apis/api-rules/violation_exceptions.list + # Delete the link + [ -e ./v3 ] && rm -rf v3 # generates many other files (listers, informers, client etc). pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go: $(TYPES) + # These files are generated on a v3/ folder by the tool. Link them to the root folder + [ -e ./v3 ] || ln -s . 
v3 bash $(GOPATH)/pkg/mod/k8s.io/code-generator@v0.21.5/generate-groups.sh \ "deepcopy,client,informer,lister" \ github.com/argoproj/argo-workflows/v3/pkg/client github.com/argoproj/argo-workflows/v3/pkg/apis \ workflow:v1alpha1 \ --go-header-file ./hack/custom-boilerplate.go.txt + # Delete the link + [ -e ./v3 ] && rm -rf v3 dist/kubernetes.swagger.json: @mkdir -p dist diff --git a/cmd/argo/commands/archive/list.go b/cmd/argo/commands/archive/list.go index 2a4569a3b8b2..407c2daf0280 100644 --- a/cmd/argo/commands/archive/list.go +++ b/cmd/argo/commands/archive/list.go @@ -46,7 +46,7 @@ func NewListCommand() *cobra.Command { listOpts.Continue = resp.Continue } sort.Sort(workflows) - err = printer.PrintWorkflows(workflows, os.Stdout, printer.PrintOpts{Output: output, Namespace: true}) + err = printer.PrintWorkflows(workflows, os.Stdout, printer.PrintOpts{Output: output, Namespace: true, UID: true}) errors.CheckError(err) }, } diff --git a/config/config.go b/config/config.go index 9d5d8cc4aa4d..14764ef34b5b 100644 --- a/config/config.go +++ b/config/config.go @@ -125,8 +125,7 @@ type Config struct { // https://argoproj.github.io/argo-workflows/workflow-executors/#emissary-emissary Images map[string]Image `json:"images,omitempty"` - // UI defines a config for UI - UI *wfv1.UI `json:"ui,omitempty"` + RetentionPolicy *RetentionPolicy `json:"retentionPolicy,omitempty"` } func (c Config) GetContainerRuntimeExecutor(labels labels.Labels) (string, error) { diff --git a/config/retention_policy.go b/config/retention_policy.go new file mode 100644 index 000000000000..564c082b0434 --- /dev/null +++ b/config/retention_policy.go @@ -0,0 +1,7 @@ +package config + +type RetentionPolicy struct { + Completed int `json:"completed,omitempty"` + Failed int `json:"failed,omitempty"` + Errored int `json:"errored,omitempty"` +} diff --git a/hack/check-logging.sh b/hack/check-logging.sh index c17fc25b1efc..afc4de1f545e 100755 --- a/hack/check-logging.sh +++ b/hack/check-logging.sh @@ -17,7 +17,7 @@ # # As a last resort, use `log.Info(fmt.Sprintf(""))`. 
-set -eu +set -eux from=$(git merge-base --fork-point origin/master) count=$(git diff "$from" -- '*.go' | grep '^+' | grep -v '\(fmt\|errors\).Errorf' | grep -c '\(Debug\|Info\|Warn\|Warning\|Error\)f' || true) diff --git a/manifests/quick-start-minimal.yaml b/manifests/quick-start-minimal.yaml index a8d13cd2110a..9d77c53413aa 100644 --- a/manifests/quick-start-minimal.yaml +++ b/manifests/quick-start-minimal.yaml @@ -733,6 +733,10 @@ data: path: /metrics port: 9090 namespaceParallelism: "10" + retentionPolicy: | + completed: 10 + failed: 3 + errored: 3 kind: ConfigMap metadata: name: workflow-controller-configmap diff --git a/manifests/quick-start-mysql.yaml b/manifests/quick-start-mysql.yaml index eab567e2fa3f..f9c9e7a9f432 100644 --- a/manifests/quick-start-mysql.yaml +++ b/manifests/quick-start-mysql.yaml @@ -752,6 +752,10 @@ data: passwordSecret: name: argo-mysql-config key: password + retentionPolicy: | + completed: 10 + failed: 3 + errored: 3 kind: ConfigMap metadata: name: workflow-controller-configmap diff --git a/manifests/quick-start-postgres.yaml b/manifests/quick-start-postgres.yaml index c749faf6cff6..299d51acee11 100644 --- a/manifests/quick-start-postgres.yaml +++ b/manifests/quick-start-postgres.yaml @@ -752,6 +752,10 @@ data: passwordSecret: name: argo-postgres-config key: password + retentionPolicy: | + completed: 10 + failed: 3 + errored: 3 kind: ConfigMap metadata: name: workflow-controller-configmap diff --git a/manifests/quick-start/minimal/kustomization.yaml b/manifests/quick-start/minimal/kustomization.yaml index aa0b761e856e..09f056431abe 100644 --- a/manifests/quick-start/minimal/kustomization.yaml +++ b/manifests/quick-start/minimal/kustomization.yaml @@ -3,3 +3,7 @@ kind: Kustomization resources: - ../base + +patchesStrategicMerge: + - overlays/workflow-controller-configmap.yaml + diff --git a/manifests/quick-start/minimal/overlays/workflow-controller-configmap.yaml b/manifests/quick-start/minimal/overlays/workflow-controller-configmap.yaml new file mode 100644 index 000000000000..7dc3d5f1cbd9 --- /dev/null +++ b/manifests/quick-start/minimal/overlays/workflow-controller-configmap.yaml @@ -0,0 +1,9 @@ +apiVersion: v1 +data: + retentionPolicy: | + completed: 10 + failed: 3 + errored: 3 +kind: ConfigMap +metadata: + name: workflow-controller-configmap \ No newline at end of file diff --git a/manifests/quick-start/mysql/overlays/workflow-controller-configmap.yaml b/manifests/quick-start/mysql/overlays/workflow-controller-configmap.yaml index 1027091164fb..5e1fca1a47c1 100644 --- a/manifests/quick-start/mysql/overlays/workflow-controller-configmap.yaml +++ b/manifests/quick-start/mysql/overlays/workflow-controller-configmap.yaml @@ -19,6 +19,10 @@ data: passwordSecret: name: argo-mysql-config key: password + retentionPolicy: | + completed: 10 + failed: 3 + errored: 3 kind: ConfigMap metadata: name: workflow-controller-configmap diff --git a/manifests/quick-start/postgres/overlays/workflow-controller-configmap.yaml b/manifests/quick-start/postgres/overlays/workflow-controller-configmap.yaml index 6675ce86e2d2..1a10fd20d95c 100644 --- a/manifests/quick-start/postgres/overlays/workflow-controller-configmap.yaml +++ b/manifests/quick-start/postgres/overlays/workflow-controller-configmap.yaml @@ -19,6 +19,10 @@ data: passwordSecret: name: argo-postgres-config key: password + retentionPolicy: | + completed: 10 + failed: 3 + errored: 3 kind: ConfigMap metadata: name: workflow-controller-configmap \ No newline at end of file diff --git a/test/e2e/cli_test.go 
b/test/e2e/cli_test.go index 8528c8640325..9d5d25afd6ac 100644 --- a/test/e2e/cli_test.go +++ b/test/e2e/cli_test.go @@ -1402,6 +1402,7 @@ func (s *CLISuite) TestArchive() { assert.Contains(t, lines[0], "NAMESPACE") assert.Contains(t, lines[0], "NAME") assert.Contains(t, lines[0], "STATUS") + assert.Contains(t, lines[0], "UID") assert.Contains(t, lines[1], "argo") assert.Contains(t, lines[1], "basic") assert.Contains(t, lines[1], "Succeeded") diff --git a/test/e2e/fixtures/when.go b/test/e2e/fixtures/when.go index 55186d0b248d..d7297b0b5592 100644 --- a/test/e2e/fixtures/when.go +++ b/test/e2e/fixtures/when.go @@ -302,6 +302,32 @@ func (w *When) WaitForWorkflow(options ...interface{}) *When { } } +func (w *When) WaitForWorkflowList(listOptions metav1.ListOptions, condition func(list []wfv1.Workflow) bool) *When { + w.t.Helper() + timeout := defaultTimeout + start := time.Now() + _, _ = fmt.Println("Waiting", timeout.String(), "for workflows", listOptions) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + for { + select { + case <-ctx.Done(): + w.t.Errorf("timeout after %v waiting for condition", timeout) + return w + default: + wfList, err := w.client.List(ctx, listOptions) + if err != nil { + w.t.Error(err) + return w + } + if ok := condition(wfList.Items); ok { + _, _ = fmt.Printf("Condition met after %s\n", time.Since(start).Truncate(time.Second)) + return w + } + } + } +} + func (w *When) hydrateWorkflow(wf *wfv1.Workflow) { w.t.Helper() err := w.hydrator.Hydrate(wf) diff --git a/test/e2e/functional_test.go b/test/e2e/functional_test.go index a040548c5872..26182534dd1e 100644 --- a/test/e2e/functional_test.go +++ b/test/e2e/functional_test.go @@ -84,6 +84,28 @@ spec: ExpectWorkflowDeleted() } +func (s *FunctionalSuite) TestWorkflowRetention() { + listOptions := metav1.ListOptions{LabelSelector: "workflows.argoproj.io/phase=Failed"} + s.Given(). + Workflow("@testdata/exit-1.yaml"). + When(). + SubmitWorkflow(). + WaitForWorkflow(fixtures.ToBeFailed). + Given(). + Workflow("@testdata/exit-1.yaml"). + When(). + SubmitWorkflow(). + WaitForWorkflow(fixtures.ToBeFailed). + Given(). + Workflow("@testdata/exit-1.yaml"). + When(). + SubmitWorkflow(). + WaitForWorkflow(fixtures.ToBeFailed). 
+ WaitForWorkflowList(listOptions, func(list []wfv1.Workflow) bool { + return len(list) == 2 + }) +} + // in this test we create a poi quota, and then we create a workflow that needs one more pod than the quota allows // because we run them in parallel, the first node will run to completion, and then the second one func (s *FunctionalSuite) TestResourceQuota() { diff --git a/test/e2e/manifests/mixins/workflow-controller-configmap.yaml b/test/e2e/manifests/mixins/workflow-controller-configmap.yaml index 3e172fe98f56..8501ec97f48f 100644 --- a/test/e2e/manifests/mixins/workflow-controller-configmap.yaml +++ b/test/e2e/manifests/mixins/workflow-controller-configmap.yaml @@ -19,4 +19,8 @@ data: limits: cpu: 0.5 memory: 128Mi + retentionPolicy: | + completed: 10 + failed: 2 + errored: 2 kubeletInsecure: "true" diff --git a/util/printer/workflow-printer.go b/util/printer/workflow-printer.go index 83ac4abd409f..1a4da23b2035 100644 --- a/util/printer/workflow-printer.go +++ b/util/printer/workflow-printer.go @@ -51,6 +51,7 @@ type PrintOpts struct { NoHeaders bool Namespace bool Output string + UID bool } func printTable(wfList []wfv1.Workflow, out io.Writer, opts PrintOpts) { @@ -63,6 +64,9 @@ func printTable(wfList []wfv1.Workflow, out io.Writer, opts PrintOpts) { if opts.Output == "wide" { _, _ = fmt.Fprint(w, "\tP/R/C\tPARAMETERS") } + if opts.UID { + _, _ = fmt.Fprint(w, "\tUID") + } _, _ = fmt.Fprint(w, "\n") } for _, wf := range wfList { @@ -81,6 +85,9 @@ func printTable(wfList []wfv1.Workflow, out io.Writer, opts PrintOpts) { _, _ = fmt.Fprintf(w, "\t%d/%d/%d", pending, running, completed) _, _ = fmt.Fprintf(w, "\t%s", parameterString(wf.Spec.Arguments.Parameters)) } + if opts.UID { + _, _ = fmt.Fprintf(w, "\t%s", wf.UID) + } _, _ = fmt.Fprintf(w, "\n") } _ = w.Flush() diff --git a/workflow/common/util.go b/workflow/common/util.go index 12bc14c74d1a..a20b4a48b232 100644 --- a/workflow/common/util.go +++ b/workflow/common/util.go @@ -17,6 +17,7 @@ import ( apiv1 "k8s.io/api/core/v1" apierr "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" @@ -353,3 +354,9 @@ func GetTemplateHolderString(tmplHolder wfv1.TemplateReferenceHolder) string { func GenerateOnExitNodeName(parentNodeName string) string { return fmt.Sprintf("%s.onExit", parentNodeName) } + +func IsDone(un *unstructured.Unstructured) bool { + return un.GetDeletionTimestamp() == nil && + un.GetLabels()[LabelKeyCompleted] == "true" && + un.GetLabels()[LabelKeyWorkflowArchivingStatus] != "Pending" +} diff --git a/workflow/common/util_test.go b/workflow/common/util_test.go index d20b339a51c0..3b003526aac0 100644 --- a/workflow/common/util_test.go +++ b/workflow/common/util_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/client-go/kubernetes/fake" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" @@ -141,3 +142,22 @@ func TestGetTemplateHolderString(t *testing.T) { ClusterScope: true, }})) } + +func TestIsDone(t *testing.T) { + assert.False(t, IsDone(&unstructured.Unstructured{})) + assert.True(t, IsDone(&unstructured.Unstructured{Object: map[string]interface{}{ + "metadata": map[string]interface{}{ + "labels": map[string]interface{}{ + LabelKeyCompleted: "true", + }, + }, + }})) + 
assert.False(t, IsDone(&unstructured.Unstructured{Object: map[string]interface{}{ + "metadata": map[string]interface{}{ + "labels": map[string]interface{}{ + LabelKeyCompleted: "true", + LabelKeyWorkflowArchivingStatus: "Pending", + }, + }, + }})) +} diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 8b436c8f6a3b..56fb673bd747 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -56,11 +56,11 @@ import ( "github.com/argoproj/argo-workflows/v3/workflow/controller/pod" "github.com/argoproj/argo-workflows/v3/workflow/cron" "github.com/argoproj/argo-workflows/v3/workflow/events" + "github.com/argoproj/argo-workflows/v3/workflow/gccontroller" "github.com/argoproj/argo-workflows/v3/workflow/hydrator" "github.com/argoproj/argo-workflows/v3/workflow/metrics" "github.com/argoproj/argo-workflows/v3/workflow/signal" "github.com/argoproj/argo-workflows/v3/workflow/sync" - "github.com/argoproj/argo-workflows/v3/workflow/ttlcontroller" "github.com/argoproj/argo-workflows/v3/workflow/util" ) @@ -184,12 +184,12 @@ func (wfc *WorkflowController) newThrottler() sync.Throttler { } } -// RunTTLController runs the workflow TTL controller -func (wfc *WorkflowController) runTTLController(ctx context.Context, workflowTTLWorkers int) { +// runGCcontroller runs the workflow garbage collector controller +func (wfc *WorkflowController) runGCcontroller(ctx context.Context, workflowTTLWorkers int) { defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...) - ttlCtrl := ttlcontroller.NewController(wfc.wfclientset, wfc.wfInformer, wfc.metrics) - err := ttlCtrl.Run(ctx.Done(), workflowTTLWorkers) + gcCtrl := gccontroller.NewController(wfc.wfclientset, wfc.wfInformer, wfc.metrics, wfc.Config.RetentionPolicy) + err := gcCtrl.Run(ctx.Done(), workflowTTLWorkers) if err != nil { panic(err) } @@ -321,7 +321,7 @@ func (wfc *WorkflowController) startLeading(ctx context.Context, logCtx *log.Ent go wfc.workflowGarbageCollector(ctx.Done()) go wfc.archivedWorkflowGarbageCollector(ctx.Done()) - go wfc.runTTLController(ctx, workflowTTLWorkers) + go wfc.runGCcontroller(ctx, workflowTTLWorkers) go wfc.runCronController(ctx) go wait.Until(wfc.syncWorkflowPhaseMetrics, 15*time.Second, ctx.Done()) go wait.Until(wfc.syncPodPhaseMetrics, 15*time.Second, ctx.Done()) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 6b80aba2b1de..a76f5f16c071 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1760,6 +1760,8 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string, // the container. The status of this node should be "Success" if any // of the retries succeed. Otherwise, it is "Failed". 
retryNodeName := "" + + // If the template has a retry strategy, this node becomes the retry parent; the actual work runs in its child nodes if woc.retryStrategy(processedTmpl) != nil { retryNodeName = nodeName retryParentNode := node diff --git a/workflow/ttlcontroller/ttlcontroller.go b/workflow/gccontroller/gc_controller.go similarity index 59% rename from workflow/ttlcontroller/ttlcontroller.go rename to workflow/gccontroller/gc_controller.go index 702821a42e27..4f0f564a1df5 100644 --- a/workflow/ttlcontroller/ttlcontroller.go +++ b/workflow/gccontroller/gc_controller.go @@ -1,8 +1,10 @@ -package ttlcontroller +package gccontroller import ( + "container/heap" "context" "fmt" + "sync" "time" log "github.com/sirupsen/logrus" @@ -15,6 +17,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" + "github.com/argoproj/argo-workflows/v3/config" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" wfclientset "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned" commonutil "github.com/argoproj/argo-workflows/v3/util" @@ -23,28 +26,41 @@ import ( "github.com/argoproj/argo-workflows/v3/workflow/util" ) +var ticker *time.Ticker = time.NewTicker(50 * time.Millisecond) + type Controller struct { - wfclientset wfclientset.Interface - wfInformer cache.SharedIndexInformer - workqueue workqueue.DelayingInterface - clock clock.Clock - metrics *metrics.Metrics + wfclientset wfclientset.Interface + wfInformer cache.SharedIndexInformer + workqueue workqueue.DelayingInterface + clock clock.Clock + metrics *metrics.Metrics + orderedQueueLock sync.Mutex + orderedQueue map[wfv1.WorkflowPhase]*gcHeap + retentionPolicy *config.RetentionPolicy } // NewController returns a new workflow ttl controller -func NewController(wfClientset wfclientset.Interface, wfInformer cache.SharedIndexInformer, metrics *metrics.Metrics) *Controller { +func NewController(wfClientset wfclientset.Interface, wfInformer cache.SharedIndexInformer, metrics *metrics.Metrics, retentionPolicy *config.RetentionPolicy) *Controller { + + orderedQueue := map[wfv1.WorkflowPhase]*gcHeap{ + wfv1.WorkflowFailed: NewHeap(), + wfv1.WorkflowError: NewHeap(), + wfv1.WorkflowSucceeded: NewHeap(), + } controller := &Controller{ - wfclientset: wfClientset, - wfInformer: wfInformer, - workqueue: metrics.RateLimiterWithBusyWorkers(workqueue.DefaultControllerRateLimiter(), "workflow_ttl_queue"), - clock: clock.RealClock{}, - metrics: metrics, + wfclientset: wfClientset, + wfInformer: wfInformer, + workqueue: metrics.RateLimiterWithBusyWorkers(workqueue.DefaultControllerRateLimiter(), "workflow_ttl_queue"), + clock: clock.RealClock{}, + metrics: metrics, + orderedQueue: orderedQueue, + retentionPolicy: retentionPolicy, } wfInformer.AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: func(obj interface{}) bool { un, ok := obj.(*unstructured.Unstructured) - return ok && un.GetDeletionTimestamp() == nil && un.GetLabels()[common.LabelKeyCompleted] == "true" && un.GetLabels()[common.LabelKeyWorkflowArchivingStatus] != "Pending" + return ok && common.IsDone(un) }, Handler: cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueueWF, @@ -53,23 +69,61 @@ func NewController(wfClientset wfclientset.Interface, wfInformer cache.SharedInd }, }, }) + + wfInformer.AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + un, ok := obj.(*unstructured.Unstructured) + return ok && common.IsDone(un) + }, + Handler: cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(old, new interface{}) { + controller.retentionEnqueue(new) + }, + AddFunc: 
func(obj interface{}) { + controller.retentionEnqueue(obj) + }, + }, + }) return controller } -func (c *Controller) Run(stopCh <-chan struct{}, workflowTTLWorkers int) error { +func (c *Controller) retentionEnqueue(obj interface{}) { + // No need to queue the workflow if the retention policy is not set + if c.retentionPolicy == nil { + return + } + + un, ok := obj.(*unstructured.Unstructured) + if !ok { + log.Warnf("'%v' is not an unstructured", obj) + return + } + + switch phase := wfv1.WorkflowPhase(un.GetLabels()[common.LabelKeyPhase]); phase { + case wfv1.WorkflowSucceeded, wfv1.WorkflowFailed, wfv1.WorkflowError: + c.orderedQueueLock.Lock() + heap.Push(c.orderedQueue[phase], un) + c.runGC(phase) + c.orderedQueueLock.Unlock() + } +} + +func (c *Controller) Run(stopCh <-chan struct{}, workflowGCWorkers int) error { defer runtimeutil.HandleCrash() defer c.workqueue.ShutDown() - log.Infof("Starting workflow TTL controller (workflowTTLWorkers %d)", workflowTTLWorkers) + defer ticker.Stop() + log.Infof("Starting workflow garbage collector controller (retentionWorkers %d)", workflowGCWorkers) go c.wfInformer.Run(stopCh) if ok := cache.WaitForCacheSync(stopCh, c.wfInformer.HasSynced); !ok { return fmt.Errorf("failed to wait for caches to sync") } - for i := 0; i < workflowTTLWorkers; i++ { + + for i := 0; i < workflowGCWorkers; i++ { go wait.Until(c.runWorker, time.Second, stopCh) } - log.Info("Started workflow TTL worker") + log.Info("Started workflow garbage collection") <-stopCh - log.Info("Shutting workflow TTL worker") + log.Info("Shutting down workflow garbage collection") return nil } @@ -82,6 +136,29 @@ func (c *Controller) runWorker() { } } +// runGC queues workflows for deletion based upon the retention policy. +func (c *Controller) runGC(phase wfv1.WorkflowPhase) { + defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...) + var maxWorkflows int + switch phase { + case wfv1.WorkflowSucceeded: + maxWorkflows = c.retentionPolicy.Completed + case wfv1.WorkflowFailed: + maxWorkflows = c.retentionPolicy.Failed + case wfv1.WorkflowError: + maxWorkflows = c.retentionPolicy.Errored + default: + return + } + + for c.orderedQueue[phase].Len() > maxWorkflows { + key, _ := cache.MetaNamespaceKeyFunc(heap.Pop(c.orderedQueue[phase])) + log.Infof("Queueing %v workflow %s for delete due to max retention (%d workflows)", phase, key, maxWorkflows) + c.workqueue.Add(key) + <-ticker.C + } +} + // processNextWorkItem will read a single work item off the workqueue and // attempt to process it, by calling the syncHandler. func (c *Controller) processNextWorkItem(ctx context.Context) bool { @@ -90,7 +167,6 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool { return false } defer c.workqueue.Done(key) - runtimeutil.HandleError(c.deleteWorkflow(ctx, key.(string))) return true @@ -98,11 +174,13 @@ // enqueueWF conditionally queues a workflow to the ttl queue if it is within the deletion period func (c *Controller) enqueueWF(obj interface{}) { + un, ok := obj.(*unstructured.Unstructured) if !ok { log.Warnf("'%v' is not an unstructured", obj) return } + wf, err := util.FromUnstructured(un) if err != nil { log.Warnf("Failed to unmarshal workflow %v object: %v", obj, err) @@ -119,15 +197,16 @@ // truly works. 
addAfter := remaining + time.Second key, _ := cache.MetaNamespaceKeyFunc(obj) - log.Infof("Queueing %v workflow %s for delete in %v", wf.Status.Phase, key, addAfter.Truncate(time.Second)) + log.Infof("Queueing %v workflow %s for delete in %v due to TTL", wf.Status.Phase, key, addAfter.Truncate(time.Second)) c.workqueue.AddAfter(key, addAfter) } func (c *Controller) deleteWorkflow(ctx context.Context, key string) error { // It should be impossible for a workflow to have been queue without a valid key. namespace, name, _ := cache.SplitMetaNamespaceKey(key) + // Any workflow that was queued must need deleting, therefore we do not check the expiry again. - log.Infof("Deleting TTL expired workflow '%s'", key) + log.Infof("Deleting garbage collected workflow '%s'", key) err := c.wfclientset.ArgoprojV1alpha1().Workflows(namespace).Delete(ctx, name, metav1.DeleteOptions{PropagationPolicy: commonutil.GetDeletePropagation()}) if err != nil { if apierr.IsNotFound(err) { diff --git a/workflow/ttlcontroller/ttlcontroller_test.go b/workflow/gccontroller/gc_controller_test.go similarity index 99% rename from workflow/ttlcontroller/ttlcontroller_test.go rename to workflow/gccontroller/gc_controller_test.go index 711f2b3d8f9e..80a364a50b0d 100644 --- a/workflow/ttlcontroller/ttlcontroller_test.go +++ b/workflow/gccontroller/gc_controller_test.go @@ -1,4 +1,4 @@ -package ttlcontroller +package gccontroller import ( "context" diff --git a/workflow/gccontroller/heap.go b/workflow/gccontroller/heap.go new file mode 100644 index 000000000000..33bc3b6c4bc5 --- /dev/null +++ b/workflow/gccontroller/heap.go @@ -0,0 +1,40 @@ +package gccontroller + +import ( + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +type gcHeap struct { + heap []*unstructured.Unstructured + dedup map[string]bool +} + +func NewHeap() *gcHeap { + return &gcHeap{ + heap: make([]*unstructured.Unstructured, 0), + dedup: make(map[string]bool), + } +} + +func (h *gcHeap) Len() int { return len(h.heap) } +func (h *gcHeap) Less(i, j int) bool { + return h.heap[j].GetCreationTimestamp().After((h.heap[i].GetCreationTimestamp().Time)) +} +func (h *gcHeap) Swap(i, j int) { h.heap[i], h.heap[j] = h.heap[j], h.heap[i] } + +func (h *gcHeap) Push(x interface{}) { + if _, ok := h.dedup[x.(*unstructured.Unstructured).GetName()]; ok { + return + } + h.dedup[x.(*unstructured.Unstructured).GetName()] = true + h.heap = append(h.heap, x.(*unstructured.Unstructured)) +} + +func (h *gcHeap) Pop() interface{} { + old := h.heap + n := len(old) + x := old[n-1] + h.heap = old[0 : n-1] + delete(h.dedup, x.GetName()) + return x +} diff --git a/workflow/gccontroller/heap_test.go b/workflow/gccontroller/heap_test.go new file mode 100644 index 000000000000..1d030f2db059 --- /dev/null +++ b/workflow/gccontroller/heap_test.go @@ -0,0 +1,94 @@ +package gccontroller + +import ( + "container/heap" + "testing" + "time" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + + testutil "github.com/argoproj/argo-workflows/v3/test/util" +) + +func TestPriorityQueue(t *testing.T) { + wf := testutil.MustUnmarshalUnstructured(` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: bad-baseline + labels: + workflows.argoproj.io/phase: Failed +`) + now := time.Now() + wf.SetCreationTimestamp(v1.Time{Time: now}) + queue := &gcHeap{ + heap: []*unstructured.Unstructured{wf}, + dedup: make(map[string]bool), + } + heap.Init(queue) + assert.Equal(t, 1, queue.Len()) + wf = 
testutil.MustUnmarshalUnstructured(` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: bad-baseline + labels: + workflows.argoproj.io/phase: Failed +`) + wf.SetCreationTimestamp(v1.Time{Time: now.Add(time.Second)}) + heap.Push(queue, wf) + wf = testutil.MustUnmarshalUnstructured(` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: bad-baseline-oldest + labels: + workflows.argoproj.io/phase: Failed +`) + wf.SetCreationTimestamp(v1.Time{Time: now.Add(-time.Second)}) + heap.Push(queue, wf) + assert.Equal(t, 3, queue.Len()) + first := heap.Pop(queue).(*unstructured.Unstructured) + assert.Equal(t, now.Add(-time.Second).Unix(), first.GetCreationTimestamp().Time.Unix()) + assert.Equal(t, "bad-baseline-oldest", first.GetName()) + assert.Equal(t, now.Unix(), heap.Pop(queue).(*unstructured.Unstructured).GetCreationTimestamp().Time.Unix()) + assert.Equal(t, now.Add(time.Second).Unix(), heap.Pop(queue).(*unstructured.Unstructured).GetCreationTimestamp().Time.Unix()) + assert.Equal(t, 0, queue.Len()) +} + +func TestDeduplicationOfPriorityQueue(t *testing.T) { + wf := testutil.MustUnmarshalUnstructured(` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: bad-baseline + labels: + workflows.argoproj.io/phase: Failed +`) + now := time.Now() + wf.SetCreationTimestamp(v1.Time{Time: now}) + queue := &gcHeap{ + heap: []*unstructured.Unstructured{}, + dedup: make(map[string]bool), + } + heap.Push(queue, wf) + assert.Equal(t, 1, queue.Len()) + wf = testutil.MustUnmarshalUnstructured(` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: bad-baseline + labels: + workflows.argoproj.io/phase: Failed +`) + wf.SetCreationTimestamp(v1.Time{Time: now.Add(-time.Second)}) + heap.Push(queue, wf) + assert.Equal(t, 1, queue.Len()) + _ = heap.Pop(queue) + assert.Equal(t, 0, queue.Len()) + heap.Push(queue, wf) + assert.Equal(t, 1, queue.Len()) + +}
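
For reference, the retention GC introduced above keeps only the newest N workflows per phase by popping the oldest entries off a min-heap ordered by creation timestamp. Below is a minimal, self-contained Go sketch of that container/heap pattern, assuming the same oldest-first ordering as gcHeap.Less and the pop-until-limit loop in runGC; the item type and the limit of 1 are stand-ins for *unstructured.Unstructured workflows and the configured retentionPolicy counts, not code from this PR.

package main

import (
	"container/heap"
	"fmt"
	"time"
)

// item stands in for *unstructured.Unstructured in the real controller.
type item struct {
	name    string
	created time.Time
}

// oldestFirst is a min-heap keyed on creation time, mirroring gcHeap's ordering.
type oldestFirst []item

func (h oldestFirst) Len() int           { return len(h) }
func (h oldestFirst) Less(i, j int) bool { return h[i].created.Before(h[j].created) }
func (h oldestFirst) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

func (h *oldestFirst) Push(x interface{}) { *h = append(*h, x.(item)) }

func (h *oldestFirst) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

func main() {
	now := time.Now()
	q := &oldestFirst{}
	heap.Push(q, item{name: "wf-new", created: now})
	heap.Push(q, item{name: "wf-old", created: now.Add(-time.Hour)})

	// With a hypothetical retention limit of 1 (e.g. retentionPolicy.failed: 1),
	// everything beyond the newest workflow is popped oldest-first and queued for deletion.
	maxWorkflows := 1
	for q.Len() > maxWorkflows {
		fmt.Println("would delete:", heap.Pop(q).(item).name) // prints: would delete: wf-old
	}
}

Because container/heap.Pop always returns the root (the oldest entry under this ordering), the controller deletes workflows strictly oldest-first, while the dedup map in gcHeap keeps the informer's add and update events from queueing the same workflow twice.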