Skip to content

Commit

Permalink
feat(SPV-848): webhook features to a separate package
Browse files Browse the repository at this point in the history
  • Loading branch information
chris-4chain committed Jun 28, 2024
1 parent 3538458 commit 10760ad
Show file tree
Hide file tree
Showing 8 changed files with 243 additions and 175 deletions.
16 changes: 12 additions & 4 deletions examples/webhooks/webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,36 @@ import (

walletclient "github.com/bitcoin-sv/spv-wallet-go-client"
"github.com/bitcoin-sv/spv-wallet-go-client/examples"
"github.com/bitcoin-sv/spv-wallet-go-client/notifications"
)

func main() {
defer examples.HandlePanic()

client := walletclient.NewWithAdminKey("http://localhost:3003/v1", "xprv9s21ZrQH143K2pmNeAHBzU4JHNDaFaPTbzKbBCw55ErhMDLsxDwKqcaDVV3PwmEmRZa9qUaU261iJaUx8eBiBF77zrPxTH8JGXC7LZQnsgA")
wh := walletclient.NewWebhook(context.Background(), client, "http://localhost:5005/notification", "Authorization", "this-is-the-token", 3)
client := walletclient.NewWithAdminKey("http://localhost:3003/v1", examples.ExampleAdminKey)
//"Authorization", "this-is-the-token", 3
wh := notifications.NewWebhook(
context.Background(),
client,
"http://localhost:5005/notification",
notifications.WithToken("Authorization", "this-is-the-token"),
notifications.WithProcessors(3),
)
err := wh.Subscribe(context.Background())
if err != nil {
panic(err)
}

http.Handle("/notification", wh.HTTPHandler())

if err := walletclient.RegisterHandler(wh, func(gpe *walletclient.NumericEvent) {
if err = notifications.RegisterHandler(wh, func(gpe *notifications.NumericEvent) {
time.Sleep(50 * time.Millisecond) // simulate processing time
fmt.Printf("Processing event-numeric: %d\n", gpe.Numeric)
}); err != nil {
panic(err)
}

if err := walletclient.RegisterHandler(wh, func(gpe *walletclient.StringEvent) {
if err = notifications.RegisterHandler(wh, func(gpe *notifications.StringEvent) {
time.Sleep(50 * time.Millisecond) // simulate processing time
fmt.Printf("Processing event-string: %s\n", gpe.Value)
}); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -1129,7 +1129,7 @@ func (wc *WalletClient) SendToRecipients(ctx context.Context, recipients []*Reci
return wc.RecordTransaction(ctx, hex, draft.ID, metadata)
}

func (wc *WalletClient) AdminSubscribeWebhook(ctx context.Context, webhookURL, tokenHeader, tokenValue string) ResponseError {
func (wc *WalletClient) AdminSubscribeWebhook(ctx context.Context, webhookURL, tokenHeader, tokenValue string) error {
requestModel := struct {
URL string `json:"url"`
TokenHeader string `json:"tokenHeader"`
Expand All @@ -1147,7 +1147,7 @@ func (wc *WalletClient) AdminSubscribeWebhook(ctx context.Context, webhookURL, t
return WrapError(err)
}

func (wc *WalletClient) AdminUnsubscribeWebhook(ctx context.Context, webhookURL string) ResponseError {
func (wc *WalletClient) AdminUnsubscribeWebhook(ctx context.Context, webhookURL string) error {
requestModel := struct {
URL string `json:"url"`
}{
Expand All @@ -1158,5 +1158,5 @@ func (wc *WalletClient) AdminUnsubscribeWebhook(ctx context.Context, webhookURL
return WrapError(nil)
}
err = wc.doHTTPRequest(ctx, http.MethodPost, "/admin/webhooks/unsubscribe", rawJSON, wc.adminXPriv, true, nil)
return WrapError(err)
return err
}
25 changes: 25 additions & 0 deletions notifications/eventsMap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package notifications

import "sync"

type eventsMap struct {
registered *sync.Map
}

func newEventsMap() *eventsMap {
return &eventsMap{
registered: &sync.Map{},
}
}

func (em *eventsMap) store(name string, handler *eventHandler) {
em.registered.Store(name, handler)
}

func (em *eventsMap) load(name string) (*eventHandler, bool) {
h, ok := em.registered.Load(name)
if !ok {
return nil, false
}
return h.(*eventHandler), true
}
8 changes: 8 additions & 0 deletions notifications/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package notifications

import "context"

type WebhookSubscriber interface {
AdminSubscribeWebhook(ctx context.Context, webhookURL, tokenHeader, tokenValue string) error
AdminUnsubscribeWebhook(ctx context.Context, webhookURL string) error
}
56 changes: 56 additions & 0 deletions notifications/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package notifications

import "context"

type WebhookOptions struct {
TokenHeader string
TokenValue string
BufferSize int
RootContext context.Context
Processors int
}

func NewWebhookOptions() *WebhookOptions {
return &WebhookOptions{
TokenHeader: "",
TokenValue: "",
BufferSize: 100,
Processors: 1,
RootContext: context.Background(),
}
}

type WebhookOpts = func(*WebhookOptions)

func WithToken(tokenHeader, tokenValue string) WebhookOpts {
return func(w *WebhookOptions) {
w.TokenHeader = tokenHeader
w.TokenValue = tokenValue
}
}

func WithBufferSize(size int) WebhookOpts {
return func(w *WebhookOptions) {
w.BufferSize = size
}
}

func WithRootContext(ctx context.Context) WebhookOpts {
return func(w *WebhookOptions) {
w.RootContext = ctx
}
}

func WithProcessors(count int) WebhookOpts {
return func(w *WebhookOptions) {
w.Processors = count
}
}

type Webhook struct {
URL string
options *WebhookOptions
buffer chan *RawEvent
subscriber WebhookSubscriber
handlers *eventsMap
}
31 changes: 31 additions & 0 deletions notifications/registerer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package notifications

import (
"fmt"
"reflect"
)

type eventHandler struct {
Caller reflect.Value
ModelType reflect.Type
}

func RegisterHandler[EventType Events](nd *Webhook, handlerFunction func(event *EventType)) error {
handlerValue := reflect.ValueOf(handlerFunction)
if handlerValue.Kind() != reflect.Func {
return fmt.Errorf("Not a function")
}

modelType := handlerValue.Type().In(0)
if modelType.Kind() == reflect.Ptr {
modelType = modelType.Elem()
}
name := modelType.Name()

nd.handlers.store(name, &eventHandler{
Caller: handlerValue,
ModelType: modelType,
})

return nil
}
108 changes: 108 additions & 0 deletions notifications/webhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package notifications

import (
"context"
"encoding/json"
"fmt"
"net/http"
"reflect"
"time"
)

func NewWebhook(ctx context.Context, subscriber WebhookSubscriber, url string, opts ...WebhookOpts) *Webhook {
options := NewWebhookOptions()
for _, opt := range opts {
opt(options)
}

wh := &Webhook{
URL: url,
options: options,
buffer: make(chan *RawEvent, options.BufferSize),
subscriber: subscriber,
handlers: newEventsMap(),
}
for i := 0; i < options.Processors; i++ {
go wh.process()
}
return wh
}

func (w *Webhook) Subscribe(ctx context.Context) error {
return w.subscriber.AdminSubscribeWebhook(ctx, w.URL, w.options.TokenHeader, w.options.TokenValue)
}

func (w *Webhook) Unsubscribe(ctx context.Context) error {
return w.subscriber.AdminUnsubscribeWebhook(ctx, w.URL)
}

func (w *Webhook) HTTPHandler() http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
if w.options.TokenHeader != "" && r.Header.Get(w.options.TokenHeader) != w.options.TokenValue {
http.Error(rw, "Unauthorized", http.StatusUnauthorized)
return
}
var events []*RawEvent
if err := json.NewDecoder(r.Body).Decode(&events); err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
return
}
fmt.Printf("Received: %v\n", events)
for _, event := range events {
select {
case w.buffer <- event:
// event sent
case <-r.Context().Done():
// request context cancelled
return
case <-w.options.RootContext.Done():
// root context cancelled - the whole event processing has been stopped
return
case <-time.After(1 * time.Second):
// timeout, most probably the channel is full
// TODO: log this
}
}
rw.WriteHeader(http.StatusOK)
})
}

func (nd *Webhook) process() {
for {
select {
case event := <-nd.buffer:
handler, ok := nd.handlers.load(event.Type)
if !ok {
fmt.Printf("No handlers for %s event type", event.Type)
continue
}
model := reflect.New(handler.ModelType).Interface()
if err := json.Unmarshal(event.Content, model); err != nil {
fmt.Println("Cannot unmarshall the content json")
continue
}
handler.Caller.Call([]reflect.Value{reflect.ValueOf(model)})
case <-nd.options.RootContext.Done():
return
}
}
}

////////////////////// BELOW it should be imported from spv-wallet models

type RawEvent struct {
Type string `json:"type"`
Content json.RawMessage `json:"content"`
}

type StringEvent struct {
Value string
}

type NumericEvent struct {
Numeric int
}

type Events interface {
StringEvent | NumericEvent
}
Loading

0 comments on commit 10760ad

Please sign in to comment.