Skip to content

Commit

Permalink
integrate hot wallet in batcher
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Mar 18, 2024
1 parent a002ac2 commit 8b9c30d
Show file tree
Hide file tree
Showing 10 changed files with 368 additions and 150 deletions.
57 changes: 57 additions & 0 deletions common/fireblocks_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package common

import (
"github.com/urfave/cli"
)

const (
FireblocksAPIKeyFlagName = "fireblocks-api-key"
FireblocksAPISecretPathFlagName = "fireblocks-api-secret-path"
FireblocksBaseURLFlagName = "fireblocks-api-url"
FireblocksVaultAccountNameFlagName = "fireblocks-vault-account-name"
)

type FireblocksConfig struct {
APIKey string
SecretKeyPath string
BaseURL string
VaultAccountName string
}

func FireblocksCLIFlags(envPrefix string, flagPrefix string) []cli.Flag {
return []cli.Flag{
cli.StringFlag{
Name: PrefixFlag(flagPrefix, FireblocksAPIKeyFlagName),
Usage: "Fireblocks API Key. To configure Fireblocks MPC wallet, this field is required.",
Required: false,
EnvVar: PrefixEnvVar(envPrefix, "FIREBLOCKS_API_KEY"),
},
cli.StringFlag{
Name: PrefixFlag(flagPrefix, FireblocksAPISecretPathFlagName),
Usage: "Fireblocks API Secret Path. To configure Fireblocks MPC wallet, this field is required.",
Required: false,
EnvVar: PrefixEnvVar(envPrefix, "FIREBLOCKS_API_SECRET_PATH"),
},
cli.StringFlag{
Name: PrefixFlag(flagPrefix, FireblocksBaseURLFlagName),
Usage: "Fireblocks API URL. To configure Fireblocks MPC wallet, this field is required.",
Required: false,
EnvVar: PrefixEnvVar(envPrefix, "FIREBLOCKS_API_URL"),
},
cli.StringFlag{
Name: PrefixFlag(flagPrefix, FireblocksVaultAccountNameFlagName),
Usage: "Fireblocks Vault Account Name. To configure Fireblocks MPC wallet, this field is required.",
Required: false,
EnvVar: PrefixEnvVar(envPrefix, "FIREBLOCKS_VAULT_ACCOUNT_NAME"),
},
}
}

func ReadFireblocksCLIConfig(ctx *cli.Context, flagPrefix string) FireblocksConfig {
return FireblocksConfig{
APIKey: ctx.GlobalString(PrefixFlag(flagPrefix, FireblocksAPIKeyFlagName)),
SecretKeyPath: ctx.GlobalString(PrefixFlag(flagPrefix, FireblocksAPISecretPathFlagName)),
BaseURL: ctx.GlobalString(PrefixFlag(flagPrefix, FireblocksBaseURLFlagName)),
VaultAccountName: ctx.GlobalString(PrefixFlag(flagPrefix, FireblocksVaultAccountNameFlagName)),
}
}
14 changes: 7 additions & 7 deletions disperser/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ type Batcher struct {
Transactor core.Transactor
TransactionManager TxnManager
Metrics *Metrics
HeartbeatChan chan time.Time

ethClient common.EthClient
finalizer Finalizer
logger logging.Logger
HeartbeatChan chan time.Time
ethClient common.EthClient
finalizer Finalizer
logger logging.Logger
}

func NewBatcher(
Expand Down Expand Up @@ -482,7 +482,7 @@ func serializeProof(proof *merkletree.Proof) []byte {
return proofBytes
}

func (b *Batcher) parseBatchIDFromReceipt(ctx context.Context, txReceipt *types.Receipt) (uint32, error) {
func (b *Batcher) parseBatchIDFromReceipt(txReceipt *types.Receipt) (uint32, error) {
if len(txReceipt.Logs) == 0 {
return 0, errors.New("failed to get transaction receipt with logs")
}
Expand Down Expand Up @@ -528,7 +528,7 @@ func (b *Batcher) getBatchID(ctx context.Context, txReceipt *types.Receipt) (uin
err error
)

batchID, err = b.parseBatchIDFromReceipt(ctx, txReceipt)
batchID, err = b.parseBatchIDFromReceipt(txReceipt)
if err == nil {
return batchID, nil
}
Expand All @@ -544,7 +544,7 @@ func (b *Batcher) getBatchID(ctx context.Context, txReceipt *types.Receipt) (uin
continue
}

batchID, err = b.parseBatchIDFromReceipt(ctx, txReceipt)
batchID, err = b.parseBatchIDFromReceipt(txReceipt)
if err == nil {
return batchID, nil
}
Expand Down
97 changes: 75 additions & 22 deletions disperser/batcher/txn_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"time"

"github.com/Layr-Labs/eigenda/common"
walletsdk "github.com/Layr-Labs/eigensdk-go/chainio/clients/wallet"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types"
)

Expand All @@ -31,6 +33,11 @@ type TxnManager interface {
ReceiptChan() chan *ReceiptOrErr
}

type transaction struct {
*types.Transaction
TxID walletsdk.TxID
}

type TxnRequest struct {
Tx *types.Transaction
Tag string
Expand All @@ -41,7 +48,7 @@ type TxnRequest struct {
// txAttempts are the transactions that have been attempted to be mined for this request.
// If a transaction hasn't been confirmed within the timeout and a replacement transaction is sent,
// the original transaction hash will be kept in this slice
txAttempts []*types.Transaction
txAttempts []*transaction
}

// ReceiptOrErr is a wrapper for a transaction receipt or an error.
Expand All @@ -56,9 +63,11 @@ type ReceiptOrErr struct {
type txnManager struct {
mu sync.Mutex

ethClient common.EthClient
requestChan chan *TxnRequest
logger logging.Logger
ethClient common.EthClient
wallet walletsdk.Wallet
numConfirmations int
requestChan chan *TxnRequest
logger logging.Logger

receiptChan chan *ReceiptOrErr
queueSize int
Expand All @@ -68,9 +77,11 @@ type txnManager struct {

var _ TxnManager = (*txnManager)(nil)

func NewTxnManager(ethClient common.EthClient, queueSize int, txnRefreshInterval time.Duration, logger logging.Logger, metrics *TxnManagerMetrics) TxnManager {
func NewTxnManager(ethClient common.EthClient, wallet walletsdk.Wallet, numConfirmations, queueSize int, txnRefreshInterval time.Duration, logger logging.Logger, metrics *TxnManagerMetrics) TxnManager {
return &txnManager{
ethClient: ethClient,
wallet: wallet,
numConfirmations: numConfirmations,
requestChan: make(chan *TxnRequest, queueSize),
logger: logger,
receiptChan: make(chan *ReceiptOrErr, queueSize),
Expand All @@ -88,7 +99,7 @@ func NewTxnRequest(tx *types.Transaction, tag string, value *big.Int, metadata i
Metadata: metadata,

requestedAt: time.Now(),
txAttempts: make([]*types.Transaction, 0),
txAttempts: make([]*transaction, 0),
}
}

Expand Down Expand Up @@ -129,7 +140,7 @@ func (t *txnManager) Start(ctx context.Context) {
func (t *txnManager) ProcessTransaction(ctx context.Context, req *TxnRequest) error {
t.mu.Lock()
defer t.mu.Unlock()
t.logger.Debug("[TxnManager] new transaction", "tag", req.Tag, "nonce", req.Tx.Nonce(), "gasFeeCap", req.Tx.GasFeeCap(), "gasTipCap", req.Tx.GasTipCap())
t.logger.Debug("new transaction", "component", "TxnManager", "method", "ProcessTransaction", "tag", req.Tag, "nonce", req.Tx.Nonce(), "gasFeeCap", req.Tx.GasFeeCap(), "gasTipCap", req.Tx.GasTipCap())
gasTipCap, gasFeeCap, err := t.ethClient.GetLatestGasCaps(ctx)
if err != nil {
return fmt.Errorf("failed to get latest gas caps: %w", err)
Expand All @@ -139,14 +150,17 @@ func (t *txnManager) ProcessTransaction(ctx context.Context, req *TxnRequest) er
if err != nil {
return fmt.Errorf("failed to update gas price: %w", err)
}
err = t.ethClient.SendTransaction(ctx, txn)
txID, err := t.wallet.SendTransaction(ctx, txn)
if err != nil {
return fmt.Errorf("failed to send txn (%s) %s: %w", req.Tag, req.Tx.Hash().Hex(), err)
} else {
t.logger.Debug("[TxnManager] successfully sent txn", "tag", req.Tag, "txn", txn.Hash().Hex())
t.logger.Debug("successfully sent txn", "component", "TxnManager", "method", "ProcessTransaction", "tag", req.Tag, "txID", txID, "txHash", txn.Hash().Hex())
}
req.Tx = txn
req.txAttempts = append(req.txAttempts, txn)
req.txAttempts = append(req.txAttempts, &transaction{
TxID: txID,
Transaction: txn,
})

t.requestChan <- req
t.metrics.UpdateTxQueue(len(t.requestChan))
Expand All @@ -157,6 +171,43 @@ func (t *txnManager) ReceiptChan() chan *ReceiptOrErr {
return t.receiptChan
}

func (t *txnManager) ensureAnyTransactionEvaled(ctx context.Context, txs []*transaction) (*types.Receipt, error) {
queryTicker := time.NewTicker(3 * time.Second)
defer queryTicker.Stop()
var receipt *types.Receipt
var err error
for {
for _, tx := range txs {
receipt, err = t.wallet.GetTransactionReceipt(ctx, tx.TxID)
if err == nil {
chainTip, err := t.ethClient.BlockNumber(ctx)
if err == nil {
if receipt.BlockNumber.Uint64()+uint64(t.numConfirmations) > chainTip {
t.logger.Debug("transaction has been mined but don't have enough confirmations at current chain tip", "component", "TxnManager", "method", "ensureAnyTransactionEvaled", "txnBlockNumber", receipt.BlockNumber.Uint64(), "numConfirmations", t.numConfirmations, "chainTip", chainTip)
break
} else {
return receipt, nil
}
} else {
t.logger.Debug("failed to get chain tip while waiting for transaction to mine", "component", "TxnManager", "method", "ensureAnyTransactionEvaled", "err", err)
}
}

if errors.Is(err, ethereum.NotFound) {
t.logger.Debug("Transaction not yet mined", "component", "TxnManager", "method", "ensureAnyTransactionEvaled", "txID", tx.TxID, "txHash", tx.Hash().Hex())
} else if err != nil {
t.logger.Debug("Transaction receipt retrieval failed", "component", "TxnManager", "method", "ensureAnyTransactionEvaled", "err", err)
}
}
// Wait for the next round.
select {
case <-ctx.Done():
return receipt, ctx.Err()
case <-queryTicker.C:
}
}
}

// monitorTransaction waits until the transaction is confirmed (or failed) and resends it with a higher gas price if it is not mined without a timeout.
// It returns the receipt once the transaction has been confirmed.
// It returns an error if the transaction fails to be sent for reasons other than timeouts.
Expand All @@ -167,11 +218,10 @@ func (t *txnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (*
ctxWithTimeout, cancel := context.WithTimeout(ctx, t.txnRefreshInterval)
defer cancel()

t.logger.Debug("[TxnManager] monitoring transaction", "txHash", req.Tx.Hash().Hex(), "tag", req.Tag, "nonce", req.Tx.Nonce())
receipt, err := t.ethClient.EnsureAnyTransactionEvaled(
t.logger.Debug("monitoring transaction", "component", "TxnManager", "method", "monitorTransaction", "txHash", req.Tx.Hash().Hex(), "tag", req.Tag, "nonce", req.Tx.Nonce())
receipt, err := t.ensureAnyTransactionEvaled(
ctxWithTimeout,
req.txAttempts,
req.Tag,
)
if err == nil {
t.metrics.UpdateSpeedUps(numSpeedUps)
Expand All @@ -181,19 +231,19 @@ func (t *txnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (*

if errors.Is(err, context.DeadlineExceeded) {
if receipt != nil {
t.logger.Warn("[TxnManager] transaction has been mined, but hasn't accumulated the required number of confirmations", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
t.logger.Warn("transaction has been mined, but hasn't accumulated the required number of confirmations", "component", "TxnManager", "method", "monitorTransaction", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
continue
}
t.logger.Warn("[TxnManager] transaction not mined within timeout, resending with higher gas price", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
t.logger.Warn("transaction not mined within timeout, resending with higher gas price", "component", "TxnManager", "method", "monitorTransaction", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
newTx, err := t.speedUpTxn(ctx, req.Tx, req.Tag)
if err != nil {
t.logger.Error("[TxnManager] failed to speed up transaction", "err", err)
t.logger.Error("failed to speed up transaction", "component", "TxnManager", "method", "monitorTransaction", "err", err)
t.metrics.IncrementTxnCount("failure")
return nil, err
}
err = t.ethClient.SendTransaction(ctx, newTx)
txID, err := t.wallet.SendTransaction(ctx, newTx)
if err != nil {
t.logger.Error("[TxnManager] failed to send txn", "tag", req.Tag, "txn", req.Tx.Hash().Hex(), "attempt", retryFromFailure, "maxRetry", maxSpeedUpRetry, "err", err)
t.logger.Error("failed to send txn", "component", "TxnManager", "method", "monitorTransaction", "tag", req.Tag, "txn", req.Tx.Hash().Hex(), "attempt", retryFromFailure, "maxRetry", maxSpeedUpRetry, "err", err)
if retryFromFailure >= maxSpeedUpRetry {
t.metrics.IncrementTxnCount("failure")
return nil, err
Expand All @@ -202,12 +252,15 @@ func (t *txnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (*
continue
}

t.logger.Debug("[TxnManager] successfully sent txn", "tag", req.Tag, "txn", newTx.Hash().Hex())
t.logger.Debug("successfully sent txn", "component", "TxnManager", "method", "monitorTransaction", "tag", req.Tag, "txID", txID, "txHash", newTx.Hash().Hex())
req.Tx = newTx
req.txAttempts = append(req.txAttempts, newTx)
req.txAttempts = append(req.txAttempts, &transaction{
TxID: txID,
Transaction: newTx,
})
numSpeedUps++
} else {
t.logger.Error("[TxnManager] transaction failed", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "err", err)
t.logger.Error("transaction failed", "component", "TxnManager", "method", "monitorTransaction", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "err", err)
t.metrics.IncrementTxnCount("failure")
return nil, err
}
Expand Down Expand Up @@ -239,7 +292,7 @@ func (t *txnManager) speedUpTxn(ctx context.Context, tx *types.Transaction, tag
newGasFeeCap = increasedGasFeeCap
}

t.logger.Info("[TxnManager] increasing gas price", "tag", tag, "txHash", tx.Hash().Hex(), "nonce", tx.Nonce(), "prevGasTipCap", prevGasTipCap, "prevGasFeeCap", prevGasFeeCap, "newGasTipCap", newGasTipCap, "newGasFeeCap", newGasFeeCap)
t.logger.Info("increasing gas price", "component", "TxnManager", "method", "speedUpTxn", "tag", tag, "txHash", tx.Hash().Hex(), "nonce", tx.Nonce(), "prevGasTipCap", prevGasTipCap, "prevGasFeeCap", prevGasFeeCap, "newGasTipCap", newGasTipCap, "newGasFeeCap", newGasFeeCap)
return t.ethClient.UpdateGas(ctx, tx, tx.Value(), newGasTipCap, newGasFeeCap)
}

Expand Down
Loading

0 comments on commit 8b9c30d

Please sign in to comment.