Skip to content

Commit

Permalink
rename mqtt event server to also cover kafka.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <[email protected]>
  • Loading branch information
morvencao committed Dec 16, 2024
1 parent 146d798 commit 59981a3
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 36 deletions.
17 changes: 7 additions & 10 deletions cmd/maestro/servecmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@ func runServer(cmd *cobra.Command, args []string) {

// Create the event server based on the message broker type:
// For gRPC, create a gRPC broker to handle resource spec and status events.
// For MQTT, create a Pulse server to handle resource spec and status events.
// For MQTT/Kafka, create a message queue based event server to handle resource spec and status events.
var eventServer server.EventServer
switch environments.Environment().Config.MessageBroker.MessageBrokerType {
case "mqtt":
klog.Info("Setting up pulse server")
if environments.Environment().Config.MessageBroker.MessageBrokerType == "grpc" {
klog.Info("Setting up grpc broker")
eventServer = server.NewGRPCBroker(eventBroadcaster)
} else {
klog.Info("Setting up message queue event server")
var statusDispatcher dispatcher.Dispatcher
subscriptionType := environments.Environment().Config.EventServer.SubscriptionType
switch config.SubscriptionType(subscriptionType) {
Expand All @@ -64,12 +66,7 @@ func runServer(cmd *cobra.Command, args []string) {

// Set the status dispatcher for the healthcheck server
healthcheckServer.SetStatusDispatcher(statusDispatcher)
eventServer = server.NewMQTTEventServer(eventBroadcaster, statusDispatcher)
case "grpc":
klog.Info("Setting up grpc broker")
eventServer = server.NewGRPCBroker(eventBroadcaster)
default:
klog.Errorf("Unsupported message broker type: %s", environments.Environment().Config.MessageBroker.MessageBrokerType)
eventServer = server.NewMessageQueueEventServer(eventBroadcaster, statusDispatcher)
}

// Create the servers
Expand Down
37 changes: 18 additions & 19 deletions cmd/maestro/server/event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ type EventServer interface {
OnStatusUpdate(ctx context.Context, eventID, resourceID string) error
}

var _ EventServer = &MQTTEventServer{}
var _ EventServer = &MessageQueueEventServer{}

// MQTTEventServer represents a server responsible for publish resource spec events from
// resource controller and handle resource status update events from the maestro agent.
// It also periodic heartbeat updates and checking the liveness of Maestro instances,
// triggering status resync based on instances' status and other conditions.
type MQTTEventServer struct {
// MessageQueueEventServer represents a event server responsible for publish resource spec events
// from resource controller and handle resource status update events from the message queue.
// It also maintains a status dispatcher to dispatch status update events to the corresponding
// maestro instances.
type MessageQueueEventServer struct {
instanceID string
eventInstanceDao dao.EventInstanceDao
lockFactory db.LockFactory
Expand All @@ -58,9 +58,9 @@ type MQTTEventServer struct {
statusDispatcher dispatcher.Dispatcher
}

func NewMQTTEventServer(eventBroadcaster *event.EventBroadcaster, statusDispatcher dispatcher.Dispatcher) EventServer {
func NewMessageQueueEventServer(eventBroadcaster *event.EventBroadcaster, statusDispatcher dispatcher.Dispatcher) EventServer {
sessionFactory := env().Database.SessionFactory
return &MQTTEventServer{
return &MessageQueueEventServer{
instanceID: env().Config.MessageBroker.ClientID,
eventInstanceDao: dao.NewEventInstanceDao(&sessionFactory),
lockFactory: db.NewAdvisoryLockFactory(sessionFactory),
Expand All @@ -72,11 +72,10 @@ func NewMQTTEventServer(eventBroadcaster *event.EventBroadcaster, statusDispatch
}
}

// Start initializes and runs the pulse server, updating and checking Maestro instances' liveness,
// initializes subscription to status update messages and triggers status resync based on
// instances' status and other conditions.
func (s *MQTTEventServer) Start(ctx context.Context) {
log.Infof("Starting pulse server")
// Start initializes and runs the event server. It starts the subscription
// to resource status update messages and the status dispatcher.
func (s *MessageQueueEventServer) Start(ctx context.Context) {
log.Infof("Starting message queue event server")

// start subscribing to resource status update messages.
s.startSubscription(ctx)
Expand All @@ -85,12 +84,12 @@ func (s *MQTTEventServer) Start(ctx context.Context) {

// wait until context is canceled
<-ctx.Done()
log.Infof("Shutting down pulse server")
log.Infof("Shutting down message queue event server")
}

// startSubscription initiates the subscription to resource status update messages.
// It runs asynchronously in the background until the provided context is canceled.
func (s *MQTTEventServer) startSubscription(ctx context.Context) {
func (s *MessageQueueEventServer) startSubscription(ctx context.Context) {
s.sourceClient.Subscribe(ctx, func(action types.ResourceAction, resource *api.Resource) error {
log.V(4).Infof("received action %s for resource %s", action, resource.ID)

Expand All @@ -115,25 +114,25 @@ func (s *MQTTEventServer) startSubscription(ctx context.Context) {
}

// OnCreate will be called on each new resource creation event inserted into db.
func (s *MQTTEventServer) OnCreate(ctx context.Context, resourceID string) error {
func (s *MessageQueueEventServer) OnCreate(ctx context.Context, resourceID string) error {
return s.sourceClient.OnCreate(ctx, resourceID)
}

// OnUpdate will be called on each new resource update event inserted into db.
func (s *MQTTEventServer) OnUpdate(ctx context.Context, resourceID string) error {
func (s *MessageQueueEventServer) OnUpdate(ctx context.Context, resourceID string) error {
return s.sourceClient.OnUpdate(ctx, resourceID)
}

// OnDelete will be called on each new resource deletion event inserted into db.
func (s *MQTTEventServer) OnDelete(ctx context.Context, resourceID string) error {
func (s *MessageQueueEventServer) OnDelete(ctx context.Context, resourceID string) error {
return s.sourceClient.OnDelete(ctx, resourceID)
}

// On StatusUpdate will be called on each new status event inserted into db.
// It does two things:
// 1. build the resource status and broadcast it to subscribers
// 2. add the event instance record to mark the event has been processed by the current instance
func (s *MQTTEventServer) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error {
func (s *MessageQueueEventServer) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error {
statusEvent, sErr := s.statusEventService.Get(ctx, eventID)
if sErr != nil {
return fmt.Errorf("failed to get status event %s: %s", eventID, sErr.Error())
Expand Down
2 changes: 1 addition & 1 deletion cmd/maestro/server/grpc_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ func (bkr *GRPCBroker) OnDelete(ctx context.Context, id string) error {
// It does two things:
// 1. build the resource status and broadcast it to subscribers
// 2. add the event instance record to mark the event has been processed by the current instance
// TODO consider using a same way (pulse_server.OnStatusUpdate) to handle this
// TODO consider using a same way (MessageQueueEventServer.OnStatusUpdate) to handle this
func (bkr *GRPCBroker) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error {
statusEvent, sErr := bkr.statusEventService.Get(ctx, eventID)
if sErr != nil {
Expand Down
5 changes: 2 additions & 3 deletions pkg/config/event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const (
BroadcastSubscriptionType SubscriptionType = "broadcast"
)

// EventServerConfig contains the configuration for the maestro pulse server.
// EventServerConfig contains the configuration for the message queue event server.
type EventServerConfig struct {
SubscriptionType string `json:"subscription_type"`
ConsistentHashConfig *ConsistentHashConfig `json:"consistent_hash_config"`
Expand Down Expand Up @@ -45,8 +45,7 @@ func NewConsistentHashConfig() *ConsistentHashConfig {
}

// AddFlags configures the EventServerConfig with command line flags.
// It allows users to customize the interval for maestro instance pulses and subscription type.
// - "pulse-interval" sets the time between maestro instance pulses (in seconds) to indicate its liveness (default: 15 seconds).
// It allows users to customize the subscription type and ConsistentHashConfig settings.
// - "subscription-type" specifies the subscription type for resource status updates from message broker, either "shared" or "broadcast".
// "shared" subscription type uses MQTT feature to ensure only one Maestro instance receives resource status messages.
// "broadcast" subscription type will make all Maestro instances to receive resource status messages and hash the message to determine which instance should process it.
Expand Down
2 changes: 1 addition & 1 deletion test/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (helper *Helper) sendShutdownSignal() error {

func (helper *Helper) startEventServer(ctx context.Context) {
// helper.Env().Config.EventServer.SubscriptionType = "broadcast"
helper.EventServer = server.NewMQTTEventServer(helper.EventBroadcaster, helper.StatusDispatcher)
helper.EventServer = server.NewMessageQueueEventServer(helper.EventBroadcaster, helper.StatusDispatcher)
go func() {
klog.V(10).Info("Test event server started")
helper.EventServer.Start(ctx)
Expand Down
4 changes: 2 additions & 2 deletions test/integration/status_hash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ func TestEventServer(t *testing.T) {

// the cluster1 name cannot be changed, because consistent hash makes it allocate to different instance.
// the case here we want to the new consumer allocate to new instance(cluster1) which is a fake instance.
// after 3*pulseInterval (3s), it will relocate to maestro instance.
// after 3*heartbeatInterval (3s), it will relocate to maestro instance.
clusterName := "cluster1"
consumer := h.CreateConsumer(clusterName)

// insert a new instance with the same name to consumer name
// to make sure the consumer is hashed to the new instance firstly.
// after the new instance is stale after 3*pulseInterval (3s), the current
// after the new instance is stale after 3*heartbeatInterval (3s), the current
// instance will take over the consumer and resync the resource status.
_, err = instanceDao.Create(ctx, &api.ServerInstance{
Meta: api.Meta{
Expand Down

0 comments on commit 59981a3

Please sign in to comment.