diff --git a/worker2/dep.go b/worker2/dep.go
index 3435b71a..263a3c6f 100644
--- a/worker2/dep.go
+++ b/worker2/dep.go
@@ -2,6 +2,7 @@ package worker2
 
 import (
 	"context"
+	"sync"
 )
 
 type Dep interface {
@@ -10,7 +11,6 @@ type Dep interface {
 	Freeze()
 	IsFrozen() bool
 	GetDepsObj() *Deps
-	GetDependencies() []Dep
 	AddDep(Dep)
 	GetHooks() []Hook
 	Wait() <-chan struct{}
@@ -24,13 +24,18 @@ type Dep interface {
 
 type baseDep struct {
 	execution *Execution
+	m         sync.RWMutex
 }
 
 func (a *baseDep) setExecution(e *Execution) {
+	a.m.Lock()
+	defer a.m.Unlock()
 	a.execution = e
 }
 
 func (a *baseDep) getExecution() *Execution {
+	a.m.RLock()
+	defer a.m.RUnlock()
 	return a.execution
 }
 
@@ -95,13 +100,10 @@ func (a *Action) GetDepsObj() *Deps {
 	if a.Deps == nil {
 		a.Deps = NewDeps()
 	}
+	a.Deps.setOwner(a)
 	return a.Deps
 }
 
-func (a *Action) GetDependencies() []Dep {
-	return a.GetDepsObj().Dependencies()
-}
-
 func (a *Action) AddDep(dep Dep) {
 	a.GetDepsObj().Add(dep)
 }
@@ -110,6 +112,12 @@ func (a *Action) DeepDo(f func(Dep)) {
 	deepDo(a, f)
 }
 
+func (a *Action) LinkDeps() {
+	for _, dep := range a.GetDepsObj().TransitiveDependencies() {
+		_ = dep.GetDepsObj()
+	}
+}
+
 type Group struct {
 	baseDep
 	ID    string
@@ -135,17 +143,20 @@ func (g *Group) GetID() string {
 	return g.ID
 }
 
+func (g *Group) LinkDeps() {
+	for _, dep := range g.GetDepsObj().TransitiveDependencies() {
+		_ = dep.GetDepsObj()
+	}
+}
+
 func (g *Group) GetDepsObj() *Deps {
 	if g.Deps == nil {
 		g.Deps = NewDeps()
 	}
+	g.Deps.setOwner(g)
 	return g.Deps
 }
 
-func (g *Group) GetDependencies() []Dep {
-	return g.Deps.Dependencies()
-}
-
 func (g *Group) GetHooks() []Hook {
 	return g.Hooks
 }
diff --git a/worker2/deps.go b/worker2/deps.go
index b30c8e7b..a61020f6 100644
--- a/worker2/deps.go
+++ b/worker2/deps.go
@@ -11,16 +11,11 @@ import (
 type DepHook = func(dep Dep)
 
 func NewDeps(deps ...Dep) *Deps {
-	return newDeps("", deps)
+	return newDeps(deps)
 }
 
-func NewDepsID(id string, deps ...Dep) *Deps {
-	return newDeps(id, deps)
-}
-
-func newDeps(id string, deps []Dep) *Deps {
+func newDeps(deps []Dep) *Deps {
 	d := &Deps{
-		id:                  id,
 		deps:                sets.NewIdentitySet[Dep](0),
 		transitiveDeps:      sets.NewIdentitySet[Dep](0),
 		dependees:           sets.NewIdentitySet[*Deps](0),
@@ -31,7 +26,7 @@ func newDeps(id string, deps []Dep) *Deps {
 }
 
 type Deps struct {
-	id                  string
+	owner               Dep
 	deps                *sets.Set[Dep, Dep]
 	transitiveDeps      *sets.Set[Dep, Dep]
 	dependees           *sets.Set[*Deps, *Deps]
@@ -40,6 +35,16 @@ type Deps struct {
 	frozen bool
 }
 
+func (d *Deps) setOwner(dep Dep) {
+	d.m.Lock()
+	defer d.m.Unlock()
+
+	if d.owner != nil && d.owner != dep {
+		panic("deps owner is already set")
+	}
+	d.owner = dep
+}
+
 func (d *Deps) IsFrozen() bool {
 	return d.frozen
 }
@@ -79,6 +84,13 @@ func (d *Deps) Dependencies() []Dep {
 	return d.deps.Slice()
 }
 
+func (d *Deps) Dependees() []*Deps {
+	d.m.RLock()
+	defer d.m.RUnlock()
+
+	return d.dependees.Slice()
+}
+
 func (d *Deps) TransitiveDependencies() []Dep {
 	d.m.RLock()
 	defer d.m.RUnlock()
@@ -160,7 +172,7 @@ func (d *Deps) hasDependee(dep *Deps) bool {
 func (d *Deps) DebugString() string {
 	var sb strings.Builder
 
-	fmt.Fprintf(&sb, "%v:\n", d.id)
+	fmt.Fprintf(&sb, "%v:\n", d.owner.GetID())
 	deps := ads.Map(d.deps.Slice(), func(d Dep) string {
 		return d.GetID()
 	})
@@ -171,10 +183,10 @@ func (d *Deps) DebugString() string {
 	fmt.Fprintf(&sb, " tdeps: %v\n", tdeps)
 
 	depdees := ads.Map(d.dependees.Slice(), func(d *Deps) string {
-		return d.id
+		return d.owner.GetID()
 	})
 	tdepdees := ads.Map(d.transitiveDependees.Slice(), func(d *Deps) string {
-		return d.id
+		return d.owner.GetID()
 	})
 	fmt.Fprintf(&sb, " depdees: %v\n", depdees)
 	fmt.Fprintf(&sb, " tdepdees: %v\n", tdepdees)
diff --git a/worker2/deps_test.go b/worker2/deps_test.go
index 34d86be4..fe52ab1e 100644
--- a/worker2/deps_test.go
+++ b/worker2/deps_test.go
@@ -11,21 +11,23 @@ func s(s string) string {
 }
 
 func TestLink(t *testing.T) {
-	d1 := &Action{ID: "1", Deps: NewDepsID("1")}
-	d2 := &Action{ID: "2", Deps: NewDepsID("2", d1)}
+	d1 := &Action{ID: "1", Deps: NewDeps()}
+	d2 := &Action{ID: "2", Deps: NewDeps(d1)}
 
-	d3 := &Action{ID: "3", Deps: NewDepsID("3")}
-	d4 := &Action{ID: "4", Deps: NewDepsID("4", d3)}
+	d3 := &Action{ID: "3", Deps: NewDeps()}
+	d4 := &Action{ID: "4", Deps: NewDeps(d3)}
 
 	d3.AddDep(d2)
 
+	d4.LinkDeps()
+
 	assert.Equal(t, s(`
 1:
  deps: []
  tdeps: []
  depdees: [2]
  tdepdees: [2 4 3]
-`), d1.Deps.DebugString())
+`), d1.GetDepsObj().DebugString())
 
 	assert.Equal(t, s(`
 2:
@@ -33,7 +35,7 @@ func TestLink(t *testing.T) {
  tdeps: [1]
  depdees: [3]
  tdepdees: [4 3]
-`), d2.Deps.DebugString())
+`), d2.GetDepsObj().DebugString())
 
 	assert.Equal(t, s(`
 3:
@@ -41,7 +43,7 @@ func TestLink(t *testing.T) {
  tdeps: [2 1]
  depdees: [4]
  tdepdees: [4]
-`), d3.Deps.DebugString())
+`), d3.GetDepsObj().DebugString())
 
 	assert.Equal(t, s(`
 4:
@@ -49,5 +51,5 @@ func TestLink(t *testing.T) {
  tdeps: [3 2 1]
  depdees: []
  tdepdees: []
-`), d4.Deps.DebugString())
+`), d4.GetDepsObj().DebugString())
 }
diff --git a/worker2/engine.go b/worker2/engine.go
index e7c2e56e..e9337308 100644
--- a/worker2/engine.go
+++ b/worker2/engine.go
@@ -1,21 +1,19 @@
 package worker2
 
 import (
+	"github.com/bep/debounce"
 	"slices"
 	"sync"
 	"time"
 )
 
 type Engine struct {
-	wg                sync.WaitGroup
-	defaultScheduler  Scheduler
-	workers           []*Worker
-	m                 sync.RWMutex
-	c                 *sync.Cond
-	executions        []*Execution
-	executionsWaiting []*Execution
-	eventsCh          chan Event
-	hooks             []Hook
+	wg               sync.WaitGroup
+	defaultScheduler Scheduler
+	workers          []*Worker
+	m                sync.RWMutex
+	eventsCh         chan Event
+	hooks            []Hook
 }
 
 func NewEngine() *Engine {
@@ -23,69 +21,57 @@
 		eventsCh:         make(chan Event, 1000),
 		defaultScheduler: UnlimitedScheduler{},
 	}
-	e.c = sync.NewCond(&e.m)
 	return e
 }
 
 func (e *Engine) GetWorkers() []*Worker {
-	return e.workers
+	e.m.Lock()
+	defer e.m.Unlock()
+
+	return e.workers[:]
 }
 
 func (e *Engine) loop() {
-	done := false
-	go func() {
-		for {
-			time.Sleep(100 * time.Millisecond)
-			e.c.Broadcast()
-			e.eventsCh <- EventTryExecuteOne{}
-
-			if done {
-				return
-			}
-		}
-	}()
-
 	for event := range e.eventsCh {
 		e.handle(event)
 	}
-
-	done = true
 }
 
 func (e *Engine) handle(event Event) {
-	if event, ok := event.(WithExecution); ok {
-		defer e.runHooks(event, event.getExecution())
-	}
-
 	switch event := event.(type) {
 	case EventReady:
-		e.executionsWaiting = append(e.executionsWaiting, event.Execution)
-		go e.notifyTryExecuteOne()
-	case EventWorkerAvailable:
-		go e.notifyTryExecuteOne()
-	case EventTryExecuteOne:
-		_ = e.tryExecuteOne()
+		e.runHooks(event, event.Execution)
+		e.start(event.Execution)
 	case EventSkipped:
 		e.finalize(event.Execution, ExecStateSkipped)
+		e.runHooks(event, event.Execution)
 	case EventCompleted:
 		if event.Error != nil {
 			e.finalize(event.Execution, ExecStateFailed)
 		} else {
 			e.finalize(event.Execution, ExecStateSucceeded)
 		}
+		e.runHooks(event, event.Execution)
+	default:
+		if event, ok := event.(WithExecution); ok {
+			defer e.runHooks(event, event.getExecution())
+		}
 	}
 }
 
 func (e *Engine) finalize(exec *Execution, state ExecState) {
 	exec.m.Lock()
-	defer exec.m.Unlock()
-
 	exec.State = state
-	e.deleteExecution(exec)
 	e.wg.Done()
-	e.c.Broadcast()
 	close(exec.completedCh)
+	exec.m.Unlock()
+
+	for _, dep := range exec.Dep.GetDepsObj().Dependees() {
+		dexec := dep.owner.getExecution()
+
+		dexec.broadcast()
+	}
 }
 
 func (e *Engine) runHooks(event Event, exec *Execution) {
@@ -99,8 +85,8 @@ func (e *Engine) runHooks(event Event, exec *Execution) {
 }
 
 func (e *Engine) waitForDeps(exec *Execution) bool {
-	e.c.L.Lock()
-	defer e.c.L.Unlock()
+	exec.c.L.Lock()
+	defer exec.c.L.Unlock()
 
 	for {
 		deepDeps := exec.Dep.GetDepsObj().TransitiveDependencies()
@@ -109,16 +95,16 @@
 		for _, dep := range deepDeps {
 			depExec := e.executionForDep(dep)
 
-			if depExec.State != ExecStateSucceeded {
+			depExec.m.Lock()
+			state := depExec.State
+			depExec.m.Unlock()
+
+			if state != ExecStateSucceeded {
 				allDepsSucceeded = false
 			}
 
-			switch depExec.State {
-			case ExecStateSkipped:
-				e.notifySkipped(exec)
-				return false
-			case ExecStateFailed:
-				e.notifySkipped(exec)
+			switch state {
+			case ExecStateSkipped, ExecStateFailed:
 				return false
 			}
 		}
@@ -129,13 +115,14 @@
 			return true
 		}
 
-		e.c.Wait()
+		exec.c.Wait()
 	}
 }
 
 func (e *Engine) waitForDepsAndSchedule(exec *Execution) {
 	shouldRun := e.waitForDeps(exec)
 	if !shouldRun {
+		e.notifySkipped(exec)
 		return
 	}
 
@@ -143,9 +130,7 @@
 	ins := map[string]Value{}
 	for _, dep := range exec.Dep.GetDepsObj().Dependencies() {
 		if dep, ok := dep.(Named); ok {
-			e.m.Lock()
 			exec := e.executionForDep(dep.Dep)
-			e.m.Unlock()
 
 			vv := exec.outStore.Get()
 
@@ -183,37 +168,12 @@ func (e *Engine) notifyCompleted(exec *Execution, output Value, err error) {
 	}
 }
 
-func (e *Engine) notifyTryExecuteOne() {
-	e.eventsCh <- EventTryExecuteOne{}
-}
-
 func (e *Engine) notifyReady(exec *Execution) {
 	e.eventsCh <- EventReady{
 		Execution: exec,
 	}
 }
 
-func (e *Engine) tryExecuteOne() bool {
-	for _, candidate := range e.executionsWaiting {
-		e.start(candidate)
-		e.deleteExecutionWaiting(candidate)
-		e.runHooks(EventStarted{Execution: candidate}, candidate)
-		return true
-	}
-
-	return false
-}
-
-func (e *Engine) deleteExecution(exec *Execution) {
-	// TODO: would need to be deleted once noone depends on it
-}
-
-func (e *Engine) deleteExecutionWaiting(exec *Execution) {
-	e.executionsWaiting = slices.DeleteFunc(e.executionsWaiting, func(e *Execution) bool {
-		return e == exec
-	})
-}
-
 func (e *Engine) start(exec *Execution) {
 	e.m.Lock()
 	defer e.m.Unlock()
@@ -234,6 +194,8 @@
 			return worker == w
 		})
 	}()
+
+	e.runHooks(EventStarted{Execution: exec}, exec)
 }
 
 func (e *Engine) RegisterHook(hook Hook) {
@@ -244,6 +206,10 @@ func (e *Engine) Run() {
 	e.loop()
 }
 
+func (e *Engine) Stop() {
+	close(e.eventsCh)
+}
+
 func (e *Engine) Wait() {
 	e.wg.Wait()
 }
@@ -255,15 +221,15 @@ func (e *Engine) executionForDep(dep Dep) *Execution {
 		return e
 	}
 
+	e.m.Lock()
+	defer e.m.Unlock()
+
 	return e.scheduleOne(dep)
 }
 
 func (e *Engine) Schedule(a Dep) {
-	var deps []Dep
-	a.DeepDo(func(dep Dep) {
-		deps = append(deps, dep)
-	})
-	slices.Reverse(deps)
+	deps := a.GetDepsObj().TransitiveDependencies()
+	deps = append(deps, a)
 
 	e.m.Lock()
 	defer e.m.Unlock()
@@ -280,12 +246,6 @@ func (e *Engine) scheduleOne(dep Dep) *Execution {
 		return exec
 	}
 
-	for _, exec := range e.executions {
-		if exec.Dep == dep {
-			return exec
-		}
-	}
-
 	exec := &Execution{
 		Dep:   dep,
 		State: ExecStateScheduled,
@@ -297,8 +257,18 @@ func (e *Engine) scheduleOne(dep Dep) *Execution {
 		errCh:   nil,
 		inputs:  nil,
 	}
+	debounceBroadcast := debounce.New(time.Millisecond)
+	exec.broadcast = func() {
+		debounceBroadcast(func() {
+			exec.c.L.Lock()
+			exec.c.Broadcast()
+			exec.c.L.Unlock()
+		})
+	}
+	exec.c = sync.NewCond(&exec.m)
+	// force deps registration
+	_ = dep.GetDepsObj()
 	dep.setExecution(exec)
-	e.executions = append(e.executions, exec)
 	e.wg.Add(1)
 
 	e.runHooks(EventScheduled{Execution: exec}, exec)
diff --git a/worker2/engine_test.go b/worker2/engine_test.go
index cad60080..f1f94d58 100644
--- a/worker2/engine_test.go
+++ b/worker2/engine_test.go
@@ -11,7 +11,7 @@ import (
 )
 
 // Number of actions to be processed during a stress test
-const StressN = 30000
+const StressN = 100000
 
 func TestExecSimple(t *testing.T) {
 	t.Parallel()
@@ -20,7 +20,7 @@ func TestExecSimple(t *testing.T) {
 	a := &Action{
 		Do: func(ctx context.Context, ds InStore, os OutStore) error {
 			didRun = true
-			fmt.Println("Running 1")
+			fmt.Println("Running 1")
 			return nil
 		},
 	}
@@ -28,6 +28,7 @@
 
 	e := NewEngine()
 	go e.Run()
+	defer e.Stop()
 
 	e.Schedule(a)
 
@@ -39,7 +40,6 @@
 
 func TestExecSerial(t *testing.T) {
 	t.Parallel()
-
 	n := 500
 
 	values := make([]int, 0, n)
@@ -64,10 +64,10 @@
 
 	e := NewEngine()
 	go e.Run()
+	defer e.Stop()
 
 	e.Schedule(serial)
 
-	e.Wait()
 	<-serial.Wait()
 
 	assert.EqualValues(t, expected, values)
@@ -90,6 +90,7 @@ func TestStatus(t *testing.T) {
 
 	e := NewEngine()
 	go e.Run()
+	defer e.Stop()
 
 	e.Schedule(a)
 
@@ -107,7 +108,6 @@
 
 	close(resumeCh)
 
-	e.Wait()
 	<-a.Wait()
 }
 
@@ -133,6 +133,7 @@ func TestExecHook(t *testing.T) {
 
 	e := NewEngine()
 	go e.Run()
+	defer e.Stop()
 
 	e.Schedule(a)
 
@@ -162,10 +163,11 @@ func TestExecError(t *testing.T) {
 
 	e := NewEngine()
 	go e.Run()
+	defer e.Stop()
 
 	e.Schedule(a)
 
-	e.Wait()
+	<-a.Wait()
 
 	assert.ErrorContains(t, <-errCh, "beep bop")
 }
@@ -203,10 +205,11 @@ func TestExecErrorSkip(t *testing.T) {
 
 	e.RegisterHook(LogHook())
 	go e.Run()
+	defer e.Stop()
 
 	e.Schedule(a3)
 
-	e.Wait()
+	<-a3.Wait()
 
 	assert.ErrorContains(t, <-err1Ch, "beep bop")
 	assert.ErrorIs(t, <-err2Ch, ErrSkipped)
@@ -258,10 +261,11 @@ func TestExecErrorSkipStress(t *testing.T) {
 
 	e := NewEngine()
 	go e.Run()
+	defer e.Stop()
 
 	e.Schedule(g)
 
-	e.Wait()
+	<-g.Wait()
 
 	for _, errCh := range errChs {
 		assert.ErrorIs(t, <-errCh, ErrSkipped)
@@ -287,12 +291,13 @@ func TestExecCancel(t *testing.T) {
 
 	e := NewEngine()
 	go e.Run()
+	defer e.Stop()
 
 	e.Schedule(a)
 
 	cancel()
 
-	e.Wait()
+	<-a.Wait()
 
 	err := <-errCh
 
@@ -339,10 +344,11 @@ func TestExecDeps(t *testing.T) {
 
 	e := NewEngine()
 	go e.Run()
+	defer e.Stop()
 
 	e.Schedule(a2)
 
-	e.Wait()
+	<-a2.Wait()
 
 	assert.Equal(t, "1 hello, world", receivedValue)
 }
@@ -385,17 +391,17 @@ func TestExecGroup(t *testing.T) {
 
 	e := NewEngine()
 	go e.Run()
+	defer e.Stop()
 
 	e.Schedule(a)
 
-	e.Wait()
+	<-a.Wait()
 
 	assert.Equal(t, map[string]any{"v1": 1, "v2": "hello, world"}, received)
 }
 
 func TestExecStress(t *testing.T) {
 	t.Parallel()
-
 	scheduler := NewLimitScheduler(runtime.NumCPU())
 
 	n := StressN
@@ -428,6 +434,7 @@
 
 	e := NewEngine()
 	go e.Run()
+	defer e.Stop()
 
 	totalDeps := uint64(n + 2)
 
@@ -436,7 +443,7 @@
 
 	e.Schedule(a)
 
-	e.Wait()
+	<-a.Wait()
 
 	stats3 := CollectStats(a)
 	assert.Equal(t, Stats{All: totalDeps, Completed: totalDeps, Succeeded: totalDeps}, stats3)
@@ -450,7 +457,6 @@
 }
 
 func TestExecProducerConsumer(t *testing.T) {
-	t.Parallel()
 	g := &Group{
 		Deps: NewDeps(),
 	}
@@ -466,7 +472,7 @@ func TestExecProducerConsumer(t *testing.T) {
 		a := &Action{
 			Do: func(ctx context.Context, ds InStore, os OutStore) error {
-				fmt.Println("Running inner", i)
+				//fmt.Println("Running inner", i)
 				os.Set(NewValue(i))
 				return nil
 			},
@@ -494,10 +500,11 @@ func TestExecProducerConsumer(t *testing.T) {
 
 	e := NewEngine()
 	go e.Run()
+	defer e.Stop()
 
 	e.Schedule(consumer)
 
-	e.Wait()
+	<-consumer.Wait()
 
 	expected := map[string]any{}
 	for i := 0; i < n; i++ {
@@ -539,6 +546,7 @@ func TestSuspend(t *testing.T) {
 
 	e := NewEngine()
 	go e.Run()
+	defer e.Stop()
 
 	e.Schedule(a)
 
diff --git a/worker2/events.go b/worker2/events.go
index 294969a3..89083e47 100644
--- a/worker2/events.go
+++ b/worker2/events.go
@@ -42,10 +42,6 @@ func (e EventSkipped) getExecution() *Execution {
 	return e.Execution
 }
 
-type EventWorkerAvailable struct {
-	Worker *Worker
-}
-
 type EventReady struct {
 	Execution *Execution
 }
diff --git a/worker2/execution.go b/worker2/execution.go
index 10878c81..3e7117c3 100644
--- a/worker2/execution.go
+++ b/worker2/execution.go
@@ -25,10 +25,12 @@ const (
 )
 
 type Execution struct {
-	Dep      Dep
-	State    ExecState
-	outStore OutStore
-	eventsCh chan Event
+	Dep       Dep
+	State     ExecState
+	outStore  OutStore
+	eventsCh  chan Event
+	c         *sync.Cond
+	broadcast func()
 
 	scheduler Scheduler
 
@@ -41,6 +43,7 @@ type Execution struct {
 
 	resumeAckCh chan struct{}
 	completedCh chan struct{}
+	started     bool
 }
 
 func (e *Execution) String() string {
@@ -67,6 +70,12 @@ func (e *Execution) Run(ctx context.Context) error {
 	e.errCh = make(chan error)
 	e.suspendCh = make(chan struct{})
 
+	if e.started {
+		panic("already started")
+	}
+
+	e.started = true
+
 	go func() {
 		err := e.run(ctx)
 		e.errCh <- err
diff --git a/worker2/worker.go b/worker2/worker.go
index 3a55bad9..c788ce76 100644
--- a/worker2/worker.go
+++ b/worker2/worker.go
@@ -49,5 +49,4 @@ func (w *Worker) Run() {
 			Error: err,
 		}
 	}
-	w.exec.eventsCh <- EventWorkerAvailable{Worker: w}
 }
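
Usage sketch (not part of the patch): a minimal example of the reworked engine lifecycle as exercised by the updated tests above — start the loop with go e.Run(), defer e.Stop() so the events channel is closed, schedule a dep, then wait on the dep itself instead of Engine.Wait(). The import path and the action body are assumptions for illustration only.

	package main

	import (
		"context"
		"fmt"

		"github.com/hephbuild/heph/worker2" // assumed import path for the worker2 package
	)

	func main() {
		// hypothetical action; Do mirrors the signature used in the tests
		a := &worker2.Action{
			ID: "hello",
			Do: func(ctx context.Context, ins worker2.InStore, outs worker2.OutStore) error {
				fmt.Println("hello")
				return nil
			},
		}

		e := worker2.NewEngine()
		go e.Run()     // event loop exits once Stop closes the events channel
		defer e.Stop()

		e.Schedule(a)
		<-a.Wait() // wait on the dep itself rather than Engine.Wait()
	}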