Skip to content

Commit

Permalink
status update via merge-patch strategy
Browse files Browse the repository at this point in the history
Use merge-patch to update resource status, which simplifies compared
with update, and also does not conflict in the case of concurrent
changes.
  • Loading branch information
jcmoraisjr committed Mar 12, 2024
1 parent 4c0bfe1 commit c109070
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 52 deletions.
66 changes: 25 additions & 41 deletions pkg/controller/services/svcstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"time"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/jcmoraisjr/haproxy-ingress/pkg/utils"
Expand All @@ -41,15 +40,27 @@ func initSvcStatusUpdater(ctx context.Context, client client.Client) *svcStatusU
}

type svcStatusUpdater struct {
client client.Client
ctx context.Context
isleader bool
log logr.Logger
queue utils.Queue
client client.Client
ctx context.Context
running bool
log logr.Logger
queue utils.Queue
}

func (s *svcStatusUpdater) Start(ctx context.Context) error {
s.ctx = ctx
s.running = true
s.queue.RunWithContext(ctx)
s.running = false
return nil
}

func (s *svcStatusUpdater) CanShutdown() bool {
return s.queue.Len() == 0
}

func (s *svcStatusUpdater) update(obj client.Object) {
if s.isleader {
if s.running {
s.queue.Add(obj)
}
}
Expand All @@ -59,41 +70,14 @@ func (s *svcStatusUpdater) notify(item interface{}) error {
namespace := obj.GetNamespace()
name := obj.GetName()
log := s.log.WithValues("kind", reflect.TypeOf(obj), "namespace", namespace, "name", name)
if err := s.client.Status().Update(s.ctx, obj); err != nil {
// usually `obj` is up to date, but in case of a concurrent
// update, we'll refresh the object into a new instance and
// copy the updated status to it.
typ := reflect.TypeOf(obj)
if typ.Kind() == reflect.Pointer {
typ = typ.Elem()
}
new := reflect.New(typ).Interface().(client.Object)
if err := s.client.Get(s.ctx, types.NamespacedName{Namespace: namespace, Name: name}, new); err != nil {
log.Error(err, "cannot read status")
return err
}
// a reflection trick to copy the updated status from the outdated object to the new updated one
reflect.ValueOf(new).Elem().FieldByName("Status").Set(
reflect.ValueOf(obj).Elem().FieldByName("Status"))
if err := s.client.Status().Update(s.ctx, new); err != nil {
log.Error(err, "cannot update status")
return err
}

from := obj.DeepCopyObject().(client.Object)
reflect.ValueOf(from).Elem().FieldByName("Status").SetZero()
if err := s.client.Status().Patch(s.ctx, obj, client.MergeFrom(from)); err != nil {
log.Error(err, "cannot update status")
return err
}
log.V(1).Info("status updated")
return nil
}

func (s *svcStatusUpdater) Start(ctx context.Context) error {
s.ctx = ctx
s.isleader = true
s.queue.RunWithContext(ctx)
s.isleader = false
// s.ctx wasn't cleaned up here so lazy notifications
// doesn't crashloop due to nil ctx.
log.V(1).Info("status updated")
return nil
}

func (s *svcStatusUpdater) CanShutdown() bool {
return s.queue.Len() == 0
}
9 changes: 4 additions & 5 deletions pkg/controller/services/svcstatusing.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,10 @@ func (s *svcStatusIng) sync(ctx context.Context) {
}
} else {
// fall back to an empty list and log an error if everything else failed
s.log.Error(nil,
"cannot configure ingress status due to a failure reading the published hostnames/IPs; "+
"either fix the configuration or the permission failures, "+
"configure --publish-service or --publish-address command-line options, "+
"or disable status update with --update-status=false")
s.log.Error(fmt.Errorf("cannot configure ingress status due to a failure reading the published hostnames/IPs"), ""+
"either fix the configuration or the permission failures, "+
"configure --publish-service or --publish-address command-line options, "+
"or disable status update with --update-status=false")
}
sort.Slice(lb, func(i, j int) bool {
if lb[i].Hostname == lb[j].Hostname {
Expand Down
16 changes: 10 additions & 6 deletions tests/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ import (
"github.com/jcmoraisjr/haproxy-ingress/tests/framework/options"
)

const (
PublishAddress = "10.0.1.1"
)

func NewFramework(ctx context.Context, t *testing.T) *framework {
wd, err := os.Getwd()
require.NoError(t, err)
Expand Down Expand Up @@ -63,18 +67,18 @@ func NewFramework(ctx context.Context, t *testing.T) *framework {
cli, err := client.NewWithWatch(config, client.Options{Scheme: scheme})
require.NoError(t, err)

startController(ctx, t, config, cli)

return &framework{
scheme: scheme,
codec: codec,
config: config,
cli: cli,
}
}

type framework struct {
scheme *runtime.Scheme
codec serializer.CodecFactory
config *rest.Config
cli client.WithWatch
}

Expand Down Expand Up @@ -119,7 +123,7 @@ func startApiserver(t *testing.T) *rest.Config {
return config
}

func startController(ctx context.Context, t *testing.T, config *rest.Config, cli client.WithWatch) {
func (f *framework) StartController(ctx context.Context, t *testing.T) {
t.Log("starting controller")

err := os.RemoveAll("/tmp/haproxy-ingress")
Expand Down Expand Up @@ -150,17 +154,17 @@ func startController(ctx context.Context, t *testing.T, config *rest.Config, cli
"http-port": "18080",
"https-port": "18443",
}
err = cli.Create(ctx, &global)
err = f.cli.Create(ctx, &global)
require.NoError(t, err)

opt := ctrlconfig.NewOptions()
opt.MasterWorker = true
opt.LocalFSPrefix = "/tmp/haproxy-ingress"
opt.PublishAddress = "127.0.0.1"
opt.PublishAddress = PublishAddress
opt.ConfigMap = "default/ingress-controller"
os.Setenv("POD_NAMESPACE", "default")
ctx, cancel := context.WithCancel(ctx)
cfg, err := ctrlconfig.CreateWithConfig(ctx, config, opt)
cfg, err := ctrlconfig.CreateWithConfig(ctx, f.config, opt)
require.NoError(t, err)

done := make(chan bool)
Expand Down
44 changes: 44 additions & 0 deletions tests/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

ingtypes "github.com/jcmoraisjr/haproxy-ingress/pkg/converters/ingress/types"
"github.com/jcmoraisjr/haproxy-ingress/tests/framework"
Expand All @@ -21,6 +24,17 @@ func TestIntegration(t *testing.T) {
f := framework.NewFramework(ctx, t)
httpPort := f.CreateHTTPServer(ctx, t)

lbingpre1 := "127.0.0.1"
require.NotEqual(t, framework.PublishAddress, lbingpre1)

svcpre1 := f.CreateService(ctx, t, httpPort)
ingpre1 := f.CreateIngress(ctx, t, svcpre1)
ingpre1.Status.LoadBalancer.Ingress = []networkingv1.IngressLoadBalancerIngress{{IP: lbingpre1}}
err := f.Client().Status().Update(ctx, ingpre1)
require.NoError(t, err)

f.StartController(ctx, t)

t.Run("hello world", func(t *testing.T) {
t.Parallel()
svc := f.CreateService(ctx, t, httpPort)
Expand Down Expand Up @@ -72,4 +86,34 @@ func TestIntegration(t *testing.T) {
assert.Fail(collect, "lease event not found")
}, 10*time.Second, time.Second)
})

expectedIngressStatus := networkingv1.IngressStatus{
LoadBalancer: networkingv1.IngressLoadBalancerStatus{
Ingress: []networkingv1.IngressLoadBalancerIngress{{IP: framework.PublishAddress}},
},
}

t.Run("should update ingress status", func(t *testing.T) {
t.Parallel()
svc := f.CreateService(ctx, t, httpPort)
ing := f.CreateIngress(ctx, t, svc)
assert.EventuallyWithT(t, func(collect *assert.CollectT) {
err := f.Client().Get(ctx, client.ObjectKeyFromObject(ing), ing)
if !assert.NoError(collect, err) {
return
}
assert.Equal(collect, expectedIngressStatus, ing.Status)
}, 10*time.Second, time.Second)
})

t.Run("should override old status", func(t *testing.T) {
t.Parallel()
assert.EventuallyWithT(t, func(collect *assert.CollectT) {
err := f.Client().Get(ctx, client.ObjectKeyFromObject(ingpre1), ingpre1)
if !assert.NoError(collect, err) {
return
}
assert.Equal(collect, expectedIngressStatus, ingpre1.Status)
}, 10*time.Second, time.Second)
})
}

0 comments on commit c109070

Please sign in to comment.