From eef996a579682dfff2cc5c0acfc900c8254b0cea Mon Sep 17 00:00:00 2001 From: Damian Czaja Date: Tue, 29 Oct 2024 11:46:56 +0100 Subject: [PATCH 1/7] KUBE-644: use informers to watch CSRs --- internal/actions/csr/csr.go | 133 ++++++++++++++----------------- internal/actions/csr/csr_test.go | 37 ++++----- internal/actions/csr/svc.go | 11 ++- 3 files changed, 85 insertions(+), 96 deletions(-) diff --git a/internal/actions/csr/csr.go b/internal/actions/csr/csr.go index e05e7d2..6766be9 100644 --- a/internal/actions/csr/csr.go +++ b/internal/actions/csr/csr.go @@ -17,16 +17,16 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" - - "github.com/castai/cluster-controller/internal/waitext" + "k8s.io/client-go/tools/cache" ) const ( - ReasonApproved = "AutoApproved" - approvedMessage = "This CSR was approved by CAST AI" - csrTTL = time.Hour + ReasonApproved = "AutoApproved" + approvedMessage = "This CSR was approved by CAST AI" + csrTTL = time.Hour + csrInformerResyncPeriod = time.Hour ) var ErrNodeCertificateNotFound = errors.New("node certificate not found") @@ -273,78 +273,49 @@ func getNodeCSRV1Beta1(ctx context.Context, client kubernetes.Interface, nodeNam return nil, ErrNodeCertificateNotFound } -func WatchCastAINodeCSRs(ctx context.Context, log logrus.FieldLogger, client kubernetes.Interface, c chan *Certificate) { - var w watch.Interface - var err error - b := waitext.DefaultExponentialBackoff() - err = waitext.Retry( - ctx, - b, - waitext.Forever, - func(ctx context.Context) (bool, error) { - w, err = getWatcher(ctx, client) - // Context canceled is when the cluster-controller is stopped. - // In that case context.Canceled is not an error. - if errors.Is(err, context.Canceled) { - return false, err - } - if err != nil { - return true, fmt.Errorf("getWatcher: %w", err) +func WatchCastAINodeCSRs(ctx context.Context, log logrus.FieldLogger, client kubernetes.Interface, c chan<- *Certificate) error { + factory := informers.NewSharedInformerFactoryWithOptions(client, csrInformerResyncPeriod, + informers.WithTweakListOptions(func(opts *metav1.ListOptions) { + opts.FieldSelector = getOptions(certv1.KubeAPIServerClientKubeletSignerName).FieldSelector + })) + + csrInformer := factory.Certificates().V1().CertificateSigningRequests().Informer() + csrv1BetaInformer := factory.Certificates().V1beta1().CertificateSigningRequests().Informer() + + handlerFuncs := cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + if err := processCSREvent(ctx, c, obj); err != nil { + log.WithError(err).Warn("failed to process csr add event") } - return false, nil }, - func(err error) { - log.Warnf("retrying: %v", err) + UpdateFunc: func(oldObj, newObj interface{}) { + if err := processCSREvent(ctx, c, newObj); err != nil { + log.WithError(err).Warn("failed to process csr update event") + } }, - ) - if err != nil { - log.Warnf("finished: %v", err) - return + DeleteFunc: func(obj interface{}) {}, } - defer w.Stop() - - log.Info("watching for new node csr") - - for { - select { - case <-ctx.Done(): - return - case event, ok := <-w.ResultChan(): - if !ok { - log.Info("watcher closed") - go WatchCastAINodeCSRs(ctx, log, client, c) // start over in case of any error. - return - } - - cert, err := toCertificate(event) - if err != nil { - log.Warnf("toCertificate: skipping csr event: %v", err) - continue - } + if _, err := csrInformer.AddEventHandler(handlerFuncs); err != nil { + return fmt.Errorf("adding v1/csr informer event handlers: %w", err) + } - if cert == nil { - continue - } + if _, err := csrv1BetaInformer.AddEventHandler(handlerFuncs); err != nil { + return fmt.Errorf("adding v1beta1/csr informer event handlers: %w", err) + } - if cert.Approved() { - continue - } + v1StopCh := make(chan struct{}) + v1BetaStopCh := make(chan struct{}) + defer close(v1StopCh) + defer close(v1BetaStopCh) + go factory.Start(v1StopCh) + go factory.Start(v1BetaStopCh) - sendCertificate(ctx, c, cert) - } - } -} + log.Info("watching for new node csr") -func getWatcher(ctx context.Context, client kubernetes.Interface) (watch.Interface, error) { - w, err := client.CertificatesV1().CertificateSigningRequests().Watch(ctx, getOptions(certv1.KubeAPIServerClientKubeletSignerName)) - if err != nil { - w, err = client.CertificatesV1beta1().CertificateSigningRequests().Watch(ctx, getOptions(certv1beta1.KubeAPIServerClientKubeletSignerName)) - if err != nil { - return nil, fmt.Errorf("fail to open v1 and v1beta watching client: %w", err) - } - } - return w, nil + <-ctx.Done() + log.WithField("context", ctx.Err()).Info("finished watching for new node csr") + return nil } var ( @@ -354,12 +325,30 @@ var ( errNonCastAINode = errors.New("not a castai node") ) -func toCertificate(event watch.Event) (cert *Certificate, err error) { +func processCSREvent(ctx context.Context, c chan<- *Certificate, csrObj interface{}) error { + cert, err := toCertificate(csrObj) + if err != nil { + return err + } + + if cert == nil { + return nil + } + + if cert.Approved() { + return nil + } + + sendCertificate(ctx, c, cert) + return nil +} + +func toCertificate(obj interface{}) (cert *Certificate, err error) { var name string var request []byte isOutdated := false - switch e := event.Object.(type) { + switch e := obj.(type) { case *certv1.CertificateSigningRequest: name = e.Name request = e.Spec.Request @@ -410,7 +399,7 @@ func isCastAINodeCsr(subjectCommonName string) bool { return false } -func sendCertificate(ctx context.Context, c chan *Certificate, cert *Certificate) { +func sendCertificate(ctx context.Context, c chan<- *Certificate, cert *Certificate) { select { case c <- cert: case <-ctx.Done(): diff --git a/internal/actions/csr/csr_test.go b/internal/actions/csr/csr_test.go index 0d6f9f3..20ff7b9 100644 --- a/internal/actions/csr/csr_test.go +++ b/internal/actions/csr/csr_test.go @@ -10,7 +10,6 @@ import ( "github.com/stretchr/testify/require" certv1 "k8s.io/api/certificates/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" "k8s.io/client-go/tools/clientcmd" @@ -104,7 +103,7 @@ func Test_toCertificate(t *testing.T) { testCSRv1 := getCSRv1("node-csr", "kubelet-bootstrap") testCSRv1beta1 := getCSRv1betav1("node-csr", "kubelet-bootstrap") type args struct { - event watch.Event + obj interface{} } tests := []struct { name string @@ -115,18 +114,16 @@ func Test_toCertificate(t *testing.T) { { name: "empty event", args: args{ - event: watch.Event{}, + obj: nil, }, wantErr: true, }, { name: "outdated event", args: args{ - event: watch.Event{ - Object: &certv1.CertificateSigningRequest{ - ObjectMeta: metav1.ObjectMeta{ - CreationTimestamp: metav1.Time{Time: time.Now().Add(-csrTTL)}, - }, + obj: &certv1.CertificateSigningRequest{ + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: metav1.Time{Time: time.Now().Add(-csrTTL)}, }, }, }, @@ -135,14 +132,12 @@ func Test_toCertificate(t *testing.T) { { name: "bad owner", args: args{ - event: watch.Event{ - Object: &certv1.CertificateSigningRequest{ - Spec: certv1.CertificateSigningRequestSpec{ - Username: "test", - }, - ObjectMeta: metav1.ObjectMeta{ - CreationTimestamp: metav1.Time{Time: time.Now().Add(csrTTL)}, - }, + obj: &certv1.CertificateSigningRequest{ + Spec: certv1.CertificateSigningRequestSpec{ + Username: "test", + }, + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: metav1.Time{Time: time.Now().Add(csrTTL)}, }, }, }, @@ -151,9 +146,7 @@ func Test_toCertificate(t *testing.T) { { name: "ok v1", args: args{ - event: watch.Event{ - Object: testCSRv1, - }, + obj: testCSRv1, }, wantErr: false, wantCert: &Certificate{ @@ -165,9 +158,7 @@ func Test_toCertificate(t *testing.T) { { name: "ok v1beta1", args: args{ - event: watch.Event{ - Object: testCSRv1beta1, - }, + obj: testCSRv1beta1, }, wantErr: false, wantCert: &Certificate{ @@ -179,7 +170,7 @@ func Test_toCertificate(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - gotCert, err := toCertificate(tt.args.event) + gotCert, err := toCertificate(tt.args.obj) if (err != nil) != tt.wantErr { t.Errorf("toCertificate() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/internal/actions/csr/svc.go b/internal/actions/csr/svc.go index bb13609..afe52b9 100644 --- a/internal/actions/csr/svc.go +++ b/internal/actions/csr/svc.go @@ -110,7 +110,16 @@ func (h *ApprovalManager) runAutoApproveForCastAINodes(ctx context.Context) { log := h.log.WithField("RunAutoApprove", "auto-approve-csr") c := make(chan *Certificate, 1) - go WatchCastAINodeCSRs(ctx, log, h.clientset, c) + go func() { + for { + if err := WatchCastAINodeCSRs(ctx, log, h.clientset, c); err != nil { + log.WithError(err).Warn("failed to watch csr") + time.Sleep(1 * time.Second) + continue + } + return + } + }() for { select { From 8c898b4087d62b1cd240ed5c122e0776fbabdb7c Mon Sep 17 00:00:00 2001 From: Damian Czaja Date: Tue, 29 Oct 2024 12:11:51 +0100 Subject: [PATCH 2/7] comment --- internal/actions/csr/csr.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/internal/actions/csr/csr.go b/internal/actions/csr/csr.go index 6766be9..976b441 100644 --- a/internal/actions/csr/csr.go +++ b/internal/actions/csr/csr.go @@ -23,10 +23,12 @@ import ( ) const ( - ReasonApproved = "AutoApproved" - approvedMessage = "This CSR was approved by CAST AI" - csrTTL = time.Hour - csrInformerResyncPeriod = time.Hour + ReasonApproved = "AutoApproved" + approvedMessage = "This CSR was approved by CAST AI" + csrTTL = time.Hour + + // We should approve CSRs, when they are created, so resync can be high. + csrInformerResyncPeriod = 12 * time.Hour ) var ErrNodeCertificateNotFound = errors.New("node certificate not found") From 3e6821aa8578d60de052f64ab679ae73e172ef46 Mon Sep 17 00:00:00 2001 From: Damian Czaja Date: Tue, 29 Oct 2024 17:02:41 +0100 Subject: [PATCH 3/7] use two factories for different ListOptions --- internal/actions/csr/csr.go | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/internal/actions/csr/csr.go b/internal/actions/csr/csr.go index 976b441..daf31db 100644 --- a/internal/actions/csr/csr.go +++ b/internal/actions/csr/csr.go @@ -276,13 +276,17 @@ func getNodeCSRV1Beta1(ctx context.Context, client kubernetes.Interface, nodeNam } func WatchCastAINodeCSRs(ctx context.Context, log logrus.FieldLogger, client kubernetes.Interface, c chan<- *Certificate) error { - factory := informers.NewSharedInformerFactoryWithOptions(client, csrInformerResyncPeriod, + v1Factory := informers.NewSharedInformerFactoryWithOptions(client, csrInformerResyncPeriod, informers.WithTweakListOptions(func(opts *metav1.ListOptions) { opts.FieldSelector = getOptions(certv1.KubeAPIServerClientKubeletSignerName).FieldSelector })) + v1Informer := v1Factory.Certificates().V1().CertificateSigningRequests().Informer() - csrInformer := factory.Certificates().V1().CertificateSigningRequests().Informer() - csrv1BetaInformer := factory.Certificates().V1beta1().CertificateSigningRequests().Informer() + v1beta1Factory := informers.NewSharedInformerFactoryWithOptions(client, csrInformerResyncPeriod, + informers.WithTweakListOptions(func(opts *metav1.ListOptions) { + opts.FieldSelector = getOptions(certv1beta1.KubeAPIServerClientKubeletSignerName).FieldSelector + })) + v1betaInformer := v1beta1Factory.Certificates().V1beta1().CertificateSigningRequests().Informer() handlerFuncs := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -298,20 +302,19 @@ func WatchCastAINodeCSRs(ctx context.Context, log logrus.FieldLogger, client kub DeleteFunc: func(obj interface{}) {}, } - if _, err := csrInformer.AddEventHandler(handlerFuncs); err != nil { + if _, err := v1Informer.AddEventHandler(handlerFuncs); err != nil { return fmt.Errorf("adding v1/csr informer event handlers: %w", err) } - if _, err := csrv1BetaInformer.AddEventHandler(handlerFuncs); err != nil { + if _, err := v1betaInformer.AddEventHandler(handlerFuncs); err != nil { return fmt.Errorf("adding v1beta1/csr informer event handlers: %w", err) } - v1StopCh := make(chan struct{}) - v1BetaStopCh := make(chan struct{}) - defer close(v1StopCh) - defer close(v1BetaStopCh) - go factory.Start(v1StopCh) - go factory.Start(v1BetaStopCh) + stopCh := make(chan struct{}) + defer close(stopCh) + + go v1Factory.Start(stopCh) + go v1beta1Factory.Start(stopCh) log.Info("watching for new node csr") @@ -432,6 +435,8 @@ func parseCSR(pemData []byte) (*x509.CertificateRequest, error) { //nolint:unparam func getOptions(signer string) metav1.ListOptions { + fields.SelectorFromSet(fields.Set{}) + return metav1.ListOptions{ FieldSelector: fields.SelectorFromSet(fields.Set{ "spec.signerName": signer, From 5681f7fa3ce74424c7cfc2acab108cd3e136b6dd Mon Sep 17 00:00:00 2001 From: Damian Czaja Date: Wed, 30 Oct 2024 10:00:09 +0100 Subject: [PATCH 4/7] use only v1 or v1beta1 --- internal/actions/csr/csr.go | 51 ++++++++++++++++++++++++------------- 1 file changed, 33 insertions(+), 18 deletions(-) diff --git a/internal/actions/csr/csr.go b/internal/actions/csr/csr.go index daf31db..70c14a8 100644 --- a/internal/actions/csr/csr.go +++ b/internal/actions/csr/csr.go @@ -275,18 +275,38 @@ func getNodeCSRV1Beta1(ctx context.Context, client kubernetes.Interface, nodeNam return nil, ErrNodeCertificateNotFound } +func createInformer(ctx context.Context, client kubernetes.Interface) (informers.SharedInformerFactory, cache.SharedIndexInformer, error) { + var ( + errv1 error + errv1beta1 error + ) + + if _, errv1 = client.CertificatesV1().CertificateSigningRequests().List(ctx, metav1.ListOptions{}); errv1 == nil { + v1Factory := informers.NewSharedInformerFactoryWithOptions(client, csrInformerResyncPeriod, + informers.WithTweakListOptions(func(opts *metav1.ListOptions) { + opts.FieldSelector = getOptions(certv1.KubeAPIServerClientKubeletSignerName).FieldSelector + })) + v1Informer := v1Factory.Certificates().V1().CertificateSigningRequests().Informer() + return v1Factory, v1Informer, nil + } + + if _, errv1beta1 = client.CertificatesV1beta1().CertificateSigningRequests().List(ctx, metav1.ListOptions{}); errv1beta1 == nil { + v1Factory := informers.NewSharedInformerFactoryWithOptions(client, csrInformerResyncPeriod, + informers.WithTweakListOptions(func(opts *metav1.ListOptions) { + opts.FieldSelector = getOptions(certv1beta1.KubeAPIServerClientKubeletSignerName).FieldSelector + })) + v1Informer := v1Factory.Certificates().V1beta1().CertificateSigningRequests().Informer() + return v1Factory, v1Informer, nil + } + + return nil, nil, fmt.Errorf("failed to create informer: v1: %w, v1beta1: %w", errv1, errv1beta1) +} + func WatchCastAINodeCSRs(ctx context.Context, log logrus.FieldLogger, client kubernetes.Interface, c chan<- *Certificate) error { - v1Factory := informers.NewSharedInformerFactoryWithOptions(client, csrInformerResyncPeriod, - informers.WithTweakListOptions(func(opts *metav1.ListOptions) { - opts.FieldSelector = getOptions(certv1.KubeAPIServerClientKubeletSignerName).FieldSelector - })) - v1Informer := v1Factory.Certificates().V1().CertificateSigningRequests().Informer() - - v1beta1Factory := informers.NewSharedInformerFactoryWithOptions(client, csrInformerResyncPeriod, - informers.WithTweakListOptions(func(opts *metav1.ListOptions) { - opts.FieldSelector = getOptions(certv1beta1.KubeAPIServerClientKubeletSignerName).FieldSelector - })) - v1betaInformer := v1beta1Factory.Certificates().V1beta1().CertificateSigningRequests().Informer() + factory, informer, err := createInformer(ctx, client) + if err != nil { + return fmt.Errorf("create informer: %w", err) + } handlerFuncs := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -302,19 +322,14 @@ func WatchCastAINodeCSRs(ctx context.Context, log logrus.FieldLogger, client kub DeleteFunc: func(obj interface{}) {}, } - if _, err := v1Informer.AddEventHandler(handlerFuncs); err != nil { + if _, err := informer.AddEventHandler(handlerFuncs); err != nil { return fmt.Errorf("adding v1/csr informer event handlers: %w", err) } - if _, err := v1betaInformer.AddEventHandler(handlerFuncs); err != nil { - return fmt.Errorf("adding v1beta1/csr informer event handlers: %w", err) - } - stopCh := make(chan struct{}) defer close(stopCh) - go v1Factory.Start(stopCh) - go v1beta1Factory.Start(stopCh) + go factory.Start(stopCh) log.Info("watching for new node csr") From 658fe2edbd50d7aa318f56c7e6fb4f2cb16dfa95 Mon Sep 17 00:00:00 2001 From: Damian Czaja Date: Wed, 30 Oct 2024 11:58:46 +0100 Subject: [PATCH 5/7] do not print warning, when ignoring certs --- internal/actions/csr/csr.go | 66 +++++++---------- internal/actions/csr/csr_test.go | 122 ++++++++++++++++++++++++++----- 2 files changed, 130 insertions(+), 58 deletions(-) diff --git a/internal/actions/csr/csr.go b/internal/actions/csr/csr.go index 70c14a8..3faaff9 100644 --- a/internal/actions/csr/csr.go +++ b/internal/actions/csr/csr.go @@ -68,6 +68,32 @@ func (c *Certificate) Approved() bool { return false } +func (c *Certificate) Outdated() bool { + if c.v1Beta1 != nil { + return c.v1Beta1.CreationTimestamp.Add(csrTTL).Before(time.Now()) + } + return c.v1.CreationTimestamp.Add(csrTTL).Before(time.Now()) +} + +func (c *Certificate) ForCASTAINode() bool { + if c.Name == "" { + return false + } + + if strings.HasPrefix(c.Name, "system:node") && strings.Contains(c.Name, "cast-pool") { + return true + } + + return false +} + +func (c *Certificate) NodeBootstrap() bool { + // Since we only have one handler per CSR/certificate name, + // which is the node name, we can process the controller's certificates and kubelet-bootstrap`s. + // This covers the case when the controller restarts but the bootstrap certificate was deleted without our own certificate being approved. + return c.RequestingUser == "kubelet-bootstrap" || c.RequestingUser == "system:serviceaccount:castai-agent:castai-cluster-controller" +} + func isAlreadyApproved(err error) bool { if err == nil { return false @@ -338,12 +364,7 @@ func WatchCastAINodeCSRs(ctx context.Context, log logrus.FieldLogger, client kub return nil } -var ( - errUnexpectedObjectType = errors.New("unexpected object type") - errCSRTooOld = errors.New("csr is too old") - errOwner = errors.New("owner is not bootstrap") - errNonCastAINode = errors.New("not a castai node") -) +var errUnexpectedObjectType = errors.New("unexpected object type") func processCSREvent(ctx context.Context, c chan<- *Certificate, csrObj interface{}) error { cert, err := toCertificate(csrObj) @@ -355,7 +376,7 @@ func processCSREvent(ctx context.Context, c chan<- *Certificate, csrObj interfac return nil } - if cert.Approved() { + if cert.Approved() || !cert.ForCASTAINode() || !cert.NodeBootstrap() || cert.Outdated() { return nil } @@ -367,58 +388,29 @@ func toCertificate(obj interface{}) (cert *Certificate, err error) { var name string var request []byte - isOutdated := false switch e := obj.(type) { case *certv1.CertificateSigningRequest: name = e.Name request = e.Spec.Request cert = &Certificate{Name: name, v1: e, RequestingUser: e.Spec.Username} - isOutdated = e.CreationTimestamp.Add(csrTTL).Before(time.Now()) case *certv1beta1.CertificateSigningRequest: name = e.Name request = e.Spec.Request cert = &Certificate{Name: name, v1Beta1: e, RequestingUser: e.Spec.Username} - isOutdated = e.CreationTimestamp.Add(csrTTL).Before(time.Now()) default: return nil, errUnexpectedObjectType } - if isOutdated { - return nil, fmt.Errorf("csr with certificate Name: %v RequestingUser: %v %w", cert.Name, cert.RequestingUser, errCSRTooOld) - } - - // Since we only have one handler per CSR/certificate name, - // which is the node name, we can process the controller's certificates and kubelet-bootstrap`s. - // This covers the case when the controller restarts but the bootstrap certificate was deleted without our own certificate being approved. - if cert.RequestingUser != "kubelet-bootstrap" && cert.RequestingUser != "system:serviceaccount:castai-agent:castai-cluster-controller" { - return nil, fmt.Errorf("csr with certificate Name: %v RequestingUser: %v %w", cert.Name, cert.RequestingUser, errOwner) - } - cn, err := getSubjectCommonName(name, request) if err != nil { return nil, fmt.Errorf("getSubjectCommonName: Name: %v RequestingUser: %v request: %v %w", cert.Name, cert.RequestingUser, string(request), err) } - if !isCastAINodeCsr(cn) { - return nil, fmt.Errorf("csr with certificate Name: %v RequestingUser: %v cn: %v %w", cert.Name, cert.RequestingUser, cn, errNonCastAINode) - } cert.Name = cn return cert, nil } -func isCastAINodeCsr(subjectCommonName string) bool { - if subjectCommonName == "" { - return false - } - - if strings.HasPrefix(subjectCommonName, "system:node") && strings.Contains(subjectCommonName, "cast-pool") { - return true - } - - return false -} - func sendCertificate(ctx context.Context, c chan<- *Certificate, cert *Certificate) { select { case c <- cert: @@ -450,8 +442,6 @@ func parseCSR(pemData []byte) (*x509.CertificateRequest, error) { //nolint:unparam func getOptions(signer string) metav1.ListOptions { - fields.SelectorFromSet(fields.Set{}) - return metav1.ListOptions{ FieldSelector: fields.SelectorFromSet(fields.Set{ "spec.signerName": signer, diff --git a/internal/actions/csr/csr_test.go b/internal/actions/csr/csr_test.go index 20ff7b9..b60a571 100644 --- a/internal/actions/csr/csr_test.go +++ b/internal/actions/csr/csr_test.go @@ -3,12 +3,12 @@ package csr import ( "context" "path/filepath" - "reflect" "testing" "time" "github.com/stretchr/testify/require" certv1 "k8s.io/api/certificates/v1" + certv1beta1 "k8s.io/api/certificates/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" @@ -93,8 +93,82 @@ func Test_isCastAINodeCsr(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := isCastAINodeCsr(tt.args.subjectCommonName) - require.Equal(t, tt.want, got) + cert := &Certificate{ + Name: tt.args.subjectCommonName, + } + + require.Equal(t, tt.want, cert.ForCASTAINode()) + }) + } +} + +func Test_outdatedCertificate(t *testing.T) { + tt := map[string]struct { + createTimestamp time.Time + want bool + }{ + "Outdated": { + createTimestamp: time.Now().Add(-csrTTL).Add(-time.Second), + want: true, + }, + "Not outdated": { + createTimestamp: time.Now(), + want: false, + }, + "Outdated, right before": { + createTimestamp: time.Now().Add(-csrTTL).Add(2 * time.Second), + want: false, + }, + } + + for name, tc := range tt { + t.Run(name, func(t *testing.T) { + cert := &Certificate{ + v1: &certv1.CertificateSigningRequest{ + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: metav1.NewTime(tc.createTimestamp), + }, + }, + } + require.Equal(t, tc.want, cert.Outdated()) + + certBeta := &Certificate{ + v1Beta1: &certv1beta1.CertificateSigningRequest{ + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: metav1.NewTime(tc.createTimestamp), + }, + }, + } + require.Equal(t, tc.want, certBeta.Outdated()) + }) + } +} + +func Test_nodeBootstrap(t *testing.T) { + tt := map[string]struct { + reqUser string + want bool + }{ + "other one": { + reqUser: "dummy-user", + want: false, + }, + "kubelet-bootstrap": { + reqUser: "kubelet-bootstrap", + want: true, + }, + "castai-cluster-controller": { + reqUser: "system:serviceaccount:castai-agent:castai-cluster-controller", + want: true, + }, + } + + for name, tc := range tt { + t.Run(name, func(t *testing.T) { + cert := &Certificate{ + RequestingUser: tc.reqUser, + } + require.Equal(t, tc.want, cert.NodeBootstrap()) }) } } @@ -106,10 +180,10 @@ func Test_toCertificate(t *testing.T) { obj interface{} } tests := []struct { - name string - args args - wantCert *Certificate - wantErr bool + name string + args args + checkFunc func(t *testing.T, cert *Certificate) + wantErr bool }{ { name: "empty event", @@ -125,9 +199,13 @@ func Test_toCertificate(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ CreationTimestamp: metav1.Time{Time: time.Now().Add(-csrTTL)}, }, + Spec: testCSRv1.Spec, }, }, - wantErr: true, + checkFunc: func(t *testing.T, cert *Certificate) { + require.True(t, cert.Outdated()) + }, + wantErr: false, }, { name: "bad owner", @@ -141,19 +219,22 @@ func Test_toCertificate(t *testing.T) { }, }, }, - wantErr: true, + checkFunc: func(t *testing.T, cert *Certificate) { + require.False(t, cert.NodeBootstrap()) + }, + wantErr: false, }, { name: "ok v1", args: args{ obj: testCSRv1, }, - wantErr: false, - wantCert: &Certificate{ - Name: "system:node:gke-dev-master-cast-pool-cb53177b", - RequestingUser: "kubelet-bootstrap", - v1: testCSRv1, + checkFunc: func(t *testing.T, cert *Certificate) { + require.Equal(t, "system:node:gke-dev-master-cast-pool-cb53177b", cert.Name) + require.Equal(t, "kubelet-bootstrap", cert.RequestingUser) + require.Equal(t, testCSRv1, cert.v1) }, + wantErr: false, }, { name: "ok v1beta1", @@ -161,10 +242,10 @@ func Test_toCertificate(t *testing.T) { obj: testCSRv1beta1, }, wantErr: false, - wantCert: &Certificate{ - Name: "system:node:gke-dev-master-cast-pool-cb53177b", - RequestingUser: "kubelet-bootstrap", - v1Beta1: testCSRv1beta1, + checkFunc: func(t *testing.T, cert *Certificate) { + require.Equal(t, "system:node:gke-dev-master-cast-pool-cb53177b", cert.Name) + require.Equal(t, "kubelet-bootstrap", cert.RequestingUser) + require.Equal(t, testCSRv1beta1, cert.v1Beta1) }, }, } @@ -175,8 +256,9 @@ func Test_toCertificate(t *testing.T) { t.Errorf("toCertificate() error = %v, wantErr %v", err, tt.wantErr) return } - if !reflect.DeepEqual(gotCert, tt.wantCert) { - t.Errorf("toCertificate() gotCert = %v, want %v", gotCert, tt.wantCert) + + if tt.checkFunc != nil { + tt.checkFunc(t, gotCert) } }) } From cc71f9704983b2311108e80f9b697eac660b0845 Mon Sep 17 00:00:00 2001 From: Damian Czaja Date: Mon, 4 Nov 2024 10:52:04 +0100 Subject: [PATCH 6/7] address PR comments --- cmd/controller/run.go | 7 +++-- internal/actions/csr/csr.go | 51 +++++++++++-------------------------- internal/actions/csr/svc.go | 51 ++++++++++++++++++++++--------------- 3 files changed, 51 insertions(+), 58 deletions(-) diff --git a/cmd/controller/run.go b/cmd/controller/run.go index d70fda1..0f46334 100644 --- a/cmd/controller/run.go +++ b/cmd/controller/run.go @@ -189,9 +189,12 @@ func runController( } if isGKE { - log.Info("auto approve csr started as running on GKE") csrMgr := csr.NewApprovalManager(log, clientset) - csrMgr.Start(ctx) + if err := csrMgr.Start(ctx); err != nil { + log.WithError(err).Fatal("failed to start approval manager") + } + + log.Info("auto approve csr started as running on GKE") } svc.Run(ctx) diff --git a/internal/actions/csr/csr.go b/internal/actions/csr/csr.go index 3faaff9..b863a08 100644 --- a/internal/actions/csr/csr.go +++ b/internal/actions/csr/csr.go @@ -28,6 +28,8 @@ const ( csrTTL = time.Hour // We should approve CSRs, when they are created, so resync can be high. + // Resync plays back all events (create, update, delete), which are in informer cache. + // This does not involve talking to API server, it is not relist. csrInformerResyncPeriod = 12 * time.Hour ) @@ -68,6 +70,8 @@ func (c *Certificate) Approved() bool { return false } +// Outdated returns, whether the certificate request is old and should not be processed by cluster-controller. +// It has nothing to do with certificate expiration. func (c *Certificate) Outdated() bool { if c.v1Beta1 != nil { return c.v1Beta1.CreationTimestamp.Add(csrTTL).Before(time.Now()) @@ -175,6 +179,17 @@ func (c *Certificate) NewCSR(ctx context.Context, client kubernetes.Interface) ( return &Certificate{v1: resp}, nil } +func startInformer(ctx context.Context, log logrus.FieldLogger, factory informers.SharedInformerFactory) { + stopCh := make(chan struct{}) + defer close(stopCh) + + factory.Start(stopCh) + log.Info("watching for new node csr") + + <-ctx.Done() + log.WithField("context", ctx.Err()).Info("finished watching for new node csr") +} + func get(ctx context.Context, client kubernetes.Interface, cert *Certificate) (*Certificate, error) { if cert.v1Beta1 != nil { v1beta1req, err := client.CertificatesV1beta1().CertificateSigningRequests().Get(ctx, cert.v1Beta1.Name, metav1.GetOptions{}) @@ -328,42 +343,6 @@ func createInformer(ctx context.Context, client kubernetes.Interface) (informers return nil, nil, fmt.Errorf("failed to create informer: v1: %w, v1beta1: %w", errv1, errv1beta1) } -func WatchCastAINodeCSRs(ctx context.Context, log logrus.FieldLogger, client kubernetes.Interface, c chan<- *Certificate) error { - factory, informer, err := createInformer(ctx, client) - if err != nil { - return fmt.Errorf("create informer: %w", err) - } - - handlerFuncs := cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - if err := processCSREvent(ctx, c, obj); err != nil { - log.WithError(err).Warn("failed to process csr add event") - } - }, - UpdateFunc: func(oldObj, newObj interface{}) { - if err := processCSREvent(ctx, c, newObj); err != nil { - log.WithError(err).Warn("failed to process csr update event") - } - }, - DeleteFunc: func(obj interface{}) {}, - } - - if _, err := informer.AddEventHandler(handlerFuncs); err != nil { - return fmt.Errorf("adding v1/csr informer event handlers: %w", err) - } - - stopCh := make(chan struct{}) - defer close(stopCh) - - go factory.Start(stopCh) - - log.Info("watching for new node csr") - - <-ctx.Done() - log.WithField("context", ctx.Err()).Info("finished watching for new node csr") - return nil -} - var errUnexpectedObjectType = errors.New("unexpected object type") func processCSREvent(ctx context.Context, c chan<- *Certificate, csrObj interface{}) error { diff --git a/internal/actions/csr/svc.go b/internal/actions/csr/svc.go index afe52b9..ae18b3f 100644 --- a/internal/actions/csr/svc.go +++ b/internal/actions/csr/svc.go @@ -11,6 +11,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" "github.com/castai/cluster-controller/internal/waitext" ) @@ -35,8 +36,35 @@ type ApprovalManager struct { m sync.Mutex // Used to make sure there is just one watcher running. } -func (h *ApprovalManager) Start(ctx context.Context) { - go h.runAutoApproveForCastAINodes(ctx) +func (h *ApprovalManager) Start(ctx context.Context) error { + informerFactory, csrInformer, err := createInformer(ctx, h.clientset) + if err != nil { + return fmt.Errorf("while creating informer: %w", err) + } + + c := make(chan *Certificate, 1) + + handlerFuncs := cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + if err := processCSREvent(ctx, c, obj); err != nil { + h.log.WithError(err).Warn("failed to process csr add event") + } + }, + } + + if _, err := csrInformer.AddEventHandler(handlerFuncs); err != nil { + return fmt.Errorf("adding csr informer event handlers: %w", err) + } + + ctx, cancel := context.WithCancel(ctx) + if !h.startAutoApprove(cancel) { + return nil + } + + go startInformer(ctx, h.log, informerFactory) + go h.runAutoApproveForCastAINodes(ctx, c) + + return nil } func (h *ApprovalManager) Stop() { @@ -99,27 +127,10 @@ func (h *ApprovalManager) handle(ctx context.Context, log logrus.FieldLogger, ce return errCSRNotApproved } -func (h *ApprovalManager) runAutoApproveForCastAINodes(ctx context.Context) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - if !h.startAutoApprove(cancel) { - return // already running. - } +func (h *ApprovalManager) runAutoApproveForCastAINodes(ctx context.Context, c <-chan *Certificate) { defer h.stopAutoApproveForCastAINodes() log := h.log.WithField("RunAutoApprove", "auto-approve-csr") - c := make(chan *Certificate, 1) - go func() { - for { - if err := WatchCastAINodeCSRs(ctx, log, h.clientset, c); err != nil { - log.WithError(err).Warn("failed to watch csr") - time.Sleep(1 * time.Second) - continue - } - return - } - }() for { select { From d8f8b11beb8ab9b8942dc7d89037199a747eeb0e Mon Sep 17 00:00:00 2001 From: Damian Czaja Date: Mon, 4 Nov 2024 11:36:28 +0100 Subject: [PATCH 7/7] fix linting --- internal/actions/csr/svc_test.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/internal/actions/csr/svc_test.go b/internal/actions/csr/svc_test.go index 3ccdab6..4f592ac 100644 --- a/internal/actions/csr/svc_test.go +++ b/internal/actions/csr/svc_test.go @@ -85,7 +85,9 @@ func TestCSRApprove(t *testing.T) { wg.Add(2) go func() { defer wg.Done() - s.Start(ctx) + if err := s.Start(ctx); err != nil { + t.Logf("failed to start approval manager: %s", err.Error()) + } }() go func() { defer wg.Done() @@ -118,7 +120,9 @@ func TestCSRApprove(t *testing.T) { wg.Add(2) go func() { defer wg.Done() - s.Start(ctx) + if err := s.Start(ctx); err != nil { + t.Logf("failed to start approval manager: %s", err.Error()) + } }() go func() { defer wg.Done()