Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate EigenSDK wallet in batcher #348

Merged
merged 2 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions common/fireblocks_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package common

import (
"github.com/urfave/cli"
)

const (
FireblocksAPIKeyFlagName = "fireblocks-api-key"
FireblocksAPISecretPathFlagName = "fireblocks-api-secret-path"
FireblocksBaseURLFlagName = "fireblocks-api-url"
FireblocksVaultAccountNameFlagName = "fireblocks-vault-account-name"
)

type FireblocksConfig struct {
APIKey string
SecretKeyPath string
BaseURL string
VaultAccountName string
}

func FireblocksCLIFlags(envPrefix string, flagPrefix string) []cli.Flag {
return []cli.Flag{
cli.StringFlag{
Name: PrefixFlag(flagPrefix, FireblocksAPIKeyFlagName),
Usage: "Fireblocks API Key. To configure Fireblocks MPC wallet, this field is required. Otherwise, private key must be configured in eth client so that it can fall back to private key wallet.",
Required: false,
EnvVar: PrefixEnvVar(envPrefix, "FIREBLOCKS_API_KEY"),
},
cli.StringFlag{
Name: PrefixFlag(flagPrefix, FireblocksAPISecretPathFlagName),
Usage: "Fireblocks API Secret Path. To configure Fireblocks MPC wallet, this field is required. Otherwise, private key must be configured in eth client so that it can fall back to private key wallet.",
Required: false,
EnvVar: PrefixEnvVar(envPrefix, "FIREBLOCKS_API_SECRET_PATH"),
},
cli.StringFlag{
Name: PrefixFlag(flagPrefix, FireblocksBaseURLFlagName),
Usage: "Fireblocks API URL. To configure Fireblocks MPC wallet, this field is required. Otherwise, private key must be configured in eth client so that it can fall back to private key wallet.",
Required: false,
EnvVar: PrefixEnvVar(envPrefix, "FIREBLOCKS_API_URL"),
},
cli.StringFlag{
Name: PrefixFlag(flagPrefix, FireblocksVaultAccountNameFlagName),
Usage: "Fireblocks Vault Account Name. To configure Fireblocks MPC wallet, this field is required. Otherwise, private key must be configured in eth client so that it can fall back to private key wallet.",
Required: false,
EnvVar: PrefixEnvVar(envPrefix, "FIREBLOCKS_VAULT_ACCOUNT_NAME"),
},
}
}

func ReadFireblocksCLIConfig(ctx *cli.Context, flagPrefix string) FireblocksConfig {
return FireblocksConfig{
APIKey: ctx.GlobalString(PrefixFlag(flagPrefix, FireblocksAPIKeyFlagName)),
SecretKeyPath: ctx.GlobalString(PrefixFlag(flagPrefix, FireblocksAPISecretPathFlagName)),
BaseURL: ctx.GlobalString(PrefixFlag(flagPrefix, FireblocksBaseURLFlagName)),
VaultAccountName: ctx.GlobalString(PrefixFlag(flagPrefix, FireblocksVaultAccountNameFlagName)),
}
}
3 changes: 3 additions & 0 deletions common/geth/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ func (c *EthClient) GetAccountAddress() gethcommon.Address {
}

func (c *EthClient) GetNoSendTransactOpts() (*bind.TransactOpts, error) {
if c.privateKey == nil {
return nil, fmt.Errorf("NewClient: cannot create NoSendTransactOpts: private key is nil")
}
opts, err := bind.NewKeyedTransactorWithChainID(c.privateKey, c.chainID)
if err != nil {
return nil, fmt.Errorf("NewClient: cannot create NoSendTransactOpts: %w", err)
Expand Down
14 changes: 7 additions & 7 deletions disperser/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ type Batcher struct {
Transactor core.Transactor
TransactionManager TxnManager
Metrics *Metrics
HeartbeatChan chan time.Time

ethClient common.EthClient
finalizer Finalizer
logger logging.Logger
HeartbeatChan chan time.Time
ethClient common.EthClient
finalizer Finalizer
logger logging.Logger
}

func NewBatcher(
Expand Down Expand Up @@ -482,7 +482,7 @@ func serializeProof(proof *merkletree.Proof) []byte {
return proofBytes
}

func (b *Batcher) parseBatchIDFromReceipt(ctx context.Context, txReceipt *types.Receipt) (uint32, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

context isn't used here

func (b *Batcher) parseBatchIDFromReceipt(txReceipt *types.Receipt) (uint32, error) {
if len(txReceipt.Logs) == 0 {
return 0, errors.New("failed to get transaction receipt with logs")
}
Expand Down Expand Up @@ -528,7 +528,7 @@ func (b *Batcher) getBatchID(ctx context.Context, txReceipt *types.Receipt) (uin
err error
)

batchID, err = b.parseBatchIDFromReceipt(ctx, txReceipt)
batchID, err = b.parseBatchIDFromReceipt(txReceipt)
if err == nil {
return batchID, nil
}
Expand All @@ -544,7 +544,7 @@ func (b *Batcher) getBatchID(ctx context.Context, txReceipt *types.Receipt) (uin
continue
}

batchID, err = b.parseBatchIDFromReceipt(ctx, txReceipt)
batchID, err = b.parseBatchIDFromReceipt(txReceipt)
if err == nil {
return batchID, nil
}
Expand Down
97 changes: 75 additions & 22 deletions disperser/batcher/txn_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"time"

"github.com/Layr-Labs/eigenda/common"
walletsdk "github.com/Layr-Labs/eigensdk-go/chainio/clients/wallet"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types"
)

Expand All @@ -31,6 +33,11 @@ type TxnManager interface {
ReceiptChan() chan *ReceiptOrErr
}

type transaction struct {
*types.Transaction
TxID walletsdk.TxID
}

type TxnRequest struct {
Tx *types.Transaction
Tag string
Expand All @@ -41,7 +48,7 @@ type TxnRequest struct {
// txAttempts are the transactions that have been attempted to be mined for this request.
// If a transaction hasn't been confirmed within the timeout and a replacement transaction is sent,
// the original transaction hash will be kept in this slice
txAttempts []*types.Transaction
txAttempts []*transaction
}

// ReceiptOrErr is a wrapper for a transaction receipt or an error.
Expand All @@ -56,9 +63,11 @@ type ReceiptOrErr struct {
type txnManager struct {
mu sync.Mutex

ethClient common.EthClient
requestChan chan *TxnRequest
logger logging.Logger
ethClient common.EthClient
wallet walletsdk.Wallet
numConfirmations int
requestChan chan *TxnRequest
logger logging.Logger

receiptChan chan *ReceiptOrErr
queueSize int
Expand All @@ -68,9 +77,11 @@ type txnManager struct {

var _ TxnManager = (*txnManager)(nil)

func NewTxnManager(ethClient common.EthClient, queueSize int, txnRefreshInterval time.Duration, logger logging.Logger, metrics *TxnManagerMetrics) TxnManager {
func NewTxnManager(ethClient common.EthClient, wallet walletsdk.Wallet, numConfirmations, queueSize int, txnRefreshInterval time.Duration, logger logging.Logger, metrics *TxnManagerMetrics) TxnManager {
return &txnManager{
ethClient: ethClient,
wallet: wallet,
numConfirmations: numConfirmations,
requestChan: make(chan *TxnRequest, queueSize),
logger: logger,
receiptChan: make(chan *ReceiptOrErr, queueSize),
Expand All @@ -88,7 +99,7 @@ func NewTxnRequest(tx *types.Transaction, tag string, value *big.Int, metadata i
Metadata: metadata,

requestedAt: time.Now(),
txAttempts: make([]*types.Transaction, 0),
txAttempts: make([]*transaction, 0),
}
}

Expand Down Expand Up @@ -129,7 +140,7 @@ func (t *txnManager) Start(ctx context.Context) {
func (t *txnManager) ProcessTransaction(ctx context.Context, req *TxnRequest) error {
t.mu.Lock()
defer t.mu.Unlock()
t.logger.Debug("[TxnManager] new transaction", "tag", req.Tag, "nonce", req.Tx.Nonce(), "gasFeeCap", req.Tx.GasFeeCap(), "gasTipCap", req.Tx.GasTipCap())
t.logger.Debug("new transaction", "component", "TxnManager", "method", "ProcessTransaction", "tag", req.Tag, "nonce", req.Tx.Nonce(), "gasFeeCap", req.Tx.GasFeeCap(), "gasTipCap", req.Tx.GasTipCap())
gasTipCap, gasFeeCap, err := t.ethClient.GetLatestGasCaps(ctx)
if err != nil {
return fmt.Errorf("failed to get latest gas caps: %w", err)
Expand All @@ -139,14 +150,17 @@ func (t *txnManager) ProcessTransaction(ctx context.Context, req *TxnRequest) er
if err != nil {
return fmt.Errorf("failed to update gas price: %w", err)
}
err = t.ethClient.SendTransaction(ctx, txn)
txID, err := t.wallet.SendTransaction(ctx, txn)
if err != nil {
return fmt.Errorf("failed to send txn (%s) %s: %w", req.Tag, req.Tx.Hash().Hex(), err)
} else {
t.logger.Debug("[TxnManager] successfully sent txn", "tag", req.Tag, "txn", txn.Hash().Hex())
t.logger.Debug("successfully sent txn", "component", "TxnManager", "method", "ProcessTransaction", "tag", req.Tag, "txID", txID, "txHash", txn.Hash().Hex())
}
req.Tx = txn
req.txAttempts = append(req.txAttempts, txn)
req.txAttempts = append(req.txAttempts, &transaction{
TxID: txID,
Transaction: txn,
})

t.requestChan <- req
t.metrics.UpdateTxQueue(len(t.requestChan))
Expand All @@ -157,6 +171,43 @@ func (t *txnManager) ReceiptChan() chan *ReceiptOrErr {
return t.receiptChan
}

func (t *txnManager) ensureAnyTransactionEvaled(ctx context.Context, txs []*transaction) (*types.Receipt, error) {
queryTicker := time.NewTicker(3 * time.Second)
defer queryTicker.Stop()
var receipt *types.Receipt
var err error
for {
for _, tx := range txs {
receipt, err = t.wallet.GetTransactionReceipt(ctx, tx.TxID)
if err == nil {
chainTip, err := t.ethClient.BlockNumber(ctx)
if err == nil {
if receipt.BlockNumber.Uint64()+uint64(t.numConfirmations) > chainTip {
t.logger.Debug("transaction has been mined but don't have enough confirmations at current chain tip", "component", "TxnManager", "method", "ensureAnyTransactionEvaled", "txnBlockNumber", receipt.BlockNumber.Uint64(), "numConfirmations", t.numConfirmations, "chainTip", chainTip)
break
} else {
return receipt, nil
}
} else {
t.logger.Debug("failed to get chain tip while waiting for transaction to mine", "component", "TxnManager", "method", "ensureAnyTransactionEvaled", "err", err)
}
}

if errors.Is(err, ethereum.NotFound) {
t.logger.Debug("Transaction not yet mined", "component", "TxnManager", "method", "ensureAnyTransactionEvaled", "txID", tx.TxID, "txHash", tx.Hash().Hex())
} else if err != nil {
t.logger.Debug("Transaction receipt retrieval failed", "component", "TxnManager", "method", "ensureAnyTransactionEvaled", "err", err)
}
}
// Wait for the next round.
select {
case <-ctx.Done():
return receipt, ctx.Err()
case <-queryTicker.C:
}
}
}

// monitorTransaction waits until the transaction is confirmed (or failed) and resends it with a higher gas price if it is not mined without a timeout.
// It returns the receipt once the transaction has been confirmed.
// It returns an error if the transaction fails to be sent for reasons other than timeouts.
Expand All @@ -167,11 +218,10 @@ func (t *txnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (*
ctxWithTimeout, cancel := context.WithTimeout(ctx, t.txnRefreshInterval)
defer cancel()

t.logger.Debug("[TxnManager] monitoring transaction", "txHash", req.Tx.Hash().Hex(), "tag", req.Tag, "nonce", req.Tx.Nonce())
receipt, err := t.ethClient.EnsureAnyTransactionEvaled(
t.logger.Debug("monitoring transaction", "component", "TxnManager", "method", "monitorTransaction", "txHash", req.Tx.Hash().Hex(), "tag", req.Tag, "nonce", req.Tx.Nonce())
receipt, err := t.ensureAnyTransactionEvaled(
ctxWithTimeout,
req.txAttempts,
req.Tag,
)
if err == nil {
t.metrics.UpdateSpeedUps(numSpeedUps)
Expand All @@ -181,19 +231,19 @@ func (t *txnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (*

if errors.Is(err, context.DeadlineExceeded) {
if receipt != nil {
t.logger.Warn("[TxnManager] transaction has been mined, but hasn't accumulated the required number of confirmations", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
t.logger.Warn("transaction has been mined, but hasn't accumulated the required number of confirmations", "component", "TxnManager", "method", "monitorTransaction", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
continue
}
t.logger.Warn("[TxnManager] transaction not mined within timeout, resending with higher gas price", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
t.logger.Warn("transaction not mined within timeout, resending with higher gas price", "component", "TxnManager", "method", "monitorTransaction", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
newTx, err := t.speedUpTxn(ctx, req.Tx, req.Tag)
if err != nil {
t.logger.Error("[TxnManager] failed to speed up transaction", "err", err)
t.logger.Error("failed to speed up transaction", "component", "TxnManager", "method", "monitorTransaction", "err", err)
t.metrics.IncrementTxnCount("failure")
return nil, err
}
err = t.ethClient.SendTransaction(ctx, newTx)
txID, err := t.wallet.SendTransaction(ctx, newTx)
if err != nil {
t.logger.Error("[TxnManager] failed to send txn", "tag", req.Tag, "txn", req.Tx.Hash().Hex(), "attempt", retryFromFailure, "maxRetry", maxSpeedUpRetry, "err", err)
t.logger.Error("failed to send txn", "component", "TxnManager", "method", "monitorTransaction", "tag", req.Tag, "txn", req.Tx.Hash().Hex(), "attempt", retryFromFailure, "maxRetry", maxSpeedUpRetry, "err", err)
if retryFromFailure >= maxSpeedUpRetry {
t.metrics.IncrementTxnCount("failure")
return nil, err
Expand All @@ -202,12 +252,15 @@ func (t *txnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (*
continue
}

t.logger.Debug("[TxnManager] successfully sent txn", "tag", req.Tag, "txn", newTx.Hash().Hex())
t.logger.Debug("successfully sent txn", "component", "TxnManager", "method", "monitorTransaction", "tag", req.Tag, "txID", txID, "txHash", newTx.Hash().Hex())
req.Tx = newTx
req.txAttempts = append(req.txAttempts, newTx)
req.txAttempts = append(req.txAttempts, &transaction{
TxID: txID,
Transaction: newTx,
})
numSpeedUps++
} else {
t.logger.Error("[TxnManager] transaction failed", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "err", err)
t.logger.Error("transaction failed", "component", "TxnManager", "method", "monitorTransaction", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "err", err)
t.metrics.IncrementTxnCount("failure")
return nil, err
}
Expand Down Expand Up @@ -239,7 +292,7 @@ func (t *txnManager) speedUpTxn(ctx context.Context, tx *types.Transaction, tag
newGasFeeCap = increasedGasFeeCap
}

t.logger.Info("[TxnManager] increasing gas price", "tag", tag, "txHash", tx.Hash().Hex(), "nonce", tx.Nonce(), "prevGasTipCap", prevGasTipCap, "prevGasFeeCap", prevGasFeeCap, "newGasTipCap", newGasTipCap, "newGasFeeCap", newGasFeeCap)
t.logger.Info("increasing gas price", "component", "TxnManager", "method", "speedUpTxn", "tag", tag, "txHash", tx.Hash().Hex(), "nonce", tx.Nonce(), "prevGasTipCap", prevGasTipCap, "prevGasFeeCap", prevGasFeeCap, "newGasTipCap", newGasTipCap, "newGasFeeCap", newGasFeeCap)
return t.ethClient.UpdateGas(ctx, tx, tx.Value(), newGasTipCap, newGasFeeCap)
}

Expand Down
Loading
Loading