ensure maestro instance ready after adding to hash ring. #229

Merged · 3 commits · Dec 17, 2024
33 changes: 24 additions & 9 deletions cmd/maestro/servecmd/cmd.go
@@ -11,6 +11,9 @@ import (

"github.com/openshift-online/maestro/cmd/maestro/environments"
"github.com/openshift-online/maestro/cmd/maestro/server"
"github.com/openshift-online/maestro/pkg/config"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/pkg/dispatcher"
"github.com/openshift-online/maestro/pkg/event"
)

@@ -35,24 +38,40 @@ func runServer(cmd *cobra.Command, args []string) {
klog.Fatalf("Unable to initialize environment: %s", err.Error())
}

healthcheckServer := server.NewHealthCheckServer()

// Create event broadcaster to broadcast resource status update events to subscribers
eventBroadcaster := event.NewEventBroadcaster()

// Create the event server based on the message broker type:
// For gRPC, create a gRPC broker to handle resource spec and status events.
// For MQTT, create a Pulse server to handle resource spec and status events.
// For MQTT/Kafka, create a message queue based event server to handle resource spec and status events.
var eventServer server.EventServer
if environments.Environment().Config.MessageBroker.MessageBrokerType == "grpc" {
klog.Info("Setting up grpc broker")
eventServer = server.NewGRPCBroker(eventBroadcaster)
} else {
klog.Info("Setting up pulse server")
eventServer = server.NewPulseServer(eventBroadcaster)
klog.Info("Setting up message queue event server")
var statusDispatcher dispatcher.Dispatcher
subscriptionType := environments.Environment().Config.EventServer.SubscriptionType
switch config.SubscriptionType(subscriptionType) {
case config.SharedSubscriptionType:
statusDispatcher = dispatcher.NewNoopDispatcher(dao.NewConsumerDao(&environments.Environment().Database.SessionFactory), environments.Environment().Clients.CloudEventsSource)
case config.BroadcastSubscriptionType:
statusDispatcher = dispatcher.NewHashDispatcher(environments.Environment().Config.MessageBroker.ClientID, dao.NewInstanceDao(&environments.Environment().Database.SessionFactory),
dao.NewConsumerDao(&environments.Environment().Database.SessionFactory), environments.Environment().Clients.CloudEventsSource, environments.Environment().Config.EventServer.ConsistentHashConfig)
default:
klog.Errorf("Unsupported subscription type: %s", subscriptionType)
}

// Set the status dispatcher for the healthcheck server
healthcheckServer.SetStatusDispatcher(statusDispatcher)
eventServer = server.NewMessageQueueEventServer(eventBroadcaster, statusDispatcher)
}

// Create the servers
apiserver := server.NewAPIServer(eventBroadcaster)
metricsServer := server.NewMetricsServer()
healthcheckServer := server.NewHealthCheckServer()
controllersServer := server.NewControllersServer(eventServer)

ctx, cancel := context.WithCancel(context.Background())
@@ -70,10 +89,6 @@ func runServer(cmd *cobra.Command, args []string) {
if err := metricsServer.Stop(); err != nil {
klog.Errorf("Failed to stop metrics server, %v", err)
}

if err := healthcheckServer.Stop(); err != nil {
klog.Errorf("Failed to stop healthcheck server, %v", err)
}
}()

// Start the event broadcaster
@@ -82,7 +97,7 @@ func runServer(cmd *cobra.Command, args []string) {
// Run the servers
go apiserver.Start()
go metricsServer.Start()
go healthcheckServer.Start()
go healthcheckServer.Start(ctx)
go eventServer.Start(ctx)
go controllersServer.Start(ctx)

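The `BroadcastSubscriptionType` branch above wires up `dispatcher.NewHashDispatcher`, which places maestro instances on a consistent hash ring so that each consumer's status updates are handled by exactly one instance; the PR title ("ensure maestro instance ready after adding to hash ring") is about an instance taking ownership only once it is actually serving. Below is a minimal, self-contained sketch of the consistent-hashing idea, using hypothetical types rather than the maestro API:

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// hashRing maps consumer names onto instance IDs via consistent hashing.
// Hypothetical sketch; not the maestro dispatcher implementation.
type hashRing struct {
	hashes []uint32          // sorted hashes of instance IDs
	owner  map[uint32]string // hash -> instance ID
}

func hashKey(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

// add places a maestro instance on the ring.
func (r *hashRing) add(instanceID string) {
	h := hashKey(instanceID)
	r.owner[h] = instanceID
	r.hashes = append(r.hashes, h)
	sort.Slice(r.hashes, func(i, j int) bool { return r.hashes[i] < r.hashes[j] })
}

// lookup returns the instance that owns a consumer's status updates:
// the first instance hash clockwise from the consumer's hash.
func (r *hashRing) lookup(consumer string) string {
	h := hashKey(consumer)
	i := sort.Search(len(r.hashes), func(i int) bool { return r.hashes[i] >= h })
	if i == len(r.hashes) {
		i = 0 // wrap around the ring
	}
	return r.owner[r.hashes[i]]
}

func main() {
	ring := &hashRing{owner: map[uint32]string{}}
	ring.add("maestro-1")
	ring.add("maestro-2")
	fmt.Println(ring.lookup("cluster-a")) // exactly one owner per consumer
}
```

Adding an instance immediately moves some consumers onto it, which is why readiness matters: the diff above hands the dispatcher to the healthcheck server (`SetStatusDispatcher`) so liveness and ring membership are coordinated in one place.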
@@ -3,20 +3,16 @@ package server
import (
"context"
"fmt"
"time"

"github.com/openshift-online/maestro/pkg/api"
"github.com/openshift-online/maestro/pkg/client/cloudevents"
"github.com/openshift-online/maestro/pkg/config"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/pkg/db"
"github.com/openshift-online/maestro/pkg/dispatcher"
"github.com/openshift-online/maestro/pkg/event"
"github.com/openshift-online/maestro/pkg/logger"
"github.com/openshift-online/maestro/pkg/services"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/common"
workpayload "open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload"
@@ -45,16 +41,14 @@ type EventServer interface {
OnStatusUpdate(ctx context.Context, eventID, resourceID string) error
}

var _ EventServer = &PulseServer{}
var _ EventServer = &MessageQueueEventServer{}

// PulseServer represents a server responsible for publish resource spec events from
// resource controller and handle resource status update events from the maestro agent.
// It also periodic heartbeat updates and checking the liveness of Maestro instances,
// triggering status resync based on instances' status and other conditions.
type PulseServer struct {
// MessageQueueEventServer represents an event server responsible for publishing resource spec events
// from the resource controller and handling resource status update events from the message queue.
// It also maintains a status dispatcher to dispatch status update events to the corresponding
// maestro instances.
type MessageQueueEventServer struct {
instanceID string
pulseInterval int64
instanceDao dao.InstanceDao
eventInstanceDao dao.EventInstanceDao
lockFactory db.LockFactory
eventBroadcaster *event.EventBroadcaster // event broadcaster to broadcast resource status update events to subscribers
@@ -64,22 +58,10 @@ type PulseServer struct {
statusDispatcher dispatcher.Dispatcher
}

func NewPulseServer(eventBroadcaster *event.EventBroadcaster) EventServer {
var statusDispatcher dispatcher.Dispatcher
switch config.SubscriptionType(env().Config.PulseServer.SubscriptionType) {
case config.SharedSubscriptionType:
statusDispatcher = dispatcher.NewNoopDispatcher(dao.NewConsumerDao(&env().Database.SessionFactory), env().Clients.CloudEventsSource)
case config.BroadcastSubscriptionType:
statusDispatcher = dispatcher.NewHashDispatcher(env().Config.MessageBroker.ClientID, dao.NewInstanceDao(&env().Database.SessionFactory),
dao.NewConsumerDao(&env().Database.SessionFactory), env().Clients.CloudEventsSource, env().Config.PulseServer.ConsistentHashConfig)
default:
klog.Fatalf("Unsupported subscription type: %s", env().Config.PulseServer.SubscriptionType)
}
func NewMessageQueueEventServer(eventBroadcaster *event.EventBroadcaster, statusDispatcher dispatcher.Dispatcher) EventServer {
sessionFactory := env().Database.SessionFactory
return &PulseServer{
return &MessageQueueEventServer{
instanceID: env().Config.MessageBroker.ClientID,
pulseInterval: env().Config.PulseServer.PulseInterval,
instanceDao: dao.NewInstanceDao(&sessionFactory),
eventInstanceDao: dao.NewEventInstanceDao(&sessionFactory),
lockFactory: db.NewAdvisoryLockFactory(sessionFactory),
eventBroadcaster: eventBroadcaster,
@@ -90,92 +72,24 @@ func NewPulseServer(eventBroadcaster *event.EventBroadcaster) EventServer {
}
}

// Start initializes and runs the pulse server, updating and checking Maestro instances' liveness,
// initializes subscription to status update messages and triggers status resync based on
// instances' status and other conditions.
func (s *PulseServer) Start(ctx context.Context) {
log.Infof("Starting pulse server")
// Start initializes and runs the event server. It starts the subscription
// to resource status update messages and the status dispatcher.
func (s *MessageQueueEventServer) Start(ctx context.Context) {
log.Infof("Starting message queue event server")

// start subscribing to resource status update messages.
s.startSubscription(ctx)
// start the status dispatcher
go s.statusDispatcher.Start(ctx)

// start a goroutine to periodically update heartbeat for the current maestro instance
go wait.UntilWithContext(ctx, s.pulse, time.Duration(s.pulseInterval*int64(time.Second)))

// start a goroutine to periodically check the liveness of maestro instances
go wait.UntilWithContext(ctx, s.checkInstances, time.Duration(s.pulseInterval/3*int64(time.Second)))

// wait until context is canceled
<-ctx.Done()
log.Infof("Shutting down pulse server")
}

func (s *PulseServer) pulse(ctx context.Context) {
log.V(10).Infof("Updating heartbeat for maestro instance: %s", s.instanceID)
instance := &api.ServerInstance{
Meta: api.Meta{
ID: s.instanceID,
UpdatedAt: time.Now(),
},
}
_, err := s.instanceDao.UpSert(ctx, instance)
if err != nil {
log.Error(fmt.Sprintf("Unable to upsert maestro instance: %s", err.Error()))
}
}

func (s *PulseServer) checkInstances(ctx context.Context) {
log.V(10).Infof("Checking liveness of maestro instances")
// lock the Instance with a fail-fast advisory lock context.
// this allows concurrent processing of many instances by one or more maestro instances exclusively.
lockOwnerID, acquired, err := s.lockFactory.NewNonBlockingLock(ctx, "maestro-instances-pulse-check", db.Instances)
// Ensure that the transaction related to this lock always end.
defer s.lockFactory.Unlock(ctx, lockOwnerID)
if err != nil {
log.Error(fmt.Sprintf("error obtaining the instance lock: %v", err))
return
}
// skip if the lock is not acquired
if !acquired {
log.V(4).Infof("failed to acquire the lock as another maestro instance is checking instances, skip")
return
}

instances, err := s.instanceDao.All(ctx)
if err != nil {
log.Error(fmt.Sprintf("Unable to get all maestro instances: %s", err.Error()))
return
}

inactiveInstanceIDs := []string{}
for _, instance := range instances {
// Instances pulsing within the last three check intervals are considered as active.
if instance.UpdatedAt.After(time.Now().Add(time.Duration(int64(-3*time.Second) * s.pulseInterval))) {
if err := s.statusDispatcher.OnInstanceUp(instance.ID); err != nil {
log.Error(fmt.Sprintf("Error to call OnInstanceUp handler for maestro instance %s: %s", instance.ID, err.Error()))
}
} else {
if err := s.statusDispatcher.OnInstanceDown(instance.ID); err != nil {
log.Error(fmt.Sprintf("Error to call OnInstanceDown handler for maestro instance %s: %s", instance.ID, err.Error()))
} else {
inactiveInstanceIDs = append(inactiveInstanceIDs, instance.ID)
}
}
}

if len(inactiveInstanceIDs) > 0 {
// batch delete inactive instances
if err := s.instanceDao.DeleteByIDs(ctx, inactiveInstanceIDs); err != nil {
log.Error(fmt.Sprintf("Unable to delete inactive maestro instances (%s): %s", inactiveInstanceIDs, err.Error()))
}
}
log.Infof("Shutting down message queue event server")
}
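Compared with the removed `PulseServer` code above, the new `Start` no longer runs the heartbeat (`pulse`) and liveness-check goroutines; it only starts the subscription and the status dispatcher, then blocks until the context is canceled. A generic sketch of that context-driven lifecycle, with hypothetical worker names standing in for the subscription and dispatcher:

```go
package main

import (
	"context"
	"log"
	"time"
)

// startWorker loops until its context is canceled; a stand-in for the
// message-queue subscription and the status dispatcher goroutines.
func startWorker(ctx context.Context, name string) {
	for {
		select {
		case <-ctx.Done():
			log.Printf("%s: stopping", name)
			return
		case <-time.After(time.Second):
			log.Printf("%s: working", name)
		}
	}
}

func runServer(ctx context.Context) {
	go startWorker(ctx, "subscription")
	go startWorker(ctx, "dispatcher")

	<-ctx.Done() // block until shutdown is requested
	log.Println("shutting down")
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	runServer(ctx)
}
```

Tying every background goroutine to one context means a single `cancel()` in `runServer` (see the first file's diff, where `healthcheckServer.Start(ctx)` now takes the shared context) shuts the whole server down cleanly.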

// startSubscription initiates the subscription to resource status update messages.
// It runs asynchronously in the background until the provided context is canceled.
func (s *PulseServer) startSubscription(ctx context.Context) {
func (s *MessageQueueEventServer) startSubscription(ctx context.Context) {
s.sourceClient.Subscribe(ctx, func(action types.ResourceAction, resource *api.Resource) error {
log.V(4).Infof("received action %s for resource %s", action, resource.ID)

@@ -200,25 +114,25 @@ func (s *PulseServer) startSubscription(ctx context.Context) {
}

// OnCreate will be called on each new resource creation event inserted into db.
func (s *PulseServer) OnCreate(ctx context.Context, resourceID string) error {
func (s *MessageQueueEventServer) OnCreate(ctx context.Context, resourceID string) error {
return s.sourceClient.OnCreate(ctx, resourceID)
}

// OnUpdate will be called on each new resource update event inserted into db.
func (s *PulseServer) OnUpdate(ctx context.Context, resourceID string) error {
func (s *MessageQueueEventServer) OnUpdate(ctx context.Context, resourceID string) error {
return s.sourceClient.OnUpdate(ctx, resourceID)
}

// OnDelete will be called on each new resource deletion event inserted into db.
func (s *PulseServer) OnDelete(ctx context.Context, resourceID string) error {
func (s *MessageQueueEventServer) OnDelete(ctx context.Context, resourceID string) error {
return s.sourceClient.OnDelete(ctx, resourceID)
}

// OnStatusUpdate will be called on each new status event inserted into db.
// It does two things:
// 1. build the resource status and broadcast it to subscribers
// 2. add the event instance record to mark the event has been processed by the current instance
func (s *PulseServer) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error {
func (s *MessageQueueEventServer) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error {
statusEvent, sErr := s.statusEventService.Get(ctx, eventID)
if sErr != nil {
return fmt.Errorf("failed to get status event %s: %s", eventID, sErr.Error())
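A hedged sketch of the two-step flow described in the `OnStatusUpdate` comment above (broadcast the resolved status, then mark the event as processed), with hypothetical helpers standing in for the status-event service, the event broadcaster, and the event-instance DAO:

```go
package main

import (
	"context"
	"fmt"
)

type Resource struct {
	ID     string
	Status map[string]any
}

// loadResourceStatus is a stand-in for resolving the resource status from a
// status event; the real code goes through the status-event service.
func loadResourceStatus(ctx context.Context, resourceID string) (*Resource, error) {
	return &Resource{ID: resourceID, Status: map[string]any{"phase": "Applied"}}, nil
}

// broadcast stands in for the event broadcaster notifying subscribers.
func broadcast(r *Resource) { fmt.Printf("broadcast status of %s: %v\n", r.ID, r.Status) }

// recordEventInstance stands in for the event-instance DAO insert.
func recordEventInstance(ctx context.Context, eventID, instanceID string) error {
	fmt.Printf("event %s marked processed by %s\n", eventID, instanceID)
	return nil
}

func onStatusUpdate(ctx context.Context, eventID, resourceID, instanceID string) error {
	resource, err := loadResourceStatus(ctx, resourceID)
	if err != nil {
		return fmt.Errorf("failed to get status event %s: %w", eventID, err)
	}
	// 1. build the resource status and broadcast it to subscribers
	broadcast(resource)
	// 2. record the event instance so this event is not re-processed here
	return recordEventInstance(ctx, eventID, instanceID)
}

func main() {
	_ = onStatusUpdate(context.Background(), "evt-1", "res-1", "maestro-1")
}
```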
2 changes: 1 addition & 1 deletion cmd/maestro/server/grpc_broker.go
@@ -429,7 +429,7 @@ func (bkr *GRPCBroker) OnDelete(ctx context.Context, id string) error {
// It does two things:
// 1. build the resource status and broadcast it to subscribers
// 2. add the event instance record to mark the event has been processed by the current instance
// TODO consider using a same way (pulse_server.OnStatusUpdate) to handle this
// TODO consider using the same approach (MessageQueueEventServer.OnStatusUpdate) to handle this
func (bkr *GRPCBroker) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error {
statusEvent, sErr := bkr.statusEventService.Get(ctx, eventID)
if sErr != nil {