Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
raphaelvigee committed May 4, 2024
1 parent 57c1fdc commit ec6d932
Show file tree
Hide file tree
Showing 20 changed files with 235 additions and 31 deletions.
17 changes: 13 additions & 4 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,17 @@ func Boot(ctx context.Context, opts BootOpts) (Bootstrap, error) {
return bs, nil
}

func BootScheduler(ctx context.Context, bs Bootstrap) (*scheduler.Scheduler, error) {
type SchedulerOpts struct {
Approver scheduler.Approver
}

func BootScheduler(ctx context.Context, bs Bootstrap, opts SchedulerOpts) (*scheduler.Scheduler, error) {
fins := &finalizers.Finalizers{}

if opts.Approver == nil {
opts.Approver = scheduler.StaticApprover{Value: false}
}

localCache, err := lcache.NewState(bs.Root, bs.Pool, bs.Graph.Targets(), bs.Observability, fins, bs.Config.Engine.GC, bs.Config.Engine.ParallelCaching)
if err != nil {
return nil, err
Expand Down Expand Up @@ -321,6 +329,7 @@ func BootScheduler(ctx context.Context, bs Bootstrap) (*scheduler.Scheduler, err
BackgroundTracker: worker2.NewRunningTracker(),
Finalizers: fins,
Runner: runner,
Approver: opts.Approver,
})

if bs.Config.Engine.GitCacheHints {
Expand All @@ -346,16 +355,16 @@ type SchedulerBootstrap struct {
Scheduler *scheduler.Scheduler
}

func BootWithScheduler(ctx context.Context, opts BootOpts) (SchedulerBootstrap, error) {
func BootWithScheduler(ctx context.Context, bootOpts BootOpts, schedOpts SchedulerOpts) (SchedulerBootstrap, error) {
ebs := SchedulerBootstrap{}

bs, err := Boot(ctx, opts)
bs, err := Boot(ctx, bootOpts)
if err != nil {
return ebs, err
}
ebs.Bootstrap = bs

e, err := BootScheduler(ctx, bs)
e, err := BootScheduler(ctx, bs, schedOpts)
if err != nil {
return ebs, err
}
Expand Down
2 changes: 1 addition & 1 deletion bootstrap/rrs.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func RunGen(ctx context.Context, e *scheduler.Scheduler, plain bool, filterFacto
return err
}

err = poolwait.Wait(ctx, fmt.Sprintf("Gen run %v", i), e.Pool, deps, plain, e.Config.ProgressInterval)
err = poolwait.Wait(ctx, fmt.Sprintf("Gen run %v", i), e.Pool, deps, plain, e.Config.ProgressInterval, e.Approver)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion bootstrap/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func RunMode(ctx context.Context, e *scheduler.Scheduler, rrs targetrun.Requests
tdeps := tdepsMap.All()
tdeps.AddDep(tracker.Group())

err = poolwait.Wait(ctx, "Run", e.Pool, tdeps, runopts.Plain, e.Config.ProgressInterval)
err = poolwait.Wait(ctx, "Run", e.Pool, tdeps, runopts.Plain, e.Config.ProgressInterval, e.Approver)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions bootstrapwatch/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (s *State) trigger(ctx context.Context, events []fsEvent) error {

usingBs := false
if bs.Scheduler == nil {
cbs, err := bootstrap.BootWithScheduler(ctx, s.bootopts)
cbs, err := bootstrap.BootWithScheduler(ctx, s.bootopts, bootstrap.SchedulerOpts{})
if err != nil {
return fmt.Errorf("boot: %w", err)
}
Expand Down Expand Up @@ -379,7 +379,7 @@ func (s *State) trigger(ctx context.Context, events []fsEvent) error {
runDeps.AddDep(tdepsMap.All())
runDeps.AddDep(tracker.Group())

err = poolwait.Wait(ctx, "Change", bs.Pool, runDeps, s.runopts.Plain, bs.Config.ProgressInterval)
err = poolwait.Wait(ctx, "Change", bs.Pool, runDeps, s.runopts.Plain, bs.Config.ProgressInterval, bs.Scheduler.Approver)
if err != nil {
return err
}
Expand Down
10 changes: 9 additions & 1 deletion cmd/heph/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/hephbuild/heph/scheduler"
"github.com/hephbuild/heph/specs"
"github.com/hephbuild/heph/utils/tuistatus"
"github.com/hephbuild/heph/worker2/poolui"
"os"
"strings"
"time"
Expand Down Expand Up @@ -51,7 +52,14 @@ func schedulerInit(ctx context.Context, postBoot func(bootstrap.BaseBootstrap) e
// This allows to block reading stdin right after a potential CheckAndUpgrade has run
opts.PostBootBase = postBoot

bs, err := bootstrap.BootWithScheduler(ctx, opts)
schedOpts := bootstrap.SchedulerOpts{
Approver: poolui.NewApprover(),
}
if autoApprove {
schedOpts.Approver = scheduler.StaticApprover{Value: true}
}

bs, err := bootstrap.BootWithScheduler(ctx, opts, schedOpts)
if err != nil {
return bs, err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/heph/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,7 @@ var hashinCmd = &cobra.Command{
return err
}

err = poolwait.Wait(ctx, "Run", bs.Scheduler.Pool, tdeps.All(), *plain, bs.Config.ProgressInterval)
err = poolwait.Wait(ctx, "Run", bs.Scheduler.Pool, tdeps.All(), *plain, bs.Config.ProgressInterval, bs.Scheduler.Approver)
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/heph/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var summary *bool
var summaryGen *bool
var jaegerEndpoint *string
var check *bool
var autoApprove bool

func getRunOpts() bootstrap.RunOpts {
return bootstrap.RunOpts{
Expand Down Expand Up @@ -80,6 +81,7 @@ func init() {
alwaysOut = runCmd.Flags().Bool("always-out", false, "Ensure output will be present in cache")
runCmd.Flags().BoolVarP(&all, "all", "a", false, "Force run all")
runCmd.Flags().MarkHidden("all")
runCmd.Flags().BoolVar(&autoApprove, "auto-approve", false, "Auto-approve dangerous targets")

check = fmtCmd.Flags().Bool("check", false, "Only check formatting")

Expand Down
1 change: 1 addition & 0 deletions hbuiltin/predeclared.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ func target(thread *starlark.Thread, fn *starlark.Builtin, args starlark.Tuple,
"gen_deps_meta?", &sargs.GenDepsMeta,
"annotations?", &sargs.Annotations,
"requests?", &sargs.Requests,
"dangerous?", &sargs.Dangerous,
); err != nil {
if sargs.Name != "" {
return nil, fmt.Errorf("%v: %w", pkg.TargetAddr(sargs.Name), err)
Expand Down
1 change: 1 addition & 0 deletions hbuiltin/predeclared_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type TargetArgs struct {
GenDepsMeta bool
Annotations xstarlark.Distruct
Requests xstarlark.Distruct
Dangerous bool
}

type TargetArgsTransitive struct {
Expand Down
1 change: 1 addition & 0 deletions hbuiltin/target_spec_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func specFromArgs(args TargetArgs, pkg *packages.Package) (specs.Target, error)
OutEnv: args.OutEnv,
HashFile: args.HashFile,
GenDepsMeta: args.GenDepsMeta,
Dangerous: args.Dangerous,
}

var err error
Expand Down
13 changes: 13 additions & 0 deletions scheduler/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@ import (
"sync"
)

type Approver interface {
Approve(ctx context.Context, t specs.Target) bool
}

type StaticApprover struct {
Value bool
}

func (a StaticApprover) Approve(ctx context.Context, t specs.Target) bool {
return a.Value
}

type Scheduler struct {
Cwd string
Root *hroot.State
Expand All @@ -39,6 +51,7 @@ type Scheduler struct {
Finalizers *finalizers.Finalizers
Runner *targetrun.Runner
GitStatus *gitstatus.GitStatus
Approver Approver

toolsLock locks.Locker
}
Expand Down
7 changes: 7 additions & 0 deletions scheduler/target_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ func (e *Scheduler) Run(ctx context.Context, rr targetrun.Request, iocfg sandbox

target := rr.Target

if target.Dangerous {
approved := e.Approver.Approve(ctx, target.Spec())
if !approved {
return fmt.Errorf("dangerous target, pass --auto-approve to run")
}
}

done := log.TraceTiming("run " + target.Addr)
defer done()

Expand Down
1 change: 1 addition & 0 deletions specs/target_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ type Target struct {
GenDepsMeta bool
Annotations map[string]interface{}
Requests map[string]float64
Dangerous bool
}

func (t Target) MarshalJSON() ([]byte, error) {
Expand Down
5 changes: 4 additions & 1 deletion utils/locks/flock.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,13 @@ func (l *Flock) lock(ctx context.Context, ro bool) error {
lockCh <- flock.Flock(f, ro, true)
}()

err := worker2.WaitChanE(ctx, lockCh)
lockErr, err := worker2.WaitChanReceive(ctx, lockCh)
if err != nil {
return false, err
}
if lockErr != nil {
return false, lockErr
}

return true, nil
})
Expand Down
77 changes: 77 additions & 0 deletions worker2/poolui/approver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package poolui

import (
"context"
"github.com/hephbuild/heph/specs"
"github.com/hephbuild/heph/worker2"
"sync"
)

func NewApprover() *Approver {
return &Approver{
queue: make(chan ApproveRequest),
}
}

type ApproveRequest struct {
Target specs.Target
Respond func(bool)
}

type Approver struct {
m sync.Mutex
connected bool
queue chan ApproveRequest
}

func (a *Approver) SetConnected(connected bool) {
a.m.Lock()
defer a.m.Unlock()

a.connected = connected

if !connected {
for {
select {
case req := <-a.queue:
req.Respond(false)
default:
return
}
}
}
}

func (a *Approver) Approve(ctx context.Context, t specs.Target) bool {
a.m.Lock()

if !a.connected {
a.m.Unlock()
return false
}
a.m.Unlock()

valueCh := make(chan bool)
defer close(valueCh)

err := worker2.WaitChanSend(ctx, a.queue, ApproveRequest{
Target: t,
Respond: func(approved bool) {
valueCh <- approved
},
})
if err != nil {
return false
}

res, err := worker2.WaitChanReceive(ctx, valueCh)
if err != nil {
return false
}

return res
}

func (a *Approver) Next() ApproveRequest {
return <-a.queue
}
Loading

0 comments on commit ec6d932

Please sign in to comment.