Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multiple instances support for grpc broker. #235

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,5 @@ jobs:
make e2e-test
env:
container_tool: docker
SERVER_REPLICAS: 2
MESSAGE_DRIVER_TYPE: grpc
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,5 @@ test/e2e/.consumer_name
test/e2e/.external_host_ip
test/e2e/report/*
unit-test-results.json
integration-test-results.json
*integration-test-results.json
test/e2e/setup/aro/aro-hcp
17 changes: 13 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ MQTT_IMAGE ?= docker.io/library/eclipse-mosquitto:2.0.18

# Test output files
unit_test_json_output ?= ${PWD}/unit-test-results.json
integration_test_json_output ?= ${PWD}/integration-test-results.json
mqtt_integration_test_json_output ?= ${PWD}/mqtt-integration-test-results.json
grpc_integration_test_json_output ?= ${PWD}/grpc-integration-test-results.json

# Prints a list of useful targets.
help:
Expand Down Expand Up @@ -218,11 +219,19 @@ test:
# make test-integration TESTFLAGS="-run TestAccounts" acts as TestAccounts* and run TestAccountsGet, TestAccountsPost, etc.
# make test-integration TESTFLAGS="-run TestAccountsGet" runs TestAccountsGet
# make test-integration TESTFLAGS="-short" skips long-run tests
test-integration:
OCM_ENV=testing gotestsum --jsonfile-timing-events=$(integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \
./test/integration
test-integration: test-integration-mqtt test-integration-grpc
.PHONY: test-integration

test-integration-mqtt:
BROKER=mqtt OCM_ENV=testing gotestsum --jsonfile-timing-events=$(mqtt_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \
./test/integration
.PHONY: test-integration-mqtt

test-integration-grpc:
BROKER=grpc OCM_ENV=testing gotestsum --jsonfile-timing-events=$(grpc_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h -run TestController \
./test/integration
.PHONY: test-integration-grpc

# Regenerate openapi client and models
generate:
rm -rf pkg/api/openapi
Expand Down
11 changes: 10 additions & 1 deletion cmd/maestro/servecmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (
"github.com/openshift-online/maestro/cmd/maestro/environments"
"github.com/openshift-online/maestro/cmd/maestro/server"
"github.com/openshift-online/maestro/pkg/config"
"github.com/openshift-online/maestro/pkg/controllers"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/pkg/db"
"github.com/openshift-online/maestro/pkg/dispatcher"
"github.com/openshift-online/maestro/pkg/event"
)
Expand Down Expand Up @@ -47,9 +49,14 @@ func runServer(cmd *cobra.Command, args []string) {
// For gRPC, create a gRPC broker to handle resource spec and status events.
// For MQTT/Kafka, create a message queue based event server to handle resource spec and status events.
var eventServer server.EventServer
var eventHandler controllers.EventHandler
if environments.Environment().Config.MessageBroker.MessageBrokerType == "grpc" {
klog.Info("Setting up grpc broker")
eventServer = server.NewGRPCBroker(eventBroadcaster)
eventHandler = controllers.NewPredicatedEventHandler(eventServer.PredicateEvent,
environments.Environment().Services.Events(),
dao.NewEventInstanceDao(&environments.Environment().Database.SessionFactory),
dao.NewInstanceDao(&environments.Environment().Database.SessionFactory))
} else {
klog.Info("Setting up message queue event server")
var statusDispatcher dispatcher.Dispatcher
Expand All @@ -67,12 +74,14 @@ func runServer(cmd *cobra.Command, args []string) {
// Set the status dispatcher for the healthcheck server
healthcheckServer.SetStatusDispatcher(statusDispatcher)
eventServer = server.NewMessageQueueEventServer(eventBroadcaster, statusDispatcher)
eventHandler = controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(environments.Environment().Database.SessionFactory),
environments.Environment().Services.Events())
}

// Create the servers
apiserver := server.NewAPIServer(eventBroadcaster)
metricsServer := server.NewMetricsServer()
controllersServer := server.NewControllersServer(eventServer)
controllersServer := server.NewControllersServer(eventServer, eventHandler)

ctx, cancel := context.WithCancel(context.Background())

Expand Down
4 changes: 2 additions & 2 deletions cmd/maestro/server/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
"github.com/openshift-online/maestro/pkg/logger"
)

func NewControllersServer(eventServer EventServer) *ControllersServer {
func NewControllersServer(eventServer EventServer, eventHandler controllers.EventHandler) *ControllersServer {
s := &ControllersServer{
KindControllerManager: controllers.NewKindControllerManager(
db.NewAdvisoryLockFactory(env().Database.SessionFactory),
eventHandler,
env().Services.Events(),
),
StatusController: controllers.NewStatusController(
Expand Down
20 changes: 14 additions & 6 deletions cmd/maestro/server/event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,19 @@ type EventServer interface {
Start(ctx context.Context)

// OnCreate handles the creation of a resource.
OnCreate(ctx context.Context, resourceID string) error
OnCreate(ctx context.Context, eventID, resourceID string) error

// OnUpdate handles updates to a resource.
OnUpdate(ctx context.Context, resourceID string) error
OnUpdate(ctx context.Context, eventID, resourceID string) error

// OnDelete handles the deletion of a resource.
OnDelete(ctx context.Context, resourceID string) error
OnDelete(ctx context.Context, eventID, resourceID string) error

// OnStatusUpdate handles status update events for a resource.
OnStatusUpdate(ctx context.Context, eventID, resourceID string) error

// returns true if the event should be processed by the current instance, otherwise false and an error.
PredicateEvent(ctx context.Context, eventID string) (bool, error)
}

var _ EventServer = &MessageQueueEventServer{}
Expand Down Expand Up @@ -114,17 +117,17 @@ func (s *MessageQueueEventServer) startSubscription(ctx context.Context) {
}

// OnCreate will be called on each new resource creation event inserted into db.
func (s *MessageQueueEventServer) OnCreate(ctx context.Context, resourceID string) error {
func (s *MessageQueueEventServer) OnCreate(ctx context.Context, eventID, resourceID string) error {
return s.sourceClient.OnCreate(ctx, resourceID)
}

// OnUpdate will be called on each new resource update event inserted into db.
func (s *MessageQueueEventServer) OnUpdate(ctx context.Context, resourceID string) error {
func (s *MessageQueueEventServer) OnUpdate(ctx context.Context, eventID, resourceID string) error {
return s.sourceClient.OnUpdate(ctx, resourceID)
}

// OnDelete will be called on each new resource deletion event inserted into db.
func (s *MessageQueueEventServer) OnDelete(ctx context.Context, resourceID string) error {
func (s *MessageQueueEventServer) OnDelete(ctx context.Context, eventID, resourceID string) error {
return s.sourceClient.OnDelete(ctx, resourceID)
}

Expand Down Expand Up @@ -171,6 +174,11 @@ func (s *MessageQueueEventServer) OnStatusUpdate(ctx context.Context, eventID, r
return err
}

// EventPredicate for the message queue event server is no-op, as the message queue server filter event based on lock.
func (s *MessageQueueEventServer) PredicateEvent(ctx context.Context, eventID string) (bool, error) {
return true, nil
}

// handleStatusUpdate processes the resource status update from the agent.
// The resource argument contains the updated status.
// The function performs the following steps:
Expand Down
89 changes: 74 additions & 15 deletions cmd/maestro/server/grpc_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type GRPCBroker struct {
instanceID string
eventInstanceDao dao.EventInstanceDao
resourceService services.ResourceService
eventService services.EventService
statusEventService services.StatusEventService
bindAddress string
subscribers map[string]*subscriber // registered subscribers
Expand Down Expand Up @@ -79,6 +80,7 @@ func NewGRPCBroker(eventBroadcaster *event.EventBroadcaster) EventServer {
instanceID: env().Config.MessageBroker.ClientID,
eventInstanceDao: dao.NewEventInstanceDao(&sessionFactory),
resourceService: env().Services.Resources(),
eventService: env().Services.Events(),
statusEventService: env().Services.StatusEvents(),
bindAddress: env().Config.HTTPServer.Hostname + ":" + config.BrokerBindPort,
subscribers: make(map[string]*subscriber),
Expand Down Expand Up @@ -198,6 +200,7 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
// TODO: error handling to address errors beyond network issues.
if err := subServer.Send(pbEvt); err != nil {
klog.Errorf("failed to send grpc event, %v", err)
return err // return error to requeue the spec event
}

return nil
Expand Down Expand Up @@ -389,40 +392,49 @@ func (bkr *GRPCBroker) handleRes(resource *api.Resource) {
}
}

// handleResEvent publish the resource to the correct subscriber and add the event instance record.
func (bkr *GRPCBroker) handleResEvent(ctx context.Context, eventID string, resource *api.Resource) error {
bkr.handleRes(resource)

// add the event instance record to mark the event has been processed by the current instance
if _, err := bkr.eventInstanceDao.Create(ctx, &api.EventInstance{
EventID: eventID,
InstanceID: bkr.instanceID,
}); err != nil {
return fmt.Errorf("failed to create event instance record %s: %s", eventID, err.Error())
}

return nil
}

// OnCreate is called by the controller when a resource is created on the maestro server.
func (bkr *GRPCBroker) OnCreate(ctx context.Context, id string) error {
resource, err := bkr.resourceService.Get(ctx, id)
func (bkr *GRPCBroker) OnCreate(ctx context.Context, eventID, resourceID string) error {
resource, err := bkr.resourceService.Get(ctx, resourceID)
if err != nil {
return err
}

bkr.handleRes(resource)

return nil
return bkr.handleResEvent(ctx, eventID, resource)
}

// OnUpdate is called by the controller when a resource is updated on the maestro server.
func (bkr *GRPCBroker) OnUpdate(ctx context.Context, id string) error {
resource, err := bkr.resourceService.Get(ctx, id)
func (bkr *GRPCBroker) OnUpdate(ctx context.Context, eventID, resourceID string) error {
resource, err := bkr.resourceService.Get(ctx, resourceID)
if err != nil {
return err
}

bkr.handleRes(resource)

return nil
return bkr.handleResEvent(ctx, eventID, resource)
}

// OnDelete is called by the controller when a resource is deleted from the maestro server.
func (bkr *GRPCBroker) OnDelete(ctx context.Context, id string) error {
resource, err := bkr.resourceService.Get(ctx, id)
func (bkr *GRPCBroker) OnDelete(ctx context.Context, eventID, resourceID string) error {
resource, err := bkr.resourceService.Get(ctx, resourceID)
if err != nil {
return err
}

bkr.handleRes(resource)

return nil
return bkr.handleResEvent(ctx, eventID, resource)
}

// On StatusUpdate will be called on each new status event inserted into db.
Expand Down Expand Up @@ -469,6 +481,53 @@ func (bkr *GRPCBroker) OnStatusUpdate(ctx context.Context, eventID, resourceID s
return err
}

// PredicateEvent checks if the event should be processed by the current instance
// by verifying the resource consumer name is in the subscriber list, ensuring the
// event will be only processed when the consumer is subscribed to the current broker.
func (bkr *GRPCBroker) PredicateEvent(ctx context.Context, eventID string) (bool, error) {
evt, err := bkr.eventService.Get(ctx, eventID)
if err != nil {
return false, fmt.Errorf("failed to get event %s: %s", eventID, err.Error())
}

// fast return if the event is already reconciled
if evt.ReconciledDate != nil {
return false, nil
}

resource, svcErr := bkr.resourceService.Get(ctx, evt.SourceID)
if svcErr != nil {
// if the resource is not found, it indicates the resource has been handled by
// other instances, so we can mark the event as reconciled and ignore it.
if svcErr.Is404() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a log here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log added.

klog.V(10).Infof("The resource %s has been deleted, mark the event as reconciled", evt.SourceID)
now := time.Now()
evt.ReconciledDate = &now
if _, svcErr := bkr.eventService.Replace(ctx, evt); svcErr != nil {
return false, fmt.Errorf("failed to update event %s: %s", evt.ID, svcErr.Error())
}
return false, nil
}
return false, fmt.Errorf("failed to get resource %s: %s", evt.SourceID, svcErr.Error())
}

if bkr.IsConsumerSubscribed(resource.ConsumerName) {
return true, nil
}

// if the consumer is not subscribed to the broker, then add the event instance record
// to indicate the event has been processed by the instance
if _, err := bkr.eventInstanceDao.Create(ctx, &api.EventInstance{
EventID: eventID,
InstanceID: bkr.instanceID,
}); err != nil {
return false, fmt.Errorf("failed to create event instance record %s: %s", eventID, err.Error())
}
klog.V(10).Infof("The consumer %s is not subscribed to the broker, added the event instance record", resource.ConsumerName)

return false, nil
}

// IsConsumerSubscribed returns true if the consumer is subscribed to the broker for resource spec.
func (bkr *GRPCBroker) IsConsumerSubscribed(consumerName string) bool {
bkr.mu.RLock()
Expand Down
Loading
Loading