diff --git a/examples/cowsay/cowsay.yaml b/examples/cowsay/cowsay.yaml
index 47852a8a..950a7af2 100644
--- a/examples/cowsay/cowsay.yaml
+++ b/examples/cowsay/cowsay.yaml
@@ -1,6 +1,6 @@
 functions:
   oscar:
-  - oscar-cluster:
+  - oscar-replica:
      name: cowsay
      cpu: '1.0'
      memory: 1Gi
diff --git a/examples/dog-breed-detector/dog-breed.md b/examples/dog-breed-detector/dog-breed.md
index a3ef4e71..f2228542 100644
--- a/examples/dog-breed-detector/dog-breed.md
+++ b/examples/dog-breed-detector/dog-breed.md
@@ -10,7 +10,7 @@ oscar-cli apply dog-breed-detector.yaml
 
 or through the graphical interface of your cluster.
 
-Usually, DeePaaS models need some given parameters to be defined alongside the input of the inference invocation. To solve this, the service receives a JSON type in the following format where you can define, on the one hand, the key of the JSON the name and value of the parameter to be used on the command line and the other hand, inside the array 'oscar-files' each of the files codified as a base64 string, and the extension of it.
+Usually, DeePaaS models need some given parameters to be defined alongside the input of the inference invocation. To solve this, the service receives a JSON body in the following format where, on the one hand, each key of the JSON is the name of a parameter to be used on the command line and, on the other hand, the array 'oscar-files' contains each of the input files encoded as a base64 string together with its extension.
 
 ``` json
 {
diff --git a/main.go b/main.go
index 53bca6d4..a98e2a65 100644
--- a/main.go
+++ b/main.go
@@ -105,12 +105,12 @@ func main() {
 	system.GET("/status", handlers.MakeStatusHandler(kubeClientset, metricsClientset))
 
 	// Job path for async invocations
-	r.POST("/job/:serviceName", handlers.MakeJobHandler(cfg, kubeClientset, back, resMan))
+	r.POST("/job/:serviceName", auth.GetLoggerMiddleware(), handlers.MakeJobHandler(cfg, kubeClientset, back, resMan))
 
 	// Service path for sync invocations (only if ServerlessBackend is enabled)
 	syncBack, ok := back.(types.SyncBackend)
 	if cfg.ServerlessBackend != "" && ok {
-		r.POST("/run/:serviceName", handlers.MakeRunHandler(cfg, syncBack))
+		r.POST("/run/:serviceName", auth.GetLoggerMiddleware(), handlers.MakeRunHandler(cfg, syncBack))
 	}
 
 	// System info path
diff --git a/pkg/backends/fake.go b/pkg/backends/fake.go
index c9ec6b26..d39b6bba 100644
--- a/pkg/backends/fake.go
+++ b/pkg/backends/fake.go
@@ -81,7 +81,7 @@ func (f *FakeBackend) CreateService(service types.Service) error {
 
 // ReadService returns a Service (fake)
 func (f *FakeBackend) ReadService(name string) (*types.Service, error) {
-	return &types.Service{Token: "AbCdEf123456"}, f.returnError(getCurrentFuncName())
+	return &types.Service{Token: "11e387cf727630d899925d57fceb4578f478c44be6cde0ae3fe886d8be513acf"}, f.returnError(getCurrentFuncName())
 }
 
 // UpdateService updates an existent service (fake)
diff --git a/pkg/backends/k8s.go b/pkg/backends/k8s.go
index 725616d1..79f41ba6 100644
--- a/pkg/backends/k8s.go
+++ b/pkg/backends/k8s.go
@@ -275,10 +275,11 @@ func checkAdditionalConfig(configName string, configNamespace string, service ty
 	if len(additionalConfig.Images.AllowedPrefixes) > 0 {
 		for _, prefix := range additionalConfig.Images.AllowedPrefixes {
-			if !strings.Contains(service.Image, prefix) {
-				return fmt.Errorf("image %s is not allowed for pull on the cluster. Check the additional configuration file on '%s'", service.Image, cfg.AdditionalConfigPath)
+			if strings.Contains(service.Image, prefix) {
+				return nil
+			}
 		}
+		return fmt.Errorf("image %s is not allowed for pull on the cluster. Check the additional configuration file on '%s'", service.Image, cfg.AdditionalConfigPath)
 	}
 
 	return nil
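The corrected `checkAdditionalConfig` logic above turns `AllowedPrefixes` into a true allow-list: the image is accepted as soon as one prefix matches, and the error is returned only after every prefix has failed. A minimal, self-contained sketch of that behaviour (the function name and example prefixes are illustrative, not taken from the repository):

```go
package main

import (
	"fmt"
	"strings"
)

// checkImagePrefix mirrors the corrected allow-list semantics: accept the
// image as soon as one allowed prefix matches, reject it only after every
// prefix has been tried.
func checkImagePrefix(image string, allowedPrefixes []string) error {
	if len(allowedPrefixes) == 0 {
		return nil // no restriction configured
	}
	for _, prefix := range allowedPrefixes {
		if strings.Contains(image, prefix) {
			return nil
		}
	}
	return fmt.Errorf("image %s is not allowed for pull on the cluster", image)
}

func main() {
	allowed := []string{"ghcr.io/grycap/", "docker.io/grycap/"}
	fmt.Println(checkImagePrefix("ghcr.io/grycap/cowsay:latest", allowed)) // <nil>
	fmt.Println(checkImagePrefix("docker.io/someuser/image", allowed))     // error
}
```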
diff --git a/pkg/handlers/create.go b/pkg/handlers/create.go
index 37eb5453..3bb2e260 100644
--- a/pkg/handlers/create.go
+++ b/pkg/handlers/create.go
@@ -70,7 +70,6 @@ func MakeCreateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand
 		minIOAdminClient, _ := utils.MakeMinIOAdminClient(cfg)
 
 		// Service is created by an EGI user
-
 		if !isAdminUser {
 			uid, err := auth.GetUIDFromContext(c)
 			if err != nil {
diff --git a/pkg/handlers/job.go b/pkg/handlers/job.go
index 53654aeb..58e2cbaf 100644
--- a/pkg/handlers/job.go
+++ b/pkg/handlers/job.go
@@ -18,6 +18,7 @@ package handlers
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"io"
 	"log"
@@ -29,6 +30,7 @@ import (
 	"github.com/google/uuid"
 	"github.com/grycap/oscar/v3/pkg/resourcemanager"
 	"github.com/grycap/oscar/v3/pkg/types"
+	"github.com/grycap/oscar/v3/pkg/utils/auth"
 	batchv1 "k8s.io/api/batch/v1"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
@@ -85,10 +87,37 @@ func MakeJobHandler(cfg *types.Config, kubeClientset *kubernetes.Clientset, back
 			c.Status(http.StatusUnauthorized)
 			return
 		}
-		reqToken := strings.TrimSpace(splitToken[1])
-		if reqToken != service.Token {
-			c.Status(http.StatusUnauthorized)
-			return
+
+		// Check if the bearer token is the service token
+		rawToken := strings.TrimSpace(splitToken[1])
+		if len(rawToken) == tokenLength {
+
+			if rawToken != service.Token {
+				c.Status(http.StatusUnauthorized)
+				return
+			}
+		}
+
+		// If it isn't the service token, check if it is an OIDC token
+		if len(rawToken) != tokenLength {
+			oidcManager, _ := auth.NewOIDCManager(cfg.OIDCIssuer, cfg.OIDCSubject, cfg.OIDCGroups)
+
+			if !oidcManager.IsAuthorised(rawToken) {
+				c.Status(http.StatusUnauthorized)
+				return
+			}
+
+			hasVO, err := oidcManager.UserHasVO(rawToken, service.VO)
+
+			if err != nil {
+				c.String(http.StatusInternalServerError, err.Error())
+				return
+			}
+
+			if !hasVO {
+				c.String(http.StatusUnauthorized, "this user isn't enrolled in the VO: %v", service.VO)
+				return
+			}
 		}
 
 		// Get the event from request body
@@ -98,6 +127,28 @@
 			return
 		}
 
+		// Extract user UID from MinIO event
+		var decoded map[string]interface{}
+		if err := json.Unmarshal(eventBytes, &decoded); err != nil {
+			c.String(http.StatusInternalServerError, err.Error())
+			return
+		}
+		records, hasRecords := decoded["Records"].([]interface{})
+		// Check if it has the MinIO event format
+		if hasRecords && len(records) > 0 {
+			r := records[0].(map[string]interface{})
+
+			eventInfo := r["requestParameters"].(map[string]interface{})
+			uid := eventInfo["principalId"]
+			sourceIPAddress := eventInfo["sourceIPAddress"]
+
+			c.Set("IPAddress", sourceIPAddress)
+			c.Set("uidOrigin", uid)
+		} else {
+			c.Set("uidOrigin", "nil")
+		}
+		c.Next()
+
 		// Initialize event envVar and args var
 		event := v1.EnvVar{}
 		var args []string
@@ -120,6 +171,7 @@
 
 		// Make JOB_UUID envVar
 		jobUUID := uuid.New().String()
+		jobUUID = service.Name + "-" + jobUUID
 		jobUUIDVar := v1.EnvVar{
 			Name:  types.JobUUIDVariable,
 			Value: jobUUID,
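The new block in `MakeJobHandler` reads the originating user UID and source IP from the `requestParameters` object of a MinIO bucket notification and stores them in the gin context for the logger middleware. A sketch of that extraction against a trimmed-down event (the field values are invented; only the two fields the handler actually reads are shown):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed-down MinIO bucket notification; values are illustrative only.
const sampleEvent = `{
  "Records": [
    {
      "requestParameters": {
        "principalId": "somelonguid@egi.eu",
        "sourceIPAddress": "10.0.0.15"
      }
    }
  ]
}`

func main() {
	var decoded map[string]interface{}
	if err := json.Unmarshal([]byte(sampleEvent), &decoded); err != nil {
		panic(err)
	}
	// The comma-ok form avoids a panic when the body is not a MinIO event.
	records, ok := decoded["Records"].([]interface{})
	if !ok || len(records) == 0 {
		fmt.Println("not a MinIO event: uidOrigin would be set to \"nil\"")
		return
	}
	r := records[0].(map[string]interface{})
	params := r["requestParameters"].(map[string]interface{})
	fmt.Println("uidOrigin:", params["principalId"])     // stored with c.Set("uidOrigin", ...)
	fmt.Println("IPAddress:", params["sourceIPAddress"]) // stored with c.Set("IPAddress", ...)
}
```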
diff --git a/pkg/handlers/run.go b/pkg/handlers/run.go
index f289713a..ee7dec56 100644
--- a/pkg/handlers/run.go
+++ b/pkg/handlers/run.go
@@ -23,9 +23,14 @@ import (
 
 	"github.com/gin-gonic/gin"
 	"github.com/grycap/oscar/v3/pkg/types"
+	"github.com/grycap/oscar/v3/pkg/utils/auth"
 	"k8s.io/apimachinery/pkg/api/errors"
 )
 
+const (
+	tokenLength = 64
+)
+
 // MakeRunHandler makes a handler to manage sync invocations sending them to the gateway of the ServerlessBackend
 func MakeRunHandler(cfg *types.Config, back types.SyncBackend) gin.HandlerFunc {
 	return func(c *gin.Context) {
@@ -47,10 +52,44 @@ func MakeRunHandler(cfg *types.Config, back types.SyncBackend) gin.HandlerFunc {
 			c.Status(http.StatusUnauthorized)
 			return
 		}
-		reqToken := strings.TrimSpace(splitToken[1])
-		if reqToken != service.Token {
-			c.Status(http.StatusUnauthorized)
-			return
+
+		// Check if the bearer token is the service token
+		rawToken := strings.TrimSpace(splitToken[1])
+		if len(rawToken) == tokenLength {
+
+			if rawToken != service.Token {
+				c.Status(http.StatusUnauthorized)
+				return
+			}
+		} else {
+			oidcManager, _ := auth.NewOIDCManager(cfg.OIDCIssuer, cfg.OIDCSubject, cfg.OIDCGroups)
+
+			if !oidcManager.IsAuthorised(rawToken) {
+				c.Status(http.StatusUnauthorized)
+				return
+			}
+
+			hasVO, err := oidcManager.UserHasVO(rawToken, service.VO)
+
+			if err != nil {
+				c.String(http.StatusInternalServerError, err.Error())
+				return
+			}
+
+			if !hasVO {
+				c.String(http.StatusUnauthorized, "this user isn't enrolled in the VO: %v", service.VO)
+				return
+			}
+
+			ui, err := oidcManager.GetUserInfo(rawToken)
+			if err != nil {
+				c.String(http.StatusInternalServerError, err.Error())
+				return
+			}
+			uid := ui.Subject
+			c.Set("uidOrigin", uid)
+			c.Next()
+
 		}
 
 		proxy := &httputil.ReverseProxy{
diff --git a/pkg/handlers/run_test.go b/pkg/handlers/run_test.go
index e1b9e626..e7c9fe3b 100644
--- a/pkg/handlers/run_test.go
+++ b/pkg/handlers/run_test.go
@@ -63,10 +63,10 @@ func TestMakeRunHandler(t *testing.T) {
 	for _, s := range scenarios {
 		t.Run(s.name, func(t *testing.T) {
 			w := httptest.NewRecorder()
-			serviceName := "test"
+			serviceName := "testName"
 			req, _ := http.NewRequest("POST", "/run/"+serviceName, nil)
-			req.Header.Set("Authorization", "Bearer AbCdEf123456")
+			req.Header.Set("Authorization", "Bearer 11e387cf727630d899925d57fceb4578f478c44be6cde0ae3fe886d8be513acf")
 
 			if s.returnError {
 				switch s.errType {
@@ -78,7 +78,7 @@
 				case "splitErr":
 					req.Header.Set("Authorization", "AbCdEf123456")
 				case "diffErr":
-					req.Header.Set("Authorization", "Bearer AbC123456")
+					req.Header.Set("Authorization", "Bearer 11e387cf727630d899925d57fceb4578f478c44be6cde0ae3fe886d8be513dfg")
 				}
 			}
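Both invocation handlers now pick the authentication path by token length: a 64-character bearer token is compared against the service token, and anything else is treated as an OIDC access token and handed to the OIDC manager (`IsAuthorised`, `UserHasVO`, `GetUserInfo`). A rough sketch of that branching, with the OIDC path reduced to a description since it depends on the issuer:

```go
package main

import "fmt"

// OSCAR service tokens are 64-character strings, so token length decides
// which authentication path the handler takes.
const tokenLength = 64

// authPath is only a sketch: the real handlers compare against service.Token
// and delegate other tokens to auth.NewOIDCManager for validation.
func authPath(rawToken, serviceToken string) string {
	if len(rawToken) == tokenLength {
		if rawToken == serviceToken {
			return "service token accepted"
		}
		return "service token rejected (401)"
	}
	return "treated as OIDC access token: verify issuer, VO membership and subject"
}

func main() {
	svcToken := "11e387cf727630d899925d57fceb4578f478c44be6cde0ae3fe886d8be513acf"
	fmt.Println(authPath(svcToken, svcToken))
	fmt.Println(authPath("eyJhbGciOiJSUzI1NiIsImtpZCI6...", svcToken))
}
```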
diff --git a/pkg/handlers/status.go b/pkg/handlers/status.go
index 231669f9..52061ecb 100644
--- a/pkg/handlers/status.go
+++ b/pkg/handlers/status.go
@@ -29,6 +29,15 @@ import (
 	versioned "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1"
 )
 
+type GeneralInfo struct {
+	NumberNodes     int64      `json:"numberNodes"`
+	CPUFreeTotal    int64      `json:"cpuFreeTotal"`
+	CPUMaxFree      int64      `json:"cpuMaxFree"`
+	MemoryFreeTotal int64      `json:"memoryFreeTotal"`
+	MemoryMaxFree   int64      `json:"memoryMaxFree"`
+	DetailsNodes    []NodeInfo `json:"detail"`
+}
+
 type NodeInfo struct {
 	NodeName    string `json:"nodeName"`
 	CPUCapacity string `json:"cpuCapacity"`
@@ -57,31 +66,66 @@ func MakeStatusHandler(kubeClientset *kubernetes.Clientset, metricsClientset *ve
 		}
 
 		var nodeInfoList []NodeInfo
+		var clusterInfo GeneralInfo
+
+		var cpu_free_total int64 = 0
+		var cpu_max_free int64 = 0
+		var memory_free_total int64 = 0
+		var memory_max_free int64 = 0
+		var number_nodes int64 = 0
+		// Cluster-wide CPU and memory accumulators.
 
 		for id, node := range nodes.Items {
-			nodeName := node.Name
-			cpu_alloc := node.Status.Allocatable.Cpu().MilliValue()
-			cpu_usage := nodeMetricsList.Items[id].Usage["cpu"]
-			cpu_usage_percent := (float64(cpu_usage.MilliValue()) / float64(cpu_alloc)) * 100
-
-			memory_alloc := node.Status.Allocatable.Memory().Value()
-			memory_usage := nodeMetricsList.Items[id].Usage["memory"]
-			memory_usage_percent := (float64(memory_usage.Value()) / float64(memory_alloc)) * 100
-
-			nodeInfo := NodeInfo{
-				NodeName:         nodeName,
-				CPUCapacity:      strconv.Itoa(int(cpu_alloc)),
-				CPUUsage:         strconv.Itoa(int(cpu_usage.MilliValue())),
-				CPUPercentage:    fmt.Sprintf("%.2f", cpu_usage_percent),
-				MemoryCapacity:   strconv.Itoa(int(memory_alloc)),
-				MemoryUsage:      strconv.Itoa(int(memory_usage.Value())),
-				MemoryPercentage: fmt.Sprintf("%.2f", memory_usage_percent),
-			}
+			// Skip the front-end node (first item) in the JSON list
+			if id > 0 {
+				nodeName := node.Name
+				cpu_alloc := node.Status.Allocatable.Cpu().MilliValue()
+				cpu_usage := nodeMetricsList.Items[id].Usage["cpu"]
+				cpu_usage_percent := (float64(cpu_usage.MilliValue()) / float64(cpu_alloc)) * 100
+
+				memory_alloc := node.Status.Allocatable.Memory().Value()
+				memory_usage := nodeMetricsList.Items[id].Usage["memory"]
+				memory_usage_percent := (float64(memory_usage.Value()) / float64(memory_alloc)) * 100
 
-			nodeInfoList = append(nodeInfoList, nodeInfo)
+				nodeInfo := NodeInfo{
+					NodeName:         nodeName,
+					CPUCapacity:      strconv.Itoa(int(cpu_alloc)),
+					CPUUsage:         strconv.Itoa(int(cpu_usage.MilliValue())),
+					CPUPercentage:    fmt.Sprintf("%.2f", cpu_usage_percent),
+					MemoryCapacity:   strconv.Itoa(int(memory_alloc)),
+					MemoryUsage:      strconv.Itoa(int(memory_usage.Value())),
+					MemoryPercentage: fmt.Sprintf("%.2f", memory_usage_percent),
+				}
+				number_nodes++
+				cpu_node_free := cpu_alloc - cpu_usage.MilliValue()
+				cpu_free_total += cpu_node_free
+
+				if cpu_max_free < cpu_node_free {
+					cpu_max_free = cpu_node_free
+				}
+
+				memory_node_free := memory_alloc - memory_usage.Value()
+				memory_free_total += memory_node_free
+
+				if memory_max_free < memory_node_free {
+					memory_max_free = memory_node_free
+				}
+
+				nodeInfoList = append(nodeInfoList, nodeInfo)
+			}
+			// Create cluster status structure
+			clusterInfo = GeneralInfo{
+				NumberNodes:     number_nodes,
+				CPUFreeTotal:    cpu_free_total,
+				CPUMaxFree:      cpu_max_free,
+				MemoryFreeTotal: memory_free_total,
+				MemoryMaxFree:   memory_max_free,
+				DetailsNodes:    nodeInfoList,
+			}
 		}
 
+		// Encode the cluster status structure in JSON format.
-		c.JSON(http.StatusOK, nodeInfoList)
+		c.JSON(http.StatusOK, clusterInfo)
 	}
 }
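With this change, `GET /system/status` no longer returns a bare list of `NodeInfo` entries but a single `GeneralInfo` document. A small sketch that marshals the same structs to show the JSON field names a client should expect (the numbers are made up; CPU values are in milliCPU and memory values in bytes, as in the handler):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Same JSON tags as the structs added to status.go.
type NodeInfo struct {
	NodeName         string `json:"nodeName"`
	CPUCapacity      string `json:"cpuCapacity"`
	CPUUsage         string `json:"cpuUsage"`
	CPUPercentage    string `json:"cpuPercentage"`
	MemoryCapacity   string `json:"memoryCapacity"`
	MemoryUsage      string `json:"memoryUsage"`
	MemoryPercentage string `json:"memoryPercentage"`
}

type GeneralInfo struct {
	NumberNodes     int64      `json:"numberNodes"`
	CPUFreeTotal    int64      `json:"cpuFreeTotal"`
	CPUMaxFree      int64      `json:"cpuMaxFree"`
	MemoryFreeTotal int64      `json:"memoryFreeTotal"`
	MemoryMaxFree   int64      `json:"memoryMaxFree"`
	DetailsNodes    []NodeInfo `json:"detail"`
}

func main() {
	status := GeneralInfo{
		NumberNodes:     1,
		CPUFreeTotal:    3000,
		CPUMaxFree:      3000,
		MemoryFreeTotal: 6442450944,
		MemoryMaxFree:   6442450944,
		DetailsNodes: []NodeInfo{{
			NodeName:         "oscar-wn-1",
			CPUCapacity:      "4000",
			CPUUsage:         "1000",
			CPUPercentage:    "25.00",
			MemoryCapacity:   "8589934592",
			MemoryUsage:      "2147483648",
			MemoryPercentage: "25.00",
		}},
	}
	out, _ := json.MarshalIndent(status, "", "  ")
	fmt.Println(string(out))
}
```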
diff --git a/pkg/resourcemanager/delegate.go b/pkg/resourcemanager/delegate.go
index 1be273be..dfa5a225 100644
--- a/pkg/resourcemanager/delegate.go
+++ b/pkg/resourcemanager/delegate.go
@@ -22,10 +22,12 @@ import (
 	"encoding/json"
 	"fmt"
 	"log"
+	"math/rand"
 	"net/http"
 	"net/url"
 	"path"
 	"sort"
+	"strconv"
 	"strings"
 	"time"
@@ -35,6 +37,7 @@ import (
 const (
 	oscarReplicaType    = "oscar"
 	endpointReplicaType = "endpoint"
+	noDelegateCode      = 101
 )
 
 // tokenCache map to store tokens from services and endpoints -> [CLUSTER_ENDPOINT][SERVICE_NAME]
@@ -46,11 +49,35 @@ type DelegatedEvent struct {
 	Event     string `json:"event"`
 }
 
+type GeneralInfo struct {
+	NumberNodes     int64      `json:"numberNodes"`
+	CPUFreeTotal    int64      `json:"cpuFreeTotal"`
+	CPUMaxFree      int64      `json:"cpuMaxFree"`
+	MemoryFreeTotal int64      `json:"memoryFreeTotal"`
+	MemoryMaxFree   int64      `json:"memoryMaxFree"`
+	DetailsNodes    []NodeInfo `json:"detail"`
+}
+
+type NodeInfo struct {
+	NodeName         string `json:"nodeName"`
+	CPUCapacity      string `json:"cpuCapacity"`
+	CPUUsage         string `json:"cpuUsage"`
+	CPUPercentage    string `json:"cpuPercentage"`
+	MemoryCapacity   string `json:"memoryCapacity"`
+	MemoryUsage      string `json:"memoryUsage"`
+	MemoryPercentage string `json:"memoryPercentage"`
+}
+
 // DelegateJob sends the event to a service's replica
 func DelegateJob(service *types.Service, event string, logger *log.Logger) error {
+	// Determine the priority level of each replica before delegating
+	getClusterStatus(service)
+	fmt.Println("Replicas: ", service.Replicas)
+
 	// Check if replicas are sorted by priority and sort it if needed
 	if !sort.IsSorted(service.Replicas) {
 		sort.Stable(service.Replicas)
+		fmt.Println("Replicas Stable: ", service.Replicas)
 	}
 
 	delegatedEvent := WrapEvent(service.ClusterID, event)
@@ -60,9 +87,11 @@ func DelegateJob(service *types.Service, event string, logger *log.Logger) error
 	}
 
 	for _, replica := range service.Replicas {
-		// Manage if replica.Type is "oscar"
-		if strings.ToLower(replica.Type) == oscarReplicaType {
+		// Manage if replica.Type is "oscar" and it has the capacity to receive a service
+		fmt.Println("Delegation job in ClusterID: ", replica.ClusterID, "with Priority ", replica.Priority)
+		if strings.ToLower(replica.Type) == oscarReplicaType && replica.Priority < noDelegateCode {
 			// Check ClusterID is defined in 'Clusters'
+			fmt.Println("Delegating ...")
 			cluster, ok := service.Clusters[replica.ClusterID]
 			if !ok {
 				logger.Printf("Error delegating service \"%s\" to ClusterID \"%s\": Cluster not defined\n", service.Name, replica.ClusterID)
@@ -276,3 +305,141 @@ func updateServiceToken(replica types.Replica, cluster types.Cluster) (string, e
 
 	return svc.Token, nil
 }
+
+func getClusterStatus(service *types.Service) {
+	fmt.Println("Getting the status of the replica clusters")
+	for _, replica := range service.Replicas {
+		// Manage if replica.Type is "oscar"
+		if strings.ToLower(replica.Type) == oscarReplicaType {
+			// Check ClusterID is defined in 'Clusters'
+			cluster, ok := service.Clusters[replica.ClusterID]
+			if !ok {
+				if service.Delegation != "static" {
+					replica.Priority = noDelegateCode
+				}
+				fmt.Printf("Error checking ClusterID \"%s\": Cluster not defined\n", replica.ClusterID)
+				continue
+			}
+			// Parse the cluster's endpoint URL and add the service's path
+			getStatusURL, err := url.Parse(cluster.Endpoint)
+			if err != nil {
+				replica.Priority = noDelegateCode
+				fmt.Printf("Error parsing the cluster's endpoint URL for ClusterID \"%s\": unable to parse cluster endpoint \"%s\": %v\n", replica.ClusterID, cluster.Endpoint, err)
+				continue
+			}
+			getStatusURL.Path = path.Join(getStatusURL.Path, "system", "status")
+
+			// Make request to get status from cluster
+			req, err := http.NewRequest(http.MethodGet, getStatusURL.String(), nil)
+			if err != nil {
+				if service.Delegation != "static" {
+					replica.Priority = noDelegateCode
+				}
+				fmt.Printf("Error making request to ClusterID \"%s\": unable to make request: %v\n", replica.ClusterID, err)
+				continue
+			}
+			// Add cluster's basic auth credentials
+			req.SetBasicAuth(cluster.AuthUser, cluster.AuthPassword)
+
+			// Make HTTP client
+			var transport http.RoundTripper = &http.Transport{
+				// Enable/disable SSL verification
+				TLSClientConfig: &tls.Config{InsecureSkipVerify: !cluster.SSLVerify},
+			}
+			client := &http.Client{
+				Transport: transport,
+				Timeout:   time.Second * 20,
+			}
+
+			// Send the request
+			res, err := client.Do(req)
+			if err != nil {
+				if service.Delegation != "static" {
+					replica.Priority = noDelegateCode
+				}
+				fmt.Printf("Error getting cluster status from ClusterID \"%s\": unable to send request: %v\n", replica.ClusterID, err)
+				continue
+			}
+
+			// Check status code
+			if res.StatusCode == http.StatusOK {
+				fmt.Printf("Successfully got the cluster status of ClusterID \"%s\"\n", replica.ClusterID)
+
+				// Decode the cluster status response from JSON
+				var clusterStatus *GeneralInfo
+				err = json.NewDecoder(res.Body).Decode(&clusterStatus)
+				if err != nil {
+					if service.Delegation != "static" {
+						replica.Priority = noDelegateCode
+					}
+					fmt.Println("Error decoding the JSON of the response:", err)
+					continue
+				}
+
+				// CPU is in milliCPU
+				// CPU required to deploy the service
+				serviceCPU, err := strconv.ParseFloat(service.CPU, 64)
+				if err != nil {
+					replica.Priority = noDelegateCode
+					fmt.Println("Error converting the service CPU to float: ", err)
+					continue
+				}
+				fmt.Println("serviceCPU :", serviceCPU)
+				maxNodeCPU := float64(clusterStatus.CPUMaxFree)
+				fmt.Println("maxNodeCPU", maxNodeCPU)
+				// Calculate the CPU difference to determine whether to delegate a replica to the cluster
+				dist := maxNodeCPU - (1000 * serviceCPU)
+				fmt.Println("CPU difference ", dist)
+
+				// The priority of delegating the service is set based on the free CPU of the cluster, as long as a node has enough free CPU to run the service.
+				if dist >= 0 {
+					fmt.Println("Resources available in ClusterID", replica.ClusterID)
+					if service.Delegation == "random" {
+						randPriority := rand.Intn(noDelegateCode)
+						replica.Priority = uint(randPriority)
+						fmt.Println("Priority ", replica.Priority, " with ", service.Delegation, " delegation")
+					} else if service.Delegation == "load-based" {
+						// Map the totalClusterCPU range to a smaller range (input range 0 to 32 CPUs to output range 100 to 0 priority)
+						totalClusterCPU := clusterStatus.CPUFreeTotal
+						mappedCPUPriority := mapToRange(totalClusterCPU, 0, 32000, 100, 0)
+						replica.Priority = uint(mappedCPUPriority)
+						fmt.Println("Priority ", replica.Priority, " with ", service.Delegation, " delegation")
+					} else if service.Delegation != "static" {
+						replica.Priority = noDelegateCode
+						fmt.Println("Unknown delegation type declared for ClusterID ", replica.ClusterID)
+						continue
+					}
+				} else {
+					fmt.Println("No CPU capacity to delegate the job in ClusterID ", replica.ClusterID)
+					if service.Delegation != "static" {
+						replica.Priority = noDelegateCode
+					}
+					continue
+				}
+				fmt.Println("Status Cluster ", clusterStatus)
+				fmt.Println("Priority ", replica.Priority)
+
+			} else {
+				if service.Delegation != "static" {
+					replica.Priority = noDelegateCode
+				}
+				fmt.Printf("Error getting the cluster status of ClusterID \"%s\"\n", replica.ClusterID)
+			}
+
+		}
+	}
+
+}
+
+func mapToRange(value, minInput, maxInput, maxOutput, minOutput int64) int {
+	mappedValue := maxOutput - (maxOutput-minOutput)*(value-minInput)/(maxInput-minInput)
+	mappedInt := int(mappedValue)
+	if mappedInt > int(maxOutput) {
+		mappedInt = int(maxOutput)
+	}
+	if mappedInt < int(minOutput) {
+		mappedInt = int(minOutput)
+	}
+
+	return mappedInt
+}
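For `load-based` delegation, the free CPU reported by a replica cluster is mapped linearly onto a priority: `cpuFreeTotal` between 0 and 32000 mCPU becomes a priority between 100 (nothing free) and 0 (32 or more free cores), so clusters with more spare CPU sort first. A worked example using the same `mapToRange` logic (the sample CPU values are invented):

```go
package main

import "fmt"

// mapToRange is the same linear mapping used by load-based delegation.
func mapToRange(value, minInput, maxInput, maxOutput, minOutput int64) int {
	mapped := int(maxOutput - (maxOutput-minOutput)*(value-minInput)/(maxInput-minInput))
	if mapped > int(maxOutput) {
		mapped = int(maxOutput)
	}
	if mapped < int(minOutput) {
		mapped = int(minOutput)
	}
	return mapped
}

func main() {
	for _, freeCPU := range []int64{0, 8000, 24000, 40000} {
		// 0 mCPU -> 100, 8000 -> 75, 24000 -> 25, 40000 -> clamped to 0
		fmt.Printf("cpuFreeTotal=%d mCPU -> priority %d\n",
			freeCPU, mapToRange(freeCPU, 0, 32000, 100, 0))
	}
}
```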
diff --git a/pkg/types/service.go b/pkg/types/service.go
index 0fc8c157..1ed5c5d3 100644
--- a/pkg/types/service.go
+++ b/pkg/types/service.go
@@ -167,6 +167,13 @@ type Service struct {
 	// Optional
 	Replicas ReplicaList `json:"replicas,omitempty"`
 
+	// Delegation mode of job delegation to the replicas
+	// Optional (default: manual)
+	// "static": the user selects the priority used to delegate jobs to the replicas.
+	// "random": the job delegation priority is generated randomly among the clusters of the available replicas.
+	// "load-based": the job delegation priority is generated depending on the CPU and memory available in the replica clusters.
+	Delegation string `json:"delegation"`
+
 	// ReSchedulerThreshold time (in seconds) that a job (with replicas) can be queued before delegating it
 	// Optional
 	ReSchedulerThreshold int `json:"rescheduler_threshold"`
diff --git a/pkg/types/service_test.go b/pkg/types/service_test.go
index f3dee53a..61289698 100644
--- a/pkg/types/service_test.go
+++ b/pkg/types/service_test.go
@@ -229,6 +229,7 @@ replicas:
     priority: 0
     headers:
       Authorization: Bearer testtoken
+delegation: ""
 rescheduler_threshold: 0
 log_level: ""
 image: testimage
diff --git a/pkg/utils/auth/auth.go b/pkg/utils/auth/auth.go
index 40883f7c..559c6aad 100644
--- a/pkg/utils/auth/auth.go
+++ b/pkg/utils/auth/auth.go
@@ -18,7 +18,9 @@ package auth
 
 import (
 	"fmt"
+	"log"
 	"strings"
+	"time"
 
 	"github.com/gin-gonic/gin"
 	"github.com/grycap/oscar/v3/pkg/types"
@@ -62,6 +64,49 @@ func CustomAuth(cfg *types.Config, kubeClientset *kubernetes.Clientset) gin.Hand
 	}
 }
 
+// GetLoggerMiddleware returns a gin handler as middleware to log custom info about sync/async executions
+func GetLoggerMiddleware() gin.HandlerFunc {
+	return func(c *gin.Context) {
+
+		// Disable default printf timestamp to avoid inconsistencies on logs
+		log.SetFlags(0)
+
+		startTime := time.Now()
+
+		// Process request
+		c.Next()
+
+		endTime := time.Now()
+
+		// Log custom information after the request is processed
+		logTime := endTime.Format("2006/01/02 - 15:04:05")
+		latency := time.Since(startTime)
+		status := c.Writer.Status()
+		clientIP := c.ClientIP()
+		method := c.Request.Method
+		path := c.Request.URL.Path
+
+		// Get EGI UID from context (if OIDC auth is used)
+		uid, uidExists := c.Get("uidOrigin")
+		var user string
+		if uidExists {
+			user, _ = uid.(string)
+		} else {
+			// Set OSCAR as default user when no UID is found
+			user = "oscar"
+		}
+
+		// Get source IP from context for jobs triggered through MinIO events
+		IPAddress, AddressExists := c.Get("IPAddress")
+		if AddressExists {
+			clientIP, _ = IPAddress.(string)
+		}
+
+		log.Printf("[GIN-EXECUTIONS-LOGGER] %s | %3d | %13v | %s | %-7s %s | %s",
+			logTime, status, latency, clientIP, method, path, user)
+	}
+}
+
 func GetUIDFromContext(c *gin.Context) (string, error) {
 	uidOrigin, uidExists := c.Get("uidOrigin")
 	if !uidExists {
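`GetLoggerMiddleware` prints one line per sync/async execution after the request has been processed. A small sketch that reproduces the format string with invented values, to show what a log line looks like:

```go
package main

import (
	"log"
	"time"
)

// Prints one line with the same format string used by GetLoggerMiddleware;
// all of the values below are made up for illustration.
func main() {
	log.SetFlags(0)
	logTime := "2024/01/15 - 10:32:05"
	status := 201
	latency := 254 * time.Millisecond
	clientIP := "10.0.0.15"
	method := "POST"
	path := "/job/cowsay"
	user := "oscar"
	log.Printf("[GIN-EXECUTIONS-LOGGER] %s | %3d | %13v | %s | %-7s %s | %s",
		logTime, status, latency, clientIP, method, path, user)
}
```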
diff --git a/pkg/utils/auth/oidc.go b/pkg/utils/auth/oidc.go
index 9fb2a21f..7582695f 100644
--- a/pkg/utils/auth/oidc.go
+++ b/pkg/utils/auth/oidc.go
@@ -51,8 +51,8 @@ type oidcManager struct {
 
 // userInfo custom struct to store essential fields from UserInfo
 type userInfo struct {
-	subject string
-	groups  []string
+	Subject string
+	Groups  []string
 }
 
 // newOIDCManager returns a new oidcManager or error if the oidc.Provider can't be created
@@ -96,17 +96,17 @@ func getOIDCMiddleware(kubeClientset *kubernetes.Clientset, minIOAdminClient *ut
 		rawToken := strings.TrimPrefix(authHeader, "Bearer ")
 
 		// Check the token
-		if !oidcManager.isAuthorised(rawToken) {
+		if !oidcManager.IsAuthorised(rawToken) {
 			c.AbortWithStatus(http.StatusUnauthorized)
 			return
 		}
 
-		ui, err := oidcManager.getUserInfo(rawToken)
+		ui, err := oidcManager.GetUserInfo(rawToken)
 		if err != nil {
 			c.String(http.StatusInternalServerError, fmt.Sprintf("%v", err))
 			return
 		}
-		uid := ui.subject
+		uid := ui.Subject
 
 		// Check if exist MinIO user in cached users list
 		minioUserExists := mc.UserExists(uid)
@@ -142,8 +142,8 @@ func (om *oidcManager) clearExpired() {
 	}
 }
 
-// getUserInfo obtains UserInfo from the issuer
-func (om *oidcManager) getUserInfo(rawToken string) (*userInfo, error) {
+// GetUserInfo obtains UserInfo from the issuer
+func (om *oidcManager) GetUserInfo(rawToken string) (*userInfo, error) {
 	ot := oauth2.StaticTokenSource(&oauth2.Token{AccessToken: rawToken})
 
 	// Get OIDC UserInfo
@@ -160,8 +160,8 @@ func (om *oidcManager) getUserInfo(rawToken string) (*userInfo, error) {
 
 	// Create "userInfo" struct and add the groups
 	return &userInfo{
-		subject: ui.Subject,
-		groups:  getGroups(claims.EdupersonEntitlement),
+		Subject: ui.Subject,
+		Groups:  getGroups(claims.EdupersonEntitlement),
 	}, nil
 }
 
@@ -184,11 +184,11 @@ func getGroups(urns []string) []string {
 
 // UserHasVO checks if the user contained on the request token is enrolled on a specific VO
 func (om *oidcManager) UserHasVO(rawToken string, vo string) (bool, error) {
-	ui, err := om.getUserInfo(rawToken)
+	ui, err := om.GetUserInfo(rawToken)
 	if err != nil {
 		return false, err
 	}
-	for _, gr := range ui.groups {
+	for _, gr := range ui.Groups {
 		if vo == gr {
 			return true, nil
 		}
@@ -197,16 +197,16 @@ func (om *oidcManager) UserHasVO(rawToken string, vo string) (bool, error) {
 }
 
 func (om *oidcManager) GetUID(rawToken string) (string, error) {
-	ui, err := om.getUserInfo(rawToken)
-	oidcLogger.Println("received uid: ", ui.subject)
+	ui, err := om.GetUserInfo(rawToken)
+	oidcLogger.Println("received uid: ", ui.Subject)
 	if err != nil {
-		return ui.subject, nil
+		return ui.Subject, nil
 	}
 	return "", err
 }
 
-// isAuthorised checks if a token is authorised to access the API
-func (om *oidcManager) isAuthorised(rawToken string) bool {
+// IsAuthorised checks if a token is authorised to access the API
+func (om *oidcManager) IsAuthorised(rawToken string) bool {
 	// Check if the token is valid
 	_, err := om.provider.Verifier(om.config).Verify(context.TODO(), rawToken)
 	if err != nil {
@@ -217,7 +217,7 @@ func (om *oidcManager) isAuthorised(rawToken string) bool {
 	ui, found := om.tokenCache[rawToken]
 	if !found {
 		// Get userInfo from the issuer
-		ui, err = om.getUserInfo(rawToken)
+		ui, err = om.GetUserInfo(rawToken)
 		if err != nil {
 			return false
 		}
@@ -231,12 +231,12 @@ func (om *oidcManager) isAuthorised(rawToken string) bool {
 	// Check if is authorised
 	// Same subject
-	if ui.subject == om.subject {
+	if ui.Subject == om.subject {
 		return true
 	}
 
 	// Groups
-	for _, tokenGroup := range ui.groups {
+	for _, tokenGroup := range ui.Groups {
 		for _, authGroup := range om.groups {
 			if tokenGroup == authGroup {
 				return true