Skip to content

Commit

Permalink
Merge pull request #5328 from jsternberg/debug-scheduler-refactor
Browse files Browse the repository at this point in the history
solver: move scheduler debug statements to their own functions
  • Loading branch information
tonistiigi authored Sep 23, 2024
2 parents 9837185 + 2f18b83 commit ad0c36c
Show file tree
Hide file tree
Showing 3 changed files with 331 additions and 245 deletions.
312 changes: 312 additions & 0 deletions solver/debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
package solver

import (
"context"
"os"
"strings"
"sync"

"github.com/moby/buildkit/solver/internal/pipe"
"github.com/moby/buildkit/util/bklog"
"github.com/tonistiigi/go-csvvalue"
)

var (
debugScheduler = false // TODO: replace with logs in build trace
debugSchedulerSteps = sync.OnceValue(parseSchedulerDebugSteps)
)

func init() {
if os.Getenv("BUILDKIT_SCHEDULER_DEBUG") == "1" {
debugScheduler = true
}
}

func parseSchedulerDebugSteps() []string {
if s := os.Getenv("BUILDKIT_SCHEDULER_DEBUG_STEPS"); s != "" {
fields, err := csvvalue.Fields(s, nil)
if err != nil {
return nil
}
return fields
}
return nil
}

// debugSchedulerCheckEdge determines if this edge should be debugged
// depending on the set environment variables.
func debugSchedulerCheckEdge(e *edge) bool {
if debugScheduler {
return true
}

if steps := debugSchedulerSteps(); len(steps) > 0 {
withParents := strings.HasSuffix(steps[0], "^")
name := strings.TrimSuffix(steps[0], "^")
for _, v := range steps {
if strings.Contains(name, v) {
return true
}
}

if withParents {
for _, vtx := range e.edge.Vertex.Inputs() {
name := strings.TrimSuffix(vtx.Vertex.Name(), "^")
for _, v := range steps {
if strings.Contains(name, v) {
return true
}
}
}
}
}
return false
}

func debugSchedulerSkipMergeDueToDependency(e, origEdge *edge) {
bklog.G(context.TODO()).
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("edge_index", e.edge.Index).
WithField("origEdge_vertex_name", origEdge.edge.Vertex.Name()).
WithField("origEdge_vertex_digest", origEdge.edge.Vertex.Digest()).
WithField("origEdge_index", origEdge.edge.Index).
Debug("skip merge due to dependency")
}

func debugSchedulerSwapMergeDueToOwner(e, origEdge *edge) {
bklog.G(context.TODO()).
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("edge_index", e.edge.Index).
WithField("origEdge_vertex_name", origEdge.edge.Vertex.Name()).
WithField("origEdge_vertex_digest", origEdge.edge.Vertex.Digest()).
WithField("origEdge_index", origEdge.edge.Index).
Debug("swap merge due to owner")
}

func debugSchedulerMergingEdges(src, dest *edge) {
bklog.G(context.TODO()).
WithField("source_edge_vertex_name", src.edge.Vertex.Name()).
WithField("source_edge_vertex_digest", src.edge.Vertex.Digest()).
WithField("source_edge_index", src.edge.Index).
WithField("dest_vertex_name", dest.edge.Vertex.Name()).
WithField("dest_vertex_digest", dest.edge.Vertex.Digest()).
WithField("dest_index", dest.edge.Index).
Debug("merging edges")
}

func debugSchedulerMergingEdgesSkipped(src, dest *edge) {
bklog.G(context.TODO()).
WithField("source_edge_vertex_name", src.edge.Vertex.Name()).
WithField("source_edge_vertex_digest", src.edge.Vertex.Digest()).
WithField("source_edge_index", src.edge.Index).
WithField("dest_vertex_name", dest.edge.Vertex.Name()).
WithField("dest_vertex_digest", dest.edge.Vertex.Digest()).
WithField("dest_index", dest.edge.Index).
Debug("merging edges skipped")
}

func debugSchedulerPreUnpark(e *edge, inc []pipeSender, updates, allPipes []pipeReceiver) {
if e.debug {
debugSchedulerPreUnparkSlow(e, inc, updates, allPipes)
}
}

func debugSchedulerPreUnparkSlow(e *edge, inc []pipeSender, updates, allPipes []pipeReceiver) {
log := bklog.G(context.TODO()).
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("edge_index", e.edge.Index)

log.
WithField("edge_state", e.state).
WithField("req", len(inc)).
WithField("upt", len(updates)).
WithField("out", len(allPipes)).
Debug(">> unpark")

for i, dep := range e.deps {
des := edgeStatusInitial
if dep.req != nil {
des = dep.req.Request().desiredState
}
log.
WithField("dep_index", i).
WithField("dep_vertex_name", e.edge.Vertex.Inputs()[i].Vertex.Name()).
WithField("dep_vertex_digest", e.edge.Vertex.Inputs()[i].Vertex.Digest()).
WithField("dep_state", dep.state).
WithField("dep_desired_state", des).
WithField("dep_keys", len(dep.keys)).
WithField("dep_has_slow_cache", e.slowCacheFunc(dep) != nil).
WithField("dep_preprocess_func", e.preprocessFunc(dep) != nil).
Debug(":: dep")
}

for i, in := range inc {
req := in.Request()
log.
WithField("incoming_index", i).
WithField("incoming_pointer", in).
WithField("incoming_desired_state", req.Payload.desiredState).
WithField("incoming_canceled", req.Canceled).
Debug("> incoming")
}

for i, up := range updates {
if up == e.cacheMapReq {
log.
WithField("update_index", i).
WithField("update_pointer", up).
WithField("update_complete", up.Status().Completed).
Debug("> update cacheMapReq")
} else if up == e.execReq {
log.
WithField("update_index", i).
WithField("update_pointer", up).
WithField("update_complete", up.Status().Completed).
Debug("> update execReq")
} else {
st, ok := up.Status().Value.(*edgeState)
if ok {
index := -1
if dep, ok := e.depRequests[up]; ok {
index = int(dep.index)
}
log.
WithField("update_index", i).
WithField("update_pointer", up).
WithField("update_complete", up.Status().Completed).
WithField("update_input_index", index).
WithField("update_keys", len(st.keys)).
WithField("update_state", st.state).
Debugf("> update edgeState")
} else {
log.
WithField("update_index", i).
Debug("> update unknown")
}
}
}
}

func debugSchedulerPostUnpark(e *edge, inc []pipeSender) {
if e.debug {
debugSchedulerPostUnparkSlow(e, inc)
}
}

func debugSchedulerPostUnparkSlow(e *edge, inc []pipeSender) {
log := bklog.G(context.TODO())
for i, in := range inc {
log.
WithField("incoming_index", i).
WithField("incoming_pointer", in).
WithField("incoming_complete", in.Status().Completed).
Debug("< incoming")
}
log.
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("edge_index", e.edge.Index).
WithField("edge_state", e.state).
Debug("<< unpark")
}

func debugSchedulerNewPipe(e *edge, p *pipe.Pipe[*edgeRequest, any], req *edgeRequest) {
if e.debug {
bklog.G(context.TODO()).Debugf("> newPipe %s %p desiredState=%s", e.edge.Vertex.Name(), p, req.desiredState)
}
}

func debugSchedulerNewFunc(e *edge, p pipeReceiver) {
if e.debug {
bklog.G(context.TODO()).Debugf("> newFunc %p", p)
}
}

func debugSchedulerInconsistentGraphState(ee Edge) {
bklog.G(context.TODO()).
WithField("edge_vertex_name", ee.Vertex.Name()).
WithField("edge_vertex_digest", ee.Vertex.Digest()).
WithField("edge_index", ee.Index).
Error("failed to get edge: inconsistent graph state")
}

func debugSchedulerFinishIncoming(e *edge, err error, req pipeSender) {
if e.debug {
bklog.G(context.TODO()).Debugf("finishIncoming %s %v %#v desired=%s", e.edge.Vertex.Name(), err, e.edgeState, req.Request().Payload.desiredState)
}
}

func debugSchedulerUpdateIncoming(e *edge, req pipeSender) {
if e.debug {
bklog.G(context.TODO()).Debugf("updateIncoming %s %#v desired=%s", e.edge.Vertex.Name(), e.edgeState, req.Request().Payload.desiredState)
}
}

func debugSchedulerUpgradeCacheSlow(e *edge) {
if e.debug {
bklog.G(context.TODO()).Debugf("upgrade to cache-slow because no open keys")
}
}

func debugSchedulerRespondToIncomingStatus(e *edge, allIncomingCanComplete bool) {
if e.debug {
bklog.G(context.TODO()).Debugf("status state=%s cancomplete=%v hasouts=%v noPossibleCache=%v depsCacheFast=%v keys=%d cacheRecords=%d", e.state, allIncomingCanComplete, e.hasActiveOutgoing, e.noCacheMatchPossible, e.allDepsCompletedCacheFast, len(e.keys), len(e.cacheRecords))
}
}

func debugSchedulerCancelInputRequest(e *edge, dep *dep, desiredStateDep edgeStatusType) {
if e.debug {
bklog.G(context.TODO()).
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("dep_index", dep.index).
WithField("dep_req_desired_state", dep.req.Request().desiredState).
WithField("dep_desired_state", desiredStateDep).
WithField("dep_state", dep.state).
Debug("cancel input request")
}
}

func debugSchedulerSkipInputRequestBasedOnExistingRequest(e *edge, dep *dep, desiredStateDep edgeStatusType) {
if e.debug {
bklog.G(context.TODO()).
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("dep_index", dep.index).
WithField("dep_req_desired_state", dep.req.Request().desiredState).
WithField("dep_desired_state", desiredStateDep).
WithField("dep_state", dep.state).
Debug("skip input request based on existing request")
}
}

func debugSchedulerAddInputRequest(e *edge, dep *dep, desiredStateDep edgeStatusType) {
if e.debug {
bklog.G(context.TODO()).
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("dep_index", dep.index).
WithField("dep_desired_state", desiredStateDep).
WithField("dep_state", dep.state).
WithField("dep_vertex_name", e.edge.Vertex.Inputs()[dep.index].Vertex.Name()).
WithField("dep_vertex_digest", e.edge.Vertex.Inputs()[dep.index].Vertex.Digest()).
Debug("add input request")
}
}

func debugSchedulerSkipInputRequestBasedOnDepState(e *edge, dep *dep, desiredStateDep edgeStatusType) {
if e.debug {
bklog.G(context.TODO()).
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("dep_index", dep.index).
WithField("dep_desired_state", desiredStateDep).
WithField("dep_state", dep.state).
WithField("dep_vertex_name", e.edge.Vertex.Inputs()[dep.index].Vertex.Name()).
WithField("dep_vertex_digest", e.edge.Vertex.Inputs()[dep.index].Vertex.Digest()).
Debug("skip input request based on dep state")
}
}
Loading

0 comments on commit ad0c36c

Please sign in to comment.