From 99c1ecac6851af120d50a006d693084eb88ffa29 Mon Sep 17 00:00:00 2001 From: Joao Morais Date: Mon, 11 Mar 2024 09:36:05 -0300 Subject: [PATCH] move leader election to a self managed service Controller-runtime's leader election works by killing the running process when the leader is lost, consequently being restarted by the runtime env. This works nice for a control plane component, but it'd work badly with haproxy ingress because it currently embeds the data plane as well on its deployment, making haproxy to restart on every instance that looses an election. Controller-runtime behavior is not configurable, so we need to implement our own leader election control. --- .../en/docs/configuration/command-line.md | 6 +- go.mod | 1 + go.sum | 2 + pkg/controller/config/config.go | 16 ++- pkg/controller/config/options.go | 7 +- pkg/controller/launch/launch.go | 3 - pkg/controller/reconciler/ratelimiter.go | 9 +- pkg/controller/reconciler/reconciler.go | 5 +- pkg/controller/reconciler/watchers.go | 10 +- pkg/controller/services/cache.go | 11 +- pkg/controller/services/services.go | 37 +++--- pkg/controller/services/svcacme.go | 2 +- pkg/controller/services/svcleader.go | 120 ++++++++++++++++-- pkg/controller/services/svcleaderrecorder.go | 66 ++++++++++ pkg/controller/services/svcstatusing.go | 2 +- pkg/controller/utils/delayedshutdown.go | 11 +- pkg/controller/utils/distributed.go | 48 ------- pkg/haproxy/dynupdate_test.go | 2 +- pkg/haproxy/instance.go | 4 +- pkg/haproxy/instance_test.go | 6 - tests/framework/framework.go | 25 ++-- tests/integration/integration_test.go | 22 ++++ 22 files changed, 284 insertions(+), 131 deletions(-) create mode 100644 pkg/controller/services/svcleaderrecorder.go delete mode 100644 pkg/controller/utils/distributed.go diff --git a/docs/content/en/docs/configuration/command-line.md b/docs/content/en/docs/configuration/command-line.md index 9b22c9444..6012616e5 100644 --- a/docs/content/en/docs/configuration/command-line.md +++ b/docs/content/en/docs/configuration/command-line.md @@ -234,7 +234,11 @@ The ID to be used for electing ingress controller leader. A leader needs to be e * Embedded Acme signer, see [acme](#acme) * Gateway API, see [`--watch-gateway`](#watch-gateway) -Defaults to `fc5ae9f3.haproxy-ingress.github.io` if not configured. +Election ID configuration has no efect if none of Ingress Status update, Embedded Acme signer, or Gateway API are enabled. + +Since v0.15 a `%s` placeholder is used to define where the IngressClass value should be added to the election ID. Up to v0.14 the IngressClass was concatenated in the end of the provided value to compose the real election ID value. Ingress class is added to the election ID name to avoid conflict when two or more HAProxy Ingress controllers are running in the same cluster. + +Election ID defaults to `class-%s.haproxy-ingress.github.io` if not configured, which is rendered to `class-haproxy.haproxy-ingress.github.io` if the IngressClass is not changed from the default value. --- diff --git a/go.mod b/go.mod index c13188bcd..8efe03b46 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/stretchr/testify v1.8.4 go.uber.org/zap v1.26.0 golang.org/x/crypto v0.21.0 + golang.org/x/sync v0.5.0 gopkg.in/go-playground/pool.v3 v3.1.1 gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.29.2 diff --git a/go.sum b/go.sum index f16d7a386..b27b46f4b 100644 --- a/go.sum +++ b/go.sum @@ -176,6 +176,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= +golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/pkg/controller/config/config.go b/pkg/controller/config/config.go index 418ea3a5c..344102b53 100644 --- a/pkg/controller/config/config.go +++ b/pkg/controller/config/config.go @@ -237,12 +237,24 @@ func CreateWithConfig(ctx context.Context, restConfig *rest.Config, opt *Options podNamespace := os.Getenv("POD_NAMESPACE") podName := os.Getenv("POD_NAME") - // we could `|| hasGateway[version...]` instead of `|| *watchGateway` here, + // we could `|| hasGateway[version...]` instead of `|| opt.WatchGateway` here, // but we're choosing a consistent startup behavior despite of the cluster configuration. election := opt.UpdateStatus || opt.AcmeServer || opt.WatchGateway if election && podNamespace == "" { return nil, fmt.Errorf("POD_NAMESPACE envvar should be configured when --update-status=true, --acme-server=true, or --watch-gateway=true") } + if election && opt.IngressClass == "" { + return nil, fmt.Errorf("--ingress-class should not be empty when --update-status=true, --acme-server=true, or --watch-gateway=true") + } + var electionID string + if election { + if strings.Contains(opt.ElectionID, "%s") { + electionID = fmt.Sprintf(opt.ElectionID, opt.IngressClass) + } else { + // backward compatibility behavior + electionID = opt.ElectionID + "-" + opt.IngressClass + } + } if opt.UpdateStatus && podName == "" && opt.PublishSvc == "" && len(publishAddressHostnames)+len(publishAddressIPs) == 0 { return nil, fmt.Errorf("one of --publish-service, --publish-address or POD_NAME envvar should be configured when --update-status=true") @@ -449,7 +461,7 @@ func CreateWithConfig(ctx context.Context, restConfig *rest.Config, opt *Options DisableExternalName: opt.DisableExternalName, DisableKeywords: disableKeywords, Election: election, - ElectionID: opt.ElectionID, + ElectionID: electionID, ElectionNamespace: podNamespace, EnableEndpointSliceAPI: opt.EnableEndpointSlicesAPI, ForceNamespaceIsolation: opt.ForceIsolation, diff --git a/pkg/controller/config/options.go b/pkg/controller/config/options.go index 12c4c5ff7..b6180652c 100644 --- a/pkg/controller/config/options.go +++ b/pkg/controller/config/options.go @@ -32,7 +32,7 @@ func NewOptions() *Options { Profiling: true, VerifyHostname: true, UpdateStatus: true, - ElectionID: "fc5ae9f3.haproxy-ingress.github.io", + ElectionID: "class-%s.haproxy-ingress.github.io", ShutdownTimeout: 25 * time.Second, UpdateStatusOnShutdown: true, LogLevel: 2, @@ -345,7 +345,10 @@ func (o *Options) AddFlags(fs *flag.FlagSet) { ) fs.StringVar(&o.ElectionID, "election-id", o.ElectionID, ""+ - "Election id to be used for status update and certificate sign.", + "Election ID to be used for status update and certificate signing. An optional "+ + "%s is used as a placeholder for the IngressClass name, and if not provided, the "+ + "IngressClass is concatenated in the end of the provided value to compose the "+ + "real Election ID, for backward compatibility.", ) fs.StringVar(&o.WaitBeforeShutdown, "wait-before-shutdown", o.WaitBeforeShutdown, ""+ diff --git a/pkg/controller/launch/launch.go b/pkg/controller/launch/launch.go index d7d482d0e..fdea60b72 100644 --- a/pkg/controller/launch/launch.go +++ b/pkg/controller/launch/launch.go @@ -38,9 +38,6 @@ func Run(cfg *config.Config) error { mgr, err := ctrl.NewManager(cfg.KubeConfig, ctrl.Options{ Logger: rootLogger.WithName("manager"), Scheme: cfg.Scheme, - LeaderElection: cfg.Election, - LeaderElectionID: cfg.ElectionID, - LeaderElectionNamespace: cfg.ElectionNamespace, GracefulShutdownTimeout: cfg.ShutdownTimeout, HealthProbeBindAddress: "0", Metrics: server.Options{ diff --git a/pkg/controller/reconciler/ratelimiter.go b/pkg/controller/reconciler/ratelimiter.go index 3decbcb8a..5b88d0311 100644 --- a/pkg/controller/reconciler/ratelimiter.go +++ b/pkg/controller/reconciler/ratelimiter.go @@ -25,11 +25,10 @@ import ( "github.com/jcmoraisjr/haproxy-ingress/pkg/controller/config" ) -func createRateLimiter(config *config.Config) ratelimiter.RateLimiter { +func createRateLimiter(cfg *config.Config) ratelimiter.RateLimiter { return &rateLimiter{ - mu: sync.Mutex{}, - delta: time.Duration(float64(time.Second) / config.RateLimitUpdate), - wait: config.WaitBeforeUpdate, + delta: time.Duration(float64(time.Second) / cfg.RateLimitUpdate), + wait: cfg.WaitBeforeUpdate, } } @@ -40,7 +39,7 @@ type rateLimiter struct { last time.Time } -func (r *rateLimiter) When(item interface{}) time.Duration { +func (r *rateLimiter) When(_ interface{}) time.Duration { r.mu.Lock() defer r.mu.Unlock() now := time.Now() diff --git a/pkg/controller/reconciler/reconciler.go b/pkg/controller/reconciler/reconciler.go index 4709a9f73..46a3d92a1 100644 --- a/pkg/controller/reconciler/reconciler.go +++ b/pkg/controller/reconciler/reconciler.go @@ -28,7 +28,6 @@ import ( "github.com/jcmoraisjr/haproxy-ingress/pkg/controller/config" "github.com/jcmoraisjr/haproxy-ingress/pkg/controller/services" - ctrlutils "github.com/jcmoraisjr/haproxy-ingress/pkg/controller/utils" ) // IngressReconciler ... @@ -48,7 +47,7 @@ func (r *IngressReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct } func (r *IngressReconciler) leaderChanged(isLeader bool) { - if isLeader { + if isLeader && r.watchers.running() { changed := r.watchers.getChangedObjects() changed.NeedFullSync = true r.Services.ReconcileIngress(changed) @@ -78,5 +77,5 @@ func (r *IngressReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manag } } r.Services.LeaderChangedSubscriber(r.leaderChanged) - return mgr.Add(ctrlutils.DistributedService(c)) + return mgr.Add(c) } diff --git a/pkg/controller/reconciler/watchers.go b/pkg/controller/reconciler/watchers.go index 2c96d03b9..414d112a8 100644 --- a/pkg/controller/reconciler/watchers.go +++ b/pkg/controller/reconciler/watchers.go @@ -44,7 +44,6 @@ import ( func createWatchers(ctx context.Context, cfg *config.Config, val services.IsValidResource) *watchers { w := &watchers{ - mu: sync.Mutex{}, log: logr.FromContextOrDiscard(ctx).WithName("watchers"), cfg: cfg, val: val, @@ -104,6 +103,12 @@ func (w *watchers) initCh() { w.ch.Links = types.TrackingLinks{} } +func (w *watchers) running() bool { + w.mu.Lock() + defer w.mu.Unlock() + return w.run +} + func (w *watchers) handlersCore() []*hdlr { cmChange := func(o client.Object) { cm := o.(*api.ConfigMap) @@ -487,8 +492,7 @@ func (h *hdlr) compose(ev string, obj client.Object) { } else { fullname = obj.GetName() } - ns := obj.GetNamespace() - if ns != "" { + if ns := obj.GetNamespace(); ns != "" { fullname = ns + "/" + fullname } ch := h.w.ch diff --git a/pkg/controller/services/cache.go b/pkg/controller/services/cache.go index c92f6fd7c..0a05cc4bd 100644 --- a/pkg/controller/services/cache.go +++ b/pkg/controller/services/cache.go @@ -35,7 +35,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" - clientcache "k8s.io/client-go/tools/cache" "sigs.k8s.io/controller-runtime/pkg/client" gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" @@ -69,11 +68,11 @@ type c struct { status svcStatusUpdateFnc } -var errGatewayA2Disabled = fmt.Errorf("Gateway API v1alpha2 wasn't initialized") -var errGatewayB1Disabled = fmt.Errorf("Gateway API v1beta1 wasn't initialized") +var errGatewayA2Disabled = fmt.Errorf("gateway API v1alpha2 wasn't initialized") +var errGatewayB1Disabled = fmt.Errorf("gateway API v1beta1 wasn't initialized") func (c *c) get(key string, obj client.Object) error { - ns, n, err := clientcache.SplitMetaNamespaceKey(key) + ns, n, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err } @@ -93,7 +92,7 @@ func (c *c) createOrUpdate(obj client.Object) error { } func buildResourceName(defaultNamespace, kind, resourceName string, allowCrossNamespace bool) (string, string, error) { - ns, name, err := clientcache.SplitMetaNamespaceKey(resourceName) + ns, name, err := cache.SplitMetaNamespaceKey(resourceName) if err != nil { return "", "", err } @@ -641,7 +640,7 @@ func (c *c) GetKey() (crypto.Signer, error) { // implements acme.Cache func (c *c) SetToken(domain string, uri, token string) error { - namespace, name, err := clientcache.SplitMetaNamespaceKey(c.config.AcmeTokenConfigMapName) + namespace, name, err := cache.SplitMetaNamespaceKey(c.config.AcmeTokenConfigMapName) if err != nil { return err } diff --git a/pkg/controller/services/services.go b/pkg/controller/services/services.go index 5effd92b6..90b61d1e1 100644 --- a/pkg/controller/services/services.go +++ b/pkg/controller/services/services.go @@ -98,7 +98,10 @@ func (s *Services) setup(ctx context.Context) error { } tracker := tracker.NewTracker() metrics := createMetrics(cfg.BucketsResponseTime) - svcleader := initSvcLeader(ctx) + svcleader, err := initSvcLeader(ctx, cfg) + if err != nil { + return err + } svchealthz := initSvcHealthz(ctx, cfg, metrics, s.acmeExternalCallCheck) svcstatus := initSvcStatusUpdater(ctx, s.Client) cache := createCacheFacade(ctx, s.Client, cfg, tracker, sslCerts, dynConfig, svcstatus.update) @@ -187,42 +190,42 @@ func (s *Services) withManager(mgr ctrl.Manager) error { if err := mgr.Add(s.svcleader); err != nil { return err } - if err := mgr.Add(ctrlutils.DelayedShutdown(s.svcstatus)); err != nil { + if err := s.svcleader.addRunnable(ctrlutils.DelayedShutdown(s.svcstatus)); err != nil { return err } if s.svcstatusing != nil { - if err := mgr.Add(s.svcstatusing); err != nil { + if err := s.svcleader.addRunnable(s.svcstatusing); err != nil { + return err + } + } + if s.acmeClient != nil { + if err := s.svcleader.addRunnable(s.acmeClient); err != nil { return err } } } if s.reloadQueue != nil { - if err := mgr.Add(ctrlutils.DistributedService(&svcReloadQueue{ + if err := mgr.Add(&svcReloadQueue{ queue: s.reloadQueue, - })); err != nil { + }); err != nil { return err } } if s.Config.StatsCollectProcPeriod > 0 { - if err := mgr.Add(ctrlutils.DistributedService(&svcCalcIdle{ + if err := mgr.Add(&svcCalcIdle{ instance: s.instance, period: s.Config.StatsCollectProcPeriod, - })); err != nil { - return err - } - } - if s.acmeClient != nil { - if err := mgr.Add(s.acmeClient); err != nil { + }); err != nil { return err } } if s.acmeServer != nil { - if err := mgr.Add(ctrlutils.DistributedService(s.acmeServer)); err != nil { + if err := mgr.Add(s.acmeServer); err != nil { return err } } if s.svchealthz != nil { - if err := mgr.Add(ctrlutils.DistributedService(s.svchealthz)); err != nil { + if err := mgr.Add(s.svchealthz); err != nil { return err } } @@ -239,7 +242,7 @@ func (s *Services) acmePeriodicCheck() (count int, err error) { // LeaderChangedSubscriber ... func (s *Services) LeaderChangedSubscriber(f SvcLeaderChangedFnc) { - s.svcleader.addsubscriber(f) + s.svcleader.addSubscriber(f) } // GetIsValidResource ... @@ -255,7 +258,7 @@ func (s *Services) ReconcileIngress(changed *convtypes.ChangedObjects) { s.log.Info("starting haproxy update", "id", s.updateCount) timer := utils.NewTimer(s.metrics.ControllerProcTime) converters.NewConverter(timer, s.instance.Config(), changed, s.converterOpt).Sync() - if s.svcleader.getIsLeader() { + if s.svcleader.isLeader() { s.instance.AcmeUpdate() } s.instance.HAProxyUpdate(timer) @@ -263,7 +266,7 @@ func (s *Services) ReconcileIngress(changed *convtypes.ChangedObjects) { } func (s *Services) acmeCheck(source string) (count int, err error) { - if !s.svcleader.getIsLeader() { + if !s.svcleader.isLeader() { err = fmt.Errorf("cannot check acme certificates, this controller is not the leader") s.log.Error(err, "error checking acme certificates") return 0, err diff --git a/pkg/controller/services/svcacme.go b/pkg/controller/services/svcacme.go index 2030b50d9..439b2cb79 100644 --- a/pkg/controller/services/svcacme.go +++ b/pkg/controller/services/svcacme.go @@ -111,7 +111,7 @@ func (s *svcAcmeClient) Start(ctx context.Context) error { // implements utils.QueueFacade func (s *svcAcmeClient) Add(item interface{}) { - if s.leader.getIsLeader() { + if s.leader.isLeader() { s.queue.Add(item) } } diff --git a/pkg/controller/services/svcleader.go b/pkg/controller/services/svcleader.go index c17f241c6..2fb6858af 100644 --- a/pkg/controller/services/svcleader.go +++ b/pkg/controller/services/svcleader.go @@ -18,44 +18,138 @@ package services import ( "context" + "fmt" + "time" "github.com/go-logr/logr" + "golang.org/x/sync/errgroup" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + crleaderelection "sigs.k8s.io/controller-runtime/pkg/leaderelection" + "sigs.k8s.io/controller-runtime/pkg/manager" + + "github.com/jcmoraisjr/haproxy-ingress/pkg/controller/config" +) + +const ( + // Default values taken from + // https://github.com/kubernetes/component-base/blob/master/config/v1alpha1/defaults.go + defaultLeaseDuration = 15 * time.Second + defaultRenewDeadline = 10 * time.Second + defaultRetryPeriod = 2 * time.Second ) // SvcLeaderChangedFnc ... type SvcLeaderChangedFnc func(isLeader bool) -func initSvcLeader(ctx context.Context) *svcLeader { - return &svcLeader{ +func initSvcLeader(ctx context.Context, cfg *config.Config) (*svcLeader, error) { + r, err := initRecorderProvider(cfg) + if err != nil { + return nil, err + } + + rl, err := crleaderelection.NewResourceLock(cfg.KubeConfig, r, crleaderelection.Options{ + LeaderElection: cfg.Election, + LeaderElectionID: cfg.ElectionID, + LeaderElectionNamespace: cfg.ElectionNamespace, + LeaderElectionResourceLock: resourcelock.LeasesResourceLock, + }) + if err != nil { + return nil, err + } + + s := &svcLeader{ log: logr.FromContextOrDiscard(ctx).WithName("leader"), } + + if rl != nil { + s.le, err = leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ + Name: cfg.ElectionID, + Lock: rl, + LeaseDuration: defaultLeaseDuration, + RenewDeadline: defaultRenewDeadline, + RetryPeriod: defaultRetryPeriod, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: s.OnStartedLeading, + OnStoppedLeading: s.OnStoppedLeading, + }, + }) + if err != nil { + return nil, err + } + } + + return s, nil } type svcLeader struct { - isLeader bool + le *leaderelection.LeaderElector log logr.Logger + runnables []manager.Runnable + rgroup *errgroup.Group + rcancel context.CancelFunc subscribers []SvcLeaderChangedFnc } -func (s *svcLeader) addsubscriber(f SvcLeaderChangedFnc) { - s.subscribers = append(s.subscribers, f) -} - -func (s *svcLeader) getIsLeader() bool { - return s.isLeader +func (s *svcLeader) Start(ctx context.Context) error { + if s.le != nil { + s.le.Run(ctx) + } + <-ctx.Done() + return nil } -func (s *svcLeader) Start(ctx context.Context) error { +func (s *svcLeader) OnStartedLeading(ctx context.Context) { s.log.Info("leader acquired") - s.isLeader = true + + ctxwg, cancel := context.WithCancel(ctx) + wg, ctxrun := errgroup.WithContext(ctxwg) + for i := range s.runnables { + r := s.runnables[i] + wg.Go(func() error { + return r.Start(ctxrun) + }) + } + s.rgroup = wg + s.rcancel = cancel + for _, f := range s.subscribers { go f(true) } - <-ctx.Done() - s.isLeader = false +} + +func (s *svcLeader) OnStoppedLeading() { for _, f := range s.subscribers { go f(false) } + + if s.rcancel != nil && s.rgroup != nil { + s.rcancel() + err := s.rgroup.Wait() + if err != nil { + s.log.Error(err, "error stop leading") + } + } else { + s.log.Error(fmt.Errorf("cannot stop services, leader was not taken"), "error stop leading") + } + s.rcancel = nil + s.rgroup = nil + s.log.Info("stopped leading") +} + +func (s *svcLeader) addRunnable(r manager.Runnable) error { + s.runnables = append(s.runnables, r) return nil } + +func (s *svcLeader) addSubscriber(f SvcLeaderChangedFnc) { + s.subscribers = append(s.subscribers, f) +} + +func (s *svcLeader) isLeader() bool { + if s.le != nil { + return s.le.IsLeader() + } + return false +} diff --git a/pkg/controller/services/svcleaderrecorder.go b/pkg/controller/services/svcleaderrecorder.go new file mode 100644 index 000000000..6da59190d --- /dev/null +++ b/pkg/controller/services/svcleaderrecorder.go @@ -0,0 +1,66 @@ +/* +Copyright 2024 The HAProxy Ingress Controller Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package services + +import ( + "os" + + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes/scheme" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + + "github.com/jcmoraisjr/haproxy-ingress/pkg/controller/config" +) + +func initRecorderProvider(cfg *config.Config) (*recorderProvider, error) { + config := rest.CopyConfig(cfg.KubeConfig) + + cli, err := corev1client.NewForConfig(config) + if err != nil { + return nil, err + } + + hostname, err := os.Hostname() + if err != nil { + return nil, err + } + + return &recorderProvider{ + cli: cli, + namespace: cfg.PodNamespace, + hostname: hostname, + electionID: cfg.ElectionID, + }, nil +} + +type recorderProvider struct { + cli *corev1client.CoreV1Client + namespace string + hostname string + electionID string +} + +func (r *recorderProvider) GetEventRecorderFor(name string) record.EventRecorder { + broadcaster := record.NewBroadcaster() + _ = broadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: r.cli.Events(r.namespace)}) + return broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{ + Component: r.electionID + "_" + name, + Host: r.hostname, + }) +} diff --git a/pkg/controller/services/svcstatusing.go b/pkg/controller/services/svcstatusing.go index da01f8db1..0186faf9d 100644 --- a/pkg/controller/services/svcstatusing.go +++ b/pkg/controller/services/svcstatusing.go @@ -70,7 +70,7 @@ func (s *svcStatusIng) Start(ctx context.Context) error { return nil } -func (s *svcStatusIng) update(ctx context.Context, lb []networking.IngressLoadBalancerIngress) error { +func (s *svcStatusIng) update(_ context.Context, lb []networking.IngressLoadBalancerIngress) error { ingList, err := s.cache.GetIngressList() if err != nil { return err diff --git a/pkg/controller/utils/delayedshutdown.go b/pkg/controller/utils/delayedshutdown.go index dbdf80334..45a536495 100644 --- a/pkg/controller/utils/delayedshutdown.go +++ b/pkg/controller/utils/delayedshutdown.go @@ -19,12 +19,14 @@ package utils import ( "context" "time" + + "sigs.k8s.io/controller-runtime/pkg/manager" ) // DelayedShutdown adds a delay in the shutdown event, so the // runnable has some time to listen to events from other runnables // triggered during their shutdown -func DelayedShutdown(svc DelayedService) DSD { +func DelayedShutdown(svc DelayedService) manager.Runnable { return &dsd{ svc: svc, startingDelay: 200 * time.Millisecond, @@ -32,14 +34,9 @@ func DelayedShutdown(svc DelayedService) DSD { } } -// DSD ... -type DSD interface { - Start(context.Context) error -} - // DelayedService ... type DelayedService interface { - Start(context.Context) error + manager.Runnable CanShutdown() bool } diff --git a/pkg/controller/utils/distributed.go b/pkg/controller/utils/distributed.go deleted file mode 100644 index 0569c1f92..000000000 --- a/pkg/controller/utils/distributed.go +++ /dev/null @@ -1,48 +0,0 @@ -/* -Copyright 2022 The HAProxy Ingress Controller Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package utils - -import ( - "context" - - "sigs.k8s.io/controller-runtime/pkg/manager" -) - -// DistributedService is a wrapper that configures a Runnable to run on every -// instance of a controller, despite being the leader or not. -func DistributedService(r manager.Runnable) DS { - return &ds{r} -} - -// DS ... -type DS interface { - Start(context.Context) error -} - -type ds struct { - r manager.Runnable -} - -// Start ... -func (d *ds) Start(ctx context.Context) error { - return d.r.Start(ctx) -} - -// NeedLeaderElection ... -func (d *ds) NeedLeaderElection() bool { - return false -} diff --git a/pkg/haproxy/dynupdate_test.go b/pkg/haproxy/dynupdate_test.go index 540f6a5ef..97378905e 100644 --- a/pkg/haproxy/dynupdate_test.go +++ b/pkg/haproxy/dynupdate_test.go @@ -1000,7 +1000,7 @@ INFO-V(2) need to reload due to config changes: [hosts] `, }, } - readFile = func(filename string) ([]byte, error) { + readFile = func(_ string) ([]byte, error) { return []byte(""), nil } for i, test := range testCases { diff --git a/pkg/haproxy/instance.go b/pkg/haproxy/instance.go index 2c956acfa..6dfceb2cb 100644 --- a/pkg/haproxy/instance.go +++ b/pkg/haproxy/instance.go @@ -116,11 +116,11 @@ func (i *instance) AcmeCheck(source string) (int, error) { return count, fmt.Errorf("controller wasn't started yet") } if i.options.AcmeQueue == nil { - return count, fmt.Errorf("Acme queue wasn't configured") + return count, fmt.Errorf("acme queue wasn't configured") } hasAccount := i.acmeEnsureConfig(i.config.AcmeData()) if !hasAccount { - return count, fmt.Errorf("Cannot create or retrieve the acme client account") + return count, fmt.Errorf("cannot create or retrieve the acme client account") } le := i.options.LeaderElector if !le.IsLeader() { diff --git a/pkg/haproxy/instance_test.go b/pkg/haproxy/instance_test.go index bf053e7b9..c965098ae 100644 --- a/pkg/haproxy/instance_test.go +++ b/pkg/haproxy/instance_test.go @@ -26,7 +26,6 @@ import ( "testing" "github.com/kylelemons/godebug/diff" - yaml "gopkg.in/yaml.v2" hatypes "github.com/jcmoraisjr/haproxy-ingress/pkg/haproxy/types" "github.com/jcmoraisjr/haproxy-ingress/pkg/types/helper_test" @@ -5624,11 +5623,6 @@ var defaultLogging = ` INFO (test) reload was skipped INFO haproxy successfully reloaded (embedded daemon)` -func _yamlMarshal(in interface{}) string { - out, _ := yaml.Marshal(in) - return string(out) -} - func (c *testConfig) Update() { timer := utils.NewTimer(nil) c.instance.AcmeUpdate() diff --git a/tests/framework/framework.go b/tests/framework/framework.go index 3a4acd0b6..d586b641d 100644 --- a/tests/framework/framework.go +++ b/tests/framework/framework.go @@ -114,7 +114,7 @@ func startApiserver(t *testing.T) *rest.Config { require.NoError(t, err) t.Cleanup(func() { - require.NoError(t, e.Stop()) + assert.NoError(t, e.Stop()) }) return config } @@ -154,12 +154,11 @@ func startController(ctx context.Context, t *testing.T, config *rest.Config, cli require.NoError(t, err) opt := ctrlconfig.NewOptions() - opt.UpdateStatus = false - opt.WatchGateway = false opt.MasterWorker = true opt.LocalFSPrefix = "/tmp/haproxy-ingress" opt.PublishAddress = "127.0.0.1" opt.ConfigMap = "default/ingress-controller" + os.Setenv("POD_NAMESPACE", "default") ctx, cancel := context.WithCancel(ctx) cfg, err := ctrlconfig.CreateWithConfig(ctx, config, opt) require.NoError(t, err) @@ -167,7 +166,7 @@ func startController(ctx context.Context, t *testing.T, config *rest.Config, cli done := make(chan bool) go func() { err := launch.Run(cfg) - require.NoError(t, err) + assert.NoError(t, err) done <- true }() @@ -201,7 +200,9 @@ func (f *framework) Request(ctx context.Context, t *testing.T, method, host, pat if opt.ExpectResponseCode > 0 { require.EventuallyWithT(t, func(collect *assert.CollectT) { res, err = cli.Do(req) - require.NoError(collect, err) + if !assert.NoError(collect, err) { + return + } assert.Equal(collect, opt.ExpectResponseCode, res.StatusCode) }, 5*time.Second, time.Second) } else { @@ -234,6 +235,10 @@ func (f *framework) Request(ctx context.Context, t *testing.T, method, host, pat } } +func (f *framework) Client() client.WithWatch { + return f.cli +} + func (f *framework) CreateService(ctx context.Context, t *testing.T, serverPort int32, o ...options.Object) *corev1.Service { opt := options.ParseObjectOptions(o...) data := ` @@ -265,7 +270,7 @@ spec: svc.Namespace = "default" svc.Name = name err := f.cli.Delete(ctx, &svc) - require.NoError(t, client.IgnoreNotFound(err)) + assert.NoError(t, client.IgnoreNotFound(err)) }) return svc } @@ -301,7 +306,7 @@ subsets: ep.Namespace = "default" ep.Name = name err := f.cli.Delete(ctx, &ep) - require.NoError(t, client.IgnoreNotFound(err)) + assert.NoError(t, client.IgnoreNotFound(err)) }) return ep } @@ -362,7 +367,7 @@ spec: ing.Namespace = "default" ing.Name = name err := f.cli.Delete(ctx, &ing) - require.NoError(t, client.IgnoreNotFound(err)) + assert.NoError(t, client.IgnoreNotFound(err)) }) return ing } @@ -383,7 +388,7 @@ func (f *framework) CreateHTTPServer(ctx context.Context, t *testing.T) int32 { } } _, err := w.Write([]byte(content)) - require.NoError(t, err) + assert.NoError(t, err) }) serverPort := int32(32768 + rand.Intn(32767)) @@ -401,7 +406,7 @@ func (f *framework) CreateHTTPServer(ctx context.Context, t *testing.T) int32 { t.Cleanup(func() { err := server.Shutdown(context.Background()) - require.NoError(t, err) + assert.NoError(t, err) <-done }) return serverPort diff --git a/tests/integration/integration_test.go b/tests/integration/integration_test.go index 1b0d38000..ff03120d6 100644 --- a/tests/integration/integration_test.go +++ b/tests/integration/integration_test.go @@ -5,8 +5,10 @@ import ( "fmt" "net/http" "testing" + "time" "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" ingtypes "github.com/jcmoraisjr/haproxy-ingress/pkg/converters/ingress/types" "github.com/jcmoraisjr/haproxy-ingress/tests/framework" @@ -50,4 +52,24 @@ func TestIntegration(t *testing.T) { assert.False(t, res.EchoResponse) assert.Equal(t, fmt.Sprintf("https://%s/", f.Host(ing)), res.HTTPResponse.Header.Get("location")) }) + + t.Run("should take leader", func(t *testing.T) { + t.Parallel() + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + events := corev1.EventList{} + err := f.Client().List(ctx, &events) + if !assert.NoError(collect, err) { + return + } + for _, event := range events.Items { + lease := event.InvolvedObject + t.Logf("lease: %+v message: %s", lease, event.Message) + if lease.Kind == "Lease" && lease.Namespace == "default" && lease.Name == "class-haproxy.haproxy-ingress.github.io" { + assert.Regexp(collect, `became leader$`, event.Message) + return + } + } + assert.Fail(collect, "lease event not found") + }, 10*time.Second, time.Second) + }) }