From 5bc7bdcc58ae0561f1154e22ada74deca76f9cfe Mon Sep 17 00:00:00 2001 From: qingfeng777 <251098199@qq.com> Date: Tue, 30 Jul 2024 01:09:10 +0800 Subject: [PATCH] feat: workflow template support record last run time with workqueue. Fixes #1915 Signed-off-by: qingfeng777 <251098199@qq.com> --- workflow/cron/controller.go | 2 +- workflow/templateresolution/context.go | 54 ++++++++--- workflow/templateresolution/context_test.go | 38 +++++++- .../templateresolution/template_status.go | 93 +++++++++++++++++++ 4 files changed, 174 insertions(+), 13 deletions(-) create mode 100644 workflow/templateresolution/template_status.go diff --git a/workflow/cron/controller.go b/workflow/cron/controller.go index 290d0c87a410a..091a097be3103 100644 --- a/workflow/cron/controller.go +++ b/workflow/cron/controller.go @@ -21,7 +21,7 @@ import ( "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/informers" - "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow" diff --git a/workflow/templateresolution/context.go b/workflow/templateresolution/context.go index 343894d906a00..a8a4533d33206 100644 --- a/workflow/templateresolution/context.go +++ b/workflow/templateresolution/context.go @@ -79,11 +79,13 @@ type Context struct { workflow *wfv1.Workflow // log is a logrus entry. log *log.Entry + // wftmplStatusQueue is two queues for tmpl.Status update + wftmplStatusQueue *wftmplStatusQueue } // NewContext returns new Context. func NewContext(wftmplGetter WorkflowTemplateNamespacedGetter, cwftmplGetter ClusterWorkflowTemplateGetter, tmplBase wfv1.TemplateHolder, workflow *wfv1.Workflow) *Context { - return &Context{ + ctx := &Context{ wftmplGetter: wftmplGetter, cwftmplGetter: cwftmplGetter, wftmplClient: wftmplClientHolder, @@ -92,6 +94,10 @@ func NewContext(wftmplGetter WorkflowTemplateNamespacedGetter, cwftmplGetter Clu workflow: workflow, log: log.WithFields(log.Fields{}), } + wftmplStatusQueue := NewTmplStatusQueue(ctx) + go wftmplStatusQueue.run(context.TODO()) + ctx.wftmplStatusQueue = wftmplStatusQueue + return ctx } var ( @@ -115,7 +121,7 @@ func NewContextWithClientSet(wftmplGetter WorkflowTemplateNamespacedGetter, cwft cwftmplGetter = WrapClusterWorkflowTemplateInterface(cwftmplClient) } - return &Context{ + ctx := &Context{ wftmplGetter: wftmplGetter, cwftmplGetter: cwftmplGetter, wftmplClient: wftmplClient, @@ -124,6 +130,10 @@ func NewContextWithClientSet(wftmplGetter WorkflowTemplateNamespacedGetter, cwft workflow: workflow, log: log.WithFields(log.Fields{}), } + wftmplStatusQueue := NewTmplStatusQueue(ctx) + go wftmplStatusQueue.run(context.TODO()) + ctx.wftmplStatusQueue = wftmplStatusQueue + return ctx } // GetTemplateByName returns a template by name in the context. @@ -162,11 +172,10 @@ func (ctx *Context) GetTemplateFromRef(tmplRef *wfv1.TemplateRef) (*wfv1.Templat } return nil, err } - if !tmplRef.ClusterScope && wftmpl != nil { - _, err = ctx.UpdateTemplateStatus(wftmpl.(*wfv1.WorkflowTemplate)) - if err != nil { - log.Errorf("Update workflow template %s err: %v", wftmpl.GetName(), err) - } + if tmplRef.ClusterScope { + ctx.wftmplStatusQueue.cwftmplQueue.Add(tmplRef.Name) + } else { + ctx.wftmplStatusQueue.wftmplQueue.Add(tmplRef.Name) } template = wftmpl.GetTemplateByName(tmplRef.Template) @@ -199,15 +208,38 @@ func (ctx *Context) GetTemplateScope() string { return string(ctx.tmplBase.GetResourceScope()) + "/" + ctx.tmplBase.GetName() } -// UpdateTemplateStatus update the WorkflowTemplate.Status of a given WorkflowTemplate. -func (ctx *Context) UpdateTemplateStatus(wftmple *wfv1.WorkflowTemplate) (*wfv1.WorkflowTemplate, error) { +// updateTemplateStatus update the WorkflowTemplate.Status of a given WorkflowTemplate. +func (ctx *Context) updateTemplateStatus(name string) error { if ctx.wftmplClient == nil { ctx.log.Warnln("while try update template status, wftmplClient not set") - return wftmple, nil + return nil + } + c := context.TODO() + wftmple, err := ctx.wftmplClient.Get(c, name, metav1.GetOptions{}) + if err != nil { + return err + } + + wftmple.Status.LastRunAt = metav1.Now() + _, err = ctx.wftmplClient.Update(c, wftmple, metav1.UpdateOptions{}) + return err +} + +// updateCtemplateStatus update the ClusterWorkflowTemplate.Status of a given ClusterWorkflowTemplate. +func (ctx *Context) updateCtemplateStatus(name string) error { + if ctx.cwftmplClient == nil { + ctx.log.Warnln("while try update cluster template status, cwftmplClient not set") + return nil + } + c := context.TODO() + wftmple, err := ctx.cwftmplClient.Get(c, name, metav1.GetOptions{}) + if err != nil { + return err } wftmple.Status.LastRunAt = metav1.Now() - return ctx.wftmplClient.Update(context.TODO(), wftmple, metav1.UpdateOptions{}) + _, err = ctx.cwftmplClient.Update(c, wftmple, metav1.UpdateOptions{}) + return err } // ResolveTemplate digs into referenes and returns a merged template. diff --git a/workflow/templateresolution/context_test.go b/workflow/templateresolution/context_test.go index cc732fe68f4a9..2ee6e6efcd1ac 100644 --- a/workflow/templateresolution/context_test.go +++ b/workflow/templateresolution/context_test.go @@ -24,6 +24,16 @@ func createWorkflowTemplate(wfClientset wfclientset.Interface, yamlStr string) e return err } +func createClusterWorkflowTemplate(wfClientset wfclientset.Interface, yamlStr string) error { + ctx := context.Background() + cwftmpl := wfv1.MustUnmarshalClusterWorkflow(yamlStr) + _, err := wfClientset.ArgoprojV1alpha1().ClusterWorkflowTemplates().Create(ctx, cwftmpl, metav1.CreateOptions{}) + if err != nil && apierr.IsAlreadyExists(err) { + return nil + } + return err +} + // Deprecated func unmarshalWftmpl(yamlStr string) *wfv1.WorkflowTemplate { return wfv1.MustUnmarshalWorkflowTemplate(yamlStr) @@ -402,7 +412,7 @@ func TestUpdateTemplLastRun(t *testing.T) { wftemplateName := "some-workflow-template" templateClient := wfClientset.ArgoprojV1alpha1().WorkflowTemplates(v1.NamespaceDefault) - _, err = ctx.UpdateTemplateStatus(wftmpl) + err = ctx.updateTemplateStatus(wftmpl.GetName()) if err != nil { t.Fatal(err) } @@ -414,3 +424,29 @@ func TestUpdateTemplLastRun(t *testing.T) { } assert.NotEmpty(t, wftmpl.Status.LastRunAt) } + +func TestUpdateCtemplLastRun(t *testing.T) { + wfClientset := fakewfclientset.NewSimpleClientset() + cwftmpl := wfv1.MustUnmarshalClusterWorkflow(someWorkflowTemplateYaml) + ctx := NewContextWithClientSet(nil, nil, wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wfClientset.ArgoprojV1alpha1().ClusterWorkflowTemplates(), cwftmpl, nil) + + err := createClusterWorkflowTemplate(wfClientset, someWorkflowTemplateYaml) + if err != nil { + t.Fatal(err) + } + + wftemplateName := "some-workflow-template" + ctemplateClient := wfClientset.ArgoprojV1alpha1().ClusterWorkflowTemplates() + + err = ctx.updateTemplateStatus(cwftmpl.GetName()) + if err != nil { + t.Fatal(err) + } + + cwftmpl, err = ctemplateClient.Get(context.Background(), wftemplateName, v1.GetOptions{}) + if err != nil { + t.Fatal(err) + return + } + assert.NotEmpty(t, cwftmpl.Status.LastRunAt) +} diff --git a/workflow/templateresolution/template_status.go b/workflow/templateresolution/template_status.go new file mode 100644 index 0000000000000..f163ce9f37c53 --- /dev/null +++ b/workflow/templateresolution/template_status.go @@ -0,0 +1,93 @@ +package templateresolution + +import ( + "context" + + "github.com/argoproj/pkg/sync" + log "github.com/sirupsen/logrus" + runtimeutil "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/util/workqueue" +) + +type wftmplStatusQueue struct { + wftmplQueue workqueue.RateLimitingInterface + cwftmplQueue workqueue.RateLimitingInterface + + keyLock sync.KeyLock + ckeyLock sync.KeyLock + ctx *Context +} + +func NewTmplStatusQueue(ctx *Context) *wftmplStatusQueue { + return &wftmplStatusQueue{ + wftmplQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "tmpl-status-queue"), + cwftmplQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ctmpl-status-queue"), + keyLock: sync.NewKeyLock(), + ckeyLock: sync.NewKeyLock(), + ctx: ctx, + } +} + +func (q *wftmplStatusQueue) run(ctx context.Context) { + defer q.wftmplQueue.ShutDown() + defer q.cwftmplQueue.ShutDown() + go q.runTmplStatusUpdate() + go q.runCtmplStatusUpdate() + <-ctx.Done() +} + +func (q *wftmplStatusQueue) runTmplStatusUpdate() { + ctx := context.TODO() + for q.processNextTmplItem(ctx) { + } +} + +func (q *wftmplStatusQueue) runCtmplStatusUpdate() { + ctx := context.TODO() + for q.processNextCtmplItem(ctx) { + } +} + +func (q *wftmplStatusQueue) processNextTmplItem(ctx context.Context) bool { + defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...) + + key, quit := q.wftmplQueue.Get() + if quit { + return false + } + defer q.wftmplQueue.Done(key) + + q.keyLock.Lock(key.(string)) + defer q.keyLock.Unlock(key.(string)) + + logCtx := log.WithField("wftmplStatus", key) + logCtx.Infof("Processing %s", key) + + err := q.ctx.updateTemplateStatus(key.(string)) + if err != nil { + log.Errorf("Update workflow template %s err: %v", key.(string), err) + } + return true +} + +func (q *wftmplStatusQueue) processNextCtmplItem(ctx context.Context) bool { + defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...) + + key, quit := q.cwftmplQueue.Get() + if quit { + return false + } + defer q.cwftmplQueue.Done(key) + + q.ckeyLock.Lock(key.(string)) + defer q.ckeyLock.Unlock(key.(string)) + + logCtx := log.WithField("cwftmplStatus", key) + logCtx.Infof("Processing %s", key) + + err := q.ctx.updateCtemplateStatus(key.(string)) + if err != nil { + log.Errorf("Update cluster workflow template %s err: %v", key.(string), err) + } + return true +}