Skip to content

Commit

Permalink
Merge pull request #44 from xmidt-org/pubsub
Browse files Browse the repository at this point in the history
feat: Add a simple wrp focused pubsub package.
  • Loading branch information
schmidtw authored Mar 18, 2024
2 parents 4884c2c + a8c7451 commit dc7f415
Show file tree
Hide file tree
Showing 6 changed files with 682 additions and 3 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/xmidt-org/eventor v0.0.0-20230910205925-8ff168bd12ed
github.com/xmidt-org/retry v0.0.3
github.com/xmidt-org/sallust v0.2.2
github.com/xmidt-org/wrp-go/v3 v3.5.0
github.com/xmidt-org/wrp-go/v3 v3.5.1
go.uber.org/fx v1.21.0
go.uber.org/zap v1.27.0
gopkg.in/dealancer/validate.v2 v2.1.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ github.com/xmidt-org/retry v0.0.3 h1:wvmBnEEn1OKwSZaQtr1RZ2Vey8JIvP72mGTgR+3wPiM
github.com/xmidt-org/retry v0.0.3/go.mod h1:I7FO3VVrxPckNuotwGYZIxfBnmjMSyOTitTKNL0VkIA=
github.com/xmidt-org/sallust v0.2.2 h1:MrINLEr7cMj6ENx/O76fvpfd5LNGYnk7OipZAGXPYA0=
github.com/xmidt-org/sallust v0.2.2/go.mod h1:ytBoypcPw10OmjM6b92Jx3eoqWX4J5zVXOQozGwz4qs=
github.com/xmidt-org/wrp-go/v3 v3.5.0 h1:2HzhdDticGvMVC8z+JWHBsp2si/aE/WQeZutY+erUr0=
github.com/xmidt-org/wrp-go/v3 v3.5.0/go.mod h1:j1kLLoPJmKkMFz/vlwP238WBoFhJgbPyJDN9W2V1TxY=
github.com/xmidt-org/wrp-go/v3 v3.5.1 h1:AYjdNDlck3M1rdsZugMxn8TcltySmT25HIuoqU105Fw=
github.com/xmidt-org/wrp-go/v3 v3.5.1/go.mod h1:zruorkuO3UJQKD8okAFOr8PYIlrPBtRGBVQxsoKX8L8=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.uber.org/dig v1.17.1 h1:Tga8Lz8PcYNsWsyHMZ1Vm0OQOUaJNDyvPImgbAu9YSc=
go.uber.org/dig v1.17.1/go.mod h1:Us0rSJiThwCv2GteUN0Q7OKvU7n5J4dxZ9JKUXozFdE=
Expand Down
229 changes: 229 additions & 0 deletions internal/pubsub/end2end_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
// SPDX-FileCopyrightText: 2024 Comcast Cable Communications Management, LLC
// SPDX-License-Identifier: Apache-2.0

package pubsub_test

import (
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/xmidt-org/wrp-go/v3"
"github.com/xmidt-org/xmidt-agent/internal/pubsub"
)

type msgWithExpectations struct {
msg wrp.Message
expectErr error
}

var messages = []msgWithExpectations{
{
msg: wrp.Message{
Type: wrp.SimpleEventMessageType,
Source: "self:/service/ignored",
Destination: "event:event_1/ignored",
},
}, {
msg: wrp.Message{
Type: wrp.SimpleEventMessageType,
Source: "self:/service/ignored",
Destination: "event:event_2/ignored",
},
}, {
msg: wrp.Message{
Type: wrp.SimpleRequestResponseMessageType,
Source: "dns:tr1d1um.example.com/service/ignored",
Destination: "mac:112233445566/service/ignored",
},
}, {
msg: wrp.Message{
Type: wrp.SimpleRequestResponseMessageType,
Source: "mac:112233445566/service/ignored",
Destination: "dns:tr1d1um.example.com/service/ignored",
},
}, {
// invalid message - no src
msg: wrp.Message{
Type: wrp.SimpleRequestResponseMessageType,
Destination: "dns:tr1d1um.example.com/service/ignored",
},
expectErr: wrp.ErrorInvalidLocator,
}, {
// invalid message - no dest
msg: wrp.Message{
Type: wrp.SimpleRequestResponseMessageType,
Source: "mac:112233445566/service/ignored",
},
expectErr: wrp.ErrorInvalidLocator,
}, {
// invalid message - invalid msg type (empty)
msg: wrp.Message{
Source: "mac:112233445566/service/ignored",
Destination: "dns:tr1d1um.example.com/service/ignored",
},
expectErr: wrp.ErrInvalidMessageType,
}, {
// invalid message - a string field is not valid UTF-8
msg: wrp.Message{
Type: wrp.SimpleRequestResponseMessageType,
Source: "self:/service/ignored",
Destination: "dns:tr1d1um.example.com/service/ignored",
PartnerIDs: []string{string([]byte{0xbf})},
ContentType: string([]byte{0xbf}),
},
expectErr: wrp.ErrNotUTF8,
},
}

type mockHandler struct {
lock sync.Mutex
wg *sync.WaitGroup
name string
calls int
dests []wrp.Locator
expect []wrp.Locator
}

func (h *mockHandler) WG(wg *sync.WaitGroup) {
h.wg = wg
h.wg.Add(len(h.expect))
//fmt.Printf("%s adding %d to the wait group\n", h.name, len(h.expect))
}

func (h *mockHandler) HandleWrp(msg wrp.Message) {
h.lock.Lock()
defer h.lock.Unlock()

h.calls++
dest, _ := wrp.ParseLocator(msg.Destination)
h.dests = append(h.dests, dest)
h.wg.Done()
//fmt.Printf("%s done\n", h.name)
}

func (h *mockHandler) assert(a *assert.Assertions) {
h.lock.Lock()
defer h.lock.Unlock()

if !a.Equal(len(h.expect), h.calls, "handler %s calls mismatch", h.name) {
return
}

for _, expected := range h.expect {
var found bool
for j, d := range h.dests {
if expected.Scheme == d.Scheme &&
expected.Authority == d.Authority &&
expected.Service == d.Service &&
expected.Ignored == d.Ignored {
found = true
h.dests[j].Scheme = ""
break
}
}
if !found {
a.Fail("dest not found", "handler: %s expected: %s", h.name, expected.String())
}
}
}

func TestEndToEnd(t *testing.T) {
id := wrp.DeviceID("mac:112233445566")

var wg sync.WaitGroup

allEventListener := &mockHandler{
name: "allEventListener",
expect: []wrp.Locator{
{Scheme: "event", Authority: "event_1", Ignored: "/ignored"},
{Scheme: "event", Authority: "event_2", Ignored: "/ignored"},
},
}
allEventListener.WG(&wg)

singleEventListener := &mockHandler{
name: "singleEventListener",
expect: []wrp.Locator{
{Scheme: "event", Authority: "event_2", Ignored: "/ignored"},
},
}
singleEventListener.WG(&wg)

singleServiceListener := &mockHandler{
name: "singleServiceListener",
expect: []wrp.Locator{
{Scheme: "mac", Authority: "112233445566", Service: "service", Ignored: "/ignored"},
},
}
singleServiceListener.WG(&wg)

egressListener := &mockHandler{
name: "egressListener",
expect: []wrp.Locator{
{Scheme: "event", Authority: "event_1", Ignored: "/ignored"},
{Scheme: "event", Authority: "event_2", Ignored: "/ignored"},
{Scheme: "dns", Authority: "tr1d1um.example.com", Service: "service", Ignored: "/ignored"},
},
}
egressListener.WG(&wg)

assert := assert.New(t)
require := require.New(t)

transIdValidator := pubsub.HandlerFunc(
func(msg wrp.Message) {
if msg.TransactionUUID == "" {
assert.Fail("transaction UUID is empty")
}
})

noTransIdValidator := pubsub.HandlerFunc(
func(msg wrp.Message) {
if msg.TransactionUUID != "" {
assert.Fail("transaction UUID is not empty")
}
})

var allCancel, singleCancel, serviceCancel, egressCancel pubsub.CancelFunc
ps, err := pubsub.New(id,
pubsub.WithEgressHandler(egressListener, &egressCancel),
pubsub.WithEventHandler("*", allEventListener, &allCancel),
pubsub.WithEventHandler("event_2", singleEventListener, &singleCancel),
pubsub.WithServiceHandler("service", singleServiceListener, &serviceCancel),

pubsub.WithEgressHandler(transIdValidator),
pubsub.WithServiceHandler("*", noTransIdValidator),
pubsub.Normify(
wrp.ValidateMessageType(),
wrp.EnsureTransactionUUID(),
wrp.ValidateOnlyUTF8Strings(),
),
)

require.NoError(err)
require.NotNil(ps)
require.NotNil(allCancel)
require.NotNil(singleCancel)
require.NotNil(serviceCancel)
require.NotNil(egressCancel)

for _, m := range messages {
msg := m.msg
err := ps.Publish(&msg)

if m.expectErr != nil {
assert.ErrorIs(err, m.expectErr)
} else {
assert.NoError(err)
}
}

wg.Wait()

allEventListener.assert(assert)
singleEventListener.assert(assert)
singleServiceListener.assert(assert)
egressListener.assert(assert)
}
79 changes: 79 additions & 0 deletions internal/pubsub/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// SPDX-FileCopyrightText: 2024 Comcast Cable Communications Management, LLC
// SPDX-License-Identifier: Apache-2.0

package pubsub

import (
"github.com/xmidt-org/wrp-go/v3"
)

type optionFunc func(*PubSub) error

var _ Option = optionFunc(nil)

func (f optionFunc) apply(ps *PubSub) error {
return f(ps)
}

// Normify is an option that sets the desired normalization options used to
// normalize/validate the wrp message.
//
// As an example, if you want all messages to contain metadata about something
// like the network interface used, this is the place to define it.
func Normify(opts ...wrp.NormifierOption) Option {
return optionFunc(func(ps *PubSub) error {
ps.desiredOpts = append(ps.desiredOpts, opts...)
return nil
})
}

// WithEgressHandler is an option that adds a handler for egress messages.
// If the optional cancel parameter is provided, it will be set to a function
// that can be used to cancel the subscription.
func WithEgressHandler(handler Handler, cancel ...*CancelFunc) Option {
return optionFunc(func(ps *PubSub) error {
c, err := ps.SubscribeEgress(handler)
if err != nil {
return err
}
if len(cancel) > 0 && cancel[0] != nil {
*cancel[0] = c
}

return nil
})
}

// WithServiceHandler is an option that adds a handler for service messages.
// If the optional cancel parameter is provided, it will be set to a function
// that can be used to cancel the subscription.
func WithServiceHandler(service string, handler Handler, cancel ...*CancelFunc) Option {
return optionFunc(func(ps *PubSub) error {
c, err := ps.SubscribeService(service, handler)
if err != nil {
return err
}
if len(cancel) > 0 && cancel[0] != nil {
*cancel[0] = c
}

return nil
})
}

// WithEventHandler is an option that adds a handler for event messages.
// If the optional cancel parameter is provided, it will be set to a function
// that can be used to cancel the subscription.
func WithEventHandler(event string, handler Handler, cancel ...*CancelFunc) Option {
return optionFunc(func(ps *PubSub) error {
c, err := ps.SubscribeEvent(event, handler)
if err != nil {
return err
}
if len(cancel) > 0 && cancel[0] != nil {
*cancel[0] = c
}

return nil
})
}
Loading

0 comments on commit dc7f415

Please sign in to comment.