Skip to content

Commit

Permalink
feat: workflow template support record last run time with workqueue. F…
Browse files Browse the repository at this point in the history
…ixes #1915

Signed-off-by: qingfeng777 <[email protected]>
  • Loading branch information
qingfeng777 committed Jul 29, 2024
1 parent 071f92b commit 5bc7bdc
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 13 deletions.
2 changes: 1 addition & 1 deletion workflow/cron/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow"
Expand Down
54 changes: 43 additions & 11 deletions workflow/templateresolution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,13 @@ type Context struct {
workflow *wfv1.Workflow
// log is a logrus entry.
log *log.Entry
// wftmplStatusQueue is two queues for tmpl.Status update
wftmplStatusQueue *wftmplStatusQueue
}

// NewContext returns new Context.
func NewContext(wftmplGetter WorkflowTemplateNamespacedGetter, cwftmplGetter ClusterWorkflowTemplateGetter, tmplBase wfv1.TemplateHolder, workflow *wfv1.Workflow) *Context {
return &Context{
ctx := &Context{
wftmplGetter: wftmplGetter,
cwftmplGetter: cwftmplGetter,
wftmplClient: wftmplClientHolder,
Expand All @@ -92,6 +94,10 @@ func NewContext(wftmplGetter WorkflowTemplateNamespacedGetter, cwftmplGetter Clu
workflow: workflow,
log: log.WithFields(log.Fields{}),
}
wftmplStatusQueue := NewTmplStatusQueue(ctx)
go wftmplStatusQueue.run(context.TODO())
ctx.wftmplStatusQueue = wftmplStatusQueue
return ctx
}

var (
Expand All @@ -115,7 +121,7 @@ func NewContextWithClientSet(wftmplGetter WorkflowTemplateNamespacedGetter, cwft
cwftmplGetter = WrapClusterWorkflowTemplateInterface(cwftmplClient)
}

return &Context{
ctx := &Context{
wftmplGetter: wftmplGetter,
cwftmplGetter: cwftmplGetter,
wftmplClient: wftmplClient,
Expand All @@ -124,6 +130,10 @@ func NewContextWithClientSet(wftmplGetter WorkflowTemplateNamespacedGetter, cwft
workflow: workflow,
log: log.WithFields(log.Fields{}),
}
wftmplStatusQueue := NewTmplStatusQueue(ctx)
go wftmplStatusQueue.run(context.TODO())
ctx.wftmplStatusQueue = wftmplStatusQueue
return ctx
}

// GetTemplateByName returns a template by name in the context.
Expand Down Expand Up @@ -162,11 +172,10 @@ func (ctx *Context) GetTemplateFromRef(tmplRef *wfv1.TemplateRef) (*wfv1.Templat
}
return nil, err
}
if !tmplRef.ClusterScope && wftmpl != nil {
_, err = ctx.UpdateTemplateStatus(wftmpl.(*wfv1.WorkflowTemplate))
if err != nil {
log.Errorf("Update workflow template %s err: %v", wftmpl.GetName(), err)
}
if tmplRef.ClusterScope {
ctx.wftmplStatusQueue.cwftmplQueue.Add(tmplRef.Name)
} else {
ctx.wftmplStatusQueue.wftmplQueue.Add(tmplRef.Name)
}

template = wftmpl.GetTemplateByName(tmplRef.Template)
Expand Down Expand Up @@ -199,15 +208,38 @@ func (ctx *Context) GetTemplateScope() string {
return string(ctx.tmplBase.GetResourceScope()) + "/" + ctx.tmplBase.GetName()
}

// UpdateTemplateStatus update the WorkflowTemplate.Status of a given WorkflowTemplate.
func (ctx *Context) UpdateTemplateStatus(wftmple *wfv1.WorkflowTemplate) (*wfv1.WorkflowTemplate, error) {
// updateTemplateStatus update the WorkflowTemplate.Status of a given WorkflowTemplate.
func (ctx *Context) updateTemplateStatus(name string) error {
if ctx.wftmplClient == nil {
ctx.log.Warnln("while try update template status, wftmplClient not set")
return wftmple, nil
return nil
}
c := context.TODO()
wftmple, err := ctx.wftmplClient.Get(c, name, metav1.GetOptions{})
if err != nil {
return err
}

wftmple.Status.LastRunAt = metav1.Now()
_, err = ctx.wftmplClient.Update(c, wftmple, metav1.UpdateOptions{})
return err
}

// updateCtemplateStatus update the ClusterWorkflowTemplate.Status of a given ClusterWorkflowTemplate.
func (ctx *Context) updateCtemplateStatus(name string) error {
if ctx.cwftmplClient == nil {
ctx.log.Warnln("while try update cluster template status, cwftmplClient not set")
return nil
}
c := context.TODO()
wftmple, err := ctx.cwftmplClient.Get(c, name, metav1.GetOptions{})
if err != nil {
return err
}

wftmple.Status.LastRunAt = metav1.Now()
return ctx.wftmplClient.Update(context.TODO(), wftmple, metav1.UpdateOptions{})
_, err = ctx.cwftmplClient.Update(c, wftmple, metav1.UpdateOptions{})
return err
}

// ResolveTemplate digs into referenes and returns a merged template.
Expand Down
38 changes: 37 additions & 1 deletion workflow/templateresolution/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ func createWorkflowTemplate(wfClientset wfclientset.Interface, yamlStr string) e
return err
}

func createClusterWorkflowTemplate(wfClientset wfclientset.Interface, yamlStr string) error {
ctx := context.Background()
cwftmpl := wfv1.MustUnmarshalClusterWorkflow(yamlStr)
_, err := wfClientset.ArgoprojV1alpha1().ClusterWorkflowTemplates().Create(ctx, cwftmpl, metav1.CreateOptions{})
if err != nil && apierr.IsAlreadyExists(err) {
return nil
}
return err
}

// Deprecated
func unmarshalWftmpl(yamlStr string) *wfv1.WorkflowTemplate {
return wfv1.MustUnmarshalWorkflowTemplate(yamlStr)
Expand Down Expand Up @@ -402,7 +412,7 @@ func TestUpdateTemplLastRun(t *testing.T) {
wftemplateName := "some-workflow-template"
templateClient := wfClientset.ArgoprojV1alpha1().WorkflowTemplates(v1.NamespaceDefault)

_, err = ctx.UpdateTemplateStatus(wftmpl)
err = ctx.updateTemplateStatus(wftmpl.GetName())
if err != nil {
t.Fatal(err)
}
Expand All @@ -414,3 +424,29 @@ func TestUpdateTemplLastRun(t *testing.T) {
}
assert.NotEmpty(t, wftmpl.Status.LastRunAt)
}

func TestUpdateCtemplLastRun(t *testing.T) {
wfClientset := fakewfclientset.NewSimpleClientset()
cwftmpl := wfv1.MustUnmarshalClusterWorkflow(someWorkflowTemplateYaml)
ctx := NewContextWithClientSet(nil, nil, wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wfClientset.ArgoprojV1alpha1().ClusterWorkflowTemplates(), cwftmpl, nil)

err := createClusterWorkflowTemplate(wfClientset, someWorkflowTemplateYaml)
if err != nil {
t.Fatal(err)
}

wftemplateName := "some-workflow-template"
ctemplateClient := wfClientset.ArgoprojV1alpha1().ClusterWorkflowTemplates()

err = ctx.updateTemplateStatus(cwftmpl.GetName())
if err != nil {
t.Fatal(err)
}

cwftmpl, err = ctemplateClient.Get(context.Background(), wftemplateName, v1.GetOptions{})
if err != nil {
t.Fatal(err)
return
}
assert.NotEmpty(t, cwftmpl.Status.LastRunAt)
}
93 changes: 93 additions & 0 deletions workflow/templateresolution/template_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package templateresolution

import (
"context"

"github.com/argoproj/pkg/sync"
log "github.com/sirupsen/logrus"
runtimeutil "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/util/workqueue"
)

type wftmplStatusQueue struct {
wftmplQueue workqueue.RateLimitingInterface
cwftmplQueue workqueue.RateLimitingInterface

keyLock sync.KeyLock
ckeyLock sync.KeyLock
ctx *Context
}

func NewTmplStatusQueue(ctx *Context) *wftmplStatusQueue {
return &wftmplStatusQueue{
wftmplQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "tmpl-status-queue"),
cwftmplQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ctmpl-status-queue"),
keyLock: sync.NewKeyLock(),
ckeyLock: sync.NewKeyLock(),
ctx: ctx,
}
}

func (q *wftmplStatusQueue) run(ctx context.Context) {
defer q.wftmplQueue.ShutDown()
defer q.cwftmplQueue.ShutDown()
go q.runTmplStatusUpdate()
go q.runCtmplStatusUpdate()
<-ctx.Done()
}

func (q *wftmplStatusQueue) runTmplStatusUpdate() {
ctx := context.TODO()
for q.processNextTmplItem(ctx) {
}
}

func (q *wftmplStatusQueue) runCtmplStatusUpdate() {
ctx := context.TODO()
for q.processNextCtmplItem(ctx) {
}
}

func (q *wftmplStatusQueue) processNextTmplItem(ctx context.Context) bool {

Check failure on line 51 in workflow/templateresolution/template_status.go

View workflow job for this annotation

GitHub Actions / Lint

`(*wftmplStatusQueue).processNextTmplItem` - `ctx` is unused (unparam)
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

key, quit := q.wftmplQueue.Get()
if quit {
return false
}
defer q.wftmplQueue.Done(key)

q.keyLock.Lock(key.(string))
defer q.keyLock.Unlock(key.(string))

logCtx := log.WithField("wftmplStatus", key)
logCtx.Infof("Processing %s", key)

err := q.ctx.updateTemplateStatus(key.(string))
if err != nil {
log.Errorf("Update workflow template %s err: %v", key.(string), err)
}
return true
}

func (q *wftmplStatusQueue) processNextCtmplItem(ctx context.Context) bool {

Check failure on line 73 in workflow/templateresolution/template_status.go

View workflow job for this annotation

GitHub Actions / Lint

`(*wftmplStatusQueue).processNextCtmplItem` - `ctx` is unused (unparam)
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

key, quit := q.cwftmplQueue.Get()
if quit {
return false
}
defer q.cwftmplQueue.Done(key)

q.ckeyLock.Lock(key.(string))
defer q.ckeyLock.Unlock(key.(string))

logCtx := log.WithField("cwftmplStatus", key)
logCtx.Infof("Processing %s", key)

err := q.ctx.updateCtemplateStatus(key.(string))
if err != nil {
log.Errorf("Update cluster workflow template %s err: %v", key.(string), err)
}
return true
}

0 comments on commit 5bc7bdc

Please sign in to comment.