Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Status update via merge-patch strategy #1091

Merged
merged 2 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 12 additions & 18 deletions pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
crcache "sigs.k8s.io/controller-runtime/pkg/cache"
gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
gwapiversioned "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"
Expand Down Expand Up @@ -218,7 +217,7 @@ func CreateWithConfig(ctx context.Context, restConfig *rest.Config, opt *Options
configLog.Info("watching endpointslices - --enable-endpointslices-api is true")
}

if opt.PublishSvc != "" && opt.PublishAddress != "" {
if opt.PublishService != "" && opt.PublishAddress != "" {
return nil, fmt.Errorf("configure only one of --publish-service or --publish-address")
}

Expand Down Expand Up @@ -256,7 +255,7 @@ func CreateWithConfig(ctx context.Context, restConfig *rest.Config, opt *Options
}
}

if opt.UpdateStatus && podName == "" && opt.PublishSvc == "" && len(publishAddressHostnames)+len(publishAddressIPs) == 0 {
if opt.UpdateStatus && podName == "" && opt.PublishService == "" && len(publishAddressHostnames)+len(publishAddressIPs) == 0 {
return nil, fmt.Errorf("one of --publish-service, --publish-address or POD_NAME envvar should be configured when --update-status=true")
}

Expand Down Expand Up @@ -317,34 +316,29 @@ func CreateWithConfig(ctx context.Context, restConfig *rest.Config, opt *Options
configLog.Info("using default backend", "service", opt.DefaultSvc)
}

if opt.PublishSvc != "" {
ns, name, err := cache.SplitMetaNamespaceKey(opt.PublishSvc)
if svc := opt.PublishService; svc != "" {
ns, name, err := cache.SplitMetaNamespaceKey(svc)
if err != nil {
return nil, fmt.Errorf("invalid service format: %w", err)
}
svc, err := client.CoreV1().Services(ns).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error getting information about service '%s': %w", opt.PublishSvc, err)
return nil, fmt.Errorf("error getting information about service '%s': %w", svc, err)
}
if len(svc.Status.LoadBalancer.Ingress) == 0 {
if len(svc.Spec.ExternalIPs) == 0 {
return nil, fmt.Errorf("service '%s' does not (yet) have ingress points", opt.PublishSvc)
return nil, fmt.Errorf("service '%s' does not (yet) have ingress points", svc)
}
configLog.Info("service validated as assigned with externalIP", "service", opt.PublishSvc)
configLog.Info("service validated as assigned with externalIP", "service", svc)
} else {
configLog.Info("service validated as source of Ingress status", "service", opt.PublishSvc)
configLog.Info("service validated as source of Ingress status", "service", svc)
}
}

var watchNamespaces map[string]crcache.Config
if opt.WatchNamespace != "" {
_, err := client.NetworkingV1().Ingresses(opt.WatchNamespace).List(ctx, metav1.ListOptions{Limit: 1})
if err != nil {
return nil, fmt.Errorf("no watchNamespace with name '%s' found: %w", opt.WatchNamespace, err)
}
watchNamespaces = make(map[string]crcache.Config)
for _, ns := range strings.Split(opt.WatchNamespace, ",") {
watchNamespaces[ns] = crcache.Config{}
return nil, fmt.Errorf("no namespace with name '%s' found: %w", opt.WatchNamespace, err)
}
} else {
_, err := client.CoreV1().Services("default").List(ctx, metav1.ListOptions{})
Expand Down Expand Up @@ -481,7 +475,7 @@ func CreateWithConfig(ctx context.Context, restConfig *rest.Config, opt *Options
Profiling: opt.Profiling,
PublishAddressHostnames: publishAddressHostnames,
PublishAddressIPs: publishAddressIPs,
PublishService: opt.PublishSvc,
PublishService: opt.PublishService,
RateLimitUpdate: opt.RateLimitUpdate,
ReadyzURL: opt.ReadyzURL,
ReloadInterval: opt.ReloadInterval,
Expand All @@ -503,7 +497,7 @@ func CreateWithConfig(ctx context.Context, restConfig *rest.Config, opt *Options
VersionInfo: versionInfo,
WaitBeforeUpdate: opt.WaitBeforeUpdate,
WatchIngressWithoutClass: opt.WatchIngressWithoutClass,
WatchNamespaces: watchNamespaces,
WatchNamespace: opt.WatchNamespace,
}, nil
}

Expand Down Expand Up @@ -681,5 +675,5 @@ type Config struct {
VersionInfo version.Info
WaitBeforeUpdate time.Duration
WatchIngressWithoutClass bool
WatchNamespaces map[string]crcache.Config
WatchNamespace string
}
7 changes: 3 additions & 4 deletions pkg/controller/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type Options struct {
AcmeTokenConfigMapName string
AcmeTrackTLSAnn bool
BucketsResponseTime []float64
PublishSvc string
PublishService string
PublishAddress string
TCPConfigMapName string
AnnPrefix string
Expand Down Expand Up @@ -249,7 +249,7 @@ func (o *Options) AddFlags(fs *flag.FlagSet) {
"the haproxy's admin socket. The response time unit is in seconds.",
)

fs.StringVar(&o.PublishSvc, "publish-service", o.PublishSvc, ""+
fs.StringVar(&o.PublishService, "publish-service", o.PublishService, ""+
"Service fronting the ingress controllers. Takes the form namespace/name. The "+
"controller will set the endpoint records on the ingress objects to reflect "+
"those on the service.",
Expand Down Expand Up @@ -297,8 +297,7 @@ func (o *Options) AddFlags(fs *flag.FlagSet) {
)

fs.StringVar(&o.WatchNamespace, "watch-namespace", o.WatchNamespace, ""+
"Comma-separated list of namespaces to watch for Ingress. Default is to watch "+
"all namespaces",
"Namespace to watch for Ingress. Default is to watch all namespaces.",
)

fs.DurationVar(&o.StatsCollectProcPeriod, "stats-collect-processing-period", o.StatsCollectProcPeriod, ""+
Expand Down
6 changes: 5 additions & 1 deletion pkg/controller/launch/launch.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func Run(cfg *config.Config) error {
ctx := cfg.RootContext

launchLog.Info("configuring manager")
var defaultNamespaces map[string]cache.Config
if cfg.WatchNamespace != "" {
defaultNamespaces = map[string]cache.Config{cfg.WatchNamespace: {}}
}
mgr, err := ctrl.NewManager(cfg.KubeConfig, ctrl.Options{
Logger: rootLogger.WithName("manager"),
Scheme: cfg.Scheme,
Expand All @@ -45,7 +49,7 @@ func Run(cfg *config.Config) error {
},
Cache: cache.Options{
SyncPeriod: cfg.ResyncPeriod,
DefaultNamespaces: cfg.WatchNamespaces,
DefaultNamespaces: defaultNamespaces,
},
})
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ type IngressReconciler struct {
// Reconcile ...
func (r *IngressReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
changed := r.watchers.getChangedObjects()
r.Services.ReconcileIngress(changed)
r.Services.ReconcileIngress(ctx, changed)
return ctrl.Result{}, nil
}

func (r *IngressReconciler) leaderChanged(isLeader bool) {
func (r *IngressReconciler) leaderChanged(ctx context.Context, isLeader bool) {
if isLeader && r.watchers.running() {
changed := r.watchers.getChangedObjects()
changed.NeedFullSync = true
r.Services.ReconcileIngress(changed)
r.Services.ReconcileIngress(ctx, changed)
}
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/controller/reconciler/watchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,13 @@ func (w *watchers) handlersCore() []*hdlr {
predicate.Or(
predicate.AnnotationChangedPredicate{},
predicate.GenerationChangedPredicate{},
predicate.NewPredicateFuncs(func(object client.Object) bool {
if w.cfg.PublishService == "" {
return false
}
svc := object.(*api.Service)
return svc.Namespace+"/"+svc.Name == w.cfg.PublishService
}),
),
},
},
Expand Down
10 changes: 4 additions & 6 deletions pkg/controller/services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,7 @@ func (s *Services) setup(ctx context.Context) error {
svchealthz := initSvcHealthz(ctx, cfg, metrics, s.acmeExternalCallCheck)
svcstatus := initSvcStatusUpdater(ctx, s.Client)
cache := createCacheFacade(ctx, s.Client, cfg, tracker, sslCerts, dynConfig, svcstatus.update)
var svcstatusing *svcStatusIng
if cfg.UpdateStatus {
svcstatusing = initSvcStatusIng(ctx, cfg, s.Client, cache, svcstatus.update)
}
svcstatusing := initSvcStatusIng(ctx, cfg, s.Client, cache, svcstatus.update)
var acmeClient *svcAcmeClient
var acmeServer *svcAcmeServer
var acmeSigner acme.Signer
Expand Down Expand Up @@ -193,7 +190,7 @@ func (s *Services) withManager(mgr ctrl.Manager) error {
if err := s.svcleader.addRunnable(ctrlutils.DelayedShutdown(s.svcstatus)); err != nil {
return err
}
if s.svcstatusing != nil {
if s.Config.UpdateStatus {
if err := s.svcleader.addRunnable(s.svcstatusing); err != nil {
return err
}
Expand Down Expand Up @@ -251,7 +248,7 @@ func (s *Services) GetIsValidResource() IsValidResource {
}

// ReconcileIngress ...
func (s *Services) ReconcileIngress(changed *convtypes.ChangedObjects) {
func (s *Services) ReconcileIngress(ctx context.Context, changed *convtypes.ChangedObjects) {
s.modelMutex.Lock()
defer s.modelMutex.Unlock()
s.updateCount++
Expand All @@ -262,6 +259,7 @@ func (s *Services) ReconcileIngress(changed *convtypes.ChangedObjects) {
s.instance.AcmeUpdate()
}
s.instance.HAProxyUpdate(timer)
s.svcstatusing.changed(ctx, changed)
s.log.WithValues("id", s.updateCount).WithValues(timer.AsValues("total")...).Info("finish haproxy update")
}

Expand Down
8 changes: 5 additions & 3 deletions pkg/controller/services/svcleader.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const (
)

// SvcLeaderChangedFnc ...
type SvcLeaderChangedFnc func(isLeader bool)
type SvcLeaderChangedFnc func(ctx context.Context, isLeader bool)

func initSvcLeader(ctx context.Context, cfg *config.Config) (*svcLeader, error) {
r, err := initRecorderProvider(cfg)
Expand All @@ -59,6 +59,7 @@ func initSvcLeader(ctx context.Context, cfg *config.Config) (*svcLeader, error)
}

s := &svcLeader{
ctx: ctx,
log: logr.FromContextOrDiscard(ctx).WithName("leader"),
}

Expand All @@ -83,6 +84,7 @@ func initSvcLeader(ctx context.Context, cfg *config.Config) (*svcLeader, error)
}

type svcLeader struct {
ctx context.Context
le *leaderelection.LeaderElector
log logr.Logger
runnables []manager.Runnable
Expand Down Expand Up @@ -114,13 +116,13 @@ func (s *svcLeader) OnStartedLeading(ctx context.Context) {
s.rcancel = cancel

for _, f := range s.subscribers {
go f(true)
go f(ctx, true)
}
}

func (s *svcLeader) OnStoppedLeading() {
for _, f := range s.subscribers {
go f(false)
go f(s.ctx, false)
}

if s.rcancel != nil && s.rgroup != nil {
Expand Down
66 changes: 25 additions & 41 deletions pkg/controller/services/svcstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"time"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/jcmoraisjr/haproxy-ingress/pkg/utils"
Expand All @@ -41,15 +40,27 @@ func initSvcStatusUpdater(ctx context.Context, client client.Client) *svcStatusU
}

type svcStatusUpdater struct {
client client.Client
ctx context.Context
isleader bool
log logr.Logger
queue utils.Queue
client client.Client
ctx context.Context
run bool
log logr.Logger
queue utils.Queue
}

func (s *svcStatusUpdater) Start(ctx context.Context) error {
s.ctx = ctx
s.run = true
s.queue.RunWithContext(ctx)
s.run = false
return nil
}

func (s *svcStatusUpdater) CanShutdown() bool {
return s.queue.Len() == 0
}

func (s *svcStatusUpdater) update(obj client.Object) {
if s.isleader {
if s.run {
s.queue.Add(obj)
}
}
Expand All @@ -59,41 +70,14 @@ func (s *svcStatusUpdater) notify(item interface{}) error {
namespace := obj.GetNamespace()
name := obj.GetName()
log := s.log.WithValues("kind", reflect.TypeOf(obj), "namespace", namespace, "name", name)
if err := s.client.Status().Update(s.ctx, obj); err != nil {
// usually `obj` is up to date, but in case of a concurrent
// update, we'll refresh the object into a new instance and
// copy the updated status to it.
typ := reflect.TypeOf(obj)
if typ.Kind() == reflect.Pointer {
typ = typ.Elem()
}
new := reflect.New(typ).Interface().(client.Object)
if err := s.client.Get(s.ctx, types.NamespacedName{Namespace: namespace, Name: name}, new); err != nil {
log.Error(err, "cannot read status")
return err
}
// a reflection trick to copy the updated status from the outdated object to the new updated one
reflect.ValueOf(new).Elem().FieldByName("Status").Set(
reflect.ValueOf(obj).Elem().FieldByName("Status"))
if err := s.client.Status().Update(s.ctx, new); err != nil {
log.Error(err, "cannot update status")
return err
}

from := obj.DeepCopyObject().(client.Object)
reflect.ValueOf(from).Elem().FieldByName("Status").SetZero()
if err := s.client.Status().Patch(s.ctx, obj, client.MergeFrom(from)); err != nil {
log.Error(err, "cannot update status")
return err
}
log.V(1).Info("status updated")
return nil
}

func (s *svcStatusUpdater) Start(ctx context.Context) error {
s.ctx = ctx
s.isleader = true
s.queue.RunWithContext(ctx)
s.isleader = false
// s.ctx wasn't cleaned up here so lazy notifications
// doesn't crashloop due to nil ctx.
log.V(1).Info("status updated")
return nil
}

func (s *svcStatusUpdater) CanShutdown() bool {
return s.queue.Len() == 0
}
Loading
Loading