Skip to content

Commit

Permalink
Merge pull request #262 from grycap/devel
Browse files Browse the repository at this point in the history
Changes in API calls
  • Loading branch information
catttam authored Oct 8, 2024
2 parents d36bd4e + 4345d3e commit 3c48845
Show file tree
Hide file tree
Showing 15 changed files with 415 additions and 60 deletions.
2 changes: 1 addition & 1 deletion examples/cowsay/cowsay.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
functions:
oscar:
- oscar-cluster:
- oscar-replica:
name: cowsay
cpu: '1.0'
memory: 1Gi
Expand Down
2 changes: 1 addition & 1 deletion examples/dog-breed-detector/dog-breed.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ oscar-cli apply dog-breed-detector.yaml

or through the graphical interface of your cluster.

Usually, DeePaaS models need some given parameters to be defined alongside the input of the inference invocation. To solve this, the service receives a JSON type in the following format where you can define, on the one hand, the key of the JSON the name and value of the parameter to be used on the command line and the other hand, inside the array 'oscar-files' each of the files codified as a base64 string, and the extension of it.
Usually, DeePaaS models need some given parameters to be defined alongside the input of the inference invocation. To solve this, the service receives a JSON type in the following format where you can define, on the one hand, the key of the JSON is the name of the parameter to be used on the command line and the other hand, inside the array 'oscar-files' each of the files codified as a base64 string, and the extension of it.

``` json
{
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,12 @@ func main() {
system.GET("/status", handlers.MakeStatusHandler(kubeClientset, metricsClientset))

// Job path for async invocations
r.POST("/job/:serviceName", handlers.MakeJobHandler(cfg, kubeClientset, back, resMan))
r.POST("/job/:serviceName", auth.GetLoggerMiddleware(), handlers.MakeJobHandler(cfg, kubeClientset, back, resMan))

// Service path for sync invocations (only if ServerlessBackend is enabled)
syncBack, ok := back.(types.SyncBackend)
if cfg.ServerlessBackend != "" && ok {
r.POST("/run/:serviceName", handlers.MakeRunHandler(cfg, syncBack))
r.POST("/run/:serviceName", auth.GetLoggerMiddleware(), handlers.MakeRunHandler(cfg, syncBack))
}

// System info path
Expand Down
2 changes: 1 addition & 1 deletion pkg/backends/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (f *FakeBackend) CreateService(service types.Service) error {

// ReadService returns a Service (fake)
func (f *FakeBackend) ReadService(name string) (*types.Service, error) {
return &types.Service{Token: "AbCdEf123456"}, f.returnError(getCurrentFuncName())
return &types.Service{Token: "11e387cf727630d899925d57fceb4578f478c44be6cde0ae3fe886d8be513acf"}, f.returnError(getCurrentFuncName())
}

// UpdateService updates an existent service (fake)
Expand Down
5 changes: 3 additions & 2 deletions pkg/backends/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,11 @@ func checkAdditionalConfig(configName string, configNamespace string, service ty

if len(additionalConfig.Images.AllowedPrefixes) > 0 {
for _, prefix := range additionalConfig.Images.AllowedPrefixes {
if !strings.Contains(service.Image, prefix) {
return fmt.Errorf("image %s is not allowed for pull on the cluster. Check the additional configuration file on '%s'", service.Image, cfg.AdditionalConfigPath)
if strings.Contains(service.Image, prefix) {
return nil
}
}
return fmt.Errorf("image %s is not allowed for pull on the cluster. Check the additional configuration file on '%s'", service.Image, cfg.AdditionalConfigPath)
}

return nil
Expand Down
1 change: 0 additions & 1 deletion pkg/handlers/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ func MakeCreateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand
minIOAdminClient, _ := utils.MakeMinIOAdminClient(cfg)

// Service is created by an EGI user

if !isAdminUser {
uid, err := auth.GetUIDFromContext(c)
if err != nil {
Expand Down
60 changes: 56 additions & 4 deletions pkg/handlers/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package handlers

import (
"context"
"encoding/json"
"fmt"
"io"
"log"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/google/uuid"
"github.com/grycap/oscar/v3/pkg/resourcemanager"
"github.com/grycap/oscar/v3/pkg/types"
"github.com/grycap/oscar/v3/pkg/utils/auth"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -85,10 +87,37 @@ func MakeJobHandler(cfg *types.Config, kubeClientset *kubernetes.Clientset, back
c.Status(http.StatusUnauthorized)
return
}
reqToken := strings.TrimSpace(splitToken[1])
if reqToken != service.Token {
c.Status(http.StatusUnauthorized)
return

// Check if reqToken is the service token
rawToken := strings.TrimSpace(splitToken[1])
if len(rawToken) == tokenLength {

if rawToken != service.Token {
c.Status(http.StatusUnauthorized)
return
}
}

// If isn't service token check if it is an oidc token
if len(rawToken) != tokenLength {
oidcManager, _ := auth.NewOIDCManager(cfg.OIDCIssuer, cfg.OIDCSubject, cfg.OIDCGroups)

if !oidcManager.IsAuthorised(rawToken) {
c.Status(http.StatusUnauthorized)
return
}

hasVO, err := oidcManager.UserHasVO(rawToken, service.VO)

if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}

if !hasVO {
c.String(http.StatusUnauthorized, "this user isn't enrrolled on the vo: %v", service.VO)
return
}
}

// Get the event from request body
Expand All @@ -98,6 +127,28 @@ func MakeJobHandler(cfg *types.Config, kubeClientset *kubernetes.Clientset, back
return
}

// Extract user UID from MinIO event
var decoded map[string]interface{}
if err := json.Unmarshal(eventBytes, &decoded); err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
records := decoded["Records"].([]interface{})
// Check if it has the MinIO event format
if records != nil {
r := records[0].(map[string]interface{})

eventInfo := r["requestParameters"].(map[string]interface{})
uid := eventInfo["principalId"]
sourceIPAddress := eventInfo["sourceIPAddress"]

c.Set("IPAddress", sourceIPAddress)
c.Set("uidOrigin", uid)
} else {
c.Set("uidOrigin", "nil")
}
c.Next()

// Initialize event envVar and args var
event := v1.EnvVar{}
var args []string
Expand All @@ -120,6 +171,7 @@ func MakeJobHandler(cfg *types.Config, kubeClientset *kubernetes.Clientset, back

// Make JOB_UUID envVar
jobUUID := uuid.New().String()
jobUUID = service.Name + "-" + jobUUID
jobUUIDVar := v1.EnvVar{
Name: types.JobUUIDVariable,
Value: jobUUID,
Expand Down
47 changes: 43 additions & 4 deletions pkg/handlers/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,14 @@ import (

"github.com/gin-gonic/gin"
"github.com/grycap/oscar/v3/pkg/types"
"github.com/grycap/oscar/v3/pkg/utils/auth"
"k8s.io/apimachinery/pkg/api/errors"
)

const (
tokenLength = 64
)

// MakeRunHandler makes a handler to manage sync invocations sending them to the gateway of the ServerlessBackend
func MakeRunHandler(cfg *types.Config, back types.SyncBackend) gin.HandlerFunc {
return func(c *gin.Context) {
Expand All @@ -47,10 +52,44 @@ func MakeRunHandler(cfg *types.Config, back types.SyncBackend) gin.HandlerFunc {
c.Status(http.StatusUnauthorized)
return
}
reqToken := strings.TrimSpace(splitToken[1])
if reqToken != service.Token {
c.Status(http.StatusUnauthorized)
return

// Check if reqToken is the service token
rawToken := strings.TrimSpace(splitToken[1])
if len(rawToken) == tokenLength {

if rawToken != service.Token {
c.Status(http.StatusUnauthorized)
return
}
} else {
oidcManager, _ := auth.NewOIDCManager(cfg.OIDCIssuer, cfg.OIDCSubject, cfg.OIDCGroups)

if !oidcManager.IsAuthorised(rawToken) {
c.Status(http.StatusUnauthorized)
return
}

hasVO, err := oidcManager.UserHasVO(rawToken, service.VO)

if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}

if !hasVO {
c.String(http.StatusUnauthorized, "this user isn't enrrolled on the vo: %v", service.VO)
return
}

ui, err := oidcManager.GetUserInfo(rawToken)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
uid := ui.Subject
c.Set("uidOrigin", uid)
c.Next()

}

proxy := &httputil.ReverseProxy{
Expand Down
6 changes: 3 additions & 3 deletions pkg/handlers/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ func TestMakeRunHandler(t *testing.T) {
for _, s := range scenarios {
t.Run(s.name, func(t *testing.T) {
w := httptest.NewRecorder()
serviceName := "test"
serviceName := "testName"

req, _ := http.NewRequest("POST", "/run/"+serviceName, nil)
req.Header.Set("Authorization", "Bearer AbCdEf123456")
req.Header.Set("Authorization", "Bearer 11e387cf727630d899925d57fceb4578f478c44be6cde0ae3fe886d8be513acf")

if s.returnError {
switch s.errType {
Expand All @@ -78,7 +78,7 @@ func TestMakeRunHandler(t *testing.T) {
case "splitErr":
req.Header.Set("Authorization", "AbCdEf123456")
case "diffErr":
req.Header.Set("Authorization", "Bearer AbC123456")
req.Header.Set("Authorization", "Bearer 11e387cf727630d899925d57fceb4578f478c44be6cde0ae3fe886d8be513dfg")
}
}

Expand Down
84 changes: 64 additions & 20 deletions pkg/handlers/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ import (
versioned "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1"
)

type GeneralInfo struct {
NumberNodes int64 `json:"numberNodes"`
CPUFreeTotal int64 `json:"cpuFreeTotal"`
CPUMaxFree int64 `json:"cpuMaxFree"`
MemoryFreeTotal int64 `json:"memoryFreeTotal"`
MemoryMaxFree int64 `json:"memoryMaxFree"`
DetailsNodes []NodeInfo `json:"detail"`
}

type NodeInfo struct {
NodeName string `json:"nodeName"`
CPUCapacity string `json:"cpuCapacity"`
Expand Down Expand Up @@ -57,31 +66,66 @@ func MakeStatusHandler(kubeClientset *kubernetes.Clientset, metricsClientset *ve
}

var nodeInfoList []NodeInfo
var clusterInfo GeneralInfo

var cpu_free_total int64 = 0
var cpu_max_free int64 = 0
var memory_free_total int64 = 0
var memory_max_free int64 = 0
var number_nodes int64 = 0

// Parameters CPU and Memory.
for id, node := range nodes.Items {
nodeName := node.Name
cpu_alloc := node.Status.Allocatable.Cpu().MilliValue()
cpu_usage := nodeMetricsList.Items[id].Usage["cpu"]
cpu_usage_percent := (float64(cpu_usage.MilliValue()) / float64(cpu_alloc)) * 100

memory_alloc := node.Status.Allocatable.Memory().Value()
memory_usage := nodeMetricsList.Items[id].Usage["memory"]
memory_usage_percent := (float64(memory_usage.Value()) / float64(memory_alloc)) * 100

nodeInfo := NodeInfo{
NodeName: nodeName,
CPUCapacity: strconv.Itoa(int(cpu_alloc)),
CPUUsage: strconv.Itoa(int(cpu_usage.MilliValue())),
CPUPercentage: fmt.Sprintf("%.2f", cpu_usage_percent),
MemoryCapacity: strconv.Itoa(int(memory_alloc)),
MemoryUsage: strconv.Itoa(int(memory_usage.Value())),
MemoryPercentage: fmt.Sprintf("%.2f", memory_usage_percent),
}
//remove fronted node from json list
if id > 0 {
nodeName := node.Name
cpu_alloc := node.Status.Allocatable.Cpu().MilliValue()
cpu_usage := nodeMetricsList.Items[id].Usage["cpu"]
cpu_usage_percent := (float64(cpu_usage.MilliValue()) / float64(cpu_alloc)) * 100

memory_alloc := node.Status.Allocatable.Memory().Value()
memory_usage := nodeMetricsList.Items[id].Usage["memory"]
memory_usage_percent := (float64(memory_usage.Value()) / float64(memory_alloc)) * 100

nodeInfoList = append(nodeInfoList, nodeInfo)
nodeInfo := NodeInfo{
NodeName: nodeName,
CPUCapacity: strconv.Itoa(int(cpu_alloc)),
CPUUsage: strconv.Itoa(int(cpu_usage.MilliValue())),
CPUPercentage: fmt.Sprintf("%.2f", cpu_usage_percent),
MemoryCapacity: strconv.Itoa(int(memory_alloc)),
MemoryUsage: strconv.Itoa(int(memory_usage.Value())),
MemoryPercentage: fmt.Sprintf("%.2f", memory_usage_percent),
}
number_nodes++
cpu_node_free := cpu_alloc - cpu_usage.MilliValue()
cpu_free_total += cpu_node_free

if cpu_max_free < cpu_node_free {
cpu_max_free = cpu_node_free
}

memory_node_free := memory_alloc - memory_usage.Value()
memory_free_total += memory_alloc

if memory_max_free < memory_node_free {
memory_max_free = memory_node_free
}

nodeInfoList = append(nodeInfoList, nodeInfo)
}
// Create cluster status structure
clusterInfo = GeneralInfo{
NumberNodes: number_nodes,
CPUFreeTotal: cpu_free_total,
CPUMaxFree: cpu_max_free,
MemoryFreeTotal: memory_free_total,
MemoryMaxFree: memory_max_free,
DetailsNodes: nodeInfoList,
}
}
// Encode list of NodeInfo structures in json format.

c.JSON(http.StatusOK, nodeInfoList)
c.JSON(http.StatusOK, clusterInfo)

}
}
Loading

0 comments on commit 3c48845

Please sign in to comment.