Skip to content

Commit

Permalink
add grpc broker integration test.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <[email protected]>
  • Loading branch information
morvencao committed Dec 20, 2024
1 parent 2d0074d commit 1c1601c
Show file tree
Hide file tree
Showing 13 changed files with 251 additions and 112 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,5 @@ test/e2e/.consumer_name
test/e2e/.external_host_ip
test/e2e/report/*
unit-test-results.json
integration-test-results.json
*integration-test-results.json
test/e2e/setup/aro/aro-hcp
17 changes: 13 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ MQTT_IMAGE ?= docker.io/library/eclipse-mosquitto:2.0.18

# Test output files
unit_test_json_output ?= ${PWD}/unit-test-results.json
integration_test_json_output ?= ${PWD}/integration-test-results.json
mqtt_integration_test_json_output ?= ${PWD}/mqtt-integration-test-results.json
grpc_integration_test_json_output ?= ${PWD}/grpc-integration-test-results.json

# Prints a list of useful targets.
help:
Expand Down Expand Up @@ -218,11 +219,19 @@ test:
# make test-integration TESTFLAGS="-run TestAccounts" acts as TestAccounts* and run TestAccountsGet, TestAccountsPost, etc.
# make test-integration TESTFLAGS="-run TestAccountsGet" runs TestAccountsGet
# make test-integration TESTFLAGS="-short" skips long-run tests
test-integration:
OCM_ENV=testing gotestsum --jsonfile-timing-events=$(integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \
./test/integration
test-integration: test-integration-mqtt test-integration-grpc
.PHONY: test-integration

test-integration-mqtt:
BROKER=mqtt OCM_ENV=testing gotestsum --jsonfile-timing-events=$(mqtt_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \
./test/integration
.PHONY: test-integration-mqtt

test-integration-grpc:
BROKER=grpc OCM_ENV=testing gotestsum --jsonfile-timing-events=$(grpc_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h -run TestController \
./test/integration
.PHONY: test-integration-grpc

# Regenerate openapi client and models
generate:
rm -rf pkg/api/openapi
Expand Down
32 changes: 25 additions & 7 deletions cmd/maestro/server/grpc_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
// TODO: error handling to address errors beyond network issues.
if err := subServer.Send(pbEvt); err != nil {
klog.Errorf("failed to send grpc event, %v", err)
return err // return error to requeue the spec event
}

return nil
Expand Down Expand Up @@ -397,8 +398,8 @@ func (bkr *GRPCBroker) handleResEvent(ctx context.Context, eventID string, resou

// add the event instance record to mark the event has been processed by the current instance
if _, err := bkr.eventInstanceDao.Create(ctx, &api.EventInstance{
EventID: eventID,
InstanceID: bkr.instanceID,
SpecEventID: eventID,
InstanceID: bkr.instanceID,
}); err != nil {
return fmt.Errorf("failed to create event instance record %s: %s", eventID, err.Error())
}
Expand Down Expand Up @@ -461,9 +462,26 @@ func (bkr *GRPCBroker) PredicateEvent(ctx context.Context, eventID string) (bool
if err != nil {
return false, fmt.Errorf("failed to get event %s: %s", eventID, err.Error())
}
resource, err := bkr.resourceService.Get(ctx, evt.SourceID)
if err != nil {
return false, fmt.Errorf("failed to get resource %s: %s", evt.SourceID, err.Error())

// fast return if the event is already reconciled
if evt.ReconciledDate != nil {
return false, nil
}

resource, svcErr := bkr.resourceService.Get(ctx, evt.SourceID)
if svcErr != nil {
// if the resource is not found, it indicates the resource has been handled by
// other instances, so we can mark the event as reconciled and ignore it.
if svcErr.Is404() {
klog.V(10).Infof("The resource %s has been deleted, mark the event as reconciled", evt.SourceID)
now := time.Now()
evt.ReconciledDate = &now
if _, svcErr := bkr.eventService.Replace(ctx, evt); svcErr != nil {
return false, fmt.Errorf("failed to update event %s: %s", evt.ID, svcErr.Error())
}
return false, nil
}
return false, fmt.Errorf("failed to get resource %s: %s", evt.SourceID, svcErr.Error())
}

if bkr.IsConsumerSubscribed(resource.ConsumerName) {
Expand All @@ -473,8 +491,8 @@ func (bkr *GRPCBroker) PredicateEvent(ctx context.Context, eventID string) (bool
// if the consumer is not subscribed to the broker, then add the event instance record
// to indicate the event has been processed by the instance
if _, err := bkr.eventInstanceDao.Create(ctx, &api.EventInstance{
EventID: eventID,
InstanceID: bkr.instanceID,
SpecEventID: eventID,
InstanceID: bkr.instanceID,
}); err != nil {
return false, fmt.Errorf("failed to create event instance record %s: %s", eventID, err.Error())
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/api/event_instances.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package api

type EventInstance struct {
EventID string
InstanceID string
EventID string
SpecEventID string
InstanceID string
}

type EventInstanceList []*EventInstance
29 changes: 18 additions & 11 deletions pkg/controllers/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,26 @@ func (h *PredicatedEventHandler) PostProcess(ctx context.Context, event *api.Eve
return fmt.Errorf("error finding ready instances: %v", err)
}

eventInstances, err := h.eventInstanceDao.GetInstancesByEventID(ctx, event.ID)
processedInstances, err := h.eventInstanceDao.GetInstancesBySpecEventID(ctx, event.ID)
if err != nil {
return fmt.Errorf("error finding processed server instances for event %s: %v", event.ID, err)
return fmt.Errorf("error finding processed instances for event %s: %v", event.ID, err)
}

// should never happen. If the event is not processed by any instance, return an error
if len(processedInstances) == 0 {
klog.V(10).Infof("Event %s is not processed by any instance", event.ID)
return fmt.Errorf("event %s is not processed by any instance", event.ID)
}

// check if all instances have processed the event
if !compareStrings(activeInstances, eventInstances) {
klog.V(10).Infof("Event %s is not processed by all instances, handled by %v, active instances %v", event.ID, eventInstances, activeInstances)
return fmt.Errorf("event %s is not processed by all instances", event.ID)
// 1. In normal case, the activeInstances == eventInstances, mark the event as reconciled
// 2. If maestro server instance is up, but has't been marked as ready, then activeInstances < eventInstances,
// it's ok to mark the event as reconciled, as the instance is not ready to sever the request, no connected agents.
// 3. If maestro server instance is down, but has been marked as unready, it may still have connected agents, but
// the instance has stopped to handle the event, so activeInstances > eventInstances, the event should be equeued.
if !isSubSet(activeInstances, processedInstances) {
klog.V(10).Infof("Event %s is not processed by all active instances %v, handled by %v", event.ID, activeInstances, processedInstances)
return fmt.Errorf("event %s is not processed by all active instances", event.ID)
}

// update the event with the reconciled date
Expand All @@ -147,12 +158,8 @@ func (h *PredicatedEventHandler) PostProcess(ctx context.Context, event *api.Eve
return nil
}

// compareStrings compares two string slices and returns true if they are equal
func compareStrings(a, b []string) bool {
if len(a) != len(b) {
return false
}

// isSubSet checks if slice a is a subset of slice b
func isSubSet(a, b []string) bool {
for _, v := range a {
found := false
for _, vv := range b {
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/event_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ func TestPredicatedEventHandler(t *testing.T) {
Expect(shouldProcess).To(BeTrue())

_, err = eventInstancesDao.Create(ctx, &api.EventInstance{
EventID: "1",
InstanceID: currentInstanceID,
SpecEventID: "1",
InstanceID: currentInstanceID,
})
Expect(err).To(BeNil())

Expand Down
12 changes: 6 additions & 6 deletions pkg/controllers/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,26 @@ type exampleController struct {
func (d *exampleController) OnAdd(ctx context.Context, eventID, resourceID string) error {
d.addCounter++
_, err := d.eventInstancesDao.Create(ctx, &api.EventInstance{
EventID: eventID,
InstanceID: d.instanceID,
SpecEventID: eventID,
InstanceID: d.instanceID,
})
return err
}

func (d *exampleController) OnUpdate(ctx context.Context, eventID, resourceID string) error {
d.updateCounter++
_, err := d.eventInstancesDao.Create(ctx, &api.EventInstance{
EventID: eventID,
InstanceID: d.instanceID,
SpecEventID: eventID,
InstanceID: d.instanceID,
})
return err
}

func (d *exampleController) OnDelete(ctx context.Context, eventID, resourceID string) error {
d.deleteCounter++
_, err := d.eventInstancesDao.Create(ctx, &api.EventInstance{
EventID: eventID,
InstanceID: d.instanceID,
SpecEventID: eventID,
InstanceID: d.instanceID,
})
return err
}
Expand Down
29 changes: 14 additions & 15 deletions pkg/dao/event_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ import (

type EventInstanceDao interface {
Get(ctx context.Context, eventID, instanceID string) (*api.EventInstance, error)
GetInstancesByEventID(ctx context.Context, eventID string) ([]string, error)
Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error)

FindStatusEvents(ctx context.Context, ids []string) (api.EventInstanceList, error)
GetInstancesBySpecEventID(ctx context.Context, eventID string) ([]string, error)
FindEventInstancesByEventIDs(ctx context.Context, ids []string) (api.EventInstanceList, error)
GetEventsAssociatedWithInstances(ctx context.Context, instanceIDs []string) ([]string, error)
}

Expand All @@ -39,10 +38,19 @@ func (d *sqlEventInstanceDao) Get(ctx context.Context, eventID, instanceID strin
return &eventInstance, nil
}

func (d *sqlEventInstanceDao) GetInstancesByEventID(ctx context.Context, eventID string) ([]string, error) {
func (d *sqlEventInstanceDao) Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) {
g2 := (*d.sessionFactory).New(ctx)
if err := g2.Omit(clause.Associations).Create(eventInstance).Error; err != nil {
db.MarkForRollback(ctx, err)
return nil, err
}
return eventInstance, nil
}

func (d *sqlEventInstanceDao) GetInstancesBySpecEventID(ctx context.Context, specEventID string) ([]string, error) {
g2 := (*d.sessionFactory).New(ctx)
var eventInstances []api.EventInstance
if err := g2.Model(&api.EventInstance{}).Where("event_id = ?", eventID).Find(&eventInstances).Error; err != nil {
if err := g2.Model(&api.EventInstance{}).Where("spec_event_id = ?", specEventID).Find(&eventInstances).Error; err != nil {
return nil, err
}
instanceIDs := make([]string, len(eventInstances))
Expand All @@ -52,16 +60,7 @@ func (d *sqlEventInstanceDao) GetInstancesByEventID(ctx context.Context, eventID
return instanceIDs, nil
}

func (d *sqlEventInstanceDao) Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) {
g2 := (*d.sessionFactory).New(ctx)
if err := g2.Omit(clause.Associations).Create(eventInstance).Error; err != nil {
db.MarkForRollback(ctx, err)
return nil, err
}
return eventInstance, nil
}

func (d *sqlEventInstanceDao) FindStatusEvents(ctx context.Context, ids []string) (api.EventInstanceList, error) {
func (d *sqlEventInstanceDao) FindEventInstancesByEventIDs(ctx context.Context, ids []string) (api.EventInstanceList, error) {
g2 := (*d.sessionFactory).New(ctx)
eventInstances := api.EventInstanceList{}
if err := g2.Where("event_id in (?)", ids).Find(&eventInstances).Error; err != nil {
Expand Down
14 changes: 0 additions & 14 deletions pkg/dao/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ type InstanceDao interface {
Delete(ctx context.Context, id string) error
DeleteByIDs(ctx context.Context, ids []string) error
FindByIDs(ctx context.Context, ids []string) (api.ServerInstanceList, error)
FindReadyIDs(ctx context.Context) ([]string, error)
FindByUpdatedTime(ctx context.Context, updatedTime time.Time) (api.ServerInstanceList, error)
FindReadyIDs(ctx context.Context) ([]string, error)
All(ctx context.Context) (api.ServerInstanceList, error)
Expand Down Expand Up @@ -112,19 +111,6 @@ func (d *sqlInstanceDao) FindByIDs(ctx context.Context, ids []string) (api.Serve
return instances, nil
}

func (d *sqlInstanceDao) FindReadyIDs(ctx context.Context) ([]string, error) {
g2 := (*d.sessionFactory).New(ctx)
instances := api.ServerInstanceList{}
if err := g2.Where("ready = ?", true).Find(&instances).Error; err != nil {
return nil, err
}
ids := make([]string, len(instances))
for i, instance := range instances {
ids[i] = instance.ID
}
return ids, nil
}

func (d *sqlInstanceDao) FindByUpdatedTime(ctx context.Context, updatedTime time.Time) (api.ServerInstanceList, error) {
g2 := (*d.sessionFactory).New(ctx)
instances := api.ServerInstanceList{}
Expand Down
44 changes: 37 additions & 7 deletions pkg/dao/mocks/event_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,55 @@ func (d *eventInstanceDaoMock) Get(ctx context.Context, eventID, instanceID stri
return nil, fmt.Errorf("event instance not found")
}

func (d *eventInstanceDaoMock) GetInstancesByEventID(ctx context.Context, eventID string) ([]string, error) {
func (d *eventInstanceDaoMock) Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) {
d.mux.Lock()
defer d.mux.Unlock()

d.eventInstances = append(d.eventInstances, eventInstance)

return eventInstance, nil
}

func (d *eventInstanceDaoMock) GetInstancesBySpecEventID(ctx context.Context, specEventID string) ([]string, error) {
d.mux.RLock()
defer d.mux.RUnlock()

var instanceIDs []string
for _, ei := range d.eventInstances {
if ei.EventID == eventID {
if ei.SpecEventID == specEventID {
instanceIDs = append(instanceIDs, ei.InstanceID)
}
}

return instanceIDs, nil
}

func (d *eventInstanceDaoMock) Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) {
d.mux.Lock()
defer d.mux.Unlock()
func (d *eventInstanceDaoMock) FindEventInstancesByEventIDs(ctx context.Context, ids []string) (api.EventInstanceList, error) {
d.mux.RLock()
defer d.mux.RUnlock()

d.eventInstances = append(d.eventInstances, eventInstance)
var eventInstances api.EventInstanceList
for _, id := range ids {
for _, ei := range d.eventInstances {
if ei.EventID == id {
eventInstances = append(eventInstances, ei)
}
}
}

return eventInstance, nil
return eventInstances, nil
}

func (d *eventInstanceDaoMock) GetEventsAssociatedWithInstances(ctx context.Context, instanceIDs []string) ([]string, error) {
d.mux.RLock()
defer d.mux.RUnlock()

var eventIDs []string
for _, ei := range d.eventInstances {
if contains(instanceIDs, ei.InstanceID) {
eventIDs = append(eventIDs, ei.EventID)
}
}

return eventIDs, nil
}
15 changes: 1 addition & 14 deletions pkg/dao/mocks/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,6 @@ func (d *instanceDaoMock) FindByIDs(ctx context.Context, ids []string) (api.Serv
return nil, errors.NotImplemented("Instance").AsError()
}

func (d *instanceDaoMock) FindReadyIDs(ctx context.Context) ([]string, error) {
d.mux.RLock()
defer d.mux.RUnlock()
ids := make([]string, 0, len(d.instances))
for _, instance := range d.instances {
if instance.Ready {
ids = append(ids, instance.ID)
}
}
return ids, nil
}

func (d *instanceDaoMock) FindByUpdatedTime(ctx context.Context, updatedTime time.Time) (api.ServerInstanceList, error) {
d.mux.RLock()
defer d.mux.RUnlock()
Expand All @@ -151,8 +139,7 @@ func (d *instanceDaoMock) FindByUpdatedTime(ctx context.Context, updatedTime tim
func (d *instanceDaoMock) FindReadyIDs(ctx context.Context) ([]string, error) {
d.mux.RLock()
defer d.mux.RUnlock()

ids := []string{}
ids := make([]string, 0, len(d.instances))
for _, instance := range d.instances {
if instance.Ready {
ids = append(ids, instance.ID)
Expand Down
Loading

0 comments on commit 1c1601c

Please sign in to comment.