diff --git a/internal/blocktx/processor_test.go b/internal/blocktx/processor_test.go index 270fd588c..5506f4cec 100644 --- a/internal/blocktx/processor_test.go +++ b/internal/blocktx/processor_test.go @@ -500,7 +500,7 @@ func TestStartFillGaps(t *testing.T) { sut.Shutdown() // then - require.GreaterOrEqual(t, tc.minExpectedGetBlockCapsCalls, len(storeMock.GetBlockGapsCalls())) + require.GreaterOrEqual(t, len(storeMock.GetBlockGapsCalls()), tc.minExpectedGetBlockCapsCalls) }) } } diff --git a/pkg/metamorph/client.go b/pkg/metamorph/client.go index 7236b22d6..811a88b25 100644 --- a/pkg/metamorph/client.go +++ b/pkg/metamorph/client.go @@ -196,20 +196,22 @@ func (m *Metamorph) SubmitTransaction(ctx context.Context, tx *sdkTx.Transaction }, nil } - response, err := m.client.PutTransaction(ctx, request) - if err != nil { - m.logger.Warn("Failed to submit transaction", slog.String("hash", tx.TxID()), slog.String("key", err.Error())) - if m.mqClient != nil { - err := m.mqClient.PublishMarshal(SubmitTxTopic, request) - if err != nil { - return nil, err - } - - return &TransactionStatus{ - TxID: tx.TxID(), - Status: metamorph_api.Status_QUEUED.String(), - Timestamp: m.now().Unix(), - }, nil + var response *metamorph_api.TransactionStatus + var err error + // in case of error try PutTransaction until timeout expires + start := time.Now() + const interval = 300 * time.Millisecond + maxTimeout := time.Duration(5 * time.Second) + maxTimeout = max(time.Duration(request.MaxTimeout)*time.Second, maxTimeout) + for { + response, err = m.client.PutTransaction(ctx, request) + if err == nil { + break + } + m.logger.Error("Failed to put transactions", slog.String("err", err.Error())) + time.Sleep(interval) + if maxTimeout >= time.Since(start) { + continue } return nil, err @@ -258,28 +260,23 @@ func (m *Metamorph) SubmitTransactions(ctx context.Context, txs sdkTx.Transactio } // put all transactions together - responses, err := m.client.PutTransactions(ctx, in) - if err != nil { - m.logger.Warn("Failed to submit transactions", slog.String("key", err.Error())) - if m.mqClient != nil { - for _, tx := range in.Transactions { - err := m.mqClient.PublishMarshal(SubmitTxTopic, tx) - if err != nil { - return nil, err - } - } - - // parse response and return to user - ret := make([]*TransactionStatus, 0) - for _, tx := range txs { - ret = append(ret, &TransactionStatus{ - TxID: tx.TxID(), - Status: metamorph_api.Status_QUEUED.String(), - Timestamp: m.now().Unix(), - }) - } - - return ret, nil + start := time.Now() + const interval = 300 * time.Millisecond + maxTimeout := time.Duration(5 * time.Second) + if len(in.Transactions) != 0 { + maxTimeout = time.Duration(in.Transactions[0].MaxTimeout) * time.Second + } + var responses *metamorph_api.TransactionStatuses + var err error + for { + responses, err = m.client.PutTransactions(ctx, in) + if err == nil { + break + } + m.logger.Error("Failed to put transactions", slog.String("err", err.Error())) + time.Sleep(interval) + if maxTimeout >= time.Since(start) { + continue } return nil, err diff --git a/pkg/metamorph/client_test.go b/pkg/metamorph/client_test.go index 43621e29b..27d32638f 100644 --- a/pkg/metamorph/client_test.go +++ b/pkg/metamorph/client_test.go @@ -120,40 +120,6 @@ func TestClient_SubmitTransaction(t *testing.T) { expectedStatus: nil, expectedErrorStr: "failed to put tx", }, - { - name: "wait for received, put tx err, with mq client", - options: &metamorph.TransactionOptions{ - WaitForStatus: metamorph_api.Status_RECEIVED, - }, - putTxStatus: &metamorph_api.TransactionStatus{ - Txid: testdata.TX1Hash.String(), - Status: metamorph_api.Status_RECEIVED, - }, - putTxErr: errors.New("failed to put tx"), - withMqClient: true, - - expectedStatus: &metamorph.TransactionStatus{ - TxID: testdata.TX1Hash.String(), - Status: metamorph_api.Status_QUEUED.String(), - Timestamp: now.Unix(), - }, - }, - { - name: "wait for received, put tx err, with mq client, publish submit tx err", - options: &metamorph.TransactionOptions{ - WaitForStatus: metamorph_api.Status_RECEIVED, - }, - putTxStatus: &metamorph_api.TransactionStatus{ - Txid: testdata.TX1Hash.String(), - Status: metamorph_api.Status_RECEIVED, - }, - putTxErr: errors.New("failed to put tx"), - withMqClient: true, - publishSubmitTxErr: errors.New("failed to publish tx"), - - expectedStatus: nil, - expectedErrorStr: "failed to publish tx", - }, { name: "wait for queued, with mq client", options: &metamorph.TransactionOptions{ @@ -300,76 +266,6 @@ func TestClient_SubmitTransactions(t *testing.T) { expectedStatuses: nil, expectedErrorStr: "failed to put tx", }, - { - name: "wait for received, put tx err, with mq client", - options: &metamorph.TransactionOptions{ - WaitForStatus: metamorph_api.Status_RECEIVED, - }, - putTxStatus: &metamorph_api.TransactionStatuses{ - Statuses: []*metamorph_api.TransactionStatus{ - { - Txid: tx1.TxID(), - Status: metamorph_api.Status_RECEIVED, - }, - { - Txid: tx2.TxID(), - Status: metamorph_api.Status_RECEIVED, - }, - { - Txid: tx3.TxID(), - Status: metamorph_api.Status_RECEIVED, - }, - }, - }, - putTxErr: errors.New("failed to put tx"), - withMqClient: true, - - expectedStatuses: []*metamorph.TransactionStatus{ - { - TxID: tx1.TxID(), - Status: metamorph_api.Status_QUEUED.String(), - Timestamp: now.Unix(), - }, - { - TxID: tx2.TxID(), - Status: metamorph_api.Status_QUEUED.String(), - Timestamp: now.Unix(), - }, - { - TxID: tx3.TxID(), - Status: metamorph_api.Status_QUEUED.String(), - Timestamp: now.Unix(), - }, - }, - }, - { - name: "wait for received, put tx err, with mq client, publish submit tx err", - options: &metamorph.TransactionOptions{ - WaitForStatus: metamorph_api.Status_RECEIVED, - }, - putTxStatus: &metamorph_api.TransactionStatuses{ - Statuses: []*metamorph_api.TransactionStatus{ - { - Txid: tx1.TxID(), - Status: metamorph_api.Status_RECEIVED, - }, - { - Txid: tx2.TxID(), - Status: metamorph_api.Status_RECEIVED, - }, - { - Txid: tx3.TxID(), - Status: metamorph_api.Status_RECEIVED, - }, - }, - }, - putTxErr: errors.New("failed to put tx"), - withMqClient: true, - publishSubmitTxErr: errors.New("failed to publish tx"), - - expectedStatuses: nil, - expectedErrorStr: "failed to publish tx", - }, { name: "wait for queued, with mq client", options: &metamorph.TransactionOptions{