Skip to content

Commit

Permalink
fix: address feedback, find boundaries when stepgroup or taskgroup
Browse files Browse the repository at this point in the history
Signed-off-by: isubasinghe <[email protected]>
  • Loading branch information
isubasinghe committed Oct 28, 2024
1 parent de7516e commit ab68186
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 57 deletions.
102 changes: 48 additions & 54 deletions workflow/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -823,24 +823,20 @@ func createNewRetryWorkflow(wf *wfv1.Workflow, parameters []string) (*wfv1.Workf
return newWF, nil
}

type parentMap map[string]*wfv1.NodeStatus

type node struct {
type dagNode struct {
n *wfv1.NodeStatus
parent *node
children []*node
parent *dagNode
children []*dagNode
}

func newWorkflowsDag(wf *wfv1.Workflow) ([]*node, error) {

nodes := make(map[string]*node)

parentsMap := make(parentMap)
func newWorkflowsDag(wf *wfv1.Workflow) ([]*dagNode, error) {
nodes := make(map[string]*dagNode)
parentsMap := make(map[string]*wfv1.NodeStatus)

// create mapping from node to parent
// as well as creating temp mappings from nodeID to node
for _, wfNode := range wf.Status.Nodes {
n := node{}
n := dagNode{}
n.n = &wfNode
nodes[wfNode.ID] = &n
for _, child := range wfNode.Children {
Expand All @@ -854,12 +850,12 @@ func newWorkflowsDag(wf *wfv1.Workflow) ([]*node, error) {
return nil, fmt.Errorf("couldn't find parent node for %s", wfNode.ID)
}

var parentNode *node
var parentNode *dagNode
if parentWfNode != nil {
parentNode = nodes[parentWfNode.ID]
}

children := []*node{}
children := []*dagNode{}

for _, childID := range wfNode.Children {
childNode, ok := nodes[childID]
Expand All @@ -868,25 +864,23 @@ func newWorkflowsDag(wf *wfv1.Workflow) ([]*node, error) {
}
children = append(children, childNode)
}

nodes[wfNode.ID].parent = parentNode
nodes[wfNode.ID].children = children

}
values := []*node{}

values := []*dagNode{}
for _, v := range nodes {
values = append(values, v)
}
return values, nil
}

func singularPath(nodes []*node, toNode string) ([]*node, error) {

func singularPath(nodes []*dagNode, toNode string) ([]*dagNode, error) {
if len(nodes) <= 0 {
return nil, fmt.Errorf("expected at least 1 node")
}
var root *node
var leaf *node
var root *dagNode
var leaf *dagNode
for i := range nodes {
if nodes[i].n.ID == toNode {
leaf = nodes[i]
Expand All @@ -902,7 +896,7 @@ func singularPath(nodes []*node, toNode string) ([]*node, error) {

curr := leaf

reverseNodes := []*node{}
reverseNodes := []*dagNode{}
for {
reverseNodes = append(reverseNodes, curr)
if curr.n.ID == root.n.ID {
Expand All @@ -918,7 +912,7 @@ func singularPath(nodes []*node, toNode string) ([]*node, error) {
return reverseNodes, nil
}

func getChildren(n *node) map[string]bool {
func getChildren(n *dagNode) map[string]bool {
children := make(map[string]bool)
queue := list.New()
queue.PushBack(n)
Expand All @@ -928,7 +922,7 @@ func getChildren(n *node) map[string]bool {
break
}

curr := currNode.Value.(*node)
curr := currNode.Value.(*dagNode)
for i := range curr.children {
children[curr.children[i].n.ID] = true
queue.PushBack(curr.children[i])
Expand All @@ -940,15 +934,15 @@ func getChildren(n *node) map[string]bool {

type resetFn func(string, bool)
type deleteFn func(string, bool)
type tillFn func(*node) (bool, bool)
type tillFn func(*dagNode) (bool, bool)

func getTillFnNodeType(nodeType wfv1.NodeType) tillFn {
return func(n *node) (bool, bool) {
return func(n *dagNode) (bool, bool) {
return n.n.Type == nodeType, true
}
}

func consumeTill(n *node, should tillFn, resetFunc resetFn) (*node, error) {
func consumeTill(n *dagNode, should tillFn, resetFunc resetFn) (*dagNode, error) {
curr := n
for {
if curr == nil {
Expand All @@ -966,12 +960,12 @@ func consumeTill(n *node, should tillFn, resetFunc resetFn) (*node, error) {
}

func getTillBoundaryFn(boundaryID string) tillFn {
return func(n *node) (bool, bool) {
return func(n *dagNode) (bool, bool) {
return n.n.ID == boundaryID, n.n.BoundaryID != ""
}
}

func consumeBoundaries(n *node, resetFunc resetFn) (*node, error) {
func consumeBoundaries(n *dagNode, resetFunc resetFn) (*dagNode, error) {
curr := n
for {
if curr == nil {
Expand All @@ -992,31 +986,31 @@ func consumeBoundaries(n *node, resetFunc resetFn) (*node, error) {
}
}

func consumeStepGroup(n *node, resetFunc resetFn) (*node, error) {
func consumeStepGroup(n *dagNode, resetFunc resetFn) (*dagNode, error) {
return consumeTill(n, getTillFnNodeType(wfv1.NodeTypeStepGroup), resetFunc)
}

func consumeSteps(n *node, resetFunc resetFn) (*node, error) {
func consumeSteps(n *dagNode, resetFunc resetFn) (*dagNode, error) {
n, err := consumeTill(n, getTillFnNodeType(wfv1.NodeTypeSteps), resetFunc)
if err != nil {
return nil, err
}
return consumeBoundaries(n, resetFunc)
}

func consumeTaskGroup(n *node, resetFunc resetFn) (*node, error) {
func consumeTaskGroup(n *dagNode, resetFunc resetFn) (*dagNode, error) {
return consumeTill(n, getTillFnNodeType(wfv1.NodeTypeTaskGroup), resetFunc)
}

func consumeDAG(n *node, resetFunc resetFn) (*node, error) {
func consumeDAG(n *dagNode, resetFunc resetFn) (*dagNode, error) {
n, err := consumeTill(n, getTillFnNodeType(wfv1.NodeTypeDAG), resetFunc)
if err != nil {
return nil, err
}
return consumeBoundaries(n, resetFunc)
}

func consumePod(n *node, resetFunc resetFn, addToDelete deleteFn) (*node, error) {
func consumePod(n *dagNode, resetFunc resetFn, addToDelete deleteFn) (*dagNode, error) {
// this sets to reset but resets are overridden by deletes in the final FormulateRetryWorkflow logic.
curr, err := consumeTill(n, getTillFnNodeType(wfv1.NodeTypePod), resetFunc)
if err != nil {
Expand All @@ -1030,7 +1024,7 @@ func consumePod(n *node, resetFunc resetFn, addToDelete deleteFn) (*node, error)
return curr, nil
}

func resetPath(allNodes []*node, toNode string) (map[string]bool, map[string]bool, error) {
func resetPath(allNodes []*dagNode, toNode string) (map[string]bool, map[string]bool, error) {
nodes, err := singularPath(allNodes, toNode)

curr := nodes[len(nodes)-1]
Expand Down Expand Up @@ -1091,13 +1085,13 @@ func resetPath(allNodes []*node, toNode string) (map[string]bool, map[string]boo
findBoundaries = true
case wfv1.NodeTypeStepGroup:
addToReset(curr.n.ID, false)
mustFind = wfv1.NodeTypeSteps
findBoundaries = true
case wfv1.NodeTypeDAG:
addToReset(curr.n.ID, false)
findBoundaries = true
case wfv1.NodeTypeTaskGroup:
addToReset(curr.n.ID, false)
mustFind = wfv1.NodeTypeDAG
findBoundaries = true
case wfv1.NodeTypeRetry:
addToReset(curr.n.ID, false)
case wfv1.NodeTypeSkipped:
Expand Down Expand Up @@ -1168,15 +1162,15 @@ func shouldRetryFailedType(nodeTyp wfv1.NodeType) bool {
return false
}

// dagSortedNodes sorts the nodes based on the order they were created, omits onExitNode
func dagSortedNodes(nodes []*node, rootNodeName string) []*node {
sortedNodes := make([]*node, 0)
// dagSortedNodes sorts the nodes based on topological order, omits onExitNode
func dagSortedNodes(nodes []*dagNode, rootNodeName string) []*dagNode {
sortedNodes := make([]*dagNode, 0)

if len(nodes) == 0 {
return sortedNodes
}

queue := make([]*node, 0)
queue := make([]*dagNode, 0)

for _, n := range nodes {
if n.n.Name == rootNodeName {
Expand Down Expand Up @@ -1225,7 +1219,7 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
return nil, nil, err
}

deleteNodesMap, err := getNodeIDsToResetNoChildren(restartSuccessful, nodeFieldSelector, wf.Status.Nodes)
deleteNodesMap, err := getNodeIDsToReset(restartSuccessful, nodeFieldSelector, wf.Status.Nodes)
if err != nil {
return nil, nil, err
}
Expand All @@ -1248,14 +1242,14 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
toReset := make(map[string]bool)
toDelete := make(map[string]bool)

nodesMap := make(map[string]*node)
nodesMap := make(map[string]*dagNode)
for i := range nodes {
nodesMap[nodes[i].n.ID] = nodes[i]
}

nodes = dagSortedNodes(nodes, wf.Name)

deleteNodes := make([]*node, 0)
deleteNodes := make([]*dagNode, 0)

// deleteNodes will not contain an exit node
for i := range nodes {
Expand Down Expand Up @@ -1285,22 +1279,22 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
toDelete = setUnion(toDelete, pathToDelete)
}

for k := range toReset {
for nodeID := range toReset {
// avoid reseting nodes that are marked for deletion
if in := toDelete[k]; in {
if in := toDelete[nodeID]; in {
continue
}

n := wf.Status.Nodes[k]
n := wf.Status.Nodes[nodeID]

newWf.Status.Nodes.Set(k, resetNode(n))
newWf.Status.Nodes.Set(nodeID, resetNode(*n.DeepCopy()))
}

deletedPods := make(map[string]bool)
podsToDelete := []string{}

for k := range toDelete {
n := wf.Status.Nodes[k]
for nodeID := range toDelete {
n := wf.Status.Nodes[nodeID]
if n.Type == wfv1.NodeTypePod {
deletedPods, podsToDelete = deletePodNodeDuringRetryWorkflow(wf, n, deletedPods, podsToDelete)
}
Expand Down Expand Up @@ -1342,11 +1336,11 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
}

newChildren := []string{}
for _, childId := range oldWfNode.Children {
if toDelete[childId] {
for _, childID := range oldWfNode.Children {
if toDelete[childID] {
continue
}
newChildren = append(newChildren, childId)
newChildren = append(newChildren, childID)
}
newOutboundNodes := []string{}

Expand All @@ -1356,7 +1350,7 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
}
newOutboundNodes = append(newOutboundNodes, outBoundNodeID)
}
// proven to exist

wfNode := newWf.Status.Nodes[id]
wfNode.Children = newChildren
wfNode.OutboundNodes = newOutboundNodes
Expand Down Expand Up @@ -1398,7 +1392,7 @@ func GetTemplateFromNode(node wfv1.NodeStatus) string {
return node.TemplateName
}

func getNodeIDsToResetNoChildren(restartSuccessful bool, nodeFieldSelector string, nodes wfv1.Nodes) (map[string]bool, error) {
func getNodeIDsToReset(restartSuccessful bool, nodeFieldSelector string, nodes wfv1.Nodes) (map[string]bool, error) {
nodeIDsToReset := make(map[string]bool)
if !restartSuccessful || len(nodeFieldSelector) == 0 {
return nodeIDsToReset, nil
Expand Down
4 changes: 1 addition & 3 deletions workflow/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2278,9 +2278,7 @@ func TestDagConversion(t *testing.T) {
numNilParent++
}
}

assert.Equal(1, numNilParent)

}

const dagDiamondRetry = `apiVersion: argoproj.io/v1alpha1
Expand Down Expand Up @@ -3800,8 +3798,8 @@ func TestNestedDAG(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(nestedDAG)

newWf, podsToDelete, err := FormulateRetryWorkflow(context.Background(), wf, true, "id=dag-nested-zxlc2-744943701", []string{})

require.NoError(err)
_ = newWf
_ = podsToDelete

}

0 comments on commit ab68186

Please sign in to comment.