From 465e998574f7bbf74463443e31ffa52dc87d7abe Mon Sep 17 00:00:00 2001 From: Ivan Bogatyy Date: Wed, 27 Oct 2021 16:08:07 -0400 Subject: [PATCH] Flashbots changes v0.3 to v0.4 --- cmd/geth/main.go | 3 +- cmd/geth/usage.go | 3 +- cmd/utils/flags.go | 28 +++++++++++++- core/tx_pool.go | 62 +++++++++++++++++++++++++++--- eth/api_backend.go | 4 ++ internal/ethapi/api.go | 76 ++++++++++++++++++++++++++++++++++++- internal/ethapi/backend.go | 1 + internal/web3ext/web3ext.go | 5 +++ les/api_backend.go | 5 +++ miner/miner.go | 1 + miner/multi_worker.go | 27 +++++++++---- miner/worker.go | 51 +++++++++++++++++++++---- 12 files changed, 240 insertions(+), 26 deletions(-) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 68f8ef8af76b..9fc03f3fda20 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -131,7 +131,8 @@ var ( utils.MinerExtraDataFlag, utils.MinerRecommitIntervalFlag, utils.MinerNoVerifyFlag, - utils.MinerMaxMergedBundles, + utils.MinerMaxMergedBundlesFlag, + utils.MinerTrustedRelaysFlag, utils.NATFlag, utils.NoDiscoverFlag, utils.DiscoveryV5Flag, diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index d3d39e2861d2..027711b6c357 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -189,7 +189,8 @@ var AppHelpFlagGroups = []flags.FlagGroup{ utils.MinerExtraDataFlag, utils.MinerRecommitIntervalFlag, utils.MinerNoVerifyFlag, - utils.MinerMaxMergedBundles, + utils.MinerMaxMergedBundlesFlag, + utils.MinerTrustedRelaysFlag, }, }, { diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 030ba4a4c851..c199eac1f955 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -475,11 +475,16 @@ var ( Usage: "Time interval to recreate the block being mined", Value: ethconfig.Defaults.Miner.Recommit, } - MinerMaxMergedBundles = cli.IntFlag{ + MinerMaxMergedBundlesFlag = cli.IntFlag{ Name: "miner.maxmergedbundles", Usage: "flashbots - The maximum amount of bundles to merge. The miner will run this many workers in parallel to calculate if the full block is more profitable with these additional bundles.", Value: 3, } + MinerTrustedRelaysFlag = cli.StringFlag{ + Name: "miner.trustedrelays", + Usage: "flashbots - The Ethereum addresses of trusted relays for signature verification. The miner will accept signed bundles and other tasks from the relay, being reasonably certain about DDoS safety.", + Value: "0x870e2734DdBe2Fba9864f33f3420d59Bc641f2be", + } MinerNoVerifyFlag = cli.BoolFlag{ Name: "miner.noverify", Usage: "Disable remote sealing verification", @@ -1355,6 +1360,15 @@ func setTxPool(ctx *cli.Context, cfg *core.TxPoolConfig) { if ctx.GlobalIsSet(TxPoolLifetimeFlag.Name) { cfg.Lifetime = ctx.GlobalDuration(TxPoolLifetimeFlag.Name) } + + addresses := strings.Split(ctx.GlobalString(MinerTrustedRelaysFlag.Name), ",") + for _, address := range addresses { + if trimmed := strings.TrimSpace(address); !common.IsHexAddress(trimmed) { + Fatalf("Invalid account in --miner.trustedrelays: %s", trimmed) + } else { + cfg.TrustedRelays = append(cfg.TrustedRelays, common.HexToAddress(trimmed)) + } + } } func setEthash(ctx *cli.Context, cfg *ethconfig.Config) { @@ -1408,7 +1422,17 @@ func setMiner(ctx *cli.Context, cfg *miner.Config) { log.Warn("The generic --miner.gastarget flag is deprecated and will be removed in the future!") } - cfg.MaxMergedBundles = ctx.GlobalInt(MinerMaxMergedBundles.Name) + cfg.MaxMergedBundles = ctx.GlobalInt(MinerMaxMergedBundlesFlag.Name) + + addresses := strings.Split(ctx.GlobalString(MinerTrustedRelaysFlag.Name), ",") + for _, address := range addresses { + if trimmed := strings.TrimSpace(address); !common.IsHexAddress(trimmed) { + Fatalf("Invalid account in --miner.trustedrelays: %s", trimmed) + } else { + cfg.TrustedRelays = append(cfg.TrustedRelays, common.HexToAddress(trimmed)) + } + } + log.Info("Trusted relays set as", "addresses", cfg.TrustedRelays) } func setWhitelist(ctx *cli.Context, cfg *ethconfig.Config) { diff --git a/core/tx_pool.go b/core/tx_pool.go index 51312d3e78b6..75b5ac101949 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -165,6 +165,8 @@ type TxPoolConfig struct { GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts Lifetime time.Duration // Maximum amount of time non-executable transaction are queued + + TrustedRelays []common.Address // Trusted relay addresses. Duplicated from the miner config. } // DefaultTxPoolConfig contains the default configurations for the transaction @@ -251,12 +253,13 @@ type TxPool struct { locals *accountSet // Set of local transaction to exempt from eviction rules journal *txJournal // Journal of local transaction to back up to disk - pending map[common.Address]*txList // All currently processable transactions - queue map[common.Address]*txList // Queued but non-processable transactions - beats map[common.Address]time.Time // Last heartbeat from each known account - mevBundles []types.MevBundle - all *txLookup // All transactions to allow lookups - priced *txPricedList // All transactions sorted by price + pending map[common.Address]*txList // All currently processable transactions + queue map[common.Address]*txList // Queued but non-processable transactions + beats map[common.Address]time.Time // Last heartbeat from each known account + mevBundles []types.MevBundle + megabundles map[common.Address]types.MevBundle // One megabundle per each trusted relay + all *txLookup // All transactions to allow lookups + priced *txPricedList // All transactions sorted by price chainHeadCh chan ChainHeadEvent chainHeadSub event.Subscription @@ -290,6 +293,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block pending: make(map[common.Address]*txList), queue: make(map[common.Address]*txList), beats: make(map[common.Address]time.Time), + megabundles: make(map[common.Address]types.MevBundle), all: newTxLookup(), chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), reqResetCh: make(chan *txpoolResetRequest), @@ -611,6 +615,52 @@ func (pool *TxPool) AddMevBundle(txs types.Transactions, blockNumber *big.Int, m return nil } +// AddMegaBundle adds a megabundle to the pool. Assumes the relay signature has been verified already. +func (pool *TxPool) AddMegabundle(relayAddr common.Address, txs types.Transactions, blockNumber *big.Int, minTimestamp, maxTimestamp uint64, revertingTxHashes []common.Hash) error { + pool.mu.Lock() + defer pool.mu.Unlock() + + fromTrustedRelay := false + for _, trustedAddr := range pool.config.TrustedRelays { + if relayAddr == trustedAddr { + fromTrustedRelay = true + } + } + if !fromTrustedRelay { + return errors.New("megabundle from non-trusted address") + } + + pool.megabundles[relayAddr] = types.MevBundle{ + Txs: txs, + BlockNumber: blockNumber, + MinTimestamp: minTimestamp, + MaxTimestamp: maxTimestamp, + RevertingTxHashes: revertingTxHashes, + } + return nil +} + +// GetMegabundle returns the latest megabundle submitted by a given relay. +func (pool *TxPool) GetMegabundle(relayAddr common.Address, blockNumber *big.Int, blockTimestamp uint64) (types.MevBundle, error) { + pool.mu.Lock() + defer pool.mu.Unlock() + + megabundle, ok := pool.megabundles[relayAddr] + if !ok { + return types.MevBundle{}, errors.New("No megabundle found") + } + if megabundle.BlockNumber.Cmp(blockNumber) != 0 { + return types.MevBundle{}, errors.New("Megabundle does not fit blockNumber constraints") + } + if megabundle.MinTimestamp != 0 && megabundle.MinTimestamp > blockTimestamp { + return types.MevBundle{}, errors.New("Megabundle does not fit minTimestamp constraints") + } + if megabundle.MaxTimestamp != 0 && megabundle.MaxTimestamp < blockTimestamp { + return types.MevBundle{}, errors.New("Megabundle does not fit maxTimestamp constraints") + } + return megabundle, nil +} + // Locals retrieves the accounts currently considered local by the pool. func (pool *TxPool) Locals() []common.Address { pool.mu.Lock() diff --git a/eth/api_backend.go b/eth/api_backend.go index 8454c0afe701..ea3b4a2e0461 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -243,6 +243,10 @@ func (b *EthAPIBackend) SendBundle(ctx context.Context, txs types.Transactions, return b.eth.txPool.AddMevBundle(txs, big.NewInt(blockNumber.Int64()), minTimestamp, maxTimestamp, revertingTxHashes) } +func (b *EthAPIBackend) SendMegabundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash, relayAddr common.Address) error { + return b.eth.txPool.AddMegabundle(relayAddr, txs, big.NewInt(blockNumber.Int64()), minTimestamp, maxTimestamp, revertingTxHashes) +} + func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) { pending := b.eth.txPool.Pending(false) var txs types.Transactions diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 5599714f7ab3..63b046c90a2f 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -2095,7 +2095,7 @@ func NewPrivateTxBundleAPI(b Backend) *PrivateTxBundleAPI { return &PrivateTxBundleAPI{b} } -// SendBundleArgs represents the arguments for a call. +// SendBundleArgs represents the arguments for a SendBundle call. type SendBundleArgs struct { Txs []hexutil.Bytes `json:"txs"` BlockNumber rpc.BlockNumber `json:"blockNumber"` @@ -2104,6 +2104,25 @@ type SendBundleArgs struct { RevertingTxHashes []common.Hash `json:"revertingTxHashes"` } +// SendMegabundleArgs represents the arguments for a SendMegabundle call. +type SendMegabundleArgs struct { + Txs []hexutil.Bytes `json:"txs"` + BlockNumber uint64 `json:"blockNumber"` + MinTimestamp *uint64 `json:"minTimestamp"` + MaxTimestamp *uint64 `json:"maxTimestamp"` + RevertingTxHashes []common.Hash `json:"revertingTxHashes"` + RelaySignature hexutil.Bytes `json:"relaySignature"` +} + +// UnsignedMegabundle is used for serialization and subsequent digital signing. +type UnsignedMegabundle struct { + Txs []hexutil.Bytes + BlockNumber uint64 + MinTimestamp uint64 + MaxTimestamp uint64 + RevertingTxHashes []common.Hash +} + // SendBundle will add the signed transaction to the transaction pool. // The sender is responsible for signing the transaction and using the correct nonce and ensuring validity func (s *PrivateTxBundleAPI) SendBundle(ctx context.Context, args SendBundleArgs) error { @@ -2133,3 +2152,58 @@ func (s *PrivateTxBundleAPI) SendBundle(ctx context.Context, args SendBundleArgs return s.b.SendBundle(ctx, txs, args.BlockNumber, minTimestamp, maxTimestamp, args.RevertingTxHashes) } + +// Recovers the Ethereum address of the trusted relay that signed the megabundle. +func RecoverRelayAddress(args SendMegabundleArgs) (common.Address, error) { + megabundle := UnsignedMegabundle{Txs: args.Txs, BlockNumber: args.BlockNumber, RevertingTxHashes: args.RevertingTxHashes} + if args.MinTimestamp != nil { + megabundle.MinTimestamp = *args.MinTimestamp + } else { + megabundle.MinTimestamp = 0 + } + if args.MaxTimestamp != nil { + megabundle.MaxTimestamp = *args.MaxTimestamp + } else { + megabundle.MaxTimestamp = 0 + } + rlpEncoding, _ := rlp.EncodeToBytes(megabundle) + signature := args.RelaySignature + signature[64] -= 27 // account for Ethereum V + recoveredPubkey, err := crypto.SigToPub(accounts.TextHash(rlpEncoding), args.RelaySignature) + if err != nil { + return common.Address{}, err + } + return crypto.PubkeyToAddress(*recoveredPubkey), nil +} + +// SendMegabundle will add the signed megabundle to one of the workers for evaluation. +func (s *PrivateTxBundleAPI) SendMegabundle(ctx context.Context, args SendMegabundleArgs) error { + log.Info("Received a Megabundle request", "signature", args.RelaySignature) + var txs types.Transactions + if len(args.Txs) == 0 { + return errors.New("megabundle missing txs") + } + if args.BlockNumber == 0 { + return errors.New("megabundle missing blockNumber") + } + for _, encodedTx := range args.Txs { + tx := new(types.Transaction) + if err := tx.UnmarshalBinary(encodedTx); err != nil { + return err + } + txs = append(txs, tx) + } + var minTimestamp, maxTimestamp uint64 + if args.MinTimestamp != nil { + minTimestamp = *args.MinTimestamp + } + if args.MaxTimestamp != nil { + maxTimestamp = *args.MaxTimestamp + } + relayAddr, err := RecoverRelayAddress(args) + log.Info("Megabundle", "relayAddr", relayAddr, "err", err) + if err != nil { + return err + } + return s.b.SendMegabundle(ctx, txs, rpc.BlockNumber(args.BlockNumber), minTimestamp, maxTimestamp, args.RevertingTxHashes, relayAddr) +} diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index bcdccf2bd9d6..58c8f0bf04e1 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -75,6 +75,7 @@ type Backend interface { // Transaction pool API SendTx(ctx context.Context, signedTx *types.Transaction) error SendBundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error + SendMegabundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash, relayAddr common.Address) error GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) GetPoolTransactions() (types.Transactions, error) GetPoolTransaction(txHash common.Hash) *types.Transaction diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go index 7fb98255d0ff..0e38eecc0d38 100644 --- a/internal/web3ext/web3ext.go +++ b/internal/web3ext/web3ext.go @@ -586,6 +586,11 @@ web3._extend({ call: 'eth_sendBundle', params: 1, }), + new web3._extend.Method({ + name: 'sendMegabundle', + call: 'eth_sendMegabundle', + params: 1 + }), ], properties: [ new web3._extend.Property({ diff --git a/les/api_backend.go b/les/api_backend.go index 9bb08c79f6a7..b910bd3e1f48 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -198,10 +198,15 @@ func (b *LesApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction) func (b *LesApiBackend) RemoveTx(txHash common.Hash) { b.eth.txPool.RemoveTx(txHash) } + func (b *LesApiBackend) SendBundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error { return b.eth.txPool.AddMevBundle(txs, big.NewInt(blockNumber.Int64()), minTimestamp, maxTimestamp, revertingTxHashes) } +func (b *LesApiBackend) SendMegabundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash, relayAddr common.Address) error { + return nil +} + func (b *LesApiBackend) GetPoolTransactions() (types.Transactions, error) { return b.eth.txPool.GetTransactions() } diff --git a/miner/miner.go b/miner/miner.go index 923adef9f5d1..f4b8e740fd60 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -55,6 +55,7 @@ type Config struct { Recommit time.Duration // The time interval for miner to re-create mining work. Noverify bool // Disable remote mining solution verification(only useful in ethash). MaxMergedBundles int + TrustedRelays []common.Address `toml:",omitempty"` // Trusted relay addresses to receive tasks from. } // Miner creates blocks and searches for proof-of-work values. diff --git a/miner/multi_worker.go b/miner/multi_worker.go index 9a39983c5a43..050ea38af4e5 100644 --- a/miner/multi_worker.go +++ b/miner/multi_worker.go @@ -98,13 +98,24 @@ func newMultiWorker(config *Config, chainConfig *params.ChainConfig, engine cons for i := 1; i <= config.MaxMergedBundles; i++ { workers = append(workers, newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, &flashbotsData{ - isFlashbots: true, - queue: queue, - maxMergedBundles: i, + isFlashbots: true, + isMegabundleWorker: false, + queue: queue, + maxMergedBundles: i, })) } - log.Info("creating multi worker", "config.MaxMergedBundles", config.MaxMergedBundles, "worker", len(workers)) + for i := 0; i < len(config.TrustedRelays); i++ { + workers = append(workers, + newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, &flashbotsData{ + isFlashbots: true, + isMegabundleWorker: true, + queue: queue, + relayAddr: config.TrustedRelays[i], + })) + } + + log.Info("creating multi worker", "config.MaxMergedBundles", config.MaxMergedBundles, "config.TrustedRelays", config.TrustedRelays, "worker", len(workers)) return &multiWorker{ regularWorker: regularWorker, workers: workers, @@ -112,7 +123,9 @@ func newMultiWorker(config *Config, chainConfig *params.ChainConfig, engine cons } type flashbotsData struct { - isFlashbots bool - queue chan *task - maxMergedBundles int + isFlashbots bool + isMegabundleWorker bool + queue chan *task + maxMergedBundles int + relayAddr common.Address } diff --git a/miner/worker.go b/miner/worker.go index 1052b5291621..2205f4fa71da 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -151,9 +151,10 @@ type task struct { block *types.Block createdAt time.Time - profit *big.Int - isFlashbots bool - worker int + profit *big.Int + isFlashbots bool + worker int + isMegabundle bool } const ( @@ -702,7 +703,7 @@ func (w *worker) taskLoop() { // Interrupt previous sealing operation interrupt() stopCh, prev = make(chan struct{}), sealHash - log.Info("Proposed miner block", "blockNumber", task.block.Number(), "profit", ethIntToFloat(prevProfit), "isFlashbots", task.isFlashbots, "sealhash", sealHash, "parentHash", prevParentHash, "worker", task.worker) + log.Info("Proposed miner block", "blockNumber", task.block.Number(), "profit", ethIntToFloat(prevProfit), "isFlashbots", task.isFlashbots, "sealhash", sealHash, "parentHash", prevParentHash, "worker", task.worker, "isMegabundle", task.isMegabundle) if w.skipSealHook != nil && w.skipSealHook(task) { continue } @@ -1230,7 +1231,7 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) { localTxs[account] = txs } } - if w.flashbots.isFlashbots { + if w.flashbots.isFlashbots && !w.flashbots.isMegabundleWorker { bundles, err := w.eth.TxPool().MevBundles(env.header.Number, env.header.Time) if err != nil { log.Error("Failed to fetch pending transactions", "err", err) @@ -1249,8 +1250,42 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) { if w.commitBundle(env, bundleTxs, interrupt) { return } - env.profit.Add(env.profit, bundle.totalEth) + env.profit.Add(env.profit, bundle.ethSentToCoinbase) } + if w.flashbots.isMegabundleWorker { + megabundle, err := w.eth.TxPool().GetMegabundle(w.flashbots.relayAddr, env.header.Number, env.header.Time) + log.Info("Starting to process a Megabundle", "relay", w.flashbots.relayAddr, "megabundle", megabundle, "error", err) + if err != nil { + return // no valid megabundle for this relay, nothing to do + } + // Flashbots bundle merging duplicates work by simulating TXes and then committing them once more. + // Megabundles API focuses on speed and runs everything in one cycle. + coinbaseBalanceBefore := env.state.GetBalance(env.coinbase) + if w.commitBundle(env, megabundle.Txs, interrupt) { + log.Info("Could not commit a Megabundle", "relay", w.flashbots.relayAddr, "megabundle", megabundle) + return + } + var txStatuses = map[common.Hash]bool{} + for _, receipt := range env.receipts { + txStatuses[receipt.TxHash] = receipt.Status == types.ReceiptStatusSuccessful + } + for _, tx := range megabundle.Txs { + status, ok := txStatuses[tx.Hash()] + if !ok { + log.Error("No TX receipt after megabundle simulation", "TxHash", tx.Hash()) + return + } + if !status && !containsHash(megabundle.RevertingTxHashes, tx.Hash()) { + log.Info("Ignoring megabundle because of failing TX", "relay", w.flashbots.relayAddr, "TxHash", tx.Hash()) + return + } + } + coinbaseBalanceAfter := env.state.GetBalance(env.coinbase) + coinbaseDelta := big.NewInt(0).Sub(coinbaseBalanceAfter, coinbaseBalanceBefore) + env.profit = coinbaseDelta + log.Info("Megabundle processed", "relay", w.flashbots.relayAddr, "totalProfit", ethIntToFloat(env.profit)) + } + if len(localTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee) if w.commitTransactions(env, txs, interrupt) { @@ -1334,12 +1369,12 @@ func (w *worker) commit(env *environment, interval func(), update bool, start ti // If we're post merge, just ignore if !w.isTTDReached(block.Header()) { select { - case w.taskCh <- &task{receipts: env.receipts, state: env.state, block: block, createdAt: time.Now(), profit: env.profit, isFlashbots: w.flashbots.isFlashbots, worker: w.flashbots.maxMergedBundles}: + case w.taskCh <- &task{receipts: env.receipts, state: env.state, block: block, createdAt: time.Now(), profit: env.profit, isFlashbots: w.flashbots.isFlashbots, worker: w.flashbots.maxMergedBundles, isMegabundle: w.flashbots.isMegabundleWorker}: w.unconfirmed.Shift(block.NumberU64() - 1) log.Info("Commit new sealing work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()), "uncles", len(env.uncles), "txs", env.tcount, "gas", block.GasUsed(), "fees", totalFees(block, env.receipts), "profit", ethIntToFloat(env.profit), "elapsed", common.PrettyDuration(time.Since(start)), - "isFlashbots", w.flashbots.isFlashbots, "worker", w.flashbots.maxMergedBundles) + "isFlashbots", w.flashbots.isFlashbots, "worker", w.flashbots.maxMergedBundles, "isMegabundle", w.flashbots.isMegabundleWorker) case <-w.exitCh: log.Info("Worker has exited") }