Skip to content

Commit

Permalink
fix(profiler): fix condition node profiling failure and compact spans
Browse files Browse the repository at this point in the history
  • Loading branch information
noneback committed Nov 14, 2024
1 parent d1f9a16 commit aa8d65e
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 54 deletions.
2 changes: 2 additions & 0 deletions examples/loop/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"log"
"os"
"runtime"
"time"

gotaskflow "github.com/noneback/go-taskflow"
)
Expand All @@ -20,6 +21,7 @@ func main() {
fmt.Println("i=0")
}),
gotaskflow.NewCondition("while i < 5", func() uint {
time.Sleep(100 * time.Millisecond)
if i < 5 {
return 0
} else {
Expand Down
19 changes: 3 additions & 16 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ func (e *innerExecutorImpl) invokeStatic(node *innerNode, parentSpan *span, p *S
}, begin: time.Now(), parent: parentSpan}

defer func() {
span.end = time.Now()
span.extra.success = true
span.cost = time.Now().Sub(span.begin)
if r := recover(); r != nil {
node.g.canceled.Store(true)
fmt.Printf("[recovered] node %s, panic: %s, stack: %s", node.name, r, debug.Stack())
Expand Down Expand Up @@ -120,8 +119,7 @@ func (e *innerExecutorImpl) invokeSubflow(node *innerNode, parentSpan *span, p *
name: node.name,
}, begin: time.Now(), parent: parentSpan}
defer func() {
span.end = time.Now()
span.extra.success = true
span.cost = time.Now().Sub(span.begin)
if r := recover(); r != nil {
fmt.Printf("[recovered] subflow %s, panic: %s, stack: %s", node.name, r, debug.Stack())
node.g.canceled.Store(true)
Expand Down Expand Up @@ -155,8 +153,7 @@ func (e *innerExecutorImpl) invokeCondition(node *innerNode, parentSpan *span, p
}, begin: time.Now(), parent: parentSpan}

defer func() {
span.end = time.Now()
span.extra.success = true
span.cost = time.Now().Sub(span.begin)
if r := recover(); r != nil {
node.g.canceled.Store(true)
fmt.Printf("[recovered] node %s, panic: %s, stack: %s", node.name, r, debug.Stack())
Expand Down Expand Up @@ -217,16 +214,6 @@ func (e *innerExecutorImpl) schedule(nodes ...*innerNode) {
continue
}

if node.state.Load() == kNodeStateIgnored {
node.g.scheCond.Signal()
fmt.Printf("node %v is ignored\n", node.name)
for _, v := range node.successors {
v.state.Store(kNodeStateIdle)
}

continue
}

node.g.joinCounter.Increase()
e.wg.Add(1)
e.wq.Put(node)
Expand Down
1 change: 0 additions & 1 deletion node.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ const (
kNodeStateFinished = int32(3)
kNodeStateFailed = int32(4)
kNodeStateCanceled = int32(5)
kNodeStateIgnored = int32(6)
)

type nodeType string
Expand Down
32 changes: 19 additions & 13 deletions profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,51 +10,57 @@ import (
)

type profiler struct {
spans []*span
mu *sync.Mutex
spans map[attr]*span

mu *sync.Mutex
}

func newProfiler() *profiler {
return &profiler{
spans: make([]*span, 0),
spans: make(map[attr]*span),
mu: &sync.Mutex{},
}
}

func (t *profiler) AddSpan(s *span) {
t.mu.Lock()
defer t.mu.Unlock()
t.spans = append(t.spans, s)
if span, ok := t.spans[s.extra]; ok {
s.cost += span.cost
}
t.spans[s.extra] = s
}

type attr struct {
typ nodeType
success bool // 0 for success, 1 for abnormal
name string
typ nodeType
name string
}

type span struct {
extra attr
begin, end time.Time
parent *span
extra attr
begin time.Time
cost time.Duration
parent *span
}

func (s *span) String() string {
return fmt.Sprintf("%s,%s,cost %v", s.extra.typ, s.extra.name, utils.NormalizeDuration(s.end.Sub(s.begin)))
return fmt.Sprintf("%s,%s,cost %v", s.extra.typ, s.extra.name, utils.NormalizeDuration(s.cost))
}

func (t *profiler) draw(w io.Writer) error {
// compact spans base on name

for _, s := range t.spans {
path := ""
if s.extra.typ == nodeStatic {
if s.extra.typ != nodeSubflow {
path = s.String()
cur := s

for cur.parent != nil {
path = cur.parent.String() + ";" + path
cur = cur.parent
}
msg := fmt.Sprintf("%s %v\n", path, s.end.Sub(s.begin).Microseconds())
msg := fmt.Sprintf("%s %v\n", path, s.cost.Microseconds())

if _, err := w.Write([]byte(msg)); err != nil {
return fmt.Errorf("write profile -> %w", err)
Expand Down
37 changes: 17 additions & 20 deletions profiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,36 +10,35 @@ import (

func TestProfilerAddSpan(t *testing.T) {
profiler := newProfiler()
mark := attr{
typ: nodeStatic,
name: "test-span",
}
span := &span{
extra: attr{
typ: nodeStatic,
success: true,
name: "test-span",
},
extra: mark,
begin: time.Now(),
end: time.Now().Add(5 * time.Millisecond),
cost: 5 * time.Millisecond,
}
profiler.AddSpan(span)

if len(profiler.spans) != 1 {
t.Errorf("expected 1 span, got %d", len(profiler.spans))
}

if profiler.spans[0] != span {
t.Errorf("expected span to be added correctly, got %v", profiler.spans[0])
if profiler.spans[mark] != span {
t.Errorf("expected span to be added correctly, got %v", profiler.spans[mark])
}
}

func TestSpanString(t *testing.T) {
now := time.Now()
span := &span{
extra: attr{
typ: nodeStatic,
success: true,
name: "test-span",
typ: nodeStatic,
name: "test-span",
},
begin: now,
end: now.Add(10 * time.Millisecond),
cost: 10 * time.Millisecond,
}

expected := "static,test-span,cost " + utils.NormalizeDuration(10*time.Millisecond)
Expand All @@ -55,22 +54,20 @@ func TestProfilerDraw(t *testing.T) {
now := time.Now()
parentSpan := &span{
extra: attr{
typ: nodeStatic,
success: true,
name: "parent",
typ: nodeStatic,
name: "parent",
},
begin: now,
end: now.Add(10 * time.Millisecond),
cost: 10 * time.Millisecond,
}

childSpan := &span{
extra: attr{
typ: nodeStatic,
success: true,
name: "child",
typ: nodeStatic,
name: "child",
},
begin: now,
end: now.Add(5 * time.Millisecond),
cost: 5 * time.Millisecond,
parent: parentSpan,
}

Expand Down
9 changes: 5 additions & 4 deletions taskflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func checkTopology[R comparable](t *testing.T, q *utils.Queue[R], chain *rgChain
if g.contains(node) {
delete(g.elems, node)
} else {
fmt.Println(node)
fmt.Println("failed in", node)
t.Fail()
}
}
Expand Down Expand Up @@ -207,6 +207,7 @@ func TestSubflow(t *testing.T) {

subflow.Precede(B)
subflow.Precede(subflow2)
C1.Precede(subflow)

tf := gotaskflow.NewTaskFlow("G")
tf.Push(A, B, C)
Expand All @@ -221,13 +222,13 @@ func TestSubflow(t *testing.T) {
chain := newRgChain[string]()

// Group 1 - Top-level nodes
chain.grouping("C1", "A1", "B1", "C1", "A", "A2", "C2", "B2")
chain.grouping("C1", "A1", "B1", "A")
chain.grouping("A2", "C2", "B2", "C")

// Group 2 - Connections under A, B, C
chain.grouping("C", "A3", "C3",
chain.grouping("A3", "C3", "B",
"B3")

chain.grouping("B")
// validate
if q.Len() != 12 {
t.Fail()
Expand Down

0 comments on commit aa8d65e

Please sign in to comment.