Skip to content

Commit

Permalink
mark configmap with kwok status to simplify kwok gdp
Browse files Browse the repository at this point in the history
  • Loading branch information
enoodle committed Aug 20, 2024
1 parent d2f27b5 commit 1e69249
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 44 deletions.
24 changes: 17 additions & 7 deletions internal/common/topology/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/spf13/viper"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apcorev1 "k8s.io/client-go/applyconfigurations/core/v1"
"k8s.io/client-go/kubernetes"
)

Expand All @@ -25,25 +26,31 @@ func GetNodeTopologyFromCM(kubeclient kubernetes.Interface, nodeName string) (*N
return FromNodeTopologyCM(cm)
}

func CreateNodeTopologyCM(kubeclient kubernetes.Interface, nodeTopology *NodeTopology, nodeName string) error {
cm, err := ToNodeTopologyCM(nodeTopology, nodeName)
func CreateNodeTopologyCM(kubeclient kubernetes.Interface, nodeTopology *NodeTopology, node *corev1.Node) error {
cm, _, err := ToNodeTopologyCM(nodeTopology, node.Name)
if err != nil {
return err
}
if value, found := node.Annotations[constants.AnnotationKwokNode]; found {
if cm.Annotations == nil {
cm.Annotations = make(map[string]string)
}
cm.Annotations[constants.AnnotationKwokNode] = value
}

_, err = kubeclient.CoreV1().ConfigMaps(
viper.GetString(constants.EnvTopologyCmNamespace)).Create(context.TODO(), cm, metav1.CreateOptions{})
return err
}

func UpdateNodeTopologyCM(kubeclient kubernetes.Interface, nodeTopology *NodeTopology, nodeName string) error {
cm, err := ToNodeTopologyCM(nodeTopology, nodeName)
_, cm, err := ToNodeTopologyCM(nodeTopology, nodeName)
if err != nil {
return err
}

_, err = kubeclient.CoreV1().ConfigMaps(
viper.GetString(constants.EnvTopologyCmNamespace)).Update(context.TODO(), cm, metav1.UpdateOptions{})
viper.GetString(constants.EnvTopologyCmNamespace)).Apply(context.TODO(), cm, metav1.ApplyOptions{})
return err
}

Expand Down Expand Up @@ -108,7 +115,7 @@ func ToClusterTopologyCM(clusterTopology *ClusterTopology) (*corev1.ConfigMap, e
return cm, nil
}

func ToNodeTopologyCM(nodeTopology *NodeTopology, nodeName string) (*corev1.ConfigMap, error) {
func ToNodeTopologyCM(nodeTopology *NodeTopology, nodeName string) (*corev1.ConfigMap, *apcorev1.ConfigMapApplyConfiguration, error) {
cm := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: GetNodeTopologyCMName(nodeName),
Expand All @@ -120,15 +127,18 @@ func ToNodeTopologyCM(nodeTopology *NodeTopology, nodeName string) (*corev1.Conf
},
Data: make(map[string]string),
}
cmApplyConfig := apcorev1.ConfigMap(cm.Name, cm.Namespace).WithLabels(cm.Labels)

topologyData, err := yaml.Marshal(nodeTopology)
if err != nil {
return nil, err
return nil, nil, err
}

cm.Data[cmTopologyKey] = string(topologyData)

return cm, nil
cmApplyConfig = cmApplyConfig.WithData(cm.Data)

return cm, cmApplyConfig, nil
}

func GetNodeTopologyCMName(nodeName string) string {
Expand Down
5 changes: 4 additions & 1 deletion internal/kwok-gpu-device-plugin/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,12 @@ var _ = Describe("KwokGpuDevicePlugin", func() {
{ID: "fake-gpu-id-4", Status: topology.GpuStatus{}},
},
}
cm, err := topology.ToNodeTopologyCM(&nodeTopology, node1.Name)
cm, _, err := topology.ToNodeTopologyCM(&nodeTopology, node1.Name)
Expect(err).ToNot(HaveOccurred())
cm.Namespace = gpuOperatorNamespace
cm.Annotations = map[string]string{
constants.AnnotationKwokNode: "fake",
}

_, err = kubeClient.CoreV1().ConfigMaps(gpuOperatorNamespace).Create(context.TODO(), cm, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package configmamp

import (
"context"
"log"
"time"

Expand All @@ -13,7 +12,6 @@ import (
cmhandler "github.com/run-ai/fake-gpu-operator/internal/kwok-gpu-device-plugin/handlers/configmap"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
listersv1 "k8s.io/client-go/listers/core/v1"

"k8s.io/client-go/informers"
Expand Down Expand Up @@ -69,7 +67,7 @@ func NewConfigMapController(
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
go func() {
c.callConfigMapHandler(obj.(*v1.ConfigMap), 0)
util.LogErrorIfExist(c.handler.HandleAdd(obj.(*v1.ConfigMap)), "Failed to handle cm addition")
}()
},
},
Expand All @@ -87,37 +85,13 @@ func (c *ConfigMapController) Run(stopCh <-chan struct{}) {
}

func (c *ConfigMapController) isFakeGpuKWOKNodeConfigMap(cm *v1.ConfigMap) bool {
if cm == nil || cm.Labels == nil {
if cm == nil || cm.Labels == nil || cm.Annotations == nil {
return false
}
nodeName, foundNodeName := cm.Labels[constants.LabelTopologyCMNodeName]
_, foundNodeName := cm.Labels[constants.LabelTopologyCMNodeName]
if !foundNodeName {
return false
}

node, err := c.nodeLister.Get(nodeName)
if err != nil {
node, err = c.kubeClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
if err != nil {
log.Printf("Failed to get node %s: %v", nodeName, err)
return false
}
}

return node.Annotations[constants.AnnotationKwokNode] == "fake"
}

func (c *ConfigMapController) callConfigMapHandler(cm *v1.ConfigMap, retryCount int) {
nodeName := cm.Labels[constants.LabelTopologyCMNodeName]
node, err := c.nodeLister.Get(nodeName)
if err != nil {
delay := baseRetryDelay * (1 << retryCount)
log.Printf("Failed to get node %s: %v. retry in %v", nodeName, err, delay)
time.Sleep(delay)
if retryCount < maxRetryCount {
c.callConfigMapHandler(cm, retryCount+1)
}
return
}
util.LogErrorIfExist(c.handler.HandleAdd(cm, node), "Failed to handle cm addition")
return cm.Annotations[constants.AnnotationKwokNode] == "fake"
}
11 changes: 6 additions & 5 deletions internal/kwok-gpu-device-plugin/handlers/configmap/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

type Interface interface {
HandleAdd(cm *v1.ConfigMap, node *v1.Node) error
HandleAdd(cm *v1.ConfigMap) error
}

type ConfigMapHandler struct {
Expand All @@ -32,24 +32,25 @@ func NewConfigMapHandler(kubeClient kubernetes.Interface, clusterTopology *topol
}
}

func (p *ConfigMapHandler) HandleAdd(cm *v1.ConfigMap, node *v1.Node) error {
func (p *ConfigMapHandler) HandleAdd(cm *v1.ConfigMap) error {
log.Printf("Handling config map addition: %s\n", cm.Name)

nodeTopology, err := topology.FromNodeTopologyCM(cm)
if err != nil {
return fmt.Errorf("failed to read node topology ConfigMap: %w", err)
}
nodeName := cm.Labels[constants.LabelTopologyCMNodeName]

return p.applyFakeDevicePlugin(len(nodeTopology.Gpus), node)
return p.applyFakeDevicePlugin(len(nodeTopology.Gpus), nodeName)
}

func (p *ConfigMapHandler) applyFakeDevicePlugin(gpuCount int, node *v1.Node) error {
func (p *ConfigMapHandler) applyFakeDevicePlugin(gpuCount int, nodeName string) error {
patch := fmt.Sprintf(
`{"status": {"capacity": {"%s": "%d"}, "allocatable": {"%s": "%d"}}}`,
constants.GpuResourceName, gpuCount, constants.GpuResourceName, gpuCount,
)
_, err := p.kubeClient.CoreV1().Nodes().Patch(
context.TODO(), node.Name, types.MergePatchType, []byte(patch), metav1.PatchOptions{}, "status",
context.TODO(), nodeName, types.MergePatchType, []byte(patch), metav1.PatchOptions{}, "status",
)
if err != nil {
return fmt.Errorf("failed to update node capacity and allocatable: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion internal/status-updater/handlers/node/topology_cm.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (p *NodeHandler) createNodeTopologyCM(node *v1.Node) error {
MigStrategy: p.clusterTopology.MigStrategy,
}

err := topology.CreateNodeTopologyCM(p.kubeClient, nodeTopology, node.Name)
err := topology.CreateNodeTopologyCM(p.kubeClient, nodeTopology, node)
if err != nil {
return fmt.Errorf("failed to create node topology: %w", err)
}
Expand Down

0 comments on commit 1e69249

Please sign in to comment.