Skip to content

Commit

Permalink
event handler to event filter.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <[email protected]>
  • Loading branch information
morvencao committed Dec 23, 2024
1 parent 1cf6559 commit acaa814
Show file tree
Hide file tree
Showing 16 changed files with 436 additions and 600 deletions.
12 changes: 4 additions & 8 deletions cmd/maestro/servecmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,11 @@ func runServer(cmd *cobra.Command, args []string) {
// For gRPC, create a gRPC broker to handle resource spec and status events.
// For MQTT/Kafka, create a message queue based event server to handle resource spec and status events.
var eventServer server.EventServer
var eventHandler controllers.EventHandler
var eventFilter controllers.EventFilter
if environments.Environment().Config.MessageBroker.MessageBrokerType == "grpc" {
klog.Info("Setting up grpc broker")
eventServer = server.NewGRPCBroker(eventBroadcaster)
eventHandler = controllers.NewPredicatedEventHandler(eventServer.PredicateEvent,
environments.Environment().Services.Events(),
dao.NewEventInstanceDao(&environments.Environment().Database.SessionFactory),
dao.NewInstanceDao(&environments.Environment().Database.SessionFactory))
eventFilter = controllers.NewPredicatedEventFilter(eventServer.PredicateEvent)
} else {
klog.Info("Setting up message queue event server")
var statusDispatcher dispatcher.Dispatcher
Expand All @@ -74,14 +71,13 @@ func runServer(cmd *cobra.Command, args []string) {
// Set the status dispatcher for the healthcheck server
healthcheckServer.SetStatusDispatcher(statusDispatcher)
eventServer = server.NewMessageQueueEventServer(eventBroadcaster, statusDispatcher)
eventHandler = controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(environments.Environment().Database.SessionFactory),
environments.Environment().Services.Events())
eventFilter = controllers.NewLockBasedEventFilter(db.NewAdvisoryLockFactory(environments.Environment().Database.SessionFactory))
}

// Create the servers
apiserver := server.NewAPIServer(eventBroadcaster)
metricsServer := server.NewMetricsServer()
controllersServer := server.NewControllersServer(eventServer, eventHandler)
controllersServer := server.NewControllersServer(eventServer, eventFilter)

ctx, cancel := context.WithCancel(context.Background())

Expand Down
4 changes: 2 additions & 2 deletions cmd/maestro/server/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (
"github.com/openshift-online/maestro/pkg/logger"
)

func NewControllersServer(eventServer EventServer, eventHandler controllers.EventHandler) *ControllersServer {
func NewControllersServer(eventServer EventServer, eventFilter controllers.EventFilter) *ControllersServer {
s := &ControllersServer{
KindControllerManager: controllers.NewKindControllerManager(
eventHandler,
eventFilter,
env().Services.Events(),
),
StatusController: controllers.NewStatusController(
Expand Down
12 changes: 6 additions & 6 deletions cmd/maestro/server/event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ type EventServer interface {
Start(ctx context.Context)

// OnCreate handles the creation of a resource.
OnCreate(ctx context.Context, eventID, resourceID string) error
OnCreate(ctx context.Context, resourceID string) error

// OnUpdate handles updates to a resource.
OnUpdate(ctx context.Context, eventID, resourceID string) error
OnUpdate(ctx context.Context, resourceID string) error

// OnDelete handles the deletion of a resource.
OnDelete(ctx context.Context, eventID, resourceID string) error
OnDelete(ctx context.Context, resourceID string) error

// OnStatusUpdate handles status update events for a resource.
OnStatusUpdate(ctx context.Context, eventID, resourceID string) error
Expand Down Expand Up @@ -117,17 +117,17 @@ func (s *MessageQueueEventServer) startSubscription(ctx context.Context) {
}

// OnCreate will be called on each new resource creation event inserted into db.
func (s *MessageQueueEventServer) OnCreate(ctx context.Context, eventID, resourceID string) error {
func (s *MessageQueueEventServer) OnCreate(ctx context.Context, resourceID string) error {
return s.sourceClient.OnCreate(ctx, resourceID)
}

// OnUpdate will be called on each new resource update event inserted into db.
func (s *MessageQueueEventServer) OnUpdate(ctx context.Context, eventID, resourceID string) error {
func (s *MessageQueueEventServer) OnUpdate(ctx context.Context, resourceID string) error {
return s.sourceClient.OnUpdate(ctx, resourceID)
}

// OnDelete will be called on each new resource deletion event inserted into db.
func (s *MessageQueueEventServer) OnDelete(ctx context.Context, eventID, resourceID string) error {
func (s *MessageQueueEventServer) OnDelete(ctx context.Context, resourceID string) error {
return s.sourceClient.OnDelete(ctx, resourceID)
}

Expand Down
71 changes: 23 additions & 48 deletions cmd/maestro/server/grpc_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,20 +197,23 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
}

// send the cloudevent to the subscriber
// TODO: error handling to address errors beyond network issues.
if err := subServer.Send(pbEvt); err != nil {
klog.Errorf("failed to send grpc event, %v", err)
return err // return error to requeue the spec event
// return the error to requeue the event if sending to the subscriber fails (e.g., due to network issues).
// TODO: handle other types of errors beyond network issues.
return err
}

return nil
})

select {
// TODO: add comments for the case
case err := <-errChan:
klog.Errorf("unregister subscriber %s, error= %v", subscriberID, err)
bkr.unregister(subscriberID)
return err
// TODO: add comments for the case
case <-subServer.Context().Done():
bkr.unregister(subscriberID)
return nil
Expand Down Expand Up @@ -392,49 +395,40 @@ func (bkr *GRPCBroker) handleRes(resource *api.Resource) {
}
}

// handleResEvent publish the resource to the correct subscriber and add the event instance record.
func (bkr *GRPCBroker) handleResEvent(ctx context.Context, eventID string, resource *api.Resource) error {
bkr.handleRes(resource)

// add the event instance record to mark the event has been processed by the current instance
if _, err := bkr.eventInstanceDao.Create(ctx, &api.EventInstance{
SpecEventID: eventID,
InstanceID: bkr.instanceID,
}); err != nil {
return fmt.Errorf("failed to create event instance record %s: %s", eventID, err.Error())
}

return nil
}

// OnCreate is called by the controller when a resource is created on the maestro server.
func (bkr *GRPCBroker) OnCreate(ctx context.Context, eventID, resourceID string) error {
resource, err := bkr.resourceService.Get(ctx, resourceID)
func (bkr *GRPCBroker) OnCreate(ctx context.Context, id string) error {
resource, err := bkr.resourceService.Get(ctx, id)
if err != nil {
return err
}

return bkr.handleResEvent(ctx, eventID, resource)
bkr.handleRes(resource)

return nil
}

// OnUpdate is called by the controller when a resource is updated on the maestro server.
func (bkr *GRPCBroker) OnUpdate(ctx context.Context, eventID, resourceID string) error {
resource, err := bkr.resourceService.Get(ctx, resourceID)
func (bkr *GRPCBroker) OnUpdate(ctx context.Context, id string) error {
resource, err := bkr.resourceService.Get(ctx, id)
if err != nil {
return err
}

return bkr.handleResEvent(ctx, eventID, resource)
bkr.handleRes(resource)

return nil
}

// OnDelete is called by the controller when a resource is deleted from the maestro server.
func (bkr *GRPCBroker) OnDelete(ctx context.Context, eventID, resourceID string) error {
resource, err := bkr.resourceService.Get(ctx, resourceID)
func (bkr *GRPCBroker) OnDelete(ctx context.Context, id string) error {
resource, err := bkr.resourceService.Get(ctx, id)
if err != nil {
return err
}

return bkr.handleResEvent(ctx, eventID, resource)
bkr.handleRes(resource)

return nil
}

// On StatusUpdate will be called on each new status event inserted into db.
Expand Down Expand Up @@ -470,35 +464,16 @@ func (bkr *GRPCBroker) PredicateEvent(ctx context.Context, eventID string) (bool

resource, svcErr := bkr.resourceService.Get(ctx, evt.SourceID)
if svcErr != nil {
// if the resource is not found, it indicates the resource has been handled by
// other instances, so we can mark the event as reconciled and ignore it.
// if the resource is not found, it indicates the resource has been handled by other instances.
if svcErr.Is404() {
klog.V(10).Infof("The resource %s has been deleted, mark the event as reconciled", evt.SourceID)
now := time.Now()
evt.ReconciledDate = &now
if _, svcErr := bkr.eventService.Replace(ctx, evt); svcErr != nil {
return false, fmt.Errorf("failed to update event %s: %s", evt.ID, svcErr.Error())
}
return false, nil
}
return false, fmt.Errorf("failed to get resource %s: %s", evt.SourceID, svcErr.Error())
}

if bkr.IsConsumerSubscribed(resource.ConsumerName) {
return true, nil
}

// if the consumer is not subscribed to the broker, then add the event instance record
// to indicate the event has been processed by the instance
if _, err := bkr.eventInstanceDao.Create(ctx, &api.EventInstance{
SpecEventID: eventID,
InstanceID: bkr.instanceID,
}); err != nil {
return false, fmt.Errorf("failed to create event instance record %s: %s", eventID, err.Error())
}
klog.V(10).Infof("The consumer %s is not subscribed to the broker, added the event instance record", resource.ConsumerName)

return false, nil
// check if the consumer is subscribed to the broker
return bkr.IsConsumerSubscribed(resource.ConsumerName), nil
}

// IsConsumerSubscribed returns true if the consumer is subscribed to the broker for resource spec.
Expand Down
5 changes: 2 additions & 3 deletions pkg/api/event_instances.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package api

type EventInstance struct {
EventID string `gorm:"default:null"`
SpecEventID string `gorm:"default:null"`
InstanceID string
EventID string
InstanceID string
}

type EventInstanceList []*EventInstance
96 changes: 96 additions & 0 deletions pkg/controllers/event_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package controllers

import (
"context"
"fmt"

"github.com/openshift-online/maestro/pkg/db"
)

// EventFilter defines an interface for filtering and deferring actions on events.
// Implementations of EventFilter should provide logic for determining whether an event
// should be processed and for handling any actions that need to be deferred.
//
// - Filter: Decides whether the event should be processed based on its ID.
// - DeferredAction: Allows for scheduling actions that should occur regardless of whether the event
// was processed successfully or not, such as cleanup tasks or releasing resources.
type EventFilter interface {
// Filter determines whether the event should be processed.
// Returns true if the event should be handled, false and an error otherwise.
Filter(ctx context.Context, id string) (bool, error)

// DeferredAction schedules actions to be executed regardless of event processing success.
DeferredAction(ctx context.Context, id string)
}

// LockBasedEventFilter implements EventFilter using a locking mechanism for event processing.
// It creates advisory locks on event IDs to ensure thread-safe access.
// - Filter acquires a lock on the event ID and returns true if the lock is successful.
// - DeferredAction releases the lock for the event ID.
type LockBasedEventFilter struct {
lockFactory db.LockFactory
// locks map is accessed by a single-threaded handler goroutine, no need for lock on it.
locks map[string]string
}

func NewLockBasedEventFilter(lockFactory db.LockFactory) EventFilter {
return &LockBasedEventFilter{
lockFactory: lockFactory,
locks: make(map[string]string),
}
}

// Filter attempts to acquire a lock on the event ID. Returns true if successful, false and error otherwise.
func (h *LockBasedEventFilter) Filter(ctx context.Context, id string) (bool, error) {
// lock the Event with a fail-fast advisory lock context.
// this allows concurrent processing of many events by one or many controller managers.
// allow the lock to be released by the handler goroutine and allow this function to continue.
// subsequent events will be locked by their own distinct IDs.
lockOwnerID, acquired, err := h.lockFactory.NewNonBlockingLock(ctx, id, db.Events)
// store the lock owner ID for deferred action
h.locks[id] = lockOwnerID
if err != nil {
return false, fmt.Errorf("error obtaining the event lock: %v", err)
}

if !acquired {
logger.V(4).Infof("Event %s is processed by another worker", id)
return false, nil
}

return true, nil
}

// DeferredAction releases the lock for the given event ID if it was acquired.
func (h *LockBasedEventFilter) DeferredAction(ctx context.Context, id string) {
if ownerID, exists := h.locks[id]; exists {
h.lockFactory.Unlock(ctx, ownerID)
delete(h.locks, id)
}
}

// eventFilterPredicate is a function type for filtering events based on their ID.
type eventFilterPredicate func(ctx context.Context, eventID string) (bool, error)

// PredicatedEventFilter implements EventFilter using a predicate function for event filtering.
// - Filter uses the predicate to decide if the event should be processed.
// - DeferredAction is a no-op as no locking is performed.
type PredicatedEventFilter struct {
predicate eventFilterPredicate
}

func NewPredicatedEventFilter(predicate eventFilterPredicate) EventFilter {
return &PredicatedEventFilter{
predicate: predicate,
}
}

// Filter calls the predicate function to determine if the event should be processed.
func (h *PredicatedEventFilter) Filter(ctx context.Context, id string) (bool, error) {
return h.predicate(ctx, id)
}

// DeferredAction is a no-op since no locks are involved.
func (h *PredicatedEventFilter) DeferredAction(ctx context.Context, id string) {
// no-op
}
Loading

0 comments on commit acaa814

Please sign in to comment.