Skip to content

Commit

Permalink
multiple instances support for grpc broker.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <[email protected]>
  • Loading branch information
morvencao committed Dec 18, 2024
1 parent f859783 commit d7e9cfe
Show file tree
Hide file tree
Showing 16 changed files with 650 additions and 82 deletions.
1 change: 1 addition & 0 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,5 @@ jobs:
make e2e-test
env:
container_tool: docker
SERVER_REPLICAS: 2
MESSAGE_DRIVER_TYPE: grpc
11 changes: 10 additions & 1 deletion cmd/maestro/servecmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (
"github.com/openshift-online/maestro/cmd/maestro/environments"
"github.com/openshift-online/maestro/cmd/maestro/server"
"github.com/openshift-online/maestro/pkg/config"
"github.com/openshift-online/maestro/pkg/controllers"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/pkg/db"
"github.com/openshift-online/maestro/pkg/dispatcher"
"github.com/openshift-online/maestro/pkg/event"
)
Expand Down Expand Up @@ -47,9 +49,14 @@ func runServer(cmd *cobra.Command, args []string) {
// For gRPC, create a gRPC broker to handle resource spec and status events.
// For MQTT/Kafka, create a message queue based event server to handle resource spec and status events.
var eventServer server.EventServer
var eventHandler controllers.EventHandler
if environments.Environment().Config.MessageBroker.MessageBrokerType == "grpc" {
klog.Info("Setting up grpc broker")
eventServer = server.NewGRPCBroker(eventBroadcaster)
eventHandler = controllers.NewPredicatedEventHandler(eventServer.PredicateEvent,
environments.Environment().Services.Events(),
dao.NewEventInstanceDao(&environments.Environment().Database.SessionFactory),
dao.NewInstanceDao(&environments.Environment().Database.SessionFactory))
} else {
klog.Info("Setting up message queue event server")
var statusDispatcher dispatcher.Dispatcher
Expand All @@ -67,12 +74,14 @@ func runServer(cmd *cobra.Command, args []string) {
// Set the status dispatcher for the healthcheck server
healthcheckServer.SetStatusDispatcher(statusDispatcher)
eventServer = server.NewMessageQueueEventServer(eventBroadcaster, statusDispatcher)
eventHandler = controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(environments.Environment().Database.SessionFactory),
environments.Environment().Services.Events())
}

// Create the servers
apiserver := server.NewAPIServer(eventBroadcaster)
metricsServer := server.NewMetricsServer()
controllersServer := server.NewControllersServer(eventServer)
controllersServer := server.NewControllersServer(eventServer, eventHandler)

ctx, cancel := context.WithCancel(context.Background())

Expand Down
4 changes: 2 additions & 2 deletions cmd/maestro/server/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
"github.com/openshift-online/maestro/pkg/logger"
)

func NewControllersServer(eventServer EventServer) *ControllersServer {
func NewControllersServer(eventServer EventServer, eventHandler controllers.EventHandler) *ControllersServer {
s := &ControllersServer{
KindControllerManager: controllers.NewKindControllerManager(
db.NewAdvisoryLockFactory(env().Database.SessionFactory),
eventHandler,
env().Services.Events(),
),
StatusController: controllers.NewStatusController(
Expand Down
20 changes: 14 additions & 6 deletions cmd/maestro/server/event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,19 @@ type EventServer interface {
Start(ctx context.Context)

// OnCreate handles the creation of a resource.
OnCreate(ctx context.Context, resourceID string) error
OnCreate(ctx context.Context, eventID, resourceID string) error

// OnUpdate handles updates to a resource.
OnUpdate(ctx context.Context, resourceID string) error
OnUpdate(ctx context.Context, eventID, resourceID string) error

// OnDelete handles the deletion of a resource.
OnDelete(ctx context.Context, resourceID string) error
OnDelete(ctx context.Context, eventID, resourceID string) error

// OnStatusUpdate handles status update events for a resource.
OnStatusUpdate(ctx context.Context, eventID, resourceID string) error

// returns true if the event should be processed by the current instance, otherwise false and an error.
PredicateEvent(ctx context.Context, eventID string) (bool, error)
}

var _ EventServer = &MessageQueueEventServer{}
Expand Down Expand Up @@ -114,17 +117,17 @@ func (s *MessageQueueEventServer) startSubscription(ctx context.Context) {
}

// OnCreate will be called on each new resource creation event inserted into db.
func (s *MessageQueueEventServer) OnCreate(ctx context.Context, resourceID string) error {
func (s *MessageQueueEventServer) OnCreate(ctx context.Context, eventID, resourceID string) error {
return s.sourceClient.OnCreate(ctx, resourceID)
}

// OnUpdate will be called on each new resource update event inserted into db.
func (s *MessageQueueEventServer) OnUpdate(ctx context.Context, resourceID string) error {
func (s *MessageQueueEventServer) OnUpdate(ctx context.Context, eventID, resourceID string) error {
return s.sourceClient.OnUpdate(ctx, resourceID)
}

// OnDelete will be called on each new resource deletion event inserted into db.
func (s *MessageQueueEventServer) OnDelete(ctx context.Context, resourceID string) error {
func (s *MessageQueueEventServer) OnDelete(ctx context.Context, eventID, resourceID string) error {
return s.sourceClient.OnDelete(ctx, resourceID)
}

Expand Down Expand Up @@ -171,6 +174,11 @@ func (s *MessageQueueEventServer) OnStatusUpdate(ctx context.Context, eventID, r
return err
}

// EventPredicate for the message queue event server is no-op, as the message queue server filter event based on lock.
func (s *MessageQueueEventServer) PredicateEvent(ctx context.Context, eventID string) (bool, error) {
return true, nil
}

// handleStatusUpdate processes the resource status update from the agent.
// The resource argument contains the updated status.
// The function performs the following steps:
Expand Down
71 changes: 56 additions & 15 deletions cmd/maestro/server/grpc_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type GRPCBroker struct {
instanceID string
eventInstanceDao dao.EventInstanceDao
resourceService services.ResourceService
eventService services.EventService
statusEventService services.StatusEventService
bindAddress string
subscribers map[string]*subscriber // registered subscribers
Expand Down Expand Up @@ -79,6 +80,7 @@ func NewGRPCBroker(eventBroadcaster *event.EventBroadcaster) EventServer {
instanceID: env().Config.MessageBroker.ClientID,
eventInstanceDao: dao.NewEventInstanceDao(&sessionFactory),
resourceService: env().Services.Resources(),
eventService: env().Services.Events(),
statusEventService: env().Services.StatusEvents(),
bindAddress: env().Config.HTTPServer.Hostname + ":" + config.BrokerBindPort,
subscribers: make(map[string]*subscriber),
Expand Down Expand Up @@ -389,40 +391,49 @@ func (bkr *GRPCBroker) handleRes(resource *api.Resource) {
}
}

// handleResEvent publish the resource to the correct subscriber and add the event instance record.
func (bkr *GRPCBroker) handleResEvent(ctx context.Context, eventID string, resource *api.Resource) error {
bkr.handleRes(resource)

// add the event instance record to mark the event has been processed by the current instance
if _, err := bkr.eventInstanceDao.Create(ctx, &api.EventInstance{
EventID: eventID,
InstanceID: bkr.instanceID,
}); err != nil {
return fmt.Errorf("failed to create event instance record %s: %s", eventID, err.Error())
}

return nil
}

// OnCreate is called by the controller when a resource is created on the maestro server.
func (bkr *GRPCBroker) OnCreate(ctx context.Context, id string) error {
resource, err := bkr.resourceService.Get(ctx, id)
func (bkr *GRPCBroker) OnCreate(ctx context.Context, eventID, resourceID string) error {
resource, err := bkr.resourceService.Get(ctx, resourceID)
if err != nil {
return err
}

bkr.handleRes(resource)

return nil
return bkr.handleResEvent(ctx, eventID, resource)
}

// OnUpdate is called by the controller when a resource is updated on the maestro server.
func (bkr *GRPCBroker) OnUpdate(ctx context.Context, id string) error {
resource, err := bkr.resourceService.Get(ctx, id)
func (bkr *GRPCBroker) OnUpdate(ctx context.Context, eventID, resourceID string) error {
resource, err := bkr.resourceService.Get(ctx, resourceID)
if err != nil {
return err
}

bkr.handleRes(resource)

return nil
return bkr.handleResEvent(ctx, eventID, resource)
}

// OnDelete is called by the controller when a resource is deleted from the maestro server.
func (bkr *GRPCBroker) OnDelete(ctx context.Context, id string) error {
resource, err := bkr.resourceService.Get(ctx, id)
func (bkr *GRPCBroker) OnDelete(ctx context.Context, eventID, resourceID string) error {
resource, err := bkr.resourceService.Get(ctx, resourceID)
if err != nil {
return err
}

bkr.handleRes(resource)

return nil
return bkr.handleResEvent(ctx, eventID, resource)
}

// On StatusUpdate will be called on each new status event inserted into db.
Expand Down Expand Up @@ -469,6 +480,36 @@ func (bkr *GRPCBroker) OnStatusUpdate(ctx context.Context, eventID, resourceID s
return err
}

// PredicateEvent checks if the event should be processed by the current instance
// by verifying the resource consumer name is in the subscriber list, ensuring the
// event will be only processed when the consumer is subscribed to the current broker.
func (bkr *GRPCBroker) PredicateEvent(ctx context.Context, eventID string) (bool, error) {
evt, err := bkr.eventService.Get(ctx, eventID)
if err != nil {
return false, fmt.Errorf("failed to get event %s: %s", eventID, err.Error())
}
resource, err := bkr.resourceService.Get(ctx, evt.SourceID)
if err != nil {
return false, fmt.Errorf("failed to get resource %s: %s", evt.SourceID, err.Error())
}

if bkr.IsConsumerSubscribed(resource.ConsumerName) {
return true, nil
}

// if the consumer is not subscribed to the broker, then add the event instance record
// to indicate the event has been processed by the instance
if _, err := bkr.eventInstanceDao.Create(ctx, &api.EventInstance{
EventID: eventID,
InstanceID: bkr.instanceID,
}); err != nil {
return false, fmt.Errorf("failed to create event instance record %s: %s", eventID, err.Error())
}
klog.V(10).Infof("The consumer %s is not subscribed to the broker, added the event instance record", resource.ConsumerName)

return false, nil
}

// IsConsumerSubscribed returns true if the consumer is subscribed to the broker for resource spec.
func (bkr *GRPCBroker) IsConsumerSubscribed(consumerName string) bool {
bkr.mu.RLock()
Expand Down
Loading

0 comments on commit d7e9cfe

Please sign in to comment.