diff --git a/executor.go b/executor.go index 839f90d..f6e5792 100644 --- a/executor.go +++ b/executor.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "runtime/debug" "sync" "time" @@ -86,8 +87,8 @@ func (e *ExecutorImpl) invokeNode(ctx *context.Context, node *Node, parentSpan * span.end = time.Now() span.extra.success = true if r := recover(); r != nil { - fmt.Println("node", node.name, "recovered ", r) node.g.canceled.Store(true) + fmt.Println("[recovered] node", node.name, "panic:", r, debug.Stack()) } else { e.profiler.AddSpan(&span) // remove canceled node span } @@ -116,7 +117,7 @@ func (e *ExecutorImpl) invokeNode(ctx *context.Context, node *Node, parentSpan * span.end = time.Now() span.extra.success = true if r := recover(); r != nil { - fmt.Println("subflow", node.name, "recovered ", r) + fmt.Println("[recovered] subflow", node.name, "panic:", r, debug.Stack()) node.g.canceled.Store(true) p.g.canceled.Store(true) } else { diff --git a/flow.go b/flow.go index 7056ed1..cd15823 100644 --- a/flow.go +++ b/flow.go @@ -19,7 +19,6 @@ type Subflow struct { func (sf *Subflow) instancelize() (err error) { defer func() { if r := recover(); r != nil { - fmt.Println("instancelize may failed or paniced") err = fmt.Errorf("instancelize may failed or paniced") } }() diff --git a/image/dag.svg b/image/dag.svg index 421e97c..86da3c9 100644 --- a/image/dag.svg +++ b/image/dag.svg @@ -1,153 +1,163 @@ - - + + + + + + G - + cluster_sub2 - + +sub2 cluster_sub1 - + +sub1 A3 - -A3 + +A3 B3 - -B3 + +B3 -A3->B3 - - +A3->B3 + + C3 - -C3 + +C3 -C3->B3 - - +C3->B3 + + + - + A2 - -A2 - - - -A2->A3 - - + +A2 - + B2 - -B2 + +B2 -A2->B2 - - - - - -B - -B - - - -A2->B - - +A2->B2 + + - + C2 - -C2 + +C2 -C2->B2 - - +C2->B2 + + + + + + +sub1->sub2 + + + + + +B + +B + + + +sub1->B + + - + A - -A + +A -A->B - - +A->B + + - + C - -C + +C -C->B - - +C->B + + - + A1 - -A1 + +A1 -A1->B - - +A1->B + + -A1->C - - +A1->C + + - + B1 - -B1 + +B1 -B1->C - - +B1->C + + - + C1 - -C1 + +C1 \ No newline at end of file diff --git a/image/fl.svg b/image/fl.svg index 21d2eb1..82fe9ce 100644 --- a/image/fl.svg +++ b/image/fl.svg @@ -430,64 +430,64 @@ -all (7,780 samples, 100%) - +static,B,cost 11µs (11 samples, 3.44%) +sta.. -static,B2,610ns (610 samples, 7.84%) -static,B2,6.. +static,B3,cost 3µs480ns (3 samples, 0.94%) + -static,C,670ns (670 samples, 8.61%) -static,C,670ns +static,B2,cost 2µs800ns (2 samples, 0.62%) + -static,B1,580ns (580 samples, 7.46%) -static,B1,.. +static,C,cost 3µs278ns (3 samples, 0.94%) + -static,C3,530ns (530 samples, 6.81%) -static,C3.. +subflow,sub2,cost 2µs789ns (38 samples, 11.88%) +subflow,sub2,cost.. -subflow,sub1,11150ns (2,380 samples, 30.59%) -subflow,sub1,11150ns +subflow,sub1,cost 2µs250ns (62 samples, 19.38%) +subflow,sub1,cost 2µs250ns -static,B3,590ns (590 samples, 7.58%) -static,B3,.. +static,C1,cost 70µs619ns (70 samples, 21.88%) +static,C1,cost 70µs619ns -static,A1,610ns (610 samples, 7.84%) -static,A1,6.. +static,A,cost 13µs991ns (13 samples, 4.06%) +stat.. -static,A,1230ns (1,230 samples, 15.81%) -static,A,1230ns +static,B1,cost 71µs394ns (71 samples, 22.19%) +static,B1,cost 71µs394ns -static,A3,670ns (670 samples, 8.61%) -static,A3,67.. +static,A1,cost 63µs495ns (63 samples, 19.69%) +static,A1,cost 63µs495ns -subflow,sub2,5340ns (1,790 samples, 23.01%) -subflow,sub2,5340ns +static,C3,cost 7µs120ns (7 samples, 2.19%) +s.. -static,C2,530ns (530 samples, 6.81%) -static,C2.. +static,A2,cost 55µs376ns (55 samples, 17.19%) +static,A2,cost 55µs376ns -static,A2,570ns (570 samples, 7.33%) -static,A2,.. +static,C2,cost 5µs27ns (5 samples, 1.56%) + -static,C1,480ns (480 samples, 6.17%) -static,C.. +static,A3,cost 17µs306ns (17 samples, 5.31%) +static.. -static,B,710ns (710 samples, 9.13%) -static,B,710ns +all (320 samples, 100%) + \ No newline at end of file diff --git a/taskflow_test.go b/taskflow_test.go index 1a15c5e..01567a0 100644 --- a/taskflow_test.go +++ b/taskflow_test.go @@ -195,8 +195,8 @@ func TestSubflowPanic(t *testing.T) { tf := gotaskflow.NewTaskFlow("G") tf.Push(A, B, C) tf.Push(subflow) - // exector.Run(tf) - // exector.Wait() + exector.Run(tf) + exector.Wait() if err := gotaskflow.Visualizer.Visualize(tf, os.Stdout); err != nil { fmt.Errorf("%v", err) } diff --git a/visualizer.go b/visualizer.go index af42bd6..9e7ec48 100644 --- a/visualizer.go +++ b/visualizer.go @@ -26,6 +26,8 @@ func (v *visualizer) visualizeG(gv *graphviz.Graphviz, g *Graph, parentG *cgraph if err != nil { return fmt.Errorf("make graph -> %w", err) } + vGraph.SetRankDir(cgraph.TBRank) + vGraph.SetNewRank(true) v.root = vGraph } // defer vGraph.Close() @@ -43,9 +45,10 @@ func (v *visualizer) visualizeG(gv *graphviz.Graphviz, g *Graph, parentG *cgraph case *Subflow: vSubGraph := vGraph.SubGraph("cluster_"+node.name, 1) vSubGraph.SetLabel(node.name) - vSubGraph.SetBackgroundColor("#FFF8DC") + vSubGraph.SetBackgroundColor("#F5F5F5") + vSubGraph.SetRankDir(cgraph.LRRank) + if p.instancelize() != nil || v.visualizeG(gv, p.g, vSubGraph) != nil { - fmt.Println("unvisualized_subflow_" + p.g.name) vNode, err := vGraph.CreateNode("unvisualized_subflow_" + p.g.name) if err != nil { return fmt.Errorf("add node %v -> %w", node.name, err) @@ -54,14 +57,18 @@ func (v *visualizer) visualizeG(gv *graphviz.Graphviz, g *Graph, parentG *cgraph vNode.SetComment("cannot visualize due to instancelize panic or failed") nodeMap[node.name] = vNode } else { - nodeMap[node.name] = vSubGraph.LastNode() + dummy, _ := vSubGraph.CreateNode(p.g.name) + dummy.SetShape(cgraph.PointShape) + nodeMap[node.name] = dummy + dummy.SetStyle(cgraph.NodeStyle("invis")) + vSubGraph.SetNewRank(true) } } } for _, node := range nodes { for _, deps := range node.dependents { - fmt.Printf("add edge %v - %v\n", deps.name, node.name) + // fmt.Printf("add edge %v - %v\n", deps.name, node.name) if _, err := vGraph.CreateEdge("", nodeMap[deps.name], nodeMap[node.name]); err != nil { return fmt.Errorf("add edge %v - %v -> %w", deps.name, node.name, err) }