Skip to content

Commit

Permalink
Prune non-existing pods from topology
Browse files Browse the repository at this point in the history
  • Loading branch information
gshaibi committed Apr 3, 2024
1 parent 8d63b4b commit a000d69
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 12 deletions.
2 changes: 2 additions & 0 deletions internal/common/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ const (
LabelGpuProduct = "nvidia.com/gpu.product"
LabelMigConfigState = "nvidia.com/mig.config.state"
LabelFakeNodeDeploymentTemplate = "run.ai/fake-node-deployment-template"
LabelTopologyCMNodeTopology = "node-topology"
LabelTopologyCMNodeName = "node-name"

ReservationNs = "runai-reservation"

Expand Down
3 changes: 2 additions & 1 deletion internal/common/topology/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ func ToNodeTopologyCM(nodeTopology *NodeTopology, nodeName string) (*corev1.Conf
Name: GetNodeTopologyCMName(nodeName),
Namespace: viper.GetString(constants.EnvTopologyCmNamespace),
Labels: map[string]string{
"node-topology": "true",
constants.LabelTopologyCMNodeTopology: "true",
constants.LabelTopologyCMNodeName: nodeName,
},
},
Data: make(map[string]string),
Expand Down
7 changes: 5 additions & 2 deletions internal/status-exporter/export/fs/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ func (e *FsExporter) Run(stopCh <-chan struct{}) {
}

func (e *FsExporter) export(nodeTopology *topology.NodeTopology) {
if err := os.RemoveAll("/runai/proc/pod"); err != nil {
log.Printf("Failed deleting runai/proc/pod directory: %s", err.Error())
}

for gpuIdx, gpu := range nodeTopology.Gpus {
// Ignoring pods that are not supposed to be seen by runai-container-toolkit
Expand All @@ -50,7 +53,7 @@ func (e *FsExporter) export(nodeTopology *topology.NodeTopology) {
for podUuid, gpuUsageStatus := range gpu.Status.PodGpuUsageStatus {
log.Printf("Exporting pod %s gpu stats to filesystem", podUuid)

path := fmt.Sprintf("runai/proc/pod/%s/metrics/gpu/%d", podUuid, gpuIdx)
path := fmt.Sprintf("/runai/proc/pod/%s/metrics/gpu/%d", podUuid, gpuIdx)
if err := os.MkdirAll(path, 0644); err != nil {
log.Printf("Failed creating directory for pod %s: %s", podUuid, err.Error())
}
Expand All @@ -74,5 +77,5 @@ func writeFile(path string, content []byte) error {
}

// mbToBytes converts a size given in megabytes to a byte count.
// NOTE(review): two return statements are present. As written only the first
// (binary factor, 1024*1024) can execute; the second (decimal factor,
// 1000*1000) is unreachable. This looks like the before/after of a change
// shown together — confirm which factor is intended and drop the other.
func mbToBytes(mb int) int {
return mb * 1024 * 1024
return mb * (1000 * 1000)
}
7 changes: 4 additions & 3 deletions internal/status-updater/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type StatusUpdaterAppConfiguration struct {

// StatusUpdaterApp groups the status-updater controllers with the shared
// objects they are built from.
type StatusUpdaterApp struct {
// Controllers is populated by Init, which appends the pod controller and the
// node controller.
Controllers []controllers.Interface
// kubeClient is the Kubernetes client created in Init via KubeClientFn and
// passed to both controllers.
kubeClient kubernetes.Interface
// stopCh is presumably the stop channel handed to Init; the assignment is not
// visible in this excerpt — TODO confirm.
stopCh chan struct{}
// wg is allocated in Init and shared with every controller constructor.
wg *sync.WaitGroup
}
Expand All @@ -53,11 +54,11 @@ func (app *StatusUpdaterApp) Init(stopCh chan struct{}) {

app.wg = &sync.WaitGroup{}

kubeClient := KubeClientFn(clusterConfig)
app.kubeClient = KubeClientFn(clusterConfig)
dynamicClient := DynamicClientFn(clusterConfig)

app.Controllers = append(app.Controllers, podcontroller.NewPodController(kubeClient, dynamicClient, app.wg))
app.Controllers = append(app.Controllers, nodecontroller.NewNodeController(kubeClient, app.wg))
app.Controllers = append(app.Controllers, podcontroller.NewPodController(app.kubeClient, dynamicClient, app.wg))
app.Controllers = append(app.Controllers, nodecontroller.NewNodeController(app.kubeClient, app.wg))
}

func (app *StatusUpdaterApp) Name() string {
Expand Down
63 changes: 57 additions & 6 deletions internal/status-updater/controllers/node/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"sync"

"github.com/hashicorp/go-multierror"
"github.com/run-ai/fake-gpu-operator/internal/common/constants"
"github.com/run-ai/fake-gpu-operator/internal/common/topology"
"github.com/run-ai/fake-gpu-operator/internal/status-updater/controllers"
Expand All @@ -15,6 +16,7 @@ import (
nodehandler "github.com/run-ai/fake-gpu-operator/internal/status-updater/handlers/node"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/client-go/informers"
Expand Down Expand Up @@ -65,16 +67,17 @@ func NewNodeController(kubeClient kubernetes.Interface, wg *sync.WaitGroup) *Nod
}

// Run prunes the topology ConfigMaps once and then passes stopCh to the
// controller informer's Run. A pruning error terminates the process through
// log.Fatalf, so the informer is never started in that case.
// NOTE(review): both `err := c.pruneTopologyNodes()` and
// `err := c.pruneTopologyConfigMaps()` appear below. This looks like the
// before/after of a rename shown together, and the second `:=` declares no
// new variable — confirm which call is current and drop the other.
func (c *NodeController) Run(stopCh <-chan struct{}) {
err := c.pruneTopologyNodes()
err := c.pruneTopologyConfigMaps()
if err != nil {
log.Fatalf("Failed to prune topology nodes: %v", err)
}

c.informer.Run(stopCh)
}

func (c *NodeController) pruneTopologyNodes() error {
log.Print("Pruning topology nodes...")
// pruneTopologyConfigMaps deletes the topology ConfigMaps that do not belong to any fake GPU node and, for the ConfigMaps it processes, resets each GPU's PodGpuUsageStatus and clears AllocatedBy entries whose pod no longer exists.
func (c *NodeController) pruneTopologyConfigMaps() error {
log.Print("Pruning topology ConfigMaps...")

gpuNodes, err := c.kubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{
LabelSelector: "nvidia.com/gpu.deploy.dcgm-exporter=true,nvidia.com/gpu.deploy.device-plugin=true",
Expand All @@ -84,7 +87,7 @@ func (c *NodeController) pruneTopologyNodes() error {
}

nodeTopologyCms, err := c.kubeClient.CoreV1().ConfigMaps(viper.GetString(constants.EnvTopologyCmNamespace)).List(context.TODO(), metav1.ListOptions{
LabelSelector: "node-topology=true",
LabelSelector: fmt.Sprintf("%s=true", constants.LabelTopologyCMNodeTopology),
})
if err != nil {
return fmt.Errorf("failed listing fake gpu nodes: %v", err)
Expand All @@ -95,10 +98,47 @@ func (c *NodeController) pruneTopologyNodes() error {
validNodeTopologyCMMap[topology.GetNodeTopologyCMName(node.Name)] = true
}

var multiErr error
for _, cm := range nodeTopologyCms.Items {
if _, ok := validNodeTopologyCMMap[cm.Name]; !ok {
util.LogErrorIfExist(c.kubeClient.CoreV1().ConfigMaps(viper.GetString(constants.EnvTopologyCmNamespace)).Delete(context.TODO(), cm.Name, metav1.DeleteOptions{}), fmt.Sprintf("Failed to delete node topology cm %s", cm.Name))
_, ok := validNodeTopologyCMMap[cm.Name]
multiErr = multierror.Append(multiErr, c.pruneTopologyConfigMap(&cm, ok))
}

return nil
}

func (c *NodeController) pruneTopologyConfigMap(cm *v1.ConfigMap, isValidNodeTopologyCM bool) error {
if !isValidNodeTopologyCM {
util.LogErrorIfExist(c.kubeClient.CoreV1().ConfigMaps(viper.GetString(constants.EnvTopologyCmNamespace)).Delete(context.TODO(), cm.Name, metav1.DeleteOptions{}), fmt.Sprintf("Failed to delete node topology cm %s", cm.Name))
}

nodeTopology, err := topology.FromNodeTopologyCM(cm)
if err != nil {
return fmt.Errorf("failed to parse node topology cm %s: %v", cm.Name, err)
}

for i := range nodeTopology.Gpus {
nodeTopology.Gpus[i].Status.PodGpuUsageStatus = topology.PodGpuUsageStatusMap{}

// Remove non-existing pods from the allocation info
allocatingPodExists, err := isPodExist(c.kubeClient, nodeTopology.Gpus[i].Status.AllocatedBy.Pod, nodeTopology.Gpus[i].Status.AllocatedBy.Namespace)
if err != nil {
return fmt.Errorf("failed to check if pod %s exists: %v", nodeTopology.Gpus[i].Status.AllocatedBy.Pod, err)
}

if !allocatingPodExists {
nodeTopology.Gpus[i].Status.AllocatedBy = topology.ContainerDetails{}
}
}

nodeName, ok := cm.ObjectMeta.Labels[constants.LabelTopologyCMNodeName]
if !ok {
return fmt.Errorf("node topology cm %s does not have node name label", cm.Name)
}

err = topology.UpdateNodeTopologyCM(c.kubeClient, nodeTopology, nodeName)
if err != nil {
return fmt.Errorf("failed to update node topology cm %s: %v", cm.Name, err)
}

return nil
Expand All @@ -109,3 +149,14 @@ func isFakeGpuNode(node *v1.Node) bool {
node.Labels["nvidia.com/gpu.deploy.dcgm-exporter"] == "true" &&
node.Labels["nvidia.com/gpu.deploy.device-plugin"] == "true"
}

// isPodExist reports whether a pod named podName exists in namespace.
//
// An empty podName is answered with (false, nil) without contacting the API
// server. The caller passes the AllocatedBy.Pod of every GPU, which is empty
// for a GPU whose allocation info has been cleared, and client-go rejects an
// empty resource name with a plain error that is not a NotFound status; without
// this guard such GPUs would be reported as lookup failures.
//
// A NotFound response yields (false, nil). Any other lookup error is returned
// to the caller unchanged, with the boolean set to false.
func isPodExist(kubeClient kubernetes.Interface, podName string, namespace string) (bool, error) {
	// A pod cannot exist without a name; skip the round trip entirely.
	if podName == "" {
		return false, nil
	}

	_, err := kubeClient.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
	switch {
	case err == nil:
		return true, nil
	case errors.IsNotFound(err):
		// The API server answered authoritatively that the pod is gone.
		return false, nil
	default:
		return false, err
	}
}

0 comments on commit a000d69

Please sign in to comment.