Skip to content

Commit

Permalink
fix(executor): hang when task panic
Browse files Browse the repository at this point in the history
  • Loading branch information
noneback committed Oct 15, 2024
1 parent bfa3f48 commit 54bf40f
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 51 deletions.
79 changes: 44 additions & 35 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ func NewExecutor(concurrency uint) Executor {
}

func (e *ExecutorImpl) Run(tf *TaskFlow) Executor {
// register graceful exit when pinc
e.pool.SetPanicHandler(func(ctx *context.Context, i interface{}) {

})

tf.graph.setup()
for _, node := range tf.graph.entries {
e.schedule(node)
Expand All @@ -59,12 +54,12 @@ func (e *ExecutorImpl) invokeGraph(g *Graph, parentSpan *span) {
ctx := context.Background()
for {
g.scheCond.L.Lock()
for g.JoinCounter() != 0 && e.wq.Len() == 0 {
for g.JoinCounter() != 0 && e.wq.Len() == 0 && !g.canceled.Load() {
g.scheCond.Wait()
}
g.scheCond.L.Unlock()

if g.JoinCounter() == 0 {
if g.JoinCounter() == 0 || g.canceled.Load() {
break
}

Expand All @@ -86,25 +81,30 @@ func (e *ExecutorImpl) invokeNode(ctx *context.Context, node *Node, parentSpan *
typ: NodeStatic,
name: node.name,
}, begin: time.Now(), parent: parentSpan}

defer func() {
span.end = time.Now()
span.extra.success = true
e.profiler.AddSpan(&span)
if r := recover(); r != nil {
fmt.Println("node", node.name, "recovered ", r)
node.g.canceled.Store(true)
} else {
e.profiler.AddSpan(&span) // remove canceled node span
}

e.wg.Done()
node.drop()
for _, n := range node.successors {
if n.JoinCounter() == 0 {
e.schedule(n)
}
}
node.g.scheCond.Signal()
}()

defer e.wg.Done()
node.state.Store(kNodeStateRunning)
defer node.state.Store(kNodeStateFinished)

p.handle()
node.drop()
for _, n := range node.successors {
// fmt.Println("put", n.Name)
if n.JoinCounter() == 0 {
e.schedule(n)
}
}
node.g.scheCond.Signal()
node.state.Store(kNodeStateFinished)
})
case *Subflow:
e.pool.Go(func() {
Expand All @@ -115,36 +115,45 @@ func (e *ExecutorImpl) invokeNode(ctx *context.Context, node *Node, parentSpan *
defer func() {
span.end = time.Now()
span.extra.success = true
e.profiler.AddSpan(&span)
if r := recover(); r != nil {
fmt.Println("subflow", node.name, "recovered ", r)
node.g.canceled.Store(true)
p.g.canceled.Store(true)
} else {
e.profiler.AddSpan(&span) // remove canceled node span
}
e.wg.Done()
e.scheduleGraph(p.g, &span)
node.drop()

for _, n := range node.successors {
if n.JoinCounter() == 0 {
e.schedule(n)
}
}
node.g.scheCond.Signal()
}()

defer e.wg.Done()
node.state.Store(kNodeStateRunning)
defer node.state.Store(kNodeStateFinished)

if !p.g.instancelized {
p.handle(p)
}
p.g.instancelized = true
node.state.Store(kNodeStateFinished)

e.scheduleGraph(p.g, &span)
node.drop()

for _, n := range node.successors {
if n.JoinCounter() == 0 {
e.schedule(n)
}
}

node.g.scheCond.Signal()
})
default:
fmt.Println("exit: ", node.name)
panic("do nothing")
panic("unsupported node")
}
}

func (e *ExecutorImpl) schedule(node *Node) {
if node.g.canceled.Load() {
node.g.scheCond.Signal()
fmt.Println("node cannot be scheduled, cuz graph canceled", node.name)
return
}

e.wg.Add(1)
e.wq.Put(node)
node.state.Store(kNodeStateWaiting)
Expand Down
2 changes: 2 additions & 0 deletions graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gotaskflow

import (
"sync"
"sync/atomic"

"github.com/noneback/go-taskflow/utils"
)
Expand All @@ -13,6 +14,7 @@ type Graph struct {
entries []*Node
scheCond *sync.Cond
instancelized bool
canceled atomic.Bool // only changes when task in graph panic
}

func newGraph(name string) *Graph {
Expand Down
2 changes: 2 additions & 0 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ const (
kNodeStateWaiting = int32(1)
kNodeStateRunning = int32(2)
kNodeStateFinished = int32(3)
kNodeStateFailed = int32(4)
kNodeStateCanceled = int32(5)
)

type NodeType string
Expand Down
81 changes: 67 additions & 14 deletions taskflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
_ "net/http/pprof"
"os"
"testing"
"time"

gotaskflow "github.com/noneback/go-taskflow"
)
Expand Down Expand Up @@ -135,17 +136,69 @@ func TestSubflow(t *testing.T) {
// }
}

// func TestTaskflowPanic(t *testing.T) {
// A, B, C :=
// gotaskflow.NewTask("A", func() {
// fmt.Println("A")
// }),
// gotaskflow.NewTask("B", func() {
// fmt.Println("B")
// }),
// gotaskflow.NewTask("C", func() {
// fmt.Println("C")
// panic("panic C")
// })

// }
// TestTaskflowPanic is an error-robustness test: it runs a flow in which task
// C panics while A and C are both declared as preceding B. The test makes no
// assertions; it passes when Run(...).Wait() returns.
func TestTaskflowPanic(t *testing.T) {
	taskA := gotaskflow.NewTask("A", func() {
		fmt.Println("A")
	})
	taskB := gotaskflow.NewTask("B", func() {
		fmt.Println("B")
	})
	// C is the task that panics.
	taskC := gotaskflow.NewTask("C", func() {
		fmt.Println("C")
		panic("panic C")
	})

	taskA.Precede(taskB)
	taskC.Precede(taskB)

	flow := gotaskflow.NewTaskFlow("G")
	flow.Push(taskA, taskB, taskC)

	exector.Run(flow).Wait()
}

// TestSubflowPanic runs a flow containing a subflow whose builder function
// panics after pushing its own tasks (one of which, C2, also panics). A, C and
// the subflow are all declared as preceding B. The test makes no assertions;
// it passes when Wait returns, after which the profile is written to stdout.
func TestSubflowPanic(t *testing.T) {
	taskA := gotaskflow.NewTask("A", func() {
		fmt.Println("A")
	})
	taskB := gotaskflow.NewTask("B", func() {
		fmt.Println("B")
	})
	taskC := gotaskflow.NewTask("C", func() {
		fmt.Println("C")
	})
	taskA.Precede(taskB)
	taskC.Precede(taskB)

	// The subflow builder registers three inner tasks and then panics itself.
	sub := gotaskflow.NewSubflow("sub1", func(sf *gotaskflow.Subflow) {
		innerA := gotaskflow.NewTask("A2", func() {
			fmt.Println("A2")
			time.Sleep(time.Second)
		})
		innerB := gotaskflow.NewTask("B2", func() {
			fmt.Println("B2")
		})
		innerC := gotaskflow.NewTask("C2", func() {
			fmt.Println("C2")
			panic("C2 paniced")
		})
		innerA.Precede(innerB)
		innerC.Precede(innerB)
		sf.Push(innerA, innerB, innerC)
		panic("subflow panic")
	})

	sub.Precede(taskB)

	flow := gotaskflow.NewTaskFlow("G")
	flow.Push(taskA, taskB, taskC)
	flow.Push(sub)
	exector.Run(flow)
	exector.Wait()
	// if err := gotaskflow.Visualizer.Visualize(tf, os.Stdout); err != nil {
	// log.Fatal(err)
	// }
	exector.Profile(os.Stdout)
}
3 changes: 1 addition & 2 deletions utils/copool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package utils
import (
"context"
"fmt"
"log"
"runtime/debug"
"sync"
)
Expand Down Expand Up @@ -59,7 +58,7 @@ func (cp *Copool) CtxGo(ctx *context.Context, f func()) {
cp.panicHandler(ctx, r)
} else {
msg := fmt.Sprintf("[panic] copool: %v: %s", r, debug.Stack())
log.Println(msg)
fmt.Println(msg)
}
}
}()
Expand Down
20 changes: 20 additions & 0 deletions utils/utils_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package utils

import (
"fmt"
"testing"
)

Expand Down Expand Up @@ -74,3 +75,22 @@ func TestSet(t *testing.T) {
t.Errorf("Expected count to be -1, got %d", rc.Value())
}
}

// TestPanic demonstrates how a deferred function combines with recover: the
// body prints "result", then the deferred handler reports a recovered panic
// (if there was one) and always prints "1".
func TestPanic(t *testing.T) {
	// Deferred handler: use recover to catch a panic, report it when present,
	// and print "1" in either case.
	onExit := func() {
		r := recover()
		if r != nil {
			fmt.Println("Recovered in causePanic:", r)
		}
		fmt.Println("1")
	}

	body := func() {
		defer onExit()

		fmt.Println("result")
		// panic("Atest")
	}
	body()
}
// Expected output when the panic("Atest") line above is uncommented:
// result
// Recovered in causePanic: Atest
// 1

0 comments on commit 54bf40f

Please sign in to comment.