Skip to content

Commit

Permalink
Introduce mempool changesets
Browse files Browse the repository at this point in the history
Introduce the CTxMemPool::ChangeSet, a wrapper for creating (potential) new
mempool entries and removing conflicts.
  • Loading branch information
sdaftuar committed Nov 13, 2024
1 parent 87d92fa commit 802214c
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 21 deletions.
20 changes: 20 additions & 0 deletions src/txmempool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1369,3 +1369,23 @@ util::Result<std::pair<std::vector<FeeFrac>, std::vector<FeeFrac>>> CTxMemPool::
std::sort(new_chunks.begin(), new_chunks.end(), std::greater());
return std::make_pair(old_chunks, new_chunks);
}

CTxMemPool::ChangeSet::TxHandle CTxMemPool::ChangeSet::StageAddition(const CTransactionRef& tx, const CAmount fee, int64_t time, unsigned int entry_height, uint64_t entry_sequence, bool spends_coinbase, int64_t sigops_cost, LockPoints lp)
{
auto newit = m_to_add.emplace(tx, fee, time, entry_height, entry_sequence, spends_coinbase, sigops_cost, lp).first;
m_entry_vec.push_back(newit);
return newit;
}

void CTxMemPool::ChangeSet::Apply()
{
LOCK(m_pool->cs);
m_pool->RemoveStaged(m_to_remove, false, MemPoolRemovalReason::REPLACED);
for (size_t i=0; i<m_entry_vec.size(); ++i) {
auto tx_entry = m_entry_vec[i];
m_pool->addUnchecked(*tx_entry);
}
m_to_add.clear();
m_to_remove.clear();
m_entry_vec.clear();
}
31 changes: 31 additions & 0 deletions src/txmempool.h
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,37 @@ class CTxMemPool
assert(m_epoch.guarded()); // verify guard even when it==nullopt
return !it || visited(*it);
}

class ChangeSet {
public:
explicit ChangeSet(CTxMemPool* pool) : m_pool(pool) {}
~ChangeSet() = default;

ChangeSet(const ChangeSet&) = delete;
ChangeSet& operator=(const ChangeSet&) = delete;

using TxHandle = CTxMemPool::txiter;

TxHandle StageAddition(const CTransactionRef& tx, const CAmount fee, int64_t time, unsigned int entry_height, uint64_t entry_sequence, bool spends_coinbase, int64_t sigops_cost, LockPoints lp);
void StageRemoval(CTxMemPool::txiter it) { m_to_remove.insert(it); }

util::Result<CTxMemPool::setEntries> CalculateMemPoolAncestors(TxHandle tx, const Limits& limits)
{
LOCK(m_pool->cs);
auto ret{m_pool->CalculateMemPoolAncestors(*tx, limits)};
return ret;
}

void Apply() EXCLUSIVE_LOCKS_REQUIRED(cs_main);

private:
CTxMemPool* m_pool;
CTxMemPool::indexed_transaction_set m_to_add;
std::vector<CTxMemPool::txiter> m_entry_vec; // track the added transactions' insertion order
CTxMemPool::setEntries m_to_remove;
};

std::unique_ptr<ChangeSet> GetChangeSet() EXCLUSIVE_LOCKS_REQUIRED(cs) { return std::make_unique<ChangeSet>(this); }
};

/**
Expand Down
50 changes: 29 additions & 21 deletions src/validation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -633,9 +633,9 @@ class MemPoolAccept
CTxMemPool::setEntries m_iters_conflicting;
/** All mempool ancestors of this transaction. */
CTxMemPool::setEntries m_ancestors;
/** Mempool entry constructed for this transaction. Constructed in PreChecks() but not
* inserted into the mempool until Finalize(). */
std::unique_ptr<CTxMemPoolEntry> m_entry;
/* Changeset representing adding a transaction and removing its conflicts. */
std::unique_ptr<CTxMemPool::ChangeSet> m_changeset;
CTxMemPool::ChangeSet::TxHandle m_tx_handle;
/** Whether RBF-related data structures (m_conflicts, m_iters_conflicting, m_all_conflicting,
* m_replaced_transactions) include a sibling in addition to txns with conflicting inputs. */
bool m_sibling_eviction{false};
Expand Down Expand Up @@ -780,7 +780,6 @@ bool MemPoolAccept::PreChecks(ATMPArgs& args, Workspace& ws)

// Alias what we need out of ws
TxValidationState& state = ws.m_state;
std::unique_ptr<CTxMemPoolEntry>& entry = ws.m_entry;

if (!CheckTransaction(tx, state)) {
return false; // state filled in by CheckTransaction
Expand Down Expand Up @@ -909,9 +908,10 @@ bool MemPoolAccept::PreChecks(ATMPArgs& args, Workspace& ws)
// Set entry_sequence to 0 when bypass_limits is used; this allows txs from a block
// reorg to be marked earlier than any child txs that were already in the mempool.
const uint64_t entry_sequence = bypass_limits ? 0 : m_pool.GetSequence();
entry.reset(new CTxMemPoolEntry(ptx, ws.m_base_fees, nAcceptTime, m_active_chainstate.m_chain.Height(), entry_sequence,
fSpendsCoinbase, nSigOpsCost, lock_points.value()));
ws.m_vsize = entry->GetTxSize();
ws.m_changeset = m_pool.GetChangeSet();
ws.m_tx_handle = ws.m_changeset->StageAddition(ptx, ws.m_base_fees, nAcceptTime, m_active_chainstate.m_chain.Height(), entry_sequence, fSpendsCoinbase, nSigOpsCost, lock_points.value());

ws.m_vsize = ws.m_tx_handle->GetTxSize();

// Enforces 0-fee for dust transactions, no incentive to be mined alone
if (m_pool.m_opts.require_standard) {
Expand Down Expand Up @@ -983,7 +983,7 @@ bool MemPoolAccept::PreChecks(ATMPArgs& args, Workspace& ws)
maybe_rbf_limits.descendant_size_vbytes += conflict->GetSizeWithDescendants();
}

if (auto ancestors{m_pool.CalculateMemPoolAncestors(*entry, maybe_rbf_limits)}) {
if (auto ancestors{ws.m_changeset->CalculateMemPoolAncestors(ws.m_tx_handle, maybe_rbf_limits)}) {
ws.m_ancestors = std::move(*ancestors);
} else {
// If CalculateMemPoolAncestors fails second time, we want the original error string.
Expand Down Expand Up @@ -1015,7 +1015,7 @@ bool MemPoolAccept::PreChecks(ATMPArgs& args, Workspace& ws)
if (ws.m_vsize > EXTRA_DESCENDANT_TX_SIZE_LIMIT || ws.m_ptx->version == TRUC_VERSION) {
return state.Invalid(TxValidationResult::TX_MEMPOOL_POLICY, "too-long-mempool-chain", error_message);
}
if (auto ancestors_retry{m_pool.CalculateMemPoolAncestors(*entry, cpfp_carve_out_limits)}) {
if (auto ancestors_retry{ws.m_changeset->CalculateMemPoolAncestors(ws.m_tx_handle, cpfp_carve_out_limits)}) {
ws.m_ancestors = std::move(*ancestors_retry);
} else {
return state.Invalid(TxValidationResult::TX_MEMPOOL_POLICY, "too-long-mempool-chain", error_message);
Expand Down Expand Up @@ -1114,6 +1114,11 @@ bool MemPoolAccept::ReplacementChecks(Workspace& ws)
return state.Invalid(TxValidationResult::TX_RECONSIDERABLE,
strprintf("insufficient fee%s", ws.m_sibling_eviction ? " (including sibling eviction)" : ""), *err_string);
}

// Add all the to-be-removed transactions to the changeset.
for (auto it : m_subpackage.m_all_conflicts) {
ws.m_changeset->StageRemoval(it);
}
return true;
}

Expand Down Expand Up @@ -1173,7 +1178,9 @@ bool MemPoolAccept::PackageMempoolChecks(const std::vector<CTransactionRef>& txn
"package RBF failed: too many potential replacements", *err_string);
}


for (CTxMemPool::txiter it : m_subpackage.m_all_conflicts) {
parent_ws.m_changeset->StageRemoval(it);
m_subpackage.m_conflicting_fees += it->GetModifiedFee();
m_subpackage.m_conflicting_size += it->GetTxSize();
}
Expand Down Expand Up @@ -1283,7 +1290,6 @@ bool MemPoolAccept::Finalize(const ATMPArgs& args, Workspace& ws)
const uint256& hash = ws.m_hash;
TxValidationState& state = ws.m_state;
const bool bypass_limits = args.m_bypass_limits;
std::unique_ptr<CTxMemPoolEntry>& entry = ws.m_entry;

if (!m_subpackage.m_all_conflicts.empty()) Assume(args.m_allow_replacement);
// Remove conflicting transactions from the mempool
Expand All @@ -1296,25 +1302,23 @@ bool MemPoolAccept::Finalize(const ATMPArgs& args, Workspace& ws)
it->GetTxSize(),
hash.ToString(),
tx.GetWitnessHash().ToString(),
entry->GetFee(),
entry->GetTxSize());
ws.m_tx_handle->GetFee(),
ws.m_tx_handle->GetTxSize());
TRACEPOINT(mempool, replaced,
it->GetTx().GetHash().data(),
it->GetTxSize(),
it->GetFee(),
std::chrono::duration_cast<std::chrono::duration<std::uint64_t>>(it->GetTime()).count(),
hash.data(),
entry->GetTxSize(),
entry->GetFee()
ws.m_tx_handle->GetTxSize(),
ws.m_tx_handle->GetFee()
);
m_subpackage.m_replaced_transactions.push_back(it->GetSharedTx());
}
m_pool.RemoveStaged(m_subpackage.m_all_conflicts, false, MemPoolRemovalReason::REPLACED);
ws.m_changeset->Apply();
// Don't attempt to process the same conflicts repeatedly during subpackage evaluation:
// they no longer exist on subsequent calls to Finalize() post-RemoveStaged
// they no longer exist on subsequent calls to Finalize() post-Apply()
m_subpackage.m_all_conflicts.clear();
// Store transaction in memory
m_pool.addUnchecked(*entry, ws.m_ancestors);

// trim mempool and check if tx was trimmed
// If we are validating a package, don't trim here because we could evict a previous transaction
Expand Down Expand Up @@ -1359,7 +1363,7 @@ bool MemPoolAccept::SubmitPackage(const ATMPArgs& args, std::vector<Workspace>&
// Re-calculate mempool ancestors to call addUnchecked(). They may have changed since the
// last calculation done in PreChecks, since package ancestors have already been submitted.
{
auto ancestors{m_pool.CalculateMemPoolAncestors(*ws.m_entry, m_pool.m_opts.limits)};
auto ancestors{ws.m_changeset->CalculateMemPoolAncestors(ws.m_tx_handle, m_pool.m_opts.limits)};
if(!ancestors) {
results.emplace(ws.m_ptx->GetWitnessHash(), MempoolAcceptResult::Failure(ws.m_state));
// Since PreChecks() and PackageMempoolChecks() both enforce limits, this should never fail.
Expand Down Expand Up @@ -1400,6 +1404,8 @@ bool MemPoolAccept::SubmitPackage(const ATMPArgs& args, std::vector<Workspace>&

// Add successful results. The returned results may change later if LimitMempoolSize() evicts them.
for (Workspace& ws : workspaces) {
auto iter = m_pool.GetIter(ws.m_ptx->GetHash());
Assume(iter.has_value());
const auto effective_feerate = args.m_package_feerates ? ws.m_package_feerate :
CFeeRate{ws.m_modified_fees, static_cast<uint32_t>(ws.m_vsize)};
const auto effective_feerate_wtxids = args.m_package_feerates ? all_package_wtxids :
Expand All @@ -1410,7 +1416,7 @@ bool MemPoolAccept::SubmitPackage(const ATMPArgs& args, std::vector<Workspace>&
if (!m_pool.m_opts.signals) continue;
const CTransaction& tx = *ws.m_ptx;
const auto tx_info = NewMempoolTransactionInfo(ws.m_ptx, ws.m_base_fees,
ws.m_vsize, ws.m_entry->GetHeight(),
ws.m_vsize, (*iter)->GetHeight(),
args.m_bypass_limits, args.m_package_submission,
IsCurrentForFeeEstimation(m_active_chainstate),
m_pool.HasNoInputsOf(tx));
Expand Down Expand Up @@ -1481,8 +1487,10 @@ MempoolAcceptResult MemPoolAccept::AcceptSingleTransaction(const CTransactionRef

if (m_pool.m_opts.signals) {
const CTransaction& tx = *ws.m_ptx;
auto iter = m_pool.GetIter(tx.GetHash());
Assume(iter.has_value());
const auto tx_info = NewMempoolTransactionInfo(ws.m_ptx, ws.m_base_fees,
ws.m_vsize, ws.m_entry->GetHeight(),
ws.m_vsize, (*iter)->GetHeight(),
args.m_bypass_limits, args.m_package_submission,
IsCurrentForFeeEstimation(m_active_chainstate),
m_pool.HasNoInputsOf(tx));
Expand Down

0 comments on commit 802214c

Please sign in to comment.