diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 8d4aca79..080e59f5 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -65,4 +65,5 @@ jobs: make e2e-test env: container_tool: docker + SERVER_REPLICAS: 2 MESSAGE_DRIVER_TYPE: grpc diff --git a/.gitignore b/.gitignore index af4e2dc4..c9bc3a71 100755 --- a/.gitignore +++ b/.gitignore @@ -59,5 +59,5 @@ test/e2e/.consumer_name test/e2e/.external_host_ip test/e2e/report/* unit-test-results.json -integration-test-results.json +*integration-test-results.json test/e2e/setup/aro/aro-hcp diff --git a/Makefile b/Makefile index 90ad5c2e..08343446 100755 --- a/Makefile +++ b/Makefile @@ -90,7 +90,8 @@ MQTT_IMAGE ?= docker.io/library/eclipse-mosquitto:2.0.18 # Test output files unit_test_json_output ?= ${PWD}/unit-test-results.json -integration_test_json_output ?= ${PWD}/integration-test-results.json +mqtt_integration_test_json_output ?= ${PWD}/mqtt-integration-test-results.json +grpc_integration_test_json_output ?= ${PWD}/grpc-integration-test-results.json # Prints a list of useful targets. help: @@ -218,11 +219,19 @@ test: # make test-integration TESTFLAGS="-run TestAccounts" acts as TestAccounts* and run TestAccountsGet, TestAccountsPost, etc. 
# make test-integration TESTFLAGS="-run TestAccountsGet" runs TestAccountsGet # make test-integration TESTFLAGS="-short" skips long-run tests -test-integration: - OCM_ENV=testing gotestsum --jsonfile-timing-events=$(integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \ - ./test/integration +test-integration: test-integration-mqtt test-integration-grpc .PHONY: test-integration +test-integration-mqtt: + BROKER=mqtt OCM_ENV=testing gotestsum --jsonfile-timing-events=$(mqtt_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \ + ./test/integration +.PHONY: test-integration-mqtt + +test-integration-grpc: + BROKER=grpc OCM_ENV=testing gotestsum --jsonfile-timing-events=$(grpc_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h -run TestController \ + ./test/integration +.PHONY: test-integration-grpc + # Regenerate openapi client and models generate: rm -rf pkg/api/openapi diff --git a/cmd/maestro/servecmd/cmd.go b/cmd/maestro/servecmd/cmd.go index 1b3611df..8f393b36 100755 --- a/cmd/maestro/servecmd/cmd.go +++ b/cmd/maestro/servecmd/cmd.go @@ -12,7 +12,9 @@ import ( "github.com/openshift-online/maestro/cmd/maestro/environments" "github.com/openshift-online/maestro/cmd/maestro/server" "github.com/openshift-online/maestro/pkg/config" + "github.com/openshift-online/maestro/pkg/controllers" "github.com/openshift-online/maestro/pkg/dao" + "github.com/openshift-online/maestro/pkg/db" "github.com/openshift-online/maestro/pkg/dispatcher" "github.com/openshift-online/maestro/pkg/event" ) @@ -47,9 +49,11 @@ func runServer(cmd *cobra.Command, args []string) { // For gRPC, create a gRPC broker to handle resource spec and status events. // For MQTT/Kafka, create a message queue based event server to handle resource spec and status events. 
var eventServer server.EventServer + var eventFilter controllers.EventFilter if environments.Environment().Config.MessageBroker.MessageBrokerType == "grpc" { klog.Info("Setting up grpc broker") eventServer = server.NewGRPCBroker(eventBroadcaster) + eventFilter = controllers.NewPredicatedEventFilter(eventServer.PredicateEvent) } else { klog.Info("Setting up message queue event server") var statusDispatcher dispatcher.Dispatcher @@ -67,12 +71,13 @@ func runServer(cmd *cobra.Command, args []string) { // Set the status dispatcher for the healthcheck server healthcheckServer.SetStatusDispatcher(statusDispatcher) eventServer = server.NewMessageQueueEventServer(eventBroadcaster, statusDispatcher) + eventFilter = controllers.NewLockBasedEventFilter(db.NewAdvisoryLockFactory(environments.Environment().Database.SessionFactory)) } // Create the servers apiserver := server.NewAPIServer(eventBroadcaster) metricsServer := server.NewMetricsServer() - controllersServer := server.NewControllersServer(eventServer) + controllersServer := server.NewControllersServer(eventServer, eventFilter) ctx, cancel := context.WithCancel(context.Background()) diff --git a/cmd/maestro/server/controllers.go b/cmd/maestro/server/controllers.go index dcf713b4..ea54952d 100755 --- a/cmd/maestro/server/controllers.go +++ b/cmd/maestro/server/controllers.go @@ -11,10 +11,10 @@ import ( "github.com/openshift-online/maestro/pkg/logger" ) -func NewControllersServer(eventServer EventServer) *ControllersServer { +func NewControllersServer(eventServer EventServer, eventFilter controllers.EventFilter) *ControllersServer { s := &ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - db.NewAdvisoryLockFactory(env().Database.SessionFactory), + eventFilter, env().Services.Events(), ), StatusController: controllers.NewStatusController( diff --git a/cmd/maestro/server/event_server.go b/cmd/maestro/server/event_server.go index 5514a3a3..a8f5f744 100644 --- 
a/cmd/maestro/server/event_server.go +++ b/cmd/maestro/server/event_server.go @@ -39,6 +39,9 @@ type EventServer interface { // OnStatusUpdate handles status update events for a resource. OnStatusUpdate(ctx context.Context, eventID, resourceID string) error + + // returns true if the event should be processed by the current instance, otherwise false and an error if it occurs. + PredicateEvent(ctx context.Context, eventID string) (bool, error) } var _ EventServer = &MessageQueueEventServer{} @@ -145,6 +148,11 @@ func (s *MessageQueueEventServer) OnStatusUpdate(ctx context.Context, eventID, r ) } +// EventPredicate for the message queue event server is no-op, as the message queue server filter event based on advisory lock. +func (s *MessageQueueEventServer) PredicateEvent(ctx context.Context, eventID string) (bool, error) { + return true, nil +} + // handleStatusUpdate processes the resource status update from the agent. // The resource argument contains the updated status. // The function performs the following steps: diff --git a/cmd/maestro/server/grpc_broker.go b/cmd/maestro/server/grpc_broker.go index cf301fd1..eff5bf39 100644 --- a/cmd/maestro/server/grpc_broker.go +++ b/cmd/maestro/server/grpc_broker.go @@ -13,7 +13,9 @@ import ( cetypes "github.com/cloudevents/sdk-go/v2/types" "github.com/google/uuid" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/keepalive" + "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" "k8s.io/klog/v2" pbv1 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protobuf/v1" @@ -51,6 +53,7 @@ type GRPCBroker struct { instanceID string eventInstanceDao dao.EventInstanceDao resourceService services.ResourceService + eventService services.EventService statusEventService services.StatusEventService bindAddress string subscribers map[string]*subscriber // registered subscribers @@ -79,6 +82,7 @@ func NewGRPCBroker(eventBroadcaster *event.EventBroadcaster) 
EventServer { instanceID: env().Config.MessageBroker.ClientID, eventInstanceDao: dao.NewEventInstanceDao(&sessionFactory), resourceService: env().Services.Resources(), + eventService: env().Services.Events(), statusEventService: env().Services.StatusEvents(), bindAddress: env().Config.HTTPServer.Hostname + ":" + config.BrokerBindPort, subscribers: make(map[string]*subscriber), @@ -183,21 +187,25 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv subscriberID, errChan := bkr.register(subReq.ClusterName, func(res *api.Resource) error { evt, err := encodeResourceSpec(res) if err != nil { + // return the error to requeue the event if encoding fails (e.g., due to invalid resource spec). return fmt.Errorf("failed to encode resource %s to cloudevent: %v", res.ID, err) } - klog.V(4).Infof("send the event to spec subscribers, %s", evt) - // WARNING: don't use "pbEvt, err := pb.ToProto(evt)" to convert cloudevent to protobuf pbEvt := &pbv1.CloudEvent{} if err = grpcprotocol.WritePBMessage(context.TODO(), binding.ToMessage(evt), pbEvt); err != nil { - return fmt.Errorf("failed to convert cloudevent to protobuf: %v", err) + // return the error to requeue the event if converting to protobuf fails (e.g., due to invalid cloudevent). + return fmt.Errorf("failed to convert cloudevent to protobuf for resource(%s): %v", res.ID, err) } // send the cloudevent to the subscriber - // TODO: error handling to address errors beyond network issues. + klog.V(4).Infof("sending the event to spec subscribers, %s", evt) if err := subServer.Send(pbEvt); err != nil { klog.Errorf("failed to send grpc event, %v", err) + // Return the error without wrapping, as it includes the gRPC error code and message for further handling. + // For unrecoverable errors, such as a connection closed by an intermediate proxy, push the error to subscriber's + // error channel to unregister the subscriber. 
+ return err } return nil @@ -205,10 +213,18 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv select { case err := <-errChan: + // When reaching this point, an unrecoverable error occurred while sending the event, + // such as the connection being closed. Unregister the subscriber to trigger agent reconnection. klog.Errorf("unregister subscriber %s, error= %v", subscriberID, err) bkr.unregister(subscriberID) return err case <-subServer.Context().Done(): + // The context of the stream has been canceled or completed. + // This could happen if: + // - The client closed the connection or canceled the stream. + // - The server closed the stream, potentially due to a shutdown. + // Regardless of the reason, unregister the subscriber and stop processing. + // No error is returned here because the stream closure is expected. bkr.unregister(subscriberID) return nil } @@ -377,52 +393,67 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy } // handleRes publish the resource to the correct subscriber. -func (bkr *GRPCBroker) handleRes(resource *api.Resource) { +func (bkr *GRPCBroker) handleRes(resource *api.Resource) error { bkr.mu.RLock() defer bkr.mu.RUnlock() for _, subscriber := range bkr.subscribers { if subscriber.clusterName == resource.ConsumerName { if err := subscriber.handler(resource); err != nil { - subscriber.errChan <- err + // check if the error is recoverable. For unrecoverable errors, + // such as a connection closed by an intermediate proxy, push + // the error to subscriber's error channel to unregister the subscriber. + st, ok := status.FromError(err) + if ok && st.Code() == codes.Unavailable { + // TODO: handle more error codes that can't be recovered + subscriber.errChan <- err + } + return err } } } + return nil } // OnCreate is called by the controller when a resource is created on the maestro server. 
func (bkr *GRPCBroker) OnCreate(ctx context.Context, id string) error { resource, err := bkr.resourceService.Get(ctx, id) if err != nil { + // if the resource is not found, it indicates the resource has been processed. + if err.Is404() { + return nil + } return err } - bkr.handleRes(resource) - - return nil + return bkr.handleRes(resource) } // OnUpdate is called by the controller when a resource is updated on the maestro server. func (bkr *GRPCBroker) OnUpdate(ctx context.Context, id string) error { resource, err := bkr.resourceService.Get(ctx, id) if err != nil { + // if the resource is not found, it indicates the resource has been processed. + if err.Is404() { + return nil + } return err } - bkr.handleRes(resource) - - return nil + return bkr.handleRes(resource) } // OnDelete is called by the controller when a resource is deleted from the maestro server. func (bkr *GRPCBroker) OnDelete(ctx context.Context, id string) error { resource, err := bkr.resourceService.Get(ctx, id) if err != nil { + // if the resource is not found, it indicates the resource has been processed. + if err.Is404() { + return nil + } return err } - bkr.handleRes(resource) - - return nil + return bkr.handleRes(resource) } // On StatusUpdate will be called on each new status event inserted into db. @@ -442,6 +473,39 @@ func (bkr *GRPCBroker) OnStatusUpdate(ctx context.Context, eventID, resourceID s ) } +// PredicateEvent checks if the event should be processed by the current instance +// by verifying the resource consumer name is in the subscriber list, ensuring the +// event will be only processed when the consumer is subscribed to the current broker. 
+func (bkr *GRPCBroker) PredicateEvent(ctx context.Context, eventID string) (bool, error) { + evt, err := bkr.eventService.Get(ctx, eventID) + if err != nil { + return false, fmt.Errorf("failed to get event %s: %s", eventID, err.Error()) + } + + // fast return if the event is already reconciled + if evt.ReconciledDate != nil { + return false, nil + } + + resource, svcErr := bkr.resourceService.Get(ctx, evt.SourceID) + if svcErr != nil { + // if the resource is not found, it indicates the resource has been handled by other instances. + if svcErr.Is404() { + klog.V(10).Infof("The resource %s has been deleted, mark the event as reconciled", evt.SourceID) + now := time.Now() + evt.ReconciledDate = &now + if _, svcErr := bkr.eventService.Replace(ctx, evt); svcErr != nil { + return false, fmt.Errorf("failed to mark event with id (%s) as reconciled: %s", evt.ID, svcErr) + } + return false, nil + } + return false, fmt.Errorf("failed to get resource %s: %s", evt.SourceID, svcErr.Error()) + } + + // check if the consumer is subscribed to the broker + return bkr.IsConsumerSubscribed(resource.ConsumerName), nil +} + // IsConsumerSubscribed returns true if the consumer is subscribed to the broker for resource spec. func (bkr *GRPCBroker) IsConsumerSubscribed(consumerName string) bool { bkr.mu.RLock() diff --git a/pkg/controllers/event_filter.go b/pkg/controllers/event_filter.go new file mode 100644 index 00000000..cdabc4b8 --- /dev/null +++ b/pkg/controllers/event_filter.go @@ -0,0 +1,96 @@ +package controllers + +import ( + "context" + "fmt" + + "github.com/openshift-online/maestro/pkg/db" +) + +// EventFilter defines an interface for filtering and deferring actions on events. +// Implementations of EventFilter should provide logic for determining whether an event +// should be processed and for handling any actions that need to be deferred. +// +// - Filter: Decides whether the event should be processed based on its ID. 
+// - DeferredAction: Allows for scheduling actions that should occur regardless of whether the event +// was processed successfully or not, such as cleanup tasks or releasing resources. +type EventFilter interface { + // Filter determines whether the event should be processed. + // Returns true if the event should be handled, false and an error otherwise. + Filter(ctx context.Context, id string) (bool, error) + + // DeferredAction schedules actions to be executed regardless of event processing success. + DeferredAction(ctx context.Context, id string) +} + +// LockBasedEventFilter implements EventFilter using a locking mechanism for event processing. +// It creates advisory locks on event IDs to ensure thread-safe access. +// - Filter acquires a lock on the event ID and returns true if the lock is successful. +// - DeferredAction releases the lock for the event ID. +type LockBasedEventFilter struct { + lockFactory db.LockFactory + // locks map is accessed by a single-threaded handler goroutine, no need for lock on it. + locks map[string]string +} + +func NewLockBasedEventFilter(lockFactory db.LockFactory) EventFilter { + return &LockBasedEventFilter{ + lockFactory: lockFactory, + locks: make(map[string]string), + } +} + +// Filter attempts to acquire a lock on the event ID. Returns true if successful, false and error otherwise. +func (h *LockBasedEventFilter) Filter(ctx context.Context, id string) (bool, error) { + // lock the Event with a fail-fast advisory lock context. + // this allows concurrent processing of many events by one or many controller managers. + // allow the lock to be released by the handler goroutine and allow this function to continue. + // subsequent events will be locked by their own distinct IDs. 
+ lockOwnerID, acquired, err := h.lockFactory.NewNonBlockingLock(ctx, id, db.Events) + // store the lock owner ID for deferred action + h.locks[id] = lockOwnerID + if err != nil { + return false, fmt.Errorf("error obtaining the event lock: %v", err) + } + + if !acquired { + logger.V(4).Infof("Event %s is processed by another worker", id) + return false, nil + } + + return true, nil +} + +// DeferredAction releases the lock for the given event ID if it was acquired. +func (h *LockBasedEventFilter) DeferredAction(ctx context.Context, id string) { + if ownerID, exists := h.locks[id]; exists { + h.lockFactory.Unlock(ctx, ownerID) + delete(h.locks, id) + } +} + +// eventFilterPredicate is a function type for filtering events based on their ID. +type eventFilterPredicate func(ctx context.Context, eventID string) (bool, error) + +// PredicatedEventFilter implements EventFilter using a predicate function for event filtering. +// - Filter uses the predicate to decide if the event should be processed. +// - DeferredAction is a no-op as no locking is performed. +type PredicatedEventFilter struct { + predicate eventFilterPredicate +} + +func NewPredicatedEventFilter(predicate eventFilterPredicate) EventFilter { + return &PredicatedEventFilter{ + predicate: predicate, + } +} + +// Filter calls the predicate function to determine if the event should be processed. +func (h *PredicatedEventFilter) Filter(ctx context.Context, id string) (bool, error) { + return h.predicate(ctx, id) +} + +// DeferredAction is a no-op since no locks are involved. +func (h *PredicatedEventFilter) DeferredAction(ctx context.Context, id string) { + // no-op +} diff --git a/pkg/controllers/event_filter_test.go b/pkg/controllers/event_filter_test.go new file mode 100644 index 00000000..2bb22e36 --- /dev/null +++ b/pkg/controllers/event_filter_test.go @@ -0,0 +1,150 @@ +package controllers + +import ( + "context" + "testing" + + "github.com/google/uuid" + . 
"github.com/onsi/gomega" + "github.com/openshift-online/maestro/pkg/api" + "github.com/openshift-online/maestro/pkg/dao/mocks" + dbmocks "github.com/openshift-online/maestro/pkg/db/mocks" +) + +func TestLockingEventFilter(t *testing.T) { + RegisterTestingT(t) + + source := "my-event-source" + ctx := context.Background() + eventsDao := mocks.NewEventDao() + eventFilter := NewLockBasedEventFilter(dbmocks.NewMockAdvisoryLockFactory()) + + _, err := eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "1"}, + Source: source, + SourceID: "any id", + EventType: api.CreateEventType, + }) + Expect(err).To(BeNil()) + + _, err = eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "2"}, + Source: source, + SourceID: "any id", + EventType: api.CreateEventType, + }) + Expect(err).To(BeNil()) + + shouldProcess, err := eventFilter.Filter(ctx, "1") + Expect(err).To(BeNil()) + Expect(shouldProcess).To(BeTrue()) + + lockingEventFilter, ok := eventFilter.(*LockBasedEventFilter) + Expect(ok).To(BeTrue()) + Expect(lockingEventFilter.locks).To(HaveLen(1)) + + eventFilter.DeferredAction(ctx, "1") + Expect(lockingEventFilter.locks).To(HaveLen(0)) + + event, err := eventsDao.Get(ctx, "1") + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).To(BeNil()) + + shouldProcess, err = eventFilter.Filter(ctx, "2") + Expect(err).To(BeNil()) + Expect(shouldProcess).To(BeTrue()) + Expect(lockingEventFilter.locks).To(HaveLen(1)) + + eventFilter.DeferredAction(ctx, "2") + Expect(lockingEventFilter.locks).To(HaveLen(0)) + + event, err = eventsDao.Get(ctx, "2") + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).To(BeNil()) + + shouldProcess, err = eventFilter.Filter(ctx, "3") + Expect(err).To(BeNil()) + Expect(shouldProcess).To(BeTrue()) + Expect(lockingEventFilter.locks).To(HaveLen(1)) +} + +func TestPredicatedEventFilter(t *testing.T) { + RegisterTestingT(t) + + source := "my-event-source" + ctx := context.Background() + eventsDao := mocks.NewEventDao() + resourcesDao := 
mocks.NewResourceDao() + eventServer := &exampleEventServer{eventsDao: eventsDao, resourcesDao: resourcesDao, subscrbers: []string{"cluster1"}} + eventFilter := NewPredicatedEventFilter(eventServer.PredicateEvent) + + resID := uuid.New().String() + _, err := resourcesDao.Create(ctx, &api.Resource{ + Meta: api.Meta{ID: resID}, + ConsumerName: "cluster1", + Source: source, + }) + Expect(err).To(BeNil()) + + _, err = eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "1"}, + Source: source, + SourceID: resID, + EventType: api.CreateEventType, + }) + Expect(err).To(BeNil()) + + _, err = eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "2"}, + Source: source, + SourceID: "any id", + EventType: api.CreateEventType, + }) + Expect(err).To(BeNil()) + + newResID := uuid.New().String() + _, err = resourcesDao.Create(ctx, &api.Resource{ + Meta: api.Meta{ID: newResID}, + ConsumerName: "cluster2", + Source: source, + }) + Expect(err).To(BeNil()) + + _, err = eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "3"}, + Source: source, + SourceID: newResID, + EventType: api.DeleteEventType, + }) + Expect(err).To(BeNil()) + + // handle event 1 + shouldProcess, err := eventFilter.Filter(ctx, "1") + Expect(err).To(BeNil()) + Expect(shouldProcess).To(BeTrue()) + + // call deferred action + eventFilter.DeferredAction(ctx, "1") + + event, err := eventsDao.Get(ctx, "1") + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).To(BeNil()) + + // handle event 2 + shouldProcess, err = eventFilter.Filter(ctx, "2") + Expect(err).To(BeNil()) + Expect(shouldProcess).NotTo(BeTrue()) + + event, err = eventsDao.Get(ctx, "2") + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).NotTo(BeNil()) + + // handle event 3 + shouldProcess, err = eventFilter.Filter(ctx, "3") + Expect(err).To(BeNil()) + Expect(shouldProcess).NotTo(BeTrue()) + + event, err = eventsDao.Get(ctx, "3") + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).To(BeNil()) +} diff --git a/pkg/controllers/framework.go 
b/pkg/controllers/framework.go index f06f592a..b55819ea 100755 --- a/pkg/controllers/framework.go +++ b/pkg/controllers/framework.go @@ -6,7 +6,6 @@ import ( "time" "github.com/openshift-online/maestro/pkg/api" - "github.com/openshift-online/maestro/pkg/db" maestrologger "github.com/openshift-online/maestro/pkg/logger" "github.com/openshift-online/maestro/pkg/services" @@ -54,15 +53,15 @@ type ControllerConfig struct { type KindControllerManager struct { controllers map[string]map[api.EventType][]ControllerHandlerFunc - lockFactory db.LockFactory + eventFilter EventFilter events services.EventService eventsQueue workqueue.RateLimitingInterface } -func NewKindControllerManager(lockFactory db.LockFactory, events services.EventService) *KindControllerManager { +func NewKindControllerManager(eventFilter EventFilter, events services.EventService) *KindControllerManager { return &KindControllerManager{ controllers: map[string]map[api.EventType][]ControllerHandlerFunc{}, - lockFactory: lockFactory, + eventFilter: eventFilter, events: events, eventsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "event-controller"), } @@ -108,35 +107,34 @@ func (km *KindControllerManager) add(source string, ev api.EventType, fns []Cont } func (km *KindControllerManager) handleEvent(id string) error { - ctx := context.Background() - // lock the Event with a fail-fast advisory lock context. - // this allows concurrent processing of many events by one or many controller managers. - // allow the lock to be released by the handler goroutine and allow this function to continue. - // subsequent events will be locked by their own distinct IDs. - lockOwnerID, acquired, err := km.lockFactory.NewNonBlockingLock(ctx, id, db.Events) - // Ensure that the transaction related to this lock always end. 
- defer km.lockFactory.Unlock(ctx, lockOwnerID) + reqContext := context.WithValue(context.Background(), EventID, id) + + // check if the event should be processed by this instance + shouldProcess, err := km.eventFilter.Filter(reqContext, id) + defer km.eventFilter.DeferredAction(reqContext, id) if err != nil { - return fmt.Errorf("error obtaining the event lock: %v", err) + return fmt.Errorf("error filtering event with id (%s): %s", id, err) } - if !acquired { - logger.Infof("Event %s is processed by another worker, continue to process the next", id) + // if the event should not be processed by this instance, we can ignore it + if !shouldProcess { + logger.Infof("Event with id (%s) should not be processed by this instance", id) return nil } - reqContext := context.WithValue(ctx, EventID, id) - event, svcErr := km.events.Get(reqContext, id) if svcErr != nil { if svcErr.Is404() { // the event is already deleted, we can ignore it + logger.V(4).Infof("Event with id (%s) is not found", id) return nil } - return fmt.Errorf("error getting event with id(%s): %s", id, svcErr) + return fmt.Errorf("error getting event with id (%s): %s", id, svcErr) } if event.ReconciledDate != nil { + // the event is already reconciled, we can ignore it + logger.V(4).Infof("Event with id (%s) is already reconciled", id) return nil } @@ -155,17 +153,17 @@ func (km *KindControllerManager) handleEvent(id string) error { for _, fn := range handlerFns { err := fn(reqContext, event.SourceID) if err != nil { - return fmt.Errorf("error handing event %s, %s, %s: %s", event.Source, event.EventType, id, err) + return fmt.Errorf("error handing event %s-%s (%s): %s", event.Source, event.EventType, id, err) } } // all handlers successfully executed now := time.Now() event.ReconciledDate = &now - _, svcErr = km.events.Replace(reqContext, event) - if svcErr != nil { - return fmt.Errorf("error updating event with id(%s): %s", id, svcErr) + if _, svcErr := km.events.Replace(reqContext, event); svcErr != nil { 
+ return fmt.Errorf("error updating event with id (%s): %s", id, svcErr) } + return nil } diff --git a/pkg/controllers/framework_test.go b/pkg/controllers/framework_test.go index 3365073b..10aebd35 100755 --- a/pkg/controllers/framework_test.go +++ b/pkg/controllers/framework_test.go @@ -2,13 +2,18 @@ package controllers import ( "context" + "fmt" "testing" + "time" + "github.com/google/uuid" . "github.com/onsi/gomega" "github.com/openshift-online/maestro/pkg/api" + "github.com/openshift-online/maestro/pkg/dao" "github.com/openshift-online/maestro/pkg/dao/mocks" dbmocks "github.com/openshift-online/maestro/pkg/db/mocks" "github.com/openshift-online/maestro/pkg/services" + "gorm.io/gorm" ) func newExampleControllerConfig(ctrl *exampleController) *ControllerConfig { @@ -43,38 +48,41 @@ func (d *exampleController) OnDelete(ctx context.Context, id string) error { return nil } -func TestControllerFramework(t *testing.T) { +func TestControllerFrameworkWithLockBasedEventFilter(t *testing.T) { RegisterTestingT(t) ctx := context.Background() eventsDao := mocks.NewEventDao() events := services.NewEventService(eventsDao) - mgr := NewKindControllerManager(dbmocks.NewMockAdvisoryLockFactory(), events) + mgr := NewKindControllerManager(NewLockBasedEventFilter(dbmocks.NewMockAdvisoryLockFactory()), events) ctrl := &exampleController{} config := newExampleControllerConfig(ctrl) mgr.Add(config) - _, _ = eventsDao.Create(ctx, &api.Event{ + _, err := eventsDao.Create(ctx, &api.Event{ Meta: api.Meta{ID: "1"}, Source: config.Source, SourceID: "any id", EventType: api.CreateEventType, }) + Expect(err).To(BeNil()) - _, _ = eventsDao.Create(ctx, &api.Event{ + _, err = eventsDao.Create(ctx, &api.Event{ Meta: api.Meta{ID: "2"}, Source: config.Source, SourceID: "any id", EventType: api.UpdateEventType, }) + Expect(err).To(BeNil()) - _, _ = eventsDao.Create(ctx, &api.Event{ + _, err = eventsDao.Create(ctx, &api.Event{ Meta: api.Meta{ID: "3"}, Source: config.Source, SourceID: "any id", 
EventType: api.DeleteEventType, }) + Expect(err).To(BeNil()) mgr.handleEvent("1") mgr.handleEvent("2") @@ -84,6 +92,126 @@ func TestControllerFramework(t *testing.T) { Expect(ctrl.updateCounter).To(Equal(1)) Expect(ctrl.deleteCounter).To(Equal(1)) - eve, _ := eventsDao.Get(ctx, "1") + eve, err := eventsDao.Get(ctx, "1") + Expect(err).To(BeNil()) Expect(eve.ReconciledDate).ToNot(BeNil(), "event reconcile date should be set") + + eve, err = eventsDao.Get(ctx, "2") + Expect(err).To(BeNil()) + Expect(eve.ReconciledDate).ToNot(BeNil(), "event reconcile date should be set") + + eve, err = eventsDao.Get(ctx, "3") + Expect(err).To(BeNil()) + Expect(eve.ReconciledDate).ToNot(BeNil(), "event reconcile date should be set") +} + +type exampleEventServer struct { + eventsDao dao.EventDao + resourcesDao dao.ResourceDao + subscrbers []string +} + +func (e *exampleEventServer) PredicateEvent(ctx context.Context, eventID string) (bool, error) { + event, err := e.eventsDao.Get(ctx, eventID) + if err != nil { + return false, err + } + resource, err := e.resourcesDao.Get(ctx, event.SourceID) + if err != nil { + // 404 == gorm.ErrRecordNotFound means the resource was deleted, so we can ignore the event + if err == gorm.ErrRecordNotFound { + now := time.Now() + event.ReconciledDate = &now + if _, svcErr := e.eventsDao.Replace(ctx, event); svcErr != nil { + return false, fmt.Errorf("failed to update event %s: %s", event.ID, svcErr.Error()) + } + return false, nil + } + return false, err + } + return contains(e.subscrbers, resource.ConsumerName), nil +} + +func contains(s []string, e string) bool { + for _, a := range s { + if a == e { + return true + } + } + return false +} + +func TestControllerFrameworkWithPredicatedEventFilter(t *testing.T) { + RegisterTestingT(t) + + ctx := context.Background() + eventsDao := mocks.NewEventDao() + resourcesDao := mocks.NewResourceDao() + events := services.NewEventService(eventsDao) + eventServer := &exampleEventServer{eventsDao: eventsDao, 
resourcesDao: resourcesDao, subscrbers: []string{"cluster1"}} + mgr := NewKindControllerManager(NewPredicatedEventFilter(eventServer.PredicateEvent), events) + + ctrl := &exampleController{} + config := newExampleControllerConfig(ctrl) + mgr.Add(config) + + resID := uuid.New().String() + _, err := resourcesDao.Create(ctx, &api.Resource{ + Meta: api.Meta{ID: resID}, + ConsumerName: "cluster1", + Source: config.Source, + }) + Expect(err).To(BeNil()) + + _, err = eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "1"}, + Source: config.Source, + SourceID: resID, + EventType: api.CreateEventType, + }) + Expect(err).To(BeNil()) + + _, err = eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "2"}, + Source: config.Source, + SourceID: "any id", + EventType: api.UpdateEventType, + }) + Expect(err).To(BeNil()) + + newResID := uuid.New().String() + _, err = resourcesDao.Create(ctx, &api.Resource{ + Meta: api.Meta{ID: newResID}, + ConsumerName: "cluster2", + Source: config.Source, + }) + Expect(err).To(BeNil()) + + _, err = eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "3"}, + Source: config.Source, + SourceID: newResID, + EventType: api.DeleteEventType, + }) + Expect(err).To(BeNil()) + + mgr.handleEvent("1") + mgr.handleEvent("2") + mgr.handleEvent("3") + + Expect(ctrl.addCounter).To(Equal(1)) + Expect(ctrl.updateCounter).To(Equal(0)) + Expect(ctrl.deleteCounter).To(Equal(0)) + + eve, err := eventsDao.Get(ctx, "1") + Expect(err).To(BeNil()) + Expect(eve.ReconciledDate).ToNot(BeNil(), "event reconcile date should be set") + + eve, err = eventsDao.Get(ctx, "2") + Expect(err).To(BeNil()) + Expect(eve.ReconciledDate).ToNot(BeNil(), "event reconcile date should be set") + + eve, err = eventsDao.Get(ctx, "3") + Expect(err).To(BeNil()) + Expect(eve.ReconciledDate).To(BeNil(), "event reconcile date should not be set") } diff --git a/pkg/dao/event_instances.go b/pkg/dao/event_instance.go similarity index 100% rename from pkg/dao/event_instances.go rename to 
pkg/dao/event_instance.go diff --git a/pkg/dao/mocks/event.go b/pkg/dao/mocks/event.go index 738e110f..a7e9559b 100755 --- a/pkg/dao/mocks/event.go +++ b/pkg/dao/mocks/event.go @@ -7,7 +7,6 @@ import ( "github.com/openshift-online/maestro/pkg/api" "github.com/openshift-online/maestro/pkg/dao" - "github.com/openshift-online/maestro/pkg/errors" ) var _ dao.EventDao = &eventDaoMock{} @@ -35,7 +34,13 @@ func (d *eventDaoMock) Create(ctx context.Context, event *api.Event) (*api.Event } func (d *eventDaoMock) Replace(ctx context.Context, event *api.Event) (*api.Event, error) { - return nil, errors.NotImplemented("Event").AsError() + for i, e := range d.events { + if e.ID == event.ID { + d.events[i] = event + return event, nil + } + } + return nil, gorm.ErrRecordNotFound } func (d *eventDaoMock) Delete(ctx context.Context, id string) error { @@ -53,7 +58,15 @@ func (d *eventDaoMock) Delete(ctx context.Context, id string) error { } func (d *eventDaoMock) FindByIDs(ctx context.Context, ids []string) (api.EventList, error) { - return nil, errors.NotImplemented("Event").AsError() + filteredEvents := api.EventList{} + for _, id := range ids { + for _, e := range d.events { + if e.ID == id { + filteredEvents = append(filteredEvents, e) + } + } + } + return filteredEvents, nil } func (d *eventDaoMock) All(ctx context.Context) (api.EventList, error) { diff --git a/pkg/dao/mocks/event_instance.go b/pkg/dao/mocks/event_instance.go new file mode 100644 index 00000000..e8ebef59 --- /dev/null +++ b/pkg/dao/mocks/event_instance.go @@ -0,0 +1,76 @@ +package mocks + +import ( + "context" + "fmt" + "sync" + + "github.com/openshift-online/maestro/pkg/api" + "github.com/openshift-online/maestro/pkg/dao" +) + +var _ dao.EventInstanceDao = &eventInstanceDaoMock{} + +type eventInstanceDaoMock struct { + mux sync.RWMutex + eventInstances api.EventInstanceList +} + +func NewEventInstanceDaoMock() *eventInstanceDaoMock { + return &eventInstanceDaoMock{} +} + +func (d *eventInstanceDaoMock) 
Get(ctx context.Context, eventID, instanceID string) (*api.EventInstance, error) { + d.mux.RLock() + defer d.mux.RUnlock() + + for _, ei := range d.eventInstances { + if ei.EventID == eventID && ei.InstanceID == instanceID { + return ei, nil + } + } + + return nil, fmt.Errorf("event instance not found") +} + +func (d *eventInstanceDaoMock) Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) { + d.mux.Lock() + defer d.mux.Unlock() + + d.eventInstances = append(d.eventInstances, eventInstance) + + return eventInstance, nil +} + +func (d *eventInstanceDaoMock) FindStatusEvents(ctx context.Context, ids []string) (api.EventInstanceList, error) { + d.mux.RLock() + defer d.mux.RUnlock() + + var eventInstances api.EventInstanceList + for _, id := range ids { + for _, ei := range d.eventInstances { + if ei.EventID == id { + eventInstances = append(eventInstances, ei) + } + } + } + + return eventInstances, nil +} + +func (d *eventInstanceDaoMock) GetEventsAssociatedWithInstances(ctx context.Context, instanceIDs []string) ([]string, error) { + d.mux.RLock() + defer d.mux.RUnlock() + + var eventIDs []string + for _, ei := range d.eventInstances { + if contains(instanceIDs, ei.InstanceID) { + if ei.EventID == "" { + continue + } + eventIDs = append(eventIDs, ei.EventID) + } + } + + return eventIDs, nil +} diff --git a/pkg/dao/mocks/instance.go b/pkg/dao/mocks/instance.go index 73ff4e21..12498774 100644 --- a/pkg/dao/mocks/instance.go +++ b/pkg/dao/mocks/instance.go @@ -139,8 +139,7 @@ func (d *instanceDaoMock) FindByUpdatedTime(ctx context.Context, updatedTime tim func (d *instanceDaoMock) FindReadyIDs(ctx context.Context) ([]string, error) { d.mux.RLock() defer d.mux.RUnlock() - - ids := []string{} + ids := make([]string, 0, len(d.instances)) for _, instance := range d.instances { if instance.Ready { ids = append(ids, instance.ID) diff --git a/test/helper.go b/test/helper.go index e9fcfe5a..331a79e4 100755 --- a/test/helper.go +++ 
b/test/helper.go @@ -24,6 +24,7 @@ import ( workv1 "open-cluster-management.io/api/work/v1" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc" grpcoptions "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" @@ -71,8 +72,8 @@ type Helper struct { Ctx context.Context ContextCancelFunc context.CancelFunc + Broker string EventBroadcaster *event.EventBroadcaster - StatusDispatcher dispatcher.Dispatcher Store *MemoryStore GRPCSourceClient *generic.CloudEventSourceClient[*api.Resource] DBFactory db.SessionFactory @@ -81,6 +82,7 @@ type Helper struct { MetricsServer server.Server HealthCheckServer *server.HealthCheckServer EventServer server.EventServer + EventFilter controllers.EventFilter ControllerManager *server.ControllersServer WorkAgentHolder *work.ClientHolder WorkAgentInformer workv1informers.ManifestWorkInformer @@ -111,6 +113,12 @@ func NewHelper(t *testing.T) *Helper { } pflag.Parse() + // Set the message broker type if it is set in the environment + broker := os.Getenv("BROKER") + if broker != "" { + env.Config.MessageBroker.MessageBrokerType = broker + } + err = env.Initialize() if err != nil { klog.Fatalf("Unable to initialize testing environment: %s", err.Error()) @@ -120,18 +128,32 @@ func NewHelper(t *testing.T) *Helper { helper = &Helper{ Ctx: ctx, ContextCancelFunc: cancel, + Broker: env.Config.MessageBroker.MessageBrokerType, EventBroadcaster: event.NewEventBroadcaster(), - StatusDispatcher: dispatcher.NewHashDispatcher( + AppConfig: env.Config, + DBFactory: env.Database.SessionFactory, + JWTPrivateKey: jwtKey, + JWTCA: jwtCA, + } + + // Set the healthcheck interval to 1 second for testing + helper.Env().Config.HealthCheck.HeartbeartInterval = 1 + helper.HealthCheckServer = server.NewHealthCheckServer() + + if 
helper.Broker != "grpc" { + statusDispatcher := dispatcher.NewHashDispatcher( helper.Env().Config.MessageBroker.ClientID, dao.NewInstanceDao(&helper.Env().Database.SessionFactory), dao.NewConsumerDao(&helper.Env().Database.SessionFactory), helper.Env().Clients.CloudEventsSource, helper.Env().Config.EventServer.ConsistentHashConfig, - ), - AppConfig: env.Config, - DBFactory: env.Database.SessionFactory, - JWTPrivateKey: jwtKey, - JWTCA: jwtCA, + ) + helper.HealthCheckServer.SetStatusDispatcher(statusDispatcher) + helper.EventServer = server.NewMessageQueueEventServer(helper.EventBroadcaster, statusDispatcher) + helper.EventFilter = controllers.NewLockBasedEventFilter(db.NewAdvisoryLockFactory(helper.Env().Database.SessionFactory)) + } else { + helper.EventServer = server.NewGRPCBroker(helper.EventBroadcaster) + helper.EventFilter = controllers.NewPredicatedEventFilter(helper.EventServer.PredicateEvent) } // TODO jwk mock server needs to be refactored out of the helper and into the testing environment @@ -206,9 +228,6 @@ func (helper *Helper) stopMetricsServer() error { } func (helper *Helper) startHealthCheckServer(ctx context.Context) { - helper.Env().Config.HealthCheck.HeartbeartInterval = 1 - helper.HealthCheckServer = server.NewHealthCheckServer() - helper.HealthCheckServer.SetStatusDispatcher(helper.StatusDispatcher) go func() { klog.V(10).Info("Test health check server started") helper.HealthCheckServer.Start(ctx) @@ -222,8 +241,6 @@ func (helper *Helper) sendShutdownSignal() error { } func (helper *Helper) startEventServer(ctx context.Context) { - // helper.Env().Config.EventServer.SubscriptionType = "broadcast" - helper.EventServer = server.NewMessageQueueEventServer(helper.EventBroadcaster, helper.StatusDispatcher) go func() { klog.V(10).Info("Test event server started") helper.EventServer.Start(ctx) @@ -242,7 +259,7 @@ func (helper *Helper) startEventBroadcaster() { func (helper *Helper) StartControllerManager(ctx context.Context) { 
helper.ControllerManager = &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - db.NewAdvisoryLockFactory(helper.Env().Database.SessionFactory), + helper.EventFilter, helper.Env().Services.Events(), ), StatusController: controllers.NewStatusController( @@ -270,10 +287,19 @@ } func (helper *Helper) StartWorkAgent(ctx context.Context, clusterName string, bundle bool) { - // initilize the mqtt options - mqttOptions, err := mqtt.BuildMQTTOptionsFromFlags(helper.Env().Config.MessageBroker.MessageBrokerConfig) - if err != nil { - klog.Fatalf("Unable to build MQTT options: %s", err.Error()) + var brokerConfig any + if helper.Broker != "grpc" { + // initialize the mqtt options + mqttOptions, err := mqtt.BuildMQTTOptionsFromFlags(helper.Env().Config.MessageBroker.MessageBrokerConfig) + if err != nil { + klog.Fatalf("Unable to build MQTT options: %s", err.Error()) + } + brokerConfig = mqttOptions + } else { + // initialize the grpc options + grpcOptions := grpc.NewGRPCOptions() + grpcOptions.URL = fmt.Sprintf("%s:%s", helper.Env().Config.HTTPServer.Hostname, helper.Env().Config.GRPCServer.BrokerBindPort) + brokerConfig = grpcOptions } var workCodec generic.Codec[*workv1.ManifestWork] @@ -285,7 +311,7 @@ watcherStore := store.NewAgentInformerWatcherStore() - clientHolder, err := work.NewClientHolderBuilder(mqttOptions). + clientHolder, err := work.NewClientHolderBuilder(brokerConfig). WithClientID(clusterName). WithClusterName(clusterName). WithCodecs(workCodec). diff --git a/test/integration/controller_test.go b/test/integration/controller_test.go index 1285eb60..9f4cb965 100755 --- a/test/integration/controller_test.go +++ b/test/integration/controller_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/google/uuid" . 
"github.com/onsi/gomega" "github.com/openshift-online/maestro/cmd/maestro/server" "github.com/openshift-online/maestro/pkg/api" @@ -25,6 +26,10 @@ func TestControllerRacing(t *testing.T) { cancel() }() + // start work agent so that grpc broker can work + consumer := h.CreateConsumer("cluster-" + rand.String(5)) + h.StartWorkAgent(ctx, consumer.Name, false) + eventDao := dao.NewEventDao(&h.Env().Database.SessionFactory) statusEventDao := dao.NewStatusEventDao(&h.Env().Database.SessionFactory) @@ -74,13 +79,23 @@ func TestControllerRacing(t *testing.T) { return nil } - // Start 3 controllers concurrently + // Start 3 controllers concurrently for message queue event server threads := 3 + randNum := rand.Intn(3) for i := 0; i < threads; i++ { + // each controller has its own event filter, otherwise, the event lock will block the event processing. + eventFilter := controllers.NewLockBasedEventFilter(db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory)) + if h.Broker == "grpc" { + eventFilter = controllers.NewPredicatedEventFilter(func(ctx context.Context, eventID string) (bool, error) { + // simulate the event filter, where the agent randomly connects to a grpc broker instance. + // in theory, only one broker instance should process the event. + return i == randNum, nil + }) + } go func() { s := &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), + eventFilter, h.Env().Services.Events(), ), StatusController: controllers.NewStatusController( @@ -107,7 +122,6 @@ func TestControllerRacing(t *testing.T) { // wait for controller service starts time.Sleep(3 * time.Second) - consumer := h.CreateConsumer("cluster-" + rand.String(5)) resources := h.CreateResourceList(consumer.Name, 50) // This is to check only 50 create events are processed. 
It waits for 5 seconds to ensure all events have been @@ -147,6 +161,10 @@ func TestControllerReconcile(t *testing.T) { account := h.NewRandAccount() ctx, cancel := context.WithCancel(h.NewAuthenticatedContext(account)) + // start work agent so that grpc broker can work + consumer := h.CreateConsumer("cluster-" + rand.String(5)) + h.StartWorkAgent(ctx, consumer.Name, false) + eventDao := dao.NewEventDao(&h.Env().Database.SessionFactory) statusEventDao := dao.NewStatusEventDao(&h.Env().Database.SessionFactory) @@ -178,7 +196,7 @@ func TestControllerReconcile(t *testing.T) { go func() { s := &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), + h.EventFilter, h.Env().Services.Events(), ), StatusController: controllers.NewStatusController( @@ -203,9 +221,8 @@ func TestControllerReconcile(t *testing.T) { s.Start(ctx) }() // wait for the listener to start - time.Sleep(100 * time.Millisecond) + time.Sleep(time.Second) - consumer := h.CreateConsumer("cluster-" + rand.String(5)) deployName := fmt.Sprintf("nginx-%s", rand.String(5)) resource := h.CreateResource(consumer.Name, deployName, 1) @@ -266,8 +283,35 @@ func TestControllerSync(t *testing.T) { account := h.NewRandAccount() ctx, cancel := context.WithCancel(h.NewAuthenticatedContext(account)) - eventDao := dao.NewEventDao(&h.Env().Database.SessionFactory) + // start work agent so that grpc broker can work + consumer := h.CreateConsumer("cluster-" + rand.String(5)) + h.StartWorkAgent(ctx, consumer.Name, false) + + // create two resources with resource dao + resource4ID := uuid.New().String() + resourceDao := dao.NewResourceDao(&h.Env().Database.SessionFactory) + if _, err := resourceDao.Create(ctx, &api.Resource{ + Meta: api.Meta{ + ID: resource4ID, + }, + ConsumerName: consumer.Name, + Name: "resource4", + }); err != nil { + t.Fatal(err) + } + resource5ID := uuid.New().String() + if _, err := resourceDao.Create(ctx, 
&api.Resource{ + Meta: api.Meta{ + ID: resource5ID, + }, + ConsumerName: consumer.Name, + Name: "resource5", + }); err != nil { + t.Fatal(err) + } + + eventDao := dao.NewEventDao(&h.Env().Database.SessionFactory) now := time.Now() if _, err := eventDao.Create(ctx, &api.Event{Source: "Resources", SourceID: "resource1", @@ -288,12 +332,12 @@ func TestControllerSync(t *testing.T) { t.Fatal(err) } if _, err := eventDao.Create(ctx, &api.Event{Source: "Resources", - SourceID: "resource4", + SourceID: resource4ID, EventType: api.UpdateEventType}); err != nil { t.Fatal(err) } if _, err := eventDao.Create(ctx, &api.Event{Source: "Resources", - SourceID: "resource5", + SourceID: resource5ID, EventType: api.UpdateEventType}); err != nil { t.Fatal(err) } @@ -311,7 +355,7 @@ func TestControllerSync(t *testing.T) { go func() { s := &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), + h.EventFilter, h.Env().Services.Events(), ), StatusController: controllers.NewStatusController( @@ -433,7 +477,7 @@ func TestStatusControllerSync(t *testing.T) { go func() { s := &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), + h.EventFilter, h.Env().Services.Events(), ), StatusController: controllers.NewStatusController( diff --git a/test/integration/resource_test.go b/test/integration/resource_test.go index a66f07a8..458aa57a 100755 --- a/test/integration/resource_test.go +++ b/test/integration/resource_test.go @@ -519,6 +519,17 @@ func TestUpdateResourceWithRacingRequests(t *testing.T) { account := h.NewRandAccount() ctx := h.NewAuthenticatedContext(account) + var objids []string + rows, err := h.DBFactory.DirectDB().Query("SELECT objid FROM pg_locks WHERE locktype='advisory'") + Expect(err).NotTo(HaveOccurred(), "Error querying pg_locks: %v", err) + for rows.Next() { + var objid string + 
Expect(rows.Scan(&objid)).NotTo(HaveOccurred(), "Error scanning pg_locks value: %v", err) + objids = append(objids, objid) + } + rows.Close() + time.Sleep(time.Second) + consumer := h.CreateConsumer("cluster-" + rand.String(5)) deployName := fmt.Sprintf("nginx-%s", rand.String(5)) res := h.CreateResource(consumer.Name, deployName, 1) @@ -561,11 +572,17 @@ func TestUpdateResourceWithRacingRequests(t *testing.T) { // the resource patch request is protected by the advisory lock, so there should only be one update Expect(updatedCount).To(Equal(1)) - // all the locks should be released finally + // build a query that excludes advisory locks already held before this test started + query := fmt.Sprintf("select count(*) from pg_locks where locktype='advisory' and objid not in (%s)", strings.Join(objids, ",")) + if len(objids) == 0 { + query = "select count(*) from pg_locks where locktype='advisory'" + } + + // ensure the locks for current test are released finally Eventually(func() error { var count int err := h.DBFactory.DirectDB(). - QueryRow(query). Scan(&count) Expect(err).NotTo(HaveOccurred(), "Error querying pg_locks: %v", err)