From d81ff8db3a13e765d1b0cf59e42c10640d59e6d6 Mon Sep 17 00:00:00 2001 From: Erez Freiberger Date: Tue, 20 Aug 2024 16:18:10 +0300 Subject: [PATCH] mark configmap with kwok status to simplify kwok gdp --- internal/common/topology/kubernetes.go | 24 +++++++---- internal/kwok-gpu-device-plugin/app_test.go | 5 ++- .../controllers/configmap/controller.go | 40 ++----------------- .../handlers/configmap/handler.go | 11 ++--- .../handlers/node/topology_cm.go | 2 +- 5 files changed, 32 insertions(+), 50 deletions(-) diff --git a/internal/common/topology/kubernetes.go b/internal/common/topology/kubernetes.go index de612fa..b40603c 100644 --- a/internal/common/topology/kubernetes.go +++ b/internal/common/topology/kubernetes.go @@ -10,6 +10,7 @@ import ( "github.com/spf13/viper" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apcorev1 "k8s.io/client-go/applyconfigurations/core/v1" "k8s.io/client-go/kubernetes" ) @@ -25,11 +26,17 @@ func GetNodeTopologyFromCM(kubeclient kubernetes.Interface, nodeName string) (*N return FromNodeTopologyCM(cm) } -func CreateNodeTopologyCM(kubeclient kubernetes.Interface, nodeTopology *NodeTopology, nodeName string) error { - cm, err := ToNodeTopologyCM(nodeTopology, nodeName) +func CreateNodeTopologyCM(kubeclient kubernetes.Interface, nodeTopology *NodeTopology, node *corev1.Node) error { + cm, _, err := ToNodeTopologyCM(nodeTopology, node.Name) if err != nil { return err } + if value, found := node.Annotations[constants.AnnotationKwokNode]; found { + if cm.Annotations == nil { + cm.Annotations = make(map[string]string) + } + cm.Annotations[constants.AnnotationKwokNode] = value + } _, err = kubeclient.CoreV1().ConfigMaps( viper.GetString(constants.EnvTopologyCmNamespace)).Create(context.TODO(), cm, metav1.CreateOptions{}) @@ -37,13 +44,13 @@ func CreateNodeTopologyCM(kubeclient kubernetes.Interface, nodeTopology *NodeTop } func UpdateNodeTopologyCM(kubeclient kubernetes.Interface, nodeTopology *NodeTopology, nodeName string) error { - cm, err := ToNodeTopologyCM(nodeTopology, nodeName) + _, cm, err := ToNodeTopologyCM(nodeTopology, nodeName) if err != nil { return err } _, err = kubeclient.CoreV1().ConfigMaps( - viper.GetString(constants.EnvTopologyCmNamespace)).Update(context.TODO(), cm, metav1.UpdateOptions{}) + viper.GetString(constants.EnvTopologyCmNamespace)).Apply(context.TODO(), cm, metav1.ApplyOptions{}) return err } @@ -108,7 +115,7 @@ func ToClusterTopologyCM(clusterTopology *ClusterTopology) (*corev1.ConfigMap, e return cm, nil } -func ToNodeTopologyCM(nodeTopology *NodeTopology, nodeName string) (*corev1.ConfigMap, error) { +func ToNodeTopologyCM(nodeTopology *NodeTopology, nodeName string) (*corev1.ConfigMap, *apcorev1.ConfigMapApplyConfiguration, error) { cm := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: GetNodeTopologyCMName(nodeName), @@ -120,15 +127,18 @@ func ToNodeTopologyCM(nodeTopology *NodeTopology, nodeName string) (*corev1.Conf }, Data: make(map[string]string), } + cmApplyConfig := apcorev1.ConfigMap(cm.Name, cm.Namespace).WithLabels(cm.Labels) topologyData, err := yaml.Marshal(nodeTopology) if err != nil { - return nil, err + return nil, nil, err } cm.Data[cmTopologyKey] = string(topologyData) - return cm, nil + cmApplyConfig = cmApplyConfig.WithData(cm.Data) + + return cm, cmApplyConfig, nil } func GetNodeTopologyCMName(nodeName string) string { diff --git a/internal/kwok-gpu-device-plugin/app_test.go b/internal/kwok-gpu-device-plugin/app_test.go index 781f6ee..46b9c3b 100644 --- a/internal/kwok-gpu-device-plugin/app_test.go +++ b/internal/kwok-gpu-device-plugin/app_test.go @@ -124,9 +124,12 @@ var _ = Describe("KwokGpuDevicePlugin", func() { {ID: "fake-gpu-id-4", Status: topology.GpuStatus{}}, }, } - cm, err := topology.ToNodeTopologyCM(&nodeTopology, node1.Name) + cm, _, err := topology.ToNodeTopologyCM(&nodeTopology, node1.Name) Expect(err).ToNot(HaveOccurred()) cm.Namespace = gpuOperatorNamespace + cm.Annotations = map[string]string{ + constants.AnnotationKwokNode: "fake", + } _, err = kubeClient.CoreV1().ConfigMaps(gpuOperatorNamespace).Create(context.TODO(), cm, metav1.CreateOptions{}) Expect(err).ToNot(HaveOccurred()) diff --git a/internal/kwok-gpu-device-plugin/controllers/configmap/controller.go b/internal/kwok-gpu-device-plugin/controllers/configmap/controller.go index d2e57e8..cec9719 100644 --- a/internal/kwok-gpu-device-plugin/controllers/configmap/controller.go +++ b/internal/kwok-gpu-device-plugin/controllers/configmap/controller.go @@ -1,9 +1,7 @@ package configmamp import ( - "context" "log" - "time" "github.com/run-ai/fake-gpu-operator/internal/common/constants" "github.com/run-ai/fake-gpu-operator/internal/common/topology" @@ -13,7 +11,6 @@ import ( cmhandler "github.com/run-ai/fake-gpu-operator/internal/kwok-gpu-device-plugin/handlers/configmap" v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" listersv1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/informers" @@ -21,11 +18,6 @@ import ( "k8s.io/client-go/tools/cache" ) -const ( - maxRetryCount = 10 - baseRetryDelay = time.Millisecond * 100 -) - type ConfigMapController struct { kubeClient kubernetes.Interface cmInformer cache.SharedIndexInformer @@ -69,7 +61,7 @@ func NewConfigMapController( Handler: cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { go func() { - c.callConfigMapHandler(obj.(*v1.ConfigMap), 0) + util.LogErrorIfExist(c.handler.HandleAdd(obj.(*v1.ConfigMap)), "Failed to handle cm addition") }() }, }, @@ -87,37 +79,13 @@ func (c *ConfigMapController) Run(stopCh <-chan struct{}) { } func (c *ConfigMapController) isFakeGpuKWOKNodeConfigMap(cm *v1.ConfigMap) bool { - if cm == nil || cm.Labels == nil { + if cm == nil || cm.Labels == nil || cm.Annotations == nil { return false } - nodeName, foundNodeName := cm.Labels[constants.LabelTopologyCMNodeName] + _, foundNodeName := cm.Labels[constants.LabelTopologyCMNodeName] if !foundNodeName { return false } - node, err := c.nodeLister.Get(nodeName) - if err != nil { - node, err = c.kubeClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) - if err != nil { - log.Printf("Failed to get node %s: %v", nodeName, err) - return false - } - } - - return node.Annotations[constants.AnnotationKwokNode] == "fake" -} - -func (c *ConfigMapController) callConfigMapHandler(cm *v1.ConfigMap, retryCount int) { - nodeName := cm.Labels[constants.LabelTopologyCMNodeName] - node, err := c.nodeLister.Get(nodeName) - if err != nil { - delay := baseRetryDelay * (1 << retryCount) - log.Printf("Failed to get node %s: %v. retry in %v", nodeName, err, delay) - time.Sleep(delay) - if retryCount < maxRetryCount { - c.callConfigMapHandler(cm, retryCount+1) - } - return - } - util.LogErrorIfExist(c.handler.HandleAdd(cm, node), "Failed to handle cm addition") + return cm.Annotations[constants.AnnotationKwokNode] == "fake" } diff --git a/internal/kwok-gpu-device-plugin/handlers/configmap/handler.go b/internal/kwok-gpu-device-plugin/handlers/configmap/handler.go index 92683d6..3c342f2 100644 --- a/internal/kwok-gpu-device-plugin/handlers/configmap/handler.go +++ b/internal/kwok-gpu-device-plugin/handlers/configmap/handler.go @@ -14,7 +14,7 @@ import ( ) type Interface interface { - HandleAdd(cm *v1.ConfigMap, node *v1.Node) error + HandleAdd(cm *v1.ConfigMap) error } type ConfigMapHandler struct { @@ -32,24 +32,25 @@ func NewConfigMapHandler(kubeClient kubernetes.Interface, clusterTopology *topol } } -func (p *ConfigMapHandler) HandleAdd(cm *v1.ConfigMap, node *v1.Node) error { +func (p *ConfigMapHandler) HandleAdd(cm *v1.ConfigMap) error { log.Printf("Handling config map addition: %s\n", cm.Name) nodeTopology, err := topology.FromNodeTopologyCM(cm) if err != nil { return fmt.Errorf("failed to read node topology ConfigMap: %w", err) } + nodeName := cm.Labels[constants.LabelTopologyCMNodeName] - return p.applyFakeDevicePlugin(len(nodeTopology.Gpus), node) + return p.applyFakeDevicePlugin(len(nodeTopology.Gpus), nodeName) } -func (p *ConfigMapHandler) applyFakeDevicePlugin(gpuCount int, node *v1.Node) error { +func (p *ConfigMapHandler) applyFakeDevicePlugin(gpuCount int, nodeName string) error { patch := fmt.Sprintf( `{"status": {"capacity": {"%s": "%d"}, "allocatable": {"%s": "%d"}}}`, constants.GpuResourceName, gpuCount, constants.GpuResourceName, gpuCount, ) _, err := p.kubeClient.CoreV1().Nodes().Patch( - context.TODO(), node.Name, types.MergePatchType, []byte(patch), metav1.PatchOptions{}, "status", + context.TODO(), nodeName, types.MergePatchType, []byte(patch), metav1.PatchOptions{}, "status", ) if err != nil { return fmt.Errorf("failed to update node capacity and allocatable: %v", err) diff --git a/internal/status-updater/handlers/node/topology_cm.go b/internal/status-updater/handlers/node/topology_cm.go index e6a4067..44c707c 100644 --- a/internal/status-updater/handlers/node/topology_cm.go +++ b/internal/status-updater/handlers/node/topology_cm.go @@ -31,7 +31,7 @@ func (p *NodeHandler) createNodeTopologyCM(node *v1.Node) error { MigStrategy: p.clusterTopology.MigStrategy, } - err := topology.CreateNodeTopologyCM(p.kubeClient, nodeTopology, node.Name) + err := topology.CreateNodeTopologyCM(p.kubeClient, nodeTopology, node) if err != nil { return fmt.Errorf("failed to create node topology: %w", err) }