Skip to content

Commit

Permalink
Store resolved templates (argoproj#1552)
Browse files Browse the repository at this point in the history
* Store resolved templates in node status
  • Loading branch information
dtaniwaki authored and sarabala1979 committed Sep 30, 2019
1 parent df8260d commit 6acea0c
Show file tree
Hide file tree
Showing 13 changed files with 572 additions and 299 deletions.
3 changes: 3 additions & 0 deletions cmd/argo/commands/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ func printWorkflowHelper(wf *wfv1.Workflow, getArgs getFlags) {

// Print main and onExit Trees
mainRoot := roots[wf.ObjectMeta.Name]
if mainRoot == nil {
panic("failed to get the entrypoint node")
}
mainRoot.renderNodes(w, wf, 0, " ", " ", getArgs)

onExitID := wf.NodeID(wf.ObjectMeta.Name + "." + onExitSuffix)
Expand Down
52 changes: 52 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
schema "k8s.io/apimachinery/pkg/runtime/schema"
)

// TemplateType is the type of a template
Expand Down Expand Up @@ -67,6 +68,7 @@ const (
type TemplateGetter interface {
GetNamespace() string
GetName() string
GroupVersionKind() schema.GroupVersionKind
GetTemplateByName(name string) *Template
}

Expand Down Expand Up @@ -365,6 +367,13 @@ func (tmpl *Template) GetTemplateRef() *TemplateRef {
return tmpl.TemplateRef
}

// GetBaseTemplate returns a base template content.
func (tmpl *Template) GetBaseTemplate() *Template {
baseTemplate := tmpl.DeepCopy()
baseTemplate.Inputs = Inputs{}
return baseTemplate
}

// Inputs are the mechanism for passing parameters, artifacts, volumes from one template to another
type Inputs struct {
// Parameters are a list of parameters passed as inputs
Expand Down Expand Up @@ -672,6 +681,9 @@ type WorkflowStatus struct {
// Nodes is a mapping between a node ID and the node's status.
Nodes map[string]NodeStatus `json:"nodes,omitempty"`

// StoredTemplates is a mapping between a template ref and the node's status.
StoredTemplates map[string]Template `json:"storedTemplates,omitempty"`

// PersistentVolumeClaims tracks all PVCs that were created as part of the workflow.
// The contents of this list are drained at the end of the workflow.
PersistentVolumeClaims []apiv1.Volume `json:"persistentVolumeClaims,omitempty"`
Expand Down Expand Up @@ -709,6 +721,9 @@ type NodeStatus struct {
// Not applicable to virtual nodes (e.g. Retry, StepGroup)
TemplateRef *TemplateRef `json:"templateRef,omitempty"`

// WorkflowTemplateName is the WorkflowTemplate resource name on which the resolved template of this node is retrieved.
WorkflowTemplateName string `json:"workflowTemplateName,omitempty"`

// Phase a simple, high-level summary of where the node is in its lifecycle.
// Can be used as a state machine.
Phase NodePhase `json:"phase,omitempty"`
Expand Down Expand Up @@ -810,6 +825,16 @@ func (n NodeStatus) CanRetry() bool {
return n.Completed() && !n.Successful()
}

// GetBaseTemplateID returns a base template ID if available.
func (n *NodeStatus) GetBaseTemplateID() string {
if n.TemplateRef != nil {
return fmt.Sprintf("%s/%s", n.TemplateRef.Name, n.TemplateRef.Template)
} else if n.WorkflowTemplateName != "" {
return fmt.Sprintf("%s/%s", n.WorkflowTemplateName, n.TemplateName)
}
return ""
}

// S3Bucket contains the access information required for interfacing with an S3 bucket
type S3Bucket struct {
// Endpoint is the hostname of the bucket endpoint
Expand Down Expand Up @@ -1239,6 +1264,33 @@ func (wf *Workflow) NodeID(name string) string {
return fmt.Sprintf("%s-%v", wf.ObjectMeta.Name, h.Sum32())
}

// GetStoredTemplate gets a resolved template from stored data.
func (wf *Workflow) GetStoredTemplate(node *NodeStatus) *Template {
id := node.GetBaseTemplateID()
tmpl, ok := wf.Status.StoredTemplates[id]
if ok {
return &tmpl
}
return nil
}

// GetStoredOrLocalTemplate gets a resolved template from stored data or local template.
func (wf *Workflow) GetStoredOrLocalTemplate(node *NodeStatus) *Template {
// Try to find a template from stored data.
tmpl := wf.GetStoredTemplate(node)
if tmpl != nil {
return tmpl
}
// Try to get template from Workflow.
if node.WorkflowTemplateName == "" && node.TemplateName != "" {
tmpl := wf.GetTemplateByName(node.TemplateName)
if tmpl != nil {
return tmpl
}
}
return nil
}

// ContinueOn defines if a workflow should continue even if a task or step fails/errors.
// It can be specified if the workflow should continue when the pod errors, fails or both.
type ContinueOn struct {
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 23 additions & 23 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,34 +187,29 @@ func (d *dagContext) hasMoreRetries(node *wfv1.NodeStatus) bool {
return true
}
// pick the first child to determine it's template type
childNode := d.wf.Status.Nodes[node.Children[0]]
tmpl, err := d.tmplCtx.GetTemplate(&childNode)
if err != nil {
childNode, ok := d.wf.Status.Nodes[node.Children[0]]
if !ok {
return false
}
if tmpl.RetryStrategy.Limit != nil && int32(len(node.Children)) > *tmpl.RetryStrategy.Limit {
tmpl := d.wf.GetStoredOrLocalTemplate(&childNode)
if tmpl != nil && tmpl.RetryStrategy != nil && tmpl.RetryStrategy.Limit != nil && int32(len(node.Children)) > *tmpl.RetryStrategy.Limit {
return false
}
return true
}

func (woc *wfOperationCtx) executeDAG(nodeName string, tmplCtx *templateresolution.Context, tmpl *wfv1.Template, orgTmpl wfv1.TemplateHolder, boundaryID string) *wfv1.NodeStatus {
node := woc.getNodeByName(nodeName)
if node != nil && node.Completed() {
return node
}
if node == nil {
node = woc.initializeNode(nodeName, wfv1.NodeTypeDAG, orgTmpl, boundaryID, wfv1.NodeRunning)
}
func (woc *wfOperationCtx) executeDAG(nodeName string, tmplCtx *templateresolution.Context, tmpl *wfv1.Template, boundaryID string) error {
node := woc.markNodePhase(nodeName, wfv1.NodeRunning)

defer func() {
if node != nil && woc.wf.Status.Nodes[node.ID].Completed() {
if woc.wf.Status.Nodes[node.ID].Completed() {
_ = woc.killDaemonedChildren(node.ID)
}
}()

dagCtx := &dagContext{
boundaryName: nodeName,
boundaryID: woc.wf.NodeID(nodeName),
boundaryID: node.ID,
tasks: tmpl.DAG.Tasks,
visited: make(map[string]bool),
tmpl: tmpl,
Expand All @@ -239,9 +234,10 @@ func (woc *wfOperationCtx) executeDAG(nodeName string, tmplCtx *templateresoluti
dagPhase := dagCtx.assessDAGPhase(targetTasks, woc.wf.Status.Nodes)
switch dagPhase {
case wfv1.NodeRunning:
return woc.getNodeByName(nodeName)
return nil
case wfv1.NodeError, wfv1.NodeFailed:
return woc.markNodePhase(nodeName, dagPhase)
_ = woc.markNodePhase(nodeName, dagPhase)
return nil
}

// set outputs from tasks in order for DAG templates to support outputs
Expand All @@ -259,7 +255,7 @@ func (woc *wfOperationCtx) executeDAG(nodeName string, tmplCtx *templateresoluti
}
outputs, err := getTemplateOutputsFromScope(tmpl, &scope)
if err != nil {
return woc.markNodeError(nodeName, err)
return err
}
if outputs != nil {
node = woc.getNodeByName(nodeName)
Expand All @@ -268,7 +264,6 @@ func (woc *wfOperationCtx) executeDAG(nodeName string, tmplCtx *templateresoluti
}

// set the outbound nodes from the target tasks
node = woc.getNodeByName(nodeName)
outbound := make([]string, 0)
for _, depName := range targetTasks {
depNode := dagCtx.GetTaskNode(depName)
Expand All @@ -279,10 +274,12 @@ func (woc *wfOperationCtx) executeDAG(nodeName string, tmplCtx *templateresoluti
outbound = append(outbound, outboundNodeIDs...)
}
woc.log.Infof("Outbound nodes of %s set to %s", node.ID, outbound)
node = woc.getNodeByName(nodeName)
node.OutboundNodes = outbound
woc.wf.Status.Nodes[node.ID] = *node

return woc.markNodePhase(nodeName, wfv1.NodeSucceeded)
_ = woc.markNodePhase(nodeName, wfv1.NodeSucceeded)
return nil
}

// executeDAGTask traverses and executes the upward chain of dependencies of a task
Expand Down Expand Up @@ -438,6 +435,9 @@ func (woc *wfOperationCtx) resolveDependencyReferences(dagCtx *dagContext, task
ancestors := common.GetTaskAncestry(dagCtx, task.Name, dagCtx.tasks)
for _, ancestor := range ancestors {
ancestorNode := dagCtx.GetTaskNode(ancestor)
if ancestorNode == nil {
return nil, errors.InternalErrorf("Ancestor task node %s not found", ancestor)
}
prefix := fmt.Sprintf("tasks.%s", ancestor)
if ancestorNode.Type == wfv1.NodeTypeTaskGroup {
var ancestorNodes []wfv1.NodeStatus
Expand All @@ -446,11 +446,11 @@ func (woc *wfOperationCtx) resolveDependencyReferences(dagCtx *dagContext, task
ancestorNodes = append(ancestorNodes, node)
}
}
tmpl, err := dagCtx.tmplCtx.GetTemplate(ancestorNode)
if err != nil {
return nil, errors.InternalWrapError(err)
tmpl := dagCtx.wf.GetStoredOrLocalTemplate(ancestorNode)
if tmpl != nil {
return nil, errors.InternalErrorf("Template of ancestor node '%s' not found", ancestorNode.Name)
}
err = woc.processAggregateNodeOutputs(tmpl, &scope, prefix, ancestorNodes)
err := woc.processAggregateNodeOutputs(tmpl, &scope, prefix, ancestorNodes)
if err != nil {
return nil, errors.InternalWrapError(err)
}
Expand Down
Loading

0 comments on commit 6acea0c

Please sign in to comment.