Skip to content

Commit

Permalink
Feature: Stream changes to gui (#53)
Browse files Browse the repository at this point in the history
  • Loading branch information
dzsak authored Mar 6, 2024
1 parent d49a41e commit f8d80e5
Show file tree
Hide file tree
Showing 25 changed files with 1,136 additions and 40 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ jobs:
with:
fetch-depth: 5

- name: 🤞 Test UI
run: make test-ui

- name: Go Build Cache
uses: actions/cache@v3
with:
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ format:
test:
go test -timeout 60s $(shell go list ./...)

test-ui:
(cd web; npm install; npm run test)

.PHONY: build
build:
CGO_ENABLED=0 go build -ldflags $(LDFLAGS) -o build/capacitor github.com/gimlet-io/capacitor/cmd/capacitor
Expand Down
8 changes: 8 additions & 0 deletions cmd/capacitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ func main() {
runController(err, helmReleaseController, stopCh)
eventController, err := controllers.EventController(client, dynamicClient, clientHub)
runController(err, eventController, stopCh)
deploymentController, err := controllers.DeploymentController(client, dynamicClient, clientHub)
runController(err, deploymentController, stopCh)
podController, err := controllers.PodController(client, dynamicClient, clientHub)
runController(err, podController, stopCh)
serviceController, err := controllers.ServiceController(client, dynamicClient, clientHub)
runController(err, serviceController, stopCh)
ingressController, err := controllers.IngressController(client, dynamicClient, clientHub)
runController(err, ingressController, stopCh)

r := api.SetupRouter(client, dynamicClient, config, clientHub, runningLogStreams)
go func() {
Expand Down
7 changes: 5 additions & 2 deletions pkg/controllers/bucketController.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/gimlet-io/capacitor/pkg/flux"
"github.com/gimlet-io/capacitor/pkg/streaming"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
Expand Down Expand Up @@ -35,14 +36,16 @@ func BucketController(
case "delete":
fluxState, err := flux.State(client, dynamicClient)
if err != nil {
panic(err.Error())
logrus.Warnf("could not get flux state: %s", err)
return nil
}
fluxStateBytes, err := json.Marshal(streaming.Envelope{
Type: streaming.FLUX_STATE_RECEIVED,
Payload: fluxState,
})
if err != nil {
panic(err.Error())
logrus.Warnf("could not marshal event: %s", err)
return nil
}
clientHub.Broadcast <- fluxStateBytes
}
Expand Down
33 changes: 17 additions & 16 deletions pkg/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import (

sourcev1 "github.com/fluxcd/source-controller/api/v1"
log "github.com/sirupsen/logrus"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

converterRuntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -53,7 +54,7 @@ func NewController(
name string,
listWatcher cache.ListerWatcher,
objType converterRuntime.Object,
eventHandler func(informerEvent Event, objectMeta meta_v1.ObjectMeta, obj interface{}) error,
eventHandler func(informerEvent Event, objectMeta metav1.ObjectMeta, obj interface{}) error,
) *Controller {
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
var informerEvent Event
Expand Down Expand Up @@ -173,7 +174,7 @@ func (c *Controller) processNextItem() bool {

objectMeta := getObjectMetaData(obj)

// don't process events from before agent start
// don't process events from before capacitor start
// startup sends the complete cluster state upstream
if informerEvent.(Event).eventType == "create" &&
objectMeta.CreationTimestamp.Sub(serverStartTime).Seconds() < 0 {
Expand Down Expand Up @@ -246,17 +247,17 @@ func (c *Controller) runWorker() {
func getObjectMetaData(obj interface{}) metav1.ObjectMeta {
var objectMeta metav1.ObjectMeta

unstructuredObj, ok := obj.(*unstructured.Unstructured)
if !ok {
return objectMeta
}
unstructured := unstructuredObj.UnstructuredContent()

var gitRepository sourcev1.GitRepository
err := converterRuntime.DefaultUnstructuredConverter.FromUnstructured(unstructured, &gitRepository)
if err != nil {
return objectMeta
} else {
return gitRepository.ObjectMeta
switch object := obj.(type) {
case *sourcev1.GitRepository:
objectMeta = object.ObjectMeta
case *appsv1.Deployment:
objectMeta = object.ObjectMeta
case *v1.Service:
objectMeta = object.ObjectMeta
case *v1.Pod:
objectMeta = object.ObjectMeta
case *networkingv1.Ingress:
objectMeta = object.ObjectMeta
}
return objectMeta
}
65 changes: 65 additions & 0 deletions pkg/controllers/deploymentController.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package controllers

import (
"encoding/json"

"github.com/gimlet-io/capacitor/pkg/streaming"
"github.com/sirupsen/logrus"
apps_v1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)

func DeploymentController(
client *kubernetes.Clientset,
dynamicClient *dynamic.DynamicClient,
clientHub *streaming.ClientHub,
) (*Controller, error) {
deploymentListWatcher := cache.NewListWatchFromClient(client.AppsV1().RESTClient(), "deployments", v1.NamespaceAll, fields.Everything())
deploymentController := NewController(
"deployment",
deploymentListWatcher,
&apps_v1.Deployment{},
func(informerEvent Event, objectMeta meta_v1.ObjectMeta, obj interface{}) error {
switch informerEvent.eventType {
case "create":
createdDeployment := obj.(*apps_v1.Deployment)
deploymentBytes, err := json.Marshal(streaming.Envelope{
Type: streaming.DEPLOYMENT_CREATED,
Payload: createdDeployment,
})
if err != nil {
logrus.Warnf("could not marshal event: %s", err)
return nil
}
clientHub.Broadcast <- deploymentBytes
case "update":
deployment := obj.(*apps_v1.Deployment)
deploymentBytes, err := json.Marshal(streaming.Envelope{
Type: streaming.DEPLOYMENT_UPDATED,
Payload: deployment,
})
if err != nil {
logrus.Warnf("could not marshal event: %s", err)
return nil
}
clientHub.Broadcast <- deploymentBytes
case "delete":
deploymentBytes, err := json.Marshal(streaming.Envelope{
Type: streaming.DEPLOYMENT_DELETED,
Payload: informerEvent.key,
})
if err != nil {
logrus.Warnf("could not marshal event: %s", err)
return nil
}
clientHub.Broadcast <- deploymentBytes
}
return nil
})
return deploymentController, nil
}
4 changes: 3 additions & 1 deletion pkg/controllers/eventController.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/gimlet-io/capacitor/pkg/flux"
"github.com/gimlet-io/capacitor/pkg/streaming"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
Expand Down Expand Up @@ -42,7 +43,8 @@ func EventController(
Payload: events,
})
if err != nil {
panic(err.Error())
logrus.Warnf("could not marshal event: %s", err)
return nil
}
clientHub.Broadcast <- eventBytes
return nil
Expand Down
7 changes: 5 additions & 2 deletions pkg/controllers/gitrepositoryController.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/gimlet-io/capacitor/pkg/flux"
"github.com/gimlet-io/capacitor/pkg/streaming"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
Expand Down Expand Up @@ -35,14 +36,16 @@ func GitRepositoryController(
case "delete":
fluxState, err := flux.State(client, dynamicClient)
if err != nil {
panic(err.Error())
logrus.Warnf("could not marshal event: %s", err)
return nil
}
fluxStateBytes, err := json.Marshal(streaming.Envelope{
Type: streaming.FLUX_STATE_RECEIVED,
Payload: fluxState,
})
if err != nil {
panic(err.Error())
logrus.Warnf("could not marshal event: %s", err)
return nil
}
clientHub.Broadcast <- fluxStateBytes
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/controllers/helmreleaseController.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/gimlet-io/capacitor/pkg/flux"
"github.com/gimlet-io/capacitor/pkg/streaming"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
Expand Down Expand Up @@ -35,14 +36,16 @@ func HelmReleaseController(
case "delete":
fluxState, err := flux.State(client, dynamicClient)
if err != nil {
panic(err.Error())
logrus.Warnf("could not marshal event: %s", err)
return nil
}
fluxStateBytes, err := json.Marshal(streaming.Envelope{
Type: streaming.FLUX_STATE_RECEIVED,
Payload: fluxState,
})
if err != nil {
panic(err.Error())
logrus.Warnf("could not marshal event: %s", err)
return nil
}
clientHub.Broadcast <- fluxStateBytes
}
Expand Down
65 changes: 65 additions & 0 deletions pkg/controllers/ingressController.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package controllers

import (
"encoding/json"

"github.com/gimlet-io/capacitor/pkg/streaming"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
networking_v1 "k8s.io/api/networking/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)

func IngressController(
client *kubernetes.Clientset,
dynamicClient *dynamic.DynamicClient,
clientHub *streaming.ClientHub,
) (*Controller, error) {
ingressListWatcher := cache.NewListWatchFromClient(client.NetworkingV1().RESTClient(), "ingresses", v1.NamespaceAll, fields.Everything())
ingressController := NewController(
"ingress",
ingressListWatcher,
&networking_v1.Ingress{},
func(informerEvent Event, objectMeta meta_v1.ObjectMeta, obj interface{}) error {
switch informerEvent.eventType {
case "create":
createdIngress := obj.(*networking_v1.Ingress)
ingressBytes, err := json.Marshal(streaming.Envelope{
Type: streaming.INGRESS_CREATED,
Payload: createdIngress,
})
if err != nil {
logrus.Warnf("could not marshal event: %s", err)
return nil
}
clientHub.Broadcast <- ingressBytes
case "update":
ingress := obj.(*networking_v1.Ingress)
ingressBytes, err := json.Marshal(streaming.Envelope{
Type: streaming.INGRESS_UPDATED,
Payload: ingress,
})
if err != nil {
logrus.Warnf("could not marshal event: %s", err)
return nil
}
clientHub.Broadcast <- ingressBytes
case "delete":
ingressBytes, err := json.Marshal(streaming.Envelope{
Type: streaming.INGRESS_DELETED,
Payload: informerEvent.key,
})
if err != nil {
logrus.Warnf("could not marshal event: %s", err)
return nil
}
clientHub.Broadcast <- ingressBytes
}
return nil
})
return ingressController, nil
}
7 changes: 5 additions & 2 deletions pkg/controllers/kustomizationController.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/gimlet-io/capacitor/pkg/flux"
"github.com/gimlet-io/capacitor/pkg/streaming"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
Expand Down Expand Up @@ -35,14 +36,16 @@ func KustomizeController(
case "delete":
fluxState, err := flux.State(client, dynamicClient)
if err != nil {
panic(err.Error())
logrus.Warnf("could not marshal event: %s", err)
return nil
}
fluxStateBytes, err := json.Marshal(streaming.Envelope{
Type: streaming.FLUX_STATE_RECEIVED,
Payload: fluxState,
})
if err != nil {
panic(err.Error())
logrus.Warnf("could not marshal event: %s", err)
return nil
}
clientHub.Broadcast <- fluxStateBytes
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/controllers/ocirepositoryController.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/gimlet-io/capacitor/pkg/flux"
"github.com/gimlet-io/capacitor/pkg/streaming"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
Expand Down Expand Up @@ -35,14 +36,16 @@ func OciRepositoryController(
case "delete":
fluxState, err := flux.State(client, dynamicClient)
if err != nil {
panic(err.Error())
logrus.Warnf("could not get flux state: %s", err)
return nil
}
fluxStateBytes, err := json.Marshal(streaming.Envelope{
Type: streaming.FLUX_STATE_RECEIVED,
Payload: fluxState,
})
if err != nil {
panic(err.Error())
logrus.Warnf("could not marshal event: %s", err)
return nil
}
clientHub.Broadcast <- fluxStateBytes
}
Expand Down
Loading

0 comments on commit f8d80e5

Please sign in to comment.