Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: provide fallback for 3.4 to 3.5 transition with absent NodeFlag. Fixes #12162 #13504

Merged
merged 3 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions workflow/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,35 @@ func IsDone(un *unstructured.Unstructured) bool {
un.GetLabels()[LabelKeyWorkflowArchivingStatus] != "Pending"
}

// CheckHookNode is used to determine if
// a node was a hook node via its name.
func CheckHookNode(nodeName string) bool {
isubasinghe marked this conversation as resolved.
Show resolved Hide resolved
names := strings.Split(nodeName, ".")
if len(names) == 2 {
return names[1] == "onExit"
}

if len(names) <= 2 {
return false
}

if names[len(names)-1] == "onExit" || names[len(names)-2] == "hooks" {
return true
}
return false
}

// CheckRetryNodeParent determines if a node is a retriable node,
// that is if the node is the child of a retry node.
func CheckRetryNodeParent(parentsMap map[string]*wfv1.NodeStatus, nodeID string) bool {
parent, found := parentsMap[nodeID]
if !found {
return false
}

return parent.Type == wfv1.NodeTypeRetry
}

// Check whether child hooked nodes Fulfilled
func CheckAllHooksFullfilled(node *wfv1.NodeStatus, nodes wfv1.Nodes) bool {
childs := node.Children
Expand Down
46 changes: 45 additions & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,46 @@ func newWorkflowOperationCtx(wf *wfv1.Workflow, wfc *WorkflowController) *wfOper
return &woc
}

// definitePostNodeFlagsWf determines if this workflow is definitely after the introduction of
// the NodeFlag field.
// used to avoid the O(n*m) loop in maybeUpgradeWithNodeFlags
func (woc *wfOperationCtx) definitePostNodeFlagsWf() bool {
for _, node := range woc.wf.Status.Nodes {
if node.NodeFlag != nil {
return true
}
}
return false
}

// maybeUpgradeWithNodeFlags upgrades old workflows without
// NodeFlag fields into ones with.
func (woc *wfOperationCtx) maybeUpgradeWithNodeFlags() {
isubasinghe marked this conversation as resolved.
Show resolved Hide resolved
parentsMap := make(map[string]*wfv1.NodeStatus)

for _, node := range woc.wf.Status.Nodes {
node := node
for _, childNodeID := range node.Children {
parentsMap[childNodeID] = &node
}
}

for key, node := range woc.wf.Status.Nodes {
newNode := node
if common.CheckHookNode(node.Name) {
newNode.NodeFlag = &wfv1.NodeFlag{Hooked: true}
}

if common.CheckRetryNodeParent(parentsMap, node.ID) {
if newNode.NodeFlag == nil {
newNode.NodeFlag = &wfv1.NodeFlag{}
}
newNode.NodeFlag.Retried = true
}
woc.wf.Status.Nodes[key] = newNode
}
}

// operate is the main operator logic of a workflow. It evaluates the current state of the workflow,
// and its pods and decides how to proceed down the execution path.
// TODO: an error returned by this method should result in requeuing the workflow to be retried at a
Expand All @@ -204,6 +244,10 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
}
}()

if !woc.definitePostNodeFlagsWf() {
woc.maybeUpgradeWithNodeFlags()
}

woc.log.WithFields(log.Fields{"Phase": woc.wf.Status.Phase, "ResourceVersion": woc.wf.ObjectMeta.ResourceVersion}).Info("Processing workflow")

// Set the Execute workflow spec for execution
Expand Down Expand Up @@ -1872,7 +1916,7 @@ type executeTemplateOpts struct {
// nodeName is the name to be used as the name of the node, and boundaryID indicates which template
// boundary this node belongs to.
func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string, orgTmpl wfv1.TemplateReferenceHolder, tmplCtx *templateresolution.Context, args wfv1.Arguments, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) {
woc.log.Debugf("Evaluating node %s: template: %s, boundaryID: %s", nodeName, common.GetTemplateHolderString(orgTmpl), opts.boundaryID)
woc.log.Debugf("Evaluating node %s: template: %s, boundaryID: %s, stack depth: %d", nodeName, common.GetTemplateHolderString(orgTmpl), opts.boundaryID, woc.currentStackDepth)

// Set templateScope from which the template resolution starts.
templateScope := tmplCtx.GetTemplateScope()
Expand Down
Loading