diff --git a/internal/common/constants/constants.go b/internal/common/constants/constants.go index 2a9fa7d..78a4cb2 100644 --- a/internal/common/constants/constants.go +++ b/internal/common/constants/constants.go @@ -12,6 +12,8 @@ const ( LabelGpuProduct = "nvidia.com/gpu.product" LabelMigConfigState = "nvidia.com/mig.config.state" LabelFakeNodeDeploymentTemplate = "run.ai/fake-node-deployment-template" + LabelTopologyCMNodeTopology = "node-topology" + LabelTopologyCMNodeName = "node-name" ReservationNs = "runai-reservation" diff --git a/internal/common/topology/kubernetes.go b/internal/common/topology/kubernetes.go index e2de164..7bd4c8d 100644 --- a/internal/common/topology/kubernetes.go +++ b/internal/common/topology/kubernetes.go @@ -115,7 +115,8 @@ func ToNodeTopologyCM(nodeTopology *NodeTopology, nodeName string) (*corev1.Conf Name: GetNodeTopologyCMName(nodeName), Namespace: viper.GetString(constants.EnvTopologyCmNamespace), Labels: map[string]string{ - "node-topology": "true", + constants.LabelTopologyCMNodeTopology: "true", + constants.LabelTopologyCMNodeName: nodeName, }, }, Data: make(map[string]string), diff --git a/internal/status-exporter/export/fs/exporter.go b/internal/status-exporter/export/fs/exporter.go index 97d7179..de7f9ca 100644 --- a/internal/status-exporter/export/fs/exporter.go +++ b/internal/status-exporter/export/fs/exporter.go @@ -40,6 +40,9 @@ func (e *FsExporter) Run(stopCh <-chan struct{}) { } func (e *FsExporter) export(nodeTopology *topology.NodeTopology) { + if err := os.RemoveAll("/runai/proc/pod"); err != nil { + log.Printf("Failed deleting runai/proc/pod directory: %s", err.Error()) + } for gpuIdx, gpu := range nodeTopology.Gpus { // Ignoring pods that are not supposed to be seen by runai-container-toolkit @@ -50,7 +53,7 @@ func (e *FsExporter) export(nodeTopology *topology.NodeTopology) { for podUuid, gpuUsageStatus := range gpu.Status.PodGpuUsageStatus { log.Printf("Exporting pod %s gpu stats to filesystem", podUuid) - path := fmt.Sprintf("runai/proc/pod/%s/metrics/gpu/%d", podUuid, gpuIdx) + path := fmt.Sprintf("/runai/proc/pod/%s/metrics/gpu/%d", podUuid, gpuIdx) if err := os.MkdirAll(path, 0644); err != nil { log.Printf("Failed creating directory for pod %s: %s", podUuid, err.Error()) } @@ -74,5 +77,5 @@ func writeFile(path string, content []byte) error { } func mbToBytes(mb int) int { - return mb * 1024 * 1024 + return mb * (1000 * 1000) } diff --git a/internal/status-updater/app.go b/internal/status-updater/app.go index f23e39e..3e58bf5 100644 --- a/internal/status-updater/app.go +++ b/internal/status-updater/app.go @@ -30,6 +30,7 @@ type StatusUpdaterAppConfiguration struct { type StatusUpdaterApp struct { Controllers []controllers.Interface + kubeClient kubernetes.Interface stopCh chan struct{} wg *sync.WaitGroup } @@ -53,11 +54,11 @@ func (app *StatusUpdaterApp) Init(stopCh chan struct{}) { app.wg = &sync.WaitGroup{} - kubeClient := KubeClientFn(clusterConfig) + app.kubeClient = KubeClientFn(clusterConfig) dynamicClient := DynamicClientFn(clusterConfig) - app.Controllers = append(app.Controllers, podcontroller.NewPodController(kubeClient, dynamicClient, app.wg)) - app.Controllers = append(app.Controllers, nodecontroller.NewNodeController(kubeClient, app.wg)) + app.Controllers = append(app.Controllers, podcontroller.NewPodController(app.kubeClient, dynamicClient, app.wg)) + app.Controllers = append(app.Controllers, nodecontroller.NewNodeController(app.kubeClient, app.wg)) } func (app *StatusUpdaterApp) Name() string { diff --git a/internal/status-updater/controllers/node/controller.go b/internal/status-updater/controllers/node/controller.go index 38289fb..bbe0cf1 100644 --- a/internal/status-updater/controllers/node/controller.go +++ b/internal/status-updater/controllers/node/controller.go @@ -6,6 +6,7 @@ import ( "log" "sync" + "github.com/hashicorp/go-multierror" "github.com/run-ai/fake-gpu-operator/internal/common/constants" "github.com/run-ai/fake-gpu-operator/internal/common/topology" "github.com/run-ai/fake-gpu-operator/internal/status-updater/controllers" @@ -15,6 +16,7 @@ import ( nodehandler "github.com/run-ai/fake-gpu-operator/internal/status-updater/handlers/node" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" @@ -65,7 +67,7 @@ func NewNodeController(kubeClient kubernetes.Interface, wg *sync.WaitGroup) *Nod } func (c *NodeController) Run(stopCh <-chan struct{}) { - err := c.pruneTopologyNodes() + err := c.pruneTopologyConfigMaps() if err != nil { log.Fatalf("Failed to prune topology nodes: %v", err) } @@ -73,8 +75,9 @@ func (c *NodeController) Run(stopCh <-chan struct{}) { c.informer.Run(stopCh) } -func (c *NodeController) pruneTopologyNodes() error { - log.Print("Pruning topology nodes...") +// This function prunes the topology ConfigMaps that are not associated with any fake gpu nodes, and initializes the GpuTopologyStatus field in the remaining ConfigMaps. +func (c *NodeController) pruneTopologyConfigMaps() error { + log.Print("Pruning topology ConfigMaps...") gpuNodes, err := c.kubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ LabelSelector: "nvidia.com/gpu.deploy.dcgm-exporter=true,nvidia.com/gpu.deploy.device-plugin=true", @@ -84,7 +87,7 @@ func (c *NodeController) pruneTopologyNodes() error { } nodeTopologyCms, err := c.kubeClient.CoreV1().ConfigMaps(viper.GetString(constants.EnvTopologyCmNamespace)).List(context.TODO(), metav1.ListOptions{ - LabelSelector: "node-topology=true", + LabelSelector: fmt.Sprintf("%s=true", constants.LabelTopologyCMNodeTopology), }) if err != nil { return fmt.Errorf("failed listing fake gpu nodes: %v", err) @@ -95,10 +98,47 @@ func (c *NodeController) pruneTopologyNodes() error { validNodeTopologyCMMap[topology.GetNodeTopologyCMName(node.Name)] = true } + var multiErr error for _, cm := range nodeTopologyCms.Items { - if _, ok := validNodeTopologyCMMap[cm.Name]; !ok { - util.LogErrorIfExist(c.kubeClient.CoreV1().ConfigMaps(viper.GetString(constants.EnvTopologyCmNamespace)).Delete(context.TODO(), cm.Name, metav1.DeleteOptions{}), fmt.Sprintf("Failed to delete node topology cm %s", cm.Name)) + _, ok := validNodeTopologyCMMap[cm.Name] + multiErr = multierror.Append(multiErr, c.pruneTopologyConfigMap(&cm, ok)) + } + + return nil +} + +func (c *NodeController) pruneTopologyConfigMap(cm *v1.ConfigMap, isValidNodeTopologyCM bool) error { + if !isValidNodeTopologyCM { + util.LogErrorIfExist(c.kubeClient.CoreV1().ConfigMaps(viper.GetString(constants.EnvTopologyCmNamespace)).Delete(context.TODO(), cm.Name, metav1.DeleteOptions{}), fmt.Sprintf("Failed to delete node topology cm %s", cm.Name)) + } + + nodeTopology, err := topology.FromNodeTopologyCM(cm) + if err != nil { + return fmt.Errorf("failed to parse node topology cm %s: %v", cm.Name, err) + } + + for i := range nodeTopology.Gpus { + nodeTopology.Gpus[i].Status.PodGpuUsageStatus = topology.PodGpuUsageStatusMap{} + + // Remove non-existing pods from the allocation info + allocatingPodExists, err := isPodExist(c.kubeClient, nodeTopology.Gpus[i].Status.AllocatedBy.Pod, nodeTopology.Gpus[i].Status.AllocatedBy.Namespace) + if err != nil { + return fmt.Errorf("failed to check if pod %s exists: %v", nodeTopology.Gpus[i].Status.AllocatedBy.Pod, err) } + + if !allocatingPodExists { + nodeTopology.Gpus[i].Status.AllocatedBy = topology.ContainerDetails{} + } + } + + nodeName, ok := cm.ObjectMeta.Labels[constants.LabelTopologyCMNodeName] + if !ok { + return fmt.Errorf("node topology cm %s does not have node name label", cm.Name) + } + + err = topology.UpdateNodeTopologyCM(c.kubeClient, nodeTopology, nodeName) + if err != nil { + return fmt.Errorf("failed to update node topology cm %s: %v", cm.Name, err) } return nil @@ -109,3 +149,14 @@ func isFakeGpuNode(node *v1.Node) bool { node.Labels["nvidia.com/gpu.deploy.dcgm-exporter"] == "true" && node.Labels["nvidia.com/gpu.deploy.device-plugin"] == "true" } + +func isPodExist(kubeClient kubernetes.Interface, podName string, namespace string) (bool, error) { + _, err := kubeClient.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{}) + if err != nil { + if !errors.IsNotFound(err) { + return false, err + } + return false, nil + } + return true, nil +}