Skip to content

Commit

Permalink
Fix tracker
Browse files Browse the repository at this point in the history
  • Loading branch information
raphaelvigee committed Apr 8, 2024
1 parent 55fcd65 commit 105ab70
Show file tree
Hide file tree
Showing 13 changed files with 102 additions and 77 deletions.
34 changes: 17 additions & 17 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pbnjay/memory"
"os"
"path/filepath"
"runtime"
"strings"
)

Expand Down Expand Up @@ -142,9 +141,9 @@ type Bootstrap struct {
PlatformProviders []platform.PlatformProvider
}

func DefaultScheduler() *worker2.ResourceScheduler {
func DefaultScheduler(cpu int) *worker2.ResourceScheduler {
return worker2.NewResourceScheduler(map[string]float64{
"cpu": float64(runtime.NumCPU()),
"cpu": float64(cpu),
"mem": float64(memory.TotalMemory()),
}, map[string]float64{
"cpu": float64(1),
Expand Down Expand Up @@ -194,7 +193,7 @@ func Boot(ctx context.Context, opts BootOpts) (Bootstrap, error) {
pool := opts.Pool
if pool == nil {
pool = worker2.NewEngine()
pool.SetDefaultScheduler(DefaultScheduler())
pool.SetDefaultScheduler(DefaultScheduler(opts.Workers))
go pool.Run()
}
bs.Pool = pool
Expand Down Expand Up @@ -307,19 +306,20 @@ func BootScheduler(ctx context.Context, bs Bootstrap) (*scheduler.Scheduler, err
}

e := scheduler.New(scheduler.Scheduler{
Cwd: bs.Cwd,
Root: bs.Root,
Config: bs.Config,
Observability: bs.Observability,
GetFlowID: getFlowId,
LocalCache: localCache,
RemoteCache: remoteCache,
Packages: bs.Packages,
BuildFilesState: bs.BuildFiles,
Graph: bs.Graph,
Pool: bs.Pool,
Finalizers: fins,
Runner: runner,
Cwd: bs.Cwd,
Root: bs.Root,
Config: bs.Config,
Observability: bs.Observability,
GetFlowID: getFlowId,
LocalCache: localCache,
RemoteCache: remoteCache,
Packages: bs.Packages,
BuildFilesState: bs.BuildFiles,
Graph: bs.Graph,
Pool: bs.Pool,
BackgroundTracker: worker2.NewRunningTracker(),
Finalizers: fins,
Runner: runner,
})

bs.Finalizers.RegisterWithErr(func(err error) {
Expand Down
2 changes: 1 addition & 1 deletion bootstrapwatch/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func Boot(ctx context.Context, root *hroot.State, bootopts bootstrap.BootOpts, c
}

pool := worker2.NewEngine()
pool.SetDefaultScheduler(bootstrap.DefaultScheduler())
pool.SetDefaultScheduler(bootstrap.DefaultScheduler(bootopts.Workers))
go pool.Run()
bootopts.Pool = pool

Expand Down
10 changes: 5 additions & 5 deletions cmd/heph/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@ func schedulerInit(ctx context.Context, postBoot func(bootstrap.BaseBootstrap) e
Finalizers.RegisterWithErr(func(err error) {
bs.Finalizers.Run(err)

// TODO
//if !bs.Scheduler.Pool.IsDone() {
if false {
gb := bs.Scheduler.BackgroundTracker.Group()
bs.Pool.Schedule(gb)
if !gb.GetState().IsFinal() {
log.Tracef("Waiting for all pool items to finish")
select {
case <-bs.Scheduler.Pool.Wait():
case <-gb.Wait():
case <-time.After(time.Second):
log.Infof("Waiting for background jobs to finish...")
<-bs.Scheduler.Pool.Wait()
<-gb.Wait()
}
log.Tracef("All pool items finished")

Expand Down
15 changes: 10 additions & 5 deletions scheduler/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (e *Scheduler) pullExternalCache(ctx context.Context, target *graph.Target,
return true, nil
}

func (e *Scheduler) scheduleStoreExternalCache(ctx context.Context, target *graph.Target, cache rcache.CacheConfig, tracker *worker2.RunningTracker) worker2.Dep {
func (e *Scheduler) scheduleStoreExternalCache(ctx context.Context, target *graph.Target, cache rcache.CacheConfig, trackers []*worker2.RunningTracker) worker2.Dep {
// input hash is used as a marker that everything went well,
// wait for everything else to be done before copying the input hash
inputHashArtifact := target.Artifacts.InputHash
Expand All @@ -182,17 +182,22 @@ func (e *Scheduler) scheduleStoreExternalCache(ctx context.Context, target *grap
continue
}

j := e.scheduleStoreExternalCacheArtifact(ctx, target, cache, artifact, nil, tracker)
j := e.scheduleStoreExternalCacheArtifact(ctx, target, cache, artifact, nil, trackers)
deps.AddDep(j)
}

return e.scheduleStoreExternalCacheArtifact(ctx, target, cache, inputHashArtifact, deps, tracker)
return e.scheduleStoreExternalCacheArtifact(ctx, target, cache, inputHashArtifact, deps, trackers)
}

func (e *Scheduler) scheduleStoreExternalCacheArtifact(ctx context.Context, target *graph.Target, cache rcache.CacheConfig, artifact artifacts.Artifact, deps *worker2.Group, tracker *worker2.RunningTracker) worker2.Dep {
func (e *Scheduler) scheduleStoreExternalCacheArtifact(ctx context.Context, target *graph.Target, cache rcache.CacheConfig, artifact artifacts.Artifact, deps *worker2.Group, trackers []*worker2.RunningTracker) worker2.Dep {
var hooks []worker2.Hook
for _, tracker := range trackers {
hooks = append(hooks, tracker.Hook())
}

return e.Pool.Schedule(worker2.NewAction(worker2.ActionConfig{
Name: fmt.Sprintf("cache %v %v %v", target.Addr, cache.Name, artifact.Name()),
Hooks: []worker2.Hook{tracker.Hook()},
Hooks: hooks,
Deps: []worker2.Dep{deps},
Ctx: ctx,
Do: func(ctx context.Context, ins worker2.InStore, outs worker2.OutStore) error {
Expand Down
29 changes: 15 additions & 14 deletions scheduler/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,21 @@ import (
)

type Scheduler struct {
Cwd string
Root *hroot.State
Config *config.Config
Observability *observability.Observability
GetFlowID func() string
LocalCache *lcache.LocalCacheState
RemoteCache *rcache.RemoteCache
RemoteCacheHints *rcache.HintStore
Packages *packages.Registry
BuildFilesState *buildfiles.State
Graph *graph.State
Pool *worker2.Engine
Finalizers *finalizers.Finalizers
Runner *targetrun.Runner
Cwd string
Root *hroot.State
Config *config.Config
Observability *observability.Observability
GetFlowID func() string
LocalCache *lcache.LocalCacheState
RemoteCache *rcache.RemoteCache
RemoteCacheHints *rcache.HintStore
Packages *packages.Registry
BuildFilesState *buildfiles.State
Graph *graph.State
Pool *worker2.Engine
BackgroundTracker *worker2.RunningTracker
Finalizers *finalizers.Finalizers
Runner *targetrun.Runner

toolsLock locks.Locker
}
Expand Down
2 changes: 1 addition & 1 deletion scheduler/target_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (e *Scheduler) Run(ctx context.Context, rr targetrun.Request, iocfg sandbox

if len(writeableCaches) > 0 {
for _, cache := range writeableCaches {
_ = e.scheduleStoreExternalCache(ctx, rtarget.Target, cache, tracker)
_ = e.scheduleStoreExternalCache(ctx, rtarget.Target, cache, []*worker2.RunningTracker{tracker, e.BackgroundTracker})
}
}

Expand Down
2 changes: 1 addition & 1 deletion test/go/e2e.BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ load("//test", "e2e_test")
e2e_test(
name = "sanity_go_version",
cmd = "heph run //test/go:version",
expect_output_contains = "go version go1.21.4",
expect_output_contains = "go version go1.22.2",
)

e2e_test(
Expand Down
67 changes: 43 additions & 24 deletions worker/poolui/tui.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
tea "github.com/charmbracelet/bubbletea"
"github.com/charmbracelet/lipgloss"
"github.com/hephbuild/heph/log/log"
"github.com/hephbuild/heph/utils/ads"
"github.com/hephbuild/heph/status"
"github.com/hephbuild/heph/utils/xcontext"
"github.com/hephbuild/heph/utils/xtea"
"github.com/hephbuild/heph/utils/xtime"
Expand All @@ -15,8 +15,14 @@ import (
"time"
)

type workerEntry struct {
status status.Statuser
duration time.Duration
exec *worker2.Execution
}

type UpdateMessage struct {
workers []*worker2.Worker
workers []workerEntry
stats worker2.Stats
final bool
}
Expand Down Expand Up @@ -71,22 +77,38 @@ func (m *Model) updateMsg(final bool) UpdateMessage {
}
}

s := worker2.CollectStats(m.deps)
return UpdateMessage{
stats: s,
workers: ads.Filter(m.pool.GetWorkers(), func(worker *worker2.Worker) bool {
exec := worker.Execution()
if exec == nil {
return false
}
var workers []workerEntry
for _, w := range m.pool.GetWorkers() {
exec := w.Execution()
if exec == nil {
continue
}

if _, ok := exec.Dep.(*worker2.Group); ok {
return false
}
if _, ok := exec.Dep.(*worker2.Group); ok {
continue
}

return true
}),
final: final,
var duration time.Duration
if !exec.StartedAt.IsZero() {
duration = time.Since(exec.StartedAt)
}

if duration < 200*time.Millisecond {
continue
}

workers = append(workers, workerEntry{
status: w.GetStatus(),
duration: duration,
exec: exec,
})
}

s := worker2.CollectStats(m.deps)
return UpdateMessage{
stats: s,
workers: workers,
final: final,
}
}

Expand Down Expand Up @@ -148,17 +170,14 @@ func (m *Model) View() string {
}

for _, w := range m.workers {
runtime := ""
if j := w.Execution(); j != nil {
runtime = fmt.Sprintf("=> [%5s]", xtime.FormatFixedWidthDuration(time.Since(j.StartedAt)))
}
runtime := fmt.Sprintf("=> [%5s]", xtime.FormatFixedWidthDuration(w.duration))

status := w.GetStatus().String(log.Renderer())
if status == "" {
status = styleFaint.Render("=|")
statusStr := w.status.String(log.Renderer())
if statusStr == "" {
statusStr = styleFaint.Render("=> Thinking...")
}

s.WriteString(fmt.Sprintf("%v %v\n", styleWorkerStart.Render(runtime), status))
s.WriteString(fmt.Sprintf("%v %v\n", styleWorkerStart.Render(runtime), statusStr))
}

return s.String()
Expand Down
4 changes: 2 additions & 2 deletions worker2/dep.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type Dep interface {

setExecution(*Execution)
getExecution() *Execution
getMutex() sync.Locker
getMutex() *sync.RWMutex
GetScheduler() Scheduler
GetRequest() map[string]float64
}
Expand Down Expand Up @@ -83,7 +83,7 @@ func (a *baseDep) Wait() <-chan struct{} {
return doneCh
}

func (a *baseDep) getMutex() sync.Locker {
func (a *baseDep) getMutex() *sync.RWMutex {
return &a.m
}

Expand Down
2 changes: 1 addition & 1 deletion worker2/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (d *Deps) Freeze() {

for _, dep := range d.deps.Slice() {
if !dep.GetDepsObj().IsFrozen() {
panic("attempting to freeze while all deps aren't frozen")
panic(fmt.Sprintf("attempting to freeze '%v' while all deps aren't frozen, '%v' isnt", d.owner.GetName(), dep.GetName()))
}
}

Expand Down
8 changes: 3 additions & 5 deletions worker2/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,8 @@ func (e *Engine) Schedule(a Dep) Dep {
func (e *Engine) registerOne(dep Dep, lock bool) *Execution {
dep = flattenNamed(dep)

m := dep.getMutex()
if lock {
m := dep.getMutex()
m.Lock()
defer m.Unlock()
}
Expand All @@ -322,15 +322,13 @@ func (e *Engine) registerOne(dep Dep, lock bool) *Execution {
return exec
}

// force deps registration
_ = dep.GetDepsObj()

exec := &Execution{
ID: atomic.AddUint64(&e.execUid, 1),
Dep: dep,
outStore: &outStore{},
eventsCh: e.eventsCh,
completedCh: make(chan struct{}),
m: m,

// see field comments
errCh: nil,
Expand All @@ -344,7 +342,7 @@ func (e *Engine) registerOne(dep Dep, lock bool) *Execution {
exec.c.L.Unlock()
})
}
exec.c = sync.NewCond(&exec.m)
exec.c = sync.NewCond(exec.m)
dep.setExecution(exec)

return exec
Expand Down
2 changes: 1 addition & 1 deletion worker2/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type Execution struct {

errCh chan error // gets populated when exec is called
inputs map[string]Value // gets populated before marking as ready
m sync.Mutex
m *sync.RWMutex

suspendCh chan struct{}
resumeCh chan struct{}
Expand Down
2 changes: 2 additions & 0 deletions worker2/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ func (t *RunningTracker) Hook() Hook {

return func(event Event) {
switch event := event.(type) {
case EventDeclared:
t.group.AddDep(event.Dep)
case EventScheduled:
t.group.AddDep(event.Execution.Dep)
case EventStarted:
Expand Down

0 comments on commit 105ab70

Please sign in to comment.