From 10760ad40bcbe4c9d0b02169b167b002a4aa0a03 Mon Sep 17 00:00:00 2001 From: Krzysztof Tomecki <152964795+chris-4chain@users.noreply.github.com> Date: Fri, 28 Jun 2024 14:27:15 +0200 Subject: [PATCH] feat(SPV-848): webhook features to a separate package --- examples/webhooks/webhooks.go | 16 +++- http.go | 6 +- notifications/eventsMap.go | 25 +++++ notifications/interface.go | 8 ++ notifications/options.go | 56 ++++++++++++ notifications/registerer.go | 31 +++++++ notifications/webhook.go | 108 ++++++++++++++++++++++ webhook.go | 168 ---------------------------------- 8 files changed, 243 insertions(+), 175 deletions(-) create mode 100644 notifications/eventsMap.go create mode 100644 notifications/interface.go create mode 100644 notifications/options.go create mode 100644 notifications/registerer.go create mode 100644 notifications/webhook.go delete mode 100644 webhook.go diff --git a/examples/webhooks/webhooks.go b/examples/webhooks/webhooks.go index f10f6a2..aee9dab 100644 --- a/examples/webhooks/webhooks.go +++ b/examples/webhooks/webhooks.go @@ -11,13 +11,21 @@ import ( walletclient "github.com/bitcoin-sv/spv-wallet-go-client" "github.com/bitcoin-sv/spv-wallet-go-client/examples" + "github.com/bitcoin-sv/spv-wallet-go-client/notifications" ) func main() { defer examples.HandlePanic() - client := walletclient.NewWithAdminKey("http://localhost:3003/v1", "xprv9s21ZrQH143K2pmNeAHBzU4JHNDaFaPTbzKbBCw55ErhMDLsxDwKqcaDVV3PwmEmRZa9qUaU261iJaUx8eBiBF77zrPxTH8JGXC7LZQnsgA") - wh := walletclient.NewWebhook(context.Background(), client, "http://localhost:5005/notification", "Authorization", "this-is-the-token", 3) + client := walletclient.NewWithAdminKey("http://localhost:3003/v1", examples.ExampleAdminKey) + //"Authorization", "this-is-the-token", 3 + wh := notifications.NewWebhook( + context.Background(), + client, + "http://localhost:5005/notification", + notifications.WithToken("Authorization", "this-is-the-token"), + notifications.WithProcessors(3), + ) err := wh.Subscribe(context.Background()) if err != nil { panic(err) @@ -25,14 +33,14 @@ func main() { http.Handle("/notification", wh.HTTPHandler()) - if err := walletclient.RegisterHandler(wh, func(gpe *walletclient.NumericEvent) { + if err = notifications.RegisterHandler(wh, func(gpe *notifications.NumericEvent) { time.Sleep(50 * time.Millisecond) // simulate processing time fmt.Printf("Processing event-numeric: %d\n", gpe.Numeric) }); err != nil { panic(err) } - if err := walletclient.RegisterHandler(wh, func(gpe *walletclient.StringEvent) { + if err = notifications.RegisterHandler(wh, func(gpe *notifications.StringEvent) { time.Sleep(50 * time.Millisecond) // simulate processing time fmt.Printf("Processing event-string: %s\n", gpe.Value) }); err != nil { diff --git a/http.go b/http.go index fd833a3..6299512 100644 --- a/http.go +++ b/http.go @@ -1129,7 +1129,7 @@ func (wc *WalletClient) SendToRecipients(ctx context.Context, recipients []*Reci return wc.RecordTransaction(ctx, hex, draft.ID, metadata) } -func (wc *WalletClient) AdminSubscribeWebhook(ctx context.Context, webhookURL, tokenHeader, tokenValue string) ResponseError { +func (wc *WalletClient) AdminSubscribeWebhook(ctx context.Context, webhookURL, tokenHeader, tokenValue string) error { requestModel := struct { URL string `json:"url"` TokenHeader string `json:"tokenHeader"` @@ -1147,7 +1147,7 @@ func (wc *WalletClient) AdminSubscribeWebhook(ctx context.Context, webhookURL, t return WrapError(err) } -func (wc *WalletClient) AdminUnsubscribeWebhook(ctx context.Context, webhookURL string) ResponseError { +func (wc *WalletClient) AdminUnsubscribeWebhook(ctx context.Context, webhookURL string) error { requestModel := struct { URL string `json:"url"` }{ @@ -1158,5 +1158,5 @@ func (wc *WalletClient) AdminUnsubscribeWebhook(ctx context.Context, webhookURL return WrapError(nil) } err = wc.doHTTPRequest(ctx, http.MethodPost, "/admin/webhooks/unsubscribe", rawJSON, wc.adminXPriv, true, nil) - return WrapError(err) + return err } diff --git a/notifications/eventsMap.go b/notifications/eventsMap.go new file mode 100644 index 0000000..00e0b52 --- /dev/null +++ b/notifications/eventsMap.go @@ -0,0 +1,25 @@ +package notifications + +import "sync" + +type eventsMap struct { + registered *sync.Map +} + +func newEventsMap() *eventsMap { + return &eventsMap{ + registered: &sync.Map{}, + } +} + +func (em *eventsMap) store(name string, handler *eventHandler) { + em.registered.Store(name, handler) +} + +func (em *eventsMap) load(name string) (*eventHandler, bool) { + h, ok := em.registered.Load(name) + if !ok { + return nil, false + } + return h.(*eventHandler), true +} diff --git a/notifications/interface.go b/notifications/interface.go new file mode 100644 index 0000000..0741568 --- /dev/null +++ b/notifications/interface.go @@ -0,0 +1,8 @@ +package notifications + +import "context" + +type WebhookSubscriber interface { + AdminSubscribeWebhook(ctx context.Context, webhookURL, tokenHeader, tokenValue string) error + AdminUnsubscribeWebhook(ctx context.Context, webhookURL string) error +} diff --git a/notifications/options.go b/notifications/options.go new file mode 100644 index 0000000..9f09f27 --- /dev/null +++ b/notifications/options.go @@ -0,0 +1,56 @@ +package notifications + +import "context" + +type WebhookOptions struct { + TokenHeader string + TokenValue string + BufferSize int + RootContext context.Context + Processors int +} + +func NewWebhookOptions() *WebhookOptions { + return &WebhookOptions{ + TokenHeader: "", + TokenValue: "", + BufferSize: 100, + Processors: 1, + RootContext: context.Background(), + } +} + +type WebhookOpts = func(*WebhookOptions) + +func WithToken(tokenHeader, tokenValue string) WebhookOpts { + return func(w *WebhookOptions) { + w.TokenHeader = tokenHeader + w.TokenValue = tokenValue + } +} + +func WithBufferSize(size int) WebhookOpts { + return func(w *WebhookOptions) { + w.BufferSize = size + } +} + +func WithRootContext(ctx context.Context) WebhookOpts { + return func(w *WebhookOptions) { + w.RootContext = ctx + } +} + +func WithProcessors(count int) WebhookOpts { + return func(w *WebhookOptions) { + w.Processors = count + } +} + +type Webhook struct { + URL string + options *WebhookOptions + buffer chan *RawEvent + subscriber WebhookSubscriber + handlers *eventsMap +} diff --git a/notifications/registerer.go b/notifications/registerer.go new file mode 100644 index 0000000..2c40c44 --- /dev/null +++ b/notifications/registerer.go @@ -0,0 +1,31 @@ +package notifications + +import ( + "fmt" + "reflect" +) + +type eventHandler struct { + Caller reflect.Value + ModelType reflect.Type +} + +func RegisterHandler[EventType Events](nd *Webhook, handlerFunction func(event *EventType)) error { + handlerValue := reflect.ValueOf(handlerFunction) + if handlerValue.Kind() != reflect.Func { + return fmt.Errorf("Not a function") + } + + modelType := handlerValue.Type().In(0) + if modelType.Kind() == reflect.Ptr { + modelType = modelType.Elem() + } + name := modelType.Name() + + nd.handlers.store(name, &eventHandler{ + Caller: handlerValue, + ModelType: modelType, + }) + + return nil +} diff --git a/notifications/webhook.go b/notifications/webhook.go new file mode 100644 index 0000000..85cc38b --- /dev/null +++ b/notifications/webhook.go @@ -0,0 +1,108 @@ +package notifications + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "reflect" + "time" +) + +func NewWebhook(ctx context.Context, subscriber WebhookSubscriber, url string, opts ...WebhookOpts) *Webhook { + options := NewWebhookOptions() + for _, opt := range opts { + opt(options) + } + + wh := &Webhook{ + URL: url, + options: options, + buffer: make(chan *RawEvent, options.BufferSize), + subscriber: subscriber, + handlers: newEventsMap(), + } + for i := 0; i < options.Processors; i++ { + go wh.process() + } + return wh +} + +func (w *Webhook) Subscribe(ctx context.Context) error { + return w.subscriber.AdminSubscribeWebhook(ctx, w.URL, w.options.TokenHeader, w.options.TokenValue) +} + +func (w *Webhook) Unsubscribe(ctx context.Context) error { + return w.subscriber.AdminUnsubscribeWebhook(ctx, w.URL) +} + +func (w *Webhook) HTTPHandler() http.Handler { + return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + if w.options.TokenHeader != "" && r.Header.Get(w.options.TokenHeader) != w.options.TokenValue { + http.Error(rw, "Unauthorized", http.StatusUnauthorized) + return + } + var events []*RawEvent + if err := json.NewDecoder(r.Body).Decode(&events); err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + fmt.Printf("Received: %v\n", events) + for _, event := range events { + select { + case w.buffer <- event: + // event sent + case <-r.Context().Done(): + // request context cancelled + return + case <-w.options.RootContext.Done(): + // root context cancelled - the whole event processing has been stopped + return + case <-time.After(1 * time.Second): + // timeout, most probably the channel is full + // TODO: log this + } + } + rw.WriteHeader(http.StatusOK) + }) +} + +func (nd *Webhook) process() { + for { + select { + case event := <-nd.buffer: + handler, ok := nd.handlers.load(event.Type) + if !ok { + fmt.Printf("No handlers for %s event type", event.Type) + continue + } + model := reflect.New(handler.ModelType).Interface() + if err := json.Unmarshal(event.Content, model); err != nil { + fmt.Println("Cannot unmarshall the content json") + continue + } + handler.Caller.Call([]reflect.Value{reflect.ValueOf(model)}) + case <-nd.options.RootContext.Done(): + return + } + } +} + +////////////////////// BELOW it should be imported from spv-wallet models + +type RawEvent struct { + Type string `json:"type"` + Content json.RawMessage `json:"content"` +} + +type StringEvent struct { + Value string +} + +type NumericEvent struct { + Numeric int +} + +type Events interface { + StringEvent | NumericEvent +} diff --git a/webhook.go b/webhook.go deleted file mode 100644 index 0ba7fda..0000000 --- a/webhook.go +++ /dev/null @@ -1,168 +0,0 @@ -package walletclient - -import ( - "context" - "encoding/json" - "fmt" - "net/http" - "reflect" - "sync" - "time" -) - -const ( - eventBufferLength = 100 -) - -type Webhook struct { - URL string - TokenHeader string - TokenValue string - buffer chan *RawEvent - - client *WalletClient - rootCtx context.Context - handlers *eventsMap -} - -func NewWebhook(ctx context.Context, client *WalletClient, url, tokenHeader, tokenValue string, processors int) *Webhook { - wh := &Webhook{ - URL: url, - TokenHeader: tokenHeader, - TokenValue: tokenValue, - buffer: make(chan *RawEvent, eventBufferLength), - client: client, - rootCtx: ctx, - handlers: newEventsMap(), - } - for i := 0; i < processors; i++ { - go wh.process() - } - return wh -} - -func (w *Webhook) Subscribe(ctx context.Context) ResponseError { - return w.client.AdminSubscribeWebhook(ctx, w.URL, w.TokenHeader, w.TokenValue) -} - -func (w *Webhook) Unsubscribe(ctx context.Context) ResponseError { - return w.client.AdminUnsubscribeWebhook(ctx, w.URL) -} - -func (w *Webhook) HTTPHandler() http.Handler { - return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { - if w.TokenHeader != "" && r.Header.Get(w.TokenHeader) != w.TokenValue { - http.Error(rw, "Unauthorized", http.StatusUnauthorized) - return - } - var events []*RawEvent - if err := json.NewDecoder(r.Body).Decode(&events); err != nil { - http.Error(rw, err.Error(), http.StatusBadRequest) - return - } - fmt.Printf("Received: %v\n", events) - for _, event := range events { - select { - case w.buffer <- event: - // event sent - case <-r.Context().Done(): - // request context cancelled - return - case <-w.rootCtx.Done(): - // root context cancelled - the whole event processing has been stopped - return - case <-time.After(1 * time.Second): - // timeout, most probably the channel is full - // TODO: log this - } - } - rw.WriteHeader(http.StatusOK) - }) -} - -func RegisterHandler[EventType Events](nd *Webhook, handlerFunction func(event *EventType)) error { - handlerValue := reflect.ValueOf(handlerFunction) - if handlerValue.Kind() != reflect.Func { - return fmt.Errorf("Not a function") - } - - modelType := handlerValue.Type().In(0) - if modelType.Kind() == reflect.Ptr { - modelType = modelType.Elem() - } - name := modelType.Name() - - nd.handlers.store(name, &eventHandler{ - Caller: handlerValue, - ModelType: modelType, - }) - - return nil -} - -type eventHandler struct { - Caller reflect.Value - ModelType reflect.Type -} - -type eventsMap struct { - registered *sync.Map -} - -func newEventsMap() *eventsMap { - return &eventsMap{ - registered: &sync.Map{}, - } -} - -func (em *eventsMap) store(name string, handler *eventHandler) { - em.registered.Store(name, handler) -} - -func (em *eventsMap) load(name string) (*eventHandler, bool) { - h, ok := em.registered.Load(name) - if !ok { - return nil, false - } - return h.(*eventHandler), true -} - -func (nd *Webhook) process() { - for { - select { - case event := <-nd.buffer: - handler, ok := nd.handlers.load(event.Type) - if !ok { - fmt.Printf("No handlers for %s event type", event.Type) - continue - } - model := reflect.New(handler.ModelType).Interface() - if err := json.Unmarshal(event.Content, model); err != nil { - fmt.Println("Cannot unmarshall the content json") - continue - } - handler.Caller.Call([]reflect.Value{reflect.ValueOf(model)}) - case <-nd.rootCtx.Done(): - return - } - } -} - -////////////////////// BELOW it should be imported from spv-wallet models - -type RawEvent struct { - Type string `json:"type"` - Content json.RawMessage `json:"content"` -} - -type StringEvent struct { - Value string -} - -type NumericEvent struct { - Numeric int -} - -type Events interface { - StringEvent | NumericEvent -}