Skip to content

Commit

Permalink
tweak: Add future tx events to trace API feed.
Browse files Browse the repository at this point in the history
  • Loading branch information
tyler-smith committed Nov 16, 2023
1 parent 6a123e6 commit 7c113cd
Show file tree
Hide file tree
Showing 12 changed files with 162 additions and 72 deletions.
4 changes: 4 additions & 0 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,10 @@ func (fb *filterBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.
return nullSubscription()
}

func (fb *filterBackend) SubscribeNewFutureTxsEvent(ch chan<- core.NewFutureTxsEvent) event.Subscription {
return nullSubscription()
}

func (fb *filterBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
return fb.bc.SubscribeChainEvent(ch)
}
Expand Down
2 changes: 2 additions & 0 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
// NewTxsEvent is posted when a batch of transactions enter the transaction pool.
type NewTxsEvent struct{ Txs []*types.Transaction }

type NewFutureTxsEvent NewTxsEvent

// DropTxsEvent is posted when a batch of transactions are removed from the transaction pool
type DropTxsEvent struct{
Txs []*types.Transaction
Expand Down
5 changes: 5 additions & 0 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1504,6 +1504,11 @@ func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool
}
}

func (p *BlobPool) SubscribeFutureTransactions(ch chan<- core.NewFutureTxsEvent) event.Subscription {
f := &event.Feed{}
return f.Subscribe(ch)
}

// Nonce returns the next nonce of an account, with all transactions executable
// by the pool already applied on top.
func (p *BlobPool) Nonce(addr common.Address) uint64 {
Expand Down
9 changes: 9 additions & 0 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ type LegacyPool struct {
chain BlockChain
gasTip atomic.Pointer[big.Int]
txFeed event.Feed
futureTxFeed event.Feed
dropTxFeed event.Feed
rejectTxFeed event.Feed
scope event.SubscriptionScope
Expand Down Expand Up @@ -438,6 +439,12 @@ func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs
return pool.txFeed.Subscribe(ch)
}

// SubscribeFutureTransactions registers a subscription for new transaction queue
// events.
func (pool *LegacyPool) SubscribeFutureTransactions(ch chan<- core.NewFutureTxsEvent) event.Subscription {
return pool.futureTxFeed.Subscribe(ch)
}

// SubscribeDropTxsEvent registers a subscription of core.DropTxsEvent and
// starts sending event to the given channel.
func (pool *LegacyPool) SubscribeDropTxsEvent(ch chan<- core.DropTxsEvent) event.Subscription {
Expand Down Expand Up @@ -809,6 +816,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
pool.priced.Put(tx, isLocal)
pool.journalTx(from, tx)
pool.queueTxEvent(tx)
pool.futureTxFeed.Send(core.NewFutureTxsEvent{Txs: []*types.Transaction{tx}})
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())

// Successful promotion, bump the heartbeat
Expand All @@ -820,6 +828,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
if err != nil {
return false, err
}
pool.futureTxFeed.Send(core.NewFutureTxsEvent{Txs: []*types.Transaction{tx}})
// Mark local addresses and journal local transactions
if local && !pool.locals.contains(from) {
log.Info("Setting new local account", "address", from)
Expand Down
2 changes: 2 additions & 0 deletions core/txpool/subpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ type SubPool interface {
// or also for reorged out ones.
SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription

SubscribeFutureTransactions(ch chan<- core.NewFutureTxsEvent) event.Subscription

// Nonce returns the next nonce of an account, with all transactions executable
// by the pool already applied on top.
Nonce(addr common.Address) uint64
Expand Down
8 changes: 8 additions & 0 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,14 @@ func (p *TxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool)
return p.subs.Track(event.JoinSubscriptions(subs...))
}

func (p *TxPool) SubscribeFutureTransactions(ch chan<- core.NewFutureTxsEvent) event.Subscription {
subs := make([]event.Subscription, len(p.subpools))
for i, subpool := range p.subpools {
subs[i] = subpool.SubscribeFutureTransactions(ch)
}
return p.subs.Track(event.JoinSubscriptions(subs...))
}

// SubscribeDropTxsEvent registers a subscription of core.DropTxsEvent and
// starts sending event to the given channel.
func (p *TxPool) SubscribeDropTxsEvent(ch chan<- core.DropTxsEvent) event.Subscription {
Expand Down
4 changes: 4 additions & 0 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,10 @@ func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.S
return b.eth.txPool.SubscribeTransactions(ch, true)
}

func (b *EthAPIBackend) SubscribeNewFutureTxsEvent(ch chan<- core.NewFutureTxsEvent) event.Subscription {
return b.eth.txPool.SubscribeFutureTransactions(ch)
}

func (b *EthAPIBackend) SyncProgress() ethereum.SyncProgress {
return b.eth.Downloader().Progress()
}
Expand Down
3 changes: 2 additions & 1 deletion eth/filters/dropped_tx_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ type RPCTransaction struct {
R *hexutil.Big `json:"r"`
S *hexutil.Big `json:"s"`

Trace *blocknative.Trace `json:"trace,omitempty"`
Trace *blocknative.Trace `json:"trace,omitempty"`
Future bool `json:"future"`
}

// newRPCTransaction returns a transaction that will serialize to the RPC
Expand Down
39 changes: 38 additions & 1 deletion eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type Backend interface {
CurrentHeader() *types.Header
ChainConfig() *params.ChainConfig
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
SubscribeNewFutureTxsEvent(chan<- core.NewFutureTxsEvent) event.Subscription
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
Expand Down Expand Up @@ -169,6 +170,9 @@ const (
PendingTransactionsSubscription
// BlocksSubscription queries hashes for blocks that are imported
BlocksSubscription

FutureTransactionsSubscription

// LastIndexSubscription keeps track of the last index
LastIndexSubscription
)
Expand Down Expand Up @@ -212,6 +216,8 @@ type EventSystem struct {
pendingLogsSub event.Subscription // Subscription for pending log event
chainSub event.Subscription // Subscription for new chain event

futureTxsSub event.Subscription

// Channels
install chan *subscription // install filter for event notification
uninstall chan *subscription // remove filter for event notification
Expand All @@ -220,6 +226,8 @@ type EventSystem struct {
pendingLogsCh chan []*types.Log // Channel to receive new log event
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
chainCh chan core.ChainEvent // Channel to receive new chain event

futureTxsCh chan core.NewFutureTxsEvent
}

// NewEventSystem creates a new manager that listens for event on the given mux,
Expand All @@ -240,6 +248,8 @@ func NewEventSystem(sys *FilterSystem, lightMode bool) *EventSystem {
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
pendingLogsCh: make(chan []*types.Log, logsChanSize),
chainCh: make(chan core.ChainEvent, chainEvChanSize),

futureTxsCh: make(chan core.NewFutureTxsEvent, txChanSize),
}

// Subscribe events
Expand All @@ -249,8 +259,10 @@ func NewEventSystem(sys *FilterSystem, lightMode bool) *EventSystem {
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh)

m.futureTxsSub = m.backend.SubscribeNewFutureTxsEvent(m.futureTxsCh)

// Make sure none of the subscriptions are empty
if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil {
if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil || m.futureTxsSub == nil {
log.Crit("Subscribe for event system failed")
}

Expand Down Expand Up @@ -425,6 +437,20 @@ func (es *EventSystem) SubscribePendingTxs(txs chan []*types.Transaction) *Subsc
return es.subscribe(sub)
}

func (es *EventSystem) SubscribeFutureTxs(txs chan []*types.Transaction) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: FutureTransactionsSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
txs: txs,
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}

type filterIndex map[Type]map[rpc.ID]*subscription

func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) {
Expand Down Expand Up @@ -457,6 +483,12 @@ func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent)
}
}

func (es *EventSystem) handleFutureTxsEvent(filters filterIndex, ev core.NewFutureTxsEvent) {
for _, f := range filters[FutureTransactionsSubscription] {
f.txs <- ev.Txs
}
}

func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) {
for _, f := range filters[BlocksSubscription] {
f.headers <- ev.Block.Header()
Expand Down Expand Up @@ -553,6 +585,7 @@ func (es *EventSystem) eventLoop() {
// Ensure all subscriptions get cleaned up
defer func() {
es.txsSub.Unsubscribe()
es.futureTxsSub.Unsubscribe()
es.logsSub.Unsubscribe()
es.rmLogsSub.Unsubscribe()
es.pendingLogsSub.Unsubscribe()
Expand All @@ -568,6 +601,8 @@ func (es *EventSystem) eventLoop() {
select {
case ev := <-es.txsCh:
es.handleTxsEvent(index, ev)
case ev := <-es.futureTxsCh:
es.handleFutureTxsEvent(index, ev)
case ev := <-es.logsCh:
es.handleLogs(index, ev)
case ev := <-es.rmLogsCh:
Expand Down Expand Up @@ -600,6 +635,8 @@ func (es *EventSystem) eventLoop() {
// System stopped
case <-es.txsSub.Err():
return
case <-es.futureTxsSub.Err():
return
case <-es.logsSub.Err():
return
case <-es.rmLogsSub.Err():
Expand Down
Loading

0 comments on commit 7c113cd

Please sign in to comment.