Skip to content

Commit

Permalink
test(ut): add rgchain for topology test
Browse files Browse the repository at this point in the history
  • Loading branch information
noneback committed Nov 14, 2024
1 parent 37b00cb commit 78a8687
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 8 deletions.
4 changes: 2 additions & 2 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (e *innerExecutorImpl) invokeCondition(node *innerNode, parentSpan *span, p

func (e *innerExecutorImpl) invokeNode(node *innerNode, parentSpan *span) {
// do job
fmt.Println("[invoke] ", node.name)
// fmt.Println("[invoke] ", node.name)
switch p := node.ptr.(type) {
case *Static:
e.pool.Go(e.invokeStatic(node, parentSpan, p))
Expand All @@ -200,7 +200,7 @@ func (e *innerExecutorImpl) invokeNode(node *innerNode, parentSpan *span) {

func (e *innerExecutorImpl) schedule(nodes ...*innerNode) {
for _, node := range nodes {
fmt.Println("[schedule] ", node.name)
// fmt.Println("[schedule] ", node.name)
if node.g.canceled.Load() {
node.g.scheCond.Signal()
fmt.Printf("node %v is not scheduled, as graph %v is canceled\n", node.name, node.g.name)
Expand Down
117 changes: 111 additions & 6 deletions taskflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,87 @@ import (
"github.com/noneback/go-taskflow/utils"
)

type rgChain[R comparable] struct {
rgs []*rgroup[R]
}

func newRgChain[R comparable]() *rgChain[R] {
return &rgChain[R]{
rgs: make([]*rgroup[R], 0),
}
}

func (c *rgChain[R]) grouping(rs ...R) {
g := newRg[R]()
g.push(rs...)
c.rgs = append(c.rgs, g)
}

// result group
type rgroup[R comparable] struct {
pre, next *rgroup[R]
elems map[R]struct{}
}

func newRg[R comparable]() *rgroup[R] {
return &rgroup[R]{
elems: make(map[R]struct{}),
}
}

func (g *rgroup[R]) push(rs ...R) {
for _, r := range rs {
g.elems[r] = struct{}{}
}
}

func (g *rgroup[R]) chain(successor *rgroup[R]) {
g.next = successor
successor.pre = g.next
}

func (g *rgroup[R]) contains(r R) bool {
_, ok := g.elems[r]
return ok
}

var executor = gotaskflow.NewExecutor(10)

func TestTaskFlow(t *testing.T) {
q := utils.NewQueue[string]()

A, B, C :=
gotaskflow.NewTask("A", func() {
fmt.Println("A")
q.Put("A")
}),
gotaskflow.NewTask("B", func() {
fmt.Println("B")
q.Put("B")
}),
gotaskflow.NewTask("C", func() {
fmt.Println("C")
q.Put("C")
})

A1, B1, C1 :=
gotaskflow.NewTask("A1", func() {
fmt.Println("A1")
q.Put("A1")
}),
gotaskflow.NewTask("B1", func() {
fmt.Println("B1")
q.Put("B1")
}),
gotaskflow.NewTask("C1", func() {
fmt.Println("C1")
q.Put("C1")
})
chains := newRgChain[string]()
chains.grouping("C1", "A1", "B1", "A")
chains.grouping("C")
chains.grouping("B")

A.Precede(B)
C.Precede(B)
A1.Precede(B)
Expand All @@ -53,31 +110,50 @@ func TestTaskFlow(t *testing.T) {
})

executor.Run(tf).Wait()
fmt.Print("########### second times")
executor.Run(tf).Wait()

// validate
for _, g := range chains.rgs {
for len(g.elems) != 0 {
node := q.PeakAndTake()
if g.contains(node) {
delete(g.elems, node)
} else {
t.Fail()
}
}
}
}

func TestSubflow(t *testing.T) {
q := utils.NewQueue[string]()
// chains := newRgChain[string]()

A, B, C :=
gotaskflow.NewTask("A", func() {
fmt.Println("A")
q.Put("A")
}),
gotaskflow.NewTask("B", func() {
fmt.Println("B")
q.Put("B")
}),
gotaskflow.NewTask("C", func() {
fmt.Println("C")
q.Put("C")
})

A1, B1, C1 :=
gotaskflow.NewTask("A1", func() {
fmt.Println("A1")
q.Put("A1")
}),
gotaskflow.NewTask("B1", func() {
fmt.Println("B1")
q.Put("B1")
}),
gotaskflow.NewTask("C1", func() {
fmt.Println("C1")
q.Put("C1")
})
A.Precede(B)
C.Precede(B)
Expand All @@ -89,12 +165,15 @@ func TestSubflow(t *testing.T) {
A2, B2, C2 :=
gotaskflow.NewTask("A2", func() {
fmt.Println("A2")
q.Put("A2")
}),
gotaskflow.NewTask("B2", func() {
fmt.Println("B2")
q.Put("B2")
}),
gotaskflow.NewTask("C2", func() {
fmt.Println("C2")
q.Put("C2")
})
A2.Precede(B2)
C2.Precede(B2)
Expand All @@ -105,12 +184,15 @@ func TestSubflow(t *testing.T) {
A3, B3, C3 :=
gotaskflow.NewTask("A3", func() {
fmt.Println("A3")
q.Put("A3")
}),
gotaskflow.NewTask("B3", func() {
fmt.Println("B3")
q.Put("B3")
}),
gotaskflow.NewTask("C3", func() {
fmt.Println("C3")
q.Put("C3")
// time.Sleep(10 * time.Second)
})
A3.Precede(B3)
Expand All @@ -130,11 +212,34 @@ func TestSubflow(t *testing.T) {
log.Fatal(err)
}
executor.Profile(os.Stdout)
// exector.Wait()

// if err := tf.Visualize(os.Stdout); err != nil {
// panic(err)
// }
chain := newRgChain[string]()

// Group 1 - Top-level nodes
chain.grouping("C1", "A1", "B1", "C1", "A", "A2", "C2", "B2")

// Group 2 - Connections under A, B, C
chain.grouping("C", "A3", "C3",
"B3")

chain.grouping("B")
// validate
if q.Len() != 12 {
t.Fail()
}

for _, g := range chain.rgs {
for len(g.elems) != 0 {
node := q.PeakAndTake()
if g.contains(node) {
delete(g.elems, node)
} else {
fmt.Println(node)
t.Fail()
}
}
}

}

// ERROR robust testing
Expand Down

0 comments on commit 78a8687

Please sign in to comment.