From 3b5a00c6d724b0997f1751dc4d09257e0ca1a28b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Tue, 19 Nov 2024 14:46:08 +0100 Subject: [PATCH] fix: If put transaction gets context deadline exceeded, check for status first to avoid false negatives --- pkg/metamorph/client.go | 156 +++++++++++++++++++++++++++-------- pkg/metamorph/client_test.go | 141 +++++++++++++++++++++++++++++-- 2 files changed, 258 insertions(+), 39 deletions(-) diff --git a/pkg/metamorph/client.go b/pkg/metamorph/client.go index 10364ce6f..272c4c7b6 100644 --- a/pkg/metamorph/client.go +++ b/pkg/metamorph/client.go @@ -11,7 +11,10 @@ import ( sdkTx "github.com/bitcoin-sv/go-sdk/transaction" "go.opentelemetry.io/otel/attribute" + "google.golang.org/genproto/googleapis/rpc/code" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" "github.com/bitcoin-sv/arc/config" @@ -21,6 +24,9 @@ import ( "github.com/bitcoin-sv/arc/internal/tracing" ) +const retryInterval = 300 * time.Millisecond +const maxTimeoutDefault = 5 * time.Second + var ( ErrTransactionNotFound = errors.New("transaction not found") ) @@ -58,6 +64,13 @@ type Metamorph struct { now func() time.Time tracingEnabled bool tracingAttributes []attribute.KeyValue + maxTimeout time.Duration +} + +func WithMaxTimeoutDefault(d time.Duration) func(*Metamorph) { + return func(m *Metamorph) { + m.maxTimeout = d + } } func WithMqClient(mqClient MessageQueueClient) func(*Metamorph) { @@ -94,9 +107,10 @@ func WithTracer(attr ...attribute.KeyValue) func(s *Metamorph) { // NewClient creates a connection to a list of metamorph servers via gRPC. func NewClient(client metamorph_api.MetaMorphAPIClient, opts ...func(client *Metamorph)) *Metamorph { m := &Metamorph{ - client: client, - logger: slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})), - now: time.Now, + client: client, + logger: slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})), + now: time.Now, + maxTimeout: maxTimeoutDefault, } for _, opt := range opts { @@ -232,22 +246,54 @@ func (m *Metamorph) SubmitTransaction(ctx context.Context, tx *sdkTx.Transaction var response *metamorph_api.TransactionStatus var err error // in case of error try PutTransaction until timeout expires - start := time.Now() - const interval = 300 * time.Millisecond - maxTimeout := time.Duration(5 * time.Second) - maxTimeout = max(time.Duration(request.MaxTimeout)*time.Second, maxTimeout) + + maxTimeout := max(time.Duration(request.MaxTimeout)*time.Second, m.maxTimeout) + + retryTicker := time.NewTicker(retryInterval) + + timeoutTimer := time.NewTimer(maxTimeout) + + response, err = m.client.PutTransaction(ctx, request) + if err == nil { + return &TransactionStatus{ + TxID: response.GetTxid(), + Status: response.GetStatus().String(), + ExtraInfo: response.GetRejectReason(), + CompetingTxs: response.GetCompetingTxs(), + BlockHash: response.GetBlockHash(), + BlockHeight: response.GetBlockHeight(), + MerklePath: response.GetMerklePath(), + Timestamp: m.now().Unix(), + }, nil + } + + m.logger.ErrorContext(ctx, "Failed to put transaction", slog.String("err", err.Error())) +forLoop: for { - response, err = m.client.PutTransaction(ctx, request) - if err == nil { - break - } - m.logger.ErrorContext(ctx, "Failed to put transactions", slog.String("err", err.Error())) - time.Sleep(interval) - if maxTimeout >= time.Since(start) { - continue - } + select { + case <-timeoutTimer.C: + return nil, err - return nil, err + case <-retryTicker.C: + response, err = m.client.PutTransaction(ctx, request) + if err == nil { + break forLoop + } + + m.logger.ErrorContext(ctx, "Failed to put transaction", slog.String("err", err.Error())) + + if status.Code(err) != codes.Code(code.Code_DEADLINE_EXCEEDED) { + continue + } + + // if error is deadline exceeded, check tx status to avoid false negatives + txStatus, getStatusErr := m.GetTransactionStatus(ctx, tx.TxID()) + if getStatusErr != nil { + continue + } + + return txStatus, nil + } } return &TransactionStatus{ @@ -294,28 +340,70 @@ func (m *Metamorph) SubmitTransactions(ctx context.Context, txs sdkTx.Transactio return ret, nil } + var responses *metamorph_api.TransactionStatuses + var err error // put all transactions together - start := time.Now() - const interval = 300 * time.Millisecond - maxTimeout := time.Duration(5 * time.Second) - if len(in.Transactions) != 0 { - maxTimeout = time.Duration(in.Transactions[0].MaxTimeout) * time.Second + maxTimeout := max(time.Duration(in.Transactions[0].MaxTimeout)*time.Second, m.maxTimeout) + + retryTicker := time.NewTicker(retryInterval) + + timeoutTimer := time.NewTimer(maxTimeout) + + responses, err = m.client.PutTransactions(ctx, in) + if err == nil { + // parse response and return to user + ret := make([]*TransactionStatus, 0) + for _, response := range responses.GetStatuses() { + ret = append(ret, &TransactionStatus{ + TxID: response.GetTxid(), + MerklePath: response.GetMerklePath(), + Status: response.GetStatus().String(), + ExtraInfo: response.GetRejectReason(), + CompetingTxs: response.GetCompetingTxs(), + BlockHash: response.GetBlockHash(), + BlockHeight: response.GetBlockHeight(), + Timestamp: m.now().Unix(), + }) + } + + return ret, nil } - var responses *metamorph_api.TransactionStatuses - var err error + + m.logger.ErrorContext(ctx, "Failed to put transactions", slog.String("err", err.Error())) +forLoop: for { - responses, err = m.client.PutTransactions(ctx, in) - if err == nil { - break - } - m.logger.ErrorContext(ctx, "Failed to put transactions", slog.String("err", err.Error())) - time.Sleep(interval) - if maxTimeout >= time.Since(start) { - continue - } + select { + case <-timeoutTimer.C: + return nil, err - return nil, err + case <-retryTicker.C: + responses, err = m.client.PutTransactions(ctx, in) + if err == nil { + break forLoop + } + + m.logger.ErrorContext(ctx, "Failed to put transactions", slog.String("err", err.Error())) + + if status.Code(err) != codes.Code(code.Code_DEADLINE_EXCEEDED) { + continue + } + + // if error is deadline exceeded, check tx status to avoid false negatives + + // Todo: Create and use here client.GetTransactionStatuses rpc function + txStatuses := make([]*TransactionStatus, 0) + for _, tx := range txs { + txStatus, getStatusErr := m.GetTransactionStatus(ctx, tx.TxID()) + if getStatusErr != nil { + continue forLoop + } + + txStatuses = append(txStatuses, txStatus) + } + + return txStatuses, nil + } } // parse response and return to user diff --git a/pkg/metamorph/client_test.go b/pkg/metamorph/client_test.go index 88917d215..2077e9956 100644 --- a/pkg/metamorph/client_test.go +++ b/pkg/metamorph/client_test.go @@ -6,6 +6,10 @@ import ( "testing" "time" + "google.golang.org/genproto/googleapis/rpc/code" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api" apiMocks "github.com/bitcoin-sv/arc/internal/metamorph/mocks" @@ -68,6 +72,8 @@ func TestClient_SubmitTransaction(t *testing.T) { options *metamorph.TransactionOptions putTxErr error putTxStatus *metamorph_api.TransactionStatus + getTxErr error + getTxStatus *metamorph_api.TransactionStatus withMqClient bool publishSubmitTxErr error @@ -109,7 +115,7 @@ func TestClient_SubmitTransaction(t *testing.T) { }, }, { - name: "wait for received, put tx err, no mq client", + name: "wait for received, put tx err", options: &metamorph.TransactionOptions{ WaitForStatus: metamorph_api.Status_RECEIVED, }, @@ -123,6 +129,43 @@ func TestClient_SubmitTransaction(t *testing.T) { expectedStatus: nil, expectedErrorStr: "failed to put tx", }, + { + name: "wait for received, put tx err deadline exceeded", + options: &metamorph.TransactionOptions{ + WaitForStatus: metamorph_api.Status_RECEIVED, + }, + putTxStatus: &metamorph_api.TransactionStatus{ + Txid: testdata.TX1Hash.String(), + Status: metamorph_api.Status_RECEIVED, + }, + putTxErr: status.New(codes.Code(code.Code_DEADLINE_EXCEEDED), errors.New("deadline exceeded").Error()).Err(), + withMqClient: false, + getTxStatus: &metamorph_api.TransactionStatus{ + Txid: testdata.TX1Hash.String(), + Status: metamorph_api.Status_RECEIVED, + }, + + expectedStatus: &metamorph.TransactionStatus{ + TxID: testdata.TX1Hash.String(), + Status: metamorph_api.Status_RECEIVED.String(), + Timestamp: now.Unix(), + }, + }, + { + name: "wait for received, put tx err deadline exceeded, get tx err", + options: &metamorph.TransactionOptions{ + WaitForStatus: metamorph_api.Status_RECEIVED, + }, + putTxStatus: &metamorph_api.TransactionStatus{ + Txid: testdata.TX1Hash.String(), + Status: metamorph_api.Status_RECEIVED, + }, + putTxErr: status.New(codes.Code(code.Code_DEADLINE_EXCEEDED), errors.New("deadline exceeded").Error()).Err(), + withMqClient: false, + getTxErr: errors.New("failed to get tx status"), + + expectedErrorStr: "rpc error: code = DeadlineExceeded desc = deadline exceeded", + }, { name: "wait for queued, with mq client", options: &metamorph.TransactionOptions{ @@ -156,9 +199,15 @@ func TestClient_SubmitTransaction(t *testing.T) { PutTransactionFunc: func(_ context.Context, _ *metamorph_api.TransactionRequest, _ ...grpc.CallOption) (*metamorph_api.TransactionStatus, error) { return tc.putTxStatus, tc.putTxErr }, + GetTransactionStatusFunc: func(_ context.Context, _ *metamorph_api.TransactionStatusRequest, _ ...grpc.CallOption) (*metamorph_api.TransactionStatus, error) { + return tc.getTxStatus, tc.getTxErr + }, } - opts := []func(client *metamorph.Metamorph){metamorph.WithNow(func() time.Time { return now })} + opts := []func(client *metamorph.Metamorph){ + metamorph.WithNow(func() time.Time { return now }), + metamorph.WithMaxTimeoutDefault(1 * time.Second), + } if tc.withMqClient { mqClient := &mocks.MessageQueueClientMock{ PublishMarshalFunc: func(_ string, _ protoreflect.ProtoMessage) error { return tc.publishSubmitTxErr }, @@ -171,9 +220,9 @@ func TestClient_SubmitTransaction(t *testing.T) { tx, err := sdkTx.NewTransactionFromHex(testdata.TX1RawString) // Then require.NoError(t, err) - status, err := client.SubmitTransaction(context.Background(), tx, tc.options) + txStatus, err := client.SubmitTransaction(context.Background(), tx, tc.options) - require.Equal(t, tc.expectedStatus, status) + require.Equal(t, tc.expectedStatus, txStatus) if tc.expectedErrorStr != "" { require.ErrorContains(t, err, tc.expectedErrorStr) @@ -197,6 +246,8 @@ func TestClient_SubmitTransactions(t *testing.T) { options *metamorph.TransactionOptions putTxErr error putTxStatus *metamorph_api.TransactionStatuses + getTxErr error + getTxStatus *metamorph_api.TransactionStatus withMqClient bool publishSubmitTxErr error @@ -270,6 +321,80 @@ func TestClient_SubmitTransactions(t *testing.T) { expectedStatuses: nil, expectedErrorStr: "failed to put tx", }, + { + name: "wait for received, put tx err deadline exceeded", + options: &metamorph.TransactionOptions{ + WaitForStatus: metamorph_api.Status_RECEIVED, + }, + putTxStatus: &metamorph_api.TransactionStatuses{ + Statuses: []*metamorph_api.TransactionStatus{ + { + Txid: tx1.TxID(), + Status: metamorph_api.Status_RECEIVED, + }, + { + Txid: tx2.TxID(), + Status: metamorph_api.Status_RECEIVED, + }, + { + Txid: tx3.TxID(), + Status: metamorph_api.Status_RECEIVED, + }, + }, + }, + putTxErr: status.New(codes.Code(code.Code_DEADLINE_EXCEEDED), errors.New("deadline exceeded").Error()).Err(), + withMqClient: false, + getTxStatus: &metamorph_api.TransactionStatus{ + Txid: testdata.TX1Hash.String(), + Status: metamorph_api.Status_RECEIVED, + }, + + expectedStatuses: []*metamorph.TransactionStatus{ + { + TxID: tx1.TxID(), + Status: metamorph_api.Status_RECEIVED.String(), + Timestamp: now.Unix(), + }, + { + TxID: tx2.TxID(), + Status: metamorph_api.Status_RECEIVED.String(), + Timestamp: now.Unix(), + }, + { + TxID: tx3.TxID(), + Status: metamorph_api.Status_RECEIVED.String(), + Timestamp: now.Unix(), + }, + }, + }, + { + name: "wait for received, put tx err deadline exceeded, get tx err", + options: &metamorph.TransactionOptions{ + WaitForStatus: metamorph_api.Status_RECEIVED, + }, + putTxStatus: &metamorph_api.TransactionStatuses{ + Statuses: []*metamorph_api.TransactionStatus{ + { + Txid: tx1.TxID(), + Status: metamorph_api.Status_RECEIVED, + }, + { + Txid: tx2.TxID(), + Status: metamorph_api.Status_RECEIVED, + }, + { + Txid: tx3.TxID(), + Status: metamorph_api.Status_RECEIVED, + }, + }, + }, + putTxErr: status.New(codes.Code(code.Code_DEADLINE_EXCEEDED), errors.New("deadline exceeded").Error()).Err(), + withMqClient: false, + getTxErr: errors.New("failed to get tx status"), + + expectedStatuses: nil, + expectedErrorStr: "rpc error: code = DeadlineExceeded desc = deadline exceeded", + }, { name: "wait for queued, with mq client", options: &metamorph.TransactionOptions{ @@ -315,9 +440,15 @@ func TestClient_SubmitTransactions(t *testing.T) { PutTransactionsFunc: func(_ context.Context, _ *metamorph_api.TransactionRequests, _ ...grpc.CallOption) (*metamorph_api.TransactionStatuses, error) { return tc.putTxStatus, tc.putTxErr }, + GetTransactionStatusFunc: func(_ context.Context, _ *metamorph_api.TransactionStatusRequest, _ ...grpc.CallOption) (*metamorph_api.TransactionStatus, error) { + return tc.getTxStatus, tc.getTxErr + }, } - opts := []func(client *metamorph.Metamorph){metamorph.WithNow(func() time.Time { return now })} + opts := []func(client *metamorph.Metamorph){ + metamorph.WithNow(func() time.Time { return now }), + metamorph.WithMaxTimeoutDefault(1 * time.Second), + } if tc.withMqClient { mqClient := &mocks.MessageQueueClientMock{ PublishMarshalFunc: func(_ string, _ protoreflect.ProtoMessage) error {