Skip to content

Commit

Permalink
MESH-5878: Admiral does not create envoy filter on client onboardings
Browse files Browse the repository at this point in the history
- Introduce RoutingPolicyProcessor interface to contain the process() api in order to expose it
  • Loading branch information
Adil Fulara committed Dec 9, 2024
1 parent 61c8b71 commit 1bbba26
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 46 deletions.
4 changes: 0 additions & 4 deletions admiral/pkg/clusters/envoyfilter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ import (
func TestCreateOrUpdateEnvoyFilter(t *testing.T) {
registry := getRegistry("1.13,1.17")

handler := RoutingPolicyHandler{}

rpFilterCache := &routingPolicyFilterCache{}
rpFilterCache.filterCache = make(map[string]map[string]map[string]string)
rpFilterCache.mutex = &sync.Mutex{}
Expand Down Expand Up @@ -89,8 +87,6 @@ func TestCreateOrUpdateEnvoyFilter(t *testing.T) {
registry.AdmiralCache.IdentityDependencyCache.Put("foo", "bar", "bar")
registry.AdmiralCache.IdentityClusterCache.Put("bar", remoteController.ClusterID, remoteController.ClusterID)

handler.RemoteRegistry = registry

routingPolicyFoo := &v1.RoutingPolicy{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Expand Down
3 changes: 2 additions & 1 deletion admiral/pkg/clusters/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ func (r *RemoteRegistry) createCacheController(clientConfig *rest.Config, cluste
return fmt.Errorf("error with NodeController controller initialization, err: %v", err)
}
logrus.Infof("starting RoutingPoliciesController for clusterID: %v", clusterID)
rc.RoutingPolicyController, err = admiral.NewRoutingPoliciesController(stop, &RoutingPolicyHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0, r.ClientLoader)
rpHandler := NewRoutingPolicyHandler(r, clusterID)
rc.RoutingPolicyController, err = admiral.NewRoutingPoliciesController(stop, rpHandler, clientConfig, 0, r.ClientLoader)
if err != nil {
return fmt.Errorf("error with RoutingPoliciesController initialization, err: %v", err)
}
Expand Down
67 changes: 30 additions & 37 deletions admiral/pkg/clusters/routingpolicy_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,51 +15,44 @@ import (
)

type RoutingPolicyHandler struct {
RemoteRegistry *RemoteRegistry
ClusterID string
RemoteRegistry *RemoteRegistry
ClusterID string
RoutingPolicyService *RoutingPolicyService
}

type routingPolicyCache struct {
// map of routing policies key=environment.identity, value: RoutingPolicy object
// only one routing policy per identity + env is allowed
identityCache map[string]*v1.RoutingPolicy
mutex *sync.Mutex
func NewRoutingPolicyHandler(rr *RemoteRegistry, cId string) *RoutingPolicyHandler {
return &RoutingPolicyHandler{RemoteRegistry: rr, ClusterID: cId}
}

func (r *routingPolicyCache) Delete(identity string, environment string) {
defer r.mutex.Unlock()
r.mutex.Lock()
key := common.ConstructRoutingPolicyKey(environment, identity)
if _, ok := r.identityCache[key]; ok {
log.Infof("deleting RoutingPolicy with key=%s from global RoutingPolicy cache", key)
delete(r.identityCache, key)
}
type RoutingPolicyProcessor interface {
process(ctx context.Context, dependents map[string]string, routingPolicy *v1.RoutingPolicy, eventType admiral.EventType) error
}

func (r *routingPolicyCache) GetFromIdentity(identity string, environment string) *v1.RoutingPolicy {
defer r.mutex.Unlock()
r.mutex.Lock()
return r.identityCache[common.ConstructRoutingPolicyKey(environment, identity)]
type RoutingPolicyService struct {
RemoteRegistry *RemoteRegistry
}

func (r *routingPolicyCache) Put(rp *v1.RoutingPolicy) error {
if rp == nil || rp.Name == "" {
// no RoutingPolicy, throw error
return errors.New("cannot add an empty RoutingPolicy to the cache")
}
if rp.Labels == nil {
return errors.New("labels empty in RoutingPolicy")
func (r *RoutingPolicyService) process(ctx context.Context, dependents map[string]string, routingPolicy *v1.RoutingPolicy, eventType admiral.EventType) error {
var err error
for _, remoteController := range r.RemoteRegistry.remoteControllers {
for _, dependent := range dependents {
// Check if the dependent exists in this remoteCluster. If so, we create an envoyFilter with dependent identity as workload selector
if _, ok := r.RemoteRegistry.AdmiralCache.IdentityClusterCache.Get(dependent).Copy()[remoteController.ClusterID]; ok {
_, err1 := createOrUpdateEnvoyFilter(ctx, remoteController, routingPolicy, eventType, dependent, r.RemoteRegistry.AdmiralCache)
if err1 != nil {
log.Errorf(LogErrFormat, eventType, "routingpolicy", routingPolicy.Name, remoteController.ClusterID, err)
err = common.AppendError(err, err1)
} else {
log.Infof(LogFormat, eventType, "routingpolicy ", routingPolicy.Name, remoteController.ClusterID, "created envoyfilters")
}
}
}
}
defer r.mutex.Unlock()
r.mutex.Lock()
var rpIdentity = rp.Labels[common.GetRoutingPolicyLabel()]
var rpEnv = common.GetRoutingPolicyEnv(rp)

log.Infof("Adding RoutingPolicy with name %v to RoutingPolicy cache. LabelMatch=%v env=%v", rp.Name, rpIdentity, rpEnv)
key := common.ConstructRoutingPolicyKey(rpEnv, rpIdentity)
r.identityCache[key] = rp
return err
}

return nil
func NewRoutingPolicyProcessor(remoteRegistry *RemoteRegistry) RoutingPolicyProcessor {
return &RoutingPolicyService{RemoteRegistry: remoteRegistry}
}

type routingPolicyFilterCache struct {
Expand Down Expand Up @@ -124,7 +117,7 @@ func (r RoutingPolicyHandler) Added(ctx context.Context, obj *v1.RoutingPolicy)
log.Info("No dependents found for Routing Policy - ", obj.Name)
return nil
}
err := r.processroutingPolicy(ctx, dependents, obj, admiral.Add)
err := r.RoutingPolicyService.process(ctx, dependents, obj, admiral.Add)
if err != nil {
log.Errorf(LogErrFormat, admiral.Update, "routingpolicy", obj.Name, "", "failed to process routing policy")
return err
Expand Down Expand Up @@ -173,7 +166,7 @@ func (r RoutingPolicyHandler) Updated(ctx context.Context, obj *v1.RoutingPolicy
if len(dependents) == 0 {
return nil
}
err := r.processroutingPolicy(ctx, dependents, obj, admiral.Update)
err := r.RoutingPolicyService.process(ctx, dependents, obj, admiral.Update)
if err != nil {
log.Errorf(LogErrFormat, admiral.Update, "routingpolicy", obj.Name, "", "failed to process routing policy")
return err
Expand Down
6 changes: 2 additions & 4 deletions admiral/pkg/clusters/routingpolicy_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ func TestRoutingPolicyHandler(t *testing.T) {

registry, _ := InitAdmiral(context.Background(), p)

handler := RoutingPolicyHandler{}

rpFilterCache := &routingPolicyFilterCache{}
rpFilterCache.filterCache = make(map[string]map[string]map[string]string)
rpFilterCache.mutex = &sync.Mutex{}
Expand All @@ -76,7 +74,7 @@ func TestRoutingPolicyHandler(t *testing.T) {
// foo1 is dependent upon bar 1 but bar1 does not have a deployment so it is missing from identityClusterCache
registry.AdmiralCache.IdentityDependencyCache.Put("foo1", "bar1", "bar1")

handler.RemoteRegistry = registry
handler := NewRoutingPolicyHandler(registry, "")

routingPolicyFoo := &admiralV1.RoutingPolicy{
TypeMeta: metaV1.TypeMeta{},
Expand Down Expand Up @@ -228,7 +226,7 @@ func TestRoutingPolicyReadOnly(t *testing.T) {
p.LabelSet.EnvKey = "admiral.io/env"
p.LabelSet.AdmiralCRDIdentityLabel = "identity"

handler := RoutingPolicyHandler{}
handler := NewRoutingPolicyHandler(nil, "")

testcases := []struct {
name string
Expand Down

0 comments on commit 1bbba26

Please sign in to comment.