Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schedule changes #189

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions agent/container/api/agent.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion agent/container/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,12 @@ paths:
responses:
'200':
description: OK

/event/jfrog/container:
post:
tags:
- public
summary: Post Jfrog Container Registry webhook events
responses:
'200':
description: OK
# oapi-codegen -config ./cfg.yaml ./openapi.yaml
2 changes: 2 additions & 0 deletions agent/container/pkg/handler/api_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ func (ah *APIHandler) BindRequest(r *gin.Engine) {
apiGroup.GET("/status", ah.GetStatus)
apiGroup.POST("/event/docker/hub", ah.PostEventDockerHub)
apiGroup.POST("/event/azure/container", ah.PostEventAzureContainer)
apiGroup.POST("/event/jfrog/container", ah.PostEventJfrogContainer)

}
}

Expand Down
45 changes: 45 additions & 0 deletions agent/container/pkg/handler/jfrog_container.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package handler

import (
"encoding/json"
"errors"
"io"
"log"
"net/http"

"github.com/gin-gonic/gin"
"github.com/intelops/kubviz/model"
)

// ErrInvalidPayloads is logged when a JFrog Container Registry webhook
// delivery cannot be unmarshaled into the expected event payload.
var ErrInvalidPayloads = errors.New("invalid or malformed jfrog Container Registry webhook payload")

// PostEventJfrogContainer handles POST /event/jfrog/container webhook
// deliveries from a JFrog Container Registry. It reads the request body,
// validates it as a model.JfrogContainerPushEventPayload, and republishes
// the raw payload to NATS under the "Jfrog_Container_Registry" subject.
func (ah *APIHandler) PostEventJfrogContainer(c *gin.Context) {
	// Drain and close the body so the underlying connection can be reused.
	defer func() {
		_, _ = io.Copy(io.Discard, c.Request.Body)
		_ = c.Request.Body.Close()
	}()

	payload, err := io.ReadAll(c.Request.Body)
	if err != nil {
		log.Printf("%v: %v", ErrReadingBody, err)
		c.Status(http.StatusBadRequest)
		return
	}
	if len(payload) == 0 {
		// Distinguish an empty delivery from a read failure: err is nil
		// here, so logging it alongside ErrReadingBody would be misleading.
		log.Printf("%v: empty request body", ErrReadingBody)
		c.Status(http.StatusBadRequest)
		return
	}

	var pushEvent model.JfrogContainerPushEventPayload
	if err := json.Unmarshal(payload, &pushEvent); err != nil {
		log.Printf("%v: %v", ErrInvalidPayloads, err)
		c.JSON(http.StatusBadRequest, gin.H{"error": "Bad Request"})
		return
	}

	log.Printf("Received event from jfrog Container Registry: %v", pushEvent)

	// Publish the original raw payload (not the re-marshaled struct) so
	// downstream consumers receive exactly what the registry sent.
	if err := ah.conn.Publish(payload, "Jfrog_Container_Registry"); err != nil {
		log.Printf("%v: %v", ErrPublishToNats, err)
		c.Status(http.StatusInternalServerError)
		return
	}
	c.Status(http.StatusOK)
}
131 changes: 97 additions & 34 deletions agent/kubviz/k8smetrics_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,16 @@ var (
//for local testing provide the location of kubeconfig
// inside the civo file paste your kubeconfig
// uncomment this line from Dockerfile.Kubviz (COPY --from=builder /workspace/civo /etc/myapp/civo)
cluster_conf_loc string = os.Getenv("CONFIG_LOCATION")
schedulingIntervalStr string = os.Getenv("SCHEDULING_INTERVAL")
cluster_conf_loc string = os.Getenv("CONFIG_LOCATION")
schedulingIntervalStr string = os.Getenv("SCHEDULING_INTERVAL")
enableScheduling string = os.Getenv("ENABLE_SCHEDULING")
outdatedIntervalStr string = os.Getenv("OUTDATED_INTERVAL")
preUpgradeIntervalStr string = os.Getenv("PRE_UPGRADE_INTERVAL")
getAllResourcesIntervalStr string = os.Getenv("GET_ALL_RESOURCES_INTERVAL")
rakkessIntervalStr string = os.Getenv("RAKKESS_INTERVAL")
getclientIntervalStr string = os.Getenv("GETCLIENT_INTERVAL")
trivyIntervalStr string = os.Getenv("TRIVY_INTERVAL")
kubescoreIntervalStr string = os.Getenv("KUBESCORE_INTERVAL")
)

func runTrivyScans(config *rest.Config, js nats.JetStreamContext, wg *sync.WaitGroup, trivyImagescanChan, trivySbomcanChan, trivyK8sMetricsChan chan error) {
Expand Down Expand Up @@ -160,41 +168,96 @@ func main() {
}()
wg.Add(6) // Initialize the WaitGroup for the seven goroutines
// ... start other goroutines ...
go outDatedImages(config, js, &wg, outdatedErrChan)
go KubePreUpgradeDetector(config, js, &wg, kubePreUpgradeChan)
go GetAllResources(config, js, &wg, getAllResourceChan)
go RakeesOutput(config, js, &wg, RakeesErrChan)
go getK8sEvents(clientset)
// Run these functions sequentially within a single goroutine using the wrapper function
go runTrivyScans(config, js, &wg, trivyImagescanChan, trivySbomcanChan, trivyK8sMetricsChan)
go RunKubeScore(clientset, js, &wg, kubescoreMetricsChan)
wg.Wait()
// once the go routines completes we will close the error channels
close(outdatedErrChan)
close(kubePreUpgradeChan)
close(getAllResourceChan)
// close(clusterMetricsChan)
close(kubescoreMetricsChan)
close(trivyImagescanChan)
close(trivySbomcanChan)
close(trivyK8sMetricsChan)
close(RakeesErrChan)
// Signal that all other goroutines have finished
doneChan <- true
close(doneChan)
if enableScheduling == "true" {

// ... read other intervals ...

// Convert interval strings to time.Duration
outdatedInterval, _ := time.ParseDuration(outdatedIntervalStr)
if err != nil {
log.Fatalf("Failed to parse SCHEDULING_INTERVAL for outdated: %v", err)
}
preUpgradeInterval, _ := time.ParseDuration(preUpgradeIntervalStr)
if err != nil {
log.Fatalf("Failed to parse SCHEDULING_INTERVAL for preupgrade: %v", err)
}
getAllResourcesInterval, _ := time.ParseDuration(getAllResourcesIntervalStr)
if err != nil {
log.Fatalf("Failed to parse SCHEDULING_INTERVAL for allresource: %v", err)
}
rakkessInterval, _ := time.ParseDuration(rakkessIntervalStr)
if err != nil {
log.Fatalf("Failed to parse SCHEDULING_INTERVAL for Rakkess: %v", err)
}
getclientInterval, _ := time.ParseDuration(getclientIntervalStr)
if err != nil {
log.Fatalf("Failed to parse SCHEDULING_INTERVAL for Rakkess: %v", err)
}
trivyInterval, _ := time.ParseDuration(trivyIntervalStr)
if err != nil {
log.Fatalf("Failed to parse SCHEDULING_INTERVAL for Rakkess: %v", err)
}
kubescoreInterval, _ := time.ParseDuration(kubescoreIntervalStr)
if err != nil {
log.Fatalf("Failed to parse SCHEDULING_INTERVAL for Rakkess: %v", err)
}
// ... convert other intervals ...
s := gocron.NewScheduler(time.UTC)

s.Every(outdatedInterval).Do(outDatedImages, config, js, &wg, outdatedErrChan)
s.Every(preUpgradeInterval).Do(KubePreUpgradeDetector, config, js, &wg, kubePreUpgradeChan)
s.Every(getAllResourcesInterval).Do(GetAllResources, config, js, &wg, getAllResourceChan)
s.Every(rakkessInterval).Do(RakeesOutput, config, js, &wg, RakeesErrChan)
s.Every(getclientInterval).Do(getK8sClient, clientset)
s.Every(trivyInterval).Do(runTrivyScans, js, &wg, trivyImagescanChan, trivySbomcanChan, trivyK8sMetricsChan)
s.Every(kubescoreInterval).Do(RunKubeScore, clientset, js, &wg, kubescoreMetricsChan)

// once the go routines completes we will close the error channels
s.StartBlocking()
// ... call other functions ...
} else {

outDatedImages(config, js, &wg, outdatedErrChan)
KubePreUpgradeDetector(config, js, &wg, kubePreUpgradeChan)
GetAllResources(config, js, &wg, getAllResourceChan)
RakeesOutput(config, js, &wg, RakeesErrChan)
getK8sEvents(clientset)
// Run these functions sequentially within a single goroutine using the wrapper function
runTrivyScans(config, js, &wg, trivyImagescanChan, trivySbomcanChan, trivyK8sMetricsChan)
RunKubeScore(clientset, js, &wg, kubescoreMetricsChan)

wg.Wait()
// once the go routines completes we will close the error channels
close(outdatedErrChan)
close(kubePreUpgradeChan)
close(getAllResourceChan)
// close(clusterMetricsChan)
close(kubescoreMetricsChan)
close(trivyImagescanChan)
close(trivySbomcanChan)
close(trivyK8sMetricsChan)
close(RakeesErrChan)
// Signal that all other goroutines have finished
doneChan <- true
close(doneChan)

}
}
collectAndPublishMetrics()
if schedulingIntervalStr == "" {
schedulingIntervalStr = "20m" // Default value, e.g., 20 minutes
}
schedulingInterval, err := time.ParseDuration(schedulingIntervalStr)
if err != nil {
log.Fatalf("Failed to parse SCHEDULING_INTERVAL: %v", err)
if enableScheduling == "false" {
if schedulingIntervalStr == "" {
schedulingIntervalStr = "20m" // Default value, e.g., 20 minutes
}
schedulingInterval, err := time.ParseDuration(schedulingIntervalStr)
if err != nil {
log.Fatalf("Failed to parse SCHEDULING_INTERVAL: %v", err)
}
s := gocron.NewScheduler(time.UTC)
s.Every(schedulingInterval).Do(collectAndPublishMetrics) // Run immediately and then at the scheduled interval
s.StartBlocking()
}
s := gocron.NewScheduler(time.UTC)
s.Every(schedulingInterval).Do(collectAndPublishMetrics) // Run immediately and then at the scheduled interval
s.StartBlocking() // Blocks the main function
}

} // Blocks the main function

// publishMetrics publishes stream of events
// with subject "METRICS.created"
Expand Down
66 changes: 29 additions & 37 deletions agent/kubviz/kube_score.go
Original file line number Diff line number Diff line change
@@ -1,67 +1,59 @@
package main

import (
"context"
"encoding/json"
"log"
exec "os/exec"
"sync"

"github.com/google/uuid"
"github.com/intelops/kubviz/constants"
"github.com/intelops/kubviz/model"
"github.com/nats-io/nats.go"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/zegl/kube-score/renderer/json_v2"
"k8s.io/client-go/kubernetes"
"log"
exec "os/exec"
"sync"
)

func RunKubeScore(clientset *kubernetes.Clientset, js nats.JetStreamContext, wg *sync.WaitGroup, errCh chan error) {
defer wg.Done()
var report []json_v2.ScoredObject
cmd := "kubectl api-resources --verbs=list --namespaced -o name | xargs -n1 -I{} sh -c \"kubectl get {} --all-namespaces -oyaml && echo ---\" | kube-score score - -o json"
log.Printf("Command: %#v,", cmd)
out, err := executeCommand(cmd)

nsList, err := clientset.CoreV1().
Namespaces().
List(context.Background(), metav1.ListOptions{})
if err != nil {
log.Println("Error occurred while getting client set for kube-score: ", err)
return
log.Printf("Error scanning image %s:", err)
}

log.Printf("Namespace size: %d", len(nsList.Items))
for _, n := range nsList.Items {
log.Printf("Publishing kube-score recommendations for namespace: %s\n", n.Name)
publish(n.Name, js, errCh)
err = json.Unmarshal([]byte(out), &report)

if err != nil {
log.Printf("Error occurred while Unmarshalling json: %v", err)
}
publishKubescoreMetrics(report, js, errCh)
}

func publish(ns string, js nats.JetStreamContext, errCh chan error) {
cmd := "kubectl api-resources --verbs=list --namespaced -o name | xargs -n1 -I{} sh -c \"kubectl get {} -n " + ns + " -oyaml && echo ---\" | kube-score score - "
log.Printf("Command: %#v,", cmd)
out, err := executeCommand(cmd)
if err != nil {
log.Println("Error occurred while running kube-score: ", err)
errCh <- err
func publishKubescoreMetrics(report []json_v2.ScoredObject, js nats.JetStreamContext, errCh chan error) {
metrics := model.KubeScoreRecommendations{
ID: uuid.New().String(),
ClusterName: ClusterName,
Report: report,
}
err = publishKubescoreMetrics(uuid.New().String(), ns, out, js)
metricsJson, err := json.Marshal(metrics)
if err != nil {
log.Printf("Error marshaling metrics to JSON: %v", err)
errCh <- err
return
}
errCh <- nil
}

func publishKubescoreMetrics(id string, ns string, recommendations string, js nats.JetStreamContext) error {
metrics := model.KubeScoreRecommendations{
ID: id,
Namespace: ns,
Recommendations: recommendations,
ClusterName: ClusterName,
}
metricsJson, _ := json.Marshal(metrics)
_, err := js.Publish(constants.KUBESCORE_SUBJECT, metricsJson)
_, err = js.Publish(constants.KUBESCORE_SUBJECT, metricsJson)
if err != nil {
return err
log.Printf("error occures while publish %v", err)
errCh <- err
return
}
log.Printf("Recommendations with ID:%s has been published\n", id)
log.Printf("Recommendations :%#v", recommendations)
return nil
log.Printf("KubeScore report with ID:%s has been published\n", metrics.ID)
errCh <- nil
}

func executeCommand(command string) (string, error) {
Expand Down
Loading
Loading