From 79554461341addce12e84755a74cba2d11abfe04 Mon Sep 17 00:00:00 2001 From: Raphael Vigee Date: Sun, 17 Mar 2024 11:49:06 +0000 Subject: [PATCH] move code around --- worker2/{action.go => dep.go} | 70 ++--- worker2/engine.go | 336 +-------------------- worker2/{action_test.go => engine_test.go} | 0 worker2/events.go | 55 ++++ worker2/execution.go | 113 +++++++ worker2/goroutine.go | 90 ++++++ worker2/hook.go | 2 +- worker2/store.go | 39 +++ worker2/value.go | 35 +++ worker2/worker.go | 24 ++ 10 files changed, 392 insertions(+), 372 deletions(-) rename worker2/{action.go => dep.go} (75%) rename worker2/{action_test.go => engine_test.go} (100%) create mode 100644 worker2/events.go create mode 100644 worker2/execution.go create mode 100644 worker2/goroutine.go create mode 100644 worker2/store.go create mode 100644 worker2/value.go create mode 100644 worker2/worker.go diff --git a/worker2/action.go b/worker2/dep.go similarity index 75% rename from worker2/action.go rename to worker2/dep.go index 2a9915a0..69617ddf 100644 --- a/worker2/action.go +++ b/worker2/dep.go @@ -5,50 +5,25 @@ import ( "sync" ) -type Value interface { - Get() (any, error) -} - -type MemoryValue[T any] struct { - V T -} - -func (v MemoryValue[T]) Get() (any, error) { - return v.V, nil -} - -type MapValue map[string]Value - -func (m MapValue) Get() (any, error) { - out := make(map[string]any, len(m)) - for k, vv := range m { - if vv == nil { - continue - } - v, err := vv.Get() - if err != nil { - return nil, err - } - out[k] = v - } - - return out, nil -} - -func (m MapValue) Set(k string, v Value) { - m[k] = v +type Dep interface { + GetID() string + Exec(ctx context.Context, ins InStore, outs OutStore) error + Freeze() + Frozen() bool + DirectDeps() []Dep + GetHooks() []Hook } -type Base struct { +type baseDep struct { frozen bool m sync.Mutex } -func (g *Base) Frozen() bool { +func (g *baseDep) Frozen() bool { return g.frozen } -func (g *Base) Freeze() { +func (g *baseDep) Freeze() { g.m.Lock() defer g.m.Unlock() @@ -56,7 +31,7 @@ func (g *Base) Freeze() { } type Action struct { - Base + baseDep ID string Deps []Dep Hooks []Hook @@ -92,7 +67,7 @@ func (a *Action) DirectDeps() []Dep { } type Group struct { - Base + baseDep ID string Deps []Dep Hooks []Hook @@ -138,16 +113,19 @@ func (g *Group) Exec(ctx context.Context, ins InStore, outs OutStore) error { return nil } -type Dep interface { - GetID() string - Exec(ctx context.Context, ins InStore, outs OutStore) error - Freeze() - Frozen() bool - DirectDeps() []Dep - GetHooks() []Hook -} - type Named struct { Name string Dep } + +func flattenNamed(dep Dep) Dep { + for { + if ndep, ok := dep.(Named); ok { + dep = ndep.Dep + } else { + break + } + } + + return dep +} diff --git a/worker2/engine.go b/worker2/engine.go index a1778093..09be2655 100644 --- a/worker2/engine.go +++ b/worker2/engine.go @@ -3,160 +3,12 @@ package worker2 import ( "context" "errors" - "fmt" "runtime" "slices" "sync" "time" ) -var ErrWorkerNotAvail = errors.New("worker not available") -var ErrNoWorkerAvail = errors.New("no worker available") - -type Event any - -type WithExecution interface { - getExecution() *Execution -} - -type EventTryExecuteOne struct{} - -type EventCompleted struct { - Execution *Execution - Output Value - Error error -} - -func (e EventCompleted) getExecution() *Execution { - return e.Execution -} - -type EventScheduled struct { - Execution *Execution -} - -func (e EventScheduled) getExecution() *Execution { - return e.Execution -} - -type EventStarted struct { - Execution *Execution -} - -func (e EventStarted) getExecution() *Execution { - return e.Execution -} - -type EventSkipped struct { - Execution *Execution -} - -func (e EventSkipped) getExecution() *Execution { - return e.Execution -} - -type EventWorkerAvailable struct { - Worker Worker -} - -type EventReady struct { - Execution *Execution -} - -func (e EventReady) getExecution() *Execution { - return e.Execution -} - -type InStore interface { - Copy(OutStore) - Get(key string) (any, error) -} - -type OutStore interface { - Set(Value) - Get() Value -} - -type inStore struct { - m map[string]Value -} - -func (s *inStore) Copy(outs OutStore) { - mv := make(MapValue, len(s.m)) - for k, v := range s.m { - mv.Set(k, v) - } - outs.Set(mv) -} - -func (s *inStore) Get(name string) (any, error) { - return s.m[name].Get() -} - -type outStore struct { - value Value -} - -func (s *outStore) Set(v Value) { - s.value = v -} - -func (s *outStore) Get() Value { - return s.value -} - -type Worker interface { - Start(a *Execution) error - State() WorkerState -} - -type GoroutineWorker struct { - m sync.Mutex - ctx context.Context - state WorkerState -} - -func NewGoroutineWorker(ctx context.Context) *GoroutineWorker { - w := &GoroutineWorker{ - ctx: ctx, - state: WorkerStateIdle, - } - return w -} - -func (g *GoroutineWorker) State() WorkerState { - return g.state -} - -func (g *GoroutineWorker) Start(e *Execution) error { - ok := g.m.TryLock() - if !ok { - return ErrWorkerNotAvail - } - - go func() { - g.state = WorkerStateRunning - ctx := contextWithExecution(g.ctx, e) - err := e.Start(ctx) - g.state = WorkerStateIdle - g.m.Unlock() - if errors.Is(err, ErrSuspended) { - go func() { - <-e.resumeCh - e.eventsCh <- EventReady{Execution: e} - }() - } else { - e.eventsCh <- EventCompleted{ - Execution: e, - Output: e.outStore.Get(), - Error: err, - } - } - }() - - return nil -} - type Engine struct { wg sync.WaitGroup workerProviders []WorkerProvider @@ -175,167 +27,13 @@ func NewEngine() *Engine { } } -func NewGoroutineWorkerProvider(ctx context.Context) WorkerProvider { - wp := &GoroutineWorkerProvider{} - for i := 0; i < runtime.NumCPU(); i++ { - wp.workers = append(wp.workers, NewGoroutineWorker(ctx)) - } - return wp -} - -type WorkerProvider interface { - Start(*Execution) (Worker, error) - Workers() []Worker -} - -type GoroutineWorkerProvider struct { - workers []*GoroutineWorker -} - -func (wp *GoroutineWorkerProvider) Workers() []Worker { - workers := make([]Worker, 0, len(wp.workers)) - for _, worker := range wp.workers { - workers = append(workers, worker) - } - return workers -} - -func (wp *GoroutineWorkerProvider) Start(e *Execution) (Worker, error) { - for _, w := range wp.workers { - err := w.Start(e) - if err != nil { - if errors.Is(err, ErrWorkerNotAvail) { - continue - } - return nil, err - } - - return w, nil - } - - return nil, ErrNoWorkerAvail -} - -type ExecState int - -func (s ExecState) IsFinal() bool { - return s == ExecStateSucceeded || s == ExecStateFailed || s == ExecStateSkipped -} - -const ( - ExecStateUnknown ExecState = iota - ExecStateScheduled - ExecStateWaiting - ExecStateRunning - ExecStateSucceeded - ExecStateFailed - ExecStateSkipped - ExecStateSuspended -) - -type WorkerState int - -const ( - WorkerStateUnknown WorkerState = iota - WorkerStateIdle - WorkerStateRunning -) - -type Execution struct { - Action Dep - State ExecState - outStore OutStore - eventsCh chan Event - - worker Worker // gets populated when a worker accepts it - errCh chan error // gets populated when exec is called - inStore InStore // gets populated when its deps are ready - m sync.Mutex - - suspendCh chan struct{} - resumeCh chan struct{} - resumeAckCh chan struct{} -} - -func (e *Execution) String() string { - return e.Action.GetID() -} - -func (e *Execution) GetOutput() Value { - return e.outStore.Get() -} - -var ErrSuspended = errors.New("suspended") - -func (e *Execution) Start(ctx context.Context) error { - e.m.Lock() - if e.errCh == nil { - e.errCh = make(chan error) - e.suspendCh = make(chan struct{}) - - go func() { - err := e.safeExec(ctx) - e.errCh <- err - }() - } else { - e.resumeAckCh <- struct{}{} - } - e.m.Unlock() - - select { - case <-e.suspendCh: - return ErrSuspended - case err := <-e.errCh: - return err - } -} - -func (e *Execution) safeExec(ctx context.Context) (err error) { - defer func() { - if r := recover(); r != nil { - err = fmt.Errorf("panic: %v", r) - } - }() - - return e.Action.Exec(ctx, e.inStore, e.outStore) -} - -func (e *Execution) Suspend() { - e.m.Lock() - defer e.m.Unlock() - - if e.resumeCh != nil { - panic("attempting to suspend an already suspended execution") - } - - e.suspendCh <- struct{}{} - e.resumeCh = make(chan struct{}) - e.resumeAckCh = make(chan struct{}) -} - -func (e *Execution) Resume() <-chan struct{} { - e.m.Lock() - defer e.m.Unlock() - - if e.resumeAckCh == nil { - panic("attempting to resume an unsuspended execution") - } - - ackCh := e.resumeAckCh - - e.resumeCh <- struct{}{} - e.resumeCh = nil - - return ackCh -} - func (e *Engine) deepDeps(a Dep, m map[Dep]struct{}, deps *[]Dep) { if m == nil { m = map[Dep]struct{}{} } for _, dep := range a.DirectDeps() { - dep := noNamed(dep) + dep := flattenNamed(dep) if _, ok := m[dep]; ok { continue @@ -348,18 +46,6 @@ func (e *Engine) deepDeps(a Dep, m map[Dep]struct{}, deps *[]Dep) { } } -func noNamed(dep Dep) Dep { - for { - if ndep, ok := dep.(Named); ok { - dep = ndep.Dep - } else { - break - } - } - - return dep -} - func (e *Engine) loop() { for event := range e.eventsCh { e.handle(event) @@ -415,7 +101,7 @@ func (e *Engine) runHooks(event Event, exec *Execution) { hook(event) } - for _, hook := range exec.Action.GetHooks() { + for _, hook := range exec.Dep.GetHooks() { hook(event) } } @@ -426,7 +112,7 @@ func (e *Engine) waitForDeps(exec *Execution, execCache map[Dep]*Execution) bool for { var deepDeps []Dep - e.deepDeps(exec.Action, nil, &deepDeps) + e.deepDeps(exec.Dep, nil, &deepDeps) allDepsSucceeded := true for _, dep := range deepDeps { @@ -464,10 +150,10 @@ func (e *Engine) waitForDepsAndSchedule(exec *Execution) { return } - exec.Action.Freeze() + exec.Dep.Freeze() ins := &inStore{m: map[string]Value{}} - for _, dep := range exec.Action.DirectDeps() { + for _, dep := range exec.Dep.DirectDeps() { if dep, ok := dep.(Named); ok { exec := e.mustExecutionForDep(dep.Dep, execCache) @@ -552,7 +238,7 @@ func (e *Engine) RegisterHook(hook Hook) { } func (e *Engine) Run(ctx context.Context) { - e.RegisterWorkerProvider(NewGoroutineWorkerProvider(ctx)) + e.RegisterWorkerProvider(NewGoroutineWorkerProvider(ctx, runtime.NumCPU())) e.loop() } @@ -574,7 +260,7 @@ func (e *Engine) allWorkersIdle() bool { } func (e *Engine) mustExecutionForDep(dep Dep, c map[Dep]*Execution) *Execution { - dep = noNamed(dep) + dep = flattenNamed(dep) if exec, ok := c[dep]; ok { return exec @@ -582,7 +268,7 @@ func (e *Engine) mustExecutionForDep(dep Dep, c map[Dep]*Execution) *Execution { e.m.RLock() for _, exec := range e.executions { - if exec.Action == dep { + if exec.Dep == dep { e.m.RUnlock() c[dep] = exec return exec @@ -610,15 +296,15 @@ func (e *Engine) Schedule(a Dep) { } func (e *Engine) scheduleOne(dep Dep) *Execution { - dep = noNamed(dep) + dep = flattenNamed(dep) for _, exec := range e.executions { - if exec.Action == dep { + if exec.Dep == dep { return exec } } exec := &Execution{ - Action: dep, + Dep: dep, State: ExecStateScheduled, outStore: &outStore{}, eventsCh: e.eventsCh, diff --git a/worker2/action_test.go b/worker2/engine_test.go similarity index 100% rename from worker2/action_test.go rename to worker2/engine_test.go diff --git a/worker2/events.go b/worker2/events.go new file mode 100644 index 00000000..bcfbac08 --- /dev/null +++ b/worker2/events.go @@ -0,0 +1,55 @@ +package worker2 + +type Event any + +type WithExecution interface { + getExecution() *Execution +} + +type EventTryExecuteOne struct{} + +type EventCompleted struct { + Execution *Execution + Output Value + Error error +} + +func (e EventCompleted) getExecution() *Execution { + return e.Execution +} + +type EventScheduled struct { + Execution *Execution +} + +func (e EventScheduled) getExecution() *Execution { + return e.Execution +} + +type EventStarted struct { + Execution *Execution +} + +func (e EventStarted) getExecution() *Execution { + return e.Execution +} + +type EventSkipped struct { + Execution *Execution +} + +func (e EventSkipped) getExecution() *Execution { + return e.Execution +} + +type EventWorkerAvailable struct { + Worker Worker +} + +type EventReady struct { + Execution *Execution +} + +func (e EventReady) getExecution() *Execution { + return e.Execution +} diff --git a/worker2/execution.go b/worker2/execution.go new file mode 100644 index 00000000..785c950d --- /dev/null +++ b/worker2/execution.go @@ -0,0 +1,113 @@ +package worker2 + +import ( + "context" + "errors" + "fmt" + "sync" +) + +type ExecState int + +func (s ExecState) IsFinal() bool { + return s == ExecStateSucceeded || s == ExecStateFailed || s == ExecStateSkipped +} + +const ( + ExecStateUnknown ExecState = iota + ExecStateScheduled + ExecStateWaiting + ExecStateRunning + ExecStateSucceeded + ExecStateFailed + ExecStateSkipped + ExecStateSuspended +) + +type Execution struct { + Dep Dep + State ExecState + outStore OutStore + eventsCh chan Event + + worker Worker // gets populated when a worker accepts it + errCh chan error // gets populated when exec is called + inStore InStore // gets populated when its deps are ready + m sync.Mutex + + suspendCh chan struct{} + resumeCh chan struct{} + resumeAckCh chan struct{} +} + +func (e *Execution) String() string { + return e.Dep.GetID() +} + +func (e *Execution) GetOutput() Value { + return e.outStore.Get() +} + +var ErrSuspended = errors.New("suspended") + +func (e *Execution) Start(ctx context.Context) error { + e.m.Lock() + if e.errCh == nil { + e.errCh = make(chan error) + e.suspendCh = make(chan struct{}) + + go func() { + err := e.safeExec(ctx) + e.errCh <- err + }() + } else { + e.resumeAckCh <- struct{}{} + } + e.m.Unlock() + + select { + case <-e.suspendCh: + return ErrSuspended + case err := <-e.errCh: + return err + } +} + +func (e *Execution) safeExec(ctx context.Context) (err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("panic: %v", r) + } + }() + + return e.Dep.Exec(ctx, e.inStore, e.outStore) +} + +func (e *Execution) Suspend() { + e.m.Lock() + defer e.m.Unlock() + + if e.resumeCh != nil { + panic("attempting to suspend an already suspended execution") + } + + e.suspendCh <- struct{}{} + e.resumeCh = make(chan struct{}) + e.resumeAckCh = make(chan struct{}) +} + +func (e *Execution) Resume() <-chan struct{} { + e.m.Lock() + defer e.m.Unlock() + + if e.resumeAckCh == nil { + panic("attempting to resume an unsuspended execution") + } + + ackCh := e.resumeAckCh + + e.resumeCh <- struct{}{} + e.resumeCh = nil + + return ackCh +} diff --git a/worker2/goroutine.go b/worker2/goroutine.go new file mode 100644 index 00000000..fcdfe542 --- /dev/null +++ b/worker2/goroutine.go @@ -0,0 +1,90 @@ +package worker2 + +import ( + "context" + "errors" + "sync" +) + +type GoroutineWorker struct { + m sync.Mutex + ctx context.Context + state WorkerState +} + +func NewGoroutineWorker(ctx context.Context) *GoroutineWorker { + w := &GoroutineWorker{ + ctx: ctx, + state: WorkerStateIdle, + } + return w +} + +func (g *GoroutineWorker) State() WorkerState { + return g.state +} + +func (g *GoroutineWorker) Start(e *Execution) error { + ok := g.m.TryLock() + if !ok { + return ErrWorkerNotAvail + } + + go func() { + g.state = WorkerStateRunning + ctx := contextWithExecution(g.ctx, e) + err := e.Start(ctx) + g.state = WorkerStateIdle + g.m.Unlock() + if errors.Is(err, ErrSuspended) { + go func() { + <-e.resumeCh + e.eventsCh <- EventReady{Execution: e} + }() + } else { + e.eventsCh <- EventCompleted{ + Execution: e, + Output: e.outStore.Get(), + Error: err, + } + } + }() + + return nil +} + +func NewGoroutineWorkerProvider(ctx context.Context, n int) WorkerProvider { + wp := &GoroutineWorkerProvider{} + for i := 0; i < n; i++ { + wp.workers = append(wp.workers, NewGoroutineWorker(ctx)) + } + return wp +} + +type GoroutineWorkerProvider struct { + workers []*GoroutineWorker +} + +func (wp *GoroutineWorkerProvider) Workers() []Worker { + workers := make([]Worker, 0, len(wp.workers)) + for _, worker := range wp.workers { + workers = append(workers, worker) + } + return workers +} + +func (wp *GoroutineWorkerProvider) Start(e *Execution) (Worker, error) { + for _, w := range wp.workers { + err := w.Start(e) + if err != nil { + if errors.Is(err, ErrWorkerNotAvail) { + continue + } + return nil, err + } + + return w, nil + } + + return nil, ErrNoWorkerAvail +} diff --git a/worker2/hook.go b/worker2/hook.go index 28d21564..d25236d8 100644 --- a/worker2/hook.go +++ b/worker2/hook.go @@ -39,7 +39,7 @@ func ErrorHook() (Hook, <-chan error) { func LogHook() Hook { return func(event Event) { if event, ok := event.(WithExecution); ok { - fmt.Printf("%v: %T %+v\n", event.getExecution().Action.GetID(), event, event) + fmt.Printf("%v: %T %+v\n", event.getExecution().Dep.GetID(), event, event) } else { fmt.Printf("%T %+v\n", event, event) } diff --git a/worker2/store.go b/worker2/store.go new file mode 100644 index 00000000..bb9e2f04 --- /dev/null +++ b/worker2/store.go @@ -0,0 +1,39 @@ +package worker2 + +type InStore interface { + Copy(OutStore) + Get(key string) (any, error) +} + +type OutStore interface { + Set(Value) + Get() Value +} + +type inStore struct { + m map[string]Value +} + +func (s *inStore) Copy(outs OutStore) { + mv := make(MapValue, len(s.m)) + for k, v := range s.m { + mv.Set(k, v) + } + outs.Set(mv) +} + +func (s *inStore) Get(name string) (any, error) { + return s.m[name].Get() +} + +type outStore struct { + value Value +} + +func (s *outStore) Set(v Value) { + s.value = v +} + +func (s *outStore) Get() Value { + return s.value +} diff --git a/worker2/value.go b/worker2/value.go new file mode 100644 index 00000000..e633bcad --- /dev/null +++ b/worker2/value.go @@ -0,0 +1,35 @@ +package worker2 + +type Value interface { + Get() (any, error) +} + +type MemoryValue[T any] struct { + V T +} + +func (v MemoryValue[T]) Get() (any, error) { + return v.V, nil +} + +type MapValue map[string]Value + +func (m MapValue) Get() (any, error) { + out := make(map[string]any, len(m)) + for k, vv := range m { + if vv == nil { + continue + } + v, err := vv.Get() + if err != nil { + return nil, err + } + out[k] = v + } + + return out, nil +} + +func (m MapValue) Set(k string, v Value) { + m[k] = v +} diff --git a/worker2/worker.go b/worker2/worker.go new file mode 100644 index 00000000..94bc98c4 --- /dev/null +++ b/worker2/worker.go @@ -0,0 +1,24 @@ +package worker2 + +import "errors" + +type WorkerState int + +const ( + WorkerStateUnknown WorkerState = iota + WorkerStateIdle + WorkerStateRunning +) + +var ErrWorkerNotAvail = errors.New("worker not available") +var ErrNoWorkerAvail = errors.New("no worker available") + +type Worker interface { + Start(a *Execution) error + State() WorkerState +} + +type WorkerProvider interface { + Start(*Execution) (Worker, error) + Workers() []Worker +}