diff --git a/.gitignore b/.gitignore index af4e2dc4..c9bc3a71 100755 --- a/.gitignore +++ b/.gitignore @@ -59,5 +59,5 @@ test/e2e/.consumer_name test/e2e/.external_host_ip test/e2e/report/* unit-test-results.json -integration-test-results.json +*integration-test-results.json test/e2e/setup/aro/aro-hcp diff --git a/Makefile b/Makefile index 90ad5c2e..08343446 100755 --- a/Makefile +++ b/Makefile @@ -90,7 +90,8 @@ MQTT_IMAGE ?= docker.io/library/eclipse-mosquitto:2.0.18 # Test output files unit_test_json_output ?= ${PWD}/unit-test-results.json -integration_test_json_output ?= ${PWD}/integration-test-results.json +mqtt_integration_test_json_output ?= ${PWD}/mqtt-integration-test-results.json +grpc_integration_test_json_output ?= ${PWD}/grpc-integration-test-results.json # Prints a list of useful targets. help: @@ -218,11 +219,19 @@ test: # make test-integration TESTFLAGS="-run TestAccounts" acts as TestAccounts* and run TestAccountsGet, TestAccountsPost, etc. # make test-integration TESTFLAGS="-run TestAccountsGet" runs TestAccountsGet # make test-integration TESTFLAGS="-short" skips long-run tests -test-integration: - OCM_ENV=testing gotestsum --jsonfile-timing-events=$(integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \ - ./test/integration +test-integration: test-integration-mqtt test-integration-grpc .PHONY: test-integration +test-integration-mqtt: + BROKER=mqtt OCM_ENV=testing gotestsum --jsonfile-timing-events=$(mqtt_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \ + ./test/integration +.PHONY: test-integration-mqtt + +test-integration-grpc: + BROKER=grpc OCM_ENV=testing gotestsum --jsonfile-timing-events=$(grpc_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h -run TestController \ + ./test/integration +.PHONY: test-integration-grpc + # Regenerate openapi client and models generate: rm -rf pkg/api/openapi diff --git a/cmd/maestro/server/grpc_broker.go b/cmd/maestro/server/grpc_broker.go index 6a20ce2c..d1574f1d 100644 --- a/cmd/maestro/server/grpc_broker.go +++ b/cmd/maestro/server/grpc_broker.go @@ -200,6 +200,7 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv // TODO: error handling to address errors beyond network issues. if err := subServer.Send(pbEvt); err != nil { klog.Errorf("failed to send grpc event, %v", err) + return err // return error to requeue the spec event } return nil @@ -397,8 +398,8 @@ func (bkr *GRPCBroker) handleResEvent(ctx context.Context, eventID string, resou // add the event instance record to mark the event has been processed by the current instance if _, err := bkr.eventInstanceDao.Create(ctx, &api.EventInstance{ - EventID: eventID, - InstanceID: bkr.instanceID, + SpecEventID: eventID, + InstanceID: bkr.instanceID, }); err != nil { return fmt.Errorf("failed to create event instance record %s: %s", eventID, err.Error()) } @@ -461,9 +462,26 @@ func (bkr *GRPCBroker) PredicateEvent(ctx context.Context, eventID string) (bool if err != nil { return false, fmt.Errorf("failed to get event %s: %s", eventID, err.Error()) } - resource, err := bkr.resourceService.Get(ctx, evt.SourceID) - if err != nil { - return false, fmt.Errorf("failed to get resource %s: %s", evt.SourceID, err.Error()) + + // fast return if the event is already reconciled + if evt.ReconciledDate != nil { + return false, nil + } + + resource, svcErr := bkr.resourceService.Get(ctx, evt.SourceID) + if svcErr != nil { + // if the resource is not found, it indicates the resource has been handled by + // other instances, so we can mark the event as reconciled and ignore it. + if svcErr.Is404() { + klog.V(10).Infof("The resource %s has been deleted, mark the event as reconciled", evt.SourceID) + now := time.Now() + evt.ReconciledDate = &now + if _, svcErr := bkr.eventService.Replace(ctx, evt); svcErr != nil { + return false, fmt.Errorf("failed to update event %s: %s", evt.ID, svcErr.Error()) + } + return false, nil + } + return false, fmt.Errorf("failed to get resource %s: %s", evt.SourceID, svcErr.Error()) } if bkr.IsConsumerSubscribed(resource.ConsumerName) { @@ -473,8 +491,8 @@ func (bkr *GRPCBroker) PredicateEvent(ctx context.Context, eventID string) (bool // if the consumer is not subscribed to the broker, then add the event instance record // to indicate the event has been processed by the instance if _, err := bkr.eventInstanceDao.Create(ctx, &api.EventInstance{ - EventID: eventID, - InstanceID: bkr.instanceID, + SpecEventID: eventID, + InstanceID: bkr.instanceID, }); err != nil { return false, fmt.Errorf("failed to create event instance record %s: %s", eventID, err.Error()) } diff --git a/pkg/api/event_instances.go b/pkg/api/event_instances.go index 413147e8..9fc757ff 100644 --- a/pkg/api/event_instances.go +++ b/pkg/api/event_instances.go @@ -1,8 +1,9 @@ package api type EventInstance struct { - EventID string - InstanceID string + EventID string + SpecEventID string + InstanceID string } type EventInstanceList []*EventInstance diff --git a/pkg/controllers/event_handler.go b/pkg/controllers/event_handler.go index c41a2d8f..8794da61 100644 --- a/pkg/controllers/event_handler.go +++ b/pkg/controllers/event_handler.go @@ -126,15 +126,26 @@ func (h *PredicatedEventHandler) PostProcess(ctx context.Context, event *api.Eve return fmt.Errorf("error finding ready instances: %v", err) } - eventInstances, err := h.eventInstanceDao.GetInstancesByEventID(ctx, event.ID) + processedInstances, err := h.eventInstanceDao.GetInstancesBySpecEventID(ctx, event.ID) if err != nil { - return fmt.Errorf("error finding processed server instances for event %s: %v", event.ID, err) + return fmt.Errorf("error finding processed instances for event %s: %v", event.ID, err) + } + + // should never happen. If the event is not processed by any instance, return an error + if len(processedInstances) == 0 { + klog.V(10).Infof("Event %s is not processed by any instance", event.ID) + return fmt.Errorf("event %s is not processed by any instance", event.ID) } // check if all instances have processed the event - if !compareStrings(activeInstances, eventInstances) { - klog.V(10).Infof("Event %s is not processed by all instances, handled by %v, active instances %v", event.ID, eventInstances, activeInstances) - return fmt.Errorf("event %s is not processed by all instances", event.ID) + // 1. In normal case, the activeInstances == eventInstances, mark the event as reconciled + // 2. If maestro server instance is up, but has't been marked as ready, then activeInstances < eventInstances, + // it's ok to mark the event as reconciled, as the instance is not ready to sever the request, no connected agents. + // 3. If maestro server instance is down, but has been marked as unready, it may still have connected agents, but + // the instance has stopped to handle the event, so activeInstances > eventInstances, the event should be equeued. + if !isSubSet(activeInstances, processedInstances) { + klog.V(10).Infof("Event %s is not processed by all active instances %v, handled by %v", event.ID, activeInstances, processedInstances) + return fmt.Errorf("event %s is not processed by all active instances", event.ID) } // update the event with the reconciled date @@ -147,12 +158,8 @@ func (h *PredicatedEventHandler) PostProcess(ctx context.Context, event *api.Eve return nil } -// compareStrings compares two string slices and returns true if they are equal -func compareStrings(a, b []string) bool { - if len(a) != len(b) { - return false - } - +// isSubSet checks if slice a is a subset of slice b +func isSubSet(a, b []string) bool { for _, v := range a { found := false for _, vv := range b { diff --git a/pkg/controllers/event_handler_test.go b/pkg/controllers/event_handler_test.go index ebcd4a58..4b5fb8e5 100644 --- a/pkg/controllers/event_handler_test.go +++ b/pkg/controllers/event_handler_test.go @@ -116,8 +116,8 @@ func TestPredicatedEventHandler(t *testing.T) { Expect(shouldProcess).To(BeTrue()) _, err = eventInstancesDao.Create(ctx, &api.EventInstance{ - EventID: "1", - InstanceID: currentInstanceID, + SpecEventID: "1", + InstanceID: currentInstanceID, }) Expect(err).To(BeNil()) diff --git a/pkg/controllers/framework_test.go b/pkg/controllers/framework_test.go index 620674a1..ba5ecb4f 100755 --- a/pkg/controllers/framework_test.go +++ b/pkg/controllers/framework_test.go @@ -34,8 +34,8 @@ type exampleController struct { func (d *exampleController) OnAdd(ctx context.Context, eventID, resourceID string) error { d.addCounter++ _, err := d.eventInstancesDao.Create(ctx, &api.EventInstance{ - EventID: eventID, - InstanceID: d.instanceID, + SpecEventID: eventID, + InstanceID: d.instanceID, }) return err } @@ -43,8 +43,8 @@ func (d *exampleController) OnAdd(ctx context.Context, eventID, resourceID strin func (d *exampleController) OnUpdate(ctx context.Context, eventID, resourceID string) error { d.updateCounter++ _, err := d.eventInstancesDao.Create(ctx, &api.EventInstance{ - EventID: eventID, - InstanceID: d.instanceID, + SpecEventID: eventID, + InstanceID: d.instanceID, }) return err } @@ -52,8 +52,8 @@ func (d *exampleController) OnUpdate(ctx context.Context, eventID, resourceID st func (d *exampleController) OnDelete(ctx context.Context, eventID, resourceID string) error { d.deleteCounter++ _, err := d.eventInstancesDao.Create(ctx, &api.EventInstance{ - EventID: eventID, - InstanceID: d.instanceID, + SpecEventID: eventID, + InstanceID: d.instanceID, }) return err } diff --git a/pkg/dao/event_instance.go b/pkg/dao/event_instance.go index 53066ffa..c12e01d5 100644 --- a/pkg/dao/event_instance.go +++ b/pkg/dao/event_instance.go @@ -11,10 +11,9 @@ import ( type EventInstanceDao interface { Get(ctx context.Context, eventID, instanceID string) (*api.EventInstance, error) - GetInstancesByEventID(ctx context.Context, eventID string) ([]string, error) Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) - - FindStatusEvents(ctx context.Context, ids []string) (api.EventInstanceList, error) + GetInstancesBySpecEventID(ctx context.Context, eventID string) ([]string, error) + FindEventInstancesByEventIDs(ctx context.Context, ids []string) (api.EventInstanceList, error) GetEventsAssociatedWithInstances(ctx context.Context, instanceIDs []string) ([]string, error) } @@ -39,10 +38,19 @@ func (d *sqlEventInstanceDao) Get(ctx context.Context, eventID, instanceID strin return &eventInstance, nil } -func (d *sqlEventInstanceDao) GetInstancesByEventID(ctx context.Context, eventID string) ([]string, error) { +func (d *sqlEventInstanceDao) Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) { + g2 := (*d.sessionFactory).New(ctx) + if err := g2.Omit(clause.Associations).Create(eventInstance).Error; err != nil { + db.MarkForRollback(ctx, err) + return nil, err + } + return eventInstance, nil +} + +func (d *sqlEventInstanceDao) GetInstancesBySpecEventID(ctx context.Context, specEventID string) ([]string, error) { g2 := (*d.sessionFactory).New(ctx) var eventInstances []api.EventInstance - if err := g2.Model(&api.EventInstance{}).Where("event_id = ?", eventID).Find(&eventInstances).Error; err != nil { + if err := g2.Model(&api.EventInstance{}).Where("spec_event_id = ?", specEventID).Find(&eventInstances).Error; err != nil { return nil, err } instanceIDs := make([]string, len(eventInstances)) @@ -52,16 +60,7 @@ func (d *sqlEventInstanceDao) GetInstancesByEventID(ctx context.Context, eventID return instanceIDs, nil } -func (d *sqlEventInstanceDao) Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) { - g2 := (*d.sessionFactory).New(ctx) - if err := g2.Omit(clause.Associations).Create(eventInstance).Error; err != nil { - db.MarkForRollback(ctx, err) - return nil, err - } - return eventInstance, nil -} - -func (d *sqlEventInstanceDao) FindStatusEvents(ctx context.Context, ids []string) (api.EventInstanceList, error) { +func (d *sqlEventInstanceDao) FindEventInstancesByEventIDs(ctx context.Context, ids []string) (api.EventInstanceList, error) { g2 := (*d.sessionFactory).New(ctx) eventInstances := api.EventInstanceList{} if err := g2.Where("event_id in (?)", ids).Find(&eventInstances).Error; err != nil { diff --git a/pkg/dao/instance.go b/pkg/dao/instance.go index 1d5fe1f4..8be26a3c 100644 --- a/pkg/dao/instance.go +++ b/pkg/dao/instance.go @@ -19,7 +19,6 @@ type InstanceDao interface { Delete(ctx context.Context, id string) error DeleteByIDs(ctx context.Context, ids []string) error FindByIDs(ctx context.Context, ids []string) (api.ServerInstanceList, error) - FindReadyIDs(ctx context.Context) ([]string, error) FindByUpdatedTime(ctx context.Context, updatedTime time.Time) (api.ServerInstanceList, error) FindReadyIDs(ctx context.Context) ([]string, error) All(ctx context.Context) (api.ServerInstanceList, error) @@ -112,19 +111,6 @@ func (d *sqlInstanceDao) FindByIDs(ctx context.Context, ids []string) (api.Serve return instances, nil } -func (d *sqlInstanceDao) FindReadyIDs(ctx context.Context) ([]string, error) { - g2 := (*d.sessionFactory).New(ctx) - instances := api.ServerInstanceList{} - if err := g2.Where("ready = ?", true).Find(&instances).Error; err != nil { - return nil, err - } - ids := make([]string, len(instances)) - for i, instance := range instances { - ids[i] = instance.ID - } - return ids, nil -} - func (d *sqlInstanceDao) FindByUpdatedTime(ctx context.Context, updatedTime time.Time) (api.ServerInstanceList, error) { g2 := (*d.sessionFactory).New(ctx) instances := api.ServerInstanceList{} diff --git a/pkg/dao/mocks/event_instance.go b/pkg/dao/mocks/event_instance.go index 34a1468a..52b99cbf 100644 --- a/pkg/dao/mocks/event_instance.go +++ b/pkg/dao/mocks/event_instance.go @@ -33,13 +33,22 @@ func (d *eventInstanceDaoMock) Get(ctx context.Context, eventID, instanceID stri return nil, fmt.Errorf("event instance not found") } -func (d *eventInstanceDaoMock) GetInstancesByEventID(ctx context.Context, eventID string) ([]string, error) { +func (d *eventInstanceDaoMock) Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) { + d.mux.Lock() + defer d.mux.Unlock() + + d.eventInstances = append(d.eventInstances, eventInstance) + + return eventInstance, nil +} + +func (d *eventInstanceDaoMock) GetInstancesBySpecEventID(ctx context.Context, specEventID string) ([]string, error) { d.mux.RLock() defer d.mux.RUnlock() var instanceIDs []string for _, ei := range d.eventInstances { - if ei.EventID == eventID { + if ei.SpecEventID == specEventID { instanceIDs = append(instanceIDs, ei.InstanceID) } } @@ -47,11 +56,32 @@ func (d *eventInstanceDaoMock) GetInstancesByEventID(ctx context.Context, eventI return instanceIDs, nil } -func (d *eventInstanceDaoMock) Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) { - d.mux.Lock() - defer d.mux.Unlock() +func (d *eventInstanceDaoMock) FindEventInstancesByEventIDs(ctx context.Context, ids []string) (api.EventInstanceList, error) { + d.mux.RLock() + defer d.mux.RUnlock() - d.eventInstances = append(d.eventInstances, eventInstance) + var eventInstances api.EventInstanceList + for _, id := range ids { + for _, ei := range d.eventInstances { + if ei.EventID == id { + eventInstances = append(eventInstances, ei) + } + } + } - return eventInstance, nil + return eventInstances, nil +} + +func (d *eventInstanceDaoMock) GetEventsAssociatedWithInstances(ctx context.Context, instanceIDs []string) ([]string, error) { + d.mux.RLock() + defer d.mux.RUnlock() + + var eventIDs []string + for _, ei := range d.eventInstances { + if contains(instanceIDs, ei.InstanceID) { + eventIDs = append(eventIDs, ei.EventID) + } + } + + return eventIDs, nil } diff --git a/pkg/dao/mocks/instance.go b/pkg/dao/mocks/instance.go index 75dda50e..12498774 100644 --- a/pkg/dao/mocks/instance.go +++ b/pkg/dao/mocks/instance.go @@ -124,18 +124,6 @@ func (d *instanceDaoMock) FindByIDs(ctx context.Context, ids []string) (api.Serv return nil, errors.NotImplemented("Instance").AsError() } -func (d *instanceDaoMock) FindReadyIDs(ctx context.Context) ([]string, error) { - d.mux.RLock() - defer d.mux.RUnlock() - ids := make([]string, 0, len(d.instances)) - for _, instance := range d.instances { - if instance.Ready { - ids = append(ids, instance.ID) - } - } - return ids, nil -} - func (d *instanceDaoMock) FindByUpdatedTime(ctx context.Context, updatedTime time.Time) (api.ServerInstanceList, error) { d.mux.RLock() defer d.mux.RUnlock() @@ -151,8 +139,7 @@ func (d *instanceDaoMock) FindByUpdatedTime(ctx context.Context, updatedTime tim func (d *instanceDaoMock) FindReadyIDs(ctx context.Context) ([]string, error) { d.mux.RLock() defer d.mux.RUnlock() - - ids := []string{} + ids := make([]string, 0, len(d.instances)) for _, instance := range d.instances { if instance.Ready { ids = append(ids, instance.ID) diff --git a/test/helper.go b/test/helper.go index 062da815..6ee8dc21 100755 --- a/test/helper.go +++ b/test/helper.go @@ -24,6 +24,7 @@ import ( workv1 "open-cluster-management.io/api/work/v1" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc" grpcoptions "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" @@ -71,8 +72,8 @@ type Helper struct { Ctx context.Context ContextCancelFunc context.CancelFunc + Broker string EventBroadcaster *event.EventBroadcaster - StatusDispatcher dispatcher.Dispatcher Store *MemoryStore GRPCSourceClient *generic.CloudEventSourceClient[*api.Resource] DBFactory db.SessionFactory @@ -81,6 +82,7 @@ type Helper struct { MetricsServer server.Server HealthCheckServer *server.HealthCheckServer EventServer server.EventServer + EventHandler controllers.EventHandler ControllerManager *server.ControllersServer WorkAgentHolder *work.ClientHolder WorkAgentInformer workv1informers.ManifestWorkInformer @@ -111,6 +113,12 @@ func NewHelper(t *testing.T) *Helper { } pflag.Parse() + // Set the message broker type if it is set in the environment + broker := os.Getenv("BROKER") + if broker != "" { + env.Config.MessageBroker.MessageBrokerType = broker + } + err = env.Initialize() if err != nil { klog.Fatalf("Unable to initialize testing environment: %s", err.Error()) @@ -120,18 +128,32 @@ func NewHelper(t *testing.T) *Helper { helper = &Helper{ Ctx: ctx, ContextCancelFunc: cancel, + Broker: env.Config.MessageBroker.MessageBrokerType, EventBroadcaster: event.NewEventBroadcaster(), - StatusDispatcher: dispatcher.NewHashDispatcher( + AppConfig: env.Config, + DBFactory: env.Database.SessionFactory, + JWTPrivateKey: jwtKey, + JWTCA: jwtCA, + } + + // Set the healthcheck interval to 1 second for testing + helper.Env().Config.HealthCheck.HeartbeartInterval = 1 + helper.HealthCheckServer = server.NewHealthCheckServer() + + if helper.Broker == "mqtt" { + statusDispatcher := dispatcher.NewHashDispatcher( helper.Env().Config.MessageBroker.ClientID, dao.NewInstanceDao(&helper.Env().Database.SessionFactory), dao.NewConsumerDao(&helper.Env().Database.SessionFactory), helper.Env().Clients.CloudEventsSource, helper.Env().Config.EventServer.ConsistentHashConfig, - ), - AppConfig: env.Config, - DBFactory: env.Database.SessionFactory, - JWTPrivateKey: jwtKey, - JWTCA: jwtCA, + ) + helper.HealthCheckServer.SetStatusDispatcher(statusDispatcher) + helper.EventServer = server.NewMessageQueueEventServer(helper.EventBroadcaster, statusDispatcher) + helper.EventHandler = controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(helper.Env().Database.SessionFactory), helper.Env().Services.Events()) + } else { + helper.EventServer = server.NewGRPCBroker(helper.EventBroadcaster) + helper.EventHandler = controllers.NewPredicatedEventHandler(helper.EventServer.PredicateEvent, helper.Env().Services.Events(), dao.NewEventInstanceDao(&helper.Env().Database.SessionFactory), dao.NewInstanceDao(&helper.Env().Database.SessionFactory)) } // TODO jwk mock server needs to be refactored out of the helper and into the testing environment @@ -206,9 +228,6 @@ func (helper *Helper) stopMetricsServer() error { } func (helper *Helper) startHealthCheckServer(ctx context.Context) { - helper.Env().Config.HealthCheck.HeartbeartInterval = 1 - helper.HealthCheckServer = server.NewHealthCheckServer() - helper.HealthCheckServer.SetStatusDispatcher(helper.StatusDispatcher) go func() { klog.V(10).Info("Test health check server started") helper.HealthCheckServer.Start(ctx) @@ -222,8 +241,6 @@ func (helper *Helper) sendShutdownSignal() error { } func (helper *Helper) startEventServer(ctx context.Context) { - // helper.Env().Config.EventServer.SubscriptionType = "broadcast" - helper.EventServer = server.NewMessageQueueEventServer(helper.EventBroadcaster, helper.StatusDispatcher) go func() { klog.V(10).Info("Test event server started") helper.EventServer.Start(ctx) @@ -242,7 +259,7 @@ func (helper *Helper) startEventBroadcaster() { func (helper *Helper) StartControllerManager(ctx context.Context) { helper.ControllerManager = &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(helper.Env().Database.SessionFactory), helper.Env().Services.Events()), + helper.EventHandler, helper.Env().Services.Events(), ), StatusController: controllers.NewStatusController( @@ -270,10 +287,19 @@ func (helper *Helper) StartControllerManager(ctx context.Context) { } func (helper *Helper) StartWorkAgent(ctx context.Context, clusterName string, bundle bool) { - // initilize the mqtt options - mqttOptions, err := mqtt.BuildMQTTOptionsFromFlags(helper.Env().Config.MessageBroker.MessageBrokerConfig) - if err != nil { - klog.Fatalf("Unable to build MQTT options: %s", err.Error()) + var brokerConfig any + if helper.Broker == "mqtt" { + // initilize the mqtt options + mqttOptions, err := mqtt.BuildMQTTOptionsFromFlags(helper.Env().Config.MessageBroker.MessageBrokerConfig) + if err != nil { + klog.Fatalf("Unable to build MQTT options: %s", err.Error()) + } + brokerConfig = mqttOptions + } else { + // initilize the grpc options + grpcOptions := grpc.NewGRPCOptions() + grpcOptions.URL = fmt.Sprintf("%s:%s", helper.Env().Config.HTTPServer.Hostname, helper.Env().Config.GRPCServer.BrokerBindPort) + brokerConfig = grpcOptions } var workCodec generic.Codec[*workv1.ManifestWork] @@ -285,7 +311,7 @@ func (helper *Helper) StartWorkAgent(ctx context.Context, clusterName string, bu watcherStore := store.NewAgentInformerWatcherStore() - clientHolder, err := work.NewClientHolderBuilder(mqttOptions). + clientHolder, err := work.NewClientHolderBuilder(brokerConfig). WithClientID(clusterName). WithClusterName(clusterName). WithCodecs(workCodec). diff --git a/test/integration/controller_test.go b/test/integration/controller_test.go index 49d0e03f..f5459021 100755 --- a/test/integration/controller_test.go +++ b/test/integration/controller_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/google/uuid" . "github.com/onsi/gomega" "github.com/openshift-online/maestro/cmd/maestro/server" "github.com/openshift-online/maestro/pkg/api" @@ -19,14 +20,22 @@ import ( func TestControllerRacing(t *testing.T) { h, _ := test.RegisterIntegration(t) + // sleep for a while to wait for server instance is added + time.Sleep(1 * time.Second) + account := h.NewRandAccount() ctx, cancel := context.WithCancel(h.NewAuthenticatedContext(account)) defer func() { cancel() }() + // start work agent so that grpc broker can work + consumer := h.CreateConsumer("cluster-" + rand.String(5)) + h.StartWorkAgent(ctx, consumer.Name, false) + eventDao := dao.NewEventDao(&h.Env().Database.SessionFactory) statusEventDao := dao.NewStatusEventDao(&h.Env().Database.SessionFactory) + eventInstanceDao := dao.NewEventInstanceDao(&h.Env().Database.SessionFactory) // The handler filters the events by source id/type/reconciled, and only record // the event with create type. Due to the event lock, each create event @@ -49,6 +58,14 @@ func TestControllerRacing(t *testing.T) { continue } proccessedEvent = append(proccessedEvent, resourceID) + // add the event instance record + _, err := eventInstanceDao.Create(ctx, &api.EventInstance{ + SpecEventID: eventID, + InstanceID: h.Env().Config.MessageBroker.ClientID, + }) + if err != nil { + return err + } } return nil @@ -76,11 +93,19 @@ func TestControllerRacing(t *testing.T) { // Start 3 controllers concurrently threads := 3 + if h.Broker == "grpc" { + threads = 1 + } for i := 0; i < threads; i++ { + // each controller has its own event handler, otherwise, the event lock will block the event processing. + eventHandler := controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), h.Env().Services.Events()) + if h.Broker == "grpc" { + eventHandler = controllers.NewPredicatedEventHandler(h.EventServer.PredicateEvent, h.Env().Services.Events(), dao.NewEventInstanceDao(&h.Env().Database.SessionFactory), dao.NewInstanceDao(&h.Env().Database.SessionFactory)) + } go func() { s := &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), h.Env().Services.Events()), + eventHandler, h.Env().Services.Events(), ), StatusController: controllers.NewStatusController( @@ -107,7 +132,6 @@ func TestControllerRacing(t *testing.T) { // wait for controller service starts time.Sleep(3 * time.Second) - consumer := h.CreateConsumer("cluster-" + rand.String(5)) resources := h.CreateResourceList(consumer.Name, 50) // This is to check only 50 create events are processed. It waits for 5 seconds to ensure all events have been @@ -144,11 +168,19 @@ func TestControllerRacing(t *testing.T) { func TestControllerReconcile(t *testing.T) { h, _ := test.RegisterIntegration(t) + // sleep for a while to wait for server instance is added + time.Sleep(1 * time.Second) + account := h.NewRandAccount() ctx, cancel := context.WithCancel(h.NewAuthenticatedContext(account)) + // start work agent so that grpc broker can work + consumer := h.CreateConsumer("cluster-" + rand.String(5)) + h.StartWorkAgent(ctx, consumer.Name, false) + eventDao := dao.NewEventDao(&h.Env().Database.SessionFactory) statusEventDao := dao.NewStatusEventDao(&h.Env().Database.SessionFactory) + eventInstanceDao := dao.NewEventInstanceDao(&h.Env().Database.SessionFactory) processedEventTimes := 0 // this handler will return an error at the first time to simulate an error happened when handing an event, @@ -159,6 +191,15 @@ func TestControllerReconcile(t *testing.T) { return fmt.Errorf("failed to process the event") } + // add the event instance record + _, err := eventInstanceDao.Create(ctx, &api.EventInstance{ + SpecEventID: eventID, + InstanceID: h.Env().Config.MessageBroker.ClientID, + }) + if err != nil { + return err + } + return nil } @@ -178,7 +219,7 @@ func TestControllerReconcile(t *testing.T) { go func() { s := &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), h.Env().Services.Events()), + h.EventHandler, h.Env().Services.Events(), ), StatusController: controllers.NewStatusController( @@ -205,7 +246,6 @@ func TestControllerReconcile(t *testing.T) { // wait for the listener to start time.Sleep(100 * time.Millisecond) - consumer := h.CreateConsumer("cluster-" + rand.String(5)) deployName := fmt.Sprintf("nginx-%s", rand.String(5)) resource := h.CreateResource(consumer.Name, deployName, 1) @@ -263,11 +303,42 @@ func TestControllerReconcile(t *testing.T) { func TestControllerSync(t *testing.T) { h, _ := test.RegisterIntegration(t) + // sleep for a while to wait for server instance is added + time.Sleep(1 * time.Second) + account := h.NewRandAccount() ctx, cancel := context.WithCancel(h.NewAuthenticatedContext(account)) - eventDao := dao.NewEventDao(&h.Env().Database.SessionFactory) + // start work agent so that grpc broker can work + consumer := h.CreateConsumer("cluster-" + rand.String(5)) + h.StartWorkAgent(ctx, consumer.Name, false) + + // create two resources with resource dao + resource4ID := uuid.New().String() + resourceDao := dao.NewResourceDao(&h.Env().Database.SessionFactory) + if _, err := resourceDao.Create(ctx, &api.Resource{ + Meta: api.Meta{ + ID: resource4ID, + }, + ConsumerName: consumer.Name, + Name: "resource4", + }); err != nil { + t.Fatal(err) + } + resource5ID := uuid.New().String() + if _, err := resourceDao.Create(ctx, &api.Resource{ + Meta: api.Meta{ + ID: resource5ID, + }, + ConsumerName: consumer.Name, + Name: "resource5", + }); err != nil { + t.Fatal(err) + } + + eventDao := dao.NewEventDao(&h.Env().Database.SessionFactory) + eventInstanceDao := dao.NewEventInstanceDao(&h.Env().Database.SessionFactory) now := time.Now() if _, err := eventDao.Create(ctx, &api.Event{Source: "Resources", SourceID: "resource1", @@ -288,12 +359,12 @@ func TestControllerSync(t *testing.T) { t.Fatal(err) } if _, err := eventDao.Create(ctx, &api.Event{Source: "Resources", - SourceID: "resource4", + SourceID: resource4ID, EventType: api.UpdateEventType}); err != nil { t.Fatal(err) } if _, err := eventDao.Create(ctx, &api.Event{Source: "Resources", - SourceID: "resource5", + SourceID: resource5ID, EventType: api.UpdateEventType}); err != nil { t.Fatal(err) } @@ -302,7 +373,12 @@ func TestControllerSync(t *testing.T) { onUpsert := func(ctx context.Context, eventID, resourceID string) error { // we just record the processed event proccessedEvents = append(proccessedEvents, resourceID) - return nil + // add the event instance record + _, err := eventInstanceDao.Create(ctx, &api.EventInstance{ + SpecEventID: eventID, + InstanceID: h.Env().Config.MessageBroker.ClientID, + }) + return err } // start the controller, once the controller started, it will sync the events: @@ -311,7 +387,7 @@ func TestControllerSync(t *testing.T) { go func() { s := &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), h.Env().Services.Events()), + h.EventHandler, h.Env().Services.Events(), ), StatusController: controllers.NewStatusController( @@ -433,7 +509,7 @@ func TestStatusControllerSync(t *testing.T) { go func() { s := &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), + h.EventHandler, h.Env().Services.Events(), ), StatusController: controllers.NewStatusController( @@ -467,7 +543,7 @@ func TestStatusControllerSync(t *testing.T) { return fmt.Errorf("should purge the events %s, but got %+v", purged, events) } - eventInstances, err := eventInstanceDao.FindStatusEvents(ctx, purged) + eventInstances, err := eventInstanceDao.FindEventInstancesByEventIDs(ctx, purged) if err != nil { return err }