Skip to content

Commit

Permalink
doc(example): add parallel merge sort example
Browse files Browse the repository at this point in the history
  • Loading branch information
noneback committed Nov 16, 2024
1 parent 34a3571 commit 235c560
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 108 deletions.
124 changes: 16 additions & 108 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@



A static DAG (Directed Acyclic Graph) task computing framework for Go, inspired by taskflow-cpp, with Go's native capabilities and simplicity, suitable for complex dependency management in concurrent tasks.
A static DAG (Directed Acyclic Graph) task computing framework for Go, inspired by [taskflow-cpp](https://github.com/taskflow/taskflow), with Go's native capabilities and simplicity, suitable for complex dependency management in concurrent tasks.

## Feature
- **High extensibility**: Easily extend the framework to adapt to various specific use cases.
Expand Down Expand Up @@ -36,126 +36,34 @@ A static DAG (Directed Acyclic Graph) task computing framework for Go, inspired
## Example
import latest version: `go get -u github.com/noneback/go-taskflow`

```go
package main

import (
"fmt"
"log"
"os"
"runtime"
"time"

gotaskflow "github.com/noneback/go-taskflow"
)

func main() {
// 1. Create An executor
executor := gotaskflow.NewExecutor(uint(runtime.NumCPU() - 1))
// 2. Prepare all node you want and arrenge their dependencies in a refined DAG
tf := gotaskflow.NewTaskFlow("G")
A, B, C :=
gotaskflow.NewTask("A", func() {
fmt.Println("A")
}),
gotaskflow.NewTask("B", func() {
fmt.Println("B")
}),
gotaskflow.NewTask("C", func() {
fmt.Println("C")
})

A1, B1, C1 :=
gotaskflow.NewTask("A1", func() {
fmt.Println("A1")
}).Priority(gotaskflow.HIGH),
gotaskflow.NewTask("B1", func() {
fmt.Println("B1")
}),
gotaskflow.NewTask("C1", func() {
fmt.Println("C1")
})
A.Precede(B)
C.Precede(B)
A1.Precede(B)
C.Succeed(A1)
C.Succeed(B1)

subflow := gotaskflow.NewSubflow("sub1", func(sf *gotaskflow.Subflow) {
A2, B2, C2 :=
gotaskflow.NewTask("A2", func() {
fmt.Println("A2")
}),
gotaskflow.NewTask("B2", func() {
fmt.Println("B2")
}),
gotaskflow.NewTask("C2", func() {
fmt.Println("C2")
})
A2.Precede(B2)
C2.Precede(B2)
sf.Push(A2, B2, C2)
})

subflow2 := gotaskflow.NewSubflow("sub2", func(sf *gotaskflow.Subflow) {
A3, B3, C3 :=
gotaskflow.NewTask("A3", func() {
fmt.Println("A3")
}),
gotaskflow.NewTask("B3", func() {
fmt.Println("B3")
}),
gotaskflow.NewTask("C3", func() {
fmt.Println("C3")
})
A3.Precede(B3)
C3.Precede(B3)
sf.Push(A3, B3, C3)
})

cond := gotaskflow.NewCondition("binary", func() uint {
return uint(time.Now().Second() % 2)
})
B.Precede(cond)
cond.Precede(subflow, subflow2)

// 3. Push all node into Taskflow
tf.Push(A, B, C)
tf.Push(A1, B1, C1, cond, subflow, subflow2)
// 4. Run Taskflow via Executor
executor.Run(tf).Wait()

// Visualize dag if you need to check dag execution.
if err := gotaskflow.Visualize(tf, os.Stdout); err != nil {
log.Fatal(err)
}
// Profile it if you need to see which task is most time-consuming
if err := executor.Profile(os.Stdout); err != nil {
log.Fatal(err)
}
}
```
### How to use visualize taskflow
https://github.com/noneback/go-taskflow/blob/2b3889035dd159f06ff0fe222371a3e92d11b306/examples/conditional/condition.go#L1-L97

## Understand Condition Task Correctly
Condition Node is special in [taskflow-cpp](https://github.com/taskflow/taskflow). It not only enrolls in Condition Control but also in Looping.

Our repo keeps almost the same behavior. You should read [ConditionTasking](https://taskflow.github.io/taskflow/ConditionalTasking.html) to avoid common pitfalls.

## How to use visualize taskflow
```go
if err := gotaskflow.Visualize(tf, os.Stdout); err != nil {
log.Fatal(err)
}
```
`Visualize` generate raw string in dot format, just use dot to draw a DAG svg.
`Visualize` generates raw strings in dot format, use `dot` to draw a DAG svg.

![dot](image/condition.svg)
### How to use profile taskflow

## How to use profile taskflow
```go
if err :=exector.Profile(os.Stdout);err != nil {
log.Fatal(err)
}
```

`Profile` alse generate raw string in flamegraph format, just use flamegraph to draw a flamegraph svg.
`Profile` generates raw strings in flamegraph format, use `flamegraph` to draw a flamegraph svg.

![flg](image/fl.svg)

## What's next
- [x] Conditional Tasking
- [x] Task Priority Schedule
- [x] Taskflow Loop Support
## What's more
Any Features Request or Discussions are all welcomed.

86 changes: 86 additions & 0 deletions examples/parallel_merge_sort/parallel_merge_sort.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package main

import (
"fmt"
"log"
"math/rand"
"os"
"slices"
"strconv"
"sync"

gtf "github.com/noneback/go-taskflow"
)

// meger sorted src to sorted dest
func mergeInto(dest, src []int) []int {
size := len(dest) + len(src)
tmp := make([]int, 0, size)
i, j := 0, 0
for i < len(dest) && j < len(src) {
if dest[i] < src[j] {
tmp = append(tmp, dest[i])
i++
} else {
tmp = append(tmp, src[j])
j++
}
}

if i < len(dest) {
tmp = append(tmp, dest[i:]...)
} else {
tmp = append(tmp, src[j:]...)
}

return tmp
}
func main() {
size := 100
radomArr := make([][]int, 10)
sortedArr := make([]int, 0, 10*size)
mutex := &sync.Mutex{}

for i := 0; i < 10; i++ {
for j := 0; j < size; j++ {
radomArr[i] = append(radomArr[i], rand.Int())
}
}

sortTasks := make([]*gtf.Task, 10)
tf := gtf.NewTaskFlow("merge sort")
done := gtf.NewTask("Done", func() {
if !slices.IsSorted(sortedArr) {
log.Fatal("Failed")
}
fmt.Println("Sorted")
fmt.Println(sortedArr[:1000])
})

for i := 0; i < 10; i++ {
sortTasks[i] = gtf.NewTask("sort_"+strconv.Itoa(i), func() {
arr := radomArr[i]
slices.Sort(arr)
mutex.Lock()
defer mutex.Unlock()
sortedArr = mergeInto(sortedArr, arr)
})

}
done.Succeed(sortTasks...)
tf.Push(sortTasks...)
tf.Push(done)

executor := gtf.NewExecutor(1000)

executor.Run(tf).Wait()

if err := gtf.Visualize(tf, os.Stdout); err != nil {
log.Fatal("V->", err)
}

if err := executor.Profile(os.Stdout); err != nil {
log.Fatal("P->", err)
}

}

0 comments on commit 235c560

Please sign in to comment.