From f7ad8cbeaf2d1cd3230ba609cc05d0d31995d3bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Fri, 5 Jan 2024 10:05:31 +0100 Subject: [PATCH 1/2] BAARC-96: Do not respond with error if transaction initialization with blocktx fails or times out --- cmd/metamorph.go | 5 ++++ config.yaml | 1 + metamorph/server.go | 65 ++++++++++++++++++++++++++++++--------------- 3 files changed, 50 insertions(+), 21 deletions(-) diff --git a/cmd/metamorph.go b/cmd/metamorph.go index 4b2a82897..2e554aa26 100644 --- a/cmd/metamorph.go +++ b/cmd/metamorph.go @@ -271,6 +271,11 @@ func StartMetamorph(logger *slog.Logger) (func(), error) { opts = append(opts, metamorph.WithForceCheckUtxos(node)) } + btxTimeout := viper.GetDuration("metamorph.blocktxTimeout") + if btxTimeout > 0 { + opts = append(opts, metamorph.WithBlocktxTimeout(btxTimeout)) + } + serv := metamorph.NewServer(s, metamorphProcessor, btx, source, opts...) go func() { diff --git a/config.yaml b/config.yaml index 07eda0430..a96da17c8 100644 --- a/config.yaml +++ b/config.yaml @@ -61,6 +61,7 @@ metamorph: network: fixedIp: true # Denotes if ARC is running with fixed IP addresses ipAddressHint: ^172.28.* + blocktxTimeout: 1s # timeout for blocktx service blocktx: listenAddr: localhost:8011 # address space for blocktx to listen on. Can be for example localhost:8011 or :8011 for listening on all addresses diff --git a/metamorph/server.go b/metamorph/server.go index 8a7b7059e..7107e4f12 100644 --- a/metamorph/server.go +++ b/metamorph/server.go @@ -4,6 +4,7 @@ import ( "context" "encoding/hex" "fmt" + "github.com/bitcoin-sv/arc/api/handler" "log/slog" "net" "net/url" @@ -38,7 +39,11 @@ func init() { const ( responseTimeout = 5 * time.Second - blocktxTimeout = 2 * time.Second + blocktxTimeout = 1 * time.Second +) + +var ( + ErrRegisterTxTimeout = errors.New("failed to register transaction due to timeout") ) type BitcoinNode interface { @@ -68,22 +73,29 @@ type Server struct { source string bitcoinNode BitcoinNode forceCheckUtxos bool + blocktxTimeout time.Duration +} + +func WithBlocktxTimeout(d time.Duration) func(*Server) { + return func(s *Server) { + s.blocktxTimeout = d + } } func WithLogger(logger *slog.Logger) func(*Server) { - return func(p *Server) { - p.logger = logger.With(slog.String("service", "mtm")) + return func(s *Server) { + s.logger = logger.With(slog.String("service", "mtm")) } } func WithForceCheckUtxos(bitcoinNode BitcoinNode) func(*Server) { - return func(p *Server) { - p.bitcoinNode = bitcoinNode - p.forceCheckUtxos = true + return func(s *Server) { + s.bitcoinNode = bitcoinNode + s.forceCheckUtxos = true } } -type ServerOption func(f *Server) +type ServerOption func(s *Server) // NewServer will return a server instance with the zmqLogger stored within it func NewServer(s store.MetamorphStore, p ProcessorI, btc blocktx.ClientI, source string, opts ...ServerOption) *Server { @@ -95,6 +107,7 @@ func NewServer(s store.MetamorphStore, p ProcessorI, btc blocktx.ClientI, source btc: btc, source: source, forceCheckUtxos: false, + blocktxTimeout: blocktxTimeout, } for _, opt := range opts { @@ -200,8 +213,7 @@ func (s *Server) PutTransaction(ctx context.Context, req *metamorph_api.Transact next, status, hash, transactionStatus, err := s.putTransactionInit(ctx, req, start) if err != nil { - // if we have an error, we will return immediately - return nil, err + s.logger.Error("failed to initialize transaction", slog.String("err", err.Error())) } if transactionStatus != nil { @@ -249,6 +261,7 @@ func (s *Server) PutTransactions(ctx context.Context, req *metamorph_api.Transac resp.Statuses = make([]*metamorph_api.TransactionStatus, len(req.GetTransactions())) processTxsInputMap := make(map[chainhash.Hash]processTxInput) + skipTxInit := false for ind, txReq := range req.GetTransactions() { err := ValidateCallbackURL(txReq.GetCallbackUrl()) @@ -256,16 +269,26 @@ func (s *Server) PutTransactions(ctx context.Context, req *metamorph_api.Transac return nil, err } - _, status, hash, transactionStatus, err := s.putTransactionInit(ctx, txReq, start) - if err != nil { - // if we have an error, we will return immediately - return nil, err - } + status := metamorph_api.Status_UNKNOWN + hash := handler.PtrTo(chainhash.DoubleHashH(txReq.GetRawTx())) - if transactionStatus != nil { - // if we have a transactionStatus, no need to process it another time - resp.Statuses[ind] = transactionStatus - continue + var transactionStatus *metamorph_api.TransactionStatus + + if !skipTxInit { + _, status, hash, transactionStatus, err = s.putTransactionInit(ctx, txReq, start) + if err != nil { + // If the initialization step times out once, then don't repeat for subsequent transactions in the batch as it may consume a lot of time + if errors.Is(err, ErrRegisterTxTimeout) { + skipTxInit = true + } + + s.logger.Error("failed to register transaction", slog.String("err", err.Error())) + } + if transactionStatus != nil { + // if we have a transactionStatus, no need to process it another time + resp.Statuses[ind] = transactionStatus + continue + } } // Convert gRPC req to store.StoreData struct... @@ -347,7 +370,7 @@ func (s *Server) processTransaction(ctx context.Context, waitForStatus metamorph func (s *Server) registerTransaction(ctx context.Context, hash chainhash.Hash) (*blocktx_api.RegisterTransactionResponse, error) { responseCh := make(chan *blocktx_api.RegisterTransactionResponse, 1) errCh := make(chan error, 1) - blocktxCtx, cancel := context.WithTimeout(ctx, blocktxTimeout) + blocktxCtx, cancel := context.WithTimeout(ctx, s.blocktxTimeout) defer cancel() go func() { @@ -365,7 +388,7 @@ func (s *Server) registerTransaction(ctx context.Context, hash chainhash.Hash) ( select { case <-blocktxCtx.Done(): - return nil, errors.New("failed to register transaction due to timeout") + return nil, ErrRegisterTxTimeout case err := <-errCh: return nil, fmt.Errorf("failed to register transaction: %v", err) case response := <-responseCh: @@ -516,7 +539,7 @@ func (s *Server) GetTransaction(ctx context.Context, req *metamorph_api.Transact func (s *Server) getMerklePath(ctx context.Context, hash *chainhash.Hash, dataStatus metamorph_api.Status) (string, error) { merklePathCh := make(chan string, 1) errCh := make(chan error, 1) - blocktxCtx, cancel := context.WithTimeout(ctx, blocktxTimeout) + blocktxCtx, cancel := context.WithTimeout(ctx, s.blocktxTimeout) defer cancel() go func() { From 8acb299447881346c4581a86c007858c8a971e9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Fri, 5 Jan 2024 10:59:23 +0100 Subject: [PATCH 2/2] BAARC-96: do not return tx hash from tx init function --- metamorph/server.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/metamorph/server.go b/metamorph/server.go index 7107e4f12..454389a65 100644 --- a/metamorph/server.go +++ b/metamorph/server.go @@ -210,8 +210,9 @@ func (s *Server) PutTransaction(ctx context.Context, req *metamorph_api.Transact if err != nil { return nil, err } + hash := handler.PtrTo(chainhash.DoubleHashH(req.GetRawTx())) - next, status, hash, transactionStatus, err := s.putTransactionInit(ctx, req, start) + next, status, transactionStatus, err := s.putTransactionInit(ctx, req, start) if err != nil { s.logger.Error("failed to initialize transaction", slog.String("err", err.Error())) } @@ -275,7 +276,7 @@ func (s *Server) PutTransactions(ctx context.Context, req *metamorph_api.Transac var transactionStatus *metamorph_api.TransactionStatus if !skipTxInit { - _, status, hash, transactionStatus, err = s.putTransactionInit(ctx, txReq, start) + _, status, transactionStatus, err = s.putTransactionInit(ctx, txReq, start) if err != nil { // If the initialization step times out once, then don't repeat for subsequent transactions in the batch as it may consume a lot of time if errors.Is(err, ErrRegisterTxTimeout) { @@ -396,7 +397,7 @@ func (s *Server) registerTransaction(ctx context.Context, hash chainhash.Hash) ( } } -func (s *Server) putTransactionInit(ctx context.Context, req *metamorph_api.TransactionRequest, start int64) (int64, metamorph_api.Status, *chainhash.Hash, *metamorph_api.TransactionStatus, error) { +func (s *Server) putTransactionInit(ctx context.Context, req *metamorph_api.TransactionRequest, start int64) (int64, metamorph_api.Status, *metamorph_api.TransactionStatus, error) { initSpan, initCtx := opentracing.StartSpanFromContext(ctx, "Server:PutTransaction:init") defer initSpan.Finish() @@ -411,7 +412,7 @@ func (s *Server) putTransactionInit(ctx context.Context, req *metamorph_api.Tran // Register the transaction in blocktx store rtr, err := s.registerTransaction(ctx, hash) if err != nil { - return 0, 0, nil, nil, err + return 0, 0, nil, err } if rtr.BlockHash != nil { @@ -437,7 +438,7 @@ func (s *Server) putTransactionInit(ctx context.Context, req *metamorph_api.Tran status = metamorph_api.Status_MINED blockHash, _ := chainhash.NewHash(rtr.GetBlockHash()) if err = s.store.UpdateMined(initCtx, &hash, blockHash, rtr.GetBlockHeight()); err != nil { - return 0, 0, nil, nil, err + return 0, 0, nil, err } } @@ -447,14 +448,14 @@ func (s *Server) putTransactionInit(ctx context.Context, req *metamorph_api.Tran if transactionStatus != nil { // just return the status if we found it in the store transactionStatus.MerklePath = rtr.GetMerklePath() - return 0, 0, nil, transactionStatus, nil + return 0, 0, transactionStatus, nil } if s.forceCheckUtxos { next, err = s.CheckUtxos(initCtx, next, req.GetRawTx()) s.logger.Error("Error checking utxos", slog.String("err", err.Error())) if err != nil { - return 0, 0, nil, &metamorph_api.TransactionStatus{ + return 0, 0, &metamorph_api.TransactionStatus{ Status: metamorph_api.Status_REJECTED, Txid: hash.String(), RejectReason: err.Error(), @@ -462,7 +463,7 @@ func (s *Server) putTransactionInit(ctx context.Context, req *metamorph_api.Tran } } - return next, status, &hash, nil, nil + return next, status, nil, nil } func (s *Server) checkStore(ctx context.Context, hash *chainhash.Hash, next int64) (int64, *metamorph_api.TransactionStatus) {