Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KUBE-644: use informers to watch CSRs #154

Merged
merged 7 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 101 additions & 100 deletions internal/actions/csr/csr.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"

"github.com/castai/cluster-controller/internal/waitext"
"k8s.io/client-go/tools/cache"
)

const (
ReasonApproved = "AutoApproved"
approvedMessage = "This CSR was approved by CAST AI"
csrTTL = time.Hour

// CSRs are approved as soon as they are created, so the resync period can be long.
aldor007 marked this conversation as resolved.
Show resolved Hide resolved
csrInformerResyncPeriod = 12 * time.Hour
)

var ErrNodeCertificateNotFound = errors.New("node certificate not found")
Expand Down Expand Up @@ -66,6 +68,32 @@ func (c *Certificate) Approved() bool {
return false
}

// Outdated reports whether the CSR is older than csrTTL, i.e. whether its
// CreationTimestamp plus csrTTL lies before time.Now().
//
// The v1beta1 object is consulted when it is set; otherwise the v1 object is used.
// NOTE(review): c.v1 is dereferenced without a nil check when v1Beta1 is nil —
// presumably exactly one of the two is always populated (toCertificate sets one); confirm.
func (c *Certificate) Outdated() bool {
aldor007 marked this conversation as resolved.
Show resolved Hide resolved
// Prefer the v1beta1 representation when present.
if c.v1Beta1 != nil {
return c.v1Beta1.CreationTimestamp.Add(csrTTL).Before(time.Now())
}
// Fall back to the v1 representation.
return c.v1.CreationTimestamp.Add(csrTTL).Before(time.Now())
}

// ForCASTAINode reports whether the certificate's Name starts with
// "system:node" and also contains "cast-pool".
//
// An empty Name never matches, because it cannot carry the required prefix.
func (c *Certificate) ForCASTAINode() bool {
	name := c.Name
	return strings.HasPrefix(name, "system:node") && strings.Contains(name, "cast-pool")
}

// NodeBootstrap reports whether the CSR was requested either by
// "kubelet-bootstrap" or by the cluster-controller service account.
//
// There is only one handler per CSR/certificate name (the node name), so both
// the controller's own certificates and kubelet-bootstrap's can be processed.
// Accepting the controller's account covers a controller restart after the
// bootstrap certificate was deleted but before our own certificate was approved.
func (c *Certificate) NodeBootstrap() bool {
	switch c.RequestingUser {
	case "kubelet-bootstrap",
		"system:serviceaccount:castai-agent:castai-cluster-controller":
		return true
	default:
		return false
	}
}

func isAlreadyApproved(err error) bool {
if err == nil {
return false
Expand Down Expand Up @@ -273,144 +301,117 @@ func getNodeCSRV1Beta1(ctx context.Context, client kubernetes.Interface, nodeNam
return nil, ErrNodeCertificateNotFound
}

func WatchCastAINodeCSRs(ctx context.Context, log logrus.FieldLogger, client kubernetes.Interface, c chan *Certificate) {
var w watch.Interface
var err error
b := waitext.DefaultExponentialBackoff()
err = waitext.Retry(
ctx,
b,
waitext.Forever,
func(ctx context.Context) (bool, error) {
w, err = getWatcher(ctx, client)
// Context canceled is when the cluster-controller is stopped.
// In that case context.Canceled is not an error.
if errors.Is(err, context.Canceled) {
return false, err
}
if err != nil {
return true, fmt.Errorf("getWatcher: %w", err)
}
return false, nil
},
func(err error) {
log.Warnf("retrying: %v", err)
},
func createInformer(ctx context.Context, client kubernetes.Interface) (informers.SharedInformerFactory, cache.SharedIndexInformer, error) {
var (
errv1 error
errv1beta1 error
)
if err != nil {
log.Warnf("finished: %v", err)
return
}

defer w.Stop()
if _, errv1 = client.CertificatesV1().CertificateSigningRequests().List(ctx, metav1.ListOptions{}); errv1 == nil {
furkhat marked this conversation as resolved.
Show resolved Hide resolved
v1Factory := informers.NewSharedInformerFactoryWithOptions(client, csrInformerResyncPeriod,
informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
opts.FieldSelector = getOptions(certv1.KubeAPIServerClientKubeletSignerName).FieldSelector
}))
v1Informer := v1Factory.Certificates().V1().CertificateSigningRequests().Informer()
return v1Factory, v1Informer, nil
}

log.Info("watching for new node csr")
if _, errv1beta1 = client.CertificatesV1beta1().CertificateSigningRequests().List(ctx, metav1.ListOptions{}); errv1beta1 == nil {
v1Factory := informers.NewSharedInformerFactoryWithOptions(client, csrInformerResyncPeriod,
informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
opts.FieldSelector = getOptions(certv1beta1.KubeAPIServerClientKubeletSignerName).FieldSelector
}))
v1Informer := v1Factory.Certificates().V1beta1().CertificateSigningRequests().Informer()
return v1Factory, v1Informer, nil
}

for {
select {
case <-ctx.Done():
return
case event, ok := <-w.ResultChan():
if !ok {
log.Info("watcher closed")
go WatchCastAINodeCSRs(ctx, log, client, c) // start over in case of any error.
return
}
return nil, nil, fmt.Errorf("failed to create informer: v1: %w, v1beta1: %w", errv1, errv1beta1)
}

cert, err := toCertificate(event)
if err != nil {
log.Warnf("toCertificate: skipping csr event: %v", err)
continue
}
func WatchCastAINodeCSRs(ctx context.Context, log logrus.FieldLogger, client kubernetes.Interface, c chan<- *Certificate) error {
factory, informer, err := createInformer(ctx, client)
if err != nil {
return fmt.Errorf("create informer: %w", err)
}

if cert == nil {
continue
handlerFuncs := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if err := processCSREvent(ctx, c, obj); err != nil {
log.WithError(err).Warn("failed to process csr add event")
}

if cert.Approved() {
continue
},
UpdateFunc: func(oldObj, newObj interface{}) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need both add and update?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. In theory, reacting only to add should be enough. I will try removing the update handler and see how it behaves.

if err := processCSREvent(ctx, c, newObj); err != nil {
log.WithError(err).Warn("failed to process csr update event")
}
},
DeleteFunc: func(obj interface{}) {},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this method be omitted?

}

sendCertificate(ctx, c, cert)
}
if _, err := informer.AddEventHandler(handlerFuncs); err != nil {
return fmt.Errorf("adding v1/csr informer event handlers: %w", err)
}

stopCh := make(chan struct{})
defer close(stopCh)

go factory.Start(stopCh)

log.Info("watching for new node csr")

<-ctx.Done()
log.WithField("context", ctx.Err()).Info("finished watching for new node csr")
return nil
}

func getWatcher(ctx context.Context, client kubernetes.Interface) (watch.Interface, error) {
w, err := client.CertificatesV1().CertificateSigningRequests().Watch(ctx, getOptions(certv1.KubeAPIServerClientKubeletSignerName))
var errUnexpectedObjectType = errors.New("unexpected object type")

func processCSREvent(ctx context.Context, c chan<- *Certificate, csrObj interface{}) error {
cert, err := toCertificate(csrObj)
if err != nil {
w, err = client.CertificatesV1beta1().CertificateSigningRequests().Watch(ctx, getOptions(certv1beta1.KubeAPIServerClientKubeletSignerName))
if err != nil {
return nil, fmt.Errorf("fail to open v1 and v1beta watching client: %w", err)
}
return err
}
return w, nil
}

var (
errUnexpectedObjectType = errors.New("unexpected object type")
errCSRTooOld = errors.New("csr is too old")
errOwner = errors.New("owner is not bootstrap")
errNonCastAINode = errors.New("not a castai node")
)
if cert == nil {
return nil
}

func toCertificate(event watch.Event) (cert *Certificate, err error) {
if cert.Approved() || !cert.ForCASTAINode() || !cert.NodeBootstrap() || cert.Outdated() {
return nil
}

sendCertificate(ctx, c, cert)
return nil
}

func toCertificate(obj interface{}) (cert *Certificate, err error) {
var name string
var request []byte

isOutdated := false
switch e := event.Object.(type) {
switch e := obj.(type) {
case *certv1.CertificateSigningRequest:
name = e.Name
request = e.Spec.Request
cert = &Certificate{Name: name, v1: e, RequestingUser: e.Spec.Username}
isOutdated = e.CreationTimestamp.Add(csrTTL).Before(time.Now())
case *certv1beta1.CertificateSigningRequest:
name = e.Name
request = e.Spec.Request
cert = &Certificate{Name: name, v1Beta1: e, RequestingUser: e.Spec.Username}
isOutdated = e.CreationTimestamp.Add(csrTTL).Before(time.Now())
default:
return nil, errUnexpectedObjectType
}

if isOutdated {
return nil, fmt.Errorf("csr with certificate Name: %v RequestingUser: %v %w", cert.Name, cert.RequestingUser, errCSRTooOld)
}

// Since we only have one handler per CSR/certificate name,
// which is the node name, we can process the controller's certificates and kubelet-bootstrap's.
// This covers the case when the controller restarts but the bootstrap certificate was deleted without our own certificate being approved.
if cert.RequestingUser != "kubelet-bootstrap" && cert.RequestingUser != "system:serviceaccount:castai-agent:castai-cluster-controller" {
return nil, fmt.Errorf("csr with certificate Name: %v RequestingUser: %v %w", cert.Name, cert.RequestingUser, errOwner)
}

cn, err := getSubjectCommonName(name, request)
if err != nil {
return nil, fmt.Errorf("getSubjectCommonName: Name: %v RequestingUser: %v request: %v %w", cert.Name, cert.RequestingUser, string(request), err)
}

if !isCastAINodeCsr(cn) {
return nil, fmt.Errorf("csr with certificate Name: %v RequestingUser: %v cn: %v %w", cert.Name, cert.RequestingUser, cn, errNonCastAINode)
}
cert.Name = cn

return cert, nil
}

// isCastAINodeCsr reports whether subjectCommonName starts with "system:node"
// and also contains "cast-pool". An empty name never matches.
func isCastAINodeCsr(subjectCommonName string) bool {
	hasNodePrefix := strings.HasPrefix(subjectCommonName, "system:node")
	inCastPool := strings.Contains(subjectCommonName, "cast-pool")
	return hasNodePrefix && inCastPool
}

func sendCertificate(ctx context.Context, c chan *Certificate, cert *Certificate) {
func sendCertificate(ctx context.Context, c chan<- *Certificate, cert *Certificate) {
select {
case c <- cert:
case <-ctx.Done():
Expand Down
Loading
Loading