Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
sigalmaya committed Aug 17, 2022
1 parent 6b6566d commit 16e8c40
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 78 deletions.
6 changes: 3 additions & 3 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,12 @@ func New(ctx context.Context, e env.Environment, options *Options) (*web.API, er
}

api.RegisterFiltersBefore(filters.ProtectedLabelsFilterName, &filters.DisabledQueryParametersFilter{DisabledQueryParameters: options.APISettings.DisabledQueryParameters})
api.RegisterFiltersAfter(filters.DisabledQueryParametersName,
filters.NewBlockedClientsFilter(options.BlockedClientsCache))
api.RegisterFiltersAfter(filters.LoggingFilterName,
filters.NewBlockedClientsFilter(options.BlockedClientsCache, options.TenantLabelKey))

if rateLimiters != nil {
api.RegisterFiltersAfter(
filters.LoggingFilterName,
filters.BlockedClientsFilterName,
filters.NewRateLimiterFilter(
rateLimiters,
options.APISettings.RateLimitExcludeClients,
Expand Down
23 changes: 22 additions & 1 deletion api/blocked_clients_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,38 @@ import (
"context"
"fmt"
"github.com/Peripli/service-manager/pkg/types"
"github.com/Peripli/service-manager/pkg/util"
"github.com/Peripli/service-manager/pkg/web"
"github.com/Peripli/service-manager/storage"
"net/http"
)

// BlockedClientsController configuration controller
type BlockedClientsController struct {
*BaseController
cache *storage.Cache
}

func NewBlockedClientsController(ctx context.Context, options *Options) *BlockedClientsController {

return &BlockedClientsController{
BaseController: NewController(ctx, options, web.BlockedClientsConfigURL, types.BlockedClientsType, func() types.Object {
return &types.BlockedClient{}
}, false),
cache: options.BlockedClientsCache,
}

}
func (c *BlockedClientsController) ResyncBlockedClientsCache(r *web.Request) (*web.Response, error) {
err := c.cache.FlushL()
if err != nil {
return nil, &util.HTTPError{
ErrorType: "BlockedClientError",
Description: fmt.Sprintf("failed to resync blocked_cleints cache"),
StatusCode: http.StatusInternalServerError,
}
}
return util.NewJSONResponse(http.StatusOK, struct{}{})
}

// Routes provides endpoints for modifying and obtaining the logging configuration
func (c *BlockedClientsController) Routes() []web.Route {
Expand All @@ -40,6 +54,13 @@ func (c *BlockedClientsController) Routes() []web.Route {
},
Handler: c.CreateObject,
},
{
Endpoint: web.Endpoint{
Method: http.MethodGet,
Path: web.ResyncBlockedClients,
},
Handler: c.ResyncBlockedClientsCache,
},
{
Endpoint: web.Endpoint{
Method: http.MethodDelete,
Expand Down
28 changes: 22 additions & 6 deletions api/filters/check_blocked_clients_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,19 @@ import (
"net/http"
)

const BlockedClientsFilterName = "BlockedClientsFilter"

type BlockedClientsFilter struct {
blockedClientsCache *storage.Cache
tenantLabelKey string
}

func NewBlockedClientsFilter(cache *storage.Cache) *BlockedClientsFilter {
b := &BlockedClientsFilter{blockedClientsCache: cache}
func NewBlockedClientsFilter(cache *storage.Cache, tenantLabelKey string) *BlockedClientsFilter {
b := &BlockedClientsFilter{blockedClientsCache: cache, tenantLabelKey: tenantLabelKey}
return b
}
func (b *BlockedClientsFilter) Name() string {
return "BlockedClientsFilter"
return BlockedClientsFilterName
}

func (b *BlockedClientsFilter) Run(request *web.Request, next web.Handler) (*web.Response, error) {
Expand All @@ -32,8 +35,7 @@ func (b *BlockedClientsFilter) Run(request *web.Request, next web.Handler) (*web
blockedClient, isBlockedClient := b.isClientBlocked(userContext, method)
if isBlockedClient {
errorResponse := &util.HTTPError{
ErrorType: "RequestNotAllowed",

ErrorType: "RequestNotAllowed",
StatusCode: http.StatusMethodNotAllowed,
}

Expand All @@ -51,7 +53,21 @@ func (bc *BlockedClientsFilter) isClientBlocked(userContext *web.UserContext, me
if userContext.AccessLevel == web.GlobalAccess || userContext.AccessLevel == web.AllTenantAccess {
return nil, false
}
blockedClientCache, ok := bc.blockedClientsCache.GetC(userContext.Name)

if userContext.AuthenticationType == web.Basic {
platform := types.Platform{}
err := userContext.Data(&platform)
if err != nil {
return nil, false
}

if _, isTenantScopedPlatform := platform.Labels[bc.tenantLabelKey]; !isTenantScopedPlatform {
return nil, false
}

}

blockedClientCache, ok := bc.blockedClientsCache.Get(userContext.Name)
if !ok {
return nil, false
}
Expand Down
50 changes: 41 additions & 9 deletions pkg/blocked_clients/blocked_clients_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,61 @@ package blocked_clients

import (
"context"
"encoding/json"
"github.com/Peripli/service-manager/pkg/log"
"github.com/Peripli/service-manager/pkg/types"
"github.com/Peripli/service-manager/storage"
"time"
"github.com/Peripli/service-manager/storage/events"
)

type BlockedClientsManager struct {
repository storage.Repository
ctx context.Context
Cache *storage.Cache
repository storage.Repository
smCtx context.Context
Cache *storage.Cache
callbacks map[string]func(*events.Message) error
postgresEventsListener *events.PostgresEventListener
}

func Init(ctx context.Context, repository storage.Repository) *BlockedClientsManager {
b := &BlockedClientsManager{ctx: ctx, repository: repository}
b.Cache = storage.NewCache(time.Minute*5, b.getBlockClients)
func Init(ctx context.Context, repository storage.Repository, storageURI string) *BlockedClientsManager {
b := &BlockedClientsManager{smCtx: ctx, repository: repository}
b.getBlockClients()
b.Cache = storage.NewCache(-1, nil, b.getBlockClients)
b.callbacks = map[string]func(*events.Message) error{
"blocked_clients-INSERT": func(envelope *events.Message) error {
var blockedClient types.BlockedClient

if err := json.Unmarshal(envelope.Data, &blockedClient); err != nil {
log.C(ctx).Debugf("error unmarshalling new blocked client")
return err
}

if err := b.Cache.Add(blockedClient.ClientID, blockedClient); err != nil {
log.C(ctx).Debugf("error adding a blocked client in casche %s", blockedClient.ClientID)
}
return nil
},
"blocked_clients-DELETE": func(envelope *events.Message) error {
var blockedClient types.BlockedClient
if err := json.Unmarshal(envelope.Data, &blockedClient); err != nil {
log.C(ctx).Debugf("error unmarshalling new blocked client")
return err
}

b.Cache.Delete(blockedClient.ClientID)
return nil
},
}

b.postgresEventsListener = events.NewPostgresEventListener(ctx, storageURI, b.callbacks)
return b
}

func (b *BlockedClientsManager) getBlockClients() error {
blockedClientsList, err := b.repository.List(b.ctx, types.BlockedClientsType)
blockedClientsList, err := b.repository.List(b.smCtx, types.BlockedClientsType)
if err != nil {
log.C(b.smCtx).Info("error retrieving blocked clients", err)
return err
}
b.Cache.Flush()
for i := 0; i < blockedClientsList.Len(); i++ {
blockedClient := blockedClientsList.ItemAt(i).(*types.BlockedClient)
b.Cache.Add(blockedClient.ClientID, blockedClient)
Expand Down
30 changes: 2 additions & 28 deletions pkg/sm/sm.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ import (
"context"
"crypto/tls"
"database/sql"
"encoding/json"
"errors"
"fmt"
"github.com/Peripli/service-manager/pkg/blocked_clients"
secFilters "github.com/Peripli/service-manager/pkg/security/filters"
"github.com/Peripli/service-manager/storage/events"

osbc "github.com/kubernetes-sigs/go-open-service-broker-client/v2"
"math"
"net/http"
Expand Down Expand Up @@ -159,32 +158,7 @@ func New(ctx context.Context, cancel context.CancelFunc, e env.Environment, cfg
if err != nil {
return nil, fmt.Errorf("could not create notificator: %v", err)
}

blockedClientsManager := blocked_clients.Init(ctx, interceptableRepository)
callbacks := map[string]func(*events.Message) error{
"blocked_clients-INSERT": func(envelope *events.Message) error {
var blockedClient types.BlockedClient

if err := json.Unmarshal(envelope.Data, &blockedClient); err != nil {
log.C(ctx).Info("error unmarshaling new blocked client")
}

blockedClientsManager.Cache.AddC(blockedClient.ClientID, blockedClient)
return nil
},
"blocked_clients-DELETE": func(envelope *events.Message) error {
var blockedClient types.BlockedClient
if err := json.Unmarshal(envelope.Data, &blockedClient); err != nil {
log.C(ctx).Info("error unmarshaling new blocked client")
}

blockedClientsManager.Cache.DeleteC(blockedClient.ClientID)
return nil
},
}

_ = events.NewPostgresEventListener(ctx, cfg.Storage.URI, callbacks)

blockedClientsManager := blocked_clients.Init(ctx, interceptableRepository, cfg.Storage.URI)
apiOptions := &api.Options{
RedisClient: redisClient,
Repository: interceptableRepository,
Expand Down
3 changes: 2 additions & 1 deletion pkg/web/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ const (
InfoURL = "/" + apiVersion + "/info"

// ConfigURL is the Configuration API base URL path
ConfigURL = "/" + apiVersion + "/config"
ConfigURL = "/" + apiVersion + "/config"
ResyncBlockedClients = ConfigURL + "/resync_blocked_clients"

// LoggingConfigURL is the Logging Configuration API URL path
LoggingConfigURL = ConfigURL + "/logging"
Expand Down
60 changes: 34 additions & 26 deletions storage/local_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,36 +8,38 @@ import (
)

type cache struct {
items map[string]interface{}
lock sync.RWMutex
onResync func() error
sync *synchronizer
items map[string]interface{}
lock sync.RWMutex
onTimeExpired func() error
onFlush func() error
sync *janitor
}

type synchronizer struct {
type janitor struct {
Interval time.Duration
stop chan bool
}
type Cache struct {
*cache
}

func NewCache(resyncInterval time.Duration, onResync func() error) *Cache {
func NewCache(junitorInterval time.Duration, onTimeExpired func() error, onFlush func() error) *Cache {
items := make(map[string]interface{})
c := newCache(onResync, items)
c := newCache(onTimeExpired, onFlush, items)
C := &Cache{c}
if resyncInterval > 0 {
runSynchronizer(c, resyncInterval)
if junitorInterval > 0 {
runJanitor(c, junitorInterval)
runtime.SetFinalizer(C, StopSynchronizer)

}
return C

}
func newCache(onResync func() error, m map[string]interface{}) *cache {
func newCache(onTimeExpired func() error, onFlush func() error, m map[string]interface{}) *cache {
c := &cache{
items: m,
onResync: onResync,
items: m,
onTimeExpired: onTimeExpired,
onFlush: onFlush,
}
return c
}
Expand All @@ -46,21 +48,21 @@ func StopSynchronizer(c *Cache) {
c.sync.stop <- true
}

func runSynchronizer(c *cache, ci time.Duration) {
s := &synchronizer{
func runJanitor(c *cache, ci time.Duration) {
s := &janitor{
Interval: ci,
stop: make(chan bool),
}
c.sync = s
go s.Run(c)
}

func (s *synchronizer) Run(c *cache) {
func (s *janitor) Run(c *cache) {
ticker := time.NewTicker(s.Interval)
for {
select {
case <-ticker.C:
c.Resync()
c.TimeExpired()
case <-s.stop:
ticker.Stop()
return
Expand All @@ -71,49 +73,55 @@ func (c *cache) Flush() {
c.items = make(map[string]interface{})
}

func (c *cache) FlushC() {
func (c *cache) FlushL() error {
defer c.lock.Unlock()
c.lock.Lock()
c.items = make(map[string]interface{})

if c.onFlush != nil {
err := c.onFlush()
if err != nil {
return fmt.Errorf("error executing onFlush function: %s", err)
}
}
return nil
}
func (c *cache) Length() int {
return len(c.items)
}

func (c *cache) Resync() {
func (c *cache) TimeExpired() {
defer c.lock.Unlock()
c.lock.Lock()
c.onResync()
c.onTimeExpired()

}

func (c *cache) GetC(k string) (interface{}, bool) {
func (c *cache) Get(k string) (interface{}, bool) {
c.lock.RLock()
defer c.lock.RUnlock()
item, found := c.items[k]
return item, found
}
func (c *cache) Get(k string) (interface{}, bool) {
func (c *cache) get(k string) (interface{}, bool) {
item, found := c.items[k]
return item, found
}

func (c *cache) Delete(k string) {
func (c *cache) delete(k string) {
delete(c.items, k)
}
func (c *cache) Add(k string, x interface{}) {
func (c *cache) add(k string, x interface{}) {
c.items[k] = x
}

func (c *cache) DeleteC(k string) {
func (c *cache) Delete(k string) {
c.lock.Lock()
defer c.lock.Unlock()
delete(c.items, k)

}

func (c *cache) AddC(k string, x interface{}) error {
func (c *cache) Add(k string, x interface{}) error {
c.lock.Lock()
defer c.lock.Unlock()
_, found := c.Get(k)
Expand Down
Loading

0 comments on commit 16e8c40

Please sign in to comment.