Skip to content

Commit

Permalink
feat(SPV-848): notifications dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
chris-4chain committed Jun 27, 2024
1 parent 2268fe4 commit b76f919
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 49 deletions.
29 changes: 18 additions & 11 deletions examples/webhooks/webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,24 @@ func main() {

http.Handle("/notification", wh.HTTPHandler())

go func() {
for {
select {
case event := <-wh.Channel:
time.Sleep(50 * time.Millisecond) // simulate processing time
fmt.Println("Processing event:", event)
case <-context.Background().Done():
return
}
}
}()
_ = walletclient.NewNotificationsDispatcher(context.Background(), wh.Channel, []walletclient.Handler{
{Model: &walletclient.GeneralPurposeEvent{}, HandlerFunc: func(gpe *walletclient.GeneralPurposeEvent) {
time.Sleep(50 * time.Millisecond) // simulate processing time
fmt.Printf("Processing event: %s\n", gpe.Value)
}},
})

// go func() {
// for {
// select {
// case event := <-wh.Channel:
// time.Sleep(50 * time.Millisecond) // simulate processing time
// fmt.Println("Processing event:", event)
// case <-context.Background().Done():
// return
// }
// }
// }()

go func() {
_ = http.ListenAndServe(":5005", nil)
Expand Down
77 changes: 39 additions & 38 deletions notifications_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,64 @@ import (
"context"
"encoding/json"
"fmt"

"github.com/pkg/errors"
"reflect"
)

type Handlers struct {
GeneralPurposeEvent []func(*GeneralPurposeEvent)
type Handler struct {
HandlerFunc any // any as function to handle the event
Model EventContent
}

type NotificationsDispatcher struct {
ctx context.Context
input chan *RawEvent
handlers Handlers
handlers map[string][]Handler
}

func NewNotificationsDispatcher(ctx context.Context, inputChannel chan *RawEvent) *NotificationsDispatcher {
obj := &NotificationsDispatcher{
ctx: ctx,
input: inputChannel,
func NewNotificationsDispatcher(ctx context.Context, inputChannel chan *RawEvent, providedHandlers []Handler) *NotificationsDispatcher {
dispatcher := &NotificationsDispatcher{
ctx: ctx,
input: inputChannel,
handlers: make(map[string][]Handler, len(providedHandlers)),
}

for _, handler := range providedHandlers {
dispatcher.handlers[handler.Model.GetType()] = append(dispatcher.handlers[handler.Model.GetType()], handler)
}

return obj
go dispatcher.process()

return dispatcher
}

func (nd *NotificationsDispatcher) process() {
for {
select {
case event := <-nd.input:
switch event.Type {
case "general-purpose-event":
content, err := GetEventContent[GeneralPurposeEvent](event)
if err != nil {
fmt.Println("Error getting event content")
handlers, ok := nd.handlers[event.Type]
if !ok {
fmt.Printf("No handlers for %s event type", event.Type)
continue
}
for _, handler := range handlers {
modelSource := handler.Model
// copy the event to the model, use reflection
model := reflect.New(reflect.TypeOf(modelSource).Elem()).Interface()
if err := json.Unmarshal(event.Content, model); err != nil {
fmt.Println("Cannot unmarshall the content json")
continue
}
// use reflect
handlerValue := reflect.ValueOf(handler.HandlerFunc)
if handlerValue.Kind() != reflect.Func {
fmt.Println("Not a function")
continue
}
for _, handler := range nd.handlers.GeneralPurposeEvent {
handler(content)
if handlerValue.Type().NumIn() != 1 {
fmt.Println("Wrong number of arguments")
continue
}
handlerValue.Call([]reflect.Value{reflect.ValueOf(model)})
}
case <-nd.ctx.Done():
return
Expand All @@ -64,26 +85,6 @@ type GeneralPurposeEvent struct {
Value string
}

func (GeneralPurposeEvent) GetType() string {
func (*GeneralPurposeEvent) GetType() string {
return "general-purpose-event"
}

func GetEventContent[modelType EventContent](raw *RawEvent) (*modelType, error) {
model := *new(modelType)
if raw.Type != model.GetType() {
return nil, fmt.Errorf("Wrong type")
}

if err := json.Unmarshal(raw.Content, &model); err != nil {
return nil, errors.Wrap(err, "Cannot unmarshall the content json")
}
return &model, nil
}

func NewRawEvent(namedEvent EventContent) *RawEvent {
asJson, _ := json.Marshal(namedEvent)
return &RawEvent{
Type: namedEvent.GetType(),
Content: asJson,
}
}

0 comments on commit b76f919

Please sign in to comment.