diff --git a/README.md b/README.md index 46c9794..12a680b 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ -A static DAG (Directed Acyclic Graph) task computing framework for Go, inspired by taskflow-cpp, with Go's native capabilities and simplicity, suitable for complex dependency management in concurrent tasks. +A static DAG (Directed Acyclic Graph) task computing framework for Go, inspired by [taskflow-cpp](https://github.com/taskflow/taskflow), with Go's native capabilities and simplicity, suitable for complex dependency management in concurrent tasks. ## Feature - **High extensibility**: Easily extend the framework to adapt to various specific use cases. @@ -36,126 +36,34 @@ A static DAG (Directed Acyclic Graph) task computing framework for Go, inspired ## Example import latest version: `go get -u github.com/noneback/go-taskflow` -```go -package main - -import ( - "fmt" - "log" - "os" - "runtime" - "time" - - gotaskflow "github.com/noneback/go-taskflow" -) - -func main() { - // 1. Create An executor - executor := gotaskflow.NewExecutor(uint(runtime.NumCPU() - 1)) - // 2. Prepare all node you want and arrenge their dependencies in a refined DAG - tf := gotaskflow.NewTaskFlow("G") - A, B, C := - gotaskflow.NewTask("A", func() { - fmt.Println("A") - }), - gotaskflow.NewTask("B", func() { - fmt.Println("B") - }), - gotaskflow.NewTask("C", func() { - fmt.Println("C") - }) - - A1, B1, C1 := - gotaskflow.NewTask("A1", func() { - fmt.Println("A1") - }).Priority(gotaskflow.HIGH), - gotaskflow.NewTask("B1", func() { - fmt.Println("B1") - }), - gotaskflow.NewTask("C1", func() { - fmt.Println("C1") - }) - A.Precede(B) - C.Precede(B) - A1.Precede(B) - C.Succeed(A1) - C.Succeed(B1) - - subflow := gotaskflow.NewSubflow("sub1", func(sf *gotaskflow.Subflow) { - A2, B2, C2 := - gotaskflow.NewTask("A2", func() { - fmt.Println("A2") - }), - gotaskflow.NewTask("B2", func() { - fmt.Println("B2") - }), - gotaskflow.NewTask("C2", func() { - fmt.Println("C2") - }) - A2.Precede(B2) - C2.Precede(B2) - sf.Push(A2, B2, C2) - }) - - subflow2 := gotaskflow.NewSubflow("sub2", func(sf *gotaskflow.Subflow) { - A3, B3, C3 := - gotaskflow.NewTask("A3", func() { - fmt.Println("A3") - }), - gotaskflow.NewTask("B3", func() { - fmt.Println("B3") - }), - gotaskflow.NewTask("C3", func() { - fmt.Println("C3") - }) - A3.Precede(B3) - C3.Precede(B3) - sf.Push(A3, B3, C3) - }) - - cond := gotaskflow.NewCondition("binary", func() uint { - return uint(time.Now().Second() % 2) - }) - B.Precede(cond) - cond.Precede(subflow, subflow2) - - // 3. Push all node into Taskflow - tf.Push(A, B, C) - tf.Push(A1, B1, C1, cond, subflow, subflow2) - // 4. Run Taskflow via Executor - executor.Run(tf).Wait() - - // Visualize dag if you need to check dag execution. - if err := gotaskflow.Visualize(tf, os.Stdout); err != nil { - log.Fatal(err) - } - // Profile it if you need to see which task is most time-consuming - if err := executor.Profile(os.Stdout); err != nil { - log.Fatal(err) - } -} -``` -### How to use visualize taskflow +https://github.com/noneback/go-taskflow/blob/2b3889035dd159f06ff0fe222371a3e92d11b306/examples/conditional/condition.go#L1-L97 + +## Understand Condition Task Correctly +Condition Node is special in [taskflow-cpp](https://github.com/taskflow/taskflow). It not only enrolls in Condition Control but also in Looping. + +Our repo keeps almost the same behavior. You should read [ConditionTasking](https://taskflow.github.io/taskflow/ConditionalTasking.html) to avoid common pitfalls. + +## How to use visualize taskflow ```go if err := gotaskflow.Visualize(tf, os.Stdout); err != nil { log.Fatal(err) } ``` -`Visualize` generate raw string in dot format, just use dot to draw a DAG svg. +`Visualize` generates raw strings in dot format, use `dot` to draw a DAG svg. ![dot](image/condition.svg) -### How to use profile taskflow + +## How to use profile taskflow ```go if err :=exector.Profile(os.Stdout);err != nil { log.Fatal(err) } ``` -`Profile` alse generate raw string in flamegraph format, just use flamegraph to draw a flamegraph svg. +`Profile` generates raw strings in flamegraph format, use `flamegraph` to draw a flamegraph svg. ![flg](image/fl.svg) -## What's next -- [x] Conditional Tasking -- [x] Task Priority Schedule -- [x] Taskflow Loop Support +## What's more +Any Features Request or Discussions are all welcomed. + diff --git a/examples/parallel_merge_sort/parallel_merge_sort.go b/examples/parallel_merge_sort/parallel_merge_sort.go new file mode 100644 index 0000000..0a740bb --- /dev/null +++ b/examples/parallel_merge_sort/parallel_merge_sort.go @@ -0,0 +1,86 @@ +package main + +import ( + "fmt" + "log" + "math/rand" + "os" + "slices" + "strconv" + "sync" + + gtf "github.com/noneback/go-taskflow" +) + +// meger sorted src to sorted dest +func mergeInto(dest, src []int) []int { + size := len(dest) + len(src) + tmp := make([]int, 0, size) + i, j := 0, 0 + for i < len(dest) && j < len(src) { + if dest[i] < src[j] { + tmp = append(tmp, dest[i]) + i++ + } else { + tmp = append(tmp, src[j]) + j++ + } + } + + if i < len(dest) { + tmp = append(tmp, dest[i:]...) + } else { + tmp = append(tmp, src[j:]...) + } + + return tmp +} +func main() { + size := 100 + radomArr := make([][]int, 10) + sortedArr := make([]int, 0, 10*size) + mutex := &sync.Mutex{} + + for i := 0; i < 10; i++ { + for j := 0; j < size; j++ { + radomArr[i] = append(radomArr[i], rand.Int()) + } + } + + sortTasks := make([]*gtf.Task, 10) + tf := gtf.NewTaskFlow("merge sort") + done := gtf.NewTask("Done", func() { + if !slices.IsSorted(sortedArr) { + log.Fatal("Failed") + } + fmt.Println("Sorted") + fmt.Println(sortedArr[:1000]) + }) + + for i := 0; i < 10; i++ { + sortTasks[i] = gtf.NewTask("sort_"+strconv.Itoa(i), func() { + arr := radomArr[i] + slices.Sort(arr) + mutex.Lock() + defer mutex.Unlock() + sortedArr = mergeInto(sortedArr, arr) + }) + + } + done.Succeed(sortTasks...) + tf.Push(sortTasks...) + tf.Push(done) + + executor := gtf.NewExecutor(1000) + + executor.Run(tf).Wait() + + if err := gtf.Visualize(tf, os.Stdout); err != nil { + log.Fatal("V->", err) + } + + if err := executor.Profile(os.Stdout); err != nil { + log.Fatal("P->", err) + } + +}