Skip to content

Commit

Permalink
fix(visualizer): panic when subflow handle is paniced
Browse files Browse the repository at this point in the history
  • Loading branch information
noneback committed Oct 15, 2024
1 parent 54bf40f commit 5994854
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 38 deletions.
18 changes: 13 additions & 5 deletions graph.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gotaskflow

import (
"fmt"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -72,11 +73,18 @@ func (g *Graph) instancelize() {
}

// only for visualizer
func (g *Graph) topologicalSort() ([]*Node, bool) {
g.instancelize()
func (g *Graph) topologicalSort() (sorted []*Node, err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("instancelize may failed or paniced")
return
}
}()

g.instancelize() // may require panic recover
indegree := map[*Node]int{} // Node -> indegree
zeros := make([]*Node, 0) // zero deps
sorted := make([]*Node, 0, len(g.nodes))
sorted = make([]*Node, 0, len(g.nodes))

for _, node := range g.nodes {
set := map[*Node]struct{}{}
Expand Down Expand Up @@ -106,9 +114,9 @@ func (g *Graph) topologicalSort() ([]*Node, bool) {

for _, node := range g.nodes {
if indegree[node] > 0 {
return nil, false
return nil, fmt.Errorf("graph has cycles")
}
}

return sorted, true
return
}
30 changes: 15 additions & 15 deletions graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@ import "testing"
func TestTopologicalSort(t *testing.T) {
t.Run("TestEmptyGraph", func(t *testing.T) {
graph := newGraph("empty")
sorted, ok := graph.topologicalSort()
if !ok || len(sorted) != 0 {
t.Errorf("expected true and an empty slice, got %v and %v", ok, sorted)
sorted, err := graph.topologicalSort()
if err != nil || len(sorted) != 0 {
t.Errorf("expected true and an empty slice, got %v and %v", err, sorted)
}
})

t.Run("TestSingleNodeGraph", func(t *testing.T) {
graph := newGraph("single node")
nodeA := newNode("A")
graph.push(nodeA)
sorted, ok := graph.topologicalSort()
if !ok || len(sorted) != 1 || sorted[0] != nodeA {
t.Errorf("expected true and the single node, got %v and %v", ok, sorted)
sorted, err := graph.topologicalSort()
if err != nil || len(sorted) != 1 || sorted[0] != nodeA {
t.Errorf("expected true and the single node, got %v and %v", err, sorted)
}
})

Expand All @@ -29,9 +29,9 @@ func TestTopologicalSort(t *testing.T) {
nodeA.precede(nodeB)
nodeB.precede(nodeC)
graph.push(nodeA, nodeB, nodeC)
sorted, ok := graph.topologicalSort()
if !ok || len(sorted) != 3 || sorted[0] != nodeA || sorted[1] != nodeB || sorted[2] != nodeC {
t.Errorf("expected true and a correct sorted order, got %v and %v", ok, sorted)
sorted, err := graph.topologicalSort()
if err != nil || len(sorted) != 3 || sorted[0] != nodeA || sorted[1] != nodeB || sorted[2] != nodeC {
t.Errorf("expected true and a correct sorted order, got %v and %v", err, sorted)
}
})

Expand All @@ -48,9 +48,9 @@ func TestTopologicalSort(t *testing.T) {
nodeC.precede(nodeD)
nodeD.precede(nodeE)
graph.push(nodeA, nodeB, nodeC, nodeD, nodeE)
sorted, ok := graph.topologicalSort()
if !ok || len(sorted) != 5 {
t.Errorf("expected true and a correct sorted order, got %v and %v", ok, sorted)
sorted, err := graph.topologicalSort()
if err != nil || len(sorted) != 5 {
t.Errorf("expected true and a correct sorted order, got %v and %v", err, sorted)
}
// Further check the ordering
nodeIndex := make(map[*Node]int)
Expand All @@ -71,9 +71,9 @@ func TestTopologicalSort(t *testing.T) {
nodeB.precede(nodeC)
nodeC.precede(nodeA) // Creates a cycle
graph.push(nodeA, nodeB, nodeC)
_, ok := graph.topologicalSort()
if ok {
t.Errorf("expected false due to cycle, got %v", ok)
_, err := graph.topologicalSort()
if err == nil {
t.Errorf("expected false due to cycle, got %v", err)
}
})
}
8 changes: 0 additions & 8 deletions taskflow.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,5 @@
package gotaskflow

import (
"errors"
)

var (
ErrGraphIsCyclic = errors.New("graph is cyclic, not support")
)

type TaskFlow struct {
name string
graph *Graph
Expand Down
6 changes: 3 additions & 3 deletions taskflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,8 @@ func TestSubflowPanic(t *testing.T) {
tf.Push(subflow)
exector.Run(tf)
exector.Wait()
// if err := gotaskflow.Visualizer.Visualize(tf, os.Stdout); err != nil {
// log.Fatal(err)
// }
if err := gotaskflow.Visualizer.Visualize(tf, os.Stdout); err != nil {
fmt.Errorf("%v", err)
}
exector.Profile(os.Stdout)
}
20 changes: 13 additions & 7 deletions visualizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ type visualizer struct {
var Visualizer = visualizer{}

func (v *visualizer) visualizeG(gv *graphviz.Graphviz, g *Graph, parentG *cgraph.Graph) error {
nodes, ok := g.topologicalSort()
if !ok {
return fmt.Errorf("graph %v topological sort -> %w", g.name, ErrGraphIsCyclic)
nodes, err := g.topologicalSort()
if err != nil {
return fmt.Errorf("graph %v topological sort -> %w", g.name, err)
}
vGraph := parentG
if vGraph == nil {
Expand All @@ -44,10 +44,16 @@ func (v *visualizer) visualizeG(gv *graphviz.Graphviz, g *Graph, parentG *cgraph
vSubGraph := vGraph.SubGraph("cluster_"+node.name, 1)
err := v.visualizeG(gv, p.g, vSubGraph)
if err != nil {
return fmt.Errorf("graph %v visualize -> %w", g.name, ErrGraphIsCyclic)
fmt.Printf("graph %v visualize -> %s\n", g.name, err)
// return fmt.Errorf()
vNode, err := vSubGraph.CreateNode("unvisualized_" + p.g.name)
if err != nil {
return fmt.Errorf("add node %v -> %w", node.name, err)
}
nodeMap[node.name] = vNode
} else {
nodeMap[node.name] = vSubGraph.FirstNode()
}

nodeMap[node.name] = vSubGraph.FirstNode()
}
}

Expand All @@ -68,7 +74,7 @@ func (v *visualizer) Visualize(tf *TaskFlow, writer io.Writer) error {

err := v.visualizeG(gv, tf.graph, nil)
if err != nil {
return fmt.Errorf("graph %v topological sort -> %w", tf.graph.name, ErrGraphIsCyclic)
return fmt.Errorf("graph %v topological sort -> %w", tf.graph.name, err)
}

if err := gv.Render(v.root, graphviz.XDOT, writer); err != nil {
Expand Down

0 comments on commit 5994854

Please sign in to comment.