diff --git a/.gitignore b/.gitignore index af4e2dc4..c9bc3a71 100755 --- a/.gitignore +++ b/.gitignore @@ -59,5 +59,5 @@ test/e2e/.consumer_name test/e2e/.external_host_ip test/e2e/report/* unit-test-results.json -integration-test-results.json +*integration-test-results.json test/e2e/setup/aro/aro-hcp diff --git a/Makefile b/Makefile index 90ad5c2e..08343446 100755 --- a/Makefile +++ b/Makefile @@ -90,7 +90,8 @@ MQTT_IMAGE ?= docker.io/library/eclipse-mosquitto:2.0.18 # Test output files unit_test_json_output ?= ${PWD}/unit-test-results.json -integration_test_json_output ?= ${PWD}/integration-test-results.json +mqtt_integration_test_json_output ?= ${PWD}/mqtt-integration-test-results.json +grpc_integration_test_json_output ?= ${PWD}/grpc-integration-test-results.json # Prints a list of useful targets. help: @@ -218,11 +219,19 @@ test: # make test-integration TESTFLAGS="-run TestAccounts" acts as TestAccounts* and run TestAccountsGet, TestAccountsPost, etc. # make test-integration TESTFLAGS="-run TestAccountsGet" runs TestAccountsGet # make test-integration TESTFLAGS="-short" skips long-run tests -test-integration: - OCM_ENV=testing gotestsum --jsonfile-timing-events=$(integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \ - ./test/integration +test-integration: test-integration-mqtt test-integration-grpc .PHONY: test-integration +test-integration-mqtt: + BROKER=mqtt OCM_ENV=testing gotestsum --jsonfile-timing-events=$(mqtt_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \ + ./test/integration +.PHONY: test-integration-mqtt + +test-integration-grpc: + BROKER=grpc OCM_ENV=testing gotestsum --jsonfile-timing-events=$(grpc_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h -run TestController \ + ./test/integration +.PHONY: test-integration-grpc + # Regenerate openapi client and models generate: rm -rf pkg/api/openapi diff --git a/cmd/maestro/server/grpc_broker.go b/cmd/maestro/server/grpc_broker.go index 6daa7cec..4bdc72ff 100644 --- a/cmd/maestro/server/grpc_broker.go +++ b/cmd/maestro/server/grpc_broker.go @@ -488,9 +488,19 @@ func (bkr *GRPCBroker) PredicateEvent(ctx context.Context, eventID string) (bool if err != nil { return false, fmt.Errorf("failed to get event %s: %s", eventID, err.Error()) } - resource, err := bkr.resourceService.Get(ctx, evt.SourceID) - if err != nil { - return false, fmt.Errorf("failed to get resource %s: %s", evt.SourceID, err.Error()) + + // fast return if the event is already reconciled + if evt.ReconciledDate != nil { + return false, nil + } + + resource, svcErr := bkr.resourceService.Get(ctx, evt.SourceID) + if svcErr != nil { + // if the resource is not found, return false to indicate the event should be ignored + if svcErr.Is404() { + return false, nil + } + return false, fmt.Errorf("failed to get resource %s: %s", evt.SourceID, svcErr.Error()) } if bkr.IsConsumerSubscribed(resource.ConsumerName) { diff --git a/pkg/controllers/event_handler.go b/pkg/controllers/event_handler.go index c41a2d8f..51ab6b5d 100644 --- a/pkg/controllers/event_handler.go +++ b/pkg/controllers/event_handler.go @@ -134,7 +134,7 @@ func (h *PredicatedEventHandler) PostProcess(ctx context.Context, event *api.Eve // check if all instances have processed the event if !compareStrings(activeInstances, eventInstances) { klog.V(10).Infof("Event %s is not processed by all instances, handled by %v, active instances %v", event.ID, eventInstances, activeInstances) - return fmt.Errorf("event %s is not processed by all instances", event.ID) + return nil } // update the event with the reconciled date diff --git a/test/helper.go b/test/helper.go index 3c1f1719..0fd93af7 100755 --- a/test/helper.go +++ b/test/helper.go @@ -24,6 +24,7 @@ import ( workv1 "open-cluster-management.io/api/work/v1" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc" grpcoptions "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" @@ -71,8 +72,8 @@ type Helper struct { Ctx context.Context ContextCancelFunc context.CancelFunc + Broker string EventBroadcaster *event.EventBroadcaster - StatusDispatcher dispatcher.Dispatcher Store *MemoryStore GRPCSourceClient *generic.CloudEventSourceClient[*api.Resource] DBFactory db.SessionFactory @@ -81,6 +82,7 @@ type Helper struct { MetricsServer server.Server HealthCheckServer *server.HealthCheckServer EventServer server.EventServer + EventHandler controllers.EventHandler ControllerManager *server.ControllersServer WorkAgentHolder *work.ClientHolder WorkAgentInformer workv1informers.ManifestWorkInformer @@ -111,6 +113,12 @@ func NewHelper(t *testing.T) *Helper { } pflag.Parse() + // Set the message broker type if it is set in the environment + broker := os.Getenv("BROKER") + if broker != "" { + env.Config.MessageBroker.MessageBrokerType = broker + } + err = env.Initialize() if err != nil { klog.Fatalf("Unable to initialize testing environment: %s", err.Error()) @@ -120,13 +128,26 @@ func NewHelper(t *testing.T) *Helper { helper = &Helper{ Ctx: ctx, ContextCancelFunc: cancel, + Broker: env.Config.MessageBroker.MessageBrokerType, EventBroadcaster: event.NewEventBroadcaster(), - StatusDispatcher: dispatcher.NewHashDispatcher(helper.Env().Config.MessageBroker.ClientID, dao.NewInstanceDao(&helper.Env().Database.SessionFactory), - dao.NewConsumerDao(&helper.Env().Database.SessionFactory), helper.Env().Clients.CloudEventsSource, helper.Env().Config.EventServer.ConsistentHashConfig), - AppConfig: env.Config, - DBFactory: env.Database.SessionFactory, - JWTPrivateKey: jwtKey, - JWTCA: jwtCA, + AppConfig: env.Config, + DBFactory: env.Database.SessionFactory, + JWTPrivateKey: jwtKey, + JWTCA: jwtCA, + } + + // Set the healthcheck interval to 1 second for testing + helper.Env().Config.HealthCheck.HeartbeartInterval = 1 + helper.HealthCheckServer = server.NewHealthCheckServer() + + if helper.Broker == "mqtt" { + statusDispatcher := dispatcher.NewHashDispatcher(helper.Env().Config.MessageBroker.ClientID, dao.NewInstanceDao(&helper.Env().Database.SessionFactory), dao.NewConsumerDao(&helper.Env().Database.SessionFactory), helper.Env().Clients.CloudEventsSource, helper.Env().Config.EventServer.ConsistentHashConfig) + helper.HealthCheckServer.SetStatusDispatcher(statusDispatcher) + helper.EventServer = server.NewMessageQueueEventServer(helper.EventBroadcaster, statusDispatcher) + helper.EventHandler = controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(helper.Env().Database.SessionFactory), helper.Env().Services.Events()) + } else { + helper.EventServer = server.NewGRPCBroker(helper.EventBroadcaster) + helper.EventHandler = controllers.NewPredicatedEventHandler(helper.EventServer.PredicateEvent, helper.Env().Services.Events(), dao.NewEventInstanceDao(&helper.Env().Database.SessionFactory), dao.NewInstanceDao(&helper.Env().Database.SessionFactory)) } // TODO jwk mock server needs to be refactored out of the helper and into the testing environment @@ -197,9 +218,6 @@ func (helper *Helper) stopMetricsServer() error { } func (helper *Helper) startHealthCheckServer(ctx context.Context) { - helper.Env().Config.HealthCheck.HeartbeartInterval = 1 - helper.HealthCheckServer = server.NewHealthCheckServer() - helper.HealthCheckServer.SetStatusDispatcher(helper.StatusDispatcher) go func() { klog.V(10).Info("Test health check server started") helper.HealthCheckServer.Start(ctx) @@ -213,8 +231,6 @@ func (helper *Helper) sendShutdownSignal() error { } func (helper *Helper) startEventServer(ctx context.Context) { - // helper.Env().Config.EventServer.SubscriptionType = "broadcast" - helper.EventServer = server.NewMessageQueueEventServer(helper.EventBroadcaster, helper.StatusDispatcher) go func() { klog.V(10).Info("Test event server started") helper.EventServer.Start(ctx) @@ -233,7 +249,7 @@ func (helper *Helper) startEventBroadcaster() { func (helper *Helper) StartControllerManager(ctx context.Context) { helper.ControllerManager = &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(helper.Env().Database.SessionFactory), helper.Env().Services.Events()), + helper.EventHandler, helper.Env().Services.Events(), ), StatusController: controllers.NewStatusController( @@ -259,10 +275,19 @@ func (helper *Helper) StartControllerManager(ctx context.Context) { } func (helper *Helper) StartWorkAgent(ctx context.Context, clusterName string, bundle bool) { - // initilize the mqtt options - mqttOptions, err := mqtt.BuildMQTTOptionsFromFlags(helper.Env().Config.MessageBroker.MessageBrokerConfig) - if err != nil { - klog.Fatalf("Unable to build MQTT options: %s", err.Error()) + var brokerConfig any + if helper.Broker == "mqtt" { + // initilize the mqtt options + mqttOptions, err := mqtt.BuildMQTTOptionsFromFlags(helper.Env().Config.MessageBroker.MessageBrokerConfig) + if err != nil { + klog.Fatalf("Unable to build MQTT options: %s", err.Error()) + } + brokerConfig = mqttOptions + } else { + // initilize the grpc options + grpcOptions := grpc.NewGRPCOptions() + grpcOptions.URL = fmt.Sprintf("%s:%s", helper.Env().Config.HTTPServer.Hostname, helper.Env().Config.GRPCServer.BrokerBindPort) + brokerConfig = grpcOptions } var workCodec generic.Codec[*workv1.ManifestWork] @@ -274,7 +299,7 @@ func (helper *Helper) StartWorkAgent(ctx context.Context, clusterName string, bu watcherStore := store.NewAgentInformerWatcherStore() - clientHolder, err := work.NewClientHolderBuilder(mqttOptions). + clientHolder, err := work.NewClientHolderBuilder(brokerConfig). WithClientID(clusterName). WithClusterName(clusterName). WithCodecs(workCodec). diff --git a/test/integration/controller_test.go b/test/integration/controller_test.go index 541242ab..dbee2aad 100755 --- a/test/integration/controller_test.go +++ b/test/integration/controller_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/google/uuid" . "github.com/onsi/gomega" "github.com/openshift-online/maestro/cmd/maestro/server" "github.com/openshift-online/maestro/pkg/api" @@ -19,14 +20,22 @@ import ( func TestControllerRacing(t *testing.T) { h, _ := test.RegisterIntegration(t) + // sleep for a while to wait for server instance is added + time.Sleep(1 * time.Second) + account := h.NewRandAccount() ctx, cancel := context.WithCancel(h.NewAuthenticatedContext(account)) defer func() { cancel() }() + // start work agent so that grpc broker can work + consumer := h.CreateConsumer("cluster-" + rand.String(5)) + h.StartWorkAgent(ctx, consumer.Name, false) + eventDao := dao.NewEventDao(&h.Env().Database.SessionFactory) statusEventDao := dao.NewStatusEventDao(&h.Env().Database.SessionFactory) + eventInstanceDao := dao.NewEventInstanceDao(&h.Env().Database.SessionFactory) // The handler filters the events by source id/type/reconciled, and only record // the event with create type. Due to the event lock, each create event @@ -49,6 +58,14 @@ func TestControllerRacing(t *testing.T) { continue } proccessedEvent = append(proccessedEvent, resourceID) + // add the event instance record + _, err := eventInstanceDao.Create(ctx, &api.EventInstance{ + EventID: eventID, + InstanceID: h.Env().Config.MessageBroker.ClientID, + }) + if err != nil { + return err + } } return nil @@ -76,11 +93,19 @@ func TestControllerRacing(t *testing.T) { // Start 3 controllers concurrently threads := 3 + if h.Broker == "grpc" { + threads = 1 + } for i := 0; i < threads; i++ { + // each controller has its own event handler, otherwise, the event lock will block the event processing. + eventHandler := controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), h.Env().Services.Events()) + if h.Broker == "grpc" { + eventHandler = controllers.NewPredicatedEventHandler(h.EventServer.PredicateEvent, h.Env().Services.Events(), dao.NewEventInstanceDao(&h.Env().Database.SessionFactory), dao.NewInstanceDao(&h.Env().Database.SessionFactory)) + } go func() { s := &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), h.Env().Services.Events()), + eventHandler, h.Env().Services.Events(), ), StatusController: controllers.NewStatusController( @@ -105,7 +130,6 @@ func TestControllerRacing(t *testing.T) { // wait for controller service starts time.Sleep(3 * time.Second) - consumer := h.CreateConsumer("cluster-" + rand.String(5)) resources := h.CreateResourceList(consumer.Name, 50) // This is to check only 50 create events are processed. It waits for 5 seconds to ensure all events have been @@ -142,11 +166,19 @@ func TestControllerRacing(t *testing.T) { func TestControllerReconcile(t *testing.T) { h, _ := test.RegisterIntegration(t) + // sleep for a while to wait for server instance is added + time.Sleep(1 * time.Second) + account := h.NewRandAccount() ctx, cancel := context.WithCancel(h.NewAuthenticatedContext(account)) + // start work agent so that grpc broker can work + consumer := h.CreateConsumer("cluster-" + rand.String(5)) + h.StartWorkAgent(ctx, consumer.Name, false) + eventDao := dao.NewEventDao(&h.Env().Database.SessionFactory) statusEventDao := dao.NewStatusEventDao(&h.Env().Database.SessionFactory) + eventInstanceDao := dao.NewEventInstanceDao(&h.Env().Database.SessionFactory) processedEventTimes := 0 // this handler will return an error at the first time to simulate an error happened when handing an event, @@ -157,6 +189,15 @@ func TestControllerReconcile(t *testing.T) { return fmt.Errorf("failed to process the event") } + // add the event instance record + _, err := eventInstanceDao.Create(ctx, &api.EventInstance{ + EventID: eventID, + InstanceID: h.Env().Config.MessageBroker.ClientID, + }) + if err != nil { + return err + } + return nil } @@ -176,7 +217,7 @@ func TestControllerReconcile(t *testing.T) { go func() { s := &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), h.Env().Services.Events()), + h.EventHandler, h.Env().Services.Events(), ), StatusController: controllers.NewStatusController( @@ -201,7 +242,6 @@ func TestControllerReconcile(t *testing.T) { // wait for the listener to start time.Sleep(100 * time.Millisecond) - consumer := h.CreateConsumer("cluster-" + rand.String(5)) deployName := fmt.Sprintf("nginx-%s", rand.String(5)) resource := h.CreateResource(consumer.Name, deployName, 1) @@ -259,11 +299,42 @@ func TestControllerReconcile(t *testing.T) { func TestControllerSync(t *testing.T) { h, _ := test.RegisterIntegration(t) + // sleep for a while to wait for server instance is added + time.Sleep(1 * time.Second) + account := h.NewRandAccount() ctx, cancel := context.WithCancel(h.NewAuthenticatedContext(account)) - eventDao := dao.NewEventDao(&h.Env().Database.SessionFactory) + // start work agent so that grpc broker can work + consumer := h.CreateConsumer("cluster-" + rand.String(5)) + h.StartWorkAgent(ctx, consumer.Name, false) + + // create two resources with resource dao + resource4ID := uuid.New().String() + resourceDao := dao.NewResourceDao(&h.Env().Database.SessionFactory) + if _, err := resourceDao.Create(ctx, &api.Resource{ + Meta: api.Meta{ + ID: resource4ID, + }, + ConsumerName: consumer.Name, + Name: "resource4", + }); err != nil { + t.Fatal(err) + } + + resource5ID := uuid.New().String() + if _, err := resourceDao.Create(ctx, &api.Resource{ + Meta: api.Meta{ + ID: resource5ID, + }, + ConsumerName: consumer.Name, + Name: "resource5", + }); err != nil { + t.Fatal(err) + } + eventDao := dao.NewEventDao(&h.Env().Database.SessionFactory) + eventInstanceDao := dao.NewEventInstanceDao(&h.Env().Database.SessionFactory) now := time.Now() if _, err := eventDao.Create(ctx, &api.Event{Source: "Resources", SourceID: "resource1", @@ -284,12 +355,12 @@ func TestControllerSync(t *testing.T) { t.Fatal(err) } if _, err := eventDao.Create(ctx, &api.Event{Source: "Resources", - SourceID: "resource4", + SourceID: resource4ID, EventType: api.UpdateEventType}); err != nil { t.Fatal(err) } if _, err := eventDao.Create(ctx, &api.Event{Source: "Resources", - SourceID: "resource5", + SourceID: resource5ID, EventType: api.UpdateEventType}); err != nil { t.Fatal(err) } @@ -298,7 +369,12 @@ func TestControllerSync(t *testing.T) { onUpsert := func(ctx context.Context, eventID, resourceID string) error { // we just record the processed event proccessedEvents = append(proccessedEvents, resourceID) - return nil + // add the event instance record + _, err := eventInstanceDao.Create(ctx, &api.EventInstance{ + EventID: eventID, + InstanceID: h.Env().Config.MessageBroker.ClientID, + }) + return err } // start the controller, once the controller started, it will sync the events: @@ -307,7 +383,7 @@ func TestControllerSync(t *testing.T) { go func() { s := &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), h.Env().Services.Events()), + h.EventHandler, h.Env().Services.Events(), ), StatusController: controllers.NewStatusController(