Skip to content

Commit

Permalink
Add locks to db when writing
Browse files Browse the repository at this point in the history
  • Loading branch information
lescuer97 committed Jan 26, 2025
1 parent 75bbae3 commit 41fca30
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 26 deletions.
6 changes: 3 additions & 3 deletions internal/database/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ type MintDB interface {
GetNostrAuth(tx pgx.Tx, nonce string) (NostrLoginAuth, error)

// liquidity swaps
AddLiquiditySwap(utils.LiquiditySwap) error
GetLiquiditySwapById(id string) (utils.LiquiditySwap, error)
ChangeLiquiditySwapState(id string, state utils.SwapState) error
AddLiquiditySwap(tx pgx.Tx, swap utils.LiquiditySwap) error
GetLiquiditySwapById(tx pgx.Tx, id string) (utils.LiquiditySwap, error)
ChangeLiquiditySwapState(tx pgx.Tx, id string, state utils.SwapState) error
GetAllLiquiditySwaps() ([]utils.LiquiditySwap, error)
GetLiquiditySwapsByStates(states []utils.SwapState) ([]utils.LiquiditySwap, error)

Expand Down
6 changes: 3 additions & 3 deletions internal/database/mock_db/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ func (m *MockDB) GetMintMeltBalanceByTime(time int64) (database.MintMeltBalance,
}
return mintmeltbalance, nil
}
func (m *MockDB) AddLiquiditySwap(swap utils.LiquiditySwap) error {
func (m *MockDB) AddLiquiditySwap(tx pgx.Tx, swap utils.LiquiditySwap) error {
m.LiquiditySwap = append(m.LiquiditySwap, swap)
return nil

}
func (m *MockDB) ChangeLiquiditySwapState(id string, state utils.SwapState) error {
func (m *MockDB) ChangeLiquiditySwapState(tx pgx.Tx, id string, state utils.SwapState) error {
var liquiditySwaps []utils.LiquiditySwap
for i := 0; i < len(m.LiquiditySwap); i++ {
if m.LiquiditySwap[i].Id == id {
Expand All @@ -75,7 +75,7 @@ func (m *MockDB) ChangeLiquiditySwapState(id string, state utils.SwapState) erro
return nil
}

func (m *MockDB) GetLiquiditySwapById(id string) (utils.LiquiditySwap, error) {
func (m *MockDB) GetLiquiditySwapById(tx pgx.Tx, id string) (utils.LiquiditySwap, error) {
var liquiditySwaps []utils.LiquiditySwap
for i := 0; i < len(m.LiquiditySwap); i++ {

Expand Down
12 changes: 6 additions & 6 deletions internal/database/postgresql/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,17 @@ func (pql Postgresql) GetMintMeltBalanceByTime(time int64) (database.MintMeltBal

}

func (pql Postgresql) AddLiquiditySwap(swap utils.LiquiditySwap) error {
_, err := pql.pool.Exec(context.Background(), "INSERT INTO liquidity_swaps (amount, id , lightning_invoice, state, type, expiration) VALUES ($1, $2, $3, $4, $5, $6)", swap.Amount, swap.Id, swap.LightningInvoice, swap.State, swap.Type, swap.Expiration)
func (pql Postgresql) AddLiquiditySwap(tx pgx.Tx, swap utils.LiquiditySwap) error {
_, err := tx.Exec(context.Background(), "INSERT INTO liquidity_swaps (amount, id , lightning_invoice, state, type, expiration) VALUES ($1, $2, $3, $4, $5, $6)", swap.Amount, swap.Id, swap.LightningInvoice, swap.State, swap.Type, swap.Expiration)

if err != nil {
return databaseError(fmt.Errorf("INSERT INTO swap_request: %w", err))

}
return nil
}
func (pql Postgresql) ChangeLiquiditySwapState(id string, state utils.SwapState) error {
_, err := pql.pool.Exec(context.Background(), "UPDATE liquidity_swaps SET state = $1 WHERE id = $2", state, id)
func (pql Postgresql) ChangeLiquiditySwapState(tx pgx.Tx, id string, state utils.SwapState) error {
_, err := tx.Exec(context.Background(), "UPDATE liquidity_swaps SET state = $1 WHERE id = $2", state, id)

if err != nil {
return databaseError(fmt.Errorf("INSERT INTO swap_request: %w", err))
Expand All @@ -137,10 +137,10 @@ func (pql Postgresql) GetLiquiditySwaps(swap utils.LiquiditySwap) ([]utils.Liqui
return swaps, nil
}

func (pql Postgresql) GetLiquiditySwapById(id string) (utils.LiquiditySwap, error) {
func (pql Postgresql) GetLiquiditySwapById(tx pgx.Tx, id string) (utils.LiquiditySwap, error) {

var swaps utils.LiquiditySwap
rows, err := pql.pool.Query(context.Background(), "SELECT amount, id, lightning_invoice, state,type, expiration FROM liquidity_swaps WHERE id = $1 ", id)
rows, err := tx.Query(context.Background(), "SELECT amount, id, lightning_invoice, state,type, expiration FROM liquidity_swaps WHERE id = $1 FOR UPDATE", id)
defer rows.Close()
if err != nil {
return swaps, fmt.Errorf("Error checking for Active seeds: %w", err)
Expand Down
1 change: 0 additions & 1 deletion internal/routes/admin/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ func Login(mint *mint.Mint, logger *slog.Logger) gin.HandlerFunc {
if err != nil {
logger.Error(fmt.Sprintf("\n Failed to commit transaction: %+v \n", err))
}
fmt.Println("Key rotation finished successfully")
}
}()

Expand Down
30 changes: 28 additions & 2 deletions internal/routes/admin/crons.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package admin

import (
"context"
"encoding/hex"
"fmt"
"log/slog"
"time"

Expand All @@ -14,6 +16,30 @@ import (
func CheckStatusOfLiquiditySwaps(mint *m.Mint, logger *slog.Logger) {

for {
ctx := context.Background()
tx, err := mint.MintDB.GetTx(ctx)
if err != nil {
logger.Debug(
"Could not get db transactions",
slog.String(utils.LogExtraInfo, err.Error()),
)
return
}

defer func() {
if p := recover(); p != nil {
logger.Error("\n Rolling back because of failure %+v\n", p)
tx.Rollback(ctx)
} else if err != nil {
logger.Error(fmt.Sprintf("\n Rolling back because of failure %+v\n", err))
tx.Rollback(ctx)
} else {
err = tx.Commit(ctx)
if err != nil {
logger.Error(fmt.Sprintf("\n Failed to commit transaction: %+v \n", err))
}
}
}()

swaps, err := mint.MintDB.GetLiquiditySwapsByStates([]utils.SwapState{
utils.MintWaitingPaymentRecv,
Expand Down Expand Up @@ -42,7 +68,7 @@ func CheckStatusOfLiquiditySwaps(mint *m.Mint, logger *slog.Logger) {
now := time.Now().Unix()

if now > int64(swap.Expiration) {
err := mint.MintDB.ChangeLiquiditySwapState(swap.Id, utils.Expired)
err := mint.MintDB.ChangeLiquiditySwapState(tx, swap.Id, utils.Expired)
if err != nil {
logger.Warn(
"mint.MintDB.ChangeLiquiditySwapState(swap.Id,utils.Expired)",
Expand Down Expand Up @@ -79,7 +105,7 @@ func CheckStatusOfLiquiditySwaps(mint *m.Mint, logger *slog.Logger) {
swap.State = utils.LightningPaymentFail
}

err = mint.MintDB.ChangeLiquiditySwapState(swap.Id, swap.State)
err = mint.MintDB.ChangeLiquiditySwapState(tx, swap.Id, swap.State)
if err != nil {
logger.Warn(
"mint.MintDB.ChangeLiquiditySwapState(swap.Id,utils.Expired)",
Expand Down
120 changes: 110 additions & 10 deletions internal/routes/admin/liquidity-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,32 @@ func SwapOutRequest(logger *slog.Logger, mint *m.Mint) gin.HandlerFunc {
now := decodedInvoice.Timestamp.Add(decodedInvoice.Expiry()).Unix()
swap.Expiration = uint64(now)

err = mint.MintDB.AddLiquiditySwap(swap)
tx, err := mint.MintDB.GetTx(c.Request.Context())
if err != nil {
logger.Debug(
"Could not get db transactions",
slog.String(utils.LogExtraInfo, err.Error()),
)
c.Error(fmt.Errorf("mint.MintDB.GetTx(). %w", err))
return
}

defer func() {
if p := recover(); p != nil {
logger.Error("\n Rolling back because of failure %+v\n", p)
tx.Rollback(c.Request.Context())
} else if err != nil {
logger.Error(fmt.Sprintf("\n Rolling back because of failure %+v\n", err))
tx.Rollback(c.Request.Context())
} else {
err = tx.Commit(c.Request.Context())
if err != nil {
logger.Error(fmt.Sprintf("\n Failed to commit transaction: %+v \n", err))
}
}
}()

err = mint.MintDB.AddLiquiditySwap(tx, swap)
if err != nil {
// If the fees are acceptable, continue to create the Receive Payment
log.Printf("\n Could not add swap request %+v \n", err)
Expand Down Expand Up @@ -166,7 +191,32 @@ func SwapInRequest(logger *slog.Logger, mint *m.Mint) gin.HandlerFunc {
now := decodedInvoice.Timestamp.Add(decodedInvoice.Expiry()).Unix()
swap.Expiration = uint64(now)

err = mint.MintDB.AddLiquiditySwap(swap)
tx, err := mint.MintDB.GetTx(c.Request.Context())
if err != nil {
logger.Debug(
"Could not get db transactions",
slog.String(utils.LogExtraInfo, err.Error()),
)
c.Error(fmt.Errorf("mint.MintDB.GetTx(). %w", err))
return
}

defer func() {
if p := recover(); p != nil {
logger.Error("\n Rolling back because of failure %+v\n", p)
tx.Rollback(c.Request.Context())
} else if err != nil {
logger.Error(fmt.Sprintf("\n Rolling back because of failure %+v\n", err))
tx.Rollback(c.Request.Context())
} else {
err = tx.Commit(c.Request.Context())
if err != nil {
logger.Error(fmt.Sprintf("\n Failed to commit transaction: %+v \n", err))
}
}
}()

err = mint.MintDB.AddLiquiditySwap(tx, swap)
if err != nil {
// If the fees are acceptable, continue to create the Receive Payment
log.Printf("\n Could not add swap request %+v \n", err)
Expand Down Expand Up @@ -202,7 +252,32 @@ func SwapStateCheck(logger *slog.Logger, mint *m.Mint) gin.HandlerFunc {
// only needs the amount and we generate an invoice from the mint directly
swapId := c.Param("swapId")

swapRequest, err := mint.MintDB.GetLiquiditySwapById(swapId)
tx, err := mint.MintDB.GetTx(c.Request.Context())
if err != nil {
logger.Debug(
"Could not get db transactions",
slog.String(utils.LogExtraInfo, err.Error()),
)
c.Error(fmt.Errorf("mint.MintDB.GetTx(). %w", err))
return
}

defer func() {
if p := recover(); p != nil {
logger.Error("\n Rolling back because of failure %+v\n", p)
tx.Rollback(c.Request.Context())
} else if err != nil {
logger.Error(fmt.Sprintf("\n Rolling back because of failure %+v\n", err))
tx.Rollback(c.Request.Context())
} else {
err = tx.Commit(c.Request.Context())
if err != nil {
logger.Error(fmt.Sprintf("\n Failed to commit transaction: %+v \n", err))
}
}
}()

swapRequest, err := mint.MintDB.GetLiquiditySwapById(tx, swapId)
if err != nil {
c.Error(fmt.Errorf("mint.MintDB.GetLiquiditySwapById(swapId). %w", err))
return
Expand Down Expand Up @@ -231,7 +306,7 @@ func SwapStateCheck(logger *slog.Logger, mint *m.Mint) gin.HandlerFunc {
case utils.LiquidityOut:

}
err = mint.MintDB.ChangeLiquiditySwapState(swapId, swapRequest.State)
err = mint.MintDB.ChangeLiquiditySwapState(tx, swapId, swapRequest.State)
if err != nil {
c.Error(fmt.Errorf("mint.MintDB.ChangeLiquiditySwapState(swapId, swapRequest.State). %w", err))
return
Expand All @@ -256,7 +331,32 @@ func ConfirmSwapOutTransaction(logger *slog.Logger, mint *m.Mint) gin.HandlerFun
// only needs the amount and we generate an invoice from the mint directly
swapId := c.Param("swapId")

swapRequest, err := mint.MintDB.GetLiquiditySwapById(swapId)
tx, err := mint.MintDB.GetTx(c.Request.Context())
if err != nil {
logger.Debug(
"Could not get db transactions",
slog.String(utils.LogExtraInfo, err.Error()),
)
c.Error(fmt.Errorf("mint.MintDB.GetTx(). %w", err))
return
}

defer func() {
if p := recover(); p != nil {
logger.Error("\n Rolling back because of failure %+v\n", p)
tx.Rollback(c.Request.Context())
} else if err != nil {
logger.Error(fmt.Sprintf("\n Rolling back because of failure %+v\n", err))
tx.Rollback(c.Request.Context())
} else {
err = tx.Commit(c.Request.Context())
if err != nil {
logger.Error(fmt.Sprintf("\n Failed to commit transaction: %+v \n", err))
}
}
}()

swapRequest, err := mint.MintDB.GetLiquiditySwapById(tx, swapId)
if err != nil {
c.Error(errors.New("mint.MintDB.GetLiquiditySwapById(swapId)"))
return
Expand All @@ -268,7 +368,7 @@ func ConfirmSwapOutTransaction(logger *slog.Logger, mint *m.Mint) gin.HandlerFun
}

swapRequest.State = utils.LightningPaymentPending
err = mint.MintDB.ChangeLiquiditySwapState(swapId, swapRequest.State)
err = mint.MintDB.ChangeLiquiditySwapState(tx, swapId, swapRequest.State)
if err != nil {
c.Error(fmt.Errorf("mint.MintDB.ChangeLiquiditySwapState(swapId, swapRequest.State). %w", err))
return
Expand Down Expand Up @@ -296,7 +396,7 @@ func ConfirmSwapOutTransaction(logger *slog.Logger, mint *m.Mint) gin.HandlerFun
// if error on checking payement we will save as pending and returns status
if err != nil {

err = mint.MintDB.ChangeLiquiditySwapState(swapRequest.Id, swapRequest.State)
err = mint.MintDB.ChangeLiquiditySwapState(tx, swapRequest.Id, swapRequest.State)
if err != nil {
logger.Error(fmt.Errorf("mint.MintDB.ChangeLiquiditySwapState(swapRequest.Id, utils.UnknownProblem): %w", err).Error())
}
Expand All @@ -309,7 +409,7 @@ func ConfirmSwapOutTransaction(logger *slog.Logger, mint *m.Mint) gin.HandlerFun
case lightning.PENDING, lightning.SETTLED:
swapRequest.State = utils.LightningPaymentPending
// change melt request state
err = mint.MintDB.ChangeLiquiditySwapState(swapRequest.Id, swapRequest.State)
err = mint.MintDB.ChangeLiquiditySwapState(tx, swapRequest.Id, swapRequest.State)
if err != nil {
logger.Error(fmt.Errorf("mint.MintDB.ChangeLiquiditySwapState(swapRequest.Id, utils.UnknownProblem): %w", err).Error())
}
Expand All @@ -319,7 +419,7 @@ func ConfirmSwapOutTransaction(logger *slog.Logger, mint *m.Mint) gin.HandlerFun
// finish failure and release the proofs
case lightning.FAILED, lightning.UNKNOWN:
swapRequest.State = utils.LightningPaymentFail
err = mint.MintDB.ChangeLiquiditySwapState(swapRequest.Id, swapRequest.State)
err = mint.MintDB.ChangeLiquiditySwapState(tx, swapRequest.Id, swapRequest.State)
if err != nil {
logger.Error(fmt.Errorf("mint.MintDB.ChangeLiquiditySwapState(swapRequest.Id, utils.LightnigPaymentFail): %w", err).Error())
}
Expand All @@ -329,7 +429,7 @@ func ConfirmSwapOutTransaction(logger *slog.Logger, mint *m.Mint) gin.HandlerFun

swapRequest.State = utils.Finished

err = mint.MintDB.ChangeLiquiditySwapState(swapRequest.Id, swapRequest.State)
err = mint.MintDB.ChangeLiquiditySwapState(tx, swapRequest.Id, swapRequest.State)
if err != nil {
logger.Error(fmt.Errorf("mint.MintDB.ChangeLiquiditySwapState(swapRequest.Id, utils.LightnigPaymentFail): %w", err).Error())
}
Expand Down
27 changes: 26 additions & 1 deletion internal/routes/admin/pages.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,32 @@ func SwapStatusPage(logger *slog.Logger, mint *mint.Mint) gin.HandlerFunc {
ctx := context.Background()

swapId := c.Param("swapId")
swap, err := mint.MintDB.GetLiquiditySwapById(swapId)
tx, err := mint.MintDB.GetTx(c.Request.Context())
if err != nil {
logger.Debug(
"Incorrect body",
slog.String(utils.LogExtraInfo, err.Error()),
)
c.Error(fmt.Errorf("mint.MintDB.GetTx(). %w", err))
return
}

defer func() {
if p := recover(); p != nil {
logger.Error("\n Rolling back because of failure %+v\n", p)
tx.Rollback(c.Request.Context())
} else if err != nil {
logger.Error(fmt.Sprintf("\n Rolling back because of failure %+v\n", err))
tx.Rollback(c.Request.Context())
} else {
err = tx.Commit(c.Request.Context())
if err != nil {
logger.Error(fmt.Sprintf("\n Failed to commit transaction: %+v \n", err))
}
fmt.Println("Key rotation finished successfully")
}
}()
swap, err := mint.MintDB.GetLiquiditySwapById(tx, swapId)
if err != nil {
c.Error(err)
return
Expand Down

0 comments on commit 41fca30

Please sign in to comment.