From 76873362a39b7649ddf7c916381770c25a0ffd01 Mon Sep 17 00:00:00 2001 From: schmidtw Date: Mon, 25 Mar 2024 17:11:12 -0700 Subject: [PATCH] Change the handler to support errors and make it a separate package. --- internal/pubsub/end2end_test.go | 70 ++++++++++++++++++-- internal/pubsub/options.go | 22 ++++++- internal/pubsub/pubsub.go | 111 +++++++++++++++++++++----------- internal/pubsub/pubsub_test.go | 11 +++- internal/wrpkit/handler.go | 35 ++++++++++ internal/wrpkit/handler_test.go | 25 +++++++ 6 files changed, 227 insertions(+), 47 deletions(-) create mode 100644 internal/wrpkit/handler.go create mode 100644 internal/wrpkit/handler_test.go diff --git a/internal/pubsub/end2end_test.go b/internal/pubsub/end2end_test.go index c4d747b..2f2e4eb 100644 --- a/internal/pubsub/end2end_test.go +++ b/internal/pubsub/end2end_test.go @@ -6,11 +6,13 @@ package pubsub_test import ( "sync" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/xmidt-org/wrp-go/v3" "github.com/xmidt-org/xmidt-agent/internal/pubsub" + "github.com/xmidt-org/xmidt-agent/internal/wrpkit" ) type msgWithExpectations struct { @@ -74,6 +76,14 @@ var messages = []msgWithExpectations{ ContentType: string([]byte{0xbf}), }, expectErr: wrp.ErrNotUTF8, + }, { + // no handlers for this message + msg: wrp.Message{ + Type: wrp.SimpleEventMessageType, + Source: "self:/ignore-me", + Destination: "self:/ignored-service/ignored", + }, + expectErr: wrpkit.ErrNotHandled, }, } @@ -84,6 +94,7 @@ type mockHandler struct { calls int dests []wrp.Locator expect []wrp.Locator + rv []error } func (h *mockHandler) WG(wg *sync.WaitGroup) { @@ -92,15 +103,21 @@ func (h *mockHandler) WG(wg *sync.WaitGroup) { //fmt.Printf("%s adding %d to the wait group\n", h.name, len(h.expect)) } -func (h *mockHandler) HandleWrp(msg wrp.Message) { +func (h *mockHandler) HandleWrp(msg wrp.Message) error { h.lock.Lock() defer h.lock.Unlock() - + calls := h.calls h.calls++ + dest, _ := wrp.ParseLocator(msg.Destination) h.dests = append(h.dests, dest) h.wg.Done() //fmt.Printf("%s done\n", h.name) + if calls < len(h.rv) { + return h.rv[calls] + } + + return nil } func (h *mockHandler) assert(a *assert.Assertions) { @@ -172,18 +189,27 @@ func TestEndToEnd(t *testing.T) { assert := assert.New(t) require := require.New(t) - transIdValidator := pubsub.HandlerFunc( - func(msg wrp.Message) { + transIdValidator := wrpkit.HandlerFunc( + func(msg wrp.Message) error { if msg.TransactionUUID == "" { assert.Fail("transaction UUID is empty") } + return wrpkit.ErrNotHandled }) - noTransIdValidator := pubsub.HandlerFunc( - func(msg wrp.Message) { + noTransIdValidator := wrpkit.HandlerFunc( + func(msg wrp.Message) error { + src, err := wrp.ParseLocator(msg.Source) + assert.NoError(err) + + if src.Service == "ignore-me" { + return wrpkit.ErrNotHandled + } + if msg.TransactionUUID != "" { assert.Fail("transaction UUID is not empty") } + return wrpkit.ErrNotHandled }) var allCancel, singleCancel, serviceCancel, egressCancel pubsub.CancelFunc @@ -211,7 +237,7 @@ func TestEndToEnd(t *testing.T) { for _, m := range messages { msg := m.msg - err := ps.Publish(&msg) + err := ps.HandleWrp(msg) if m.expectErr != nil { assert.ErrorIs(err, m.expectErr) @@ -227,3 +253,33 @@ func TestEndToEnd(t *testing.T) { singleServiceListener.assert(assert) egressListener.assert(assert) } + +func TestTimeout(t *testing.T) { + id := wrp.DeviceID("mac:112233445566") + + assert := assert.New(t) + require := require.New(t) + + timeoutHandler := wrpkit.HandlerFunc( + func(msg wrp.Message) error { + time.Sleep(100 * time.Millisecond) + return nil + }) + + ps, err := pubsub.New(id, + pubsub.WithPublishTimeout(50*time.Millisecond), + pubsub.WithEgressHandler(timeoutHandler), + ) + + require.NoError(err) + require.NotNil(ps) + + msg := wrp.Message{ + Type: wrp.SimpleEventMessageType, + Source: "self:/service/ignored", + Destination: "event:event_1/ignored", + } + + err = ps.HandleWrp(msg) + assert.ErrorIs(err, pubsub.ErrTimeout) +} diff --git a/internal/pubsub/options.go b/internal/pubsub/options.go index 8bb07f7..bba64d6 100644 --- a/internal/pubsub/options.go +++ b/internal/pubsub/options.go @@ -4,7 +4,11 @@ package pubsub import ( + "fmt" + "time" + "github.com/xmidt-org/wrp-go/v3" + "github.com/xmidt-org/xmidt-agent/internal/wrpkit" ) type optionFunc func(*PubSub) error @@ -30,7 +34,7 @@ func Normify(opts ...wrp.NormifierOption) Option { // WithEgressHandler is an option that adds a handler for egress messages. // If the optional cancel parameter is provided, it will be set to a function // that can be used to cancel the subscription. -func WithEgressHandler(handler Handler, cancel ...*CancelFunc) Option { +func WithEgressHandler(handler wrpkit.Handler, cancel ...*CancelFunc) Option { return optionFunc(func(ps *PubSub) error { c, err := ps.SubscribeEgress(handler) if err != nil { @@ -47,7 +51,7 @@ func WithEgressHandler(handler Handler, cancel ...*CancelFunc) Option { // WithServiceHandler is an option that adds a handler for service messages. // If the optional cancel parameter is provided, it will be set to a function // that can be used to cancel the subscription. -func WithServiceHandler(service string, handler Handler, cancel ...*CancelFunc) Option { +func WithServiceHandler(service string, handler wrpkit.Handler, cancel ...*CancelFunc) Option { return optionFunc(func(ps *PubSub) error { c, err := ps.SubscribeService(service, handler) if err != nil { @@ -64,7 +68,7 @@ func WithServiceHandler(service string, handler Handler, cancel ...*CancelFunc) // WithEventHandler is an option that adds a handler for event messages. // If the optional cancel parameter is provided, it will be set to a function // that can be used to cancel the subscription. -func WithEventHandler(event string, handler Handler, cancel ...*CancelFunc) Option { +func WithEventHandler(event string, handler wrpkit.Handler, cancel ...*CancelFunc) Option { return optionFunc(func(ps *PubSub) error { c, err := ps.SubscribeEvent(event, handler) if err != nil { @@ -77,3 +81,15 @@ func WithEventHandler(event string, handler Handler, cancel ...*CancelFunc) Opti return nil }) } + +// WithPublishTimeout is an option that sets the timeout for publishing a message. +// If the timeout is exceeded, the publish will fail. +func WithPublishTimeout(timeout time.Duration) Option { + return optionFunc(func(ps *PubSub) error { + if timeout < 0 { + return fmt.Errorf("%w: timeout must be zero or larger", ErrInvalidInput) + } + ps.publishTimeout = timeout + return nil + }) +} diff --git a/internal/pubsub/pubsub.go b/internal/pubsub/pubsub.go index 59fc141..e0d33aa 100644 --- a/internal/pubsub/pubsub.go +++ b/internal/pubsub/pubsub.go @@ -4,16 +4,21 @@ package pubsub import ( + "context" + "errors" "fmt" "strings" "sync" + "time" "github.com/xmidt-org/eventor" "github.com/xmidt-org/wrp-go/v3" + "github.com/xmidt-org/xmidt-agent/internal/wrpkit" ) var ( ErrInvalidInput = fmt.Errorf("invalid input") + ErrTimeout = fmt.Errorf("timeout") ) // CancelFunc removes the associated listener with and cancels any future events @@ -23,35 +28,20 @@ var ( // will have no effect. type CancelFunc func() -// Handler is a function that is called whenever a message is received that -// matches the service associated with the handler. -// listening handler. -type Handler interface { - // HandleWrp is called whenever a message is received that matches the - // service associated with the handler. - HandleWrp(wrp.Message) -} - -// HandlerFunc is an adapter to allow the use of ordinary functions as handlers. -type HandlerFunc func(wrp.Message) - -func (f HandlerFunc) HandleWrp(msg wrp.Message) { - f(msg) -} - -var _ Handler = HandlerFunc(nil) - // PubSub is a struct representing a publish-subscribe system focusing on wrp // messages. type PubSub struct { - lock sync.RWMutex - self wrp.DeviceID - required *wrp.Normifier - desiredOpts []wrp.NormifierOption - desired *wrp.Normifier - routes map[string]*eventor.Eventor[Handler] + lock sync.RWMutex + self wrp.DeviceID + required *wrp.Normifier + desiredOpts []wrp.NormifierOption + desired *wrp.Normifier + routes map[string]*eventor.Eventor[wrpkit.Handler] + publishTimeout time.Duration } +var _ wrpkit.Handler = (*PubSub)(nil) + // Option is the interface implemented by types that can be used to // configure the credentials. type Option interface { @@ -68,7 +58,7 @@ func New(self wrp.DeviceID, opts ...Option) (*PubSub, error) { } ps := PubSub{ - routes: make(map[string]*eventor.Eventor[Handler]), + routes: make(map[string]*eventor.Eventor[wrpkit.Handler]), self: self, required: wrp.NewNormifier( // Only the absolutely required normalizers are included here. @@ -78,6 +68,13 @@ func New(self wrp.DeviceID, opts ...Option) (*PubSub, error) { ), } + defaults := []Option{ + WithPublishTimeout(5 * time.Second), + } + + // Prepend the defaults to the provided options. + opts = append(defaults, opts...) + for _, opt := range opts { if opt != nil { if err := opt.apply(&ps); err != nil { @@ -95,7 +92,7 @@ func New(self wrp.DeviceID, opts ...Option) (*PubSub, error) { // when a message targets something other than this device. The returned // CancelFunc may be called to remove the listener and cancel any future events // sent to that listener. -func (ps *PubSub) SubscribeEgress(h Handler) (CancelFunc, error) { +func (ps *PubSub) SubscribeEgress(h wrpkit.Handler) (CancelFunc, error) { return ps.subscribe(egressRoute(), h) } @@ -103,7 +100,7 @@ func (ps *PubSub) SubscribeEgress(h Handler) (CancelFunc, error) { // called when a message matches the service. A service value of '*' may be // used to match any service. The returned CancelFunc may be called to remove // the listener and cancel any future events sent to that listener. -func (ps *PubSub) SubscribeService(service string, h Handler) (CancelFunc, error) { +func (ps *PubSub) SubscribeService(service string, h wrpkit.Handler) (CancelFunc, error) { if err := validateString(service, "service"); err != nil { return nil, err } @@ -115,7 +112,7 @@ func (ps *PubSub) SubscribeService(service string, h Handler) (CancelFunc, error // when a message matches the event. An event value of '*' may be used to match // any event. The returned CancelFunc may be called to remove the listener and // cancel any future events sent to that listener. -func (ps *PubSub) SubscribeEvent(event string, h Handler) (CancelFunc, error) { +func (ps *PubSub) SubscribeEvent(event string, h wrpkit.Handler) (CancelFunc, error) { if err := validateString(event, "event"); err != nil { return nil, err } @@ -136,7 +133,7 @@ func validateString(s, typ string) error { return nil } -func (ps *PubSub) subscribe(route string, h Handler) (CancelFunc, error) { +func (ps *PubSub) subscribe(route string, h wrpkit.Handler) (CancelFunc, error) { if h == nil { return nil, fmt.Errorf("%w: handler may not be nil", ErrInvalidInput) } @@ -145,15 +142,17 @@ func (ps *PubSub) subscribe(route string, h Handler) (CancelFunc, error) { defer ps.lock.Unlock() if _, found := ps.routes[route]; !found { - ps.routes[route] = new(eventor.Eventor[Handler]) + ps.routes[route] = new(eventor.Eventor[wrpkit.Handler]) } return CancelFunc(ps.routes[route].Add(h)), nil } -// Publish publishes a wrp message to the appropriate listeners. -func (ps *PubSub) Publish(msg *wrp.Message) error { - normalized, dest, err := ps.normalize(msg) +// HandleWrp publishes a wrp message to the appropriate listeners and returns +// if there was at least one handler that accepted the message. The error +// wrpkit.ErrNotHandled is returned if no listeners were found for the message. +func (ps *PubSub) HandleWrp(msg wrp.Message) error { + normalized, dest, err := ps.normalize(&msg) if err != nil { return err } @@ -179,20 +178,60 @@ func (ps *PubSub) Publish(msg *wrp.Message) error { ps.lock.RLock() defer ps.lock.RUnlock() + wg := sync.WaitGroup{} + stop := make(chan struct{}) + handled := make(chan struct{}, 1) + ctx, cancel := context.WithTimeout(context.Background(), ps.publishTimeout) + defer cancel() + for _, route := range routes { if _, found := ps.routes[route]; found { - ps.routes[route].Visit(func(h Handler) { + ps.routes[route].Visit(func(h wrpkit.Handler) { // By making this a go routine, we can avoid deadlocks if the handler // tries to subscribe to the same service. It also avoids blocking the // caller if the handler takes a long time to process the message. if h != nil { - go h.HandleWrp(*normalized) + wg.Add(1) + go func() { + defer wg.Done() + + err := h.HandleWrp(*normalized) + if errors.Is(err, wrpkit.ErrNotHandled) { + return + } + + // Signal that the message was handled, or stop + // trying to send the message if the stop channel + // is closed. + select { + case handled <- struct{}{}: + case <-stop: + } + }() } }) } } - return nil + // Make waiting operate on a channel so that it can be interrupted if the + // message is handled, or a timeout is reached. + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-handled: // No more responses are needed. + err = nil + case <-done: // All handlers have finished. + err = wrpkit.ErrNotHandled + case <-ctx.Done(): // The timeout has been reached. + err = ErrTimeout + } + close(stop) + + return err } func (ps *PubSub) normalize(msg *wrp.Message) (*wrp.Message, wrp.Locator, error) { diff --git a/internal/pubsub/pubsub_test.go b/internal/pubsub/pubsub_test.go index 626807e..8fcf4d1 100644 --- a/internal/pubsub/pubsub_test.go +++ b/internal/pubsub/pubsub_test.go @@ -5,14 +5,18 @@ package pubsub import ( "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/xmidt-org/wrp-go/v3" + "github.com/xmidt-org/xmidt-agent/internal/wrpkit" ) func TestNew(t *testing.T) { - fn := HandlerFunc(func(wrp.Message) {}) + fn := wrpkit.HandlerFunc(func(wrp.Message) error { + return nil + }) tests := []struct { description string @@ -115,6 +119,11 @@ func TestNew(t *testing.T) { self: "mac:112233445566", opts: []Option{WithEventHandler("", fn)}, expectedErr: ErrInvalidInput, + }, { + description: "Invalid timeout", + self: "mac:112233445566", + opts: []Option{WithPublishTimeout(-1 * time.Second)}, + expectedErr: ErrInvalidInput, }, } for _, tc := range tests { diff --git a/internal/wrpkit/handler.go b/internal/wrpkit/handler.go new file mode 100644 index 0000000..0663c5c --- /dev/null +++ b/internal/wrpkit/handler.go @@ -0,0 +1,35 @@ +// SPDX-FileCopyrightText: 2024 Comcast Cable Communications Management, LLC +// SPDX-License-Identifier: Apache-2.0 + +package wrpkit + +import ( + "errors" + + "github.com/xmidt-org/wrp-go/v3" +) + +var ( + ErrNotHandled = errors.New("message not handled") +) + +// Handler interface is used to handle wrp messages in the system in a +// consistent way. +type Handler interface { + // HandleWrp is called whenever a message is received that matches the + // criteria associated with the handler. + // + // Unless the error of ErrNotHandled is returned, the handler is + // considered to have consumed the message. It is up to the handler to + // perform any responses or further actions. + HandleWrp(wrp.Message) error +} + +// HandlerFunc is an adapter to allow the use of ordinary functions as handlers. +type HandlerFunc func(wrp.Message) error + +func (f HandlerFunc) HandleWrp(msg wrp.Message) error { + return f(msg) +} + +var _ Handler = HandlerFunc(nil) diff --git a/internal/wrpkit/handler_test.go b/internal/wrpkit/handler_test.go new file mode 100644 index 0000000..e578c84 --- /dev/null +++ b/internal/wrpkit/handler_test.go @@ -0,0 +1,25 @@ +// SPDX-FileCopyrightText: 2024 Comcast Cable Communications Management, LLC +// SPDX-License-Identifier: Apache-2.0 + +package wrpkit + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/xmidt-org/wrp-go/v3" +) + +func TestHandlerFunc_HandleWrp(t *testing.T) { + assert := assert.New(t) + + h := HandlerFunc(func(wrp.Message) error { + return nil + }) + + msg := wrp.Message{} + + err := h.HandleWrp(msg) + + assert.NoError(err) +}