Skip to content

Commit

Permalink
fix: handle transition case from 3.4 by providing fallback code
Browse files Browse the repository at this point in the history
Signed-off-by: isubasinghe <[email protected]>
  • Loading branch information
isubasinghe committed Sep 4, 2024
1 parent d55ba86 commit 88f3276
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 68 deletions.
3 changes: 2 additions & 1 deletion test/e2e/hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,8 @@ spec:
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
assert.Equal(t, v1alpha1.NodeFailed, status.Phase)
assert.Equal(t, v1alpha1.NodeTypeRetry, status.Type)
assert.Nil(t, status.NodeFlag)
assert.False(t, status.NodeFlag.Hooked)
assert.False(t, status.NodeFlag.Retried)
}).
ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return status.Name == "test-workflow-level-hooks-with-retry(0)"
Expand Down
3 changes: 2 additions & 1 deletion test/e2e/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ spec:
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
assert.Equal(t, v1alpha1.NodeFailed, status.Phase)
assert.Equal(t, v1alpha1.NodeTypeRetry, status.Type)
assert.Nil(t, status.NodeFlag)
assert.False(t, status.NodeFlag.Hooked)
assert.False(t, status.NodeFlag.Retried)
}).
ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return status.Name == "test-retry-limit(0)"
Expand Down
28 changes: 27 additions & 1 deletion workflow/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,23 @@ func IsDone(un *unstructured.Unstructured) bool {
un.GetLabels()[LabelKeyWorkflowArchivingStatus] != "Pending"
}

// Remove ASAP in 3.6, used **only** for backward compatability.
func CheckHookNode(nodeName string) bool {
names := strings.Split(nodeName, ".")
if len(names) == 2 {
return names[1] == "onExit"
}

if len(names) <= 2 {
return false
}

if names[len(names)-1] == "onExit" || names[len(names)-2] == "hooks" {
return true
}
return false
}

// Check whether child hooked nodes Fulfilled
func CheckAllHooksFullfilled(node *wfv1.NodeStatus, nodes wfv1.Nodes) bool {
childs := node.Children
Expand All @@ -331,7 +348,16 @@ func CheckAllHooksFullfilled(node *wfv1.NodeStatus, nodes wfv1.Nodes) bool {
if !ok {
continue
}
if n.NodeFlag != nil && n.NodeFlag.Hooked && !n.Fulfilled() {

// fallback code
if n.NodeFlag == nil {
if CheckHookNode(n.Name) && !n.Fulfilled() {
return false
}
continue
}

if n.NodeFlag.Hooked && !n.Fulfilled() {
return false
}
}
Expand Down
16 changes: 8 additions & 8 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex
}
execute, proceed, err := dagCtx.evaluateDependsLogic(taskName)
if err != nil {
woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeError, &wfv1.NodeFlag{}, err.Error())
woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeError, wfv1.NodeFlag{}, err.Error())
connectDependencies(nodeName)
return
}
Expand All @@ -528,7 +528,7 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex
}
if !execute {
// Given the results of this node's dependencies, this node should not be executed. Mark it omitted
woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeOmitted, &wfv1.NodeFlag{}, "omitted: depends condition not met")
woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeOmitted, wfv1.NodeFlag{}, "omitted: depends condition not met")
connectDependencies(nodeName)
return
}
Expand All @@ -538,7 +538,7 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex
// First resolve/substitute params/artifacts from our dependencies
newTask, err := woc.resolveDependencyReferences(dagCtx, task)
if err != nil {
woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeError, &wfv1.NodeFlag{}, err.Error())
woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeError, wfv1.NodeFlag{}, err.Error())
connectDependencies(nodeName)
return
}
Expand All @@ -547,7 +547,7 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex
// expandedTasks will be a single element list of the same task
expandedTasks, err := expandTask(*newTask)
if err != nil {
woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeError, &wfv1.NodeFlag{}, err.Error())
woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeError, wfv1.NodeFlag{}, err.Error())
connectDependencies(nodeName)
return
}
Expand All @@ -559,11 +559,11 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex
// DAG task with empty withParams list should be skipped
if len(expandedTasks) == 0 {
skipReason := "Skipped, empty params"
woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeSkipped, &wfv1.NodeFlag{}, skipReason)
woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeSkipped, wfv1.NodeFlag{}, skipReason)
connectDependencies(nodeName)
} else if taskGroupNode == nil {
connectDependencies(nodeName)
taskGroupNode = woc.initializeNode(nodeName, wfv1.NodeTypeTaskGroup, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeRunning, &wfv1.NodeFlag{}, "")
taskGroupNode = woc.initializeNode(nodeName, wfv1.NodeTypeTaskGroup, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeRunning, wfv1.NodeFlag{}, "")
}
}

Expand All @@ -578,12 +578,12 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex
// Check the task's when clause to decide if it should execute
proceed, err := shouldExecute(t.When)
if err != nil {
woc.initializeNode(taskNodeName, wfv1.NodeTypeSkipped, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeError, &wfv1.NodeFlag{}, err.Error())
woc.initializeNode(taskNodeName, wfv1.NodeTypeSkipped, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeError, wfv1.NodeFlag{}, err.Error())
continue
}
if !proceed {
skipReason := fmt.Sprintf("when '%s' evaluated false", t.When)
woc.initializeNode(taskNodeName, wfv1.NodeTypeSkipped, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeSkipped, &wfv1.NodeFlag{}, skipReason)
woc.initializeNode(taskNodeName, wfv1.NodeTypeSkipped, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeSkipped, wfv1.NodeFlag{}, skipReason)
continue
}
}
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/exit_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (woc *wfOperationCtx) runOnExitNode(ctx context.Context, exitHook *wfv1.Lif
onExitNode, err := woc.executeTemplate(ctx, onExitNodeName, &wfv1.WorkflowStep{Template: exitHook.Template, TemplateRef: exitHook.TemplateRef}, tmplCtx, resolvedArgs, &executeTemplateOpts{
boundaryID: boundaryID,
onExitTemplate: true,
nodeFlag: &wfv1.NodeFlag{Hooked: true},
nodeFlag: wfv1.NodeFlag{Hooked: true},
})
woc.addChildNode(parentNode.Name, onExitNodeName)
return true, onExitNode, err
Expand Down
4 changes: 2 additions & 2 deletions workflow/controller/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (woc *wfOperationCtx) executeWfLifeCycleHook(ctx context.Context, tmplCtx *
if execute || hookedNode != nil {
woc.log.WithField("lifeCycleHook", hookName).WithField("node", hookNodeName).Infof("Running workflow level hooks")
hookNode, err := woc.executeTemplate(ctx, hookNodeName, &wfv1.WorkflowStep{Template: hook.Template, TemplateRef: hook.TemplateRef}, tmplCtx, hook.Arguments,
&executeTemplateOpts{nodeFlag: &wfv1.NodeFlag{Hooked: true}},
&executeTemplateOpts{nodeFlag: wfv1.NodeFlag{Hooked: true}},
)
if err != nil {
return true, err
Expand Down Expand Up @@ -89,7 +89,7 @@ func (woc *wfOperationCtx) executeTmplLifeCycleHook(ctx context.Context, scope *
}
hookNode, err := woc.executeTemplate(ctx, hookNodeName, &wfv1.WorkflowStep{Template: hook.Template, TemplateRef: hook.TemplateRef}, tmplCtx, resolvedArgs, &executeTemplateOpts{
boundaryID: boundaryID,
nodeFlag: &wfv1.NodeFlag{Hooked: true},
nodeFlag: wfv1.NodeFlag{Hooked: true},
})
if err != nil {
return false, err
Expand Down
12 changes: 8 additions & 4 deletions workflow/controller/hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1147,7 +1147,8 @@ spec:
assert.Equal(t, wfv1.Progress("1/2"), woc.wf.Status.Progress)
node = woc.wf.Status.Nodes.FindByDisplayName("hook-running")
assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
assert.Nil(t, node.NodeFlag)
assert.False(t, node.NodeFlag.Hooked)
assert.False(t, node.NodeFlag.Retried)
node = woc.wf.Status.Nodes.FindByDisplayName("hook-running.hooks.running")
assert.Equal(t, wfv1.NodeRunning, node.Phase)
assert.True(t, node.NodeFlag.Hooked)
Expand All @@ -1163,7 +1164,8 @@ spec:
assert.True(t, node.NodeFlag.Hooked)
node = woc.wf.Status.Nodes.FindByDisplayName("hook-running")
assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
assert.Nil(t, node.NodeFlag)
assert.False(t, node.NodeFlag.Hooked)
assert.False(t, node.NodeFlag.Retried)
assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase)
}

Expand Down Expand Up @@ -1238,7 +1240,8 @@ spec:
assert.Equal(t, wfv1.Progress("1/2"), woc.wf.Status.Progress)
node = woc.wf.Status.Nodes.FindByDisplayName("job")
assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
assert.Nil(t, node.NodeFlag)
assert.False(t, node.NodeFlag.Hooked)
assert.False(t, node.NodeFlag.Retried)
node = woc.wf.Status.Nodes.FindByDisplayName("job.hooks.running")
assert.Equal(t, wfv1.NodeRunning, node.Phase)
assert.True(t, node.NodeFlag.Hooked)
Expand All @@ -1254,6 +1257,7 @@ spec:
assert.True(t, node.NodeFlag.Hooked)
node = woc.wf.Status.Nodes.FindByDisplayName("job")
assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
assert.Nil(t, node.NodeFlag)
assert.False(t, node.NodeFlag.Hooked)
assert.False(t, node.NodeFlag.Retried)
assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase)
}
60 changes: 47 additions & 13 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
if onExitNode != nil || woc.GetShutdownStrategy().ShouldExecute(true) {
exitHook := woc.execWf.Spec.GetExitHook(woc.execWf.Spec.Arguments)
onExitNode, err = woc.executeTemplate(ctx, onExitNodeName, &wfv1.WorkflowStep{Template: exitHook.Template, TemplateRef: exitHook.TemplateRef}, tmplCtx, exitHook.Arguments, &executeTemplateOpts{
onExitTemplate: true, nodeFlag: &wfv1.NodeFlag{Hooked: true},
onExitTemplate: true, nodeFlag: wfv1.NodeFlag{Hooked: true},
})
if err != nil {
x := fmt.Errorf("error in exit template execution : %w", err)
Expand Down Expand Up @@ -1784,6 +1784,18 @@ func (woc *wfOperationCtx) possiblyGetRetryChildNode(node *wfv1.NodeStatus) *wfv
if childNode == nil {
continue
}

// childNode.nodeFlag was missing, this can only happen if Node created before
// #13504
if childNode.NodeFlag == nil {
// fallback where nodeFlags is absent
// ensure that we do not return
if common.CheckHookNode(childNode.Name) {
continue
}
return childNode
}

if childNode.NodeFlag == nil || !childNode.NodeFlag.Hooked {
return childNode
}
Expand Down Expand Up @@ -1823,6 +1835,16 @@ func getRetryNodeChildrenIds(node *wfv1.NodeStatus, nodes wfv1.Nodes) []string {
if node == nil {
continue
}
// if nodeFlag is nil fallback to old behaviour
if node.NodeFlag == nil {
if common.CheckHookNode(node.Name) {
childrenIds = append(childrenIds, node.ID)
} else {
childrenIds = append(childrenIds, node.Children...)
}
continue
}

if node.NodeFlag != nil && node.NodeFlag.Hooked {
childrenIds = append(childrenIds, node.ID)
} else if len(node.Children) > 0 {
Expand Down Expand Up @@ -1864,15 +1886,16 @@ type executeTemplateOpts struct {
// activeDeadlineSeconds is a deadline to set to any pods executed. This is necessary for pods to inherit backoff.maxDuration
executionDeadline time.Time
// nodeFlag tracks node information such as hook or retry
nodeFlag *wfv1.NodeFlag
// MUST create since we rely on lack of nodeFlag for backwards compatability.
nodeFlag wfv1.NodeFlag
}

// executeTemplate executes the template with the given arguments and returns the created NodeStatus
// for the created node (if created). Nodes may not be created if parallelism or deadline exceeded.
// nodeName is the name to be used as the name of the node, and boundaryID indicates which template
// boundary this node belongs to.
func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string, orgTmpl wfv1.TemplateReferenceHolder, tmplCtx *templateresolution.Context, args wfv1.Arguments, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) {
woc.log.Debugf("Evaluating node %s: template: %s, boundaryID: %s", nodeName, common.GetTemplateHolderString(orgTmpl), opts.boundaryID)
woc.log.Debugf("Evaluating node %s: template: %s, boundaryID: %s, stack depth: %d", nodeName, common.GetTemplateHolderString(orgTmpl), opts.boundaryID, woc.currentStackDepth)

// Set templateScope from which the template resolution starts.
templateScope := tmplCtx.GetTemplateScope()
Expand Down Expand Up @@ -2082,9 +2105,6 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
woc.log.Debugf("Inject a retry node for node %s", retryNodeName)
retryParentNode = woc.initializeExecutableNode(retryNodeName, wfv1.NodeTypeRetry, templateScope, processedTmpl, orgTmpl, opts.boundaryID, wfv1.NodeRunning, opts.nodeFlag)
}
if opts.nodeFlag == nil {
opts.nodeFlag = &wfv1.NodeFlag{}
}
opts.nodeFlag.Retried = true
processedRetryParentNode, continueExecution, err := woc.processNodeRetries(retryParentNode, *woc.retryStrategy(processedTmpl), opts)
if err != nil {
Expand Down Expand Up @@ -2464,7 +2484,7 @@ func (woc *wfOperationCtx) markWorkflowError(ctx context.Context, err error) {
var stepsOrDagSeparator = regexp.MustCompile(`^(\[\d+\])?\.`)

// initializeExecutableNode initializes a node and stores the template.
func (woc *wfOperationCtx) initializeExecutableNode(nodeName string, nodeType wfv1.NodeType, templateScope string, executeTmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, boundaryID string, phase wfv1.NodePhase, nodeFlag *wfv1.NodeFlag, messages ...string) *wfv1.NodeStatus {
func (woc *wfOperationCtx) initializeExecutableNode(nodeName string, nodeType wfv1.NodeType, templateScope string, executeTmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, boundaryID string, phase wfv1.NodePhase, nodeFlag wfv1.NodeFlag, messages ...string) *wfv1.NodeStatus {
node := woc.initializeNode(nodeName, nodeType, templateScope, orgTmpl, boundaryID, phase, nodeFlag)

// Set the input values to the node.
Expand Down Expand Up @@ -2498,7 +2518,7 @@ func (woc *wfOperationCtx) initializeExecutableNode(nodeName string, nodeType wf
}

// initializeNodeOrMarkError initializes an error node or mark a node if it already exists.
func (woc *wfOperationCtx) initializeNodeOrMarkError(node *wfv1.NodeStatus, nodeName string, templateScope string, orgTmpl wfv1.TemplateReferenceHolder, boundaryID string, nodeFlag *wfv1.NodeFlag, err error) *wfv1.NodeStatus {
func (woc *wfOperationCtx) initializeNodeOrMarkError(node *wfv1.NodeStatus, nodeName string, templateScope string, orgTmpl wfv1.TemplateReferenceHolder, boundaryID string, nodeFlag wfv1.NodeFlag, err error) *wfv1.NodeStatus {
if node != nil {
return woc.markNodeError(nodeName, err)
}
Expand All @@ -2507,7 +2527,7 @@ func (woc *wfOperationCtx) initializeNodeOrMarkError(node *wfv1.NodeStatus, node
}

// Creates a node status that is or will be cached
func (woc *wfOperationCtx) initializeCacheNode(nodeName string, resolvedTmpl *wfv1.Template, templateScope string, orgTmpl wfv1.TemplateReferenceHolder, boundaryID string, memStat *wfv1.MemoizationStatus, nodeFlag *wfv1.NodeFlag, messages ...string) *wfv1.NodeStatus {
func (woc *wfOperationCtx) initializeCacheNode(nodeName string, resolvedTmpl *wfv1.Template, templateScope string, orgTmpl wfv1.TemplateReferenceHolder, boundaryID string, memStat *wfv1.MemoizationStatus, nodeFlag wfv1.NodeFlag, messages ...string) *wfv1.NodeStatus {
if resolvedTmpl.Memoize == nil {
err := fmt.Errorf("cannot initialize a cached node from a non-memoized template")
woc.log.WithFields(log.Fields{"namespace": woc.wf.Namespace, "wfName": woc.wf.Name}).WithError(err)
Expand All @@ -2521,15 +2541,15 @@ func (woc *wfOperationCtx) initializeCacheNode(nodeName string, resolvedTmpl *wf
}

// Creates a node status that has been cached, completely initialized, and marked as finished
func (woc *wfOperationCtx) initializeCacheHitNode(nodeName string, resolvedTmpl *wfv1.Template, templateScope string, orgTmpl wfv1.TemplateReferenceHolder, boundaryID string, outputs *wfv1.Outputs, memStat *wfv1.MemoizationStatus, nodeFlag *wfv1.NodeFlag, messages ...string) *wfv1.NodeStatus {
func (woc *wfOperationCtx) initializeCacheHitNode(nodeName string, resolvedTmpl *wfv1.Template, templateScope string, orgTmpl wfv1.TemplateReferenceHolder, boundaryID string, outputs *wfv1.Outputs, memStat *wfv1.MemoizationStatus, nodeFlag wfv1.NodeFlag, messages ...string) *wfv1.NodeStatus {
node := woc.initializeCacheNode(nodeName, resolvedTmpl, templateScope, orgTmpl, boundaryID, memStat, nodeFlag, messages...)
node.Phase = wfv1.NodeSucceeded
node.Outputs = outputs
node.FinishedAt = metav1.Time{Time: time.Now().UTC()}
return node
}

func (woc *wfOperationCtx) initializeNode(nodeName string, nodeType wfv1.NodeType, templateScope string, orgTmpl wfv1.TemplateReferenceHolder, boundaryID string, phase wfv1.NodePhase, nodeFlag *wfv1.NodeFlag, messages ...string) *wfv1.NodeStatus {
func (woc *wfOperationCtx) initializeNode(nodeName string, nodeType wfv1.NodeType, templateScope string, orgTmpl wfv1.TemplateReferenceHolder, boundaryID string, phase wfv1.NodePhase, nodeFlag wfv1.NodeFlag, messages ...string) *wfv1.NodeStatus {
woc.log.Debugf("Initializing node %s: template: %s, boundaryID: %s", nodeName, common.GetTemplateHolderString(orgTmpl), boundaryID)

nodeID := woc.wf.NodeID(nodeName)
Expand All @@ -2547,7 +2567,7 @@ func (woc *wfOperationCtx) initializeNode(nodeName string, nodeType wfv1.NodeTyp
Type: nodeType,
BoundaryID: boundaryID,
Phase: phase,
NodeFlag: nodeFlag,
NodeFlag: &nodeFlag,
StartedAt: metav1.Time{Time: time.Now().UTC()},
EstimatedDuration: woc.estimateNodeDuration(nodeName),
}
Expand Down Expand Up @@ -3165,6 +3185,11 @@ func (woc *wfOperationCtx) processAggregateNodeOutputs(scope *wfScope, prefix st
// Some of the children may be hooks, only keep those that aren't
nodeIdx := 0
for i := range childNodes {
if childNodes[i].NodeFlag == nil {
if common.CheckHookNode(childNodes[i].Name) {
continue
}
}
if childNodes[i].NodeFlag == nil || !childNodes[i].NodeFlag.Hooked {
childNodes[nodeIdx] = childNodes[i]
nodeIdx++
Expand Down Expand Up @@ -4112,9 +4137,18 @@ func getChildNodeIdsRetried(node *wfv1.NodeStatus, nodes wfv1.Nodes) []string {
childrenIds := []string{}
for i := 0; i < len(node.Children); i++ {
n := getChildNodeIndex(node, nodes, i)
if n == nil || n.NodeFlag == nil {
if n == nil {
continue
}

// fallback code here
if n.NodeFlag == nil {
if !common.CheckHookNode(n.Name) {
childrenIds = append(childrenIds, n.ID)
}
continue
}

if n.NodeFlag.Retried {
childrenIds = append(childrenIds, n.ID)
}
Expand Down
Loading

0 comments on commit 88f3276

Please sign in to comment.