Skip to content

Commit

Permalink
add grpc broker integration test.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <[email protected]>
  • Loading branch information
morvencao committed Dec 18, 2024
1 parent d7e9cfe commit 1ce55ab
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 36 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,5 @@ test/e2e/.consumer_name
test/e2e/.external_host_ip
test/e2e/report/*
unit-test-results.json
integration-test-results.json
*integration-test-results.json
test/e2e/setup/aro/aro-hcp
17 changes: 13 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ MQTT_IMAGE ?= docker.io/library/eclipse-mosquitto:2.0.18

# Test output files
unit_test_json_output ?= ${PWD}/unit-test-results.json
integration_test_json_output ?= ${PWD}/integration-test-results.json
mqtt_integration_test_json_output ?= ${PWD}/mqtt-integration-test-results.json
grpc_integration_test_json_output ?= ${PWD}/grpc-integration-test-results.json

# Prints a list of useful targets.
help:
Expand Down Expand Up @@ -218,11 +219,19 @@ test:
# make test-integration TESTFLAGS="-run TestAccounts" acts as TestAccounts* and run TestAccountsGet, TestAccountsPost, etc.
# make test-integration TESTFLAGS="-run TestAccountsGet" runs TestAccountsGet
# make test-integration TESTFLAGS="-short" skips long-run tests
test-integration:
OCM_ENV=testing gotestsum --jsonfile-timing-events=$(integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \
./test/integration
test-integration: test-integration-mqtt test-integration-grpc
.PHONY: test-integration

test-integration-mqtt:
BROKER=mqtt OCM_ENV=testing gotestsum --jsonfile-timing-events=$(mqtt_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \
./test/integration
.PHONY: test-integration-mqtt

test-integration-grpc:
BROKER=grpc OCM_ENV=testing gotestsum --jsonfile-timing-events=$(grpc_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h -run TestController \
./test/integration
.PHONY: test-integration-grpc

# Regenerate openapi client and models
generate:
rm -rf pkg/api/openapi
Expand Down
22 changes: 19 additions & 3 deletions cmd/maestro/server/grpc_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,9 +488,25 @@ func (bkr *GRPCBroker) PredicateEvent(ctx context.Context, eventID string) (bool
if err != nil {
return false, fmt.Errorf("failed to get event %s: %s", eventID, err.Error())
}
resource, err := bkr.resourceService.Get(ctx, evt.SourceID)
if err != nil {
return false, fmt.Errorf("failed to get resource %s: %s", evt.SourceID, err.Error())

// fast return if the event is already reconciled
if evt.ReconciledDate != nil {
return false, nil
}

resource, svcErr := bkr.resourceService.Get(ctx, evt.SourceID)
if svcErr != nil {
// if the resource is not found, it indicates the resource has been handled by
// other instances, so we can mark the event as reconciled and ignore it.
if svcErr.Is404() {
now := time.Now()
evt.ReconciledDate = &now
if _, svcErr := bkr.eventService.Replace(ctx, evt); svcErr != nil {
return false, fmt.Errorf("failed to update event %s: %s", evt.ID, svcErr.Error())
}
return false, nil
}
return false, fmt.Errorf("failed to get resource %s: %s", evt.SourceID, svcErr.Error())
}

if bkr.IsConsumerSubscribed(resource.ConsumerName) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (h *PredicatedEventHandler) PostProcess(ctx context.Context, event *api.Eve
// check if all instances have processed the event
if !compareStrings(activeInstances, eventInstances) {
klog.V(10).Infof("Event %s is not processed by all instances, handled by %v, active instances %v", event.ID, eventInstances, activeInstances)
return fmt.Errorf("event %s is not processed by all instances", event.ID)
return nil
}

// update the event with the reconciled date
Expand Down
61 changes: 43 additions & 18 deletions test/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
workv1 "open-cluster-management.io/api/work/v1"

"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
grpcoptions "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
Expand Down Expand Up @@ -71,8 +72,8 @@ type Helper struct {
Ctx context.Context
ContextCancelFunc context.CancelFunc

Broker string
EventBroadcaster *event.EventBroadcaster
StatusDispatcher dispatcher.Dispatcher
Store *MemoryStore
GRPCSourceClient *generic.CloudEventSourceClient[*api.Resource]
DBFactory db.SessionFactory
Expand All @@ -81,6 +82,7 @@ type Helper struct {
MetricsServer server.Server
HealthCheckServer *server.HealthCheckServer
EventServer server.EventServer
EventHandler controllers.EventHandler
ControllerManager *server.ControllersServer
WorkAgentHolder *work.ClientHolder
WorkAgentInformer workv1informers.ManifestWorkInformer
Expand Down Expand Up @@ -111,6 +113,12 @@ func NewHelper(t *testing.T) *Helper {
}
pflag.Parse()

// Set the message broker type if it is set in the environment
broker := os.Getenv("BROKER")
if broker != "" {
env.Config.MessageBroker.MessageBrokerType = broker
}

err = env.Initialize()
if err != nil {
klog.Fatalf("Unable to initialize testing environment: %s", err.Error())
Expand All @@ -120,13 +128,26 @@ func NewHelper(t *testing.T) *Helper {
helper = &Helper{
Ctx: ctx,
ContextCancelFunc: cancel,
Broker: env.Config.MessageBroker.MessageBrokerType,
EventBroadcaster: event.NewEventBroadcaster(),
StatusDispatcher: dispatcher.NewHashDispatcher(helper.Env().Config.MessageBroker.ClientID, dao.NewInstanceDao(&helper.Env().Database.SessionFactory),
dao.NewConsumerDao(&helper.Env().Database.SessionFactory), helper.Env().Clients.CloudEventsSource, helper.Env().Config.EventServer.ConsistentHashConfig),
AppConfig: env.Config,
DBFactory: env.Database.SessionFactory,
JWTPrivateKey: jwtKey,
JWTCA: jwtCA,
AppConfig: env.Config,
DBFactory: env.Database.SessionFactory,
JWTPrivateKey: jwtKey,
JWTCA: jwtCA,
}

// Set the healthcheck interval to 1 second for testing
helper.Env().Config.HealthCheck.HeartbeartInterval = 1
helper.HealthCheckServer = server.NewHealthCheckServer()

if helper.Broker == "mqtt" {
statusDispatcher := dispatcher.NewHashDispatcher(helper.Env().Config.MessageBroker.ClientID, dao.NewInstanceDao(&helper.Env().Database.SessionFactory), dao.NewConsumerDao(&helper.Env().Database.SessionFactory), helper.Env().Clients.CloudEventsSource, helper.Env().Config.EventServer.ConsistentHashConfig)
helper.HealthCheckServer.SetStatusDispatcher(statusDispatcher)
helper.EventServer = server.NewMessageQueueEventServer(helper.EventBroadcaster, statusDispatcher)
helper.EventHandler = controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(helper.Env().Database.SessionFactory), helper.Env().Services.Events())
} else {
helper.EventServer = server.NewGRPCBroker(helper.EventBroadcaster)
helper.EventHandler = controllers.NewPredicatedEventHandler(helper.EventServer.PredicateEvent, helper.Env().Services.Events(), dao.NewEventInstanceDao(&helper.Env().Database.SessionFactory), dao.NewInstanceDao(&helper.Env().Database.SessionFactory))
}

// TODO jwk mock server needs to be refactored out of the helper and into the testing environment
Expand Down Expand Up @@ -197,9 +218,6 @@ func (helper *Helper) stopMetricsServer() error {
}

func (helper *Helper) startHealthCheckServer(ctx context.Context) {
helper.Env().Config.HealthCheck.HeartbeartInterval = 1
helper.HealthCheckServer = server.NewHealthCheckServer()
helper.HealthCheckServer.SetStatusDispatcher(helper.StatusDispatcher)
go func() {
klog.V(10).Info("Test health check server started")
helper.HealthCheckServer.Start(ctx)
Expand All @@ -213,8 +231,6 @@ func (helper *Helper) sendShutdownSignal() error {
}

func (helper *Helper) startEventServer(ctx context.Context) {
// helper.Env().Config.EventServer.SubscriptionType = "broadcast"
helper.EventServer = server.NewMessageQueueEventServer(helper.EventBroadcaster, helper.StatusDispatcher)
go func() {
klog.V(10).Info("Test event server started")
helper.EventServer.Start(ctx)
Expand All @@ -233,7 +249,7 @@ func (helper *Helper) startEventBroadcaster() {
func (helper *Helper) StartControllerManager(ctx context.Context) {
helper.ControllerManager = &server.ControllersServer{
KindControllerManager: controllers.NewKindControllerManager(
controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(helper.Env().Database.SessionFactory), helper.Env().Services.Events()),
helper.EventHandler,
helper.Env().Services.Events(),
),
StatusController: controllers.NewStatusController(
Expand All @@ -259,10 +275,19 @@ func (helper *Helper) StartControllerManager(ctx context.Context) {
}

func (helper *Helper) StartWorkAgent(ctx context.Context, clusterName string, bundle bool) {
// initilize the mqtt options
mqttOptions, err := mqtt.BuildMQTTOptionsFromFlags(helper.Env().Config.MessageBroker.MessageBrokerConfig)
if err != nil {
klog.Fatalf("Unable to build MQTT options: %s", err.Error())
var brokerConfig any
if helper.Broker == "mqtt" {
// initilize the mqtt options
mqttOptions, err := mqtt.BuildMQTTOptionsFromFlags(helper.Env().Config.MessageBroker.MessageBrokerConfig)
if err != nil {
klog.Fatalf("Unable to build MQTT options: %s", err.Error())
}
brokerConfig = mqttOptions
} else {
// initilize the grpc options
grpcOptions := grpc.NewGRPCOptions()
grpcOptions.URL = fmt.Sprintf("%s:%s", helper.Env().Config.HTTPServer.Hostname, helper.Env().Config.GRPCServer.BrokerBindPort)
brokerConfig = grpcOptions
}

var workCodec generic.Codec[*workv1.ManifestWork]
Expand All @@ -274,7 +299,7 @@ func (helper *Helper) StartWorkAgent(ctx context.Context, clusterName string, bu

watcherStore := store.NewAgentInformerWatcherStore()

clientHolder, err := work.NewClientHolderBuilder(mqttOptions).
clientHolder, err := work.NewClientHolderBuilder(brokerConfig).
WithClientID(clusterName).
WithClusterName(clusterName).
WithCodecs(workCodec).
Expand Down
Loading

0 comments on commit 1ce55ab

Please sign in to comment.