From ad64996f49f4916b1b727a64625eb677fbbe886d Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Thu, 12 Sep 2024 10:37:12 -0500 Subject: [PATCH] solver: pipe implementation utilizes generics for better typing This updates the pipe library to use generics for the request payload and the status value. This allows the solver to put in explicit types rather than rely on type casting from interfaces which helps with type safety and understandability. The status value used by the solver uses the `any` type instead of an explicit type because the `unpark` method takes a generic list of pipes and the different pipes have different result types. We can likely change this in the future or create a discriminated union for the types that can be used in this package. That is left for future work because at least the request payload is typed now. Signed-off-by: Jonathan A. Sternberg --- solver/edge.go | 52 ++++++++------ solver/internal/pipe/pipe.go | 116 ++++++++++++++---------------- solver/internal/pipe/pipe_test.go | 18 ++--- solver/scheduler.go | 38 +++++----- 4 files changed, 115 insertions(+), 109 deletions(-) diff --git a/solver/edge.go b/solver/edge.go index 50e4f77f0d1b..51b4f6319af2 100644 --- a/solver/edge.go +++ b/solver/edge.go @@ -29,7 +29,7 @@ func newEdge(ed Edge, op activeOp, index *edgeIndex) *edge { e := &edge{ edge: ed, op: op, - depRequests: map[pipe.Receiver]*dep{}, + depRequests: map[pipeReceiver]*dep{}, keyMap: map[string]struct{}{}, cacheRecords: map[string]*CacheRecord{}, cacheRecordsLoaded: map[string]struct{}{}, @@ -65,14 +65,14 @@ type edge struct { op activeOp edgeState - depRequests map[pipe.Receiver]*dep + depRequests map[pipeReceiver]*dep deps []*dep - cacheMapReq pipe.Receiver + cacheMapReq pipeReceiver cacheMapDone bool cacheMapIndex int cacheMapDigests []digest.Digest - execReq pipe.Receiver + execReq pipeReceiver execCacheLoad bool err error cacheRecords map[string]*CacheRecord @@ -99,11 +99,11 @@ type edge struct { // dep holds state for a dependant edge type dep struct { - req pipe.Receiver + req pipeReceiver edgeState index Index keyMap map[string]*CacheKey - slowCacheReq pipe.Receiver + slowCacheReq pipeReceiver slowCacheComplete bool slowCacheFoundKey bool slowCacheKey *ExportableCacheKey @@ -122,7 +122,7 @@ func newDep(i Index) *dep { // edgePipe is a pipe for requests between two edges type edgePipe struct { - *pipe.Pipe + *pipe.Pipe[*edgeRequest, any] From, Target *edge mu sync.Mutex } @@ -198,21 +198,21 @@ func (e *edge) isComplete() bool { } // finishIncoming finalizes the incoming pipe request -func (e *edge) finishIncoming(req pipe.Sender) { +func (e *edge) finishIncoming(req pipeSender) { err := e.err if req.Request().Canceled && err == nil { err = context.Canceled } if e.debug { - bklog.G(context.TODO()).Debugf("finishIncoming %s %v %#v desired=%s", e.edge.Vertex.Name(), err, e.edgeState, req.Request().Payload.(*edgeRequest).desiredState) + bklog.G(context.TODO()).Debugf("finishIncoming %s %v %#v desired=%s", e.edge.Vertex.Name(), err, e.edgeState, req.Request().Payload.desiredState) } req.Finalize(&e.edgeState, err) } // updateIncoming updates the current value of incoming pipe request -func (e *edge) updateIncoming(req pipe.Sender) { +func (e *edge) updateIncoming(req pipeSender) { if e.debug { - bklog.G(context.TODO()).Debugf("updateIncoming %s %#v desired=%s", e.edge.Vertex.Name(), e.edgeState, req.Request().Payload.(*edgeRequest).desiredState) + bklog.G(context.TODO()).Debugf("updateIncoming %s %#v desired=%s", e.edge.Vertex.Name(), e.edgeState, req.Request().Payload.desiredState) } req.Update(&e.edgeState) } @@ -353,7 +353,7 @@ func (e *edge) skipPhase2FastCache(dep *dep) bool { // requests were not completed // 2. this function may not return outgoing requests if it has completed all // incoming requests -func (e *edge) unpark(incoming []pipe.Sender, updates, allPipes []pipe.Receiver, f *pipeFactory) { +func (e *edge) unpark(incoming []pipeSender, updates, allPipes []pipeReceiver, f *pipeFactory) { // process all incoming changes depChanged := false for _, upt := range updates { @@ -414,7 +414,7 @@ func (e *edge) markFailed(f *pipeFactory, err error) { } // processUpdate is called by unpark for every updated pipe request -func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) { +func (e *edge) processUpdate(upt pipeReceiver) (depChanged bool) { // response for cachemap request if upt == e.cacheMapReq && upt.Status().Completed { if err := upt.Status().Err; err != nil { @@ -719,7 +719,7 @@ func (e *edge) recalcCurrentState() { // respondToIncoming responds to all incoming requests. completing or // updating them when possible -func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receiver) (edgeStatusType, bool) { +func (e *edge) respondToIncoming(incoming []pipeSender, allPipes []pipeReceiver) (edgeStatusType, bool) { // detect the result state for the requests allIncomingCanComplete := true desiredState := e.state @@ -731,7 +731,7 @@ func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receive for _, req := range incoming { if !req.Request().Canceled { allCanceled = false - if r := req.Request().Payload.(*edgeRequest); desiredState < r.desiredState { + if r := req.Request().Payload; desiredState < r.desiredState { desiredState = r.desiredState if e.hasActiveOutgoing || r.desiredState == edgeStatusComplete || r.currentKeys == len(e.keys) { allIncomingCanComplete = false @@ -757,7 +757,7 @@ func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receive } // can close all but one requests - var leaveOpen pipe.Sender + var leaveOpen pipeSender for _, req := range incoming { if !req.Request().Canceled { leaveOpen = req @@ -784,7 +784,7 @@ func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receive // update incoming based on current state for _, req := range incoming { - r := req.Request().Payload.(*edgeRequest) + r := req.Request().Payload if req.Request().Canceled { e.finishIncoming(req) } else if !e.hasActiveOutgoing && e.state >= r.desiredState { @@ -803,7 +803,7 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory, // initialize deps state if e.deps == nil { - e.depRequests = make(map[pipe.Receiver]*dep) + e.depRequests = make(map[pipeReceiver]*dep) e.deps = make([]*dep, 0, len(e.edge.Vertex.Inputs())) for i := range e.edge.Vertex.Inputs() { e.deps = append(e.deps, newDep(Index(i))) @@ -842,13 +842,13 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory, if dep.state < desiredStateDep { addNew := true if dep.req != nil && !dep.req.Status().Completed { - if dep.req.Request().(*edgeRequest).desiredState != desiredStateDep { + if dep.req.Request().desiredState != desiredStateDep { if e.debug { bklog.G(context.TODO()). WithField("edge_vertex_name", e.edge.Vertex.Name()). WithField("edge_vertex_digest", e.edge.Vertex.Digest()). WithField("dep_index", dep.index). - WithField("dep_req_desired_state", dep.req.Request().(*edgeRequest).desiredState). + WithField("dep_req_desired_state", dep.req.Request().desiredState). WithField("dep_desired_state", desiredStateDep). WithField("dep_state", dep.state). Debug("cancel input request") @@ -860,7 +860,7 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory, WithField("edge_vertex_name", e.edge.Vertex.Name()). WithField("edge_vertex_digest", e.edge.Vertex.Digest()). WithField("dep_index", dep.index). - WithField("dep_req_desired_state", dep.req.Request().(*edgeRequest).desiredState). + WithField("dep_req_desired_state", dep.req.Request().desiredState). WithField("dep_desired_state", desiredStateDep). WithField("dep_state", dep.state). Debug("skip input request based on existing request") @@ -1062,3 +1062,13 @@ func withSelector(keys []ExportableCacheKey, selector digest.Digest) []CacheKeyW } return out } + +type ( + pipeRequest = pipe.Request[*edgeRequest] + pipeSender = pipe.Sender[*edgeRequest, any] + pipeReceiver = pipe.Receiver[*edgeRequest, any] +) + +func newPipe(req pipeRequest) *pipe.Pipe[*edgeRequest, any] { + return pipe.New[*edgeRequest, any](req) +} diff --git a/solver/internal/pipe/pipe.go b/solver/internal/pipe/pipe.go index 01cfa5a06b6d..55309c2a9017 100644 --- a/solver/internal/pipe/pipe.go +++ b/solver/internal/pipe/pipe.go @@ -8,67 +8,63 @@ import ( "github.com/pkg/errors" ) -type channel struct { +type channel[V any] struct { OnSendCompletion func() - value atomic.Value - lastValue *wrappedValue + value atomic.Pointer[V] + lastValue *V } -type wrappedValue struct { - value interface{} -} - -func (c *channel) Send(v interface{}) { - c.value.Store(&wrappedValue{value: v}) +func (c *channel[V]) Send(v V) { + c.value.Store(&v) if c.OnSendCompletion != nil { c.OnSendCompletion() } } -func (c *channel) Receive() (interface{}, bool) { +func (c *channel[V]) Receive() (V, bool) { v := c.value.Load() - if v == nil || v.(*wrappedValue) == c.lastValue { - return nil, false + if v == nil || v == c.lastValue { + return *new(V), false } - c.lastValue = v.(*wrappedValue) - return v.(*wrappedValue).value, true + c.lastValue = v + return *v, true } -type Pipe struct { - Sender Sender - Receiver Receiver +type Pipe[Payload, Value any] struct { + Sender Sender[Payload, Value] + Receiver Receiver[Payload, Value] OnReceiveCompletion func() OnSendCompletion func() } -type Request struct { - Payload interface{} +type Request[Payload any] struct { + Payload Payload Canceled bool } -type Sender interface { - Request() Request - Update(v interface{}) - Finalize(v interface{}, err error) - Status() Status +type Sender[Payload, Value any] interface { + Request() Request[Payload] + Update(v Value) + Finalize(v Value, err error) + Status() Status[Value] } -type Receiver interface { +type Receiver[Payload, Value any] interface { Receive() bool Cancel() - Status() Status - Request() interface{} + Status() Status[Value] + Request() Payload } -type Status struct { +type Status[Value any] struct { Canceled bool Completed bool Err error - Value interface{} + Value Value } -func NewWithFunction(f func(context.Context) (interface{}, error)) (*Pipe, func()) { - p := New(Request{}) +func NewWithFunction[Payload, Value any](f func(context.Context) (Value, error)) (*Pipe[Payload, Value], func()) { + p := New[Payload, Value](Request[Payload]{}) ctx, cancel := context.WithCancelCause(context.TODO()) @@ -81,27 +77,27 @@ func NewWithFunction(f func(context.Context) (interface{}, error)) (*Pipe, func( return p, func() { res, err := f(ctx) if err != nil { - p.Sender.Finalize(nil, err) + p.Sender.Finalize(*new(Value), err) return } p.Sender.Finalize(res, nil) } } -func New(req Request) *Pipe { - cancelCh := &channel{} - roundTripCh := &channel{} - pw := &sender{ +func New[Payload, Value any](req Request[Payload]) *Pipe[Payload, Value] { + cancelCh := &channel[Request[Payload]]{} + roundTripCh := &channel[Status[Value]]{} + pw := &sender[Payload, Value]{ req: req, sendChannel: roundTripCh, } - pr := &receiver{ + pr := &receiver[Payload, Value]{ req: req, recvChannel: roundTripCh, sendChannel: cancelCh, } - p := &Pipe{ + p := &Pipe[Payload, Value]{ Sender: pw, Receiver: pr, } @@ -109,7 +105,7 @@ func New(req Request) *Pipe { cancelCh.OnSendCompletion = func() { v, ok := cancelCh.Receive() if ok { - pw.setRequest(v.(Request)) + pw.setRequest(v) } if p.OnReceiveCompletion != nil { p.OnReceiveCompletion() @@ -125,38 +121,36 @@ func New(req Request) *Pipe { return p } -type sender struct { - status Status - req Request - sendChannel *channel +type sender[Payload, Value any] struct { + status Status[Value] + req Request[Payload] + sendChannel *channel[Status[Value]] mu sync.Mutex } -func (pw *sender) Status() Status { +func (pw *sender[Payload, Value]) Status() Status[Value] { return pw.status } -func (pw *sender) Request() Request { +func (pw *sender[Payload, Value]) Request() Request[Payload] { pw.mu.Lock() defer pw.mu.Unlock() return pw.req } -func (pw *sender) setRequest(req Request) { +func (pw *sender[Payload, Value]) setRequest(req Request[Payload]) { pw.mu.Lock() defer pw.mu.Unlock() pw.req = req } -func (pw *sender) Update(v interface{}) { +func (pw *sender[Payload, Value]) Update(v Value) { pw.status.Value = v pw.sendChannel.Send(pw.status) } -func (pw *sender) Finalize(v interface{}, err error) { - if v != nil { - pw.status.Value = v - } +func (pw *sender[Payload, Value]) Finalize(v Value, err error) { + pw.status.Value = v pw.status.Err = err pw.status.Completed = true if errors.Is(err, context.Canceled) && pw.req.Canceled { @@ -165,27 +159,27 @@ func (pw *sender) Finalize(v interface{}, err error) { pw.sendChannel.Send(pw.status) } -type receiver struct { - status Status - req Request - recvChannel *channel - sendChannel *channel +type receiver[Payload, Value any] struct { + status Status[Value] + req Request[Payload] + recvChannel *channel[Status[Value]] + sendChannel *channel[Request[Payload]] } -func (pr *receiver) Request() interface{} { +func (pr *receiver[Payload, Value]) Request() Payload { return pr.req.Payload } -func (pr *receiver) Receive() bool { +func (pr *receiver[Payload, Value]) Receive() bool { v, ok := pr.recvChannel.Receive() if !ok { return false } - pr.status = v.(Status) + pr.status = v return true } -func (pr *receiver) Cancel() { +func (pr *receiver[Payload, Value]) Cancel() { req := pr.req if req.Canceled { return @@ -194,6 +188,6 @@ func (pr *receiver) Cancel() { pr.sendChannel.Send(req) } -func (pr *receiver) Status() Status { +func (pr *receiver[Payload, Value]) Status() Status[Value] { return pr.status } diff --git a/solver/internal/pipe/pipe_test.go b/solver/internal/pipe/pipe_test.go index 605a8a65bd88..b7ea1508faa6 100644 --- a/solver/internal/pipe/pipe_test.go +++ b/solver/internal/pipe/pipe_test.go @@ -11,10 +11,10 @@ func TestPipe(t *testing.T) { t.Parallel() runCh := make(chan struct{}) - f := func(ctx context.Context) (interface{}, error) { + f := func(ctx context.Context) (string, error) { select { case <-ctx.Done(): - return nil, context.Cause(ctx) + return "", context.Cause(ctx) case <-runCh: return "res0", nil } @@ -27,7 +27,7 @@ func TestPipe(t *testing.T) { waitSignal <- struct{}{} } - p, start := NewWithFunction(f) + p, start := NewWithFunction[any](f) p.OnSendCompletion = signal go start() require.Equal(t, false, p.Receiver.Receive()) @@ -35,7 +35,7 @@ func TestPipe(t *testing.T) { st := p.Receiver.Status() require.Equal(t, false, st.Completed) require.Equal(t, false, st.Canceled) - require.Nil(t, st.Value) + require.Zero(t, st.Value) require.Equal(t, 0, signalled) close(runCh) @@ -46,17 +46,17 @@ func TestPipe(t *testing.T) { require.Equal(t, true, st.Completed) require.Equal(t, false, st.Canceled) require.NoError(t, st.Err) - require.Equal(t, "res0", st.Value.(string)) + require.Equal(t, "res0", st.Value) } func TestPipeCancel(t *testing.T) { t.Parallel() runCh := make(chan struct{}) - f := func(ctx context.Context) (interface{}, error) { + f := func(ctx context.Context) (string, error) { select { case <-ctx.Done(): - return nil, context.Cause(ctx) + return "", context.Cause(ctx) case <-runCh: return "res0", nil } @@ -69,7 +69,7 @@ func TestPipeCancel(t *testing.T) { waitSignal <- struct{}{} } - p, start := NewWithFunction(f) + p, start := NewWithFunction[any](f) p.OnSendCompletion = signal go start() p.Receiver.Receive() @@ -77,7 +77,7 @@ func TestPipeCancel(t *testing.T) { st := p.Receiver.Status() require.Equal(t, false, st.Completed) require.Equal(t, false, st.Canceled) - require.Nil(t, st.Value) + require.Zero(t, st.Value) require.Equal(t, 0, signalled) p.Receiver.Cancel() diff --git a/solver/scheduler.go b/solver/scheduler.go index ec3da4fc89ef..194f4813626b 100644 --- a/solver/scheduler.go +++ b/solver/scheduler.go @@ -13,9 +13,11 @@ import ( "github.com/tonistiigi/go-csvvalue" ) -var debugScheduler = false // TODO: replace with logs in build trace -var debugSchedulerSteps []string -var debugSchedulerStepsParseOnce sync.Once +var ( + debugScheduler = false // TODO: replace with logs in build trace + debugSchedulerSteps []string + debugSchedulerStepsParseOnce sync.Once +) func init() { if os.Getenv("BUILDKIT_SCHEDULER_DEBUG") == "1" { @@ -121,17 +123,17 @@ func (s *scheduler) loop() { // dispatch schedules an edge to be processed func (s *scheduler) dispatch(e *edge) { - inc := make([]pipe.Sender, len(s.incoming[e])) + inc := make([]pipeSender, len(s.incoming[e])) for i, p := range s.incoming[e] { inc[i] = p.Sender } - out := make([]pipe.Receiver, len(s.outgoing[e])) + out := make([]pipeReceiver, len(s.outgoing[e])) for i, p := range s.outgoing[e] { out[i] = p.Receiver } e.hasActiveOutgoing = false - updates := []pipe.Receiver{} + updates := []pipeReceiver{} for _, p := range out { if ok := p.Receive(); ok { updates = append(updates, p) @@ -272,7 +274,7 @@ func (s *scheduler) build(ctx context.Context, edge Edge) (CachedResult, error) wait := make(chan struct{}) - p := s.newPipe(e, nil, pipe.Request{Payload: &edgeRequest{desiredState: edgeStatusComplete}}) + p := s.newPipe(e, nil, pipeRequest{Payload: &edgeRequest{desiredState: edgeStatusComplete}}) p.OnSendCompletion = func() { p.Receiver.Receive() if p.Receiver.Status().Completed { @@ -298,9 +300,9 @@ func (s *scheduler) build(ctx context.Context, edge Edge) (CachedResult, error) } // newPipe creates a new request pipe between two edges -func (s *scheduler) newPipe(target, from *edge, req pipe.Request) *pipe.Pipe { +func (s *scheduler) newPipe(target, from *edge, req pipeRequest) *pipe.Pipe[*edgeRequest, any] { p := &edgePipe{ - Pipe: pipe.New(req), + Pipe: newPipe(req), Target: target, From: from, } @@ -324,8 +326,8 @@ func (s *scheduler) newPipe(target, from *edge, req pipe.Request) *pipe.Pipe { } // newRequestWithFunc creates a new request pipe that invokes a async function -func (s *scheduler) newRequestWithFunc(e *edge, f func(context.Context) (interface{}, error)) pipe.Receiver { - pp, start := pipe.NewWithFunction(f) +func (s *scheduler) newRequestWithFunc(e *edge, f func(context.Context) (any, error)) pipeReceiver { + pp, start := pipe.NewWithFunction[*edgeRequest](f) p := &edgePipe{ Pipe: pp, From: e, @@ -395,7 +397,7 @@ type pipeFactory struct { s *scheduler } -func (pf *pipeFactory) NewInputRequest(ee Edge, req *edgeRequest) pipe.Receiver { +func (pf *pipeFactory) NewInputRequest(ee Edge, req *edgeRequest) pipeReceiver { target := pf.s.ef.getEdge(ee) if target == nil { bklog.G(context.TODO()). @@ -407,14 +409,14 @@ func (pf *pipeFactory) NewInputRequest(ee Edge, req *edgeRequest) pipe.Receiver return nil, errdefs.Internal(errors.Errorf("failed to get edge: inconsistent graph state in edge %s %s %d", ee.Vertex.Name(), ee.Vertex.Digest(), ee.Index)) }) } - p := pf.s.newPipe(target, pf.e, pipe.Request{Payload: req}) + p := pf.s.newPipe(target, pf.e, pipeRequest{Payload: req}) if pf.e.debug { bklog.G(context.TODO()).Debugf("> newPipe %s %p desiredState=%s", ee.Vertex.Name(), p, req.desiredState) } return p.Receiver } -func (pf *pipeFactory) NewFuncRequest(f func(context.Context) (interface{}, error)) pipe.Receiver { +func (pf *pipeFactory) NewFuncRequest(f func(context.Context) (interface{}, error)) pipeReceiver { p := pf.s.newRequestWithFunc(pf.e, f) if pf.e.debug { bklog.G(context.TODO()).Debugf("> newFunc %p", p) @@ -422,7 +424,7 @@ func (pf *pipeFactory) NewFuncRequest(f func(context.Context) (interface{}, erro return p } -func debugSchedulerPreUnpark(e *edge, inc []pipe.Sender, updates, allPipes []pipe.Receiver) { +func debugSchedulerPreUnpark(e *edge, inc []pipeSender, updates, allPipes []pipeReceiver) { log := bklog.G(context.TODO()). WithField("edge_vertex_name", e.edge.Vertex.Name()). WithField("edge_vertex_digest", e.edge.Vertex.Digest()). @@ -438,7 +440,7 @@ func debugSchedulerPreUnpark(e *edge, inc []pipe.Sender, updates, allPipes []pip for i, dep := range e.deps { des := edgeStatusInitial if dep.req != nil { - des = dep.req.Request().(*edgeRequest).desiredState + des = dep.req.Request().desiredState } log. WithField("dep_index", i). @@ -457,7 +459,7 @@ func debugSchedulerPreUnpark(e *edge, inc []pipe.Sender, updates, allPipes []pip log. WithField("incoming_index", i). WithField("incoming_pointer", in). - WithField("incoming_desired_state", req.Payload.(*edgeRequest).desiredState). + WithField("incoming_desired_state", req.Payload.desiredState). WithField("incoming_canceled", req.Canceled). Debug("> incoming") } @@ -499,7 +501,7 @@ func debugSchedulerPreUnpark(e *edge, inc []pipe.Sender, updates, allPipes []pip } } -func debugSchedulerPostUnpark(e *edge, inc []pipe.Sender) { +func debugSchedulerPostUnpark(e *edge, inc []pipeSender) { log := bklog.G(context.TODO()) for i, in := range inc { log.