From d90f3b4d14f0bb11f116fdf8b3970c4c47603e15 Mon Sep 17 00:00:00 2001 From: gshaibi Date: Mon, 25 Nov 2024 23:23:37 +0200 Subject: [PATCH] . --- internal/common/constants/constants.go | 2 + internal/common/util/kubernetes/kubeutil.go | 63 +++++++++++ ..._deployments.go => fake_node_resources.go} | 101 +++++++++++------- .../status-updater/handlers/node/handler.go | 6 +- 4 files changed, 133 insertions(+), 39 deletions(-) create mode 100644 internal/common/util/kubernetes/kubeutil.go rename internal/status-updater/handlers/node/{fake_node_deployments.go => fake_node_resources.go} (50%) diff --git a/internal/common/constants/constants.go b/internal/common/constants/constants.go index 78a4cb2..644d236 100644 --- a/internal/common/constants/constants.go +++ b/internal/common/constants/constants.go @@ -12,8 +12,10 @@ const ( LabelGpuProduct = "nvidia.com/gpu.product" LabelMigConfigState = "nvidia.com/mig.config.state" LabelFakeNodeDeploymentTemplate = "run.ai/fake-node-deployment-template" + LabelNodeName = "run.ai/node-name" LabelTopologyCMNodeTopology = "node-topology" LabelTopologyCMNodeName = "node-name" + LabelDcgmExporterDummyPod = "dcgm-exporter-dummy-pod" ReservationNs = "runai-reservation" diff --git a/internal/common/util/kubernetes/kubeutil.go b/internal/common/util/kubernetes/kubeutil.go new file mode 100644 index 0000000..b364a90 --- /dev/null +++ b/internal/common/util/kubernetes/kubeutil.go @@ -0,0 +1,63 @@ +package kubernetes + +import ( + "context" + "fmt" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/kubernetes" +) + +// TODO: Replace with a generic apply +func ApplyDeployment(k8s kubernetes.Interface, deployment appsv1.Deployment) error { + existingDeployment, err := k8s.AppsV1().Deployments(deployment.Namespace).Get(context.TODO(), deployment.Name, metav1.GetOptions{}) + if err != nil && !errors.IsNotFound(err) { + return fmt.Errorf("failed to get deployment %s: %w", deployment.Name, err) + } + + if errors.IsNotFound(err) { + deployment.ResourceVersion = "" + _, err := k8s.AppsV1().Deployments(deployment.Namespace).Create(context.TODO(), &deployment, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create deployment %s: %w", deployment.Name, err) + } + } else { + deployment.UID = existingDeployment.UID + deployment.ResourceVersion = existingDeployment.ResourceVersion + _, err := k8s.AppsV1().Deployments(deployment.Namespace).Update(context.TODO(), &deployment, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to update deployment %s: %w", deployment.Name, err) + } + } + + return nil +} + +// TODO: Replace with a generic apply +func ApplyPod(k8s kubernetes.Interface, pod corev1.Pod) error { + existingPod, err := k8s.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}) + if err != nil && !errors.IsNotFound(err) { + return fmt.Errorf("failed to get pod %s: %w", pod.Name, err) + } + + if errors.IsNotFound(err) { + pod.ResourceVersion = "" + _, err := k8s.CoreV1().Pods(pod.Namespace).Create(context.TODO(), &pod, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create pod %s: %w", pod.Name, err) + } + } else { + pod.UID = existingPod.UID + pod.ResourceVersion = existingPod.ResourceVersion + _, err := k8s.CoreV1().Pods(pod.Namespace).Update(context.TODO(), &pod, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to update pod %s: %w", pod.Name, err) + } + } + + return nil +} diff --git a/internal/status-updater/handlers/node/fake_node_deployments.go b/internal/status-updater/handlers/node/fake_node_resources.go similarity index 50% rename from internal/status-updater/handlers/node/fake_node_deployments.go rename to internal/status-updater/handlers/node/fake_node_resources.go index f75ab9e..c1fd5d4 100644 --- a/internal/status-updater/handlers/node/fake_node_deployments.go +++ b/internal/status-updater/handlers/node/fake_node_resources.go @@ -5,7 +5,11 @@ import ( "fmt" "os" + "github.com/hashicorp/go-multierror" "github.com/run-ai/fake-gpu-operator/internal/common/constants" + kubeutil "github.com/run-ai/fake-gpu-operator/internal/common/util/kubernetes" + "github.com/spf13/viper" + appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -14,7 +18,7 @@ import ( "k8s.io/utils/ptr" ) -func (p *NodeHandler) applyFakeNodeDeployments(node *v1.Node) error { +func (p *NodeHandler) applyFakeNodeResources(node *v1.Node) error { if !isFakeNode(node) { return nil } @@ -25,33 +29,81 @@ func (p *NodeHandler) applyFakeNodeDeployments(node *v1.Node) error { } for _, deployment := range deployments { - err := p.applyDeployment(deployment) + err := kubeutil.ApplyDeployment(p.kubeClient, deployment) if err != nil { return fmt.Errorf("failed to apply deployment: %w", err) } } + dcgmExporterDummyPod := p.generateDcgmExporterDummyPod(node) + err = kubeutil.ApplyPod(p.kubeClient, dcgmExporterDummyPod) + if err != nil { + return fmt.Errorf("failed to apply dcgm-exporter dummy pod: %w", err) + } + return nil } -func (p *NodeHandler) deleteFakeNodeDeployments(node *v1.Node) error { +func (p *NodeHandler) deleteFakeNodeResources(node *v1.Node) error { if !isFakeNode(node) { return nil } - deployments, err := p.generateFakeNodeDeployments(node) - if err != nil { - return fmt.Errorf("failed to get fake node deployments: %w", err) + var multiErr error + + err := p.kubeClient.AppsV1().Deployments(os.Getenv(constants.EnvFakeGpuOperatorNs)).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", constants.LabelNodeName, node.Name), + }) + if err != nil && !errors.IsNotFound(err) { + multiErr = multierror.Append(multiErr, fmt.Errorf("failed to delete fake node deployments: %w", err)) } - for _, deployment := range deployments { - err := p.kubeClient.AppsV1().Deployments(deployment.Namespace).Delete(context.TODO(), deployment.Name, metav1.DeleteOptions{}) - if err != nil && !errors.IsNotFound(err) { - return fmt.Errorf("failed to delete deployment %s: %w", deployment.Name, err) - } + err = p.kubeClient.CoreV1().Pods(viper.GetString(constants.EnvFakeGpuOperatorNs)).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=true,%s=%s", constants.LabelDcgmExporterDummyPod, constants.LabelNodeName, node.Name), + }) + if err != nil && !errors.IsNotFound(err) { + multiErr = multierror.Append(multiErr, fmt.Errorf("failed to delete dcgm-exporter dummy pod: %w", err)) } - return nil + return multiErr +} + +func (p *NodeHandler) generateDcgmExporterDummyPod(node *v1.Node) v1.Pod { + return v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("nvidia-dcgm-exporter-%s", node.Name), + Namespace: viper.GetString(constants.EnvFakeGpuOperatorNs), + Labels: map[string]string{ + constants.LabelDcgmExporterDummyPod: "true", + constants.LabelNodeName: node.Name, + }, + }, + Spec: v1.PodSpec{ + TerminationGracePeriodSeconds: ptr.To(int64(0)), + NodeName: node.Name, + Containers: []v1.Container{ + { + Name: "sleeper", + Image: "busybox", + Command: []string{ + "sh", + "-c", + "sleep infinity", + }, + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceMemory: resource.MustParse("50Mi"), + v1.ResourceCPU: resource.MustParse("10m"), + }, + Requests: v1.ResourceList{ + v1.ResourceMemory: resource.MustParse("0Mi"), + v1.ResourceCPU: resource.MustParse("0m"), + }, + }, + }, + }, + }, + } } func (p *NodeHandler) generateFakeNodeDeployments(node *v1.Node) ([]appsv1.Deployment, error) { @@ -70,35 +122,12 @@ func (p *NodeHandler) generateFakeNodeDeployments(node *v1.Node) ([]appsv1.Deplo return deployments, nil } -func (p *NodeHandler) applyDeployment(deployment appsv1.Deployment) error { - existingDeployment, err := p.kubeClient.AppsV1().Deployments(deployment.Namespace).Get(context.TODO(), deployment.Name, metav1.GetOptions{}) - if err != nil && !errors.IsNotFound(err) { - return fmt.Errorf("failed to get deployment %s: %w", deployment.Name, err) - } - - if errors.IsNotFound(err) { - deployment.ResourceVersion = "" - _, err := p.kubeClient.AppsV1().Deployments(deployment.Namespace).Create(context.TODO(), &deployment, metav1.CreateOptions{}) - if err != nil { - return fmt.Errorf("failed to create deployment %s: %w", deployment.Name, err) - } - } else { - deployment.UID = existingDeployment.UID - deployment.ResourceVersion = existingDeployment.ResourceVersion - _, err := p.kubeClient.AppsV1().Deployments(deployment.Namespace).Update(context.TODO(), &deployment, metav1.UpdateOptions{}) - if err != nil { - return fmt.Errorf("failed to update deployment %s: %w", deployment.Name, err) - } - } - - return nil -} - func generateFakeNodeDeploymentFromTemplate(template *appsv1.Deployment, node *v1.Node) *appsv1.Deployment { deployment := template.DeepCopy() delete(deployment.Labels, constants.LabelFakeNodeDeploymentTemplate) deployment.Name = fmt.Sprintf("%s-%s", deployment.Name, node.Name) + deployment.Labels[constants.LabelNodeName] = node.Name deployment.Spec.Replicas = ptr.To(int32(1)) deployment.Spec.Template.Spec.Containers[0].Env = append(deployment.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{ Name: constants.EnvNodeName, diff --git a/internal/status-updater/handlers/node/handler.go b/internal/status-updater/handlers/node/handler.go index f404278..bce573f 100644 --- a/internal/status-updater/handlers/node/handler.go +++ b/internal/status-updater/handlers/node/handler.go @@ -38,9 +38,9 @@ func (p *NodeHandler) HandleAdd(node *v1.Node) error { return fmt.Errorf("failed to create node topology ConfigMap: %w", err) } - err = p.applyFakeNodeDeployments(node) + err = p.applyFakeNodeResources(node) if err != nil { - return fmt.Errorf("failed to apply fake node deployments: %w", err) + return fmt.Errorf("failed to apply fake node resources: %w", err) } err = p.labelNode(node) @@ -59,7 +59,7 @@ func (p *NodeHandler) HandleDelete(node *v1.Node) error { return fmt.Errorf("failed to delete node topology: %w", err) } - err = p.deleteFakeNodeDeployments(node) + err = p.deleteFakeNodeResources(node) if err != nil { return fmt.Errorf("failed to delete fake node deployments: %w", err) }