diff --git a/solver/edge.go b/solver/edge.go index 50e4f77f0d1b..51b4f6319af2 100644 --- a/solver/edge.go +++ b/solver/edge.go @@ -29,7 +29,7 @@ func newEdge(ed Edge, op activeOp, index *edgeIndex) *edge { e := &edge{ edge: ed, op: op, - depRequests: map[pipe.Receiver]*dep{}, + depRequests: map[pipeReceiver]*dep{}, keyMap: map[string]struct{}{}, cacheRecords: map[string]*CacheRecord{}, cacheRecordsLoaded: map[string]struct{}{}, @@ -65,14 +65,14 @@ type edge struct { op activeOp edgeState - depRequests map[pipe.Receiver]*dep + depRequests map[pipeReceiver]*dep deps []*dep - cacheMapReq pipe.Receiver + cacheMapReq pipeReceiver cacheMapDone bool cacheMapIndex int cacheMapDigests []digest.Digest - execReq pipe.Receiver + execReq pipeReceiver execCacheLoad bool err error cacheRecords map[string]*CacheRecord @@ -99,11 +99,11 @@ type edge struct { // dep holds state for a dependant edge type dep struct { - req pipe.Receiver + req pipeReceiver edgeState index Index keyMap map[string]*CacheKey - slowCacheReq pipe.Receiver + slowCacheReq pipeReceiver slowCacheComplete bool slowCacheFoundKey bool slowCacheKey *ExportableCacheKey @@ -122,7 +122,7 @@ func newDep(i Index) *dep { // edgePipe is a pipe for requests between two edges type edgePipe struct { - *pipe.Pipe + *pipe.Pipe[*edgeRequest, any] From, Target *edge mu sync.Mutex } @@ -198,21 +198,21 @@ func (e *edge) isComplete() bool { } // finishIncoming finalizes the incoming pipe request -func (e *edge) finishIncoming(req pipe.Sender) { +func (e *edge) finishIncoming(req pipeSender) { err := e.err if req.Request().Canceled && err == nil { err = context.Canceled } if e.debug { - bklog.G(context.TODO()).Debugf("finishIncoming %s %v %#v desired=%s", e.edge.Vertex.Name(), err, e.edgeState, req.Request().Payload.(*edgeRequest).desiredState) + bklog.G(context.TODO()).Debugf("finishIncoming %s %v %#v desired=%s", e.edge.Vertex.Name(), err, e.edgeState, req.Request().Payload.desiredState) } req.Finalize(&e.edgeState, err) } // updateIncoming updates the current value of incoming pipe request -func (e *edge) updateIncoming(req pipe.Sender) { +func (e *edge) updateIncoming(req pipeSender) { if e.debug { - bklog.G(context.TODO()).Debugf("updateIncoming %s %#v desired=%s", e.edge.Vertex.Name(), e.edgeState, req.Request().Payload.(*edgeRequest).desiredState) + bklog.G(context.TODO()).Debugf("updateIncoming %s %#v desired=%s", e.edge.Vertex.Name(), e.edgeState, req.Request().Payload.desiredState) } req.Update(&e.edgeState) } @@ -353,7 +353,7 @@ func (e *edge) skipPhase2FastCache(dep *dep) bool { // requests were not completed // 2. this function may not return outgoing requests if it has completed all // incoming requests -func (e *edge) unpark(incoming []pipe.Sender, updates, allPipes []pipe.Receiver, f *pipeFactory) { +func (e *edge) unpark(incoming []pipeSender, updates, allPipes []pipeReceiver, f *pipeFactory) { // process all incoming changes depChanged := false for _, upt := range updates { @@ -414,7 +414,7 @@ func (e *edge) markFailed(f *pipeFactory, err error) { } // processUpdate is called by unpark for every updated pipe request -func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) { +func (e *edge) processUpdate(upt pipeReceiver) (depChanged bool) { // response for cachemap request if upt == e.cacheMapReq && upt.Status().Completed { if err := upt.Status().Err; err != nil { @@ -719,7 +719,7 @@ func (e *edge) recalcCurrentState() { // respondToIncoming responds to all incoming requests. completing or // updating them when possible -func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receiver) (edgeStatusType, bool) { +func (e *edge) respondToIncoming(incoming []pipeSender, allPipes []pipeReceiver) (edgeStatusType, bool) { // detect the result state for the requests allIncomingCanComplete := true desiredState := e.state @@ -731,7 +731,7 @@ func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receive for _, req := range incoming { if !req.Request().Canceled { allCanceled = false - if r := req.Request().Payload.(*edgeRequest); desiredState < r.desiredState { + if r := req.Request().Payload; desiredState < r.desiredState { desiredState = r.desiredState if e.hasActiveOutgoing || r.desiredState == edgeStatusComplete || r.currentKeys == len(e.keys) { allIncomingCanComplete = false @@ -757,7 +757,7 @@ func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receive } // can close all but one requests - var leaveOpen pipe.Sender + var leaveOpen pipeSender for _, req := range incoming { if !req.Request().Canceled { leaveOpen = req @@ -784,7 +784,7 @@ func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receive // update incoming based on current state for _, req := range incoming { - r := req.Request().Payload.(*edgeRequest) + r := req.Request().Payload if req.Request().Canceled { e.finishIncoming(req) } else if !e.hasActiveOutgoing && e.state >= r.desiredState { @@ -803,7 +803,7 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory, // initialize deps state if e.deps == nil { - e.depRequests = make(map[pipe.Receiver]*dep) + e.depRequests = make(map[pipeReceiver]*dep) e.deps = make([]*dep, 0, len(e.edge.Vertex.Inputs())) for i := range e.edge.Vertex.Inputs() { e.deps = append(e.deps, newDep(Index(i))) @@ -842,13 +842,13 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory, if dep.state < desiredStateDep { addNew := true if dep.req != nil && !dep.req.Status().Completed { - if dep.req.Request().(*edgeRequest).desiredState != desiredStateDep { + if dep.req.Request().desiredState != desiredStateDep { if e.debug { bklog.G(context.TODO()). WithField("edge_vertex_name", e.edge.Vertex.Name()). WithField("edge_vertex_digest", e.edge.Vertex.Digest()). WithField("dep_index", dep.index). - WithField("dep_req_desired_state", dep.req.Request().(*edgeRequest).desiredState). + WithField("dep_req_desired_state", dep.req.Request().desiredState). WithField("dep_desired_state", desiredStateDep). WithField("dep_state", dep.state). Debug("cancel input request") @@ -860,7 +860,7 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory, WithField("edge_vertex_name", e.edge.Vertex.Name()). WithField("edge_vertex_digest", e.edge.Vertex.Digest()). WithField("dep_index", dep.index). - WithField("dep_req_desired_state", dep.req.Request().(*edgeRequest).desiredState). + WithField("dep_req_desired_state", dep.req.Request().desiredState). WithField("dep_desired_state", desiredStateDep). WithField("dep_state", dep.state). Debug("skip input request based on existing request") @@ -1062,3 +1062,13 @@ func withSelector(keys []ExportableCacheKey, selector digest.Digest) []CacheKeyW } return out } + +type ( + pipeRequest = pipe.Request[*edgeRequest] + pipeSender = pipe.Sender[*edgeRequest, any] + pipeReceiver = pipe.Receiver[*edgeRequest, any] +) + +func newPipe(req pipeRequest) *pipe.Pipe[*edgeRequest, any] { + return pipe.New[*edgeRequest, any](req) +} diff --git a/solver/internal/pipe/pipe.go b/solver/internal/pipe/pipe.go index 01cfa5a06b6d..55309c2a9017 100644 --- a/solver/internal/pipe/pipe.go +++ b/solver/internal/pipe/pipe.go @@ -8,67 +8,63 @@ import ( "github.com/pkg/errors" ) -type channel struct { +type channel[V any] struct { OnSendCompletion func() - value atomic.Value - lastValue *wrappedValue + value atomic.Pointer[V] + lastValue *V } -type wrappedValue struct { - value interface{} -} - -func (c *channel) Send(v interface{}) { - c.value.Store(&wrappedValue{value: v}) +func (c *channel[V]) Send(v V) { + c.value.Store(&v) if c.OnSendCompletion != nil { c.OnSendCompletion() } } -func (c *channel) Receive() (interface{}, bool) { +func (c *channel[V]) Receive() (V, bool) { v := c.value.Load() - if v == nil || v.(*wrappedValue) == c.lastValue { - return nil, false + if v == nil || v == c.lastValue { + return *new(V), false } - c.lastValue = v.(*wrappedValue) - return v.(*wrappedValue).value, true + c.lastValue = v + return *v, true } -type Pipe struct { - Sender Sender - Receiver Receiver +type Pipe[Payload, Value any] struct { + Sender Sender[Payload, Value] + Receiver Receiver[Payload, Value] OnReceiveCompletion func() OnSendCompletion func() } -type Request struct { - Payload interface{} +type Request[Payload any] struct { + Payload Payload Canceled bool } -type Sender interface { - Request() Request - Update(v interface{}) - Finalize(v interface{}, err error) - Status() Status +type Sender[Payload, Value any] interface { + Request() Request[Payload] + Update(v Value) + Finalize(v Value, err error) + Status() Status[Value] } -type Receiver interface { +type Receiver[Payload, Value any] interface { Receive() bool Cancel() - Status() Status - Request() interface{} + Status() Status[Value] + Request() Payload } -type Status struct { +type Status[Value any] struct { Canceled bool Completed bool Err error - Value interface{} + Value Value } -func NewWithFunction(f func(context.Context) (interface{}, error)) (*Pipe, func()) { - p := New(Request{}) +func NewWithFunction[Payload, Value any](f func(context.Context) (Value, error)) (*Pipe[Payload, Value], func()) { + p := New[Payload, Value](Request[Payload]{}) ctx, cancel := context.WithCancelCause(context.TODO()) @@ -81,27 +77,27 @@ func NewWithFunction(f func(context.Context) (interface{}, error)) (*Pipe, func( return p, func() { res, err := f(ctx) if err != nil { - p.Sender.Finalize(nil, err) + p.Sender.Finalize(*new(Value), err) return } p.Sender.Finalize(res, nil) } } -func New(req Request) *Pipe { - cancelCh := &channel{} - roundTripCh := &channel{} - pw := &sender{ +func New[Payload, Value any](req Request[Payload]) *Pipe[Payload, Value] { + cancelCh := &channel[Request[Payload]]{} + roundTripCh := &channel[Status[Value]]{} + pw := &sender[Payload, Value]{ req: req, sendChannel: roundTripCh, } - pr := &receiver{ + pr := &receiver[Payload, Value]{ req: req, recvChannel: roundTripCh, sendChannel: cancelCh, } - p := &Pipe{ + p := &Pipe[Payload, Value]{ Sender: pw, Receiver: pr, } @@ -109,7 +105,7 @@ func New(req Request) *Pipe { cancelCh.OnSendCompletion = func() { v, ok := cancelCh.Receive() if ok { - pw.setRequest(v.(Request)) + pw.setRequest(v) } if p.OnReceiveCompletion != nil { p.OnReceiveCompletion() @@ -125,38 +121,36 @@ func New(req Request) *Pipe { return p } -type sender struct { - status Status - req Request - sendChannel *channel +type sender[Payload, Value any] struct { + status Status[Value] + req Request[Payload] + sendChannel *channel[Status[Value]] mu sync.Mutex } -func (pw *sender) Status() Status { +func (pw *sender[Payload, Value]) Status() Status[Value] { return pw.status } -func (pw *sender) Request() Request { +func (pw *sender[Payload, Value]) Request() Request[Payload] { pw.mu.Lock() defer pw.mu.Unlock() return pw.req } -func (pw *sender) setRequest(req Request) { +func (pw *sender[Payload, Value]) setRequest(req Request[Payload]) { pw.mu.Lock() defer pw.mu.Unlock() pw.req = req } -func (pw *sender) Update(v interface{}) { +func (pw *sender[Payload, Value]) Update(v Value) { pw.status.Value = v pw.sendChannel.Send(pw.status) } -func (pw *sender) Finalize(v interface{}, err error) { - if v != nil { - pw.status.Value = v - } +func (pw *sender[Payload, Value]) Finalize(v Value, err error) { + pw.status.Value = v pw.status.Err = err pw.status.Completed = true if errors.Is(err, context.Canceled) && pw.req.Canceled { @@ -165,27 +159,27 @@ func (pw *sender) Finalize(v interface{}, err error) { pw.sendChannel.Send(pw.status) } -type receiver struct { - status Status - req Request - recvChannel *channel - sendChannel *channel +type receiver[Payload, Value any] struct { + status Status[Value] + req Request[Payload] + recvChannel *channel[Status[Value]] + sendChannel *channel[Request[Payload]] } -func (pr *receiver) Request() interface{} { +func (pr *receiver[Payload, Value]) Request() Payload { return pr.req.Payload } -func (pr *receiver) Receive() bool { +func (pr *receiver[Payload, Value]) Receive() bool { v, ok := pr.recvChannel.Receive() if !ok { return false } - pr.status = v.(Status) + pr.status = v return true } -func (pr *receiver) Cancel() { +func (pr *receiver[Payload, Value]) Cancel() { req := pr.req if req.Canceled { return @@ -194,6 +188,6 @@ func (pr *receiver) Cancel() { pr.sendChannel.Send(req) } -func (pr *receiver) Status() Status { +func (pr *receiver[Payload, Value]) Status() Status[Value] { return pr.status } diff --git a/solver/internal/pipe/pipe_test.go b/solver/internal/pipe/pipe_test.go index 605a8a65bd88..b7ea1508faa6 100644 --- a/solver/internal/pipe/pipe_test.go +++ b/solver/internal/pipe/pipe_test.go @@ -11,10 +11,10 @@ func TestPipe(t *testing.T) { t.Parallel() runCh := make(chan struct{}) - f := func(ctx context.Context) (interface{}, error) { + f := func(ctx context.Context) (string, error) { select { case <-ctx.Done(): - return nil, context.Cause(ctx) + return "", context.Cause(ctx) case <-runCh: return "res0", nil } @@ -27,7 +27,7 @@ func TestPipe(t *testing.T) { waitSignal <- struct{}{} } - p, start := NewWithFunction(f) + p, start := NewWithFunction[any](f) p.OnSendCompletion = signal go start() require.Equal(t, false, p.Receiver.Receive()) @@ -35,7 +35,7 @@ func TestPipe(t *testing.T) { st := p.Receiver.Status() require.Equal(t, false, st.Completed) require.Equal(t, false, st.Canceled) - require.Nil(t, st.Value) + require.Zero(t, st.Value) require.Equal(t, 0, signalled) close(runCh) @@ -46,17 +46,17 @@ func TestPipe(t *testing.T) { require.Equal(t, true, st.Completed) require.Equal(t, false, st.Canceled) require.NoError(t, st.Err) - require.Equal(t, "res0", st.Value.(string)) + require.Equal(t, "res0", st.Value) } func TestPipeCancel(t *testing.T) { t.Parallel() runCh := make(chan struct{}) - f := func(ctx context.Context) (interface{}, error) { + f := func(ctx context.Context) (string, error) { select { case <-ctx.Done(): - return nil, context.Cause(ctx) + return "", context.Cause(ctx) case <-runCh: return "res0", nil } @@ -69,7 +69,7 @@ func TestPipeCancel(t *testing.T) { waitSignal <- struct{}{} } - p, start := NewWithFunction(f) + p, start := NewWithFunction[any](f) p.OnSendCompletion = signal go start() p.Receiver.Receive() @@ -77,7 +77,7 @@ func TestPipeCancel(t *testing.T) { st := p.Receiver.Status() require.Equal(t, false, st.Completed) require.Equal(t, false, st.Canceled) - require.Nil(t, st.Value) + require.Zero(t, st.Value) require.Equal(t, 0, signalled) p.Receiver.Cancel() diff --git a/solver/scheduler.go b/solver/scheduler.go index ec3da4fc89ef..194f4813626b 100644 --- a/solver/scheduler.go +++ b/solver/scheduler.go @@ -13,9 +13,11 @@ import ( "github.com/tonistiigi/go-csvvalue" ) -var debugScheduler = false // TODO: replace with logs in build trace -var debugSchedulerSteps []string -var debugSchedulerStepsParseOnce sync.Once +var ( + debugScheduler = false // TODO: replace with logs in build trace + debugSchedulerSteps []string + debugSchedulerStepsParseOnce sync.Once +) func init() { if os.Getenv("BUILDKIT_SCHEDULER_DEBUG") == "1" { @@ -121,17 +123,17 @@ func (s *scheduler) loop() { // dispatch schedules an edge to be processed func (s *scheduler) dispatch(e *edge) { - inc := make([]pipe.Sender, len(s.incoming[e])) + inc := make([]pipeSender, len(s.incoming[e])) for i, p := range s.incoming[e] { inc[i] = p.Sender } - out := make([]pipe.Receiver, len(s.outgoing[e])) + out := make([]pipeReceiver, len(s.outgoing[e])) for i, p := range s.outgoing[e] { out[i] = p.Receiver } e.hasActiveOutgoing = false - updates := []pipe.Receiver{} + updates := []pipeReceiver{} for _, p := range out { if ok := p.Receive(); ok { updates = append(updates, p) @@ -272,7 +274,7 @@ func (s *scheduler) build(ctx context.Context, edge Edge) (CachedResult, error) wait := make(chan struct{}) - p := s.newPipe(e, nil, pipe.Request{Payload: &edgeRequest{desiredState: edgeStatusComplete}}) + p := s.newPipe(e, nil, pipeRequest{Payload: &edgeRequest{desiredState: edgeStatusComplete}}) p.OnSendCompletion = func() { p.Receiver.Receive() if p.Receiver.Status().Completed { @@ -298,9 +300,9 @@ func (s *scheduler) build(ctx context.Context, edge Edge) (CachedResult, error) } // newPipe creates a new request pipe between two edges -func (s *scheduler) newPipe(target, from *edge, req pipe.Request) *pipe.Pipe { +func (s *scheduler) newPipe(target, from *edge, req pipeRequest) *pipe.Pipe[*edgeRequest, any] { p := &edgePipe{ - Pipe: pipe.New(req), + Pipe: newPipe(req), Target: target, From: from, } @@ -324,8 +326,8 @@ func (s *scheduler) newPipe(target, from *edge, req pipe.Request) *pipe.Pipe { } // newRequestWithFunc creates a new request pipe that invokes a async function -func (s *scheduler) newRequestWithFunc(e *edge, f func(context.Context) (interface{}, error)) pipe.Receiver { - pp, start := pipe.NewWithFunction(f) +func (s *scheduler) newRequestWithFunc(e *edge, f func(context.Context) (any, error)) pipeReceiver { + pp, start := pipe.NewWithFunction[*edgeRequest](f) p := &edgePipe{ Pipe: pp, From: e, @@ -395,7 +397,7 @@ type pipeFactory struct { s *scheduler } -func (pf *pipeFactory) NewInputRequest(ee Edge, req *edgeRequest) pipe.Receiver { +func (pf *pipeFactory) NewInputRequest(ee Edge, req *edgeRequest) pipeReceiver { target := pf.s.ef.getEdge(ee) if target == nil { bklog.G(context.TODO()). @@ -407,14 +409,14 @@ func (pf *pipeFactory) NewInputRequest(ee Edge, req *edgeRequest) pipe.Receiver return nil, errdefs.Internal(errors.Errorf("failed to get edge: inconsistent graph state in edge %s %s %d", ee.Vertex.Name(), ee.Vertex.Digest(), ee.Index)) }) } - p := pf.s.newPipe(target, pf.e, pipe.Request{Payload: req}) + p := pf.s.newPipe(target, pf.e, pipeRequest{Payload: req}) if pf.e.debug { bklog.G(context.TODO()).Debugf("> newPipe %s %p desiredState=%s", ee.Vertex.Name(), p, req.desiredState) } return p.Receiver } -func (pf *pipeFactory) NewFuncRequest(f func(context.Context) (interface{}, error)) pipe.Receiver { +func (pf *pipeFactory) NewFuncRequest(f func(context.Context) (interface{}, error)) pipeReceiver { p := pf.s.newRequestWithFunc(pf.e, f) if pf.e.debug { bklog.G(context.TODO()).Debugf("> newFunc %p", p) @@ -422,7 +424,7 @@ func (pf *pipeFactory) NewFuncRequest(f func(context.Context) (interface{}, erro return p } -func debugSchedulerPreUnpark(e *edge, inc []pipe.Sender, updates, allPipes []pipe.Receiver) { +func debugSchedulerPreUnpark(e *edge, inc []pipeSender, updates, allPipes []pipeReceiver) { log := bklog.G(context.TODO()). WithField("edge_vertex_name", e.edge.Vertex.Name()). WithField("edge_vertex_digest", e.edge.Vertex.Digest()). @@ -438,7 +440,7 @@ func debugSchedulerPreUnpark(e *edge, inc []pipe.Sender, updates, allPipes []pip for i, dep := range e.deps { des := edgeStatusInitial if dep.req != nil { - des = dep.req.Request().(*edgeRequest).desiredState + des = dep.req.Request().desiredState } log. WithField("dep_index", i). @@ -457,7 +459,7 @@ func debugSchedulerPreUnpark(e *edge, inc []pipe.Sender, updates, allPipes []pip log. WithField("incoming_index", i). WithField("incoming_pointer", in). - WithField("incoming_desired_state", req.Payload.(*edgeRequest).desiredState). + WithField("incoming_desired_state", req.Payload.desiredState). WithField("incoming_canceled", req.Canceled). Debug("> incoming") } @@ -499,7 +501,7 @@ func debugSchedulerPreUnpark(e *edge, inc []pipe.Sender, updates, allPipes []pip } } -func debugSchedulerPostUnpark(e *edge, inc []pipe.Sender) { +func debugSchedulerPostUnpark(e *edge, inc []pipeSender) { log := bklog.G(context.TODO()) for i, in := range inc { log.