Skip to content

Commit

Permalink
feat(SPV-848): webhook handles event dispatching on itself
Browse files Browse the repository at this point in the history
  • Loading branch information
chris-4chain committed Jun 28, 2024
1 parent 37b0517 commit 3538458
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 117 deletions.
20 changes: 3 additions & 17 deletions examples/webhooks/webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,42 +17,28 @@ func main() {
defer examples.HandlePanic()

client := walletclient.NewWithAdminKey("http://localhost:3003/v1", "xprv9s21ZrQH143K2pmNeAHBzU4JHNDaFaPTbzKbBCw55ErhMDLsxDwKqcaDVV3PwmEmRZa9qUaU261iJaUx8eBiBF77zrPxTH8JGXC7LZQnsgA")
wh := walletclient.NewWebhook(client, "http://localhost:5005/notification", "Authorization", "this-is-the-token")
wh := walletclient.NewWebhook(context.Background(), client, "http://localhost:5005/notification", "Authorization", "this-is-the-token", 3)
err := wh.Subscribe(context.Background())
if err != nil {
panic(err)
}

http.Handle("/notification", wh.HTTPHandler())

d := walletclient.NewNotificationsDispatcher(context.Background(), wh.Channel)

if err := walletclient.RegisterHandler(d, func(gpe *walletclient.NumericEvent) {
if err := walletclient.RegisterHandler(wh, func(gpe *walletclient.NumericEvent) {
time.Sleep(50 * time.Millisecond) // simulate processing time
fmt.Printf("Processing event-numeric: %d\n", gpe.Numeric)
}); err != nil {
panic(err)
}

if err := walletclient.RegisterHandler(d, func(gpe *walletclient.StringEvent) {
if err := walletclient.RegisterHandler(wh, func(gpe *walletclient.StringEvent) {
time.Sleep(50 * time.Millisecond) // simulate processing time
fmt.Printf("Processing event-string: %s\n", gpe.Value)
}); err != nil {
panic(err)
}

// go func() {
// for {
// select {
// case event := <-wh.Channel:
// time.Sleep(50 * time.Millisecond) // simulate processing time
// fmt.Println("Processing event:", event)
// case <-context.Background().Done():
// return
// }
// }
// }()

go func() {
_ = http.ListenAndServe(":5005", nil)
}()
Expand Down
93 changes: 0 additions & 93 deletions notifications_dispatcher.go

This file was deleted.

114 changes: 107 additions & 7 deletions webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"encoding/json"
"fmt"
"net/http"
"reflect"
"sync"
"time"
)

Expand All @@ -16,19 +18,27 @@ type Webhook struct {
URL string
TokenHeader string
TokenValue string
Channel chan *RawEvent
buffer chan *RawEvent

client *WalletClient
client *WalletClient
rootCtx context.Context
handlers *eventsMap
}

func NewWebhook(client *WalletClient, url, tokenHeader, tokenValue string) *Webhook {
return &Webhook{
func NewWebhook(ctx context.Context, client *WalletClient, url, tokenHeader, tokenValue string, processors int) *Webhook {
wh := &Webhook{
URL: url,
TokenHeader: tokenHeader,
TokenValue: tokenValue,
Channel: make(chan *RawEvent, eventBufferLength),
buffer: make(chan *RawEvent, eventBufferLength),
client: client,
rootCtx: ctx,
handlers: newEventsMap(),
}
for i := 0; i < processors; i++ {
go wh.process()
}
return wh
}

func (w *Webhook) Subscribe(ctx context.Context) ResponseError {
Expand All @@ -53,10 +63,13 @@ func (w *Webhook) HTTPHandler() http.Handler {
fmt.Printf("Received: %v\n", events)
for _, event := range events {
select {
case w.Channel <- event:
case w.buffer <- event:
// event sent
case <-r.Context().Done():
// context cancelled
// request context cancelled
return
case <-w.rootCtx.Done():
// root context cancelled - the whole event processing has been stopped
return
case <-time.After(1 * time.Second):
// timeout, most probably the channel is full
Expand All @@ -66,3 +79,90 @@ func (w *Webhook) HTTPHandler() http.Handler {
rw.WriteHeader(http.StatusOK)
})
}

func RegisterHandler[EventType Events](nd *Webhook, handlerFunction func(event *EventType)) error {
handlerValue := reflect.ValueOf(handlerFunction)
if handlerValue.Kind() != reflect.Func {
return fmt.Errorf("Not a function")
}

modelType := handlerValue.Type().In(0)
if modelType.Kind() == reflect.Ptr {
modelType = modelType.Elem()
}
name := modelType.Name()

nd.handlers.store(name, &eventHandler{
Caller: handlerValue,
ModelType: modelType,
})

return nil
}

type eventHandler struct {
Caller reflect.Value
ModelType reflect.Type
}

type eventsMap struct {
registered *sync.Map
}

func newEventsMap() *eventsMap {
return &eventsMap{
registered: &sync.Map{},
}
}

func (em *eventsMap) store(name string, handler *eventHandler) {
em.registered.Store(name, handler)
}

func (em *eventsMap) load(name string) (*eventHandler, bool) {
h, ok := em.registered.Load(name)
if !ok {
return nil, false
}
return h.(*eventHandler), true
}

func (nd *Webhook) process() {
for {
select {
case event := <-nd.buffer:
handler, ok := nd.handlers.load(event.Type)
if !ok {
fmt.Printf("No handlers for %s event type", event.Type)
continue
}
model := reflect.New(handler.ModelType).Interface()
if err := json.Unmarshal(event.Content, model); err != nil {
fmt.Println("Cannot unmarshall the content json")
continue
}
handler.Caller.Call([]reflect.Value{reflect.ValueOf(model)})
case <-nd.rootCtx.Done():
return
}
}
}

////////////////////// BELOW it should be imported from spv-wallet models

type RawEvent struct {
Type string `json:"type"`
Content json.RawMessage `json:"content"`
}

type StringEvent struct {
Value string
}

type NumericEvent struct {
Numeric int
}

type Events interface {
StringEvent | NumericEvent
}

0 comments on commit 3538458

Please sign in to comment.