Skip to content

Commit

Permalink
Create a dummy dcgm exporter pod on fake nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
gshaibi committed Nov 26, 2024
1 parent 1c8d570 commit bb9af60
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 39 deletions.
2 changes: 2 additions & 0 deletions internal/common/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ const (
LabelGpuProduct = "nvidia.com/gpu.product"
LabelMigConfigState = "nvidia.com/mig.config.state"
LabelFakeNodeDeploymentTemplate = "run.ai/fake-node-deployment-template"
LabelNodeName = "run.ai/node-name"
LabelTopologyCMNodeTopology = "node-topology"
LabelTopologyCMNodeName = "node-name"
LabelDcgmExporterDummyPod = "dcgm-exporter-dummy-pod"

ReservationNs = "runai-reservation"

Expand Down
63 changes: 63 additions & 0 deletions internal/common/util/kubernetes/kubeutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package kubernetes

import (
"context"
"fmt"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/kubernetes"
)

// TODO: Replace with a generic apply
func ApplyDeployment(k8s kubernetes.Interface, deployment appsv1.Deployment) error {
existingDeployment, err := k8s.AppsV1().Deployments(deployment.Namespace).Get(context.TODO(), deployment.Name, metav1.GetOptions{})
if err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("failed to get deployment %s: %w", deployment.Name, err)
}

if errors.IsNotFound(err) {
deployment.ResourceVersion = ""
_, err := k8s.AppsV1().Deployments(deployment.Namespace).Create(context.TODO(), &deployment, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create deployment %s: %w", deployment.Name, err)
}
} else {
deployment.UID = existingDeployment.UID
deployment.ResourceVersion = existingDeployment.ResourceVersion
_, err := k8s.AppsV1().Deployments(deployment.Namespace).Update(context.TODO(), &deployment, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update deployment %s: %w", deployment.Name, err)
}
}

return nil
}

// TODO: Replace with a generic apply
func ApplyPod(k8s kubernetes.Interface, pod corev1.Pod) error {
existingPod, err := k8s.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
if err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("failed to get pod %s: %w", pod.Name, err)
}

if errors.IsNotFound(err) {
pod.ResourceVersion = ""
_, err := k8s.CoreV1().Pods(pod.Namespace).Create(context.TODO(), &pod, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create pod %s: %w", pod.Name, err)
}
} else {
pod.UID = existingPod.UID
pod.ResourceVersion = existingPod.ResourceVersion
_, err := k8s.CoreV1().Pods(pod.Namespace).Update(context.TODO(), &pod, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update pod %s: %w", pod.Name, err)
}
}

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ import (
"fmt"
"os"

"github.com/hashicorp/go-multierror"
"github.com/run-ai/fake-gpu-operator/internal/common/constants"
kubeutil "github.com/run-ai/fake-gpu-operator/internal/common/util/kubernetes"
"github.com/spf13/viper"

appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -14,7 +18,7 @@ import (
"k8s.io/utils/ptr"
)

func (p *NodeHandler) applyFakeNodeDeployments(node *v1.Node) error {
func (p *NodeHandler) applyFakeNodeResources(node *v1.Node) error {
if !isFakeNode(node) {
return nil
}
Expand All @@ -25,33 +29,81 @@ func (p *NodeHandler) applyFakeNodeDeployments(node *v1.Node) error {
}

for _, deployment := range deployments {
err := p.applyDeployment(deployment)
err := kubeutil.ApplyDeployment(p.kubeClient, deployment)
if err != nil {
return fmt.Errorf("failed to apply deployment: %w", err)
}
}

dcgmExporterDummyPod := p.generateDcgmExporterDummyPod(node)
err = kubeutil.ApplyPod(p.kubeClient, dcgmExporterDummyPod)
if err != nil {
return fmt.Errorf("failed to apply dcgm-exporter dummy pod: %w", err)
}

return nil
}

func (p *NodeHandler) deleteFakeNodeDeployments(node *v1.Node) error {
func (p *NodeHandler) deleteFakeNodeResources(node *v1.Node) error {
if !isFakeNode(node) {
return nil
}

deployments, err := p.generateFakeNodeDeployments(node)
if err != nil {
return fmt.Errorf("failed to get fake node deployments: %w", err)
var multiErr error

err := p.kubeClient.AppsV1().Deployments(os.Getenv(constants.EnvFakeGpuOperatorNs)).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", constants.LabelNodeName, node.Name),
})
if err != nil && !errors.IsNotFound(err) {
multiErr = multierror.Append(multiErr, fmt.Errorf("failed to delete fake node deployments: %w", err))
}

for _, deployment := range deployments {
err := p.kubeClient.AppsV1().Deployments(deployment.Namespace).Delete(context.TODO(), deployment.Name, metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("failed to delete deployment %s: %w", deployment.Name, err)
}
err = p.kubeClient.CoreV1().Pods(viper.GetString(constants.EnvFakeGpuOperatorNs)).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=true,%s=%s", constants.LabelDcgmExporterDummyPod, constants.LabelNodeName, node.Name),
})
if err != nil && !errors.IsNotFound(err) {
multiErr = multierror.Append(multiErr, fmt.Errorf("failed to delete dcgm-exporter dummy pod: %w", err))
}

return nil
return multiErr
}

func (p *NodeHandler) generateDcgmExporterDummyPod(node *v1.Node) v1.Pod {
return v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("nvidia-dcgm-exporter-%s", node.Name),
Namespace: viper.GetString(constants.EnvFakeGpuOperatorNs),
Labels: map[string]string{
constants.LabelDcgmExporterDummyPod: "true",
constants.LabelNodeName: node.Name,
},
},
Spec: v1.PodSpec{
TerminationGracePeriodSeconds: ptr.To(int64(0)),
NodeName: node.Name,
Containers: []v1.Container{
{
Name: "sleeper",
Image: "busybox",
Command: []string{
"sh",
"-c",
"sleep infinity",
},
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceMemory: resource.MustParse("50Mi"),
v1.ResourceCPU: resource.MustParse("10m"),
},
Requests: v1.ResourceList{
v1.ResourceMemory: resource.MustParse("0Mi"),
v1.ResourceCPU: resource.MustParse("0m"),
},
},
},
},
},
}
}

func (p *NodeHandler) generateFakeNodeDeployments(node *v1.Node) ([]appsv1.Deployment, error) {
Expand All @@ -70,35 +122,12 @@ func (p *NodeHandler) generateFakeNodeDeployments(node *v1.Node) ([]appsv1.Deplo
return deployments, nil
}

func (p *NodeHandler) applyDeployment(deployment appsv1.Deployment) error {
existingDeployment, err := p.kubeClient.AppsV1().Deployments(deployment.Namespace).Get(context.TODO(), deployment.Name, metav1.GetOptions{})
if err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("failed to get deployment %s: %w", deployment.Name, err)
}

if errors.IsNotFound(err) {
deployment.ResourceVersion = ""
_, err := p.kubeClient.AppsV1().Deployments(deployment.Namespace).Create(context.TODO(), &deployment, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create deployment %s: %w", deployment.Name, err)
}
} else {
deployment.UID = existingDeployment.UID
deployment.ResourceVersion = existingDeployment.ResourceVersion
_, err := p.kubeClient.AppsV1().Deployments(deployment.Namespace).Update(context.TODO(), &deployment, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update deployment %s: %w", deployment.Name, err)
}
}

return nil
}

func generateFakeNodeDeploymentFromTemplate(template *appsv1.Deployment, node *v1.Node) *appsv1.Deployment {
deployment := template.DeepCopy()

delete(deployment.Labels, constants.LabelFakeNodeDeploymentTemplate)
deployment.Name = fmt.Sprintf("%s-%s", deployment.Name, node.Name)
deployment.Labels[constants.LabelNodeName] = node.Name
deployment.Spec.Replicas = ptr.To(int32(1))
deployment.Spec.Template.Spec.Containers[0].Env = append(deployment.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{
Name: constants.EnvNodeName,
Expand Down
6 changes: 3 additions & 3 deletions internal/status-updater/handlers/node/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ func (p *NodeHandler) HandleAdd(node *v1.Node) error {
return fmt.Errorf("failed to create node topology ConfigMap: %w", err)
}

err = p.applyFakeNodeDeployments(node)
err = p.applyFakeNodeResources(node)
if err != nil {
return fmt.Errorf("failed to apply fake node deployments: %w", err)
return fmt.Errorf("failed to apply fake node resources: %w", err)
}

err = p.labelNode(node)
Expand All @@ -59,7 +59,7 @@ func (p *NodeHandler) HandleDelete(node *v1.Node) error {
return fmt.Errorf("failed to delete node topology: %w", err)
}

err = p.deleteFakeNodeDeployments(node)
err = p.deleteFakeNodeResources(node)
if err != nil {
return fmt.Errorf("failed to delete fake node deployments: %w", err)
}
Expand Down

0 comments on commit bb9af60

Please sign in to comment.