Skip to content

Commit

Permalink
Merge pull request #46 from xmidt-org/add-error-to-handler
Browse files Browse the repository at this point in the history
Change the handler to support errors and make it a separate package.
  • Loading branch information
schmidtw authored Mar 26, 2024
2 parents dc7f415 + 7687336 commit b5aeaad
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 47 deletions.
70 changes: 63 additions & 7 deletions internal/pubsub/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ package pubsub_test
import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/xmidt-org/wrp-go/v3"
"github.com/xmidt-org/xmidt-agent/internal/pubsub"
"github.com/xmidt-org/xmidt-agent/internal/wrpkit"
)

type msgWithExpectations struct {
Expand Down Expand Up @@ -74,6 +76,14 @@ var messages = []msgWithExpectations{
ContentType: string([]byte{0xbf}),
},
expectErr: wrp.ErrNotUTF8,
}, {
// no handlers for this message
msg: wrp.Message{
Type: wrp.SimpleEventMessageType,
Source: "self:/ignore-me",
Destination: "self:/ignored-service/ignored",
},
expectErr: wrpkit.ErrNotHandled,
},
}

Expand All @@ -84,6 +94,7 @@ type mockHandler struct {
calls int
dests []wrp.Locator
expect []wrp.Locator
rv []error
}

func (h *mockHandler) WG(wg *sync.WaitGroup) {
Expand All @@ -92,15 +103,21 @@ func (h *mockHandler) WG(wg *sync.WaitGroup) {
//fmt.Printf("%s adding %d to the wait group\n", h.name, len(h.expect))
}

func (h *mockHandler) HandleWrp(msg wrp.Message) {
func (h *mockHandler) HandleWrp(msg wrp.Message) error {
h.lock.Lock()
defer h.lock.Unlock()

calls := h.calls
h.calls++

dest, _ := wrp.ParseLocator(msg.Destination)
h.dests = append(h.dests, dest)
h.wg.Done()
//fmt.Printf("%s done\n", h.name)
if calls < len(h.rv) {
return h.rv[calls]
}

return nil
}

func (h *mockHandler) assert(a *assert.Assertions) {
Expand Down Expand Up @@ -172,18 +189,27 @@ func TestEndToEnd(t *testing.T) {
assert := assert.New(t)
require := require.New(t)

transIdValidator := pubsub.HandlerFunc(
func(msg wrp.Message) {
transIdValidator := wrpkit.HandlerFunc(
func(msg wrp.Message) error {
if msg.TransactionUUID == "" {
assert.Fail("transaction UUID is empty")
}
return wrpkit.ErrNotHandled
})

noTransIdValidator := pubsub.HandlerFunc(
func(msg wrp.Message) {
noTransIdValidator := wrpkit.HandlerFunc(
func(msg wrp.Message) error {
src, err := wrp.ParseLocator(msg.Source)
assert.NoError(err)

if src.Service == "ignore-me" {
return wrpkit.ErrNotHandled
}

if msg.TransactionUUID != "" {
assert.Fail("transaction UUID is not empty")
}
return wrpkit.ErrNotHandled
})

var allCancel, singleCancel, serviceCancel, egressCancel pubsub.CancelFunc
Expand Down Expand Up @@ -211,7 +237,7 @@ func TestEndToEnd(t *testing.T) {

for _, m := range messages {
msg := m.msg
err := ps.Publish(&msg)
err := ps.HandleWrp(msg)

if m.expectErr != nil {
assert.ErrorIs(err, m.expectErr)
Expand All @@ -227,3 +253,33 @@ func TestEndToEnd(t *testing.T) {
singleServiceListener.assert(assert)
egressListener.assert(assert)
}

func TestTimeout(t *testing.T) {
id := wrp.DeviceID("mac:112233445566")

assert := assert.New(t)
require := require.New(t)

timeoutHandler := wrpkit.HandlerFunc(
func(msg wrp.Message) error {
time.Sleep(100 * time.Millisecond)
return nil
})

ps, err := pubsub.New(id,
pubsub.WithPublishTimeout(50*time.Millisecond),
pubsub.WithEgressHandler(timeoutHandler),
)

require.NoError(err)
require.NotNil(ps)

msg := wrp.Message{
Type: wrp.SimpleEventMessageType,
Source: "self:/service/ignored",
Destination: "event:event_1/ignored",
}

err = ps.HandleWrp(msg)
assert.ErrorIs(err, pubsub.ErrTimeout)
}
22 changes: 19 additions & 3 deletions internal/pubsub/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
package pubsub

import (
"fmt"
"time"

"github.com/xmidt-org/wrp-go/v3"
"github.com/xmidt-org/xmidt-agent/internal/wrpkit"
)

type optionFunc func(*PubSub) error
Expand All @@ -30,7 +34,7 @@ func Normify(opts ...wrp.NormifierOption) Option {
// WithEgressHandler is an option that adds a handler for egress messages.
// If the optional cancel parameter is provided, it will be set to a function
// that can be used to cancel the subscription.
func WithEgressHandler(handler Handler, cancel ...*CancelFunc) Option {
func WithEgressHandler(handler wrpkit.Handler, cancel ...*CancelFunc) Option {
return optionFunc(func(ps *PubSub) error {
c, err := ps.SubscribeEgress(handler)
if err != nil {
Expand All @@ -47,7 +51,7 @@ func WithEgressHandler(handler Handler, cancel ...*CancelFunc) Option {
// WithServiceHandler is an option that adds a handler for service messages.
// If the optional cancel parameter is provided, it will be set to a function
// that can be used to cancel the subscription.
func WithServiceHandler(service string, handler Handler, cancel ...*CancelFunc) Option {
func WithServiceHandler(service string, handler wrpkit.Handler, cancel ...*CancelFunc) Option {
return optionFunc(func(ps *PubSub) error {
c, err := ps.SubscribeService(service, handler)
if err != nil {
Expand All @@ -64,7 +68,7 @@ func WithServiceHandler(service string, handler Handler, cancel ...*CancelFunc)
// WithEventHandler is an option that adds a handler for event messages.
// If the optional cancel parameter is provided, it will be set to a function
// that can be used to cancel the subscription.
func WithEventHandler(event string, handler Handler, cancel ...*CancelFunc) Option {
func WithEventHandler(event string, handler wrpkit.Handler, cancel ...*CancelFunc) Option {
return optionFunc(func(ps *PubSub) error {
c, err := ps.SubscribeEvent(event, handler)
if err != nil {
Expand All @@ -77,3 +81,15 @@ func WithEventHandler(event string, handler Handler, cancel ...*CancelFunc) Opti
return nil
})
}

// WithPublishTimeout is an option that sets the timeout for publishing a message.
// If the timeout is exceeded, the publish will fail.
func WithPublishTimeout(timeout time.Duration) Option {
return optionFunc(func(ps *PubSub) error {
if timeout < 0 {
return fmt.Errorf("%w: timeout must be zero or larger", ErrInvalidInput)
}
ps.publishTimeout = timeout
return nil
})
}
Loading

0 comments on commit b5aeaad

Please sign in to comment.