From dfd7ee6e7c88a06ce6cf375ece2a259193d6565b Mon Sep 17 00:00:00 2001 From: Manuel Buil Date: Fri, 12 Jan 2024 18:25:21 +0100 Subject: [PATCH] Create a common CNI interface and config struct Signed-off-by: Manuel Buil --- pkg/pebinaryexecutor/pebinary.go | 55 +++++------- pkg/rke2/rke2_windows.go | 2 +- pkg/windows/calico.go | 138 +++++++++++++++++++------------ pkg/windows/types.go | 65 ++++++++------- 4 files changed, 142 insertions(+), 118 deletions(-) diff --git a/pkg/pebinaryexecutor/pebinary.go b/pkg/pebinaryexecutor/pebinary.go index 4665954b98..3e0da37c2d 100644 --- a/pkg/pebinaryexecutor/pebinary.go +++ b/pkg/pebinaryexecutor/pebinary.go @@ -14,7 +14,6 @@ import ( "strings" "time" - "github.com/Microsoft/hcsshim" "github.com/Microsoft/hcsshim/hcn" "github.com/k3s-io/helm-controller/pkg/generated/controllers/helm.cattle.io" "github.com/k3s-io/k3s/pkg/cli/cmds" @@ -53,8 +52,8 @@ type PEBinaryConfig struct { KubeConfigKubeProxy string DisableETCD bool IsServer bool - CNI string - CNIConfig win.Calico + CNIName string + CNIPlugin win.CNIPlugin } type CloudProviderConfig struct { @@ -105,20 +104,21 @@ func (p *PEBinaryConfig) Bootstrap(ctx context.Context, nodeConfig *daemonconfig restConfig, err := clientcmd.BuildConfigFromFlags("", nodeConfig.AgentConfig.KubeConfigK3sController) - p.CNI, err = getCniType(restConfig) + p.CNIName, err = getCNIPluginName(restConfig) if err != nil { return err } - switch p.CNI { + switch p.CNIName { case "", CNICalico: - if err := p.CNIConfig.Setup(ctx, nodeConfig, restConfig, p.DataDir); err != nil { + p.CNIPlugin = &win.Calico{} + if err := p.CNIPlugin.Setup(ctx, nodeConfig, restConfig, p.DataDir); err != nil { return err } case CNINone: logrus.Info("Skipping CNI setup") default: - logrus.Fatal("Unsupported CNI: ", p.CNI) + logrus.Fatal("Unsupported CNI: ", p.CNIName) } // required to initialize KubeProxy @@ -158,9 +158,9 @@ func (p *PEBinaryConfig) Kubelet(ctx context.Context, args []string) error { go func() { for { cniCtx, cancel := context.WithCancel(ctx) - if p.CNI != CNINone { + if p.CNIName != CNINone { go func() { - if err := p.CNIConfig.Start(cniCtx); err != nil { + if err := p.CNIPlugin.Start(cniCtx); err != nil { logrus.Errorf("error in cni start: %s", err) } }() @@ -181,13 +181,22 @@ func (p *PEBinaryConfig) Kubelet(ctx context.Context, args []string) error { // KubeProxy starts the kubeproxy in a subprocess with watching goroutine. func (p *PEBinaryConfig) KubeProxy(ctx context.Context, args []string) error { - if p.CNI == CNINone { + if p.CNIName == CNINone { return nil } + CNIConfig := p.CNIPlugin.GetConfig() + vip, err := p.CNIPlugin.ReserveSourceVip(ctx) + if err != nil || vip == "" { + logrus.Errorf("Failed to reserve VIP for kube-proxy: %s", err) + } + logrus.Infof("Reserved VIP for kube-proxy: %s", vip) + + extraArgs := map[string]string{ - "network-name": p.CNIConfig.CNICfg.OverlayNetName, - "bind-address": p.CNIConfig.CNICfg.IP, + "network-name": CNIConfig.OverlayNetName, + "bind-address": CNIConfig.NodeIP, + "source-vip": vip, } if err := hcn.DSRSupported(); err == nil { @@ -196,26 +205,6 @@ func (p *PEBinaryConfig) KubeProxy(ctx context.Context, args []string) error { extraArgs["enable-dsr"] = "true" } - if p.CNIConfig.CNICfg.Name == "Calico" { - var vip string - for range time.Tick(time.Second * 5) { - endpoint, err := hcsshim.GetHNSEndpointByName("Calico_ep") - if err != nil { - logrus.WithError(err).Warning("can't find Calico_ep HNS endpoint, retrying") - continue - } - vip = endpoint.IPAddress.String() - break - } - extraArgs["source-vip"] = vip - } - - logrus.Infof("Deleting HNS policies before kube-proxy starts.") - policies, _ := hcsshim.HNSListPolicyListRequest() - for _, policy := range policies { - policy.Delete() - } - args = append(getArgs(extraArgs), args...) logrus.Infof("Running RKE2 kube-proxy %s", args) @@ -288,7 +277,7 @@ func getArgs(argsMap map[string]string) []string { return args } -func getCniType(restConfig *rest.Config) (string, error) { +func getCNIPluginName(restConfig *rest.Config) (string, error) { hc, err := helm.NewFactoryFromConfig(restConfig) if err != nil { return "", err diff --git a/pkg/rke2/rke2_windows.go b/pkg/rke2/rke2_windows.go index 5b9d095e6c..54cb196920 100644 --- a/pkg/rke2/rke2_windows.go +++ b/pkg/rke2/rke2_windows.go @@ -69,6 +69,6 @@ func initExecutor(clx *cli.Context, cfg Config, isServer bool) (*pebinaryexecuto KubeletPath: cfg.KubeletPath, DisableETCD: clx.Bool("disable-etcd"), IsServer: isServer, - CNI: "", + CNIName: "", }, nil } diff --git a/pkg/windows/calico.go b/pkg/windows/calico.go index 5bc18372a8..579b9aa4a0 100644 --- a/pkg/windows/calico.go +++ b/pkg/windows/calico.go @@ -13,6 +13,7 @@ import ( "text/template" "time" + "github.com/Microsoft/hcsshim" "github.com/k3s-io/helm-controller/pkg/generated/controllers/helm.cattle.io" daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config" "github.com/k3s-io/k3s/pkg/version" @@ -23,6 +24,7 @@ import ( authv1 "k8s.io/api/authentication/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "sigs.k8s.io/yaml" @@ -37,7 +39,7 @@ var ( }, } - calicoKubeConfigTemplate = template.Must(template.New("CalicoKubeconfig").Parse(`apiVersion: v1 + calicoKubeConfigTemplate = template.Must(template.New("Kubeconfig").Parse(`apiVersion: v1 kind: Config clusters: - name: kubernetes @@ -61,11 +63,11 @@ users: calicoConfigTemplate = template.Must(template.New("CalicoConfig").Funcs(replaceSlashWin).Parse(`{ "name": "{{ .Name }}", "windows_use_single_network": true, - "cniVersion": "{{ .CNI.Version }}", + "cniVersion": "{{ .CNIVersion }}", "type": "calico", - "mode": "{{ .Mode }}", - "vxlan_mac_prefix": "{{ .Felix.MacPrefix }}", - "vxlan_vni": {{ .Felix.Vxlanvni }}, + "mode": "{{ .OverlayEncap }}", + "vxlan_mac_prefix": "0E-2A", + "vxlan_vni": {{ .VxlanVNI }}, "policy": { "type": "k8s" }, @@ -89,7 +91,7 @@ users: "kubeconfig": "{{ replace .KubeConfig.Path }}" }, "ipam": { - "type": "{{ .CNI.IpamType }}", + "type": "{{ .IpamType }}", "subnet": "usePodCidr" }, "policies": [ @@ -107,7 +109,7 @@ users: "Value": { "Type": "SDNROUTE", "DestinationPrefix": "{{ .ServiceCIDR }}", - {{- if eq .Mode "vxlan" }} + {{- if eq .OverlayEncap "vxlan" }} "NeedEncap": true {{- else }} "NeedEncap": false @@ -120,8 +122,7 @@ users: ) type Calico struct { - CNICfg *CalicoConfig - DataDir string + CNICfg *CalicoConfig } const ( @@ -134,11 +135,13 @@ const ( calicoNode = "calico-node" ) +func (c *Calico) GetConfig() *CNICommonConfig { + return &c.CNICfg.CNICommonConfig +} + // Setup creates the basic configuration required by the CNI. func (c *Calico) Setup(ctx context.Context, nodeConfig *daemonconfig.Node, restConfig *rest.Config, dataDir string) error { - c.DataDir = dataDir - - if err := c.initializeConfig(ctx, nodeConfig, restConfig); err != nil { + if err := c.initializeConfig(ctx, nodeConfig, restConfig, dataDir); err != nil { return err } @@ -146,7 +149,7 @@ func (c *Calico) Setup(ctx context.Context, nodeConfig *daemonconfig.Node, restC return err } - if err := c.writeConfigFiles(nodeConfig.AgentConfig.CNIConfDir, nodeConfig.AgentConfig.NodeName); err != nil { + if err := c.writeConfigFiles(); err != nil { return err } @@ -155,37 +158,36 @@ func (c *Calico) Setup(ctx context.Context, nodeConfig *daemonconfig.Node, restC } // initializeConfig sets the default configuration in CNIConfig -func (c *Calico) initializeConfig(ctx context.Context, nodeConfig *daemonconfig.Node, restConfig *rest.Config) error { +func (c *Calico) initializeConfig(ctx context.Context, nodeConfig *daemonconfig.Node, restConfig *rest.Config, dataDir string) error { platformType, err := platformType() if err != nil { return err } c.CNICfg = &CalicoConfig{ - Name: "Calico", - OverlayNetName: "Calico", - Hostname: nodeConfig.AgentConfig.NodeName, - NodeNameFile: filepath.Join("c:\\", c.DataDir, "agent", CalicoNodeNameFileName), + CNICommonConfig: CNICommonConfig{ + Name: "Calico", + OverlayNetName: "Calico", + OverlayEncap: "vxlan", + Hostname: nodeConfig.AgentConfig.NodeName, + ConfigPath: filepath.Join("c:\\", dataDir, "agent"), + CNIConfDir: nodeConfig.AgentConfig.CNIConfDir, + CNIBinDir: nodeConfig.AgentConfig.CNIBinDir, + ClusterCIDR: nodeConfig.AgentConfig.ClusterCIDR.String(), + ServiceCIDR: nodeConfig.AgentConfig.ServiceCIDR.String(), + NodeIP: nodeConfig.AgentConfig.NodeIP, + VxlanVNI: "4096", + VxlanPort: "4789", + IpamType: "calico-ipam", + CNIVersion: "0.3.1", + }, + NodeNameFile: filepath.Join("c:\\", dataDir, "agent", CalicoNodeNameFileName), KubeNetwork: "Calico.*", - Mode: "vxlan", - ServiceCIDR: nodeConfig.AgentConfig.ServiceCIDR.String(), DNSServers: nodeConfig.AgentConfig.ClusterDNS.String(), DNSSearch: "svc." + nodeConfig.AgentConfig.ClusterDomain, DatastoreType: "kubernetes", Platform: platformType, - IP: nodeConfig.AgentConfig.NodeIP, IPAutoDetectionMethod: "first-found", - Felix: FelixConfig{ - Metadataaddr: "none", - Vxlanvni: "4096", - MacPrefix: "0E-2A", - }, - CNI: CalicoCNIConfig{ - BinDir: nodeConfig.AgentConfig.CNIBinDir, - ConfDir: nodeConfig.AgentConfig.CNIConfDir, - IpamType: "calico-ipam", - Version: "0.3.1", - }, } c.CNICfg.KubeConfig, err = c.createKubeConfig(ctx, restConfig) @@ -193,22 +195,24 @@ func (c *Calico) initializeConfig(ctx context.Context, nodeConfig *daemonconfig. return err } + logrus.Debugf("Calico Config: %+v", c.CNICfg) + return nil } // writeConfigFiles writes the three required files by Calico -func (c *Calico) writeConfigFiles(CNIConfDir string, NodeName string) error { +func (c *Calico) writeConfigFiles() error { // Create CalicoKubeConfig and CIPAutoDetectionMethodalicoConfig files if err := c.renderCalicoConfig(c.CNICfg.KubeConfig.Path, calicoKubeConfigTemplate); err != nil { return err } - if err := c.renderCalicoConfig(filepath.Join(CNIConfDir, CalicoConfigName), calicoConfigTemplate); err != nil { + if err := c.renderCalicoConfig(filepath.Join(c.CNICfg.CNIConfDir, CalicoConfigName), calicoConfigTemplate); err != nil { return err } - return os.WriteFile(filepath.Join("c:\\", c.DataDir, "agent", CalicoNodeNameFileName), []byte(NodeName), 0644) + return os.WriteFile(filepath.Join(c.CNICfg.ConfigPath, CalicoNodeNameFileName), []byte(c.CNICfg.Hostname), 0644) } // renderCalicoConfig creates the file and then renders the template using Calico Config parameters @@ -228,13 +232,13 @@ func (c *Calico) renderCalicoConfig(path string, toRender *template.Template) er } // createKubeConfig creates all needed for Calico to contact kube-api -func (c *Calico) createKubeConfig(ctx context.Context, restConfig *rest.Config) (*CalicoKubeConfig, error) { +func (c *Calico) createKubeConfig(ctx context.Context, restConfig *rest.Config) (*KubeConfig, error) { // Fill all information except for the token - calicoKubeConfig := CalicoKubeConfig{ + calicoKubeConfig := KubeConfig{ Server: "https://127.0.0.1:6443", - CertificateAuthority: filepath.Join("c:\\", c.DataDir, "agent", "server-ca.crt"), - Path: filepath.Join("c:\\", c.DataDir, "agent", CalicoKubeConfigName), + CertificateAuthority: filepath.Join(c.CNICfg.ConfigPath, "server-ca.crt"), + Path: filepath.Join(c.CNICfg.ConfigPath, CalicoKubeConfigName), } // Generate the token request @@ -263,7 +267,7 @@ func (c *Calico) createKubeConfig(ctx context.Context, restConfig *rest.Config) // Start starts the CNI services on the Windows node. func (c *Calico) Start(ctx context.Context) error { - logPath := filepath.Join(c.DataDir, "agent", "logs") + logPath := filepath.Join(c.CNICfg.ConfigPath, "logs") for { if err := startCalico(ctx, c.CNICfg, logPath); err != nil { time.Sleep(5 * time.Second) @@ -273,10 +277,18 @@ func (c *Calico) Start(ctx context.Context) error { break } go startFelix(ctx, c.CNICfg, logPath) - if c.CNICfg.Mode == "windows-bgp" { + if c.CNICfg.OverlayEncap == "windows-bgp" { go startConfd(ctx, c.CNICfg, logPath) } + // Delete policies in case calico network is being reused + policies, _ := hcsshim.HNSListPolicyListRequest() + for _, policy := range policies { + policy.Delete() + } + + logrus.Info("Calico started correctly") + return nil } @@ -297,8 +309,8 @@ func (c *Calico) generateCalicoNetworks() error { networkAdapter = c.CNICfg.Interface } - if c.CNICfg.Interface == "" && c.CNICfg.IP != "" { - iFace, err := findInterface(c.CNICfg.IP) + if c.CNICfg.Interface == "" && c.CNICfg.NodeIP != "" { + iFace, err := findInterface(c.CNICfg.NodeIP) if err != nil { return err } @@ -306,7 +318,7 @@ func (c *Calico) generateCalicoNetworks() error { } } - mgmt, err := createHnsNetwork(c.CNICfg.Mode, networkAdapter) + mgmt, err := createHnsNetwork(c.CNICfg.OverlayEncap, networkAdapter) if err != nil { return err } @@ -356,7 +368,7 @@ func (c *Calico) overrideCalicoConfigByHelm(restConfig *rest.Config) error { } if bgpEnabled := overrides.Installation.CalicoNetwork.BGP; bgpEnabled != nil { if *bgpEnabled == opv1.BGPEnabled { - c.CNICfg.Mode = "windows-bgp" + c.CNICfg.OverlayEncap = "windows-bgp" } } return nil @@ -399,7 +411,7 @@ func startConfd(ctx context.Context, config *CalicoConfig, logPath string) { args := []string{ "-confd", - fmt.Sprintf("-confd-confdir=%s", filepath.Join(config.CNI.BinDir, "confd")), + fmt.Sprintf("-confd-confdir=%s", filepath.Join(config.CNIBinDir, "confd")), } logrus.Infof("Confd Envs: %s", append(generateGeneralCalicoEnvs(config), specificEnvs...)) @@ -407,7 +419,7 @@ func startConfd(ctx context.Context, config *CalicoConfig, logPath string) { cmd.Env = append(generateGeneralCalicoEnvs(config), specificEnvs...) cmd.Stdout = outputFile cmd.Stderr = outputFile - _ = os.Chdir(filepath.Join(config.CNI.BinDir, "confd")) + _ = os.Chdir(filepath.Join(config.CNIBinDir, "confd")) _ = cmd.Run() logrus.Error("Confd exited") } @@ -417,7 +429,7 @@ func startFelix(ctx context.Context, config *CalicoConfig, logPath string) { specificEnvs := []string{ fmt.Sprintf("FELIX_FELIXHOSTNAME=%s", config.Hostname), - fmt.Sprintf("FELIX_VXLANVNI=%s", config.Felix.Vxlanvni), + fmt.Sprintf("FELIX_VXLANVNI=%s", config.VxlanVNI), fmt.Sprintf("FELIX_DATASTORETYPE=%s", config.DatastoreType), } @@ -446,10 +458,10 @@ func startCalico(ctx context.Context, config *CalicoConfig, logPath string) erro specificEnvs := []string{ fmt.Sprintf("CALICO_NODENAME_FILE=%s", config.NodeNameFile), - fmt.Sprintf("CALICO_NETWORKING_BACKEND=%s", config.Mode), + fmt.Sprintf("CALICO_NETWORKING_BACKEND=%s", config.OverlayEncap), fmt.Sprintf("CALICO_DATASTORE_TYPE=%s", config.DatastoreType), fmt.Sprintf("IP_AUTODETECTION_METHOD=%s", config.IPAutoDetectionMethod), - fmt.Sprintf("VXLAN_VNI=%s", config.Felix.Vxlanvni), + fmt.Sprintf("VXLAN_VNI=%s", config.VxlanVNI), } // Add OS variables related to Calico. As they come after, they'll overwrite the previous ones @@ -476,11 +488,31 @@ func startCalico(ctx context.Context, config *CalicoConfig, logPath string) erro func generateGeneralCalicoEnvs(config *CalicoConfig) []string { return []string{ fmt.Sprintf("KUBE_NETWORK=%s", config.KubeNetwork), - fmt.Sprintf("KUBECONFIG=%s", config.KubeConfig.Path), + fmt.Sprintf("KUBECONFIG=%s", filepath.Join(config.ConfigPath, CalicoKubeConfigName)), fmt.Sprintf("NODENAME=%s", config.Hostname), fmt.Sprintf("CALICO_K8S_NODE_REF=%s", config.Hostname), - fmt.Sprintf("IP=%s", config.IP), - fmt.Sprintf("USE_POD_CIDR=%t", autoConfigureIpam(config.CNI.IpamType)), + fmt.Sprintf("IP=%s", config.NodeIP), + fmt.Sprintf("USE_POD_CIDR=%t", autoConfigureIpam(config.IpamType)), + } +} + +// ReserveSourceVip reserves a source VIP for kube-proxy +func (c *Calico) ReserveSourceVip(ctx context.Context) (string, error) { + var vip string + + if err := wait.PollImmediateWithContext(ctx, 5*time.Second, 5*time.Minute, func(ctx context.Context) (bool, error) { + // calico-node is creating an endpoint named Calico_ep for this purpose + endpoint, err := hcsshim.GetHNSEndpointByName("Calico_ep") + if err != nil { + logrus.WithError(err).Warning("can't find Calico_ep HNS endpoint, retrying") + return false, nil + } + vip = endpoint.IPAddress.String() + return true, nil + }); err != nil { + return "", err } + + return vip, nil } diff --git a/pkg/windows/types.go b/pkg/windows/types.go index ce1b170b6d..c2380b2ccb 100644 --- a/pkg/windows/types.go +++ b/pkg/windows/types.go @@ -4,56 +4,59 @@ package windows import ( + "context" + + daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config" opv1 "github.com/tigera/operator/api/v1" + "k8s.io/client-go/rest" ) -type FelixConfig struct { - Metadataaddr string - Vxlanvni string - MacPrefix string - LogSeverityFile string - LogSeveritySys string +type CNIPlugin interface { + Setup(ctx context.Context, nodeConfig *daemonconfig.Node, restConfig *rest.Config, dataDir string) error + Start(ctx context.Context) error + GetConfig() *CNICommonConfig + ReserveSourceVip(ctx context.Context) (string, error) +} + +type KubeConfig struct { + CertificateAuthority string + Server string + Token string + Path string } -type CalicoCNIConfig struct { - BinDir string - ConfDir string - IpamType string - ConfFileName string - Version string +type CNICommonConfig struct { + Name string + OverlayNetName string + OverlayEncap string + Hostname string + ConfigPath string + CNIConfDir string + CNIBinDir string + ClusterCIDR string + ServiceCIDR string + NodeIP string + VxlanVNI string + VxlanPort string + Interface string + IpamType string + CNIVersion string + KubeConfig *KubeConfig } type CalicoConfig struct { - Name string - OverlayNetName string - Mode string - Hostname string + CNICommonConfig // embedded struct KubeNetwork string - ServiceCIDR string DNSServers string DNSSearch string DatastoreType string NodeNameFile string Platform string - StartUpValidIPTimeout int - IP string IPAutoDetectionMethod string - LogDir string - Felix FelixConfig - CNI CalicoCNIConfig ETCDEndpoints string ETCDKeyFile string ETCDCertFile string ETCDCaCertFile string - KubeConfig *CalicoKubeConfig - Interface string -} - -type CalicoKubeConfig struct { - CertificateAuthority string - Server string - Token string - Path string } // Stub of Calico configuration used to extract user-provided overrides