Skip to content

Commit

Permalink
Private Transaction API Sample (v1.10.13)
Browse files Browse the repository at this point in the history
  • Loading branch information
xcarlo authored and Ruteri committed Jun 15, 2022
1 parent 9457b13 commit 4d28194
Show file tree
Hide file tree
Showing 14 changed files with 164 additions and 24 deletions.
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ var (
utils.TxPoolAccountQueueFlag,
utils.TxPoolGlobalQueueFlag,
utils.TxPoolLifetimeFlag,
utils.TxPoolPrivateLifetimeFlag,
utils.SyncModeFlag,
utils.ExitWhenSyncedFlag,
utils.GCModeFlag,
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.TxPoolAccountQueueFlag,
utils.TxPoolGlobalQueueFlag,
utils.TxPoolLifetimeFlag,
utils.TxPoolPrivateLifetimeFlag,
},
},
{
Expand Down
8 changes: 8 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,11 @@ var (
Usage: "Maximum amount of time non-executable transaction are queued",
Value: ethconfig.Defaults.TxPool.Lifetime,
}
TxPoolPrivateLifetimeFlag = cli.DurationFlag{
Name: "txpool.privatelifetime",
Usage: "Maximum amount of time private transactions are withheld from public broadcasting",
Value: ethconfig.Defaults.TxPool.PrivateTxLifetime,
}
// Performance tuning settings
CacheFlag = cli.IntFlag{
Name: "cache",
Expand Down Expand Up @@ -1466,6 +1471,9 @@ func setTxPool(ctx *cli.Context, cfg *core.TxPoolConfig) {
if ctx.GlobalIsSet(TxPoolLifetimeFlag.Name) {
cfg.Lifetime = ctx.GlobalDuration(TxPoolLifetimeFlag.Name)
}
if ctx.GlobalIsSet(TxPoolPrivateLifetimeFlag.Name) {
cfg.PrivateTxLifetime = ctx.GlobalDuration(TxPoolPrivateLifetimeFlag.Name)
}

addresses := strings.Split(ctx.GlobalString(MinerTrustedRelaysFlag.Name), ",")
for _, address := range addresses {
Expand Down
113 changes: 101 additions & 12 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ var (
)

var (
evictionInterval = time.Minute // Time interval to check for evictable transactions
statsReportInterval = 8 * time.Second // Time interval to report transaction pool stats
evictionInterval = time.Minute // Time interval to check for evictable transactions
statsReportInterval = 8 * time.Second // Time interval to report transaction pool stats
privateTxCleanupInterval = 1 * time.Hour
)

var (
Expand Down Expand Up @@ -164,7 +165,8 @@ type TxPoolConfig struct {
AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account
GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts

Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
PrivateTxLifetime time.Duration // Maximum amount of time to keep private transactions private

TrustedRelays []common.Address // Trusted relay addresses. Duplicated from the miner config.
}
Expand All @@ -183,7 +185,8 @@ var DefaultTxPoolConfig = TxPoolConfig{
AccountQueue: 64,
GlobalQueue: 1024,

Lifetime: 3 * time.Hour,
Lifetime: 3 * time.Hour,
PrivateTxLifetime: 3 * 24 * time.Hour,
}

// sanitize checks the provided user configurations and changes anything that's
Expand Down Expand Up @@ -222,6 +225,10 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig {
log.Warn("Sanitizing invalid txpool lifetime", "provided", conf.Lifetime, "updated", DefaultTxPoolConfig.Lifetime)
conf.Lifetime = DefaultTxPoolConfig.Lifetime
}
if conf.PrivateTxLifetime < 1 {
log.Warn("Sanitizing invalid txpool private tx lifetime", "provided", conf.PrivateTxLifetime, "updated", DefaultTxPoolConfig.PrivateTxLifetime)
conf.PrivateTxLifetime = DefaultTxPoolConfig.PrivateTxLifetime
}
return conf
}

Expand Down Expand Up @@ -261,6 +268,7 @@ type TxPool struct {
NewMegabundleHooks []func(common.Address, *types.MevBundle)
all *txLookup // All transactions to allow lookups
priced *txPricedList // All transactions sorted by price
privateTxs *timestampedTxHashSet

chainHeadCh chan ChainHeadEvent
chainHeadSub event.Subscription
Expand Down Expand Up @@ -296,6 +304,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
beats: make(map[common.Address]time.Time),
megabundles: make(map[common.Address]types.MevBundle),
all: newTxLookup(),
privateTxs: newExpiringTxHashSet(config.PrivateTxLifetime),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
reqResetCh: make(chan *txpoolResetRequest),
reqPromoteCh: make(chan *accountSet),
Expand Down Expand Up @@ -346,9 +355,10 @@ func (pool *TxPool) loop() {
var (
prevPending, prevQueued, prevStales int
// Start the stats reporting and transaction eviction tickers
report = time.NewTicker(statsReportInterval)
evict = time.NewTicker(evictionInterval)
journal = time.NewTicker(pool.config.Rejournal)
report = time.NewTicker(statsReportInterval)
evict = time.NewTicker(evictionInterval)
journal = time.NewTicker(pool.config.Rejournal)
privateTx = time.NewTicker(privateTxCleanupInterval)
// Track the previous head headers for transaction reorgs
head = pool.chain.CurrentBlock()
)
Expand Down Expand Up @@ -412,6 +422,10 @@ func (pool *TxPool) loop() {
}
pool.mu.Unlock()
}

// Remove stale hashes that must be kept private
case <-privateTx.C:
pool.privateTxs.prune()
}
}
}
Expand Down Expand Up @@ -532,6 +546,11 @@ func (pool *TxPool) ContentFrom(addr common.Address) (types.Transactions, types.
return pending, queued
}

// IsPrivateTxHash indicates whether the transaction should be shared with peers
func (pool *TxPool) IsPrivateTxHash(hash common.Hash) bool {
return pool.privateTxs.Contains(hash)
}

// Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce. The returned transaction set is a copy and can be
// freely modified by calling code.
Expand Down Expand Up @@ -958,7 +977,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
// This method is used to add transactions from the RPC API and performs synchronous pool
// reorganization and event propagation.
func (pool *TxPool) AddLocals(txs []*types.Transaction) []error {
return pool.addTxs(txs, !pool.config.NoLocals, true)
return pool.addTxs(txs, !pool.config.NoLocals, true, false)
}

// AddLocal enqueues a single local transaction into the pool if it is valid. This is
Expand All @@ -974,12 +993,18 @@ func (pool *TxPool) AddLocal(tx *types.Transaction) error {
// This method is used to add transactions from the p2p network and does not wait for pool
// reorganization and internal event propagation.
func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error {
return pool.addTxs(txs, false, false)
return pool.addTxs(txs, false, false, false)
}

// AddPrivateRemote adds transactions to the pool, but does not broadcast these transactions to any peers.
func (pool *TxPool) AddPrivateRemote(tx *types.Transaction) error {
errs := pool.addTxs([]*types.Transaction{tx}, false, false, true)
return errs[0]
}

// This is like AddRemotes, but waits for pool reorganization. Tests use this method.
func (pool *TxPool) AddRemotesSync(txs []*types.Transaction) []error {
return pool.addTxs(txs, false, true)
return pool.addTxs(txs, false, true, false)
}

// This is like AddRemotes with a single transaction, but waits for pool reorganization. Tests use this method.
Expand All @@ -998,7 +1023,7 @@ func (pool *TxPool) AddRemote(tx *types.Transaction) error {
}

// addTxs attempts to queue a batch of transactions if they are valid.
func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync, private bool) []error {
// Filter out known ones without obtaining the pool lock or recovering signatures
var (
errs = make([]error, len(txs))
Expand Down Expand Up @@ -1027,6 +1052,13 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
return errs
}

// Track private transactions, so they don't get leaked to the public mempool
if private {
for _, tx := range news {
pool.privateTxs.Add(tx.Hash())
}
}

// Process all the new transaction and merge any errors into the original slice
pool.mu.Lock()
newErrs, dirtyAddrs := pool.addTxsLocked(news, local)
Expand Down Expand Up @@ -1321,7 +1353,11 @@ func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirt
if len(events) > 0 {
var txs []*types.Transaction
for _, set := range events {
txs = append(txs, set.Flatten()...)
for _, tx := range set.Flatten() {
if !pool.IsPrivateTxHash(tx.Hash()) {
txs = append(txs, tx)
}
}
}
pool.txFeed.Send(NewTxsEvent{txs})
}
Expand Down Expand Up @@ -1931,6 +1967,59 @@ func (t *txLookup) RemotesBelowTip(threshold *big.Int) types.Transactions {
return found
}

type timestampedTxHashSet struct {
lock sync.RWMutex
hashes []common.Hash
timestamps map[common.Hash]time.Time
ttl time.Duration
}

func newExpiringTxHashSet(ttl time.Duration) *timestampedTxHashSet {
s := &timestampedTxHashSet{
hashes: make([]common.Hash, 0),
timestamps: make(map[common.Hash]time.Time),
ttl: ttl,
}

return s
}

func (s *timestampedTxHashSet) Add(hash common.Hash) {
s.lock.Lock()
defer s.lock.Unlock()

s.hashes = append(s.hashes, hash)
s.timestamps[hash] = time.Now().Add(s.ttl)
}

func (s *timestampedTxHashSet) Contains(hash common.Hash) bool {
s.lock.RLock()
defer s.lock.RUnlock()
_, ok := s.timestamps[hash]
return ok
}

func (s *timestampedTxHashSet) prune() {
s.lock.Lock()
defer s.lock.Unlock()

var (
count int
now = time.Now()
)
for _, hash := range s.hashes {
ts := s.timestamps[hash]
if ts.After(now) {
break
}

delete(s.timestamps, hash)
count += 1
}

s.hashes = s.hashes[count:]
}

// numSlots calculates the number of slots needed for a single transaction.
func numSlots(tx *types.Transaction) int {
return int((tx.Size() + txSlotSize - 1) / txSlotSize)
Expand Down
8 changes: 6 additions & 2 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,12 @@ func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscri
return b.eth.BlockChain().SubscribeLogsEvent(ch)
}

func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
return b.eth.txPool.AddLocal(signedTx)
func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction, private bool) error {
if private {
return b.eth.txPool.AddPrivateRemote(signedTx)
} else {
return b.eth.txPool.AddLocal(signedTx)
}
}

func (b *EthAPIBackend) SendBundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error {
Expand Down
4 changes: 4 additions & 0 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ type txPool interface {
// SubscribeNewTxsEvent should return an event subscription of
// NewTxsEvent and send events to the given channel.
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription

// IsPrivateTxHash indicates if the transaction hash should not
// be broadcast on public channels
IsPrivateTxHash(hash common.Hash) bool
}

// handlerConfig is the collection of initialization parameters to create a full
Expand Down
5 changes: 5 additions & 0 deletions eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ func (p *testTxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subs
return p.txFeed.Subscribe(ch)
}

// IsPrivateTxHash always returns false in tests
func (p *testTxPool) IsPrivateTxHash(hash common.Hash) bool {
return false
}

// testHandler is a live implementation of the Ethereum protocol handler, just
// preinitialized with some sane testing defaults and the transaction pool mocked
// out.
Expand Down
4 changes: 4 additions & 0 deletions eth/protocols/eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ type Backend interface {
type TxPool interface {
// Get retrieves the transaction from the local txpool with the given hash.
Get(hash common.Hash) *types.Transaction

// IsPrivateTxHash indicates if the transaction hash should not
// be broadcast on public channels
IsPrivateTxHash(hash common.Hash) bool
}

// MakeProtocols constructs the P2P protocol definitions for `eth`.
Expand Down
7 changes: 6 additions & 1 deletion eth/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ func (h *handler) syncTransactions(p *eth.Peer) {
var txs types.Transactions
pending := h.txpool.Pending(false)
for _, batch := range pending {
txs = append(txs, batch...)
for _, tx := range batch {
// don't share any transactions marked as private
if !h.txpool.IsPrivateTxHash(tx.Hash()) {
txs = append(txs, tx)
}
}
}
if len(txs) == 0 {
return
Expand Down
2 changes: 1 addition & 1 deletion graphql/graphql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1241,7 +1241,7 @@ func (r *Resolver) SendRawTransaction(ctx context.Context, args struct{ Data hex
if err := tx.UnmarshalBinary(args.Data); err != nil {
return common.Hash{}, err
}
hash, err := ethapi.SubmitTransaction(ctx, r.backend, tx)
hash, err := ethapi.SubmitTransaction(ctx, r.backend, tx, false)
return hash, err
}

Expand Down
25 changes: 19 additions & 6 deletions internal/ethapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ func (s *PrivateAccountAPI) SendTransaction(ctx context.Context, args Transactio
log.Warn("Failed transaction send attempt", "from", args.from(), "to", args.To, "value", args.Value.ToInt(), "err", err)
return common.Hash{}, err
}
return SubmitTransaction(ctx, s.b, signed)
return SubmitTransaction(ctx, s.b, signed, false)
}

// SignTransaction will create a transaction from the given arguments and
Expand Down Expand Up @@ -1655,7 +1655,7 @@ func (s *PublicTransactionPoolAPI) sign(addr common.Address, tx *types.Transacti
}

// SubmitTransaction is a helper function that submits tx to txPool and logs a message.
func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (common.Hash, error) {
func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction, private bool) (common.Hash, error) {
// If the transaction fee cap is already specified, ensure the
// fee of the given transaction is _reasonable_.
if err := checkTxFee(tx.GasPrice(), tx.Gas(), b.RPCTxFeeCap()); err != nil {
Expand All @@ -1665,7 +1665,7 @@ func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (c
// Ensure only eip155 signed transactions are submitted if EIP155Required is set.
return common.Hash{}, errors.New("only replay-protected (EIP-155) transactions allowed over RPC")
}
if err := b.SendTx(ctx, tx); err != nil {
if err := b.SendTx(ctx, tx, private); err != nil {
return common.Hash{}, err
}
// Print a log with full tx details for manual investigations and interventions
Expand Down Expand Up @@ -1713,7 +1713,7 @@ func (s *PublicTransactionPoolAPI) SendTransaction(ctx context.Context, args Tra
if err != nil {
return common.Hash{}, err
}
return SubmitTransaction(ctx, s.b, signed)
return SubmitTransaction(ctx, s.b, signed, false)
}

// FillTransaction fills the defaults (nonce, gas, gasPrice or 1559 fields)
Expand All @@ -1740,7 +1740,20 @@ func (s *PublicTransactionPoolAPI) SendRawTransaction(ctx context.Context, input
if err := tx.UnmarshalBinary(input); err != nil {
return common.Hash{}, err
}
return SubmitTransaction(ctx, s.b, tx)
return SubmitTransaction(ctx, s.b, tx, false)
}

// SendPrivateRawTransaction will add the signed transaction to the transaction pool,
// without broadcasting the transaction to its peers, and mark the transaction to avoid
// future syncs.
//
// See SendRawTransaction.
func (s *PublicTransactionPoolAPI) SendPrivateRawTransaction(ctx context.Context, input hexutil.Bytes) (common.Hash, error) {
tx := new(types.Transaction)
if err := tx.UnmarshalBinary(input); err != nil {
return common.Hash{}, err
}
return SubmitTransaction(ctx, s.b, tx, true)
}

// Sign calculates an ECDSA signature for:
Expand Down Expand Up @@ -1873,7 +1886,7 @@ func (s *PublicTransactionPoolAPI) Resend(ctx context.Context, sendArgs Transact
if err != nil {
return common.Hash{}, err
}
if err = s.b.SendTx(ctx, signedTx); err != nil {
if err = s.b.SendTx(ctx, signedTx, false); err != nil {
return common.Hash{}, err
}
return signedTx.Hash(), nil
Expand Down
Loading

0 comments on commit 4d28194

Please sign in to comment.