Skip to content

Commit

Permalink
feat(SPV-848): fix lint errors
Browse files Browse the repository at this point in the history
  • Loading branch information
chris-4chain committed Jul 2, 2024
1 parent a26b756 commit 1338673
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 26 deletions.
1 change: 0 additions & 1 deletion examples/webhooks/webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ func main() {

client := walletclient.NewWithAdminKey("http://localhost:3003/v1", examples.ExampleAdminKey)
wh := notifications.NewWebhook(
context.Background(),
client,
"http://localhost:5005/notification",
notifications.WithToken("Authorization", "this-is-the-token"),
Expand Down
1 change: 1 addition & 0 deletions notifications/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package notifications

import "context"

// WebhookSubscriber - interface for subscribing and unsubscribing to webhooks
type WebhookSubscriber interface {
AdminSubscribeWebhook(ctx context.Context, webhookURL, tokenHeader, tokenValue string) error
AdminUnsubscribeWebhook(ctx context.Context, webhookURL string) error
Expand Down
20 changes: 9 additions & 11 deletions notifications/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package notifications

import (
"context"

"github.com/bitcoin-sv/spv-wallet/models"
"runtime"
)

// WebhookOptions - options for the webhook
type WebhookOptions struct {
TokenHeader string
TokenValue string
Expand All @@ -14,47 +14,45 @@ type WebhookOptions struct {
Processors int
}

// NewWebhookOptions - creates a new webhook options
func NewWebhookOptions() *WebhookOptions {
return &WebhookOptions{
TokenHeader: "",
TokenValue: "",
BufferSize: 100,
Processors: 1,
Processors: runtime.NumCPU(),
RootContext: context.Background(),
}
}

// WebhookOpts - functional options for the webhook
type WebhookOpts = func(*WebhookOptions)

// WithToken - sets the token header and value
func WithToken(tokenHeader, tokenValue string) WebhookOpts {
return func(w *WebhookOptions) {
w.TokenHeader = tokenHeader
w.TokenValue = tokenValue
}
}

// WithBufferSize - sets the buffer size
func WithBufferSize(size int) WebhookOpts {
return func(w *WebhookOptions) {
w.BufferSize = size
}
}

// WithRootContext - sets the root context
func WithRootContext(ctx context.Context) WebhookOpts {
return func(w *WebhookOptions) {
w.RootContext = ctx
}
}

// WithProcessors - sets the number of concurrent loops which will process the events
func WithProcessors(count int) WebhookOpts {
return func(w *WebhookOptions) {
w.Processors = count
}
}

type Webhook struct {
URL string
options *WebhookOptions
buffer chan *models.RawEvent
subscriber WebhookSubscriber
handlers *eventsMap
}
5 changes: 3 additions & 2 deletions notifications/registerer.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package notifications

import (
"fmt"
"errors"
"reflect"

"github.com/bitcoin-sv/spv-wallet/models"
Expand All @@ -12,10 +12,11 @@ type eventHandler struct {
ModelType reflect.Type
}

// RegisterHandler - registers a handler for a specific event type
func RegisterHandler[EventType models.Events](nd *Webhook, handlerFunction func(event *EventType)) error {
handlerValue := reflect.ValueOf(handlerFunction)
if handlerValue.Kind() != reflect.Func {
return fmt.Errorf("Not a function")
return errors.New("handlerFunction must be a function")
}

modelType := handlerValue.Type().In(0)
Expand Down
33 changes: 21 additions & 12 deletions notifications/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,24 @@ package notifications
import (
"context"
"encoding/json"
"fmt"
"net/http"
"reflect"
"time"

"github.com/bitcoin-sv/spv-wallet/models"
)

func NewWebhook(ctx context.Context, subscriber WebhookSubscriber, url string, opts ...WebhookOpts) *Webhook {
// Webhook - the webhook event receiver
type Webhook struct {
URL string
options *WebhookOptions
buffer chan *models.RawEvent
subscriber WebhookSubscriber
handlers *eventsMap
}

// NewWebhook - creates a new webhook
func NewWebhook(subscriber WebhookSubscriber, url string, opts ...WebhookOpts) *Webhook {
options := NewWebhookOptions()
for _, opt := range opts {
opt(options)
Expand All @@ -30,14 +39,17 @@ func NewWebhook(ctx context.Context, subscriber WebhookSubscriber, url string, o
return wh
}

// Subscribe - sends a subscription request to the spv-wallet
func (w *Webhook) Subscribe(ctx context.Context) error {
return w.subscriber.AdminSubscribeWebhook(ctx, w.URL, w.options.TokenHeader, w.options.TokenValue)
}

// Unsubscribe - sends an unsubscription request to the spv-wallet
func (w *Webhook) Unsubscribe(ctx context.Context) error {
return w.subscriber.AdminUnsubscribeWebhook(ctx, w.URL)
}

// HTTPHandler - returns an http handler for the webhook; it should be registered with the http server
func (w *Webhook) HTTPHandler() http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
if w.options.TokenHeader != "" && r.Header.Get(w.options.TokenHeader) != w.options.TokenValue {
Expand All @@ -49,42 +61,39 @@ func (w *Webhook) HTTPHandler() http.Handler {
http.Error(rw, err.Error(), http.StatusBadRequest)
return
}
fmt.Printf("Received: %v\n", events)

for _, event := range events {
select {
case w.buffer <- event:
// event sent
case <-r.Context().Done():
// request context cancelled
// request context canceled
return
case <-w.options.RootContext.Done():
// root context cancelled - the whole event processing has been stopped
// root context canceled - the whole event processing has been stopped
return
case <-time.After(1 * time.Second):
// timeout, most probably the channel is full
// TODO: log this
}
}
rw.WriteHeader(http.StatusOK)
})
}

func (nd *Webhook) process() {
func (w *Webhook) process() {
for {
select {
case event := <-nd.buffer:
handler, ok := nd.handlers.load(event.Type)
case event := <-w.buffer:
handler, ok := w.handlers.load(event.Type)
if !ok {
fmt.Printf("No handlers for %s event type", event.Type)
continue
}
model := reflect.New(handler.ModelType).Interface()
if err := json.Unmarshal(event.Content, model); err != nil {
fmt.Println("Cannot unmarshall the content json")
continue
}
handler.Caller.Call([]reflect.Value{reflect.ValueOf(model)})
case <-nd.options.RootContext.Done():
case <-w.options.RootContext.Done():
return
}
}
Expand Down

0 comments on commit 1338673

Please sign in to comment.