Skip to content

Commit

Permalink
refactor(api): remove tf.Push and attach NewTask, NewCondition, NewSu…
Browse files Browse the repository at this point in the history
…bflow with flow
  • Loading branch information
noneback committed Nov 21, 2024
1 parent bc8f6d8 commit f719d3d
Show file tree
Hide file tree
Showing 14 changed files with 263 additions and 224 deletions.
38 changes: 17 additions & 21 deletions examples/conditional/condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,24 @@ func main() {
executor := gotaskflow.NewExecutor(uint(runtime.NumCPU()-1) * 10000)
tf := gotaskflow.NewTaskFlow("G")
A, B, C :=
gotaskflow.NewTask("A", func() {
tf.NewTask("A", func() {
fmt.Println("A")
}),
gotaskflow.NewTask("B", func() {
tf.NewTask("B", func() {
fmt.Println("B")
}),
gotaskflow.NewTask("C", func() {
tf.NewTask("C", func() {
fmt.Println("C")
})

A1, B1, C1 :=
gotaskflow.NewTask("A1", func() {
A1, B1, _ :=
tf.NewTask("A1", func() {
fmt.Println("A1")
}),
gotaskflow.NewTask("B1", func() {
tf.NewTask("B1", func() {
fmt.Println("B1")
}),
gotaskflow.NewTask("C1", func() {
tf.NewTask("C1", func() {
fmt.Println("C1")
})
A.Precede(B)
Expand All @@ -40,47 +40,43 @@ func main() {
C.Succeed(A1)
C.Succeed(B1)

subflow := gotaskflow.NewSubflow("sub1", func(sf *gotaskflow.Subflow) {
subflow := tf.NewSubflow("sub1", func(sf *gotaskflow.Subflow) {
A2, B2, C2 :=
gotaskflow.NewTask("A2", func() {
sf.NewTask("A2", func() {
fmt.Println("A2")
}),
gotaskflow.NewTask("B2", func() {
sf.NewTask("B2", func() {
fmt.Println("B2")
}),
gotaskflow.NewTask("C2", func() {
sf.NewTask("C2", func() {
fmt.Println("C2")
})
A2.Precede(B2)
C2.Precede(B2)
sf.Push(A2, B2, C2)

})

subflow2 := gotaskflow.NewSubflow("sub2", func(sf *gotaskflow.Subflow) {
subflow2 := tf.NewSubflow("sub2", func(sf *gotaskflow.Subflow) {
A3, B3, C3 :=
gotaskflow.NewTask("A3", func() {
tf.NewTask("A3", func() {
fmt.Println("A3")
}),
gotaskflow.NewTask("B3", func() {
tf.NewTask("B3", func() {
fmt.Println("B3")
}),
gotaskflow.NewTask("C3", func() {
tf.NewTask("C3", func() {
fmt.Println("C3")
// time.Sleep(10 * time.Second)
})
A3.Precede(B3)
C3.Precede(B3)
sf.Push(A3, B3, C3)
})

cond := gotaskflow.NewCondition("binary", func() uint {
cond := tf.NewCondition("binary", func() uint {
return uint(time.Now().Second() % 2)
})
B.Precede(cond)
cond.Precede(subflow, subflow2)

tf.Push(A, B, C)
tf.Push(A1, B1, C1, cond, subflow, subflow2)
executor.Run(tf).Wait()
fmt.Println("Print DOT")
if err := gotaskflow.Visualize(tf, os.Stdout); err != nil {
Expand Down
12 changes: 5 additions & 7 deletions examples/loop/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,30 @@ func main() {
i := 0
tf := gotaskflow.NewTaskFlow("G")
init, cond, body, back, done :=
gotaskflow.NewTask("init", func() {
tf.NewTask("init", func() {
i = 0
fmt.Println("i=0")
}),
gotaskflow.NewCondition("while i < 5", func() uint {
tf.NewCondition("while i < 5", func() uint {
time.Sleep(100 * time.Millisecond)
if i < 5 {
return 0
} else {
return 1
}
}),
gotaskflow.NewTask("body", func() {
tf.NewTask("body", func() {
i += 1
fmt.Println("i++ =", i)
}),
gotaskflow.NewCondition("back", func() uint {
tf.NewCondition("back", func() uint {
fmt.Println("back")
return 0
}),
gotaskflow.NewTask("done", func() {
tf.NewTask("done", func() {
fmt.Println("done")
})

tf.Push(init, cond, body, back, done)

init.Precede(cond)
cond.Precede(body, done)
body.Precede(back)
Expand Down
6 changes: 2 additions & 4 deletions examples/parallel_merge_sort/parallel_merge_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func main() {

sortTasks := make([]*gtf.Task, 10)
tf := gtf.NewTaskFlow("merge sort")
done := gtf.NewTask("Done", func() {
done := tf.NewTask("Done", func() {
if !slices.IsSorted(sortedArr) {
log.Fatal("Failed")
}
Expand All @@ -58,7 +58,7 @@ func main() {
})

for i := 0; i < 10; i++ {
sortTasks[i] = gtf.NewTask("sort_"+strconv.Itoa(i), func() {
sortTasks[i] = tf.NewTask("sort_"+strconv.Itoa(i), func() {
arr := radomArr[i]
slices.Sort(arr)
mutex.Lock()
Expand All @@ -68,8 +68,6 @@ func main() {

}
done.Succeed(sortTasks...)
tf.Push(sortTasks...)
tf.Push(done)

executor := gtf.NewExecutor(1000)

Expand Down
46 changes: 22 additions & 24 deletions examples/priority/priority.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,30 @@ func main() {
exector := gotaskflow.NewExecutor(uint(2))
q := utils.NewQueue[byte]()
tf := gotaskflow.NewTaskFlow("G")
B, C :=
gotaskflow.NewTask("B", func() {
fmt.Println("B")
q.Put('B')
}).Priority(gotaskflow.NORMAL),
gotaskflow.NewTask("C", func() {
fmt.Println("C")
q.Put('C')

tf.NewTask("B", func() {
fmt.Println("B")
q.Put('B')
}).Priority(gotaskflow.NORMAL)
tf.NewTask("C", func() {
fmt.Println("C")
q.Put('C')
}).Priority(gotaskflow.HIGH)
tf.NewSubflow("sub1", func(sf *gotaskflow.Subflow) {
sf.NewTask("A2", func() {
fmt.Println("A2")
q.Put('a')
}).Priority(gotaskflow.LOW)
sf.NewTask("B2", func() {
fmt.Println("B2")
q.Put('b')
}).Priority(gotaskflow.HIGH)
suc := gotaskflow.NewSubflow("sub1", func(sf *gotaskflow.Subflow) {
A2, B2, C2 :=
gotaskflow.NewTask("A2", func() {
fmt.Println("A2")
q.Put('a')
}).Priority(gotaskflow.LOW),
gotaskflow.NewTask("B2", func() {
fmt.Println("B2")
q.Put('b')
}).Priority(gotaskflow.HIGH),
gotaskflow.NewTask("C2", func() {
fmt.Println("C2")
q.Put('c')
}).Priority(gotaskflow.NORMAL)
sf.Push(A2, B2, C2)
sf.NewTask("C2", func() {
fmt.Println("C2")
q.Put('c')
}).Priority(gotaskflow.NORMAL)

}).Priority(gotaskflow.LOW)

tf.Push(B, C, suc)
exector.Run(tf).Wait()
}
35 changes: 15 additions & 20 deletions examples/simple/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,24 @@ func main() {

tf := gotaskflow.NewTaskFlow("G")
A, B, C :=
gotaskflow.NewTask("A", func() {
tf.NewTask("A", func() {
fmt.Println("A")
}),
gotaskflow.NewTask("B", func() {
tf.NewTask("B", func() {
fmt.Println("B")
}),
gotaskflow.NewTask("C", func() {
tf.NewTask("C", func() {
fmt.Println("C")
})

A1, B1, C1 :=
gotaskflow.NewTask("A1", func() {
A1, B1, _ :=
tf.NewTask("A1", func() {
fmt.Println("A1")
}),
gotaskflow.NewTask("B1", func() {
tf.NewTask("B1", func() {
fmt.Println("B1")
}),
gotaskflow.NewTask("C1", func() {
tf.NewTask("C1", func() {
fmt.Println("C1")
})
A.Precede(B)
Expand All @@ -40,44 +40,39 @@ func main() {
C.Succeed(A1)
C.Succeed(B1)

subflow := gotaskflow.NewSubflow("sub1", func(sf *gotaskflow.Subflow) {
subflow := tf.NewSubflow("sub1", func(sf *gotaskflow.Subflow) {
A2, B2, C2 :=
gotaskflow.NewTask("A2", func() {
tf.NewTask("A2", func() {
fmt.Println("A2")
}),
gotaskflow.NewTask("B2", func() {
tf.NewTask("B2", func() {
fmt.Println("B2")
}),
gotaskflow.NewTask("C2", func() {
tf.NewTask("C2", func() {
fmt.Println("C2")
})
A2.Precede(B2)
C2.Precede(B2)
sf.Push(A2, B2, C2)
})

subflow2 := gotaskflow.NewSubflow("sub2", func(sf *gotaskflow.Subflow) {
subflow2 := tf.NewSubflow("sub2", func(sf *gotaskflow.Subflow) {
A3, B3, C3 :=
gotaskflow.NewTask("A3", func() {
tf.NewTask("A3", func() {
fmt.Println("A3")
}),
gotaskflow.NewTask("B3", func() {
tf.NewTask("B3", func() {
fmt.Println("B3")
}),
gotaskflow.NewTask("C3", func() {
tf.NewTask("C3", func() {
fmt.Println("C3")
// time.Sleep(10 * time.Second)
})
A3.Precede(B3)
C3.Precede(B3)
sf.Push(A3, B3, C3)
})

subflow.Precede(B)
subflow.Precede(subflow2)

tf.Push(A, B, C)
tf.Push(A1, B1, C1, subflow, subflow2)
executor.Run(tf).Wait()
fmt.Println("Print DOT")
if err := gotaskflow.Visualize(tf, os.Stdout); err != nil {
Expand Down
1 change: 1 addition & 0 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func NewExecutor(concurrency uint) Executor {

// Run start to schedule and execute taskflow
func (e *innerExecutorImpl) Run(tf *TaskFlow) Executor {
tf.forzen = true
e.scheduleGraph(tf.graph, nil)
return e
}
Expand Down
17 changes: 7 additions & 10 deletions executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,24 @@ func TestExecutor(t *testing.T) {
executor := gotaskflow.NewExecutor(uint(runtime.NumCPU()))
tf := gotaskflow.NewTaskFlow("G")
A, B, C :=
gotaskflow.NewTask("A", func() {
tf.NewTask("A", func() {
fmt.Println("A")
}),
gotaskflow.NewTask("B", func() {
tf.NewTask("B", func() {
fmt.Println("B")
}),
gotaskflow.NewTask("C", func() {
tf.NewTask("C", func() {
fmt.Println("C")
})

A1, B1, C1 :=
gotaskflow.NewTask("A1", func() {
A1, B1, _ :=
tf.NewTask("A1", func() {
fmt.Println("A1")
}),
gotaskflow.NewTask("B1", func() {
tf.NewTask("B1", func() {
fmt.Println("B1")
}),
gotaskflow.NewTask("C1", func() {
tf.NewTask("C1", func() {
fmt.Println("C1")
})
A.Precede(B)
Expand All @@ -39,9 +39,6 @@ func TestExecutor(t *testing.T) {
C.Succeed(A1)
C.Succeed(B1)

tf.Push(A, B, C)
tf.Push(A1, B1, C1)

executor.Run(tf).Wait()
executor.Profile(os.Stdout)
}
31 changes: 29 additions & 2 deletions flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ func (sf *Subflow) instancelize() (err error) {
}

// Push pushs all tasks into subflow
func (sf *Subflow) Push(tasks ...*Task) {
func (sf *Subflow) push(tasks ...*Task) {
for _, task := range tasks {
sf.g.push(task.node)
}
}

func (fb *flowBuilder) NewStatic(name string, f func()) *innerNode {
func (tf *flowBuilder) NewStatic(name string, f func()) *innerNode {
node := newNode(name)
node.ptr = &Static{
handle: f,
Expand Down Expand Up @@ -74,3 +74,30 @@ func (fb *flowBuilder) NewCondition(name string, f func() uint) *innerNode {
node.Typ = nodeCondition
return node
}

// NewStaticTask returns a static task
func (sf *Subflow) NewTask(name string, f func()) *Task {
task := &Task{
node: builder.NewStatic(name, f),
}
sf.push(task)
return task
}

// NewSubflow returns a subflow task
func (sf *Subflow) NewSubflow(name string, f func(sf *Subflow)) *Task {
task := &Task{
node: builder.NewSubflow(name, f),
}
sf.push(task)
return task
}

// NewCondition returns a condition task. The predict func return value determines its successor.
func (sf *Subflow) NewCondition(name string, predict func() uint) *Task {
task := &Task{
node: builder.NewCondition(name, predict),
}
sf.push(task)
return task
}
Loading

0 comments on commit f719d3d

Please sign in to comment.