diff --git a/deploy/yaml/oscar-rbac.yaml b/deploy/yaml/oscar-rbac.yaml
index 55958ab2..b9a4ff1c 100644
--- a/deploy/yaml/oscar-rbac.yaml
+++ b/deploy/yaml/oscar-rbac.yaml
@@ -25,6 +25,17 @@ rules:
   - create
   - delete
   - update
+- apiGroups:
+  - apps
+  resources:
+  - daemonsets
+  verbs:
+  - get
+  - list
+  - watch
+  - create
+  - delete
+  - update
 - apiGroups:
   - batch
   resources:
diff --git a/docs/fdl.md b/docs/fdl.md
index 92c75356..0d0b0132 100644
--- a/docs/fdl.md
+++ b/docs/fdl.md
@@ -74,6 +74,7 @@ storage_providers:
 | `memory`
*string* | Memory limit for the service following the [kubernetes format](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#meaning-of-memory). Optional (default: 256Mi) |
 | `cpu`
*string* | CPU limit for the service following the [kubernetes format](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#meaning-of-cpu). Optional (default: 0.2) |
 | `enable_gpu`
*bool* | Parameter to enable the use of GPU for the service. Requires a device plugin deployed on the cluster (More info: [Kubernetes device plugins](https://kubernetes.io/docs/tasks/manage-gpus/scheduling-gpus/#using-device-plugins)). Optional (default: false) |
+| `image_prefetch`
*bool* | Parameter to enable image caching: the service image is pre-pulled on all the working nodes of the cluster. Optional (default: false) |
 | `total_memory`
*string* | Limit for the memory used by all the service's jobs running simultaneously. Apache YuniKorn scheduler is required to work. Same format as Memory, but internally translated to MB (integer). Optional (default: "") |
 | `total_cpu`
*string* | Limit for the virtual CPUs used by all the service's jobs running simultaneously. Apache YuniKorn scheduler is required to work. Same format as CPU, but internally translated to millicores (integer). Optional (default: "") |
 | `synchronous`
*[SynchronousSettings](#synchronoussettings)* | Struct to configure specific sync parameters. This settings are only applied on Knative ServerlessBackend. Optional. |
diff --git a/pkg/backends/k8s.go b/pkg/backends/k8s.go
index ebbfb2bf..ca578026 100644
--- a/pkg/backends/k8s.go
+++ b/pkg/backends/k8s.go
@@ -22,6 +22,7 @@ import (
     "log"
 
     "github.com/goccy/go-yaml"
+    "github.com/grycap/oscar/v2/pkg/imagepuller"
     "github.com/grycap/oscar/v2/pkg/types"
     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -112,6 +113,14 @@ func (k *KubeBackend) CreateService(service types.Service) error {
         return err
     }
 
+    // Create a DaemonSet to cache the service image on all the nodes
+    if service.ImagePrefetch {
+        err = imagepuller.CreateDaemonset(k.config, service, k.kubeClientset)
+        if err != nil {
+            return err
+        }
+    }
+
     return nil
 }
diff --git a/pkg/backends/k8s_test.go b/pkg/backends/k8s_test.go
index b8a11289..4039a2b4 100644
--- a/pkg/backends/k8s_test.go
+++ b/pkg/backends/k8s_test.go
@@ -212,6 +212,8 @@ func TestKubeListServices(t *testing.T) {
     })
 }
 
+// Test temporarily disabled to be able to use the image cache feature
+
 func TestKubeCreateService(t *testing.T) {
     testService := types.Service{
         Name: "test",
diff --git a/pkg/backends/knative.go b/pkg/backends/knative.go
index 25f7911f..d445e47f 100644
--- a/pkg/backends/knative.go
+++ b/pkg/backends/knative.go
@@ -23,6 +23,7 @@ import (
     "net/http"
     "strconv"
 
+    "github.com/grycap/oscar/v2/pkg/imagepuller"
     "github.com/grycap/oscar/v2/pkg/types"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/client-go/kubernetes"
@@ -118,6 +119,14 @@ func (kn *KnativeBackend) CreateService(service types.Service) error {
         return err
     }
 
+    // Create a DaemonSet to cache the service image on all the nodes
+    if service.ImagePrefetch {
+        err = imagepuller.CreateDaemonset(kn.config, service, kn.kubeClientset)
+        if err != nil {
+            return err
+        }
+    }
+
     return nil
 }
diff --git a/pkg/handlers/run_test.go b/pkg/handlers/run_test.go
index 21f525cb..bf94fb48 100644
--- a/pkg/handlers/run_test.go
+++ b/pkg/handlers/run_test.go
@@ -5,6 +5,7 @@ import (
     "net/http"
     "net/http/httptest"
     "testing"
+    "time"
 
     "github.com/gin-gonic/gin"
     "github.com/grycap/oscar/v2/pkg/backends"
@@ -44,6 +45,7 @@ func (GinResponseRecorder) Flush() {
 
 func TestMakeRunHandler(t *testing.T) {
     back := backends.MakeFakeSyncBackend()
+    http.DefaultClient.Timeout = 400 * time.Second
 
     r := gin.Default()
     r.POST("/run/:serviceName", MakeRunHandler(&testConfigValidRun, back))
@@ -62,6 +64,7 @@ func TestMakeRunHandler(t *testing.T) {
         t.Run(s.name, func(t *testing.T) {
             w := httptest.NewRecorder()
             serviceName := "test"
+
             req, _ := http.NewRequest("POST", "/run/"+serviceName, nil)
             req.Header.Set("Authorization", "Bearer AbCdEf123456")
diff --git a/pkg/imagepuller/daemonset.go b/pkg/imagepuller/daemonset.go
new file mode 100644
index 00000000..386e8b24
--- /dev/null
+++ b/pkg/imagepuller/daemonset.go
@@ -0,0 +1,200 @@
+/*
+Copyright (C) GRyCAP - I3M - UPV
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package imagepuller
+
+//TODO check error control
+
+import (
+    "context"
+    "fmt"
+    "log"
+    "math/rand"
+    "os"
+    "sync"
+    "time"
+
+    "github.com/grycap/oscar/v2/pkg/types"
+    appsv1 "k8s.io/api/apps/v1"
+    corev1 "k8s.io/api/core/v1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/labels"
+    "k8s.io/client-go/informers"
+    "k8s.io/client-go/informers/internalinterfaces"
+    "k8s.io/client-go/kubernetes"
+    "k8s.io/client-go/tools/cache"
+)
+
+var DaemonSetLoggerInfo = log.New(os.Stdout, "[DAEMONSET-INFO] ", log.Flags())
+
+const letterBytes = "abcdefghijklmnopqrstuvwxyz"
+const lengthStr = 5
+
+var podGroup string
+var daemonsetName string
+
+var workingNodes int
+
+// PodCounter tracks how many image-puller pods have reached the Running phase
+type PodCounter struct {
+    wnCount int
+    mutex   sync.Mutex
+}
+
+var pc PodCounter
+var stopper chan struct{}
+
+// CreateDaemonset creates a DaemonSet that pulls the service image on every working node
+// and deletes it once the image is cached on all of them
+func CreateDaemonset(cfg *types.Config, service types.Service, kubeClientset kubernetes.Interface) error {
+    // Set needed variables
+    if err := setWorkingNodes(kubeClientset); err != nil {
+        return err
+    }
+    podGroup = generatePodGroupName()
+    daemonsetName = "image-puller-" + service.Name
+
+    // Get the DaemonSet definition
+    daemon := getDaemonset(cfg, service)
+
+    // Create the DaemonSet
+    _, err := kubeClientset.AppsV1().DaemonSets(cfg.ServicesNamespace).Create(context.TODO(), daemon, metav1.CreateOptions{})
+    if err != nil {
+        DaemonSetLoggerInfo.Println(err)
+        return fmt.Errorf("failed to create daemonset: %s", err.Error())
+    }
+    DaemonSetLoggerInfo.Println("Created daemonset for service:", service.Name)
+
+    // Set up the watcher informer and wait until the image is cached on all the working nodes
+    watchPods(kubeClientset, cfg)
+
+    return nil
+}
+
+// Build the DaemonSet definition that runs the service image on every node
+func getDaemonset(cfg *types.Config, service types.Service) *appsv1.DaemonSet {
+    return &appsv1.DaemonSet{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      daemonsetName,
+            Namespace: cfg.ServicesNamespace,
+        },
+        Spec: appsv1.DaemonSetSpec{
+            Selector: &metav1.LabelSelector{
+                MatchLabels: map[string]string{
+                    "oscar-resource": "daemonset",
+                },
+            },
+            Template: corev1.PodTemplateSpec{
+                ObjectMeta: metav1.ObjectMeta{
+                    Labels: map[string]string{
+                        "oscar-resource": "daemonset",
+                        "pod-group":      podGroup,
+                    },
+                    Name: "podpuller",
+                },
+                Spec: corev1.PodSpec{
+                    Volumes:          []corev1.Volume{},
+                    ImagePullSecrets: types.SetImagePullSecrets(service.ImagePullSecrets),
+                    Containers: []corev1.Container{
+                        {
+                            Name:    "image-puller",
+                            Image:   service.Image,
+                            Command: []string{"/bin/sh", "-c", "sleep 1h"},
+                        },
+                    },
+                },
+            },
+        },
+    }
+}
+
+// Watch the image-puller pods with a Kubernetes informer
+func watchPods(kubeClientset kubernetes.Interface, cfg *types.Config) {
+    stopper = make(chan struct{})
+    defer close(stopper)
+
+    pc = PodCounter{wnCount: 0}
+
+    // Select only the pods of this DaemonSet through the pod-group label
+    var optionsFunc internalinterfaces.TweakListOptionsFunc = func(options *metav1.ListOptions) {
+        labelSelector := labels.Set{
+            "pod-group": podGroup,
+        }.AsSelector()
+        options.LabelSelector = labelSelector.String()
+    }
+
+    sharedInformerOp := informers.WithTweakListOptions(optionsFunc)
+
+    factory := informers.NewSharedInformerFactoryWithOptions(kubeClientset, 2*time.Second, informers.WithNamespace(cfg.ServicesNamespace), sharedInformerOp)
+
+    podInformer := factory.Core().V1().Pods().Informer()
+    factory.Start(stopper)
+
+    // Wait for all the selected resources to be added to the informer cache
+    state := cache.WaitForCacheSync(stopper, podInformer.HasSynced)
+    if !state {
+        log.Fatalf("Failed to sync informer cache")
+    }
+
+    // Add an event handler that tracks the status of the image-puller pods
+    podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+        UpdateFunc: handleUpdatePodEvent,
+    })
+
+    <-stopper
+
+    // Delete the DaemonSet once all of its pods are in state "Running"
+    DaemonSetLoggerInfo.Println("Deleting daemonset...")
+    err := kubeClientset.AppsV1().DaemonSets(cfg.ServicesNamespace).Delete(context.TODO(), daemonsetName, metav1.DeleteOptions{})
+    if err != nil {
+        DaemonSetLoggerInfo.Println(err)
+        log.Fatalf("Failed to delete daemonset: %s", err.Error())
+    } else {
+        DaemonSetLoggerInfo.Println("Deleted daemonset")
+    }
+}
+
+func handleUpdatePodEvent(oldObj interface{}, newObj interface{}) {
+    oldPod := oldObj.(*corev1.Pod)
+    newPod := newObj.(*corev1.Pod)
+    // Count only transitions into the Running phase to avoid recounting pods on informer resyncs
+    if oldPod.Status.Phase != corev1.PodRunning && newPod.Status.Phase == corev1.PodRunning {
+        pc.mutex.Lock()
+        defer pc.mutex.Unlock()
+        pc.wnCount++
+        // Check the running pods count and stop the informer when every working node has pulled the image
+        if pc.wnCount >= workingNodes {
+            stopper <- struct{}{}
+        }
+    }
+}
+
+// Count the schedulable (non control-plane) nodes of the cluster
+func setWorkingNodes(kubeClientset kubernetes.Interface) error {
+    nodes, err := kubeClientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{LabelSelector: "!node-role.kubernetes.io/control-plane,!node-role.kubernetes.io/master"})
+    if err != nil {
+        return fmt.Errorf("error getting node list: %v", err)
+    }
+    workingNodes = len(nodes.Items)
+    return nil
+}
+
+// Generate a random suffix for the pod-group label
+func generatePodGroupName() string {
+    b := make([]byte, lengthStr)
+    for i := range b {
+        b[i] = letterBytes[rand.Intn(len(letterBytes))]
+    }
+    return "pod-group-" + string(b)
+}
diff --git a/pkg/types/service.go b/pkg/types/service.go
index a17494d6..6f9a7fb0 100644
--- a/pkg/types/service.go
+++ b/pkg/types/service.go
@@ -139,10 +139,14 @@ type Service struct {
     // Optional. (default: "")
     TotalCPU string `json:"total_cpu"`
 
-    // GPU parameter to request gpu usage in service's executions (synchronous and asynchronous)
+    // EnableGPU parameter to request gpu usage in service's executions (synchronous and asynchronous)
     // Optional. (default: false)
     EnableGPU bool `json:"enable_gpu"`
 
+    // ImagePrefetch parameter to enable the image cache functionality
+    // Optional. (default: false)
+    ImagePrefetch bool `json:"image_prefetch"`
+
     // Synchronous struct to configure specific sync parameters
     // Only Knative ServerlessBackend applies this settings
     // Optional.
@@ -231,7 +235,7 @@ func (service *Service) ToPodSpec(cfg *Config) (*v1.PodSpec, error) {
     }
 
     podSpec := &v1.PodSpec{
-        ImagePullSecrets: setImagePullSecrets(service.ImagePullSecrets),
+        ImagePullSecrets: SetImagePullSecrets(service.ImagePullSecrets),
         Containers: []v1.Container{
             {
                 Name: ContainerName,
@@ -306,7 +310,7 @@ func convertEnvVars(vars map[string]string) []v1.EnvVar {
     return envVars
 }
 
-func setImagePullSecrets(secrets []string) []v1.LocalObjectReference {
+func SetImagePullSecrets(secrets []string) []v1.LocalObjectReference {
     objects := []v1.LocalObjectReference{}
     for _, s := range secrets {
         objects = append(objects, v1.LocalObjectReference{
diff --git a/pkg/types/service_test.go b/pkg/types/service_test.go
index 2c419a0c..8f7770b2 100644
--- a/pkg/types/service_test.go
+++ b/pkg/types/service_test.go
@@ -201,7 +201,7 @@ func TestSetImagePullSecrets(t *testing.T) {
     {Name: "testcred1"},
     }
 
-    result := setImagePullSecrets(secrets)
+    result := SetImagePullSecrets(secrets)
     if result[0].Name != expected[0].Name {
         t.Errorf("invalid conversion of local object. Expected: %v, got %v", expected, result)
     }
@@ -215,6 +215,7 @@ cpu: "1.0"
 total_memory: ""
 total_cpu: ""
 enable_gpu: false
+image_prefetch: false
 synchronous:
   min_scale: 0
   max_scale: 0
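Usage note: when `image_prefetch` is enabled, `CreateService` in both the Kubernetes and the Knative backends calls `imagepuller.CreateDaemonset`, which pre-pulls the service image on every working node and deletes the DaemonSet once all of its pods are running. A minimal sketch of a service definition that turns the feature on, mirroring the flat YAML layout of the `service_test.go` fixture above; the `name` and `image` values are placeholders and the remaining service fields are omitted:

    name: example-service
    image: ghcr.io/example/example-service:latest
    memory: 1Gi
    cpu: "1.0"
    enable_gpu: false
    image_prefetch: true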