Skip to content

Commit

Permalink
fix(service-controller): Updates not working (#406)
Browse files Browse the repository at this point in the history
* chore: Bump bitnami-common chart version

* fix(service-controller): Not updating existing items correctly

* fix(service-controller): watch modulesets for changes

* fix(service-controller): watch traffic policies for changes
  • Loading branch information
jonstacks authored Aug 12, 2024
1 parent fc9f93e commit 15b7468
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 8 deletions.
6 changes: 3 additions & 3 deletions helm/ingress-controller/Chart.lock
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
dependencies:
- name: common
repository: https://charts.bitnami.com/bitnami
version: 2.20.5
digest: sha256:d887f4d52c98e9524f9eb8be04f987acc240288a9c1cf2653cc1b7c221230ee7
generated: "2024-07-30T16:56:34.90167-05:00"
version: 2.22.0
digest: sha256:3c3a2f2c075dd8282147f1a611979a67dd40ce82a27698e16b3315eb9a94d059
generated: "2024-08-09T14:49:27.42058351-05:00"
117 changes: 112 additions & 5 deletions internal/controller/ingress/service_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,23 @@ import (
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

const (
OwnerReferencePath = "metadata.ownerReferences.uid"
ModuleSetPath = "metadata.annotations.k8s.ngrok.com/module-set"
TrafficPolicyPath = "metadata.annotations.k8s.ngrok.com/traffic-policy"
NgrokLoadBalancerClass = "ngrok"
)

Expand Down Expand Up @@ -86,9 +92,19 @@ func (r *ServiceReconciler) SetupWithManager(mgr ctrl.Manager) error {
}
return shouldHandleService(svc)
},
})
// TODO: Add watches for modulesets and traffic policies so we get updates

}).
// Watch modulesets for changes
Watches(
&ingressv1alpha1.NgrokModuleSet{},
handler.EnqueueRequestsFromMapFunc(r.findServicesForModuleSet),
).
// Watch traffic policies for changes
Watches(
&ngrokv1alpha1.NgrokTrafficPolicy{},
handler.EnqueueRequestsFromMapFunc(r.findServicesForTrafficPolicy),
)

// Index the subresources by their owner references
for _, o := range owns {
controller = controller.Owns(o)
err := mgr.GetFieldIndexer().IndexField(context.Background(), o, OwnerReferencePath, func(obj client.Object) []string {
Expand All @@ -108,6 +124,34 @@ func (r *ServiceReconciler) SetupWithManager(mgr ctrl.Manager) error {
}
}

// Index the services by the module set(s) they reference
err := mgr.GetFieldIndexer().IndexField(context.Background(), &corev1.Service{}, ModuleSetPath, func(obj client.Object) []string {
moduleSets, err := annotations.ExtractNgrokModuleSetsFromAnnotations(obj)
if err != nil {
return nil
}

// Note: We are returning a slice of strings here for the field indexer. Checking for equality later, means
// that only one of the module sets needs to match for the service to be returned.
return moduleSets
})
if err != nil {
return err
}

// Index the services by the traffic policy they reference
err = mgr.GetFieldIndexer().IndexField(context.Background(), &corev1.Service{}, TrafficPolicyPath, func(obj client.Object) []string {
policy, err := annotations.ExtractNgrokTrafficPolicyFromAnnotations(obj)
if err != nil {
return nil
}

return []string{policy}
})
if err != nil {
return err
}

return controller.Complete(r)
}

Expand Down Expand Up @@ -236,6 +280,68 @@ func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{}, nil
}

func (r *ServiceReconciler) findServicesForModuleSet(ctx context.Context, moduleSet client.Object) []reconcile.Request {
moduleSetNamespace := moduleSet.GetNamespace()
moduleSetName := moduleSet.GetName()

r.Log.V(3).Info("Finding services for module set", "namespace", moduleSetNamespace, "name", moduleSetName)
services := &corev1.ServiceList{}
listOpts := &client.ListOptions{
Namespace: moduleSetNamespace,
FieldSelector: fields.OneTermEqualSelector(ModuleSetPath, moduleSetName),
}
err := r.Client.List(ctx, services, listOpts)
if err != nil {
r.Log.Error(err, "Failed to list services for module set")
return []reconcile.Request{}
}

requests := make([]reconcile.Request, len(services.Items))
for i, svc := range services.Items {
svcNamespace := svc.GetNamespace()
svcName := svc.GetName()
requests[i] = reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: svcNamespace,
Name: svcName,
},
}
r.Log.V(3).Info("Triggering reconciliation for service", "namespace", svcNamespace, "name", svcName)
}
return requests
}

func (r *ServiceReconciler) findServicesForTrafficPolicy(ctx context.Context, policy client.Object) []reconcile.Request {
policyNamespace := policy.GetNamespace()
policyName := policy.GetName()

r.Log.V(3).Info("Finding services for traffic policy", "namespace", policyNamespace, "name", policyName)
services := &corev1.ServiceList{}
listOpts := &client.ListOptions{
Namespace: policyNamespace,
FieldSelector: fields.OneTermEqualSelector(TrafficPolicyPath, policyName),
}
err := r.Client.List(ctx, services, listOpts)
if err != nil {
r.Log.Error(err, "Failed to list services for traffic policy")
return []reconcile.Request{}
}

requests := make([]reconcile.Request, len(services.Items))
for i, svc := range services.Items {
svcNamespace := svc.GetNamespace()
svcName := svc.GetName()
requests[i] = reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: svcNamespace,
Name: svcName,
},
}
r.Log.V(3).Info("Triggering reconciliation for service", "namespace", svcNamespace, "name", svcName)
}
return requests
}

func (r *ServiceReconciler) buildTunnelAndEdge(ctx context.Context, svc *corev1.Service) ([]client.Object, error) {
port := svc.Spec.Ports[0].Port
objects := make([]client.Object, 0)
Expand Down Expand Up @@ -462,9 +568,10 @@ func (r *baseSubresourceReconciler[T, PT]) Reconcile(ctx context.Context, c clie

log.Info(fmt.Sprintf("Updating %T", e), "desired", d, "existing", e)
// Fetch the existing resource as it may have been updated
if err := c.Get(ctx, client.ObjectKeyFromObject(e), d); err != nil {
if err := c.Get(ctx, client.ObjectKeyFromObject(e), e); err != nil {
return err
}

r.mergeExisting(*d, e)

// Update the resource
Expand Down Expand Up @@ -620,7 +727,7 @@ func newServiceTunnelReconciler() serviceSubresourceReconciler {
}
}

// Given an ingress, it will resolve any ngrok modulesets defined on the ingress to the
// Given a service, it will resolve any ngrok modulesets defined on the service to the
// CRDs and then will merge them in to a single moduleset
func getNgrokModuleSetForService(ctx context.Context, c client.Client, svc *corev1.Service) (*ingressv1alpha1.NgrokModuleSet, error) {
computedModSet := &ingressv1alpha1.NgrokModuleSet{}
Expand Down

0 comments on commit 15b7468

Please sign in to comment.