diff --git a/admiral/pkg/clusters/dependency_handler.go b/admiral/pkg/clusters/dependency_handler.go index 60e477d5..f98ea41d 100644 --- a/admiral/pkg/clusters/dependency_handler.go +++ b/admiral/pkg/clusters/dependency_handler.go @@ -20,6 +20,7 @@ type DependencyHandler struct { RemoteRegistry *RemoteRegistry DepController *admiral.DependencyController DestinationServiceProcessor DestinationServiceProcessor + RoutingPolicyProcessor RoutingPolicyProcessor } func (dh *DependencyHandler) Added(ctx context.Context, obj *v1.Dependency) error { @@ -74,6 +75,9 @@ func (dh *DependencyHandler) HandleDependencyRecord(ctx context.Context, obj *v1 return handleDepRecordErrors } + // process routing policies + _ = dh.RoutingPolicyProcessor.DeltaUpdate(ctx, eventType, obj) + remoteRegistry.AdmiralCache.SourceToDestinations.put(obj) return handleDepRecordErrors } diff --git a/admiral/pkg/clusters/envoyfilter_test.go b/admiral/pkg/clusters/envoyfilter_test.go index 378fb371..313e1943 100644 --- a/admiral/pkg/clusters/envoyfilter_test.go +++ b/admiral/pkg/clusters/envoyfilter_test.go @@ -30,8 +30,6 @@ import ( func TestCreateOrUpdateEnvoyFilter(t *testing.T) { registry := getRegistry("1.13,1.17") - handler := RoutingPolicyHandler{} - rpFilterCache := &routingPolicyFilterCache{} rpFilterCache.filterCache = make(map[string]map[string]map[string]string) rpFilterCache.mutex = &sync.Mutex{} @@ -89,8 +87,6 @@ func TestCreateOrUpdateEnvoyFilter(t *testing.T) { registry.AdmiralCache.IdentityDependencyCache.Put("foo", "bar", "bar") registry.AdmiralCache.IdentityClusterCache.Put("bar", remoteController.ClusterID, remoteController.ClusterID) - handler.RemoteRegistry = registry - routingPolicyFoo := &v1.RoutingPolicy{ TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{ diff --git a/admiral/pkg/clusters/registry.go b/admiral/pkg/clusters/registry.go index ab460bd6..b6c6d7d6 100644 --- a/admiral/pkg/clusters/registry.go +++ b/admiral/pkg/clusters/registry.go @@ -43,9 +43,11 @@ func InitAdmiral(ctx context.Context, params common.AdmiralParams) (*RemoteRegis var err error destinationServiceProcessor := &ProcessDestinationService{} + routingPolicyProcessor := NewRoutingPolicyProcessor(rr) wd := DependencyHandler{ RemoteRegistry: rr, DestinationServiceProcessor: destinationServiceProcessor, + RoutingPolicyProcessor: routingPolicyProcessor, } wd.DepController, err = admiral.NewDependencyController(ctx.Done(), &wd, params.KubeconfigPath, params.DependenciesNamespace, 0, rr.ClientLoader) @@ -197,7 +199,9 @@ func (r *RemoteRegistry) createCacheController(clientConfig *rest.Config, cluste return fmt.Errorf("error with NodeController controller initialization, err: %v", err) } logrus.Infof("starting RoutingPoliciesController for clusterID: %v", clusterID) - rc.RoutingPolicyController, err = admiral.NewRoutingPoliciesController(stop, &RoutingPolicyHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0, r.ClientLoader) + rpProcessor := NewRoutingPolicyProcessor(r) + rpHandler := NewRoutingPolicyHandler(r, clusterID, rpProcessor) + rc.RoutingPolicyController, err = admiral.NewRoutingPoliciesController(stop, rpHandler, clientConfig, 0, r.ClientLoader) if err != nil { return fmt.Errorf("error with RoutingPoliciesController initialization, err: %v", err) } diff --git a/admiral/pkg/clusters/registry_test.go b/admiral/pkg/clusters/registry_test.go index b4be7cb9..1d0eaec3 100644 --- a/admiral/pkg/clusters/registry_test.go +++ b/admiral/pkg/clusters/registry_test.go @@ -273,6 +273,7 @@ func TestAdded(t *testing.T) { RemoteRegistry: rr, DepController: d, DestinationServiceProcessor: &MockDestinationServiceProcessor{}, + RoutingPolicyProcessor: &MockPolicyProcessor{}, } depData := v1.Dependency{ diff --git a/admiral/pkg/clusters/routingpolicy_handler.go b/admiral/pkg/clusters/routingpolicy_handler.go index 621af640..b676d1f1 100644 --- a/admiral/pkg/clusters/routingpolicy_handler.go +++ b/admiral/pkg/clusters/routingpolicy_handler.go @@ -3,6 +3,7 @@ package clusters import ( "context" "errors" + "fmt" "sync" commonUtil "github.com/istio-ecosystem/admiral/admiral/pkg/util" @@ -15,51 +16,202 @@ import ( ) type RoutingPolicyHandler struct { + RemoteRegistry *RemoteRegistry + ClusterID string + RoutingPolicyService RoutingPolicyProcessor +} + +func NewRoutingPolicyHandler(rr *RemoteRegistry, cId string, rpProcessor RoutingPolicyProcessor) *RoutingPolicyHandler { + return &RoutingPolicyHandler{RemoteRegistry: rr, ClusterID: cId, RoutingPolicyService: rpProcessor} +} + +type RoutingPolicyProcessor interface { + ProcessAddOrUpdate(ctx context.Context, eventType admiral.EventType, newRP *v1.RoutingPolicy, oldRP *v1.RoutingPolicy, dependents map[string]string) error + DeltaUpdate(ctx context.Context, eventType admiral.EventType, dependency *v1.Dependency) error + Delete(ctx context.Context, eventType admiral.EventType, routingPolicy *v1.RoutingPolicy) error +} + +type RoutingPolicyService struct { RemoteRegistry *RemoteRegistry - ClusterID string } -type routingPolicyCache struct { - // map of routing policies key=environment.identity, value: RoutingPolicy object - // only one routing policy per identity + env is allowed - identityCache map[string]*v1.RoutingPolicy - mutex *sync.Mutex +func (r *RoutingPolicyService) ProcessAddOrUpdate(ctx context.Context, eventType admiral.EventType, newRP *v1.RoutingPolicy, oldRP *v1.RoutingPolicy, dependents map[string]string) error { + var err error + for _, remoteController := range r.RemoteRegistry.remoteControllers { + if !common.DoRoutingPolicyForCluster(remoteController.ClusterID) { + log.Warnf(LogFormat, eventType, "routingpolicy", newRP.Name, remoteController.ClusterID, "RoutingPolicy disabled for cluster") + continue + } + for _, dependent := range dependents { + // Check if the dependent exists in this remoteCluster. If so, we create an envoyFilter with dependent identity as workload selector + if _, ok := r.RemoteRegistry.AdmiralCache.IdentityClusterCache.Get(dependent).Copy()[remoteController.ClusterID]; ok { + _, err1 := createOrUpdateEnvoyFilter(ctx, remoteController, newRP, eventType, dependent, r.RemoteRegistry.AdmiralCache) + if err1 != nil { + log.Errorf(LogErrFormat, eventType, "routingpolicy", newRP.Name, remoteController.ClusterID, err) + err = common.AppendError(err, err1) + } else { + log.Infof(LogFormat, eventType, "routingpolicy ", newRP.Name, remoteController.ClusterID, "created envoyfilters") + } + } + id := common.GetRoutingPolicyIdentity(newRP) + env := common.GetRoutingPolicyEnv(newRP) + r.RemoteRegistry.AdmiralCache.RoutingPolicyCache.Put(id, env, newRP.Name, newRP) + if oldRP != nil { + oldEnv := common.GetRoutingPolicyEnv(oldRP) + r.RemoteRegistry.AdmiralCache.RoutingPolicyCache.Delete(id, oldEnv, oldRP.Name) + } + } + } + return err +} + +func (r *RoutingPolicyService) DeltaUpdate(ctx context.Context, eventType admiral.EventType, dependency *v1.Dependency) error { + //ns := getEnvoyFilterNamespace() + newDestinations, _ := getDestinationsToBeProcessed(eventType, dependency, r.RemoteRegistry) + + // for each destination, identify the rp. + for _, dependent := range newDestinations { + if isIdentityMeshEnabled(dependent, r.RemoteRegistry) { + // if the dependent is in the mesh, then get the rp + policies := r.RemoteRegistry.AdmiralCache.RoutingPolicyCache.GetForIdentity(dependent) + for _, rp := range policies { + err := r.ProcessAddOrUpdate(ctx, eventType, rp, nil, map[string]string{dependent: dependent}) + if err != nil { + log.Errorf(LogErrFormat, eventType, "routingpolicy", rp.Name, "", + fmt.Sprintf("failed to process routing policy for new destination=%s in delta update", dependent)) + return err + } + log.Infof(LogFormat, eventType, "routingpolicy", rp.Name, "", + fmt.Sprintf("finished processing routing policy for new destination=%s in delta update", dependent)) + } + } + } + + return nil } -func (r *routingPolicyCache) Delete(identity string, environment string) { +func (r *RoutingPolicyService) Delete(ctx context.Context, eventType admiral.EventType, routingPolicy *v1.RoutingPolicy) error { + identity := common.GetRoutingPolicyIdentity(routingPolicy) + env := common.GetRoutingPolicyEnv(routingPolicy) + key := routingPolicy.Name + identity + env + if r.RemoteRegistry == nil || r.RemoteRegistry.AdmiralCache == nil || r.RemoteRegistry.AdmiralCache.RoutingPolicyFilterCache == nil { + log.Infof(LogFormat, eventType, "routingpolicy", routingPolicy.Name, "", "skipping delete event as cache is nil") + return nil + } + clusterIdFilterMap := r.RemoteRegistry.AdmiralCache.RoutingPolicyFilterCache.Get(key) // RoutingPolicyFilterCache key=rpname+rpidentity+environment of the routingPolicy, value is a map [clusterId -> map [filterName -> filterNameSpace]] + var err error + for _, rc := range r.RemoteRegistry.remoteControllers { + if !common.DoRoutingPolicyForCluster(rc.ClusterID) { + log.Warnf(LogFormat, eventType, "routingpolicy", routingPolicy.Name, rc.ClusterID, "RoutingPolicy disabled for cluster") + continue + } + if rc != nil { + if filterMap, ok := clusterIdFilterMap[rc.ClusterID]; ok { + for filter, filterNs := range filterMap { + log.Infof(LogFormat, eventType, "envoyfilter", filter, rc.ClusterID, "deleting") + err1 := rc.RoutingPolicyController.IstioClient.NetworkingV1alpha3().EnvoyFilters(filterNs).Delete(ctx, filter, metaV1.DeleteOptions{}) + if err1 != nil { + log.Errorf(LogErrFormat, eventType, "envoyfilter", filter, rc.ClusterID, err1) + err = common.AppendError(err, err1) + } else { + log.Infof(LogFormat, eventType, "envoyfilter", filter, rc.ClusterID, "deleting from cache") + } + } + } + } + } + if err == nil { + log.Infof(LogFormat, eventType, "routingPolicy", fmt.Sprintf("%s.%s.%s", identity, env, routingPolicy.Name), "", "deleting from cache") + r.RemoteRegistry.AdmiralCache.RoutingPolicyCache.Delete(identity, env, routingPolicy.Name) + r.RemoteRegistry.AdmiralCache.RoutingPolicyFilterCache.Delete(key) + } + return err +} + +func NewRoutingPolicyProcessor(remoteRegistry *RemoteRegistry) RoutingPolicyProcessor { + return &RoutingPolicyService{RemoteRegistry: remoteRegistry} +} + +type routingPolicyCacheEntry struct { + // key: env, value: map [ key: name, value: routingPolicy] + policiesByEnv map[string]map[string]*v1.RoutingPolicy +} + +type RoutingPolicyCache struct { + // key: identity of the asset + // value: routingPolicyCacheEntry + entries map[string]*routingPolicyCacheEntry + mutex *sync.Mutex +} + +func NewRoutingPolicyCache() *RoutingPolicyCache { + entries := make(map[string]*routingPolicyCacheEntry) + return &RoutingPolicyCache{entries: entries, mutex: &sync.Mutex{}} +} + +func (r *RoutingPolicyCache) GetForIdentity(identity string) []*v1.RoutingPolicy { defer r.mutex.Unlock() r.mutex.Lock() - key := common.ConstructRoutingPolicyKey(environment, identity) - if _, ok := r.identityCache[key]; ok { - log.Infof("deleting RoutingPolicy with key=%s from global RoutingPolicy cache", key) - delete(r.identityCache, key) + if r.entries[identity] == nil { + return []*v1.RoutingPolicy{} + } + policies := make([]*v1.RoutingPolicy, 0) + for _, envMap := range r.entries[identity].policiesByEnv { + for _, rp := range envMap { + policies = append(policies, rp) + } } + return policies } -func (r *routingPolicyCache) GetFromIdentity(identity string, environment string) *v1.RoutingPolicy { +func (r *RoutingPolicyCache) Get(identity string, env string, name string) *v1.RoutingPolicy { + defer r.mutex.Unlock() r.mutex.Lock() - return r.identityCache[common.ConstructRoutingPolicyKey(environment, identity)] + if (r.entries[identity] == nil) || + (r.entries[identity].policiesByEnv == nil) || + (r.entries[identity].policiesByEnv[env] == nil) || + (r.entries[identity].policiesByEnv[env][name] == nil) { + return nil + } + return r.entries[identity].policiesByEnv[env][name] } -func (r *routingPolicyCache) Put(rp *v1.RoutingPolicy) error { - if rp == nil || rp.Name == "" { - // no RoutingPolicy, throw error - return errors.New("cannot add an empty RoutingPolicy to the cache") - } - if rp.Labels == nil { - return errors.New("labels empty in RoutingPolicy") +func (r *RoutingPolicyCache) Put(identity string, env string, name string, rp *v1.RoutingPolicy) { + if rp == nil { + return } defer r.mutex.Unlock() r.mutex.Lock() - var rpIdentity = rp.Labels[common.GetRoutingPolicyLabel()] - var rpEnv = common.GetRoutingPolicyEnv(rp) - - log.Infof("Adding RoutingPolicy with name %v to RoutingPolicy cache. LabelMatch=%v env=%v", rp.Name, rpIdentity, rpEnv) - key := common.ConstructRoutingPolicyKey(rpEnv, rpIdentity) - r.identityCache[key] = rp + if r.entries[identity] == nil { + r.entries[identity] = &routingPolicyCacheEntry{policiesByEnv: make(map[string]map[string]*v1.RoutingPolicy)} + } + if r.entries[identity].policiesByEnv[env] == nil { + r.entries[identity].policiesByEnv[env] = make(map[string]*v1.RoutingPolicy) + } + r.entries[identity].policiesByEnv[env][name] = rp +} - return nil +func (r *RoutingPolicyCache) Delete(identity string, env string, name string) { + if commonUtil.IsAdmiralReadOnly() { + log.Infof(LogFormat, admiral.Delete, "routingpolicy", fmt.Sprintf("%s.%s.%s", identity, env, name), "", "skipping read-only mode") + return + } + if common.GetEnableRoutingPolicy() { + defer r.mutex.Unlock() + r.mutex.Lock() + if r.entries[identity] != nil && + r.entries[identity].policiesByEnv != nil && + r.entries[identity].policiesByEnv[env] != nil && + r.entries[identity].policiesByEnv[env][name] != nil { + delete(r.entries[identity].policiesByEnv[env], name) + if len(r.entries[identity].policiesByEnv[env]) == 0 { + delete(r.entries[identity].policiesByEnv, env) + } + } + } else { + log.Infof(LogFormat, admiral.Delete, "routingpolicy", fmt.Sprintf("%s.%s.%s", identity, env, name), "", "routingpolicy disabled") + } } type routingPolicyFilterCache struct { @@ -116,7 +268,7 @@ func (r RoutingPolicyHandler) Added(ctx context.Context, obj *v1.RoutingPolicy) if common.ShouldIgnoreResource(obj.ObjectMeta) { log.Infof("op=%s type=%v name=%v namespace=%s cluster=%s message=%s", "admiralIoIgnoreAnnotationCheck", common.RoutingPolicyResourceType, obj.Name, obj.Namespace, "", "Value=true") - log.Infof(LogFormat, "success", "routingpolicy", obj.Name, "", "Ignored the RoutingPolicy because of the annotation") + log.Infof(LogFormat, "success", "routingpolicy", obj.Name, "", "Ignored the RoutingPolicy because of the annotation or label") return nil } dependents := getDependents(obj, r) @@ -124,7 +276,7 @@ func (r RoutingPolicyHandler) Added(ctx context.Context, obj *v1.RoutingPolicy) log.Info("No dependents found for Routing Policy - ", obj.Name) return nil } - err := r.processroutingPolicy(ctx, dependents, obj, admiral.Add) + err := r.RoutingPolicyService.ProcessAddOrUpdate(ctx, admiral.Add, obj, nil, dependents) if err != nil { log.Errorf(LogErrFormat, admiral.Update, "routingpolicy", obj.Name, "", "failed to process routing policy") return err @@ -136,55 +288,32 @@ func (r RoutingPolicyHandler) Added(ctx context.Context, obj *v1.RoutingPolicy) return nil } -func (r RoutingPolicyHandler) processroutingPolicy(ctx context.Context, dependents map[string]string, routingPolicy *v1.RoutingPolicy, eventType admiral.EventType) error { - var err error - for _, remoteController := range r.RemoteRegistry.remoteControllers { - if !common.DoRoutingPolicyForCluster(remoteController.ClusterID) { - log.Warnf(LogFormat, eventType, "routingpolicy", routingPolicy.Name, remoteController.ClusterID, "RoutingPolicy disabled for cluster") - continue - } - for _, dependent := range dependents { - // Check if the dependent exists in this remoteCluster. If so, we create an envoyFilter with dependent identity as workload selector - if _, ok := r.RemoteRegistry.AdmiralCache.IdentityClusterCache.Get(dependent).Copy()[remoteController.ClusterID]; ok { - _, err1 := createOrUpdateEnvoyFilter(ctx, remoteController, routingPolicy, eventType, dependent, r.RemoteRegistry.AdmiralCache) - if err1 != nil { - log.Errorf(LogErrFormat, eventType, "routingpolicy", routingPolicy.Name, remoteController.ClusterID, err) - err = common.AppendError(err, err1) - } else { - log.Infof(LogFormat, eventType, "routingpolicy ", routingPolicy.Name, remoteController.ClusterID, "created envoyfilters") - } - } - } - } - return err -} - -func (r RoutingPolicyHandler) Updated(ctx context.Context, obj *v1.RoutingPolicy) error { +func (r RoutingPolicyHandler) Updated(ctx context.Context, newRP *v1.RoutingPolicy, oldRP *v1.RoutingPolicy) error { if commonUtil.IsAdmiralReadOnly() { log.Infof(LogFormat, admiral.Update, "routingpolicy", "", "", "skipping read-only mode") return nil } if common.GetEnableRoutingPolicy() { - if common.ShouldIgnoreResource(obj.ObjectMeta) { + if common.ShouldIgnoreResource(newRP.ObjectMeta) { log.Infof("op=%s type=%v name=%v namespace=%s cluster=%s message=%s", "admiralIoIgnoreAnnotationCheck", common.RoutingPolicyResourceType, - obj.Name, obj.Namespace, "", "Value=true") - log.Infof(LogFormat, admiral.Update, "routingpolicy", obj.Name, "", "Ignored the RoutingPolicy because of the annotation") + newRP.Name, newRP.Namespace, "", "Value=true") + log.Infof(LogFormat, admiral.Update, "routingpolicy", newRP.Name, "", "Ignored the RoutingPolicy because of the annotation or label") // We need to process this as a delete event. - r.Deleted(ctx, obj) + r.Deleted(ctx, newRP) return nil } - dependents := getDependents(obj, r) + dependents := getDependents(newRP, r) if len(dependents) == 0 { return nil } - err := r.processroutingPolicy(ctx, dependents, obj, admiral.Update) + err := r.RoutingPolicyService.ProcessAddOrUpdate(ctx, admiral.Update, newRP, oldRP, dependents) if err != nil { - log.Errorf(LogErrFormat, admiral.Update, "routingpolicy", obj.Name, "", "failed to process routing policy") + log.Errorf(LogErrFormat, admiral.Update, "routingpolicy", newRP.Name, "", "failed to process routing policy") return err } - log.Infof(LogFormat, admiral.Update, "routingpolicy", obj.Name, "", "updated routing policy") + log.Infof(LogFormat, admiral.Update, "routingpolicy", newRP.Name, "", "updated routing policy") } else { - log.Infof(LogFormat, admiral.Update, "routingpolicy", obj.Name, "", "routingpolicy disabled") + log.Infof(LogFormat, admiral.Update, "routingpolicy", newRP.Name, "", "routingpolicy disabled") } return nil } @@ -207,43 +336,9 @@ func getDependents(obj *v1.RoutingPolicy, r RoutingPolicyHandler) map[string]str Deleted - deletes the envoyFilters for the routingPolicy when delete event received for routing policy */ func (r RoutingPolicyHandler) Deleted(ctx context.Context, obj *v1.RoutingPolicy) error { - err := r.deleteEnvoyFilters(ctx, obj, admiral.Delete) + err := r.RoutingPolicyService.Delete(ctx, admiral.Delete, obj) if err != nil { log.Infof(LogFormat, admiral.Delete, "routingpolicy", obj.Name, "", "deleted envoy filter for routing policy") } return err } - -func (r RoutingPolicyHandler) deleteEnvoyFilters(ctx context.Context, obj *v1.RoutingPolicy, eventType admiral.EventType) error { - key := obj.Name + common.GetRoutingPolicyIdentity(obj) + common.GetRoutingPolicyEnv(obj) - if r.RemoteRegistry == nil || r.RemoteRegistry.AdmiralCache == nil || r.RemoteRegistry.AdmiralCache.RoutingPolicyFilterCache == nil { - log.Infof(LogFormat, eventType, "routingpolicy", obj.Name, "", "skipping delete event as cache is nil") - return nil - } - clusterIdFilterMap := r.RemoteRegistry.AdmiralCache.RoutingPolicyFilterCache.Get(key) // RoutingPolicyFilterCache key=rpname+rpidentity+environment of the routingPolicy, value is a map [clusterId -> map [filterName -> filterNameSpace]] - var err error - for _, rc := range r.RemoteRegistry.remoteControllers { - if rc != nil { - if !common.DoRoutingPolicyForCluster(rc.ClusterID) { - log.Warnf(LogFormat, eventType, "routingpolicy", obj.Name, rc.ClusterID, "RoutingPolicy disabled for cluster") - continue - } - if filterMap, ok := clusterIdFilterMap[rc.ClusterID]; ok { - for filter, filterNs := range filterMap { - log.Infof(LogFormat, eventType, "envoyfilter", filter, rc.ClusterID, "deleting") - err1 := rc.RoutingPolicyController.IstioClient.NetworkingV1alpha3().EnvoyFilters(filterNs).Delete(ctx, filter, metaV1.DeleteOptions{}) - if err1 != nil { - log.Errorf(LogErrFormat, eventType, "envoyfilter", filter, rc.ClusterID, err1) - err = common.AppendError(err, err1) - } else { - log.Infof(LogFormat, eventType, "envoyfilter", filter, rc.ClusterID, "deleting from cache") - } - } - } - } - } - if err == nil { - r.RemoteRegistry.AdmiralCache.RoutingPolicyFilterCache.Delete(key) - } - return err -} diff --git a/admiral/pkg/clusters/routingpolicy_handler_test.go b/admiral/pkg/clusters/routingpolicy_handler_test.go index 0b3838bf..3870b4c2 100644 --- a/admiral/pkg/clusters/routingpolicy_handler_test.go +++ b/admiral/pkg/clusters/routingpolicy_handler_test.go @@ -3,7 +3,7 @@ package clusters import ( "bytes" "context" - "fmt" + "errors" "os" "reflect" "strings" @@ -24,7 +24,7 @@ import ( metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func TestRoutingPolicyHandler(t *testing.T) { +func TestNewRoutingPolicyProcessor(t *testing.T) { common.ResetSync() p := common.AdmiralParams{ KubeconfigPath: "testdata/fake.config", @@ -50,8 +50,6 @@ func TestRoutingPolicyHandler(t *testing.T) { registry, _ := InitAdmiral(context.Background(), p) - handler := RoutingPolicyHandler{} - rpFilterCache := &routingPolicyFilterCache{} rpFilterCache.filterCache = make(map[string]map[string]map[string]string) rpFilterCache.mutex = &sync.Mutex{} @@ -62,24 +60,34 @@ func TestRoutingPolicyHandler(t *testing.T) { }) remoteController.RoutingPolicyController = routingPolicyController - registry.remoteControllers = map[string]*RemoteController{"cluster-1": remoteController} registry.AdmiralCache.RoutingPolicyFilterCache = rpFilterCache + registry.AdmiralCache.RoutingPolicyCache = NewRoutingPolicyCache() // foo is dependent upon bar and bar has a deployment in the same cluster. registry.AdmiralCache.IdentityDependencyCache.Put("foo", "bar", "bar") registry.AdmiralCache.IdentityClusterCache.Put("bar", remoteController.ClusterID, remoteController.ClusterID) - // foo is also dependent upon bar2 but bar2 is in a different cluster, so this cluster should not have the envoyfilter created + // foo2 is also dependent upon bar2 but bar2 is in a different cluster, so this cluster should not have the envoyfilter created registry.AdmiralCache.IdentityDependencyCache.Put("foo2", "bar2", "bar2") registry.AdmiralCache.IdentityClusterCache.Put("bar2", "differentCluster", "differentCluster") // foo1 is dependent upon bar 1 but bar1 does not have a deployment so it is missing from identityClusterCache registry.AdmiralCache.IdentityDependencyCache.Put("foo1", "bar1", "bar1") - handler.RemoteRegistry = registry - - routingPolicyFoo := &admiralV1.RoutingPolicy{ + type args struct { + rr *RemoteRegistry + et admiral.EventType + rp *admiralV1.RoutingPolicy + dp map[string]string + } + type want struct { + err error + expectedFilterCacheKey string + expectedFilterCount int + expectedEnvoyFilterConfigPatchVal map[string]interface{} + } + fooRP := &admiralV1.RoutingPolicy{ TypeMeta: metaV1.TypeMeta{}, ObjectMeta: metaV1.ObjectMeta{ Name: "rpfoo", @@ -92,16 +100,202 @@ func TestRoutingPolicyHandler(t *testing.T) { Plugin: "test", Hosts: []string{"e2e.testservice.mesh"}, Config: map[string]string{ - "cachePrefix": "cache-v1", - "cachettlSec": "86400", "routingServiceUrl": "e2e.test.routing.service.mesh", - "pathPrefix": "/sayhello,/v1/company/{id}/", }, }, Status: admiralV1.RoutingPolicyStatus{}, } - routingPolicyFooTest := &admiralV1.RoutingPolicy{ + foo1RP := fooRP.DeepCopy() + foo1RP.Labels[common.GetWorkloadIdentifier()] = "foo1" + + foo2RP := fooRP.DeepCopy() + foo1RP.Labels[common.GetWorkloadIdentifier()] = "foo2" + + testCases := []struct { + name string + args args + want want + }{ + { + name: "Valid routing policy and existing deployment", + args: args{ + rr: registry, + et: admiral.Add, + rp: fooRP, + dp: map[string]string{"bar": "bar"}, + }, + want: want{ + expectedFilterCacheKey: "rpfoofoodev", + expectedFilterCount: 1, + expectedEnvoyFilterConfigPatchVal: map[string]interface{}{"name": "dynamicRoutingFilterPatch", "typed_config": map[string]interface{}{ + "@type": "type.googleapis.com/udpa.type.v1.TypedStruct", "type_url": "type.googleapis.com/envoy.extensions.filters.http.wasm.v3.Wasm", + "value": map[string]interface{}{ + "config": map[string]interface{}{ + "configuration": map[string]interface{}{ + "@type": "type.googleapis.com/google.protobuf.StringValue", + "value": "routingServiceUrl: e2e.test.routing.service.mesh\nhosts: e2e.testservice.mesh\nplugin: test"}, + "vm_config": map[string]interface{}{"code": map[string]interface{}{"local": map[string]interface{}{"filename": ""}}, "runtime": "envoy.wasm.runtime.v8", "vm_id": "test-dr-532221909d5db54fe5f5-f6ce3712830af1b15625-1.13"}}}}}, + }, + }, + { + name: "Valid routing policy but dependency cluster is missing", + args: args{ + rr: registry, + et: admiral.Add, + rp: foo1RP, + dp: map[string]string{"bar1": "bar1"}, + }, + want: want{ + expectedFilterCacheKey: "rpfoofoodev", + expectedFilterCount: 0, + }, + }, + { + name: "Valid routing policy and known cluster containing the deployment", + args: args{ + rr: registry, + et: admiral.Add, + rp: foo2RP, + dp: map[string]string{"bar2": "bar2"}, + }, + want: want{ + expectedFilterCacheKey: "rpfoofoodev", + expectedFilterCount: 0, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + p := NewRoutingPolicyProcessor(tc.args.rr) + ae := p.ProcessAddOrUpdate(ctx, tc.args.et, tc.args.rp, nil, tc.args.dp) + if ae != nil && !errors.Is(ae, tc.want.err) { + t.Errorf("NewRoutingPolicyProcessor() = %v, want %v", ae, tc.want.err) + t.Fail() + } + list1, _ := remoteController.RoutingPolicyController.IstioClient.NetworkingV1alpha3().EnvoyFilters("istio-system").List(ctx, metaV1.ListOptions{}) + assert.Equal(t, tc.want.expectedFilterCount, len(list1.Items)) + + if tc.want.expectedFilterCount > 0 { + receivedEnvoyFilter, _ := remoteController.RoutingPolicyController.IstioClient.NetworkingV1alpha3().EnvoyFilters("istio-system").Get(ctx, "test-dr-532221909d5db54fe5f5-f6ce3712830af1b15625-1.13", metaV1.GetOptions{}) + eq := reflect.DeepEqual(tc.want.expectedEnvoyFilterConfigPatchVal, receivedEnvoyFilter.Spec.ConfigPatches[0].Patch.Value.AsMap()) + assert.True(t, eq) + assert.NotNil(t, registry.AdmiralCache.RoutingPolicyCache.Get("foo", "dev", "rpfoo")) + } + + // once the routing policy is deleted, the corresponding filter should also be deleted + p.Delete(ctx, admiral.Delete, tc.args.rp) + assert.Nil(t, registry.AdmiralCache.RoutingPolicyFilterCache.Get(tc.want.expectedFilterCacheKey)) + assert.Nil(t, registry.AdmiralCache.RoutingPolicyCache.Get("foo", "dev", tc.args.rp.Name)) + + }) + } +} + +func TestRoutingPolicyProcess_DeltaUpdate(t *testing.T) { + common.ResetSync() + p := common.AdmiralParams{ + KubeconfigPath: "testdata/fake.config", + LabelSet: &common.LabelSet{ + DeploymentAnnotation: "sidecar.istio.io/inject", + }, + EnableSAN: true, + SANPrefix: "prefix", + HostnameSuffix: "mesh", + SyncNamespace: "ns", + CacheReconcileDuration: time.Minute, + ClusterRegistriesNamespace: "default", + DependenciesNamespace: "default", + EnableRoutingPolicy: true, + EnvoyFilterVersion: "1.13", + Profile: common.AdmiralProfileDefault, + } + + p.LabelSet.WorkloadIdentityKey = "identity" + p.LabelSet.EnvKey = "admiral.io/env" + p.LabelSet.AdmiralCRDIdentityLabel = "identity" + + InitAdmiral(context.Background(), p) + + type args struct { + rr *RemoteRegistry + dependency *admiralV1.Dependency + } + type want struct { + err error + } + + _ = []struct { + name string + args args + want want + }{ + { + name: "New dependency addition creates filter for destinations with routing policy", + }, + { + name: "Existing dependency update creates filter for destinations with routing policy", + }, + { + name: "Existing dependency deletion removes filter for destinations with routing policy", + }, + } +} + +func TestRoutingPolicyHandler(t *testing.T) { + common.ResetSync() + p := common.AdmiralParams{ + KubeconfigPath: "testdata/fake.config", + LabelSet: &common.LabelSet{ + DeploymentAnnotation: "sidecar.istio.io/inject", + }, + EnableSAN: true, + SANPrefix: "prefix", + HostnameSuffix: "mesh", + SyncNamespace: "ns", + CacheReconcileDuration: time.Minute, + ClusterRegistriesNamespace: "default", + DependenciesNamespace: "default", + EnableRoutingPolicy: true, + RoutingPolicyClusters: []string{"*"}, + EnvoyFilterVersion: "1.13", + Profile: common.AdmiralProfileDefault, + } + + p.LabelSet.WorkloadIdentityKey = "identity" + p.LabelSet.EnvKey = "admiral.io/env" + p.LabelSet.AdmiralCRDIdentityLabel = "identity" + + registry, _ := InitAdmiral(context.Background(), p) + + rpFilterCache := &routingPolicyFilterCache{} + rpFilterCache.filterCache = make(map[string]map[string]map[string]string) + rpFilterCache.mutex = &sync.Mutex{} + + routingPolicyController := &admiral.RoutingPolicyController{IstioClient: istiofake.NewSimpleClientset()} + remoteController, _ := createMockRemoteController(func(i interface{}) { + + }) + + remoteController.RoutingPolicyController = routingPolicyController + registry.remoteControllers = map[string]*RemoteController{"cluster-1": remoteController} + registry.AdmiralCache.RoutingPolicyFilterCache = rpFilterCache + + // foo is dependent upon bar and bar has a deployment in the same cluster. + registry.AdmiralCache.IdentityDependencyCache.Put("foo", "bar", "bar") + registry.AdmiralCache.IdentityClusterCache.Put("bar", remoteController.ClusterID, remoteController.ClusterID) + + // foo2 is also dependent upon bar2 but bar2 is in a different cluster, so this cluster should not have the envoyfilter created + registry.AdmiralCache.IdentityDependencyCache.Put("foo2", "bar2", "bar2") + registry.AdmiralCache.IdentityClusterCache.Put("bar2", "differentCluster", "differentCluster") + + // foo1 is dependent upon bar 1 but bar1 does not have a deployment so it is missing from identityClusterCache + registry.AdmiralCache.IdentityDependencyCache.Put("foo1", "bar1", "bar1") + processor := &MockPolicyProcessor{} + oops := errors.New("oops") + errProcessor := &MockPolicyProcessor{err: oops} + fooRP := &admiralV1.RoutingPolicy{ TypeMeta: metaV1.TypeMeta{}, ObjectMeta: metaV1.ObjectMeta{ Name: "rpfoo", @@ -119,95 +313,127 @@ func TestRoutingPolicyHandler(t *testing.T) { }, Status: admiralV1.RoutingPolicyStatus{}, } - - routingPolicyFoo1 := routingPolicyFoo.DeepCopy() - routingPolicyFoo1.Labels[common.GetWorkloadIdentifier()] = "foo1" - - routingPolicyFoo2 := routingPolicyFoo.DeepCopy() - routingPolicyFoo2.Labels[common.GetWorkloadIdentifier()] = "foo2" - + type args struct { + proc *MockPolicyProcessor + eventType admiral.EventType + rp *admiralV1.RoutingPolicy + } + type want struct { + err error + addCount int + delCount int + updateCount int + } + time.Sleep(time.Second * 10) testCases := []struct { - name string - routingPolicy *admiralV1.RoutingPolicy - expectedFilterCacheKey string - expectedFilterCount int - expectedEnvoyFilterConfigPatchVal map[string]interface{} + name string + args args + want want }{ { - name: "If dependent deployment exists, should fetch filter from cache", - routingPolicy: routingPolicyFooTest, - expectedFilterCacheKey: "rpfoofoodev", - expectedFilterCount: 1, - expectedEnvoyFilterConfigPatchVal: map[string]interface{}{"name": "dynamicRoutingFilterPatch", "typed_config": map[string]interface{}{ - "@type": "type.googleapis.com/udpa.type.v1.TypedStruct", "type_url": "type.googleapis.com/envoy.extensions.filters.http.wasm.v3.Wasm", - "value": map[string]interface{}{ - "config": map[string]interface{}{ - "configuration": map[string]interface{}{ - "@type": "type.googleapis.com/google.protobuf.StringValue", - "value": "routingServiceUrl: e2e.test.routing.service.mesh\nhosts: e2e.testservice.mesh\nplugin: test"}, - "vm_config": map[string]interface{}{"code": map[string]interface{}{"local": map[string]interface{}{"filename": ""}}, "runtime": "envoy.wasm.runtime.v8", "vm_id": "test-dr-532221909d5db54fe5f5-f6ce3712830af1b15625-1.13"}}}}}, + name: "Add event is propagated correctly", + args: args{ + proc: processor, + eventType: admiral.Add, + rp: fooRP, + }, + want: want{ + addCount: 1, + }, + }, + { + name: "Err on Add event", + args: args{ + proc: errProcessor, + eventType: admiral.Add, + rp: fooRP, + }, + want: want{ + err: oops, + }, + }, + { + name: "Update event is propagated correctly", + args: args{ + proc: processor, + eventType: admiral.Update, + rp: fooRP, + }, + want: want{ + updateCount: 1, + }, }, { - name: "If dependent deployment does not exist, the filter should not be created ", - routingPolicy: routingPolicyFoo1, - expectedFilterCacheKey: "rpfoofoodev", - expectedFilterCount: 0, + name: "Err on update event", + args: args{ + proc: errProcessor, + eventType: admiral.Update, + rp: fooRP, + }, + want: want{ + err: oops, + }, }, { - name: "If dependent deployment exists in a different cluster, the filter should not be created in cluster where dependency isnt there", - routingPolicy: routingPolicyFoo2, - expectedFilterCacheKey: "rpfoofoodev", - expectedFilterCount: 0, + name: "Delete event is propagated correctly", + args: args{ + proc: processor, + eventType: admiral.Delete, + rp: fooRP, + }, + want: want{ + delCount: 1, + }, + }, + { + name: "Err on delete event", + args: args{ + proc: errProcessor, + eventType: admiral.Delete, + rp: fooRP, + }, + want: want{ + err: oops, + }, }, } ctx := context.Background() - time.Sleep(time.Second * 30) for _, c := range testCases { t.Run(c.name, func(t *testing.T) { - handler.Added(ctx, c.routingPolicy) - if c.expectedFilterCount > 0 { - filterCacheValue := registry.AdmiralCache.RoutingPolicyFilterCache.Get(c.expectedFilterCacheKey) - assert.NotNil(t, filterCacheValue) - routingPolicyNameSha, _ := getSha1(c.routingPolicy.Name + common.GetRoutingPolicyEnv(c.routingPolicy) + common.GetRoutingPolicyIdentity(c.routingPolicy)) - dependentIdentitySha, _ := getSha1("bar") - envoyFilterName := fmt.Sprintf("%s-dr-%s-%s-%s", strings.ToLower(c.routingPolicy.Spec.Plugin), routingPolicyNameSha, dependentIdentitySha, "1.13") - - filterMap := filterCacheValue[remoteController.ClusterID] - assert.NotNil(t, filterMap) - assert.NotNil(t, filterMap[envoyFilterName]) - - filter, err := remoteController.RoutingPolicyController.IstioClient.NetworkingV1alpha3(). - EnvoyFilters("istio-system").Get(ctx, envoyFilterName, metaV1.GetOptions{}) - assert.Nil(t, err) - assert.NotNil(t, filter) - } - //get envoyfilters from all namespaces - list1, _ := remoteController.RoutingPolicyController.IstioClient.NetworkingV1alpha3().EnvoyFilters("istio-system").List(ctx, metaV1.ListOptions{}) - assert.Equal(t, c.expectedFilterCount, len(list1.Items)) - if c.expectedFilterCount > 0 { - receivedEnvoyFilter, _ := remoteController.RoutingPolicyController.IstioClient.NetworkingV1alpha3().EnvoyFilters("istio-system").Get(ctx, "test-dr-532221909d5db54fe5f5-f6ce3712830af1b15625-1.13", metaV1.GetOptions{}) - eq := reflect.DeepEqual(c.expectedEnvoyFilterConfigPatchVal, receivedEnvoyFilter.Spec.ConfigPatches[0].Patch.Value.AsMap()) - assert.True(t, eq) + handler := NewRoutingPolicyHandler(registry, "", c.args.proc) + switch c.args.eventType { + case admiral.Add: + err := handler.Added(ctx, c.args.rp) + if c.want.err != nil && !errors.Is(err, c.want.err) { + t.Errorf("RoutingPolicyHandler.Added() = %v, want %v", err, c.want.err) + t.Fail() + } + if c.want.addCount > 0 { + assert.Equal(t, c.args.proc.addCount, c.want.addCount) + } + case admiral.Update: + err := handler.Updated(ctx, c.args.rp, nil) + if c.want.err != nil && !errors.Is(err, c.want.err) { + t.Errorf("RoutingPolicyHandler.Updated() = %v, want %v", err, c.want.err) + t.Fail() + } + if c.want.addCount > 0 { + assert.Equal(t, c.args.proc.addCount, c.want.addCount) + } + case admiral.Delete: + err := handler.Deleted(ctx, c.args.rp) + if c.want.err != nil && !errors.Is(err, c.want.err) { + t.Errorf("RoutingPolicyHandler.Deleted() = %v, want %v", err, c.want.err) + t.Fail() + } + if c.want.addCount > 0 { + assert.Equal(t, c.args.proc.addCount, c.want.addCount) + } } - - // once the routing policy is deleted, the corresponding filter should also be deleted - handler.Deleted(ctx, c.routingPolicy) - assert.Nil(t, registry.AdmiralCache.RoutingPolicyFilterCache.Get(c.expectedFilterCacheKey)) }) } - - // ignore the routing policy - annotations := routingPolicyFoo.GetAnnotations() - if annotations == nil { - annotations = make(map[string]string) - } - annotations[common.AdmiralIgnoreAnnotation] = "true" - routingPolicyFoo.SetAnnotations(annotations) - - handler.Updated(ctx, routingPolicyFoo) - assert.Nil(t, registry.AdmiralCache.RoutingPolicyFilterCache.Get("rpfoofoodev")) } func TestRoutingPolicyReadOnly(t *testing.T) { @@ -230,7 +456,7 @@ func TestRoutingPolicyReadOnly(t *testing.T) { p.LabelSet.EnvKey = "admiral.io/env" p.LabelSet.AdmiralCRDIdentityLabel = "identity" - handler := RoutingPolicyHandler{} + handler := NewRoutingPolicyHandler(nil, "", NewRoutingPolicyProcessor(nil)) testcases := []struct { name string @@ -273,7 +499,7 @@ func TestRoutingPolicyReadOnly(t *testing.T) { assert.Equal(t, c.doesError, val) // Update routing policy test - handler.Updated(ctx, c.rp) + handler.Updated(ctx, c.rp, nil) t.Log(buf.String()) val = strings.Contains(buf.String(), "skipping read-only mode") assert.Equal(t, c.doesError, val) @@ -287,7 +513,7 @@ func TestRoutingPolicyReadOnly(t *testing.T) { } } -func TestRoutingPolicyDisabled_ForEmptyClusters(t *testing.T) { +func TestRoutingPolicyIgnored(t *testing.T) { common.ResetSync() p := common.AdmiralParams{ KubeconfigPath: "testdata/fake.config", @@ -302,7 +528,7 @@ func TestRoutingPolicyDisabled_ForEmptyClusters(t *testing.T) { ClusterRegistriesNamespace: "default", DependenciesNamespace: "default", EnableRoutingPolicy: true, - RoutingPolicyClusters: []string{}, + RoutingPolicyClusters: []string{"*"}, EnvoyFilterVersion: "1.13", Profile: common.AdmiralProfileDefault, } @@ -312,56 +538,247 @@ func TestRoutingPolicyDisabled_ForEmptyClusters(t *testing.T) { p.LabelSet.AdmiralCRDIdentityLabel = "identity" registry, _ := InitAdmiral(context.Background(), p) + processor := &MockPolicyProcessor{} + handler := NewRoutingPolicyHandler(registry, "", processor) + + type want struct { + err error + addCount int + delCount int + updateCount int + } + testcases := []struct { + name string + rp *admiralV1.RoutingPolicy + want want + }{ + { + name: "Ignore Routing Policy - Annotation", + rp: &admiralV1.RoutingPolicy{ + ObjectMeta: metaV1.ObjectMeta{ + Annotations: map[string]string{ + "admiral.io/ignore": "true", + }, + }, + }, + want: want{}, + }, + { + name: "Ignore Routing Policy - Label", + rp: &admiralV1.RoutingPolicy{ + ObjectMeta: metaV1.ObjectMeta{ + Labels: map[string]string{ + "admiral.io/ignore": "true", + }, + }, + }, + want: want{}, + }, + } - handler := RoutingPolicyHandler{} + ctx := context.Background() - rpFilterCache := &routingPolicyFilterCache{} - rpFilterCache.filterCache = make(map[string]map[string]map[string]string) - rpFilterCache.mutex = &sync.Mutex{} + for _, c := range testcases { + t.Run(c.name, func(t *testing.T) { + handler.Added(ctx, c.rp) + assert.Equal(t, c.want.addCount, 0) - routingPolicyController := &admiral.RoutingPolicyController{IstioClient: istiofake.NewSimpleClientset()} - remoteController, _ := createMockRemoteController(func(i interface{}) { + // Update routing policy test + handler.Updated(ctx, c.rp, nil) + assert.Equal(t, c.want.updateCount, 0) - }) + // Delete routing policy test + handler.Deleted(ctx, c.rp) + assert.Equal(t, c.want.delCount, 0) + }) + } +} - remoteController.RoutingPolicyController = routingPolicyController +func TestRoutingPolicyProcessingDisabled(t *testing.T) { + common.ResetSync() + p := common.AdmiralParams{ + KubeconfigPath: "testdata/fake.config", + LabelSet: &common.LabelSet{ + DeploymentAnnotation: "sidecar.istio.io/inject", + }, + EnableSAN: true, + SANPrefix: "prefix", + HostnameSuffix: "mesh", + SyncNamespace: "ns", + CacheReconcileDuration: time.Minute, + ClusterRegistriesNamespace: "default", + DependenciesNamespace: "default", + EnableRoutingPolicy: false, + RoutingPolicyClusters: []string{"*"}, + EnvoyFilterVersion: "1.13", + Profile: common.AdmiralProfileDefault, + } - registry.remoteControllers = map[string]*RemoteController{"cluster-1": remoteController} - registry.AdmiralCache.RoutingPolicyFilterCache = rpFilterCache + p.LabelSet.WorkloadIdentityKey = "identity" + p.LabelSet.EnvKey = "admiral.io/env" + p.LabelSet.AdmiralCRDIdentityLabel = "identity" - // foo is dependent upon bar and bar has a deployment in the same cluster. - registry.AdmiralCache.IdentityDependencyCache.Put("foo", "bar", "bar") - registry.AdmiralCache.IdentityClusterCache.Put("bar", remoteController.ClusterID, remoteController.ClusterID) + registry, _ := InitAdmiral(context.Background(), p) - handler.RemoteRegistry = registry - routingPolicy := &admiralV1.RoutingPolicy{ - TypeMeta: metaV1.TypeMeta{}, - ObjectMeta: metaV1.ObjectMeta{ - Name: "rpfoo", - Labels: map[string]string{ - "identity": "foo", - "admiral.io/env": "dev", + processor := &MockPolicyProcessor{} + handler := NewRoutingPolicyHandler(registry, "", processor) + + type want struct { + err error + addCount int + delCount int + updateCount int + } + testcases := []struct { + name string + rp *admiralV1.RoutingPolicy + want want + }{ + { + name: "Disabled", + rp: &admiralV1.RoutingPolicy{ + ObjectMeta: metaV1.ObjectMeta{ + Annotations: map[string]string{ + "admiral.io/ignore": "true", + }, + }, }, + want: want{}, }, - Spec: model.RoutingPolicy{ - Plugin: "test", - Hosts: []string{"e2e.testservice.mesh"}, - Config: map[string]string{ - "cachePrefix": "cache-v1", - "cachettlSec": "86400", - "routingServiceUrl": "e2e.test.routing.service.mesh", - "pathPrefix": "/sayhello,/v1/company/{id}/", + } + + ctx := context.Background() + + for _, c := range testcases { + t.Run(c.name, func(t *testing.T) { + handler.Added(ctx, c.rp) + assert.Equal(t, c.want.addCount, 0) + + // Update routing policy test + handler.Updated(ctx, c.rp, nil) + assert.Equal(t, c.want.updateCount, 0) + + // Delete routing policy test + handler.Deleted(ctx, c.rp) + assert.Equal(t, c.want.delCount, 0) + }) + } +} + +func TestRoutingPolicyCache_Put(t *testing.T) { + rp := NewRoutingPolicyCache() + + rp.Put("foo", "dev", "rpfoo", nil) + assert.Nil(t, rp.Get("foo", "dev", "rpfoo")) + + rp.Put("foo", "dev", "rpfoo", &admiralV1.RoutingPolicy{}) + assert.NotNil(t, rp.Get("foo", "dev", "rpfoo")) +} + +func TestRoutingPolicyCache_Delete(t *testing.T) { + p := common.AdmiralParams{ + KubeconfigPath: "testdata/fake.config", + LabelSet: &common.LabelSet{ + DeploymentAnnotation: "sidecar.istio.io/inject", + }, + EnableSAN: true, + SANPrefix: "prefix", + HostnameSuffix: "mesh", + SyncNamespace: "ns", + CacheReconcileDuration: time.Minute, + ClusterRegistriesNamespace: "default", + DependenciesNamespace: "default", + EnvoyFilterVersion: "1.13", + Profile: common.AdmiralProfileDefault, + } + + p.LabelSet.WorkloadIdentityKey = "identity" + p.LabelSet.EnvKey = "admiral.io/env" + p.LabelSet.AdmiralCRDIdentityLabel = "identity" + + type args struct { + readOnly bool + enableRoutingPolicy bool + identity string + env string + name string + } + testCases := []struct { + name string + args args + }{ + { + name: "Admiral should delete from cache", + args: args{ + readOnly: false, + enableRoutingPolicy: true, + identity: "foo", + env: "dev", + name: "rpfoo", + }, + }, + { + name: "Admiral should not delete from cache during readOnly", + args: args{ + readOnly: true, + enableRoutingPolicy: true, + identity: "foo", + env: "dev", + name: "rpfoo", + }, + }, + { + name: "Admiral should not delete from cache when disabled", + args: args{ + readOnly: false, + enableRoutingPolicy: false, + identity: "foo", + env: "dev", + name: "rpfoo", }, }, - Status: admiralV1.RoutingPolicyStatus{}, } - ctx := context.Background() + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + common.ResetSync() + p.EnableRoutingPolicy = tc.args.enableRoutingPolicy + InitAdmiral(context.Background(), p) + commonUtil.CurrentAdmiralState.ReadOnly = tc.args.readOnly + cache := NewRoutingPolicyCache() + rp := &admiralV1.RoutingPolicy{} + cache.Put(tc.args.identity, tc.args.env, tc.args.name, rp) + cache.Delete(tc.args.identity, tc.args.env, tc.args.name) + if tc.args.readOnly { + assert.NotNil(t, cache.Get(tc.args.identity, tc.args.env, tc.args.name)) + } else { + if tc.args.enableRoutingPolicy { + assert.Nil(t, cache.Get(tc.args.identity, tc.args.env, tc.args.name)) + } else { + assert.NotNil(t, cache.Get(tc.args.identity, tc.args.env, tc.args.name)) + } + } + }) + } +} - handler.Added(ctx, routingPolicy) - // No-Op. Filters should not be created - assert.Nil(t, registry.AdmiralCache.RoutingPolicyFilterCache.Get("rpfoofoodev")) +type MockPolicyProcessor struct { + addCount int + deltaCount int + deleteCount int + err error +} + +func (m *MockPolicyProcessor) ProcessAddOrUpdate(ctx context.Context, eventType admiral.EventType, newRP *admiralV1.RoutingPolicy, oldRP *admiralV1.RoutingPolicy, dependents map[string]string) error { + m.addCount++ + return m.err +} + +func (m *MockPolicyProcessor) DeltaUpdate(ctx context.Context, eventType admiral.EventType, dependency *admiralV1.Dependency) error { + m.deltaCount++ + return m.err +} - handler.Deleted(ctx, routingPolicy) - // No-Op. Should not panic or throw errors - assert.Nil(t, registry.AdmiralCache.RoutingPolicyFilterCache.Get("rpfoofoodev")) +func (m *MockPolicyProcessor) Delete(ctx context.Context, eventType admiral.EventType, routingPolicy *admiralV1.RoutingPolicy) error { + m.deleteCount++ + return m.err } diff --git a/admiral/pkg/clusters/types.go b/admiral/pkg/clusters/types.go index a53205c9..2edd2290 100644 --- a/admiral/pkg/clusters/types.go +++ b/admiral/pkg/clusters/types.go @@ -70,6 +70,7 @@ type AdmiralCache struct { DependencyNamespaceCache *common.SidecarEgressMap SeClusterCache *common.MapOfMaps RoutingPolicyFilterCache *routingPolicyFilterCache + RoutingPolicyCache *RoutingPolicyCache SourceToDestinations *sourceToDestinations //This cache is to fetch list of all dependencies for a given source identity, TrafficConfigIgnoreAssets []string GatewayAssets []string @@ -134,6 +135,7 @@ func NewRemoteRegistry(ctx context.Context, params common.AdmiralParams) *Remote CnameDependentClusterCache: common.NewMapOfMaps(), IdentityDependencyCache: common.NewMapOfMaps(), RoutingPolicyFilterCache: rpFilterCache, + RoutingPolicyCache: NewRoutingPolicyCache(), DependencyNamespaceCache: common.NewSidecarEgressMap(), CnameIdentityCache: &sync.Map{}, ServiceEntryAddressStore: &ServiceEntryAddressStore{EntryAddresses: map[string]string{}, Addresses: []string{}}, diff --git a/admiral/pkg/controller/admiral/routingpolicy.go b/admiral/pkg/controller/admiral/routingpolicy.go index 598412ae..852c809f 100644 --- a/admiral/pkg/controller/admiral/routingpolicy.go +++ b/admiral/pkg/controller/admiral/routingpolicy.go @@ -22,7 +22,7 @@ import ( // RoutingPolicyHandler interface contains the methods that are required type RoutingPolicyHandler interface { Added(ctx context.Context, obj *v1.RoutingPolicy) error - Updated(ctx context.Context, obj *v1.RoutingPolicy) error + Updated(ctx context.Context, newObj *v1.RoutingPolicy, oldObj *v1.RoutingPolicy) error Deleted(ctx context.Context, obj *v1.RoutingPolicy) error } @@ -61,12 +61,14 @@ func (r *RoutingPolicyController) Added(ctx context.Context, obj interface{}) er return nil } -func (r *RoutingPolicyController) Updated(ctx context.Context, obj interface{}, oldObj interface{}) error { - routingPolicy, ok := obj.(*v1.RoutingPolicy) +func (r *RoutingPolicyController) Updated(ctx context.Context, newObj interface{}, oldObj interface{}) error { + newRP, ok := newObj.(*v1.RoutingPolicy) if !ok { - return fmt.Errorf("type assertion failed, %v is not of type *v1.RoutingPolicy", obj) + return fmt.Errorf("type assertion failed, %v is not of type *v1.RoutingPolicy", newObj) } - r.RoutingPolicyHandler.Updated(ctx, routingPolicy) + oldRP, ok := oldObj.(*v1.RoutingPolicy) + // oldRp can be nil + r.RoutingPolicyHandler.Updated(ctx, newRP, oldRP) return nil } diff --git a/admiral/pkg/controller/admiral/routingpolicy_test.go b/admiral/pkg/controller/admiral/routingpolicy_test.go index 9312d9c8..79e54d75 100644 --- a/admiral/pkg/controller/admiral/routingpolicy_test.go +++ b/admiral/pkg/controller/admiral/routingpolicy_test.go @@ -228,7 +228,7 @@ func TestRoutingPolicyAddUpdateDelete(t *testing.T) { rpObj := makeK8sRoutingPolicyObj(rpName, "namespace1", rp) routingPolicyController.Added(ctx, rpObj) - if !cmp.Equal(handler.Obj.Spec, rpObj.Spec) { + if !cmp.Equal(handler.New.Spec, rpObj.Spec) { t.Errorf("Add should call the handler with the object") } @@ -242,13 +242,13 @@ func TestRoutingPolicyAddUpdateDelete(t *testing.T) { routingPolicyController.Updated(ctx, updatedRpObj, rpObj) - if !cmp.Equal(handler.Obj.Spec, updatedRpObj.Spec) { + if !cmp.Equal(handler.New.Spec, updatedRpObj.Spec) { t.Errorf("Update should call the handler with the updated object") } routingPolicyController.Deleted(ctx, updatedRpObj) - if handler.Obj != nil { + if handler.New != nil { t.Errorf("Delete should delete the routing policy") } diff --git a/admiral/pkg/test/mock.go b/admiral/pkg/test/mock.go index 95ee2b51..d5d5b876 100644 --- a/admiral/pkg/test/mock.go +++ b/admiral/pkg/test/mock.go @@ -249,21 +249,24 @@ func (m *MockSidecarHandler) Deleted(ctx context.Context, obj *v1alpha32.Sidecar } type MockRoutingPolicyHandler struct { - Obj *admiralV1.RoutingPolicy + Old *admiralV1.RoutingPolicy + New *admiralV1.RoutingPolicy } func (m *MockRoutingPolicyHandler) Added(ctx context.Context, obj *admiralV1.RoutingPolicy) error { - m.Obj = obj + m.New = obj return nil } func (m *MockRoutingPolicyHandler) Deleted(ctx context.Context, obj *admiralV1.RoutingPolicy) error { - m.Obj = nil + m.Old = obj + m.New = nil return nil } -func (m *MockRoutingPolicyHandler) Updated(ctx context.Context, obj *admiralV1.RoutingPolicy) error { - m.Obj = obj +func (m *MockRoutingPolicyHandler) Updated(ctx context.Context, newObj *admiralV1.RoutingPolicy, oldObj *admiralV1.RoutingPolicy) error { + m.New = newObj + m.Old = oldObj return nil }