Skip to content

Commit

Permalink
Merge pull request #162 from getAlby/feature/webhooks
Browse files Browse the repository at this point in the history
Feature/webhooks
  • Loading branch information
kiwiidb authored May 3, 2022
2 parents e153cf7 + 00bfc56 commit c688ace
Show file tree
Hide file tree
Showing 10 changed files with 225 additions and 19 deletions.
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ vim .env # edit your config
+ `BURST_RATE_LIMIT`: (default: 1) Rate limit burst
+ `ENABLE_PROMETHEUS`: (default: false) Enable Prometheus metrics to be exposed
+ `PROMETHEUS_PORT`: (default: 9092) Prometheus port (path: `/metrics`)
+ `WEBHOOK_URL`: Optional. Callback URL for incoming and outgoing payment events, see below.
## Developing

```shell
Expand Down Expand Up @@ -77,6 +78,35 @@ LndHub.go supports PostgreSQL and SQLite as database backend. But SQLite does no
Prometheus metrics can be optionally exposed through the `ENABLE_PROMETHEUS` environment variable.
For an example dashboard, see https://grafana.com/grafana/dashboards/10913.

## Webhooks

If `WEBHOOK_URL` is specified, a http POST request will be dispatched at that location when an incoming payment is settled, or an outgoing payment is completed. Example payload:

```
{
"id": 721,
"type": "incoming", //incoming, outgoing
"user_id": 299,
"amount": 1000,
"fee": 0,
"memo": "fill wallet",
"description_hash": "",
"payment_request": "lnbcrt10u1p38p4ehpp5xp07pda02vk40wxd9gyrene8qzheucz7ast435u9jwxejs6f0v5sdqjve5kcmpqwaskcmr9wscqzpgxqyz5vqsp56nyve3v5fw306j74nmewv7t5ey3aer2khjrrwznh4k2vuw44unzq9qyyssqv2wq9hn7a39x8cvz9fvpzul87u4kc4edf0t6jukzvmx8v5swl3jqg8p3sh6czkepczcjkm523q9x8yswsastctnsns3q9d26szu703gpwh7a09",
"destination_pubkey_hex": "0376442c750766d5d127512609a5618d9aa82db2d06aae226084da92a3e133acda",
"custom_records": {
"5482373484": "YWY4MDhlZDUxZjNmY2YxNWMxYWI3MmM3ODVhNWI1MDE="
}, //only set when keysend=true
"r_hash": "305fe0b7af532d57b8cd2a083ccf2700af9e605eec1758d385938d9943497b29",
"preimage": "3735363531303032626332356439376136643461326434336335626434653035",
"keysend": false,
"state": "settled",
"created_at": "2022-05-03T09:18:15.15774+02:00",
"expires_at": "2022-05-04T09:18:15.157597+02:00",
"updated_at": "2022-05-03T09:18:19.837567+02:00",
"settled_at": "2022-05-03T09:18:19+02:00"
}
```

### Ideas
+ Using low level database constraints to prevent data inconsistencies
+ Follow double-entry bookkeeping ideas (Every transaction is a debit of one account and a credit to another one)
Expand Down
9 changes: 5 additions & 4 deletions controllers/invoicestream.ctrl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controllers

import (
"net/http"
"strconv"
"time"

"github.com/getAlby/lndhub.go/common"
Expand Down Expand Up @@ -53,7 +54,7 @@ func (controller *InvoiceStreamController) StreamInvoices(c echo.Context) error
return err
}
//start subscription
subId, err := controller.svc.InvoicePubSub.Subscribe(userId, invoiceChan)
subId, err := controller.svc.InvoicePubSub.Subscribe(strconv.FormatInt(userId, 10), invoiceChan)
if err != nil {
controller.svc.Logger.Error(err)
return err
Expand All @@ -62,15 +63,15 @@ func (controller *InvoiceStreamController) StreamInvoices(c echo.Context) error
err = ws.WriteJSON(&InvoiceEventWrapper{Type: "keepalive"})
if err != nil {
controller.svc.Logger.Error(err)
controller.svc.InvoicePubSub.Unsubscribe(subId, userId)
controller.svc.InvoicePubSub.Unsubscribe(subId, strconv.FormatInt(userId, 10))
return err
}
fromPaymentHash := c.QueryParam("since_payment_hash")
if fromPaymentHash != "" {
err = controller.writeMissingInvoices(c, userId, ws, fromPaymentHash)
if err != nil {
controller.svc.Logger.Error(err)
controller.svc.InvoicePubSub.Unsubscribe(subId, userId)
controller.svc.InvoicePubSub.Unsubscribe(subId, strconv.FormatInt(userId, 10))
return err
}
}
Expand Down Expand Up @@ -105,7 +106,7 @@ SocketLoop:
}
}
}
controller.svc.InvoicePubSub.Unsubscribe(subId, userId)
controller.svc.InvoicePubSub.Unsubscribe(subId, strconv.FormatInt(userId, 10))
return nil
}

Expand Down
16 changes: 8 additions & 8 deletions db/models/invoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,23 @@ type Invoice struct {
ID int64 `json:"id" bun:",pk,autoincrement"`
Type string `json:"type" validate:"required"`
UserID int64 `json:"user_id" validate:"required"`
User *User `bun:"rel:belongs-to,join:user_id=id"`
User *User `json:"-" bun:"rel:belongs-to,join:user_id=id"`
Amount int64 `json:"amount" validate:"gte=0"`
Fee int64 `json:"fee" bun:",nullzero"`
Memo string `json:"memo" bun:",nullzero"`
DescriptionHash string `json:"description_hash" bun:",nullzero"`
DescriptionHash string `json:"description_hash,omitempty" bun:",nullzero"`
PaymentRequest string `json:"payment_request" bun:",nullzero"`
DestinationPubkeyHex string `json:"destination_pubkey_hex" bun:",notnull"`
DestinationCustomRecords map[uint64][]byte `bun:"-"`
DestinationCustomRecords map[uint64][]byte `json:"custom_records,omitempty" bun:"-"`
RHash string `json:"r_hash"`
Preimage string `json:"preimage" bun:",nullzero"`
Internal bool `json:"internal" bun:",nullzero"`
Internal bool `json:"-" bun:",nullzero"`
Keysend bool `json:"keysend" bun:",nullzero"`
State string `json:"state" bun:",default:'initialized'"`
ErrorMessage string `json:"error_message" bun:",nullzero"`
AddIndex uint64 `json:"add_index" bun:",nullzero"`
CreatedAt time.Time `bun:",nullzero,notnull,default:current_timestamp"`
ExpiresAt bun.NullTime `bun:",nullzero"`
ErrorMessage string `json:"error_message,omitempty" bun:",nullzero"`
AddIndex uint64 `json:"-" bun:",nullzero"`
CreatedAt time.Time `json:"created_at" bun:",nullzero,notnull,default:current_timestamp"`
ExpiresAt bun.NullTime `json:"expires_at" bun:",nullzero"`
UpdatedAt bun.NullTime `json:"updated_at"`
SettledAt bun.NullTime `json:"settled_at"`
}
Expand Down
109 changes: 109 additions & 0 deletions integration_tests/webhook_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package integration_tests

import (
"context"
"encoding/json"
"log"
"net/http"
"net/http/httptest"
"testing"

"github.com/getAlby/lndhub.go/common"
"github.com/getAlby/lndhub.go/controllers"
"github.com/getAlby/lndhub.go/db/models"
"github.com/getAlby/lndhub.go/lib"
"github.com/getAlby/lndhub.go/lib/responses"
"github.com/getAlby/lndhub.go/lib/service"
"github.com/getAlby/lndhub.go/lib/tokens"
"github.com/getAlby/lndhub.go/lnd"
"github.com/go-playground/validator/v10"
"github.com/labstack/echo/v4"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)

type WebHookTestSuite struct {
TestSuite
fundingClient *lnd.LNDWrapper
service *service.LndhubService
userLogin ExpectedCreateUserResponseBody
userToken string
webHookServer *httptest.Server
invoiceChan chan (models.Invoice)
invoiceUpdateSubCancelFn context.CancelFunc
}

func (suite *WebHookTestSuite) SetupSuite() {
lndClient, err := lnd.NewLNDclient(lnd.LNDoptions{
Address: lnd2RegtestAddress,
MacaroonHex: lnd2RegtestMacaroonHex,
})
if err != nil {
log.Fatalf("Error setting up funding client: %v", err)
}
suite.fundingClient = lndClient

suite.invoiceChan = make(chan models.Invoice)
webhookServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
invoice := models.Invoice{}
err = json.NewDecoder(r.Body).Decode(&invoice)
if err != nil {
suite.echo.Logger.Error(err)
close(suite.invoiceChan)
return
}
suite.invoiceChan <- invoice
}))
suite.webHookServer = webhookServer
svc, err := LndHubTestServiceInit(nil)
if err != nil {
log.Fatalf("Error initializing test service: %v", err)
}
svc.Config.WebhookUrl = suite.webHookServer.URL

users, userTokens, err := createUsers(svc, 1)
if err != nil {
log.Fatalf("Error creating test users: %v", err)
}
// Subscribe to LND invoice updates in the background
// store cancel func to be called in tear down suite
ctx, cancel := context.WithCancel(context.Background())
suite.invoiceUpdateSubCancelFn = cancel
go svc.InvoiceUpdateSubscription(ctx)

go svc.StartWebhookSubscribtion(ctx, svc.Config.WebhookUrl)

suite.service = svc
e := echo.New()

e.HTTPErrorHandler = responses.HTTPErrorHandler
e.Validator = &lib.CustomValidator{Validator: validator.New()}
suite.echo = e
suite.userLogin = users[0]
suite.userToken = userTokens[0]
suite.echo.Use(tokens.Middleware([]byte(suite.service.Config.JWTSecret)))
suite.echo.POST("/addinvoice", controllers.NewAddInvoiceController(suite.service).AddInvoice)
}
func (suite *WebHookTestSuite) TestWebHook() {
// create incoming invoice and fund account
invoice := suite.createAddInvoiceReq(1000, "integration test webhook", suite.userToken)
sendPaymentRequest := lnrpc.SendRequest{
PaymentRequest: invoice.PayReq,
FeeLimit: nil,
}
_, err := suite.fundingClient.SendPaymentSync(context.Background(), &sendPaymentRequest)
assert.NoError(suite.T(), err)
invoiceFromWebhook := <-suite.invoiceChan
assert.Equal(suite.T(), "integration test webhook", invoiceFromWebhook.Memo)
assert.Equal(suite.T(), common.InvoiceTypeIncoming, invoiceFromWebhook.Type)
}
func (suite *WebHookTestSuite) TearDownSuite() {
suite.invoiceUpdateSubCancelFn()
suite.webHookServer.Close()
clearTable(suite.service, "invoices")
}

func TestWebHookSuite(t *testing.T) {
suite.Run(t, new(WebHookTestSuite))
}
1 change: 1 addition & 0 deletions lib/service/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ type Config struct {
BurstRateLimit int `envconfig:"BURST_RATE_LIMIT" default:"1"`
EnablePrometheus bool `envconfig:"ENABLE_PROMETHEUS" default:"false"`
PrometheusPort int `envconfig:"PROMETHEUS_PORT" default:"9092"`
WebhookUrl string `envconfig:"WEBHOOK_URL"`
}
5 changes: 4 additions & 1 deletion lib/service/invoices.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"math"
"strconv"
"time"

"github.com/getAlby/lndhub.go/common"
Expand Down Expand Up @@ -96,7 +97,8 @@ func (svc *LndhubService) SendInternalPayment(ctx context.Context, invoice *mode
// could not save the invoice of the recipient
return sendPaymentResponse, err
}
svc.InvoicePubSub.Publish(incomingInvoice.UserID, incomingInvoice)
svc.InvoicePubSub.Publish(strconv.FormatInt(incomingInvoice.UserID, 10), incomingInvoice)
svc.InvoicePubSub.Publish(common.InvoiceTypeIncoming, incomingInvoice)

return sendPaymentResponse, nil
}
Expand Down Expand Up @@ -310,6 +312,7 @@ func (svc *LndhubService) HandleSuccessfulPayment(ctx context.Context, invoice *
svc.Logger.Info(amountMsg)
sentry.CaptureMessage(amountMsg)
}
svc.InvoicePubSub.Publish(common.InvoiceTypeOutgoing, *invoice)

return nil
}
Expand Down
4 changes: 3 additions & 1 deletion lib/service/invoicesubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"encoding/hex"
"fmt"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -102,7 +103,8 @@ func (svc *LndhubService) ProcessInvoiceUpdate(ctx context.Context, rawInvoice *
svc.Logger.Errorf("Failed to commit DB transaction user_id:%v invoice_id:%v %v", invoice.UserID, invoice.ID, err)
return err
}
svc.InvoicePubSub.Publish(invoice.UserID, invoice)
svc.InvoicePubSub.Publish(strconv.FormatInt(invoice.UserID, 10), invoice)
svc.InvoicePubSub.Publish(common.InvoiceTypeIncoming, invoice)

return nil
}
Expand Down
10 changes: 5 additions & 5 deletions lib/service/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ import (

type Pubsub struct {
mu sync.RWMutex
subs map[int64]map[string]chan models.Invoice
subs map[string]map[string]chan models.Invoice
}

func NewPubsub() *Pubsub {
ps := &Pubsub{}
ps.subs = make(map[int64]map[string]chan models.Invoice)
ps.subs = make(map[string]map[string]chan models.Invoice)
return ps
}

func (ps *Pubsub) Subscribe(topic int64, ch chan models.Invoice) (subId string, err error) {
func (ps *Pubsub) Subscribe(topic string, ch chan models.Invoice) (subId string, err error) {
ps.mu.Lock()
defer ps.mu.Unlock()
if ps.subs[topic] == nil {
Expand All @@ -33,7 +33,7 @@ func (ps *Pubsub) Subscribe(topic int64, ch chan models.Invoice) (subId string,
return subId, nil
}

func (ps *Pubsub) Unsubscribe(id string, topic int64) {
func (ps *Pubsub) Unsubscribe(id string, topic string) {
ps.mu.Lock()
defer ps.mu.Unlock()
if ps.subs[topic] == nil {
Expand All @@ -46,7 +46,7 @@ func (ps *Pubsub) Unsubscribe(id string, topic int64) {
delete(ps.subs[topic], id)
}

func (ps *Pubsub) Publish(topic int64, msg models.Invoice) {
func (ps *Pubsub) Publish(topic string, msg models.Invoice) {
ps.mu.RLock()
defer ps.mu.RUnlock()

Expand Down
53 changes: 53 additions & 0 deletions lib/service/webhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package service

import (
"bytes"
"context"
"encoding/json"
"io/ioutil"
"net/http"

"github.com/getAlby/lndhub.go/common"
"github.com/getAlby/lndhub.go/db/models"
)

func (svc *LndhubService) StartWebhookSubscribtion(ctx context.Context, url string) {

svc.Logger.Infof("Starting webhook subscription with webhook url %s", svc.Config.WebhookUrl)
incomingInvoices := make(chan models.Invoice)
outgoingInvoices := make(chan models.Invoice)
svc.InvoicePubSub.Subscribe(common.InvoiceTypeIncoming, incomingInvoices)
svc.InvoicePubSub.Subscribe(common.InvoiceTypeOutgoing, outgoingInvoices)
for {
select {
case <-ctx.Done():
return
case incoming := <-incomingInvoices:
svc.postToWebhook(incoming, url)
case outgoing := <-outgoingInvoices:
svc.postToWebhook(outgoing, url)
}
}
}
func (svc *LndhubService) postToWebhook(invoice models.Invoice, url string) {

payload := new(bytes.Buffer)
err := json.NewEncoder(payload).Encode(invoice)
if err != nil {
svc.Logger.Error(err)
return
}

resp, err := http.Post(svc.Config.WebhookUrl, "application/json", payload)
if err != nil {
svc.Logger.Error(err)
return
}
if resp.StatusCode != http.StatusOK {
msg, err := ioutil.ReadAll(resp.Body)
if err != nil {
svc.Logger.Error(err)
}
svc.Logger.Errorf("Webhook status code was %d, body: %s", resp.StatusCode, msg)
}
}
7 changes: 7 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,13 @@ func main() {
// Subscribe to LND invoice updates in the background
go svc.InvoiceUpdateSubscription(context.Background())

//Start webhook subscription
if svc.Config.WebhookUrl != "" {
webhookCtx, cancelWebhook := context.WithCancel(context.Background())
go svc.StartWebhookSubscribtion(webhookCtx, svc.Config.WebhookUrl)
defer cancelWebhook()
}

//Start Prometheus server if necessary
var echoPrometheus *echo.Echo
if svc.Config.EnablePrometheus {
Expand Down

0 comments on commit c688ace

Please sign in to comment.