Skip to content

Commit

Permalink
simplify input access
Browse files Browse the repository at this point in the history
  • Loading branch information
raphaelvigee committed Mar 17, 2024
1 parent 7955446 commit 62d4aed
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 40 deletions.
3 changes: 2 additions & 1 deletion worker2/dep.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ func (g *Group) Add(deps ...Dep) {
}

func (g *Group) Exec(ctx context.Context, ins InStore, outs OutStore) error {
ins.Copy(outs)
e := executionFromContext(ctx)
outs.Set(MapValue(e.inputs))
return nil
}

Expand Down
32 changes: 20 additions & 12 deletions worker2/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,11 @@ func (e *Engine) waitForDeps(exec *Execution, execCache map[Dep]*Execution) bool
}

if allDepsSucceeded {
break
return true
}

e.c.Wait()
}

return true
}

func (e *Engine) waitForDepsAndSchedule(exec *Execution) {
Expand All @@ -152,17 +150,18 @@ func (e *Engine) waitForDepsAndSchedule(exec *Execution) {

exec.Dep.Freeze()

ins := &inStore{m: map[string]Value{}}
exec.m.Lock()
ins := map[string]Value{}
for _, dep := range exec.Dep.DirectDeps() {
if dep, ok := dep.(Named); ok {
exec := e.mustExecutionForDep(dep.Dep, execCache)

ins.m[dep.Name] = exec.outStore.Get()
vv := exec.outStore.Get()

ins[dep.Name] = vv
}
}

exec.m.Lock()
exec.inStore = ins
exec.inputs = ins
exec.State = ExecStateWaiting
exec.m.Unlock()

Expand All @@ -175,6 +174,14 @@ func (e *Engine) notifySkipped(exec *Execution) {
}
}

func (e *Engine) notifyCompleted(exec *Execution, output Value, err error) {
e.eventsCh <- EventCompleted{
Execution: exec,
Output: output,
Error: err,
}
}

func (e *Engine) notifyTryExecuteOne() {
e.eventsCh <- EventTryExecuteOne{}
}
Expand All @@ -192,7 +199,8 @@ func (e *Engine) tryExecuteOne() bool {
if errors.Is(err, ErrNoWorkerAvail) {
continue
}
panic(err)
e.notifyCompleted(candidate, nil, err)
continue
}
e.deleteExecutionWaiting(candidate)
e.runHooks(EventStarted{Execution: candidate}, candidate)
Expand Down Expand Up @@ -310,9 +318,9 @@ func (e *Engine) scheduleOne(dep Dep) *Execution {
eventsCh: e.eventsCh,

// see field comments
worker: nil,
errCh: nil,
inStore: nil,
worker: nil,
errCh: nil,
inputs: nil,
}
e.executions = append(e.executions, exec)
e.wg.Add(1)
Expand Down
10 changes: 5 additions & 5 deletions worker2/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ func TestExecDeps(t *testing.T) {
Named{Name: "v2", Dep: a1_2},
},
Do: func(ctx context.Context, ds InStore, os OutStore) error {
v1, _ := ds.Get("v1")
v2, _ := ds.Get("v2")
v1 := ds.Get("v1")
v2 := ds.Get("v2")

fmt.Println("Got values", v1, v2)

Expand Down Expand Up @@ -288,7 +288,7 @@ func TestExecGroup(t *testing.T) {
a := &Action{
Deps: []Dep{Named{Name: "v", Dep: g}},
Do: func(ctx context.Context, ds InStore, os OutStore) error {
received, _ = ds.Get("v")
received = ds.Get("v")
return nil
},
}
Expand Down Expand Up @@ -328,7 +328,7 @@ func TestExecStress(t *testing.T) {
a := &Action{
Deps: []Dep{Named{Name: "v", Dep: g}},
Do: func(ctx context.Context, ds InStore, os OutStore) error {
received, _ = ds.Get("v")
received = ds.Get("v")
return nil
},
}
Expand Down Expand Up @@ -386,7 +386,7 @@ func TestExecProducerConsumer(t *testing.T) {
Do: func(ctx context.Context, ds InStore, os OutStore) error {
fmt.Println("Running consumer")

received, _ = ds.Get("v")
received = ds.Get("v")
return nil
},
}
Expand Down
30 changes: 23 additions & 7 deletions worker2/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ type Execution struct {
outStore OutStore
eventsCh chan Event

worker Worker // gets populated when a worker accepts it
errCh chan error // gets populated when exec is called
inStore InStore // gets populated when its deps are ready
m sync.Mutex
worker Worker // gets populated when a worker accepts it
errCh chan error // gets populated when exec is called
inputs map[string]Value // gets populated before marking as ready
m sync.Mutex

suspendCh chan struct{}
resumeCh chan struct{}
Expand All @@ -57,30 +57,46 @@ func (e *Execution) Start(ctx context.Context) error {
e.suspendCh = make(chan struct{})

go func() {
err := e.safeExec(ctx)
err := e.run(ctx)
e.errCh <- err
}()
} else {
e.resumeAckCh <- struct{}{}
e.State = ExecStateRunning
}
e.m.Unlock()

select {
case <-e.suspendCh:
e.State = ExecStateSuspended
return ErrSuspended
case err := <-e.errCh:
return err
}
}

func (e *Execution) safeExec(ctx context.Context) (err error) {
func (e *Execution) run(ctx context.Context) error {
ins := &inStore{m: map[string]any{}}
for k, value := range e.inputs {
vv, err := value.Get()
if err != nil {
return fmt.Errorf("%v: %w", k, err)
}

ins.m[k] = vv
}

return e.safeExec(ctx, ins)
}

func (e *Execution) safeExec(ctx context.Context, ins InStore) (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic: %v", r)
}
}()

return e.Dep.Exec(ctx, e.inStore, e.outStore)
return e.Dep.Exec(ctx, ins, e.outStore)
}

func (e *Execution) Suspend() {
Expand Down
7 changes: 5 additions & 2 deletions worker2/goroutine.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ func (g *GoroutineWorker) Start(e *Execution) error {
g.m.Unlock()
if errors.Is(err, ErrSuspended) {
go func() {
<-e.resumeCh
e.eventsCh <- EventReady{Execution: e}
select {
case <-ctx.Done():
case <-e.resumeCh:
e.eventsCh <- EventReady{Execution: e}
}
}()
} else {
e.eventsCh <- EventCompleted{
Expand Down
17 changes: 4 additions & 13 deletions worker2/store.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package worker2

type InStore interface {
Copy(OutStore)
Get(key string) (any, error)
Get(key string) any
}

type OutStore interface {
Expand All @@ -11,19 +10,11 @@ type OutStore interface {
}

type inStore struct {
m map[string]Value
m map[string]any
}

func (s *inStore) Copy(outs OutStore) {
mv := make(MapValue, len(s.m))
for k, v := range s.m {
mv.Set(k, v)
}
outs.Set(mv)
}

func (s *inStore) Get(name string) (any, error) {
return s.m[name].Get()
func (s *inStore) Get(name string) any {
return s.m[name]
}

type outStore struct {
Expand Down

0 comments on commit 62d4aed

Please sign in to comment.