From 13386738b07182daa781b78361b501dccc02b263 Mon Sep 17 00:00:00 2001 From: Krzysztof Tomecki <152964795+chris-4chain@users.noreply.github.com> Date: Tue, 2 Jul 2024 13:48:57 +0200 Subject: [PATCH] feat(SPV-848): fix lint errors --- examples/webhooks/webhooks.go | 1 - notifications/interface.go | 1 + notifications/options.go | 20 +++++++++----------- notifications/registerer.go | 5 +++-- notifications/webhook.go | 33 +++++++++++++++++++++------------ 5 files changed, 34 insertions(+), 26 deletions(-) diff --git a/examples/webhooks/webhooks.go b/examples/webhooks/webhooks.go index c10f17c..2b612ce 100644 --- a/examples/webhooks/webhooks.go +++ b/examples/webhooks/webhooks.go @@ -22,7 +22,6 @@ func main() { client := walletclient.NewWithAdminKey("http://localhost:3003/v1", examples.ExampleAdminKey) wh := notifications.NewWebhook( - context.Background(), client, "http://localhost:5005/notification", notifications.WithToken("Authorization", "this-is-the-token"), diff --git a/notifications/interface.go b/notifications/interface.go index 0741568..610ea2b 100644 --- a/notifications/interface.go +++ b/notifications/interface.go @@ -2,6 +2,7 @@ package notifications import "context" +// WebhookSubscriber - interface for subscribing and unsubscribing to webhooks type WebhookSubscriber interface { AdminSubscribeWebhook(ctx context.Context, webhookURL, tokenHeader, tokenValue string) error AdminUnsubscribeWebhook(ctx context.Context, webhookURL string) error diff --git a/notifications/options.go b/notifications/options.go index 626f80d..436a3ed 100644 --- a/notifications/options.go +++ b/notifications/options.go @@ -2,10 +2,10 @@ package notifications import ( "context" - - "github.com/bitcoin-sv/spv-wallet/models" + "runtime" ) +// WebhookOptions - options for the webhook type WebhookOptions struct { TokenHeader string TokenValue string @@ -14,18 +14,21 @@ type WebhookOptions struct { Processors int } +// NewWebhookOptions - creates a new webhook options func NewWebhookOptions() *WebhookOptions { return &WebhookOptions{ TokenHeader: "", TokenValue: "", BufferSize: 100, - Processors: 1, + Processors: runtime.NumCPU(), RootContext: context.Background(), } } +// WebhookOpts - functional options for the webhook type WebhookOpts = func(*WebhookOptions) +// WithToken - sets the token header and value func WithToken(tokenHeader, tokenValue string) WebhookOpts { return func(w *WebhookOptions) { w.TokenHeader = tokenHeader @@ -33,28 +36,23 @@ func WithToken(tokenHeader, tokenValue string) WebhookOpts { } } +// WithBufferSize - sets the buffer size func WithBufferSize(size int) WebhookOpts { return func(w *WebhookOptions) { w.BufferSize = size } } +// WithRootContext - sets the root context func WithRootContext(ctx context.Context) WebhookOpts { return func(w *WebhookOptions) { w.RootContext = ctx } } +// WithProcessors - sets the number of concurrent loops which will process the events func WithProcessors(count int) WebhookOpts { return func(w *WebhookOptions) { w.Processors = count } } - -type Webhook struct { - URL string - options *WebhookOptions - buffer chan *models.RawEvent - subscriber WebhookSubscriber - handlers *eventsMap -} diff --git a/notifications/registerer.go b/notifications/registerer.go index 8022838..eb6bc31 100644 --- a/notifications/registerer.go +++ b/notifications/registerer.go @@ -1,7 +1,7 @@ package notifications import ( - "fmt" + "errors" "reflect" "github.com/bitcoin-sv/spv-wallet/models" @@ -12,10 +12,11 @@ type eventHandler struct { ModelType reflect.Type } +// RegisterHandler - registers a handler for a specific event type func RegisterHandler[EventType models.Events](nd *Webhook, handlerFunction func(event *EventType)) error { handlerValue := reflect.ValueOf(handlerFunction) if handlerValue.Kind() != reflect.Func { - return fmt.Errorf("Not a function") + return errors.New("handlerFunction must be a function") } modelType := handlerValue.Type().In(0) diff --git a/notifications/webhook.go b/notifications/webhook.go index 7d002fe..0dd488d 100644 --- a/notifications/webhook.go +++ b/notifications/webhook.go @@ -3,7 +3,6 @@ package notifications import ( "context" "encoding/json" - "fmt" "net/http" "reflect" "time" @@ -11,7 +10,17 @@ import ( "github.com/bitcoin-sv/spv-wallet/models" ) -func NewWebhook(ctx context.Context, subscriber WebhookSubscriber, url string, opts ...WebhookOpts) *Webhook { +// Webhook - the webhook event receiver +type Webhook struct { + URL string + options *WebhookOptions + buffer chan *models.RawEvent + subscriber WebhookSubscriber + handlers *eventsMap +} + +// NewWebhook - creates a new webhook +func NewWebhook(subscriber WebhookSubscriber, url string, opts ...WebhookOpts) *Webhook { options := NewWebhookOptions() for _, opt := range opts { opt(options) @@ -30,14 +39,17 @@ func NewWebhook(ctx context.Context, subscriber WebhookSubscriber, url string, o return wh } +// Subscribe - sends a subscription request to the spv-wallet func (w *Webhook) Subscribe(ctx context.Context) error { return w.subscriber.AdminSubscribeWebhook(ctx, w.URL, w.options.TokenHeader, w.options.TokenValue) } +// Unsubscribe - sends an unsubscription request to the spv-wallet func (w *Webhook) Unsubscribe(ctx context.Context) error { return w.subscriber.AdminUnsubscribeWebhook(ctx, w.URL) } +// HTTPHandler - returns an http handler for the webhook; it should be registered with the http server func (w *Webhook) HTTPHandler() http.Handler { return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { if w.options.TokenHeader != "" && r.Header.Get(w.options.TokenHeader) != w.options.TokenValue { @@ -49,42 +61,39 @@ func (w *Webhook) HTTPHandler() http.Handler { http.Error(rw, err.Error(), http.StatusBadRequest) return } - fmt.Printf("Received: %v\n", events) + for _, event := range events { select { case w.buffer <- event: // event sent case <-r.Context().Done(): - // request context cancelled + // request context canceled return case <-w.options.RootContext.Done(): - // root context cancelled - the whole event processing has been stopped + // root context canceled - the whole event processing has been stopped return case <-time.After(1 * time.Second): // timeout, most probably the channel is full - // TODO: log this } } rw.WriteHeader(http.StatusOK) }) } -func (nd *Webhook) process() { +func (w *Webhook) process() { for { select { - case event := <-nd.buffer: - handler, ok := nd.handlers.load(event.Type) + case event := <-w.buffer: + handler, ok := w.handlers.load(event.Type) if !ok { - fmt.Printf("No handlers for %s event type", event.Type) continue } model := reflect.New(handler.ModelType).Interface() if err := json.Unmarshal(event.Content, model); err != nil { - fmt.Println("Cannot unmarshall the content json") continue } handler.Caller.Call([]reflect.Value{reflect.ValueOf(model)}) - case <-nd.options.RootContext.Done(): + case <-w.options.RootContext.Done(): return } }