Skip to content

Commit

Permalink
Merge pull request #206 from grycap/devel
Browse files Browse the repository at this point in the history
Added image cache functionality
  • Loading branch information
catttam authored Jul 10, 2023
2 parents 3305734 + 26e0490 commit edc9611
Show file tree
Hide file tree
Showing 9 changed files with 244 additions and 4 deletions.
11 changes: 11 additions & 0 deletions deploy/yaml/oscar-rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@ rules:
- create
- delete
- update
- apiGroups:
- apps
resources:
- daemonsets
verbs:
- get
- list
- watch
- create
- delete
- update
- apiGroups:
- batch
resources:
Expand Down
1 change: 1 addition & 0 deletions docs/fdl.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ storage_providers:
| `memory` </br> *string* | Memory limit for the service following the [kubernetes format](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#meaning-of-memory). Optional (default: 256Mi) |
| `cpu` </br> *string* | CPU limit for the service following the [kubernetes format](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#meaning-of-cpu). Optional (default: 0.2) |
| `enable_gpu` </br> *bool* | Parameter to enable the use of GPU for the service. Requires a device plugin deployed on the cluster (More info: [Kubernetes device plugins](https://kubernetes.io/docs/tasks/manage-gpus/scheduling-gpus/#using-device-plugins)). Optional (default: false) |
| `image_prefetch` </br> *bool* | Parameter to enable the use of image caching. Optional (default: false) |
| `total_memory` </br> *string* | Limit for the memory used by all the service's jobs running simultaneously. Apache YuniKorn scheduler is required to work. Same format as Memory, but internally translated to MB (integer). Optional (default: "") |
| `total_cpu` </br> *string* | Limit for the virtual CPUs used by all the service's jobs running simultaneously. Apache YuniKorn scheduler is required to work. Same format as CPU, but internally translated to millicores (integer). Optional (default: "") |
| `synchronous` </br> *[SynchronousSettings](#synchronoussettings)* | Struct to configure specific sync parameters. This settings are only applied on Knative ServerlessBackend. Optional. |
Expand Down
9 changes: 9 additions & 0 deletions pkg/backends/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"log"

"github.com/goccy/go-yaml"
"github.com/grycap/oscar/v2/pkg/imagepuller"
"github.com/grycap/oscar/v2/pkg/types"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -112,6 +113,14 @@ func (k *KubeBackend) CreateService(service types.Service) error {
return err
}

//Create deaemonset to cache the service image on all the nodes
if service.ImagePrefetch {
err = imagepuller.CreateDaemonset(k.config, service, k.kubeClientset)
if err != nil {
return err
}
}

return nil
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/backends/k8s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ func TestKubeListServices(t *testing.T) {
})
}

// Test temporarily disabled to be able to use the image cache feature

func TestKubeCreateService(t *testing.T) {
testService := types.Service{
Name: "test",
Expand Down
9 changes: 9 additions & 0 deletions pkg/backends/knative.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/http"
"strconv"

"github.com/grycap/oscar/v2/pkg/imagepuller"
"github.com/grycap/oscar/v2/pkg/types"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -118,6 +119,14 @@ func (kn *KnativeBackend) CreateService(service types.Service) error {
return err
}

//Create deaemonset to cache the service image on all the nodes
if service.ImagePrefetch {
err = imagepuller.CreateDaemonset(kn.config, service, kn.kubeClientset)
if err != nil {
return err
}
}

return nil
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/handlers/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/gin-gonic/gin"
"github.com/grycap/oscar/v2/pkg/backends"
Expand Down Expand Up @@ -44,6 +45,7 @@ func (GinResponseRecorder) Flush() {

func TestMakeRunHandler(t *testing.T) {
back := backends.MakeFakeSyncBackend()
http.DefaultClient.Timeout = 400 * time.Second
r := gin.Default()
r.POST("/run/:serviceName", MakeRunHandler(&testConfigValidRun, back))

Expand All @@ -62,6 +64,7 @@ func TestMakeRunHandler(t *testing.T) {
t.Run(s.name, func(t *testing.T) {
w := httptest.NewRecorder()
serviceName := "test"

req, _ := http.NewRequest("POST", "/run/"+serviceName, nil)
req.Header.Set("Authorization", "Bearer AbCdEf123456")

Expand Down
200 changes: 200 additions & 0 deletions pkg/imagepuller/daemonset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
/*
Copyright (C) GRyCAP - I3M - UPV
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package imagepuller

//TODO check error control

import (
//"k8s.io/apimachinery/pkg/watch"
"context"
"fmt"
"log"
"math/rand"
"os"
"sync"
"time"

"github.com/grycap/oscar/v2/pkg/types"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/informers/internalinterfaces"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)

var DaemonSetLoggerInfo = log.New(os.Stdout, "[DAEMONSET-INFO] ", log.Flags())

const letterBytes = "abcdefghijklmnopqrstuvwxyz"
const lengthStr = 5

var podGroup string
var daemonsetName string

var workingNodes int

type PodCounter struct {
wnCount int
mutex sync.Mutex
}

var pc PodCounter
var stopper chan struct{}

//Create daemonset
func CreateDaemonset(cfg *types.Config, service types.Service, kubeClientset kubernetes.Interface) error {

//Set needed variables
setWorkingNodes(kubeClientset)
podGroup = generatePodGroupName()
daemonsetName = "image-puller-" + service.Name

//Get daemonset definition
daemon := getDaemonset(cfg, service)

//Create daemonset
_, err := kubeClientset.AppsV1().DaemonSets(cfg.ServicesNamespace).Create(context.TODO(), daemon, metav1.CreateOptions{})
if err != nil {
DaemonSetLoggerInfo.Println(err)
return fmt.Errorf("failed to create daemonset: %s", err.Error())
} else {
DaemonSetLoggerInfo.Println("Created daemonset for service:", service.Name)
}

//Set watcher informer
watchPods(kubeClientset, cfg)

return nil
}

//Get daemonset definition
func getDaemonset(cfg *types.Config, service types.Service) *appsv1.DaemonSet {
return &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: daemonsetName,
Namespace: cfg.ServicesNamespace,
},
Spec: appsv1.DaemonSetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"oscar-resource": "daemonset",
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"oscar-resource": "daemonset",
"pod-group": podGroup,
},
Name: "podpuller",
},
Spec: corev1.PodSpec{
Volumes: []corev1.Volume{},
ImagePullSecrets: types.SetImagePullSecrets(service.ImagePullSecrets),
Containers: []corev1.Container{
{
Name: "image-puller",
Image: service.Image,
Command: []string{"/bin/sh", "-c", "sleep 1h"},
},
},
},
},
},
}
}

//Watch pods with a Kubernetes Informer
func watchPods(kubeClientset kubernetes.Interface, cfg *types.Config) {
stopper = make(chan struct{})
defer close(stopper)

pc = PodCounter{wnCount: 0}

var optionsFunc internalinterfaces.TweakListOptionsFunc = func(options *metav1.ListOptions) {
labelSelector := labels.Set{
"pod-group": podGroup,
}.AsSelector()
options.LabelSelector = labelSelector.String()
}

sharedInformerOp := informers.WithTweakListOptions(optionsFunc)

factory := informers.NewSharedInformerFactoryWithOptions(kubeClientset, 2*time.Second, informers.WithNamespace(cfg.ServicesNamespace), sharedInformerOp)
//factory := informers.NewSharedInformerFactory(kubeClientset, 2*time.Second)

podInformer := factory.Core().V1().Pods().Informer()
factory.Start(stopper)

//Wait for all the selected resources to be added to the cache
state := cache.WaitForCacheSync(stopper, podInformer.HasSynced)
if !state {
log.Fatalf("Failed to sync informer cache")
}

//Add event handler that gets all the pods status
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: handleUpdatePodEvent,
})

<-stopper

//Delete daemonset when all pods are in state "Running"
DaemonSetLoggerInfo.Println("Deleting daemonset...")
err := kubeClientset.AppsV1().DaemonSets(cfg.ServicesNamespace).Delete(context.TODO(), daemonsetName, metav1.DeleteOptions{})
if err != nil {
DaemonSetLoggerInfo.Println(err)
log.Fatalf("Failed to delete daemonset: %s", err.Error())
} else {
DaemonSetLoggerInfo.Println("Deleted daemonset")
}
}

func handleUpdatePodEvent(oldObj interface{}, newObj interface{}) {
newPod := newObj.(*corev1.Pod)
if newPod.Status.Phase == corev1.PodRunning {
pc.mutex.Lock()
defer pc.mutex.Unlock()
pc.wnCount++
//Check the running pods count and stop the informer
if pc.wnCount >= workingNodes {
stopper <- struct{}{}
}
}
}

func setWorkingNodes(kubeClientset kubernetes.Interface) error {
nodes, err := kubeClientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{LabelSelector: "!node-role.kubernetes.io/control-plane,!node-role.kubernetes.io/master"})
if err != nil {
return fmt.Errorf("error getting node list: %v", err)
}

for range nodes.Items {
workingNodes++
}
return nil
}

func generatePodGroupName() string {
b := make([]byte, lengthStr)
for i := range b {
b[i] = letterBytes[rand.Intn(len(letterBytes))]
}
return "pod-group-" + string(b)
}
10 changes: 7 additions & 3 deletions pkg/types/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,14 @@ type Service struct {
// Optional. (default: "")
TotalCPU string `json:"total_cpu"`

// GPU parameter to request gpu usage in service's executions (synchronous and asynchronous)
// EnableGPU parameter to request gpu usage in service's executions (synchronous and asynchronous)
// Optional. (default: false)
EnableGPU bool `json:"enable_gpu"`

// ImagePrefetch parameter to enable the image cache functionality
// Optional. (default: false)
ImagePrefetch bool `json:"image_prefetch"`

// Synchronous struct to configure specific sync parameters
// Only Knative ServerlessBackend applies this settings
// Optional.
Expand Down Expand Up @@ -231,7 +235,7 @@ func (service *Service) ToPodSpec(cfg *Config) (*v1.PodSpec, error) {
}

podSpec := &v1.PodSpec{
ImagePullSecrets: setImagePullSecrets(service.ImagePullSecrets),
ImagePullSecrets: SetImagePullSecrets(service.ImagePullSecrets),
Containers: []v1.Container{
{
Name: ContainerName,
Expand Down Expand Up @@ -306,7 +310,7 @@ func convertEnvVars(vars map[string]string) []v1.EnvVar {
return envVars
}

func setImagePullSecrets(secrets []string) []v1.LocalObjectReference {
func SetImagePullSecrets(secrets []string) []v1.LocalObjectReference {
objects := []v1.LocalObjectReference{}
for _, s := range secrets {
objects = append(objects, v1.LocalObjectReference{
Expand Down
3 changes: 2 additions & 1 deletion pkg/types/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func TestSetImagePullSecrets(t *testing.T) {
{Name: "testcred1"},
}

result := setImagePullSecrets(secrets)
result := SetImagePullSecrets(secrets)
if result[0].Name != expected[0].Name {
t.Errorf("invalid conversion of local object. Expected: %v, got %v", expected, result)
}
Expand All @@ -215,6 +215,7 @@ cpu: "1.0"
total_memory: ""
total_cpu: ""
enable_gpu: false
image_prefetch: false
synchronous:
min_scale: 0
max_scale: 0
Expand Down

0 comments on commit edc9611

Please sign in to comment.