Skip to content

Commit

Permalink
feat(executor): support loop
Browse files Browse the repository at this point in the history
  • Loading branch information
noneback committed Nov 13, 2024
1 parent 539267f commit 5583cf0
Show file tree
Hide file tree
Showing 8 changed files with 225 additions and 136 deletions.
42 changes: 29 additions & 13 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,22 +70,23 @@ func (e *innerExecutorImpl) sche_successors(node *innerNode) {
candidate := make([]*innerNode, 0, len(node.successors))

for _, n := range node.successors {
if n.JoinCounter() == 0 {
if n.JoinCounter() == 0 || n.Typ == nodeCondition {
// deps all done or condition node
candidate = append(candidate, n)
}
}

slices.SortFunc(candidate, func(i, j *innerNode) int {
return cmp.Compare(i.priority, j.priority)
})

node.setup()
e.schedule(candidate...)
}

func (e *innerExecutorImpl) invodeStatic(node *innerNode, parentSpan *span, p *Static) func() {
func (e *innerExecutorImpl) invokeStatic(node *innerNode, parentSpan *span, p *Static) func() {
return func() {
span := span{extra: attr{
typ: NodeStatic,
typ: nodeStatic,
name: node.name,
}, begin: time.Now(), parent: parentSpan}

Expand All @@ -99,9 +100,10 @@ func (e *innerExecutorImpl) invodeStatic(node *innerNode, parentSpan *span, p *S
e.profiler.AddSpan(&span) // remove canceled node span
}

e.wg.Done()
node.drop()
e.sche_successors(node)
node.g.joinCounter.Decrease()
e.wg.Done()
node.g.scheCond.Signal()
}()

Expand All @@ -114,7 +116,7 @@ func (e *innerExecutorImpl) invodeStatic(node *innerNode, parentSpan *span, p *S
func (e *innerExecutorImpl) invokeSubflow(node *innerNode, parentSpan *span, p *Subflow) func() {
return func() {
span := span{extra: attr{
typ: NodeSubflow,
typ: nodeSubflow,
name: node.name,
}, begin: time.Now(), parent: parentSpan}
defer func() {
Expand All @@ -127,11 +129,12 @@ func (e *innerExecutorImpl) invokeSubflow(node *innerNode, parentSpan *span, p *
} else {
e.profiler.AddSpan(&span) // remove canceled node span
}
e.wg.Done()

e.scheduleGraph(p.g, &span)
node.drop()

e.sche_successors(node)
node.g.joinCounter.Decrease()
e.wg.Done()
node.g.scheCond.Signal()
}()

Expand All @@ -147,7 +150,7 @@ func (e *innerExecutorImpl) invokeSubflow(node *innerNode, parentSpan *span, p *
func (e *innerExecutorImpl) invokeCondition(node *innerNode, parentSpan *span, p *Condition) func() {
return func() {
span := span{extra: attr{
typ: NodeCondition,
typ: nodeCondition,
name: node.name,
}, begin: time.Now(), parent: parentSpan}

Expand All @@ -160,9 +163,10 @@ func (e *innerExecutorImpl) invokeCondition(node *innerNode, parentSpan *span, p
} else {
e.profiler.AddSpan(&span) // remove canceled node span
}
e.wg.Done()
node.drop()
e.sche_successors(node)
node.g.joinCounter.Decrease()
e.wg.Done()
node.g.scheCond.Signal()
}()

Expand All @@ -177,7 +181,7 @@ func (e *innerExecutorImpl) invokeCondition(node *innerNode, parentSpan *span, p
if idx == choice {
continue
}
v.state.Store(kNodeStateCanceled) // cancel other nodes
v.state.Store(kNodeStateIgnored) // cancel other nodes
}
// do choice and cancel others
node.state.Store(kNodeStateFinished)
Expand All @@ -186,9 +190,10 @@ func (e *innerExecutorImpl) invokeCondition(node *innerNode, parentSpan *span, p

func (e *innerExecutorImpl) invokeNode(node *innerNode, parentSpan *span) {
// do job
fmt.Println("[invoke] ", node.name)
switch p := node.ptr.(type) {
case *Static:
e.pool.Go(e.invodeStatic(node, parentSpan, p))
e.pool.Go(e.invokeStatic(node, parentSpan, p))
case *Subflow:
e.pool.Go(e.invokeSubflow(node, parentSpan, p))
case *Condition:
Expand All @@ -200,6 +205,7 @@ func (e *innerExecutorImpl) invokeNode(node *innerNode, parentSpan *span) {

func (e *innerExecutorImpl) schedule(nodes ...*innerNode) {
for _, node := range nodes {
fmt.Println("[schedule] ", node.name)
if node.g.canceled.Load() {
node.g.scheCond.Signal()
fmt.Printf("node %v is not scheduled, as graph %v is canceled\n", node.name, node.g.name)
Expand All @@ -213,7 +219,17 @@ func (e *innerExecutorImpl) schedule(nodes ...*innerNode) {
v.state.Store(kNodeStateCanceled)
}

return
continue
}

if node.state.Load() == kNodeStateIgnored {
node.g.scheCond.Signal()
fmt.Printf("node %v is ignored\n", node.name)
for _, v := range node.successors {
v.state.Store(kNodeStateIdle)
}

continue
}

node.g.joinCounter.Increase()
Expand Down
6 changes: 3 additions & 3 deletions flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (fb *flowBuilder) NewStatic(name string, f func()) *innerNode {
node.ptr = &Static{
handle: f,
}
node.Typ = NodeStatic
node.Typ = nodeStatic
return node
}

Expand All @@ -61,7 +61,7 @@ func (fb *flowBuilder) NewSubflow(name string, f func(sf *Subflow)) *innerNode {
handle: f,
g: newGraph(name),
}
node.Typ = NodeSubflow
node.Typ = nodeSubflow
return node
}

Expand All @@ -71,6 +71,6 @@ func (fb *flowBuilder) NewCondition(name string, f func() uint) *innerNode {
handle: f,
mapper: make(map[uint]*innerNode),
}
node.Typ = NodeCondition
node.Typ = nodeCondition
return node
}
2 changes: 1 addition & 1 deletion graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (g *eGraph) setup() {
g.reset()

for _, node := range g.nodes {
node.joinCounter.Set(len(node.dependents))
node.setup()

if len(node.dependents) == 0 {
g.entries = append(g.entries, node)
Expand Down
27 changes: 19 additions & 8 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,22 @@ const (
kNodeStateFinished = int32(3)
kNodeStateFailed = int32(4)
kNodeStateCanceled = int32(5)
kNodeStateIgnored = int32(6)
)

type NodeType string
type nodeType string

const (
NodeSubflow NodeType = "subflow" // subflow
NodeStatic NodeType = "static" // static
NodeCondition NodeType = "condition" // static
nodeSubflow nodeType = "subflow" // subflow
nodeStatic nodeType = "static" // static
nodeCondition nodeType = "condition" // static
)

type innerNode struct {
name string
successors []*innerNode
dependents []*innerNode
Typ NodeType
Typ nodeType
ptr interface{}
rw *sync.RWMutex
state atomic.Int32
Expand All @@ -41,13 +42,23 @@ func (n *innerNode) JoinCounter() int {
return n.joinCounter.Value()
}

func (n *innerNode) setup() {
n.state.Store(kNodeStateIdle)
for _, dep := range n.dependents {
if dep.Typ == nodeCondition {
continue
}

n.joinCounter.Increase()
}
}
func (n *innerNode) drop() {
// release every deps
for _, node := range n.successors {
node.joinCounter.Decrease()
if n.Typ != nodeCondition {
node.joinCounter.Decrease()
}
}

n.g.joinCounter.Decrease()
}

// set dependency: V deps on N, V is input node
Expand Down
4 changes: 2 additions & 2 deletions profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (t *profiler) AddSpan(s *span) {
}

type attr struct {
typ NodeType
typ nodeType
success bool // 0 for success, 1 for abnormal
name string
}
Expand All @@ -46,7 +46,7 @@ func (s *span) String() string {
func (t *profiler) draw(w io.Writer) error {
for _, s := range t.spans {
path := ""
if s.extra.typ == NodeStatic {
if s.extra.typ == nodeStatic {
path = s.String()
cur := s

Expand Down
8 changes: 4 additions & 4 deletions profiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func TestProfilerAddSpan(t *testing.T) {
profiler := newProfiler()
span := &span{
extra: attr{
typ: NodeStatic,
typ: nodeStatic,
success: true,
name: "test-span",
},
Expand All @@ -34,7 +34,7 @@ func TestSpanString(t *testing.T) {
now := time.Now()
span := &span{
extra: attr{
typ: NodeStatic,
typ: nodeStatic,
success: true,
name: "test-span",
},
Expand All @@ -55,7 +55,7 @@ func TestProfilerDraw(t *testing.T) {
now := time.Now()
parentSpan := &span{
extra: attr{
typ: NodeStatic,
typ: nodeStatic,
success: true,
name: "parent",
},
Expand All @@ -65,7 +65,7 @@ func TestProfilerDraw(t *testing.T) {

childSpan := &span{
extra: attr{
typ: NodeStatic,
typ: nodeStatic,
success: true,
name: "child",
},
Expand Down
Loading

0 comments on commit 5583cf0

Please sign in to comment.