Skip to content

Commit

Permalink
Merge pull request #233 from bitcoin-sv/fix/decouple-mtm-btx
Browse files Browse the repository at this point in the history
decouple metamorph from blocktx
  • Loading branch information
boecklim authored Jan 5, 2024
2 parents 3385345 + 8acb299 commit d02c960
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 28 deletions.
5 changes: 5 additions & 0 deletions cmd/metamorph.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,11 @@ func StartMetamorph(logger *slog.Logger) (func(), error) {
opts = append(opts, metamorph.WithForceCheckUtxos(node))
}

btxTimeout := viper.GetDuration("metamorph.blocktxTimeout")
if btxTimeout > 0 {
opts = append(opts, metamorph.WithBlocktxTimeout(btxTimeout))
}

serv := metamorph.NewServer(s, metamorphProcessor, btx, source, opts...)

go func() {
Expand Down
1 change: 1 addition & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ metamorph:
network:
fixedIp: true # Denotes if ARC is running with fixed IP addresses
ipAddressHint: ^172.28.*
blocktxTimeout: 1s # timeout for blocktx service

blocktx:
listenAddr: localhost:8011 # address space for blocktx to listen on. Can be for example localhost:8011 or :8011 for listening on all addresses
Expand Down
80 changes: 52 additions & 28 deletions metamorph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/hex"
"fmt"
"github.com/bitcoin-sv/arc/api/handler"
"log/slog"
"net"
"net/url"
Expand Down Expand Up @@ -38,7 +39,11 @@ func init() {

const (
responseTimeout = 5 * time.Second
blocktxTimeout = 2 * time.Second
blocktxTimeout = 1 * time.Second
)

var (
ErrRegisterTxTimeout = errors.New("failed to register transaction due to timeout")
)

type BitcoinNode interface {
Expand Down Expand Up @@ -68,22 +73,29 @@ type Server struct {
source string
bitcoinNode BitcoinNode
forceCheckUtxos bool
blocktxTimeout time.Duration
}

func WithBlocktxTimeout(d time.Duration) func(*Server) {
return func(s *Server) {
s.blocktxTimeout = d
}
}

func WithLogger(logger *slog.Logger) func(*Server) {
return func(p *Server) {
p.logger = logger.With(slog.String("service", "mtm"))
return func(s *Server) {
s.logger = logger.With(slog.String("service", "mtm"))
}
}

func WithForceCheckUtxos(bitcoinNode BitcoinNode) func(*Server) {
return func(p *Server) {
p.bitcoinNode = bitcoinNode
p.forceCheckUtxos = true
return func(s *Server) {
s.bitcoinNode = bitcoinNode
s.forceCheckUtxos = true
}
}

type ServerOption func(f *Server)
type ServerOption func(s *Server)

// NewServer will return a server instance with the zmqLogger stored within it
func NewServer(s store.MetamorphStore, p ProcessorI, btc blocktx.ClientI, source string, opts ...ServerOption) *Server {
Expand All @@ -95,6 +107,7 @@ func NewServer(s store.MetamorphStore, p ProcessorI, btc blocktx.ClientI, source
btc: btc,
source: source,
forceCheckUtxos: false,
blocktxTimeout: blocktxTimeout,
}

for _, opt := range opts {
Expand Down Expand Up @@ -197,11 +210,11 @@ func (s *Server) PutTransaction(ctx context.Context, req *metamorph_api.Transact
if err != nil {
return nil, err
}
hash := handler.PtrTo(chainhash.DoubleHashH(req.GetRawTx()))

next, status, hash, transactionStatus, err := s.putTransactionInit(ctx, req, start)
next, status, transactionStatus, err := s.putTransactionInit(ctx, req, start)
if err != nil {
// if we have an error, we will return immediately
return nil, err
s.logger.Error("failed to initialize transaction", slog.String("err", err.Error()))
}

if transactionStatus != nil {
Expand Down Expand Up @@ -249,23 +262,34 @@ func (s *Server) PutTransactions(ctx context.Context, req *metamorph_api.Transac
resp.Statuses = make([]*metamorph_api.TransactionStatus, len(req.GetTransactions()))

processTxsInputMap := make(map[chainhash.Hash]processTxInput)
skipTxInit := false

for ind, txReq := range req.GetTransactions() {
err := ValidateCallbackURL(txReq.GetCallbackUrl())
if err != nil {
return nil, err
}

_, status, hash, transactionStatus, err := s.putTransactionInit(ctx, txReq, start)
if err != nil {
// if we have an error, we will return immediately
return nil, err
}
status := metamorph_api.Status_UNKNOWN
hash := handler.PtrTo(chainhash.DoubleHashH(txReq.GetRawTx()))

if transactionStatus != nil {
// if we have a transactionStatus, no need to process it another time
resp.Statuses[ind] = transactionStatus
continue
var transactionStatus *metamorph_api.TransactionStatus

if !skipTxInit {
_, status, transactionStatus, err = s.putTransactionInit(ctx, txReq, start)
if err != nil {
// If the initialization step times out once, then don't repeat for subsequent transactions in the batch as it may consume a lot of time
if errors.Is(err, ErrRegisterTxTimeout) {
skipTxInit = true
}

s.logger.Error("failed to register transaction", slog.String("err", err.Error()))
}
if transactionStatus != nil {
// if we have a transactionStatus, no need to process it another time
resp.Statuses[ind] = transactionStatus
continue
}
}

// Convert gRPC req to store.StoreData struct...
Expand Down Expand Up @@ -347,7 +371,7 @@ func (s *Server) processTransaction(ctx context.Context, waitForStatus metamorph
func (s *Server) registerTransaction(ctx context.Context, hash chainhash.Hash) (*blocktx_api.RegisterTransactionResponse, error) {
responseCh := make(chan *blocktx_api.RegisterTransactionResponse, 1)
errCh := make(chan error, 1)
blocktxCtx, cancel := context.WithTimeout(ctx, blocktxTimeout)
blocktxCtx, cancel := context.WithTimeout(ctx, s.blocktxTimeout)
defer cancel()

go func() {
Expand All @@ -365,15 +389,15 @@ func (s *Server) registerTransaction(ctx context.Context, hash chainhash.Hash) (

select {
case <-blocktxCtx.Done():
return nil, errors.New("failed to register transaction due to timeout")
return nil, ErrRegisterTxTimeout
case err := <-errCh:
return nil, fmt.Errorf("failed to register transaction: %v", err)
case response := <-responseCh:
return response, nil
}
}

func (s *Server) putTransactionInit(ctx context.Context, req *metamorph_api.TransactionRequest, start int64) (int64, metamorph_api.Status, *chainhash.Hash, *metamorph_api.TransactionStatus, error) {
func (s *Server) putTransactionInit(ctx context.Context, req *metamorph_api.TransactionRequest, start int64) (int64, metamorph_api.Status, *metamorph_api.TransactionStatus, error) {
initSpan, initCtx := opentracing.StartSpanFromContext(ctx, "Server:PutTransaction:init")
defer initSpan.Finish()

Expand All @@ -388,7 +412,7 @@ func (s *Server) putTransactionInit(ctx context.Context, req *metamorph_api.Tran
// Register the transaction in blocktx store
rtr, err := s.registerTransaction(ctx, hash)
if err != nil {
return 0, 0, nil, nil, err
return 0, 0, nil, err
}

if rtr.BlockHash != nil {
Expand All @@ -414,7 +438,7 @@ func (s *Server) putTransactionInit(ctx context.Context, req *metamorph_api.Tran
status = metamorph_api.Status_MINED
blockHash, _ := chainhash.NewHash(rtr.GetBlockHash())
if err = s.store.UpdateMined(initCtx, &hash, blockHash, rtr.GetBlockHeight()); err != nil {
return 0, 0, nil, nil, err
return 0, 0, nil, err
}
}

Expand All @@ -424,22 +448,22 @@ func (s *Server) putTransactionInit(ctx context.Context, req *metamorph_api.Tran
if transactionStatus != nil {
// just return the status if we found it in the store
transactionStatus.MerklePath = rtr.GetMerklePath()
return 0, 0, nil, transactionStatus, nil
return 0, 0, transactionStatus, nil
}

if s.forceCheckUtxos {
next, err = s.CheckUtxos(initCtx, next, req.GetRawTx())
s.logger.Error("Error checking utxos", slog.String("err", err.Error()))
if err != nil {
return 0, 0, nil, &metamorph_api.TransactionStatus{
return 0, 0, &metamorph_api.TransactionStatus{
Status: metamorph_api.Status_REJECTED,
Txid: hash.String(),
RejectReason: err.Error(),
}, nil
}
}

return next, status, &hash, nil, nil
return next, status, nil, nil
}

func (s *Server) checkStore(ctx context.Context, hash *chainhash.Hash, next int64) (int64, *metamorph_api.TransactionStatus) {
Expand Down Expand Up @@ -516,7 +540,7 @@ func (s *Server) GetTransaction(ctx context.Context, req *metamorph_api.Transact
func (s *Server) getMerklePath(ctx context.Context, hash *chainhash.Hash, dataStatus metamorph_api.Status) (string, error) {
merklePathCh := make(chan string, 1)
errCh := make(chan error, 1)
blocktxCtx, cancel := context.WithTimeout(ctx, blocktxTimeout)
blocktxCtx, cancel := context.WithTimeout(ctx, s.blocktxTimeout)
defer cancel()

go func() {
Expand Down

0 comments on commit d02c960

Please sign in to comment.