Skip to content

Commit

Permalink
Merge pull request #316 from getAlby/catch-add-index-error
Browse files Browse the repository at this point in the history
Invoice subscription: safety improvements
  • Loading branch information
kiwiidb authored Feb 22, 2023
2 parents 9dc5cd7 + e36d46f commit b40d13f
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 10 deletions.
5 changes: 4 additions & 1 deletion db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ func Open(config *service.Config) (*bun.DB, error) {
dsn := config.DatabaseUri
switch {
case strings.HasPrefix(dsn, "postgres://") || strings.HasPrefix(dsn, "postgresql://") || strings.HasPrefix(dsn, "unix://"):
dbConn := sql.OpenDB(pgdriver.NewConnector(pgdriver.WithDSN(dsn)))
dbConn := sql.OpenDB(
pgdriver.NewConnector(
pgdriver.WithDSN(dsn),
pgdriver.WithTimeout(time.Duration(config.DatabaseTimeout)*time.Second)))
db = bun.NewDB(dbConn, pgdialect.New())
db.SetMaxOpenConns(config.DatabaseMaxConns)
db.SetMaxIdleConns(config.DatabaseMaxIdleConns)
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/subscription_start_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (mock *lndSubscriptionStartMockClient) SubscribeInvoices(ctx context.Contex
return mock, nil
}

//wait forever
// wait forever
func (mock *lndSubscriptionStartMockClient) Recv() (*lnrpc.Invoice, error) {
select {}
}
Expand Down
1 change: 1 addition & 0 deletions lib/service/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type Config struct {
DatabaseMaxConns int `envconfig:"DATABASE_MAX_CONNS" default:"10"`
DatabaseMaxIdleConns int `envconfig:"DATABASE_MAX_IDLE_CONNS" default:"5"`
DatabaseConnMaxLifetime int `envconfig:"DATABASE_CONN_MAX_LIFETIME" default:"1800"` // 30 minutes
DatabaseTimeout int `envconfig:"DATABASE_TIMEOUT" default:"60"` // 60 seconds
SentryDSN string `envconfig:"SENTRY_DSN"`
SentryTracesSampleRate float64 `envconfig:"SENTRY_TRACES_SAMPLE_RATE"`
LogFilePath string `envconfig:"LOG_FILE_PATH"`
Expand Down
21 changes: 15 additions & 6 deletions lib/service/invoicesubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,19 @@ func (svc *LndhubService) ConnectInvoiceSubscription(ctx context.Context) (lnd.S
var invoice models.Invoice
invoiceSubscriptionOptions := lnrpc.InvoiceSubscription{}
// Find the oldest NOT settled AND NOT expired invoice with an add_index
// Note: expired invoices will not be settled anymore, so we don't care about those
err := svc.DB.NewSelect().Model(&invoice).Where("invoice.settled_at IS NULL AND invoice.add_index IS NOT NULL AND invoice.expires_at >= now()").OrderExpr("invoice.id ASC").Limit(1).Scan(ctx)
// Build in a safety buffer of 14h to account for lndhub downtime
// Note: expired invoices will not be settled anymore, so we don't care about those
err := svc.DB.NewSelect().Model(&invoice).Where("invoice.settled_at IS NULL AND invoice.add_index IS NOT NULL AND invoice.expires_at >= (now() - interval '14 hours')").OrderExpr("invoice.id ASC").Limit(1).Scan(ctx)
// IF we found an invoice we use that index to start the subscription
if err == nil {
invoiceSubscriptionOptions = lnrpc.InvoiceSubscription{AddIndex: invoice.AddIndex - 1} // -1 because we want updates for that invoice already
// if we get an error there might be a serious issue here
// and we are at risk of missing paid invoices, so we should not continue
// if we just didn't find any unsettled invoices that's allright though
if err != nil && err != sql.ErrNoRows {
sentry.CaptureException(err)
return nil, err
}
// subtract 1 (read invoiceSubscriptionOptions.Addindex docs)
invoiceSubscriptionOptions.AddIndex = invoice.AddIndex - 1
svc.Logger.Infof("Starting invoice subscription from index: %v", invoiceSubscriptionOptions.AddIndex)
return svc.LndClient.SubscribeInvoices(ctx, &invoiceSubscriptionOptions)
}
Expand All @@ -239,14 +246,16 @@ func (svc *LndhubService) InvoiceUpdateSubscription(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return fmt.Errorf("Context was canceled")
return context.Canceled
default:
// receive the next invoice update
rawInvoice, err := invoiceSubscriptionStream.Recv()
// in case of an error, we want to return and restart LNDhub
// in order to try and reconnect the gRPC subscription
if err != nil {
svc.Logger.Errorf("Error processing invoice update subscription: %v", err)
sentry.CaptureException(err)
continue
return err
}

// Ignore updates for open invoices
Expand Down
5 changes: 3 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,9 @@ func main() {
backgroundWg.Add(1)
go func() {
err = svc.InvoiceUpdateSubscription(backGroundCtx)
if err != nil {
svc.Logger.Error(err)
if err != nil && err != context.Canceled {
// in case of an error in this routine, we want to restart LNDhub
svc.Logger.Fatal(err)
}
svc.Logger.Info("Invoice routine done")
backgroundWg.Done()
Expand Down

0 comments on commit b40d13f

Please sign in to comment.