From 59e6b0c2a122b22de495903ac5cfe6e85b402d8b Mon Sep 17 00:00:00 2001 From: Valentyna Bukhalova Date: Tue, 22 Oct 2024 12:33:07 +0200 Subject: [PATCH] KUBE-566: send telemetry to mothership (#145) * add monitor --------- Co-authored-by: Marcin Kaciuba --- cmd/controller/command.go | 18 + cmd/controller/run.go | 333 +++++++++++++++ cmd/monitor/command.go | 18 + cmd/monitor/run.go | 61 +++ cmd/root.go | 48 +++ cmd/utils/flags.go | 5 + internal/actions/create_event_handler.go | 4 +- internal/actions/create_event_handler_test.go | 8 +- internal/actions/csr/svc.go | 2 +- internal/actions/csr/svc_test.go | 8 +- internal/actions/types.go | 2 + internal/castai/client.go | 11 +- internal/castai/mock/client.go | 14 + internal/config/config.go | 110 ++++- internal/config/config_test.go | 6 +- main_test.go => internal/config/retry_test.go | 4 +- internal/controller/controller.go | 2 +- .../logexporter/logexporter.go | 42 +- .../logexporter/logexporter_test.go | 28 +- internal/logexporter/mock/sender.go | 50 --- internal/monitor/metadata.go | 112 +++++ internal/monitor/metatada_test.go | 98 +++++ internal/monitor/monitor.go | 109 +++++ main.go | 402 +----------------- 24 files changed, 990 insertions(+), 505 deletions(-) create mode 100644 cmd/controller/command.go create mode 100644 cmd/controller/run.go create mode 100644 cmd/monitor/command.go create mode 100644 cmd/monitor/run.go create mode 100644 cmd/root.go create mode 100644 cmd/utils/flags.go rename main_test.go => internal/config/retry_test.go (92%) rename internal/{ => controller}/logexporter/logexporter.go (68%) rename internal/{ => controller}/logexporter/logexporter_test.go (64%) delete mode 100644 internal/logexporter/mock/sender.go create mode 100644 internal/monitor/metadata.go create mode 100644 internal/monitor/metatada_test.go create mode 100644 internal/monitor/monitor.go diff --git a/cmd/controller/command.go b/cmd/controller/command.go new file mode 100644 index 00000000..16418468 --- /dev/null +++ 
b/cmd/controller/command.go @@ -0,0 +1,18 @@ +package controller + +import ( + "github.com/spf13/cobra" +) + +const Use = "controller" + +func NewCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: Use, + RunE: func(cmd *cobra.Command, args []string) error { + return run(cmd.Context()) + }, + } + + return cmd +} diff --git a/cmd/controller/run.go b/cmd/controller/run.go new file mode 100644 index 00000000..d70fda14 --- /dev/null +++ b/cmd/controller/run.go @@ -0,0 +1,333 @@ +package controller + +import ( + "context" + "fmt" + "net/http" + "net/http/pprof" + "os" + "strings" + "time" + + "github.com/bombsimon/logrusr/v4" + "github.com/google/uuid" + "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apiserver/pkg/server/healthz" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/client-go/util/flowcontrol" + "k8s.io/klog/v2" + + "github.com/castai/cluster-controller/cmd/utils" + "github.com/castai/cluster-controller/health" + "github.com/castai/cluster-controller/internal/actions/csr" + "github.com/castai/cluster-controller/internal/castai" + "github.com/castai/cluster-controller/internal/config" + "github.com/castai/cluster-controller/internal/controller" + "github.com/castai/cluster-controller/internal/controller/logexporter" + "github.com/castai/cluster-controller/internal/helm" + "github.com/castai/cluster-controller/internal/k8sversion" + "github.com/castai/cluster-controller/internal/monitor" + "github.com/castai/cluster-controller/internal/waitext" +) + +const ( + maxRequestTimeout = 5 * time.Minute +) + +func run(ctx context.Context) error { + log := logrus.WithFields(logrus.Fields{}) + cfg := config.Get() + + binVersion := ctx.Value(utils.ClusterControllerVersionKey).(*config.ClusterControllerVersion) + log.Infof("running castai-cluster-controller version %v", binVersion) + + 
logger := logexporter.NewLogger(cfg.Log.Level) + + cl, err := castai.NewRestyClient(cfg.API.URL, cfg.API.Key, cfg.TLS.CACert, logger.Level, binVersion, maxRequestTimeout) + if err != nil { + log.Fatalf("failed to create castai client: %v", err) + } + + client := castai.NewClient(logger, cl, cfg.ClusterID) + + logexporter.SetupLogExporter(logger, client) + + return runController(ctx, client, logger.WithFields(logrus.Fields{ + "cluster_id": cfg.ClusterID, + "version": binVersion.String(), + }), cfg, binVersion) +} + +func runController( + ctx context.Context, + client castai.CastAIClient, + logger *logrus.Entry, + cfg config.Config, + binVersion *config.ClusterControllerVersion, +) (reterr error) { + fields := logrus.Fields{} + + defer func() { + if reterr == nil { + return + } + reterr = &logContextError{ + err: reterr, + fields: fields, + } + }() + + restConfig, err := config.RetrieveKubeConfig(logger) + if err != nil { + return err + } + restConfigLeader := rest.CopyConfig(restConfig) + restConfigDynamic := rest.CopyConfig(restConfig) + + restConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(float32(cfg.KubeClient.QPS), cfg.KubeClient.Burst) + restConfigLeader.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(float32(cfg.KubeClient.QPS), cfg.KubeClient.Burst) + restConfigDynamic.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(float32(cfg.KubeClient.QPS), cfg.KubeClient.Burst) + + helmClient := helm.NewClient(logger, helm.NewChartLoader(logger), restConfig) + + clientset, err := kubernetes.NewForConfig(restConfig) + if err != nil { + return err + } + clientSetLeader, err := kubernetes.NewForConfig(restConfigLeader) + if err != nil { + return err + } + + dynamicClient, err := dynamic.NewForConfig(restConfigDynamic) + if err != nil { + return err + } + + k8sVer, err := k8sversion.Get(clientset) + if err != nil { + return fmt.Errorf("getting kubernetes version: %w", err) + } + + log := logger.WithFields(logrus.Fields{ + "version": binVersion.Version, + 
"k8s_version": k8sVer.Full(), + "running_on": cfg.SelfPod.Node, + "ctrl_pod_name": cfg.SelfPod.Name, + }) + + // Set logr/klog to logrus adapter so all logging goes through logrus + logr := logrusr.New(log) + klog.SetLogger(logr) + + log.Infof("running castai-cluster-controller version %v, log-level: %v", binVersion, logger.Level) + + actionsConfig := controller.Config{ + PollWaitInterval: 5 * time.Second, + PollTimeout: maxRequestTimeout, + AckTimeout: 30 * time.Second, + AckRetriesCount: 3, + AckRetryWait: 1 * time.Second, + ClusterID: cfg.ClusterID, + Version: binVersion.Version, + Namespace: cfg.SelfPod.Namespace, + } + healthzAction := health.NewHealthzProvider(health.HealthzCfg{HealthyPollIntervalLimit: (actionsConfig.PollWaitInterval + actionsConfig.PollTimeout) * 2, StartTimeLimit: 2 * time.Minute}, log) + + svc := controller.NewService( + log, + actionsConfig, + k8sVer.Full(), + clientset, + dynamicClient, + client, + helmClient, + healthzAction, + ) + defer func() { + if err := svc.Close(); err != nil { + log.Errorf("failed to close controller service: %v", err) + } + }() + + httpMux := http.NewServeMux() + var checks []healthz.HealthChecker + checks = append(checks, healthzAction) + var leaderHealthCheck *leaderelection.HealthzAdaptor + if cfg.LeaderElection.Enabled { + leaderHealthCheck = leaderelection.NewLeaderHealthzAdaptor(time.Minute) + checks = append(checks, leaderHealthCheck) + } + healthz.InstallHandler(httpMux, checks...) + installPprofHandlers(httpMux) + + // Start http server for pprof and health checks handlers. 
+ go func() { + addr := fmt.Sprintf(":%d", cfg.PprofPort) + log.Infof("starting pprof server on %s", addr) + + //TODO: remove nolint when we have a proper solution for this + //nolint:gosec + if err := http.ListenAndServe(addr, httpMux); err != nil { + log.Errorf("failed to start pprof http server: %v", err) + } + }() + + if err := saveMetadata(cfg.ClusterID, cfg, log); err != nil { + return err + } + + runSvc := func(ctx context.Context) { + isGKE, err := runningOnGKE(clientset, cfg) + if err != nil { + log.Fatalf("failed to determine if running on GKE: %v", err) + } + + if isGKE { + log.Info("auto approve csr started as running on GKE") + csrMgr := csr.NewApprovalManager(log, clientset) + csrMgr.Start(ctx) + } + + svc.Run(ctx) + } + + if cfg.LeaderElection.Enabled { + // Run actions service with leader election. Blocks. + return runWithLeaderElection(ctx, log, clientSetLeader, leaderHealthCheck, &cfg, runSvc) + } + + // Run action service. Blocks. + runSvc(ctx) + return nil +} + +func runWithLeaderElection( + ctx context.Context, + log logrus.FieldLogger, + clientset kubernetes.Interface, + watchDog *leaderelection.HealthzAdaptor, + cfg *config.Config, + runFunc func(ctx context.Context), +) error { + id, err := os.Hostname() + if err != nil { + return fmt.Errorf("failed to determine hostname used in leader ID: %w", err) + } + id = id + "_" + uuid.New().String() + + // Start the leader election code loop + leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ + Lock: &resourcelock.LeaseLock{ + LeaseMeta: metav1.ObjectMeta{ + Name: cfg.LeaderElection.LockName, + Namespace: cfg.SelfPod.Namespace, + }, + Client: clientset.CoordinationV1(), + LockConfig: resourcelock.ResourceLockConfig{ + Identity: id, + }, + }, + // IMPORTANT: you MUST ensure that any code you have that + // is protected by the lease must terminate **before** + // you call cancel. 
Otherwise, you could have a background + // loop still running and another process could + // get elected before your background loop finished, violating + // the stated goal of the lease. + ReleaseOnCancel: true, + LeaseDuration: cfg.LeaderElection.LeaseDuration, + RenewDeadline: cfg.LeaderElection.LeaseRenewDeadline, + RetryPeriod: 3 * time.Second, + WatchDog: watchDog, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + log.WithFields(logrus.Fields{ + "leaseDuration": cfg.LeaderElection.LeaseDuration.String(), + "leaseRenewDuration": cfg.LeaderElection.LeaseRenewDeadline.String(), + }).Infof("leader elected: %s", id) + runFunc(ctx) + }, + OnStoppedLeading: func() { + // This method is always called(even if it was not a leader): + // - when controller shuts dow (for example because of SIGTERM) + // - we actually lost leader + // So we need to check what whas reason of acutally stopping. + if err := ctx.Err(); err != nil { + log.Infof("main context done, stopping controller: %v", err) + return + } + log.Infof("leader lost: %s", id) + // We don't need to exit here. + // Leader "on started leading" receive a context that gets cancelled when you're no longer the leader. + }, + OnNewLeader: func(identity string) { + // We're notified when new leader elected. + if identity == id { + // I just got the lock. 
+ return + } + log.Infof("new leader elected: %s", identity) + }, + }, + }) + return nil +} + +func installPprofHandlers(mux *http.ServeMux) { + mux.HandleFunc("/debug/pprof/", pprof.Index) + mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + mux.HandleFunc("/debug/pprof/profile", pprof.Profile) + mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + mux.HandleFunc("/debug/pprof/trace", pprof.Trace) +} + +type logContextError struct { + err error + fields logrus.Fields +} + +func (e *logContextError) Error() string { + return e.err.Error() +} + +func (e *logContextError) Unwrap() error { + return e.err +} + +func runningOnGKE(clientset *kubernetes.Clientset, cfg config.Config) (isGKE bool, err error) { + err = waitext.Retry(context.Background(), waitext.DefaultExponentialBackoff(), 3, func(ctx context.Context) (bool, error) { + node, err := clientset.CoreV1().Nodes().Get(ctx, cfg.SelfPod.Node, metav1.GetOptions{}) + if err != nil { + return true, fmt.Errorf("getting node: %w", err) + } + + for k := range node.Labels { + if strings.HasPrefix(k, "cloud.google.com/") { + isGKE = true + return false, nil + } + } + + return false, nil + }, func(err error) { + }) + + return +} + +func saveMetadata(clusterID string, cfg config.Config, log *logrus.Entry) error { + metadata := monitor.Metadata{ + ClusterID: clusterID, + LastStart: time.Now().UnixNano(), + } + log.Infof("saving metadata: %v to file: %v", metadata, cfg.MonitorMetadataPath) + if err := metadata.Save(cfg.MonitorMetadataPath); err != nil { + return fmt.Errorf("saving metadata: %w", err) + } + return nil +} diff --git a/cmd/monitor/command.go b/cmd/monitor/command.go new file mode 100644 index 00000000..f41a1d00 --- /dev/null +++ b/cmd/monitor/command.go @@ -0,0 +1,18 @@ +package monitor + +import ( + "github.com/spf13/cobra" +) + +const Use = "monitor" + +func NewCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: Use, + RunE: func(cmd *cobra.Command, args []string) error { + return run(cmd.Context()) + 
}, + } + + return cmd +} diff --git a/cmd/monitor/run.go b/cmd/monitor/run.go new file mode 100644 index 00000000..26e526c7 --- /dev/null +++ b/cmd/monitor/run.go @@ -0,0 +1,61 @@ +package monitor + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/sirupsen/logrus" + "k8s.io/client-go/kubernetes" + + "github.com/castai/cluster-controller/cmd/utils" + "github.com/castai/cluster-controller/internal/castai" + "github.com/castai/cluster-controller/internal/config" + "github.com/castai/cluster-controller/internal/controller/logexporter" + "github.com/castai/cluster-controller/internal/monitor" +) + +const ( + maxRequestTimeout = 15 * time.Second +) + +func run(ctx context.Context) error { + cfg := config.Get() + if cfg.API.Key == "" { + return errors.New("env variable \"API_KEY\" is required") + } + if cfg.API.URL == "" { + return errors.New("env variable \"API_URL\" is required") + } + binVersion := ctx.Value(utils.ClusterControllerVersionKey).(*config.ClusterControllerVersion) + + logger := logexporter.NewLogger(cfg.Log.Level) + log := logger.WithFields(logrus.Fields{ + "cluster_id": cfg.ClusterID, + "version": binVersion.String(), + }) + + cl, err := castai.NewRestyClient(cfg.API.URL, cfg.API.Key, cfg.TLS.CACert, logger.Level, binVersion, maxRequestTimeout) + if err != nil { + log.Fatalf("failed to create castai client: %v", err) + } + client := castai.NewClient(logger, cl, cfg.ClusterID) + + logexporter.SetupLogExporter(logger, client) + + return runMonitorMode(ctx, log, &cfg) +} + +func runMonitorMode(ctx context.Context, log *logrus.Entry, cfg *config.Config) error { + restConfig, err := config.RetrieveKubeConfig(log) + if err != nil { + return fmt.Errorf("retrieving kubeconfig: %w", err) + } + clientSet, err := kubernetes.NewForConfig(restConfig) + if err != nil { + return fmt.Errorf("obtaining kubernetes clientset: %w", err) + } + + return monitor.Run(ctx, log, clientSet, cfg.MonitorMetadataPath, cfg.SelfPod) +} diff --git a/cmd/root.go 
b/cmd/root.go new file mode 100644 index 00000000..16410faa --- /dev/null +++ b/cmd/root.go @@ -0,0 +1,48 @@ +package cmd + +import ( + "context" + "fmt" + "os" + + "github.com/spf13/cobra" + + "github.com/castai/cluster-controller/cmd/controller" + "github.com/castai/cluster-controller/cmd/monitor" +) + +var rootCmd = &cobra.Command{ + Use: "castai-cluster-controller", +} + +func Execute(ctx context.Context) { + var cmdFound bool + cmd := rootCmd.Commands() + + for _, a := range cmd { + for _, b := range os.Args[1:] { + if a.Name() == b { + cmdFound = true + break + } + } + } + if !cmdFound { + args := append([]string{controller.Use}, os.Args[1:]...) + rootCmd.SetArgs(args) + } + + if err := rootCmd.ExecuteContext(ctx); err != nil { + fatal(err) + } +} + +func init() { + rootCmd.AddCommand(controller.NewCmd()) + rootCmd.AddCommand(monitor.NewCmd()) +} + +func fatal(err error) { + _, _ = fmt.Fprintln(os.Stderr, err) + os.Exit(1) +} diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go new file mode 100644 index 00000000..e68079ac --- /dev/null +++ b/cmd/utils/flags.go @@ -0,0 +1,5 @@ +package utils + +type ClusterControllerVersion string + +const ClusterControllerVersionKey ClusterControllerVersion = "cluster-controller-version" diff --git a/internal/actions/create_event_handler.go b/internal/actions/create_event_handler.go index 846ca280..a7269dbf 100644 --- a/internal/actions/create_event_handler.go +++ b/internal/actions/create_event_handler.go @@ -1,5 +1,3 @@ -//go:generate mockgen -package=mock_actions -destination ./mock/kubernetes.go k8s.io/client-go/kubernetes Interface - package actions import ( @@ -23,7 +21,7 @@ func NewCreateEventHandler(log logrus.FieldLogger, clientset kubernetes.Interfac eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartRecordingToSink(&typedv1core.EventSinkImpl{Interface: clientset.CoreV1().Events(ns)}) eventBroadcaster.StartStructuredLogging(0) - log.Debug("create new broadcaster and recorder for namespace: %s", ns) + 
log.Debugf("create new broadcaster and recorder for namespace: %s", ns) // Create an event recorder. return eventBroadcaster, eventBroadcaster.NewRecorder(nil, v1.EventSource{ Component: reporter, diff --git a/internal/actions/create_event_handler_test.go b/internal/actions/create_event_handler_test.go index 06dcb17a..63a5c669 100644 --- a/internal/actions/create_event_handler_test.go +++ b/internal/actions/create_event_handler_test.go @@ -227,7 +227,7 @@ func podObjReference(p *corev1.Pod) corev1.ObjectReference { func TestCreateEventHandler_Handle(t *testing.T) { t.Parallel() type fields struct { - tuneMockClientSey func(m *mock_actions.MockInterface) + tuneMockClientSet func(m *mock_actions.MockInterface) } type args struct { action *castai.ClusterAction @@ -241,7 +241,7 @@ func TestCreateEventHandler_Handle(t *testing.T) { { name: "detect race condition: recorder and broadcaster should be called only once", fields: fields{ - tuneMockClientSey: func(m *mock_actions.MockInterface) { + tuneMockClientSet: func(m *mock_actions.MockInterface) { fakeClientSet := fake.NewClientset() m.EXPECT().CoreV1().Return(fakeClientSet.CoreV1()) }, @@ -263,8 +263,8 @@ func TestCreateEventHandler_Handle(t *testing.T) { m := gomock.NewController(t) defer m.Finish() client := mock_actions.NewMockInterface(m) - if tt.fields.tuneMockClientSey != nil { - tt.fields.tuneMockClientSey(client) + if tt.fields.tuneMockClientSet != nil { + tt.fields.tuneMockClientSet(client) } handler := NewCreateEventHandler(logrus.New(), client) // defer handler.Close() diff --git a/internal/actions/csr/svc.go b/internal/actions/csr/svc.go index 817eba85..a527ff90 100644 --- a/internal/actions/csr/svc.go +++ b/internal/actions/csr/svc.go @@ -37,7 +37,7 @@ func (h *ApprovalManager) Start(ctx context.Context) { go h.runAutoApproveForCastAINodes(ctx) } -func (h *ApprovalManager) Stop(ctx context.Context) { +func (h *ApprovalManager) Stop() { h.stopAutoApproveForCastAINodes() } diff --git 
a/internal/actions/csr/svc_test.go b/internal/actions/csr/svc_test.go index 4b40fad6..ad0b7dd3 100644 --- a/internal/actions/csr/svc_test.go +++ b/internal/actions/csr/svc_test.go @@ -46,7 +46,7 @@ func TestCSRApprove(t *testing.T) { csrName := "node-csr-123" userName := "kubelet-bootstrap" - client := fake.NewSimpleClientset(getCSR(csrName, userName)) + client := fake.NewClientset(getCSR(csrName, userName)) s := NewApprovalManager(log, client) watcher := watch.NewFake() client.PrependWatchReactor("certificatesigningrequests", ktest.DefaultWatchReactor(watcher, nil)) @@ -62,7 +62,7 @@ func TestCSRApprove(t *testing.T) { defer wg.Done() watcher.Add(getCSR(csrName, userName)) time.Sleep(100 * time.Millisecond) - s.Stop(ctx) + s.Stop() }() wg.Wait() @@ -78,7 +78,7 @@ func TestCSRApprove(t *testing.T) { csrName := "123" userName := "kubelet-bootstrap" - client := fake.NewSimpleClientset(getCSR(csrName, userName)) + client := fake.NewClientset(getCSR(csrName, userName)) s := NewApprovalManager(log, client) watcher := watch.NewFake() client.PrependWatchReactor("certificatesigningrequests", ktest.DefaultWatchReactor(watcher, nil)) @@ -94,7 +94,7 @@ func TestCSRApprove(t *testing.T) { defer wg.Done() watcher.Add(getCSR(csrName, userName)) time.Sleep(100 * time.Millisecond) - s.Stop(ctx) + s.Stop() }() wg.Wait() diff --git a/internal/actions/types.go b/internal/actions/types.go index 3e21a6be..2274bfdc 100644 --- a/internal/actions/types.go +++ b/internal/actions/types.go @@ -1,4 +1,6 @@ //go:generate mockgen -destination ./mock/handler.go . 
ActionHandler +//go:generate mockgen -package=mock_actions -destination ./mock/kubernetes.go k8s.io/client-go/kubernetes Interface + package actions import ( diff --git a/internal/castai/client.go b/internal/castai/client.go index 5152723f..11136db6 100644 --- a/internal/castai/client.go +++ b/internal/castai/client.go @@ -15,7 +15,6 @@ import ( "golang.org/x/net/http2" "github.com/castai/cluster-controller/internal/config" - "github.com/castai/cluster-controller/internal/logexporter" ) const ( @@ -29,6 +28,14 @@ type CastAIClient interface { GetActions(ctx context.Context, k8sVersion string) ([]*ClusterAction, error) AckAction(ctx context.Context, actionID string, req *AckClusterActionRequest) error SendAKSInitData(ctx context.Context, req *AKSInitDataRequest) error + SendLog(ctx context.Context, e *LogEntry) error +} + +type LogEntry struct { + Level string `json:"level"` + Time time.Time `json:"time"` + Message string `json:"message"` + Fields logrus.Fields `json:"fields"` } // Client talks to Cast AI. It can poll and acknowledge actions @@ -134,7 +141,7 @@ func (c *Client) SendAKSInitData(ctx context.Context, req *AKSInitDataRequest) e return nil } -func (c *Client) SendLog(ctx context.Context, e *logexporter.LogEntry) error { +func (c *Client) SendLog(ctx context.Context, e *LogEntry) error { // Server expects fields values to be strings. If they're not it fails with BAD_REQUEST/400. // Alternatively we could use "google/protobuf/any.proto" on server side but ATM it doesn't work. 
for k, v := range e.Fields { diff --git a/internal/castai/mock/client.go b/internal/castai/mock/client.go index 266b7eed..483b3aa4 100644 --- a/internal/castai/mock/client.go +++ b/internal/castai/mock/client.go @@ -77,3 +77,17 @@ func (mr *MockCastAIClientMockRecorder) SendAKSInitData(arg0, arg1 interface{}) mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendAKSInitData", reflect.TypeOf((*MockCastAIClient)(nil).SendAKSInitData), arg0, arg1) } + +// SendLog mocks base method. +func (m *MockCastAIClient) SendLog(arg0 context.Context, arg1 *castai.LogEntry) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendLog", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendLog indicates an expected call of SendLog. +func (mr *MockCastAIClientMockRecorder) SendLog(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendLog", reflect.TypeOf((*MockCastAIClient)(nil).SendLog), arg0, arg1) +} diff --git a/internal/config/config.go b/internal/config/config.go index 037eff79..e8abe843 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1,11 +1,19 @@ package config import ( + "context" "fmt" + "net/http" + "os" "time" "github.com/sirupsen/logrus" "github.com/spf13/viper" + "k8s.io/apimachinery/pkg/util/net" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + + "github.com/castai/cluster-controller/internal/waitext" ) type Config struct { @@ -17,8 +25,15 @@ type Config struct { ClusterID string PprofPort int LeaderElection LeaderElection - PodName string - NodeName string + + MonitorMetadataPath string `mapstructure:"monitor_metadata"` + SelfPod Pod `mapstructure:"self_pod"` +} + +type Pod struct { + Namespace string `mapstructure:"namespace"` + Name string `mapstructure:"name"` + Node string `mapstructure:"node"` } type Log struct { @@ -36,7 +51,6 @@ type TLS struct { type LeaderElection struct { Enabled bool - Namespace string 
LockName string LeaseDuration time.Duration LeaseRenewDeadline time.Duration @@ -69,12 +83,13 @@ func Get() Config { _ = viper.BindEnv("kubeclient.burst", "KUBECLIENT_BURST") _ = viper.BindEnv("pprofport", "PPROF_PORT") _ = viper.BindEnv("leaderelection.enabled", "LEADER_ELECTION_ENABLED") - _ = viper.BindEnv("leaderelection.namespace", "LEADER_ELECTION_NAMESPACE") _ = viper.BindEnv("leaderelection.lockname", "LEADER_ELECTION_LOCK_NAME") _ = viper.BindEnv("leaderelection.leaseduration", "LEADER_ELECTION_LEASE_DURATION") _ = viper.BindEnv("leaderelection.leaserenewdeadline", "LEADER_ELECTION_LEASE_RENEW_DEADLINE") - _ = viper.BindEnv("nodename", "KUBERNETES_NODE_NAME") - _ = viper.BindEnv("podname", "KUBERNETES_POD") + _ = viper.BindEnv("monitor_metadata", "MONITOR_METADATA") + _ = viper.BindEnv("self_pod.node", "KUBERNETES_NODE_NAME") + _ = viper.BindEnv("self_pod.name", "KUBERNETES_POD") + _ = viper.BindEnv("self_pod.namespace", "LEADER_ELECTION_NAMESPACE") cfg = &Config{} if err := viper.Unmarshal(&cfg); err != nil { @@ -96,10 +111,11 @@ func Get() Config { if cfg.ClusterID == "" { required("CLUSTER_ID") } + if cfg.SelfPod.Namespace == "" { + required("LEADER_ELECTION_NAMESPACE") + } + if cfg.LeaderElection.Enabled { - if cfg.LeaderElection.Namespace == "" { - required("LEADER_ELECTION_NAMESPACE") - } if cfg.LeaderElection.LockName == "" { required("LEADER_ELECTION_LOCK_NAME") } @@ -123,3 +139,79 @@ func Get() Config { func required(variable string) { panic(fmt.Errorf("env variable %s is required", variable)) } + +func kubeConfigFromEnv() (*rest.Config, error) { + kubepath := Get().Kubeconfig + if kubepath == "" { + return nil, nil + } + + data, err := os.ReadFile(kubepath) + if err != nil { + return nil, fmt.Errorf("reading kubeconfig at %s: %w", kubepath, err) + } + + restConfig, err := clientcmd.RESTConfigFromKubeConfig(data) + if err != nil { + return nil, fmt.Errorf("building rest config from kubeconfig at %s: %w", kubepath, err) + } + + return restConfig, 
nil +} + +func RetrieveKubeConfig(log logrus.FieldLogger) (*rest.Config, error) { + kubeconfig, err := kubeConfigFromEnv() + if err != nil { + return nil, err + } + + if kubeconfig != nil { + log.Debug("using kubeconfig from env variables") + return kubeconfig, nil + } + + inClusterConfig, err := rest.InClusterConfig() + if err != nil { + return nil, err + } + inClusterConfig.Wrap(func(rt http.RoundTripper) http.RoundTripper { + return &kubeRetryTransport{ + log: log, + next: rt, + maxRetries: 10, + retryInterval: 3 * time.Second, + } + }) + log.Debug("using in cluster kubeconfig") + + return inClusterConfig, nil +} + +type kubeRetryTransport struct { + log logrus.FieldLogger + next http.RoundTripper + maxRetries int + retryInterval time.Duration +} + +func (rt *kubeRetryTransport) RoundTrip(req *http.Request) (*http.Response, error) { + var resp *http.Response + + boff := waitext.NewConstantBackoff(rt.retryInterval) + + err := waitext.Retry(context.Background(), boff, rt.maxRetries, func(_ context.Context) (bool, error) { + var err error + resp, err = rt.next.RoundTrip(req) + if err != nil { + // Previously client-go contained logic to retry connection refused errors. 
See https://github.com/kubernetes/kubernetes/pull/88267/files + if net.IsConnectionRefused(err) { + return true, err + } + return false, err + } + return false, nil + }, func(err error) { + rt.log.Warnf("kube api server connection refused, will retry: %v", err) + }) + return resp, err +} diff --git a/internal/config/config_test.go b/internal/config/config_test.go index f72ad69c..4552b405 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -32,10 +32,12 @@ func TestConfig(t *testing.T) { URL: "api.cast.ai", }, Kubeconfig: "~/.kube/config", - ClusterID: "c1", + SelfPod: Pod{ + Namespace: "castai-agent", + }, + ClusterID: "c1", LeaderElection: LeaderElection{ Enabled: true, - Namespace: "castai-agent", LockName: "castai-cluster-controller", LeaseDuration: time.Second * 25, LeaseRenewDeadline: time.Second * 20, diff --git a/main_test.go b/internal/config/retry_test.go similarity index 92% rename from main_test.go rename to internal/config/retry_test.go index 6947e9e1..5a96b1e5 100644 --- a/main_test.go +++ b/internal/config/retry_test.go @@ -1,4 +1,4 @@ -package main +package config import ( "errors" @@ -56,7 +56,7 @@ type mockRoundTripper struct { calls int32 } -func (m *mockRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { +func (m *mockRoundTripper) RoundTrip(_ *http.Request) (*http.Response, error) { atomic.AddInt32(&m.calls, 1) return nil, m.err } diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 170490f6..06bbb4ae 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -212,7 +212,7 @@ func (s *Controller) handleAction(ctx context.Context, action *castai.ClusterAct }).Info("handle action") handler, ok := s.actionHandlers[actionType] if !ok { - return fmt.Errorf("handler not found for agent action=%s", actionType) + return fmt.Errorf("handler not found for action=%s", actionType) } if err := handler.Handle(ctx, action); err != nil { diff 
--git a/internal/logexporter/logexporter.go b/internal/controller/logexporter/logexporter.go similarity index 68% rename from internal/logexporter/logexporter.go rename to internal/controller/logexporter/logexporter.go index 5a33a3aa..522153f6 100644 --- a/internal/logexporter/logexporter.go +++ b/internal/controller/logexporter/logexporter.go @@ -1,13 +1,16 @@ -//go:generate mockgen -destination ./mock/sender.go . LogSender package logexporter import ( "context" + "fmt" + "path" + "runtime" "sync" "time" "github.com/sirupsen/logrus" + "github.com/castai/cluster-controller/internal/castai" "github.com/castai/cluster-controller/internal/waitext" ) @@ -15,30 +18,39 @@ const ( sendTimeout = 15 * time.Second ) -type LogEntry struct { - Level string `json:"level"` - Time time.Time `json:"time"` - Message string `json:"message"` - Fields logrus.Fields `json:"fields"` -} - -type LogSender interface { - SendLog(ctx context.Context, e *LogEntry) error -} - // LogExporter hooks into logrus and sends logs to Mothership. type LogExporter struct { logger *logrus.Logger - sender LogSender + sender castai.CastAIClient wg sync.WaitGroup } // exporter must satisfy logrus.Hook. var _ logrus.Hook = new(LogExporter) +func NewLogger(logLevel uint32) *logrus.Logger { + logger := logrus.New() + logger.SetLevel(logrus.Level(logLevel)) + logger.SetReportCaller(true) + logger.Formatter = &logrus.TextFormatter{ + CallerPrettyfier: func(f *runtime.Frame) (string, string) { + filename := path.Base(f.File) + return fmt.Sprintf("%s()", f.Function), fmt.Sprintf("%s:%d", filename, f.Line) + }, + } + + return logger +} + +func SetupLogExporter(logger *logrus.Logger, sender castai.CastAIClient) { + logExporter := newLogExporter(logger, sender) + logger.AddHook(logExporter) + logrus.RegisterExitHandler(logExporter.Wait) +} + // NewLogExporter returns new exporter that can be hooked into logrus // to inject logs into Cast AI. 
-func NewLogExporter(logger *logrus.Logger, sender LogSender) *LogExporter { +func newLogExporter(logger *logrus.Logger, sender castai.CastAIClient) *LogExporter { return &LogExporter{ logger: logger, sender: sender, @@ -78,7 +90,7 @@ func (e *LogExporter) sendLogEvent(log *logrus.Entry) { ctx, cancel := context.WithTimeout(context.Background(), sendTimeout) defer cancel() - logEntry := &LogEntry{ + logEntry := &castai.LogEntry{ Level: log.Level.String(), Time: log.Time, Message: log.Message, diff --git a/internal/logexporter/logexporter_test.go b/internal/controller/logexporter/logexporter_test.go similarity index 64% rename from internal/logexporter/logexporter_test.go rename to internal/controller/logexporter/logexporter_test.go index 38ff050e..7390b96c 100644 --- a/internal/logexporter/logexporter_test.go +++ b/internal/controller/logexporter/logexporter_test.go @@ -1,4 +1,4 @@ -package logexporter_test +package logexporter import ( "fmt" @@ -6,11 +6,9 @@ import ( "github.com/golang/mock/gomock" "github.com/sirupsen/logrus" - "github.com/sirupsen/logrus/hooks/test" "go.uber.org/goleak" - "github.com/castai/cluster-controller/internal/logexporter" - mock_logexporter "github.com/castai/cluster-controller/internal/logexporter/mock" + mock_castai "github.com/castai/cluster-controller/internal/castai/mock" ) func TestMain(m *testing.M) { @@ -20,8 +18,8 @@ func TestMain(m *testing.M) { func TestSetupLogExporter(t *testing.T) { t.Parallel() type args struct { - tuneMockSender func(sender *mock_logexporter.MockLogSender) - msg map[uint32]string // level -> message. 
+ tuneMockSender func(sender *mock_castai.MockCastAIClient) + msg map[uint32]string // level -> message } tests := []struct { name string @@ -34,7 +32,7 @@ func TestSetupLogExporter(t *testing.T) { uint32(logrus.ErrorLevel): "foo", uint32(logrus.DebugLevel): "bar", }, - tuneMockSender: func(sender *mock_logexporter.MockLogSender) { + tuneMockSender: func(sender *mock_castai.MockCastAIClient) { sender.EXPECT().SendLog(gomock.Any(), gomock.Any()). Return(nil).Times(1) }, @@ -47,9 +45,9 @@ func TestSetupLogExporter(t *testing.T) { uint32(logrus.ErrorLevel): "foo", uint32(logrus.DebugLevel): "bar", }, - tuneMockSender: func(sender *mock_logexporter.MockLogSender) { + tuneMockSender: func(sender *mock_castai.MockCastAIClient) { sender.EXPECT().SendLog(gomock.Any(), gomock.Any()). - Return(fmt.Errorf("test-error")).Times(4) + Return(fmt.Errorf("test-error")).Times(4) // 1 for first error, 3 for retries }, }, }, @@ -60,22 +58,22 @@ func TestSetupLogExporter(t *testing.T) { t.Parallel() m := gomock.NewController(t) defer m.Finish() - sender := mock_logexporter.NewMockLogSender(m) + sender := mock_castai.NewMockCastAIClient(m) if tt.args.tuneMockSender != nil { tt.args.tuneMockSender(sender) } - logger, hook := test.NewNullLogger() - defer hook.Reset() + logger := NewLogger(uint32(logrus.InfoLevel)) + + logExporter := newLogExporter(logger, sender) + logger.AddHook(logExporter) + defer logExporter.Wait() - e := logexporter.NewLogExporter(logger, sender) - logger.AddHook(e) log := logger.WithFields(logrus.Fields{ "cluster_id": "test-cluster", }) for level, msg := range tt.args.msg { log.Log(logrus.Level(level), msg) } - e.Wait() }) } } diff --git a/internal/logexporter/mock/sender.go b/internal/logexporter/mock/sender.go deleted file mode 100644 index 75264a2d..00000000 --- a/internal/logexporter/mock/sender.go +++ /dev/null @@ -1,50 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. 
-// Source: github.com/castai/cluster-controller/internal/logexporter (interfaces: LogSender) - -// Package mock_logexporter is a generated GoMock package. -package mock_logexporter - -import ( - context "context" - reflect "reflect" - - logexporter "github.com/castai/cluster-controller/internal/logexporter" - gomock "github.com/golang/mock/gomock" -) - -// MockLogSender is a mock of LogSender interface. -type MockLogSender struct { - ctrl *gomock.Controller - recorder *MockLogSenderMockRecorder -} - -// MockLogSenderMockRecorder is the mock recorder for MockLogSender. -type MockLogSenderMockRecorder struct { - mock *MockLogSender -} - -// NewMockLogSender creates a new mock instance. -func NewMockLogSender(ctrl *gomock.Controller) *MockLogSender { - mock := &MockLogSender{ctrl: ctrl} - mock.recorder = &MockLogSenderMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockLogSender) EXPECT() *MockLogSenderMockRecorder { - return m.recorder -} - -// SendLog mocks base method. -func (m *MockLogSender) SendLog(arg0 context.Context, arg1 *logexporter.LogEntry) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SendLog", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// SendLog indicates an expected call of SendLog. 
-func (mr *MockLogSenderMockRecorder) SendLog(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendLog", reflect.TypeOf((*MockLogSender)(nil).SendLog), arg0, arg1) -} diff --git a/internal/monitor/metadata.go b/internal/monitor/metadata.go new file mode 100644 index 00000000..1888c272 --- /dev/null +++ b/internal/monitor/metadata.go @@ -0,0 +1,112 @@ +package monitor + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/fsnotify/fsnotify" + "github.com/sirupsen/logrus" +) + +type Metadata struct { + ClusterID string `json:"clusterId"` + LastStart int64 `json:"lastStart"` +} + +func (m *Metadata) Save(file string) error { + if file == "" { + // if monitor is running standalone or with an old chart version, and saving of + // metadata is not configured, we don't need to do anything here + return nil + } + contents, err := json.Marshal(m) + if err != nil { + return fmt.Errorf("marshaling: %w", err) + } + return os.WriteFile(file, contents, 0o600) +} + +var errEmptyMetadata = fmt.Errorf("metadata file is empty") + +func (m *Metadata) Load(file string) error { + contents, err := os.ReadFile(file) + if err != nil { + return fmt.Errorf("reading file: %w", err) + } + if len(contents) == 0 { + return errEmptyMetadata + } + if err := json.Unmarshal(contents, m); err != nil { + return fmt.Errorf("file: %v content: %v parsing json: %w", file, string(contents), err) + } + return nil +} + +// watchForMetadataChanges starts a watch on a local file for updates and returns changes to metadata channel. 
The watcher stops when the context is done. +func watchForMetadataChanges(ctx context.Context, log logrus.FieldLogger, metadataFilePath string) (chan Metadata, error) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, fmt.Errorf("setting up new watcher: %w", err) + } + updates := make(chan Metadata, 1) + + if err := watcher.Add(filepath.Dir(metadataFilePath)); err != nil { + return nil, fmt.Errorf("adding watch: %w", err) + } + + checkMetadata := func() { + metadata := Metadata{} + if err := metadata.Load(metadataFilePath); err != nil { + if !strings.Contains(err.Error(), "no such file or directory") { + log.Warnf("loading metadata failed: %v", err) + } + } else { + select { + case updates <- metadata: + default: + log.Warnf("metadata update skipped, channel full") + } + } + } + + go func() { + defer close(updates) + defer func() { + err := watcher.Close() + if err != nil { + log.Warnf("watcher close error: %v", err) + } + }() + checkMetadata() + + for { + select { + case <-ctx.Done(): + return + case event := <-watcher.Events: + if opContains(event.Op, fsnotify.Create, fsnotify.Write) && event.Name == metadataFilePath { + checkMetadata() + } + case err := <-watcher.Errors: + log.Errorf("metadata watch error: %v", err) + } + } + }() + + return updates, nil +} + +// opContains tests that op contains at least one of the values +func opContains(op fsnotify.Op, values ...fsnotify.Op) bool { + for _, v := range values { + // event.Op may contain multiple values or-ed together, can't use simple equality check + if op&v == v { + return true + } + } + return false +} diff --git a/internal/monitor/metatada_test.go b/internal/monitor/metatada_test.go new file mode 100644 index 00000000..07bacaa6 --- /dev/null +++ b/internal/monitor/metatada_test.go @@ -0,0 +1,98 @@ +package monitor + +import ( + "context" + "os" + "path/filepath" + "testing" + "time" + + "github.com/google/uuid" + "github.com/samber/lo" + "github.com/sirupsen/logrus" + 
"github.com/stretchr/testify/require" +) + +func TestSaveMetadata(t *testing.T) { + tests := map[string]struct { + createDir string + file string + expectedError *string + }{ + "not configured": { + file: "", + expectedError: nil, + }, + "invalid file dir": { + file: "no_such_dir/abc", + expectedError: lo.ToPtr("open.*no such file or directory"), + }, + "valid dir": { + createDir: "metadata", + file: "metadata/info", + }, + } + + for testName, tt := range tests { + tt := tt + t.Run(testName, func(t *testing.T) { + r := require.New(t) + baseDir := t.TempDir() + if tt.createDir != "" { + r.NoError(os.MkdirAll(filepath.Join(baseDir, tt.createDir), 0o700)) + } + m := Metadata{ + ClusterID: uuid.New().String(), + LastStart: 123, + } + saveTo := tt.file + if tt.file != "" { + saveTo = filepath.Join(baseDir, tt.file) + } + + err := m.Save(saveTo) + if tt.expectedError == nil { + r.NoError(err) + } else { + r.Regexp(*tt.expectedError, err.Error()) + } + }) + } +} + +func Test_monitor_waitForMetadata(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + + syncFile := filepath.Join(t.TempDir(), "metadata.json") + + updates, err := watchForMetadataChanges(ctx, logrus.New(), syncFile) + require.NoError(t, err) + + // make sure that watcher does not find the file immediately and goes into watcher loop + time.Sleep(time.Second * 1) + + // create the file, expect the event to arrive at updates channel + var meta Metadata + maxI := int64(124) + for i := int64(0); i <= maxI; i++ { + meta = Metadata{ + ClusterID: uuid.New().String(), + LastStart: i, + } + require.NoError(t, meta.Save(syncFile)) + } + + metadata, ok := <-updates + require.True(t, ok) + require.True(t, maxI >= metadata.LastStart, "expected last start to be %d, got %d", maxI, metadata.LastStart) + require.True(t, metadata.LastStart != 0, "expected last start to be non-zero, got %d", metadata.LastStart) + + cancel() + + for range updates { + // exhaust other events 
+ } + _, ok = <-updates + require.False(t, ok, "after ctx is done, updates channel should get closed as watcher exits") +} diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go new file mode 100644 index 00000000..d435ba13 --- /dev/null +++ b/internal/monitor/monitor.go @@ -0,0 +1,109 @@ +package monitor + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/samber/lo" + "github.com/sirupsen/logrus" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + + "github.com/castai/cluster-controller/internal/config" +) + +func Run(ctx context.Context, log logrus.FieldLogger, clientset *kubernetes.Clientset, metadataFile string, pod config.Pod) error { + m := monitor{ + clientset: clientset, + log: log, + pod: pod, + } + + metadataUpdates, err := watchForMetadataChanges(ctx, m.log, metadataFile) + if err != nil { + return fmt.Errorf("setting up metadata watch: %w", err) + } + + for { + select { + case <-ctx.Done(): + return nil + case metadata := <-metadataUpdates: + m.metadataUpdated(ctx, metadata) + } + } +} + +type monitor struct { + clientset *kubernetes.Clientset + log logrus.FieldLogger + metadata Metadata + pod config.Pod +} + +// metadataUpdated gets called each time we receive a notification from metadata file watcher that there were changes to it +func (m *monitor) metadataUpdated(ctx context.Context, metadata Metadata) { + prevMetadata := m.metadata + m.metadata = metadata + if prevMetadata.LastStart == 0 || prevMetadata.LastStart == metadata.LastStart { + // if we just received first metadata or there were no changes, nothing to do + return + } + + m.reportPodDiagnostics(ctx, prevMetadata.LastStart) +} + +func (m *monitor) reportPodDiagnostics(ctx context.Context, prevLastStart int64) { + m.log.Errorf("unexpected controller restart detected, fetching k8s events for %s/%s", m.pod.Namespace, m.pod.Name) + + // log pod-related warnings + m.logEvents(ctx, 
m.log.WithField("events_group", fmt.Sprintf("%s/%s", m.pod.Namespace, m.pod.Name)), m.pod.Namespace, &metav1.ListOptions{ + FieldSelector: "involvedObject.name=" + m.pod.Name, + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + }, + }, func(event *v1.Event) bool { + return true + }) + + // Log node-related warnings. We can't find relevant messages easily as there's no metadata linking events to specific pods, + // and even filtering by PID does not work (controller process PID is different inside the pod and as seen from the node). + // Instead, will use simple filtering by "cluster-controller"; combined with node-name filter, this should be sufficient + // to narrow the list down to controller-related events only. + // Example: Memory cgroup out of memory: Killed process 414273 (castai-cluster-) total-vm:5477892kB, anon-rss:14740kB + m.logEvents(ctx, m.log.WithFields(logrus.Fields{ + "events_group": fmt.Sprintf("node/%s", m.pod.Node), + "prevLastStart": prevLastStart, + }), v1.NamespaceAll, &metav1.ListOptions{ + FieldSelector: "involvedObject.name=" + m.pod.Node, + TypeMeta: metav1.TypeMeta{ + Kind: "Node", + }, + }, func(event *v1.Event) bool { + // OOM events are reported on the node, but the only relation to the pod is the killed process PID.
+ return strings.Contains(event.Message, "castai-cluster-") + }) +} + +func (m *monitor) logEvents(ctx context.Context, log logrus.FieldLogger, namespace string, listOptions *metav1.ListOptions, filter func(event *v1.Event) bool) { + events, err := m.clientset.CoreV1().Events(namespace).List(ctx, *listOptions) + if err != nil { + log.Errorf("failed fetching k8s events after controller restart: %v", err) + return + } + relevantEvents := lo.Filter(events.Items, func(e v1.Event, _ int) bool { + return e.Type != v1.EventTypeNormal && filter(&e) + }) + + if len(relevantEvents) == 0 { + log.Warnf("no relevant k8s events detected out of %d retrieved", len(events.Items)) + return + } + + for _, e := range relevantEvents { + log.Errorf("k8s events detected: TYPE:%s REASON:%s TIMESTAMP:%s MESSAGE:%s", e.Type, e.Reason, e.LastTimestamp.UTC().Format(time.RFC3339), e.Message) + } +} diff --git a/main.go b/main.go index e8e9ea4c..54282a91 100644 --- a/main.go +++ b/main.go @@ -2,41 +2,12 @@ package main import ( "context" - "errors" - "fmt" - "net/http" - "net/http/pprof" - "os" - "path" - "runtime" - "strings" - "time" - "github.com/bombsimon/logrusr/v4" - "github.com/google/uuid" - "github.com/sirupsen/logrus" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/net" - "k8s.io/apiserver/pkg/server/healthz" - "k8s.io/client-go/dynamic" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" - "k8s.io/client-go/tools/leaderelection" - "k8s.io/client-go/tools/leaderelection/resourcelock" - "k8s.io/client-go/util/flowcontrol" - "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/manager/signals" - "github.com/castai/cluster-controller/health" - "github.com/castai/cluster-controller/internal/actions/csr" - "github.com/castai/cluster-controller/internal/castai" + "github.com/castai/cluster-controller/cmd" + "github.com/castai/cluster-controller/cmd/utils" "github.com/castai/cluster-controller/internal/config" - 
"github.com/castai/cluster-controller/internal/controller" - "github.com/castai/cluster-controller/internal/helm" - "github.com/castai/cluster-controller/internal/k8sversion" - "github.com/castai/cluster-controller/internal/logexporter" - "github.com/castai/cluster-controller/internal/waitext" ) // These should be set via `go build` during a release. @@ -46,375 +17,12 @@ var ( Version = "local" ) -const ( - maxRequestTimeout = 5 * time.Minute -) - func main() { - log := logrus.WithFields(logrus.Fields{}) - cfg := config.Get() - - binVersion := &config.ClusterControllerVersion{ + ctx := signals.SetupSignalHandler() + ctx = context.WithValue(ctx, utils.ClusterControllerVersionKey, &config.ClusterControllerVersion{ GitCommit: GitCommit, GitRef: GitRef, Version: Version, - } - log.Infof("running castai-cluster-controller version %v", binVersion) - - logger := logrus.New() - logger.SetLevel(logrus.Level(cfg.Log.Level)) - logger.SetReportCaller(true) - logger.Formatter = &logrus.TextFormatter{ - CallerPrettyfier: func(f *runtime.Frame) (string, string) { - filename := path.Base(f.File) - return fmt.Sprintf("%s()", f.Function), fmt.Sprintf("%s:%d", filename, f.Line) - }, - } - cl, err := castai.NewRestyClient(cfg.API.URL, cfg.API.Key, cfg.TLS.CACert, logger.Level, binVersion, maxRequestTimeout) - if err != nil { - log.Fatalf("failed to create castai client: %v", err) - } - client := castai.NewClient(logger, cl, cfg.ClusterID) - - e := logexporter.NewLogExporter(logger, client) - logger.AddHook(e) - logrus.RegisterExitHandler(e.Wait) - - ctx := signals.SetupSignalHandler() - if err := run(ctx, client, logger, cfg, binVersion); err != nil { - logErr := &logContextError{} - if errors.As(err, &logErr) { - log = logger.WithFields(logErr.fields) - } - log.Fatalf("cluster-controller failed: %v", err) - } -} - -func run( - ctx context.Context, - client castai.CastAIClient, - logger *logrus.Logger, - cfg config.Config, - binVersion *config.ClusterControllerVersion, -) (reterr 
error) { - fields := logrus.Fields{} - - defer func() { - if reterr == nil { - return - } - reterr = &logContextError{ - err: reterr, - fields: fields, - } - }() - - restconfig, err := retrieveKubeConfig(logger) - if err != nil { - return err - } - restConfigLeader := rest.CopyConfig(restconfig) - restConfigDynamic := rest.CopyConfig(restconfig) - - restconfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(float32(cfg.KubeClient.QPS), cfg.KubeClient.Burst) - restConfigLeader.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(float32(cfg.KubeClient.QPS), cfg.KubeClient.Burst) - restConfigDynamic.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(float32(cfg.KubeClient.QPS), cfg.KubeClient.Burst) - - helmClient := helm.NewClient(logger, helm.NewChartLoader(logger), restconfig) - - clientset, err := kubernetes.NewForConfig(restconfig) - if err != nil { - return err - } - clientSetLeader, err := kubernetes.NewForConfig(restConfigLeader) - if err != nil { - return err - } - - dynamicClient, err := dynamic.NewForConfig(restConfigDynamic) - if err != nil { - return err - } - - k8sVersion, err := k8sversion.Get(clientset) - if err != nil { - return fmt.Errorf("getting kubernetes version: %w", err) - } - - log := logger.WithFields(logrus.Fields{ - "version": binVersion.Version, - "k8s_version": k8sVersion.Full(), - "running_on": cfg.NodeName, - "ctrl_pod_name": cfg.PodName, - }) - - // Set logr/klog to logrus adapter so all logging goes through logrus. 
- logr := logrusr.New(log) - klog.SetLogger(logr) - - log.Infof("running castai-cluster-controller version %v, log-level: %v", binVersion, logger.Level) - - actionsConfig := controller.Config{ - PollWaitInterval: 5 * time.Second, - PollTimeout: maxRequestTimeout, - AckTimeout: 30 * time.Second, - AckRetriesCount: 3, - AckRetryWait: 1 * time.Second, - ClusterID: cfg.ClusterID, - Version: binVersion.Version, - Namespace: cfg.LeaderElection.Namespace, - } - healthzAction := health.NewHealthzProvider(health.HealthzCfg{HealthyPollIntervalLimit: (actionsConfig.PollWaitInterval + actionsConfig.PollTimeout) * 2, StartTimeLimit: 2 * time.Minute}, log) - - svc := controller.NewService( - log, - actionsConfig, - k8sVersion.Full(), - clientset, - dynamicClient, - client, - helmClient, - healthzAction, - ) - defer func() { - if err := svc.Close(); err != nil { - log.Errorf("failed to close controller service: %v", err) - } - }() - - httpMux := http.NewServeMux() - var checks []healthz.HealthChecker - checks = append(checks, healthzAction) - var leaderHealthCheck *leaderelection.HealthzAdaptor - if cfg.LeaderElection.Enabled { - leaderHealthCheck = leaderelection.NewLeaderHealthzAdaptor(time.Minute) - checks = append(checks, leaderHealthCheck) - } - healthz.InstallHandler(httpMux, checks...) - installPprofHandlers(httpMux) - - // Start http server for pprof and health checks handlers. 
- go func() { - addr := fmt.Sprintf(":%d", cfg.PprofPort) - log.Infof("starting pprof server on %s", addr) - - //TODO: remove nolint when we have a proper solution for this - //nolint:gosec - if err := http.ListenAndServe(addr, httpMux); err != nil { - log.Errorf("failed to start pprof http server: %v", err) - } - }() - - runSvc := func(ctx context.Context) { - isGKE, err := runningOnGKE(clientset, cfg) - if err != nil { - log.Fatalf("failed to determine if running on GKE: %v", err) - } - - if isGKE { - log.Info("auto approve csr started as running on GKE") - csrMgr := csr.NewApprovalManager(log, clientset) - csrMgr.Start(ctx) - } - - svc.Run(ctx) - } - - if cfg.LeaderElection.Enabled { - // Run actions service with leader election. Blocks. - return runWithLeaderElection(ctx, log, clientSetLeader, leaderHealthCheck, cfg.LeaderElection, runSvc) - } - - // Run action service. Blocks. - runSvc(ctx) - return nil -} - -func runWithLeaderElection( - ctx context.Context, - log logrus.FieldLogger, - clientset kubernetes.Interface, - watchDog *leaderelection.HealthzAdaptor, - cfg config.LeaderElection, - runFunc func(ctx context.Context), -) error { - id, err := os.Hostname() - if err != nil { - return fmt.Errorf("failed to determine hostname used in leader ID: %w", err) - } - id = id + "_" + uuid.New().String() - - // Start the leader election code loop. - leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ - Lock: &resourcelock.LeaseLock{ - LeaseMeta: metav1.ObjectMeta{ - Name: cfg.LockName, - Namespace: cfg.Namespace, - }, - Client: clientset.CoordinationV1(), - LockConfig: resourcelock.ResourceLockConfig{ - Identity: id, - }, - }, - // IMPORTANT: you MUST ensure that any code you have that - // is protected by the lease must terminate **before** - // you call cancel. Otherwise, you could have a background - // loop still running and another process could - // get elected before your background loop finished, violating - // the stated goal of the lease. 
- ReleaseOnCancel: true, - LeaseDuration: cfg.LeaseDuration, - RenewDeadline: cfg.LeaseRenewDeadline, - RetryPeriod: 3 * time.Second, - WatchDog: watchDog, - Callbacks: leaderelection.LeaderCallbacks{ - OnStartedLeading: func(ctx context.Context) { - log.WithFields(logrus.Fields{ - "leaseDuration": cfg.LeaseDuration.String(), - "leaseRenewDuration": cfg.LeaseRenewDeadline.String(), - }).Infof("leader elected: %s", id) - runFunc(ctx) - }, - OnStoppedLeading: func() { - // This method is always called(even if it was not a leader): - // - when controller shuts dow (for example because of SIGTERM) - // - we actually lost leader - // So we need to check what whas reason of acutally stopping. - if err := ctx.Err(); err != nil { - log.Infof("main context done, stopping controller: %v", err) - return - } - log.Infof("leader lost: %s", id) - // We don't need to exit here. - // Leader "on started leading" receive a context that gets cancelled when you're no longer the leader. - }, - OnNewLeader: func(identity string) { - // We're notified when new leader elected. - if identity == id { - // I just got the lock. 
- return - } - log.Infof("new leader elected: %s", identity) - }, - }, }) - return nil -} - -func installPprofHandlers(mux *http.ServeMux) { - mux.HandleFunc("/debug/pprof/", pprof.Index) - mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) - mux.HandleFunc("/debug/pprof/profile", pprof.Profile) - mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) - mux.HandleFunc("/debug/pprof/trace", pprof.Trace) -} - -func kubeConfigFromEnv() (*rest.Config, error) { - kubepath := config.Get().Kubeconfig - if kubepath == "" { - return nil, nil - } - - data, err := os.ReadFile(kubepath) - if err != nil { - return nil, fmt.Errorf("reading kubeconfig at %s: %w", kubepath, err) - } - - restConfig, err := clientcmd.RESTConfigFromKubeConfig(data) - if err != nil { - return nil, fmt.Errorf("building rest config from kubeconfig at %s: %w", kubepath, err) - } - - return restConfig, nil -} - -func retrieveKubeConfig(log logrus.FieldLogger) (*rest.Config, error) { - kubeconfig, err := kubeConfigFromEnv() - if err != nil { - return nil, err - } - - if kubeconfig != nil { - log.Debug("using kubeconfig from env variables") - return kubeconfig, nil - } - - inClusterConfig, err := rest.InClusterConfig() - if err != nil { - return nil, err - } - inClusterConfig.Wrap(func(rt http.RoundTripper) http.RoundTripper { - return &kubeRetryTransport{ - log: log, - next: rt, - maxRetries: 10, - retryInterval: 3 * time.Second, - } - }) - log.Debug("using in cluster kubeconfig") - - return inClusterConfig, nil -} - -type kubeRetryTransport struct { - log logrus.FieldLogger - next http.RoundTripper - maxRetries int - retryInterval time.Duration -} - -func (rt *kubeRetryTransport) RoundTrip(req *http.Request) (*http.Response, error) { - var resp *http.Response - - boff := waitext.NewConstantBackoff(rt.retryInterval) - - err := waitext.Retry(context.Background(), boff, rt.maxRetries, func(_ context.Context) (bool, error) { - var err error - resp, err = rt.next.RoundTrip(req) - if err != nil { - // 
Previously client-go contained logic to retry connection refused errors. See https://github.com/kubernetes/kubernetes/pull/88267/files - if net.IsConnectionRefused(err) { - return true, err - } - return false, err - } - return false, nil - }, func(err error) { - rt.log.Warnf("kube api server connection refused, will retry: %v", err) - }) - return resp, err -} - -type logContextError struct { - err error - fields logrus.Fields -} - -func (e *logContextError) Error() string { - return e.err.Error() -} - -func (e *logContextError) Unwrap() error { - return e.err -} - -func runningOnGKE(clientset *kubernetes.Clientset, cfg config.Config) (isGKE bool, err error) { - err = waitext.Retry(context.Background(), waitext.DefaultExponentialBackoff(), 3, func(ctx context.Context) (bool, error) { - node, err := clientset.CoreV1().Nodes().Get(ctx, cfg.NodeName, metav1.GetOptions{}) - if err != nil { - return true, fmt.Errorf("getting node: %w", err) - } - - for k := range node.Labels { - if strings.HasPrefix(k, "cloud.google.com/") { - isGKE = true - return false, nil - } - } - - return false, nil - }, func(err error) { - }) - - return + cmd.Execute(ctx) }