Merge pull request #5323 from jsternberg/generic-pipes
solver: pipe implementation utilizes generics for better typing
tonistiigi authored Sep 12, 2024
2 parents 2a7accc + ad64996 commit b473b7a
Showing 4 changed files with 115 additions and 109 deletions.
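
For context on the diff below: before this change the solver's pipe carried its payload as an untyped `interface{}`, so every consumer in `edge.go` had to assert `req.Request().Payload.(*edgeRequest)`; after it, the pipe is instantiated as `pipe.Pipe[*edgeRequest, any]` and the payload already has its concrete type. A minimal, self-contained sketch of that idea follows — the `Request` type and `desiredState` helper are illustrative stand-ins, not the actual `solver/internal/pipe` API:

```go
package main

import "fmt"

// edgeRequest is a stand-in for the solver's payload type.
type edgeRequest struct {
	desiredState int
}

// Request is a toy generic request wrapper. The real pipe.Request in this
// PR is parameterized the same way, so Payload keeps its concrete type.
type Request[P any] struct {
	Payload  P
	Canceled bool
}

// Before the change, Payload was an interface{} and callers wrote
// req.Payload.(*edgeRequest). With a type parameter the assertion is gone
// and the compiler checks the field access.
func desiredState(req Request[*edgeRequest]) int {
	return req.Payload.desiredState
}

func main() {
	req := Request[*edgeRequest]{Payload: &edgeRequest{desiredState: 2}}
	fmt.Println(desiredState(req)) // 2
}
```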
52 changes: 31 additions & 21 deletions solver/edge.go
@@ -29,7 +29,7 @@ func newEdge(ed Edge, op activeOp, index *edgeIndex) *edge {
e := &edge{
edge: ed,
op: op,
depRequests: map[pipe.Receiver]*dep{},
depRequests: map[pipeReceiver]*dep{},
keyMap: map[string]struct{}{},
cacheRecords: map[string]*CacheRecord{},
cacheRecordsLoaded: map[string]struct{}{},
@@ -65,14 +65,14 @@ type edge struct {
op activeOp

edgeState
depRequests map[pipe.Receiver]*dep
depRequests map[pipeReceiver]*dep
deps []*dep

cacheMapReq pipe.Receiver
cacheMapReq pipeReceiver
cacheMapDone bool
cacheMapIndex int
cacheMapDigests []digest.Digest
execReq pipe.Receiver
execReq pipeReceiver
execCacheLoad bool
err error
cacheRecords map[string]*CacheRecord
@@ -99,11 +99,11 @@ type edge struct {

// dep holds state for a dependant edge
type dep struct {
req pipe.Receiver
req pipeReceiver
edgeState
index Index
keyMap map[string]*CacheKey
slowCacheReq pipe.Receiver
slowCacheReq pipeReceiver
slowCacheComplete bool
slowCacheFoundKey bool
slowCacheKey *ExportableCacheKey
@@ -122,7 +122,7 @@ func newDep(i Index) *dep {

// edgePipe is a pipe for requests between two edges
type edgePipe struct {
*pipe.Pipe
*pipe.Pipe[*edgeRequest, any]
From, Target *edge
mu sync.Mutex
}
@@ -198,21 +198,21 @@ func (e *edge) isComplete() bool {
}

// finishIncoming finalizes the incoming pipe request
func (e *edge) finishIncoming(req pipe.Sender) {
func (e *edge) finishIncoming(req pipeSender) {
err := e.err
if req.Request().Canceled && err == nil {
err = context.Canceled
}
if e.debug {
bklog.G(context.TODO()).Debugf("finishIncoming %s %v %#v desired=%s", e.edge.Vertex.Name(), err, e.edgeState, req.Request().Payload.(*edgeRequest).desiredState)
bklog.G(context.TODO()).Debugf("finishIncoming %s %v %#v desired=%s", e.edge.Vertex.Name(), err, e.edgeState, req.Request().Payload.desiredState)
}
req.Finalize(&e.edgeState, err)
}

// updateIncoming updates the current value of incoming pipe request
func (e *edge) updateIncoming(req pipe.Sender) {
func (e *edge) updateIncoming(req pipeSender) {
if e.debug {
bklog.G(context.TODO()).Debugf("updateIncoming %s %#v desired=%s", e.edge.Vertex.Name(), e.edgeState, req.Request().Payload.(*edgeRequest).desiredState)
bklog.G(context.TODO()).Debugf("updateIncoming %s %#v desired=%s", e.edge.Vertex.Name(), e.edgeState, req.Request().Payload.desiredState)
}
req.Update(&e.edgeState)
}
@@ -353,7 +353,7 @@ func (e *edge) skipPhase2FastCache(dep *dep) bool {
// requests were not completed
// 2. this function may not return outgoing requests if it has completed all
// incoming requests
func (e *edge) unpark(incoming []pipe.Sender, updates, allPipes []pipe.Receiver, f *pipeFactory) {
func (e *edge) unpark(incoming []pipeSender, updates, allPipes []pipeReceiver, f *pipeFactory) {
// process all incoming changes
depChanged := false
for _, upt := range updates {
@@ -414,7 +414,7 @@ func (e *edge) markFailed(f *pipeFactory, err error) {
}

// processUpdate is called by unpark for every updated pipe request
func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) {
func (e *edge) processUpdate(upt pipeReceiver) (depChanged bool) {
// response for cachemap request
if upt == e.cacheMapReq && upt.Status().Completed {
if err := upt.Status().Err; err != nil {
@@ -719,7 +719,7 @@ func (e *edge) recalcCurrentState() {

// respondToIncoming responds to all incoming requests. completing or
// updating them when possible
func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receiver) (edgeStatusType, bool) {
func (e *edge) respondToIncoming(incoming []pipeSender, allPipes []pipeReceiver) (edgeStatusType, bool) {
// detect the result state for the requests
allIncomingCanComplete := true
desiredState := e.state
@@ -731,7 +731,7 @@ func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receive
for _, req := range incoming {
if !req.Request().Canceled {
allCanceled = false
if r := req.Request().Payload.(*edgeRequest); desiredState < r.desiredState {
if r := req.Request().Payload; desiredState < r.desiredState {
desiredState = r.desiredState
if e.hasActiveOutgoing || r.desiredState == edgeStatusComplete || r.currentKeys == len(e.keys) {
allIncomingCanComplete = false
@@ -757,7 +757,7 @@ func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receive
}

// can close all but one requests
var leaveOpen pipe.Sender
var leaveOpen pipeSender
for _, req := range incoming {
if !req.Request().Canceled {
leaveOpen = req
@@ -784,7 +784,7 @@ func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receive

// update incoming based on current state
for _, req := range incoming {
r := req.Request().Payload.(*edgeRequest)
r := req.Request().Payload
if req.Request().Canceled {
e.finishIncoming(req)
} else if !e.hasActiveOutgoing && e.state >= r.desiredState {
@@ -803,7 +803,7 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory,

// initialize deps state
if e.deps == nil {
e.depRequests = make(map[pipe.Receiver]*dep)
e.depRequests = make(map[pipeReceiver]*dep)
e.deps = make([]*dep, 0, len(e.edge.Vertex.Inputs()))
for i := range e.edge.Vertex.Inputs() {
e.deps = append(e.deps, newDep(Index(i)))
@@ -842,13 +842,13 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory,
if dep.state < desiredStateDep {
addNew := true
if dep.req != nil && !dep.req.Status().Completed {
if dep.req.Request().(*edgeRequest).desiredState != desiredStateDep {
if dep.req.Request().desiredState != desiredStateDep {
if e.debug {
bklog.G(context.TODO()).
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("dep_index", dep.index).
WithField("dep_req_desired_state", dep.req.Request().(*edgeRequest).desiredState).
WithField("dep_req_desired_state", dep.req.Request().desiredState).
WithField("dep_desired_state", desiredStateDep).
WithField("dep_state", dep.state).
Debug("cancel input request")
@@ -860,7 +860,7 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory,
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("dep_index", dep.index).
WithField("dep_req_desired_state", dep.req.Request().(*edgeRequest).desiredState).
WithField("dep_req_desired_state", dep.req.Request().desiredState).
WithField("dep_desired_state", desiredStateDep).
WithField("dep_state", dep.state).
Debug("skip input request based on existing request")
@@ -1062,3 +1062,13 @@ func withSelector(keys []ExportableCacheKey, selector digest.Digest) []CacheKeyW
}
return out
}

type (
pipeRequest = pipe.Request[*edgeRequest]
pipeSender = pipe.Sender[*edgeRequest, any]
pipeReceiver = pipe.Receiver[*edgeRequest, any]
)

func newPipe(req pipeRequest) *pipe.Pipe[*edgeRequest, any] {
return pipe.New[*edgeRequest, any](req)
}
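
The aliases and the `newPipe` helper added at the end of the file pin the type arguments once, so the rest of `edge.go` can say `pipeSender`/`pipeReceiver` instead of repeating `pipe.Sender[*edgeRequest, any]` at every call site. A rough sketch of the same pattern using hypothetical types (not the buildkit `pipe` package):

```go
package main

import "fmt"

// Pipe is a hypothetical generic pipe joining a request payload type P
// with a value type V, loosely mirroring pipe.Pipe[*edgeRequest, any].
type Pipe[P, V any] struct {
	payload P
	value   V
}

// New is a hypothetical generic constructor, analogous to
// pipe.New[*edgeRequest, any](req) in the diff.
func New[P, V any](payload P) *Pipe[P, V] {
	return &Pipe[P, V]{payload: payload}
}

type edgeRequest struct{ desiredState int }

// An alias of a fully instantiated generic type spells out the type
// arguments once, the same way the diff defines pipeSender/pipeReceiver.
type requestPipe = Pipe[*edgeRequest, any]

// newPipe hides the type arguments from callers, so constructing a pipe
// reads the same as it did before generics were introduced.
func newPipe(req *edgeRequest) *requestPipe {
	return New[*edgeRequest, any](req)
}

func main() {
	p := newPipe(&edgeRequest{desiredState: 1})
	fmt.Println(p.payload.desiredState) // 1
}
```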