Skip to content

Commit

Permalink
solver: pipe implementation utilizes generics for better typing
Browse files Browse the repository at this point in the history
This updates the pipe library to use generics for the request payload
and the status value. This allows the solver to put in explicit types
rather than rely on type casting from interfaces which helps with type
safety and understandability.

The status value used by the solver uses the `any` type instead of an
explicit type because the `unpark` method takes a generic list of pipes
and the different pipes have different result types. We can likely
change this in the future or create a discriminated union for the
types that can be used in this package. That is left for future work
because at least the request payload is typed now.

Signed-off-by: Jonathan A. Sternberg <[email protected]>
  • Loading branch information
jsternberg committed Sep 12, 2024
1 parent 3a70550 commit ad64996
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 109 deletions.
52 changes: 31 additions & 21 deletions solver/edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func newEdge(ed Edge, op activeOp, index *edgeIndex) *edge {
e := &edge{
edge: ed,
op: op,
depRequests: map[pipe.Receiver]*dep{},
depRequests: map[pipeReceiver]*dep{},
keyMap: map[string]struct{}{},
cacheRecords: map[string]*CacheRecord{},
cacheRecordsLoaded: map[string]struct{}{},
Expand Down Expand Up @@ -65,14 +65,14 @@ type edge struct {
op activeOp

edgeState
depRequests map[pipe.Receiver]*dep
depRequests map[pipeReceiver]*dep
deps []*dep

cacheMapReq pipe.Receiver
cacheMapReq pipeReceiver
cacheMapDone bool
cacheMapIndex int
cacheMapDigests []digest.Digest
execReq pipe.Receiver
execReq pipeReceiver
execCacheLoad bool
err error
cacheRecords map[string]*CacheRecord
Expand All @@ -99,11 +99,11 @@ type edge struct {

// dep holds state for a dependant edge
type dep struct {
req pipe.Receiver
req pipeReceiver
edgeState
index Index
keyMap map[string]*CacheKey
slowCacheReq pipe.Receiver
slowCacheReq pipeReceiver
slowCacheComplete bool
slowCacheFoundKey bool
slowCacheKey *ExportableCacheKey
Expand All @@ -122,7 +122,7 @@ func newDep(i Index) *dep {

// edgePipe is a pipe for requests between two edges
type edgePipe struct {
*pipe.Pipe
*pipe.Pipe[*edgeRequest, any]
From, Target *edge
mu sync.Mutex
}
Expand Down Expand Up @@ -198,21 +198,21 @@ func (e *edge) isComplete() bool {
}

// finishIncoming finalizes the incoming pipe request
func (e *edge) finishIncoming(req pipe.Sender) {
func (e *edge) finishIncoming(req pipeSender) {
err := e.err
if req.Request().Canceled && err == nil {
err = context.Canceled
}
if e.debug {
bklog.G(context.TODO()).Debugf("finishIncoming %s %v %#v desired=%s", e.edge.Vertex.Name(), err, e.edgeState, req.Request().Payload.(*edgeRequest).desiredState)
bklog.G(context.TODO()).Debugf("finishIncoming %s %v %#v desired=%s", e.edge.Vertex.Name(), err, e.edgeState, req.Request().Payload.desiredState)
}
req.Finalize(&e.edgeState, err)
}

// updateIncoming updates the current value of incoming pipe request
func (e *edge) updateIncoming(req pipe.Sender) {
func (e *edge) updateIncoming(req pipeSender) {
if e.debug {
bklog.G(context.TODO()).Debugf("updateIncoming %s %#v desired=%s", e.edge.Vertex.Name(), e.edgeState, req.Request().Payload.(*edgeRequest).desiredState)
bklog.G(context.TODO()).Debugf("updateIncoming %s %#v desired=%s", e.edge.Vertex.Name(), e.edgeState, req.Request().Payload.desiredState)
}
req.Update(&e.edgeState)
}
Expand Down Expand Up @@ -353,7 +353,7 @@ func (e *edge) skipPhase2FastCache(dep *dep) bool {
// requests were not completed
// 2. this function may not return outgoing requests if it has completed all
// incoming requests
func (e *edge) unpark(incoming []pipe.Sender, updates, allPipes []pipe.Receiver, f *pipeFactory) {
func (e *edge) unpark(incoming []pipeSender, updates, allPipes []pipeReceiver, f *pipeFactory) {
// process all incoming changes
depChanged := false
for _, upt := range updates {
Expand Down Expand Up @@ -414,7 +414,7 @@ func (e *edge) markFailed(f *pipeFactory, err error) {
}

// processUpdate is called by unpark for every updated pipe request
func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) {
func (e *edge) processUpdate(upt pipeReceiver) (depChanged bool) {
// response for cachemap request
if upt == e.cacheMapReq && upt.Status().Completed {
if err := upt.Status().Err; err != nil {
Expand Down Expand Up @@ -719,7 +719,7 @@ func (e *edge) recalcCurrentState() {

// respondToIncoming responds to all incoming requests. completing or
// updating them when possible
func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receiver) (edgeStatusType, bool) {
func (e *edge) respondToIncoming(incoming []pipeSender, allPipes []pipeReceiver) (edgeStatusType, bool) {
// detect the result state for the requests
allIncomingCanComplete := true
desiredState := e.state
Expand All @@ -731,7 +731,7 @@ func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receive
for _, req := range incoming {
if !req.Request().Canceled {
allCanceled = false
if r := req.Request().Payload.(*edgeRequest); desiredState < r.desiredState {
if r := req.Request().Payload; desiredState < r.desiredState {
desiredState = r.desiredState
if e.hasActiveOutgoing || r.desiredState == edgeStatusComplete || r.currentKeys == len(e.keys) {
allIncomingCanComplete = false
Expand All @@ -757,7 +757,7 @@ func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receive
}

// can close all but one requests
var leaveOpen pipe.Sender
var leaveOpen pipeSender
for _, req := range incoming {
if !req.Request().Canceled {
leaveOpen = req
Expand All @@ -784,7 +784,7 @@ func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receive

// update incoming based on current state
for _, req := range incoming {
r := req.Request().Payload.(*edgeRequest)
r := req.Request().Payload
if req.Request().Canceled {
e.finishIncoming(req)
} else if !e.hasActiveOutgoing && e.state >= r.desiredState {
Expand All @@ -803,7 +803,7 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory,

// initialize deps state
if e.deps == nil {
e.depRequests = make(map[pipe.Receiver]*dep)
e.depRequests = make(map[pipeReceiver]*dep)
e.deps = make([]*dep, 0, len(e.edge.Vertex.Inputs()))
for i := range e.edge.Vertex.Inputs() {
e.deps = append(e.deps, newDep(Index(i)))
Expand Down Expand Up @@ -842,13 +842,13 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory,
if dep.state < desiredStateDep {
addNew := true
if dep.req != nil && !dep.req.Status().Completed {
if dep.req.Request().(*edgeRequest).desiredState != desiredStateDep {
if dep.req.Request().desiredState != desiredStateDep {
if e.debug {
bklog.G(context.TODO()).
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("dep_index", dep.index).
WithField("dep_req_desired_state", dep.req.Request().(*edgeRequest).desiredState).
WithField("dep_req_desired_state", dep.req.Request().desiredState).
WithField("dep_desired_state", desiredStateDep).
WithField("dep_state", dep.state).
Debug("cancel input request")
Expand All @@ -860,7 +860,7 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory,
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("dep_index", dep.index).
WithField("dep_req_desired_state", dep.req.Request().(*edgeRequest).desiredState).
WithField("dep_req_desired_state", dep.req.Request().desiredState).
WithField("dep_desired_state", desiredStateDep).
WithField("dep_state", dep.state).
Debug("skip input request based on existing request")
Expand Down Expand Up @@ -1062,3 +1062,13 @@ func withSelector(keys []ExportableCacheKey, selector digest.Digest) []CacheKeyW
}
return out
}

type (
pipeRequest = pipe.Request[*edgeRequest]
pipeSender = pipe.Sender[*edgeRequest, any]
pipeReceiver = pipe.Receiver[*edgeRequest, any]
)

func newPipe(req pipeRequest) *pipe.Pipe[*edgeRequest, any] {
return pipe.New[*edgeRequest, any](req)
}
Loading

0 comments on commit ad64996

Please sign in to comment.