From 01ae185a33e5fd2f9bf8d98b8b5a2edcd8358aa6 Mon Sep 17 00:00:00 2001 From: Mateo Florido Date: Wed, 24 Jul 2024 17:14:14 -0500 Subject: [PATCH 01/21] Implement Worker Certificate Renewal Process --- src/k8s/api/v1/certificates_refresh.go | 15 ++ src/k8s/pkg/client/kubernetes/csr.go | 37 ++++ src/k8s/pkg/k8sd/api/certs_refresh.go | 277 +++++++++++++++++++++++++ src/k8s/pkg/k8sd/api/endpoints.go | 11 + src/k8s/pkg/utils/pki/generate.go | 33 +++ 5 files changed, 373 insertions(+) create mode 100644 src/k8s/api/v1/certificates_refresh.go create mode 100644 src/k8s/pkg/client/kubernetes/csr.go create mode 100644 src/k8s/pkg/k8sd/api/certs_refresh.go diff --git a/src/k8s/api/v1/certificates_refresh.go b/src/k8s/api/v1/certificates_refresh.go new file mode 100644 index 000000000..43b83e659 --- /dev/null +++ b/src/k8s/api/v1/certificates_refresh.go @@ -0,0 +1,15 @@ +package apiv1 + +type RefreshCertificatesPlanResponse struct { + Seed int `json:"seed"` + CertificatesSigningRequests []string `json:"certificates_signing_requests"` +} + +type RefreshCertificatesRunRequest struct { + Seed int `json:"seed"` + ExpirationSeconds int `json:"expiration_seconds"` +} + +type RefreshCertificatesRunResponse struct { + ExpirationSeconds int `json:"expiration_seconds"` +} diff --git a/src/k8s/pkg/client/kubernetes/csr.go b/src/k8s/pkg/client/kubernetes/csr.go new file mode 100644 index 000000000..b0470e334 --- /dev/null +++ b/src/k8s/pkg/client/kubernetes/csr.go @@ -0,0 +1,37 @@ +package kubernetes + +import ( + "context" + "fmt" + + certificatesv1 "k8s.io/api/certificates/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func (c *Client) CreateCertificateSigningRequest(ctx context.Context, name string, csrPEM []byte, usages []certificatesv1.KeyUsage, groups []string, signerName string) (*certificatesv1.CertificateSigningRequest, error) { + csr, err := c.CertificatesV1().CertificateSigningRequests().Create(ctx, &certificatesv1.CertificateSigningRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: certificatesv1.CertificateSigningRequestSpec{ + Request: csrPEM, + Usages: usages, + Groups: groups, + SignerName: signerName, + }, + }, metav1.CreateOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to create certificate signing request %s: %w", name, err) + } + + return csr, nil +} + +func (c *Client) GetCertificateSigningRequest(ctx context.Context, name string) (*certificatesv1.CertificateSigningRequest, error) { + csr, err := c.CertificatesV1().CertificateSigningRequests().Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to get certificate signing request %s: %w", name, err) + } + + return csr, nil +} diff --git a/src/k8s/pkg/k8sd/api/certs_refresh.go b/src/k8s/pkg/k8sd/api/certs_refresh.go new file mode 100644 index 000000000..9c60d65ba --- /dev/null +++ b/src/k8s/pkg/k8sd/api/certs_refresh.go @@ -0,0 +1,277 @@ +package api + +import ( + "crypto/rand" + "crypto/x509/pkix" + "encoding/binary" + "fmt" + "net" + "net/http" + "os" + "path/filepath" + + apiv1 "github.com/canonical/k8s/api/v1" + "github.com/canonical/k8s/pkg/k8sd/pki" + "github.com/canonical/k8s/pkg/k8sd/setup" + "github.com/canonical/k8s/pkg/snap" + snaputil "github.com/canonical/k8s/pkg/snap/util" + "github.com/canonical/k8s/pkg/utils" + pkiutil "github.com/canonical/k8s/pkg/utils/pki" + "github.com/canonical/lxd/lxd/response" + "github.com/canonical/microcluster/state" + v1 "k8s.io/api/certificates/v1" + corev1 "k8s.io/api/core/v1" +) + +func (e *Endpoints) postRefreshCertsPlan(s *state.State, r *http.Request) response.Response { + snap := e.provider.Snap() + + isWorker, err := snaputil.IsWorker(snap) + if err != nil { + return response.InternalError(fmt.Errorf("failed to check if node is a worker: %w", err)) + } + + if isWorker { + return refreshCertsPlanWorker(s, snap) + + } else { + // TODO: Control Plane refresh + return response.InternalError(fmt.Errorf("not implemented yet")) + } + +} + +// refreshCertsPlanWorker generates the CSRs for the worker node and returns the seed and the names of the CSRs. +func refreshCertsPlanWorker(s *state.State, snap snap.Snap) response.Response { + client, err := snap.KubernetesNodeClient("") + if err != nil { + return response.InternalError(err) + } + + var seed int32 + + err = binary.Read(rand.Reader, binary.BigEndian, &seed) + if err != nil { + return response.InternalError(err) + } + seed = seed & 0x7FFFFFFF + + csrKubeletServing, pKeyKubeletServing, err := pkiutil.GenerateCSR( + pkix.Name{ + CommonName: fmt.Sprintf("system:node:%s", snap.Hostname()), + Organization: []string{"system:nodes"}, + }, + 2048, + nil, + []string{snap.Hostname()}, + []net.IP{net.ParseIP(s.Address().Hostname())}, + ) + + _, err = client.CreateCertificateSigningRequest( + s.Context, + fmt.Sprintf("k8sd-%d-worker-kubelet-serving", seed), + []byte(csrKubeletServing), + []v1.KeyUsage{v1.UsageDigitalSignature, v1.UsageKeyEncipherment, v1.UsageServerAuth}, + []string{"system:nodes"}, + "k8sd.io/kubelet-serving", + ) + if err != nil { + return response.InternalError(err) + } + + csrKubeletClient, pKeyKubeletClient, err := pkiutil.GenerateCSR( + pkix.Name{ + CommonName: fmt.Sprintf("system:node:%s", snap.Hostname()), + Organization: []string{"system:nodes"}, + }, + 2048, + nil, + nil, + nil, + ) + + _, err = client.CreateCertificateSigningRequest( + s.Context, + fmt.Sprintf("k8sd-%d-worker-kubelet-client", seed), + []byte(csrKubeletClient), + []v1.KeyUsage{v1.UsageDigitalSignature, v1.UsageKeyEncipherment, v1.UsageClientAuth}, + nil, + "k8sd.io/kubelet-client", + ) + if err != nil { + return response.InternalError(err) + } + + csrKubeProxy, pKeyKubeProxy, err := pkiutil.GenerateCSR( + pkix.Name{ + CommonName: "system:kube-proxy", + }, + 2048, + nil, + nil, + nil, + ) + + _, err = client.CreateCertificateSigningRequest( + s.Context, + fmt.Sprintf("k8sd-%d-worker-kube-proxy-client", seed), + []byte(csrKubeProxy), + []v1.KeyUsage{v1.UsageDigitalSignature, v1.UsageKeyEncipherment, v1.UsageClientAuth}, + nil, + "k8sd.io/kube-proxy-client", + ) + + result := apiv1.RefreshCertificatesPlanResponse{ + Seed: int(seed), + CertificatesSigningRequests: []string{ + fmt.Sprintf("k8sd-%d-worker-kubelet-serving", seed), + fmt.Sprintf("k8sd-%d-worker-kubelet-client", seed), + fmt.Sprintf("k8sd-%d-worker-kube-proxy-client", seed), + }, + } + + err = os.WriteFile(filepath.Join(snap.KubernetesPKIDir(), "kubelet.key.new"), []byte(pKeyKubeletServing), 0600) + if err != nil { + return response.InternalError(err) + } + err = os.WriteFile(filepath.Join(snap.KubernetesPKIDir(), "kubelet-client.key.new"), []byte(pKeyKubeletClient), 0600) + if err != nil { + return response.InternalError(err) + } + err = os.WriteFile(filepath.Join(snap.KubernetesPKIDir(), "kube-proxy.key.new"), []byte(pKeyKubeProxy), 0600) + if err != nil { + return response.InternalError(err) + } + return response.SyncResponse(true, &result) + +} + +func (e *Endpoints) postRefreshCertsRun(s *state.State, r *http.Request) response.Response { + // TODO: Control Plane refresh + req := apiv1.RefreshCertificatesRunRequest{} + if err := utils.NewStrictJSONDecoder(r.Body).Decode(&req); err != nil { + return response.BadRequest(fmt.Errorf("failed to parse request: %w", err)) + } + snap := e.provider.Snap() + + client, err := snap.KubernetesNodeClient("") + if err != nil { + return response.InternalError(err) + } + csrNames := []string{ + fmt.Sprintf("k8sd-%d-worker-kubelet-serving", req.Seed), + fmt.Sprintf("k8sd-%d-worker-kubelet-client", req.Seed), + fmt.Sprintf("k8sd-%d-worker-kube-proxy-client", req.Seed), + } + + certificates := pki.WorkerNodePKI{} + + for _, csrName := range csrNames { + csr, err := client.GetCertificateSigningRequest(s.Context, csrName) + if err != nil { + return response.InternalError(err) + } + + if !isCertificateApproved(csr.Status.Conditions) { + return response.InternalError(fmt.Errorf("CSR %s is not issued", csrName)) + } + + if len(csr.Status.Certificate) == 0 { + return response.InternalError(fmt.Errorf("CSR %s is missing certificate", csrName)) + } + + _, _, err = pkiutil.LoadCertificate(string(csr.Status.Certificate), "") + if err != nil { + return response.InternalError(fmt.Errorf("failed to load certificate for CSR %s: %w", csrName, err)) + } + + switch csrName { + case fmt.Sprintf("k8sd-%d-worker-kubelet-serving", req.Seed): + certificates.KubeletCert = string(csr.Status.Certificate) + case fmt.Sprintf("k8sd-%d-worker-kubelet-client", req.Seed): + certificates.KubeletClientCert = string(csr.Status.Certificate) + case fmt.Sprintf("k8sd-%d-worker-kube-proxy-client", req.Seed): + certificates.KubeProxyClientCert = string(csr.Status.Certificate) + default: + return response.InternalError(fmt.Errorf("unknown CSR %s", csrName)) + } + + } + + // Read the CA and client CA + ca, err := os.ReadFile(filepath.Join(snap.KubernetesPKIDir(), "ca.crt")) + if err != nil { + return response.InternalError(err) + } + clientCA, err := os.ReadFile(filepath.Join(snap.KubernetesPKIDir(), "client-ca.crt")) + if err != nil { + return response.InternalError(err) + } + + // Read the new private keys + bytesKubeletKey, err := os.ReadFile(filepath.Join(snap.KubernetesPKIDir(), "kubelet.key.new")) + if err != nil { + return response.InternalError(err) + } + + bytesKubeletClientKey, err := os.ReadFile(filepath.Join(snap.KubernetesPKIDir(), "kubelet-client.key.new")) + if err != nil { + return response.InternalError(err) + } + + bytesKubeProxyKey, err := os.ReadFile(filepath.Join(snap.KubernetesPKIDir(), "kube-proxy.key.new")) + if err != nil { + return response.InternalError(err) + } + + certificates.CACert = string(ca) + certificates.ClientCACert = string(clientCA) + certificates.KubeletKey = string(bytesKubeletKey) + certificates.KubeletClientKey = string(bytesKubeletClientKey) + certificates.KubeProxyClientKey = string(bytesKubeProxyKey) + + _, err = setup.EnsureWorkerPKI(snap, &certificates) + if err != nil { + return response.InternalError(err) + } + + // Kubeconfigs + if err := setup.Kubeconfig(filepath.Join(snap.KubernetesConfigDir(), "kubelet.conf"), "127.0.0.1:6443", certificates.CACert, certificates.KubeletClientCert, certificates.KubeletClientKey); err != nil { + return response.InternalError(fmt.Errorf("failed to generate kubelet kubeconfig: %w", err)) + } + if err := setup.Kubeconfig(filepath.Join(snap.KubernetesConfigDir(), "proxy.conf"), "127.0.0.1:6443", certificates.CACert, certificates.KubeProxyClientCert, certificates.KubeProxyClientKey); err != nil { + return response.InternalError(fmt.Errorf("failed to generate kube-proxy kubeconfig: %w", err)) + } + + // Restart the services + if err := snap.RestartService(s.Context, "kubelet"); err != nil { + return response.InternalError(err) + } + if err := snap.RestartService(s.Context, "kube-proxy"); err != nil { + return response.InternalError(err) + } + + // Remove the new private keys + if err := os.Remove(filepath.Join(snap.KubernetesPKIDir(), "kubelet.key.new")); err != nil { + return response.InternalError(err) + } + if err := os.Remove(filepath.Join(snap.KubernetesPKIDir(), "kubelet-client.key.new")); err != nil { + return response.InternalError(err) + } + if err := os.Remove(filepath.Join(snap.KubernetesPKIDir(), "kube-proxy.key.new")); err != nil { + return response.InternalError(err) + } + + return response.SyncResponse(true, nil) + +} + +// isCertificateApproved checks if the certificate signing request is approved. +func isCertificateApproved(conditions []v1.CertificateSigningRequestCondition) bool { + for _, condition := range conditions { + if condition.Type == v1.CertificateApproved && condition.Status == corev1.ConditionTrue { + return true + } + } + return false +} diff --git a/src/k8s/pkg/k8sd/api/endpoints.go b/src/k8s/pkg/k8sd/api/endpoints.go index f7255b8c2..64cc69e6e 100644 --- a/src/k8s/pkg/k8sd/api/endpoints.go +++ b/src/k8s/pkg/k8sd/api/endpoints.go @@ -88,6 +88,17 @@ func (e *Endpoints) Endpoints() []rest.Endpoint { AccessHandler: ValidateWorkerInfoAccessHandler("worker-name", "worker-token"), }, }, + // Certificates + { + Name: "RefreshCerts/Plan", + Path: "k8sd/refresh-certs/plan", + Post: rest.EndpointAction{Handler: e.postRefreshCertsPlan}, + }, + { + Name: "RefreshCerts/Run", + Path: "k8sd/refresh-certs/run", + Post: rest.EndpointAction{Handler: e.postRefreshCertsRun}, + }, // Kubeconfig { Name: "Kubeconfig", diff --git a/src/k8s/pkg/utils/pki/generate.go b/src/k8s/pkg/utils/pki/generate.go index 3fa46c972..e63167795 100644 --- a/src/k8s/pkg/utils/pki/generate.go +++ b/src/k8s/pkg/utils/pki/generate.go @@ -122,3 +122,36 @@ func GenerateRSAKey(bits int) (string, string, error) { return string(privPEM), string(pubPEM), nil } + +func GenerateCSR(subject pkix.Name, bits int, priv any, dnsSANs []string, ipSANs []net.IP) (string, string, error) { + key, err := rsa.GenerateKey(rand.Reader, bits) + if err != nil { + return "", "", fmt.Errorf("failed to generate RSA private key: %w", err) + } + keyPEM := pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(key)}) + if keyPEM == nil { + return "", "", fmt.Errorf("failed to encode private key PEM") + } + + if priv == nil { + priv = key + } + + csrKubeletServingTemplate := &x509.CertificateRequest{ + Subject: subject, + DNSNames: dnsSANs, + IPAddresses: ipSANs, + } + + csrBytes, err := x509.CreateCertificateRequest(rand.Reader, csrKubeletServingTemplate, key) + if err != nil { + return "", "", fmt.Errorf("failed to create certificate request: %w", err) + } + + csrPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE REQUEST", Bytes: csrBytes}) + if csrPEM == nil { + return "", "", fmt.Errorf("failed to encode certificate request PEM") + } + + return string(csrPEM), string(keyPEM), nil +} From 54bfc225c18dee58324b2a1e4352d936a04b03d2 Mon Sep 17 00:00:00 2001 From: Mateo Florido Date: Thu, 25 Jul 2024 16:52:33 -0500 Subject: [PATCH 02/21] Add logging --- src/k8s/pkg/k8sd/api/certs_refresh.go | 53 +++++++++++++++++---------- src/k8s/pkg/utils/pki/generate.go | 1 + 2 files changed, 35 insertions(+), 19 deletions(-) diff --git a/src/k8s/pkg/k8sd/api/certs_refresh.go b/src/k8s/pkg/k8sd/api/certs_refresh.go index 9c60d65ba..8da13ee71 100644 --- a/src/k8s/pkg/k8sd/api/certs_refresh.go +++ b/src/k8s/pkg/k8sd/api/certs_refresh.go @@ -13,6 +13,7 @@ import ( apiv1 "github.com/canonical/k8s/api/v1" "github.com/canonical/k8s/pkg/k8sd/pki" "github.com/canonical/k8s/pkg/k8sd/setup" + "github.com/canonical/k8s/pkg/log" "github.com/canonical/k8s/pkg/snap" snaputil "github.com/canonical/k8s/pkg/snap/util" "github.com/canonical/k8s/pkg/utils" @@ -25,14 +26,12 @@ import ( func (e *Endpoints) postRefreshCertsPlan(s *state.State, r *http.Request) response.Response { snap := e.provider.Snap() - isWorker, err := snaputil.IsWorker(snap) if err != nil { return response.InternalError(fmt.Errorf("failed to check if node is a worker: %w", err)) } - if isWorker { - return refreshCertsPlanWorker(s, snap) + return refreshCertsPlanWorker(s, r, snap) } else { // TODO: Control Plane refresh @@ -42,7 +41,9 @@ func (e *Endpoints) postRefreshCertsPlan(s *state.State, r *http.Request) respon } // refreshCertsPlanWorker generates the CSRs for the worker node and returns the seed and the names of the CSRs. -func refreshCertsPlanWorker(s *state.State, snap snap.Snap) response.Response { +func refreshCertsPlanWorker(s *state.State, r *http.Request, snap snap.Snap) response.Response { + log := log.FromContext(r.Context()) + client, err := snap.KubernetesNodeClient("") if err != nil { return response.InternalError(err) @@ -56,6 +57,9 @@ func refreshCertsPlanWorker(s *state.State, snap snap.Snap) response.Response { } seed = seed & 0x7FFFFFFF + log.Info("Generating CSRs for worker node") + log.Info("Generating Kubelet Serving Certificate Signing Request") + csrKubeletServing, pKeyKubeletServing, err := pkiutil.GenerateCSR( pkix.Name{ CommonName: fmt.Sprintf("system:node:%s", snap.Hostname()), @@ -68,7 +72,7 @@ func refreshCertsPlanWorker(s *state.State, snap snap.Snap) response.Response { ) _, err = client.CreateCertificateSigningRequest( - s.Context, + r.Context(), fmt.Sprintf("k8sd-%d-worker-kubelet-serving", seed), []byte(csrKubeletServing), []v1.KeyUsage{v1.UsageDigitalSignature, v1.UsageKeyEncipherment, v1.UsageServerAuth}, @@ -79,6 +83,7 @@ func refreshCertsPlanWorker(s *state.State, snap snap.Snap) response.Response { return response.InternalError(err) } + log.Info("Generating Kubelet Client Certificate Signing Request") csrKubeletClient, pKeyKubeletClient, err := pkiutil.GenerateCSR( pkix.Name{ CommonName: fmt.Sprintf("system:node:%s", snap.Hostname()), @@ -91,7 +96,7 @@ func refreshCertsPlanWorker(s *state.State, snap snap.Snap) response.Response { ) _, err = client.CreateCertificateSigningRequest( - s.Context, + r.Context(), fmt.Sprintf("k8sd-%d-worker-kubelet-client", seed), []byte(csrKubeletClient), []v1.KeyUsage{v1.UsageDigitalSignature, v1.UsageKeyEncipherment, v1.UsageClientAuth}, @@ -102,6 +107,7 @@ func refreshCertsPlanWorker(s *state.State, snap snap.Snap) response.Response { return response.InternalError(err) } + log.Info("Generating Kube Proxy Client Certificate Signing Request") csrKubeProxy, pKeyKubeProxy, err := pkiutil.GenerateCSR( pkix.Name{ CommonName: "system:kube-proxy", @@ -113,7 +119,7 @@ func refreshCertsPlanWorker(s *state.State, snap snap.Snap) response.Response { ) _, err = client.CreateCertificateSigningRequest( - s.Context, + r.Context(), fmt.Sprintf("k8sd-%d-worker-kube-proxy-client", seed), []byte(csrKubeProxy), []v1.KeyUsage{v1.UsageDigitalSignature, v1.UsageKeyEncipherment, v1.UsageClientAuth}, @@ -130,6 +136,7 @@ func refreshCertsPlanWorker(s *state.State, snap snap.Snap) response.Response { }, } + log.Info("Writing new private keys") err = os.WriteFile(filepath.Join(snap.KubernetesPKIDir(), "kubelet.key.new"), []byte(pKeyKubeletServing), 0600) if err != nil { return response.InternalError(err) @@ -148,6 +155,7 @@ func refreshCertsPlanWorker(s *state.State, snap snap.Snap) response.Response { func (e *Endpoints) postRefreshCertsRun(s *state.State, r *http.Request) response.Response { // TODO: Control Plane refresh + log := log.FromContext(r.Context()) req := apiv1.RefreshCertificatesRunRequest{} if err := utils.NewStrictJSONDecoder(r.Body).Decode(&req); err != nil { return response.BadRequest(fmt.Errorf("failed to parse request: %w", err)) @@ -166,22 +174,25 @@ func (e *Endpoints) postRefreshCertsRun(s *state.State, r *http.Request) respons certificates := pki.WorkerNodePKI{} + log.Info("Checking if the CSRs have been approved and issued") for _, csrName := range csrNames { - csr, err := client.GetCertificateSigningRequest(s.Context, csrName) + csr, err := client.GetCertificateSigningRequest(r.Context(), csrName) if err != nil { return response.InternalError(err) } - if !isCertificateApproved(csr.Status.Conditions) { - return response.InternalError(fmt.Errorf("CSR %s is not issued", csrName)) + if !isCertificateSigningRequestApproved(csr.Status.Conditions) { + log.Error(fmt.Errorf("CSR %s has not been approved", csrName), "CSR has not been approved") + return response.InternalError(fmt.Errorf("CSR %s has not been approved", csrName)) } if len(csr.Status.Certificate) == 0 { - return response.InternalError(fmt.Errorf("CSR %s is missing certificate", csrName)) + log.Error(fmt.Errorf("CSR %s has not been issued", csrName), "CSR has not been issued") + return response.InternalError(fmt.Errorf("CSR %s has not been issued", csrName)) } - _, _, err = pkiutil.LoadCertificate(string(csr.Status.Certificate), "") - if err != nil { + if _, _, err = pkiutil.LoadCertificate(string(csr.Status.Certificate), ""); err != nil { + log.Error(err, fmt.Sprintf("failed to load certificate for CSR %s", csrName)) return response.InternalError(fmt.Errorf("failed to load certificate for CSR %s: %w", csrName, err)) } @@ -193,6 +204,7 @@ func (e *Endpoints) postRefreshCertsRun(s *state.State, r *http.Request) respons case fmt.Sprintf("k8sd-%d-worker-kube-proxy-client", req.Seed): certificates.KubeProxyClientCert = string(csr.Status.Certificate) default: + log.Error(fmt.Errorf("unknown CSR %s", csrName), "Unknown CSR") return response.InternalError(fmt.Errorf("unknown CSR %s", csrName)) } @@ -230,12 +242,13 @@ func (e *Endpoints) postRefreshCertsRun(s *state.State, r *http.Request) respons certificates.KubeletClientKey = string(bytesKubeletClientKey) certificates.KubeProxyClientKey = string(bytesKubeProxyKey) - _, err = setup.EnsureWorkerPKI(snap, &certificates) - if err != nil { + log.Info("Ensuring worker PKI") + if _, err = setup.EnsureWorkerPKI(snap, &certificates); err != nil { return response.InternalError(err) } // Kubeconfigs + log.Info("Generating new kubeconfigs") if err := setup.Kubeconfig(filepath.Join(snap.KubernetesConfigDir(), "kubelet.conf"), "127.0.0.1:6443", certificates.CACert, certificates.KubeletClientCert, certificates.KubeletClientKey); err != nil { return response.InternalError(fmt.Errorf("failed to generate kubelet kubeconfig: %w", err)) } @@ -244,14 +257,16 @@ func (e *Endpoints) postRefreshCertsRun(s *state.State, r *http.Request) respons } // Restart the services - if err := snap.RestartService(s.Context, "kubelet"); err != nil { + log.Info("Restarting kubelet and kube-proxy") + if err := snap.RestartService(r.Context(), "kubelet"); err != nil { return response.InternalError(err) } - if err := snap.RestartService(s.Context, "kube-proxy"); err != nil { + if err := snap.RestartService(r.Context(), "kube-proxy"); err != nil { return response.InternalError(err) } // Remove the new private keys + log.Info("Removing temporal private keys") if err := os.Remove(filepath.Join(snap.KubernetesPKIDir(), "kubelet.key.new")); err != nil { return response.InternalError(err) } @@ -266,8 +281,8 @@ func (e *Endpoints) postRefreshCertsRun(s *state.State, r *http.Request) respons } -// isCertificateApproved checks if the certificate signing request is approved. -func isCertificateApproved(conditions []v1.CertificateSigningRequestCondition) bool { +// isCertificateSigningRequestApproved checks if the certificate signing request is approved. +func isCertificateSigningRequestApproved(conditions []v1.CertificateSigningRequestCondition) bool { for _, condition := range conditions { if condition.Type == v1.CertificateApproved && condition.Status == corev1.ConditionTrue { return true diff --git a/src/k8s/pkg/utils/pki/generate.go b/src/k8s/pkg/utils/pki/generate.go index e63167795..a090b7d2e 100644 --- a/src/k8s/pkg/utils/pki/generate.go +++ b/src/k8s/pkg/utils/pki/generate.go @@ -123,6 +123,7 @@ func GenerateRSAKey(bits int) (string, string, error) { return string(privPEM), string(pubPEM), nil } +// GenerateCSR generates a certificate signing request (CSR) and private key for the given subject. func GenerateCSR(subject pkix.Name, bits int, priv any, dnsSANs []string, ipSANs []net.IP) (string, string, error) { key, err := rsa.GenerateKey(rand.Reader, bits) if err != nil { From ab8c40660c14dee39251a3920cfb4d974fa0faea Mon Sep 17 00:00:00 2001 From: Mateo Florido Date: Mon, 29 Jul 2024 18:45:06 -0500 Subject: [PATCH 03/21] Add Configuration backup --- src/k8s/pkg/k8sd/api/certs_refresh.go | 89 +++++++++++++++++++++------ src/k8s/pkg/utils/file_operation.go | 73 ++++++++++++++++++++++ 2 files changed, 143 insertions(+), 19 deletions(-) create mode 100644 src/k8s/pkg/utils/file_operation.go diff --git a/src/k8s/pkg/k8sd/api/certs_refresh.go b/src/k8s/pkg/k8sd/api/certs_refresh.go index 8da13ee71..17de366fc 100644 --- a/src/k8s/pkg/k8sd/api/certs_refresh.go +++ b/src/k8s/pkg/k8sd/api/certs_refresh.go @@ -137,30 +137,55 @@ func refreshCertsPlanWorker(s *state.State, r *http.Request, snap snap.Snap) res } log.Info("Writing new private keys") - err = os.WriteFile(filepath.Join(snap.KubernetesPKIDir(), "kubelet.key.new"), []byte(pKeyKubeletServing), 0600) - if err != nil { - return response.InternalError(err) - } - err = os.WriteFile(filepath.Join(snap.KubernetesPKIDir(), "kubelet-client.key.new"), []byte(pKeyKubeletClient), 0600) - if err != nil { - return response.InternalError(err) + operations := []utils.FileOperations{ + { + SourcePath: filepath.Join(snap.KubernetesPKIDir(), "kubelet.key.tmp"), + Content: []byte(pKeyKubeletServing), + Permissions: 0600, + }, + { + SourcePath: filepath.Join(snap.KubernetesPKIDir(), "kubelet-client.key.tmp"), + Content: []byte(pKeyKubeletClient), + Permissions: 0600, + }, + { + SourcePath: filepath.Join(snap.KubernetesPKIDir(), "kube-proxy.key.tmp"), + Content: []byte(pKeyKubeProxy), + Permissions: 0600, + }, } - err = os.WriteFile(filepath.Join(snap.KubernetesPKIDir(), "kube-proxy.key.new"), []byte(pKeyKubeProxy), 0600) + + err = utils.WriteFiles(operations) if err != nil { return response.InternalError(err) } + return response.SyncResponse(true, &result) } func (e *Endpoints) postRefreshCertsRun(s *state.State, r *http.Request) response.Response { - // TODO: Control Plane refresh + snap := e.provider.Snap() + isWorker, err := snaputil.IsWorker(snap) + if err != nil { + return response.InternalError(fmt.Errorf("failed to check if node is a worker: %w", err)) + } + if isWorker { + return refreshCertsRunWorker(r, snap) + + } else { + // TODO: Control Plane refresh + return response.InternalError(fmt.Errorf("not implemented yet")) + } +} + +// refreshCertsRunWorker refreshes the certificates for a worker node +func refreshCertsRunWorker(r *http.Request, snap snap.Snap) response.Response { log := log.FromContext(r.Context()) req := apiv1.RefreshCertificatesRunRequest{} if err := utils.NewStrictJSONDecoder(r.Body).Decode(&req); err != nil { return response.BadRequest(fmt.Errorf("failed to parse request: %w", err)) } - snap := e.provider.Snap() client, err := snap.KubernetesNodeClient("") if err != nil { @@ -174,6 +199,32 @@ func (e *Endpoints) postRefreshCertsRun(s *state.State, r *http.Request) respons certificates := pki.WorkerNodePKI{} + operations := []utils.FileOperations{ + { + SourcePath: filepath.Join(snap.KubernetesPKIDir(), "kubelet.crt"), + BackupPath: filepath.Join(snap.KubernetesPKIDir(), "kubelet.crt.bak"), + Permissions: 0600, + }, + { + SourcePath: filepath.Join(snap.KubernetesPKIDir(), "kubelet.key"), + BackupPath: filepath.Join(snap.KubernetesPKIDir(), "kubelet.key.bak"), + Permissions: 0600, + }, + { + SourcePath: filepath.Join(snap.KubernetesConfigDir(), "kubelet.conf"), + BackupPath: filepath.Join(snap.KubernetesConfigDir(), "kubelet.conf.bak"), + Permissions: 0600, + }, + { + SourcePath: filepath.Join(snap.KubernetesConfigDir(), "proxy.conf"), + BackupPath: filepath.Join(snap.KubernetesConfigDir(), "proxy.conf.bak"), + Permissions: 0600, + }, + } + + log.Info("Backing up kubelet and kube-proxy certificates and configurations") + utils.BackupFiles(operations) + log.Info("Checking if the CSRs have been approved and issued") for _, csrName := range csrNames { csr, err := client.GetCertificateSigningRequest(r.Context(), csrName) @@ -181,7 +232,7 @@ func (e *Endpoints) postRefreshCertsRun(s *state.State, r *http.Request) respons return response.InternalError(err) } - if !isCertificateSigningRequestApproved(csr.Status.Conditions) { + if !isCertificateSigningRequestApproved(csr) { log.Error(fmt.Errorf("CSR %s has not been approved", csrName), "CSR has not been approved") return response.InternalError(fmt.Errorf("CSR %s has not been approved", csrName)) } @@ -221,17 +272,17 @@ func (e *Endpoints) postRefreshCertsRun(s *state.State, r *http.Request) respons } // Read the new private keys - bytesKubeletKey, err := os.ReadFile(filepath.Join(snap.KubernetesPKIDir(), "kubelet.key.new")) + bytesKubeletKey, err := os.ReadFile(filepath.Join(snap.KubernetesPKIDir(), "kubelet.key.tmp")) if err != nil { return response.InternalError(err) } - bytesKubeletClientKey, err := os.ReadFile(filepath.Join(snap.KubernetesPKIDir(), "kubelet-client.key.new")) + bytesKubeletClientKey, err := os.ReadFile(filepath.Join(snap.KubernetesPKIDir(), "kubelet-client.key.tmp")) if err != nil { return response.InternalError(err) } - bytesKubeProxyKey, err := os.ReadFile(filepath.Join(snap.KubernetesPKIDir(), "kube-proxy.key.new")) + bytesKubeProxyKey, err := os.ReadFile(filepath.Join(snap.KubernetesPKIDir(), "kube-proxy.key.tmp")) if err != nil { return response.InternalError(err) } @@ -267,13 +318,13 @@ func (e *Endpoints) postRefreshCertsRun(s *state.State, r *http.Request) respons // Remove the new private keys log.Info("Removing temporal private keys") - if err := os.Remove(filepath.Join(snap.KubernetesPKIDir(), "kubelet.key.new")); err != nil { + if err := os.Remove(filepath.Join(snap.KubernetesPKIDir(), "kubelet.key.tmp")); err != nil { return response.InternalError(err) } - if err := os.Remove(filepath.Join(snap.KubernetesPKIDir(), "kubelet-client.key.new")); err != nil { + if err := os.Remove(filepath.Join(snap.KubernetesPKIDir(), "kubelet-client.key.tmp")); err != nil { return response.InternalError(err) } - if err := os.Remove(filepath.Join(snap.KubernetesPKIDir(), "kube-proxy.key.new")); err != nil { + if err := os.Remove(filepath.Join(snap.KubernetesPKIDir(), "kube-proxy.key.tmp")); err != nil { return response.InternalError(err) } @@ -282,8 +333,8 @@ func (e *Endpoints) postRefreshCertsRun(s *state.State, r *http.Request) respons } // isCertificateSigningRequestApproved checks if the certificate signing request is approved. -func isCertificateSigningRequestApproved(conditions []v1.CertificateSigningRequestCondition) bool { - for _, condition := range conditions { +func isCertificateSigningRequestApproved(csr *v1.CertificateSigningRequest) bool { + for _, condition := range csr.Status.Conditions { if condition.Type == v1.CertificateApproved && condition.Status == corev1.ConditionTrue { return true } diff --git a/src/k8s/pkg/utils/file_operation.go b/src/k8s/pkg/utils/file_operation.go new file mode 100644 index 000000000..60e1c87cc --- /dev/null +++ b/src/k8s/pkg/utils/file_operation.go @@ -0,0 +1,73 @@ +package utils + +import ( + "fmt" + "io" + "os" +) + +// FileOperations is a struct that helps in perfoming file operations like +// backup and write multiple files. +type FileOperations struct { + BackupPath string + SourcePath string + Content []byte + Permissions os.FileMode +} + +// BackupFiles backs up the files in the operations slice. +func BackupFiles(operations []FileOperations) error { + for _, op := range operations { + if err := backupFile(op.SourcePath, op.BackupPath); err != nil { + return fmt.Errorf("backup failed: %w", err) + } + } + return nil +} + +// WriteFiles writes the files in the operations slice. +func WriteFiles(operations []FileOperations) error { + for _, op := range operations { + if err := os.WriteFile(op.SourcePath, op.Content, op.Permissions); err != nil { + return fmt.Errorf("failed to write file: %w", err) + } + } + return nil +} + +// backupFile backs up the file at sourcePath to backupPath. +func backupFile(sourcePath, backupPath string) error { + err := copyFile(sourcePath, backupPath) + if err != nil { + return fmt.Errorf("failed to backup file: %w", err) + } + return nil +} + +// CopyFiles copies the files in the operations slice perserving the permissions. +func copyFile(sourcePath, destinationPath string) error { + in, err := os.Open(sourcePath) + if err != nil { + return fmt.Errorf("failed to open source file: %w", err) + } + defer in.Close() + + sourceInfo, err := os.Stat(sourcePath) + if err != nil { + return fmt.Errorf("failed to get source file info: %w", err) + } + + out, err := os.Create(destinationPath) + if err != nil { + return fmt.Errorf("failed to open destination file: %w", err) + } + defer out.Close() + + os.Chmod(destinationPath, sourceInfo.Mode()) + + if _, err := io.Copy(in, out); err != nil { + return fmt.Errorf("failed to copy file: %w", err) + } + + return nil +} From 90fecd37221ab07b5a944babaab8775b1bb9190a Mon Sep 17 00:00:00 2001 From: Mateo Florido Date: Tue, 30 Jul 2024 07:12:53 -0500 Subject: [PATCH 04/21] Change backup files extension to `old` --- src/k8s/pkg/k8sd/api/certs_refresh.go | 17 ++++++++++------- src/k8s/pkg/utils/file_operation.go | 10 ++++++---- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/src/k8s/pkg/k8sd/api/certs_refresh.go b/src/k8s/pkg/k8sd/api/certs_refresh.go index 17de366fc..35917c377 100644 --- a/src/k8s/pkg/k8sd/api/certs_refresh.go +++ b/src/k8s/pkg/k8sd/api/certs_refresh.go @@ -32,7 +32,6 @@ func (e *Endpoints) postRefreshCertsPlan(s *state.State, r *http.Request) respon } if isWorker { return refreshCertsPlanWorker(s, r, snap) - } else { // TODO: Control Plane refresh return response.InternalError(fmt.Errorf("not implemented yet")) @@ -172,7 +171,6 @@ func (e *Endpoints) postRefreshCertsRun(s *state.State, r *http.Request) respons } if isWorker { return refreshCertsRunWorker(r, snap) - } else { // TODO: Control Plane refresh return response.InternalError(fmt.Errorf("not implemented yet")) @@ -202,22 +200,22 @@ func refreshCertsRunWorker(r *http.Request, snap snap.Snap) response.Response { operations := []utils.FileOperations{ { SourcePath: filepath.Join(snap.KubernetesPKIDir(), "kubelet.crt"), - BackupPath: filepath.Join(snap.KubernetesPKIDir(), "kubelet.crt.bak"), + BackupPath: filepath.Join(snap.KubernetesPKIDir(), "kubelet.crt.old"), Permissions: 0600, }, { SourcePath: filepath.Join(snap.KubernetesPKIDir(), "kubelet.key"), - BackupPath: filepath.Join(snap.KubernetesPKIDir(), "kubelet.key.bak"), + BackupPath: filepath.Join(snap.KubernetesPKIDir(), "kubelet.key.old"), Permissions: 0600, }, { SourcePath: filepath.Join(snap.KubernetesConfigDir(), "kubelet.conf"), - BackupPath: filepath.Join(snap.KubernetesConfigDir(), "kubelet.conf.bak"), + BackupPath: filepath.Join(snap.KubernetesConfigDir(), "kubelet.conf.old"), Permissions: 0600, }, { SourcePath: filepath.Join(snap.KubernetesConfigDir(), "proxy.conf"), - BackupPath: filepath.Join(snap.KubernetesConfigDir(), "proxy.conf.bak"), + BackupPath: filepath.Join(snap.KubernetesConfigDir(), "proxy.conf.old"), Permissions: 0600, }, } @@ -237,7 +235,7 @@ func refreshCertsRunWorker(r *http.Request, snap snap.Snap) response.Response { return response.InternalError(fmt.Errorf("CSR %s has not been approved", csrName)) } - if len(csr.Status.Certificate) == 0 { + if !isCertificateSigningRequestIssued(csr) { log.Error(fmt.Errorf("CSR %s has not been issued", csrName), "CSR has not been issued") return response.InternalError(fmt.Errorf("CSR %s has not been issued", csrName)) } @@ -341,3 +339,8 @@ func isCertificateSigningRequestApproved(csr *v1.CertificateSigningRequest) bool } return false } + +// isCertificateSigningRequestIssued checks if the certificate signing request is issued. +func isCertificateSigningRequestIssued(csr *v1.CertificateSigningRequest) bool { + return len(csr.Status.Certificate) > 0 +} diff --git a/src/k8s/pkg/utils/file_operation.go b/src/k8s/pkg/utils/file_operation.go index 60e1c87cc..02e2a939e 100644 --- a/src/k8s/pkg/utils/file_operation.go +++ b/src/k8s/pkg/utils/file_operation.go @@ -19,7 +19,7 @@ type FileOperations struct { func BackupFiles(operations []FileOperations) error { for _, op := range operations { if err := backupFile(op.SourcePath, op.BackupPath); err != nil { - return fmt.Errorf("backup failed: %w", err) + return fmt.Errorf("failed to backup file %s to %s: %w", op.SourcePath, op.BackupPath, err) } } return nil @@ -29,7 +29,7 @@ func BackupFiles(operations []FileOperations) error { func WriteFiles(operations []FileOperations) error { for _, op := range operations { if err := os.WriteFile(op.SourcePath, op.Content, op.Permissions); err != nil { - return fmt.Errorf("failed to write file: %w", err) + return fmt.Errorf("failed to write file %s: %w", op.SourcePath, err) } } return nil @@ -44,7 +44,7 @@ func backupFile(sourcePath, backupPath string) error { return nil } -// CopyFiles copies the files in the operations slice perserving the permissions. +// CopyFiles copies the files in the operations slice preserving the permissions. func copyFile(sourcePath, destinationPath string) error { in, err := os.Open(sourcePath) if err != nil { @@ -63,7 +63,9 @@ func copyFile(sourcePath, destinationPath string) error { } defer out.Close() - os.Chmod(destinationPath, sourceInfo.Mode()) + if err := os.Chmod(destinationPath, sourceInfo.Mode()); err != nil { + return fmt.Errorf("failed to set permissions on destination file: %w", err) + } if _, err := io.Copy(in, out); err != nil { return fmt.Errorf("failed to copy file: %w", err) From 5fcf7222e2e6544dfd7e257e6fb639418ead11cc Mon Sep 17 00:00:00 2001 From: Mateo Florido Date: Tue, 30 Jul 2024 18:18:04 -0500 Subject: [PATCH 05/21] Switch refresh to go routines --- src/k8s/go.mod | 2 +- src/k8s/pkg/k8sd/api/certs_refresh.go | 376 ++++++++++---------------- src/k8s/pkg/utils/file_operation.go | 75 ----- 3 files changed, 143 insertions(+), 310 deletions(-) delete mode 100644 src/k8s/pkg/utils/file_operation.go diff --git a/src/k8s/go.mod b/src/k8s/go.mod index 6fa66bd98..25ae69654 100644 --- a/src/k8s/go.mod +++ b/src/k8s/go.mod @@ -14,6 +14,7 @@ require ( github.com/spf13/cobra v1.8.1 golang.org/x/net v0.27.0 golang.org/x/sys v0.22.0 + golang.org/x/sync v0.7.0 // indirect gopkg.in/yaml.v2 v2.4.0 helm.sh/helm/v3 v3.15.3 k8s.io/api v0.30.1 @@ -150,7 +151,6 @@ require ( golang.org/x/crypto v0.25.0 // indirect golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/oauth2 v0.21.0 // indirect - golang.org/x/sync v0.7.0 // indirect golang.org/x/term v0.22.0 // indirect golang.org/x/text v0.16.0 // indirect golang.org/x/time v0.5.0 // indirect diff --git a/src/k8s/pkg/k8sd/api/certs_refresh.go b/src/k8s/pkg/k8sd/api/certs_refresh.go index 35917c377..7c23ab72b 100644 --- a/src/k8s/pkg/k8sd/api/certs_refresh.go +++ b/src/k8s/pkg/k8sd/api/certs_refresh.go @@ -1,10 +1,10 @@ package api import ( - "crypto/rand" "crypto/x509/pkix" - "encoding/binary" "fmt" + "math" + "math/rand" "net" "net/http" "os" @@ -20,146 +20,39 @@ import ( pkiutil "github.com/canonical/k8s/pkg/utils/pki" "github.com/canonical/lxd/lxd/response" "github.com/canonical/microcluster/state" + "golang.org/x/sync/errgroup" + certificatesv1 "k8s.io/api/certificates/v1" v1 "k8s.io/api/certificates/v1" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func (e *Endpoints) postRefreshCertsPlan(s *state.State, r *http.Request) response.Response { + log := log.FromContext(r.Context()) + + log.Info("Generating random seed for certificates refresh") + seed := rand.Intn(math.MaxInt) + snap := e.provider.Snap() isWorker, err := snaputil.IsWorker(snap) if err != nil { return response.InternalError(fmt.Errorf("failed to check if node is a worker: %w", err)) } if isWorker { - return refreshCertsPlanWorker(s, r, snap) - } else { - // TODO: Control Plane refresh - return response.InternalError(fmt.Errorf("not implemented yet")) - } - -} - -// refreshCertsPlanWorker generates the CSRs for the worker node and returns the seed and the names of the CSRs. -func refreshCertsPlanWorker(s *state.State, r *http.Request, snap snap.Snap) response.Response { - log := log.FromContext(r.Context()) - - client, err := snap.KubernetesNodeClient("") - if err != nil { - return response.InternalError(err) - } - - var seed int32 - - err = binary.Read(rand.Reader, binary.BigEndian, &seed) - if err != nil { - return response.InternalError(err) - } - seed = seed & 0x7FFFFFFF - - log.Info("Generating CSRs for worker node") - log.Info("Generating Kubelet Serving Certificate Signing Request") - - csrKubeletServing, pKeyKubeletServing, err := pkiutil.GenerateCSR( - pkix.Name{ - CommonName: fmt.Sprintf("system:node:%s", snap.Hostname()), - Organization: []string{"system:nodes"}, + return response.SyncResponse(true, apiv1.RefreshCertificatesPlanResponse{ + Seed: seed, + CertificatesSigningRequests: []string{ + fmt.Sprintf("k8sd-%d-worker-kubelet-serving", seed), + fmt.Sprintf("k8sd-%d-worker-kubelet-client", seed), + fmt.Sprintf("k8sd-%d-worker-kube-proxy-client", seed), + }, }, - 2048, - nil, - []string{snap.Hostname()}, - []net.IP{net.ParseIP(s.Address().Hostname())}, - ) - - _, err = client.CreateCertificateSigningRequest( - r.Context(), - fmt.Sprintf("k8sd-%d-worker-kubelet-serving", seed), - []byte(csrKubeletServing), - []v1.KeyUsage{v1.UsageDigitalSignature, v1.UsageKeyEncipherment, v1.UsageServerAuth}, - []string{"system:nodes"}, - "k8sd.io/kubelet-serving", - ) - if err != nil { - return response.InternalError(err) + ) } - log.Info("Generating Kubelet Client Certificate Signing Request") - csrKubeletClient, pKeyKubeletClient, err := pkiutil.GenerateCSR( - pkix.Name{ - CommonName: fmt.Sprintf("system:node:%s", snap.Hostname()), - Organization: []string{"system:nodes"}, - }, - 2048, - nil, - nil, - nil, - ) - - _, err = client.CreateCertificateSigningRequest( - r.Context(), - fmt.Sprintf("k8sd-%d-worker-kubelet-client", seed), - []byte(csrKubeletClient), - []v1.KeyUsage{v1.UsageDigitalSignature, v1.UsageKeyEncipherment, v1.UsageClientAuth}, - nil, - "k8sd.io/kubelet-client", - ) - if err != nil { - return response.InternalError(err) - } - - log.Info("Generating Kube Proxy Client Certificate Signing Request") - csrKubeProxy, pKeyKubeProxy, err := pkiutil.GenerateCSR( - pkix.Name{ - CommonName: "system:kube-proxy", - }, - 2048, - nil, - nil, - nil, - ) - - _, err = client.CreateCertificateSigningRequest( - r.Context(), - fmt.Sprintf("k8sd-%d-worker-kube-proxy-client", seed), - []byte(csrKubeProxy), - []v1.KeyUsage{v1.UsageDigitalSignature, v1.UsageKeyEncipherment, v1.UsageClientAuth}, - nil, - "k8sd.io/kube-proxy-client", - ) - - result := apiv1.RefreshCertificatesPlanResponse{ - Seed: int(seed), - CertificatesSigningRequests: []string{ - fmt.Sprintf("k8sd-%d-worker-kubelet-serving", seed), - fmt.Sprintf("k8sd-%d-worker-kubelet-client", seed), - fmt.Sprintf("k8sd-%d-worker-kube-proxy-client", seed), - }, - } - - log.Info("Writing new private keys") - operations := []utils.FileOperations{ - { - SourcePath: filepath.Join(snap.KubernetesPKIDir(), "kubelet.key.tmp"), - Content: []byte(pKeyKubeletServing), - Permissions: 0600, - }, - { - SourcePath: filepath.Join(snap.KubernetesPKIDir(), "kubelet-client.key.tmp"), - Content: []byte(pKeyKubeletClient), - Permissions: 0600, - }, - { - SourcePath: filepath.Join(snap.KubernetesPKIDir(), "kube-proxy.key.tmp"), - Content: []byte(pKeyKubeProxy), - Permissions: 0600, - }, - } - - err = utils.WriteFiles(operations) - if err != nil { - return response.InternalError(err) - } - - return response.SyncResponse(true, &result) + return response.SyncResponse(true, apiv1.RefreshCertificatesPlanResponse{ + Seed: seed, + }) } @@ -170,16 +63,16 @@ func (e *Endpoints) postRefreshCertsRun(s *state.State, r *http.Request) respons return response.InternalError(fmt.Errorf("failed to check if node is a worker: %w", err)) } if isWorker { - return refreshCertsRunWorker(r, snap) - } else { - // TODO: Control Plane refresh - return response.InternalError(fmt.Errorf("not implemented yet")) + return refreshCertsRunWorker(s, r, snap) } + // TODO: Control Plane refresh + return response.InternalError(fmt.Errorf("not implemented yet")) } // refreshCertsRunWorker refreshes the certificates for a worker node -func refreshCertsRunWorker(r *http.Request, snap snap.Snap) response.Response { +func refreshCertsRunWorker(s *state.State, r *http.Request, snap snap.Snap) response.Response { log := log.FromContext(r.Context()) + req := apiv1.RefreshCertificatesRunRequest{} if err := utils.NewStrictJSONDecoder(r.Body).Decode(&req); err != nil { return response.BadRequest(fmt.Errorf("failed to parse request: %w", err)) @@ -189,76 +82,9 @@ func refreshCertsRunWorker(r *http.Request, snap snap.Snap) response.Response { if err != nil { return response.InternalError(err) } - csrNames := []string{ - fmt.Sprintf("k8sd-%d-worker-kubelet-serving", req.Seed), - fmt.Sprintf("k8sd-%d-worker-kubelet-client", req.Seed), - fmt.Sprintf("k8sd-%d-worker-kube-proxy-client", req.Seed), - } + log.Info("Generating CSRs for worker node") certificates := pki.WorkerNodePKI{} - - operations := []utils.FileOperations{ - { - SourcePath: filepath.Join(snap.KubernetesPKIDir(), "kubelet.crt"), - BackupPath: filepath.Join(snap.KubernetesPKIDir(), "kubelet.crt.old"), - Permissions: 0600, - }, - { - SourcePath: filepath.Join(snap.KubernetesPKIDir(), "kubelet.key"), - BackupPath: filepath.Join(snap.KubernetesPKIDir(), "kubelet.key.old"), - Permissions: 0600, - }, - { - SourcePath: filepath.Join(snap.KubernetesConfigDir(), "kubelet.conf"), - BackupPath: filepath.Join(snap.KubernetesConfigDir(), "kubelet.conf.old"), - Permissions: 0600, - }, - { - SourcePath: filepath.Join(snap.KubernetesConfigDir(), "proxy.conf"), - BackupPath: filepath.Join(snap.KubernetesConfigDir(), "proxy.conf.old"), - Permissions: 0600, - }, - } - - log.Info("Backing up kubelet and kube-proxy certificates and configurations") - utils.BackupFiles(operations) - - log.Info("Checking if the CSRs have been approved and issued") - for _, csrName := range csrNames { - csr, err := client.GetCertificateSigningRequest(r.Context(), csrName) - if err != nil { - return response.InternalError(err) - } - - if !isCertificateSigningRequestApproved(csr) { - log.Error(fmt.Errorf("CSR %s has not been approved", csrName), "CSR has not been approved") - return response.InternalError(fmt.Errorf("CSR %s has not been approved", csrName)) - } - - if !isCertificateSigningRequestIssued(csr) { - log.Error(fmt.Errorf("CSR %s has not been issued", csrName), "CSR has not been issued") - return response.InternalError(fmt.Errorf("CSR %s has not been issued", csrName)) - } - - if _, _, err = pkiutil.LoadCertificate(string(csr.Status.Certificate), ""); err != nil { - log.Error(err, fmt.Sprintf("failed to load certificate for CSR %s", csrName)) - return response.InternalError(fmt.Errorf("failed to load certificate for CSR %s: %w", csrName, err)) - } - - switch csrName { - case fmt.Sprintf("k8sd-%d-worker-kubelet-serving", req.Seed): - certificates.KubeletCert = string(csr.Status.Certificate) - case fmt.Sprintf("k8sd-%d-worker-kubelet-client", req.Seed): - certificates.KubeletClientCert = string(csr.Status.Certificate) - case fmt.Sprintf("k8sd-%d-worker-kube-proxy-client", req.Seed): - certificates.KubeProxyClientCert = string(csr.Status.Certificate) - default: - log.Error(fmt.Errorf("unknown CSR %s", csrName), "Unknown CSR") - return response.InternalError(fmt.Errorf("unknown CSR %s", csrName)) - } - - } - // Read the CA and client CA ca, err := os.ReadFile(filepath.Join(snap.KubernetesPKIDir(), "ca.crt")) if err != nil { @@ -269,28 +95,118 @@ func refreshCertsRunWorker(r *http.Request, snap snap.Snap) response.Response { return response.InternalError(err) } - // Read the new private keys - bytesKubeletKey, err := os.ReadFile(filepath.Join(snap.KubernetesPKIDir(), "kubelet.key.tmp")) - if err != nil { - return response.InternalError(err) - } - - bytesKubeletClientKey, err := os.ReadFile(filepath.Join(snap.KubernetesPKIDir(), "kubelet-client.key.tmp")) - if err != nil { - return response.InternalError(err) - } + certificates.CACert = string(ca) + certificates.ClientCACert = string(clientCA) - bytesKubeProxyKey, err := os.ReadFile(filepath.Join(snap.KubernetesPKIDir(), "kube-proxy.key.tmp")) - if err != nil { + g, errGroupCTX := errgroup.WithContext(r.Context()) + + for _, csr := range []struct { + name string + commonName string + organization []string + usages []v1.KeyUsage + hostnames []string + ips []net.IP + signerName string + }{ + { + name: fmt.Sprintf("k8sd-%d-worker-kubelet-serving", req.Seed), + commonName: fmt.Sprintf("system:node:%s", snap.Hostname()), + organization: []string{"system:nodes"}, + usages: []v1.KeyUsage{v1.UsageDigitalSignature, v1.UsageKeyEncipherment, v1.UsageServerAuth}, + hostnames: []string{snap.Hostname()}, + ips: []net.IP{net.ParseIP(s.Address().Hostname())}, + signerName: "k8sd.io/kubelet-serving", + }, + { + name: fmt.Sprintf("k8sd-%d-worker-kubelet-client", req.Seed), + commonName: fmt.Sprintf("system:node:%s", snap.Hostname()), + organization: []string{"system:nodes"}, + usages: []v1.KeyUsage{v1.UsageDigitalSignature, v1.UsageKeyEncipherment, v1.UsageClientAuth}, + signerName: "k8sd.io/kubelet-client", + }, + { + name: fmt.Sprintf("k8sd-%d-worker-kube-proxy-client", req.Seed), + commonName: "system:kube-proxy", + usages: []v1.KeyUsage{v1.UsageDigitalSignature, v1.UsageKeyEncipherment, v1.UsageClientAuth}, + signerName: "k8sd.io/kube-proxy-client", + }, + } { + csr := csr + g.Go(func() error { + componentCSR, pKey, err := pkiutil.GenerateCSR( + pkix.Name{ + CommonName: csr.commonName, + Organization: csr.organization, + }, + 2048, + nil, + csr.hostnames, + csr.ips, + ) + if err != nil { + return fmt.Errorf("failed to generate CSR for %s: %w", csr.name, err) + } + + _, err = client.CertificatesV1().CertificateSigningRequests().Create(errGroupCTX, &certificatesv1.CertificateSigningRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: csr.name, + }, + Spec: certificatesv1.CertificateSigningRequestSpec{ + Request: []byte(componentCSR), + Usages: csr.usages, + SignerName: csr.signerName, + }, + }, metav1.CreateOptions{}) + + if err != nil { + return fmt.Errorf("failed to create CSR for %s: %w", csr.name, err) + } + + for { + select { + case <-errGroupCTX.Done(): + return nil + default: + request, err := client.CertificatesV1().CertificateSigningRequests().Get(errGroupCTX, csr.name, metav1.GetOptions{}) + if err != nil { + log.Error(err, fmt.Sprintf("failed to get CSR %s", csr.name)) + continue + } + + approved, err := isCertificateSigningRequestApproved(request) + if err != nil { + log.Error(err, fmt.Sprintf("failed to check approval status for CSR %s", csr.name)) + return fmt.Errorf("failed to check approval status for CSR %s: %w", csr.name, err) + } + + if approved && isCertificateSigningRequestIssued(request) { + if _, _, err = pkiutil.LoadCertificate(string(request.Status.Certificate), ""); err != nil { + log.Error(err, fmt.Sprintf("failed to load certificate for CSR %s", csr.name)) + } + switch csr.name { + case fmt.Sprintf("k8sd-%d-worker-kubelet-serving", req.Seed): + certificates.KubeletCert = string(request.Status.Certificate) + certificates.KubeletKey = string(pKey) + case fmt.Sprintf("k8sd-%d-worker-kubelet-client", req.Seed): + certificates.KubeletClientCert = string(request.Status.Certificate) + certificates.KubeletClientKey = string(pKey) + case fmt.Sprintf("k8sd-%d-worker-kube-proxy-client", req.Seed): + certificates.KubeProxyClientCert = string(request.Status.Certificate) + certificates.KubeProxyClientKey = string(pKey) + } + return nil + } + } + } + }) + + } + + if err := g.Wait(); err != nil { return response.InternalError(err) } - certificates.CACert = string(ca) - certificates.ClientCACert = string(clientCA) - certificates.KubeletKey = string(bytesKubeletKey) - certificates.KubeletClientKey = string(bytesKubeletClientKey) - certificates.KubeProxyClientKey = string(bytesKubeProxyKey) - log.Info("Ensuring worker PKI") if _, err = setup.EnsureWorkerPKI(snap, &certificates); err != nil { return response.InternalError(err) @@ -314,30 +230,22 @@ func refreshCertsRunWorker(r *http.Request, snap snap.Snap) response.Response { return response.InternalError(err) } - // Remove the new private keys - log.Info("Removing temporal private keys") - if err := os.Remove(filepath.Join(snap.KubernetesPKIDir(), "kubelet.key.tmp")); err != nil { - return response.InternalError(err) - } - if err := os.Remove(filepath.Join(snap.KubernetesPKIDir(), "kubelet-client.key.tmp")); err != nil { - return response.InternalError(err) - } - if err := os.Remove(filepath.Join(snap.KubernetesPKIDir(), "kube-proxy.key.tmp")); err != nil { - return response.InternalError(err) - } - return response.SyncResponse(true, nil) } // isCertificateSigningRequestApproved checks if the certificate signing request is approved. -func isCertificateSigningRequestApproved(csr *v1.CertificateSigningRequest) bool { +// It returns true if the CSR is approved, false if it is pending, and an error if it is denied. +func isCertificateSigningRequestApproved(csr *v1.CertificateSigningRequest) (bool, error) { for _, condition := range csr.Status.Conditions { if condition.Type == v1.CertificateApproved && condition.Status == corev1.ConditionTrue { - return true + return true, nil + } + if condition.Type == v1.CertificateDenied && condition.Status == corev1.ConditionTrue { + return false, fmt.Errorf("CSR %s was denied: %s", csr.Name, condition.Reason) } } - return false + return false, nil } // isCertificateSigningRequestIssued checks if the certificate signing request is issued. diff --git a/src/k8s/pkg/utils/file_operation.go b/src/k8s/pkg/utils/file_operation.go deleted file mode 100644 index 02e2a939e..000000000 --- a/src/k8s/pkg/utils/file_operation.go +++ /dev/null @@ -1,75 +0,0 @@ -package utils - -import ( - "fmt" - "io" - "os" -) - -// FileOperations is a struct that helps in perfoming file operations like -// backup and write multiple files. -type FileOperations struct { - BackupPath string - SourcePath string - Content []byte - Permissions os.FileMode -} - -// BackupFiles backs up the files in the operations slice. -func BackupFiles(operations []FileOperations) error { - for _, op := range operations { - if err := backupFile(op.SourcePath, op.BackupPath); err != nil { - return fmt.Errorf("failed to backup file %s to %s: %w", op.SourcePath, op.BackupPath, err) - } - } - return nil -} - -// WriteFiles writes the files in the operations slice. -func WriteFiles(operations []FileOperations) error { - for _, op := range operations { - if err := os.WriteFile(op.SourcePath, op.Content, op.Permissions); err != nil { - return fmt.Errorf("failed to write file %s: %w", op.SourcePath, err) - } - } - return nil -} - -// backupFile backs up the file at sourcePath to backupPath. -func backupFile(sourcePath, backupPath string) error { - err := copyFile(sourcePath, backupPath) - if err != nil { - return fmt.Errorf("failed to backup file: %w", err) - } - return nil -} - -// CopyFiles copies the files in the operations slice preserving the permissions. -func copyFile(sourcePath, destinationPath string) error { - in, err := os.Open(sourcePath) - if err != nil { - return fmt.Errorf("failed to open source file: %w", err) - } - defer in.Close() - - sourceInfo, err := os.Stat(sourcePath) - if err != nil { - return fmt.Errorf("failed to get source file info: %w", err) - } - - out, err := os.Create(destinationPath) - if err != nil { - return fmt.Errorf("failed to open destination file: %w", err) - } - defer out.Close() - - if err := os.Chmod(destinationPath, sourceInfo.Mode()); err != nil { - return fmt.Errorf("failed to set permissions on destination file: %w", err) - } - - if _, err := io.Copy(in, out); err != nil { - return fmt.Errorf("failed to copy file: %w", err) - } - - return nil -} From 4ef18a80f5d623876250a152393175731f30ce59 Mon Sep 17 00:00:00 2001 From: Mateo Florido Date: Wed, 31 Jul 2024 07:00:25 -0500 Subject: [PATCH 06/21] Change func signature --- src/k8s/go.mod | 1 + src/k8s/pkg/k8sd/api/certs_refresh.go | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/k8s/go.mod b/src/k8s/go.mod index 25ae69654..6575aa8c2 100644 --- a/src/k8s/go.mod +++ b/src/k8s/go.mod @@ -13,6 +13,7 @@ require ( github.com/pelletier/go-toml v1.9.5 github.com/spf13/cobra v1.8.1 golang.org/x/net v0.27.0 + golang.org/x/sync v0.7.0 golang.org/x/sys v0.22.0 golang.org/x/sync v0.7.0 // indirect gopkg.in/yaml.v2 v2.4.0 diff --git a/src/k8s/pkg/k8sd/api/certs_refresh.go b/src/k8s/pkg/k8sd/api/certs_refresh.go index 7c23ab72b..dee91fc28 100644 --- a/src/k8s/pkg/k8sd/api/certs_refresh.go +++ b/src/k8s/pkg/k8sd/api/certs_refresh.go @@ -27,7 +27,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func (e *Endpoints) postRefreshCertsPlan(s *state.State, r *http.Request) response.Response { +func (e *Endpoints) postRefreshCertsPlan(s state.State, r *http.Request) response.Response { log := log.FromContext(r.Context()) log.Info("Generating random seed for certificates refresh") @@ -56,7 +56,7 @@ func (e *Endpoints) postRefreshCertsPlan(s *state.State, r *http.Request) respon } -func (e *Endpoints) postRefreshCertsRun(s *state.State, r *http.Request) response.Response { +func (e *Endpoints) postRefreshCertsRun(s state.State, r *http.Request) response.Response { snap := e.provider.Snap() isWorker, err := snaputil.IsWorker(snap) if err != nil { @@ -70,7 +70,7 @@ func (e *Endpoints) postRefreshCertsRun(s *state.State, r *http.Request) respons } // refreshCertsRunWorker refreshes the certificates for a worker node -func refreshCertsRunWorker(s *state.State, r *http.Request, snap snap.Snap) response.Response { +func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) response.Response { log := log.FromContext(r.Context()) req := apiv1.RefreshCertificatesRunRequest{} From 36666070cc877b2f408934995eaff8617837dc8f Mon Sep 17 00:00:00 2001 From: Mateo Florido Date: Wed, 31 Jul 2024 11:47:59 -0500 Subject: [PATCH 07/21] Use Watch instead of Get --- src/k8s/pkg/k8sd/api/certs_refresh.go | 75 ++++++++++++++------------- 1 file changed, 39 insertions(+), 36 deletions(-) diff --git a/src/k8s/pkg/k8sd/api/certs_refresh.go b/src/k8s/pkg/k8sd/api/certs_refresh.go index dee91fc28..4a2218547 100644 --- a/src/k8s/pkg/k8sd/api/certs_refresh.go +++ b/src/k8s/pkg/k8sd/api/certs_refresh.go @@ -28,9 +28,6 @@ import ( ) func (e *Endpoints) postRefreshCertsPlan(s state.State, r *http.Request) response.Response { - log := log.FromContext(r.Context()) - - log.Info("Generating random seed for certificates refresh") seed := rand.Intn(math.MaxInt) snap := e.provider.Snap() @@ -83,7 +80,6 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo return response.InternalError(err) } - log.Info("Generating CSRs for worker node") certificates := pki.WorkerNodePKI{} // Read the CA and client CA ca, err := os.ReadFile(filepath.Join(snap.KubernetesPKIDir(), "ca.crt")) @@ -108,6 +104,8 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo hostnames []string ips []net.IP signerName string + certificate *string + key *string }{ { name: fmt.Sprintf("k8sd-%d-worker-kubelet-serving", req.Seed), @@ -117,6 +115,8 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo hostnames: []string{snap.Hostname()}, ips: []net.IP{net.ParseIP(s.Address().Hostname())}, signerName: "k8sd.io/kubelet-serving", + certificate: &certificates.KubeletCert, + key: &certificates.KubeletKey, }, { name: fmt.Sprintf("k8sd-%d-worker-kubelet-client", req.Seed), @@ -124,17 +124,21 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo organization: []string{"system:nodes"}, usages: []v1.KeyUsage{v1.UsageDigitalSignature, v1.UsageKeyEncipherment, v1.UsageClientAuth}, signerName: "k8sd.io/kubelet-client", + certificate: &certificates.KubeletClientCert, + key: &certificates.KubeletClientKey, }, { - name: fmt.Sprintf("k8sd-%d-worker-kube-proxy-client", req.Seed), - commonName: "system:kube-proxy", - usages: []v1.KeyUsage{v1.UsageDigitalSignature, v1.UsageKeyEncipherment, v1.UsageClientAuth}, - signerName: "k8sd.io/kube-proxy-client", + name: fmt.Sprintf("k8sd-%d-worker-kube-proxy-client", req.Seed), + commonName: "system:kube-proxy", + usages: []v1.KeyUsage{v1.UsageDigitalSignature, v1.UsageKeyEncipherment, v1.UsageClientAuth}, + signerName: "k8sd.io/kube-proxy-client", + certificate: &certificates.KubeProxyClientCert, + key: &certificates.KubeProxyClientKey, }, } { csr := csr g.Go(func() error { - componentCSR, pKey, err := pkiutil.GenerateCSR( + csrPEM, keyPEM, err := pkiutil.GenerateCSR( pkix.Name{ CommonName: csr.commonName, Organization: csr.organization, @@ -148,53 +152,50 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo return fmt.Errorf("failed to generate CSR for %s: %w", csr.name, err) } - _, err = client.CertificatesV1().CertificateSigningRequests().Create(errGroupCTX, &certificatesv1.CertificateSigningRequest{ + if _, err = client.CertificatesV1().CertificateSigningRequests().Create(errGroupCTX, &certificatesv1.CertificateSigningRequest{ ObjectMeta: metav1.ObjectMeta{ Name: csr.name, }, Spec: certificatesv1.CertificateSigningRequestSpec{ - Request: []byte(componentCSR), + Request: []byte(csrPEM), Usages: csr.usages, SignerName: csr.signerName, }, - }, metav1.CreateOptions{}) - - if err != nil { + }, metav1.CreateOptions{}); err != nil { return fmt.Errorf("failed to create CSR for %s: %w", csr.name, err) } + watcher, err := client.CertificatesV1().CertificateSigningRequests().Watch(errGroupCTX, metav1.SingleObject(metav1.ObjectMeta{Name: csr.name})) + if err != nil { + return fmt.Errorf("failed to watch CSR %s: %w", csr.name, err) + } + + defer watcher.Stop() for { select { case <-errGroupCTX.Done(): return nil - default: - request, err := client.CertificatesV1().CertificateSigningRequests().Get(errGroupCTX, csr.name, metav1.GetOptions{}) - if err != nil { - log.Error(err, fmt.Sprintf("failed to get CSR %s", csr.name)) - continue + case evt, ok := <-watcher.ResultChan(): + if !ok { + return fmt.Errorf("watch closed for CSR %s", csr.name) + } + + request, ok := evt.Object.(*v1.CertificateSigningRequest) + if !ok { + return fmt.Errorf("expected a CertificateSigningRequest but received %#v", evt.Object) } approved, err := isCertificateSigningRequestApproved(request) if err != nil { - log.Error(err, fmt.Sprintf("failed to check approval status for CSR %s", csr.name)) - return fmt.Errorf("failed to check approval status for CSR %s: %w", csr.name, err) + return fmt.Errorf("csr is in an invalid state: %w", err) } if approved && isCertificateSigningRequestIssued(request) { if _, _, err = pkiutil.LoadCertificate(string(request.Status.Certificate), ""); err != nil { log.Error(err, fmt.Sprintf("failed to load certificate for CSR %s", csr.name)) } - switch csr.name { - case fmt.Sprintf("k8sd-%d-worker-kubelet-serving", req.Seed): - certificates.KubeletCert = string(request.Status.Certificate) - certificates.KubeletKey = string(pKey) - case fmt.Sprintf("k8sd-%d-worker-kubelet-client", req.Seed): - certificates.KubeletClientCert = string(request.Status.Certificate) - certificates.KubeletClientKey = string(pKey) - case fmt.Sprintf("k8sd-%d-worker-kube-proxy-client", req.Seed): - certificates.KubeProxyClientCert = string(request.Status.Certificate) - certificates.KubeProxyClientKey = string(pKey) - } + *csr.certificate = string(request.Status.Certificate) + *csr.key = string(keyPEM) return nil } } @@ -204,16 +205,14 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo } if err := g.Wait(); err != nil { - return response.InternalError(err) + return response.InternalError(fmt.Errorf("failed to generate worker CSRs: %w", err)) } - log.Info("Ensuring worker PKI") if _, err = setup.EnsureWorkerPKI(snap, &certificates); err != nil { return response.InternalError(err) } // Kubeconfigs - log.Info("Generating new kubeconfigs") if err := setup.Kubeconfig(filepath.Join(snap.KubernetesConfigDir(), "kubelet.conf"), "127.0.0.1:6443", certificates.CACert, certificates.KubeletClientCert, certificates.KubeletClientKey); err != nil { return response.InternalError(fmt.Errorf("failed to generate kubelet kubeconfig: %w", err)) } @@ -235,7 +234,8 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo } // isCertificateSigningRequestApproved checks if the certificate signing request is approved. -// It returns true if the CSR is approved, false if it is pending, and an error if it is denied. +// It returns true if the CSR is approved, false if it is pending, and an error if it is denied +// or failed. func isCertificateSigningRequestApproved(csr *v1.CertificateSigningRequest) (bool, error) { for _, condition := range csr.Status.Conditions { if condition.Type == v1.CertificateApproved && condition.Status == corev1.ConditionTrue { @@ -244,6 +244,9 @@ func isCertificateSigningRequestApproved(csr *v1.CertificateSigningRequest) (boo if condition.Type == v1.CertificateDenied && condition.Status == corev1.ConditionTrue { return false, fmt.Errorf("CSR %s was denied: %s", csr.Name, condition.Reason) } + if condition.Type == v1.CertificateFailed && condition.Status == corev1.ConditionTrue { + return false, fmt.Errorf("CSR %s failed: %s", csr.Name, condition.Reason) + } } return false, nil } From cccf7148fd9efc0b64f12f8e905d4170fc495dc5 Mon Sep 17 00:00:00 2001 From: Mateo Florido Date: Wed, 31 Jul 2024 17:17:31 -0500 Subject: [PATCH 08/21] Remove CSR helpers --- src/k8s/pkg/client/kubernetes/csr.go | 37 --------------------------- src/k8s/pkg/k8sd/api/certs_refresh.go | 10 +++++++- 2 files changed, 9 insertions(+), 38 deletions(-) delete mode 100644 src/k8s/pkg/client/kubernetes/csr.go diff --git a/src/k8s/pkg/client/kubernetes/csr.go b/src/k8s/pkg/client/kubernetes/csr.go deleted file mode 100644 index b0470e334..000000000 --- a/src/k8s/pkg/client/kubernetes/csr.go +++ /dev/null @@ -1,37 +0,0 @@ -package kubernetes - -import ( - "context" - "fmt" - - certificatesv1 "k8s.io/api/certificates/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -func (c *Client) CreateCertificateSigningRequest(ctx context.Context, name string, csrPEM []byte, usages []certificatesv1.KeyUsage, groups []string, signerName string) (*certificatesv1.CertificateSigningRequest, error) { - csr, err := c.CertificatesV1().CertificateSigningRequests().Create(ctx, &certificatesv1.CertificateSigningRequest{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - }, - Spec: certificatesv1.CertificateSigningRequestSpec{ - Request: csrPEM, - Usages: usages, - Groups: groups, - SignerName: signerName, - }, - }, metav1.CreateOptions{}) - if err != nil { - return nil, fmt.Errorf("failed to create certificate signing request %s: %w", name, err) - } - - return csr, nil -} - -func (c *Client) GetCertificateSigningRequest(ctx context.Context, name string) (*certificatesv1.CertificateSigningRequest, error) { - csr, err := c.CertificatesV1().CertificateSigningRequests().Get(ctx, name, metav1.GetOptions{}) - if err != nil { - return nil, fmt.Errorf("failed to get certificate signing request %s: %w", name, err) - } - - return csr, nil -} diff --git a/src/k8s/pkg/k8sd/api/certs_refresh.go b/src/k8s/pkg/k8sd/api/certs_refresh.go index 4a2218547..348ec818d 100644 --- a/src/k8s/pkg/k8sd/api/certs_refresh.go +++ b/src/k8s/pkg/k8sd/api/certs_refresh.go @@ -229,7 +229,15 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo return response.InternalError(err) } - return response.SyncResponse(true, nil) + cert, _, err := pkiutil.LoadCertificate(certificates.KubeletCert, "") + if err != nil { + return response.InternalError(fmt.Errorf("failed to load kubelet certificate: %w", err)) + } + + expirationDuration := cert.NotAfter.Sub(cert.NotBefore) + return response.SyncResponse(true, apiv1.RefreshCertificatesRunResponse{ + ExpirationSeconds: int(expirationDuration.Seconds()), + }) } From 1c520093d8d7a7a3d2f8162bcc38ed6cd3430f52 Mon Sep 17 00:00:00 2001 From: Mateo Florido Date: Thu, 1 Aug 2024 08:52:33 -0500 Subject: [PATCH 09/21] Get CA/ClientCA from ClusterConfig --- src/k8s/pkg/k8sd/api/certs_refresh.go | 30 ++++++++++++------------- src/k8s/pkg/k8sd/app/hooks_bootstrap.go | 4 ++++ 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/src/k8s/pkg/k8sd/api/certs_refresh.go b/src/k8s/pkg/k8sd/api/certs_refresh.go index 348ec818d..5aa4d9c66 100644 --- a/src/k8s/pkg/k8sd/api/certs_refresh.go +++ b/src/k8s/pkg/k8sd/api/certs_refresh.go @@ -7,10 +7,10 @@ import ( "math/rand" "net" "net/http" - "os" "path/filepath" apiv1 "github.com/canonical/k8s/api/v1" + databaseutil "github.com/canonical/k8s/pkg/k8sd/database/util" "github.com/canonical/k8s/pkg/k8sd/pki" "github.com/canonical/k8s/pkg/k8sd/setup" "github.com/canonical/k8s/pkg/log" @@ -81,20 +81,17 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo } certificates := pki.WorkerNodePKI{} - // Read the CA and client CA - ca, err := os.ReadFile(filepath.Join(snap.KubernetesPKIDir(), "ca.crt")) - if err != nil { - return response.InternalError(err) - } - clientCA, err := os.ReadFile(filepath.Join(snap.KubernetesPKIDir(), "client-ca.crt")) - if err != nil { - return response.InternalError(err) + + clusterConfig, err := databaseutil.GetClusterConfig(r.Context(), s) + + if clusterConfig.Certificates.CACert == nil || clusterConfig.Certificates.ClientCACert == nil { + return response.InternalError(fmt.Errorf("missing CA certificates")) } - certificates.CACert = string(ca) - certificates.ClientCACert = string(clientCA) + certificates.CACert = *clusterConfig.Certificates.CACert + certificates.ClientCACert = *clusterConfig.Certificates.ClientCACert - g, errGroupCTX := errgroup.WithContext(r.Context()) + g, ctx := errgroup.WithContext(r.Context()) for _, csr := range []struct { name string @@ -152,7 +149,7 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo return fmt.Errorf("failed to generate CSR for %s: %w", csr.name, err) } - if _, err = client.CertificatesV1().CertificateSigningRequests().Create(errGroupCTX, &certificatesv1.CertificateSigningRequest{ + if _, err = client.CertificatesV1().CertificateSigningRequests().Create(ctx, &certificatesv1.CertificateSigningRequest{ ObjectMeta: metav1.ObjectMeta{ Name: csr.name, }, @@ -164,7 +161,7 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo }, metav1.CreateOptions{}); err != nil { return fmt.Errorf("failed to create CSR for %s: %w", csr.name, err) } - watcher, err := client.CertificatesV1().CertificateSigningRequests().Watch(errGroupCTX, metav1.SingleObject(metav1.ObjectMeta{Name: csr.name})) + watcher, err := client.CertificatesV1().CertificateSigningRequests().Watch(ctx, metav1.SingleObject(metav1.ObjectMeta{Name: csr.name})) if err != nil { return fmt.Errorf("failed to watch CSR %s: %w", csr.name, err) } @@ -173,7 +170,7 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo for { select { - case <-errGroupCTX.Done(): + case <-ctx.Done(): return nil case evt, ok := <-watcher.ResultChan(): if !ok { @@ -192,7 +189,8 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo if approved && isCertificateSigningRequestIssued(request) { if _, _, err = pkiutil.LoadCertificate(string(request.Status.Certificate), ""); err != nil { - log.Error(err, fmt.Sprintf("failed to load certificate for CSR %s", csr.name)) + log.WithValues("csr", csr.name).Error(err, "CertificateSigningRequest failed") + return fmt.Errorf("CertificateSigningRequest failed: %w", err) } *csr.certificate = string(request.Status.Certificate) *csr.key = string(keyPEM) diff --git a/src/k8s/pkg/k8sd/app/hooks_bootstrap.go b/src/k8s/pkg/k8sd/app/hooks_bootstrap.go index 6353ebdfa..771d69608 100644 --- a/src/k8s/pkg/k8sd/app/hooks_bootstrap.go +++ b/src/k8s/pkg/k8sd/app/hooks_bootstrap.go @@ -189,6 +189,8 @@ func (a *App) onBootstrapWorkerNode(ctx context.Context, s state.State, encodedT // Worker nodes only use a subset of the ClusterConfig struct. At the moment, these are: // - Network.PodCIDR and Network.ClusterCIDR: informative // - Certificates.K8sdPublicKey: used to verify the signature of the k8sd-config configmap. + // - Certificates.CACert: kubernetes CA certificate. + // - Certificates.ClientCACert: kubernetes client CA certificate. // // TODO(neoaggelos): We should be explicit here and try to avoid having worker nodes use // or set other cluster configuration keys by accident. @@ -199,6 +201,8 @@ func (a *App) onBootstrapWorkerNode(ctx context.Context, s state.State, encodedT }, Certificates: types.Certificates{ K8sdPublicKey: utils.Pointer(response.K8sdPublicKey), + CACert: utils.Pointer(response.CACert), + ClientCACert: utils.Pointer(response.ClientCACert), }, } From 3071bcc7c8acd3e37794057b5c4b469040950b5f Mon Sep 17 00:00:00 2001 From: Mateo Florido Date: Thu, 1 Aug 2024 18:40:50 -0500 Subject: [PATCH 10/21] Address code review comments --- src/k8s/pkg/k8sd/api/certs_refresh.go | 7 ++++--- src/k8s/pkg/snap/snap.go | 4 ++++ src/k8s/pkg/utils/pki/generate.go | 6 +----- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/k8s/pkg/k8sd/api/certs_refresh.go b/src/k8s/pkg/k8sd/api/certs_refresh.go index 5aa4d9c66..d8e6eb81a 100644 --- a/src/k8s/pkg/k8sd/api/certs_refresh.go +++ b/src/k8s/pkg/k8sd/api/certs_refresh.go @@ -25,6 +25,7 @@ import ( v1 "k8s.io/api/certificates/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + watch "k8s.io/apimachinery/pkg/watch" ) func (e *Endpoints) postRefreshCertsPlan(s state.State, r *http.Request) response.Response { @@ -141,7 +142,6 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo Organization: csr.organization, }, 2048, - nil, csr.hostnames, csr.ips, ) @@ -161,8 +161,10 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo }, metav1.CreateOptions{}); err != nil { return fmt.Errorf("failed to create CSR for %s: %w", csr.name, err) } + watcher, err := client.CertificatesV1().CertificateSigningRequests().Watch(ctx, metav1.SingleObject(metav1.ObjectMeta{Name: csr.name})) if err != nil { + log.V(1).Error(err, "failed to watch CSR") return fmt.Errorf("failed to watch CSR %s: %w", csr.name, err) } @@ -203,7 +205,7 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo } if err := g.Wait(); err != nil { - return response.InternalError(fmt.Errorf("failed to generate worker CSRs: %w", err)) + return response.InternalError(fmt.Errorf("failed to get worker node certificates: %w", err)) } if _, err = setup.EnsureWorkerPKI(snap, &certificates); err != nil { @@ -219,7 +221,6 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo } // Restart the services - log.Info("Restarting kubelet and kube-proxy") if err := snap.RestartService(r.Context(), "kubelet"); err != nil { return response.InternalError(err) } diff --git a/src/k8s/pkg/snap/snap.go b/src/k8s/pkg/snap/snap.go index e0af05bd7..cc6f835bb 100644 --- a/src/k8s/pkg/snap/snap.go +++ b/src/k8s/pkg/snap/snap.go @@ -13,6 +13,7 @@ import ( "github.com/canonical/k8s/pkg/client/k8sd" "github.com/canonical/k8s/pkg/client/kubernetes" "github.com/canonical/k8s/pkg/k8sd/types" + "github.com/canonical/k8s/pkg/log" "github.com/canonical/k8s/pkg/utils" "github.com/moby/sys/mountinfo" "gopkg.in/yaml.v2" @@ -50,16 +51,19 @@ func NewSnap(opts SnapOpts) *snap { // StartService starts a k8s service. The name can be either prefixed or not. func (s *snap) StartService(ctx context.Context, name string) error { + log.FromContext(ctx).WithCallDepth(1).Info("Starting service", "service", name) return s.runCommand(ctx, []string{"snapctl", "start", "--enable", serviceName(name)}) } // StopService stops a k8s service. The name can be either prefixed or not. func (s *snap) StopService(ctx context.Context, name string) error { + log.FromContext(ctx).WithCallDepth(1).Info("Stopping service", "service", name) return s.runCommand(ctx, []string{"snapctl", "stop", "--disable", serviceName(name)}) } // RestartService restarts a k8s service. The name can be either prefixed or not. func (s *snap) RestartService(ctx context.Context, name string) error { + log.FromContext(ctx).WithCallDepth(1).Info("Restarting service", "service", name) return s.runCommand(ctx, []string{"snapctl", "restart", serviceName(name)}) } diff --git a/src/k8s/pkg/utils/pki/generate.go b/src/k8s/pkg/utils/pki/generate.go index a090b7d2e..18eee9da9 100644 --- a/src/k8s/pkg/utils/pki/generate.go +++ b/src/k8s/pkg/utils/pki/generate.go @@ -124,7 +124,7 @@ func GenerateRSAKey(bits int) (string, string, error) { } // GenerateCSR generates a certificate signing request (CSR) and private key for the given subject. -func GenerateCSR(subject pkix.Name, bits int, priv any, dnsSANs []string, ipSANs []net.IP) (string, string, error) { +func GenerateCSR(subject pkix.Name, bits int, dnsSANs []string, ipSANs []net.IP) (string, string, error) { key, err := rsa.GenerateKey(rand.Reader, bits) if err != nil { return "", "", fmt.Errorf("failed to generate RSA private key: %w", err) @@ -134,10 +134,6 @@ func GenerateCSR(subject pkix.Name, bits int, priv any, dnsSANs []string, ipSANs return "", "", fmt.Errorf("failed to encode private key PEM") } - if priv == nil { - priv = key - } - csrKubeletServingTemplate := &x509.CertificateRequest{ Subject: subject, DNSNames: dnsSANs, From 33516281587b9d35dae557d8a8b1942c768f7637 Mon Sep 17 00:00:00 2001 From: Mateo Florido Date: Fri, 2 Aug 2024 13:00:02 -0500 Subject: [PATCH 11/21] Add retry logic --- src/k8s/pkg/k8sd/api/certs_refresh.go | 71 ++++++++++++++++----------- 1 file changed, 43 insertions(+), 28 deletions(-) diff --git a/src/k8s/pkg/k8sd/api/certs_refresh.go b/src/k8s/pkg/k8sd/api/certs_refresh.go index d8e6eb81a..199dd3163 100644 --- a/src/k8s/pkg/k8sd/api/certs_refresh.go +++ b/src/k8s/pkg/k8sd/api/certs_refresh.go @@ -8,6 +8,7 @@ import ( "net" "net/http" "path/filepath" + "time" apiv1 "github.com/canonical/k8s/api/v1" databaseutil "github.com/canonical/k8s/pkg/k8sd/database/util" @@ -25,7 +26,6 @@ import ( v1 "k8s.io/api/certificates/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - watch "k8s.io/apimachinery/pkg/watch" ) func (e *Endpoints) postRefreshCertsPlan(s state.State, r *http.Request) response.Response { @@ -162,44 +162,59 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo return fmt.Errorf("failed to create CSR for %s: %w", csr.name, err) } - watcher, err := client.CertificatesV1().CertificateSigningRequests().Watch(ctx, metav1.SingleObject(metav1.ObjectMeta{Name: csr.name})) - if err != nil { - log.V(1).Error(err, "failed to watch CSR") - return fmt.Errorf("failed to watch CSR %s: %w", csr.name, err) - } - - defer watcher.Stop() - for { select { case <-ctx.Done(): - return nil - case evt, ok := <-watcher.ResultChan(): - if !ok { - return fmt.Errorf("watch closed for CSR %s", csr.name) - } + return ctx.Err() + default: - request, ok := evt.Object.(*v1.CertificateSigningRequest) - if !ok { - return fmt.Errorf("expected a CertificateSigningRequest but received %#v", evt.Object) - } - - approved, err := isCertificateSigningRequestApproved(request) + watcher, err := client.CertificatesV1().CertificateSigningRequests().Watch(ctx, metav1.SingleObject(metav1.ObjectMeta{Name: csr.name})) if err != nil { - return fmt.Errorf("csr is in an invalid state: %w", err) + log.WithValues("csr", csr.name).V(1).Error(err, "failed to watch CSR") + time.Sleep(3 * time.Second) + continue } - if approved && isCertificateSigningRequestIssued(request) { - if _, _, err = pkiutil.LoadCertificate(string(request.Status.Certificate), ""); err != nil { - log.WithValues("csr", csr.name).Error(err, "CertificateSigningRequest failed") - return fmt.Errorf("CertificateSigningRequest failed: %w", err) + defer watcher.Stop() + + watchClosed := false + for !watchClosed { + select { + case <-ctx.Done(): + return nil + case evt, ok := <-watcher.ResultChan(): + if !ok { + log.WithValues("csr", csr.name).V(1).Info("watch closed") + watchClosed = true + } + + request, ok := evt.Object.(*v1.CertificateSigningRequest) + if !ok { + log.WithValues("csr", csr.name).Error(fmt.Errorf("expected a CertificateSigningRequest but received %#v", evt.Object), "unexpected object") + continue + } + + approved, err := isCertificateSigningRequestApproved(request) + if err != nil { + return fmt.Errorf("csr is in an invalid state: %w", err) + } + + if approved && isCertificateSigningRequestIssued(request) { + if _, _, err = pkiutil.LoadCertificate(string(request.Status.Certificate), ""); err != nil { + log.WithValues("csr", csr.name).Error(err, "CertificateSigningRequest failed") + return fmt.Errorf("CertificateSigningRequest failed: %w", err) + } + *csr.certificate = string(request.Status.Certificate) + *csr.key = string(keyPEM) + return nil + } } - *csr.certificate = string(request.Status.Certificate) - *csr.key = string(keyPEM) - return nil } + time.Sleep(3 * time.Second) } + } + }) } From 0cade8f54c3c589d0d24910c37425cc43d692226 Mon Sep 17 00:00:00 2001 From: Mateo Florido Date: Mon, 5 Aug 2024 07:04:36 -0500 Subject: [PATCH 12/21] Wrap errors --- ...rts_refresh.go => certificates_refresh.go} | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) rename src/k8s/pkg/k8sd/api/{certs_refresh.go => certificates_refresh.go} (84%) diff --git a/src/k8s/pkg/k8sd/api/certs_refresh.go b/src/k8s/pkg/k8sd/api/certificates_refresh.go similarity index 84% rename from src/k8s/pkg/k8sd/api/certs_refresh.go rename to src/k8s/pkg/k8sd/api/certificates_refresh.go index 199dd3163..e904e51da 100644 --- a/src/k8s/pkg/k8sd/api/certs_refresh.go +++ b/src/k8s/pkg/k8sd/api/certificates_refresh.go @@ -23,7 +23,6 @@ import ( "github.com/canonical/microcluster/state" "golang.org/x/sync/errgroup" certificatesv1 "k8s.io/api/certificates/v1" - v1 "k8s.io/api/certificates/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -78,12 +77,15 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo client, err := snap.KubernetesNodeClient("") if err != nil { - return response.InternalError(err) + return response.InternalError(fmt.Errorf("failed to get Kubernetes client: %w", err)) } certificates := pki.WorkerNodePKI{} clusterConfig, err := databaseutil.GetClusterConfig(r.Context(), s) + if err != nil { + return response.InternalError(fmt.Errorf("failed to get cluster configuration: %w", err)) + } if clusterConfig.Certificates.CACert == nil || clusterConfig.Certificates.ClientCACert == nil { return response.InternalError(fmt.Errorf("missing CA certificates")) @@ -98,7 +100,7 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo name string commonName string organization []string - usages []v1.KeyUsage + usages []certificatesv1.KeyUsage hostnames []string ips []net.IP signerName string @@ -109,7 +111,7 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo name: fmt.Sprintf("k8sd-%d-worker-kubelet-serving", req.Seed), commonName: fmt.Sprintf("system:node:%s", snap.Hostname()), organization: []string{"system:nodes"}, - usages: []v1.KeyUsage{v1.UsageDigitalSignature, v1.UsageKeyEncipherment, v1.UsageServerAuth}, + usages: []certificatesv1.KeyUsage{certificatesv1.UsageDigitalSignature, certificatesv1.UsageKeyEncipherment, certificatesv1.UsageServerAuth}, hostnames: []string{snap.Hostname()}, ips: []net.IP{net.ParseIP(s.Address().Hostname())}, signerName: "k8sd.io/kubelet-serving", @@ -120,7 +122,7 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo name: fmt.Sprintf("k8sd-%d-worker-kubelet-client", req.Seed), commonName: fmt.Sprintf("system:node:%s", snap.Hostname()), organization: []string{"system:nodes"}, - usages: []v1.KeyUsage{v1.UsageDigitalSignature, v1.UsageKeyEncipherment, v1.UsageClientAuth}, + usages: []certificatesv1.KeyUsage{certificatesv1.UsageDigitalSignature, certificatesv1.UsageKeyEncipherment, certificatesv1.UsageClientAuth}, signerName: "k8sd.io/kubelet-client", certificate: &certificates.KubeletClientCert, key: &certificates.KubeletClientKey, @@ -128,7 +130,7 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo { name: fmt.Sprintf("k8sd-%d-worker-kube-proxy-client", req.Seed), commonName: "system:kube-proxy", - usages: []v1.KeyUsage{v1.UsageDigitalSignature, v1.UsageKeyEncipherment, v1.UsageClientAuth}, + usages: []certificatesv1.KeyUsage{certificatesv1.UsageDigitalSignature, certificatesv1.UsageKeyEncipherment, certificatesv1.UsageClientAuth}, signerName: "k8sd.io/kube-proxy-client", certificate: &certificates.KubeProxyClientCert, key: &certificates.KubeProxyClientKey, @@ -188,7 +190,7 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo watchClosed = true } - request, ok := evt.Object.(*v1.CertificateSigningRequest) + request, ok := evt.Object.(*certificatesv1.CertificateSigningRequest) if !ok { log.WithValues("csr", csr.name).Error(fmt.Errorf("expected a CertificateSigningRequest but received %#v", evt.Object), "unexpected object") continue @@ -224,7 +226,7 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo } if _, err = setup.EnsureWorkerPKI(snap, &certificates); err != nil { - return response.InternalError(err) + return response.InternalError(fmt.Errorf("failed to write worker PKI: %w", err)) } // Kubeconfigs @@ -237,10 +239,10 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo // Restart the services if err := snap.RestartService(r.Context(), "kubelet"); err != nil { - return response.InternalError(err) + return response.InternalError(fmt.Errorf("failed to restart kubelet: %w", err)) } if err := snap.RestartService(r.Context(), "kube-proxy"); err != nil { - return response.InternalError(err) + return response.InternalError(fmt.Errorf("failed to restart kube-proxy: %w", err)) } cert, _, err := pkiutil.LoadCertificate(certificates.KubeletCert, "") @@ -258,15 +260,15 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo // isCertificateSigningRequestApproved checks if the certificate signing request is approved. // It returns true if the CSR is approved, false if it is pending, and an error if it is denied // or failed. -func isCertificateSigningRequestApproved(csr *v1.CertificateSigningRequest) (bool, error) { +func isCertificateSigningRequestApproved(csr *certificatesv1.CertificateSigningRequest) (bool, error) { for _, condition := range csr.Status.Conditions { - if condition.Type == v1.CertificateApproved && condition.Status == corev1.ConditionTrue { + if condition.Type == certificatesv1.CertificateApproved && condition.Status == corev1.ConditionTrue { return true, nil } - if condition.Type == v1.CertificateDenied && condition.Status == corev1.ConditionTrue { + if condition.Type == certificatesv1.CertificateDenied && condition.Status == corev1.ConditionTrue { return false, fmt.Errorf("CSR %s was denied: %s", csr.Name, condition.Reason) } - if condition.Type == v1.CertificateFailed && condition.Status == corev1.ConditionTrue { + if condition.Type == certificatesv1.CertificateFailed && condition.Status == corev1.ConditionTrue { return false, fmt.Errorf("CSR %s failed: %s", csr.Name, condition.Reason) } } @@ -274,6 +276,6 @@ func isCertificateSigningRequestApproved(csr *v1.CertificateSigningRequest) (boo } // isCertificateSigningRequestIssued checks if the certificate signing request is issued. -func isCertificateSigningRequestIssued(csr *v1.CertificateSigningRequest) bool { +func isCertificateSigningRequestIssued(csr *certificatesv1.CertificateSigningRequest) bool { return len(csr.Status.Certificate) > 0 } From 6d5fad1b791771386decfa6cdabfc1218fbf75fc Mon Sep 17 00:00:00 2001 From: Mateo Florido Date: Tue, 6 Aug 2024 17:42:21 -0500 Subject: [PATCH 13/21] Refactor Retry Logic --- .../kubernetes/certificate_signing_request.go | 46 ++++++++ src/k8s/pkg/k8sd/api/certificates_refresh.go | 105 +++++++++--------- 2 files changed, 97 insertions(+), 54 deletions(-) create mode 100644 src/k8s/pkg/client/kubernetes/certificate_signing_request.go diff --git a/src/k8s/pkg/client/kubernetes/certificate_signing_request.go b/src/k8s/pkg/client/kubernetes/certificate_signing_request.go new file mode 100644 index 000000000..06e738c0d --- /dev/null +++ b/src/k8s/pkg/client/kubernetes/certificate_signing_request.go @@ -0,0 +1,46 @@ +package kubernetes + +import ( + "context" + "fmt" + + certificatesv1 "k8s.io/api/certificates/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// WatchCertificateSigningRequest watches a CertificateSigningRequest with the given name and calls the verify function on each event. +func (c *Client) WatchCertificateSigningRequest(ctx context.Context, name string, verify func(csr *certificatesv1.CertificateSigningRequest) (bool, error)) error { + + w, err := c.CertificatesV1().CertificateSigningRequests().Watch(ctx, metav1.SingleObject(metav1.ObjectMeta{Name: name})) + if err != nil { + return fmt.Errorf("failed to watch CSR %s: %w", name, err) + } + defer w.Stop() + for { + select { + case <-ctx.Done(): + return nil + case evt, ok := <-w.ResultChan(): + if !ok { + return fmt.Errorf("watch closed") + } + + csr, ok := evt.Object.(*certificatesv1.CertificateSigningRequest) + if !ok { + return fmt.Errorf("expected a CertificateSigningRequest but received %#v", evt.Object) + } + + valid, err := verify(csr) + // If the verify function returns an error, we should return it + if err != nil { + return err + } + + // If the verify function returns true, the CSR is valid and we can return + if valid { + return nil + } + + } + } +} diff --git a/src/k8s/pkg/k8sd/api/certificates_refresh.go b/src/k8s/pkg/k8sd/api/certificates_refresh.go index e904e51da..24ce126be 100644 --- a/src/k8s/pkg/k8sd/api/certificates_refresh.go +++ b/src/k8s/pkg/k8sd/api/certificates_refresh.go @@ -2,6 +2,7 @@ package api import ( "crypto/x509/pkix" + "errors" "fmt" "math" "math/rand" @@ -27,6 +28,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +var ( + // errInvalidCSR is returned when the Kubernetes CSR is in an invalid state. + errInvalidCSR = errors.New("csr is in an invalid state") +) + func (e *Endpoints) postRefreshCertsPlan(s state.State, r *http.Request) response.Response { seed := rand.Intn(math.MaxInt) @@ -165,56 +171,29 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo } for { + err := client.WatchCertificateSigningRequest( + ctx, + csr.name, + func(request *certificatesv1.CertificateSigningRequest) (bool, error) { + return verifyCSRAndSetPKI(request, keyPEM, csr.certificate, csr.key) + }) + + if err == nil { + return nil + } + + // Check if error is non-recoverable + if errors.Is(err, errInvalidCSR) { + return fmt.Errorf("certificate signing request failed: %w", err) + } + + log.WithValues("name", "k8sd").Error(err, "Failed to watch CSR") + select { case <-ctx.Done(): return ctx.Err() - default: - - watcher, err := client.CertificatesV1().CertificateSigningRequests().Watch(ctx, metav1.SingleObject(metav1.ObjectMeta{Name: csr.name})) - if err != nil { - log.WithValues("csr", csr.name).V(1).Error(err, "failed to watch CSR") - time.Sleep(3 * time.Second) - continue - } - - defer watcher.Stop() - - watchClosed := false - for !watchClosed { - select { - case <-ctx.Done(): - return nil - case evt, ok := <-watcher.ResultChan(): - if !ok { - log.WithValues("csr", csr.name).V(1).Info("watch closed") - watchClosed = true - } - - request, ok := evt.Object.(*certificatesv1.CertificateSigningRequest) - if !ok { - log.WithValues("csr", csr.name).Error(fmt.Errorf("expected a CertificateSigningRequest but received %#v", evt.Object), "unexpected object") - continue - } - - approved, err := isCertificateSigningRequestApproved(request) - if err != nil { - return fmt.Errorf("csr is in an invalid state: %w", err) - } - - if approved && isCertificateSigningRequestIssued(request) { - if _, _, err = pkiutil.LoadCertificate(string(request.Status.Certificate), ""); err != nil { - log.WithValues("csr", csr.name).Error(err, "CertificateSigningRequest failed") - return fmt.Errorf("CertificateSigningRequest failed: %w", err) - } - *csr.certificate = string(request.Status.Certificate) - *csr.key = string(keyPEM) - return nil - } - } - } - time.Sleep(3 * time.Second) + case <-time.After(3 * time.Second): } - } }) @@ -257,16 +236,18 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo } -// isCertificateSigningRequestApproved checks if the certificate signing request is approved. -// It returns true if the CSR is approved, false if it is pending, and an error if it is denied +// isCertificateSigningRequestApprovedAndIssued checks if the certificate +// signing request is approved and issued. It returns true if the CSR is +// approved and issued, false if it is pending, and an error if it is denied // or failed. -func isCertificateSigningRequestApproved(csr *certificatesv1.CertificateSigningRequest) (bool, error) { +func isCertificateSigningRequestApprovedAndIssued(csr *certificatesv1.CertificateSigningRequest) (bool, error) { for _, condition := range csr.Status.Conditions { if condition.Type == certificatesv1.CertificateApproved && condition.Status == corev1.ConditionTrue { - return true, nil + return len(csr.Status.Certificate) > 0, nil + } if condition.Type == certificatesv1.CertificateDenied && condition.Status == corev1.ConditionTrue { - return false, fmt.Errorf("CSR %s was denied: %s", csr.Name, condition.Reason) + return false, fmt.Errorf(":CSR %s was denied: %s", csr.Name, condition.Reason) } if condition.Type == certificatesv1.CertificateFailed && condition.Status == corev1.ConditionTrue { return false, fmt.Errorf("CSR %s failed: %s", csr.Name, condition.Reason) @@ -275,7 +256,23 @@ func isCertificateSigningRequestApproved(csr *certificatesv1.CertificateSigningR return false, nil } -// isCertificateSigningRequestIssued checks if the certificate signing request is issued. -func isCertificateSigningRequestIssued(csr *certificatesv1.CertificateSigningRequest) bool { - return len(csr.Status.Certificate) > 0 +// verifyCSRAndSetPKI verifies the certificate signing request and sets the +// certificate and key if the CSR is approved. +func verifyCSRAndSetPKI(csr *certificatesv1.CertificateSigningRequest, keyPEM string, certificate, key *string) (bool, error) { + approved, err := isCertificateSigningRequestApprovedAndIssued(csr) + if err != nil { + return false, fmt.Errorf("%w: failed to validate csr: %w", errInvalidCSR, err) + } + + if !approved { + return false, nil + } + + if _, _, err = pkiutil.LoadCertificate(string(csr.Status.Certificate), ""); err != nil { + return false, fmt.Errorf("%w: failed to load certificate: %w", errInvalidCSR, err) + } + + *certificate = string(csr.Status.Certificate) + *key = keyPEM + return true, nil } From 4ca2d8878b13d91c0adfd3c614b868fda78959e6 Mon Sep 17 00:00:00 2001 From: Mateo Florido Date: Wed, 7 Aug 2024 08:09:55 -0500 Subject: [PATCH 14/21] Use retry instead of custom error --- .../kubernetes/certificate_signing_request.go | 35 +++++++++++-------- src/k8s/pkg/k8sd/api/certificates_refresh.go | 19 ++++------ 2 files changed, 28 insertions(+), 26 deletions(-) diff --git a/src/k8s/pkg/client/kubernetes/certificate_signing_request.go b/src/k8s/pkg/client/kubernetes/certificate_signing_request.go index 06e738c0d..e7fad6245 100644 --- a/src/k8s/pkg/client/kubernetes/certificate_signing_request.go +++ b/src/k8s/pkg/client/kubernetes/certificate_signing_request.go @@ -8,37 +8,44 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -// WatchCertificateSigningRequest watches a CertificateSigningRequest with the given name and calls the verify function on each event. -func (c *Client) WatchCertificateSigningRequest(ctx context.Context, name string, verify func(csr *certificatesv1.CertificateSigningRequest) (bool, error)) error { - +// WatchCertificateSigningRequest watches a CertificateSigningRequest with the +// given name and calls the verify function on each event. +// +// The verify function should return true if the CSR is valid and processing +// should stop, or false if watching should continue. It should return an error +// if the CSR is in an invalid state (e.g., failed or denied) or the issued +// certificate is invalid. +// +// The function returns a bool indicating if the Watch should be retried and an +// error if an error occurred while watching the CSR or during verification. +// +// The function will continue watching and verifying until one of the above +// conditions is met or an error occurs during verification. +func (c *Client) WatchCertificateSigningRequest(ctx context.Context, name string, verify func(csr *certificatesv1.CertificateSigningRequest) (bool, error)) (bool, error) { w, err := c.CertificatesV1().CertificateSigningRequests().Watch(ctx, metav1.SingleObject(metav1.ObjectMeta{Name: name})) if err != nil { - return fmt.Errorf("failed to watch CSR %s: %w", name, err) + return true, fmt.Errorf("failed to watch CSR %s: %w", name, err) } defer w.Stop() for { select { case <-ctx.Done(): - return nil + return false, ctx.Err() case evt, ok := <-w.ResultChan(): if !ok { - return fmt.Errorf("watch closed") + return true, fmt.Errorf("watch closed") } csr, ok := evt.Object.(*certificatesv1.CertificateSigningRequest) if !ok { - return fmt.Errorf("expected a CertificateSigningRequest but received %#v", evt.Object) + return true, fmt.Errorf("expected a CertificateSigningRequest but received %#v", evt.Object) } valid, err := verify(csr) - // If the verify function returns an error, we should return it if err != nil { - return err - } - - // If the verify function returns true, the CSR is valid and we can return - if valid { - return nil + return false, fmt.Errorf("failed to verify CSR %s: %w", name, err) + } else if valid { + return false, nil } } diff --git a/src/k8s/pkg/k8sd/api/certificates_refresh.go b/src/k8s/pkg/k8sd/api/certificates_refresh.go index 24ce126be..87d69a4a8 100644 --- a/src/k8s/pkg/k8sd/api/certificates_refresh.go +++ b/src/k8s/pkg/k8sd/api/certificates_refresh.go @@ -2,7 +2,6 @@ package api import ( "crypto/x509/pkix" - "errors" "fmt" "math" "math/rand" @@ -28,11 +27,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -var ( - // errInvalidCSR is returned when the Kubernetes CSR is in an invalid state. - errInvalidCSR = errors.New("csr is in an invalid state") -) - func (e *Endpoints) postRefreshCertsPlan(s state.State, r *http.Request) response.Response { seed := rand.Intn(math.MaxInt) @@ -171,23 +165,24 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo } for { - err := client.WatchCertificateSigningRequest( + retry, err := client.WatchCertificateSigningRequest( ctx, csr.name, func(request *certificatesv1.CertificateSigningRequest) (bool, error) { return verifyCSRAndSetPKI(request, keyPEM, csr.certificate, csr.key) - }) + }, + ) if err == nil { return nil } // Check if error is non-recoverable - if errors.Is(err, errInvalidCSR) { + if retry == false { return fmt.Errorf("certificate signing request failed: %w", err) } - log.WithValues("name", "k8sd").Error(err, "Failed to watch CSR") + log.Error(err, "Failed to watch CSR") select { case <-ctx.Done(): @@ -261,7 +256,7 @@ func isCertificateSigningRequestApprovedAndIssued(csr *certificatesv1.Certificat func verifyCSRAndSetPKI(csr *certificatesv1.CertificateSigningRequest, keyPEM string, certificate, key *string) (bool, error) { approved, err := isCertificateSigningRequestApprovedAndIssued(csr) if err != nil { - return false, fmt.Errorf("%w: failed to validate csr: %w", errInvalidCSR, err) + return false, fmt.Errorf("failed to validate csr: %w", err) } if !approved { @@ -269,7 +264,7 @@ func verifyCSRAndSetPKI(csr *certificatesv1.CertificateSigningRequest, keyPEM st } if _, _, err = pkiutil.LoadCertificate(string(csr.Status.Certificate), ""); err != nil { - return false, fmt.Errorf("%w: failed to load certificate: %w", errInvalidCSR, err) + return false, fmt.Errorf("failed to load certificate: %w", err) } *certificate = string(csr.Status.Certificate) From 041331bcecca19562591e7d72082837a858af37e Mon Sep 17 00:00:00 2001 From: Mateo Florido Date: Thu, 8 Aug 2024 08:32:32 -0500 Subject: [PATCH 15/21] Tidy-up godocs and logic --- src/k8s/api/v1/certificates_refresh.go | 6 ++--- .../kubernetes/certificate_signing_request.go | 25 ++++++++++--------- src/k8s/pkg/k8sd/api/certificates_refresh.go | 23 +++++++---------- src/k8s/pkg/snap/snap.go | 6 ++--- 4 files changed, 28 insertions(+), 32 deletions(-) diff --git a/src/k8s/api/v1/certificates_refresh.go b/src/k8s/api/v1/certificates_refresh.go index 43b83e659..38d6499af 100644 --- a/src/k8s/api/v1/certificates_refresh.go +++ b/src/k8s/api/v1/certificates_refresh.go @@ -2,14 +2,14 @@ package apiv1 type RefreshCertificatesPlanResponse struct { Seed int `json:"seed"` - CertificatesSigningRequests []string `json:"certificates_signing_requests"` + CertificatesSigningRequests []string `json:"certificates-signing-requests"` } type RefreshCertificatesRunRequest struct { Seed int `json:"seed"` - ExpirationSeconds int `json:"expiration_seconds"` + ExpirationSeconds int `json:"expiration-seconds"` } type RefreshCertificatesRunResponse struct { - ExpirationSeconds int `json:"expiration_seconds"` + ExpirationSeconds int `json:"expiration-seconds"` } diff --git a/src/k8s/pkg/client/kubernetes/certificate_signing_request.go b/src/k8s/pkg/client/kubernetes/certificate_signing_request.go index e7fad6245..ed9d5aebc 100644 --- a/src/k8s/pkg/client/kubernetes/certificate_signing_request.go +++ b/src/k8s/pkg/client/kubernetes/certificate_signing_request.go @@ -9,18 +9,20 @@ import ( ) // WatchCertificateSigningRequest watches a CertificateSigningRequest with the -// given name and calls the verify function on each event. +// given name and calls a verify function on each event. +// WatchCertificateSigningRequest will continue watching the CSR until the +// verify function returns true or an non-retriable error occurs. +// WatchCertificateSigningRequest will return true and a wrapped error if the +// error is retriable. +// WatchCertificateSigningRequest will return false and a wrapped error if the +// error is not retriable. // // The verify function should return true if the CSR is valid and processing -// should stop, or false if watching should continue. It should return an error -// if the CSR is in an invalid state (e.g., failed or denied) or the issued -// certificate is invalid. -// -// The function returns a bool indicating if the Watch should be retried and an -// error if an error occurred while watching the CSR or during verification. -// -// The function will continue watching and verifying until one of the above -// conditions is met or an error occurs during verification. +// should stop. +// The verify function should return false if the CSR is not yet valid and +// processing should continue. +// The verify function should return an error if the CSR is in an invalid state +// (e.g., failed or denied) or the issued certificate is invalid. func (c *Client) WatchCertificateSigningRequest(ctx context.Context, name string, verify func(csr *certificatesv1.CertificateSigningRequest) (bool, error)) (bool, error) { w, err := c.CertificatesV1().CertificateSigningRequests().Watch(ctx, metav1.SingleObject(metav1.ObjectMeta{Name: name})) if err != nil { @@ -41,8 +43,7 @@ func (c *Client) WatchCertificateSigningRequest(ctx context.Context, name string return true, fmt.Errorf("expected a CertificateSigningRequest but received %#v", evt.Object) } - valid, err := verify(csr) - if err != nil { + if valid, err := verify(csr); err != nil { return false, fmt.Errorf("failed to verify CSR %s: %w", name, err) } else if valid { return false, nil diff --git a/src/k8s/pkg/k8sd/api/certificates_refresh.go b/src/k8s/pkg/k8sd/api/certificates_refresh.go index 87d69a4a8..18704591e 100644 --- a/src/k8s/pkg/k8sd/api/certificates_refresh.go +++ b/src/k8s/pkg/k8sd/api/certificates_refresh.go @@ -43,8 +43,7 @@ func (e *Endpoints) postRefreshCertsPlan(s state.State, r *http.Request) respons fmt.Sprintf("k8sd-%d-worker-kubelet-client", seed), fmt.Sprintf("k8sd-%d-worker-kube-proxy-client", seed), }, - }, - ) + }) } return response.SyncResponse(true, apiv1.RefreshCertificatesPlanResponse{ @@ -80,7 +79,7 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo return response.InternalError(fmt.Errorf("failed to get Kubernetes client: %w", err)) } - certificates := pki.WorkerNodePKI{} + var certificates pki.WorkerNodePKI clusterConfig, err := databaseutil.GetClusterConfig(r.Context(), s) if err != nil { @@ -91,8 +90,8 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo return response.InternalError(fmt.Errorf("missing CA certificates")) } - certificates.CACert = *clusterConfig.Certificates.CACert - certificates.ClientCACert = *clusterConfig.Certificates.ClientCACert + certificates.CACert = clusterConfig.Certificates.GetCACert() + certificates.ClientCACert = clusterConfig.Certificates.GetClientCACert() g, ctx := errgroup.WithContext(r.Context()) @@ -165,24 +164,20 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo } for { - retry, err := client.WatchCertificateSigningRequest( + if retry, err := client.WatchCertificateSigningRequest( ctx, csr.name, func(request *certificatesv1.CertificateSigningRequest) (bool, error) { return verifyCSRAndSetPKI(request, keyPEM, csr.certificate, csr.key) }, - ) - - if err == nil { + ); err == nil { return nil - } - - // Check if error is non-recoverable - if retry == false { + } else if !retry { + log.Error(err, "Failed to watch CSR") return fmt.Errorf("certificate signing request failed: %w", err) } - log.Error(err, "Failed to watch CSR") + log.V(1).Info("Retrying to watch CSR") select { case <-ctx.Done(): diff --git a/src/k8s/pkg/snap/snap.go b/src/k8s/pkg/snap/snap.go index cc6f835bb..b1206b750 100644 --- a/src/k8s/pkg/snap/snap.go +++ b/src/k8s/pkg/snap/snap.go @@ -51,19 +51,19 @@ func NewSnap(opts SnapOpts) *snap { // StartService starts a k8s service. The name can be either prefixed or not. func (s *snap) StartService(ctx context.Context, name string) error { - log.FromContext(ctx).WithCallDepth(1).Info("Starting service", "service", name) + log.FromContext(ctx).V(1).WithCallDepth(1).Info("Starting service", "service", name) return s.runCommand(ctx, []string{"snapctl", "start", "--enable", serviceName(name)}) } // StopService stops a k8s service. The name can be either prefixed or not. func (s *snap) StopService(ctx context.Context, name string) error { - log.FromContext(ctx).WithCallDepth(1).Info("Stopping service", "service", name) + log.FromContext(ctx).V(1).WithCallDepth(1).Info("Stopping service", "service", name) return s.runCommand(ctx, []string{"snapctl", "stop", "--disable", serviceName(name)}) } // RestartService restarts a k8s service. The name can be either prefixed or not. func (s *snap) RestartService(ctx context.Context, name string) error { - log.FromContext(ctx).WithCallDepth(1).Info("Restarting service", "service", name) + log.FromContext(ctx).V(1).WithCallDepth(1).Info("Restarting service", "service", name) return s.runCommand(ctx, []string{"snapctl", "restart", serviceName(name)}) } From f904e7a8ef6f65c3b34a62c0f6d43c14709571c9 Mon Sep 17 00:00:00 2001 From: Mateo Florido Date: Thu, 8 Aug 2024 12:43:06 -0500 Subject: [PATCH 16/21] Offload retry logic into Watch function --- .../kubernetes/certificate_signing_request.go | 64 +++++++++++-------- src/k8s/pkg/k8sd/api/certificates_refresh.go | 33 ++++------ 2 files changed, 50 insertions(+), 47 deletions(-) diff --git a/src/k8s/pkg/client/kubernetes/certificate_signing_request.go b/src/k8s/pkg/client/kubernetes/certificate_signing_request.go index ed9d5aebc..9b0ed175c 100644 --- a/src/k8s/pkg/client/kubernetes/certificate_signing_request.go +++ b/src/k8s/pkg/client/kubernetes/certificate_signing_request.go @@ -3,7 +3,9 @@ package kubernetes import ( "context" "fmt" + "time" + "github.com/canonical/k8s/pkg/log" certificatesv1 "k8s.io/api/certificates/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -12,10 +14,6 @@ import ( // given name and calls a verify function on each event. // WatchCertificateSigningRequest will continue watching the CSR until the // verify function returns true or an non-retriable error occurs. -// WatchCertificateSigningRequest will return true and a wrapped error if the -// error is retriable. -// WatchCertificateSigningRequest will return false and a wrapped error if the -// error is not retriable. // // The verify function should return true if the CSR is valid and processing // should stop. @@ -23,32 +21,48 @@ import ( // processing should continue. // The verify function should return an error if the CSR is in an invalid state // (e.g., failed or denied) or the issued certificate is invalid. -func (c *Client) WatchCertificateSigningRequest(ctx context.Context, name string, verify func(csr *certificatesv1.CertificateSigningRequest) (bool, error)) (bool, error) { - w, err := c.CertificatesV1().CertificateSigningRequests().Watch(ctx, metav1.SingleObject(metav1.ObjectMeta{Name: name})) - if err != nil { - return true, fmt.Errorf("failed to watch CSR %s: %w", name, err) - } - defer w.Stop() +func (c *Client) WatchCertificateSigningRequest(ctx context.Context, name string, verify func(csr *certificatesv1.CertificateSigningRequest) (bool, error)) error { + log := log.FromContext(ctx) for { - select { - case <-ctx.Done(): - return false, ctx.Err() - case evt, ok := <-w.ResultChan(): - if !ok { - return true, fmt.Errorf("watch closed") - } + w, err := c.CertificatesV1().CertificateSigningRequests().Watch(ctx, metav1.SingleObject(metav1.ObjectMeta{Name: name})) + if err != nil { + log.V(1).Info("Failed to watch CSR", "error", err) + continue + } + defer w.Stop() + watchClosed := false + for !watchClosed { + select { + case <-ctx.Done(): + return ctx.Err() + case evt, ok := <-w.ResultChan(): + if !ok { + log.V(1).Info("Watch closed") + watchClosed = true + continue + } - csr, ok := evt.Object.(*certificatesv1.CertificateSigningRequest) - if !ok { - return true, fmt.Errorf("expected a CertificateSigningRequest but received %#v", evt.Object) - } + csr, ok := evt.Object.(*certificatesv1.CertificateSigningRequest) + if !ok { + log.V(1).Info("Expected a CertificateSigningRequest but received something else", "object", evt.Object) + watchClosed = true + continue + } + + if valid, err := verify(csr); err != nil { + return fmt.Errorf("failed to verify CSR %s: %w", name, err) + } else if valid { + return nil + } - if valid, err := verify(csr); err != nil { - return false, fmt.Errorf("failed to verify CSR %s: %w", name, err) - } else if valid { - return false, nil } + } + log.V(1).Info("Retrying to watch CSR") + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(3 * time.Second): } } } diff --git a/src/k8s/pkg/k8sd/api/certificates_refresh.go b/src/k8s/pkg/k8sd/api/certificates_refresh.go index 18704591e..d6d93e0ca 100644 --- a/src/k8s/pkg/k8sd/api/certificates_refresh.go +++ b/src/k8s/pkg/k8sd/api/certificates_refresh.go @@ -8,7 +8,6 @@ import ( "net" "net/http" "path/filepath" - "time" apiv1 "github.com/canonical/k8s/api/v1" databaseutil "github.com/canonical/k8s/pkg/k8sd/database/util" @@ -163,29 +162,19 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo return fmt.Errorf("failed to create CSR for %s: %w", csr.name, err) } - for { - if retry, err := client.WatchCertificateSigningRequest( - ctx, - csr.name, - func(request *certificatesv1.CertificateSigningRequest) (bool, error) { - return verifyCSRAndSetPKI(request, keyPEM, csr.certificate, csr.key) - }, - ); err == nil { - return nil - } else if !retry { - log.Error(err, "Failed to watch CSR") - return fmt.Errorf("certificate signing request failed: %w", err) - } - - log.V(1).Info("Retrying to watch CSR") - - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(3 * time.Second): - } + if err := client.WatchCertificateSigningRequest( + ctx, + csr.name, + func(request *certificatesv1.CertificateSigningRequest) (bool, error) { + return verifyCSRAndSetPKI(request, keyPEM, csr.certificate, csr.key) + }, + ); err != nil { + log.Error(err, "Failed to watch CSR") + return fmt.Errorf("certificate signing request failed: %w", err) } + return nil + }) } From 12b5c7b1b32676033b2a81a4e601b9a09a3e1844 Mon Sep 17 00:00:00 2001 From: Mateo Florido Date: Fri, 9 Aug 2024 11:51:13 -0500 Subject: [PATCH 17/21] Move watcher stop after inner loop --- src/k8s/pkg/client/kubernetes/certificate_signing_request.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/k8s/pkg/client/kubernetes/certificate_signing_request.go b/src/k8s/pkg/client/kubernetes/certificate_signing_request.go index 9b0ed175c..c84696963 100644 --- a/src/k8s/pkg/client/kubernetes/certificate_signing_request.go +++ b/src/k8s/pkg/client/kubernetes/certificate_signing_request.go @@ -29,7 +29,6 @@ func (c *Client) WatchCertificateSigningRequest(ctx context.Context, name string log.V(1).Info("Failed to watch CSR", "error", err) continue } - defer w.Stop() watchClosed := false for !watchClosed { select { @@ -57,6 +56,8 @@ func (c *Client) WatchCertificateSigningRequest(ctx context.Context, name string } } + + w.Stop() log.V(1).Info("Retrying to watch CSR") select { From 26ebe1fe270a17236c849e20203d245660d41635 Mon Sep 17 00:00:00 2001 From: Mateo Florido Date: Fri, 9 Aug 2024 12:25:06 -0500 Subject: [PATCH 18/21] Offload inner loop to internal function --- .../kubernetes/certificate_signing_request.go | 60 +++++++++++-------- 1 file changed, 35 insertions(+), 25 deletions(-) diff --git a/src/k8s/pkg/client/kubernetes/certificate_signing_request.go b/src/k8s/pkg/client/kubernetes/certificate_signing_request.go index c84696963..9f98b4b7f 100644 --- a/src/k8s/pkg/client/kubernetes/certificate_signing_request.go +++ b/src/k8s/pkg/client/kubernetes/certificate_signing_request.go @@ -8,6 +8,7 @@ import ( "github.com/canonical/k8s/pkg/log" certificatesv1 "k8s.io/api/certificates/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" ) // WatchCertificateSigningRequest watches a CertificateSigningRequest with the @@ -29,32 +30,11 @@ func (c *Client) WatchCertificateSigningRequest(ctx context.Context, name string log.V(1).Info("Failed to watch CSR", "error", err) continue } - watchClosed := false - for !watchClosed { - select { - case <-ctx.Done(): - return ctx.Err() - case evt, ok := <-w.ResultChan(): - if !ok { - log.V(1).Info("Watch closed") - watchClosed = true - continue - } - csr, ok := evt.Object.(*certificatesv1.CertificateSigningRequest) - if !ok { - log.V(1).Info("Expected a CertificateSigningRequest but received something else", "object", evt.Object) - watchClosed = true - continue - } - - if valid, err := verify(csr); err != nil { - return fmt.Errorf("failed to verify CSR %s: %w", name, err) - } else if valid { - return nil - } - - } + if retry, err := c.watchCertificateSigningRequestEvents(ctx, w, name, verify); err != nil { + return fmt.Errorf("failed to watch CSR %s: %w", name, err) + } else if !retry { + return nil } w.Stop() @@ -67,3 +47,33 @@ func (c *Client) WatchCertificateSigningRequest(ctx context.Context, name string } } } + +func (c *Client) watchCertificateSigningRequestEvents(ctx context.Context, w watch.Interface, name string, verify func(csr *certificatesv1.CertificateSigningRequest) (bool, error)) (bool, error) { + log := log.FromContext(ctx) + for { + select { + case <-ctx.Done(): + return false, ctx.Err() + case evt, ok := <-w.ResultChan(): + if !ok { + log.V(1).Info("Watch closed") + // Retry + return true, nil + } + + csr, ok := evt.Object.(*certificatesv1.CertificateSigningRequest) + if !ok { + log.V(1).Info("Expected a CertificateSigningRequest but received something else", "object", evt.Object) + // Retry + return true, nil + } + + if valid, err := verify(csr); err != nil { + // Stop watching and return the error + return false, fmt.Errorf("failed to verify CSR %s: %w", name, err) + } else if valid { + return false, nil + } + } + } +} From b7405910251cb5f5231a260ae674f71b7f486682 Mon Sep 17 00:00:00 2001 From: Mateo Florido Date: Fri, 9 Aug 2024 13:18:58 -0500 Subject: [PATCH 19/21] Fix imports --- src/k8s/go.mod | 1 - src/k8s/pkg/k8sd/api/certificates_refresh.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/k8s/go.mod b/src/k8s/go.mod index 6575aa8c2..b407065ce 100644 --- a/src/k8s/go.mod +++ b/src/k8s/go.mod @@ -15,7 +15,6 @@ require ( golang.org/x/net v0.27.0 golang.org/x/sync v0.7.0 golang.org/x/sys v0.22.0 - golang.org/x/sync v0.7.0 // indirect gopkg.in/yaml.v2 v2.4.0 helm.sh/helm/v3 v3.15.3 k8s.io/api v0.30.1 diff --git a/src/k8s/pkg/k8sd/api/certificates_refresh.go b/src/k8s/pkg/k8sd/api/certificates_refresh.go index d6d93e0ca..9e6ab3573 100644 --- a/src/k8s/pkg/k8sd/api/certificates_refresh.go +++ b/src/k8s/pkg/k8sd/api/certificates_refresh.go @@ -19,7 +19,7 @@ import ( "github.com/canonical/k8s/pkg/utils" pkiutil "github.com/canonical/k8s/pkg/utils/pki" "github.com/canonical/lxd/lxd/response" - "github.com/canonical/microcluster/state" + "github.com/canonical/microcluster/v2/state" "golang.org/x/sync/errgroup" certificatesv1 "k8s.io/api/certificates/v1" corev1 "k8s.io/api/core/v1" From b11a3788c02e9b5bdb906262a8083f6eb784dd53 Mon Sep 17 00:00:00 2001 From: Mateo Florido Date: Sun, 11 Aug 2024 17:39:52 -0500 Subject: [PATCH 20/21] Address code review --- .../kubernetes/certificate_signing_request.go | 23 ++++++++----------- src/k8s/pkg/utils/pki/generate.go | 4 ++-- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/src/k8s/pkg/client/kubernetes/certificate_signing_request.go b/src/k8s/pkg/client/kubernetes/certificate_signing_request.go index 9f98b4b7f..5416d093f 100644 --- a/src/k8s/pkg/client/kubernetes/certificate_signing_request.go +++ b/src/k8s/pkg/client/kubernetes/certificate_signing_request.go @@ -8,7 +8,6 @@ import ( "github.com/canonical/k8s/pkg/log" certificatesv1 "k8s.io/api/certificates/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" ) // WatchCertificateSigningRequest watches a CertificateSigningRequest with the @@ -23,23 +22,13 @@ import ( // The verify function should return an error if the CSR is in an invalid state // (e.g., failed or denied) or the issued certificate is invalid. func (c *Client) WatchCertificateSigningRequest(ctx context.Context, name string, verify func(csr *certificatesv1.CertificateSigningRequest) (bool, error)) error { - log := log.FromContext(ctx) for { - w, err := c.CertificatesV1().CertificateSigningRequests().Watch(ctx, metav1.SingleObject(metav1.ObjectMeta{Name: name})) - if err != nil { - log.V(1).Info("Failed to watch CSR", "error", err) - continue - } - - if retry, err := c.watchCertificateSigningRequestEvents(ctx, w, name, verify); err != nil { + if retry, err := c.watchCertificateSigningRequestEvents(ctx, name, verify); err != nil { return fmt.Errorf("failed to watch CSR %s: %w", name, err) } else if !retry { return nil } - w.Stop() - log.V(1).Info("Retrying to watch CSR") - select { case <-ctx.Done(): return ctx.Err() @@ -48,8 +37,16 @@ func (c *Client) WatchCertificateSigningRequest(ctx context.Context, name string } } -func (c *Client) watchCertificateSigningRequestEvents(ctx context.Context, w watch.Interface, name string, verify func(csr *certificatesv1.CertificateSigningRequest) (bool, error)) (bool, error) { +func (c *Client) watchCertificateSigningRequestEvents(ctx context.Context, name string, verify func(csr *certificatesv1.CertificateSigningRequest) (bool, error)) (bool, error) { log := log.FromContext(ctx) + w, err := c.CertificatesV1().CertificateSigningRequests().Watch(ctx, metav1.SingleObject(metav1.ObjectMeta{Name: name})) + if err != nil { + log.Error(err, "Failed to watch CSR") + return true, nil + } + + defer w.Stop() + for { select { case <-ctx.Done(): diff --git a/src/k8s/pkg/utils/pki/generate.go b/src/k8s/pkg/utils/pki/generate.go index 18eee9da9..d3c1396bd 100644 --- a/src/k8s/pkg/utils/pki/generate.go +++ b/src/k8s/pkg/utils/pki/generate.go @@ -134,13 +134,13 @@ func GenerateCSR(subject pkix.Name, bits int, dnsSANs []string, ipSANs []net.IP) return "", "", fmt.Errorf("failed to encode private key PEM") } - csrKubeletServingTemplate := &x509.CertificateRequest{ + csrTemplate := &x509.CertificateRequest{ Subject: subject, DNSNames: dnsSANs, IPAddresses: ipSANs, } - csrBytes, err := x509.CreateCertificateRequest(rand.Reader, csrKubeletServingTemplate, key) + csrBytes, err := x509.CreateCertificateRequest(rand.Reader, csrTemplate, key) if err != nil { return "", "", fmt.Errorf("failed to create certificate request: %w", err) } From b11cb749bd8075b36f7ec20f89fafc9bd558d831 Mon Sep 17 00:00:00 2001 From: Mateo Florido Date: Mon, 12 Aug 2024 07:26:20 -0500 Subject: [PATCH 21/21] Change import alias to `certv1` --- .../kubernetes/certificate_signing_request.go | 8 +++--- src/k8s/pkg/k8sd/api/certificates_refresh.go | 26 +++++++++---------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/src/k8s/pkg/client/kubernetes/certificate_signing_request.go b/src/k8s/pkg/client/kubernetes/certificate_signing_request.go index 5416d093f..9a533e62e 100644 --- a/src/k8s/pkg/client/kubernetes/certificate_signing_request.go +++ b/src/k8s/pkg/client/kubernetes/certificate_signing_request.go @@ -6,7 +6,7 @@ import ( "time" "github.com/canonical/k8s/pkg/log" - certificatesv1 "k8s.io/api/certificates/v1" + certv1 "k8s.io/api/certificates/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -21,7 +21,7 @@ import ( // processing should continue. // The verify function should return an error if the CSR is in an invalid state // (e.g., failed or denied) or the issued certificate is invalid. -func (c *Client) WatchCertificateSigningRequest(ctx context.Context, name string, verify func(csr *certificatesv1.CertificateSigningRequest) (bool, error)) error { +func (c *Client) WatchCertificateSigningRequest(ctx context.Context, name string, verify func(csr *certv1.CertificateSigningRequest) (bool, error)) error { for { if retry, err := c.watchCertificateSigningRequestEvents(ctx, name, verify); err != nil { return fmt.Errorf("failed to watch CSR %s: %w", name, err) @@ -37,7 +37,7 @@ func (c *Client) WatchCertificateSigningRequest(ctx context.Context, name string } } -func (c *Client) watchCertificateSigningRequestEvents(ctx context.Context, name string, verify func(csr *certificatesv1.CertificateSigningRequest) (bool, error)) (bool, error) { +func (c *Client) watchCertificateSigningRequestEvents(ctx context.Context, name string, verify func(csr *certv1.CertificateSigningRequest) (bool, error)) (bool, error) { log := log.FromContext(ctx) w, err := c.CertificatesV1().CertificateSigningRequests().Watch(ctx, metav1.SingleObject(metav1.ObjectMeta{Name: name})) if err != nil { @@ -58,7 +58,7 @@ func (c *Client) watchCertificateSigningRequestEvents(ctx context.Context, name return true, nil } - csr, ok := evt.Object.(*certificatesv1.CertificateSigningRequest) + csr, ok := evt.Object.(*certv1.CertificateSigningRequest) if !ok { log.V(1).Info("Expected a CertificateSigningRequest but received something else", "object", evt.Object) // Retry diff --git a/src/k8s/pkg/k8sd/api/certificates_refresh.go b/src/k8s/pkg/k8sd/api/certificates_refresh.go index 9e6ab3573..3313cac46 100644 --- a/src/k8s/pkg/k8sd/api/certificates_refresh.go +++ b/src/k8s/pkg/k8sd/api/certificates_refresh.go @@ -21,7 +21,7 @@ import ( "github.com/canonical/lxd/lxd/response" "github.com/canonical/microcluster/v2/state" "golang.org/x/sync/errgroup" - certificatesv1 "k8s.io/api/certificates/v1" + certv1 "k8s.io/api/certificates/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -98,7 +98,7 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo name string commonName string organization []string - usages []certificatesv1.KeyUsage + usages []certv1.KeyUsage hostnames []string ips []net.IP signerName string @@ -109,7 +109,7 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo name: fmt.Sprintf("k8sd-%d-worker-kubelet-serving", req.Seed), commonName: fmt.Sprintf("system:node:%s", snap.Hostname()), organization: []string{"system:nodes"}, - usages: []certificatesv1.KeyUsage{certificatesv1.UsageDigitalSignature, certificatesv1.UsageKeyEncipherment, certificatesv1.UsageServerAuth}, + usages: []certv1.KeyUsage{certv1.UsageDigitalSignature, certv1.UsageKeyEncipherment, certv1.UsageServerAuth}, hostnames: []string{snap.Hostname()}, ips: []net.IP{net.ParseIP(s.Address().Hostname())}, signerName: "k8sd.io/kubelet-serving", @@ -120,7 +120,7 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo name: fmt.Sprintf("k8sd-%d-worker-kubelet-client", req.Seed), commonName: fmt.Sprintf("system:node:%s", snap.Hostname()), organization: []string{"system:nodes"}, - usages: []certificatesv1.KeyUsage{certificatesv1.UsageDigitalSignature, certificatesv1.UsageKeyEncipherment, certificatesv1.UsageClientAuth}, + usages: []certv1.KeyUsage{certv1.UsageDigitalSignature, certv1.UsageKeyEncipherment, certv1.UsageClientAuth}, signerName: "k8sd.io/kubelet-client", certificate: &certificates.KubeletClientCert, key: &certificates.KubeletClientKey, @@ -128,7 +128,7 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo { name: fmt.Sprintf("k8sd-%d-worker-kube-proxy-client", req.Seed), commonName: "system:kube-proxy", - usages: []certificatesv1.KeyUsage{certificatesv1.UsageDigitalSignature, certificatesv1.UsageKeyEncipherment, certificatesv1.UsageClientAuth}, + usages: []certv1.KeyUsage{certv1.UsageDigitalSignature, certv1.UsageKeyEncipherment, certv1.UsageClientAuth}, signerName: "k8sd.io/kube-proxy-client", certificate: &certificates.KubeProxyClientCert, key: &certificates.KubeProxyClientKey, @@ -149,11 +149,11 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo return fmt.Errorf("failed to generate CSR for %s: %w", csr.name, err) } - if _, err = client.CertificatesV1().CertificateSigningRequests().Create(ctx, &certificatesv1.CertificateSigningRequest{ + if _, err = client.CertificatesV1().CertificateSigningRequests().Create(ctx, &certv1.CertificateSigningRequest{ ObjectMeta: metav1.ObjectMeta{ Name: csr.name, }, - Spec: certificatesv1.CertificateSigningRequestSpec{ + Spec: certv1.CertificateSigningRequestSpec{ Request: []byte(csrPEM), Usages: csr.usages, SignerName: csr.signerName, @@ -165,7 +165,7 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo if err := client.WatchCertificateSigningRequest( ctx, csr.name, - func(request *certificatesv1.CertificateSigningRequest) (bool, error) { + func(request *certv1.CertificateSigningRequest) (bool, error) { return verifyCSRAndSetPKI(request, keyPEM, csr.certificate, csr.key) }, ); err != nil { @@ -219,16 +219,16 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo // signing request is approved and issued. It returns true if the CSR is // approved and issued, false if it is pending, and an error if it is denied // or failed. -func isCertificateSigningRequestApprovedAndIssued(csr *certificatesv1.CertificateSigningRequest) (bool, error) { +func isCertificateSigningRequestApprovedAndIssued(csr *certv1.CertificateSigningRequest) (bool, error) { for _, condition := range csr.Status.Conditions { - if condition.Type == certificatesv1.CertificateApproved && condition.Status == corev1.ConditionTrue { + if condition.Type == certv1.CertificateApproved && condition.Status == corev1.ConditionTrue { return len(csr.Status.Certificate) > 0, nil } - if condition.Type == certificatesv1.CertificateDenied && condition.Status == corev1.ConditionTrue { + if condition.Type == certv1.CertificateDenied && condition.Status == corev1.ConditionTrue { return false, fmt.Errorf(":CSR %s was denied: %s", csr.Name, condition.Reason) } - if condition.Type == certificatesv1.CertificateFailed && condition.Status == corev1.ConditionTrue { + if condition.Type == certv1.CertificateFailed && condition.Status == corev1.ConditionTrue { return false, fmt.Errorf("CSR %s failed: %s", csr.Name, condition.Reason) } } @@ -237,7 +237,7 @@ func isCertificateSigningRequestApprovedAndIssued(csr *certificatesv1.Certificat // verifyCSRAndSetPKI verifies the certificate signing request and sets the // certificate and key if the CSR is approved. -func verifyCSRAndSetPKI(csr *certificatesv1.CertificateSigningRequest, keyPEM string, certificate, key *string) (bool, error) { +func verifyCSRAndSetPKI(csr *certv1.CertificateSigningRequest, keyPEM string, certificate, key *string) (bool, error) { approved, err := isCertificateSigningRequestApprovedAndIssued(csr) if err != nil { return false, fmt.Errorf("failed to validate csr: %w", err)