Skip to content

Commit

Permalink
adding kwok dp app test
Browse files Browse the repository at this point in the history
  • Loading branch information
enoodle committed Aug 14, 2024
1 parent 97057e4 commit 5cafdda
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 27 deletions.
2 changes: 1 addition & 1 deletion cmd/kwok-gpu-device-plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ func main() {
requiredEnvVars := []string{constants.EnvTopologyCmName, constants.EnvTopologyCmNamespace, constants.EnvFakeGpuOperatorNs}
config.ValidateConfig(requiredEnvVars)

appRunner := app.NewAppRunner(&status_updater.StatusUpdaterApp{})
appRunner := app.NewAppRunner(&status_updater.KWOKDevicePluginApp{})
appRunner.Run()
}
28 changes: 4 additions & 24 deletions internal/kwok-gpu-device-plugin/app.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package kwokgdp

import (
"sync"

"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand All @@ -24,39 +22,27 @@ var DynamicClientFn = func(c *rest.Config) dynamic.Interface {
return dynamic.NewForConfigOrDie(c)
}

type StatusUpdaterAppConfiguration struct {
TopologyCmName string `mapstructure:"TOPOLOGY_CM_NAME" validate:"required"`
TopologyCmNamespace string `mapstructure:"TOPOLOGY_CM_NAMESPACE" validate:"required"`
}

type StatusUpdaterApp struct {
type KWOKDevicePluginApp struct {
Controllers []controllers.Interface
kubeClient kubernetes.Interface
stopCh chan struct{}
wg *sync.WaitGroup
}

func (app *StatusUpdaterApp) Run() {
app.wg.Add(len(app.Controllers))
func (app *KWOKDevicePluginApp) Run() {
for _, controller := range app.Controllers {
go func(controller controllers.Interface) {
defer app.wg.Done()
controller.Run(app.stopCh)
}(controller)
}

app.wg.Wait()
}

func (app *StatusUpdaterApp) Init(stopCh chan struct{}) {
func (app *KWOKDevicePluginApp) Init(stopCh chan struct{}) {
app.stopCh = stopCh

clusterConfig := InClusterConfigFn()
clusterConfig.QPS = 100
clusterConfig.Burst = 200

app.wg = &sync.WaitGroup{}

app.kubeClient = KubeClientFn(clusterConfig)

app.Controllers = append(
Expand All @@ -66,12 +52,6 @@ func (app *StatusUpdaterApp) Init(stopCh chan struct{}) {
)
}

func (app *StatusUpdaterApp) Name() string {
func (app *KWOKDevicePluginApp) Name() string {
return "StatusUpdater"
}

func (app *StatusUpdaterApp) GetConfig() interface{} {
var config StatusUpdaterAppConfiguration

return config
}
143 changes: 143 additions & 0 deletions internal/kwok-gpu-device-plugin/app_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package kwokgdp

import (
"context"
"sync"
"testing"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/spf13/viper"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"

"github.com/run-ai/fake-gpu-operator/internal/common/constants"
"github.com/run-ai/fake-gpu-operator/internal/common/topology"
cmcontroller "github.com/run-ai/fake-gpu-operator/internal/kwok-gpu-device-plugin/controllers/configmap"
"github.com/run-ai/fake-gpu-operator/internal/status-updater/controllers"
)

// Fixture values shared by the specs in this file.
const (
// gpuOperatorNamespace is the namespace in which the specs create the
// topology ConfigMaps and which is passed to the ConfigMap controller.
gpuOperatorNamespace = "gpu-operator"
// nodePoolLabelKey is used both as the cluster topology's NodePoolLabelKey
// and as the label key set on the test node.
nodePoolLabelKey = "run.ai/node-pool"
// defaultNodePoolName is the single node pool defined in the test cluster
// topology; the test node is labelled with it.
defaultNodePoolName = "default"
)

// TestKwokGpuDevicePlugin is the `go test` entry point for this package's
// Ginkgo specs: it registers Ginkgo's Fail as the Gomega fail handler and
// then runs the suite.
func TestKwokGpuDevicePlugin(t *testing.T) {
	const suiteDescription = "KwokGpuDevicePlugin Suite"

	// Gomega assertion failures are reported through Ginkgo's Fail.
	RegisterFailHandler(Fail)

	RunSpecs(t, suiteDescription)
}

// Specs for KWOKDevicePluginApp wired to a fake clientset and a real
// ConfigMap controller. Every spec starts the app in BeforeEach and stops it
// in AfterEach by closing the stop channel.
var _ = Describe("KwokGpuDevicePlugin", func() {
	var (
		app        *KWOKDevicePluginApp
		kubeClient kubernetes.Interface
		stopChan   chan struct{}
		wg         *sync.WaitGroup
	)

	BeforeEach(func() {
		// Cluster-wide topology with a single node pool; it is stored as a
		// ConfigMap that is pre-loaded into the fake clientset below.
		clusterTopology := topology.ClusterTopology{
			NodePoolLabelKey: nodePoolLabelKey,
			NodePools: map[string]topology.NodePoolTopology{
				defaultNodePoolName: {
					GpuCount:   4,
					GpuMemory:  1000,
					GpuProduct: "nvidia-tesla-t4",
				},
			},
			MigStrategy: "none",
		}
		clusterTopologyCM, err := topology.ToClusterTopologyCM(&clusterTopology)
		Expect(err).ToNot(HaveOccurred())
		clusterTopologyCM.Name = "cluster-topology"
		clusterTopologyCM.Namespace = gpuOperatorNamespace

		kubeClient = fake.NewSimpleClientset(clusterTopologyCM)
		stopChan = make(chan struct{})

		// Provide the topology ConfigMap coordinates through viper defaults.
		viper.SetDefault(constants.EnvTopologyCmName, clusterTopologyCM.Name)
		viper.SetDefault(constants.EnvTopologyCmNamespace, gpuOperatorNamespace)

		app = &KWOKDevicePluginApp{
			Controllers: []controllers.Interface{
				cmcontroller.NewConfigMapController(
					kubeClient, gpuOperatorNamespace,
				),
			},
			kubeClient: kubeClient,
			stopCh:     stopChan,
		}

		wg = &sync.WaitGroup{}
		// Add must happen before the goroutine is started: calling it inside
		// the goroutine races with wg.Wait() in AfterEach, which could then
		// return before the goroutine has even registered itself.
		wg.Add(1)
		go func() {
			defer wg.Done()
			app.Run()
		}()
	})

	AfterEach(func() {
		// Closing the stop channel signals the controllers to stop; then wait
		// for the goroutine that invoked app.Run to finish.
		close(stopChan)
		wg.Wait()
	})

	Context("app", func() {
		// Intentionally empty: starting in BeforeEach and stopping in
		// AfterEach is the whole scenario.
		It("should run until channel is closed", func() {})

		Context("ConfigMap", func() {
			It("should handle new Config Map without node labels", func() {
				// Only the successful creation is asserted here; presumably the
				// intent is that the controller tolerates a ConfigMap carrying
				// no node label — TODO confirm and add an explicit expectation.
				_, err := kubeClient.CoreV1().ConfigMaps(gpuOperatorNamespace).Create(context.TODO(), &v1.ConfigMap{
					ObjectMeta: metav1.ObjectMeta{
						Name:      "configmap1",
						Namespace: gpuOperatorNamespace,
					},
				}, metav1.CreateOptions{})
				Expect(err).ToNot(HaveOccurred())
			})

			It("should add gpu devices to kwok nodes by configmap data", func() {
				// A node in the default pool carrying the KWOK annotation.
				node1 := &v1.Node{
					ObjectMeta: metav1.ObjectMeta{
						Name: "node1",
						Labels: map[string]string{
							nodePoolLabelKey: defaultNodePoolName,
						},
						Annotations: map[string]string{
							constants.AnnotationKwokNode: "fake",
						},
					},
				}
				_, err := kubeClient.CoreV1().Nodes().Create(context.TODO(), node1, metav1.CreateOptions{})
				Expect(err).ToNot(HaveOccurred())

				// Node topology listing four GPUs for node1.
				nodeTopology := topology.NodeTopology{
					GpuMemory:  1000,
					GpuProduct: "nvidia-tesla-t4",
					Gpus: []topology.GpuDetails{
						{ID: "fake-gpu-id-1", Status: topology.GpuStatus{}},
						{ID: "fake-gpu-id-2", Status: topology.GpuStatus{}},
						{ID: "fake-gpu-id-3", Status: topology.GpuStatus{}},
						{ID: "fake-gpu-id-4", Status: topology.GpuStatus{}},
					},
				}
				cm, err := topology.ToNodeTopologyCM(&nodeTopology, node1.Name)
				Expect(err).ToNot(HaveOccurred())
				cm.Namespace = gpuOperatorNamespace

				_, err = kubeClient.CoreV1().ConfigMaps(gpuOperatorNamespace).Create(context.TODO(), cm, metav1.CreateOptions{})
				Expect(err).ToNot(HaveOccurred())

				// The update happens asynchronously, so poll the node until its
				// GPU capacity equals the number of GPUs in the topology.
				Eventually(func() bool {
					node, err := kubeClient.CoreV1().Nodes().Get(context.TODO(), node1.Name, metav1.GetOptions{})
					if err != nil {
						return false
					}

					gpuQuantity := node.Status.Capacity[constants.GpuResourceName]
					return gpuQuantity.Value() == int64(4)
				}, 2*time.Second, 100*time.Millisecond).Should(BeTrue())
			})
		})
	})
})
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func NewConfigMapController(
kubeClient: kubeClient,
cmInformer: informerFactory.Core().V1().ConfigMaps().Informer(),
nodeLister: informerFactory.Core().V1().Nodes().Lister(),
informerFactory: informerFactory,
handler: cmhandler.NewConfigMapHandler(kubeClient, clusterTopology),
clusterTopology: clusterTopology,
}
Expand Down
4 changes: 2 additions & 2 deletions internal/kwok-gpu-device-plugin/handlers/configmap/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ func NewConfigMapHandler(kubeClient kubernetes.Interface, clusterTopology *topol
}

func (p *ConfigMapHandler) HandleAdd(cm *v1.ConfigMap, node *v1.Node) error {
log.Printf("Handling node addition: %s\n", cm.Name)
log.Printf("Handling config map addition: %s\n", cm.Name)

nodeTopology, err := topology.FromNodeTopologyCM(cm)
if err != nil {
return fmt.Errorf("failed to create node topology ConfigMap: %w", err)
return fmt.Errorf("failed to read node topology ConfigMap: %w", err)
}

return p.applyFakeDevicePlugin(len(nodeTopology.Gpus), node)
Expand Down

0 comments on commit 5cafdda

Please sign in to comment.