Skip to content

Commit

Permalink
Merge pull request #1 from DescartesResearch/general_refactoring
Browse files Browse the repository at this point in the history
General package refactoring, api restructuring, klog logging
  • Loading branch information
martinstraesser authored Oct 6, 2023
2 parents 518b098 + 9c6a0ae commit d714033
Show file tree
Hide file tree
Showing 86 changed files with 2,891 additions and 1,952 deletions.
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,22 @@ For Kubernetes components that use leader election mechanisms make sure to deact

## Cite us

The paper related to this repository is currently under review. We will add citation info as soon as available.
```
@inproceedings{straesser2023kubernetesintheloop,
abstract = {Microservices deployed and managed by container orchestration frameworks like Kubernetes are the bases of modern cloud applications. In microservice performance modeling and prediction, simulations provide a lightweight alternative to experimental analysis, which requires dedicated infrastructure and a laborious setup. However, existing simulators cannot run realistic scenarios, as performance-critical orchestration mechanisms (like scheduling or autoscaling) are manually modeled and can consequently not be represented in their full complexity and configuration space. This work combines a state-of-the-art simulation for microservice performance with Kubernetes container orchestration. Hereby, we include the original implementation of Kubernetes artifacts enabling realistic scenarios and testing of orchestration policies with low overhead. In two experiments with Kubernetes' kube-scheduler and cluster-autoscaler, we demonstrate that our framework can correctly handle different configurations of these orchestration mechanisms boosting both the simulation's use cases and authenticity.},
added-at = {2023-08-17T01:05:43.000+0200},
author = {Straesser, Martin and Haas, Patrick and Frank, Sebastian and Hakamian, Alireza and Van Hoorn, André and Kounev, Samuel},
biburl = {https://www.bibsonomy.org/bibtex/23ea9a74ebfc49b6a1a29bce1d6083855/samuel.kounev},
booktitle = {Performance Evaluation Methodologies and Tools},
interhash = {373d040402db63c40b7b0b707adf66ad},
intrahash = {3ea9a74ebfc49b6a1a29bce1d6083855},
keywords = {cloud_computing container_orchestration descartes discrete_event_simulation kubernetes microservices software_performance t_full myown},
note = {In print.},
timestamp = {2023-08-17T01:05:43.000+0200},
title = {Kubernetes-in-the-Loop: Enriching Microservice Simulation Through Authentic Container Orchestration},
year = 2023
}
```

## Any questions?

Expand Down
19 changes: 15 additions & 4 deletions cmd/kube-rise/main.go → cmd/go-kube/main.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package main

import (
"kube-rise/internal/inmemorystorage"
"kube-rise/pkg/server"
"kube-rise/pkg/storage"
"flag"
"go-kube/pkg/interfaces"
"go-kube/pkg/storage"
"go-kube/pkg/storage/inmemorystorage"
"k8s.io/klog/v2"
)

func initStorages() storage.StorageContainer {
Expand All @@ -14,6 +16,9 @@ func initStorages() storage.StorageContainer {
var machineStorage = inmemorystorage.NewMachineInMemoryStorage()
var machineSetStorage = inmemorystorage.NewMachineSetInMemoryStorage(&nodeStorage, &machineStorage)
var statusConfigMapStorage = inmemorystorage.NewStatusMapInMemoryStorage()
var podIdStorage = inmemorystorage.NewIdInMemoryStorage()
var machineIdStorage = inmemorystorage.NewIdInMemoryStorage()
var adapterStateStorage = inmemorystorage.NewAdapterStateInMemoryStorage()

return storage.StorageContainer{
Pods: &podStorage,
Expand All @@ -23,11 +28,17 @@ func initStorages() storage.StorageContainer {
Machines: &machineStorage,
MachineSets: &machineSetStorage,
StatusConfigMap: &statusConfigMapStorage,
PodIds: &podIdStorage,
MachineIds: &machineIdStorage,
AdapterState: &adapterStateStorage,
}
}

func main() {
klog.InitFlags(nil) // initializing the flags
defer klog.Flush() // flushes all pending log I/O
flag.Parse() // parses the command-line flags
var storages = initStorages()
var app = server.NewAdapterApplication(&storages)
var app = interfaces.NewAdapterApplication(&storages)
app.Start()
}
21 changes: 21 additions & 0 deletions docs/Packages.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Package Descriptions

## cmd

Contains main file and command line flag parsing

## internal

Contains generic helper structs and functions:

- broadcast: A utility packages for broadcasting channels
- infrastructure: Definition and handling of REST endpoint handlers

## pkg

Contains the core functionality and interfaces of this adapter:

- control: The core logic regarding essential resource types, e.g., pods and nodes
- interfaces: The REST interfaces for communication with MiSim and Kubernetes components
- misim: Misim specific data types and logic
- storage: Interfaces and structs for storing data in the adapter
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
module kube-rise
module go-kube

go 1.20

require (
github.com/gorilla/mux v1.8.0
k8s.io/api v0.26.5
k8s.io/apimachinery v0.27.2
k8s.io/klog/v2 v2.90.1
sigs.k8s.io/cluster-api v1.4.3
)

Expand Down Expand Up @@ -59,7 +60,6 @@ require (
k8s.io/apiextensions-apiserver v0.26.1 // indirect
k8s.io/client-go v0.26.1 // indirect
k8s.io/component-base v0.26.1 // indirect
k8s.io/klog/v2 v2.90.1 // indirect
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f // indirect
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 // indirect
sigs.k8s.io/controller-runtime v0.14.5 // indirect
Expand Down
6 changes: 3 additions & 3 deletions internal/broadcast/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package broadcast

import (
"context"
"fmt"
"k8s.io/klog/v2"
)

// https://betterprogramming.pub/how-to-broadcast-messages-in-go-using-channels-b68f42bdf32e
Expand All @@ -16,14 +16,14 @@ type BroadcastServer[T any] struct {
}

func (s *BroadcastServer[T]) Subscribe() <-chan T {
fmt.Printf("Subscribe to %s\n", s.name)
klog.V(7).Info("Subscribe to ", s.name)
newListener := make(chan T, 500)
s.addListener <- newListener
return newListener
}

func (s *BroadcastServer[T]) CancelSubscription(channel <-chan T) {
fmt.Printf("Remove from %s\n", s.name)
klog.V(7).Info("Remove from ", s.name)
s.removeListener <- channel
}

Expand Down
48 changes: 48 additions & 0 deletions internal/infrastructure/emptyresourcemocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package infrastructure

import (
apps "k8s.io/api/apps/v1"
batch "k8s.io/api/batch/v1"
core "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
storage "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
cluster "sigs.k8s.io/cluster-api/api/v1beta1"
exp "sigs.k8s.io/cluster-api/exp/api/v1beta1"
)

func GetEmptyResourceList(resourceType string) runtime.Object {
switch resourceType {
case "replicasets":
return &apps.ReplicaSetList{TypeMeta: metav1.TypeMeta{Kind: "ReplicaSetList", APIVersion: "apps/v1"}, Items: nil}
case "persistentvolumes":
return &core.PersistentVolumeList{TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeList", APIVersion: "v1"}, Items: nil}
case "statefulsets":
return &apps.StatefulSetList{TypeMeta: metav1.TypeMeta{Kind: "StatefulSetList", APIVersion: "apps/v1"}, Items: nil}
case "storageclasses":
return &storage.StorageClassList{TypeMeta: metav1.TypeMeta{Kind: "StorageClassList", APIVersion: "storage.k8s.io/v1"}, Items: nil}
case "csidrivers":
return &storage.CSIDriverList{TypeMeta: metav1.TypeMeta{Kind: "CSIDriverList", APIVersion: "storage.k8s.io/v1"}, Items: nil}
case "poddisruptionbudgets":
return &policy.PodDisruptionBudgetList{TypeMeta: metav1.TypeMeta{Kind: "PodDisruptionBudgetList", APIVersion: "policy/v1"}, Items: nil}
case "csinodes":
return &storage.CSINodeList{TypeMeta: metav1.TypeMeta{Kind: "CSINodeList", APIVersion: "storage.k8s.io/v1"}, Items: nil}
case "persistentvolumeclaims":
return &core.PersistentVolumeClaimList{TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaimList", APIVersion: "v1"}, Items: nil}
case "csistoragecapacities":
return &storage.CSIStorageCapacityList{TypeMeta: metav1.TypeMeta{Kind: "CSIStorageCapacityList", APIVersion: "storage.k8s.io/v1beta1"}, Items: nil}
case "services":
return &core.ServiceList{TypeMeta: metav1.TypeMeta{Kind: "ServiceList", APIVersion: "v1"}, Items: nil}
case "replicationcontrollers":
return &core.ReplicationControllerList{TypeMeta: metav1.TypeMeta{Kind: "ReplicationControllerList", APIVersion: "v1"}, Items: nil}
case "jobs":
return &batch.JobList{TypeMeta: metav1.TypeMeta{Kind: "JobList", APIVersion: "batch/v1"}, Items: nil}
case "machinedeployments":
return &cluster.MachineDeploymentList{TypeMeta: metav1.TypeMeta{Kind: "MachineDeploymentList", APIVersion: "cluster.x-k8s.io/v1beta1"}, Items: nil}
case "machinepools":
return &exp.MachinePoolList{TypeMeta: metav1.TypeMeta{Kind: "MachinePoolList", APIVersion: "cluster.x-k8s.io/v1beta1"}, Items: nil}
default:
return nil
}
}
7 changes: 7 additions & 0 deletions internal/infrastructure/endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package infrastructure

import (
"net/http"
)

type Endpoint func(w http.ResponseWriter, r *http.Request)
61 changes: 61 additions & 0 deletions internal/infrastructure/requestutils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package infrastructure

import (
"encoding/json"
"github.com/gorilla/mux"
"io"
"k8s.io/klog/v2"
"net/http"
)

func HandleRequest[T any](supplier func() T) Endpoint {
return func(w http.ResponseWriter, r *http.Request) {
klog.V(7).Infof("Req: %s%s?%s", r.Host, r.URL.Path, r.URL.RawQuery)
w.Header().Set("Content-Type", "application/json")
resourceList := supplier()
json.NewEncoder(w).Encode(resourceList)
}
}

func HandleRequestWithBody[B any, T any](supplier func(B) T) Endpoint {
return func(w http.ResponseWriter, r *http.Request) {
klog.V(7).Infof("Req: %s%s?%s", r.Host, r.URL.Path, r.URL.RawQuery)
w.Header().Set("Content-Type", "application/json")
reqBody, _ := io.ReadAll(r.Body)
var payload B
err := json.Unmarshal(reqBody, &payload)
if err != nil {
klog.V(1).ErrorS(err, "There was an error decoding the json. err = %s", err)
w.WriteHeader(500)
return
}
resourceList := supplier(payload)
json.NewEncoder(w).Encode(resourceList)
}
}

func HandleRequestWithParamsAndBody[B any, T any](supplier func(map[string]string, B) T) Endpoint {
return func(w http.ResponseWriter, r *http.Request) {
klog.V(7).Infof("Req: %s%s?%s", r.Host, r.URL.Path, r.URL.RawQuery)
w.Header().Set("Content-Type", "application/json")
reqBody, _ := io.ReadAll(r.Body)
var payload B
err := json.Unmarshal(reqBody, &payload)
if err != nil {
klog.V(1).ErrorS(err, "There was an error decoding the json. err = %s", err)
w.WriteHeader(500)
return
}
resourceList := supplier(mux.Vars(r), payload)
json.NewEncoder(w).Encode(resourceList)
}
}

func HandleRequestWithParams[T any](supplier func(map[string]string) T) Endpoint {
return func(w http.ResponseWriter, r *http.Request) {
klog.V(7).Infof("Req: %s%s?%s", r.Host, r.URL.Path, r.URL.RawQuery)
w.Header().Set("Content-Type", "application/json")
resourceList := supplier(mux.Vars(r))
json.NewEncoder(w).Encode(resourceList)
}
}
57 changes: 57 additions & 0 deletions internal/infrastructure/unsupportedresource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package infrastructure

import (
"encoding/json"
"k8s.io/klog/v2"
"net/http"
"strings"
)

// If query parameter "watch" is added writes empty
// Writes {"metadata": null, "items": null} to the response
func UnsupportedResource() Endpoint {
return func(w http.ResponseWriter, r *http.Request) {
klog.V(7).Infof("Req: %s%s?%s", r.Host, r.URL.Path, r.URL.RawQuery)
w.Header().Set("Content-Type", "application/json")
if r.URL.Query().Get("watch") != "" {
ctx := r.Context()
flusher, ok := w.(http.Flusher)
if !ok {
http.NotFound(w, r)
return
}

// Send the initial headers saying we're gonna stream the response.
w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(http.StatusOK)
flusher.Flush()

for {
select {
case <-ctx.Done():
klog.V(6).Info("Client stopped listening")
return
}
}
} else {
// if no watch we just list the resource
// just return nothing here, to *string datatype enables us to use nil
// y := map[string]*string{"metadata": nil, "items": nil}
resourceType := strings.Split(r.URL.Path, "/")

y := GetEmptyResourceList(resourceType[len(resourceType)-1])
var err error
if y == nil {
z := map[string]*string{"metadata": nil, "items": nil}
err = json.NewEncoder(w).Encode(z)
klog.V(6).ErrorS(err, "unseen type %s\n", resourceType[len(resourceType)-1])
} else {
err = json.NewEncoder(w).Encode(y)
}
if err != nil {
klog.V(1).ErrorS(err, "unable to encode empty resource list, error is: %v", err)
return
}
}
}
}
67 changes: 67 additions & 0 deletions internal/infrastructure/watchablestream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package infrastructure

import (
"encoding/json"
"go-kube/internal/broadcast"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"net/http"
)

func HandleWatchableRequest[T any](supplier func() (T, *broadcast.BroadcastServer[metav1.WatchEvent])) Endpoint {
return func(w http.ResponseWriter, r *http.Request) {
klog.V(7).Infof("Req: %s%s?%s", r.Host, r.URL.Path, r.URL.RawQuery)
w.Header().Set("Content-Type", "application/json")
resourceList, broadcastServer := supplier()
if r.URL.Query().Get("watch") != "" {
// watch initiated HTTP streaming answers
// Sources: https://gist.github.com/vmarmol/b967b29917a34d9307ce
// https://github.com/kubernetes/kubernetes/blob/828495bcc013b77bb63bcb64111e094e455715bb/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go#L181
// https://stackoverflow.com/questions/54890809/how-to-use-request-context-instead-of-closenotifier
ctx := r.Context()
flusher, ok := w.(http.Flusher)
if !ok {
http.NotFound(w, r)
return
}
// Send the initial headers saying we're gonna stream the response.
w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(http.StatusOK)
flusher.Flush()

enc := json.NewEncoder(w)

eventChannel := broadcastServer.Subscribe()
defer broadcastServer.CancelSubscription(eventChannel)

klog.V(6).Infof("Client started listening (%s)...", r.URL.Path)
for {
klog.V(6).Infof("Client waits for result (%s)...", r.URL.Path)
select {
case <-ctx.Done():
klog.V(6).Infof("Client stopped listening (%s)", r.URL.Path)
return
case event := <-eventChannel:
klog.V(6).Infof("Received event for client (%s) of type %s", r.URL.Path, event.Type)
if err := enc.Encode(event); err != nil {
klog.V(1).ErrorS(err, "unable to encode watch object %T: %v", event, err)
// client disconnect.
return
}
if len(eventChannel) == 0 {
flusher.Flush()
klog.V(6).Infof("Client flushed (%s)!", r.URL.Path)
//return
}
}
}
} else {
// if no watch we just list the resource
err := json.NewEncoder(w).Encode(resourceList)
if err != nil {
klog.V(1).ErrorS(err, "unable to encode resource list, error is: %v", err)
return
}
}
}
}
Loading

0 comments on commit d714033

Please sign in to comment.