Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move leader election to a self managed service #1087

Merged
merged 1 commit into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion docs/content/en/docs/configuration/command-line.md
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,11 @@ The ID to be used for electing ingress controller leader. A leader needs to be e
* Embedded Acme signer, see [acme](#acme)
* Gateway API, see [`--watch-gateway`](#watch-gateway)

Defaults to `fc5ae9f3.haproxy-ingress.github.io` if not configured.
Election ID configuration has no efect if none of Ingress Status update, Embedded Acme signer, or Gateway API are enabled.

Since v0.15 a `%s` placeholder is used to define where the IngressClass value should be added to the election ID. Up to v0.14 the IngressClass was concatenated in the end of the provided value to compose the real election ID value. Ingress class is added to the election ID name to avoid conflict when two or more HAProxy Ingress controllers are running in the same cluster.

Election ID defaults to `class-%s.haproxy-ingress.github.io` if not configured, which is rendered to `class-haproxy.haproxy-ingress.github.io` if the IngressClass is not changed from the default value.

---

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/stretchr/testify v1.8.4
go.uber.org/zap v1.26.0
golang.org/x/crypto v0.21.0
golang.org/x/sync v0.5.0
gopkg.in/go-playground/pool.v3 v3.1.1
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.29.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
16 changes: 14 additions & 2 deletions pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,24 @@ func CreateWithConfig(ctx context.Context, restConfig *rest.Config, opt *Options
podNamespace := os.Getenv("POD_NAMESPACE")
podName := os.Getenv("POD_NAME")

// we could `|| hasGateway[version...]` instead of `|| *watchGateway` here,
// we could `|| hasGateway[version...]` instead of `|| opt.WatchGateway` here,
// but we're choosing a consistent startup behavior despite of the cluster configuration.
election := opt.UpdateStatus || opt.AcmeServer || opt.WatchGateway
if election && podNamespace == "" {
return nil, fmt.Errorf("POD_NAMESPACE envvar should be configured when --update-status=true, --acme-server=true, or --watch-gateway=true")
}
if election && opt.IngressClass == "" {
return nil, fmt.Errorf("--ingress-class should not be empty when --update-status=true, --acme-server=true, or --watch-gateway=true")
}
var electionID string
if election {
if strings.Contains(opt.ElectionID, "%s") {
electionID = fmt.Sprintf(opt.ElectionID, opt.IngressClass)
} else {
// backward compatibility behavior
electionID = opt.ElectionID + "-" + opt.IngressClass
}
}

if opt.UpdateStatus && podName == "" && opt.PublishSvc == "" && len(publishAddressHostnames)+len(publishAddressIPs) == 0 {
return nil, fmt.Errorf("one of --publish-service, --publish-address or POD_NAME envvar should be configured when --update-status=true")
Expand Down Expand Up @@ -449,7 +461,7 @@ func CreateWithConfig(ctx context.Context, restConfig *rest.Config, opt *Options
DisableExternalName: opt.DisableExternalName,
DisableKeywords: disableKeywords,
Election: election,
ElectionID: opt.ElectionID,
ElectionID: electionID,
ElectionNamespace: podNamespace,
EnableEndpointSliceAPI: opt.EnableEndpointSlicesAPI,
ForceNamespaceIsolation: opt.ForceIsolation,
Expand Down
7 changes: 5 additions & 2 deletions pkg/controller/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func NewOptions() *Options {
Profiling: true,
VerifyHostname: true,
UpdateStatus: true,
ElectionID: "fc5ae9f3.haproxy-ingress.github.io",
ElectionID: "class-%s.haproxy-ingress.github.io",
ShutdownTimeout: 25 * time.Second,
UpdateStatusOnShutdown: true,
LogLevel: 2,
Expand Down Expand Up @@ -345,7 +345,10 @@ func (o *Options) AddFlags(fs *flag.FlagSet) {
)

fs.StringVar(&o.ElectionID, "election-id", o.ElectionID, ""+
"Election id to be used for status update and certificate sign.",
"Election ID to be used for status update and certificate signing. An optional "+
"%s is used as a placeholder for the IngressClass name, and if not provided, the "+
"IngressClass is concatenated in the end of the provided value to compose the "+
"real Election ID, for backward compatibility.",
)

fs.StringVar(&o.WaitBeforeShutdown, "wait-before-shutdown", o.WaitBeforeShutdown, ""+
Expand Down
3 changes: 0 additions & 3 deletions pkg/controller/launch/launch.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ func Run(cfg *config.Config) error {
mgr, err := ctrl.NewManager(cfg.KubeConfig, ctrl.Options{
Logger: rootLogger.WithName("manager"),
Scheme: cfg.Scheme,
LeaderElection: cfg.Election,
LeaderElectionID: cfg.ElectionID,
LeaderElectionNamespace: cfg.ElectionNamespace,
GracefulShutdownTimeout: cfg.ShutdownTimeout,
HealthProbeBindAddress: "0",
Metrics: server.Options{
Expand Down
9 changes: 4 additions & 5 deletions pkg/controller/reconciler/ratelimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ import (
"github.com/jcmoraisjr/haproxy-ingress/pkg/controller/config"
)

func createRateLimiter(config *config.Config) ratelimiter.RateLimiter {
func createRateLimiter(cfg *config.Config) ratelimiter.RateLimiter {
return &rateLimiter{
mu: sync.Mutex{},
delta: time.Duration(float64(time.Second) / config.RateLimitUpdate),
wait: config.WaitBeforeUpdate,
delta: time.Duration(float64(time.Second) / cfg.RateLimitUpdate),
wait: cfg.WaitBeforeUpdate,
}
}

Expand All @@ -40,7 +39,7 @@ type rateLimiter struct {
last time.Time
}

func (r *rateLimiter) When(item interface{}) time.Duration {
func (r *rateLimiter) When(_ interface{}) time.Duration {
r.mu.Lock()
defer r.mu.Unlock()
now := time.Now()
Expand Down
5 changes: 2 additions & 3 deletions pkg/controller/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (

"github.com/jcmoraisjr/haproxy-ingress/pkg/controller/config"
"github.com/jcmoraisjr/haproxy-ingress/pkg/controller/services"
ctrlutils "github.com/jcmoraisjr/haproxy-ingress/pkg/controller/utils"
)

// IngressReconciler ...
Expand All @@ -48,7 +47,7 @@ func (r *IngressReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}

func (r *IngressReconciler) leaderChanged(isLeader bool) {
if isLeader {
if isLeader && r.watchers.running() {
changed := r.watchers.getChangedObjects()
changed.NeedFullSync = true
r.Services.ReconcileIngress(changed)
Expand Down Expand Up @@ -78,5 +77,5 @@ func (r *IngressReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manag
}
}
r.Services.LeaderChangedSubscriber(r.leaderChanged)
return mgr.Add(ctrlutils.DistributedService(c))
return mgr.Add(c)
}
10 changes: 7 additions & 3 deletions pkg/controller/reconciler/watchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import (

func createWatchers(ctx context.Context, cfg *config.Config, val services.IsValidResource) *watchers {
w := &watchers{
mu: sync.Mutex{},
log: logr.FromContextOrDiscard(ctx).WithName("watchers"),
cfg: cfg,
val: val,
Expand Down Expand Up @@ -104,6 +103,12 @@ func (w *watchers) initCh() {
w.ch.Links = types.TrackingLinks{}
}

func (w *watchers) running() bool {
w.mu.Lock()
defer w.mu.Unlock()
return w.run
}

func (w *watchers) handlersCore() []*hdlr {
cmChange := func(o client.Object) {
cm := o.(*api.ConfigMap)
Expand Down Expand Up @@ -487,8 +492,7 @@ func (h *hdlr) compose(ev string, obj client.Object) {
} else {
fullname = obj.GetName()
}
ns := obj.GetNamespace()
if ns != "" {
if ns := obj.GetNamespace(); ns != "" {
fullname = ns + "/" + fullname
}
ch := h.w.ch
Expand Down
11 changes: 5 additions & 6 deletions pkg/controller/services/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
clientcache "k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
Expand Down Expand Up @@ -69,11 +68,11 @@ type c struct {
status svcStatusUpdateFnc
}

var errGatewayA2Disabled = fmt.Errorf("Gateway API v1alpha2 wasn't initialized")
var errGatewayB1Disabled = fmt.Errorf("Gateway API v1beta1 wasn't initialized")
var errGatewayA2Disabled = fmt.Errorf("gateway API v1alpha2 wasn't initialized")
var errGatewayB1Disabled = fmt.Errorf("gateway API v1beta1 wasn't initialized")

func (c *c) get(key string, obj client.Object) error {
ns, n, err := clientcache.SplitMetaNamespaceKey(key)
ns, n, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
Expand All @@ -93,7 +92,7 @@ func (c *c) createOrUpdate(obj client.Object) error {
}

func buildResourceName(defaultNamespace, kind, resourceName string, allowCrossNamespace bool) (string, string, error) {
ns, name, err := clientcache.SplitMetaNamespaceKey(resourceName)
ns, name, err := cache.SplitMetaNamespaceKey(resourceName)
if err != nil {
return "", "", err
}
Expand Down Expand Up @@ -641,7 +640,7 @@ func (c *c) GetKey() (crypto.Signer, error) {

// implements acme.Cache
func (c *c) SetToken(domain string, uri, token string) error {
namespace, name, err := clientcache.SplitMetaNamespaceKey(c.config.AcmeTokenConfigMapName)
namespace, name, err := cache.SplitMetaNamespaceKey(c.config.AcmeTokenConfigMapName)
if err != nil {
return err
}
Expand Down
37 changes: 20 additions & 17 deletions pkg/controller/services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ func (s *Services) setup(ctx context.Context) error {
}
tracker := tracker.NewTracker()
metrics := createMetrics(cfg.BucketsResponseTime)
svcleader := initSvcLeader(ctx)
svcleader, err := initSvcLeader(ctx, cfg)
if err != nil {
return err
}
svchealthz := initSvcHealthz(ctx, cfg, metrics, s.acmeExternalCallCheck)
svcstatus := initSvcStatusUpdater(ctx, s.Client)
cache := createCacheFacade(ctx, s.Client, cfg, tracker, sslCerts, dynConfig, svcstatus.update)
Expand Down Expand Up @@ -187,42 +190,42 @@ func (s *Services) withManager(mgr ctrl.Manager) error {
if err := mgr.Add(s.svcleader); err != nil {
return err
}
if err := mgr.Add(ctrlutils.DelayedShutdown(s.svcstatus)); err != nil {
if err := s.svcleader.addRunnable(ctrlutils.DelayedShutdown(s.svcstatus)); err != nil {
return err
}
if s.svcstatusing != nil {
if err := mgr.Add(s.svcstatusing); err != nil {
if err := s.svcleader.addRunnable(s.svcstatusing); err != nil {
return err
}
}
if s.acmeClient != nil {
if err := s.svcleader.addRunnable(s.acmeClient); err != nil {
return err
}
}
}
if s.reloadQueue != nil {
if err := mgr.Add(ctrlutils.DistributedService(&svcReloadQueue{
if err := mgr.Add(&svcReloadQueue{
queue: s.reloadQueue,
})); err != nil {
}); err != nil {
return err
}
}
if s.Config.StatsCollectProcPeriod > 0 {
if err := mgr.Add(ctrlutils.DistributedService(&svcCalcIdle{
if err := mgr.Add(&svcCalcIdle{
instance: s.instance,
period: s.Config.StatsCollectProcPeriod,
})); err != nil {
return err
}
}
if s.acmeClient != nil {
if err := mgr.Add(s.acmeClient); err != nil {
}); err != nil {
return err
}
}
if s.acmeServer != nil {
if err := mgr.Add(ctrlutils.DistributedService(s.acmeServer)); err != nil {
if err := mgr.Add(s.acmeServer); err != nil {
return err
}
}
if s.svchealthz != nil {
if err := mgr.Add(ctrlutils.DistributedService(s.svchealthz)); err != nil {
if err := mgr.Add(s.svchealthz); err != nil {
return err
}
}
Expand All @@ -239,7 +242,7 @@ func (s *Services) acmePeriodicCheck() (count int, err error) {

// LeaderChangedSubscriber ...
func (s *Services) LeaderChangedSubscriber(f SvcLeaderChangedFnc) {
s.svcleader.addsubscriber(f)
s.svcleader.addSubscriber(f)
}

// GetIsValidResource ...
Expand All @@ -255,15 +258,15 @@ func (s *Services) ReconcileIngress(changed *convtypes.ChangedObjects) {
s.log.Info("starting haproxy update", "id", s.updateCount)
timer := utils.NewTimer(s.metrics.ControllerProcTime)
converters.NewConverter(timer, s.instance.Config(), changed, s.converterOpt).Sync()
if s.svcleader.getIsLeader() {
if s.svcleader.isLeader() {
s.instance.AcmeUpdate()
}
s.instance.HAProxyUpdate(timer)
s.log.WithValues("id", s.updateCount).WithValues(timer.AsValues("total")...).Info("finish haproxy update")
}

func (s *Services) acmeCheck(source string) (count int, err error) {
if !s.svcleader.getIsLeader() {
if !s.svcleader.isLeader() {
err = fmt.Errorf("cannot check acme certificates, this controller is not the leader")
s.log.Error(err, "error checking acme certificates")
return 0, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/services/svcacme.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (s *svcAcmeClient) Start(ctx context.Context) error {

// implements utils.QueueFacade
func (s *svcAcmeClient) Add(item interface{}) {
if s.leader.getIsLeader() {
if s.leader.isLeader() {
s.queue.Add(item)
}
}
Expand Down
Loading
Loading