Skip to content

Commit

Permalink
fix the performance issue caused by track_entry_statics
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyukang committed Mar 14, 2024
1 parent 0ad645f commit 08e50f2
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 19 deletions.
64 changes: 50 additions & 14 deletions tx-pool/src/component/pool_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use ckb_types::{
};
use multi_index_map::MultiIndexMap;
use std::collections::HashSet;
use std::time::Instant;

type ConflictEntry = (TxEntry, Reject);

Expand Down Expand Up @@ -71,6 +72,9 @@ pub struct PoolMap {
pub(crate) total_tx_size: usize,
// sum of all tx_pool tx's cycles.
pub(crate) total_tx_cycles: Cycle,
pub(crate) pending_count: usize,
pub(crate) gap_count: usize,
pub(crate) proposed_count: usize,
}

impl PoolMap {
Expand All @@ -82,6 +86,9 @@ impl PoolMap {
max_ancestors_count,
total_tx_size: 0,
total_tx_cycles: 0,
pending_count: 0,
gap_count: 0,
proposed_count: 0,
}
}

Expand Down Expand Up @@ -124,11 +131,16 @@ impl PoolMap {
}

pub(crate) fn get_max_update_time(&self) -> u64 {
self.entries
let instant = Instant::now();
let res = self
.entries
.iter()
.map(|(_, entry)| entry.inner.timestamp)
.max()
.unwrap_or(0)
.unwrap_or(0);
let duration = instant.elapsed();
debug!("[Perf] get_max_update_time duration: {:?}", duration);
res
}

pub(crate) fn get_by_id(&self, id: &ProposalShortId) -> Option<&PoolEntry> {
Expand All @@ -144,12 +156,11 @@ impl PoolMap {
}

pub(crate) fn pending_size(&self) -> usize {
self.entries.get_by_status(&Status::Pending).len()
+ self.entries.get_by_status(&Status::Gap).len()
self.pending_count + self.gap_count
}

pub(crate) fn proposed_size(&self) -> usize {
self.entries.get_by_status(&Status::Proposed).len()
self.proposed_count
}

pub(crate) fn sorted_proposed_iter(&self) -> impl Iterator<Item = &TxEntry> {
Expand Down Expand Up @@ -213,19 +224,21 @@ impl PoolMap {
self.record_entry_edges(&entry)?;
self.insert_entry(&entry, status);
self.record_entry_descendants(&entry);
self.track_entry_statics();
self.track_entry_statics(None, Some(status));
self.update_stat_for_add_tx(entry.size, entry.cycles);
Ok((true, evicts))
}

/// Change the status of the entry, only used for `gap_rtx` and `proposed_rtx`
pub(crate) fn set_entry(&mut self, short_id: &ProposalShortId, status: Status) {
let mut old_status = None;
self.entries
.modify_by_id(short_id, |e| {
old_status = Some(e.status);
e.status = status;
})
.expect("unconsistent pool");
self.track_entry_statics();
self.track_entry_statics(old_status, Some(status));
}

pub(crate) fn remove_entry(&mut self, id: &ProposalShortId) -> Option<TxEntry> {
Expand All @@ -239,6 +252,7 @@ impl PoolMap {
self.update_descendants_index_key(&entry.inner, EntryOp::Remove);
self.remove_entry_edges(&entry.inner);
self.remove_entry_links(id);
self.track_entry_statics(Some(entry.status), None);
self.update_stat_for_remove_tx(entry.inner.size, entry.inner.cycles);
entry.inner
})
Expand Down Expand Up @@ -625,20 +639,42 @@ impl PoolMap {
});
}

fn track_entry_statics(&self) {
pub fn track_entry_statics(&mut self, remove: Option<Status>, add: Option<Status>) {
match remove {
Some(Status::Pending) => self.pending_count -= 1,
Some(Status::Gap) => self.gap_count -= 1,
Some(Status::Proposed) => self.proposed_count -= 1,
_ => {}
}
match add {
Some(Status::Pending) => self.pending_count += 1,
Some(Status::Gap) => self.gap_count += 1,
Some(Status::Proposed) => self.proposed_count += 1,
_ => {}
}
assert_eq!(
self.pending_count + self.gap_count + self.proposed_count,
self.entries.len()
);
// let duration = instant.elapsed();
// eprintln!(
// "pending: {}, gap: {}, proposed: {} => duration: {:?} total_entries_size: {}",
// self.pending_count,
// self.gap_count,
// self.proposed_count,
// duration,
// self.entries.len()
// );
if let Some(metrics) = ckb_metrics::handle() {
metrics
.ckb_tx_pool_entry
.pending
.set(self.entries.get_by_status(&Status::Pending).len() as i64);
metrics
.ckb_tx_pool_entry
.gap
.set(self.entries.get_by_status(&Status::Gap).len() as i64);
.set(self.pending_count as i64);
metrics.ckb_tx_pool_entry.gap.set(self.gap_count as i64);
metrics
.ckb_tx_pool_entry
.proposed
.set(self.proposed_size() as i64);
.set(self.proposed_count as i64);
}
}

Expand Down
64 changes: 64 additions & 0 deletions tx-pool/src/component/tests/proposed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ use crate::component::tests::util::{
build_tx, build_tx_with_dep, build_tx_with_header_dep, DEFAULT_MAX_ANCESTORS_COUNT,
MOCK_CYCLES, MOCK_FEE, MOCK_SIZE,
};
use ckb_types::core::capacity_bytes;
use ckb_types::core::ScriptHashType;
use ckb_types::packed::CellOutputBuilder;
use ckb_types::packed::ScriptBuilder;
use ckb_types::H256;
use std::time::Instant;

use crate::component::{entry::TxEntry, pool_map::PoolMap};
use ckb_types::{
Expand Down Expand Up @@ -732,3 +738,61 @@ fn test_container_bench_add_limits() {
pool.clear();
assert_eq!(pool.size(), 0);
}

#[test]
fn test_pool_map_bench() {
use rand::Rng;
let mut rng = rand::thread_rng();

let mut pool = PoolMap::new(150);

let mut instant = Instant::now();
for i in 0..500000 {
let lock_script1 = ScriptBuilder::default()
.code_hash(H256(rand::random()).pack())
.hash_type(ScriptHashType::Data.into())
.args(Bytes::from(b"lock_script1".to_vec()).pack())
.build();

let type_script1 = ScriptBuilder::default()
.code_hash(H256(rand::random()).pack())
.hash_type(ScriptHashType::Data.into())
.args(Bytes::from(b"type_script1".to_vec()).pack())
.build();

let tx = TransactionBuilder::default()
.output(
CellOutputBuilder::default()
.capacity(capacity_bytes!(1000).pack())
.lock(lock_script1)
.type_(Some(type_script1).pack())
.build(),
)
.output_data(Default::default())
.build();

let entry = TxEntry::dummy_resolve(
tx,
rng.gen_range(0..1000),
Capacity::shannons(200),
rng.gen_range(0..1000),
);
let short_id = entry.proposal_short_id();
if i % 1000 == 0 {
eprintln!("i: {}, time: {:?}", i, instant.elapsed());
instant = Instant::now();
}
let status = if rng.gen_range(0..100) >= 30 {
Status::Pending
} else {
Status::Gap
};
pool.add_entry(entry, status);
}
// for _i in 0..100 {
// let instant = Instant::now();
// let res = pool.track_entry_statics(None, None);
// let duration = instant.elapsed();
// eprintln!("duration: {:?}", duration);
// }
}
8 changes: 8 additions & 0 deletions tx-pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use ckb_types::{
use lru::LruCache;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Instant;

const COMMITTED_HASH_CACHE_SIZE: usize = 100_000;
const CONFLICTES_CACHE_SIZE: usize = 10_000;
Expand Down Expand Up @@ -254,6 +255,8 @@ impl TxPool {
// Expire all transaction (and their dependencies) in the pool.
pub(crate) fn remove_expired(&mut self, callbacks: &Callbacks) {
let now_ms = ckb_systemtime::unix_time_as_millis();
let instant = Instant::now();

let removed: Vec<_> = self
.pool_map
.iter()
Expand All @@ -268,10 +271,13 @@ impl TxPool {
let reject = Reject::Expiry(entry.timestamp);
callbacks.call_reject(self, &entry, reject);
}
let duration = instant.elapsed();
debug!("[Perf] remove_expired duration: {:?}", duration);
}

// Remove transactions from the pool until total size <= size_limit.
pub(crate) fn limit_size(&mut self, callbacks: &Callbacks) {
let instant = Instant::now();
while self.pool_map.total_tx_size > self.config.max_tx_pool_size {
let next_evict_entry = || {
self.pool_map
Expand All @@ -297,6 +303,8 @@ impl TxPool {
}
}
self.pool_map.entries.shrink_to_fit();
let duration = instant.elapsed();
debug!("[Perf] limit_size duration: {:?}", duration);
}

// remove transaction with detached proposal from gap and proposed
Expand Down
26 changes: 25 additions & 1 deletion tx-pool/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use ckb_verification::{
use std::collections::HashSet;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use tokio::task::block_in_place;

const DELAY_LIMIT: usize = 1_500 * 21; // 1_500 per block, 21 blocks
Expand Down Expand Up @@ -103,6 +103,7 @@ impl TxPoolService {
entry: TxEntry,
mut status: TxStatus,
) -> (Result<(), Reject>, Arc<Snapshot>) {
let instant = Instant::now();
let (ret, snapshot) = self
.with_tx_pool_write_lock(move |tx_pool, snapshot| {
// check_rbf must be invoked in `write` lock to avoid concurrent issues.
Expand Down Expand Up @@ -170,6 +171,8 @@ impl TxPoolService {
})
.await;

let duration = instant.elapsed();
debug!("[Perf] submit_entry: {:?}", duration);
(ret, snapshot)
}

Expand Down Expand Up @@ -238,6 +241,7 @@ impl TxPoolService {
tx: &TransactionView,
) -> (Result<PreCheckedTx, Reject>, Arc<Snapshot>) {
// Acquire read lock for cheap check
let instant = Instant::now();
let tx_size = tx.data().serialized_size_in_block();

let (ret, snapshot) = self
Expand Down Expand Up @@ -279,6 +283,8 @@ impl TxPoolService {
}
})
.await;
let duration = instant.elapsed();
debug!("[Perf] pre-check: {:?}", duration);
(ret, snapshot)
}

Expand Down Expand Up @@ -1125,6 +1131,7 @@ fn _submit_entry(
) -> Result<HashSet<TxEntry>, Reject> {
let tx_hash = entry.transaction().hash();
debug!("submit_entry {:?} {}", status, tx_hash);
let start = Instant::now();
let (succ, evicts) = match status {
TxStatus::Fresh => tx_pool.add_pending(entry.clone())?,
TxStatus::Gap => tx_pool.add_gap(entry.clone())?,
Expand All @@ -1137,6 +1144,8 @@ fn _submit_entry(
TxStatus::Proposed => callbacks.call_proposed(&entry),
}
}
let duration = start.elapsed();
debug!("[Perf] Time elapsed in _submit_entry is: {:?}", duration);
Ok(evicts)
}

Expand All @@ -1149,6 +1158,7 @@ fn _update_tx_pool_for_reorg(
callbacks: &Callbacks,
mine_mode: bool,
) {
let instant = Instant::now();
tx_pool.snapshot = Arc::clone(&snapshot);

// NOTE: `remove_by_detached_proposal` will try to re-put the given expired/detached proposals into
Expand All @@ -1166,6 +1176,7 @@ fn _update_tx_pool_for_reorg(
let mut proposals = Vec::new();
let mut gaps = Vec::new();

let instant = Instant::now();
for entry in tx_pool.pool_map.entries.get_by_status(&Status::Gap) {
let short_id = entry.inner.proposal_short_id();
if snapshot.proposals().contains_proposed(&short_id) {
Expand All @@ -1182,7 +1193,16 @@ fn _update_tx_pool_for_reorg(
gaps.push(elem);
}
}
let duration = instant.elapsed();
debug!("[Perf] reorg duration: {:?}", duration);
debug!(
"[Perf] reorg size: tx_pool size {:?} snapshot gap: {:?}, proposed: {:?}",
tx_pool.pool_map.entries.len(),
snapshot.proposals().gap().len(),
snapshot.proposals().set().len(),
);

let instant = Instant::now();
for (id, entry) in proposals {
debug!("begin to proposed: {:x}", id);
if let Err(e) = tx_pool.proposed_rtx(&id) {
Expand All @@ -1208,13 +1228,17 @@ fn _update_tx_pool_for_reorg(
callbacks.call_reject(tx_pool, &entry, e.clone());
}
}
let duration = instant.elapsed();
debug!("[Perf] reorg setting: {:?}", duration);
}

// Remove expired transaction from pending
tx_pool.remove_expired(callbacks);

// Remove transactions from the pool until its size <= size_limit.
tx_pool.limit_size(callbacks);
let duration = instant.elapsed();
debug!("[Perf] reorg _update_tx_pool_for_reorg: {:?}", duration);
}

pub fn all_inputs_is_unknown(snapshot: &Snapshot, tx: &TransactionView) -> bool {
Expand Down
Loading

0 comments on commit 08e50f2

Please sign in to comment.