Skip to content

Commit

Permalink
Merge bitcoin#27675: p2p: Drop m_recently_announced_invs bloom filter
Browse files Browse the repository at this point in the history
fb02ba3 mempool_entry: improve struct packing (Anthony Towns)
1a11806 net_processing: Clean up INVENTORY_BROADCAST_MAX constants (Anthony Towns)
6fa4993 test: Check tx from disconnected block is immediately requestable (glozow)
e4ffabb net_processing: don't add txids to m_tx_inventory_known_filter (Anthony Towns)
6ec1809 net_processing: drop m_recently_announced_invs bloom filter (Anthony Towns)
a70beaf validation: when adding txs due to a block reorg, allow immediate relay (Anthony Towns)
1e9684f mempool_entry: add mempool entry sequence number (Anthony Towns)

Pull request description:

  This PR replaces the `m_recently_announced_invs` bloom filter with a simple sequence number tracking the mempool state when we last considered sending an INV message to a node. This saves 33kB per peer (or more if we raise the rate at which we relay transactions over the network, in which case we would need to increase the size of the bloom filter proportionally).

  The philosophy here (compare with bitcoin#18861 and bitcoin#19109) is that we consider the rate limiting on INV messages to only be about saving bandwidth and not protecting privacy, and therefore after you receive an INV message, it's immediately fair game to request any transaction that was in the mempool at the time the INV message was sent. We likewise consider the BIP 133 feefilter and BIP 37 bloom filters to be bandwidth optimisations here, and treat transactions as requestable if they would have been announced without those filters. Given that philosophy, tracking the timestamp of the last INV message and comparing that against the mempool entry time allows removal of each of `m_recently_announced_invs`, `m_last_mempool_req` and `UNCONDITIONAL_RELAY_DELAY` and associated logic.

ACKs for top commit:
  naumenkogs:
    ACK fb02ba3
  amitiuttarwar:
    review ACK fb02ba3
  glozow:
    reACK fb02ba3

Tree-SHA512: cbba5ee04c86df26b6057f3654c00a2b45ec94d354f4f157a769cecdaa0b509edaac02b3128afba39b023e82473fc5e28c915a787f84457ffe66638c6ac9c2d4
  • Loading branch information
fanquake committed Aug 17, 2023
2 parents 60d3e4b + fb02ba3 commit a62f5ee
Show file tree
Hide file tree
Showing 15 changed files with 151 additions and 91 deletions.
3 changes: 2 additions & 1 deletion src/bench/mempool_eviction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ static void AddTx(const CTransactionRef& tx, const CAmount& nFee, CTxMemPool& po
{
int64_t nTime = 0;
unsigned int nHeight = 1;
uint64_t sequence = 0;
bool spendsCoinbase = false;
unsigned int sigOpCost = 4;
LockPoints lp;
pool.addUnchecked(CTxMemPoolEntry(
tx, nFee, nTime, nHeight,
tx, nFee, nTime, nHeight, sequence,
spendsCoinbase, sigOpCost, lp));
}

Expand Down
3 changes: 2 additions & 1 deletion src/bench/mempool_stress.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ static void AddTx(const CTransactionRef& tx, CTxMemPool& pool) EXCLUSIVE_LOCKS_R
{
int64_t nTime = 0;
unsigned int nHeight = 1;
uint64_t sequence = 0;
bool spendsCoinbase = false;
unsigned int sigOpCost = 4;
LockPoints lp;
pool.addUnchecked(CTxMemPoolEntry(tx, 1000, nTime, nHeight, spendsCoinbase, sigOpCost, lp));
pool.addUnchecked(CTxMemPoolEntry(tx, 1000, nTime, nHeight, sequence, spendsCoinbase, sigOpCost, lp));
}

struct Available {
Expand Down
2 changes: 1 addition & 1 deletion src/bench/rpc_mempool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
static void AddTx(const CTransactionRef& tx, const CAmount& fee, CTxMemPool& pool) EXCLUSIVE_LOCKS_REQUIRED(cs_main, pool.cs)
{
LockPoints lp;
pool.addUnchecked(CTxMemPoolEntry(tx, fee, /*time=*/0, /*entry_height=*/1, /*spends_coinbase=*/false, /*sigops_cost=*/4, lp));
pool.addUnchecked(CTxMemPoolEntry(tx, fee, /*time=*/0, /*entry_height=*/1, /*entry_sequence=*/0, /*spends_coinbase=*/false, /*sigops_cost=*/4, lp));
}

static void RpcMempool(benchmark::Bench& bench)
Expand Down
5 changes: 4 additions & 1 deletion src/kernel/mempool_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class CTxMemPoolEntry
const int32_t nTxWeight; //!< ... and avoid recomputing tx weight (also used for GetTxSize())
const size_t nUsageSize; //!< ... and total memory usage
const int64_t nTime; //!< Local time when entering the mempool
const uint64_t entry_sequence; //!< Sequence number used to determine whether this transaction is too recent for relay
const unsigned int entryHeight; //!< Chain height when entering the mempool
const bool spendsCoinbase; //!< keep track of transactions that spend a coinbase
const int64_t sigOpCost; //!< Total sigop cost
Expand All @@ -101,14 +102,15 @@ class CTxMemPoolEntry

public:
CTxMemPoolEntry(const CTransactionRef& tx, CAmount fee,
int64_t time, unsigned int entry_height,
int64_t time, unsigned int entry_height, uint64_t entry_sequence,
bool spends_coinbase,
int64_t sigops_cost, LockPoints lp)
: tx{tx},
nFee{fee},
nTxWeight{GetTransactionWeight(*tx)},
nUsageSize{RecursiveDynamicUsage(tx)},
nTime{time},
entry_sequence{entry_sequence},
entryHeight{entry_height},
spendsCoinbase{spends_coinbase},
sigOpCost{sigops_cost},
Expand All @@ -130,6 +132,7 @@ class CTxMemPoolEntry
int32_t GetTxWeight() const { return nTxWeight; }
std::chrono::seconds GetTime() const { return std::chrono::seconds{nTime}; }
unsigned int GetHeight() const { return entryHeight; }
uint64_t GetSequence() const { return entry_sequence; }
int64_t GetSigOpCost() const { return sigOpCost; }
CAmount GetModifiedFee() const { return m_modified_fee; }
size_t DynamicMemoryUsage() const { return nUsageSize; }
Expand Down
102 changes: 23 additions & 79 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@
#include <optional>
#include <typeinfo>

/** How long a transaction has to be in the mempool before it can unconditionally be relayed. */
static constexpr auto UNCONDITIONAL_RELAY_DELAY = 2min;
/** Headers download timeout.
* Timeout = base + per_header * (expected number of headers) */
static constexpr auto HEADERS_DOWNLOAD_TIMEOUT_BASE = 15min;
Expand Down Expand Up @@ -149,15 +147,12 @@ static constexpr auto OUTBOUND_INVENTORY_BROADCAST_INTERVAL{2s};
/** Maximum rate of inventory items to send per second.
* Limits the impact of low-fee transaction floods. */
static constexpr unsigned int INVENTORY_BROADCAST_PER_SECOND = 7;
/** Target number of tx inventory items to send per transmission. */
static constexpr unsigned int INVENTORY_BROADCAST_TARGET = INVENTORY_BROADCAST_PER_SECOND * count_seconds(INBOUND_INVENTORY_BROADCAST_INTERVAL);
/** Maximum number of inventory items to send per transmission. */
static constexpr unsigned int INVENTORY_BROADCAST_MAX = INVENTORY_BROADCAST_PER_SECOND * count_seconds(INBOUND_INVENTORY_BROADCAST_INTERVAL);
/** The number of most recently announced transactions a peer can request. */
static constexpr unsigned int INVENTORY_MAX_RECENT_RELAY = 3500;
/** Verify that INVENTORY_MAX_RECENT_RELAY is enough to cache everything typically
* relayed before unconditional relay from the mempool kicks in. This is only a
* lower bound, and it should be larger to account for higher inv rate to outbound
* peers, and random variations in the broadcast mechanism. */
static_assert(INVENTORY_MAX_RECENT_RELAY >= INVENTORY_BROADCAST_PER_SECOND * UNCONDITIONAL_RELAY_DELAY / std::chrono::seconds{1}, "INVENTORY_RELAY_MAX too low");
static constexpr unsigned int INVENTORY_BROADCAST_MAX = 1000;
static_assert(INVENTORY_BROADCAST_MAX >= INVENTORY_BROADCAST_TARGET, "INVENTORY_BROADCAST_MAX too low");
static_assert(INVENTORY_BROADCAST_MAX <= MAX_PEER_TX_ANNOUNCEMENTS, "INVENTORY_BROADCAST_MAX too high");
/** Average delay between feefilter broadcasts in seconds. */
static constexpr auto AVG_FEEFILTER_BROADCAST_INTERVAL{10min};
/** Maximum feefilter broadcast delay after significant change. */
Expand Down Expand Up @@ -273,13 +268,10 @@ struct Peer {
/** A bloom filter for which transactions to announce to the peer. See BIP37. */
std::unique_ptr<CBloomFilter> m_bloom_filter PT_GUARDED_BY(m_bloom_filter_mutex) GUARDED_BY(m_bloom_filter_mutex){nullptr};

/** A rolling bloom filter of all announced tx CInvs to this peer */
CRollingBloomFilter m_recently_announced_invs GUARDED_BY(NetEventsInterface::g_msgproc_mutex){INVENTORY_MAX_RECENT_RELAY, 0.000001};

mutable RecursiveMutex m_tx_inventory_mutex;
/** A filter of all the txids and wtxids that the peer has announced to
/** A filter of all the (w)txids that the peer has announced to
* us or we have announced to the peer. We use this to avoid announcing
* the same txid/wtxid to a peer that already has the transaction. */
* the same (w)txid to a peer that already has the transaction. */
CRollingBloomFilter m_tx_inventory_known_filter GUARDED_BY(m_tx_inventory_mutex){50000, 0.000001};
/** Set of transaction ids we still have to announce (txid for
* non-wtxid-relay peers, wtxid for wtxid-relay peers). We use the
Expand All @@ -290,11 +282,12 @@ struct Peer {
* permitted if the peer has NetPermissionFlags::Mempool or we advertise
* NODE_BLOOM. See BIP35. */
bool m_send_mempool GUARDED_BY(m_tx_inventory_mutex){false};
/** The last time a BIP35 `mempool` request was serviced. */
std::atomic<std::chrono::seconds> m_last_mempool_req{0s};
/** The next time after which we will send an `inv` message containing
* transaction announcements to this peer. */
std::chrono::microseconds m_next_inv_send_time GUARDED_BY(m_tx_inventory_mutex){0};
/** The mempool sequence num at which we sent the last `inv` message to this peer.
* Can relay txs with lower sequence numbers than this (see CTxMempool::info_for_relay). */
uint64_t m_last_inv_sequence GUARDED_BY(NetEventsInterface::g_msgproc_mutex){1};

/** Minimum fee rate with which to filter transaction announcements to this node. See BIP133. */
std::atomic<CAmount> m_fee_filter_received{0};
Expand Down Expand Up @@ -907,7 +900,7 @@ class PeerManagerImpl final : public PeerManager
std::atomic<std::chrono::seconds> m_last_tip_update{0s};

/** Determine whether or not a peer can request a transaction, and return it (or nullptr if not found or not allowed). */
CTransactionRef FindTxForGetData(const Peer::TxRelay& tx_relay, const GenTxid& gtxid, const std::chrono::seconds mempool_req, const std::chrono::seconds now)
CTransactionRef FindTxForGetData(const Peer::TxRelay& tx_relay, const GenTxid& gtxid)
EXCLUSIVE_LOCKS_REQUIRED(!m_most_recent_block_mutex, NetEventsInterface::g_msgproc_mutex);

void ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic<bool>& interruptMsgProc)
Expand Down Expand Up @@ -2288,22 +2281,14 @@ void PeerManagerImpl::ProcessGetBlockData(CNode& pfrom, Peer& peer, const CInv&
}
}

CTransactionRef PeerManagerImpl::FindTxForGetData(const Peer::TxRelay& tx_relay, const GenTxid& gtxid, const std::chrono::seconds mempool_req, const std::chrono::seconds now)
CTransactionRef PeerManagerImpl::FindTxForGetData(const Peer::TxRelay& tx_relay, const GenTxid& gtxid)
{
auto txinfo = m_mempool.info(gtxid);
// If a tx was in the mempool prior to the last INV for this peer, permit the request.
auto txinfo = m_mempool.info_for_relay(gtxid, tx_relay.m_last_inv_sequence);
if (txinfo.tx) {
// If a TX could have been INVed in reply to a MEMPOOL request,
// or is older than UNCONDITIONAL_RELAY_DELAY, permit the request
// unconditionally.
if ((mempool_req.count() && txinfo.m_time <= mempool_req) || txinfo.m_time <= now - UNCONDITIONAL_RELAY_DELAY) {
return std::move(txinfo.tx);
}
return std::move(txinfo.tx);
}

// Otherwise, the transaction might have been announced recently.
bool recent = tx_relay.m_recently_announced_invs.contains(gtxid.GetHash());
if (recent && txinfo.tx) return std::move(txinfo.tx);

// Or it might be from the most recent block
{
LOCK(m_most_recent_block_mutex);
Expand All @@ -2326,10 +2311,6 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic
std::vector<CInv> vNotFound;
const CNetMsgMaker msgMaker(pfrom.GetCommonVersion());

const auto now{GetTime<std::chrono::seconds>()};
// Get last mempool request time
const auto mempool_req = tx_relay != nullptr ? tx_relay->m_last_mempool_req.load() : std::chrono::seconds::min();

// Process as many TX items from the front of the getdata queue as
// possible, since they're common and it's efficient to batch process
// them.
Expand All @@ -2347,33 +2328,12 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic
continue;
}

CTransactionRef tx = FindTxForGetData(*tx_relay, ToGenTxid(inv), mempool_req, now);
CTransactionRef tx = FindTxForGetData(*tx_relay, ToGenTxid(inv));
if (tx) {
// WTX and WITNESS_TX imply we serialize with witness
int nSendFlags = (inv.IsMsgTx() ? SERIALIZE_TRANSACTION_NO_WITNESS : 0);
m_connman.PushMessage(&pfrom, msgMaker.Make(nSendFlags, NetMsgType::TX, *tx));
m_mempool.RemoveUnbroadcastTx(tx->GetHash());
// As we're going to send tx, make sure its unconfirmed parents are made requestable.
std::vector<uint256> parent_ids_to_add;
{
LOCK(m_mempool.cs);
auto tx_iter = m_mempool.GetIter(tx->GetHash());
if (tx_iter) {
const CTxMemPoolEntry::Parents& parents = (*tx_iter)->GetMemPoolParentsConst();
parent_ids_to_add.reserve(parents.size());
for (const CTxMemPoolEntry& parent : parents) {
if (parent.GetTime() > now - UNCONDITIONAL_RELAY_DELAY) {
parent_ids_to_add.push_back(parent.GetTx().GetHash());
}
}
}
}
for (const uint256& parent_txid : parent_ids_to_add) {
// Relaying a transaction with a recent but unconfirmed parent.
if (WITH_LOCK(tx_relay->m_tx_inventory_mutex, return !tx_relay->m_tx_inventory_known_filter.contains(parent_txid))) {
tx_relay->m_recently_announced_invs.insert(parent_txid);
}
}
} else {
vNotFound.push_back(inv);
}
Expand Down Expand Up @@ -4131,14 +4091,6 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,

const uint256& hash = peer->m_wtxid_relay ? wtxid : txid;
AddKnownTx(*peer, hash);
if (peer->m_wtxid_relay && txid != wtxid) {
// Insert txid into m_tx_inventory_known_filter, even for
// wtxidrelay peers. This prevents re-adding of
// unconfirmed parents to the recently_announced
// filter, when a child tx is requested. See
// ProcessGetData().
AddKnownTx(*peer, txid);
}

LOCK(cs_main);

Expand Down Expand Up @@ -5684,7 +5636,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
std::vector<CInv> vInv;
{
LOCK(peer->m_block_inv_mutex);
vInv.reserve(std::max<size_t>(peer->m_blocks_for_inv_relay.size(), INVENTORY_BROADCAST_MAX));
vInv.reserve(std::max<size_t>(peer->m_blocks_for_inv_relay.size(), INVENTORY_BROADCAST_TARGET));

// Add blocks
for (const uint256& hash : peer->m_blocks_for_inv_relay) {
Expand Down Expand Up @@ -5736,14 +5688,12 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
if (!tx_relay->m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx)) continue;
}
tx_relay->m_tx_inventory_known_filter.insert(hash);
// Responses to MEMPOOL requests bypass the m_recently_announced_invs filter.
vInv.push_back(inv);
if (vInv.size() == MAX_INV_SZ) {
m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));
vInv.clear();
}
}
tx_relay->m_last_mempool_req = std::chrono::duration_cast<std::chrono::seconds>(current_time);
}

// Determine transactions to relay
Expand All @@ -5763,8 +5713,8 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
// especially since we have many peers and some will draw much shorter delays.
unsigned int nRelayedTransactions = 0;
LOCK(tx_relay->m_bloom_filter_mutex);
size_t broadcast_max{INVENTORY_BROADCAST_MAX + (tx_relay->m_tx_inventory_to_send.size()/1000)*5};
broadcast_max = std::min<size_t>(1000, broadcast_max);
size_t broadcast_max{INVENTORY_BROADCAST_TARGET + (tx_relay->m_tx_inventory_to_send.size()/1000)*5};
broadcast_max = std::min<size_t>(INVENTORY_BROADCAST_MAX, broadcast_max);
while (!vInvTx.empty() && nRelayedTransactions < broadcast_max) {
// Fetch the top element from the heap
std::pop_heap(vInvTx.begin(), vInvTx.end(), compareInvMempoolOrder);
Expand All @@ -5783,30 +5733,24 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
if (!txinfo.tx) {
continue;
}
auto txid = txinfo.tx->GetHash();
// Peer told you to not send transactions at that feerate? Don't bother sending it.
if (txinfo.fee < filterrate.GetFee(txinfo.vsize)) {
continue;
}
if (tx_relay->m_bloom_filter && !tx_relay->m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx)) continue;
// Send
tx_relay->m_recently_announced_invs.insert(hash);
vInv.push_back(inv);
nRelayedTransactions++;
if (vInv.size() == MAX_INV_SZ) {
m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));
vInv.clear();
}
tx_relay->m_tx_inventory_known_filter.insert(hash);
if (hash != txid) {
// Insert txid into m_tx_inventory_known_filter, even for
// wtxidrelay peers. This prevents re-adding of
// unconfirmed parents to the recently_announced
// filter, when a child tx is requested. See
// ProcessGetData().
tx_relay->m_tx_inventory_known_filter.insert(txid);
}
}

// Ensure we'll respond to GETDATA requests for anything we've just announced
LOCK(m_mempool.cs);
tx_relay->m_last_inv_sequence = m_mempool.GetSequence();
}
}
if (!vInv.empty())
Expand Down
2 changes: 1 addition & 1 deletion src/node/interfaces.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ class ChainImpl : public Chain
{
if (!m_node.mempool) return true;
LockPoints lp;
CTxMemPoolEntry entry(tx, 0, 0, 0, false, 0, lp);
CTxMemPoolEntry entry(tx, 0, 0, 0, 0, false, 0, lp);
const CTxMemPool::Limits& limits{m_node.mempool->m_limits};
LOCK(m_node.mempool->cs);
return m_node.mempool->CalculateMemPoolAncestors(entry, limits).has_value();
Expand Down
3 changes: 2 additions & 1 deletion src/test/fuzz/util/mempool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ CTxMemPoolEntry ConsumeTxMemPoolEntry(FuzzedDataProvider& fuzzed_data_provider,
const CAmount fee{ConsumeMoney(fuzzed_data_provider, /*max=*/std::numeric_limits<CAmount>::max() / CAmount{100'000})};
assert(MoneyRange(fee));
const int64_t time = fuzzed_data_provider.ConsumeIntegral<int64_t>();
const uint64_t entry_sequence{fuzzed_data_provider.ConsumeIntegral<uint64_t>()};
const unsigned int entry_height = fuzzed_data_provider.ConsumeIntegral<unsigned int>();
const bool spends_coinbase = fuzzed_data_provider.ConsumeBool();
const unsigned int sig_op_cost = fuzzed_data_provider.ConsumeIntegralInRange<unsigned int>(0, MAX_BLOCK_SIGOPS_COST);
return CTxMemPoolEntry{MakeTransactionRef(tx), fee, time, entry_height, spends_coinbase, sig_op_cost, {}};
return CTxMemPoolEntry{MakeTransactionRef(tx), fee, time, entry_height, entry_sequence, spends_coinbase, sig_op_cost, {}};
}
4 changes: 2 additions & 2 deletions src/test/util/setup_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ std::vector<CTransactionRef> TestChain100Setup::PopulateMempool(FastRandomContex
LOCK2(cs_main, m_node.mempool->cs);
LockPoints lp;
m_node.mempool->addUnchecked(CTxMemPoolEntry(ptx, /*fee=*/(total_in - num_outputs * amount_per_output),
/*time=*/0, /*entry_height=*/1,
/*time=*/0, /*entry_height=*/1, /*entry_sequence=*/0,
/*spends_coinbase=*/false, /*sigops_cost=*/4, lp));
}
--num_transactions;
Expand Down Expand Up @@ -454,7 +454,7 @@ void TestChain100Setup::MockMempoolMinFee(const CFeeRate& target_feerate)
const auto tx_fee = target_feerate.GetFee(GetVirtualTransactionSize(*tx)) -
m_node.mempool->m_incremental_relay_feerate.GetFee(GetVirtualTransactionSize(*tx));
m_node.mempool->addUnchecked(CTxMemPoolEntry(tx, /*fee=*/tx_fee,
/*time=*/0, /*entry_height=*/1,
/*time=*/0, /*entry_height=*/1, /*entry_sequence=*/0,
/*spends_coinbase=*/true, /*sigops_cost=*/1, lp));
m_node.mempool->TrimToSize(0);
assert(m_node.mempool->GetMinFee() == target_feerate);
Expand Down
2 changes: 1 addition & 1 deletion src/test/util/txmempool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ CTxMemPoolEntry TestMemPoolEntryHelper::FromTx(const CMutableTransaction& tx) co

CTxMemPoolEntry TestMemPoolEntryHelper::FromTx(const CTransactionRef& tx) const
{
return CTxMemPoolEntry{tx, nFee, TicksSinceEpoch<std::chrono::seconds>(time), nHeight, spendsCoinbase, sigOpCost, lp};
return CTxMemPoolEntry{tx, nFee, TicksSinceEpoch<std::chrono::seconds>(time), nHeight, m_sequence, spendsCoinbase, sigOpCost, lp};
}
Loading

0 comments on commit a62f5ee

Please sign in to comment.