Skip to content

Commit

Permalink
fix: If put transaction gets context deadline exceeded, check for sta…
Browse files Browse the repository at this point in the history
…tus first to avoid false negatives
  • Loading branch information
boecklim committed Nov 19, 2024
1 parent b0e146b commit 3b5a00c
Show file tree
Hide file tree
Showing 2 changed files with 258 additions and 39 deletions.
156 changes: 122 additions & 34 deletions pkg/metamorph/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ import (

sdkTx "github.com/bitcoin-sv/go-sdk/transaction"
"go.opentelemetry.io/otel/attribute"
"google.golang.org/genproto/googleapis/rpc/code"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/bitcoin-sv/arc/config"
Expand All @@ -21,6 +24,9 @@ import (
"github.com/bitcoin-sv/arc/internal/tracing"
)

const retryInterval = 300 * time.Millisecond
const maxTimeoutDefault = 5 * time.Second

var (
ErrTransactionNotFound = errors.New("transaction not found")
)
Expand Down Expand Up @@ -58,6 +64,13 @@ type Metamorph struct {
now func() time.Time
tracingEnabled bool
tracingAttributes []attribute.KeyValue
maxTimeout time.Duration
}

func WithMaxTimeoutDefault(d time.Duration) func(*Metamorph) {
return func(m *Metamorph) {
m.maxTimeout = d
}
}

func WithMqClient(mqClient MessageQueueClient) func(*Metamorph) {
Expand Down Expand Up @@ -94,9 +107,10 @@ func WithTracer(attr ...attribute.KeyValue) func(s *Metamorph) {
// NewClient creates a connection to a list of metamorph servers via gRPC.
func NewClient(client metamorph_api.MetaMorphAPIClient, opts ...func(client *Metamorph)) *Metamorph {
m := &Metamorph{
client: client,
logger: slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})),
now: time.Now,
client: client,
logger: slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})),
now: time.Now,
maxTimeout: maxTimeoutDefault,
}

for _, opt := range opts {
Expand Down Expand Up @@ -232,22 +246,54 @@ func (m *Metamorph) SubmitTransaction(ctx context.Context, tx *sdkTx.Transaction
var response *metamorph_api.TransactionStatus
var err error
// in case of error try PutTransaction until timeout expires
start := time.Now()
const interval = 300 * time.Millisecond
maxTimeout := time.Duration(5 * time.Second)
maxTimeout = max(time.Duration(request.MaxTimeout)*time.Second, maxTimeout)

maxTimeout := max(time.Duration(request.MaxTimeout)*time.Second, m.maxTimeout)

retryTicker := time.NewTicker(retryInterval)

timeoutTimer := time.NewTimer(maxTimeout)

response, err = m.client.PutTransaction(ctx, request)
if err == nil {
return &TransactionStatus{
TxID: response.GetTxid(),
Status: response.GetStatus().String(),
ExtraInfo: response.GetRejectReason(),
CompetingTxs: response.GetCompetingTxs(),
BlockHash: response.GetBlockHash(),
BlockHeight: response.GetBlockHeight(),
MerklePath: response.GetMerklePath(),
Timestamp: m.now().Unix(),
}, nil
}

m.logger.ErrorContext(ctx, "Failed to put transaction", slog.String("err", err.Error()))
forLoop:
for {
response, err = m.client.PutTransaction(ctx, request)
if err == nil {
break
}
m.logger.ErrorContext(ctx, "Failed to put transactions", slog.String("err", err.Error()))
time.Sleep(interval)
if maxTimeout >= time.Since(start) {
continue
}
select {
case <-timeoutTimer.C:
return nil, err

return nil, err
case <-retryTicker.C:
response, err = m.client.PutTransaction(ctx, request)
if err == nil {
break forLoop
}

m.logger.ErrorContext(ctx, "Failed to put transaction", slog.String("err", err.Error()))

if status.Code(err) != codes.Code(code.Code_DEADLINE_EXCEEDED) {
continue
}

// if error is deadline exceeded, check tx status to avoid false negatives
txStatus, getStatusErr := m.GetTransactionStatus(ctx, tx.TxID())
if getStatusErr != nil {
continue
}

return txStatus, nil
}
}

return &TransactionStatus{
Expand Down Expand Up @@ -294,28 +340,70 @@ func (m *Metamorph) SubmitTransactions(ctx context.Context, txs sdkTx.Transactio

return ret, nil
}
var responses *metamorph_api.TransactionStatuses
var err error

// put all transactions together
start := time.Now()
const interval = 300 * time.Millisecond
maxTimeout := time.Duration(5 * time.Second)
if len(in.Transactions) != 0 {
maxTimeout = time.Duration(in.Transactions[0].MaxTimeout) * time.Second
maxTimeout := max(time.Duration(in.Transactions[0].MaxTimeout)*time.Second, m.maxTimeout)

retryTicker := time.NewTicker(retryInterval)

timeoutTimer := time.NewTimer(maxTimeout)

responses, err = m.client.PutTransactions(ctx, in)
if err == nil {
// parse response and return to user
ret := make([]*TransactionStatus, 0)
for _, response := range responses.GetStatuses() {
ret = append(ret, &TransactionStatus{
TxID: response.GetTxid(),
MerklePath: response.GetMerklePath(),
Status: response.GetStatus().String(),
ExtraInfo: response.GetRejectReason(),
CompetingTxs: response.GetCompetingTxs(),
BlockHash: response.GetBlockHash(),
BlockHeight: response.GetBlockHeight(),
Timestamp: m.now().Unix(),
})
}

return ret, nil
}
var responses *metamorph_api.TransactionStatuses
var err error

m.logger.ErrorContext(ctx, "Failed to put transactions", slog.String("err", err.Error()))
forLoop:
for {
responses, err = m.client.PutTransactions(ctx, in)
if err == nil {
break
}
m.logger.ErrorContext(ctx, "Failed to put transactions", slog.String("err", err.Error()))
time.Sleep(interval)
if maxTimeout >= time.Since(start) {
continue
}
select {
case <-timeoutTimer.C:
return nil, err

return nil, err
case <-retryTicker.C:
responses, err = m.client.PutTransactions(ctx, in)
if err == nil {
break forLoop
}

m.logger.ErrorContext(ctx, "Failed to put transactions", slog.String("err", err.Error()))

if status.Code(err) != codes.Code(code.Code_DEADLINE_EXCEEDED) {
continue
}

// if error is deadline exceeded, check tx status to avoid false negatives

// Todo: Create and use here client.GetTransactionStatuses rpc function
txStatuses := make([]*TransactionStatus, 0)
for _, tx := range txs {
txStatus, getStatusErr := m.GetTransactionStatus(ctx, tx.TxID())
if getStatusErr != nil {
continue forLoop
}

txStatuses = append(txStatuses, txStatus)
}

return txStatuses, nil
}
}

// parse response and return to user
Expand Down
Loading

0 comments on commit 3b5a00c

Please sign in to comment.