Skip to content

Commit

Permalink
blocked clients cache
Browse files Browse the repository at this point in the history
  • Loading branch information
sigalmaya committed Aug 8, 2022
1 parent 8e108d2 commit 4363880
Show file tree
Hide file tree
Showing 15 changed files with 418 additions and 147 deletions.
21 changes: 11 additions & 10 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,16 @@ func (s *Settings) Validate() error {
}

type Options struct {
Repository storage.TransactionalRepository
APISettings *Settings
StorageSettings *storage.Settings
OperationSettings *operations.Settings
WSSettings *ws.Settings
Notificator storage.Notificator
WaitGroup *sync.WaitGroup
TenantLabelKey string
Agents *agents.Settings
Repository storage.TransactionalRepository
APISettings *Settings
StorageSettings *storage.Settings
OperationSettings *operations.Settings
WSSettings *ws.Settings
Notificator storage.Notificator
WaitGroup *sync.WaitGroup
TenantLabelKey string
Agents *agents.Settings
BlockedClientsCache *storage.Cache
}

// New returns the minimum set of REST APIs needed for the Service Manager
Expand Down Expand Up @@ -188,7 +189,7 @@ func New(ctx context.Context, e env.Environment, options *Options) (*web.API, er

api.RegisterFiltersBefore(filters.ProtectedLabelsFilterName, &filters.DisabledQueryParametersFilter{DisabledQueryParameters: options.APISettings.DisabledQueryParameters})
api.RegisterFiltersAfter(filters.DisabledQueryParametersName,
filters.NewBlockedClientsFilter(ctx, options.Repository, options.StorageSettings.URI))
filters.NewBlockedClientsFilter(options.BlockedClientsCache))

if rateLimiters != nil {
api.RegisterFiltersAfter(
Expand Down
118 changes: 16 additions & 102 deletions api/filters/check_blocked_clients_filter.go
Original file line number Diff line number Diff line change
@@ -1,105 +1,22 @@
package filters

import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/Peripli/service-manager/pkg/log"
"github.com/Peripli/service-manager/pkg/types"
"github.com/Peripli/service-manager/pkg/util"
"github.com/Peripli/service-manager/pkg/web"
"github.com/Peripli/service-manager/storage"
"github.com/lib/pq"
"net/http"
"sync"
"time"
)

const new_blocked_client = "new_blocked_client"

type BlockedClientsFilter struct {
repository storage.Repository
ctx context.Context
cache sync.Map
storageURI string
updateBlockedClientsList func(ctx context.Context) []*types.BlockedClient
blockedClientsCache *storage.Cache
}

// NewBlockedClientsFilter creates a new BlockedClientsFilter filter
func NewBlockedClientsFilter(ctx context.Context, repository storage.Repository, storageURI string) *BlockedClientsFilter {
blockedClientsFilter := &BlockedClientsFilter{
repository: repository,
ctx: ctx,
cache: sync.Map{},
storageURI: storageURI,
}
blockedClientsFilter.initializeBlockedClients()
return blockedClientsFilter

}

func (b *BlockedClientsFilter) connectDBForBlockedClientsEvent() error {
reportProblem := func(et pq.ListenerEventType, err error) {
if err != nil {
//add login
fmt.Println(err)
}
}
listener := pq.NewListener(b.storageURI, 30*time.Second, time.Minute, reportProblem)
err := listener.Listen(new_blocked_client)
if err != nil {
return err
}

go b.processNewBlockedClient(listener)
return nil

func NewBlockedClientsFilter(cache *storage.Cache) *BlockedClientsFilter {
b := &BlockedClientsFilter{blockedClientsCache: cache}
return b
}
func (b *BlockedClientsFilter) processNewBlockedClient(l *pq.Listener) {
for {
n := <-l.Notify
switch n.Channel {
case new_blocked_client:
{
blockedClient, err := getPayload(n.Extra)
if err != nil {
log.C(b.ctx).WithError(err).Error("Could not unmarshal blocked client notification payload")
return
} else {
b.cache.Store(blockedClient.ClientID, blockedClient)
}
}
}
}
}
func getPayload(data string) (*types.BlockedClient, error) {
payload := &types.BlockedClient{}
if err := json.Unmarshal([]byte(data), payload); err != nil {
return nil, err
}
return payload, nil
}

func (b *BlockedClientsFilter) initializeBlockedClients() error {
b.connectDBForBlockedClientsEvent()
err := b.getBlockedClientsList()
return err
}

func (b *BlockedClientsFilter) getBlockedClientsList() error {
blockedClientsList, err := b.repository.List(b.ctx, types.BlockedClientsType)
if err != nil {
return err
}
for i := 0; i < blockedClientsList.Len(); i++ {
blockedClient := blockedClientsList.ItemAt(i).(*types.BlockedClient)
b.cache.Store(blockedClient.ClientID, blockedClient)
}
return nil

}

func (b *BlockedClientsFilter) Name() string {
return "BlockedClientsFilter"
}
Expand All @@ -109,24 +26,22 @@ func (b *BlockedClientsFilter) Run(request *web.Request, next web.Handler) (*web
method := request.Method
userContext, ok := web.UserFromContext(reqCtx)
if !ok {
return nil, errors.New("no client found")
//there is no context on the endpoint
return next.Handle(request)
}
blockedClient, isBlockedClient := b.isClientBlocked(userContext, method)
if isBlockedClient {
errorResponse := &util.HTTPError{
ErrorType: "RequestNotAllowed",
Description: fmt.Sprintf("You're blocked to execute this request. Client: %d ", blockedClient.ClientID),
StatusCode: http.StatusMethodNotAllowed,
ErrorType: "RequestNotAllowed",

StatusCode: http.StatusMethodNotAllowed,
}

errorResponse.Description = fmt.Sprintf("You're blocked to execute this request. Client: %d ", blockedClient.ClientID)

return nil, errorResponse

}
// if not - next.Handle(request)
// if it is - return an error (what is the error message?)
//if err != nil {
// log.C(request.Context()).WithError(err).Errorf("client is blocked - validate with Avi regarding this string")
// return nil, err
//}

return next.Handle(request)
}
Expand All @@ -136,13 +51,12 @@ func (bc *BlockedClientsFilter) isClientBlocked(userContext *web.UserContext, me
if userContext.AccessLevel == web.GlobalAccess || userContext.AccessLevel == web.AllTenantAccess {
return nil, false
}
blockedClientCache, ok := bc.cache.Load(userContext.Name)
blockedClientCache, ok := bc.blockedClientsCache.GetC(userContext.Name)
if !ok {
return nil, true
return nil, false
}
blockedClient := blockedClientCache.(*types.BlockedClient)
// add to retrieved from db
return blockedClient, contains(blockedClient.BlockedMethods, method)
blockedClient := blockedClientCache.(types.BlockedClient)
return &blockedClient, contains(blockedClient.BlockedMethods, method)

}
func contains(s []string, e string) bool {
Expand Down
33 changes: 33 additions & 0 deletions pkg/blocked_clients/blocked_clients_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package blocked_clients

import (
"context"
"github.com/Peripli/service-manager/pkg/types"
"github.com/Peripli/service-manager/storage"
"time"
)

type BlockedClientsManager struct {
repository storage.Repository
ctx context.Context
Cache *storage.Cache
}

func Init(ctx context.Context, repository storage.Repository) *BlockedClientsManager {
b := &BlockedClientsManager{ctx: ctx, repository: repository}
b.Cache = storage.NewCache(time.Minute*5, b.getBlockClients)
return b
}

func (b *BlockedClientsManager) getBlockClients() error {
blockedClientsList, err := b.repository.List(b.ctx, types.BlockedClientsType)
if err != nil {
return err
}
b.Cache.Flush()
for i := 0; i < blockedClientsList.Len(); i++ {
blockedClient := blockedClientsList.ItemAt(i).(*types.BlockedClient)
b.Cache.Add(blockedClient.ClientID, blockedClient)
}
return nil
}
46 changes: 37 additions & 9 deletions pkg/sm/sm.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ package sm
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"github.com/Peripli/service-manager/pkg/blocked_clients"
secFilters "github.com/Peripli/service-manager/pkg/security/filters"
"github.com/Peripli/service-manager/storage/events"
"math"
"net/http"
"sync"
Expand Down Expand Up @@ -130,21 +133,46 @@ func New(ctx context.Context, cancel context.CancelFunc, e env.Environment, cfg

// Setup core API
log.C(ctx).Info("Setting up Service Manager core API...")

pgNotificator, err := postgres.NewNotificator(smStorage, cfg.Storage)
if err != nil {
return nil, fmt.Errorf("could not create notificator: %v", err)
}

blockedClientsManager := blocked_clients.Init(ctx, interceptableRepository)
callbacks := map[string]func(*events.Message) error{
"blocked_clients-INSERT": func(envelope *events.Message) error {
var blockedClient types.BlockedClient

if err := json.Unmarshal(envelope.Data, &blockedClient); err != nil {
log.C(ctx).Info("error unmarshaling new blocked client")
}

blockedClientsManager.Cache.AddC(blockedClient.ClientID, blockedClient)
return nil
},
"blocked_clients-DELETE": func(envelope *events.Message) error {
var blockedClient types.BlockedClient
if err := json.Unmarshal(envelope.Data, &blockedClient); err != nil {
log.C(ctx).Info("error unmarshaling new blocked client")
}

blockedClientsManager.Cache.DeleteC(blockedClient.ClientID)
return nil
},
}

_ = events.NewPostgresEventListener(ctx, cfg.Storage.URI, callbacks)

apiOptions := &api.Options{
Repository: interceptableRepository,
APISettings: cfg.API,
OperationSettings: cfg.Operations,
WSSettings: cfg.WebSocket,
Notificator: pgNotificator,
WaitGroup: waitGroup,
TenantLabelKey: cfg.Multitenancy.LabelKey,
Agents: cfg.Agents,
Repository: interceptableRepository,
APISettings: cfg.API,
OperationSettings: cfg.Operations,
WSSettings: cfg.WebSocket,
Notificator: pgNotificator,
WaitGroup: waitGroup,
TenantLabelKey: cfg.Multitenancy.LabelKey,
Agents: cfg.Agents,
BlockedClientsCache: blockedClientsManager.Cache,
}
API, err := api.New(ctx, e, apiOptions)
if err != nil {
Expand Down
93 changes: 93 additions & 0 deletions storage/events/event_listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package events

import (
"context"
"encoding/json"
"github.com/Peripli/service-manager/pkg/log"
"github.com/lib/pq"
"time"
)

const EVENTS_CHANNEL = "events"
const dbPingInterval = time.Second * 60

type PostgresEventListener struct {
ctx context.Context
listener *pq.Listener
storageURI string
callBacks map[string]func(message *Message) error
}

func NewPostgresEventListener(ctx context.Context,
storageURI string,
callBacks map[string]func(message *Message) error) *PostgresEventListener {
ps := &PostgresEventListener{ctx: ctx, storageURI: storageURI, callBacks: callBacks}
ps.connectDB()
return ps
}

type Message struct {
Table string
Action string
Data json.RawMessage
}

func (pe *PostgresEventListener) connectDB() error {
eventCallback := func(et pq.ListenerEventType, err error) {
switch et {
case pq.ListenerEventConnected, pq.ListenerEventReconnected:
log.C(pe.ctx).Info("DB connection for events established")
case pq.ListenerEventDisconnected, pq.ListenerEventConnectionAttemptFailed:
log.C(pe.ctx).WithError(err).Error("DB connection for events closed")
}
if err != nil {
log.C(pe.ctx).WithError(err).Error("Event notification error")
}
}
pe.listener = pq.NewListener(pe.storageURI, 30*time.Second, time.Minute, eventCallback)
err := pe.listener.Listen(EVENTS_CHANNEL)
if err != nil {
return err
}

go pe.waitForNotification()
return nil
}
func (pe *PostgresEventListener) processPayload(message string) error {
payload := &Message{}
if err := json.Unmarshal([]byte(message), payload); err != nil {
log.C(pe.ctx).WithError(err).Error("Could not unmarshal event notification payload.")
return err
}
callBack, ok := pe.callBacks[payload.Table+"-"+payload.Action]
if ok {
callBack(payload)

}
return nil
}
func (pe *PostgresEventListener) waitForNotification() {
for {
select {
case n, ok := <-pe.listener.Notify:
{
if !ok {
log.C(pe.ctx).Error("Notification channel closed")
return
}
if n == nil {
log.C(pe.ctx).Debug("Empty notification received")
continue
}
// to do handle error
pe.processPayload(n.Extra)
}
case <-time.After(dbPingInterval):
log.C(pe.ctx).Debugf(" Pinging connection")
if err := pe.listener.Ping(); err != nil {
log.C(pe.ctx).WithError(err).Error("Pinging connection failed")
return
}
}
}
}
Loading

0 comments on commit 4363880

Please sign in to comment.