Skip to content

Commit

Permalink
move instance check to healthcheck server.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <[email protected]>
  • Loading branch information
morvencao committed Dec 13, 2024
1 parent a322718 commit 852d8b0
Show file tree
Hide file tree
Showing 14 changed files with 277 additions and 237 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ permissions:

jobs:
e2e:
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
steps:
- name: Checkout
uses: actions/checkout@v4
Expand All @@ -32,7 +32,7 @@ jobs:
container_tool: docker
SERVER_REPLICAS: 2
e2e-broadcast-subscription:
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
steps:
- name: Checkout
uses: actions/checkout@v4
Expand All @@ -50,7 +50,7 @@ jobs:
SERVER_REPLICAS: 2
ENABLE_BROADCAST_SUBSCRIPTION: true
e2e-grpc-broker:
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
steps:
- name: Checkout
uses: actions/checkout@v4
Expand Down
38 changes: 28 additions & 10 deletions cmd/maestro/servecmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import (

"github.com/openshift-online/maestro/cmd/maestro/environments"
"github.com/openshift-online/maestro/cmd/maestro/server"
"github.com/openshift-online/maestro/pkg/config"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/pkg/dispatcher"
"github.com/openshift-online/maestro/pkg/event"
)

Expand All @@ -35,24 +38,43 @@ func runServer(cmd *cobra.Command, args []string) {
klog.Fatalf("Unable to initialize environment: %s", err.Error())
}

healthcheckServer := server.NewHealthCheckServer()

// Create event broadcaster to broadcast resource status update events to subscribers
eventBroadcaster := event.NewEventBroadcaster()

// Create the event server based on the message broker type:
// For gRPC, create a gRPC broker to handle resource spec and status events.
// For MQTT, create a Pulse server to handle resource spec and status events.
var eventServer server.EventServer
if environments.Environment().Config.MessageBroker.MessageBrokerType == "grpc" {
switch environments.Environment().Config.MessageBroker.MessageBrokerType {
case "mqtt":
klog.Info("Setting up pulse server")
var statusDispatcher dispatcher.Dispatcher
subscriptionType := environments.Environment().Config.EventServer.SubscriptionType
switch config.SubscriptionType(subscriptionType) {
case config.SharedSubscriptionType:
statusDispatcher = dispatcher.NewNoopDispatcher(dao.NewConsumerDao(&environments.Environment().Database.SessionFactory), environments.Environment().Clients.CloudEventsSource)
case config.BroadcastSubscriptionType:
statusDispatcher = dispatcher.NewHashDispatcher(environments.Environment().Config.MessageBroker.ClientID, dao.NewInstanceDao(&environments.Environment().Database.SessionFactory),
dao.NewConsumerDao(&environments.Environment().Database.SessionFactory), environments.Environment().Clients.CloudEventsSource, environments.Environment().Config.EventServer.ConsistentHashConfig)
default:
klog.Errorf("Unsupported subscription type: %s", subscriptionType)
}

// Set the status dispatcher for the healthcheck server
healthcheckServer.SetStatusDispatcher(statusDispatcher)
eventServer = server.NewMQTTEventServer(eventBroadcaster, statusDispatcher)
case "grpc":
klog.Info("Setting up grpc broker")
eventServer = server.NewGRPCBroker(eventBroadcaster)
} else {
klog.Info("Setting up pulse server")
eventServer = server.NewPulseServer(eventBroadcaster)
default:
klog.Errorf("Unsupported message broker type: %s", environments.Environment().Config.MessageBroker.MessageBrokerType)
}

// Create the servers
apiserver := server.NewAPIServer(eventBroadcaster)
metricsServer := server.NewMetricsServer()
healthcheckServer := server.NewHealthCheckServer()
controllersServer := server.NewControllersServer(eventServer)

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -70,10 +92,6 @@ func runServer(cmd *cobra.Command, args []string) {
if err := metricsServer.Stop(); err != nil {
klog.Errorf("Failed to stop metrics server, %v", err)
}

if err := healthcheckServer.Stop(); err != nil {
klog.Errorf("Failed to stop healthcheck server, %v", err)
}
}()

// Start the event broadcaster
Expand All @@ -82,7 +100,7 @@ func runServer(cmd *cobra.Command, args []string) {
// Run the servers
go apiserver.Start()
go metricsServer.Start()
go healthcheckServer.Start()
go healthcheckServer.Start(ctx)
go eventServer.Start(ctx)
go controllersServer.Start(ctx)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,16 @@ package server
import (
"context"
"fmt"
"time"

"github.com/openshift-online/maestro/pkg/api"
"github.com/openshift-online/maestro/pkg/client/cloudevents"
"github.com/openshift-online/maestro/pkg/config"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/pkg/db"
"github.com/openshift-online/maestro/pkg/dispatcher"
"github.com/openshift-online/maestro/pkg/event"
"github.com/openshift-online/maestro/pkg/logger"
"github.com/openshift-online/maestro/pkg/services"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/common"
workpayload "open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload"
Expand Down Expand Up @@ -45,16 +41,14 @@ type EventServer interface {
OnStatusUpdate(ctx context.Context, eventID, resourceID string) error
}

var _ EventServer = &PulseServer{}
var _ EventServer = &MQTTEventServer{}

// PulseServer represents a server responsible for publish resource spec events from
// MQTTEventServer represents a server responsible for publish resource spec events from
// resource controller and handle resource status update events from the maestro agent.
// It also periodic heartbeat updates and checking the liveness of Maestro instances,
// triggering status resync based on instances' status and other conditions.
type PulseServer struct {
type MQTTEventServer struct {
instanceID string
pulseInterval int64
instanceDao dao.InstanceDao
eventInstanceDao dao.EventInstanceDao
lockFactory db.LockFactory
eventBroadcaster *event.EventBroadcaster // event broadcaster to broadcast resource status update events to subscribers
Expand All @@ -64,22 +58,10 @@ type PulseServer struct {
statusDispatcher dispatcher.Dispatcher
}

func NewPulseServer(eventBroadcaster *event.EventBroadcaster) EventServer {
var statusDispatcher dispatcher.Dispatcher
switch config.SubscriptionType(env().Config.PulseServer.SubscriptionType) {
case config.SharedSubscriptionType:
statusDispatcher = dispatcher.NewNoopDispatcher(dao.NewConsumerDao(&env().Database.SessionFactory), env().Clients.CloudEventsSource)
case config.BroadcastSubscriptionType:
statusDispatcher = dispatcher.NewHashDispatcher(env().Config.MessageBroker.ClientID, dao.NewInstanceDao(&env().Database.SessionFactory),
dao.NewConsumerDao(&env().Database.SessionFactory), env().Clients.CloudEventsSource, env().Config.PulseServer.ConsistentHashConfig)
default:
klog.Fatalf("Unsupported subscription type: %s", env().Config.PulseServer.SubscriptionType)
}
func NewMQTTEventServer(eventBroadcaster *event.EventBroadcaster, statusDispatcher dispatcher.Dispatcher) EventServer {
sessionFactory := env().Database.SessionFactory
return &PulseServer{
return &MQTTEventServer{
instanceID: env().Config.MessageBroker.ClientID,
pulseInterval: env().Config.PulseServer.PulseInterval,
instanceDao: dao.NewInstanceDao(&sessionFactory),
eventInstanceDao: dao.NewEventInstanceDao(&sessionFactory),
lockFactory: db.NewAdvisoryLockFactory(sessionFactory),
eventBroadcaster: eventBroadcaster,
Expand All @@ -93,99 +75,22 @@ func NewPulseServer(eventBroadcaster *event.EventBroadcaster) EventServer {
// Start initializes and runs the pulse server, updating and checking Maestro instances' liveness,
// initializes subscription to status update messages and triggers status resync based on
// instances' status and other conditions.
func (s *PulseServer) Start(ctx context.Context) {
func (s *MQTTEventServer) Start(ctx context.Context) {
log.Infof("Starting pulse server")

// start subscribing to resource status update messages.
s.startSubscription(ctx)
// start the status dispatcher
go s.statusDispatcher.Start(ctx)

// start a goroutine to periodically update heartbeat for the current maestro instance
go wait.UntilWithContext(ctx, s.pulse, time.Duration(s.pulseInterval*int64(time.Second)))

// start a goroutine to periodically check the liveness of maestro instances
go wait.UntilWithContext(ctx, s.checkInstances, time.Duration(s.pulseInterval/3*int64(time.Second)))

// wait until context is canceled
<-ctx.Done()
log.Infof("Shutting down pulse server")
}

func (s *PulseServer) pulse(ctx context.Context) {
log.V(10).Infof("Updating heartbeat for maestro instance: %s", s.instanceID)
instance := &api.ServerInstance{
Meta: api.Meta{
ID: s.instanceID,
},
LastPulse: time.Now(),
}
_, err := s.instanceDao.UpSert(ctx, instance)
if err != nil {
log.Error(fmt.Sprintf("Unable to upsert maestro instance: %s", err.Error()))
}
}

func (s *PulseServer) checkInstances(ctx context.Context) {
log.V(10).Infof("Checking liveness of maestro instances")
// lock the Instance with a fail-fast advisory lock context.
// this allows concurrent processing of many instances by one or more maestro instances exclusively.
lockOwnerID, acquired, err := s.lockFactory.NewNonBlockingLock(ctx, "maestro-instances-pulse-check", db.Instances)
// Ensure that the transaction related to this lock always end.
defer s.lockFactory.Unlock(ctx, lockOwnerID)
if err != nil {
log.Error(fmt.Sprintf("error obtaining the instance lock: %v", err))
return
}
// skip if the lock is not acquired
if !acquired {
log.V(4).Infof("failed to acquire the lock as another maestro instance is checking instances, skip")
return
}

instances, err := s.instanceDao.All(ctx)
if err != nil {
log.Error(fmt.Sprintf("Unable to get all maestro instances: %s", err.Error()))
return
}

activeInstanceIDs := []string{}
inactiveInstanceIDs := []string{}
for _, instance := range instances {
// Instances pulsing within the last three check intervals are considered as active.
if instance.LastPulse.After(time.Now().Add(time.Duration(int64(-3*time.Second) * s.pulseInterval))) {
if err := s.statusDispatcher.OnInstanceUp(instance.ID); err != nil {
log.Error(fmt.Sprintf("Error to call OnInstanceUp handler for maestro instance %s: %s", instance.ID, err.Error()))
}
// mark the instance as active after it is added to the status dispatcher
activeInstanceIDs = append(activeInstanceIDs, instance.ID)
} else {
if err := s.statusDispatcher.OnInstanceDown(instance.ID); err != nil {
log.Error(fmt.Sprintf("Error to call OnInstanceDown handler for maestro instance %s: %s", instance.ID, err.Error()))
} else {
inactiveInstanceIDs = append(inactiveInstanceIDs, instance.ID)
}
}
}

if len(activeInstanceIDs) > 0 {
// batch mark active instances
if err := s.instanceDao.MarkReadyByIDs(ctx, activeInstanceIDs); err != nil {
log.Error(fmt.Sprintf("Unable to mark active maestro instances (%s): %s", activeInstanceIDs, err.Error()))
}
}

if len(inactiveInstanceIDs) > 0 {
// batch delete inactive instances
if err := s.instanceDao.DeleteByIDs(ctx, inactiveInstanceIDs); err != nil {
log.Error(fmt.Sprintf("Unable to delete inactive maestro instances (%s): %s", inactiveInstanceIDs, err.Error()))
}
}
}

// startSubscription initiates the subscription to resource status update messages.
// It runs asynchronously in the background until the provided context is canceled.
func (s *PulseServer) startSubscription(ctx context.Context) {
func (s *MQTTEventServer) startSubscription(ctx context.Context) {
s.sourceClient.Subscribe(ctx, func(action types.ResourceAction, resource *api.Resource) error {
log.V(4).Infof("received action %s for resource %s", action, resource.ID)

Expand All @@ -210,25 +115,25 @@ func (s *PulseServer) startSubscription(ctx context.Context) {
}

// OnCreate will be called on each new resource creation event inserted into db.
func (s *PulseServer) OnCreate(ctx context.Context, resourceID string) error {
func (s *MQTTEventServer) OnCreate(ctx context.Context, resourceID string) error {
return s.sourceClient.OnCreate(ctx, resourceID)
}

// OnUpdate will be called on each new resource update event inserted into db.
func (s *PulseServer) OnUpdate(ctx context.Context, resourceID string) error {
func (s *MQTTEventServer) OnUpdate(ctx context.Context, resourceID string) error {
return s.sourceClient.OnUpdate(ctx, resourceID)
}

// OnDelete will be called on each new resource deletion event inserted into db.
func (s *PulseServer) OnDelete(ctx context.Context, resourceID string) error {
func (s *MQTTEventServer) OnDelete(ctx context.Context, resourceID string) error {
return s.sourceClient.OnDelete(ctx, resourceID)
}

// On StatusUpdate will be called on each new status event inserted into db.
// It does two things:
// 1. build the resource status and broadcast it to subscribers
// 2. add the event instance record to mark the event has been processed by the current instance
func (s *PulseServer) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error {
func (s *MQTTEventServer) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error {
statusEvent, sErr := s.statusEventService.Get(ctx, eventID)
if sErr != nil {
return fmt.Errorf("failed to get status event %s: %s", eventID, sErr.Error())
Expand Down
Loading

0 comments on commit 852d8b0

Please sign in to comment.