Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Stream changes to gui #53

Merged
merged 19 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ jobs:
with:
fetch-depth: 5

- name: 🤞 Test UI
run: make test-ui

- name: Go Build Cache
uses: actions/cache@v3
with:
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ format:
test:
go test -timeout 60s $(shell go list ./...)

test-ui:
(cd web; npm install; npm run test)

.PHONY: build
build:
CGO_ENABLED=0 go build -ldflags $(LDFLAGS) -o build/capacitor github.com/gimlet-io/capacitor/cmd/capacitor
Expand Down
8 changes: 8 additions & 0 deletions cmd/capacitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ func main() {
runController(err, helmReleaseController, stopCh)
eventController, err := controllers.EventController(client, dynamicClient, clientHub)
runController(err, eventController, stopCh)
deploymentController, err := controllers.DeploymentController(client, dynamicClient, clientHub)
runController(err, deploymentController, stopCh)
podController, err := controllers.PodController(client, dynamicClient, clientHub)
runController(err, podController, stopCh)
serviceController, err := controllers.ServiceController(client, dynamicClient, clientHub)
runController(err, serviceController, stopCh)
ingressController, err := controllers.IngressController(client, dynamicClient, clientHub)
runController(err, ingressController, stopCh)

r := api.SetupRouter(client, dynamicClient, config, clientHub, runningLogStreams)
go func() {
Expand Down
7 changes: 5 additions & 2 deletions pkg/controllers/bucketController.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/gimlet-io/capacitor/pkg/flux"
"github.com/gimlet-io/capacitor/pkg/streaming"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
Expand Down Expand Up @@ -35,14 +36,16 @@ func BucketController(
case "delete":
fluxState, err := flux.State(client, dynamicClient)
if err != nil {
panic(err.Error())
logrus.Warnf("could not get flux state: %s", err)
return nil
}
fluxStateBytes, err := json.Marshal(streaming.Envelope{
Type: streaming.FLUX_STATE_RECEIVED,
Payload: fluxState,
})
if err != nil {
panic(err.Error())
logrus.Warnf("could not marshal event: %s", err)
return nil
}
clientHub.Broadcast <- fluxStateBytes
}
Expand Down
33 changes: 17 additions & 16 deletions pkg/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import (

sourcev1 "github.com/fluxcd/source-controller/api/v1"
log "github.com/sirupsen/logrus"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

converterRuntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -53,7 +54,7 @@ func NewController(
name string,
listWatcher cache.ListerWatcher,
objType converterRuntime.Object,
eventHandler func(informerEvent Event, objectMeta meta_v1.ObjectMeta, obj interface{}) error,
eventHandler func(informerEvent Event, objectMeta metav1.ObjectMeta, obj interface{}) error,
) *Controller {
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
var informerEvent Event
Expand Down Expand Up @@ -173,7 +174,7 @@ func (c *Controller) processNextItem() bool {

objectMeta := getObjectMetaData(obj)

// don't process events from before agent start
// don't process events from before capacitor start
// startup sends the complete cluster state upstream
if informerEvent.(Event).eventType == "create" &&
objectMeta.CreationTimestamp.Sub(serverStartTime).Seconds() < 0 {
Expand Down Expand Up @@ -246,17 +247,17 @@ func (c *Controller) runWorker() {
func getObjectMetaData(obj interface{}) metav1.ObjectMeta {
var objectMeta metav1.ObjectMeta

unstructuredObj, ok := obj.(*unstructured.Unstructured)
if !ok {
return objectMeta
}
unstructured := unstructuredObj.UnstructuredContent()

var gitRepository sourcev1.GitRepository
err := converterRuntime.DefaultUnstructuredConverter.FromUnstructured(unstructured, &gitRepository)
if err != nil {
return objectMeta
} else {
return gitRepository.ObjectMeta
switch object := obj.(type) {
case *sourcev1.GitRepository:
objectMeta = object.ObjectMeta
case *appsv1.Deployment:
objectMeta = object.ObjectMeta
case *v1.Service:
objectMeta = object.ObjectMeta
case *v1.Pod:
objectMeta = object.ObjectMeta
case *networkingv1.Ingress:
objectMeta = object.ObjectMeta
}
return objectMeta
}
65 changes: 65 additions & 0 deletions pkg/controllers/deploymentController.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package controllers

import (
"encoding/json"

"github.com/gimlet-io/capacitor/pkg/streaming"
"github.com/sirupsen/logrus"
apps_v1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)

func DeploymentController(
client *kubernetes.Clientset,
dynamicClient *dynamic.DynamicClient,
clientHub *streaming.ClientHub,
) (*Controller, error) {
deploymentListWatcher := cache.NewListWatchFromClient(client.AppsV1().RESTClient(), "deployments", v1.NamespaceAll, fields.Everything())
deploymentController := NewController(
"deployment",
deploymentListWatcher,
&apps_v1.Deployment{},
func(informerEvent Event, objectMeta meta_v1.ObjectMeta, obj interface{}) error {
switch informerEvent.eventType {
case "create":
createdDeployment := obj.(*apps_v1.Deployment)
deploymentBytes, err := json.Marshal(streaming.Envelope{
Type: streaming.DEPLOYMENT_CREATED,
Payload: createdDeployment,
})
if err != nil {
logrus.Warnf("could not marshal event: %s", err)
return nil
}
clientHub.Broadcast <- deploymentBytes
case "update":
deployment := obj.(*apps_v1.Deployment)
deploymentBytes, err := json.Marshal(streaming.Envelope{
Type: streaming.DEPLOYMENT_UPDATED,
Payload: deployment,
})
if err != nil {
logrus.Warnf("could not marshal event: %s", err)
return nil
}
clientHub.Broadcast <- deploymentBytes
case "delete":
deploymentBytes, err := json.Marshal(streaming.Envelope{
Type: streaming.DEPLOYMENT_DELETED,
Payload: informerEvent.key,
})
if err != nil {
logrus.Warnf("could not marshal event: %s", err)
return nil
}
clientHub.Broadcast <- deploymentBytes
}
return nil
})
return deploymentController, nil
}
4 changes: 3 additions & 1 deletion pkg/controllers/eventController.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/gimlet-io/capacitor/pkg/flux"
"github.com/gimlet-io/capacitor/pkg/streaming"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
Expand Down Expand Up @@ -42,7 +43,8 @@ func EventController(
Payload: events,
})
if err != nil {
panic(err.Error())
logrus.Warnf("could not marshal event: %s", err)
return nil
}
clientHub.Broadcast <- eventBytes
return nil
Expand Down
7 changes: 5 additions & 2 deletions pkg/controllers/gitrepositoryController.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/gimlet-io/capacitor/pkg/flux"
"github.com/gimlet-io/capacitor/pkg/streaming"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
Expand Down Expand Up @@ -35,14 +36,16 @@ func GitRepositoryController(
case "delete":
fluxState, err := flux.State(client, dynamicClient)
if err != nil {
panic(err.Error())
logrus.Warnf("could not marshal event: %s", err)
return nil
}
fluxStateBytes, err := json.Marshal(streaming.Envelope{
Type: streaming.FLUX_STATE_RECEIVED,
Payload: fluxState,
})
if err != nil {
panic(err.Error())
logrus.Warnf("could not marshal event: %s", err)
return nil
}
clientHub.Broadcast <- fluxStateBytes
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/controllers/helmreleaseController.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/gimlet-io/capacitor/pkg/flux"
"github.com/gimlet-io/capacitor/pkg/streaming"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
Expand Down Expand Up @@ -35,14 +36,16 @@ func HelmReleaseController(
case "delete":
fluxState, err := flux.State(client, dynamicClient)
if err != nil {
panic(err.Error())
logrus.Warnf("could not marshal event: %s", err)
return nil
}
fluxStateBytes, err := json.Marshal(streaming.Envelope{
Type: streaming.FLUX_STATE_RECEIVED,
Payload: fluxState,
})
if err != nil {
panic(err.Error())
logrus.Warnf("could not marshal event: %s", err)
return nil
}
clientHub.Broadcast <- fluxStateBytes
}
Expand Down
65 changes: 65 additions & 0 deletions pkg/controllers/ingressController.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package controllers

import (
"encoding/json"

"github.com/gimlet-io/capacitor/pkg/streaming"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
networking_v1 "k8s.io/api/networking/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)

func IngressController(
client *kubernetes.Clientset,
dynamicClient *dynamic.DynamicClient,
clientHub *streaming.ClientHub,
) (*Controller, error) {
ingressListWatcher := cache.NewListWatchFromClient(client.NetworkingV1().RESTClient(), "ingresses", v1.NamespaceAll, fields.Everything())
ingressController := NewController(
"ingress",
ingressListWatcher,
&networking_v1.Ingress{},
func(informerEvent Event, objectMeta meta_v1.ObjectMeta, obj interface{}) error {
switch informerEvent.eventType {
case "create":
createdIngress := obj.(*networking_v1.Ingress)
ingressBytes, err := json.Marshal(streaming.Envelope{
Type: streaming.INGRESS_CREATED,
Payload: createdIngress,
})
if err != nil {
logrus.Warnf("could not marshal event: %s", err)
return nil
}
clientHub.Broadcast <- ingressBytes
case "update":
ingress := obj.(*networking_v1.Ingress)
ingressBytes, err := json.Marshal(streaming.Envelope{
Type: streaming.INGRESS_UPDATED,
Payload: ingress,
})
if err != nil {
logrus.Warnf("could not marshal event: %s", err)
return nil
}
clientHub.Broadcast <- ingressBytes
case "delete":
ingressBytes, err := json.Marshal(streaming.Envelope{
Type: streaming.INGRESS_DELETED,
Payload: informerEvent.key,
})
if err != nil {
logrus.Warnf("could not marshal event: %s", err)
return nil
}
clientHub.Broadcast <- ingressBytes
}
return nil
})
return ingressController, nil
}
7 changes: 5 additions & 2 deletions pkg/controllers/kustomizationController.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/gimlet-io/capacitor/pkg/flux"
"github.com/gimlet-io/capacitor/pkg/streaming"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
Expand Down Expand Up @@ -35,14 +36,16 @@ func KustomizeController(
case "delete":
fluxState, err := flux.State(client, dynamicClient)
if err != nil {
panic(err.Error())
logrus.Warnf("could not marshal event: %s", err)
return nil
}
fluxStateBytes, err := json.Marshal(streaming.Envelope{
Type: streaming.FLUX_STATE_RECEIVED,
Payload: fluxState,
})
if err != nil {
panic(err.Error())
logrus.Warnf("could not marshal event: %s", err)
return nil
}
clientHub.Broadcast <- fluxStateBytes
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/controllers/ocirepositoryController.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/gimlet-io/capacitor/pkg/flux"
"github.com/gimlet-io/capacitor/pkg/streaming"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
Expand Down Expand Up @@ -35,14 +36,16 @@ func OciRepositoryController(
case "delete":
fluxState, err := flux.State(client, dynamicClient)
if err != nil {
panic(err.Error())
logrus.Warnf("could not get flux state: %s", err)
return nil
}
fluxStateBytes, err := json.Marshal(streaming.Envelope{
Type: streaming.FLUX_STATE_RECEIVED,
Payload: fluxState,
})
if err != nil {
panic(err.Error())
logrus.Warnf("could not marshal event: %s", err)
return nil
}
clientHub.Broadcast <- fluxStateBytes
}
Expand Down
Loading
Loading