Merge pull request #2815 from wangyang0616/fix_node_score
Pods are preferentially scheduled to nodes whose idle resources in the current session already satisfy the task's resource request.
volcano-sh-bot authored Aug 7, 2023
2 parents dd51654 + 5e48157 commit 39c6f02
Showing 1 changed file with 50 additions and 24 deletions.
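Before the diff itself, the intent of the change can be sketched in isolation: split the predicate-passing nodes into an "idle" gradient and a "future idle" gradient, and score the second gradient only when the first is empty. The snippet below is a minimal, self-contained illustration of that order; the node type, the integer resource model, and the score callback are simplified stand-ins rather than the scheduler's api.NodeInfo, InitResreq.LessEqual, or PrioritizeNodes/BestNodeFn.

package main

import "fmt"

// node and the score callback are illustrative stand-ins, not the scheduler's
// real api.NodeInfo, PrioritizeNodes, or BestNodeFn.
type node struct {
	name       string
	idle       int // resources free right now
	futureIdle int // idle plus resources currently being released
}

// pickNode mirrors the selection order this commit introduces: nodes whose
// idle resources already satisfy the request form the first gradient; nodes
// that only fit once releasing resources are reclaimed form the second.
// The second gradient is scored only when the first yields no candidate.
func pickNode(request int, predicateNodes []node, score func(node) int) *node {
	var idleTier, futureTier []node
	for _, n := range predicateNodes {
		switch {
		case request <= n.idle:
			idleTier = append(idleTier, n)
		case request <= n.futureIdle:
			futureTier = append(futureTier, n)
		}
	}
	for _, tier := range [][]node{idleTier, futureTier} {
		var best *node
		for i := range tier {
			if best == nil || score(tier[i]) > score(*best) {
				best = &tier[i]
			}
		}
		if best != nil {
			return best // a first-gradient hit means the second gradient is never scored
		}
	}
	return nil
}

func main() {
	nodes := []node{
		{name: "n1", idle: 2, futureIdle: 8},
		{name: "n2", idle: 4, futureIdle: 4},
	}
	// n1 scores higher on future idle, but n2 fits from idle resources alone,
	// so the first gradient wins.
	if best := pickNode(3, nodes, func(n node) int { return n.futureIdle }); best != nil {
		fmt.Println(best.name) // prints: n2
	}
}

In the real action this ordering keeps a task off nodes that would make it wait for releasing resources whenever an immediately schedulable node exists.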
pkg/scheduler/actions/allocate/allocate.go: 50 additions & 24 deletions

@@ -195,50 +195,76 @@ func (alloc *Action) Execute(ssn *framework.Session) {
 			break
 		}
 
-		var candidateNodes []*api.NodeInfo
+		// Candidate nodes are divided into two gradients:
+		// - the first gradient node: a list of free nodes that satisfy the task resource request;
+		// - The second gradient node: the node list whose sum of node idle resources and future idle meets the task resource request;
+		// Score the first gradient node first. If the first gradient node meets the requirements, ignore the second gradient node list,
+		// otherwise, score the second gradient node and select the appropriate node.
+		var candidateNodes [][]*api.NodeInfo
+		var idleCandidateNodes []*api.NodeInfo
+		var futureIdleCandidateNodes []*api.NodeInfo
 		for _, n := range predicateNodes {
-			if task.InitResreq.LessEqual(n.Idle, api.Zero) || task.InitResreq.LessEqual(n.FutureIdle(), api.Zero) {
-				candidateNodes = append(candidateNodes, n)
+			if task.InitResreq.LessEqual(n.Idle, api.Zero) {
+				idleCandidateNodes = append(idleCandidateNodes, n)
+			} else if task.InitResreq.LessEqual(n.FutureIdle(), api.Zero) {
+				futureIdleCandidateNodes = append(futureIdleCandidateNodes, n)
 			} else {
 				klog.V(5).Infof("Predicate filtered node %v, idle: %v and future idle: %v do not meet the requirements of task: %v",
 					n.Name, n.Idle, n.FutureIdle(), task.Name)
 			}
 		}
+		candidateNodes = append(candidateNodes, idleCandidateNodes)
+		candidateNodes = append(candidateNodes, futureIdleCandidateNodes)
 
-		var node *api.NodeInfo
-		switch {
-		case len(candidateNodes) == 0: // If not candidate nodes for this task, skip it.
-			continue
-		case len(candidateNodes) == 1: // If only one node after predicate, just use it.
-			node = candidateNodes[0]
-		case len(candidateNodes) > 1: // If more than one node after predicate, using "the best" one
-			nodeScores := util.PrioritizeNodes(task, candidateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
-
-			node = ssn.BestNodeFn(task, nodeScores)
-			if node == nil {
-				node = util.SelectBestNode(nodeScores)
+		var bestNode *api.NodeInfo
+		for index, nodes := range candidateNodes {
+			if klog.V(5).Enabled() {
+				for _, node := range nodes {
+					klog.V(5).Infof("node %v, idle: %v, future idle: %v", node.Name, node.Idle, node.FutureIdle())
+				}
+			}
+			switch {
+			case len(nodes) == 0:
+				klog.V(5).Infof("Task: %v, no matching node is found in the candidateNodes(index: %d) list.", task.Name, index)
+			case len(nodes) == 1: // If only one node after predicate, just use it.
+				bestNode = nodes[0]
+			case len(nodes) > 1: // If more than one node after predicate, using "the best" one
+				nodeScores := util.PrioritizeNodes(task, nodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
+
+				bestNode = ssn.BestNodeFn(task, nodeScores)
+				if bestNode == nil {
+					bestNode = util.SelectBestNode(nodeScores)
+				}
+			}
+
+			// If a proper node is found in idleCandidateNodes, skip futureIdleCandidateNodes and directly return the node information.
+			if bestNode != nil {
+				break
 			}
 		}
 
 		// Allocate idle resource to the task.
-		if task.InitResreq.LessEqual(node.Idle, api.Zero) {
+		if task.InitResreq.LessEqual(bestNode.Idle, api.Zero) {
 			klog.V(3).Infof("Binding Task <%v/%v> to node <%v>",
-				task.Namespace, task.Name, node.Name)
-			if err := stmt.Allocate(task, node); err != nil {
+				task.Namespace, task.Name, bestNode.Name)
+			if err := stmt.Allocate(task, bestNode); err != nil {
 				klog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v",
-					task.UID, node.Name, ssn.UID, err)
+					task.UID, bestNode.Name, ssn.UID, err)
 			} else {
 				metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
 				metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
 			}
 		} else {
 			klog.V(3).Infof("Predicates failed in allocate for task <%s/%s> on node <%s> with limited resources",
-				task.Namespace, task.Name, node.Name)
+				task.Namespace, task.Name, bestNode.Name)
 
 			// Allocate releasing resource to the task if any.
-			if task.InitResreq.LessEqual(node.FutureIdle(), api.Zero) {
+			if task.InitResreq.LessEqual(bestNode.FutureIdle(), api.Zero) {
 				klog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>",
-					task.Namespace, task.Name, node.Name, task.InitResreq, node.Releasing)
-				if err := stmt.Pipeline(task, node.Name); err != nil {
+					task.Namespace, task.Name, bestNode.Name, task.InitResreq, bestNode.Releasing)
+				if err := stmt.Pipeline(task, bestNode.Name); err != nil {
 					klog.Errorf("Failed to pipeline Task %v on %v in Session %v for %v.",
-						task.UID, node.Name, ssn.UID, err)
+						task.UID, bestNode.Name, ssn.UID, err)
+				} else {
+					metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
+					metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
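The tail of the hunk is left logically unchanged apart from the node -> bestNode rename: once a node is chosen, the task is bound immediately when the node's idle resources cover the request, and is otherwise pipelined so it binds once releasing resources are actually freed. A minimal sketch of that two-way decision, using the same simplified integer resource model (stmt.Allocate and stmt.Pipeline appear only in comments, not as real calls):

package main

import "fmt"

// chosenNode is an illustrative stand-in; the real code operates on
// api.NodeInfo and checks resources with task.InitResreq.LessEqual.
type chosenNode struct {
	name       string
	idle       int
	futureIdle int // idle plus resources currently being released
}

// placeTask mirrors the unchanged tail of the hunk: bind immediately when the
// chosen node's idle resources cover the request, otherwise pipeline the task
// so it binds once releasing resources are actually freed.
func placeTask(request int, n chosenNode) string {
	switch {
	case request <= n.idle:
		return "allocate now on " + n.name // stmt.Allocate path
	case request <= n.futureIdle:
		return "pipeline on " + n.name // stmt.Pipeline path
	default:
		return "no placement for this task"
	}
}

func main() {
	fmt.Println(placeTask(4, chosenNode{name: "n3", idle: 2, futureIdle: 6}))
	// prints: pipeline on n3
}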
