KUBE-566: send telemetry to mothership #145

Merged · 25 commits · Oct 22, 2024
18 changes: 18 additions & 0 deletions cmd/controller/command.go
@@ -0,0 +1,18 @@
package controller

import (
"github.com/spf13/cobra"
)

const Use = "controller"

func NewCmd() *cobra.Command {
cmd := &cobra.Command{
Use: Use,
RunE: func(cmd *cobra.Command, args []string) error {
return run(cmd.Context())
},
}

return cmd
}
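
The root command that wires this subcommand in is not part of this diff. As a rough sketch only — the entry-point file, the root command's Use string, and the "dev" version value are assumptions — the new controller and monitor subcommands could be registered like this, injecting the binary version that run() later reads from the command context via utils.ClusterControllerVersionKey:

package main

import (
	"context"
	"os"

	"github.com/spf13/cobra"

	"github.com/castai/cluster-controller/cmd/controller"
	"github.com/castai/cluster-controller/cmd/monitor"
	"github.com/castai/cluster-controller/cmd/utils"
	"github.com/castai/cluster-controller/internal/config"
)

func main() {
	root := &cobra.Command{Use: "castai-cluster-controller"}
	root.AddCommand(controller.NewCmd(), monitor.NewCmd())

	// run() type-asserts this context value, so the real entry point must set it;
	// the "dev" version string here is only a placeholder.
	ctx := context.WithValue(context.Background(),
		utils.ClusterControllerVersionKey,
		&config.ClusterControllerVersion{Version: "dev"})

	if err := root.ExecuteContext(ctx); err != nil {
		os.Exit(1)
	}
}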
333 changes: 333 additions & 0 deletions cmd/controller/run.go
@@ -0,0 +1,333 @@
package controller

import (
"context"
"fmt"
"net/http"
"net/http/pprof"
"os"
"strings"
"time"

"github.com/bombsimon/logrusr/v4"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog/v2"

"github.com/castai/cluster-controller/cmd/utils"
"github.com/castai/cluster-controller/health"
"github.com/castai/cluster-controller/internal/actions/csr"
"github.com/castai/cluster-controller/internal/castai"
"github.com/castai/cluster-controller/internal/config"
"github.com/castai/cluster-controller/internal/controller"
"github.com/castai/cluster-controller/internal/controller/logexporter"
"github.com/castai/cluster-controller/internal/helm"
"github.com/castai/cluster-controller/internal/k8sversion"
"github.com/castai/cluster-controller/internal/monitor"
"github.com/castai/cluster-controller/internal/waitext"
)

const (
maxRequestTimeout = 5 * time.Minute
)

func run(ctx context.Context) error {
log := logrus.WithFields(logrus.Fields{})
cfg := config.Get()

binVersion := ctx.Value(utils.ClusterControllerVersionKey).(*config.ClusterControllerVersion)
log.Infof("running castai-cluster-controller version %v", binVersion)

logger := logexporter.NewLogger(cfg.Log.Level)

cl, err := castai.NewRestyClient(cfg.API.URL, cfg.API.Key, cfg.TLS.CACert, logger.Level, binVersion, maxRequestTimeout)
if err != nil {
log.Fatalf("failed to create castai client: %v", err)
}

client := castai.NewClient(logger, cl, cfg.ClusterID)

logexporter.SetupLogExporter(logger, client)

return runController(ctx, client, logger.WithFields(logrus.Fields{
"cluster_id": cfg.ClusterID,
"version": binVersion.String(),
}), cfg, binVersion)
}

func runController(
ctx context.Context,
client castai.CastAIClient,
logger *logrus.Entry,
cfg config.Config,
binVersion *config.ClusterControllerVersion,
) (reterr error) {
fields := logrus.Fields{}

defer func() {
if reterr == nil {
return
}
reterr = &logContextError{
err: reterr,
fields: fields,
}
}()

restConfig, err := config.RetrieveKubeConfig(logger)
if err != nil {
return err
}
restConfigLeader := rest.CopyConfig(restConfig)
restConfigDynamic := rest.CopyConfig(restConfig)

restConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(float32(cfg.KubeClient.QPS), cfg.KubeClient.Burst)
restConfigLeader.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(float32(cfg.KubeClient.QPS), cfg.KubeClient.Burst)
restConfigDynamic.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(float32(cfg.KubeClient.QPS), cfg.KubeClient.Burst)

helmClient := helm.NewClient(logger, helm.NewChartLoader(logger), restConfig)

clientset, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return err
}
clientSetLeader, err := kubernetes.NewForConfig(restConfigLeader)
if err != nil {
return err
}

dynamicClient, err := dynamic.NewForConfig(restConfigDynamic)
if err != nil {
return err
}

k8sVer, err := k8sversion.Get(clientset)
if err != nil {
return fmt.Errorf("getting kubernetes version: %w", err)
}

log := logger.WithFields(logrus.Fields{
"version": binVersion.Version,
"k8s_version": k8sVer.Full(),
"running_on": cfg.SelfPod.Node,
"ctrl_pod_name": cfg.SelfPod.Name,
})

// Set the logr/klog logger to a logrus adapter so all logging goes through logrus.
logr := logrusr.New(log)
klog.SetLogger(logr)

log.Infof("running castai-cluster-controller version %v, log-level: %v", binVersion, logger.Level)

actionsConfig := controller.Config{
PollWaitInterval: 5 * time.Second,
PollTimeout: maxRequestTimeout,
AckTimeout: 30 * time.Second,
AckRetriesCount: 3,
AckRetryWait: 1 * time.Second,
ClusterID: cfg.ClusterID,
Version: binVersion.Version,
Namespace: cfg.SelfPod.Namespace,
}
healthzAction := health.NewHealthzProvider(health.HealthzCfg{HealthyPollIntervalLimit: (actionsConfig.PollWaitInterval + actionsConfig.PollTimeout) * 2, StartTimeLimit: 2 * time.Minute}, log)

svc := controller.NewService(
log,
actionsConfig,
k8sVer.Full(),
clientset,
dynamicClient,
client,
helmClient,
healthzAction,
)
defer func() {
if err := svc.Close(); err != nil {
log.Errorf("failed to close controller service: %v", err)
}
}()

httpMux := http.NewServeMux()
var checks []healthz.HealthChecker
checks = append(checks, healthzAction)
var leaderHealthCheck *leaderelection.HealthzAdaptor
if cfg.LeaderElection.Enabled {
leaderHealthCheck = leaderelection.NewLeaderHealthzAdaptor(time.Minute)
checks = append(checks, leaderHealthCheck)
}
healthz.InstallHandler(httpMux, checks...)
installPprofHandlers(httpMux)

// Start the HTTP server for the pprof and health check handlers.
go func() {
addr := fmt.Sprintf(":%d", cfg.PprofPort)
log.Infof("starting pprof server on %s", addr)

// TODO: remove nolint when we have a proper solution for this
//nolint:gosec
if err := http.ListenAndServe(addr, httpMux); err != nil {
log.Errorf("failed to start pprof http server: %v", err)
}
}()

if err := saveMetadata(cfg.ClusterID, cfg, log); err != nil {
return err
}

runSvc := func(ctx context.Context) {
isGKE, err := runningOnGKE(clientset, cfg)
if err != nil {
log.Fatalf("failed to determine if running on GKE: %v", err)
}

if isGKE {
log.Info("auto approve csr started as running on GKE")
csrMgr := csr.NewApprovalManager(log, clientset)
csrMgr.Start(ctx)
}

svc.Run(ctx)
}

if cfg.LeaderElection.Enabled {
// Run actions service with leader election. Blocks.
return runWithLeaderElection(ctx, log, clientSetLeader, leaderHealthCheck, &cfg, runSvc)
}

// Run the actions service. Blocks.
runSvc(ctx)
return nil
}

func runWithLeaderElection(
ctx context.Context,
log logrus.FieldLogger,
clientset kubernetes.Interface,
watchDog *leaderelection.HealthzAdaptor,
cfg *config.Config,
runFunc func(ctx context.Context),
) error {
id, err := os.Hostname()
if err != nil {
return fmt.Errorf("failed to determine hostname used in leader ID: %w", err)
}
id = id + "_" + uuid.New().String()

// Start the leader election code loop
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Name: cfg.LeaderElection.LockName,
Namespace: cfg.SelfPod.Namespace,
},
Client: clientset.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: id,
},
},
// IMPORTANT: you MUST ensure that any code protected by the lease
// terminates **before** you call cancel. Otherwise, a background
// loop could still be running while another process gets elected,
// violating the stated goal of the lease.
ReleaseOnCancel: true,
LeaseDuration: cfg.LeaderElection.LeaseDuration,
RenewDeadline: cfg.LeaderElection.LeaseRenewDeadline,
RetryPeriod: 3 * time.Second,
WatchDog: watchDog,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
log.WithFields(logrus.Fields{
"leaseDuration": cfg.LeaderElection.LeaseDuration.String(),
"leaseRenewDuration": cfg.LeaderElection.LeaseRenewDeadline.String(),
}).Infof("leader elected: %s", id)
runFunc(ctx)
},
OnStoppedLeading: func() {
// This method is always called (even if this instance was not the leader):
// - when the controller shuts down (for example because of SIGTERM)
// - when we actually lost leadership
// So we need to check the actual reason for stopping.
if err := ctx.Err(); err != nil {
log.Infof("main context done, stopping controller: %v", err)
return
}
log.Infof("leader lost: %s", id)
// We don't need to exit here:
// the "on started leading" callback receives a context that gets cancelled when leadership is lost.
},
OnNewLeader: func(identity string) {
// We're notified when a new leader is elected.
if identity == id {
// I just got the lock.
return
}
log.Infof("new leader elected: %s", identity)
},
},
})
return nil
}

func installPprofHandlers(mux *http.ServeMux) {
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
}

type logContextError struct {
err error
fields logrus.Fields
}

func (e *logContextError) Error() string {
return e.err.Error()
}

func (e *logContextError) Unwrap() error {
return e.err
}
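
// Illustrative sketch, not part of this file: logContextError carries the
// logrus fields alongside the error so the caller can attach them when it
// finally logs the failure. That caller is not shown in this diff; a helper
// in the same package could recover the fields via errors.As (this assumes
// the standard "errors" package is imported, which run.go does not do yet;
// the helper's name is hypothetical).
func logControllerError(log logrus.FieldLogger, err error) {
	var ctxErr *logContextError
	if errors.As(err, &ctxErr) {
		// Attach the fields captured by runController's deferred wrapper.
		log.WithFields(ctxErr.fields).Errorf("controller failed: %v", err)
		return
	}
	log.Errorf("controller failed: %v", err)
}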

func runningOnGKE(clientset *kubernetes.Clientset, cfg config.Config) (isGKE bool, err error) {
err = waitext.Retry(context.Background(), waitext.DefaultExponentialBackoff(), 3, func(ctx context.Context) (bool, error) {
node, err := clientset.CoreV1().Nodes().Get(ctx, cfg.SelfPod.Node, metav1.GetOptions{})
if err != nil {
return true, fmt.Errorf("getting node: %w", err)
}

for k := range node.Labels {
if strings.HasPrefix(k, "cloud.google.com/") {
isGKE = true
return false, nil
}
}

return false, nil
}, func(err error) {
})

return
}

func saveMetadata(clusterID string, cfg config.Config, log *logrus.Entry) error {
metadata := monitor.Metadata{
ClusterID: clusterID,
LastStart: time.Now().UnixNano(),
}
log.Infof("saving metadata: %v to file: %v", metadata, cfg.MonitorMetadataPath)
if err := metadata.Save(cfg.MonitorMetadataPath); err != nil {
return fmt.Errorf("saving metadata: %w", err)
}
return nil
}
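
saveMetadata persists the cluster ID and last-start timestamp through monitor.Metadata.Save so the separate monitor process can pick them up. The internal/monitor package is not part of this diff, so the sketch below is only a guess at its shape — the JSON tags, encoding, and file permissions are assumptions:

package monitor

import (
	"encoding/json"
	"fmt"
	"os"
)

// Metadata mirrors the fields that saveMetadata fills in above.
type Metadata struct {
	ClusterID string `json:"clusterId"`
	LastStart int64  `json:"lastStart"`
}

// Save writes the metadata to disk; JSON encoding and 0600 permissions are assumed here.
func (m *Metadata) Save(path string) error {
	data, err := json.Marshal(m)
	if err != nil {
		return fmt.Errorf("marshaling metadata: %w", err)
	}
	return os.WriteFile(path, data, 0o600)
}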
18 changes: 18 additions & 0 deletions cmd/monitor/command.go
@@ -0,0 +1,18 @@
package monitor

import (
"github.com/spf13/cobra"
)

const Use = "monitor"

func NewCmd() *cobra.Command {
cmd := &cobra.Command{
Use: Use,
RunE: func(cmd *cobra.Command, args []string) error {
return run(cmd.Context())
},
}

return cmd
}