Skip to content

Commit

Permalink
Merge pull request #135 from getAlby/feature/event-stream
Browse files Browse the repository at this point in the history
Feature/event-stream
  • Loading branch information
kiwiidb authored Apr 20, 2022
2 parents 6db2869 + d36d22a commit aaf0094
Show file tree
Hide file tree
Showing 12 changed files with 507 additions and 6 deletions.
4 changes: 2 additions & 2 deletions controllers/gettxs.ctrl.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func NewGetTXSController(svc *service.LndhubService) *GetTXSController {
}

type OutgoingInvoice struct {
RHash interface{} `json:"r_hash"`
RHash interface{} `json:"r_hash,omitempty"`
PaymentHash interface{} `json:"payment_hash"`
PaymentPreimage string `json:"payment_preimage"`
Value int64 `json:"value"`
Expand All @@ -30,7 +30,7 @@ type OutgoingInvoice struct {
}

type IncomingInvoice struct {
RHash interface{} `json:"r_hash"`
RHash interface{} `json:"r_hash,omitempty"`
PaymentHash interface{} `json:"payment_hash"`
PaymentRequest string `json:"payment_request"`
Description string `json:"description"`
Expand Down
150 changes: 150 additions & 0 deletions controllers/invoicestream.ctrl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package controllers

import (
"net/http"
"time"

"github.com/getAlby/lndhub.go/common"
"github.com/getAlby/lndhub.go/db/models"
"github.com/getAlby/lndhub.go/lib/service"
"github.com/getAlby/lndhub.go/lib/tokens"
"github.com/gorilla/websocket"
"github.com/labstack/echo/v4"
)

// GetTXSController : GetTXSController struct
type InvoiceStreamController struct {
svc *service.LndhubService
}

type InvoiceEventWrapper struct {
Type string `json:"type"`
Invoice *IncomingInvoice `json:"invoice,omitempty"`
}

func NewInvoiceStreamController(svc *service.LndhubService) *InvoiceStreamController {
return &InvoiceStreamController{svc: svc}
}

// Stream invoices streams incoming payments to the client
func (controller *InvoiceStreamController) StreamInvoices(c echo.Context) error {
userId, err := tokens.ParseToken(controller.svc.Config.JWTSecret, (c.QueryParam("token")), false)
if err != nil {
return err
}
invoiceChan := make(chan models.Invoice)
ticker := time.NewTicker(30 * time.Second)
ws, done, err := createWebsocketUpgrader(c)
defer ws.Close()
if err != nil {
return err
}
//start subscription
subId := controller.svc.InvoicePubSub.Subscribe(userId, invoiceChan)

//start with keepalive message
err = ws.WriteJSON(&InvoiceEventWrapper{Type: "keepalive"})
if err != nil {
controller.svc.Logger.Error(err)
controller.svc.InvoicePubSub.Unsubscribe(subId, userId)
return err
}
fromPaymentHash := c.QueryParam("since_payment_hash")
if fromPaymentHash != "" {
err = controller.writeMissingInvoices(c, userId, ws, fromPaymentHash)
if err != nil {
controller.svc.Logger.Error(err)
controller.svc.InvoicePubSub.Unsubscribe(subId, userId)
return err
}
}
SocketLoop:
for {
select {
case <-done:
break SocketLoop
case <-ticker.C:
err := ws.WriteJSON(&InvoiceEventWrapper{Type: "keepalive"})
if err != nil {
controller.svc.Logger.Error(err)
break SocketLoop
}
case invoice := <-invoiceChan:
err := ws.WriteJSON(
&InvoiceEventWrapper{
Type: "invoice",
Invoice: &IncomingInvoice{
PaymentHash: invoice.RHash,
PaymentRequest: invoice.PaymentRequest,
Description: invoice.Memo,
PayReq: invoice.PaymentRequest,
Timestamp: invoice.CreatedAt.Unix(),
Type: common.InvoiceTypeUser,
Amount: invoice.Amount,
IsPaid: invoice.State == common.InvoiceStateSettled,
}})
if err != nil {
controller.svc.Logger.Error(err)
break SocketLoop
}
}
}
controller.svc.InvoicePubSub.Unsubscribe(subId, userId)
return nil
}

//open the websocket and start listening for close messages in a goroutine
func createWebsocketUpgrader(c echo.Context) (conn *websocket.Conn, done chan struct{}, err error) {
upgrader := websocket.Upgrader{}
upgrader.CheckOrigin = func(r *http.Request) bool { return true }
ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
if err != nil {
return nil, nil, err
}

//start listening for close messages
done = make(chan struct{})
go func() {
defer close(done)
for {
_, _, err := ws.ReadMessage()
if err != nil {
return
}
}
}()
return ws, done, nil
}

func (controller *InvoiceStreamController) writeMissingInvoices(c echo.Context, userId int64, ws *websocket.Conn, hash string) error {
invoices, err := controller.svc.InvoicesFor(c.Request().Context(), userId, common.InvoiceTypeIncoming)
if err != nil {
return err
}
for _, inv := range invoices {
//invoices are order from newest to oldest (with a maximum of 100 invoices being returned)
//so if we get a match on the hash, we have processed all missing invoices for this client
if inv.RHash == hash {
break
}
if inv.State == common.InvoiceStateSettled {
err := ws.WriteJSON(
&InvoiceEventWrapper{
Type: "invoice",
Invoice: &IncomingInvoice{
PaymentHash: inv.RHash,
PaymentRequest: inv.PaymentRequest,
Description: inv.Memo,
PayReq: inv.PaymentRequest,
Timestamp: inv.CreatedAt.Unix(),
Type: common.InvoiceTypeUser,
Amount: inv.Amount,
IsPaid: inv.State == common.InvoiceStateSettled,
}})
if err != nil {
return err
}
}
}
return nil
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ require (

require (
github.com/SporkHubr/echo-http-cache v0.0.0-20200706100054-1d7ae9f38029
github.com/gorilla/websocket v1.5.0
github.com/labstack/echo-contrib v0.12.0
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
golang.org/x/net v0.0.0-20220114011407-0dd24b26b47d // indirect
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,9 @@ github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB7
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw=
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y=
Expand Down
4 changes: 4 additions & 0 deletions integration_tests/expected_requests_and_responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ type ExpectedIncomingInvoice struct {
Amount int64 `json:"amt"`
IsPaid bool `json:"ispaid"`
}
type ExpectedInvoiceEventWrapper struct {
Type string `json:"type"`
Invoice *ExpectedIncomingInvoice `json:"invoice,omitempty"`
}

type ExpectedPayInvoiceRequestBody struct {
Invoice string `json:"invoice" validate:"required"`
Expand Down
1 change: 1 addition & 0 deletions integration_tests/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func LndHubTestServiceInit(lndClientMock lnd.LightningClientWrapper) (svc *servi
}
svc.IdentityPubkey = getInfo.IdentityPubkey

svc.InvoicePubSub = service.NewPubsub()
return svc, nil
}

Expand Down
Loading

0 comments on commit aaf0094

Please sign in to comment.