diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index af1c68e2..d96565c3 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -27,7 +27,6 @@ import ( "github.com/pbnjay/memory" "os" "path/filepath" - "runtime" "strings" ) @@ -142,9 +141,9 @@ type Bootstrap struct { PlatformProviders []platform.PlatformProvider } -func DefaultScheduler() *worker2.ResourceScheduler { +func DefaultScheduler(cpu int) *worker2.ResourceScheduler { return worker2.NewResourceScheduler(map[string]float64{ - "cpu": float64(runtime.NumCPU()), + "cpu": float64(cpu), "mem": float64(memory.TotalMemory()), }, map[string]float64{ "cpu": float64(1), @@ -194,7 +193,7 @@ func Boot(ctx context.Context, opts BootOpts) (Bootstrap, error) { pool := opts.Pool if pool == nil { pool = worker2.NewEngine() - pool.SetDefaultScheduler(DefaultScheduler()) + pool.SetDefaultScheduler(DefaultScheduler(opts.Workers)) go pool.Run() } bs.Pool = pool @@ -307,19 +306,20 @@ func BootScheduler(ctx context.Context, bs Bootstrap) (*scheduler.Scheduler, err } e := scheduler.New(scheduler.Scheduler{ - Cwd: bs.Cwd, - Root: bs.Root, - Config: bs.Config, - Observability: bs.Observability, - GetFlowID: getFlowId, - LocalCache: localCache, - RemoteCache: remoteCache, - Packages: bs.Packages, - BuildFilesState: bs.BuildFiles, - Graph: bs.Graph, - Pool: bs.Pool, - Finalizers: fins, - Runner: runner, + Cwd: bs.Cwd, + Root: bs.Root, + Config: bs.Config, + Observability: bs.Observability, + GetFlowID: getFlowId, + LocalCache: localCache, + RemoteCache: remoteCache, + Packages: bs.Packages, + BuildFilesState: bs.BuildFiles, + Graph: bs.Graph, + Pool: bs.Pool, + BackgroundTracker: worker2.NewRunningTracker(), + Finalizers: fins, + Runner: runner, }) bs.Finalizers.RegisterWithErr(func(err error) { diff --git a/bootstrapwatch/bootstrap.go b/bootstrapwatch/bootstrap.go index 21d95820..1497bbe1 100644 --- a/bootstrapwatch/bootstrap.go +++ b/bootstrapwatch/bootstrap.go @@ -89,7 +89,7 @@ func Boot(ctx context.Context, root *hroot.State, bootopts bootstrap.BootOpts, c } pool := worker2.NewEngine() - pool.SetDefaultScheduler(bootstrap.DefaultScheduler()) + pool.SetDefaultScheduler(bootstrap.DefaultScheduler(bootopts.Workers)) go pool.Run() bootopts.Pool = pool diff --git a/cmd/heph/init.go b/cmd/heph/init.go index 88e90830..25b7840c 100644 --- a/cmd/heph/init.go +++ b/cmd/heph/init.go @@ -59,15 +59,15 @@ func schedulerInit(ctx context.Context, postBoot func(bootstrap.BaseBootstrap) e Finalizers.RegisterWithErr(func(err error) { bs.Finalizers.Run(err) - // TODO - //if !bs.Scheduler.Pool.IsDone() { - if false { + gb := bs.Scheduler.BackgroundTracker.Group() + bs.Pool.Schedule(gb) + if !gb.GetState().IsFinal() { log.Tracef("Waiting for all pool items to finish") select { - case <-bs.Scheduler.Pool.Wait(): + case <-gb.Wait(): case <-time.After(time.Second): log.Infof("Waiting for background jobs to finish...") - <-bs.Scheduler.Pool.Wait() + <-gb.Wait() } log.Tracef("All pool items finished") diff --git a/scheduler/cache.go b/scheduler/cache.go index ab392592..82bebcc8 100644 --- a/scheduler/cache.go +++ b/scheduler/cache.go @@ -171,7 +171,7 @@ func (e *Scheduler) pullExternalCache(ctx context.Context, target *graph.Target, return true, nil } -func (e *Scheduler) scheduleStoreExternalCache(ctx context.Context, target *graph.Target, cache rcache.CacheConfig, tracker *worker2.RunningTracker) worker2.Dep { +func (e *Scheduler) scheduleStoreExternalCache(ctx context.Context, target *graph.Target, cache rcache.CacheConfig, trackers []*worker2.RunningTracker) worker2.Dep { // input hash is used as a marker that everything went well, // wait for everything else to be done before copying the input hash inputHashArtifact := target.Artifacts.InputHash @@ -182,17 +182,22 @@ func (e *Scheduler) scheduleStoreExternalCache(ctx context.Context, target *grap continue } - j := e.scheduleStoreExternalCacheArtifact(ctx, target, cache, artifact, nil, tracker) + j := e.scheduleStoreExternalCacheArtifact(ctx, target, cache, artifact, nil, trackers) deps.AddDep(j) } - return e.scheduleStoreExternalCacheArtifact(ctx, target, cache, inputHashArtifact, deps, tracker) + return e.scheduleStoreExternalCacheArtifact(ctx, target, cache, inputHashArtifact, deps, trackers) } -func (e *Scheduler) scheduleStoreExternalCacheArtifact(ctx context.Context, target *graph.Target, cache rcache.CacheConfig, artifact artifacts.Artifact, deps *worker2.Group, tracker *worker2.RunningTracker) worker2.Dep { +func (e *Scheduler) scheduleStoreExternalCacheArtifact(ctx context.Context, target *graph.Target, cache rcache.CacheConfig, artifact artifacts.Artifact, deps *worker2.Group, trackers []*worker2.RunningTracker) worker2.Dep { + var hooks []worker2.Hook + for _, tracker := range trackers { + hooks = append(hooks, tracker.Hook()) + } + return e.Pool.Schedule(worker2.NewAction(worker2.ActionConfig{ Name: fmt.Sprintf("cache %v %v %v", target.Addr, cache.Name, artifact.Name()), - Hooks: []worker2.Hook{tracker.Hook()}, + Hooks: hooks, Deps: []worker2.Dep{deps}, Ctx: ctx, Do: func(ctx context.Context, ins worker2.InStore, outs worker2.OutStore) error { diff --git a/scheduler/engine.go b/scheduler/engine.go index 962c82fb..a8f607b4 100644 --- a/scheduler/engine.go +++ b/scheduler/engine.go @@ -21,20 +21,21 @@ import ( ) type Scheduler struct { - Cwd string - Root *hroot.State - Config *config.Config - Observability *observability.Observability - GetFlowID func() string - LocalCache *lcache.LocalCacheState - RemoteCache *rcache.RemoteCache - RemoteCacheHints *rcache.HintStore - Packages *packages.Registry - BuildFilesState *buildfiles.State - Graph *graph.State - Pool *worker2.Engine - Finalizers *finalizers.Finalizers - Runner *targetrun.Runner + Cwd string + Root *hroot.State + Config *config.Config + Observability *observability.Observability + GetFlowID func() string + LocalCache *lcache.LocalCacheState + RemoteCache *rcache.RemoteCache + RemoteCacheHints *rcache.HintStore + Packages *packages.Registry + BuildFilesState *buildfiles.State + Graph *graph.State + Pool *worker2.Engine + BackgroundTracker *worker2.RunningTracker + Finalizers *finalizers.Finalizers + Runner *targetrun.Runner toolsLock locks.Locker } diff --git a/scheduler/target_run.go b/scheduler/target_run.go index d2723322..247d7f02 100644 --- a/scheduler/target_run.go +++ b/scheduler/target_run.go @@ -86,7 +86,7 @@ func (e *Scheduler) Run(ctx context.Context, rr targetrun.Request, iocfg sandbox if len(writeableCaches) > 0 { for _, cache := range writeableCaches { - _ = e.scheduleStoreExternalCache(ctx, rtarget.Target, cache, tracker) + _ = e.scheduleStoreExternalCache(ctx, rtarget.Target, cache, []*worker2.RunningTracker{tracker, e.BackgroundTracker}) } } diff --git a/test/go/e2e.BUILD b/test/go/e2e.BUILD index 2c2bde04..ab09aa50 100644 --- a/test/go/e2e.BUILD +++ b/test/go/e2e.BUILD @@ -3,7 +3,7 @@ load("//test", "e2e_test") e2e_test( name = "sanity_go_version", cmd = "heph run //test/go:version", - expect_output_contains = "go version go1.21.4", + expect_output_contains = "go version go1.22.2", ) e2e_test( diff --git a/worker/poolui/tui.go b/worker/poolui/tui.go index 8c626bd8..80a0f52f 100644 --- a/worker/poolui/tui.go +++ b/worker/poolui/tui.go @@ -6,7 +6,7 @@ import ( tea "github.com/charmbracelet/bubbletea" "github.com/charmbracelet/lipgloss" "github.com/hephbuild/heph/log/log" - "github.com/hephbuild/heph/utils/ads" + "github.com/hephbuild/heph/status" "github.com/hephbuild/heph/utils/xcontext" "github.com/hephbuild/heph/utils/xtea" "github.com/hephbuild/heph/utils/xtime" @@ -15,8 +15,14 @@ import ( "time" ) +type workerEntry struct { + status status.Statuser + duration time.Duration + exec *worker2.Execution +} + type UpdateMessage struct { - workers []*worker2.Worker + workers []workerEntry stats worker2.Stats final bool } @@ -71,22 +77,38 @@ func (m *Model) updateMsg(final bool) UpdateMessage { } } - s := worker2.CollectStats(m.deps) - return UpdateMessage{ - stats: s, - workers: ads.Filter(m.pool.GetWorkers(), func(worker *worker2.Worker) bool { - exec := worker.Execution() - if exec == nil { - return false - } + var workers []workerEntry + for _, w := range m.pool.GetWorkers() { + exec := w.Execution() + if exec == nil { + continue + } - if _, ok := exec.Dep.(*worker2.Group); ok { - return false - } + if _, ok := exec.Dep.(*worker2.Group); ok { + continue + } - return true - }), - final: final, + var duration time.Duration + if !exec.StartedAt.IsZero() { + duration = time.Since(exec.StartedAt) + } + + if duration < 200*time.Millisecond { + continue + } + + workers = append(workers, workerEntry{ + status: w.GetStatus(), + duration: duration, + exec: exec, + }) + } + + s := worker2.CollectStats(m.deps) + return UpdateMessage{ + stats: s, + workers: workers, + final: final, } } @@ -148,17 +170,14 @@ func (m *Model) View() string { } for _, w := range m.workers { - runtime := "" - if j := w.Execution(); j != nil { - runtime = fmt.Sprintf("=> [%5s]", xtime.FormatFixedWidthDuration(time.Since(j.StartedAt))) - } + runtime := fmt.Sprintf("=> [%5s]", xtime.FormatFixedWidthDuration(w.duration)) - status := w.GetStatus().String(log.Renderer()) - if status == "" { - status = styleFaint.Render("=|") + statusStr := w.status.String(log.Renderer()) + if statusStr == "" { + statusStr = styleFaint.Render("=> Thinking...") } - s.WriteString(fmt.Sprintf("%v %v\n", styleWorkerStart.Render(runtime), status)) + s.WriteString(fmt.Sprintf("%v %v\n", styleWorkerStart.Render(runtime), statusStr)) } return s.String() diff --git a/worker2/dep.go b/worker2/dep.go index 76f60864..82b6945a 100644 --- a/worker2/dep.go +++ b/worker2/dep.go @@ -24,7 +24,7 @@ type Dep interface { setExecution(*Execution) getExecution() *Execution - getMutex() sync.Locker + getMutex() *sync.RWMutex GetScheduler() Scheduler GetRequest() map[string]float64 } @@ -83,7 +83,7 @@ func (a *baseDep) Wait() <-chan struct{} { return doneCh } -func (a *baseDep) getMutex() sync.Locker { +func (a *baseDep) getMutex() *sync.RWMutex { return &a.m } diff --git a/worker2/deps.go b/worker2/deps.go index e47fccbf..fd79d526 100644 --- a/worker2/deps.go +++ b/worker2/deps.go @@ -55,7 +55,7 @@ func (d *Deps) Freeze() { for _, dep := range d.deps.Slice() { if !dep.GetDepsObj().IsFrozen() { - panic("attempting to freeze while all deps aren't frozen") + panic(fmt.Sprintf("attempting to freeze '%v' while all deps aren't frozen, '%v' isnt", d.owner.GetName(), dep.GetName())) } } diff --git a/worker2/engine.go b/worker2/engine.go index 68c72236..6aa2cf31 100644 --- a/worker2/engine.go +++ b/worker2/engine.go @@ -312,8 +312,8 @@ func (e *Engine) Schedule(a Dep) Dep { func (e *Engine) registerOne(dep Dep, lock bool) *Execution { dep = flattenNamed(dep) + m := dep.getMutex() if lock { - m := dep.getMutex() m.Lock() defer m.Unlock() } @@ -322,15 +322,13 @@ func (e *Engine) registerOne(dep Dep, lock bool) *Execution { return exec } - // force deps registration - _ = dep.GetDepsObj() - exec := &Execution{ ID: atomic.AddUint64(&e.execUid, 1), Dep: dep, outStore: &outStore{}, eventsCh: e.eventsCh, completedCh: make(chan struct{}), + m: m, // see field comments errCh: nil, @@ -344,7 +342,7 @@ func (e *Engine) registerOne(dep Dep, lock bool) *Execution { exec.c.L.Unlock() }) } - exec.c = sync.NewCond(&exec.m) + exec.c = sync.NewCond(exec.m) dep.setExecution(exec) return exec diff --git a/worker2/execution.go b/worker2/execution.go index 1425fad3..1e6e15b2 100644 --- a/worker2/execution.go +++ b/worker2/execution.go @@ -63,7 +63,7 @@ type Execution struct { errCh chan error // gets populated when exec is called inputs map[string]Value // gets populated before marking as ready - m sync.Mutex + m *sync.RWMutex suspendCh chan struct{} resumeCh chan struct{} diff --git a/worker2/tracker.go b/worker2/tracker.go index 86518b3a..98160a96 100644 --- a/worker2/tracker.go +++ b/worker2/tracker.go @@ -31,6 +31,8 @@ func (t *RunningTracker) Hook() Hook { return func(event Event) { switch event := event.(type) { + case EventDeclared: + t.group.AddDep(event.Dep) case EventScheduled: t.group.AddDep(event.Execution.Dep) case EventStarted: