Skip to content

Commit

Permalink
feat: workflow template support record last run time. Fixes argoproj#…
Browse files Browse the repository at this point in the history
…1915

Signed-off-by: qingfeng777 <[email protected]>
  • Loading branch information
qingfeng777 committed Nov 10, 2024
1 parent f057b7a commit d337d23
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 15 deletions.
8 changes: 6 additions & 2 deletions server/workflow/workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,8 +774,12 @@ func (s *workflowServer) SubmitWorkflow(ctx context.Context, req *workflowpkg.Wo
return nil, sutils.ToStatusError(err, codes.Internal)
}

wftmplGetter := templateresolution.WrapWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().WorkflowTemplates(req.Namespace))
cwftmplGetter := templateresolution.WrapClusterWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates())
wftmplClient := wfClient.ArgoprojV1alpha1().WorkflowTemplates(req.Namespace)
cwftmplClient := wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates()
templateresolution.SetWorkflowTemplateClient(wftmplClient, cwftmplClient)

wftmplGetter := templateresolution.WrapWorkflowTemplateInterface(wftmplClient)
cwftmplGetter := templateresolution.WrapClusterWorkflowTemplateInterface(cwftmplClient)

err = validate.ValidateWorkflow(wftmplGetter, cwftmplGetter, wf, validate.ValidateOpts{Submit: true})
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3677,7 +3677,11 @@ func (woc *wfOperationCtx) createTemplateContext(scope wfv1.ResourceScope, resou
} else {
clusterWorkflowTemplateGetter = &templateresolution.NullClusterWorkflowTemplateGetter{}
}
ctx := templateresolution.NewContext(woc.controller.wftmplInformer.Lister().WorkflowTemplates(woc.wf.Namespace), clusterWorkflowTemplateGetter, woc.execWf, woc.wf)

clientSet := woc.controller.wfclientset.ArgoprojV1alpha1().WorkflowTemplates(woc.controller.GetManagedNamespace())
cClientSet := woc.controller.wfclientset.ArgoprojV1alpha1().ClusterWorkflowTemplates()
ctx := templateresolution.NewContextWithClientSet(woc.controller.wftmplInformer.Lister().WorkflowTemplates(woc.wf.Namespace),
clusterWorkflowTemplateGetter, clientSet, cClientSet, woc.execWf, woc.wf)

switch scope {
case wfv1.ResourceScopeNamespaced:
Expand Down
52 changes: 48 additions & 4 deletions workflow/templateresolution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ type Context struct {
wftmplGetter WorkflowTemplateNamespacedGetter
// cwftmplGetter is an interface to get ClusterWorkflowTemplates
cwftmplGetter ClusterWorkflowTemplateGetter
// wftmplClient is an interface to operate WorkflowTemplates.
wftmplClient typed.WorkflowTemplateInterface
// cwftmplClient is an interface to operate ClusterWorkflowTemplates
cwftmplClient typed.ClusterWorkflowTemplateInterface
// tmplBase is the base of local template search.
tmplBase wfv1.TemplateHolder
// workflow is the Workflow where templates will be stored
Expand All @@ -82,17 +86,40 @@ func NewContext(wftmplGetter WorkflowTemplateNamespacedGetter, cwftmplGetter Clu
return &Context{
wftmplGetter: wftmplGetter,
cwftmplGetter: cwftmplGetter,
wftmplClient: wftmplClientHolder,
cwftmplClient: cwftmplClientHolder,
tmplBase: tmplBase,
workflow: workflow,
log: log.WithFields(log.Fields{}),
}
}

// NewContextFromClientSet returns new Context.
func NewContextFromClientSet(wftmplClientset typed.WorkflowTemplateInterface, clusterWftmplClient typed.ClusterWorkflowTemplateInterface, tmplBase wfv1.TemplateHolder, workflow *wfv1.Workflow) *Context {
var (
wftmplClientHolder typed.WorkflowTemplateInterface
cwftmplClientHolder typed.ClusterWorkflowTemplateInterface
)

func SetWorkflowTemplateClient(wftmplClient typed.WorkflowTemplateInterface, cwftmplClient typed.ClusterWorkflowTemplateInterface) {
wftmplClientHolder, cwftmplClientHolder = wftmplClient, cwftmplClient
}

// NewContextWithClientSet returns new Context.
// if Getter is nil, use clientset as Getter.
func NewContextWithClientSet(wftmplGetter WorkflowTemplateNamespacedGetter, cwftmplGetter ClusterWorkflowTemplateGetter,
wftmplClient typed.WorkflowTemplateInterface, cwftmplClient typed.ClusterWorkflowTemplateInterface, tmplBase wfv1.TemplateHolder, workflow *wfv1.Workflow) *Context {
wftmplClientHolder, cwftmplClientHolder = wftmplClient, cwftmplClient
if wftmplGetter == nil {
wftmplGetter = WrapWorkflowTemplateInterface(wftmplClient)
}
if cwftmplGetter == nil {
cwftmplGetter = WrapClusterWorkflowTemplateInterface(cwftmplClient)
}

return &Context{
wftmplGetter: WrapWorkflowTemplateInterface(wftmplClientset),
cwftmplGetter: WrapClusterWorkflowTemplateInterface(clusterWftmplClient),
wftmplGetter: wftmplGetter,
cwftmplGetter: cwftmplGetter,
wftmplClient: wftmplClient,
cwftmplClient: cwftmplClient,
tmplBase: tmplBase,
workflow: workflow,
log: log.WithFields(log.Fields{}),
Expand Down Expand Up @@ -135,6 +162,12 @@ func (ctx *Context) GetTemplateFromRef(tmplRef *wfv1.TemplateRef) (*wfv1.Templat
}
return nil, err
}
if !tmplRef.ClusterScope && wftmpl != nil {
_, err = ctx.UpdateTemplateStatus(wftmpl.(*wfv1.WorkflowTemplate))
if err != nil {
log.Errorf("Update workflow template %s err: %v", wftmpl.GetName(), err)
}
}

template = wftmpl.GetTemplateByName(tmplRef.Template)

Expand Down Expand Up @@ -166,6 +199,17 @@ func (ctx *Context) GetTemplateScope() string {
return string(ctx.tmplBase.GetResourceScope()) + "/" + ctx.tmplBase.GetName()
}

// UpdateTemplateStatus update the WorkflowTemplate.Status of a given WorkflowTemplate.
func (ctx *Context) UpdateTemplateStatus(wftmple *wfv1.WorkflowTemplate) (*wfv1.WorkflowTemplate, error) {
if ctx.wftmplClient == nil {
ctx.log.Warnln("while try update template status, wftmplClient not set")
return wftmple, nil
}

wftmple.Status.LastRunAt = metav1.Now()
return ctx.wftmplClient.Update(context.TODO(), wftmple, metav1.UpdateOptions{})
}

// ResolveTemplate digs into referenes and returns a merged template.
// This method is the public start point of template resolution.
func (ctx *Context) ResolveTemplate(tmplHolder wfv1.TemplateReferenceHolder) (*Context, *wfv1.Template, bool, error) {
Expand Down
44 changes: 36 additions & 8 deletions workflow/templateresolution/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/require"
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
wfclientset "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
Expand Down Expand Up @@ -103,7 +104,7 @@ spec:
func TestGetTemplateByName(t *testing.T) {
wfClientset := fakewfclientset.NewSimpleClientset()
wftmpl := unmarshalWftmpl(baseWorkflowTemplateYaml)
ctx := NewContextFromClientSet(wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wfClientset.ArgoprojV1alpha1().ClusterWorkflowTemplates(), wftmpl, nil)
ctx := NewContextWithClientSet(nil, nil, wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wfClientset.ArgoprojV1alpha1().ClusterWorkflowTemplates(), wftmpl, nil)

tmpl, err := ctx.GetTemplateByName("whalesay")
require.NoError(t, err)
Expand All @@ -125,7 +126,7 @@ func TestGetTemplateFromRef(t *testing.T) {
t.Fatal(err)
}
wftmpl := unmarshalWftmpl(baseWorkflowTemplateYaml)
ctx := NewContextFromClientSet(wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wfClientset.ArgoprojV1alpha1().ClusterWorkflowTemplates(), wftmpl, nil)
ctx := NewContextWithClientSet(nil, nil, wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wfClientset.ArgoprojV1alpha1().ClusterWorkflowTemplates(), wftmpl, nil)

// Get the template of existing template reference.
tmplRef := wfv1.TemplateRef{Name: "some-workflow-template", Template: "whalesay"}
Expand Down Expand Up @@ -156,7 +157,7 @@ func TestGetTemplate(t *testing.T) {
t.Fatal(err)
}
wftmpl := unmarshalWftmpl(baseWorkflowTemplateYaml)
ctx := NewContextFromClientSet(wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wfClientset.ArgoprojV1alpha1().ClusterWorkflowTemplates(), wftmpl, nil)
ctx := NewContextWithClientSet(nil, nil, wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wfClientset.ArgoprojV1alpha1().ClusterWorkflowTemplates(), wftmpl, nil)

// Get the template of existing template name.
tmplHolder := wfv1.WorkflowStep{Template: "whalesay"}
Expand Down Expand Up @@ -186,7 +187,7 @@ func TestGetTemplate(t *testing.T) {
func TestGetCurrentTemplateBase(t *testing.T) {
wfClientset := fakewfclientset.NewSimpleClientset()
wftmpl := unmarshalWftmpl(baseWorkflowTemplateYaml)
ctx := NewContextFromClientSet(wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wfClientset.ArgoprojV1alpha1().ClusterWorkflowTemplates(), wftmpl, nil)
ctx := NewContextWithClientSet(nil, nil, wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wfClientset.ArgoprojV1alpha1().ClusterWorkflowTemplates(), wftmpl, nil)

// Get the template base of existing template name.
tmplBase := ctx.GetCurrentTemplateBase()
Expand All @@ -206,7 +207,7 @@ func TestWithTemplateHolder(t *testing.T) {
t.Fatal(err)
}
wftmpl := unmarshalWftmpl(baseWorkflowTemplateYaml)
ctx := NewContextFromClientSet(wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wfClientset.ArgoprojV1alpha1().ClusterWorkflowTemplates(), wftmpl, nil)
ctx := NewContextWithClientSet(nil, nil, wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wfClientset.ArgoprojV1alpha1().ClusterWorkflowTemplates(), wftmpl, nil)

var tmplGetter wfv1.TemplateHolder
// Get the template base of existing template name.
Expand Down Expand Up @@ -248,7 +249,7 @@ func TestResolveTemplate(t *testing.T) {
require.NoError(t, err)

wftmpl := unmarshalWftmpl(baseWorkflowTemplateYaml)
ctx := NewContextFromClientSet(wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wfClientset.ArgoprojV1alpha1().ClusterWorkflowTemplates(), wftmpl, nil)
ctx := NewContextWithClientSet(nil, nil, wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wfClientset.ArgoprojV1alpha1().ClusterWorkflowTemplates(), wftmpl, nil)

// Get the template of template name.
tmplHolder := wfv1.WorkflowStep{Template: "whalesay"}
Expand Down Expand Up @@ -321,7 +322,7 @@ func TestResolveTemplate(t *testing.T) {
func TestWithTemplateBase(t *testing.T) {
wfClientset := fakewfclientset.NewSimpleClientset()
wftmpl := unmarshalWftmpl(baseWorkflowTemplateYaml)
ctx := NewContextFromClientSet(wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wfClientset.ArgoprojV1alpha1().ClusterWorkflowTemplates(), wftmpl, nil)
ctx := NewContextWithClientSet(nil, nil, wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wfClientset.ArgoprojV1alpha1().ClusterWorkflowTemplates(), wftmpl, nil)

anotherWftmpl := unmarshalWftmpl(anotherWorkflowTemplateYaml)

Expand All @@ -335,7 +336,8 @@ func TestWithTemplateBase(t *testing.T) {
func TestOnWorkflowTemplate(t *testing.T) {
wfClientset := fakewfclientset.NewSimpleClientset()
wftmpl := unmarshalWftmpl(baseWorkflowTemplateYaml)
ctx := NewContextFromClientSet(wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wfClientset.ArgoprojV1alpha1().ClusterWorkflowTemplates(), wftmpl, nil)
ctx := NewContextWithClientSet(nil, nil, wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wfClientset.ArgoprojV1alpha1().ClusterWorkflowTemplates(),
wftmpl, nil)

err := createWorkflowTemplate(wfClientset, anotherWorkflowTemplateYaml)
require.NoError(t, err)
Expand All @@ -346,3 +348,29 @@ func TestOnWorkflowTemplate(t *testing.T) {
tmpl := newCtx.tmplBase.GetTemplateByName("whalesay")
assert.NotNil(t, tmpl)
}

func TestUpdateTemplLastRun(t *testing.T) {
wfClientset := fakewfclientset.NewSimpleClientset()
wftmpl := unmarshalWftmpl(someWorkflowTemplateYaml)
ctx := NewContextWithClientSet(nil, nil, wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wfClientset.ArgoprojV1alpha1().ClusterWorkflowTemplates(), wftmpl, nil)

err := createWorkflowTemplate(wfClientset, someWorkflowTemplateYaml)
if err != nil {
t.Fatal(err)
}

wftemplateName := "some-workflow-template"
templateClient := wfClientset.ArgoprojV1alpha1().WorkflowTemplates(v1.NamespaceDefault)

_, err = ctx.UpdateTemplateStatus(wftmpl)
if err != nil {
t.Fatal(err)
}

wftmpl, err = templateClient.Get(context.Background(), wftemplateName, v1.GetOptions{})
if err != nil {
t.Fatal(err)
return
}
assert.NotEmpty(t, wftmpl.Status.LastRunAt)
}

0 comments on commit d337d23

Please sign in to comment.