From 6b6566db8ca6df9f7752e4627e0ad6ba66078b8f Mon Sep 17 00:00:00 2001
From: Maya Sigal
Date: Mon, 8 Aug 2022 09:24:51 +0300
Subject: [PATCH] blocked clients cache

---
 api/api.go                                    |  23 ++--
 api/filters/check_blocked_clients_filter.go   | 118 +++-------------
 .../blocked_clients_manager.go                |  33 +++++
 pkg/sm/sm.go                                  |  48 +++++--
 storage/events/event_listener.go              |  93 +++++++++++++
 storage/local_cache.go                        | 126 ++++++++++++++++++
 storage/local_cache_test.go                   |  71 ++++++++++
 storage/postgres/constant.go                  |   4 +-
 ...> 20220716100000_blocked_clients.down.sql} |   2 -
 ... => 20220716100000_blocked_clients.up.sql} |  24 +---
 ...0220802100000_events_notification.down.sql |   1 +
 .../20220802100000_events_notification.up.sql |  21 +++
 ...803100000_blocked_clients_trigger.down.sql |   1 +
 ...20803100000_blocked_clients_trigger.up.sql |   3 +
 .../blocked_clients_test.go                   |   1 +
 15 files changed, 420 insertions(+), 149 deletions(-)
 create mode 100644 pkg/blocked_clients/blocked_clients_manager.go
 create mode 100644 storage/events/event_listener.go
 create mode 100644 storage/local_cache.go
 create mode 100644 storage/local_cache_test.go
 rename storage/postgres/migrations/{20220718100000_blocked_clients.down.sql => 20220716100000_blocked_clients.down.sql} (64%)
 rename storage/postgres/migrations/{20220718100000_blocked_clients.up.sql => 20220716100000_blocked_clients.up.sql} (64%)
 create mode 100644 storage/postgres/migrations/20220802100000_events_notification.down.sql
 create mode 100644 storage/postgres/migrations/20220802100000_events_notification.up.sql
 create mode 100644 storage/postgres/migrations/20220803100000_blocked_clients_trigger.down.sql
 create mode 100644 storage/postgres/migrations/20220803100000_blocked_clients_trigger.up.sql
 create mode 100644 test/blocked_clients_test/blocked_clients_test.go

diff --git a/api/api.go b/api/api.go
index fa4f93fd4..dc36be630 100644
--- a/api/api.go
+++ b/api/api.go
@@ -98,16 +98,17 @@ func (s *Settings) Validate() error {
 }
 
 type Options struct {
-	RedisClient       *redis.Client
-	Repository        storage.TransactionalRepository
-	APISettings       *Settings
-	StorageSettings   *storage.Settings
-	OperationSettings *operations.Settings
-	WSSettings        *ws.Settings
-	Notificator       storage.Notificator
-	WaitGroup         *sync.WaitGroup
-	TenantLabelKey    string
-	Agents            *agents.Settings
+	RedisClient         *redis.Client
+	Repository          storage.TransactionalRepository
+	APISettings         *Settings
+	StorageSettings     *storage.Settings
+	OperationSettings   *operations.Settings
+	WSSettings          *ws.Settings
+	Notificator         storage.Notificator
+	WaitGroup           *sync.WaitGroup
+	TenantLabelKey      string
+	Agents              *agents.Settings
+	BlockedClientsCache *storage.Cache
 }
 
 // New returns the minimum set of REST APIs needed for the Service Manager
@@ -191,7 +192,7 @@ func New(ctx context.Context, e env.Environment, options *Options) (*web.API, er
 	api.RegisterFiltersBefore(filters.ProtectedLabelsFilterName, &filters.DisabledQueryParametersFilter{DisabledQueryParameters: options.APISettings.DisabledQueryParameters})
 
 	api.RegisterFiltersAfter(filters.DisabledQueryParametersName,
-		filters.NewBlockedClientsFilter(ctx, options.Repository, options.StorageSettings.URI))
+		filters.NewBlockedClientsFilter(options.BlockedClientsCache))
 
 	if rateLimiters != nil {
 		api.RegisterFiltersAfter(
diff --git a/api/filters/check_blocked_clients_filter.go b/api/filters/check_blocked_clients_filter.go
index d31e0b331..00be05fac 100644
--- a/api/filters/check_blocked_clients_filter.go
+++ b/api/filters/check_blocked_clients_filter.go
@@ -1,105 +1,22 @@
 package filters
 
 import (
"context" - "encoding/json" - "errors" "fmt" - "github.com/Peripli/service-manager/pkg/log" "github.com/Peripli/service-manager/pkg/types" "github.com/Peripli/service-manager/pkg/util" "github.com/Peripli/service-manager/pkg/web" "github.com/Peripli/service-manager/storage" - "github.com/lib/pq" "net/http" - "sync" - "time" ) -const new_blocked_client = "new_blocked_client" - type BlockedClientsFilter struct { - repository storage.Repository - ctx context.Context - cache sync.Map - storageURI string - updateBlockedClientsList func(ctx context.Context) []*types.BlockedClient + blockedClientsCache *storage.Cache } -// NewBlockedClientsFilter creates a new BlockedClientsFilter filter -func NewBlockedClientsFilter(ctx context.Context, repository storage.Repository, storageURI string) *BlockedClientsFilter { - blockedClientsFilter := &BlockedClientsFilter{ - repository: repository, - ctx: ctx, - cache: sync.Map{}, - storageURI: storageURI, - } - blockedClientsFilter.initializeBlockedClients() - return blockedClientsFilter - -} - -func (b *BlockedClientsFilter) connectDBForBlockedClientsEvent() error { - reportProblem := func(et pq.ListenerEventType, err error) { - if err != nil { - //add login - fmt.Println(err) - } - } - listener := pq.NewListener(b.storageURI, 30*time.Second, time.Minute, reportProblem) - err := listener.Listen(new_blocked_client) - if err != nil { - return err - } - - go b.processNewBlockedClient(listener) - return nil - +func NewBlockedClientsFilter(cache *storage.Cache) *BlockedClientsFilter { + b := &BlockedClientsFilter{blockedClientsCache: cache} + return b } -func (b *BlockedClientsFilter) processNewBlockedClient(l *pq.Listener) { - for { - n := <-l.Notify - switch n.Channel { - case new_blocked_client: - { - blockedClient, err := getPayload(n.Extra) - if err != nil { - log.C(b.ctx).WithError(err).Error("Could not unmarshal blocked client notification payload") - return - } else { - b.cache.Store(blockedClient.ClientID, blockedClient) - } - } - } - } -} -func getPayload(data string) (*types.BlockedClient, error) { - payload := &types.BlockedClient{} - if err := json.Unmarshal([]byte(data), payload); err != nil { - return nil, err - } - return payload, nil -} - -func (b *BlockedClientsFilter) initializeBlockedClients() error { - b.connectDBForBlockedClientsEvent() - err := b.getBlockedClientsList() - return err -} - -func (b *BlockedClientsFilter) getBlockedClientsList() error { - blockedClientsList, err := b.repository.List(b.ctx, types.BlockedClientsType) - if err != nil { - return err - } - for i := 0; i < blockedClientsList.Len(); i++ { - blockedClient := blockedClientsList.ItemAt(i).(*types.BlockedClient) - b.cache.Store(blockedClient.ClientID, blockedClient) - } - return nil - -} - func (b *BlockedClientsFilter) Name() string { return "BlockedClientsFilter" } @@ -109,24 +26,22 @@ func (b *BlockedClientsFilter) Run(request *web.Request, next web.Handler) (*web method := request.Method userContext, ok := web.UserFromContext(reqCtx) if !ok { - return nil, errors.New("no client found") + //there is no context on the endpoint + return next.Handle(request) } blockedClient, isBlockedClient := b.isClientBlocked(userContext, method) if isBlockedClient { errorResponse := &util.HTTPError{ - ErrorType: "RequestNotAllowed", - Description: fmt.Sprintf("You're blocked to execute this request. 
-			Description: fmt.Sprintf("You're blocked to execute this request. Client: %d ", blockedClient.ClientID),
-			StatusCode:  http.StatusMethodNotAllowed,
+			ErrorType:  "RequestNotAllowed",
+
+			StatusCode: http.StatusMethodNotAllowed,
 		}
+
+		errorResponse.Description = fmt.Sprintf("You are blocked from executing this request. Client: %s", blockedClient.ClientID)
+
 		return nil, errorResponse
 	}
-	// if not
-	next.Handle(request)
-	// if it is - return an error (what is the error message?)
-	//if err != nil {
-	//	log.C(request.Context()).WithError(err).Errorf("client is blocked - validate with Avi regarding this string")
-	//	return nil, err
-	//}
 	return next.Handle(request)
 }
 
@@ -136,13 +51,12 @@ func (bc *BlockedClientsFilter) isClientBlocked(userContext *web.UserContext, me
 	if userContext.AccessLevel == web.GlobalAccess || userContext.AccessLevel == web.AllTenantAccess {
 		return nil, false
 	}
-	blockedClientCache, ok := bc.cache.Load(userContext.Name)
+	blockedClientCache, ok := bc.blockedClientsCache.GetC(userContext.Name)
 	if !ok {
-		return nil, true
+		return nil, false
 	}
-	blockedClient := blockedClientCache.(*types.BlockedClient)
-	// add to retrieved from db
-	return blockedClient, contains(blockedClient.BlockedMethods, method)
+	blockedClient := blockedClientCache.(types.BlockedClient)
+	return &blockedClient, contains(blockedClient.BlockedMethods, method)
 }
 
 func contains(s []string, e string) bool {
diff --git a/pkg/blocked_clients/blocked_clients_manager.go b/pkg/blocked_clients/blocked_clients_manager.go
new file mode 100644
index 000000000..f6d7c1955
--- /dev/null
+++ b/pkg/blocked_clients/blocked_clients_manager.go
@@ -0,0 +1,33 @@
+package blocked_clients
+
+import (
+	"context"
+	"github.com/Peripli/service-manager/pkg/types"
+	"github.com/Peripli/service-manager/storage"
+	"time"
+)
+
+type BlockedClientsManager struct {
+	repository storage.Repository
+	ctx        context.Context
+	Cache      *storage.Cache
+}
+
+func Init(ctx context.Context, repository storage.Repository) *BlockedClientsManager {
+	b := &BlockedClientsManager{ctx: ctx, repository: repository}
+	b.Cache = storage.NewCache(time.Minute*5, b.getBlockClients)
+	return b
+}
+
+func (b *BlockedClientsManager) getBlockClients() error {
+	blockedClientsList, err := b.repository.List(b.ctx, types.BlockedClientsType)
+	if err != nil {
+		return err
+	}
+	b.Cache.Flush()
+	for i := 0; i < blockedClientsList.Len(); i++ {
+		blockedClient := blockedClientsList.ItemAt(i).(*types.BlockedClient)
+		// store the value, not the pointer, so lookups can assert types.BlockedClient consistently
+		b.Cache.Add(blockedClient.ClientID, *blockedClient)
+	}
+	return nil
+}
diff --git a/pkg/sm/sm.go b/pkg/sm/sm.go
index 2109646b0..5df63bdf6 100644
--- a/pkg/sm/sm.go
+++ b/pkg/sm/sm.go
@@ -20,9 +20,12 @@ import (
 	"context"
 	"crypto/tls"
 	"database/sql"
+	"encoding/json"
 	"errors"
 	"fmt"
+	"github.com/Peripli/service-manager/pkg/blocked_clients"
 	secFilters "github.com/Peripli/service-manager/pkg/security/filters"
+	"github.com/Peripli/service-manager/storage/events"
 	osbc "github.com/kubernetes-sigs/go-open-service-broker-client/v2"
 	"math"
 	"net/http"
@@ -152,22 +155,47 @@ func New(ctx context.Context, cancel context.CancelFunc, e env.Environment, cfg
 	// Setup core API
 	log.C(ctx).Info("Setting up Service Manager core API...")
-
 	pgNotificator, err := postgres.NewNotificator(smStorage, cfg.Storage)
 	if err != nil {
 		return nil, fmt.Errorf("could not create notificator: %v", err)
 	}
 
+	blockedClientsManager := blocked_clients.Init(ctx, interceptableRepository)
+	callbacks := map[string]func(*events.Message) error{
+		"blocked_clients-INSERT": func(envelope *events.Message) error {
+			var blockedClient types.BlockedClient
+
+			if err := json.Unmarshal(envelope.Data, &blockedClient); err != nil {
+				return fmt.Errorf("could not unmarshal blocked client event: %v", err)
+			}
+
+			blockedClientsManager.Cache.AddC(blockedClient.ClientID, blockedClient)
+			return nil
+		},
+		"blocked_clients-DELETE": func(envelope *events.Message) error {
+			var blockedClient types.BlockedClient
+			if err := json.Unmarshal(envelope.Data, &blockedClient); err != nil {
+				return fmt.Errorf("could not unmarshal blocked client event: %v", err)
+			}
+
+			blockedClientsManager.Cache.DeleteC(blockedClient.ClientID)
+			return nil
+		},
+	}
+
+	_ = events.NewPostgresEventListener(ctx, cfg.Storage.URI, callbacks)
+
 	apiOptions := &api.Options{
-		RedisClient:       redisClient,
-		Repository:        interceptableRepository,
-		APISettings:       cfg.API,
-		OperationSettings: cfg.Operations,
-		WSSettings:        cfg.WebSocket,
-		Notificator:       pgNotificator,
-		WaitGroup:         waitGroup,
-		TenantLabelKey:    cfg.Multitenancy.LabelKey,
-		Agents:            cfg.Agents,
+		RedisClient:         redisClient,
+		Repository:          interceptableRepository,
+		APISettings:         cfg.API,
+		OperationSettings:   cfg.Operations,
+		WSSettings:          cfg.WebSocket,
+		Notificator:         pgNotificator,
+		WaitGroup:           waitGroup,
+		TenantLabelKey:      cfg.Multitenancy.LabelKey,
+		Agents:              cfg.Agents,
+		BlockedClientsCache: blockedClientsManager.Cache,
 	}
 	API, err := api.New(ctx, e, apiOptions)
 	if err != nil {
diff --git a/storage/events/event_listener.go b/storage/events/event_listener.go
new file mode 100644
index 000000000..9abca1295
--- /dev/null
+++ b/storage/events/event_listener.go
@@ -0,0 +1,93 @@
+package events
+
+import (
+	"context"
+	"encoding/json"
+	"github.com/Peripli/service-manager/pkg/log"
+	"github.com/lib/pq"
+	"time"
+)
+
+const EVENTS_CHANNEL = "events"
+const dbPingInterval = time.Second * 60
+
+type PostgresEventListener struct {
+	ctx        context.Context
+	listener   *pq.Listener
+	storageURI string
+	callBacks  map[string]func(message *Message) error
+}
+
+func NewPostgresEventListener(ctx context.Context,
+	storageURI string,
+	callBacks map[string]func(message *Message) error) *PostgresEventListener {
+	ps := &PostgresEventListener{ctx: ctx, storageURI: storageURI, callBacks: callBacks}
+	ps.connectDB()
+	return ps
+}
+
+type Message struct {
+	Table  string
+	Action string
+	Data   json.RawMessage
+}
+
+func (pe *PostgresEventListener) connectDB() error {
+	eventCallback := func(et pq.ListenerEventType, err error) {
+		switch et {
+		case pq.ListenerEventConnected, pq.ListenerEventReconnected:
+			log.C(pe.ctx).Info("DB connection for events established")
+		case pq.ListenerEventDisconnected, pq.ListenerEventConnectionAttemptFailed:
+			log.C(pe.ctx).WithError(err).Error("DB connection for events closed")
+		}
+		if err != nil {
+			log.C(pe.ctx).WithError(err).Error("Event notification error")
+		}
+	}
+	pe.listener = pq.NewListener(pe.storageURI, 30*time.Second, time.Minute, eventCallback)
+	err := pe.listener.Listen(EVENTS_CHANNEL)
+	if err != nil {
+		return err
+	}
+
+	go pe.waitForNotification()
+	return nil
+}
+func (pe *PostgresEventListener) processPayload(message string) error {
+	payload := &Message{}
+	if err := json.Unmarshal([]byte(message), payload); err != nil {
+		log.C(pe.ctx).WithError(err).Error("Could not unmarshal event notification payload")
+		return err
+	}
+	if callBack, ok := pe.callBacks[payload.Table+"-"+payload.Action]; ok {
+		if err := callBack(payload); err != nil {
+			log.C(pe.ctx).WithError(err).Error("Event callback failed")
+		}
+	}
+	return nil
+}
+func (pe *PostgresEventListener) waitForNotification() {
+	for {
+		select {
+		case n, ok := <-pe.listener.Notify:
+			{
+				if !ok {
+					log.C(pe.ctx).Error("Notification channel closed")
+					return
+				}
+				if n == nil {
+					log.C(pe.ctx).Debug("Empty notification received")
+					continue
+				}
+				// errors are already logged inside processPayload
+				pe.processPayload(n.Extra)
+			}
+		case <-time.After(dbPingInterval):
+			log.C(pe.ctx).Debug("Pinging connection")
+			if err := pe.listener.Ping(); err != nil {
+				log.C(pe.ctx).WithError(err).Error("Pinging connection failed")
+				return
+			}
+		}
+	}
+}
diff --git a/storage/local_cache.go b/storage/local_cache.go
new file mode 100644
index 000000000..f0a3d1196
--- /dev/null
+++ b/storage/local_cache.go
@@ -0,0 +1,126 @@
+package storage
+
+import (
+	"fmt"
+	"runtime"
+	"sync"
+	"time"
+)
+
+type cache struct {
+	items    map[string]interface{}
+	lock     sync.RWMutex
+	onResync func() error
+	sync     *synchronizer
+}
+
+type synchronizer struct {
+	Interval time.Duration
+	stop     chan bool
+}
+type Cache struct {
+	*cache
+}
+
+func NewCache(resyncInterval time.Duration, onResync func() error) *Cache {
+	items := make(map[string]interface{})
+	c := newCache(onResync, items)
+	C := &Cache{c}
+	if resyncInterval > 0 {
+		runSynchronizer(c, resyncInterval)
+		runtime.SetFinalizer(C, StopSynchronizer)
+	}
+	return C
+}
+func newCache(onResync func() error, m map[string]interface{}) *cache {
+	c := &cache{
+		items:    m,
+		onResync: onResync,
+	}
+	return c
+}
+
+func StopSynchronizer(c *Cache) {
+	c.sync.stop <- true
+}
+
+func runSynchronizer(c *cache, ci time.Duration) {
+	s := &synchronizer{
+		Interval: ci,
+		stop:     make(chan bool),
+	}
+	c.sync = s
+	go s.Run(c)
+}
+
+func (s *synchronizer) Run(c *cache) {
+	ticker := time.NewTicker(s.Interval)
+	for {
+		select {
+		case <-ticker.C:
+			c.Resync()
+		case <-s.stop:
+			ticker.Stop()
+			return
+		}
+	}
+}
+
+// Flush is the lock-free variant; it is meant to be called from within a resync callback
+func (c *cache) Flush() {
+	c.items = make(map[string]interface{})
+}
+
+func (c *cache) FlushC() {
+	defer c.lock.Unlock()
+	c.lock.Lock()
+	c.items = make(map[string]interface{})
+}
+func (c *cache) Length() int {
+	return len(c.items)
+}
+
+func (c *cache) Resync() {
+	defer c.lock.Unlock()
+	c.lock.Lock()
+	c.onResync()
+}
+
+func (c *cache) GetC(k string) (interface{}, bool) {
+	c.lock.RLock()
+	defer c.lock.RUnlock()
+	item, found := c.items[k]
+	return item, found
+}
+func (c *cache) Get(k string) (interface{}, bool) {
+	item, found := c.items[k]
+	return item, found
+}
+
+func (c *cache) Delete(k string) {
+	delete(c.items, k)
+}
+func (c *cache) Add(k string, x interface{}) {
+	c.items[k] = x
+}
+
+func (c *cache) DeleteC(k string) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	delete(c.items, k)
+}
+
+func (c *cache) AddC(k string, x interface{}) error {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	_, found := c.Get(k)
+	if found {
+		return fmt.Errorf("item %s already exists", k)
+	}
+	c.items[k] = x
+
+	return nil
+}
diff --git a/storage/local_cache_test.go b/storage/local_cache_test.go
new file mode 100644
index 000000000..1ddc81218
--- /dev/null
+++ b/storage/local_cache_test.go
@@ -0,0 +1,71 @@
+package storage
+
+import (
+	"fmt"
+	. "github.com/onsi/ginkgo"
"github.com/onsi/gomega" + "sync" + "time" +) + +var _ = Describe("local cache", func() { + var localCache *Cache + var wg *sync.WaitGroup + BeforeEach(func() { + wg = &sync.WaitGroup{} + + }) + Context("cache with no resync", func() { + BeforeEach(func() { + localCache = NewCache(0, nil) + }) + + Context("cache add objects and flush", func() { + It("adds and deletes objects", func() { + wg.Add(1) + addStrings := func() { + for i := 1; i < 3; i++ { + localCache.AddC(fmt.Sprintf("key-%d", i), fmt.Sprintf("value-%d", i)) + } + wg.Done() + } + go addStrings() + for i := 3; i < 5; i++ { + localCache.AddC(fmt.Sprintf("key-%d", i), fmt.Sprintf("value-%d", i)) + } + wg.Wait() + for i := 1; i < 5; i++ { + val, _ := localCache.Get(fmt.Sprintf("key-%d", i)) + Expect(val.(string)).To(Equal(fmt.Sprintf("value-%d", i))) + } + localCache.FlushC() + Expect(localCache.Length()).To(BeZero()) + }) + + }) + + }) + + Context("cache with resync", func() { + var resyncFunc func() error + BeforeEach(func() { + resyncFunc = func() error { + localCache.Flush() + localCache.Add("0", "new") + return nil + } + localCache = NewCache(time.Second*5, resyncFunc) + }) + It("should have only new object", func() { + for i := 1; i < 3; i++ { + localCache.AddC(fmt.Sprintf("key-%d", i), fmt.Sprintf("value-%d", i)) + } + time.Sleep(time.Second * 6) + StopSynchronizer(localCache) + val, _ := localCache.Get("0") + Expect(val.(string)).To(Equal("new")) + Expect(localCache.Length()).To(Equal(1)) + + }) + }) +}) diff --git a/storage/postgres/constant.go b/storage/postgres/constant.go index ab637f23f..59945ef75 100644 --- a/storage/postgres/constant.go +++ b/storage/postgres/constant.go @@ -2,4 +2,6 @@ package postgres // todo: get automatically from folder // change this line with the latest version of the migrations: -const latestMigrationVersion = "20220718100000" +const latestMigrationVersion = "20220803100000" + +//20220718100000 diff --git a/storage/postgres/migrations/20220718100000_blocked_clients.down.sql b/storage/postgres/migrations/20220716100000_blocked_clients.down.sql similarity index 64% rename from storage/postgres/migrations/20220718100000_blocked_clients.down.sql rename to storage/postgres/migrations/20220716100000_blocked_clients.down.sql index 8861cc8c1..b24678531 100644 --- a/storage/postgres/migrations/20220718100000_blocked_clients.down.sql +++ b/storage/postgres/migrations/20220716100000_blocked_clients.down.sql @@ -1,8 +1,6 @@ BEGIN; DROP TABLE IF EXISTS blocked_clients; -DROP TRIGGER IF EXISTS value_insert; -DROP FUNCTION IF EXISTS new_blocked_client(); DROP TABLE IF EXISTS blocked_clients_labels; DROP index blocked_clients_paging_sequence_uindex; diff --git a/storage/postgres/migrations/20220718100000_blocked_clients.up.sql b/storage/postgres/migrations/20220716100000_blocked_clients.up.sql similarity index 64% rename from storage/postgres/migrations/20220718100000_blocked_clients.up.sql rename to storage/postgres/migrations/20220716100000_blocked_clients.up.sql index e2b768ed8..a2f222646 100644 --- a/storage/postgres/migrations/20220718100000_blocked_clients.up.sql +++ b/storage/postgres/migrations/20220716100000_blocked_clients.up.sql @@ -27,27 +27,5 @@ CREATE TABLE blocked_clients_labels CREATE UNIQUE INDEX IF NOT EXISTS blocked_clients_paging_sequence_uindex on blocked_clients (paging_sequence); +COMMIT; -CREATE OR REPLACE FUNCTION new_blocked_client() RETURNS TRIGGER AS $$ - DECLARE -data json; - -BEGIN - data = json_build_object( - 'id', NEW.id, - 'client_id', NEW.client_id, - 'subaccount_id', 
-        'subaccount_id', NEW.subaccount_id,
-        'blocked_methods', NEW.blocked_methods
-    );
-    PERFORM pg_notify('new_blocked_client', data::text);
-
-    -- Result is ignored since this is an AFTER trigger
-RETURN NULL;
-END;
-$$ LANGUAGE plpgsql;
-
-CREATE TRIGGER value_insert
-    AFTER INSERT ON blocked_clients
-    FOR EACH ROW EXECUTE PROCEDURE new_blocked_client();
-
-COMMIT;
\ No newline at end of file
diff --git a/storage/postgres/migrations/20220802100000_events_notification.down.sql b/storage/postgres/migrations/20220802100000_events_notification.down.sql
new file mode 100644
index 000000000..6c6302da4
--- /dev/null
+++ b/storage/postgres/migrations/20220802100000_events_notification.down.sql
@@ -0,0 +1 @@
+DROP FUNCTION IF EXISTS notify_event();
\ No newline at end of file
diff --git a/storage/postgres/migrations/20220802100000_events_notification.up.sql b/storage/postgres/migrations/20220802100000_events_notification.up.sql
new file mode 100644
index 000000000..c5f6b1644
--- /dev/null
+++ b/storage/postgres/migrations/20220802100000_events_notification.up.sql
@@ -0,0 +1,21 @@
+CREATE OR REPLACE FUNCTION notify_event() RETURNS TRIGGER AS $$
+    DECLARE
+        data json;
+        notification json;
+
+    BEGIN
+        IF (TG_OP = 'DELETE') THEN
+            data = row_to_json(OLD);
+        ELSE
+            data = row_to_json(NEW);
+        END IF;
+        notification = json_build_object(
+            'table', TG_TABLE_NAME,
+            'action', TG_OP,
+            'data', data);
+
+
+        PERFORM pg_notify('events', notification::text);
+        RETURN NULL;
+    END;
+$$ LANGUAGE plpgsql;
\ No newline at end of file
diff --git a/storage/postgres/migrations/20220803100000_blocked_clients_trigger.down.sql b/storage/postgres/migrations/20220803100000_blocked_clients_trigger.down.sql
new file mode 100644
index 000000000..eba0aeb4d
--- /dev/null
+++ b/storage/postgres/migrations/20220803100000_blocked_clients_trigger.down.sql
@@ -0,0 +1 @@
+DROP TRIGGER IF EXISTS blocked_clients_notify_event ON blocked_clients;
\ No newline at end of file
diff --git a/storage/postgres/migrations/20220803100000_blocked_clients_trigger.up.sql b/storage/postgres/migrations/20220803100000_blocked_clients_trigger.up.sql
new file mode 100644
index 000000000..79ec9b610
--- /dev/null
+++ b/storage/postgres/migrations/20220803100000_blocked_clients_trigger.up.sql
@@ -0,0 +1,3 @@
+CREATE TRIGGER blocked_clients_notify_event
+    AFTER INSERT OR DELETE ON blocked_clients
+    FOR EACH ROW EXECUTE PROCEDURE notify_event();
\ No newline at end of file
diff --git a/test/blocked_clients_test/blocked_clients_test.go b/test/blocked_clients_test/blocked_clients_test.go
new file mode 100644
index 000000000..56e540407
--- /dev/null
+++ b/test/blocked_clients_test/blocked_clients_test.go
@@ -0,0 +1 @@
+package test