From 7c113cdae39d2a68017a97ec047a0b81b86831ad Mon Sep 17 00:00:00 2001 From: Tyler Smith Date: Thu, 16 Nov 2023 23:55:40 +0300 Subject: [PATCH] tweak: Add future tx events to trace API feed. --- accounts/abi/bind/backends/simulated.go | 4 + core/events.go | 2 + core/txpool/blobpool/blobpool.go | 5 + core/txpool/legacypool/legacypool.go | 9 ++ core/txpool/subpool.go | 2 + core/txpool/txpool.go | 8 ++ eth/api_backend.go | 4 + eth/filters/dropped_tx_subscription.go | 3 +- eth/filters/filter_system.go | 39 +++++- eth/filters/trace_api.go | 153 +++++++++++++----------- internal/ethapi/backend.go | 1 + les/api_backend.go | 4 + 12 files changed, 162 insertions(+), 72 deletions(-) diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index 906574668220..7351dd9a0046 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -898,6 +898,10 @@ func (fb *filterBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event. return nullSubscription() } +func (fb *filterBackend) SubscribeNewFutureTxsEvent(ch chan<- core.NewFutureTxsEvent) event.Subscription { + return nullSubscription() +} + func (fb *filterBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { return fb.bc.SubscribeChainEvent(ch) } diff --git a/core/events.go b/core/events.go index db1056bd0018..6196bb121ca5 100644 --- a/core/events.go +++ b/core/events.go @@ -24,6 +24,8 @@ import ( // NewTxsEvent is posted when a batch of transactions enter the transaction pool. type NewTxsEvent struct{ Txs []*types.Transaction } +type NewFutureTxsEvent NewTxsEvent + // DropTxsEvent is posted when a batch of transactions are removed from the transaction pool type DropTxsEvent struct{ Txs []*types.Transaction diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index c539a5c84f49..c233b6cf15b5 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -1504,6 +1504,11 @@ func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool } } +func (p *BlobPool) SubscribeFutureTransactions(ch chan<- core.NewFutureTxsEvent) event.Subscription { + f := &event.Feed{} + return f.Subscribe(ch) +} + // Nonce returns the next nonce of an account, with all transactions executable // by the pool already applied on top. func (p *BlobPool) Nonce(addr common.Address) uint64 { diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index bf85a2cd33b8..056dae55a049 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -208,6 +208,7 @@ type LegacyPool struct { chain BlockChain gasTip atomic.Pointer[big.Int] txFeed event.Feed + futureTxFeed event.Feed dropTxFeed event.Feed rejectTxFeed event.Feed scope event.SubscriptionScope @@ -438,6 +439,12 @@ func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs return pool.txFeed.Subscribe(ch) } +// SubscribeFutureTransactions registers a subscription for new transaction queue +// events. +func (pool *LegacyPool) SubscribeFutureTransactions(ch chan<- core.NewFutureTxsEvent) event.Subscription { + return pool.futureTxFeed.Subscribe(ch) +} + // SubscribeDropTxsEvent registers a subscription of core.DropTxsEvent and // starts sending event to the given channel. func (pool *LegacyPool) SubscribeDropTxsEvent(ch chan<- core.DropTxsEvent) event.Subscription { @@ -809,6 +816,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e pool.priced.Put(tx, isLocal) pool.journalTx(from, tx) pool.queueTxEvent(tx) + pool.futureTxFeed.Send(core.NewFutureTxsEvent{Txs: []*types.Transaction{tx}}) log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) // Successful promotion, bump the heartbeat @@ -820,6 +828,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e if err != nil { return false, err } + pool.futureTxFeed.Send(core.NewFutureTxsEvent{Txs: []*types.Transaction{tx}}) // Mark local addresses and journal local transactions if local && !pool.locals.contains(from) { log.Info("Setting new local account", "address", from) diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index 67effae4c729..810fbecb5d07 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -115,6 +115,8 @@ type SubPool interface { // or also for reorged out ones. SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription + SubscribeFutureTransactions(ch chan<- core.NewFutureTxsEvent) event.Subscription + // Nonce returns the next nonce of an account, with all transactions executable // by the pool already applied on top. Nonce(addr common.Address) uint64 diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index fd552a1a60cd..f7ae74ee0d24 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -328,6 +328,14 @@ func (p *TxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) return p.subs.Track(event.JoinSubscriptions(subs...)) } +func (p *TxPool) SubscribeFutureTransactions(ch chan<- core.NewFutureTxsEvent) event.Subscription { + subs := make([]event.Subscription, len(p.subpools)) + for i, subpool := range p.subpools { + subs[i] = subpool.SubscribeFutureTransactions(ch) + } + return p.subs.Track(event.JoinSubscriptions(subs...)) +} + // SubscribeDropTxsEvent registers a subscription of core.DropTxsEvent and // starts sending event to the given channel. func (p *TxPool) SubscribeDropTxsEvent(ch chan<- core.DropTxsEvent) event.Subscription { diff --git a/eth/api_backend.go b/eth/api_backend.go index 601e55515857..ecdb876ccacb 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -337,6 +337,10 @@ func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.S return b.eth.txPool.SubscribeTransactions(ch, true) } +func (b *EthAPIBackend) SubscribeNewFutureTxsEvent(ch chan<- core.NewFutureTxsEvent) event.Subscription { + return b.eth.txPool.SubscribeFutureTransactions(ch) +} + func (b *EthAPIBackend) SyncProgress() ethereum.SyncProgress { return b.eth.Downloader().Progress() } diff --git a/eth/filters/dropped_tx_subscription.go b/eth/filters/dropped_tx_subscription.go index 76218451abf9..97d5a69f8b87 100644 --- a/eth/filters/dropped_tx_subscription.go +++ b/eth/filters/dropped_tx_subscription.go @@ -54,7 +54,8 @@ type RPCTransaction struct { R *hexutil.Big `json:"r"` S *hexutil.Big `json:"s"` - Trace *blocknative.Trace `json:"trace,omitempty"` + Trace *blocknative.Trace `json:"trace,omitempty"` + Future bool `json:"future"` } // newRPCTransaction returns a transaction that will serialize to the RPC diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 3b075b9b851e..ecd4bd8e6ffe 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -68,6 +68,7 @@ type Backend interface { CurrentHeader() *types.Header ChainConfig() *params.ChainConfig SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription + SubscribeNewFutureTxsEvent(chan<- core.NewFutureTxsEvent) event.Subscription SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription @@ -169,6 +170,9 @@ const ( PendingTransactionsSubscription // BlocksSubscription queries hashes for blocks that are imported BlocksSubscription + + FutureTransactionsSubscription + // LastIndexSubscription keeps track of the last index LastIndexSubscription ) @@ -212,6 +216,8 @@ type EventSystem struct { pendingLogsSub event.Subscription // Subscription for pending log event chainSub event.Subscription // Subscription for new chain event + futureTxsSub event.Subscription + // Channels install chan *subscription // install filter for event notification uninstall chan *subscription // remove filter for event notification @@ -220,6 +226,8 @@ type EventSystem struct { pendingLogsCh chan []*types.Log // Channel to receive new log event rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event chainCh chan core.ChainEvent // Channel to receive new chain event + + futureTxsCh chan core.NewFutureTxsEvent } // NewEventSystem creates a new manager that listens for event on the given mux, @@ -240,6 +248,8 @@ func NewEventSystem(sys *FilterSystem, lightMode bool) *EventSystem { rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), pendingLogsCh: make(chan []*types.Log, logsChanSize), chainCh: make(chan core.ChainEvent, chainEvChanSize), + + futureTxsCh: make(chan core.NewFutureTxsEvent, txChanSize), } // Subscribe events @@ -249,8 +259,10 @@ func NewEventSystem(sys *FilterSystem, lightMode bool) *EventSystem { m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh) + m.futureTxsSub = m.backend.SubscribeNewFutureTxsEvent(m.futureTxsCh) + // Make sure none of the subscriptions are empty - if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil { + if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil || m.futureTxsSub == nil { log.Crit("Subscribe for event system failed") } @@ -425,6 +437,20 @@ func (es *EventSystem) SubscribePendingTxs(txs chan []*types.Transaction) *Subsc return es.subscribe(sub) } +func (es *EventSystem) SubscribeFutureTxs(txs chan []*types.Transaction) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: FutureTransactionsSubscription, + created: time.Now(), + logs: make(chan []*types.Log), + txs: txs, + headers: make(chan *types.Header), + installed: make(chan struct{}), + err: make(chan error), + } + return es.subscribe(sub) +} + type filterIndex map[Type]map[rpc.ID]*subscription func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) { @@ -457,6 +483,12 @@ func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) } } +func (es *EventSystem) handleFutureTxsEvent(filters filterIndex, ev core.NewFutureTxsEvent) { + for _, f := range filters[FutureTransactionsSubscription] { + f.txs <- ev.Txs + } +} + func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) { for _, f := range filters[BlocksSubscription] { f.headers <- ev.Block.Header() @@ -553,6 +585,7 @@ func (es *EventSystem) eventLoop() { // Ensure all subscriptions get cleaned up defer func() { es.txsSub.Unsubscribe() + es.futureTxsSub.Unsubscribe() es.logsSub.Unsubscribe() es.rmLogsSub.Unsubscribe() es.pendingLogsSub.Unsubscribe() @@ -568,6 +601,8 @@ func (es *EventSystem) eventLoop() { select { case ev := <-es.txsCh: es.handleTxsEvent(index, ev) + case ev := <-es.futureTxsCh: + es.handleFutureTxsEvent(index, ev) case ev := <-es.logsCh: es.handleLogs(index, ev) case ev := <-es.rmLogsCh: @@ -600,6 +635,8 @@ func (es *EventSystem) eventLoop() { // System stopped case <-es.txsSub.Err(): return + case <-es.futureTxsSub.Err(): + return case <-es.logsSub.Err(): return case <-es.rmLogsSub.Err(): diff --git a/eth/filters/trace_api.go b/eth/filters/trace_api.go index 59e26cd6bd66..6497a6bae4ea 100644 --- a/eth/filters/trace_api.go +++ b/eth/filters/trace_api.go @@ -44,10 +44,14 @@ func (api *FilterAPI) NewPendingTransactionsWithTrace(ctx context.Context, trace go func() { chainConfig := api.sys.backend.ChainConfig() - txs := make(chan []*types.Transaction, 128) - pendingTxSub := api.events.SubscribePendingTxs(txs) + pendingTxs := make(chan []*types.Transaction, 128) + pendingTxSub := api.events.SubscribePendingTxs(pendingTxs) defer pendingTxSub.Unsubscribe() + futureTxs := make(chan []*types.Transaction, 128) + futureTxSub := api.events.SubscribeFutureTxs(futureTxs) + defer futureTxSub.Unsubscribe() + tracerOpts, err := getTracerOpts(tracerOptsJSON, defaultTxTraceOpts) if err != nil { log.Error("pending_txs_stream: failed to parse tracer options", "err", err) @@ -65,90 +69,99 @@ func (api *FilterAPI) NewPendingTransactionsWithTrace(ctx context.Context, trace } }() + var ( + txs []*types.Transaction + txsAreFuture bool + ) for { select { - case txs := <-txs: - var ( - currentHeader = api.sys.backend.CurrentHeader() - header = &types.Header{ - ParentHash: currentHeader.Hash(), - Coinbase: currentHeader.Coinbase, - Difficulty: currentHeader.Difficulty, - GasLimit: currentHeader.GasLimit, - Time: currentHeader.Time + 12, - BaseFee: eip1559.CalcBaseFee(chainConfig, currentHeader), - Number: new(big.Int).Add(currentHeader.Number, common.Big1), - } - signer = types.MakeSigner(chainConfig, header.Number, header.Time) - - blockCtx = core.NewEVMBlockContext(header, api.sys.chain, nil) - traceCtx = &tracers.Context{ - BlockHash: header.Hash(), - BlockNumber: header.Number, - } + case txs = <-pendingTxs: + txsAreFuture = false + case txs = <-futureTxs: + txsAreFuture = true + case <-rpcSub.Err(): + return + case <-notifier.Closed(): + return + } - msg *core.Message - tracedTxs = make([]*RPCTransaction, 0, len(txs)) - blockNumber = hexutil.Big(*header.Number) - blockHash = header.Hash() - txIndex = hexutil.Uint64(0) - ) + var ( + currentHeader = api.sys.backend.CurrentHeader() + header = &types.Header{ + ParentHash: currentHeader.Hash(), + Coinbase: currentHeader.Coinbase, + Difficulty: currentHeader.Difficulty, + GasLimit: currentHeader.GasLimit, + Time: currentHeader.Time + 12, + BaseFee: eip1559.CalcBaseFee(chainConfig, currentHeader), + Number: new(big.Int).Add(currentHeader.Number, common.Big1), + } + signer = types.MakeSigner(chainConfig, header.Number, header.Time) - statedb, err := api.sys.chain.State() - if err != nil { - log.Error("pending_txs_stream: failed to get state", "err", err) - return + blockCtx = core.NewEVMBlockContext(header, api.sys.chain, nil) + traceCtx = &tracers.Context{ + BlockHash: header.Hash(), + BlockNumber: header.Number, } - metricsPendingTxsReceived.Inc(int64(len(txs))) - for _, tx := range txs { - msg, _ = core.TransactionToMessage(tx, signer, header.BaseFee) - if err != nil { - log.Error("pending_txs_stream: failed to create tx message", "err", err, "tx", tx.Hash()) - continue - } + msg *core.Message + tracedTxs = make([]*RPCTransaction, 0, len(txs)) + blockNumber = hexutil.Big(*header.Number) + blockHash = header.Hash() + txIndex = hexutil.Uint64(0) + ) - if msg.GasFeeCap.Cmp(header.BaseFee) < 0 { - log.Trace("pending_txs_stream: tx gas fee too low", "tx", tx.Hash(), "gasFeeCap", msg.GasFeeCap, "baseFee", header.BaseFee) - metricsPendingTxsGasTooLow.Inc(1) - continue - } + statedb, err := api.sys.chain.State() + if err != nil { + log.Error("pending_txs_stream: failed to get state", "err", err) + return + } - traceCtx.TxHash = tx.Hash() - startTime := time.Now() - trace, err := traceTx(msg, traceCtx, blockCtx, chainConfig, statedb, tracerOpts) - if err != nil { - log.Error("pending_txs_stream: failed to trace tx", "err", err, "tx", tx.Hash()) - metricsPendingTxsTraceFailed.Inc(1) - continue - } - metricsTracePendingTxTimer.Update(time.Since(startTime).Milliseconds()) - metricsPendingTxsTraceSuccess.Inc(1) - - gasPrice := hexutil.Big(*tx.GasPrice()) - rpcTx := newRPCPendingTransaction(tx) - rpcTx.BlockHash = &blockHash - rpcTx.BlockNumber = &blockNumber - rpcTx.TransactionIndex = &txIndex - rpcTx.Trace = trace - rpcTx.GasPrice = &gasPrice - tracedTxs = append(tracedTxs, rpcTx) + metricsPendingTxsReceived.Inc(int64(len(txs))) + for _, tx := range txs { + msg, _ = core.TransactionToMessage(tx, signer, header.BaseFee) + if err != nil { + log.Error("pending_txs_stream: failed to create tx message", "err", err, "tx", tx.Hash()) + continue } - if len(tracedTxs) == 0 { + if msg.GasFeeCap.Cmp(header.BaseFee) < 0 { + log.Trace("pending_txs_stream: tx gas fee too low", "tx", tx.Hash(), "gasFeeCap", msg.GasFeeCap, "baseFee", header.BaseFee) + metricsPendingTxsGasTooLow.Inc(1) continue } - if err := notifier.Notify(rpcSub.ID, tracedTxs); err != nil { - log.Error("pending_txs_stream: failed to notify", "err", err) - return + traceCtx.TxHash = tx.Hash() + startTime := time.Now() + trace, err := traceTx(msg, traceCtx, blockCtx, chainConfig, statedb, tracerOpts) + if err != nil { + log.Error("pending_txs_stream: failed to trace tx", "err", err, "tx", tx.Hash()) + metricsPendingTxsTraceFailed.Inc(1) + continue } - metricsPendingTxsSent.Inc(int64(len(tracedTxs))) - case <-rpcSub.Err(): - return - case <-notifier.Closed(): + metricsTracePendingTxTimer.Update(time.Since(startTime).Milliseconds()) + metricsPendingTxsTraceSuccess.Inc(1) + + gasPrice := hexutil.Big(*tx.GasPrice()) + rpcTx := newRPCPendingTransaction(tx) + rpcTx.BlockHash = &blockHash + rpcTx.BlockNumber = &blockNumber + rpcTx.TransactionIndex = &txIndex + rpcTx.Trace = trace + rpcTx.Future = txsAreFuture + rpcTx.GasPrice = &gasPrice + tracedTxs = append(tracedTxs, rpcTx) + } + + if len(tracedTxs) == 0 { + continue + } + + if err := notifier.Notify(rpcSub.ID, tracedTxs); err != nil { + log.Error("pending_txs_stream: failed to notify", "err", err) return } + metricsPendingTxsSent.Inc(int64(len(tracedTxs))) } }() diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 16b143c24295..5d69d8608581 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -83,6 +83,7 @@ type Backend interface { TxPoolContent() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction) TxPoolContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription + SubscribeNewFutureTxsEvent(chan<- core.NewFutureTxsEvent) event.Subscription SubscribeRejectedTxEvent(chan<- core.RejectedTxEvent) event.Subscription SubscribeDropTxsEvent(chan<- core.DropTxsEvent) event.Subscription diff --git a/les/api_backend.go b/les/api_backend.go index 3e9dbadce86b..a95d0c796b32 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -235,6 +235,10 @@ func (b *LesApiBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.S return b.eth.txPool.SubscribeNewTxsEvent(ch) } +func (b *LesApiBackend) SubscribeNewFutureTxsEvent(ch chan<- core.NewFutureTxsEvent) event.Subscription { + return nil +} + func (b *LesApiBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { return b.eth.blockchain.SubscribeChainEvent(ch) }