diff --git a/cmd/cl-controlplane/app/server.go b/cmd/cl-controlplane/app/server.go index 59711b2c9..7faa04a10 100644 --- a/cmd/cl-controlplane/app/server.go +++ b/cmd/cl-controlplane/app/server.go @@ -15,9 +15,12 @@ package app import ( "context" + "errors" "fmt" + "net/http" "os" "path" + "syscall" "github.com/bombsimon/logrusr/v4" "github.com/sirupsen/logrus" @@ -39,9 +42,10 @@ import ( "github.com/clusterlink-net/clusterlink/pkg/controlplane/xds" "github.com/clusterlink-net/clusterlink/pkg/util/controller" "github.com/clusterlink-net/clusterlink/pkg/util/grpc" + utilhttp "github.com/clusterlink-net/clusterlink/pkg/util/http" "github.com/clusterlink-net/clusterlink/pkg/util/log" "github.com/clusterlink-net/clusterlink/pkg/util/runnable" - "github.com/clusterlink-net/clusterlink/pkg/util/tls" + utiltls "github.com/clusterlink-net/clusterlink/pkg/util/tls" "github.com/clusterlink-net/clusterlink/pkg/versioninfo" ) @@ -126,7 +130,7 @@ func (o *Options) Run() error { } logrus.Infof("ClusterLink namespace: %s", namespace) - controlplaneCertData, _, err := tls.ParseFiles(CAFile, CertificateFile, KeyFile) + controlplaneCertData, _, err := utiltls.ParseFiles(CAFile, CertificateFile, KeyFile) if err != nil { return err } @@ -219,11 +223,24 @@ func (o *Options) Run() error { return err } + readinessListenAddress := fmt.Sprintf("0.0.0.0:%d", api.ReadinessListenPort) + httpServer := utilhttp.NewServer("controlplane-http", nil) + httpServer.Router().Get("/", func(w http.ResponseWriter, r *http.Request) { + _, err = http.Get(fmt.Sprintf("https://127.0.0.1:%d", api.ListenPort)) + if errors.Is(err, syscall.ECONNREFUSED) || + errors.Is(err, syscall.ECONNRESET) || + !authzManager.IsReady() || + !mgr.GetCache().WaitForCacheSync(r.Context()) { + w.WriteHeader(http.StatusServiceUnavailable) + } + }) + runnableManager := runnable.NewManager() runnableManager.Add(peerCertsWatcher) runnableManager.Add(controller.NewManager(mgr)) runnableManager.Add(controlManager) 
runnableManager.AddServer(controlplaneServerListenAddress, grpcServer) + runnableManager.AddServer(readinessListenAddress, httpServer) return runnableManager.Run() } diff --git a/cmd/cl-dataplane/app/envoy.go b/cmd/cl-dataplane/app/envoy.go index b897c7a8e..5d26a99bb 100644 --- a/cmd/cl-dataplane/app/envoy.go +++ b/cmd/cl-dataplane/app/envoy.go @@ -26,12 +26,16 @@ import ( const ( envoyPath = "/usr/local/bin/envoy" + + adminPort = 1500 ) func (o *Options) runEnvoy(dataplaneID string) error { envoyConfArgs := map[string]interface{}{ "dataplaneID": dataplaneID, + "adminPort": adminPort, + "controlplaneHost": o.ControlplaneHost, "controlplanePort": cpapi.ListenPort, diff --git a/cmd/cl-dataplane/app/envoyconf.go b/cmd/cl-dataplane/app/envoyconf.go index 1920c847c..ec29b8067 100644 --- a/cmd/cl-dataplane/app/envoyconf.go +++ b/cmd/cl-dataplane/app/envoyconf.go @@ -22,7 +22,7 @@ admin: address: socket_address: address: 127.0.0.1 - port_value: 1500 + port_value: {{.adminPort}} bootstrap_extensions: - name: envoy.bootstrap.internal_listener typed_config: diff --git a/cmd/cl-dataplane/app/server.go b/cmd/cl-dataplane/app/server.go index 641e25362..0724827b6 100644 --- a/cmd/cl-dataplane/app/server.go +++ b/cmd/cl-dataplane/app/server.go @@ -15,6 +15,7 @@ package app import ( "fmt" + "net/http" "os" "github.com/google/uuid" @@ -22,6 +23,8 @@ import ( "github.com/spf13/cobra" "github.com/spf13/pflag" + "github.com/clusterlink-net/clusterlink/pkg/dataplane/api" + utilhttp "github.com/clusterlink-net/clusterlink/pkg/util/http" "github.com/clusterlink-net/clusterlink/pkg/util/log" ) @@ -36,8 +39,6 @@ const ( // KeyFile is the path to the private-key file. KeyFile = "/etc/ssl/key/clink-dataplane.pem" - // Name is the app label of dataplane pods. - Name = "cl-dataplane" // IngressSvcName is the ingress service name for the dataplane pods. 
IngressSvcName = "clusterlink" ) @@ -85,6 +86,22 @@ func (o *Options) Run() error { dataplaneID := uuid.New().String() logrus.Infof("Dataplane ID: %s.", dataplaneID) + readinessListenAddress := fmt.Sprintf("0.0.0.0:%d", api.ReadinessListenPort) + httpServer := utilhttp.NewServer("go-dataplane-readiness-http", nil) + if err := httpServer.Listen(readinessListenAddress); err != nil { + return fmt.Errorf("cannot listen for readiness: %w", err) + } + httpServer.Router().Get("/", func(w http.ResponseWriter, r *http.Request) { + resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/ready", adminPort)) + if err != nil || resp.StatusCode != http.StatusOK { + w.WriteHeader(http.StatusServiceUnavailable) + } + }) + go func() { + err := httpServer.Start() + logrus.Errorf("Failed to start readiness server: %v.", err) + }() + return o.runEnvoy(dataplaneID) } diff --git a/cmd/cl-go-dataplane/app/server.go b/cmd/cl-go-dataplane/app/server.go index 7954fa828..42d8be5ef 100644 --- a/cmd/cl-go-dataplane/app/server.go +++ b/cmd/cl-go-dataplane/app/server.go @@ -16,6 +16,7 @@ package app import ( "fmt" "net" + "net/http" "os" "strconv" "time" @@ -32,6 +33,7 @@ import ( "github.com/clusterlink-net/clusterlink/pkg/dataplane/api" dpclient "github.com/clusterlink-net/clusterlink/pkg/dataplane/client" dpserver "github.com/clusterlink-net/clusterlink/pkg/dataplane/server" + utilhttp "github.com/clusterlink-net/clusterlink/pkg/util/http" "github.com/clusterlink-net/clusterlink/pkg/util/log" "github.com/clusterlink-net/clusterlink/pkg/util/tls" ) @@ -81,7 +83,7 @@ func (o *Options) runGoDataplane(dataplaneID string, parsedCertData *tls.ParsedC controlplaneClient, err := grpc.NewClient( controlplaneTarget, - grpc.WithTransportCredentials(credentials.NewTLS(parsedCertData.ClientConfig("cl-controlplane"))), + grpc.WithTransportCredentials(credentials.NewTLS(parsedCertData.ClientConfig(cpapi.Name))), grpc.WithConnectParams(grpc.ConnectParams{ Backoff: backoff.Config{ BaseDelay: 100 * 
time.Millisecond, @@ -101,8 +103,24 @@ func (o *Options) runGoDataplane(dataplaneID string, parsedCertData *tls.ParsedC logrus.Errorf("Failed to start dataplane server: %v.", err) }() - // Start xDS client, if it fails to start we keep retrying to connect to the controlplane host xdsClient := dpclient.NewXDSClient(dataplane, controlplaneClient) + + readinessListenAddress := fmt.Sprintf("0.0.0.0:%d", api.ReadinessListenPort) + httpServer := utilhttp.NewServer("go-dataplane-readiness-http", nil) + if err := httpServer.Listen(readinessListenAddress); err != nil { + return fmt.Errorf("cannot listen for readiness: %w", err) + } + httpServer.Router().Get("/", func(w http.ResponseWriter, r *http.Request) { + if !xdsClient.IsReady() || !dataplane.IsReady() { + w.WriteHeader(http.StatusServiceUnavailable) + } + }) + go func() { + err := httpServer.Start() + logrus.Errorf("Failed to start readiness server: %v.", err) + }() + + // Start xDS client, if it fails to start we keep retrying to connect to the controlplane host err = xdsClient.Run() return fmt.Errorf("xDS Client stopped: %w", err) } diff --git a/pkg/bootstrap/cert.go b/pkg/bootstrap/cert.go index 236a0f256..632c92a37 100644 --- a/pkg/bootstrap/cert.go +++ b/pkg/bootstrap/cert.go @@ -18,6 +18,8 @@ import ( "path/filepath" "github.com/clusterlink-net/clusterlink/cmd/clusterlink/config" + cpapi "github.com/clusterlink-net/clusterlink/pkg/controlplane/api" + dpapi "github.com/clusterlink-net/clusterlink/pkg/dataplane/api" ) // Certificate represents a clusterlink certificate. 
@@ -69,9 +71,9 @@ func CreateCACertificate() (*Certificate, error) { func CreateControlplaneCertificate(caCert *Certificate) (*Certificate, error) { cert, err := createCertificate(&certificateConfig{ Parent: caCert.cert, - Name: "cl-controlplane", + Name: cpapi.Name, IsServer: true, - DNSNames: []string{"cl-controlplane"}, + DNSNames: []string{cpapi.Name}, }) if err != nil { return nil, err @@ -84,9 +86,9 @@ func CreateControlplaneCertificate(caCert *Certificate) (*Certificate, error) { func CreateDataplaneCertificate(caCert *Certificate) (*Certificate, error) { cert, err := createCertificate(&certificateConfig{ Parent: caCert.cert, - Name: "cl-dataplane", + Name: dpapi.Name, IsClient: true, - DNSNames: []string{"cl-dataplane"}, + DNSNames: []string{dpapi.Name}, }) if err != nil { return nil, err diff --git a/pkg/bootstrap/platform/k8s.go b/pkg/bootstrap/platform/k8s.go index e34496473..3ab1f2ec5 100644 --- a/pkg/bootstrap/platform/k8s.go +++ b/pkg/bootstrap/platform/k8s.go @@ -46,7 +46,7 @@ data: apiVersion: v1 kind: Secret metadata: - name: cl-controlplane + name: {{.controlplaneName}} namespace: {{.namespace}} data: cert: {{.controlplaneCert}} @@ -55,7 +55,7 @@ data: apiVersion: v1 kind: Secret metadata: - name: cl-dataplane + name: {{.dataplaneName}} namespace: {{.namespace}} data: cert: {{.dataplaneCert}} @@ -75,19 +75,19 @@ data: apiVersion: apps/v1 kind: Deployment metadata: - name: cl-controlplane + name: {{.controlplaneName}} namespace: {{.namespace}} labels: - app: cl-controlplane + app: {{.controlplaneName}} spec: replicas: {{.controlplanes}} selector: matchLabels: - app: cl-controlplane + app: {{.controlplaneName}} template: metadata: labels: - app: cl-controlplane + app: {{.controlplaneName}} spec: volumes: - name: ca @@ -95,15 +95,18 @@ spec: secretName: cl-ca - name: tls secret: - secretName: cl-controlplane + secretName: {{.controlplaneName}} - name: peer-tls secret: secretName: cl-peer containers: - - name: cl-controlplane - image: 
{{.containerRegistry}}cl-controlplane:{{.tag}} + - name: {{.controlplaneName}} + image: {{.containerRegistry}}{{.controlplaneName}}:{{.tag}} args: ["--log-level", "{{.logLevel}}"] imagePullPolicy: IfNotPresent + readinessProbe: + httpGet: + port: {{.controlplaneReadinessPort}} ports: - containerPort: {{.controlplanePort}} volumeMounts: @@ -131,19 +134,19 @@ spec: apiVersion: apps/v1 kind: Deployment metadata: - name: cl-dataplane + name: {{.dataplaneName}} namespace: {{.namespace}} labels: - app: {{ .dataplaneAppName }} + app: {{ .dataplaneName }} spec: replicas: {{.dataplanes}} selector: matchLabels: - app: {{ .dataplaneAppName }} + app: {{ .dataplaneName }} template: metadata: labels: - app: {{ .dataplaneAppName }} + app: {{ .dataplaneName }} spec: volumes: - name: ca @@ -151,16 +154,19 @@ spec: secretName: cl-ca - name: tls secret: - secretName: cl-dataplane + secretName: {{.dataplaneName}} containers: - name: dataplane image: {{.containerRegistry}}{{ - if (eq .dataplaneType .dataplaneTypeEnvoy) }}cl-dataplane{{ - else }}cl-go-dataplane{{ end }}:{{.tag}} - args: ["--log-level", "{{.logLevel}}", "--controlplane-host", "cl-controlplane"] + if (eq .dataplaneType .dataplaneTypeEnvoy) }}{{.dataplaneName}}{{ + else }}{{.goDataplaneImageName}}{{ end }}:{{.tag}} + args: ["--log-level", "{{.logLevel}}", "--controlplane-host", "{{.controlplaneName}}"] imagePullPolicy: IfNotPresent ports: - containerPort: {{.dataplanePort}} + readinessProbe: + httpGet: + port: {{.dataplaneReadinessPort}} volumeMounts: - name: ca mountPath: {{.dataplaneCAMountPath}} @@ -178,11 +184,11 @@ spec: apiVersion: v1 kind: Service metadata: - name: cl-controlplane + name: {{.controlplaneName}} namespace: {{.namespace}} spec: selector: - app: cl-controlplane + app: {{.controlplaneName}} ports: - name: controlplane port: {{.controlplanePort}} @@ -190,7 +196,7 @@ spec: apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: - name: cl-controlplane + name: {{.controlplaneName}} rules: - 
apiGroups: [""] resources: ["events"] @@ -223,11 +229,11 @@ rules: apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding metadata: - name: cl-controlplane + name: {{.controlplaneName}} roleRef: apiGroup: rbac.authorization.k8s.io kind: ClusterRole - name: cl-controlplane + name: {{.controlplaneName}} subjects: - kind: ServiceAccount name: default @@ -274,7 +280,7 @@ spec: nodePort: {{.ingressNodePort }} {{ end }} selector: - app: {{.dataplaneAppName}} + app: {{.dataplaneName}} ` ) @@ -312,7 +318,10 @@ func K8SConfig(config *Config) ([]byte, error) { "dataplaneTypeEnvoy": DataplaneTypeEnvoy, "namespaceEnvVariable": cpapp.NamespaceEnvVariable, - "dataplaneAppName": dpapp.Name, + + "controlplaneName": cpapi.Name, + "dataplaneName": dpapi.Name, + "goDataplaneImageName": dpapi.GoDataplaneName, "controlplaneCAMountPath": cpapp.CAFile, "controlplaneCertMountPath": cpapp.CertificateFile, @@ -324,8 +333,10 @@ func K8SConfig(config *Config) ([]byte, error) { "dataplaneCertMountPath": dpapp.CertificateFile, "dataplaneKeyMountPath": dpapp.KeyFile, - "controlplanePort": cpapi.ListenPort, - "dataplanePort": dpapi.ListenPort, + "controlplanePort": cpapi.ListenPort, + "controlplaneReadinessPort": cpapi.ReadinessListenPort, + "dataplanePort": dpapi.ListenPort, + "dataplaneReadinessPort": dpapi.ReadinessListenPort, } var k8sConfig, nsConfig bytes.Buffer @@ -369,6 +380,8 @@ func K8SCertificateConfig(config *Config) ([]byte, error) { "controlplaneKey": base64.StdEncoding.EncodeToString(config.ControlplaneCertificate.RawKey()), "dataplaneCert": base64.StdEncoding.EncodeToString(config.DataplaneCertificate.RawCert()), "dataplaneKey": base64.StdEncoding.EncodeToString(config.DataplaneCertificate.RawKey()), + "controlplaneName": cpapi.Name, + "dataplaneName": dpapi.Name, "peerCertificateFile": cpapp.PeerCertificateFile, "peerKeyFile": cpapp.PeerKeyFile, "fabricCertFile": cpapp.FabricCertificateFile, @@ -434,6 +447,8 @@ func K8SClusterLinkInstanceConfig(config *Config, name 
string) ([]byte, error) { func K8SEmptyCertificateConfig(config *Config) ([]byte, error) { args := map[string]interface{}{ "Namespace": config.Namespace, + "controlplaneName": cpapi.Name, + "dataplaneName": dpapi.Name, "ca": "", "controlplaneCert": "", "controlplaneKey": "", @@ -468,7 +483,7 @@ func k8SIngressConfig(config *Config) ([]byte, error) { "ingressType": ingressType, "dataplaneService": dpapp.IngressSvcName, - "dataplaneAppName": dpapp.Name, + "dataplaneName": dpapi.Name, "dataplanePort": dpapi.ListenPort, } diff --git a/pkg/controlplane/api/servername.go b/pkg/controlplane/api/const.go similarity index 81% rename from pkg/controlplane/api/servername.go rename to pkg/controlplane/api/const.go index 94166c3b3..cb0d55852 100644 --- a/pkg/controlplane/api/servername.go +++ b/pkg/controlplane/api/const.go @@ -16,4 +16,8 @@ package api const ( // ListenPort is the port used by the dataplane to access the controlplane. ListenPort = 4444 + // ReadinessListenPort is the port used to probe for controlplane readiness. + ReadinessListenPort = 4445 + // Name is the controlplane name. + Name = "cl-controlplane" ) diff --git a/pkg/controlplane/authz/manager.go b/pkg/controlplane/authz/manager.go index 94a64074d..b65173235 100644 --- a/pkg/controlplane/authz/manager.go +++ b/pkg/controlplane/authz/manager.go @@ -455,6 +455,12 @@ func (m *Manager) SetPeerCertificates(peerTLS *tls.ParsedCertData, _ *tls.RawCer return nil } +func (m *Manager) IsReady() bool { + m.jwksLock.RLock() + defer m.jwksLock.RUnlock() + return m.jwkSignKey != nil +} + // NewManager returns a new authorization manager. 
func NewManager(cl client.Client, namespace string) *Manager { return &Manager{ diff --git a/pkg/controlplane/control/manager.go b/pkg/controlplane/control/manager.go index 3d59e8d35..d4dab5987 100644 --- a/pkg/controlplane/control/manager.go +++ b/pkg/controlplane/control/manager.go @@ -16,20 +16,22 @@ package control import ( "bytes" "context" + "crypto/md5" "crypto/rand" "crypto/rsa" "crypto/x509" "encoding/base64" "encoding/pem" + "errors" + "fmt" "reflect" "strconv" "strings" + "sync" + + "github.com/clusterlink-net/clusterlink/pkg/dataplane/api" //nolint:gosec // G505: use of weak cryptographic primitive is fine for service name - "crypto/md5" - "errors" - "fmt" - "sync" "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" @@ -244,7 +246,7 @@ func (m *Manager) addImport(ctx context.Context, imp *v1alpha1.Import) (err erro TargetPort: intstr.FromInt32(int32(imp.Spec.TargetPort)), }, }, - Selector: map[string]string{"app": dpapp.Name}, + Selector: map[string]string{"app": api.Name}, Type: v1.ServiceTypeClusterIP, }, } diff --git a/pkg/dataplane/api/servername.go b/pkg/dataplane/api/const.go similarity index 75% rename from pkg/dataplane/api/servername.go rename to pkg/dataplane/api/const.go index 072aee171..d410ab613 100644 --- a/pkg/dataplane/api/servername.go +++ b/pkg/dataplane/api/const.go @@ -16,4 +16,10 @@ package api const ( // ListenPort is the dataplane external listening port. ListenPort = 4443 + // Name is the dataplane name. + Name = "cl-dataplane" + // GoDataplaneName is the name of the go-dataplane image. + GoDataplaneName = "cl-go-dataplane" + // ReadinessListenPort is the port used to probe for dataplane readiness. 
+ ReadinessListenPort = 4445 ) diff --git a/pkg/dataplane/client/fetcher.go b/pkg/dataplane/client/fetcher.go index 3caee38a6..118c6d04e 100644 --- a/pkg/dataplane/client/fetcher.go +++ b/pkg/dataplane/client/fetcher.go @@ -26,7 +26,6 @@ import ( client "github.com/envoyproxy/go-control-plane/pkg/client/sotw/v3" "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/sirupsen/logrus" - "google.golang.org/grpc" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" @@ -41,6 +40,15 @@ type fetcher struct { logger *logrus.Entry clusterLock sync.Mutex listenerLock sync.Mutex + + ready bool + readyLock sync.Mutex +} + +func (f *fetcher) IsReady() bool { + f.readyLock.Lock() + defer f.readyLock.Unlock() + return f.ready } func (f *fetcher) handleClusters(resources []*anypb.Any) error { @@ -122,6 +130,9 @@ func (f *fetcher) Run() error { f.logger.Errorf("Failed to fetch %s: %v.", f.resourceType, err) return err } + f.readyLock.Lock() + f.ready = true + f.readyLock.Unlock() f.logger.Debugf("Fetched %s -> %+v", f.resourceType, resp.Resources) switch f.resourceType { @@ -153,19 +164,13 @@ func (f *fetcher) Run() error { func newFetcher( ctx context.Context, - controlplaneClient grpc.ClientConnInterface, resourceType string, dp *server.Dataplane, -) (*fetcher, error) { - cl := client.NewADSClient(ctx, &core.Node{Id: dp.ID}, resourceType) - err := cl.InitConnect(controlplaneClient) - if err != nil { - return nil, err - } +) *fetcher { return &fetcher{ - client: cl, + client: client.NewADSClient(ctx, &core.Node{Id: dp.ID}, resourceType), resourceType: resourceType, dataplane: dp, logger: logrus.WithField("component", "fetcher.xds.client"), - }, nil + } } diff --git a/pkg/dataplane/client/xds.go b/pkg/dataplane/client/xds.go index 0ddad1dcc..eae7e0629 100644 --- a/pkg/dataplane/client/xds.go +++ b/pkg/dataplane/client/xds.go @@ -38,32 +38,44 @@ type XDSClient struct { errors map[string]error logger *logrus.Entry clustersReady chan bool + 
fetchers map[string]*fetcher } -func (x *XDSClient) runFetcher(resourceType string) error { +func (x *XDSClient) runFetcher(fetcher *fetcher) error { for { - fetcher, err := newFetcher(context.Background(), x.controlplaneClient, resourceType, x.dataplane) - if err != nil { - x.logger.Errorf("Failed to initialize %s fetcher: %v.", resourceType, err) + for { + err := fetcher.client.InitConnect(x.controlplaneClient) + if err == nil { + break + } + + x.logger.Errorf("Failed to initialize %s fetcher: %v.", fetcher.resourceType, err) time.Sleep(time.Second) - continue } - x.logger.Infof("Successfully initialized client for %s type.", resourceType) // If the resource type is listener, it shouldn't run until the cluster fetcher is running - switch resourceType { + switch fetcher.resourceType { case resource.ClusterType: x.clustersReady <- true case resource.ListenerType: <-x.clustersReady x.logger.Infof("Done waiting for cluster fetcher") } - x.logger.Infof("Starting to run %s fetcher.", resourceType) - err = fetcher.Run() - x.logger.Infof("Fetcher '%s' stopped: %v.", resourceType, err) + x.logger.Infof("Starting to run %s fetcher.", fetcher.resourceType) + err := fetcher.Run() + x.logger.Infof("Fetcher '%s' stopped: %v.", fetcher.resourceType, err) } } +func (x *XDSClient) IsReady() bool { + for _, fetcher := range x.fetchers { + if !fetcher.IsReady() { + return false + } + } + return true +} + // Run starts the running xDS client which fetches clusters and listeners from the controlplane. func (x *XDSClient) Run() error { var wg sync.WaitGroup @@ -72,7 +84,7 @@ func (x *XDSClient) Run() error { for _, res := range resources { go func(res string) { defer wg.Done() - err := x.runFetcher(res) + err := x.runFetcher(x.fetchers[res]) x.logger.Errorf("Fetcher (%s) stopped: %v", res, err) x.lock.Lock() @@ -95,11 +107,17 @@ func (x *XDSClient) Run() error { // NewXDSClient returns am xDS client which can fetch clusters and listeners from the controlplane. 
func NewXDSClient(dataplane *server.Dataplane, controlplaneClient grpc.ClientConnInterface) *XDSClient { + fetchers := make(map[string]*fetcher, len(resources)) + for _, res := range resources { + fetchers[res] = newFetcher(context.Background(), res, dataplane) + } + return &XDSClient{ dataplane: dataplane, controlplaneClient: controlplaneClient, errors: make(map[string]error), logger: logrus.WithField("component", "xds.client"), clustersReady: make(chan bool, 1), + fetchers: fetchers, } } diff --git a/pkg/dataplane/server/server.go b/pkg/dataplane/server/server.go index c8eae42b7..894d8c13d 100644 --- a/pkg/dataplane/server/server.go +++ b/pkg/dataplane/server/server.go @@ -15,15 +15,18 @@ package server import ( "crypto/tls" + "errors" "fmt" "io" "net" "net/http" + "syscall" "time" authv3 "github.com/envoyproxy/go-control-plane/envoy/service/auth/v3" cpapi "github.com/clusterlink-net/clusterlink/pkg/controlplane/api" + "github.com/clusterlink-net/clusterlink/pkg/dataplane/api" ) // StartDataplaneServer starts the Dataplane server. 
@@ -57,6 +60,18 @@ func (d *Dataplane) StartDataplaneServer(dataplaneServerAddress string) error { return server.ListenAndServeTLS("", "") } +func (d *Dataplane) IsReady() bool { + _, err := http.Get(fmt.Sprintf("https://127.0.0.1:%d", api.ListenPort)) + if errors.Is(err, syscall.ECONNREFUSED) || + errors.Is(err, syscall.ECONNRESET) { + return false + } + + d.tlsConfigLock.RLock() + defer d.tlsConfigLock.RUnlock() + return d.tlsConfig != nil +} + func (d *Dataplane) addAuthzHandlers() { d.router.NotFound(d.dataplaneIngressAuthorize) } diff --git a/pkg/operator/controller/instance_controller.go b/pkg/operator/controller/instance_controller.go index bdb44acbf..e1649843d 100644 --- a/pkg/operator/controller/instance_controller.go +++ b/pkg/operator/controller/instance_controller.go @@ -41,9 +41,6 @@ import ( ) const ( - ControlPlaneName = "cl-controlplane" - DataPlaneName = "cl-dataplane" - GoDataPlaneName = "cl-go-dataplane" OperatorNamespace = "clusterlink-operator" InstanceNamespace = "clusterlink-system" FinalizerName = "instance.clusterlink.net/finalizer" @@ -87,7 +84,7 @@ func (r *InstanceReconciler) SetupWithManager(mgr ctrl.Manager) error { Watches( &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ - Name: ControlPlaneName, + Name: cpapi.Name, }, }, &handler.EnqueueRequestForObject{}, @@ -95,7 +92,7 @@ func (r *InstanceReconciler) SetupWithManager(mgr ctrl.Manager) error { Watches( &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ - Name: DataPlaneName, + Name: dpapi.Name, }, }, &handler.EnqueueRequestForObject{}, @@ -190,11 +187,11 @@ func (r *InstanceReconciler) applyClusterLink(ctx context.Context, instance *clu instance.Spec.ContainerRegistry += "/" } // Create controlplane components - if err := r.createAccessControl(ctx, ControlPlaneName, instance.Spec.Namespace); err != nil { + if err := r.createAccessControl(ctx, cpapi.Name, instance.Spec.Namespace); err != nil { return err } - if err := r.createService(ctx, ControlPlaneName, instance.Spec.Namespace, 
cpapi.ListenPort); err != nil { + if err := r.createService(ctx, cpapi.Name, instance.Spec.Namespace, cpapi.ListenPort); err != nil { return err } @@ -213,9 +210,9 @@ func (r *InstanceReconciler) applyClusterLink(ctx context.Context, instance *clu // applyControlplane sets up the controlplane deployment. func (r *InstanceReconciler) applyControlplane(ctx context.Context, instance *clusterlink.Instance) error { - cpDeployment := r.setDeployment(ControlPlaneName, instance.Spec.Namespace, 1) + cpDeployment := r.setDeployment(cpapi.Name, instance.Spec.Namespace, 1) cpDeployment.Spec.Template.Spec = corev1.PodSpec{ - ServiceAccountName: ControlPlaneName, + ServiceAccountName: cpapi.Name, Volumes: []corev1.Volume{ { Name: "ca", @@ -229,7 +226,7 @@ func (r *InstanceReconciler) applyControlplane(ctx context.Context, instance *cl Name: "tls", VolumeSource: corev1.VolumeSource{ Secret: &corev1.SecretVolumeSource{ - SecretName: ControlPlaneName, + SecretName: cpapi.Name, }, }, }, @@ -244,10 +241,17 @@ func (r *InstanceReconciler) applyControlplane(ctx context.Context, instance *cl }, Containers: []corev1.Container{ { - Name: ControlPlaneName, - Image: instance.Spec.ContainerRegistry + ControlPlaneName + ":" + instance.Spec.Tag, + Name: cpapi.Name, + Image: instance.Spec.ContainerRegistry + cpapi.Name + ":" + instance.Spec.Tag, ImagePullPolicy: corev1.PullIfNotPresent, Args: []string{"--log-level", instance.Spec.LogLevel}, + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Port: intstr.FromInt32(cpapi.ReadinessListenPort), + }, + }, + }, Ports: []corev1.ContainerPort{ { ContainerPort: cpapi.ListenPort, @@ -296,12 +300,12 @@ func (r *InstanceReconciler) applyControlplane(ctx context.Context, instance *cl // applyDataplane sets up the dataplane deployment. 
func (r *InstanceReconciler) applyDataplane(ctx context.Context, instance *clusterlink.Instance) error { - DataplaneImage := DataPlaneName + DataplaneImage := dpapi.Name if instance.Spec.DataPlane.Type == clusterlink.DataplaneTypeGo { - DataplaneImage = GoDataPlaneName + DataplaneImage = dpapi.GoDataplaneName } - dpDeployment := r.setDeployment(DataPlaneName, instance.Spec.Namespace, int32(instance.Spec.DataPlane.Replicas)) + dpDeployment := r.setDeployment(dpapi.Name, instance.Spec.Namespace, int32(instance.Spec.DataPlane.Replicas)) dpDeployment.Spec.Template.Spec = corev1.PodSpec{ Volumes: []corev1.Volume{ { @@ -316,7 +320,7 @@ func (r *InstanceReconciler) applyDataplane(ctx context.Context, instance *clust Name: "tls", VolumeSource: corev1.VolumeSource{ Secret: &corev1.SecretVolumeSource{ - SecretName: DataPlaneName, + SecretName: dpapi.Name, }, }, }, @@ -327,7 +331,7 @@ func (r *InstanceReconciler) applyDataplane(ctx context.Context, instance *clust Image: instance.Spec.ContainerRegistry + DataplaneImage + ":" + instance.Spec.Tag, Args: []string{ "--log-level", instance.Spec.LogLevel, - "--controlplane-host", ControlPlaneName, + "--controlplane-host", cpapi.Name, }, ImagePullPolicy: corev1.PullIfNotPresent, Ports: []corev1.ContainerPort{ @@ -335,6 +339,13 @@ func (r *InstanceReconciler) applyDataplane(ctx context.Context, instance *clust ContainerPort: dpapi.ListenPort, }, }, + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Port: intstr.FromInt32(dpapi.ReadinessListenPort), + }, + }, + }, VolumeMounts: []corev1.VolumeMount{ { Name: "ca", @@ -494,7 +505,7 @@ func (r *InstanceReconciler) createAccessControl(ctx context.Context, name, name Subjects: []rbacv1.Subject{ { Kind: "ServiceAccount", - Name: ControlPlaneName, + Name: cpapi.Name, Namespace: namespace, }, }, @@ -517,7 +528,7 @@ func (r *InstanceReconciler) createExternalService(ctx context.Context, instance }, Spec: corev1.ServiceSpec{ Selector: 
map[string]string{ - "app": dpapp.Name, + "app": dpapi.Name, }, Ports: []corev1.ServicePort{ { @@ -599,7 +610,7 @@ func (r *InstanceReconciler) createResource(ctx context.Context, object client.O // deleteClusterLink delete all the ClusterLink resource. func (r *InstanceReconciler) deleteClusterLink(ctx context.Context, namespace string) error { // Delete controlPlane Resources - cpObj := metav1.ObjectMeta{Name: ControlPlaneName, Namespace: namespace} + cpObj := metav1.ObjectMeta{Name: cpapi.Name, Namespace: namespace} if err := r.deleteResource(ctx, &appsv1.Deployment{ObjectMeta: cpObj}); err != nil { return err } @@ -617,7 +628,7 @@ func (r *InstanceReconciler) deleteClusterLink(ctx context.Context, namespace st } // Delete dataplane Resources - dpObj := metav1.ObjectMeta{Name: DataPlaneName, Namespace: namespace} + dpObj := metav1.ObjectMeta{Name: dpapi.Name, Namespace: namespace} if err := r.deleteResource(ctx, &appsv1.Deployment{ObjectMeta: dpObj}); err != nil { return err } @@ -674,7 +685,7 @@ func (r *InstanceReconciler) checkStatus(ctx context.Context, instance *clusterl // checkControlplaneStatus check the status of the controlplane components. func (r *InstanceReconciler) checkControlplaneStatus(ctx context.Context, instance *clusterlink.Instance) (bool, error) { - cp := types.NamespacedName{Name: ControlPlaneName, Namespace: instance.Spec.Namespace} + cp := types.NamespacedName{Name: cpapi.Name, Namespace: instance.Spec.Namespace} deploymentStatus, err := r.checkDeploymnetStatus(ctx, cp) if err != nil { return false, err @@ -695,7 +706,7 @@ func (r *InstanceReconciler) checkControlplaneStatus(ctx context.Context, instan // checkDataplaneStatus check the status of the dataplane components. 
func (r *InstanceReconciler) checkDataplaneStatus(ctx context.Context, instance *clusterlink.Instance) (bool, error) { - dp := types.NamespacedName{Name: DataPlaneName, Namespace: instance.Spec.Namespace} + dp := types.NamespacedName{Name: dpapi.Name, Namespace: instance.Spec.Namespace} deploymentStatus, err := r.checkDeploymnetStatus(ctx, dp) if err != nil { return false, err diff --git a/pkg/operator/controller/instance_controller_test.go b/pkg/operator/controller/instance_controller_test.go index a00e21ed0..1bff85a3c 100644 --- a/pkg/operator/controller/instance_controller_test.go +++ b/pkg/operator/controller/instance_controller_test.go @@ -37,6 +37,8 @@ import ( dpapp "github.com/clusterlink-net/clusterlink/cmd/cl-dataplane/app" clusterlink "github.com/clusterlink-net/clusterlink/pkg/apis/clusterlink.net/v1alpha1" + cpapi "github.com/clusterlink-net/clusterlink/pkg/controlplane/api" + dpapi "github.com/clusterlink-net/clusterlink/pkg/dataplane/api" "github.com/clusterlink-net/clusterlink/pkg/operator/controller" ) @@ -143,14 +145,14 @@ func TestClusterLinkController(t *testing.T) { }, } - cpID := types.NamespacedName{Name: controller.ControlPlaneName, Namespace: controller.InstanceNamespace} + cpID := types.NamespacedName{Name: cpapi.Name, Namespace: controller.InstanceNamespace} cpResource := []client.Object{&appsv1.Deployment{}, &corev1.Service{}} roleID := types.NamespacedName{ - Name: controller.ControlPlaneName + controller.InstanceNamespace, + Name: cpapi.Name + controller.InstanceNamespace, Namespace: controller.InstanceNamespace, } roleResource := []client.Object{&rbacv1.ClusterRole{}, &rbacv1.ClusterRoleBinding{}} - dpID := types.NamespacedName{Name: controller.DataPlaneName, Namespace: controller.InstanceNamespace} + dpID := types.NamespacedName{Name: dpapi.Name, Namespace: controller.InstanceNamespace} dpResource := []client.Object{&appsv1.Deployment{}} ingressID := types.NamespacedName{Name: dpapp.IngressSvcName, Namespace: 
controller.InstanceNamespace} @@ -189,7 +191,7 @@ func TestClusterLinkController(t *testing.T) { // Check controlplane fields cp := &appsv1.Deployment{} getResource(t, cpID, cp) - cpImage := "ghcr.io/clusterlink-net/" + controller.ControlPlaneName + ":latest" + cpImage := "ghcr.io/clusterlink-net/" + cpapi.Name + ":latest" require.Equal(t, cpImage, cp.Spec.Template.Spec.Containers[0].Image) require.Equal(t, "info", cp.Spec.Template.Spec.Containers[0].Args[1]) @@ -201,7 +203,7 @@ func TestClusterLinkController(t *testing.T) { // Check Dataplane fields dp := &appsv1.Deployment{} getResource(t, dpID, dp) - envoyImage := "ghcr.io/clusterlink-net/" + controller.DataPlaneName + ":latest" + envoyImage := "ghcr.io/clusterlink-net/" + dpapi.Name + ":latest" require.Equal(t, envoyImage, dp.Spec.Template.Spec.Containers[0].Image) require.Equal(t, int32(1), *dp.Spec.Replicas) require.Equal(t, "info", dp.Spec.Template.Spec.Containers[0].Args[1]) @@ -258,13 +260,13 @@ func TestClusterLinkController(t *testing.T) { /// Check controlplane checkResourceCreated(t, cpID, cp) - cpImage := containerRegistry + "/" + controller.ControlPlaneName + ":" + tag + cpImage := containerRegistry + "/" + cpapi.Name + ":" + tag require.Equal(t, cpImage, cp.Spec.Template.Spec.Containers[0].Image) require.Equal(t, loglevel, cp.Spec.Template.Spec.Containers[0].Args[1]) /// Check dataplane checkResourceCreated(t, dpID, dp) - goImage := containerRegistry + "/" + controller.GoDataPlaneName + ":" + tag + goImage := containerRegistry + "/" + dpapi.GoDataplaneName + ":" + tag require.Equal(t, goImage, dp.Spec.Template.Spec.Containers[0].Image) require.Equal(t, int32(goReplicas), *dp.Spec.Replicas) require.Equal(t, loglevel, dp.Spec.Template.Spec.Containers[0].Args[1]) diff --git a/pkg/util/http/server.go b/pkg/util/http/server.go index 8aaff2b32..2685b1488 100644 --- a/pkg/util/http/server.go +++ b/pkg/util/http/server.go @@ -54,7 +54,13 @@ func (s *Server) Start() error { } }() - err := 
s.server.ServeTLS(s.GetListener(), "", "") + var err error + if s.server.TLSConfig != nil { + err = s.server.ServeTLS(s.GetListener(), "", "") + } else { + err = s.server.Serve(s.GetListener()) + } + if err == http.ErrServerClosed { s.logger.Info("Server closed by demand.") return nil diff --git a/tests/e2e/k8s/suite.go b/tests/e2e/k8s/suite.go index 50823cd9a..bdbcdb896 100644 --- a/tests/e2e/k8s/suite.go +++ b/tests/e2e/k8s/suite.go @@ -25,6 +25,8 @@ import ( "github.com/clusterlink-net/clusterlink/cmd/clusterlink/config" "github.com/clusterlink-net/clusterlink/pkg/bootstrap/platform" + cpapi "github.com/clusterlink-net/clusterlink/pkg/controlplane/api" + dpapi "github.com/clusterlink-net/clusterlink/pkg/dataplane/api" "github.com/clusterlink-net/clusterlink/pkg/operator/controller" "github.com/clusterlink-net/clusterlink/tests/e2e/k8s/services/httpecho" "github.com/clusterlink-net/clusterlink/tests/e2e/k8s/services/iperf3" @@ -35,7 +37,7 @@ const ( clusterCount = 3 ) -var images = [...]string{"cl-controlplane", "cl-dataplane", "cl-go-dataplane", "cl-operator"} +var images = [...]string{cpapi.Name, dpapi.Name, dpapi.GoDataplaneName, "cl-operator"} var iperf3Service = util.Service{ Name: "iperf3-server", diff --git a/tests/e2e/k8s/util/clusterlink.go b/tests/e2e/k8s/util/clusterlink.go index 663d2c071..e052647fe 100644 --- a/tests/e2e/k8s/util/clusterlink.go +++ b/tests/e2e/k8s/util/clusterlink.go @@ -29,6 +29,7 @@ import ( "github.com/clusterlink-net/clusterlink/cmd/cl-controlplane/app" "github.com/clusterlink-net/clusterlink/pkg/apis/clusterlink.net/v1alpha1" "github.com/clusterlink-net/clusterlink/pkg/bootstrap" + "github.com/clusterlink-net/clusterlink/pkg/controlplane/api" "github.com/clusterlink-net/clusterlink/tests/e2e/k8s/services" ) @@ -66,7 +67,7 @@ func (c *ClusterLink) Cluster() *KindCluster { // ScaleControlplane scales the controlplane deployment. 
func (c *ClusterLink) ScaleControlplane(replicas int32) error { - return c.cluster.ScaleDeployment("cl-controlplane", c.namespace, replicas) + return c.cluster.ScaleDeployment(api.Name, c.namespace, replicas) } // RestartControlplane restarts the controlplane. @@ -116,7 +117,7 @@ func (c *ClusterLink) UpdatePeerCertificates( err = c.cluster.Resources().List( context.Background(), &pods, - resources.WithLabelSelector("app=cl-controlplane")) + resources.WithLabelSelector("app="+api.Name)) if err != nil { return fmt.Errorf("unable to list controlplane pods: %w", err) } diff --git a/tests/e2e/k8s/util/fabric.go b/tests/e2e/k8s/util/fabric.go index 1e4ab3705..875a64909 100644 --- a/tests/e2e/k8s/util/fabric.go +++ b/tests/e2e/k8s/util/fabric.go @@ -18,6 +18,8 @@ import ( "fmt" "time" + "github.com/clusterlink-net/clusterlink/pkg/dataplane/api" + appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -272,7 +274,7 @@ func (f *Fabric) deployClusterLink(target *peer, cfg *PeerConfig) (*ClusterLink, } // Wait for dataplane will be ready. - dep := appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "cl-dataplane", Namespace: f.namespace}} + dep := appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: api.Name, Namespace: f.namespace}} waitCon := conditions.New(target.cluster.resources).DeploymentConditionMatch(&dep, appsv1.DeploymentAvailable, v1.ConditionTrue) err = wait.For(waitCon, wait.WithTimeout(time.Second*60)) if err != nil { diff --git a/tests/e2e/k8s/util/k8s_yaml.go b/tests/e2e/k8s/util/k8s_yaml.go index f860480a0..af81d6a19 100644 --- a/tests/e2e/k8s/util/k8s_yaml.go +++ b/tests/e2e/k8s/util/k8s_yaml.go @@ -19,6 +19,7 @@ import ( "strings" "github.com/clusterlink-net/clusterlink/pkg/bootstrap/platform" + "github.com/clusterlink-net/clusterlink/pkg/controlplane/api" ) // replaceOnce replaces exactly once. 
@@ -121,7 +122,7 @@ func switchClusterRoleName(yaml, name string) (string, error) { search := ` kind: ClusterRole metadata: - name: cl-controlplane` + name: ` + api.Name replace := ` kind: ClusterRole metadata: @@ -134,7 +135,7 @@ metadata: search = ` kind: ClusterRole - name: cl-controlplane` + name: ` + api.Name replace = ` kind: ClusterRole name: %s` @@ -146,7 +147,7 @@ func switchClusterRoleBindingName(yaml, name string) (string, error) { search := ` kind: ClusterRoleBinding metadata: - name: cl-controlplane` + name: ` + api.Name replace := ` kind: ClusterRoleBinding metadata: