From 2d0074dd6012986a2e59b519919409cbd9b57f2e Mon Sep 17 00:00:00 2001 From: morvencao Date: Wed, 18 Dec 2024 04:46:36 +0000 Subject: [PATCH 1/7] multiple instances support for grpc broker. Signed-off-by: morvencao --- .github/workflows/e2e.yml | 1 + cmd/maestro/servecmd/cmd.go | 11 +- cmd/maestro/server/controllers.go | 4 +- cmd/maestro/server/event_server.go | 20 ++- cmd/maestro/server/grpc_broker.go | 71 ++++++-- pkg/controllers/event_handler.go | 170 ++++++++++++++++++ pkg/controllers/event_handler_test.go | 151 ++++++++++++++++ pkg/controllers/framework.go | 61 +++---- pkg/controllers/framework_test.go | 118 ++++++++++-- .../{event_instances.go => event_instance.go} | 14 ++ pkg/dao/instance.go | 14 ++ pkg/dao/mocks/event.go | 8 +- pkg/dao/mocks/event_instance.go | 57 ++++++ pkg/dao/mocks/instance.go | 12 ++ test/helper.go | 2 +- test/integration/controller_test.go | 18 +- 16 files changed, 650 insertions(+), 82 deletions(-) create mode 100644 pkg/controllers/event_handler.go create mode 100644 pkg/controllers/event_handler_test.go rename pkg/dao/{event_instances.go => event_instance.go} (81%) create mode 100644 pkg/dao/mocks/event_instance.go diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 8d4aca79..080e59f5 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -65,4 +65,5 @@ jobs: make e2e-test env: container_tool: docker + SERVER_REPLICAS: 2 MESSAGE_DRIVER_TYPE: grpc diff --git a/cmd/maestro/servecmd/cmd.go b/cmd/maestro/servecmd/cmd.go index 1b3611df..d582e060 100755 --- a/cmd/maestro/servecmd/cmd.go +++ b/cmd/maestro/servecmd/cmd.go @@ -12,7 +12,9 @@ import ( "github.com/openshift-online/maestro/cmd/maestro/environments" "github.com/openshift-online/maestro/cmd/maestro/server" "github.com/openshift-online/maestro/pkg/config" + "github.com/openshift-online/maestro/pkg/controllers" "github.com/openshift-online/maestro/pkg/dao" + "github.com/openshift-online/maestro/pkg/db" "github.com/openshift-online/maestro/pkg/dispatcher" "github.com/openshift-online/maestro/pkg/event" ) @@ -47,9 +49,14 @@ func runServer(cmd *cobra.Command, args []string) { // For gRPC, create a gRPC broker to handle resource spec and status events. // For MQTT/Kafka, create a message queue based event server to handle resource spec and status events. var eventServer server.EventServer + var eventHandler controllers.EventHandler if environments.Environment().Config.MessageBroker.MessageBrokerType == "grpc" { klog.Info("Setting up grpc broker") eventServer = server.NewGRPCBroker(eventBroadcaster) + eventHandler = controllers.NewPredicatedEventHandler(eventServer.PredicateEvent, + environments.Environment().Services.Events(), + dao.NewEventInstanceDao(&environments.Environment().Database.SessionFactory), + dao.NewInstanceDao(&environments.Environment().Database.SessionFactory)) } else { klog.Info("Setting up message queue event server") var statusDispatcher dispatcher.Dispatcher @@ -67,12 +74,14 @@ func runServer(cmd *cobra.Command, args []string) { // Set the status dispatcher for the healthcheck server healthcheckServer.SetStatusDispatcher(statusDispatcher) eventServer = server.NewMessageQueueEventServer(eventBroadcaster, statusDispatcher) + eventHandler = controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(environments.Environment().Database.SessionFactory), + environments.Environment().Services.Events()) } // Create the servers apiserver := server.NewAPIServer(eventBroadcaster) metricsServer := server.NewMetricsServer() - controllersServer := server.NewControllersServer(eventServer) + controllersServer := server.NewControllersServer(eventServer, eventHandler) ctx, cancel := context.WithCancel(context.Background()) diff --git a/cmd/maestro/server/controllers.go b/cmd/maestro/server/controllers.go index dcf713b4..38d09ab2 100755 --- a/cmd/maestro/server/controllers.go +++ b/cmd/maestro/server/controllers.go @@ -11,10 +11,10 @@ import ( "github.com/openshift-online/maestro/pkg/logger" ) -func NewControllersServer(eventServer EventServer) *ControllersServer { +func NewControllersServer(eventServer EventServer, eventHandler controllers.EventHandler) *ControllersServer { s := &ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - db.NewAdvisoryLockFactory(env().Database.SessionFactory), + eventHandler, env().Services.Events(), ), StatusController: controllers.NewStatusController( diff --git a/cmd/maestro/server/event_server.go b/cmd/maestro/server/event_server.go index 5514a3a3..4ff89381 100644 --- a/cmd/maestro/server/event_server.go +++ b/cmd/maestro/server/event_server.go @@ -29,16 +29,19 @@ type EventServer interface { Start(ctx context.Context) // OnCreate handles the creation of a resource. - OnCreate(ctx context.Context, resourceID string) error + OnCreate(ctx context.Context, eventID, resourceID string) error // OnUpdate handles updates to a resource. - OnUpdate(ctx context.Context, resourceID string) error + OnUpdate(ctx context.Context, eventID, resourceID string) error // OnDelete handles the deletion of a resource. - OnDelete(ctx context.Context, resourceID string) error + OnDelete(ctx context.Context, eventID, resourceID string) error // OnStatusUpdate handles status update events for a resource. OnStatusUpdate(ctx context.Context, eventID, resourceID string) error + + // returns true if the event should be processed by the current instance, otherwise false and an error. + PredicateEvent(ctx context.Context, eventID string) (bool, error) } var _ EventServer = &MessageQueueEventServer{} @@ -114,17 +117,17 @@ func (s *MessageQueueEventServer) startSubscription(ctx context.Context) { } // OnCreate will be called on each new resource creation event inserted into db. -func (s *MessageQueueEventServer) OnCreate(ctx context.Context, resourceID string) error { +func (s *MessageQueueEventServer) OnCreate(ctx context.Context, eventID, resourceID string) error { return s.sourceClient.OnCreate(ctx, resourceID) } // OnUpdate will be called on each new resource update event inserted into db. -func (s *MessageQueueEventServer) OnUpdate(ctx context.Context, resourceID string) error { +func (s *MessageQueueEventServer) OnUpdate(ctx context.Context, eventID, resourceID string) error { return s.sourceClient.OnUpdate(ctx, resourceID) } // OnDelete will be called on each new resource deletion event inserted into db. -func (s *MessageQueueEventServer) OnDelete(ctx context.Context, resourceID string) error { +func (s *MessageQueueEventServer) OnDelete(ctx context.Context, eventID, resourceID string) error { return s.sourceClient.OnDelete(ctx, resourceID) } @@ -145,6 +148,11 @@ func (s *MessageQueueEventServer) OnStatusUpdate(ctx context.Context, eventID, r ) } +// EventPredicate for the message queue event server is no-op, as the message queue server filter event based on lock. +func (s *MessageQueueEventServer) PredicateEvent(ctx context.Context, eventID string) (bool, error) { + return true, nil +} + // handleStatusUpdate processes the resource status update from the agent. // The resource argument contains the updated status. // The function performs the following steps: diff --git a/cmd/maestro/server/grpc_broker.go b/cmd/maestro/server/grpc_broker.go index cf301fd1..6a20ce2c 100644 --- a/cmd/maestro/server/grpc_broker.go +++ b/cmd/maestro/server/grpc_broker.go @@ -51,6 +51,7 @@ type GRPCBroker struct { instanceID string eventInstanceDao dao.EventInstanceDao resourceService services.ResourceService + eventService services.EventService statusEventService services.StatusEventService bindAddress string subscribers map[string]*subscriber // registered subscribers @@ -79,6 +80,7 @@ func NewGRPCBroker(eventBroadcaster *event.EventBroadcaster) EventServer { instanceID: env().Config.MessageBroker.ClientID, eventInstanceDao: dao.NewEventInstanceDao(&sessionFactory), resourceService: env().Services.Resources(), + eventService: env().Services.Events(), statusEventService: env().Services.StatusEvents(), bindAddress: env().Config.HTTPServer.Hostname + ":" + config.BrokerBindPort, subscribers: make(map[string]*subscriber), @@ -389,40 +391,49 @@ func (bkr *GRPCBroker) handleRes(resource *api.Resource) { } } +// handleResEvent publish the resource to the correct subscriber and add the event instance record. +func (bkr *GRPCBroker) handleResEvent(ctx context.Context, eventID string, resource *api.Resource) error { + bkr.handleRes(resource) + + // add the event instance record to mark the event has been processed by the current instance + if _, err := bkr.eventInstanceDao.Create(ctx, &api.EventInstance{ + EventID: eventID, + InstanceID: bkr.instanceID, + }); err != nil { + return fmt.Errorf("failed to create event instance record %s: %s", eventID, err.Error()) + } + + return nil +} + // OnCreate is called by the controller when a resource is created on the maestro server. -func (bkr *GRPCBroker) OnCreate(ctx context.Context, id string) error { - resource, err := bkr.resourceService.Get(ctx, id) +func (bkr *GRPCBroker) OnCreate(ctx context.Context, eventID, resourceID string) error { + resource, err := bkr.resourceService.Get(ctx, resourceID) if err != nil { return err } - bkr.handleRes(resource) - - return nil + return bkr.handleResEvent(ctx, eventID, resource) } // OnUpdate is called by the controller when a resource is updated on the maestro server. -func (bkr *GRPCBroker) OnUpdate(ctx context.Context, id string) error { - resource, err := bkr.resourceService.Get(ctx, id) +func (bkr *GRPCBroker) OnUpdate(ctx context.Context, eventID, resourceID string) error { + resource, err := bkr.resourceService.Get(ctx, resourceID) if err != nil { return err } - bkr.handleRes(resource) - - return nil + return bkr.handleResEvent(ctx, eventID, resource) } // OnDelete is called by the controller when a resource is deleted from the maestro server. -func (bkr *GRPCBroker) OnDelete(ctx context.Context, id string) error { - resource, err := bkr.resourceService.Get(ctx, id) +func (bkr *GRPCBroker) OnDelete(ctx context.Context, eventID, resourceID string) error { + resource, err := bkr.resourceService.Get(ctx, resourceID) if err != nil { return err } - bkr.handleRes(resource) - - return nil + return bkr.handleResEvent(ctx, eventID, resource) } // On StatusUpdate will be called on each new status event inserted into db. @@ -442,6 +453,36 @@ func (bkr *GRPCBroker) OnStatusUpdate(ctx context.Context, eventID, resourceID s ) } +// PredicateEvent checks if the event should be processed by the current instance +// by verifying the resource consumer name is in the subscriber list, ensuring the +// event will be only processed when the consumer is subscribed to the current broker. +func (bkr *GRPCBroker) PredicateEvent(ctx context.Context, eventID string) (bool, error) { + evt, err := bkr.eventService.Get(ctx, eventID) + if err != nil { + return false, fmt.Errorf("failed to get event %s: %s", eventID, err.Error()) + } + resource, err := bkr.resourceService.Get(ctx, evt.SourceID) + if err != nil { + return false, fmt.Errorf("failed to get resource %s: %s", evt.SourceID, err.Error()) + } + + if bkr.IsConsumerSubscribed(resource.ConsumerName) { + return true, nil + } + + // if the consumer is not subscribed to the broker, then add the event instance record + // to indicate the event has been processed by the instance + if _, err := bkr.eventInstanceDao.Create(ctx, &api.EventInstance{ + EventID: eventID, + InstanceID: bkr.instanceID, + }); err != nil { + return false, fmt.Errorf("failed to create event instance record %s: %s", eventID, err.Error()) + } + klog.V(10).Infof("The consumer %s is not subscribed to the broker, added the event instance record", resource.ConsumerName) + + return false, nil +} + // IsConsumerSubscribed returns true if the consumer is subscribed to the broker for resource spec. func (bkr *GRPCBroker) IsConsumerSubscribed(consumerName string) bool { bkr.mu.RLock() diff --git a/pkg/controllers/event_handler.go b/pkg/controllers/event_handler.go new file mode 100644 index 00000000..c41a2d8f --- /dev/null +++ b/pkg/controllers/event_handler.go @@ -0,0 +1,170 @@ +package controllers + +import ( + "context" + "fmt" + "time" + + "github.com/openshift-online/maestro/pkg/api" + "github.com/openshift-online/maestro/pkg/dao" + "github.com/openshift-online/maestro/pkg/db" + "github.com/openshift-online/maestro/pkg/services" + "k8s.io/klog/v2" +) + +// EventHandler defines the actions to handle an event at various stages of its lifecycle. +type EventHandler interface { + // ShouldHandleEvent determines whether the event should be processed. + // Returns true if the event should be handled, false and an error otherwise. + ShouldHandleEvent(ctx context.Context, id string) (bool, error) + + // DeferredAction schedules any deferred actions that need to be executed + // after the event is processed successfully or unsuccessfully. + DeferredAction(ctx context.Context, id string) + + // PostProcess is called after the event is processed to perform any cleanup + // or additional actions required for the event. + PostProcess(ctx context.Context, event *api.Event) error +} + +// LockBasedEventHandler is an implementation of EventHandler that uses a locking mechanism to control event processing. +// It leverages a lock factory to create advisory locks for each event ID, ensuring non-blocking, thread-safe access. +// - ShouldHandleEvent acquires the lock for the event ID and returns true if the lock is successful. +// - DeferredAction releases the lock for the event ID. +// - PostProcess updates the event with a reconciled date after processing. +type LockBasedEventHandler struct { + lockFactory db.LockFactory + locks map[string]string + events services.EventService +} + +func NewLockBasedEventHandler(lockFactory db.LockFactory, events services.EventService) EventHandler { + return &LockBasedEventHandler{ + lockFactory: lockFactory, + locks: make(map[string]string), + events: events, + } +} + +func (h *LockBasedEventHandler) ShouldHandleEvent(ctx context.Context, id string) (bool, error) { + // lock the Event with a fail-fast advisory lock context. + // this allows concurrent processing of many events by one or many controller managers. + // allow the lock to be released by the handler goroutine and allow this function to continue. + // subsequent events will be locked by their own distinct IDs. + lockOwnerID, acquired, err := h.lockFactory.NewNonBlockingLock(ctx, id, db.Events) + // store the lock owner ID for deferred action + h.locks[id] = lockOwnerID + if err != nil { + return false, fmt.Errorf("error obtaining the event lock: %v", err) + } + + if !acquired { + logger.V(4).Infof("Event %s is processed by another worker", id) + return false, nil + } + + return true, nil +} + +func (h *LockBasedEventHandler) DeferredAction(ctx context.Context, id string) { + if ownerID, exists := h.locks[id]; exists { + h.lockFactory.Unlock(ctx, ownerID) + delete(h.locks, id) + } +} + +func (h *LockBasedEventHandler) PostProcess(ctx context.Context, event *api.Event) error { + // update the event with the reconciled date + if event != nil { + now := time.Now() + event.ReconciledDate = &now + if _, svcErr := h.events.Replace(ctx, event); svcErr != nil { + return fmt.Errorf("error updating event with id(%s): %s", event.ID, svcErr) + } + } + + return nil +} + +// eventHandlerPredicate is a function type for filtering events based on their ID. +type eventHandlerPredicate func(ctx context.Context, eventID string) (bool, error) + +// PredicatedEventHandler is an implementation of EventHandler that filters events using a predicate function. +// - ShouldHandleEvent uses the predicate to determine if the event should be processed by ID. +// - DeferredAction is a no-op as no locking is performed. +// - PostProcess updates the event with the reconciled date and checks if it's processed by all instances. +// If all instances have processed the event, it marks the event as reconciled. +type PredicatedEventHandler struct { + predicate eventHandlerPredicate + events services.EventService + eventInstanceDao dao.EventInstanceDao + instanceDao dao.InstanceDao +} + +func NewPredicatedEventHandler(predicate eventHandlerPredicate, events services.EventService, eventInstanceDao dao.EventInstanceDao, instanceDao dao.InstanceDao) EventHandler { + return &PredicatedEventHandler{ + predicate: predicate, + events: events, + eventInstanceDao: eventInstanceDao, + instanceDao: instanceDao, + } +} + +func (h *PredicatedEventHandler) ShouldHandleEvent(ctx context.Context, id string) (bool, error) { + return h.predicate(ctx, id) +} + +func (h *PredicatedEventHandler) DeferredAction(ctx context.Context, id string) { + // no-op +} + +func (h *PredicatedEventHandler) PostProcess(ctx context.Context, event *api.Event) error { + // check the event and alive instances + // if the event is handled by all alive instances, mark the event as reconciled + activeInstances, err := h.instanceDao.FindReadyIDs(ctx) + if err != nil { + return fmt.Errorf("error finding ready instances: %v", err) + } + + eventInstances, err := h.eventInstanceDao.GetInstancesByEventID(ctx, event.ID) + if err != nil { + return fmt.Errorf("error finding processed server instances for event %s: %v", event.ID, err) + } + + // check if all instances have processed the event + if !compareStrings(activeInstances, eventInstances) { + klog.V(10).Infof("Event %s is not processed by all instances, handled by %v, active instances %v", event.ID, eventInstances, activeInstances) + return fmt.Errorf("event %s is not processed by all instances", event.ID) + } + + // update the event with the reconciled date + now := time.Now() + event.ReconciledDate = &now + if _, svcErr := h.events.Replace(ctx, event); svcErr != nil { + return fmt.Errorf("error updating event with id(%s): %s", event.ID, svcErr) + } + + return nil +} + +// compareStrings compares two string slices and returns true if they are equal +func compareStrings(a, b []string) bool { + if len(a) != len(b) { + return false + } + + for _, v := range a { + found := false + for _, vv := range b { + if v == vv { + found = true + break + } + } + if !found { + return false + } + } + + return true +} diff --git a/pkg/controllers/event_handler_test.go b/pkg/controllers/event_handler_test.go new file mode 100644 index 00000000..ebcd4a58 --- /dev/null +++ b/pkg/controllers/event_handler_test.go @@ -0,0 +1,151 @@ +package controllers + +import ( + "context" + "testing" + + . "github.com/onsi/gomega" + "github.com/openshift-online/maestro/pkg/api" + "github.com/openshift-online/maestro/pkg/dao/mocks" + dbmocks "github.com/openshift-online/maestro/pkg/db/mocks" + "github.com/openshift-online/maestro/pkg/services" +) + +func TestLockingEventHandler(t *testing.T) { + RegisterTestingT(t) + + source := "my-event-source" + ctx := context.Background() + eventsDao := mocks.NewEventDao() + events := services.NewEventService(eventsDao) + eventHandler := NewLockBasedEventHandler(dbmocks.NewMockAdvisoryLockFactory(), events) + + _, _ = eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "1"}, + Source: source, + SourceID: "any id", + EventType: api.CreateEventType, + }) + + _, _ = eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "2"}, + Source: source, + SourceID: "any id", + EventType: api.CreateEventType, + }) + + shouldProcess, err := eventHandler.ShouldHandleEvent(ctx, "1") + Expect(err).To(BeNil()) + Expect(shouldProcess).To(BeTrue()) + + lockingEventHandler, ok := eventHandler.(*LockBasedEventHandler) + Expect(ok).To(BeTrue()) + Expect(lockingEventHandler.locks).To(HaveLen(1)) + + eventHandler.DeferredAction(ctx, "1") + Expect(lockingEventHandler.locks).To(HaveLen(0)) + + event, err := eventsDao.Get(ctx, "1") + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).To(BeNil()) + + err = eventHandler.PostProcess(ctx, event) + Expect(err).To(BeNil()) + + event, err = eventsDao.Get(ctx, "1") + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).NotTo(BeNil()) + + shouldProcess, err = eventHandler.ShouldHandleEvent(ctx, "2") + Expect(err).To(BeNil()) + Expect(shouldProcess).To(BeTrue()) + Expect(lockingEventHandler.locks).To(HaveLen(1)) + + eventHandler.DeferredAction(ctx, "2") + Expect(lockingEventHandler.locks).To(HaveLen(0)) + + event, err = eventsDao.Get(ctx, "2") + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).To(BeNil()) + + shouldProcess, err = eventHandler.ShouldHandleEvent(ctx, "3") + Expect(err).To(BeNil()) + Expect(shouldProcess).To(BeTrue()) + Expect(lockingEventHandler.locks).To(HaveLen(1)) +} + +func TestPredicatedEventHandler(t *testing.T) { + RegisterTestingT(t) + + currentInstanceID := "test-instance" + source := "my-event-source" + ctx := context.Background() + eventsDao := mocks.NewEventDao() + events := services.NewEventService(eventsDao) + eventInstancesDao := mocks.NewEventInstanceDaoMock() + instancesDao := mocks.NewInstanceDao() + eventServer := &exampleEventServer{eventDao: eventsDao} + eventHandler := NewPredicatedEventHandler(eventServer.PredicateEvent, events, eventInstancesDao, instancesDao) + + _, _ = instancesDao.Create(ctx, &api.ServerInstance{ + Meta: api.Meta{ID: currentInstanceID}, + Ready: true, + }) + + _, _ = instancesDao.Create(ctx, &api.ServerInstance{ + Meta: api.Meta{ID: "another-instance"}, + Ready: false, + }) + + _, _ = eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "1"}, + Source: source, + SourceID: "any id", + EventType: api.CreateEventType, + }) + + _, _ = eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "2"}, + Source: source, + SourceID: "any id", + EventType: api.CreateEventType, + }) + + shouldProcess, err := eventHandler.ShouldHandleEvent(ctx, "1") + Expect(err).To(BeNil()) + Expect(shouldProcess).To(BeTrue()) + + _, err = eventInstancesDao.Create(ctx, &api.EventInstance{ + EventID: "1", + InstanceID: currentInstanceID, + }) + Expect(err).To(BeNil()) + + eventHandler.DeferredAction(ctx, "1") + + event, err := eventsDao.Get(ctx, "1") + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).To(BeNil()) + + err = eventHandler.PostProcess(ctx, event) + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).NotTo(BeNil()) + + event, err = eventsDao.Get(ctx, "1") + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).NotTo(BeNil()) + + shouldProcess, err = eventHandler.ShouldHandleEvent(ctx, "2") + Expect(err).To(BeNil()) + Expect(shouldProcess).To(BeTrue()) + + eventHandler.DeferredAction(ctx, "2") + + event, err = eventsDao.Get(ctx, "2") + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).To(BeNil()) + + shouldProcess, err = eventHandler.ShouldHandleEvent(ctx, "3") + Expect(err).NotTo(BeNil()) + Expect(shouldProcess).To(BeFalse()) +} diff --git a/pkg/controllers/framework.go b/pkg/controllers/framework.go index f06f592a..62c56321 100755 --- a/pkg/controllers/framework.go +++ b/pkg/controllers/framework.go @@ -6,7 +6,6 @@ import ( "time" "github.com/openshift-online/maestro/pkg/api" - "github.com/openshift-online/maestro/pkg/db" maestrologger "github.com/openshift-online/maestro/pkg/logger" "github.com/openshift-online/maestro/pkg/services" @@ -45,7 +44,7 @@ var logger = maestrologger.NewOCMLogger(context.Background()) // events sync will help us to handle unexpected errors (e.g. sever restart), it ensures we will not miss any events var defaultEventsSyncPeriod = 10 * time.Hour -type ControllerHandlerFunc func(ctx context.Context, id string) error +type ControllerHandlerFunc func(ctx context.Context, eventID, sourceID string) error type ControllerConfig struct { Source string @@ -53,18 +52,18 @@ type ControllerConfig struct { } type KindControllerManager struct { - controllers map[string]map[api.EventType][]ControllerHandlerFunc - lockFactory db.LockFactory - events services.EventService - eventsQueue workqueue.RateLimitingInterface + controllers map[string]map[api.EventType][]ControllerHandlerFunc + eventHandler EventHandler + events services.EventService + eventsQueue workqueue.RateLimitingInterface } -func NewKindControllerManager(lockFactory db.LockFactory, events services.EventService) *KindControllerManager { +func NewKindControllerManager(eventHandler EventHandler, events services.EventService) *KindControllerManager { return &KindControllerManager{ - controllers: map[string]map[api.EventType][]ControllerHandlerFunc{}, - lockFactory: lockFactory, - events: events, - eventsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "event-controller"), + controllers: map[string]map[api.EventType][]ControllerHandlerFunc{}, + eventHandler: eventHandler, + events: events, + eventsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "event-controller"), } } @@ -108,35 +107,34 @@ func (km *KindControllerManager) add(source string, ev api.EventType, fns []Cont } func (km *KindControllerManager) handleEvent(id string) error { - ctx := context.Background() - // lock the Event with a fail-fast advisory lock context. - // this allows concurrent processing of many events by one or many controller managers. - // allow the lock to be released by the handler goroutine and allow this function to continue. - // subsequent events will be locked by their own distinct IDs. - lockOwnerID, acquired, err := km.lockFactory.NewNonBlockingLock(ctx, id, db.Events) - // Ensure that the transaction related to this lock always end. - defer km.lockFactory.Unlock(ctx, lockOwnerID) + reqContext := context.WithValue(context.Background(), EventID, id) + + // check if the event should be processed by this instance + shouldProcess, err := km.eventHandler.ShouldHandleEvent(reqContext, id) + defer km.eventHandler.DeferredAction(reqContext, id) if err != nil { - return fmt.Errorf("error obtaining the event lock: %v", err) + return fmt.Errorf("error filtering event with id (%s): %s", id, err) } - if !acquired { - logger.Infof("Event %s is processed by another worker, continue to process the next", id) + // if the event should not be processed by this instance, we can ignore it + if !shouldProcess { + logger.Infof("Event with id (%s) should not be processed by this instance", id) return nil } - reqContext := context.WithValue(ctx, EventID, id) - event, svcErr := km.events.Get(reqContext, id) if svcErr != nil { if svcErr.Is404() { // the event is already deleted, we can ignore it + logger.V(4).Infof("Event with id (%s) is not found", id) return nil } - return fmt.Errorf("error getting event with id(%s): %s", id, svcErr) + return fmt.Errorf("error getting event with id (%s): %s", id, svcErr) } if event.ReconciledDate != nil { + // the event is already reconciled, we can ignore it + logger.V(4).Infof("Event with id (%s) is already reconciled", id) return nil } @@ -153,20 +151,13 @@ func (km *KindControllerManager) handleEvent(id string) error { } for _, fn := range handlerFns { - err := fn(reqContext, event.SourceID) + err := fn(reqContext, id, event.SourceID) if err != nil { - return fmt.Errorf("error handing event %s, %s, %s: %s", event.Source, event.EventType, id, err) + return fmt.Errorf("error handing event %s-%s (%s): %s", event.Source, event.EventType, id, err) } } - // all handlers successfully executed - now := time.Now() - event.ReconciledDate = &now - _, svcErr = km.events.Replace(reqContext, event) - if svcErr != nil { - return fmt.Errorf("error updating event with id(%s): %s", id, svcErr) - } - return nil + return km.eventHandler.PostProcess(reqContext, event) } func (km *KindControllerManager) runWorker() { diff --git a/pkg/controllers/framework_test.go b/pkg/controllers/framework_test.go index 3365073b..620674a1 100755 --- a/pkg/controllers/framework_test.go +++ b/pkg/controllers/framework_test.go @@ -6,6 +6,7 @@ import ( . "github.com/onsi/gomega" "github.com/openshift-online/maestro/pkg/api" + "github.com/openshift-online/maestro/pkg/dao" "github.com/openshift-online/maestro/pkg/dao/mocks" dbmocks "github.com/openshift-online/maestro/pkg/db/mocks" "github.com/openshift-online/maestro/pkg/services" @@ -23,35 +24,53 @@ func newExampleControllerConfig(ctrl *exampleController) *ControllerConfig { } type exampleController struct { - addCounter int - updateCounter int - deleteCounter int + instanceID string + eventInstancesDao dao.EventInstanceDao + addCounter int + updateCounter int + deleteCounter int } -func (d *exampleController) OnAdd(ctx context.Context, id string) error { +func (d *exampleController) OnAdd(ctx context.Context, eventID, resourceID string) error { d.addCounter++ - return nil + _, err := d.eventInstancesDao.Create(ctx, &api.EventInstance{ + EventID: eventID, + InstanceID: d.instanceID, + }) + return err } -func (d *exampleController) OnUpdate(ctx context.Context, id string) error { +func (d *exampleController) OnUpdate(ctx context.Context, eventID, resourceID string) error { d.updateCounter++ - return nil + _, err := d.eventInstancesDao.Create(ctx, &api.EventInstance{ + EventID: eventID, + InstanceID: d.instanceID, + }) + return err } -func (d *exampleController) OnDelete(ctx context.Context, id string) error { +func (d *exampleController) OnDelete(ctx context.Context, eventID, resourceID string) error { d.deleteCounter++ - return nil + _, err := d.eventInstancesDao.Create(ctx, &api.EventInstance{ + EventID: eventID, + InstanceID: d.instanceID, + }) + return err } -func TestControllerFramework(t *testing.T) { +func TestControllerFrameworkWithLockBasedEventHandler(t *testing.T) { RegisterTestingT(t) ctx := context.Background() eventsDao := mocks.NewEventDao() events := services.NewEventService(eventsDao) - mgr := NewKindControllerManager(dbmocks.NewMockAdvisoryLockFactory(), events) + eventInstancesDao := mocks.NewEventInstanceDaoMock() + mgr := NewKindControllerManager(NewLockBasedEventHandler(dbmocks.NewMockAdvisoryLockFactory(), events), events) - ctrl := &exampleController{} + ctrl := &exampleController{ + instanceID: "instance-1", + eventInstancesDao: eventInstancesDao, + } config := newExampleControllerConfig(ctrl) mgr.Add(config) @@ -87,3 +106,78 @@ func TestControllerFramework(t *testing.T) { eve, _ := eventsDao.Get(ctx, "1") Expect(eve.ReconciledDate).ToNot(BeNil(), "event reconcile date should be set") } + +type exampleEventServer struct { + eventDao dao.EventDao +} + +func (e *exampleEventServer) PredicateEvent(ctx context.Context, eventID string) (bool, error) { + _, err := e.eventDao.Get(ctx, eventID) + if err != nil { + return false, err + } + return true, nil +} + +func TestControllerFrameworkWithPredicatedEventHandler(t *testing.T) { + RegisterTestingT(t) + + currentInstanceID := "test-instance" + ctx := context.Background() + eventsDao := mocks.NewEventDao() + events := services.NewEventService(eventsDao) + eventServer := &exampleEventServer{eventDao: eventsDao} + eventInstancesDao := mocks.NewEventInstanceDaoMock() + instancesDao := mocks.NewInstanceDao() + eventHandler := NewPredicatedEventHandler(eventServer.PredicateEvent, events, eventInstancesDao, instancesDao) + mgr := NewKindControllerManager(eventHandler, events) + + ctrl := &exampleController{ + instanceID: currentInstanceID, + eventInstancesDao: eventInstancesDao, + } + config := newExampleControllerConfig(ctrl) + mgr.Add(config) + + _, _ = instancesDao.Create(ctx, &api.ServerInstance{ + Meta: api.Meta{ID: currentInstanceID}, + Ready: true, + }) + + _, _ = instancesDao.Create(ctx, &api.ServerInstance{ + Meta: api.Meta{ID: "another-instance"}, + Ready: false, + }) + + _, _ = eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "1"}, + Source: config.Source, + SourceID: "any id", + EventType: api.CreateEventType, + }) + + _, _ = eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "2"}, + Source: config.Source, + SourceID: "any id", + EventType: api.UpdateEventType, + }) + + _, _ = eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "3"}, + Source: config.Source, + SourceID: "any id", + EventType: api.DeleteEventType, + }) + + mgr.handleEvent("1") + mgr.handleEvent("2") + mgr.handleEvent("3") + + Expect(ctrl.addCounter).To(Equal(1)) + Expect(ctrl.updateCounter).To(Equal(1)) + Expect(ctrl.deleteCounter).To(Equal(1)) + + eve, _ := eventsDao.Get(ctx, "1") + Expect(eve.ReconciledDate).ToNot(BeNil(), "event reconcile date should be set") +} diff --git a/pkg/dao/event_instances.go b/pkg/dao/event_instance.go similarity index 81% rename from pkg/dao/event_instances.go rename to pkg/dao/event_instance.go index 7ec1eef5..53066ffa 100644 --- a/pkg/dao/event_instances.go +++ b/pkg/dao/event_instance.go @@ -11,6 +11,7 @@ import ( type EventInstanceDao interface { Get(ctx context.Context, eventID, instanceID string) (*api.EventInstance, error) + GetInstancesByEventID(ctx context.Context, eventID string) ([]string, error) Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) FindStatusEvents(ctx context.Context, ids []string) (api.EventInstanceList, error) @@ -38,6 +39,19 @@ func (d *sqlEventInstanceDao) Get(ctx context.Context, eventID, instanceID strin return &eventInstance, nil } +func (d *sqlEventInstanceDao) GetInstancesByEventID(ctx context.Context, eventID string) ([]string, error) { + g2 := (*d.sessionFactory).New(ctx) + var eventInstances []api.EventInstance + if err := g2.Model(&api.EventInstance{}).Where("event_id = ?", eventID).Find(&eventInstances).Error; err != nil { + return nil, err + } + instanceIDs := make([]string, len(eventInstances)) + for i, eventInstance := range eventInstances { + instanceIDs[i] = eventInstance.InstanceID + } + return instanceIDs, nil +} + func (d *sqlEventInstanceDao) Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) { g2 := (*d.sessionFactory).New(ctx) if err := g2.Omit(clause.Associations).Create(eventInstance).Error; err != nil { diff --git a/pkg/dao/instance.go b/pkg/dao/instance.go index 8be26a3c..1d5fe1f4 100644 --- a/pkg/dao/instance.go +++ b/pkg/dao/instance.go @@ -19,6 +19,7 @@ type InstanceDao interface { Delete(ctx context.Context, id string) error DeleteByIDs(ctx context.Context, ids []string) error FindByIDs(ctx context.Context, ids []string) (api.ServerInstanceList, error) + FindReadyIDs(ctx context.Context) ([]string, error) FindByUpdatedTime(ctx context.Context, updatedTime time.Time) (api.ServerInstanceList, error) FindReadyIDs(ctx context.Context) ([]string, error) All(ctx context.Context) (api.ServerInstanceList, error) @@ -111,6 +112,19 @@ func (d *sqlInstanceDao) FindByIDs(ctx context.Context, ids []string) (api.Serve return instances, nil } +func (d *sqlInstanceDao) FindReadyIDs(ctx context.Context) ([]string, error) { + g2 := (*d.sessionFactory).New(ctx) + instances := api.ServerInstanceList{} + if err := g2.Where("ready = ?", true).Find(&instances).Error; err != nil { + return nil, err + } + ids := make([]string, len(instances)) + for i, instance := range instances { + ids[i] = instance.ID + } + return ids, nil +} + func (d *sqlInstanceDao) FindByUpdatedTime(ctx context.Context, updatedTime time.Time) (api.ServerInstanceList, error) { g2 := (*d.sessionFactory).New(ctx) instances := api.ServerInstanceList{} diff --git a/pkg/dao/mocks/event.go b/pkg/dao/mocks/event.go index 738e110f..60833a0d 100755 --- a/pkg/dao/mocks/event.go +++ b/pkg/dao/mocks/event.go @@ -35,7 +35,13 @@ func (d *eventDaoMock) Create(ctx context.Context, event *api.Event) (*api.Event } func (d *eventDaoMock) Replace(ctx context.Context, event *api.Event) (*api.Event, error) { - return nil, errors.NotImplemented("Event").AsError() + for i, e := range d.events { + if e.ID == event.ID { + d.events[i] = event + return event, nil + } + } + return nil, gorm.ErrRecordNotFound } func (d *eventDaoMock) Delete(ctx context.Context, id string) error { diff --git a/pkg/dao/mocks/event_instance.go b/pkg/dao/mocks/event_instance.go new file mode 100644 index 00000000..34a1468a --- /dev/null +++ b/pkg/dao/mocks/event_instance.go @@ -0,0 +1,57 @@ +package mocks + +import ( + "context" + "fmt" + "sync" + + "github.com/openshift-online/maestro/pkg/api" + "github.com/openshift-online/maestro/pkg/dao" +) + +var _ dao.EventInstanceDao = &eventInstanceDaoMock{} + +type eventInstanceDaoMock struct { + mux sync.RWMutex + eventInstances api.EventInstanceList +} + +func NewEventInstanceDaoMock() *eventInstanceDaoMock { + return &eventInstanceDaoMock{} +} + +func (d *eventInstanceDaoMock) Get(ctx context.Context, eventID, instanceID string) (*api.EventInstance, error) { + d.mux.RLock() + defer d.mux.RUnlock() + + for _, ei := range d.eventInstances { + if ei.EventID == eventID && ei.InstanceID == instanceID { + return ei, nil + } + } + + return nil, fmt.Errorf("event instance not found") +} + +func (d *eventInstanceDaoMock) GetInstancesByEventID(ctx context.Context, eventID string) ([]string, error) { + d.mux.RLock() + defer d.mux.RUnlock() + + var instanceIDs []string + for _, ei := range d.eventInstances { + if ei.EventID == eventID { + instanceIDs = append(instanceIDs, ei.InstanceID) + } + } + + return instanceIDs, nil +} + +func (d *eventInstanceDaoMock) Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) { + d.mux.Lock() + defer d.mux.Unlock() + + d.eventInstances = append(d.eventInstances, eventInstance) + + return eventInstance, nil +} diff --git a/pkg/dao/mocks/instance.go b/pkg/dao/mocks/instance.go index 73ff4e21..75dda50e 100644 --- a/pkg/dao/mocks/instance.go +++ b/pkg/dao/mocks/instance.go @@ -124,6 +124,18 @@ func (d *instanceDaoMock) FindByIDs(ctx context.Context, ids []string) (api.Serv return nil, errors.NotImplemented("Instance").AsError() } +func (d *instanceDaoMock) FindReadyIDs(ctx context.Context) ([]string, error) { + d.mux.RLock() + defer d.mux.RUnlock() + ids := make([]string, 0, len(d.instances)) + for _, instance := range d.instances { + if instance.Ready { + ids = append(ids, instance.ID) + } + } + return ids, nil +} + func (d *instanceDaoMock) FindByUpdatedTime(ctx context.Context, updatedTime time.Time) (api.ServerInstanceList, error) { d.mux.RLock() defer d.mux.RUnlock() diff --git a/test/helper.go b/test/helper.go index e9fcfe5a..062da815 100755 --- a/test/helper.go +++ b/test/helper.go @@ -242,7 +242,7 @@ func (helper *Helper) startEventBroadcaster() { func (helper *Helper) StartControllerManager(ctx context.Context) { helper.ControllerManager = &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - db.NewAdvisoryLockFactory(helper.Env().Database.SessionFactory), + controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(helper.Env().Database.SessionFactory), helper.Env().Services.Events()), helper.Env().Services.Events(), ), StatusController: controllers.NewStatusController( diff --git a/test/integration/controller_test.go b/test/integration/controller_test.go index 1285eb60..49d0e03f 100755 --- a/test/integration/controller_test.go +++ b/test/integration/controller_test.go @@ -32,13 +32,13 @@ func TestControllerRacing(t *testing.T) { // the event with create type. Due to the event lock, each create event // should be only processed once. var proccessedEvent, processedStatusEvent []string - onUpsert := func(ctx context.Context, id string) error { + onUpsert := func(ctx context.Context, eventID, resourceID string) error { events, err := eventDao.All(ctx) if err != nil { return err } for _, evt := range events { - if evt.SourceID != id { + if evt.SourceID != resourceID { continue } if evt.EventType != api.CreateEventType { @@ -48,7 +48,7 @@ func TestControllerRacing(t *testing.T) { if evt.ReconciledDate != nil { continue } - proccessedEvent = append(proccessedEvent, id) + proccessedEvent = append(proccessedEvent, resourceID) } return nil @@ -80,7 +80,7 @@ func TestControllerRacing(t *testing.T) { go func() { s := &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), + controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), h.Env().Services.Events()), h.Env().Services.Events(), ), StatusController: controllers.NewStatusController( @@ -153,7 +153,7 @@ func TestControllerReconcile(t *testing.T) { processedEventTimes := 0 // this handler will return an error at the first time to simulate an error happened when handing an event, // and then, the controller will requeue this event, at that time, we handle this event successfully. - onUpsert := func(ctx context.Context, id string) error { + onUpsert := func(ctx context.Context, eventID, resourceID string) error { processedEventTimes = processedEventTimes + 1 if processedEventTimes == 1 { return fmt.Errorf("failed to process the event") @@ -178,7 +178,7 @@ func TestControllerReconcile(t *testing.T) { go func() { s := &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), + controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), h.Env().Services.Events()), h.Env().Services.Events(), ), StatusController: controllers.NewStatusController( @@ -299,9 +299,9 @@ func TestControllerSync(t *testing.T) { } var proccessedEvents []string - onUpsert := func(ctx context.Context, id string) error { + onUpsert := func(ctx context.Context, eventID, resourceID string) error { // we just record the processed event - proccessedEvents = append(proccessedEvents, id) + proccessedEvents = append(proccessedEvents, resourceID) return nil } @@ -311,7 +311,7 @@ func TestControllerSync(t *testing.T) { go func() { s := &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), + controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), h.Env().Services.Events()), h.Env().Services.Events(), ), StatusController: controllers.NewStatusController( From 1cf655990e7bb577b1a1df790d53ec8ea875e6d0 Mon Sep 17 00:00:00 2001 From: morvencao Date: Wed, 18 Dec 2024 09:17:47 +0000 Subject: [PATCH 2/7] add grpc broker integration test. Signed-off-by: morvencao --- .gitignore | 2 +- Makefile | 17 +++-- cmd/maestro/server/grpc_broker.go | 32 +++++++-- pkg/api/event_instances.go | 5 +- pkg/controllers/event_handler.go | 29 +++++--- pkg/controllers/event_handler_test.go | 44 ++++++++++-- pkg/controllers/framework_test.go | 21 ++++-- pkg/dao/event_instance.go | 31 ++++----- pkg/dao/instance.go | 14 ---- pkg/dao/mocks/event_instance.go | 47 +++++++++++-- pkg/dao/mocks/instance.go | 15 +--- test/helper.go | 62 ++++++++++++----- test/integration/controller_test.go | 98 ++++++++++++++++++++++++--- 13 files changed, 300 insertions(+), 117 deletions(-) diff --git a/.gitignore b/.gitignore index af4e2dc4..c9bc3a71 100755 --- a/.gitignore +++ b/.gitignore @@ -59,5 +59,5 @@ test/e2e/.consumer_name test/e2e/.external_host_ip test/e2e/report/* unit-test-results.json -integration-test-results.json +*integration-test-results.json test/e2e/setup/aro/aro-hcp diff --git a/Makefile b/Makefile index 90ad5c2e..08343446 100755 --- a/Makefile +++ b/Makefile @@ -90,7 +90,8 @@ MQTT_IMAGE ?= docker.io/library/eclipse-mosquitto:2.0.18 # Test output files unit_test_json_output ?= ${PWD}/unit-test-results.json -integration_test_json_output ?= ${PWD}/integration-test-results.json +mqtt_integration_test_json_output ?= ${PWD}/mqtt-integration-test-results.json +grpc_integration_test_json_output ?= ${PWD}/grpc-integration-test-results.json # Prints a list of useful targets. help: @@ -218,11 +219,19 @@ test: # make test-integration TESTFLAGS="-run TestAccounts" acts as TestAccounts* and run TestAccountsGet, TestAccountsPost, etc. # make test-integration TESTFLAGS="-run TestAccountsGet" runs TestAccountsGet # make test-integration TESTFLAGS="-short" skips long-run tests -test-integration: - OCM_ENV=testing gotestsum --jsonfile-timing-events=$(integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \ - ./test/integration +test-integration: test-integration-mqtt test-integration-grpc .PHONY: test-integration +test-integration-mqtt: + BROKER=mqtt OCM_ENV=testing gotestsum --jsonfile-timing-events=$(mqtt_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \ + ./test/integration +.PHONY: test-integration-mqtt + +test-integration-grpc: + BROKER=grpc OCM_ENV=testing gotestsum --jsonfile-timing-events=$(grpc_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h -run TestController \ + ./test/integration +.PHONY: test-integration-grpc + # Regenerate openapi client and models generate: rm -rf pkg/api/openapi diff --git a/cmd/maestro/server/grpc_broker.go b/cmd/maestro/server/grpc_broker.go index 6a20ce2c..d1574f1d 100644 --- a/cmd/maestro/server/grpc_broker.go +++ b/cmd/maestro/server/grpc_broker.go @@ -200,6 +200,7 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv // TODO: error handling to address errors beyond network issues. if err := subServer.Send(pbEvt); err != nil { klog.Errorf("failed to send grpc event, %v", err) + return err // return error to requeue the spec event } return nil @@ -397,8 +398,8 @@ func (bkr *GRPCBroker) handleResEvent(ctx context.Context, eventID string, resou // add the event instance record to mark the event has been processed by the current instance if _, err := bkr.eventInstanceDao.Create(ctx, &api.EventInstance{ - EventID: eventID, - InstanceID: bkr.instanceID, + SpecEventID: eventID, + InstanceID: bkr.instanceID, }); err != nil { return fmt.Errorf("failed to create event instance record %s: %s", eventID, err.Error()) } @@ -461,9 +462,26 @@ func (bkr *GRPCBroker) PredicateEvent(ctx context.Context, eventID string) (bool if err != nil { return false, fmt.Errorf("failed to get event %s: %s", eventID, err.Error()) } - resource, err := bkr.resourceService.Get(ctx, evt.SourceID) - if err != nil { - return false, fmt.Errorf("failed to get resource %s: %s", evt.SourceID, err.Error()) + + // fast return if the event is already reconciled + if evt.ReconciledDate != nil { + return false, nil + } + + resource, svcErr := bkr.resourceService.Get(ctx, evt.SourceID) + if svcErr != nil { + // if the resource is not found, it indicates the resource has been handled by + // other instances, so we can mark the event as reconciled and ignore it. + if svcErr.Is404() { + klog.V(10).Infof("The resource %s has been deleted, mark the event as reconciled", evt.SourceID) + now := time.Now() + evt.ReconciledDate = &now + if _, svcErr := bkr.eventService.Replace(ctx, evt); svcErr != nil { + return false, fmt.Errorf("failed to update event %s: %s", evt.ID, svcErr.Error()) + } + return false, nil + } + return false, fmt.Errorf("failed to get resource %s: %s", evt.SourceID, svcErr.Error()) } if bkr.IsConsumerSubscribed(resource.ConsumerName) { @@ -473,8 +491,8 @@ func (bkr *GRPCBroker) PredicateEvent(ctx context.Context, eventID string) (bool // if the consumer is not subscribed to the broker, then add the event instance record // to indicate the event has been processed by the instance if _, err := bkr.eventInstanceDao.Create(ctx, &api.EventInstance{ - EventID: eventID, - InstanceID: bkr.instanceID, + SpecEventID: eventID, + InstanceID: bkr.instanceID, }); err != nil { return false, fmt.Errorf("failed to create event instance record %s: %s", eventID, err.Error()) } diff --git a/pkg/api/event_instances.go b/pkg/api/event_instances.go index 413147e8..97141852 100644 --- a/pkg/api/event_instances.go +++ b/pkg/api/event_instances.go @@ -1,8 +1,9 @@ package api type EventInstance struct { - EventID string - InstanceID string + EventID string `gorm:"default:null"` + SpecEventID string `gorm:"default:null"` + InstanceID string } type EventInstanceList []*EventInstance diff --git a/pkg/controllers/event_handler.go b/pkg/controllers/event_handler.go index c41a2d8f..8794da61 100644 --- a/pkg/controllers/event_handler.go +++ b/pkg/controllers/event_handler.go @@ -126,15 +126,26 @@ func (h *PredicatedEventHandler) PostProcess(ctx context.Context, event *api.Eve return fmt.Errorf("error finding ready instances: %v", err) } - eventInstances, err := h.eventInstanceDao.GetInstancesByEventID(ctx, event.ID) + processedInstances, err := h.eventInstanceDao.GetInstancesBySpecEventID(ctx, event.ID) if err != nil { - return fmt.Errorf("error finding processed server instances for event %s: %v", event.ID, err) + return fmt.Errorf("error finding processed instances for event %s: %v", event.ID, err) + } + + // should never happen. If the event is not processed by any instance, return an error + if len(processedInstances) == 0 { + klog.V(10).Infof("Event %s is not processed by any instance", event.ID) + return fmt.Errorf("event %s is not processed by any instance", event.ID) } // check if all instances have processed the event - if !compareStrings(activeInstances, eventInstances) { - klog.V(10).Infof("Event %s is not processed by all instances, handled by %v, active instances %v", event.ID, eventInstances, activeInstances) - return fmt.Errorf("event %s is not processed by all instances", event.ID) + // 1. In normal case, the activeInstances == eventInstances, mark the event as reconciled + // 2. If maestro server instance is up, but has't been marked as ready, then activeInstances < eventInstances, + // it's ok to mark the event as reconciled, as the instance is not ready to sever the request, no connected agents. + // 3. If maestro server instance is down, but has been marked as unready, it may still have connected agents, but + // the instance has stopped to handle the event, so activeInstances > eventInstances, the event should be equeued. + if !isSubSet(activeInstances, processedInstances) { + klog.V(10).Infof("Event %s is not processed by all active instances %v, handled by %v", event.ID, activeInstances, processedInstances) + return fmt.Errorf("event %s is not processed by all active instances", event.ID) } // update the event with the reconciled date @@ -147,12 +158,8 @@ func (h *PredicatedEventHandler) PostProcess(ctx context.Context, event *api.Eve return nil } -// compareStrings compares two string slices and returns true if they are equal -func compareStrings(a, b []string) bool { - if len(a) != len(b) { - return false - } - +// isSubSet checks if slice a is a subset of slice b +func isSubSet(a, b []string) bool { for _, v := range a { found := false for _, vv := range b { diff --git a/pkg/controllers/event_handler_test.go b/pkg/controllers/event_handler_test.go index ebcd4a58..8a00002d 100644 --- a/pkg/controllers/event_handler_test.go +++ b/pkg/controllers/event_handler_test.go @@ -78,6 +78,7 @@ func TestPredicatedEventHandler(t *testing.T) { RegisterTestingT(t) currentInstanceID := "test-instance" + anotherInstanceID := "another-instance" source := "my-event-source" ctx := context.Background() eventsDao := mocks.NewEventDao() @@ -87,13 +88,15 @@ func TestPredicatedEventHandler(t *testing.T) { eventServer := &exampleEventServer{eventDao: eventsDao} eventHandler := NewPredicatedEventHandler(eventServer.PredicateEvent, events, eventInstancesDao, instancesDao) + // current instance is ready _, _ = instancesDao.Create(ctx, &api.ServerInstance{ Meta: api.Meta{ID: currentInstanceID}, Ready: true, }) + // second instance is not ready _, _ = instancesDao.Create(ctx, &api.ServerInstance{ - Meta: api.Meta{ID: "another-instance"}, + Meta: api.Meta{ID: anotherInstanceID}, Ready: false, }) @@ -111,40 +114,71 @@ func TestPredicatedEventHandler(t *testing.T) { EventType: api.CreateEventType, }) + // handle event 1 shouldProcess, err := eventHandler.ShouldHandleEvent(ctx, "1") Expect(err).To(BeNil()) Expect(shouldProcess).To(BeTrue()) _, err = eventInstancesDao.Create(ctx, &api.EventInstance{ - EventID: "1", - InstanceID: currentInstanceID, + SpecEventID: "1", + InstanceID: currentInstanceID, }) Expect(err).To(BeNil()) eventHandler.DeferredAction(ctx, "1") + // simulate the second instance handled the event, although it has not been marked as ready + _, err = eventInstancesDao.Create(ctx, &api.EventInstance{ + SpecEventID: "1", + InstanceID: anotherInstanceID, + }) + Expect(err).To(BeNil()) + event, err := eventsDao.Get(ctx, "1") Expect(err).To(BeNil()) Expect(event.ReconciledDate).To(BeNil()) + // should post process the event the second instance is not ready err = eventHandler.PostProcess(ctx, event) Expect(err).To(BeNil()) Expect(event.ReconciledDate).NotTo(BeNil()) - event, err = eventsDao.Get(ctx, "1") + // mark the second instance as ready + err = instancesDao.MarkReadyByIDs(ctx, []string{anotherInstanceID}) Expect(err).To(BeNil()) - Expect(event.ReconciledDate).NotTo(BeNil()) + // handle event 2 shouldProcess, err = eventHandler.ShouldHandleEvent(ctx, "2") Expect(err).To(BeNil()) Expect(shouldProcess).To(BeTrue()) + // simulate the current instance handled the event, the second instance is shutting down + // before it handled the event + _, err = eventInstancesDao.Create(ctx, &api.EventInstance{ + SpecEventID: "2", + InstanceID: currentInstanceID, + }) + Expect(err).To(BeNil()) + eventHandler.DeferredAction(ctx, "2") event, err = eventsDao.Get(ctx, "2") Expect(err).To(BeNil()) Expect(event.ReconciledDate).To(BeNil()) + err = eventHandler.PostProcess(ctx, event) + Expect(err).NotTo(BeNil()) + Expect(event.ReconciledDate).To(BeNil()) + + // mark the second instance as unready + err = instancesDao.MarkUnreadyByIDs(ctx, []string{anotherInstanceID}) + Expect(err).To(BeNil()) + + // simulate requeue the event + err = eventHandler.PostProcess(ctx, event) + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).NotTo(BeNil()) + shouldProcess, err = eventHandler.ShouldHandleEvent(ctx, "3") Expect(err).NotTo(BeNil()) Expect(shouldProcess).To(BeFalse()) diff --git a/pkg/controllers/framework_test.go b/pkg/controllers/framework_test.go index 620674a1..c91a1374 100755 --- a/pkg/controllers/framework_test.go +++ b/pkg/controllers/framework_test.go @@ -34,8 +34,8 @@ type exampleController struct { func (d *exampleController) OnAdd(ctx context.Context, eventID, resourceID string) error { d.addCounter++ _, err := d.eventInstancesDao.Create(ctx, &api.EventInstance{ - EventID: eventID, - InstanceID: d.instanceID, + SpecEventID: eventID, + InstanceID: d.instanceID, }) return err } @@ -43,8 +43,8 @@ func (d *exampleController) OnAdd(ctx context.Context, eventID, resourceID strin func (d *exampleController) OnUpdate(ctx context.Context, eventID, resourceID string) error { d.updateCounter++ _, err := d.eventInstancesDao.Create(ctx, &api.EventInstance{ - EventID: eventID, - InstanceID: d.instanceID, + SpecEventID: eventID, + InstanceID: d.instanceID, }) return err } @@ -52,8 +52,8 @@ func (d *exampleController) OnUpdate(ctx context.Context, eventID, resourceID st func (d *exampleController) OnDelete(ctx context.Context, eventID, resourceID string) error { d.deleteCounter++ _, err := d.eventInstancesDao.Create(ctx, &api.EventInstance{ - EventID: eventID, - InstanceID: d.instanceID, + SpecEventID: eventID, + InstanceID: d.instanceID, }) return err } @@ -105,6 +105,12 @@ func TestControllerFrameworkWithLockBasedEventHandler(t *testing.T) { eve, _ := eventsDao.Get(ctx, "1") Expect(eve.ReconciledDate).ToNot(BeNil(), "event reconcile date should be set") + + eve, _ = eventsDao.Get(ctx, "2") + Expect(eve.ReconciledDate).ToNot(BeNil(), "event reconcile date should be set") + + eve, _ = eventsDao.Get(ctx, "3") + Expect(eve.ReconciledDate).ToNot(BeNil(), "event reconcile date should be set") } type exampleEventServer struct { @@ -123,6 +129,7 @@ func TestControllerFrameworkWithPredicatedEventHandler(t *testing.T) { RegisterTestingT(t) currentInstanceID := "test-instance" + anotherInstanceID := "another-instance" ctx := context.Background() eventsDao := mocks.NewEventDao() events := services.NewEventService(eventsDao) @@ -145,7 +152,7 @@ func TestControllerFrameworkWithPredicatedEventHandler(t *testing.T) { }) _, _ = instancesDao.Create(ctx, &api.ServerInstance{ - Meta: api.Meta{ID: "another-instance"}, + Meta: api.Meta{ID: anotherInstanceID}, Ready: false, }) diff --git a/pkg/dao/event_instance.go b/pkg/dao/event_instance.go index 53066ffa..687a48ea 100644 --- a/pkg/dao/event_instance.go +++ b/pkg/dao/event_instance.go @@ -11,10 +11,9 @@ import ( type EventInstanceDao interface { Get(ctx context.Context, eventID, instanceID string) (*api.EventInstance, error) - GetInstancesByEventID(ctx context.Context, eventID string) ([]string, error) Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) - - FindStatusEvents(ctx context.Context, ids []string) (api.EventInstanceList, error) + GetInstancesBySpecEventID(ctx context.Context, eventID string) ([]string, error) + FindEventInstancesByEventIDs(ctx context.Context, ids []string) (api.EventInstanceList, error) GetEventsAssociatedWithInstances(ctx context.Context, instanceIDs []string) ([]string, error) } @@ -39,10 +38,19 @@ func (d *sqlEventInstanceDao) Get(ctx context.Context, eventID, instanceID strin return &eventInstance, nil } -func (d *sqlEventInstanceDao) GetInstancesByEventID(ctx context.Context, eventID string) ([]string, error) { +func (d *sqlEventInstanceDao) Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) { + g2 := (*d.sessionFactory).New(ctx) + if err := g2.Omit(clause.Associations).Create(eventInstance).Error; err != nil { + db.MarkForRollback(ctx, err) + return nil, err + } + return eventInstance, nil +} + +func (d *sqlEventInstanceDao) GetInstancesBySpecEventID(ctx context.Context, specEventID string) ([]string, error) { g2 := (*d.sessionFactory).New(ctx) var eventInstances []api.EventInstance - if err := g2.Model(&api.EventInstance{}).Where("event_id = ?", eventID).Find(&eventInstances).Error; err != nil { + if err := g2.Model(&api.EventInstance{}).Where("spec_event_id = ?", specEventID).Find(&eventInstances).Error; err != nil { return nil, err } instanceIDs := make([]string, len(eventInstances)) @@ -52,16 +60,7 @@ func (d *sqlEventInstanceDao) GetInstancesByEventID(ctx context.Context, eventID return instanceIDs, nil } -func (d *sqlEventInstanceDao) Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) { - g2 := (*d.sessionFactory).New(ctx) - if err := g2.Omit(clause.Associations).Create(eventInstance).Error; err != nil { - db.MarkForRollback(ctx, err) - return nil, err - } - return eventInstance, nil -} - -func (d *sqlEventInstanceDao) FindStatusEvents(ctx context.Context, ids []string) (api.EventInstanceList, error) { +func (d *sqlEventInstanceDao) FindEventInstancesByEventIDs(ctx context.Context, ids []string) (api.EventInstanceList, error) { g2 := (*d.sessionFactory).New(ctx) eventInstances := api.EventInstanceList{} if err := g2.Where("event_id in (?)", ids).Find(&eventInstances).Error; err != nil { @@ -84,7 +83,7 @@ func (d *sqlEventInstanceDao) GetEventsAssociatedWithInstances(ctx context.Conte // consider using join to optimize if err := g2.Table("event_instances"). Select("event_id"). - Where("instance_id IN ?", instanceIDs). + Where("instance_id IN (?) AND event_id IS NOT NULL", instanceIDs). Group("event_id"). Having("COUNT(DISTINCT instance_id) = ?", instanceCount). Scan(&eventIDs).Error; err != nil { diff --git a/pkg/dao/instance.go b/pkg/dao/instance.go index 1d5fe1f4..8be26a3c 100644 --- a/pkg/dao/instance.go +++ b/pkg/dao/instance.go @@ -19,7 +19,6 @@ type InstanceDao interface { Delete(ctx context.Context, id string) error DeleteByIDs(ctx context.Context, ids []string) error FindByIDs(ctx context.Context, ids []string) (api.ServerInstanceList, error) - FindReadyIDs(ctx context.Context) ([]string, error) FindByUpdatedTime(ctx context.Context, updatedTime time.Time) (api.ServerInstanceList, error) FindReadyIDs(ctx context.Context) ([]string, error) All(ctx context.Context) (api.ServerInstanceList, error) @@ -112,19 +111,6 @@ func (d *sqlInstanceDao) FindByIDs(ctx context.Context, ids []string) (api.Serve return instances, nil } -func (d *sqlInstanceDao) FindReadyIDs(ctx context.Context) ([]string, error) { - g2 := (*d.sessionFactory).New(ctx) - instances := api.ServerInstanceList{} - if err := g2.Where("ready = ?", true).Find(&instances).Error; err != nil { - return nil, err - } - ids := make([]string, len(instances)) - for i, instance := range instances { - ids[i] = instance.ID - } - return ids, nil -} - func (d *sqlInstanceDao) FindByUpdatedTime(ctx context.Context, updatedTime time.Time) (api.ServerInstanceList, error) { g2 := (*d.sessionFactory).New(ctx) instances := api.ServerInstanceList{} diff --git a/pkg/dao/mocks/event_instance.go b/pkg/dao/mocks/event_instance.go index 34a1468a..13bf8ce0 100644 --- a/pkg/dao/mocks/event_instance.go +++ b/pkg/dao/mocks/event_instance.go @@ -33,13 +33,22 @@ func (d *eventInstanceDaoMock) Get(ctx context.Context, eventID, instanceID stri return nil, fmt.Errorf("event instance not found") } -func (d *eventInstanceDaoMock) GetInstancesByEventID(ctx context.Context, eventID string) ([]string, error) { +func (d *eventInstanceDaoMock) Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) { + d.mux.Lock() + defer d.mux.Unlock() + + d.eventInstances = append(d.eventInstances, eventInstance) + + return eventInstance, nil +} + +func (d *eventInstanceDaoMock) GetInstancesBySpecEventID(ctx context.Context, specEventID string) ([]string, error) { d.mux.RLock() defer d.mux.RUnlock() var instanceIDs []string for _, ei := range d.eventInstances { - if ei.EventID == eventID { + if ei.SpecEventID == specEventID { instanceIDs = append(instanceIDs, ei.InstanceID) } } @@ -47,11 +56,35 @@ func (d *eventInstanceDaoMock) GetInstancesByEventID(ctx context.Context, eventI return instanceIDs, nil } -func (d *eventInstanceDaoMock) Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) { - d.mux.Lock() - defer d.mux.Unlock() +func (d *eventInstanceDaoMock) FindEventInstancesByEventIDs(ctx context.Context, ids []string) (api.EventInstanceList, error) { + d.mux.RLock() + defer d.mux.RUnlock() - d.eventInstances = append(d.eventInstances, eventInstance) + var eventInstances api.EventInstanceList + for _, id := range ids { + for _, ei := range d.eventInstances { + if ei.EventID == id { + eventInstances = append(eventInstances, ei) + } + } + } - return eventInstance, nil + return eventInstances, nil +} + +func (d *eventInstanceDaoMock) GetEventsAssociatedWithInstances(ctx context.Context, instanceIDs []string) ([]string, error) { + d.mux.RLock() + defer d.mux.RUnlock() + + var eventIDs []string + for _, ei := range d.eventInstances { + if contains(instanceIDs, ei.InstanceID) { + if ei.EventID == "" { + continue + } + eventIDs = append(eventIDs, ei.EventID) + } + } + + return eventIDs, nil } diff --git a/pkg/dao/mocks/instance.go b/pkg/dao/mocks/instance.go index 75dda50e..12498774 100644 --- a/pkg/dao/mocks/instance.go +++ b/pkg/dao/mocks/instance.go @@ -124,18 +124,6 @@ func (d *instanceDaoMock) FindByIDs(ctx context.Context, ids []string) (api.Serv return nil, errors.NotImplemented("Instance").AsError() } -func (d *instanceDaoMock) FindReadyIDs(ctx context.Context) ([]string, error) { - d.mux.RLock() - defer d.mux.RUnlock() - ids := make([]string, 0, len(d.instances)) - for _, instance := range d.instances { - if instance.Ready { - ids = append(ids, instance.ID) - } - } - return ids, nil -} - func (d *instanceDaoMock) FindByUpdatedTime(ctx context.Context, updatedTime time.Time) (api.ServerInstanceList, error) { d.mux.RLock() defer d.mux.RUnlock() @@ -151,8 +139,7 @@ func (d *instanceDaoMock) FindByUpdatedTime(ctx context.Context, updatedTime tim func (d *instanceDaoMock) FindReadyIDs(ctx context.Context) ([]string, error) { d.mux.RLock() defer d.mux.RUnlock() - - ids := []string{} + ids := make([]string, 0, len(d.instances)) for _, instance := range d.instances { if instance.Ready { ids = append(ids, instance.ID) diff --git a/test/helper.go b/test/helper.go index 062da815..70eab990 100755 --- a/test/helper.go +++ b/test/helper.go @@ -24,6 +24,7 @@ import ( workv1 "open-cluster-management.io/api/work/v1" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc" grpcoptions "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" @@ -71,8 +72,8 @@ type Helper struct { Ctx context.Context ContextCancelFunc context.CancelFunc + Broker string EventBroadcaster *event.EventBroadcaster - StatusDispatcher dispatcher.Dispatcher Store *MemoryStore GRPCSourceClient *generic.CloudEventSourceClient[*api.Resource] DBFactory db.SessionFactory @@ -81,6 +82,7 @@ type Helper struct { MetricsServer server.Server HealthCheckServer *server.HealthCheckServer EventServer server.EventServer + EventHandler controllers.EventHandler ControllerManager *server.ControllersServer WorkAgentHolder *work.ClientHolder WorkAgentInformer workv1informers.ManifestWorkInformer @@ -111,6 +113,12 @@ func NewHelper(t *testing.T) *Helper { } pflag.Parse() + // Set the message broker type if it is set in the environment + broker := os.Getenv("BROKER") + if broker != "" { + env.Config.MessageBroker.MessageBrokerType = broker + } + err = env.Initialize() if err != nil { klog.Fatalf("Unable to initialize testing environment: %s", err.Error()) @@ -120,18 +128,32 @@ func NewHelper(t *testing.T) *Helper { helper = &Helper{ Ctx: ctx, ContextCancelFunc: cancel, + Broker: env.Config.MessageBroker.MessageBrokerType, EventBroadcaster: event.NewEventBroadcaster(), - StatusDispatcher: dispatcher.NewHashDispatcher( + AppConfig: env.Config, + DBFactory: env.Database.SessionFactory, + JWTPrivateKey: jwtKey, + JWTCA: jwtCA, + } + + // Set the healthcheck interval to 1 second for testing + helper.Env().Config.HealthCheck.HeartbeartInterval = 1 + helper.HealthCheckServer = server.NewHealthCheckServer() + + if helper.Broker != "grpc" { + statusDispatcher := dispatcher.NewHashDispatcher( helper.Env().Config.MessageBroker.ClientID, dao.NewInstanceDao(&helper.Env().Database.SessionFactory), dao.NewConsumerDao(&helper.Env().Database.SessionFactory), helper.Env().Clients.CloudEventsSource, helper.Env().Config.EventServer.ConsistentHashConfig, - ), - AppConfig: env.Config, - DBFactory: env.Database.SessionFactory, - JWTPrivateKey: jwtKey, - JWTCA: jwtCA, + ) + helper.HealthCheckServer.SetStatusDispatcher(statusDispatcher) + helper.EventServer = server.NewMessageQueueEventServer(helper.EventBroadcaster, statusDispatcher) + helper.EventHandler = controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(helper.Env().Database.SessionFactory), helper.Env().Services.Events()) + } else { + helper.EventServer = server.NewGRPCBroker(helper.EventBroadcaster) + helper.EventHandler = controllers.NewPredicatedEventHandler(helper.EventServer.PredicateEvent, helper.Env().Services.Events(), dao.NewEventInstanceDao(&helper.Env().Database.SessionFactory), dao.NewInstanceDao(&helper.Env().Database.SessionFactory)) } // TODO jwk mock server needs to be refactored out of the helper and into the testing environment @@ -206,9 +228,6 @@ func (helper *Helper) stopMetricsServer() error { } func (helper *Helper) startHealthCheckServer(ctx context.Context) { - helper.Env().Config.HealthCheck.HeartbeartInterval = 1 - helper.HealthCheckServer = server.NewHealthCheckServer() - helper.HealthCheckServer.SetStatusDispatcher(helper.StatusDispatcher) go func() { klog.V(10).Info("Test health check server started") helper.HealthCheckServer.Start(ctx) @@ -222,8 +241,6 @@ func (helper *Helper) sendShutdownSignal() error { } func (helper *Helper) startEventServer(ctx context.Context) { - // helper.Env().Config.EventServer.SubscriptionType = "broadcast" - helper.EventServer = server.NewMessageQueueEventServer(helper.EventBroadcaster, helper.StatusDispatcher) go func() { klog.V(10).Info("Test event server started") helper.EventServer.Start(ctx) @@ -242,7 +259,7 @@ func (helper *Helper) startEventBroadcaster() { func (helper *Helper) StartControllerManager(ctx context.Context) { helper.ControllerManager = &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(helper.Env().Database.SessionFactory), helper.Env().Services.Events()), + helper.EventHandler, helper.Env().Services.Events(), ), StatusController: controllers.NewStatusController( @@ -270,10 +287,19 @@ func (helper *Helper) StartControllerManager(ctx context.Context) { } func (helper *Helper) StartWorkAgent(ctx context.Context, clusterName string, bundle bool) { - // initilize the mqtt options - mqttOptions, err := mqtt.BuildMQTTOptionsFromFlags(helper.Env().Config.MessageBroker.MessageBrokerConfig) - if err != nil { - klog.Fatalf("Unable to build MQTT options: %s", err.Error()) + var brokerConfig any + if helper.Broker != "grpc" { + // initilize the mqtt options + mqttOptions, err := mqtt.BuildMQTTOptionsFromFlags(helper.Env().Config.MessageBroker.MessageBrokerConfig) + if err != nil { + klog.Fatalf("Unable to build MQTT options: %s", err.Error()) + } + brokerConfig = mqttOptions + } else { + // initilize the grpc options + grpcOptions := grpc.NewGRPCOptions() + grpcOptions.URL = fmt.Sprintf("%s:%s", helper.Env().Config.HTTPServer.Hostname, helper.Env().Config.GRPCServer.BrokerBindPort) + brokerConfig = grpcOptions } var workCodec generic.Codec[*workv1.ManifestWork] @@ -285,7 +311,7 @@ func (helper *Helper) StartWorkAgent(ctx context.Context, clusterName string, bu watcherStore := store.NewAgentInformerWatcherStore() - clientHolder, err := work.NewClientHolderBuilder(mqttOptions). + clientHolder, err := work.NewClientHolderBuilder(brokerConfig). WithClientID(clusterName). WithClusterName(clusterName). WithCodecs(workCodec). diff --git a/test/integration/controller_test.go b/test/integration/controller_test.go index 49d0e03f..f5459021 100755 --- a/test/integration/controller_test.go +++ b/test/integration/controller_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/google/uuid" . "github.com/onsi/gomega" "github.com/openshift-online/maestro/cmd/maestro/server" "github.com/openshift-online/maestro/pkg/api" @@ -19,14 +20,22 @@ import ( func TestControllerRacing(t *testing.T) { h, _ := test.RegisterIntegration(t) + // sleep for a while to wait for server instance is added + time.Sleep(1 * time.Second) + account := h.NewRandAccount() ctx, cancel := context.WithCancel(h.NewAuthenticatedContext(account)) defer func() { cancel() }() + // start work agent so that grpc broker can work + consumer := h.CreateConsumer("cluster-" + rand.String(5)) + h.StartWorkAgent(ctx, consumer.Name, false) + eventDao := dao.NewEventDao(&h.Env().Database.SessionFactory) statusEventDao := dao.NewStatusEventDao(&h.Env().Database.SessionFactory) + eventInstanceDao := dao.NewEventInstanceDao(&h.Env().Database.SessionFactory) // The handler filters the events by source id/type/reconciled, and only record // the event with create type. Due to the event lock, each create event @@ -49,6 +58,14 @@ func TestControllerRacing(t *testing.T) { continue } proccessedEvent = append(proccessedEvent, resourceID) + // add the event instance record + _, err := eventInstanceDao.Create(ctx, &api.EventInstance{ + SpecEventID: eventID, + InstanceID: h.Env().Config.MessageBroker.ClientID, + }) + if err != nil { + return err + } } return nil @@ -76,11 +93,19 @@ func TestControllerRacing(t *testing.T) { // Start 3 controllers concurrently threads := 3 + if h.Broker == "grpc" { + threads = 1 + } for i := 0; i < threads; i++ { + // each controller has its own event handler, otherwise, the event lock will block the event processing. + eventHandler := controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), h.Env().Services.Events()) + if h.Broker == "grpc" { + eventHandler = controllers.NewPredicatedEventHandler(h.EventServer.PredicateEvent, h.Env().Services.Events(), dao.NewEventInstanceDao(&h.Env().Database.SessionFactory), dao.NewInstanceDao(&h.Env().Database.SessionFactory)) + } go func() { s := &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), h.Env().Services.Events()), + eventHandler, h.Env().Services.Events(), ), StatusController: controllers.NewStatusController( @@ -107,7 +132,6 @@ func TestControllerRacing(t *testing.T) { // wait for controller service starts time.Sleep(3 * time.Second) - consumer := h.CreateConsumer("cluster-" + rand.String(5)) resources := h.CreateResourceList(consumer.Name, 50) // This is to check only 50 create events are processed. It waits for 5 seconds to ensure all events have been @@ -144,11 +168,19 @@ func TestControllerRacing(t *testing.T) { func TestControllerReconcile(t *testing.T) { h, _ := test.RegisterIntegration(t) + // sleep for a while to wait for server instance is added + time.Sleep(1 * time.Second) + account := h.NewRandAccount() ctx, cancel := context.WithCancel(h.NewAuthenticatedContext(account)) + // start work agent so that grpc broker can work + consumer := h.CreateConsumer("cluster-" + rand.String(5)) + h.StartWorkAgent(ctx, consumer.Name, false) + eventDao := dao.NewEventDao(&h.Env().Database.SessionFactory) statusEventDao := dao.NewStatusEventDao(&h.Env().Database.SessionFactory) + eventInstanceDao := dao.NewEventInstanceDao(&h.Env().Database.SessionFactory) processedEventTimes := 0 // this handler will return an error at the first time to simulate an error happened when handing an event, @@ -159,6 +191,15 @@ func TestControllerReconcile(t *testing.T) { return fmt.Errorf("failed to process the event") } + // add the event instance record + _, err := eventInstanceDao.Create(ctx, &api.EventInstance{ + SpecEventID: eventID, + InstanceID: h.Env().Config.MessageBroker.ClientID, + }) + if err != nil { + return err + } + return nil } @@ -178,7 +219,7 @@ func TestControllerReconcile(t *testing.T) { go func() { s := &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), h.Env().Services.Events()), + h.EventHandler, h.Env().Services.Events(), ), StatusController: controllers.NewStatusController( @@ -205,7 +246,6 @@ func TestControllerReconcile(t *testing.T) { // wait for the listener to start time.Sleep(100 * time.Millisecond) - consumer := h.CreateConsumer("cluster-" + rand.String(5)) deployName := fmt.Sprintf("nginx-%s", rand.String(5)) resource := h.CreateResource(consumer.Name, deployName, 1) @@ -263,11 +303,42 @@ func TestControllerReconcile(t *testing.T) { func TestControllerSync(t *testing.T) { h, _ := test.RegisterIntegration(t) + // sleep for a while to wait for server instance is added + time.Sleep(1 * time.Second) + account := h.NewRandAccount() ctx, cancel := context.WithCancel(h.NewAuthenticatedContext(account)) - eventDao := dao.NewEventDao(&h.Env().Database.SessionFactory) + // start work agent so that grpc broker can work + consumer := h.CreateConsumer("cluster-" + rand.String(5)) + h.StartWorkAgent(ctx, consumer.Name, false) + + // create two resources with resource dao + resource4ID := uuid.New().String() + resourceDao := dao.NewResourceDao(&h.Env().Database.SessionFactory) + if _, err := resourceDao.Create(ctx, &api.Resource{ + Meta: api.Meta{ + ID: resource4ID, + }, + ConsumerName: consumer.Name, + Name: "resource4", + }); err != nil { + t.Fatal(err) + } + resource5ID := uuid.New().String() + if _, err := resourceDao.Create(ctx, &api.Resource{ + Meta: api.Meta{ + ID: resource5ID, + }, + ConsumerName: consumer.Name, + Name: "resource5", + }); err != nil { + t.Fatal(err) + } + + eventDao := dao.NewEventDao(&h.Env().Database.SessionFactory) + eventInstanceDao := dao.NewEventInstanceDao(&h.Env().Database.SessionFactory) now := time.Now() if _, err := eventDao.Create(ctx, &api.Event{Source: "Resources", SourceID: "resource1", @@ -288,12 +359,12 @@ func TestControllerSync(t *testing.T) { t.Fatal(err) } if _, err := eventDao.Create(ctx, &api.Event{Source: "Resources", - SourceID: "resource4", + SourceID: resource4ID, EventType: api.UpdateEventType}); err != nil { t.Fatal(err) } if _, err := eventDao.Create(ctx, &api.Event{Source: "Resources", - SourceID: "resource5", + SourceID: resource5ID, EventType: api.UpdateEventType}); err != nil { t.Fatal(err) } @@ -302,7 +373,12 @@ func TestControllerSync(t *testing.T) { onUpsert := func(ctx context.Context, eventID, resourceID string) error { // we just record the processed event proccessedEvents = append(proccessedEvents, resourceID) - return nil + // add the event instance record + _, err := eventInstanceDao.Create(ctx, &api.EventInstance{ + SpecEventID: eventID, + InstanceID: h.Env().Config.MessageBroker.ClientID, + }) + return err } // start the controller, once the controller started, it will sync the events: @@ -311,7 +387,7 @@ func TestControllerSync(t *testing.T) { go func() { s := &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), h.Env().Services.Events()), + h.EventHandler, h.Env().Services.Events(), ), StatusController: controllers.NewStatusController( @@ -433,7 +509,7 @@ func TestStatusControllerSync(t *testing.T) { go func() { s := &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), + h.EventHandler, h.Env().Services.Events(), ), StatusController: controllers.NewStatusController( @@ -467,7 +543,7 @@ func TestStatusControllerSync(t *testing.T) { return fmt.Errorf("should purge the events %s, but got %+v", purged, events) } - eventInstances, err := eventInstanceDao.FindStatusEvents(ctx, purged) + eventInstances, err := eventInstanceDao.FindEventInstancesByEventIDs(ctx, purged) if err != nil { return err } From 9172a133a4245cf5f893f93619e39ee15c2d671c Mon Sep 17 00:00:00 2001 From: morvencao Date: Mon, 23 Dec 2024 06:09:05 +0000 Subject: [PATCH 3/7] event handler to event filter. Signed-off-by: morvencao --- cmd/maestro/servecmd/cmd.go | 12 +- cmd/maestro/server/controllers.go | 4 +- cmd/maestro/server/event_server.go | 12 +- cmd/maestro/server/grpc_broker.go | 111 ++++++++-------- pkg/api/event_instances.go | 5 +- pkg/controllers/event_filter.go | 96 +++++++++++++ pkg/controllers/event_filter_test.go | 150 +++++++++++++++++++++ pkg/controllers/event_handler.go | 177 ------------------------ pkg/controllers/event_handler_test.go | 185 -------------------------- pkg/controllers/framework.go | 35 +++-- pkg/controllers/framework_test.go | 163 +++++++++++++---------- pkg/dao/event_instance.go | 21 +-- pkg/dao/mocks/event.go | 11 +- pkg/dao/mocks/event_instance.go | 16 +-- test/helper.go | 8 +- test/integration/controller_test.go | 76 +++-------- 16 files changed, 470 insertions(+), 612 deletions(-) create mode 100644 pkg/controllers/event_filter.go create mode 100644 pkg/controllers/event_filter_test.go delete mode 100644 pkg/controllers/event_handler.go delete mode 100644 pkg/controllers/event_handler_test.go diff --git a/cmd/maestro/servecmd/cmd.go b/cmd/maestro/servecmd/cmd.go index d582e060..8f393b36 100755 --- a/cmd/maestro/servecmd/cmd.go +++ b/cmd/maestro/servecmd/cmd.go @@ -49,14 +49,11 @@ func runServer(cmd *cobra.Command, args []string) { // For gRPC, create a gRPC broker to handle resource spec and status events. // For MQTT/Kafka, create a message queue based event server to handle resource spec and status events. var eventServer server.EventServer - var eventHandler controllers.EventHandler + var eventFilter controllers.EventFilter if environments.Environment().Config.MessageBroker.MessageBrokerType == "grpc" { klog.Info("Setting up grpc broker") eventServer = server.NewGRPCBroker(eventBroadcaster) - eventHandler = controllers.NewPredicatedEventHandler(eventServer.PredicateEvent, - environments.Environment().Services.Events(), - dao.NewEventInstanceDao(&environments.Environment().Database.SessionFactory), - dao.NewInstanceDao(&environments.Environment().Database.SessionFactory)) + eventFilter = controllers.NewPredicatedEventFilter(eventServer.PredicateEvent) } else { klog.Info("Setting up message queue event server") var statusDispatcher dispatcher.Dispatcher @@ -74,14 +71,13 @@ func runServer(cmd *cobra.Command, args []string) { // Set the status dispatcher for the healthcheck server healthcheckServer.SetStatusDispatcher(statusDispatcher) eventServer = server.NewMessageQueueEventServer(eventBroadcaster, statusDispatcher) - eventHandler = controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(environments.Environment().Database.SessionFactory), - environments.Environment().Services.Events()) + eventFilter = controllers.NewLockBasedEventFilter(db.NewAdvisoryLockFactory(environments.Environment().Database.SessionFactory)) } // Create the servers apiserver := server.NewAPIServer(eventBroadcaster) metricsServer := server.NewMetricsServer() - controllersServer := server.NewControllersServer(eventServer, eventHandler) + controllersServer := server.NewControllersServer(eventServer, eventFilter) ctx, cancel := context.WithCancel(context.Background()) diff --git a/cmd/maestro/server/controllers.go b/cmd/maestro/server/controllers.go index 38d09ab2..ea54952d 100755 --- a/cmd/maestro/server/controllers.go +++ b/cmd/maestro/server/controllers.go @@ -11,10 +11,10 @@ import ( "github.com/openshift-online/maestro/pkg/logger" ) -func NewControllersServer(eventServer EventServer, eventHandler controllers.EventHandler) *ControllersServer { +func NewControllersServer(eventServer EventServer, eventFilter controllers.EventFilter) *ControllersServer { s := &ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - eventHandler, + eventFilter, env().Services.Events(), ), StatusController: controllers.NewStatusController( diff --git a/cmd/maestro/server/event_server.go b/cmd/maestro/server/event_server.go index 4ff89381..daaf5767 100644 --- a/cmd/maestro/server/event_server.go +++ b/cmd/maestro/server/event_server.go @@ -29,13 +29,13 @@ type EventServer interface { Start(ctx context.Context) // OnCreate handles the creation of a resource. - OnCreate(ctx context.Context, eventID, resourceID string) error + OnCreate(ctx context.Context, resourceID string) error // OnUpdate handles updates to a resource. - OnUpdate(ctx context.Context, eventID, resourceID string) error + OnUpdate(ctx context.Context, resourceID string) error // OnDelete handles the deletion of a resource. - OnDelete(ctx context.Context, eventID, resourceID string) error + OnDelete(ctx context.Context, resourceID string) error // OnStatusUpdate handles status update events for a resource. OnStatusUpdate(ctx context.Context, eventID, resourceID string) error @@ -117,17 +117,17 @@ func (s *MessageQueueEventServer) startSubscription(ctx context.Context) { } // OnCreate will be called on each new resource creation event inserted into db. -func (s *MessageQueueEventServer) OnCreate(ctx context.Context, eventID, resourceID string) error { +func (s *MessageQueueEventServer) OnCreate(ctx context.Context, resourceID string) error { return s.sourceClient.OnCreate(ctx, resourceID) } // OnUpdate will be called on each new resource update event inserted into db. -func (s *MessageQueueEventServer) OnUpdate(ctx context.Context, eventID, resourceID string) error { +func (s *MessageQueueEventServer) OnUpdate(ctx context.Context, resourceID string) error { return s.sourceClient.OnUpdate(ctx, resourceID) } // OnDelete will be called on each new resource deletion event inserted into db. -func (s *MessageQueueEventServer) OnDelete(ctx context.Context, eventID, resourceID string) error { +func (s *MessageQueueEventServer) OnDelete(ctx context.Context, resourceID string) error { return s.sourceClient.OnDelete(ctx, resourceID) } diff --git a/cmd/maestro/server/grpc_broker.go b/cmd/maestro/server/grpc_broker.go index d1574f1d..e4d9a655 100644 --- a/cmd/maestro/server/grpc_broker.go +++ b/cmd/maestro/server/grpc_broker.go @@ -29,7 +29,12 @@ import ( "github.com/openshift-online/maestro/pkg/services" ) -type resourceHandler func(res *api.Resource) error +// resourceHandler processes a resource spec by encoding it to a CloudEvent and sending it to the subscriber. +// It returns a bool indicating if the connection is closed and an error if one occurs. +// - Returns true and an error if the connection is closed. +// - Returns false and an error if encoding fails. +// - Returns false and nil if successful. +type resourceHandler func(res *api.Resource) (bool, error) // subscriber defines a subscriber that can receive and handle resource spec. type subscriber struct { @@ -182,36 +187,55 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv return fmt.Errorf("invalid subscription request: missing cluster name") } // register the cluster for subscription to the resource spec - subscriberID, errChan := bkr.register(subReq.ClusterName, func(res *api.Resource) error { + subscriberID, errChan := bkr.register(subReq.ClusterName, func(res *api.Resource) (bool, error) { evt, err := encodeResourceSpec(res) if err != nil { - return fmt.Errorf("failed to encode resource %s to cloudevent: %v", res.ID, err) + // return the error to requeue the event if encoding fails (e.g., due to invalid resource spec). + return false, fmt.Errorf("failed to encode resource %s to cloudevent: %v", res.ID, err) } - klog.V(4).Infof("send the event to spec subscribers, %s", evt) - // WARNING: don't use "pbEvt, err := pb.ToProto(evt)" to convert cloudevent to protobuf pbEvt := &pbv1.CloudEvent{} if err = grpcprotocol.WritePBMessage(context.TODO(), binding.ToMessage(evt), pbEvt); err != nil { - return fmt.Errorf("failed to convert cloudevent to protobuf: %v", err) + // return the error to requeue the event if converting to protobuf fails (e.g., due to invalid cloudevent). + return false, fmt.Errorf("failed to convert cloudevent to protobuf for resource(%s): %v", res.ID, err) } // send the cloudevent to the subscriber - // TODO: error handling to address errors beyond network issues. + klog.V(4).Infof("sending the event to spec subscribers, %s", evt) if err := subServer.Send(pbEvt); err != nil { klog.Errorf("failed to send grpc event, %v", err) - return err // return error to requeue the spec event + // Return true to ensure the subscriber will be unregistered when sending fails, which will close the subserver stream. + // See: https://github.com/grpc/grpc-go/blob/b615b35c4feb932a0ac658fb86b7127f10ef664e/stream.go#L1537 for more details. + // Return the error without wrapping, as it contains the gRPC error code and message for future (TODO) handling beyond network issues. + // This will also not requeue the event, as the error will cause the connection to the subscriber to be closed. + // If the subscriber (agent) reconnects, rely on the agent's resync to retrieve the missing resource spec. + return true, err } - return nil + return false, nil }) select { case err := <-errChan: + // An error occurred while sending the event to the subscriber. + // This could be due to multiple reasons: + // see: https://grpc.io/docs/guides/error/ + // 1. general errors such as: deadline exceeded before return the response. + // 2. network errors such as: connection closed by intermidiate proxy. + // 3. protocol errors such as: compression error or flow control error. + // In all above cases, unregister the subscriber. + // TODO: unregister the subscriber if the error is a network error and the connection could be re-established. klog.Errorf("unregister subscriber %s, error= %v", subscriberID, err) bkr.unregister(subscriberID) return err case <-subServer.Context().Done(): + // The context of the stream has been canceled or completed. + // This could happen if: + // - The client closed the connection or canceled the stream. + // - The server closed the stream, potentially due to a shutdown. + // Regardless of the reason, unregister the subscriber and stop processing. + // No error is returned here because the stream closure is expected. bkr.unregister(subscriberID) return nil } @@ -380,61 +404,53 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy } // handleRes publish the resource to the correct subscriber. -func (bkr *GRPCBroker) handleRes(resource *api.Resource) { +func (bkr *GRPCBroker) handleRes(resource *api.Resource) error { bkr.mu.RLock() defer bkr.mu.RUnlock() for _, subscriber := range bkr.subscribers { if subscriber.clusterName == resource.ConsumerName { - if err := subscriber.handler(resource); err != nil { - subscriber.errChan <- err + if isConnClosed, err := subscriber.handler(resource); err != nil { + if isConnClosed { + // if the connection is closed, write the error to the subscriber's error channel + // to ensure the subscriber is unregistered + subscriber.errChan <- err + } + // return the error to requeue the event if handling fails. + return err } } } -} - -// handleResEvent publish the resource to the correct subscriber and add the event instance record. -func (bkr *GRPCBroker) handleResEvent(ctx context.Context, eventID string, resource *api.Resource) error { - bkr.handleRes(resource) - - // add the event instance record to mark the event has been processed by the current instance - if _, err := bkr.eventInstanceDao.Create(ctx, &api.EventInstance{ - SpecEventID: eventID, - InstanceID: bkr.instanceID, - }); err != nil { - return fmt.Errorf("failed to create event instance record %s: %s", eventID, err.Error()) - } - return nil } // OnCreate is called by the controller when a resource is created on the maestro server. -func (bkr *GRPCBroker) OnCreate(ctx context.Context, eventID, resourceID string) error { - resource, err := bkr.resourceService.Get(ctx, resourceID) +func (bkr *GRPCBroker) OnCreate(ctx context.Context, id string) error { + resource, err := bkr.resourceService.Get(ctx, id) if err != nil { return err } - return bkr.handleResEvent(ctx, eventID, resource) + return bkr.handleRes(resource) } // OnUpdate is called by the controller when a resource is updated on the maestro server. -func (bkr *GRPCBroker) OnUpdate(ctx context.Context, eventID, resourceID string) error { - resource, err := bkr.resourceService.Get(ctx, resourceID) +func (bkr *GRPCBroker) OnUpdate(ctx context.Context, id string) error { + resource, err := bkr.resourceService.Get(ctx, id) if err != nil { return err } - return bkr.handleResEvent(ctx, eventID, resource) + return bkr.handleRes(resource) } // OnDelete is called by the controller when a resource is deleted from the maestro server. -func (bkr *GRPCBroker) OnDelete(ctx context.Context, eventID, resourceID string) error { - resource, err := bkr.resourceService.Get(ctx, resourceID) +func (bkr *GRPCBroker) OnDelete(ctx context.Context, id string) error { + resource, err := bkr.resourceService.Get(ctx, id) if err != nil { return err } - return bkr.handleResEvent(ctx, eventID, resource) + return bkr.handleRes(resource) } // On StatusUpdate will be called on each new status event inserted into db. @@ -470,35 +486,16 @@ func (bkr *GRPCBroker) PredicateEvent(ctx context.Context, eventID string) (bool resource, svcErr := bkr.resourceService.Get(ctx, evt.SourceID) if svcErr != nil { - // if the resource is not found, it indicates the resource has been handled by - // other instances, so we can mark the event as reconciled and ignore it. + // if the resource is not found, it indicates the resource has been handled by other instances. if svcErr.Is404() { klog.V(10).Infof("The resource %s has been deleted, mark the event as reconciled", evt.SourceID) - now := time.Now() - evt.ReconciledDate = &now - if _, svcErr := bkr.eventService.Replace(ctx, evt); svcErr != nil { - return false, fmt.Errorf("failed to update event %s: %s", evt.ID, svcErr.Error()) - } return false, nil } return false, fmt.Errorf("failed to get resource %s: %s", evt.SourceID, svcErr.Error()) } - if bkr.IsConsumerSubscribed(resource.ConsumerName) { - return true, nil - } - - // if the consumer is not subscribed to the broker, then add the event instance record - // to indicate the event has been processed by the instance - if _, err := bkr.eventInstanceDao.Create(ctx, &api.EventInstance{ - SpecEventID: eventID, - InstanceID: bkr.instanceID, - }); err != nil { - return false, fmt.Errorf("failed to create event instance record %s: %s", eventID, err.Error()) - } - klog.V(10).Infof("The consumer %s is not subscribed to the broker, added the event instance record", resource.ConsumerName) - - return false, nil + // check if the consumer is subscribed to the broker + return bkr.IsConsumerSubscribed(resource.ConsumerName), nil } // IsConsumerSubscribed returns true if the consumer is subscribed to the broker for resource spec. diff --git a/pkg/api/event_instances.go b/pkg/api/event_instances.go index 97141852..413147e8 100644 --- a/pkg/api/event_instances.go +++ b/pkg/api/event_instances.go @@ -1,9 +1,8 @@ package api type EventInstance struct { - EventID string `gorm:"default:null"` - SpecEventID string `gorm:"default:null"` - InstanceID string + EventID string + InstanceID string } type EventInstanceList []*EventInstance diff --git a/pkg/controllers/event_filter.go b/pkg/controllers/event_filter.go new file mode 100644 index 00000000..cdabc4b8 --- /dev/null +++ b/pkg/controllers/event_filter.go @@ -0,0 +1,96 @@ +package controllers + +import ( + "context" + "fmt" + + "github.com/openshift-online/maestro/pkg/db" +) + +// EventFilter defines an interface for filtering and deferring actions on events. +// Implementations of EventFilter should provide logic for determining whether an event +// should be processed and for handling any actions that need to be deferred. +// +// - Filter: Decides whether the event should be processed based on its ID. +// - DeferredAction: Allows for scheduling actions that should occur regardless of whether the event +// was processed successfully or not, such as cleanup tasks or releasing resources. +type EventFilter interface { + // Filter determines whether the event should be processed. + // Returns true if the event should be handled, false and an error otherwise. + Filter(ctx context.Context, id string) (bool, error) + + // DeferredAction schedules actions to be executed regardless of event processing success. + DeferredAction(ctx context.Context, id string) +} + +// LockBasedEventFilter implements EventFilter using a locking mechanism for event processing. +// It creates advisory locks on event IDs to ensure thread-safe access. +// - Filter acquires a lock on the event ID and returns true if the lock is successful. +// - DeferredAction releases the lock for the event ID. +type LockBasedEventFilter struct { + lockFactory db.LockFactory + // locks map is accessed by a single-threaded handler goroutine, no need for lock on it. + locks map[string]string +} + +func NewLockBasedEventFilter(lockFactory db.LockFactory) EventFilter { + return &LockBasedEventFilter{ + lockFactory: lockFactory, + locks: make(map[string]string), + } +} + +// Filter attempts to acquire a lock on the event ID. Returns true if successful, false and error otherwise. +func (h *LockBasedEventFilter) Filter(ctx context.Context, id string) (bool, error) { + // lock the Event with a fail-fast advisory lock context. + // this allows concurrent processing of many events by one or many controller managers. + // allow the lock to be released by the handler goroutine and allow this function to continue. + // subsequent events will be locked by their own distinct IDs. + lockOwnerID, acquired, err := h.lockFactory.NewNonBlockingLock(ctx, id, db.Events) + // store the lock owner ID for deferred action + h.locks[id] = lockOwnerID + if err != nil { + return false, fmt.Errorf("error obtaining the event lock: %v", err) + } + + if !acquired { + logger.V(4).Infof("Event %s is processed by another worker", id) + return false, nil + } + + return true, nil +} + +// DeferredAction releases the lock for the given event ID if it was acquired. +func (h *LockBasedEventFilter) DeferredAction(ctx context.Context, id string) { + if ownerID, exists := h.locks[id]; exists { + h.lockFactory.Unlock(ctx, ownerID) + delete(h.locks, id) + } +} + +// eventFilterPredicate is a function type for filtering events based on their ID. +type eventFilterPredicate func(ctx context.Context, eventID string) (bool, error) + +// PredicatedEventFilter implements EventFilter using a predicate function for event filtering. +// - Filter uses the predicate to decide if the event should be processed. +// - DeferredAction is a no-op as no locking is performed. +type PredicatedEventFilter struct { + predicate eventFilterPredicate +} + +func NewPredicatedEventFilter(predicate eventFilterPredicate) EventFilter { + return &PredicatedEventFilter{ + predicate: predicate, + } +} + +// Filter calls the predicate function to determine if the event should be processed. +func (h *PredicatedEventFilter) Filter(ctx context.Context, id string) (bool, error) { + return h.predicate(ctx, id) +} + +// DeferredAction is a no-op since no locks are involved. +func (h *PredicatedEventFilter) DeferredAction(ctx context.Context, id string) { + // no-op +} diff --git a/pkg/controllers/event_filter_test.go b/pkg/controllers/event_filter_test.go new file mode 100644 index 00000000..2bb22e36 --- /dev/null +++ b/pkg/controllers/event_filter_test.go @@ -0,0 +1,150 @@ +package controllers + +import ( + "context" + "testing" + + "github.com/google/uuid" + . "github.com/onsi/gomega" + "github.com/openshift-online/maestro/pkg/api" + "github.com/openshift-online/maestro/pkg/dao/mocks" + dbmocks "github.com/openshift-online/maestro/pkg/db/mocks" +) + +func TestLockingEventFilter(t *testing.T) { + RegisterTestingT(t) + + source := "my-event-source" + ctx := context.Background() + eventsDao := mocks.NewEventDao() + eventFilter := NewLockBasedEventFilter(dbmocks.NewMockAdvisoryLockFactory()) + + _, err := eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "1"}, + Source: source, + SourceID: "any id", + EventType: api.CreateEventType, + }) + Expect(err).To(BeNil()) + + _, err = eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "2"}, + Source: source, + SourceID: "any id", + EventType: api.CreateEventType, + }) + Expect(err).To(BeNil()) + + shouldProcess, err := eventFilter.Filter(ctx, "1") + Expect(err).To(BeNil()) + Expect(shouldProcess).To(BeTrue()) + + lockingEventFilter, ok := eventFilter.(*LockBasedEventFilter) + Expect(ok).To(BeTrue()) + Expect(lockingEventFilter.locks).To(HaveLen(1)) + + eventFilter.DeferredAction(ctx, "1") + Expect(lockingEventFilter.locks).To(HaveLen(0)) + + event, err := eventsDao.Get(ctx, "1") + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).To(BeNil()) + + shouldProcess, err = eventFilter.Filter(ctx, "2") + Expect(err).To(BeNil()) + Expect(shouldProcess).To(BeTrue()) + Expect(lockingEventFilter.locks).To(HaveLen(1)) + + eventFilter.DeferredAction(ctx, "2") + Expect(lockingEventFilter.locks).To(HaveLen(0)) + + event, err = eventsDao.Get(ctx, "2") + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).To(BeNil()) + + shouldProcess, err = eventFilter.Filter(ctx, "3") + Expect(err).To(BeNil()) + Expect(shouldProcess).To(BeTrue()) + Expect(lockingEventFilter.locks).To(HaveLen(1)) +} + +func TestPredicatedEventFilter(t *testing.T) { + RegisterTestingT(t) + + source := "my-event-source" + ctx := context.Background() + eventsDao := mocks.NewEventDao() + resourcesDao := mocks.NewResourceDao() + eventServer := &exampleEventServer{eventsDao: eventsDao, resourcesDao: resourcesDao, subscrbers: []string{"cluster1"}} + eventFilter := NewPredicatedEventFilter(eventServer.PredicateEvent) + + resID := uuid.New().String() + _, err := resourcesDao.Create(ctx, &api.Resource{ + Meta: api.Meta{ID: resID}, + ConsumerName: "cluster1", + Source: source, + }) + Expect(err).To(BeNil()) + + _, err = eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "1"}, + Source: source, + SourceID: resID, + EventType: api.CreateEventType, + }) + Expect(err).To(BeNil()) + + _, err = eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "2"}, + Source: source, + SourceID: "any id", + EventType: api.CreateEventType, + }) + Expect(err).To(BeNil()) + + newResID := uuid.New().String() + _, err = resourcesDao.Create(ctx, &api.Resource{ + Meta: api.Meta{ID: newResID}, + ConsumerName: "cluster2", + Source: source, + }) + Expect(err).To(BeNil()) + + _, err = eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "3"}, + Source: source, + SourceID: newResID, + EventType: api.DeleteEventType, + }) + Expect(err).To(BeNil()) + + // handle event 1 + shouldProcess, err := eventFilter.Filter(ctx, "1") + Expect(err).To(BeNil()) + Expect(shouldProcess).To(BeTrue()) + + // call deferred action + eventFilter.DeferredAction(ctx, "1") + + event, err := eventsDao.Get(ctx, "1") + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).To(BeNil()) + + // handle event 2 + shouldProcess, err = eventFilter.Filter(ctx, "2") + Expect(err).To(BeNil()) + Expect(shouldProcess).NotTo(BeTrue()) + + event, err = eventsDao.Get(ctx, "2") + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).NotTo(BeNil()) + + // handle event 3 + shouldProcess, err = eventFilter.Filter(ctx, "3") + Expect(err).To(BeNil()) + Expect(shouldProcess).NotTo(BeTrue()) + + event, err = eventsDao.Get(ctx, "3") + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).To(BeNil()) +} diff --git a/pkg/controllers/event_handler.go b/pkg/controllers/event_handler.go deleted file mode 100644 index 8794da61..00000000 --- a/pkg/controllers/event_handler.go +++ /dev/null @@ -1,177 +0,0 @@ -package controllers - -import ( - "context" - "fmt" - "time" - - "github.com/openshift-online/maestro/pkg/api" - "github.com/openshift-online/maestro/pkg/dao" - "github.com/openshift-online/maestro/pkg/db" - "github.com/openshift-online/maestro/pkg/services" - "k8s.io/klog/v2" -) - -// EventHandler defines the actions to handle an event at various stages of its lifecycle. -type EventHandler interface { - // ShouldHandleEvent determines whether the event should be processed. - // Returns true if the event should be handled, false and an error otherwise. - ShouldHandleEvent(ctx context.Context, id string) (bool, error) - - // DeferredAction schedules any deferred actions that need to be executed - // after the event is processed successfully or unsuccessfully. - DeferredAction(ctx context.Context, id string) - - // PostProcess is called after the event is processed to perform any cleanup - // or additional actions required for the event. - PostProcess(ctx context.Context, event *api.Event) error -} - -// LockBasedEventHandler is an implementation of EventHandler that uses a locking mechanism to control event processing. -// It leverages a lock factory to create advisory locks for each event ID, ensuring non-blocking, thread-safe access. -// - ShouldHandleEvent acquires the lock for the event ID and returns true if the lock is successful. -// - DeferredAction releases the lock for the event ID. -// - PostProcess updates the event with a reconciled date after processing. -type LockBasedEventHandler struct { - lockFactory db.LockFactory - locks map[string]string - events services.EventService -} - -func NewLockBasedEventHandler(lockFactory db.LockFactory, events services.EventService) EventHandler { - return &LockBasedEventHandler{ - lockFactory: lockFactory, - locks: make(map[string]string), - events: events, - } -} - -func (h *LockBasedEventHandler) ShouldHandleEvent(ctx context.Context, id string) (bool, error) { - // lock the Event with a fail-fast advisory lock context. - // this allows concurrent processing of many events by one or many controller managers. - // allow the lock to be released by the handler goroutine and allow this function to continue. - // subsequent events will be locked by their own distinct IDs. - lockOwnerID, acquired, err := h.lockFactory.NewNonBlockingLock(ctx, id, db.Events) - // store the lock owner ID for deferred action - h.locks[id] = lockOwnerID - if err != nil { - return false, fmt.Errorf("error obtaining the event lock: %v", err) - } - - if !acquired { - logger.V(4).Infof("Event %s is processed by another worker", id) - return false, nil - } - - return true, nil -} - -func (h *LockBasedEventHandler) DeferredAction(ctx context.Context, id string) { - if ownerID, exists := h.locks[id]; exists { - h.lockFactory.Unlock(ctx, ownerID) - delete(h.locks, id) - } -} - -func (h *LockBasedEventHandler) PostProcess(ctx context.Context, event *api.Event) error { - // update the event with the reconciled date - if event != nil { - now := time.Now() - event.ReconciledDate = &now - if _, svcErr := h.events.Replace(ctx, event); svcErr != nil { - return fmt.Errorf("error updating event with id(%s): %s", event.ID, svcErr) - } - } - - return nil -} - -// eventHandlerPredicate is a function type for filtering events based on their ID. -type eventHandlerPredicate func(ctx context.Context, eventID string) (bool, error) - -// PredicatedEventHandler is an implementation of EventHandler that filters events using a predicate function. -// - ShouldHandleEvent uses the predicate to determine if the event should be processed by ID. -// - DeferredAction is a no-op as no locking is performed. -// - PostProcess updates the event with the reconciled date and checks if it's processed by all instances. -// If all instances have processed the event, it marks the event as reconciled. -type PredicatedEventHandler struct { - predicate eventHandlerPredicate - events services.EventService - eventInstanceDao dao.EventInstanceDao - instanceDao dao.InstanceDao -} - -func NewPredicatedEventHandler(predicate eventHandlerPredicate, events services.EventService, eventInstanceDao dao.EventInstanceDao, instanceDao dao.InstanceDao) EventHandler { - return &PredicatedEventHandler{ - predicate: predicate, - events: events, - eventInstanceDao: eventInstanceDao, - instanceDao: instanceDao, - } -} - -func (h *PredicatedEventHandler) ShouldHandleEvent(ctx context.Context, id string) (bool, error) { - return h.predicate(ctx, id) -} - -func (h *PredicatedEventHandler) DeferredAction(ctx context.Context, id string) { - // no-op -} - -func (h *PredicatedEventHandler) PostProcess(ctx context.Context, event *api.Event) error { - // check the event and alive instances - // if the event is handled by all alive instances, mark the event as reconciled - activeInstances, err := h.instanceDao.FindReadyIDs(ctx) - if err != nil { - return fmt.Errorf("error finding ready instances: %v", err) - } - - processedInstances, err := h.eventInstanceDao.GetInstancesBySpecEventID(ctx, event.ID) - if err != nil { - return fmt.Errorf("error finding processed instances for event %s: %v", event.ID, err) - } - - // should never happen. If the event is not processed by any instance, return an error - if len(processedInstances) == 0 { - klog.V(10).Infof("Event %s is not processed by any instance", event.ID) - return fmt.Errorf("event %s is not processed by any instance", event.ID) - } - - // check if all instances have processed the event - // 1. In normal case, the activeInstances == eventInstances, mark the event as reconciled - // 2. If maestro server instance is up, but has't been marked as ready, then activeInstances < eventInstances, - // it's ok to mark the event as reconciled, as the instance is not ready to sever the request, no connected agents. - // 3. If maestro server instance is down, but has been marked as unready, it may still have connected agents, but - // the instance has stopped to handle the event, so activeInstances > eventInstances, the event should be equeued. - if !isSubSet(activeInstances, processedInstances) { - klog.V(10).Infof("Event %s is not processed by all active instances %v, handled by %v", event.ID, activeInstances, processedInstances) - return fmt.Errorf("event %s is not processed by all active instances", event.ID) - } - - // update the event with the reconciled date - now := time.Now() - event.ReconciledDate = &now - if _, svcErr := h.events.Replace(ctx, event); svcErr != nil { - return fmt.Errorf("error updating event with id(%s): %s", event.ID, svcErr) - } - - return nil -} - -// isSubSet checks if slice a is a subset of slice b -func isSubSet(a, b []string) bool { - for _, v := range a { - found := false - for _, vv := range b { - if v == vv { - found = true - break - } - } - if !found { - return false - } - } - - return true -} diff --git a/pkg/controllers/event_handler_test.go b/pkg/controllers/event_handler_test.go deleted file mode 100644 index 8a00002d..00000000 --- a/pkg/controllers/event_handler_test.go +++ /dev/null @@ -1,185 +0,0 @@ -package controllers - -import ( - "context" - "testing" - - . "github.com/onsi/gomega" - "github.com/openshift-online/maestro/pkg/api" - "github.com/openshift-online/maestro/pkg/dao/mocks" - dbmocks "github.com/openshift-online/maestro/pkg/db/mocks" - "github.com/openshift-online/maestro/pkg/services" -) - -func TestLockingEventHandler(t *testing.T) { - RegisterTestingT(t) - - source := "my-event-source" - ctx := context.Background() - eventsDao := mocks.NewEventDao() - events := services.NewEventService(eventsDao) - eventHandler := NewLockBasedEventHandler(dbmocks.NewMockAdvisoryLockFactory(), events) - - _, _ = eventsDao.Create(ctx, &api.Event{ - Meta: api.Meta{ID: "1"}, - Source: source, - SourceID: "any id", - EventType: api.CreateEventType, - }) - - _, _ = eventsDao.Create(ctx, &api.Event{ - Meta: api.Meta{ID: "2"}, - Source: source, - SourceID: "any id", - EventType: api.CreateEventType, - }) - - shouldProcess, err := eventHandler.ShouldHandleEvent(ctx, "1") - Expect(err).To(BeNil()) - Expect(shouldProcess).To(BeTrue()) - - lockingEventHandler, ok := eventHandler.(*LockBasedEventHandler) - Expect(ok).To(BeTrue()) - Expect(lockingEventHandler.locks).To(HaveLen(1)) - - eventHandler.DeferredAction(ctx, "1") - Expect(lockingEventHandler.locks).To(HaveLen(0)) - - event, err := eventsDao.Get(ctx, "1") - Expect(err).To(BeNil()) - Expect(event.ReconciledDate).To(BeNil()) - - err = eventHandler.PostProcess(ctx, event) - Expect(err).To(BeNil()) - - event, err = eventsDao.Get(ctx, "1") - Expect(err).To(BeNil()) - Expect(event.ReconciledDate).NotTo(BeNil()) - - shouldProcess, err = eventHandler.ShouldHandleEvent(ctx, "2") - Expect(err).To(BeNil()) - Expect(shouldProcess).To(BeTrue()) - Expect(lockingEventHandler.locks).To(HaveLen(1)) - - eventHandler.DeferredAction(ctx, "2") - Expect(lockingEventHandler.locks).To(HaveLen(0)) - - event, err = eventsDao.Get(ctx, "2") - Expect(err).To(BeNil()) - Expect(event.ReconciledDate).To(BeNil()) - - shouldProcess, err = eventHandler.ShouldHandleEvent(ctx, "3") - Expect(err).To(BeNil()) - Expect(shouldProcess).To(BeTrue()) - Expect(lockingEventHandler.locks).To(HaveLen(1)) -} - -func TestPredicatedEventHandler(t *testing.T) { - RegisterTestingT(t) - - currentInstanceID := "test-instance" - anotherInstanceID := "another-instance" - source := "my-event-source" - ctx := context.Background() - eventsDao := mocks.NewEventDao() - events := services.NewEventService(eventsDao) - eventInstancesDao := mocks.NewEventInstanceDaoMock() - instancesDao := mocks.NewInstanceDao() - eventServer := &exampleEventServer{eventDao: eventsDao} - eventHandler := NewPredicatedEventHandler(eventServer.PredicateEvent, events, eventInstancesDao, instancesDao) - - // current instance is ready - _, _ = instancesDao.Create(ctx, &api.ServerInstance{ - Meta: api.Meta{ID: currentInstanceID}, - Ready: true, - }) - - // second instance is not ready - _, _ = instancesDao.Create(ctx, &api.ServerInstance{ - Meta: api.Meta{ID: anotherInstanceID}, - Ready: false, - }) - - _, _ = eventsDao.Create(ctx, &api.Event{ - Meta: api.Meta{ID: "1"}, - Source: source, - SourceID: "any id", - EventType: api.CreateEventType, - }) - - _, _ = eventsDao.Create(ctx, &api.Event{ - Meta: api.Meta{ID: "2"}, - Source: source, - SourceID: "any id", - EventType: api.CreateEventType, - }) - - // handle event 1 - shouldProcess, err := eventHandler.ShouldHandleEvent(ctx, "1") - Expect(err).To(BeNil()) - Expect(shouldProcess).To(BeTrue()) - - _, err = eventInstancesDao.Create(ctx, &api.EventInstance{ - SpecEventID: "1", - InstanceID: currentInstanceID, - }) - Expect(err).To(BeNil()) - - eventHandler.DeferredAction(ctx, "1") - - // simulate the second instance handled the event, although it has not been marked as ready - _, err = eventInstancesDao.Create(ctx, &api.EventInstance{ - SpecEventID: "1", - InstanceID: anotherInstanceID, - }) - Expect(err).To(BeNil()) - - event, err := eventsDao.Get(ctx, "1") - Expect(err).To(BeNil()) - Expect(event.ReconciledDate).To(BeNil()) - - // should post process the event the second instance is not ready - err = eventHandler.PostProcess(ctx, event) - Expect(err).To(BeNil()) - Expect(event.ReconciledDate).NotTo(BeNil()) - - // mark the second instance as ready - err = instancesDao.MarkReadyByIDs(ctx, []string{anotherInstanceID}) - Expect(err).To(BeNil()) - - // handle event 2 - shouldProcess, err = eventHandler.ShouldHandleEvent(ctx, "2") - Expect(err).To(BeNil()) - Expect(shouldProcess).To(BeTrue()) - - // simulate the current instance handled the event, the second instance is shutting down - // before it handled the event - _, err = eventInstancesDao.Create(ctx, &api.EventInstance{ - SpecEventID: "2", - InstanceID: currentInstanceID, - }) - Expect(err).To(BeNil()) - - eventHandler.DeferredAction(ctx, "2") - - event, err = eventsDao.Get(ctx, "2") - Expect(err).To(BeNil()) - Expect(event.ReconciledDate).To(BeNil()) - - err = eventHandler.PostProcess(ctx, event) - Expect(err).NotTo(BeNil()) - Expect(event.ReconciledDate).To(BeNil()) - - // mark the second instance as unready - err = instancesDao.MarkUnreadyByIDs(ctx, []string{anotherInstanceID}) - Expect(err).To(BeNil()) - - // simulate requeue the event - err = eventHandler.PostProcess(ctx, event) - Expect(err).To(BeNil()) - Expect(event.ReconciledDate).NotTo(BeNil()) - - shouldProcess, err = eventHandler.ShouldHandleEvent(ctx, "3") - Expect(err).NotTo(BeNil()) - Expect(shouldProcess).To(BeFalse()) -} diff --git a/pkg/controllers/framework.go b/pkg/controllers/framework.go index 62c56321..b55819ea 100755 --- a/pkg/controllers/framework.go +++ b/pkg/controllers/framework.go @@ -44,7 +44,7 @@ var logger = maestrologger.NewOCMLogger(context.Background()) // events sync will help us to handle unexpected errors (e.g. sever restart), it ensures we will not miss any events var defaultEventsSyncPeriod = 10 * time.Hour -type ControllerHandlerFunc func(ctx context.Context, eventID, sourceID string) error +type ControllerHandlerFunc func(ctx context.Context, id string) error type ControllerConfig struct { Source string @@ -52,18 +52,18 @@ type ControllerConfig struct { } type KindControllerManager struct { - controllers map[string]map[api.EventType][]ControllerHandlerFunc - eventHandler EventHandler - events services.EventService - eventsQueue workqueue.RateLimitingInterface + controllers map[string]map[api.EventType][]ControllerHandlerFunc + eventFilter EventFilter + events services.EventService + eventsQueue workqueue.RateLimitingInterface } -func NewKindControllerManager(eventHandler EventHandler, events services.EventService) *KindControllerManager { +func NewKindControllerManager(eventFilter EventFilter, events services.EventService) *KindControllerManager { return &KindControllerManager{ - controllers: map[string]map[api.EventType][]ControllerHandlerFunc{}, - eventHandler: eventHandler, - events: events, - eventsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "event-controller"), + controllers: map[string]map[api.EventType][]ControllerHandlerFunc{}, + eventFilter: eventFilter, + events: events, + eventsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "event-controller"), } } @@ -110,8 +110,8 @@ func (km *KindControllerManager) handleEvent(id string) error { reqContext := context.WithValue(context.Background(), EventID, id) // check if the event should be processed by this instance - shouldProcess, err := km.eventHandler.ShouldHandleEvent(reqContext, id) - defer km.eventHandler.DeferredAction(reqContext, id) + shouldProcess, err := km.eventFilter.Filter(reqContext, id) + defer km.eventFilter.DeferredAction(reqContext, id) if err != nil { return fmt.Errorf("error filtering event with id (%s): %s", id, err) } @@ -151,13 +151,20 @@ func (km *KindControllerManager) handleEvent(id string) error { } for _, fn := range handlerFns { - err := fn(reqContext, id, event.SourceID) + err := fn(reqContext, event.SourceID) if err != nil { return fmt.Errorf("error handing event %s-%s (%s): %s", event.Source, event.EventType, id, err) } } - return km.eventHandler.PostProcess(reqContext, event) + // all handlers successfully executed + now := time.Now() + event.ReconciledDate = &now + if _, svcErr := km.events.Replace(reqContext, event); svcErr != nil { + return fmt.Errorf("error updating event with id (%s): %s", id, svcErr) + } + + return nil } func (km *KindControllerManager) runWorker() { diff --git a/pkg/controllers/framework_test.go b/pkg/controllers/framework_test.go index c91a1374..10aebd35 100755 --- a/pkg/controllers/framework_test.go +++ b/pkg/controllers/framework_test.go @@ -2,14 +2,18 @@ package controllers import ( "context" + "fmt" "testing" + "time" + "github.com/google/uuid" . "github.com/onsi/gomega" "github.com/openshift-online/maestro/pkg/api" "github.com/openshift-online/maestro/pkg/dao" "github.com/openshift-online/maestro/pkg/dao/mocks" dbmocks "github.com/openshift-online/maestro/pkg/db/mocks" "github.com/openshift-online/maestro/pkg/services" + "gorm.io/gorm" ) func newExampleControllerConfig(ctrl *exampleController) *ControllerConfig { @@ -24,76 +28,61 @@ func newExampleControllerConfig(ctrl *exampleController) *ControllerConfig { } type exampleController struct { - instanceID string - eventInstancesDao dao.EventInstanceDao - addCounter int - updateCounter int - deleteCounter int + addCounter int + updateCounter int + deleteCounter int } -func (d *exampleController) OnAdd(ctx context.Context, eventID, resourceID string) error { +func (d *exampleController) OnAdd(ctx context.Context, id string) error { d.addCounter++ - _, err := d.eventInstancesDao.Create(ctx, &api.EventInstance{ - SpecEventID: eventID, - InstanceID: d.instanceID, - }) - return err + return nil } -func (d *exampleController) OnUpdate(ctx context.Context, eventID, resourceID string) error { +func (d *exampleController) OnUpdate(ctx context.Context, id string) error { d.updateCounter++ - _, err := d.eventInstancesDao.Create(ctx, &api.EventInstance{ - SpecEventID: eventID, - InstanceID: d.instanceID, - }) - return err + return nil } -func (d *exampleController) OnDelete(ctx context.Context, eventID, resourceID string) error { +func (d *exampleController) OnDelete(ctx context.Context, id string) error { d.deleteCounter++ - _, err := d.eventInstancesDao.Create(ctx, &api.EventInstance{ - SpecEventID: eventID, - InstanceID: d.instanceID, - }) - return err + return nil } -func TestControllerFrameworkWithLockBasedEventHandler(t *testing.T) { +func TestControllerFrameworkWithLockBasedEventFilter(t *testing.T) { RegisterTestingT(t) ctx := context.Background() eventsDao := mocks.NewEventDao() events := services.NewEventService(eventsDao) - eventInstancesDao := mocks.NewEventInstanceDaoMock() - mgr := NewKindControllerManager(NewLockBasedEventHandler(dbmocks.NewMockAdvisoryLockFactory(), events), events) + mgr := NewKindControllerManager(NewLockBasedEventFilter(dbmocks.NewMockAdvisoryLockFactory()), events) - ctrl := &exampleController{ - instanceID: "instance-1", - eventInstancesDao: eventInstancesDao, - } + ctrl := &exampleController{} config := newExampleControllerConfig(ctrl) mgr.Add(config) - _, _ = eventsDao.Create(ctx, &api.Event{ + _, err := eventsDao.Create(ctx, &api.Event{ Meta: api.Meta{ID: "1"}, Source: config.Source, SourceID: "any id", EventType: api.CreateEventType, }) + Expect(err).To(BeNil()) - _, _ = eventsDao.Create(ctx, &api.Event{ + _, err = eventsDao.Create(ctx, &api.Event{ Meta: api.Meta{ID: "2"}, Source: config.Source, SourceID: "any id", EventType: api.UpdateEventType, }) + Expect(err).To(BeNil()) - _, _ = eventsDao.Create(ctx, &api.Event{ + _, err = eventsDao.Create(ctx, &api.Event{ Meta: api.Meta{ID: "3"}, Source: config.Source, SourceID: "any id", EventType: api.DeleteEventType, }) + Expect(err).To(BeNil()) mgr.handleEvent("1") mgr.handleEvent("2") @@ -103,88 +92,126 @@ func TestControllerFrameworkWithLockBasedEventHandler(t *testing.T) { Expect(ctrl.updateCounter).To(Equal(1)) Expect(ctrl.deleteCounter).To(Equal(1)) - eve, _ := eventsDao.Get(ctx, "1") + eve, err := eventsDao.Get(ctx, "1") + Expect(err).To(BeNil()) Expect(eve.ReconciledDate).ToNot(BeNil(), "event reconcile date should be set") - eve, _ = eventsDao.Get(ctx, "2") + eve, err = eventsDao.Get(ctx, "2") + Expect(err).To(BeNil()) Expect(eve.ReconciledDate).ToNot(BeNil(), "event reconcile date should be set") - eve, _ = eventsDao.Get(ctx, "3") + eve, err = eventsDao.Get(ctx, "3") + Expect(err).To(BeNil()) Expect(eve.ReconciledDate).ToNot(BeNil(), "event reconcile date should be set") } type exampleEventServer struct { - eventDao dao.EventDao + eventsDao dao.EventDao + resourcesDao dao.ResourceDao + subscrbers []string } func (e *exampleEventServer) PredicateEvent(ctx context.Context, eventID string) (bool, error) { - _, err := e.eventDao.Get(ctx, eventID) + event, err := e.eventsDao.Get(ctx, eventID) + if err != nil { + return false, err + } + resource, err := e.resourcesDao.Get(ctx, event.SourceID) if err != nil { + // 404 == gorm.ErrRecordNotFound means the resource was deleted, so we can ignore the event + if err == gorm.ErrRecordNotFound { + now := time.Now() + event.ReconciledDate = &now + if _, svcErr := e.eventsDao.Replace(ctx, event); svcErr != nil { + return false, fmt.Errorf("failed to update event %s: %s", event.ID, svcErr.Error()) + } + return false, nil + } return false, err } - return true, nil + return contains(e.subscrbers, resource.ConsumerName), nil +} + +func contains(s []string, e string) bool { + for _, a := range s { + if a == e { + return true + } + } + return false } -func TestControllerFrameworkWithPredicatedEventHandler(t *testing.T) { +func TestControllerFrameworkWithPredicatedEventFilter(t *testing.T) { RegisterTestingT(t) - currentInstanceID := "test-instance" - anotherInstanceID := "another-instance" ctx := context.Background() eventsDao := mocks.NewEventDao() + resourcesDao := mocks.NewResourceDao() events := services.NewEventService(eventsDao) - eventServer := &exampleEventServer{eventDao: eventsDao} - eventInstancesDao := mocks.NewEventInstanceDaoMock() - instancesDao := mocks.NewInstanceDao() - eventHandler := NewPredicatedEventHandler(eventServer.PredicateEvent, events, eventInstancesDao, instancesDao) - mgr := NewKindControllerManager(eventHandler, events) - - ctrl := &exampleController{ - instanceID: currentInstanceID, - eventInstancesDao: eventInstancesDao, - } + eventServer := &exampleEventServer{eventsDao: eventsDao, resourcesDao: resourcesDao, subscrbers: []string{"cluster1"}} + mgr := NewKindControllerManager(NewPredicatedEventFilter(eventServer.PredicateEvent), events) + + ctrl := &exampleController{} config := newExampleControllerConfig(ctrl) mgr.Add(config) - _, _ = instancesDao.Create(ctx, &api.ServerInstance{ - Meta: api.Meta{ID: currentInstanceID}, - Ready: true, + resID := uuid.New().String() + _, err := resourcesDao.Create(ctx, &api.Resource{ + Meta: api.Meta{ID: resID}, + ConsumerName: "cluster1", + Source: config.Source, }) + Expect(err).To(BeNil()) - _, _ = instancesDao.Create(ctx, &api.ServerInstance{ - Meta: api.Meta{ID: anotherInstanceID}, - Ready: false, - }) - - _, _ = eventsDao.Create(ctx, &api.Event{ + _, err = eventsDao.Create(ctx, &api.Event{ Meta: api.Meta{ID: "1"}, Source: config.Source, - SourceID: "any id", + SourceID: resID, EventType: api.CreateEventType, }) + Expect(err).To(BeNil()) - _, _ = eventsDao.Create(ctx, &api.Event{ + _, err = eventsDao.Create(ctx, &api.Event{ Meta: api.Meta{ID: "2"}, Source: config.Source, SourceID: "any id", EventType: api.UpdateEventType, }) + Expect(err).To(BeNil()) - _, _ = eventsDao.Create(ctx, &api.Event{ + newResID := uuid.New().String() + _, err = resourcesDao.Create(ctx, &api.Resource{ + Meta: api.Meta{ID: newResID}, + ConsumerName: "cluster2", + Source: config.Source, + }) + Expect(err).To(BeNil()) + + _, err = eventsDao.Create(ctx, &api.Event{ Meta: api.Meta{ID: "3"}, Source: config.Source, - SourceID: "any id", + SourceID: newResID, EventType: api.DeleteEventType, }) + Expect(err).To(BeNil()) mgr.handleEvent("1") mgr.handleEvent("2") mgr.handleEvent("3") Expect(ctrl.addCounter).To(Equal(1)) - Expect(ctrl.updateCounter).To(Equal(1)) - Expect(ctrl.deleteCounter).To(Equal(1)) + Expect(ctrl.updateCounter).To(Equal(0)) + Expect(ctrl.deleteCounter).To(Equal(0)) - eve, _ := eventsDao.Get(ctx, "1") + eve, err := eventsDao.Get(ctx, "1") + Expect(err).To(BeNil()) Expect(eve.ReconciledDate).ToNot(BeNil(), "event reconcile date should be set") + + eve, err = eventsDao.Get(ctx, "2") + Expect(err).To(BeNil()) + Expect(eve.ReconciledDate).ToNot(BeNil(), "event reconcile date should be set") + + eve, err = eventsDao.Get(ctx, "3") + Expect(err).To(BeNil()) + Expect(eve.ReconciledDate).To(BeNil(), "event reconcile date should not be set") } diff --git a/pkg/dao/event_instance.go b/pkg/dao/event_instance.go index 687a48ea..7ec1eef5 100644 --- a/pkg/dao/event_instance.go +++ b/pkg/dao/event_instance.go @@ -12,8 +12,8 @@ import ( type EventInstanceDao interface { Get(ctx context.Context, eventID, instanceID string) (*api.EventInstance, error) Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) - GetInstancesBySpecEventID(ctx context.Context, eventID string) ([]string, error) - FindEventInstancesByEventIDs(ctx context.Context, ids []string) (api.EventInstanceList, error) + + FindStatusEvents(ctx context.Context, ids []string) (api.EventInstanceList, error) GetEventsAssociatedWithInstances(ctx context.Context, instanceIDs []string) ([]string, error) } @@ -47,20 +47,7 @@ func (d *sqlEventInstanceDao) Create(ctx context.Context, eventInstance *api.Eve return eventInstance, nil } -func (d *sqlEventInstanceDao) GetInstancesBySpecEventID(ctx context.Context, specEventID string) ([]string, error) { - g2 := (*d.sessionFactory).New(ctx) - var eventInstances []api.EventInstance - if err := g2.Model(&api.EventInstance{}).Where("spec_event_id = ?", specEventID).Find(&eventInstances).Error; err != nil { - return nil, err - } - instanceIDs := make([]string, len(eventInstances)) - for i, eventInstance := range eventInstances { - instanceIDs[i] = eventInstance.InstanceID - } - return instanceIDs, nil -} - -func (d *sqlEventInstanceDao) FindEventInstancesByEventIDs(ctx context.Context, ids []string) (api.EventInstanceList, error) { +func (d *sqlEventInstanceDao) FindStatusEvents(ctx context.Context, ids []string) (api.EventInstanceList, error) { g2 := (*d.sessionFactory).New(ctx) eventInstances := api.EventInstanceList{} if err := g2.Where("event_id in (?)", ids).Find(&eventInstances).Error; err != nil { @@ -83,7 +70,7 @@ func (d *sqlEventInstanceDao) GetEventsAssociatedWithInstances(ctx context.Conte // consider using join to optimize if err := g2.Table("event_instances"). Select("event_id"). - Where("instance_id IN (?) AND event_id IS NOT NULL", instanceIDs). + Where("instance_id IN ?", instanceIDs). Group("event_id"). Having("COUNT(DISTINCT instance_id) = ?", instanceCount). Scan(&eventIDs).Error; err != nil { diff --git a/pkg/dao/mocks/event.go b/pkg/dao/mocks/event.go index 60833a0d..a7e9559b 100755 --- a/pkg/dao/mocks/event.go +++ b/pkg/dao/mocks/event.go @@ -7,7 +7,6 @@ import ( "github.com/openshift-online/maestro/pkg/api" "github.com/openshift-online/maestro/pkg/dao" - "github.com/openshift-online/maestro/pkg/errors" ) var _ dao.EventDao = &eventDaoMock{} @@ -59,7 +58,15 @@ func (d *eventDaoMock) Delete(ctx context.Context, id string) error { } func (d *eventDaoMock) FindByIDs(ctx context.Context, ids []string) (api.EventList, error) { - return nil, errors.NotImplemented("Event").AsError() + filteredEvents := api.EventList{} + for _, id := range ids { + for _, e := range d.events { + if e.ID == id { + filteredEvents = append(filteredEvents, e) + } + } + } + return filteredEvents, nil } func (d *eventDaoMock) All(ctx context.Context) (api.EventList, error) { diff --git a/pkg/dao/mocks/event_instance.go b/pkg/dao/mocks/event_instance.go index 13bf8ce0..e8ebef59 100644 --- a/pkg/dao/mocks/event_instance.go +++ b/pkg/dao/mocks/event_instance.go @@ -42,21 +42,7 @@ func (d *eventInstanceDaoMock) Create(ctx context.Context, eventInstance *api.Ev return eventInstance, nil } -func (d *eventInstanceDaoMock) GetInstancesBySpecEventID(ctx context.Context, specEventID string) ([]string, error) { - d.mux.RLock() - defer d.mux.RUnlock() - - var instanceIDs []string - for _, ei := range d.eventInstances { - if ei.SpecEventID == specEventID { - instanceIDs = append(instanceIDs, ei.InstanceID) - } - } - - return instanceIDs, nil -} - -func (d *eventInstanceDaoMock) FindEventInstancesByEventIDs(ctx context.Context, ids []string) (api.EventInstanceList, error) { +func (d *eventInstanceDaoMock) FindStatusEvents(ctx context.Context, ids []string) (api.EventInstanceList, error) { d.mux.RLock() defer d.mux.RUnlock() diff --git a/test/helper.go b/test/helper.go index 70eab990..331a79e4 100755 --- a/test/helper.go +++ b/test/helper.go @@ -82,7 +82,7 @@ type Helper struct { MetricsServer server.Server HealthCheckServer *server.HealthCheckServer EventServer server.EventServer - EventHandler controllers.EventHandler + EventFilter controllers.EventFilter ControllerManager *server.ControllersServer WorkAgentHolder *work.ClientHolder WorkAgentInformer workv1informers.ManifestWorkInformer @@ -150,10 +150,10 @@ func NewHelper(t *testing.T) *Helper { ) helper.HealthCheckServer.SetStatusDispatcher(statusDispatcher) helper.EventServer = server.NewMessageQueueEventServer(helper.EventBroadcaster, statusDispatcher) - helper.EventHandler = controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(helper.Env().Database.SessionFactory), helper.Env().Services.Events()) + helper.EventFilter = controllers.NewLockBasedEventFilter(db.NewAdvisoryLockFactory(helper.Env().Database.SessionFactory)) } else { helper.EventServer = server.NewGRPCBroker(helper.EventBroadcaster) - helper.EventHandler = controllers.NewPredicatedEventHandler(helper.EventServer.PredicateEvent, helper.Env().Services.Events(), dao.NewEventInstanceDao(&helper.Env().Database.SessionFactory), dao.NewInstanceDao(&helper.Env().Database.SessionFactory)) + helper.EventFilter = controllers.NewPredicatedEventFilter(helper.EventServer.PredicateEvent) } // TODO jwk mock server needs to be refactored out of the helper and into the testing environment @@ -259,7 +259,7 @@ func (helper *Helper) startEventBroadcaster() { func (helper *Helper) StartControllerManager(ctx context.Context) { helper.ControllerManager = &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - helper.EventHandler, + helper.EventFilter, helper.Env().Services.Events(), ), StatusController: controllers.NewStatusController( diff --git a/test/integration/controller_test.go b/test/integration/controller_test.go index f5459021..9f4cb965 100755 --- a/test/integration/controller_test.go +++ b/test/integration/controller_test.go @@ -20,9 +20,6 @@ import ( func TestControllerRacing(t *testing.T) { h, _ := test.RegisterIntegration(t) - // sleep for a while to wait for server instance is added - time.Sleep(1 * time.Second) - account := h.NewRandAccount() ctx, cancel := context.WithCancel(h.NewAuthenticatedContext(account)) defer func() { @@ -35,19 +32,18 @@ func TestControllerRacing(t *testing.T) { eventDao := dao.NewEventDao(&h.Env().Database.SessionFactory) statusEventDao := dao.NewStatusEventDao(&h.Env().Database.SessionFactory) - eventInstanceDao := dao.NewEventInstanceDao(&h.Env().Database.SessionFactory) // The handler filters the events by source id/type/reconciled, and only record // the event with create type. Due to the event lock, each create event // should be only processed once. var proccessedEvent, processedStatusEvent []string - onUpsert := func(ctx context.Context, eventID, resourceID string) error { + onUpsert := func(ctx context.Context, id string) error { events, err := eventDao.All(ctx) if err != nil { return err } for _, evt := range events { - if evt.SourceID != resourceID { + if evt.SourceID != id { continue } if evt.EventType != api.CreateEventType { @@ -57,15 +53,7 @@ func TestControllerRacing(t *testing.T) { if evt.ReconciledDate != nil { continue } - proccessedEvent = append(proccessedEvent, resourceID) - // add the event instance record - _, err := eventInstanceDao.Create(ctx, &api.EventInstance{ - SpecEventID: eventID, - InstanceID: h.Env().Config.MessageBroker.ClientID, - }) - if err != nil { - return err - } + proccessedEvent = append(proccessedEvent, id) } return nil @@ -91,21 +79,23 @@ func TestControllerRacing(t *testing.T) { return nil } - // Start 3 controllers concurrently + // Start 3 controllers concurrently for message queue event server threads := 3 - if h.Broker == "grpc" { - threads = 1 - } + randNum := rand.Intn(3) for i := 0; i < threads; i++ { - // each controller has its own event handler, otherwise, the event lock will block the event processing. - eventHandler := controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), h.Env().Services.Events()) + // each controller has its own event filter, otherwise, the event lock will block the event processing. + eventFilter := controllers.NewLockBasedEventFilter(db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory)) if h.Broker == "grpc" { - eventHandler = controllers.NewPredicatedEventHandler(h.EventServer.PredicateEvent, h.Env().Services.Events(), dao.NewEventInstanceDao(&h.Env().Database.SessionFactory), dao.NewInstanceDao(&h.Env().Database.SessionFactory)) + eventFilter = controllers.NewPredicatedEventFilter(func(ctx context.Context, eventID string) (bool, error) { + // simulate the event filter, where the agent randomly connects to a grpc broker instance. + // in theory, only one broker instance should process the event. + return i == randNum, nil + }) } go func() { s := &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - eventHandler, + eventFilter, h.Env().Services.Events(), ), StatusController: controllers.NewStatusController( @@ -168,9 +158,6 @@ func TestControllerRacing(t *testing.T) { func TestControllerReconcile(t *testing.T) { h, _ := test.RegisterIntegration(t) - // sleep for a while to wait for server instance is added - time.Sleep(1 * time.Second) - account := h.NewRandAccount() ctx, cancel := context.WithCancel(h.NewAuthenticatedContext(account)) @@ -180,26 +167,16 @@ func TestControllerReconcile(t *testing.T) { eventDao := dao.NewEventDao(&h.Env().Database.SessionFactory) statusEventDao := dao.NewStatusEventDao(&h.Env().Database.SessionFactory) - eventInstanceDao := dao.NewEventInstanceDao(&h.Env().Database.SessionFactory) processedEventTimes := 0 // this handler will return an error at the first time to simulate an error happened when handing an event, // and then, the controller will requeue this event, at that time, we handle this event successfully. - onUpsert := func(ctx context.Context, eventID, resourceID string) error { + onUpsert := func(ctx context.Context, id string) error { processedEventTimes = processedEventTimes + 1 if processedEventTimes == 1 { return fmt.Errorf("failed to process the event") } - // add the event instance record - _, err := eventInstanceDao.Create(ctx, &api.EventInstance{ - SpecEventID: eventID, - InstanceID: h.Env().Config.MessageBroker.ClientID, - }) - if err != nil { - return err - } - return nil } @@ -219,7 +196,7 @@ func TestControllerReconcile(t *testing.T) { go func() { s := &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - h.EventHandler, + h.EventFilter, h.Env().Services.Events(), ), StatusController: controllers.NewStatusController( @@ -244,7 +221,7 @@ func TestControllerReconcile(t *testing.T) { s.Start(ctx) }() // wait for the listener to start - time.Sleep(100 * time.Millisecond) + time.Sleep(time.Second) deployName := fmt.Sprintf("nginx-%s", rand.String(5)) resource := h.CreateResource(consumer.Name, deployName, 1) @@ -303,9 +280,6 @@ func TestControllerReconcile(t *testing.T) { func TestControllerSync(t *testing.T) { h, _ := test.RegisterIntegration(t) - // sleep for a while to wait for server instance is added - time.Sleep(1 * time.Second) - account := h.NewRandAccount() ctx, cancel := context.WithCancel(h.NewAuthenticatedContext(account)) @@ -338,7 +312,6 @@ func TestControllerSync(t *testing.T) { } eventDao := dao.NewEventDao(&h.Env().Database.SessionFactory) - eventInstanceDao := dao.NewEventInstanceDao(&h.Env().Database.SessionFactory) now := time.Now() if _, err := eventDao.Create(ctx, &api.Event{Source: "Resources", SourceID: "resource1", @@ -370,15 +343,10 @@ func TestControllerSync(t *testing.T) { } var proccessedEvents []string - onUpsert := func(ctx context.Context, eventID, resourceID string) error { + onUpsert := func(ctx context.Context, id string) error { // we just record the processed event - proccessedEvents = append(proccessedEvents, resourceID) - // add the event instance record - _, err := eventInstanceDao.Create(ctx, &api.EventInstance{ - SpecEventID: eventID, - InstanceID: h.Env().Config.MessageBroker.ClientID, - }) - return err + proccessedEvents = append(proccessedEvents, id) + return nil } // start the controller, once the controller started, it will sync the events: @@ -387,7 +355,7 @@ func TestControllerSync(t *testing.T) { go func() { s := &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - h.EventHandler, + h.EventFilter, h.Env().Services.Events(), ), StatusController: controllers.NewStatusController( @@ -509,7 +477,7 @@ func TestStatusControllerSync(t *testing.T) { go func() { s := &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - h.EventHandler, + h.EventFilter, h.Env().Services.Events(), ), StatusController: controllers.NewStatusController( @@ -543,7 +511,7 @@ func TestStatusControllerSync(t *testing.T) { return fmt.Errorf("should purge the events %s, but got %+v", purged, events) } - eventInstances, err := eventInstanceDao.FindEventInstancesByEventIDs(ctx, purged) + eventInstances, err := eventInstanceDao.FindStatusEvents(ctx, purged) if err != nil { return err } From 29902c22c8c0961dce4e534ddf587950495c30fb Mon Sep 17 00:00:00 2001 From: morvencao Date: Thu, 2 Jan 2025 08:35:56 +0000 Subject: [PATCH 4/7] handle grpc connection error. Signed-off-by: morvencao --- cmd/maestro/server/event_server.go | 4 +-- cmd/maestro/server/grpc_broker.go | 49 ++++++++++++------------------ 2 files changed, 22 insertions(+), 31 deletions(-) diff --git a/cmd/maestro/server/event_server.go b/cmd/maestro/server/event_server.go index daaf5767..a8f5f744 100644 --- a/cmd/maestro/server/event_server.go +++ b/cmd/maestro/server/event_server.go @@ -40,7 +40,7 @@ type EventServer interface { // OnStatusUpdate handles status update events for a resource. OnStatusUpdate(ctx context.Context, eventID, resourceID string) error - // returns true if the event should be processed by the current instance, otherwise false and an error. + // returns true if the event should be processed by the current instance, otherwise false and an error if it occurs. PredicateEvent(ctx context.Context, eventID string) (bool, error) } @@ -148,7 +148,7 @@ func (s *MessageQueueEventServer) OnStatusUpdate(ctx context.Context, eventID, r ) } -// EventPredicate for the message queue event server is no-op, as the message queue server filter event based on lock. +// EventPredicate for the message queue event server is no-op, as the message queue server filter event based on advisory lock. func (s *MessageQueueEventServer) PredicateEvent(ctx context.Context, eventID string) (bool, error) { return true, nil } diff --git a/cmd/maestro/server/grpc_broker.go b/cmd/maestro/server/grpc_broker.go index e4d9a655..b7a6b6b5 100644 --- a/cmd/maestro/server/grpc_broker.go +++ b/cmd/maestro/server/grpc_broker.go @@ -13,7 +13,9 @@ import ( cetypes "github.com/cloudevents/sdk-go/v2/types" "github.com/google/uuid" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/keepalive" + "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" "k8s.io/klog/v2" pbv1 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protobuf/v1" @@ -29,12 +31,7 @@ import ( "github.com/openshift-online/maestro/pkg/services" ) -// resourceHandler processes a resource spec by encoding it to a CloudEvent and sending it to the subscriber. -// It returns a bool indicating if the connection is closed and an error if one occurs. -// - Returns true and an error if the connection is closed. -// - Returns false and an error if encoding fails. -// - Returns false and nil if successful. -type resourceHandler func(res *api.Resource) (bool, error) +type resourceHandler func(res *api.Resource) error // subscriber defines a subscriber that can receive and handle resource spec. type subscriber struct { @@ -187,45 +184,37 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv return fmt.Errorf("invalid subscription request: missing cluster name") } // register the cluster for subscription to the resource spec - subscriberID, errChan := bkr.register(subReq.ClusterName, func(res *api.Resource) (bool, error) { + subscriberID, errChan := bkr.register(subReq.ClusterName, func(res *api.Resource) error { evt, err := encodeResourceSpec(res) if err != nil { // return the error to requeue the event if encoding fails (e.g., due to invalid resource spec). - return false, fmt.Errorf("failed to encode resource %s to cloudevent: %v", res.ID, err) + return fmt.Errorf("failed to encode resource %s to cloudevent: %v", res.ID, err) } // WARNING: don't use "pbEvt, err := pb.ToProto(evt)" to convert cloudevent to protobuf pbEvt := &pbv1.CloudEvent{} if err = grpcprotocol.WritePBMessage(context.TODO(), binding.ToMessage(evt), pbEvt); err != nil { // return the error to requeue the event if converting to protobuf fails (e.g., due to invalid cloudevent). - return false, fmt.Errorf("failed to convert cloudevent to protobuf for resource(%s): %v", res.ID, err) + return fmt.Errorf("failed to convert cloudevent to protobuf for resource(%s): %v", res.ID, err) } // send the cloudevent to the subscriber klog.V(4).Infof("sending the event to spec subscribers, %s", evt) if err := subServer.Send(pbEvt); err != nil { klog.Errorf("failed to send grpc event, %v", err) - // Return true to ensure the subscriber will be unregistered when sending fails, which will close the subserver stream. - // See: https://github.com/grpc/grpc-go/blob/b615b35c4feb932a0ac658fb86b7127f10ef664e/stream.go#L1537 for more details. - // Return the error without wrapping, as it contains the gRPC error code and message for future (TODO) handling beyond network issues. - // This will also not requeue the event, as the error will cause the connection to the subscriber to be closed. - // If the subscriber (agent) reconnects, rely on the agent's resync to retrieve the missing resource spec. - return true, err + // Return the error without wrapping, as it includes the gRPC error code and message for further handling. + // For unrecoverable errors, such as a connection closed by an intermediate proxy, push the error to subscriber's + // error channel to unregister the subscriber. + return err } - return false, nil + return nil }) select { case err := <-errChan: - // An error occurred while sending the event to the subscriber. - // This could be due to multiple reasons: - // see: https://grpc.io/docs/guides/error/ - // 1. general errors such as: deadline exceeded before return the response. - // 2. network errors such as: connection closed by intermidiate proxy. - // 3. protocol errors such as: compression error or flow control error. - // In all above cases, unregister the subscriber. - // TODO: unregister the subscriber if the error is a network error and the connection could be re-established. + // When reaching this point, an unrecoverable error occurred while sending the event, + // such as the connection being closed. Unregister the subscriber to trigger agent reconnection. klog.Errorf("unregister subscriber %s, error= %v", subscriberID, err) bkr.unregister(subscriberID) return err @@ -409,13 +398,15 @@ func (bkr *GRPCBroker) handleRes(resource *api.Resource) error { defer bkr.mu.RUnlock() for _, subscriber := range bkr.subscribers { if subscriber.clusterName == resource.ConsumerName { - if isConnClosed, err := subscriber.handler(resource); err != nil { - if isConnClosed { - // if the connection is closed, write the error to the subscriber's error channel - // to ensure the subscriber is unregistered + if err := subscriber.handler(resource); err != nil { + // check if the error is recoverable. For unrecoverable errors, + // such as a connection closed by an intermediate proxy, push + // the error to subscriber's error channel to unregister the subscriber. + st, ok := status.FromError(err) + if ok && st.Code() == codes.Unavailable { + // TODO: handle more error codes that can't be recovered subscriber.errChan <- err } - // return the error to requeue the event if handling fails. return err } } From eb7c0306a88d89f2512789eacba73cd86dbfd151 Mon Sep 17 00:00:00 2001 From: morvencao Date: Thu, 2 Jan 2025 09:24:06 +0000 Subject: [PATCH 5/7] no need to requeue if resource is not found. Signed-off-by: morvencao --- cmd/maestro/server/grpc_broker.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/cmd/maestro/server/grpc_broker.go b/cmd/maestro/server/grpc_broker.go index b7a6b6b5..5b6fd25b 100644 --- a/cmd/maestro/server/grpc_broker.go +++ b/cmd/maestro/server/grpc_broker.go @@ -418,6 +418,10 @@ func (bkr *GRPCBroker) handleRes(resource *api.Resource) error { func (bkr *GRPCBroker) OnCreate(ctx context.Context, id string) error { resource, err := bkr.resourceService.Get(ctx, id) if err != nil { + // if the resource is not found, it indicates the resource has been processed. + if err.Is404() { + return nil + } return err } @@ -428,6 +432,10 @@ func (bkr *GRPCBroker) OnCreate(ctx context.Context, id string) error { func (bkr *GRPCBroker) OnUpdate(ctx context.Context, id string) error { resource, err := bkr.resourceService.Get(ctx, id) if err != nil { + // if the resource is not found, it indicates the resource has been processed. + if err.Is404() { + return nil + } return err } @@ -438,6 +446,10 @@ func (bkr *GRPCBroker) OnUpdate(ctx context.Context, id string) error { func (bkr *GRPCBroker) OnDelete(ctx context.Context, id string) error { resource, err := bkr.resourceService.Get(ctx, id) if err != nil { + // if the resource is not found, it indicates the resource has been processed. + if err.Is404() { + return nil + } return err } From 5ed379f726fc171f347bf2a27a3e08ecdd7353d5 Mon Sep 17 00:00:00 2001 From: morvencao Date: Tue, 7 Jan 2025 02:41:11 +0000 Subject: [PATCH 6/7] address comment for predicate event. Signed-off-by: morvencao --- cmd/maestro/server/grpc_broker.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cmd/maestro/server/grpc_broker.go b/cmd/maestro/server/grpc_broker.go index 5b6fd25b..eff5bf39 100644 --- a/cmd/maestro/server/grpc_broker.go +++ b/cmd/maestro/server/grpc_broker.go @@ -492,6 +492,11 @@ func (bkr *GRPCBroker) PredicateEvent(ctx context.Context, eventID string) (bool // if the resource is not found, it indicates the resource has been handled by other instances. if svcErr.Is404() { klog.V(10).Infof("The resource %s has been deleted, mark the event as reconciled", evt.SourceID) + now := time.Now() + evt.ReconciledDate = &now + if _, svcErr := bkr.eventService.Replace(ctx, evt); svcErr != nil { + return false, fmt.Errorf("failed to mark event with id (%s) as reconciled: %s", evt.ID, svcErr) + } return false, nil } return false, fmt.Errorf("failed to get resource %s: %s", evt.SourceID, svcErr.Error()) From 9a92eab846a391fae95c5fda4339797098943411 Mon Sep 17 00:00:00 2001 From: morvencao Date: Tue, 7 Jan 2025 08:44:00 +0000 Subject: [PATCH 7/7] enhance resource update racing testing. Signed-off-by: morvencao --- test/integration/resource_test.go | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/test/integration/resource_test.go b/test/integration/resource_test.go index a66f07a8..458aa57a 100755 --- a/test/integration/resource_test.go +++ b/test/integration/resource_test.go @@ -519,6 +519,17 @@ func TestUpdateResourceWithRacingRequests(t *testing.T) { account := h.NewRandAccount() ctx := h.NewAuthenticatedContext(account) + var objids []string + rows, err := h.DBFactory.DirectDB().Query("SELECT objid FROM pg_locks WHERE locktype='advisory'") + Expect(err).NotTo(HaveOccurred(), "Error querying pg_locks: %v", err) + for rows.Next() { + var objid string + Expect(rows.Scan(&objid)).NotTo(HaveOccurred(), "Error scanning pg_locks value: %v", err) + objids = append(objids, objid) + } + rows.Close() + time.Sleep(time.Second) + consumer := h.CreateConsumer("cluster-" + rand.String(5)) deployName := fmt.Sprintf("nginx-%s", rand.String(5)) res := h.CreateResource(consumer.Name, deployName, 1) @@ -561,11 +572,17 @@ func TestUpdateResourceWithRacingRequests(t *testing.T) { // the resource patch request is protected by the advisory lock, so there should only be one update Expect(updatedCount).To(Equal(1)) - // all the locks should be released finally + // ensure the locks for current test are released + query := fmt.Sprintf("select count(*) from pg_locks where locktype='advisory' and objid not in (%s)", strings.Join(objids, ",")) + if len(objids) == 0 { + query = "select count(*) from pg_locks where locktype='advisory'" + } + + // ensure the locks for current test are released finally Eventually(func() error { var count int err := h.DBFactory.DirectDB(). - QueryRow("select count(*) from pg_locks where locktype='advisory';"). + QueryRow(query). Scan(&count) Expect(err).NotTo(HaveOccurred(), "Error querying pg_locks: %v", err)