Skip to content

Commit

Permalink
fix(provider): broadcaster and withdraw process
Browse files Browse the repository at this point in the history
fixes #1635
if transaction fails in serial broadcaster query rpc
for the correct sequence number regardless to error code

move withdraw process to balance checker

Signed-off-by: Artur Troian <[email protected]>
  • Loading branch information
troian committed Sep 14, 2022
1 parent 5b4883c commit 2ab3b0f
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 296 deletions.
227 changes: 123 additions & 104 deletions client/broadcaster/serial.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,65 +2,80 @@ package broadcaster

import (
"context"
"encoding/hex"
"errors"
"fmt"
"regexp"
"strconv"
"strings"
"time"

"github.com/ovrclk/akash/sdkutil"

"github.com/boz/go-lifecycle"
sdkclient "github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/client/tx"
"github.com/cosmos/cosmos-sdk/crypto/keyring"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
authtx "github.com/cosmos/cosmos-sdk/x/auth/tx"
"github.com/tendermint/tendermint/libs/log"
ttypes "github.com/tendermint/tendermint/types"

"github.com/ovrclk/akash/sdkutil"
)

const (
syncDuration = 10 * time.Second
errCodeMismatch = 32
// invalid group - 7
broadcastBlockRetryPeriod = time.Second
)

var (
ErrNotRunning = errors.New("not running")
// sadface.

// Only way to detect the timeout error.
// https://github.com/tendermint/tendermint/blob/46e06c97320bc61c4d98d3018f59d47ec69863c9/rpc/core/mempool.go#L124
timeoutErrorMessage = "timed out waiting for tx to be included in a block"

// errors are of the form:
// "account sequence mismatch, expected 25, got 27: incorrect account sequence"
recoverRegexp = regexp.MustCompile(`^account sequence mismatch, expected (\d+), got (\d+):`)
// Only way to check for tx not found error.
// https://github.com/tendermint/tendermint/blob/46e06c97320bc61c4d98d3018f59d47ec69863c9/rpc/core/tx.go#L31-L33
notFoundErrorMessageSuffix = ") not found"
)

type SerialClient interface {
Client
Close()
}

type serialBroadcaster struct {
cctx sdkclient.Context
txf tx.Factory
info keyring.Info
broadcastch chan broadcastRequest
lc lifecycle.Lifecycle
log log.Logger
type seqreq struct {
curr uint64
ch chan<- uint64
}

func NewSerialClient(log log.Logger, cctx sdkclient.Context, txf tx.Factory, info keyring.Info) (SerialClient, error) {
type serialBroadcaster struct {
cctx sdkclient.Context
txf tx.Factory
info keyring.Info
broadcastTimeout time.Duration
broadcastch chan broadcastRequest
seqreqch chan seqreq
lc lifecycle.Lifecycle
log log.Logger
}

func NewSerialClient(log log.Logger, cctx sdkclient.Context, timeout time.Duration, txf tx.Factory, info keyring.Info) (SerialClient, error) {
// populate account number, current sequence number
poptxf, err := sdkutil.PrepareFactory(cctx, txf)
if err != nil {
return nil, err
}

poptxf = poptxf.WithSimulateAndExecute(true)
client := &serialBroadcaster{
cctx: cctx,
txf: poptxf,
info: info,
lc: lifecycle.New(),
broadcastch: make(chan broadcastRequest),
log: log.With("cmp", "client/broadcaster"),
cctx: cctx,
txf: poptxf,
info: info,
broadcastTimeout: timeout,
lc: lifecycle.New(),
broadcastch: make(chan broadcastRequest),
seqreqch: make(chan seqreq),
log: log.With("cmp", "client/broadcaster"),
}

go client.run()
Expand All @@ -85,7 +100,6 @@ func (c *serialBroadcaster) Broadcast(ctx context.Context, msgs ...sdk.Msg) erro
}

select {

// request received, return response
case c.broadcastch <- request:
return <-responsech
Expand All @@ -97,22 +111,17 @@ func (c *serialBroadcaster) Broadcast(ctx context.Context, msgs ...sdk.Msg) erro
// loop shutting down, return error
case <-c.lc.ShuttingDown():
return ErrNotRunning

}
}

func (c *serialBroadcaster) run() {
defer c.lc.ShutdownCompleted()

var (
txf = c.txf
synch = make(chan uint64)
donech = make(chan struct{})
)
donech := make(chan struct{})

go func() {
defer close(donech)
c.syncLoop(synch)
c.syncLoop()
}()

defer func() { <-donech }()
Expand All @@ -125,126 +134,136 @@ loop:
break loop
case req := <-c.broadcastch:
// broadcast the message
var err error
txf, err = c.doBroadcast(txf, false, req.msgs...)

txf, err := c.broadcast(c.txf, false, req.msgs...)
// send response
req.responsech <- err

case seqno := <-synch:

c.log.Info("syncing sequence", "local", txf.Sequence(), "remote", seqno)

// fast-forward current sequence if necessary
if seqno > txf.Sequence() {
txf = txf.WithSequence(seqno)
}
c.txf = txf
}
}
}

func (c *serialBroadcaster) syncLoop(ch chan<- uint64) {
// TODO: add jitter, force update on "sequence mismatch"-type errors.
ticker := time.NewTicker(syncDuration)
defer ticker.Stop()

func (c *serialBroadcaster) syncLoop() {
for {
select {
case <-c.lc.ShuttingDown():
return
case <-ticker.C:

case req := <-c.seqreqch:
// query sequence number
_, seq, err := c.cctx.AccountRetriever.
GetAccountNumberSequence(c.cctx, c.info.GetAddress())
_, seq, err := c.cctx.AccountRetriever.GetAccountNumberSequence(c.cctx, c.info.GetAddress())

// send to main loop if no error
if err != nil {
c.log.Error("error requesting account", "err", err)
break
seq = req.curr
}

select {
case ch <- seq:
case req.ch <- seq:
case <-c.lc.ShuttingDown():
}

}
}
}

func (c *serialBroadcaster) doBroadcast(txf tx.Factory, retried bool, msgs ...sdk.Msg) (tx.Factory, error) {
txf, err := sdkutil.AdjustGas(c.cctx, txf, msgs...)
if err != nil {
return txf, err
}
func (c *serialBroadcaster) broadcast(txf tx.Factory, retry bool, msgs ...sdk.Msg) (tx.Factory, error) {
var err error

response, err := doBroadcast(c.cctx, txf, c.info.GetName(), msgs...)

c.log.Info("broadcast response", "response", response, "err", err)
if !retry {
txf, err = sdkutil.AdjustGas(c.cctx, txf, msgs...)
if err != nil {
return txf, err
}
}

response, err := c.doBroadcast(c.cctx, txf, c.broadcastTimeout, c.info.GetName(), msgs...)
if err != nil {
c.log.Error("broadcast response", "response", response, "err", err)
return txf, err
}

// if no error, increment sequence.
if response.Code == 0 {
return txf.WithSequence(txf.Sequence() + 1), nil
txf = txf.WithSequence(txf.Sequence() + 1)
return txf, nil
}

// if not mismatch error, don't increment sequence and return
if response.Code != errCodeMismatch {
return txf, fmt.Errorf("%w: response code %d - (%#v)", ErrBroadcastTx, response.Code, response)
c.log.Error("broadcast response", "response", response)
// transaction has failed, perform the query of account sequence to make sure correct one is used
// for the next transaction
ch := make(chan uint64)
c.seqreqch <- seqreq{
curr: txf.Sequence(),
ch: ch,
}

// if we're retrying a parsed sequence (see below), don't try to fix it again.
if retried {
return txf, fmt.Errorf("%w: retried response code %d - (%#v)", ErrBroadcastTx, response.Code, response)
select {
case curseq := <-ch:
txf = txf.WithSequence(curseq)
case <-c.lc.ShuttingDown():
return txf, ErrNotRunning
}

// attempt to parse correct next sequence
nextseq, ok := parseNextSequence(txf.Sequence(), response.RawLog)

if !ok {
return txf, fmt.Errorf("%w: response code %d - (%#v)", ErrBroadcastTx, response.Code, response)
if retry || (response.Code != sdkerrors.ErrWrongSequence.ABCICode()) {
return txf, fmt.Errorf("%w: response code %d", ErrBroadcastTx, response.Code)
}

txf = txf.WithSequence(nextseq)

// try again
return c.doBroadcast(txf, true, msgs...)

return c.broadcast(txf, retry, msgs...)
}

func parseNextSequence(current uint64, message string) (uint64, bool) {

// errors are of the form:
// "account sequence mismatch, expected 25, got 27: incorrect account sequence"

matches := recoverRegexp.FindStringSubmatch(message)

if len(matches) != 3 {
return 0, false
func (c *serialBroadcaster) doBroadcast(cctx sdkclient.Context, txf tx.Factory, timeout time.Duration, keyName string, msgs ...sdk.Msg) (*sdk.TxResponse, error) {
txn, err := tx.BuildUnsignedTx(txf, msgs...)
if err != nil {
return nil, err
}

if len(matches[1]) == 0 || len(matches[2]) == 0 {
return 0, false
txn.SetFeeGranter(cctx.GetFeeGranterAddress())
err = tx.Sign(txf, keyName, txn, true)
if err != nil {
return nil, err
}

expected, err := strconv.ParseUint(matches[1], 10, 64)
if err != nil || expected == 0 {
return 0, false
bytes, err := cctx.TxConfig.TxEncoder()(txn.GetTx())
if err != nil {
return nil, err
}

received, err := strconv.ParseUint(matches[2], 10, 64)
if err != nil || received == 0 {
return 0, false
txb := ttypes.Tx(bytes)
hash := hex.EncodeToString(txb.Hash())

// broadcast-mode=block
// submit with mode commit/block
cres, err := cctx.BroadcastTxCommit(txb)
if err == nil {
// good job
return cres, nil
} else if !strings.HasSuffix(err.Error(), timeoutErrorMessage) {
return cres, err
}

if received != current {
// XXX not sure wtf todo.
return expected, true
// timeout error, continue on to retry

// loop
lctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

for lctx.Err() == nil {
// wait up to one second
select {
case <-lctx.Done():
return cres, err
case <-time.After(broadcastBlockRetryPeriod):
}

// check transaction
// https://github.com/cosmos/cosmos-sdk/pull/8734
res, err := authtx.QueryTx(cctx, hash)
if err == nil {
return res, nil
}

// if it's not a "not found" error, return
if !strings.HasSuffix(err.Error(), notFoundErrorMessageSuffix) {
return res, err
}
}

return expected, true
return cres, lctx.Err()
}
24 changes: 0 additions & 24 deletions client/broadcaster/serial_test.go

This file was deleted.

Loading

0 comments on commit 2ab3b0f

Please sign in to comment.