Skip to content

Commit

Permalink
wip: nats config reconcile
Browse files Browse the repository at this point in the history
  • Loading branch information
katallaxie authored Dec 12, 2024
1 parent 723ff06 commit b6d46ee
Show file tree
Hide file tree
Showing 12 changed files with 158 additions and 84 deletions.
8 changes: 8 additions & 0 deletions api/v1alpha1/nats_account_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ const (
AccountPhaseFailed AccountPhase = "Failed"
)

// NatsAccountReference is a reference to a NatsAccount
type NatsAccountReference struct {
// Name is the name of the account.
Name string `json:"name"`
// Namespace is the namespace of the account.
Namespace string `json:"namespace,omitempty"`
}

// ExportType defines the type of import/export.
type ExportType int

Expand Down
16 changes: 11 additions & 5 deletions api/v1alpha1/nats_config_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,20 @@ import (
type ConfigPhase string

const (
ConfigPhaseNone ConfigPhase = ""
ConfigPhaseCreating ConfigPhase = "Creating"
ConfigPhaseActive ConfigPhase = "Active"
ConfigPhaseFailed ConfigPhase = "Failed"
ConfigPhaseNone ConfigPhase = ""
ConfigPhasePending ConfigPhase = "Pending"
ConfigPhaseCreating ConfigPhase = "Creating"
ConfigPhaseSynchronized ConfigPhase = "Synchronized"
ConfigPhaseFailed ConfigPhase = "Failed"
)

// NatsConfigSpec defines the desired state of NatsConfig
type NatsConfigSpec struct{}
type NatsConfigSpec struct {
// OperatorRef is a reference to the operator that is managing the config.
OperatorRef NatsOperatorReference `json:"operatorRef,omitempty"`
// SystemAccountRef is a reference to the system account.
SystemAccountRef NatsAccountReference `json:"systemAccountRef,omitempty"`
}

// NatsConfigStatus defines the observed state of NatsConfig
type NatsConfigStatus struct {
Expand Down
7 changes: 7 additions & 0 deletions api/v1alpha1/nats_operator_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ const (
OperatorPhaseFailed OperatorPhase = "Failed"
)

// OperatorReference is a reference to an operator.
type NatsOperatorReference struct {
// Name is the name of the operator.
Name string `json:"name"`
// Namespace is the namespace of the operator.
Namespace string `json:"namespace,omitempty"`
}
type NatsOperatorSpec struct {
// PrivateKey is a reference to a secret that contains the private key
PrivateKey NatsPrivateKeyReference `json:"privateKey,omitempty"`
Expand Down
32 changes: 32 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ func setupControllers(mgr ctrl.Manager) error {
return err
}

err = controllers.NewNatsConfigReconciler(mgr).SetupWithManager(mgr)
if err != nil {
return err
}

return nil
}

Expand Down
1 change: 1 addition & 0 deletions controllers/natsaccount_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func (r *NatsAccountReconciler) reconcileAccount(ctx context.Context, account *n
return err
}
account.Status.JWT = jwt
account.Status.PublicKey = public

if !controllerutil.ContainsFinalizer(account, natsv1alpha1.FinalizerName) {
controllerutil.AddFinalizer(account, natsv1alpha1.FinalizerName)
Expand Down
71 changes: 51 additions & 20 deletions controllers/natsconfig_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controllers

import (
"context"
"fmt"
"math"
"time"

Expand Down Expand Up @@ -86,33 +87,61 @@ func (r *NatsConfigReconciler) reconcileDelete(ctx context.Context, obj *natsv1a
}

func (r *NatsConfigReconciler) reconcileResources(ctx context.Context, config *natsv1alpha1.NatsConfig) (ctrl.Result, error) {
if err := r.reconcileStatus(ctx, config); err != nil {
if err := r.reconcileConfig(ctx, config); err != nil {
return r.ManageError(ctx, config, err)
}

// if err := r.reconcileconfig(ctx, config); err != nil {
// return r.ManageError(ctx, config, err)
// }

return r.ManageSuccess(ctx, config)
}

func (r *NatsConfigReconciler) reconcileConfig(ctx context.Context, config *natsv1alpha1.NatsConfig) error {
// if !controllerutil.ContainsFinalizer(config, natsv1alpha1.FinalizerName) {
// controllerutil.AddFinalizer(config, natsv1alpha1.FinalizerName)
// }
cfg := &corev1.ConfigMap{}
cfgName := client.ObjectKey{
Namespace: config.Namespace,
Name: config.Name,
}

// if !controllerutil.HasControllerReference(config) {
// if err := controllerutil.SetControllerReference(config, pk, r.Scheme); err != nil {
// return err
// }
// }
if err := r.Get(ctx, cfgName, cfg); !errors.IsNotFound(err) {
return err
}

return nil
}
operator := &natsv1alpha1.NatsOperator{}
operatorName := client.ObjectKey{
Namespace: config.Namespace,
Name: config.Spec.OperatorRef.Name,
}

if err := r.Get(ctx, operatorName, operator); err != nil {
return err
}

systemAccount := &natsv1alpha1.NatsAccount{}
systemAccountName := client.ObjectKey{
Namespace: config.Namespace,
Name: config.Spec.SystemAccountRef.Name,
}

if err := r.Get(ctx, systemAccountName, systemAccount); err != nil {
return err
}

cfg.Namespace = config.Namespace
cfg.Name = config.Name
cfg.Data = map[string]string{
"auth.conf": fmt.Sprintf(AUTH_CONFIG_TEMPLATE, operator.Status.JWT, systemAccount.Status.PublicKey, systemAccount.Status.PublicKey, systemAccount.Status.JWT),
}

_, err := controllerutil.CreateOrUpdate(ctx, r.Client, cfg, func() error {
if !controllerutil.HasControllerReference(cfg) {
if err := controllerutil.SetControllerReference(config, cfg, r.Scheme); err != nil {
return err
}
}

return nil
})

func (r *NatsConfigReconciler) reconcileStatus(ctx context.Context, config *natsv1alpha1.NatsConfig) error {
return nil
return err
}

// IsCreating ...
Expand All @@ -122,14 +151,15 @@ func (r *NatsConfigReconciler) IsCreating(obj *natsv1alpha1.NatsConfig) bool {

// IsSynchronized ...
func (r *NatsConfigReconciler) IsSynchronized(obj *natsv1alpha1.NatsConfig) bool {
return obj.Status.Phase == natsv1alpha1.ConfigPhaseActive
return obj.Status.Phase == natsv1alpha1.ConfigPhaseSynchronized
}

// ManageError ...
func (r *NatsConfigReconciler) ManageError(ctx context.Context, obj *natsv1alpha1.NatsConfig, err error) (ctrl.Result, error) {
logger := log.FromContext(ctx)
logger.Error(err, "error reconciling config", "config", obj.Name)

logger.Error(err, "reconciliation failed", "config", obj)
obj.Status.Phase = natsv1alpha1.ConfigPhaseFailed

status.SetNatzConfigCondition(obj, status.NewNatzConfigFailedCondition(obj, err))

Expand All @@ -153,6 +183,7 @@ func (r *NatsConfigReconciler) ManageSuccess(ctx context.Context, obj *natsv1alp
return ctrl.Result{}, nil
}

obj.Status.Phase = natsv1alpha1.ConfigPhaseSynchronized
status.SetNatzConfigCondition(obj, status.NewNatzConfigSynchronizedCondition(obj))

if r.IsCreating(obj) {
Expand All @@ -176,6 +207,6 @@ func (r *NatsConfigReconciler) ManageSuccess(ctx context.Context, obj *natsv1alp
func (r *NatsConfigReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&natsv1alpha1.NatsConfig{}).
Owns(&corev1.Secret{}).
Owns(&corev1.ConfigMap{}).
Complete(r)
}
42 changes: 2 additions & 40 deletions controllers/natsgateway_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

natsv1alpha1 "github.com/zeiss/natz-operator/api/v1alpha1"
Expand Down Expand Up @@ -45,8 +44,6 @@ func NewNatsGatewayReconciler(mgr ctrl.Manager) *NatsGatewayReconciler {

// Reconcile ...
func (r *NatsGatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)

gateway := &natsv1alpha1.NatsGateway{}
if err := r.Get(ctx, req.NamespacedName, gateway); err != nil {
if errors.IsNotFound(err) {
Expand All @@ -57,8 +54,6 @@ func (r *NatsGatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

if !gateway.ObjectMeta.DeletionTimestamp.IsZero() {
log.Info("processing deletion of gateway")

if finalizers.HasFinalizer(gateway, natsv1alpha1.FinalizerName) {
err := r.reconcileDelete(ctx, gateway)
if err != nil {
Expand All @@ -72,8 +67,6 @@ func (r *NatsGatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request)

// get latest version of the gateway
if err := r.Get(ctx, req.NamespacedName, gateway); err != nil {
log.Error(err, "gateway not found", "gateway", req.NamespacedName)

return reconcile.Result{}, err
}

Expand All @@ -88,34 +81,23 @@ func (r *NatsGatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

func (r *NatsGatewayReconciler) reconcileResources(ctx context.Context, req ctrl.Request, gateway *natsv1alpha1.NatsGateway) error {
log := log.FromContext(ctx)

log.Info("reconcile resources", "name", gateway.Name, "namespace", gateway.Namespace)

if err := r.reconcileStatus(ctx, gateway); err != nil {
log.Error(err, "failed to reconcile status", "name", gateway.Name, "namespace", gateway.Namespace)
return err
}

if err := r.reconcileGateway(ctx, req, gateway); err != nil {
log.Error(err, "failed to reconcile gateway", "name", gateway.Name, "namespace", gateway.Namespace)
return err
}

if err := r.reconcileSecret(ctx, gateway); err != nil {
log.Error(err, "failed to reconcile secret", "name", gateway.Name, "namespace", gateway.Namespace)
return err
}

return nil
}

func (r *NatsGatewayReconciler) reconcileGateway(ctx context.Context, _ ctrl.Request, gateway *natsv1alpha1.NatsGateway) error {
log := log.FromContext(ctx)

log.Info("reconcile status", "name", gateway.Name, "namespace", gateway.Namespace)

op, err := controllerutil.CreateOrUpdate(ctx, r.Client, gateway, func() error {
_, err := controllerutil.CreateOrUpdate(ctx, r.Client, gateway, func() error {
controllerutil.AddFinalizer(gateway, natsv1alpha1.FinalizerName)

return nil
Expand All @@ -124,18 +106,10 @@ func (r *NatsGatewayReconciler) reconcileGateway(ctx context.Context, _ ctrl.Req
return err
}

if op == controllerutil.OperationResultCreated || op == controllerutil.OperationResultUpdated {
log.Info("account created or updated", "operation", op)
}

return nil
}

func (r *NatsGatewayReconciler) reconcileSecret(ctx context.Context, gateway *natsv1alpha1.NatsGateway) error {
log := log.FromContext(ctx)

log.Info("reconcile secret", "name", gateway.Name, "namespace", gateway.Namespace)

gatewaySecret := &corev1.Secret{}
gatewaySecretName := client.ObjectKey{
Namespace: gateway.Namespace,
Expand All @@ -147,25 +121,17 @@ func (r *NatsGatewayReconciler) reconcileSecret(ctx context.Context, gateway *na
return err
}

op, err := controllerutil.CreateOrUpdate(ctx, r.Client, gatewaySecret, func() error {
_, err := controllerutil.CreateOrUpdate(ctx, r.Client, gatewaySecret, func() error {
return controllerutil.SetControllerReference(gateway, gatewaySecret, r.Scheme)
})
if err != nil {
return err
}

if op == controllerutil.OperationResultCreated || op == controllerutil.OperationResultUpdated {
log.Info("secret created or updated", "operation", op)
}

return nil
}

func (r *NatsGatewayReconciler) reconcileStatus(ctx context.Context, gateway *natsv1alpha1.NatsGateway) error {
log := log.FromContext(ctx)

log.Info("reconcile status", "name", gateway.Name, "namespace", gateway.Namespace)

phase := natsv1alpha1.GatewayPhaseNone

if gateway.Status.Phase != phase {
Expand All @@ -178,10 +144,6 @@ func (r *NatsGatewayReconciler) reconcileStatus(ctx context.Context, gateway *na
}

func (r *NatsGatewayReconciler) reconcileDelete(ctx context.Context, gateway *natsv1alpha1.NatsGateway) error {
log := log.FromContext(ctx)

log.Info("reconcile delete gateway", "name", gateway.Name, "namespace", gateway.Namespace)

gateway.SetFinalizers(finalizers.RemoveFinalizer(gateway, natsv1alpha1.FinalizerName))
err := r.Update(ctx, gateway)
if err != nil && !errors.IsNotFound(err) {
Expand Down
5 changes: 0 additions & 5 deletions controllers/natsoperator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

Expand Down Expand Up @@ -66,10 +65,6 @@ func NewNatsOperatorReconciler(mgr ctrl.Manager) *NatsOperatorReconciler {
// Reconcile ...
// nolint:gocyclo
func (r *NatsOperatorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)

log.Info("reconcile operator", "name", req.Name, "namespace", req.Namespace)

operator := &natsv1alpha1.NatsOperator{}
if err := r.Get(ctx, req.NamespacedName, operator); err != nil {
// Request object not found, could have been deleted after reconcile request.
Expand Down
Loading

0 comments on commit b6d46ee

Please sign in to comment.