Skip to content

Commit

Permalink
add grpc broker integration test.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <[email protected]>
  • Loading branch information
morvencao committed Dec 18, 2024
1 parent d7e9cfe commit 872aa6b
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 32 deletions.
17 changes: 13 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ MQTT_IMAGE ?= docker.io/library/eclipse-mosquitto:2.0.18

# Test output files
unit_test_json_output ?= ${PWD}/unit-test-results.json
integration_test_json_output ?= ${PWD}/integration-test-results.json
mqtt_integration_test_json_output ?= ${PWD}/mqtt-integration-test-results.json
grpc_integration_test_json_output ?= ${PWD}/grpc-integration-test-results.json

# Prints a list of useful targets.
help:
Expand Down Expand Up @@ -218,11 +219,19 @@ test:
# make test-integration TESTFLAGS="-run TestAccounts" acts as TestAccounts* and run TestAccountsGet, TestAccountsPost, etc.
# make test-integration TESTFLAGS="-run TestAccountsGet" runs TestAccountsGet
# make test-integration TESTFLAGS="-short" skips long-run tests
test-integration:
OCM_ENV=testing gotestsum --jsonfile-timing-events=$(integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \
./test/integration
test-integration: test-integration-mqtt test-integration-grpc
.PHONY: test-integration

test-integration-mqtt:
BROKER=mqtt OCM_ENV=testing gotestsum --jsonfile-timing-events=$(mqtt_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \
./test/integration
.PHONY: test-integration-mqtt

test-integration-grpc:
BROKER=grpc OCM_ENV=testing gotestsum --jsonfile-timing-events=$(grpc_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h -run TestController \
./test/integration
.PHONY: test-integration-grpc

# Regenerate openapi client and models
generate:
rm -rf pkg/api/openapi
Expand Down
6 changes: 6 additions & 0 deletions cmd/maestro/server/grpc_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,12 @@ func (bkr *GRPCBroker) PredicateEvent(ctx context.Context, eventID string) (bool
if err != nil {
return false, fmt.Errorf("failed to get event %s: %s", eventID, err.Error())
}

// fast return if the event is already reconciled
if evt.ReconciledDate != nil {
return false, nil
}

resource, err := bkr.resourceService.Get(ctx, evt.SourceID)
if err != nil {
return false, fmt.Errorf("failed to get resource %s: %s", evt.SourceID, err.Error())
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (h *PredicatedEventHandler) PostProcess(ctx context.Context, event *api.Eve
// check if all instances have processed the event
if !compareStrings(activeInstances, eventInstances) {
klog.V(10).Infof("Event %s is not processed by all instances, handled by %v, active instances %v", event.ID, eventInstances, activeInstances)
return fmt.Errorf("event %s is not processed by all instances", event.ID)
return nil
}

// update the event with the reconciled date
Expand Down
61 changes: 43 additions & 18 deletions test/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
workv1 "open-cluster-management.io/api/work/v1"

"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
grpcoptions "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
Expand Down Expand Up @@ -71,8 +72,8 @@ type Helper struct {
Ctx context.Context
ContextCancelFunc context.CancelFunc

Broker string
EventBroadcaster *event.EventBroadcaster
StatusDispatcher dispatcher.Dispatcher
Store *MemoryStore
GRPCSourceClient *generic.CloudEventSourceClient[*api.Resource]
DBFactory db.SessionFactory
Expand All @@ -81,6 +82,7 @@ type Helper struct {
MetricsServer server.Server
HealthCheckServer *server.HealthCheckServer
EventServer server.EventServer
EventHandler controllers.EventHandler
ControllerManager *server.ControllersServer
WorkAgentHolder *work.ClientHolder
WorkAgentInformer workv1informers.ManifestWorkInformer
Expand Down Expand Up @@ -111,6 +113,12 @@ func NewHelper(t *testing.T) *Helper {
}
pflag.Parse()

// Set the message broker type if it is set in the environment
broker := os.Getenv("BROKER")
if broker != "" {
env.Config.MessageBroker.MessageBrokerType = broker
}

err = env.Initialize()
if err != nil {
klog.Fatalf("Unable to initialize testing environment: %s", err.Error())
Expand All @@ -120,13 +128,26 @@ func NewHelper(t *testing.T) *Helper {
helper = &Helper{
Ctx: ctx,
ContextCancelFunc: cancel,
Broker: env.Config.MessageBroker.MessageBrokerType,
EventBroadcaster: event.NewEventBroadcaster(),
StatusDispatcher: dispatcher.NewHashDispatcher(helper.Env().Config.MessageBroker.ClientID, dao.NewInstanceDao(&helper.Env().Database.SessionFactory),
dao.NewConsumerDao(&helper.Env().Database.SessionFactory), helper.Env().Clients.CloudEventsSource, helper.Env().Config.EventServer.ConsistentHashConfig),
AppConfig: env.Config,
DBFactory: env.Database.SessionFactory,
JWTPrivateKey: jwtKey,
JWTCA: jwtCA,
AppConfig: env.Config,
DBFactory: env.Database.SessionFactory,
JWTPrivateKey: jwtKey,
JWTCA: jwtCA,
}

// Set the healthcheck interval to 1 second for testing
helper.Env().Config.HealthCheck.HeartbeartInterval = 1
helper.HealthCheckServer = server.NewHealthCheckServer()

if helper.Broker == "mqtt" {
statusDispatcher := dispatcher.NewHashDispatcher(helper.Env().Config.MessageBroker.ClientID, dao.NewInstanceDao(&helper.Env().Database.SessionFactory), dao.NewConsumerDao(&helper.Env().Database.SessionFactory), helper.Env().Clients.CloudEventsSource, helper.Env().Config.EventServer.ConsistentHashConfig)
helper.HealthCheckServer.SetStatusDispatcher(statusDispatcher)
helper.EventServer = server.NewMessageQueueEventServer(helper.EventBroadcaster, statusDispatcher)
helper.EventHandler = controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(helper.Env().Database.SessionFactory), helper.Env().Services.Events())
} else {
helper.EventServer = server.NewGRPCBroker(helper.EventBroadcaster)
helper.EventHandler = controllers.NewPredicatedEventHandler(helper.EventServer.PredicateEvent, helper.Env().Services.Events(), dao.NewEventInstanceDao(&helper.Env().Database.SessionFactory), dao.NewInstanceDao(&helper.Env().Database.SessionFactory))
}

// TODO jwk mock server needs to be refactored out of the helper and into the testing environment
Expand Down Expand Up @@ -197,9 +218,6 @@ func (helper *Helper) stopMetricsServer() error {
}

func (helper *Helper) startHealthCheckServer(ctx context.Context) {
helper.Env().Config.HealthCheck.HeartbeartInterval = 1
helper.HealthCheckServer = server.NewHealthCheckServer()
helper.HealthCheckServer.SetStatusDispatcher(helper.StatusDispatcher)
go func() {
klog.V(10).Info("Test health check server started")
helper.HealthCheckServer.Start(ctx)
Expand All @@ -213,8 +231,6 @@ func (helper *Helper) sendShutdownSignal() error {
}

func (helper *Helper) startEventServer(ctx context.Context) {
// helper.Env().Config.EventServer.SubscriptionType = "broadcast"
helper.EventServer = server.NewMessageQueueEventServer(helper.EventBroadcaster, helper.StatusDispatcher)
go func() {
klog.V(10).Info("Test event server started")
helper.EventServer.Start(ctx)
Expand All @@ -233,7 +249,7 @@ func (helper *Helper) startEventBroadcaster() {
func (helper *Helper) StartControllerManager(ctx context.Context) {
helper.ControllerManager = &server.ControllersServer{
KindControllerManager: controllers.NewKindControllerManager(
controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(helper.Env().Database.SessionFactory), helper.Env().Services.Events()),
helper.EventHandler,
helper.Env().Services.Events(),
),
StatusController: controllers.NewStatusController(
Expand All @@ -259,10 +275,19 @@ func (helper *Helper) StartControllerManager(ctx context.Context) {
}

func (helper *Helper) StartWorkAgent(ctx context.Context, clusterName string, bundle bool) {
// initilize the mqtt options
mqttOptions, err := mqtt.BuildMQTTOptionsFromFlags(helper.Env().Config.MessageBroker.MessageBrokerConfig)
if err != nil {
klog.Fatalf("Unable to build MQTT options: %s", err.Error())
var brokerConfig any
if helper.Broker == "mqtt" {
// initilize the mqtt options
mqttOptions, err := mqtt.BuildMQTTOptionsFromFlags(helper.Env().Config.MessageBroker.MessageBrokerConfig)
if err != nil {
klog.Fatalf("Unable to build MQTT options: %s", err.Error())
}
brokerConfig = mqttOptions
} else {
// initilize the grpc options
grpcOptions := grpc.NewGRPCOptions()
grpcOptions.URL = fmt.Sprintf("%s:%s", helper.Env().Config.HTTPServer.Hostname, helper.Env().Config.GRPCServer.BrokerBindPort)
brokerConfig = grpcOptions
}

var workCodec generic.Codec[*workv1.ManifestWork]
Expand All @@ -274,7 +299,7 @@ func (helper *Helper) StartWorkAgent(ctx context.Context, clusterName string, bu

watcherStore := store.NewAgentInformerWatcherStore()

clientHolder, err := work.NewClientHolderBuilder(mqttOptions).
clientHolder, err := work.NewClientHolderBuilder(brokerConfig).
WithClientID(clusterName).
WithClusterName(clusterName).
WithCodecs(workCodec).
Expand Down
94 changes: 85 additions & 9 deletions test/integration/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/google/uuid"
. "github.com/onsi/gomega"
"github.com/openshift-online/maestro/cmd/maestro/server"
"github.com/openshift-online/maestro/pkg/api"
Expand All @@ -19,14 +20,22 @@ import (
func TestControllerRacing(t *testing.T) {
h, _ := test.RegisterIntegration(t)

// sleep for a while to wait for server instance is added
time.Sleep(1 * time.Second)

account := h.NewRandAccount()
ctx, cancel := context.WithCancel(h.NewAuthenticatedContext(account))
defer func() {
cancel()
}()

// start work agent so that grpc broker can work
consumer := h.CreateConsumer("cluster-" + rand.String(5))
h.StartWorkAgent(ctx, consumer.Name, false)

eventDao := dao.NewEventDao(&h.Env().Database.SessionFactory)
statusEventDao := dao.NewStatusEventDao(&h.Env().Database.SessionFactory)
eventInstanceDao := dao.NewEventInstanceDao(&h.Env().Database.SessionFactory)

// The handler filters the events by source id/type/reconciled, and only record
// the event with create type. Due to the event lock, each create event
Expand All @@ -49,6 +58,14 @@ func TestControllerRacing(t *testing.T) {
continue
}
proccessedEvent = append(proccessedEvent, resourceID)
// add the event instance record
_, err := eventInstanceDao.Create(ctx, &api.EventInstance{
EventID: eventID,
InstanceID: h.Env().Config.MessageBroker.ClientID,
})
if err != nil {
return err
}
}

return nil
Expand Down Expand Up @@ -76,11 +93,19 @@ func TestControllerRacing(t *testing.T) {

// Start 3 controllers concurrently
threads := 3
if h.Broker == "grpc" {
threads = 1
}
for i := 0; i < threads; i++ {
// each controller has its own event handler, otherwise, the event lock will block the event processing.
eventHandler := controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), h.Env().Services.Events())
if h.Broker == "grpc" {
eventHandler = controllers.NewPredicatedEventHandler(h.EventServer.PredicateEvent, h.Env().Services.Events(), dao.NewEventInstanceDao(&h.Env().Database.SessionFactory), dao.NewInstanceDao(&h.Env().Database.SessionFactory))
}
go func() {
s := &server.ControllersServer{
KindControllerManager: controllers.NewKindControllerManager(
controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), h.Env().Services.Events()),
eventHandler,
h.Env().Services.Events(),
),
StatusController: controllers.NewStatusController(
Expand All @@ -105,7 +130,6 @@ func TestControllerRacing(t *testing.T) {
// wait for controller service starts
time.Sleep(3 * time.Second)

consumer := h.CreateConsumer("cluster-" + rand.String(5))
resources := h.CreateResourceList(consumer.Name, 50)

// This is to check only 50 create events are processed. It waits for 5 seconds to ensure all events have been
Expand Down Expand Up @@ -142,11 +166,19 @@ func TestControllerRacing(t *testing.T) {
func TestControllerReconcile(t *testing.T) {
h, _ := test.RegisterIntegration(t)

// sleep for a while to wait for server instance is added
time.Sleep(1 * time.Second)

account := h.NewRandAccount()
ctx, cancel := context.WithCancel(h.NewAuthenticatedContext(account))

// start work agent so that grpc broker can work
consumer := h.CreateConsumer("cluster-" + rand.String(5))
h.StartWorkAgent(ctx, consumer.Name, false)

eventDao := dao.NewEventDao(&h.Env().Database.SessionFactory)
statusEventDao := dao.NewStatusEventDao(&h.Env().Database.SessionFactory)
eventInstanceDao := dao.NewEventInstanceDao(&h.Env().Database.SessionFactory)

processedEventTimes := 0
// this handler will return an error at the first time to simulate an error happened when handing an event,
Expand All @@ -157,6 +189,15 @@ func TestControllerReconcile(t *testing.T) {
return fmt.Errorf("failed to process the event")
}

// add the event instance record
_, err := eventInstanceDao.Create(ctx, &api.EventInstance{
EventID: eventID,
InstanceID: h.Env().Config.MessageBroker.ClientID,
})
if err != nil {
return err
}

return nil
}

Expand All @@ -176,7 +217,7 @@ func TestControllerReconcile(t *testing.T) {
go func() {
s := &server.ControllersServer{
KindControllerManager: controllers.NewKindControllerManager(
controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), h.Env().Services.Events()),
h.EventHandler,
h.Env().Services.Events(),
),
StatusController: controllers.NewStatusController(
Expand All @@ -201,7 +242,6 @@ func TestControllerReconcile(t *testing.T) {
// wait for the listener to start
time.Sleep(100 * time.Millisecond)

consumer := h.CreateConsumer("cluster-" + rand.String(5))
deployName := fmt.Sprintf("nginx-%s", rand.String(5))
resource := h.CreateResource(consumer.Name, deployName, 1)

Expand Down Expand Up @@ -259,11 +299,42 @@ func TestControllerReconcile(t *testing.T) {
func TestControllerSync(t *testing.T) {
h, _ := test.RegisterIntegration(t)

// sleep for a while to wait for server instance is added
time.Sleep(1 * time.Second)

account := h.NewRandAccount()
ctx, cancel := context.WithCancel(h.NewAuthenticatedContext(account))

eventDao := dao.NewEventDao(&h.Env().Database.SessionFactory)
// start work agent so that grpc broker can work
consumer := h.CreateConsumer("cluster-" + rand.String(5))
h.StartWorkAgent(ctx, consumer.Name, false)

// create two resources with resource dao
resource4ID := uuid.New().String()
resourceDao := dao.NewResourceDao(&h.Env().Database.SessionFactory)
if _, err := resourceDao.Create(ctx, &api.Resource{
Meta: api.Meta{
ID: resource4ID,
},
ConsumerName: consumer.Name,
Name: "resource4",
}); err != nil {
t.Fatal(err)
}

resource5ID := uuid.New().String()
if _, err := resourceDao.Create(ctx, &api.Resource{
Meta: api.Meta{
ID: resource5ID,
},
ConsumerName: consumer.Name,
Name: "resource5",
}); err != nil {
t.Fatal(err)
}

eventDao := dao.NewEventDao(&h.Env().Database.SessionFactory)
eventInstanceDao := dao.NewEventInstanceDao(&h.Env().Database.SessionFactory)
now := time.Now()
if _, err := eventDao.Create(ctx, &api.Event{Source: "Resources",
SourceID: "resource1",
Expand All @@ -284,12 +355,12 @@ func TestControllerSync(t *testing.T) {
t.Fatal(err)
}
if _, err := eventDao.Create(ctx, &api.Event{Source: "Resources",
SourceID: "resource4",
SourceID: resource4ID,
EventType: api.UpdateEventType}); err != nil {
t.Fatal(err)
}
if _, err := eventDao.Create(ctx, &api.Event{Source: "Resources",
SourceID: "resource5",
SourceID: resource5ID,
EventType: api.UpdateEventType}); err != nil {
t.Fatal(err)
}
Expand All @@ -298,7 +369,12 @@ func TestControllerSync(t *testing.T) {
onUpsert := func(ctx context.Context, eventID, resourceID string) error {
// we just record the processed event
proccessedEvents = append(proccessedEvents, resourceID)
return nil
// add the event instance record
_, err := eventInstanceDao.Create(ctx, &api.EventInstance{
EventID: eventID,
InstanceID: h.Env().Config.MessageBroker.ClientID,
})
return err
}

// start the controller, once the controller started, it will sync the events:
Expand All @@ -307,7 +383,7 @@ func TestControllerSync(t *testing.T) {
go func() {
s := &server.ControllersServer{
KindControllerManager: controllers.NewKindControllerManager(
controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), h.Env().Services.Events()),
h.EventHandler,
h.Env().Services.Events(),
),
StatusController: controllers.NewStatusController(
Expand Down

0 comments on commit 872aa6b

Please sign in to comment.