diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index 931e6e8b..ee104ebd 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -271,9 +271,17 @@ func Boot(ctx context.Context, opts BootOpts) (Bootstrap, error) { return bs, nil } -func BootScheduler(ctx context.Context, bs Bootstrap) (*scheduler.Scheduler, error) { +type SchedulerOpts struct { + Approver scheduler.Approver +} + +func BootScheduler(ctx context.Context, bs Bootstrap, opts SchedulerOpts) (*scheduler.Scheduler, error) { fins := &finalizers.Finalizers{} + if opts.Approver == nil { + opts.Approver = scheduler.StaticApprover{Value: false} + } + localCache, err := lcache.NewState(bs.Root, bs.Pool, bs.Graph.Targets(), bs.Observability, fins, bs.Config.Engine.GC, bs.Config.Engine.ParallelCaching) if err != nil { return nil, err @@ -321,6 +329,7 @@ func BootScheduler(ctx context.Context, bs Bootstrap) (*scheduler.Scheduler, err BackgroundTracker: worker2.NewRunningTracker(), Finalizers: fins, Runner: runner, + Approver: opts.Approver, }) if bs.Config.Engine.GitCacheHints { @@ -346,16 +355,16 @@ type SchedulerBootstrap struct { Scheduler *scheduler.Scheduler } -func BootWithScheduler(ctx context.Context, opts BootOpts) (SchedulerBootstrap, error) { +func BootWithScheduler(ctx context.Context, bootOpts BootOpts, schedOpts SchedulerOpts) (SchedulerBootstrap, error) { ebs := SchedulerBootstrap{} - bs, err := Boot(ctx, opts) + bs, err := Boot(ctx, bootOpts) if err != nil { return ebs, err } ebs.Bootstrap = bs - e, err := BootScheduler(ctx, bs) + e, err := BootScheduler(ctx, bs, schedOpts) if err != nil { return ebs, err } diff --git a/bootstrap/rrs.go b/bootstrap/rrs.go index 966b3523..bb6489d9 100644 --- a/bootstrap/rrs.go +++ b/bootstrap/rrs.go @@ -125,7 +125,7 @@ func RunGen(ctx context.Context, e *scheduler.Scheduler, plain bool, filterFacto return err } - err = poolwait.Wait(ctx, fmt.Sprintf("Gen run %v", i), e.Pool, deps, plain, e.Config.ProgressInterval) + err = poolwait.Wait(ctx, fmt.Sprintf("Gen run %v", i), e.Pool, deps, plain, e.Config.ProgressInterval, e.Approver) if err != nil { return err } diff --git a/bootstrap/run.go b/bootstrap/run.go index c8fdd0de..8a77929b 100644 --- a/bootstrap/run.go +++ b/bootstrap/run.go @@ -78,7 +78,7 @@ func RunMode(ctx context.Context, e *scheduler.Scheduler, rrs targetrun.Requests tdeps := tdepsMap.All() tdeps.AddDep(tracker.Group()) - err = poolwait.Wait(ctx, "Run", e.Pool, tdeps, runopts.Plain, e.Config.ProgressInterval) + err = poolwait.Wait(ctx, "Run", e.Pool, tdeps, runopts.Plain, e.Config.ProgressInterval, e.Approver) if err != nil { return err } diff --git a/bootstrapwatch/bootstrap.go b/bootstrapwatch/bootstrap.go index 7b086179..043a4e12 100644 --- a/bootstrapwatch/bootstrap.go +++ b/bootstrapwatch/bootstrap.go @@ -324,7 +324,7 @@ func (s *State) trigger(ctx context.Context, events []fsEvent) error { usingBs := false if bs.Scheduler == nil { - cbs, err := bootstrap.BootWithScheduler(ctx, s.bootopts) + cbs, err := bootstrap.BootWithScheduler(ctx, s.bootopts, bootstrap.SchedulerOpts{}) if err != nil { return fmt.Errorf("boot: %w", err) } @@ -379,7 +379,7 @@ func (s *State) trigger(ctx context.Context, events []fsEvent) error { runDeps.AddDep(tdepsMap.All()) runDeps.AddDep(tracker.Group()) - err = poolwait.Wait(ctx, "Change", bs.Pool, runDeps, s.runopts.Plain, bs.Config.ProgressInterval) + err = poolwait.Wait(ctx, "Change", bs.Pool, runDeps, s.runopts.Plain, bs.Config.ProgressInterval, bs.Scheduler.Approver) if err != nil { return err } diff --git a/cmd/heph/init.go b/cmd/heph/init.go index 3ba4fed4..e2bdf5ed 100644 --- a/cmd/heph/init.go +++ b/cmd/heph/init.go @@ -8,6 +8,7 @@ import ( "github.com/hephbuild/heph/scheduler" "github.com/hephbuild/heph/specs" "github.com/hephbuild/heph/utils/tuistatus" + "github.com/hephbuild/heph/worker2/poolui" "os" "strings" "time" @@ -51,7 +52,14 @@ func schedulerInit(ctx context.Context, postBoot func(bootstrap.BaseBootstrap) e // This allows to block reading stdin right after a potential CheckAndUpgrade has run opts.PostBootBase = postBoot - bs, err := bootstrap.BootWithScheduler(ctx, opts) + schedOpts := bootstrap.SchedulerOpts{ + Approver: poolui.NewApprover(), + } + if autoApprove { + schedOpts.Approver = scheduler.StaticApprover{Value: true} + } + + bs, err := bootstrap.BootWithScheduler(ctx, opts, schedOpts) if err != nil { return bs, err } diff --git a/cmd/heph/query.go b/cmd/heph/query.go index 3a2515db..e295da78 100644 --- a/cmd/heph/query.go +++ b/cmd/heph/query.go @@ -809,7 +809,7 @@ var hashinCmd = &cobra.Command{ return err } - err = poolwait.Wait(ctx, "Run", bs.Scheduler.Pool, tdeps.All(), *plain, bs.Config.ProgressInterval) + err = poolwait.Wait(ctx, "Run", bs.Scheduler.Pool, tdeps.All(), *plain, bs.Config.ProgressInterval, bs.Scheduler.Approver) if err != nil { return err } diff --git a/cmd/heph/root.go b/cmd/heph/root.go index a6804e2d..5b4d2e10 100644 --- a/cmd/heph/root.go +++ b/cmd/heph/root.go @@ -43,6 +43,7 @@ var summary *bool var summaryGen *bool var jaegerEndpoint *string var check *bool +var autoApprove bool func getRunOpts() bootstrap.RunOpts { return bootstrap.RunOpts{ @@ -80,6 +81,7 @@ func init() { alwaysOut = runCmd.Flags().Bool("always-out", false, "Ensure output will be present in cache") runCmd.Flags().BoolVarP(&all, "all", "a", false, "Force run all") runCmd.Flags().MarkHidden("all") + runCmd.Flags().BoolVar(&autoApprove, "auto-approve", false, "Auto-approve dangerous targets") check = fmtCmd.Flags().Bool("check", false, "Only check formatting") diff --git a/hbuiltin/predeclared.go b/hbuiltin/predeclared.go index fc4cda8d..15eff21f 100644 --- a/hbuiltin/predeclared.go +++ b/hbuiltin/predeclared.go @@ -184,6 +184,7 @@ func target(thread *starlark.Thread, fn *starlark.Builtin, args starlark.Tuple, "gen_deps_meta?", &sargs.GenDepsMeta, "annotations?", &sargs.Annotations, "requests?", &sargs.Requests, + "dangerous?", &sargs.Dangerous, ); err != nil { if sargs.Name != "" { return nil, fmt.Errorf("%v: %w", pkg.TargetAddr(sargs.Name), err) diff --git a/hbuiltin/predeclared_types.go b/hbuiltin/predeclared_types.go index a2cadc11..7b0dda58 100644 --- a/hbuiltin/predeclared_types.go +++ b/hbuiltin/predeclared_types.go @@ -42,6 +42,7 @@ type TargetArgs struct { GenDepsMeta bool Annotations xstarlark.Distruct Requests xstarlark.Distruct + Dangerous bool } type TargetArgsTransitive struct { diff --git a/hbuiltin/target_spec_parser.go b/hbuiltin/target_spec_parser.go index ddb0b959..594db203 100644 --- a/hbuiltin/target_spec_parser.go +++ b/hbuiltin/target_spec_parser.go @@ -57,6 +57,7 @@ func specFromArgs(args TargetArgs, pkg *packages.Package) (specs.Target, error) OutEnv: args.OutEnv, HashFile: args.HashFile, GenDepsMeta: args.GenDepsMeta, + Dangerous: args.Dangerous, } var err error diff --git a/scheduler/engine.go b/scheduler/engine.go index 5acdc4db..a75e3c8a 100644 --- a/scheduler/engine.go +++ b/scheduler/engine.go @@ -22,6 +22,18 @@ import ( "sync" ) +type Approver interface { + Approve(ctx context.Context, t specs.Target) bool +} + +type StaticApprover struct { + Value bool +} + +func (a StaticApprover) Approve(ctx context.Context, t specs.Target) bool { + return a.Value +} + type Scheduler struct { Cwd string Root *hroot.State @@ -39,6 +51,7 @@ type Scheduler struct { Finalizers *finalizers.Finalizers Runner *targetrun.Runner GitStatus *gitstatus.GitStatus + Approver Approver toolsLock locks.Locker } diff --git a/scheduler/target_run.go b/scheduler/target_run.go index 247d7f02..33f3b81b 100644 --- a/scheduler/target_run.go +++ b/scheduler/target_run.go @@ -24,6 +24,13 @@ func (e *Scheduler) Run(ctx context.Context, rr targetrun.Request, iocfg sandbox target := rr.Target + if target.Dangerous { + approved := e.Approver.Approve(ctx, target.Spec()) + if !approved { + return fmt.Errorf("dangerous target, pass --auto-approve to run") + } + } + done := log.TraceTiming("run " + target.Addr) defer done() diff --git a/specs/target_spec.go b/specs/target_spec.go index fe5fde11..d7cc2e4c 100644 --- a/specs/target_spec.go +++ b/specs/target_spec.go @@ -136,6 +136,7 @@ type Target struct { GenDepsMeta bool Annotations map[string]interface{} Requests map[string]float64 + Dangerous bool } func (t Target) MarshalJSON() ([]byte, error) { diff --git a/utils/locks/flock.go b/utils/locks/flock.go index 1ab31248..c139b28c 100644 --- a/utils/locks/flock.go +++ b/utils/locks/flock.go @@ -150,10 +150,13 @@ func (l *Flock) lock(ctx context.Context, ro bool) error { lockCh <- flock.Flock(f, ro, true) }() - err := worker2.WaitChanE(ctx, lockCh) + lockErr, err := worker2.WaitChanReceive(ctx, lockCh) if err != nil { return false, err } + if lockErr != nil { + return false, lockErr + } return true, nil }) diff --git a/worker2/poolui/tui.go b/worker2/poolui/tui.go index 80a0f52f..5c5f1af2 100644 --- a/worker2/poolui/tui.go +++ b/worker2/poolui/tui.go @@ -6,6 +6,7 @@ import ( tea "github.com/charmbracelet/bubbletea" "github.com/charmbracelet/lipgloss" "github.com/hephbuild/heph/log/log" + "github.com/hephbuild/heph/scheduler" "github.com/hephbuild/heph/status" "github.com/hephbuild/heph/utils/xcontext" "github.com/hephbuild/heph/utils/xtea" @@ -27,8 +28,8 @@ type UpdateMessage struct { final bool } -func New(ctx context.Context, name string, deps worker2.Dep, pool *worker2.Engine, quitWhenDone bool) *Model { - return &Model{ +func New(ctx context.Context, name string, deps worker2.Dep, pool *worker2.Engine, approver scheduler.Approver, quitWhenDone bool) *Model { + m := &Model{ name: name, deps: deps, pool: pool, @@ -39,25 +40,36 @@ func New(ctx context.Context, name string, deps worker2.Dep, pool *worker2.Engin log: xtea.NewLogModel(), quitWhenDone: quitWhenDone, } + if approver, ok := approver.(*Approver); ok { + m.approver = approver + } + + return m } type Model struct { - name string - deps worker2.Dep - start time.Time - cancel func() - pool *worker2.Engine - log xtea.LogModel - quitWhenDone bool + name string + deps worker2.Dep + start time.Time + cancel func() + pool *worker2.Engine + log xtea.LogModel + quitWhenDone bool + approver *Approver + approveRequest *ApproveRequest UpdateMessage } func (m *Model) Init() tea.Cmd { m.log.Init() m.UpdateMessage = m.updateMsg(false) + if m.approver != nil { + m.approver.SetConnected(true) + } return tea.Batch( m.log.Next, m.doUpdateMsgTicker(), + m.doApproveMsg(), ) } @@ -67,6 +79,18 @@ func (m *Model) doUpdateMsgTicker() tea.Cmd { }) } +func (m *Model) doApproveMsg() tea.Cmd { + if m.approver == nil { + return nil + } + + return func() tea.Msg { + req := m.approver.Next() + + return req + } +} + func (m *Model) updateMsg(final bool) UpdateMessage { if !final { select { @@ -124,6 +148,21 @@ func (m *Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) { case tea.KeyBreak: m.cancel() return m, nil + case tea.KeyRunes: + switch msg.String() { + case "Y", "y": + if req := m.approveRequest; req != nil { + req.Respond(true) + m.approveRequest = nil + cmds = append(cmds, m.doApproveMsg()) + } + case "N", "n": + if req := m.approveRequest; req != nil { + req.Respond(false) + m.approveRequest = nil + cmds = append(cmds, m.doApproveMsg()) + } + } } case UpdateMessage: m.UpdateMessage = msg @@ -133,6 +172,8 @@ func (m *Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) { } } return m, m.doUpdateMsgTicker() + case ApproveRequest: + m.approveRequest = &msg } m.log, cmd = m.log.Update(msg) @@ -180,9 +221,16 @@ func (m *Model) View() string { s.WriteString(fmt.Sprintf("%v %v\n", styleWorkerStart.Render(runtime), statusStr)) } + if req := m.approveRequest; req != nil { + s.WriteString(fmt.Sprintf("\n%v needs your approval to run [Yy/Nn]", req.Target.Addr)) + } + return s.String() } func (m *Model) Clean() { + if m.approver != nil { + m.approver.SetConnected(false) + } m.log.Clean() } diff --git a/worker2/poolwait/tui.go b/worker2/poolwait/tui.go index fc6601ac..58745f9b 100644 --- a/worker2/poolwait/tui.go +++ b/worker2/poolwait/tui.go @@ -2,20 +2,21 @@ package poolwait import ( "context" + "github.com/hephbuild/heph/scheduler" "github.com/hephbuild/heph/utils/xtea" "github.com/hephbuild/heph/worker2" "github.com/hephbuild/heph/worker2/poolui" "time" ) -func termUI(ctx context.Context, name string, deps worker2.Dep, pool *worker2.Engine) error { +func termUI(ctx context.Context, name string, deps worker2.Dep, pool *worker2.Engine, approver scheduler.Approver) error { if !xtea.SingleflightTry() { return logUI(name, deps, pool, time.Second) } defer xtea.SingleflightDone() - m := poolui.New(ctx, name, deps, pool, true) + m := poolui.New(ctx, name, deps, pool, approver, true) defer m.Clean() err := xtea.RunModel(m) diff --git a/worker2/poolwait/wait.go b/worker2/poolwait/wait.go index 81f9c67c..756413ff 100644 --- a/worker2/poolwait/wait.go +++ b/worker2/poolwait/wait.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/hephbuild/heph/log/log" + "github.com/hephbuild/heph/scheduler" "github.com/hephbuild/heph/utils/xtea" "github.com/hephbuild/heph/worker2" "os" @@ -17,7 +18,7 @@ func init() { debug, _ = strconv.ParseBool(os.Getenv("HEPH_DEBUG_POOLWAIT")) } -func Wait(ctx context.Context, name string, pool *worker2.Engine, deps worker2.Dep, plain bool, interval time.Duration) error { +func Wait(ctx context.Context, name string, pool *worker2.Engine, deps worker2.Dep, plain bool, interval time.Duration, approver scheduler.Approver) error { pool.Schedule(deps) if debug { @@ -36,7 +37,7 @@ func Wait(ctx context.Context, name string, pool *worker2.Engine, deps worker2.D }() if useTUI { - err := termUI(ctx, name, deps, pool) + err := termUI(ctx, name, deps, pool, approver) if err != nil { return fmt.Errorf("poolui: %w", err) } diff --git a/worker2/suspend.go b/worker2/suspend.go index 4ccd8ddf..19f95496 100644 --- a/worker2/suspend.go +++ b/worker2/suspend.go @@ -13,6 +13,7 @@ func executionFromContext(ctx context.Context) *Execution { return e } +// Wait suspends execution until the function returns func Wait(ctx context.Context, f func()) { _ = WaitE(ctx, func() error { f() @@ -20,6 +21,7 @@ func Wait(ctx context.Context, f func()) { }) } +// WaitE suspends execution until the function returns func WaitE(ctx context.Context, f func() error) error { e := executionFromContext(ctx) if e == nil { @@ -39,10 +41,16 @@ func WaitE(ctx context.Context, f func() error) error { return nil } +// WaitDep suspends execution until Dep completes or the context gets canceled func WaitDep(ctx context.Context, dep Dep) error { + return WaitChan(ctx, dep.Wait()) +} + +// WaitChan suspends execution until reading from the channel returns or the context gets canceled +func WaitChan[T any](ctx context.Context, ch <-chan T) error { return WaitE(ctx, func() error { select { - case <-dep.Wait(): + case <-ch: return nil case <-ctx.Done(): return ctx.Err() @@ -50,22 +58,28 @@ func WaitDep(ctx context.Context, dep Dep) error { }) } -func WaitChan[T any](ctx context.Context, ch <-chan T) error { - return WaitE(ctx, func() error { +// WaitChanReceive suspends execution until reading from the channel returns or the context gets canceled +// Unlike WaitChan, it will return the value +func WaitChanReceive[T any](ctx context.Context, ch <-chan T) (T, error) { + var out T + err := WaitE(ctx, func() error { select { - case <-ch: + case v := <-ch: + out = v return nil case <-ctx.Done(): return ctx.Err() } }) + return out, err } -func WaitChanE[T error](ctx context.Context, ch <-chan T) error { +// WaitChanSend suspends execution until sending to the channel succeeds or the context gets canceled +func WaitChanSend[T any](ctx context.Context, ch chan<- T, v T) error { return WaitE(ctx, func() error { select { - case err := <-ch: - return err + case ch <- v: + return nil case <-ctx.Done(): return ctx.Err() } diff --git a/x/BUILD b/x/BUILD index 1d6738eb..41243ea7 100644 --- a/x/BUILD +++ b/x/BUILD @@ -132,3 +132,20 @@ target( deps = reqdeps, cache = False, ) + +dangerdeps = [] +for i in range(0, 3): + t = target( + name = "danger{}".format(i), + cache = False, + dangerous = True, + ) + dangerdeps.append(t) + +target( + name = "danger", + run = "echo ran", + deps = dangerdeps, + cache = False, + dangerous = True, +)