diff --git a/helm/ingress-controller/Chart.lock b/helm/ingress-controller/Chart.lock index 394c89cb..fc5340d2 100644 --- a/helm/ingress-controller/Chart.lock +++ b/helm/ingress-controller/Chart.lock @@ -1,6 +1,6 @@ dependencies: - name: common repository: https://charts.bitnami.com/bitnami - version: 2.20.5 -digest: sha256:d887f4d52c98e9524f9eb8be04f987acc240288a9c1cf2653cc1b7c221230ee7 -generated: "2024-07-30T16:56:34.90167-05:00" + version: 2.22.0 +digest: sha256:3c3a2f2c075dd8282147f1a611979a67dd40ce82a27698e16b3315eb9a94d059 +generated: "2024-08-09T14:49:27.42058351-05:00" diff --git a/internal/controller/ingress/service_controller.go b/internal/controller/ingress/service_controller.go index 2ca4da41..18b48ebe 100644 --- a/internal/controller/ingress/service_controller.go +++ b/internal/controller/ingress/service_controller.go @@ -41,17 +41,23 @@ import ( "golang.org/x/sync/errgroup" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" ) const ( OwnerReferencePath = "metadata.ownerReferences.uid" + ModuleSetPath = "metadata.annotations.k8s.ngrok.com/module-set" + TrafficPolicyPath = "metadata.annotations.k8s.ngrok.com/traffic-policy" NgrokLoadBalancerClass = "ngrok" ) @@ -86,9 +92,19 @@ func (r *ServiceReconciler) SetupWithManager(mgr ctrl.Manager) error { } return shouldHandleService(svc) }, - }) - // TODO: Add watches for modulesets and traffic policies so we get updates - + }). + // Watch modulesets for changes + Watches( + &ingressv1alpha1.NgrokModuleSet{}, + handler.EnqueueRequestsFromMapFunc(r.findServicesForModuleSet), + ). + // Watch traffic policies for changes + Watches( + &ngrokv1alpha1.NgrokTrafficPolicy{}, + handler.EnqueueRequestsFromMapFunc(r.findServicesForTrafficPolicy), + ) + + // Index the subresources by their owner references for _, o := range owns { controller = controller.Owns(o) err := mgr.GetFieldIndexer().IndexField(context.Background(), o, OwnerReferencePath, func(obj client.Object) []string { @@ -108,6 +124,34 @@ func (r *ServiceReconciler) SetupWithManager(mgr ctrl.Manager) error { } } + // Index the services by the module set(s) they reference + err := mgr.GetFieldIndexer().IndexField(context.Background(), &corev1.Service{}, ModuleSetPath, func(obj client.Object) []string { + moduleSets, err := annotations.ExtractNgrokModuleSetsFromAnnotations(obj) + if err != nil { + return nil + } + + // Note: We are returning a slice of strings here for the field indexer. Checking for equality later, means + // that only one of the module sets needs to match for the service to be returned. + return moduleSets + }) + if err != nil { + return err + } + + // Index the services by the traffic policy they reference + err = mgr.GetFieldIndexer().IndexField(context.Background(), &corev1.Service{}, TrafficPolicyPath, func(obj client.Object) []string { + policy, err := annotations.ExtractNgrokTrafficPolicyFromAnnotations(obj) + if err != nil { + return nil + } + + return []string{policy} + }) + if err != nil { + return err + } + return controller.Complete(r) } @@ -236,6 +280,68 @@ func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct return ctrl.Result{}, nil } +func (r *ServiceReconciler) findServicesForModuleSet(ctx context.Context, moduleSet client.Object) []reconcile.Request { + moduleSetNamespace := moduleSet.GetNamespace() + moduleSetName := moduleSet.GetName() + + r.Log.V(3).Info("Finding services for module set", "namespace", moduleSetNamespace, "name", moduleSetName) + services := &corev1.ServiceList{} + listOpts := &client.ListOptions{ + Namespace: moduleSetNamespace, + FieldSelector: fields.OneTermEqualSelector(ModuleSetPath, moduleSetName), + } + err := r.Client.List(ctx, services, listOpts) + if err != nil { + r.Log.Error(err, "Failed to list services for module set") + return []reconcile.Request{} + } + + requests := make([]reconcile.Request, len(services.Items)) + for i, svc := range services.Items { + svcNamespace := svc.GetNamespace() + svcName := svc.GetName() + requests[i] = reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: svcNamespace, + Name: svcName, + }, + } + r.Log.V(3).Info("Triggering reconciliation for service", "namespace", svcNamespace, "name", svcName) + } + return requests +} + +func (r *ServiceReconciler) findServicesForTrafficPolicy(ctx context.Context, policy client.Object) []reconcile.Request { + policyNamespace := policy.GetNamespace() + policyName := policy.GetName() + + r.Log.V(3).Info("Finding services for traffic policy", "namespace", policyNamespace, "name", policyName) + services := &corev1.ServiceList{} + listOpts := &client.ListOptions{ + Namespace: policyNamespace, + FieldSelector: fields.OneTermEqualSelector(TrafficPolicyPath, policyName), + } + err := r.Client.List(ctx, services, listOpts) + if err != nil { + r.Log.Error(err, "Failed to list services for traffic policy") + return []reconcile.Request{} + } + + requests := make([]reconcile.Request, len(services.Items)) + for i, svc := range services.Items { + svcNamespace := svc.GetNamespace() + svcName := svc.GetName() + requests[i] = reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: svcNamespace, + Name: svcName, + }, + } + r.Log.V(3).Info("Triggering reconciliation for service", "namespace", svcNamespace, "name", svcName) + } + return requests +} + func (r *ServiceReconciler) buildTunnelAndEdge(ctx context.Context, svc *corev1.Service) ([]client.Object, error) { port := svc.Spec.Ports[0].Port objects := make([]client.Object, 0) @@ -462,9 +568,10 @@ func (r *baseSubresourceReconciler[T, PT]) Reconcile(ctx context.Context, c clie log.Info(fmt.Sprintf("Updating %T", e), "desired", d, "existing", e) // Fetch the existing resource as it may have been updated - if err := c.Get(ctx, client.ObjectKeyFromObject(e), d); err != nil { + if err := c.Get(ctx, client.ObjectKeyFromObject(e), e); err != nil { return err } + r.mergeExisting(*d, e) // Update the resource @@ -620,7 +727,7 @@ func newServiceTunnelReconciler() serviceSubresourceReconciler { } } -// Given an ingress, it will resolve any ngrok modulesets defined on the ingress to the +// Given a service, it will resolve any ngrok modulesets defined on the service to the // CRDs and then will merge them in to a single moduleset func getNgrokModuleSetForService(ctx context.Context, c client.Client, svc *corev1.Service) (*ingressv1alpha1.NgrokModuleSet, error) { computedModSet := &ingressv1alpha1.NgrokModuleSet{}