From 44c5921fc2ccfc26e4749a2f51acfbb12571115e Mon Sep 17 00:00:00 2001
From: chris-4chain <152964795+chris-4chain@users.noreply.github.com>
Date: Fri, 12 Jul 2024 11:12:52 +0200
Subject: [PATCH] feat(SPV-848) notifications (#246)

---
 examples/Taskfile.yml         |   5 ++
 examples/go.mod               |   2 +-
 examples/go.sum               |   4 ++
 examples/webhooks/webhooks.go |  77 ++++++++++++++++++++++++++
 go.mod                        |   2 +-
 go.sum                        |   4 ++
 http.go                       |  28 ++++++++++
 notifications/eventsMap.go    |  25 +++++++++
 notifications/interface.go    |   9 +++
 notifications/options.go      |  58 ++++++++++++++++++++
 notifications/registerer.go   |  27 +++++++++
 notifications/webhook.go      | 100 ++++++++++++++++++++++++++++++++++
 12 files changed, 339 insertions(+), 2 deletions(-)
 create mode 100644 examples/webhooks/webhooks.go
 create mode 100644 notifications/eventsMap.go
 create mode 100644 notifications/interface.go
 create mode 100644 notifications/options.go
 create mode 100644 notifications/registerer.go
 create mode 100644 notifications/webhook.go

diff --git a/examples/Taskfile.yml b/examples/Taskfile.yml
index b67c760f..f6116bd2 100644
--- a/examples/Taskfile.yml
+++ b/examples/Taskfile.yml
@@ -66,3 +66,8 @@ tasks:
     cmds:
       - echo "running generate_totp..."
       - go run ./generate_totp/generate_totp.go
+  webhooks:
+    desc: "running webhooks..."
+    cmds:
+      - echo "running webhooks..."
+      - go run ./webhooks/webhooks.go
diff --git a/examples/go.mod b/examples/go.mod
index 7b7b0bd8..a012b724 100644
--- a/examples/go.mod
+++ b/examples/go.mod
@@ -6,7 +6,7 @@ replace github.com/bitcoin-sv/spv-wallet-go-client => ../
 
 require (
 	github.com/bitcoin-sv/spv-wallet-go-client v0.0.0-00010101000000-000000000000
-	github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15
+	github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.16
 )
 
 require (
diff --git a/examples/go.sum b/examples/go.sum
index 99f6fce5..da1cca28 100644
--- a/examples/go.sum
+++ b/examples/go.sum
@@ -4,6 +4,10 @@ github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.14.0.20240626082725-2c073c53
 github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.14.0.20240626082725-2c073c5330a6/go.mod h1:u3gnRDS3uHWZNM2qbYATTpN+mAphyozCJrYIKGwBX7k=
 github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15 h1:Qjp9gSe1XlBwADgDlkaIGuzqNoQwktu1DuB6tzurdQI=
 github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15/go.mod h1:Ni6SFkmMjV39Bg4FtlgPAsnsiJUfRDVEPlbzTZa8z40=
+github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15.0.20240704130751-f3156fd52a0e h1:Bw8bq7YUvMSNwRNQUm8gFKakICyNk8ScBqhJ9LFr54o=
+github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15.0.20240704130751-f3156fd52a0e/go.mod h1:Ni6SFkmMjV39Bg4FtlgPAsnsiJUfRDVEPlbzTZa8z40=
+github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.16 h1:iHLUofGb40sQ31KpVwtdjuKVs3W/vW1w8exF8Vidvfc=
+github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.16/go.mod h1:Ni6SFkmMjV39Bg4FtlgPAsnsiJUfRDVEPlbzTZa8z40=
 github.com/bitcoinschema/go-bitcoin/v2 v2.0.5 h1:Sgh5Eb746Zck/46rFDrZZEXZWyO53fMuWYhNoZa1tck=
 github.com/bitcoinschema/go-bitcoin/v2 v2.0.5/go.mod h1:JjO1ivfZv6vhK0uAXzyH08AAHlzNMAfnyK1Fiv9r4ZA=
 github.com/bitcoinsv/bsvd v0.0.0-20190609155523-4c29707f7173 h1:2yTIV9u7H0BhRDGXH5xrAwAz7XibWJtX2dNezMeNsUo=
diff --git a/examples/webhooks/webhooks.go b/examples/webhooks/webhooks.go
new file mode 100644
index 00000000..b71e7b12
--- /dev/null
+++ b/examples/webhooks/webhooks.go
@@ -0,0 +1,77 @@
+/*
+Package main - send_op_return example
+*/
+package main
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+	"os"
+	"os/signal"
+	"syscall"
+	"time"
+
+	walletclient "github.com/bitcoin-sv/spv-wallet-go-client"
+	"github.com/bitcoin-sv/spv-wallet-go-client/examples"
+	"github.com/bitcoin-sv/spv-wallet-go-client/notifications"
+	"github.com/bitcoin-sv/spv-wallet/models"
+)
+
+func main() {
+	defer examples.HandlePanic()
+
+	examples.CheckIfAdminKeyExists()
+
+	client := walletclient.NewWithAdminKey("http://localhost:3003/v1", examples.ExampleAdminKey)
+	wh := notifications.NewWebhook(
+		client,
+		"http://localhost:5005/notification",
+		notifications.WithToken("Authorization", "this-is-the-token"),
+		notifications.WithProcessors(3),
+	)
+	err := wh.Subscribe(context.Background())
+	if err != nil {
+		panic(err)
+	}
+
+	http.Handle("/notification", wh.HTTPHandler())
+
+	if err = notifications.RegisterHandler(wh, func(gpe *models.StringEvent) {
+		time.Sleep(50 * time.Millisecond) // simulate processing time
+		fmt.Printf("Processing event-string: %s\n", gpe.Value)
+	}); err != nil {
+		panic(err)
+	}
+
+	if err = notifications.RegisterHandler(wh, func(gpe *models.TransactionEvent) {
+		time.Sleep(50 * time.Millisecond) // simulate processing time
+		fmt.Printf("Processing event-transaction: XPubID: %s, TxID: %s, Status: %s\n", gpe.XPubID, gpe.TransactionID, gpe.Status)
+	}); err != nil {
+		panic(err)
+	}
+
+	server := http.Server{
+		Addr:              ":5005",
+		Handler:           nil,
+		ReadHeaderTimeout: time.Second * 10,
+	}
+	go func() {
+		_ = server.ListenAndServe()
+	}()
+
+	// wait for signal to shutdown
+	sigChan := make(chan os.Signal, 1)
+	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+	<-sigChan
+
+	fmt.Printf("Unsubscribing...\n")
+	if err = wh.Unsubscribe(context.Background()); err != nil {
+		panic(err)
+	}
+
+	fmt.Printf("Shutting down...\n")
+	if err = server.Shutdown(context.Background()); err != nil {
+		panic(err)
+	}
+}
diff --git a/go.mod b/go.mod
index 523c5d7f..b7ae30a6 100644
--- a/go.mod
+++ b/go.mod
@@ -3,7 +3,7 @@ module github.com/bitcoin-sv/spv-wallet-go-client
 go 1.22.4
 
 require (
-	github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15
+	github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.16
 	github.com/bitcoinschema/go-bitcoin/v2 v2.0.5
 	github.com/libsv/go-bk v0.1.6
 	github.com/libsv/go-bt/v2 v2.2.5
diff --git a/go.sum b/go.sum
index bc4ae1f6..cfe05a01 100644
--- a/go.sum
+++ b/go.sum
@@ -4,6 +4,10 @@ github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.14.0.20240626082725-2c073c53
 github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.14.0.20240626082725-2c073c5330a6/go.mod h1:u3gnRDS3uHWZNM2qbYATTpN+mAphyozCJrYIKGwBX7k=
 github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15 h1:Qjp9gSe1XlBwADgDlkaIGuzqNoQwktu1DuB6tzurdQI=
 github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15/go.mod h1:Ni6SFkmMjV39Bg4FtlgPAsnsiJUfRDVEPlbzTZa8z40=
+github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15.0.20240704130751-f3156fd52a0e h1:Bw8bq7YUvMSNwRNQUm8gFKakICyNk8ScBqhJ9LFr54o=
+github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15.0.20240704130751-f3156fd52a0e/go.mod h1:Ni6SFkmMjV39Bg4FtlgPAsnsiJUfRDVEPlbzTZa8z40=
+github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.16 h1:iHLUofGb40sQ31KpVwtdjuKVs3W/vW1w8exF8Vidvfc=
+github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.16/go.mod h1:Ni6SFkmMjV39Bg4FtlgPAsnsiJUfRDVEPlbzTZa8z40=
 github.com/bitcoinschema/go-bitcoin/v2 v2.0.5 h1:Sgh5Eb746Zck/46rFDrZZEXZWyO53fMuWYhNoZa1tck=
 github.com/bitcoinschema/go-bitcoin/v2 v2.0.5/go.mod h1:JjO1ivfZv6vhK0uAXzyH08AAHlzNMAfnyK1Fiv9r4ZA=
 github.com/bitcoinsv/bsvd v0.0.0-20190609155523-4c29707f7173 h1:2yTIV9u7H0BhRDGXH5xrAwAz7XibWJtX2dNezMeNsUo=
diff --git a/http.go b/http.go
index 4b0b66de..8bf6144e 100644
--- a/http.go
+++ b/http.go
@@ -1132,3 +1132,31 @@ func (wc *WalletClient) SendToRecipients(ctx context.Context, recipients []*Reci
 
 	return wc.RecordTransaction(ctx, hex, draft.ID, metadata)
 }
+
+// AdminSubscribeWebhook subscribes to a webhook to receive notifications from spv-wallet
+func (wc *WalletClient) AdminSubscribeWebhook(ctx context.Context, webhookURL, tokenHeader, tokenValue string) error {
+	requestModel := models.SubscribeRequestBody{
+		URL:         webhookURL,
+		TokenHeader: tokenHeader,
+		TokenValue:  tokenValue,
+	}
+	rawJSON, err := json.Marshal(requestModel)
+	if err != nil {
+		return WrapError(err)
+	}
+	err = wc.doHTTPRequest(ctx, http.MethodPost, "/admin/webhooks/subscriptions", rawJSON, wc.adminXPriv, true, nil)
+	return WrapError(err)
+}
+
+// AdminUnsubscribeWebhook unsubscribes from a webhook
+func (wc *WalletClient) AdminUnsubscribeWebhook(ctx context.Context, webhookURL string) error {
+	requestModel := models.UnsubscribeRequestBody{
+		URL: webhookURL,
+	}
+	rawJSON, err := json.Marshal(requestModel)
+	if err != nil {
+		return WrapError(err)
+	}
+	err = wc.doHTTPRequest(ctx, http.MethodDelete, "/admin/webhooks/subscriptions", rawJSON, wc.adminXPriv, true, nil)
+	return err
+}
diff --git a/notifications/eventsMap.go b/notifications/eventsMap.go
new file mode 100644
index 00000000..00e0b523
--- /dev/null
+++ b/notifications/eventsMap.go
@@ -0,0 +1,25 @@
+package notifications
+
+import "sync"
+
+type eventsMap struct {
+	registered *sync.Map
+}
+
+func newEventsMap() *eventsMap {
+	return &eventsMap{
+		registered: &sync.Map{},
+	}
+}
+
+func (em *eventsMap) store(name string, handler *eventHandler) {
+	em.registered.Store(name, handler)
+}
+
+func (em *eventsMap) load(name string) (*eventHandler, bool) {
+	h, ok := em.registered.Load(name)
+	if !ok {
+		return nil, false
+	}
+	return h.(*eventHandler), true
+}
diff --git a/notifications/interface.go b/notifications/interface.go
new file mode 100644
index 00000000..610ea2b7
--- /dev/null
+++ b/notifications/interface.go
@@ -0,0 +1,9 @@
+package notifications
+
+import "context"
+
+// WebhookSubscriber - interface for subscribing and unsubscribing to webhooks
+type WebhookSubscriber interface {
+	AdminSubscribeWebhook(ctx context.Context, webhookURL, tokenHeader, tokenValue string) error
+	AdminUnsubscribeWebhook(ctx context.Context, webhookURL string) error
+}
diff --git a/notifications/options.go b/notifications/options.go
new file mode 100644
index 00000000..436a3ed7
--- /dev/null
+++ b/notifications/options.go
@@ -0,0 +1,58 @@
+package notifications
+
+import (
+	"context"
+	"runtime"
+)
+
+// WebhookOptions - options for the webhook
+type WebhookOptions struct {
+	TokenHeader string
+	TokenValue  string
+	BufferSize  int
+	RootContext context.Context
+	Processors  int
+}
+
+// NewWebhookOptions - creates a new webhook options
+func NewWebhookOptions() *WebhookOptions {
+	return &WebhookOptions{
+		TokenHeader: "",
+		TokenValue:  "",
+		BufferSize:  100,
+		Processors:  runtime.NumCPU(),
+		RootContext: context.Background(),
+	}
+}
+
+// WebhookOpts - functional options for the webhook
+type WebhookOpts = func(*WebhookOptions)
+
+// WithToken - sets the token header and value
+func WithToken(tokenHeader, tokenValue string) WebhookOpts {
+	return func(w *WebhookOptions) {
+		w.TokenHeader = tokenHeader
+		w.TokenValue = tokenValue
+	}
+}
+
+// WithBufferSize - sets the buffer size
+func WithBufferSize(size int) WebhookOpts {
+	return func(w *WebhookOptions) {
+		w.BufferSize = size
+	}
+}
+
+// WithRootContext - sets the root context
+func WithRootContext(ctx context.Context) WebhookOpts {
+	return func(w *WebhookOptions) {
+		w.RootContext = ctx
+	}
+}
+
+// WithProcessors - sets the number of concurrent loops which will process the events
+func WithProcessors(count int) WebhookOpts {
+	return func(w *WebhookOptions) {
+		w.Processors = count
+	}
+}
diff --git a/notifications/registerer.go b/notifications/registerer.go
new file mode 100644
index 00000000..edaf2e96
--- /dev/null
+++ b/notifications/registerer.go
@@ -0,0 +1,27 @@
+package notifications
+
+import (
+	"reflect"
+
+	"github.com/bitcoin-sv/spv-wallet/models"
+)
+
+type eventHandler struct {
+	Caller    reflect.Value
+	ModelType reflect.Type
+}
+
+// RegisterHandler - registers a handler for a specific event type
+func RegisterHandler[EventType models.Events](nd *Webhook, handlerFunction func(event *EventType)) error {
+	handlerValue := reflect.ValueOf(handlerFunction)
+
+	modelType := handlerValue.Type().In(0).Elem()
+	name := modelType.Name()
+
+	nd.handlers.store(name, &eventHandler{
+		Caller:    handlerValue,
+		ModelType: modelType,
+	})
+
+	return nil
+}
diff --git a/notifications/webhook.go b/notifications/webhook.go
new file mode 100644
index 00000000..0dd488da
--- /dev/null
+++ b/notifications/webhook.go
@@ -0,0 +1,100 @@
+package notifications
+
+import (
+	"context"
+	"encoding/json"
+	"net/http"
+	"reflect"
+	"time"
+
+	"github.com/bitcoin-sv/spv-wallet/models"
+)
+
+// Webhook - the webhook event receiver
+type Webhook struct {
+	URL        string
+	options    *WebhookOptions
+	buffer     chan *models.RawEvent
+	subscriber WebhookSubscriber
+	handlers   *eventsMap
+}
+
+// NewWebhook - creates a new webhook
+func NewWebhook(subscriber WebhookSubscriber, url string, opts ...WebhookOpts) *Webhook {
+	options := NewWebhookOptions()
+	for _, opt := range opts {
+		opt(options)
+	}
+
+	wh := &Webhook{
+		URL:        url,
+		options:    options,
+		buffer:     make(chan *models.RawEvent, options.BufferSize),
+		subscriber: subscriber,
+		handlers:   newEventsMap(),
+	}
+	for i := 0; i < options.Processors; i++ {
+		go wh.process()
+	}
+	return wh
+}
+
+// Subscribe - sends a subscription request to the spv-wallet
+func (w *Webhook) Subscribe(ctx context.Context) error {
+	return w.subscriber.AdminSubscribeWebhook(ctx, w.URL, w.options.TokenHeader, w.options.TokenValue)
+}
+
+// Unsubscribe - sends an unsubscription request to the spv-wallet
+func (w *Webhook) Unsubscribe(ctx context.Context) error {
+	return w.subscriber.AdminUnsubscribeWebhook(ctx, w.URL)
+}
+
+// HTTPHandler - returns an http handler for the webhook; it should be registered with the http server
+func (w *Webhook) HTTPHandler() http.Handler {
+	return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
+		if w.options.TokenHeader != "" && r.Header.Get(w.options.TokenHeader) != w.options.TokenValue {
+			http.Error(rw, "Unauthorized", http.StatusUnauthorized)
+			return
+		}
+		var events []*models.RawEvent
+		if err := json.NewDecoder(r.Body).Decode(&events); err != nil {
+			http.Error(rw, err.Error(), http.StatusBadRequest)
+			return
+		}
+
+		for _, event := range events {
+			select {
+			case w.buffer <- event:
+				// event sent
+			case <-r.Context().Done():
+				// request context canceled
+				return
+			case <-w.options.RootContext.Done():
+				// root context canceled - the whole event processing has been stopped
+				return
+			case <-time.After(1 * time.Second):
+				// timeout, most probably the channel is full
+			}
+		}
+		rw.WriteHeader(http.StatusOK)
+	})
+}
+
+func (w *Webhook) process() {
+	for {
+		select {
+		case event := <-w.buffer:
+			handler, ok := w.handlers.load(event.Type)
+			if !ok {
+				continue
+			}
+			model := reflect.New(handler.ModelType).Interface()
+			if err := json.Unmarshal(event.Content, model); err != nil {
+				continue
+			}
+			handler.Caller.Call([]reflect.Value{reflect.ValueOf(model)})
+		case <-w.options.RootContext.Done():
+			return
+		}
+	}
+}