Skip to content

Commit

Permalink
Merge pull request #1 from noneback/dev_v2
Browse files Browse the repository at this point in the history
refactor: reimplement executor with subflow support
  • Loading branch information
noneback authored Sep 26, 2024
2 parents faccaf8 + f76060c commit b31636b
Show file tree
Hide file tree
Showing 22 changed files with 743 additions and 389 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@
# vendor/

# Go workspace file
go.work
go.work
*.prof
2 changes: 1 addition & 1 deletion cmd/example.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

func main() {
executor := gotaskflow.NewExecutor(runtime.NumCPU() - 1)
executor := gotaskflow.NewExecutor(uint(runtime.NumCPU()))
tf := gotaskflow.NewTaskFlow("G")
A, B, C :=
gotaskflow.NewTask("A", func(ctx *context.Context) {
Expand Down
164 changes: 0 additions & 164 deletions copool.go

This file was deleted.

120 changes: 90 additions & 30 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package gotaskflow

import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/noneback/go-taskflow/utils"
)

type Executor interface {
Expand All @@ -15,56 +16,115 @@ type Executor interface {
}

type ExecutorImpl struct {
concurrency int
pool Pool
concurrency uint
pool *utils.Copool
wq *utils.Queue[*Node]
wg *sync.WaitGroup
}

func NewExecutor(concurrency int) Executor {
func NewExecutor(concurrency uint) Executor {
if concurrency == 0 {
panic("executor concrurency cannot be zero")
}
return &ExecutorImpl{
concurrency: concurrency,
pool: NewTaskPool(int32(concurrency)),
pool: utils.NewCopool(concurrency),
wq: utils.NewQueue[*Node](),
wg: &sync.WaitGroup{},
}
}

func (e *ExecutorImpl) Run(tf *TaskFlow) error {
nodes, ok := tf.graph.TopologicalSort()
if !ok {
return ErrTaskFlowIsCyclic
tf.graph.setup()

for _, node := range tf.graph.entries {
e.schedule(node)
}

e.invoke(tf)
return nil
}

func (e *ExecutorImpl) invoke_graph(g *Graph) {
ctx := context.Background()
for {
g.scheCond.L.Lock()
for g.JoinCounter() != 0 && e.wq.Len() == 0 {
g.scheCond.Wait()
}
g.scheCond.L.Unlock()

for _, node := range nodes {
e.schedule(ctx, node)
if g.JoinCounter() == 0 {
break
}

node := e.wq.PeakAndTake() // hang
e.invoke_node(&ctx, node)
}
return nil
}

func (e *ExecutorImpl) schedule(ctx context.Context, node *Node) {
waitting := make(map[string]*Node)
for _, dep := range node.dependents {
waitting[dep.name] = dep
}
func (e *ExecutorImpl) invoke(tf *TaskFlow) {
e.invoke_graph(tf.graph)
}

func (e *ExecutorImpl) invoke_node(ctx *context.Context, node *Node) {
// do job
switch p := node.ptr.(type) {
case *Static:
e.pool.Go(func() {
defer e.wg.Done()
p.handle(ctx)

for len(waitting) > 0 {
for name, dep := range waitting {
if atomic.LoadInt32((*int32)(&dep.state)) == kNodeStateFinished {
delete(waitting, name)
node.drop()
for _, n := range node.successors {
// fmt.Println("put", n.Name)
if n.JoinCounter() == 0 {
e.schedule(n)
}
}
// fmt.Println("Not Ready", name)
}
time.Sleep(time.Microsecond * 100)
node.g.scheCond.Signal()
})
case *Subflow:
e.pool.Go(func() {
defer e.wg.Done()

if !p.g.instancelized {
p.handle(p)
}
p.g.instancelized = true

e.schedule_graph(p.g)
node.drop()

for _, n := range node.successors {
if n.JoinCounter() == 0 {
e.schedule(n)
}
}

node.g.scheCond.Signal()
})
default:
fmt.Println("exit: ", node.name)
panic("do nothing")
}
}

func (e *ExecutorImpl) schedule(node *Node) {
e.wg.Add(1)
e.pool.CtxGo(ctx, func() {
defer e.wg.Done()
atomic.StoreInt32((*int32)(&node.state), kNodeStateRunning)
node.handle(&ctx)
atomic.StoreInt32((*int32)(&node.state), kNodeStateFinished)
})
e.wq.Put(node)
node.g.scheCond.Signal()
}

func (e *ExecutorImpl) schedule_graph(g *Graph) {
g.setup()
for _, node := range g.entries {
e.schedule(node)
}

e.invoke_graph(g)

g.scheCond.Signal()
}

func (e *ExecutorImpl) Wait() {
Expand Down
6 changes: 1 addition & 5 deletions executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@ package gotaskflow_test
import (
"context"
"fmt"
"os"
"runtime"
"testing"

gotaskflow "github.com/noneback/go-taskflow"
)

func TestExecutor(t *testing.T) {
executor := gotaskflow.NewExecutor(runtime.NumCPU() - 1)
executor := gotaskflow.NewExecutor(uint(runtime.NumCPU()))
tf := gotaskflow.NewTaskFlow("G")
A, B, C :=
gotaskflow.NewTask("A", func(ctx *context.Context) {
Expand Down Expand Up @@ -43,9 +42,6 @@ func TestExecutor(t *testing.T) {
tf.Push(A, B, C)
tf.Push(A1, B1, C1)

if err := tf.Visualize(os.Stdout); err != nil {
panic(err)
}
executor.Run(tf)
executor.Wait()
}
Loading

0 comments on commit b31636b

Please sign in to comment.