Skip to content

Commit

Permalink
chore: add reconcile event for accounts
Browse files Browse the repository at this point in the history
  • Loading branch information
katallaxie authored Jan 22, 2025
1 parent f3190ee commit 98f163d
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 9 deletions.
83 changes: 74 additions & 9 deletions controllers/account_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,24 @@ package controllers

import (
"context"
"math"
"sync"
"time"

natsv1alpha1 "github.com/zeiss/natz-operator/api/v1alpha1"

"github.com/nats-io/nats.go"
"github.com/zeiss/pkg/conv"
"github.com/zeiss/pkg/slices"
"github.com/zeiss/pkg/utilx"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// NatsAccountServer takes NatsAccount and serves them to a nats server (cluster)
Expand All @@ -22,6 +28,7 @@ type NatsAccountServer struct {
Scheme *runtime.Scheme
accounts sync.Map
nc *nats.Conn
Recorder record.EventRecorder
}

//+kubebuilder:rbac:groups=natz.zeiss.com,resources=natsaccounts,verbs=get;list;watch;create;update;patch;delete
Expand All @@ -48,8 +55,8 @@ func (r *NatsAccountServer) GetJWT(publicKey string) (string, bool) {
// Reconcile ...
func (r *NatsAccountServer) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)

account := &natsv1alpha1.NatsAccount{}

if err := r.Get(ctx, req.NamespacedName, account); err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
Expand All @@ -61,24 +68,82 @@ func (r *NatsAccountServer) Reconcile(ctx context.Context, req ctrl.Request) (ct
logger.Info("reconciling account", "account", account.Name)

if account.DeletionTimestamp != nil {
r.accounts.Delete(account.Status.PublicKey)
return r.reconcileDelete(ctx, account)
}

return ctrl.Result{}, nil
err := r.reconcileAccount(ctx, account)
if err != nil {
return r.ManageError(ctx, account, err)
}

if utilx.And(utilx.NotEmpty(account.Status.JWT), utilx.NotEmpty(account.Status.PublicKey)) {
r.accounts.Store(account.Status.PublicKey, account.Status.JWT)
return r.ManageSuccess(ctx, account)
}

func (r *NatsAccountServer) reconcileDelete(_ context.Context, obj *natsv1alpha1.NatsAccount) (ctrl.Result, error) {
r.accounts.Delete(obj.Status.PublicKey)

if err := r.nc.Publish("$SYS.REQ.CLAIMS.UPDATE", []byte(account.Status.JWT)); err != nil {
logger.Info("failed to publish claims update", "account", account.Name, "err", err)
return ctrl.Result{}, nil
}

return ctrl.Result{}, err
}
func (r *NatsAccountServer) reconcileAccount(_ context.Context, obj *natsv1alpha1.NatsAccount) error {
if !r.IsSynchronized(obj) {
return nil
}

if r.IsCreating(obj) {
return nil
}

if utilx.And(utilx.NotEmpty(obj.Status.JWT), utilx.NotEmpty(obj.Status.PublicKey)) {
r.accounts.Store(obj.Status.PublicKey, obj.Status.JWT)

return r.nc.Publish("$SYS.REQ.CLAIMS.UPDATE", []byte(obj.Status.JWT))
}

return nil
}

// IsCreating ...
func (r *NatsAccountServer) IsCreating(obj *natsv1alpha1.NatsAccount) bool {
return utilx.Or(obj.Status.Conditions == nil, slices.Len(obj.Status.Conditions) == 0)
}

// IsSynchronized ...
func (r *NatsAccountServer) IsSynchronized(obj *natsv1alpha1.NatsAccount) bool {
return obj.Status.Phase == natsv1alpha1.AccountPhaseSynchronized
}

// ManageSuccess ...
func (r *NatsAccountServer) ManageSuccess(ctx context.Context, obj *natsv1alpha1.NatsAccount) (ctrl.Result, error) {
if !obj.ObjectMeta.DeletionTimestamp.IsZero() {
return ctrl.Result{Requeue: true}, nil
}

if !r.IsSynchronized(obj) {
return ctrl.Result{Requeue: true}, nil
}

if r.IsCreating(obj) {
return ctrl.Result{Requeue: true}, nil
}

r.Recorder.Event(obj, corev1.EventTypeNormal, conv.String(EventReasonAccountAccessGranted), "account access granted")

return ctrl.Result{}, nil
}

// ManageError ...
func (r *NatsAccountServer) ManageError(ctx context.Context, obj *natsv1alpha1.NatsAccount, err error) (ctrl.Result, error) {
r.Recorder.Event(obj, corev1.EventTypeWarning, conv.String(EventReasonAccountAccessFailed), "account access failed")

var retryInterval time.Duration

return reconcile.Result{
RequeueAfter: time.Duration(math.Min(float64(retryInterval.Nanoseconds()*2), float64(time.Hour.Nanoseconds()*6))),
Requeue: true,
}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *NatsAccountServer) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Expand Down
3 changes: 3 additions & 0 deletions controllers/natsaccount_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ const (
const (
EventReasonAccountSecretCreateSucceeded EventReason = "AccountSecretCreateSucceeded"
EventReasonAccountSecretCreateFailed EventReason = "AccountSecretCreateFailed"
EventReasonAccountAccessGranted EventReason = "AccountAccessGranted"
EventReasonAccountAccessDeleted EventReason = "AccountAccessDeleted"
EventReasonAccountAccessFailed EventReason = "AccountAccessFailed"
)

// NatsAccountReconciler ...
Expand Down

0 comments on commit 98f163d

Please sign in to comment.