IBD sync: recover sampled window (#598)
* Search for the full consecutive window covering all sampled blocks (see the sketch after this list)

* unrelated: reachability docs

* Avoid searching for the cover if window is not sampled

* cleanup WindowType::FullDifficultyWindow

* rename

* Fix cache origin issue and simplify cache management

* prevent access to block window cache get w/o specifying origin

* Suggested refactor for determining lock time type prior to the call (to avoid leaking logic out of the TransactionValidator)

* long due renames

* renames and comments

* move window "cover" logic into WindowManager

* unrelated technical debt: make sure to run par_iter within the context of an existing thread pool (avoid creating a global thread pool if possible)
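
A toy reading of the first bullet (see the pruning-proof hunk below for the real call site, WindowManager::consecutive_cover_for_window): when the difficulty window is sampled it references only a sparse subset of blocks, so the sync logic recovers the full consecutive segment spanning all of them. A minimal self-contained sketch of that idea, with invented names and a plain integer chain standing in for block hashes:

// Illustrative sketch only; names and types here are not the crate's API.
fn consecutive_cover(chain: &[u64], sampled: &[u64]) -> Vec<u64> {
    // `chain` is an ordered chain segment of block ids; `sampled` is the sparse
    // subset referenced by a sampled window. The cover is the contiguous slice
    // from the lowest sampled block up to the highest one.
    let lo = chain.iter().position(|b| sampled.contains(b)).expect("sampled blocks must lie on the chain");
    let hi = chain.iter().rposition(|b| sampled.contains(b)).expect("sampled blocks must lie on the chain");
    chain[lo..=hi].to_vec()
}

fn main() {
    let chain: Vec<u64> = (0..20).collect();
    let sampled: Vec<u64> = vec![3, 7, 11, 15];
    assert_eq!(consecutive_cover(&chain, &sampled), (3u64..=15).collect::<Vec<_>>());
}
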
michaelsutton authored Nov 28, 2024
1 parent 73159f7 commit c63dfc0
Showing 18 changed files with 419 additions and 180 deletions.
2 changes: 1 addition & 1 deletion consensus/benches/check_scripts.rs
@@ -1,6 +1,6 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion, SamplingMode};
use kaspa_addresses::{Address, Prefix, Version};
use kaspa_consensus::processes::transaction_validator::transaction_validator_populated::{
use kaspa_consensus::processes::transaction_validator::tx_validation_in_utxo_context::{
check_scripts_par_iter, check_scripts_par_iter_pool, check_scripts_sequential,
};
use kaspa_consensus_core::hashing::sighash::{calc_schnorr_signature_hash, SigHashReusedValuesUnsync};
9 changes: 5 additions & 4 deletions consensus/src/consensus/mod.rs
@@ -767,12 +767,13 @@ impl ConsensusApi for Consensus {
let mut pruning_utxoset_write = self.pruning_utxoset_stores.write();
pruning_utxoset_write.utxo_set.write_many(utxoset_chunk).unwrap();

// Parallelize processing
let inner_multiset =
// Parallelize processing using the context of an existing thread pool.
let inner_multiset = self.virtual_processor.install(|| {
utxoset_chunk.par_iter().map(|(outpoint, entry)| MuHash::from_utxo(outpoint, entry)).reduce(MuHash::new, |mut a, b| {
a.combine(&b);
a
});
})
});

current_multiset.combine(&inner_multiset);
}
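
For context on the change above: rayon's install runs a closure inside a specific thread pool, so the nested par_iter executes on that pool instead of falling back to (and possibly creating) the global one. A minimal standalone sketch of the same pattern; pool size and data are arbitrary:

// Illustrative sketch only; unrelated to the kaspa types above.
use rayon::prelude::*;

fn main() {
    // Work submitted via `install` runs on this dedicated pool rather than the
    // implicit global rayon pool.
    let pool = rayon::ThreadPoolBuilder::new().num_threads(4).build().unwrap();

    let data: Vec<u64> = (1..=1000).collect();
    let parallel_sum = pool.install(|| data.par_iter().map(|x| x * x).sum::<u64>());
    assert_eq!(parallel_sum, data.iter().map(|x| x * x).sum::<u64>());
}
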
@@ -979,7 +980,7 @@ impl ConsensusApi for Consensus {
Ok(self
.services
.window_manager
.block_window(&self.ghostdag_store.get_data(hash).unwrap(), WindowType::SampledDifficultyWindow)
.block_window(&self.ghostdag_store.get_data(hash).unwrap(), WindowType::DifficultyWindow)
.unwrap()
.deref()
.iter()
21 changes: 21 additions & 0 deletions consensus/src/model/services/reachability.rs
@@ -9,14 +9,35 @@ use crate::processes::reachability::{inquirer, Result};
use kaspa_hashes::Hash;

pub trait ReachabilityService {
/// Checks if `this` block is a chain ancestor of `queried` block (i.e., `this ∈ chain(queried) ∪ {queried}`).
/// Note that we use the graph theory convention here which defines that a block is also an ancestor of itself.
fn is_chain_ancestor_of(&self, this: Hash, queried: Hash) -> bool;

/// Result version of [`is_dag_ancestor_of`] (avoids unwrapping internally)
fn is_dag_ancestor_of_result(&self, this: Hash, queried: Hash) -> Result<bool>;

/// Returns true if `this` is a DAG ancestor of `queried` (i.e., `queried ∈ future(this) ∪ {this}`).
/// Note: this method will return true if `this == queried`.
/// The complexity of this method is `O(log(|future_covering_set(this)|))`
fn is_dag_ancestor_of(&self, this: Hash, queried: Hash) -> bool;

/// Checks if `this` is a DAG ancestor of any of the blocks in `queried`. See [`is_dag_ancestor_of`] as well.
fn is_dag_ancestor_of_any(&self, this: Hash, queried: &mut impl Iterator<Item = Hash>) -> bool;

/// Checks if any of the blocks in `list` is a DAG ancestor of `queried`. See [`is_dag_ancestor_of`] as well.
fn is_any_dag_ancestor(&self, list: &mut impl Iterator<Item = Hash>, queried: Hash) -> bool;

/// Result version of [`is_any_dag_ancestor`] (avoids unwrapping internally)
fn is_any_dag_ancestor_result(&self, list: &mut impl Iterator<Item = Hash>, queried: Hash) -> Result<bool>;

/// Finds the tree child of `ancestor` which is also a chain ancestor of `descendant`.
/// (A "tree child of X" is a block whose chain parent is X)
fn get_next_chain_ancestor(&self, descendant: Hash, ancestor: Hash) -> Hash;

/// Returns the chain parent of `this`
fn get_chain_parent(&self, this: Hash) -> Hash;

/// Checks whether `this` has reachability data
fn has_reachability_data(&self, this: Hash) -> bool;
}
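
A short usage sketch for the trait documented above; the import path is an assumption based on the file location, and only methods shown in this hunk are used:

// Illustrative sketch only.
use kaspa_consensus::model::services::reachability::ReachabilityService; // assumed module path
use kaspa_hashes::Hash;

/// Returns true iff `block` lies on the selected chain leading to `sink`.
/// Per the docs above, chain ancestry is reflexive, so `block == sink` also returns true.
pub fn is_on_chain_to(service: &impl ReachabilityService, block: Hash, sink: Hash) -> bool {
    service.is_chain_ancestor_of(block, sink)
}
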

44 changes: 37 additions & 7 deletions consensus/src/model/stores/block_window_cache.rs
@@ -1,6 +1,6 @@
use crate::processes::ghostdag::ordering::SortableBlock;
use kaspa_consensus_core::BlockHasher;
use kaspa_database::prelude::Cache;
use kaspa_database::prelude::{Cache, CachePolicy};
use kaspa_hashes::Hash;
use kaspa_utils::mem_size::MemSizeEstimator;
use std::{
@@ -10,7 +10,7 @@ use std::{
sync::Arc,
};

#[derive(Clone, Copy)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WindowOrigin {
Full,
Sampled,
@@ -54,16 +54,46 @@ impl DerefMut for BlockWindowHeap {
}
}

/// A newtype wrapper over [`Cache`] meant to prevent erroneous reads of windows from different origins
#[derive(Clone)]
pub struct BlockWindowCacheStore {
inner: Cache<Hash, Arc<BlockWindowHeap>, BlockHasher>,
}

impl BlockWindowCacheStore {
pub fn new(policy: CachePolicy) -> Self {
Self { inner: Cache::new(policy) }
}

pub fn contains_key(&self, key: &Hash) -> bool {
self.inner.contains_key(key)
}

pub fn remove(&self, key: &Hash) -> Option<Arc<BlockWindowHeap>> {
self.inner.remove(key)
}
}

/// Reader API for `BlockWindowCacheStore`.
pub trait BlockWindowCacheReader {
fn get(&self, hash: &Hash) -> Option<Arc<BlockWindowHeap>>;
/// Get the cache entry for this hash, conditioned on *it matching the provided origin*.
/// We require the origin to be provided in order to prevent reader errors.
fn get(&self, hash: &Hash, origin: WindowOrigin) -> Option<Arc<BlockWindowHeap>>;
}

pub type BlockWindowCacheStore = Cache<Hash, Arc<BlockWindowHeap>, BlockHasher>;

impl BlockWindowCacheReader for BlockWindowCacheStore {
#[inline(always)]
fn get(&self, hash: &Hash) -> Option<Arc<BlockWindowHeap>> {
self.get(hash)
fn get(&self, hash: &Hash, origin: WindowOrigin) -> Option<Arc<BlockWindowHeap>> {
self.inner.get(hash).and_then(|win| if win.origin() == origin { Some(win) } else { None })
}
}

pub trait BlockWindowCacheWriter {
fn insert(&self, hash: Hash, window: Arc<BlockWindowHeap>);
}

impl BlockWindowCacheWriter for BlockWindowCacheStore {
fn insert(&self, hash: Hash, window: Arc<BlockWindowHeap>) {
self.inner.insert(hash, window);
}
}
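
A hedged sketch of how a caller consumes the new reader API (import path assumed from the file location): an entry cached under a different origin is treated as a miss rather than returned.

// Illustrative sketch only.
use std::sync::Arc;
use kaspa_consensus::model::stores::block_window_cache::{
    BlockWindowCacheReader, BlockWindowCacheStore, BlockWindowHeap, WindowOrigin, // assumed module path
};
use kaspa_hashes::Hash;

/// Fetches a cached window only if it was built as a *sampled* window; a full-origin
/// entry stored under the same hash yields `None` instead of a wrong read.
pub fn cached_sampled_window(cache: &BlockWindowCacheStore, hash: Hash) -> Option<Arc<BlockWindowHeap>> {
    cache.get(&hash, WindowOrigin::Sampled)
}
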
30 changes: 18 additions & 12 deletions consensus/src/pipeline/body_processor/body_validation_in_context.rs
@@ -1,12 +1,19 @@
use super::BlockBodyProcessor;
use crate::{
errors::{BlockProcessResult, RuleError},
model::stores::{ghostdag::GhostdagStoreReader, statuses::StatusesStoreReader},
processes::window::WindowManager,
model::stores::statuses::StatusesStoreReader,
processes::{
transaction_validator::{
tx_validation_in_header_context::{LockTimeArg, LockTimeType},
TransactionValidator,
},
window::WindowManager,
},
};
use kaspa_consensus_core::block::Block;
use kaspa_database::prelude::StoreResultExtensions;
use kaspa_hashes::Hash;
use once_cell::unsync::Lazy;
use std::sync::Arc;

impl BlockBodyProcessor {
@@ -17,18 +17,24 @@ impl BlockBodyProcessor {
}

fn check_block_transactions_in_context(self: &Arc<Self>, block: &Block) -> BlockProcessResult<()> {
// Note: This is somewhat expensive during ibd, as it incurs cache misses.

let pmt = {
let (pmt, pmt_window) = self.window_manager.calc_past_median_time(&self.ghostdag_store.get_data(block.hash()).unwrap())?;
if !self.block_window_cache_for_past_median_time.contains_key(&block.hash()) {
self.block_window_cache_for_past_median_time.insert(block.hash(), pmt_window);
};
pmt
};
// Use lazy evaluation to avoid unnecessary work, as most of the time we expect the txs not to have lock time.
let lazy_pmt_res = Lazy::new(|| self.window_manager.calc_past_median_time_for_known_hash(block.hash()));

for tx in block.transactions.iter() {
if let Err(e) = self.transaction_validator.utxo_free_tx_validation(tx, block.header.daa_score, pmt) {
let lock_time_arg = match TransactionValidator::get_lock_time_type(tx) {
LockTimeType::Finalized => LockTimeArg::Finalized,
LockTimeType::DaaScore => LockTimeArg::DaaScore(block.header.daa_score),
// We only evaluate the pmt calculation when actually needed
LockTimeType::Time => LockTimeArg::MedianTime((*lazy_pmt_res).clone()?),
};
if let Err(e) = self.transaction_validator.validate_tx_in_header_context(tx, block.header.daa_score, lock_time_arg) {
return Err(RuleError::TxInContextFailed(tx.id(), e));
};
}
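
The lazy-PMT refactor above defers the expensive past-median-time window calculation until a transaction actually carries a time-based lock time. A simplified standalone stand-in for that control flow using the same once_cell::unsync::Lazy primitive; the enum here is a local stand-in mirroring the variants used above, and the values are illustrative:

// Illustrative sketch only.
use once_cell::unsync::Lazy;

#[derive(Clone, Copy)]
enum LockTimeType { Finalized, DaaScore, Time }

fn main() {
    // Stand-in for the expensive past-median-time window computation.
    let lazy_pmt = Lazy::new(|| {
        println!("computing past median time (runs at most once, and only if needed)");
        1_700_000_000u64
    });

    // Mirrors the dispatch above: only the `Time` arm forces the lazy value.
    for lock_time_type in [LockTimeType::Finalized, LockTimeType::DaaScore, LockTimeType::Time] {
        match lock_time_type {
            LockTimeType::Finalized => { /* nothing time-related to check */ }
            LockTimeType::DaaScore => { let _daa_score = 12_345u64; }
            LockTimeType::Time => { let _pmt = *lazy_pmt; } // first deref runs the closure
        }
    }
}
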
4 changes: 0 additions & 4 deletions consensus/src/pipeline/body_processor/processor.rs
@@ -8,7 +8,6 @@ use crate::{
services::reachability::MTReachabilityService,
stores::{
block_transactions::DbBlockTransactionsStore,
block_window_cache::BlockWindowCacheStore,
ghostdag::DbGhostdagStore,
headers::DbHeadersStore,
reachability::DbReachabilityStore,
@@ -67,7 +66,6 @@ pub struct BlockBodyProcessor {
pub(super) headers_store: Arc<DbHeadersStore>,
pub(super) block_transactions_store: Arc<DbBlockTransactionsStore>,
pub(super) body_tips_store: Arc<RwLock<DbTipsStore>>,
pub(super) block_window_cache_for_past_median_time: Arc<BlockWindowCacheStore>,

// Managers and services
pub(super) reachability_service: MTReachabilityService<DbReachabilityStore>,
@@ -93,7 +91,6 @@ pub struct BlockBodyProcessor {
}

impl BlockBodyProcessor {
#[allow(clippy::too_many_arguments)]
pub fn new(
receiver: Receiver<BlockProcessingMessage>,
sender: Sender<VirtualStateProcessingMessage>,
@@ -122,7 +119,6 @@ impl BlockBodyProcessor {
headers_store: storage.headers_store.clone(),
block_transactions_store: storage.block_transactions_store.clone(),
body_tips_store: storage.body_tips_store.clone(),
block_window_cache_for_past_median_time: storage.block_window_cache_for_past_median_time.clone(),

reachability_service: services.reachability_service.clone(),
coinbase_manager: services.coinbase_manager.clone(),
2 changes: 1 addition & 1 deletion consensus/src/pipeline/header_processor/processor.rs
@@ -10,7 +10,7 @@ use crate::{
model::{
services::reachability::MTReachabilityService,
stores::{
block_window_cache::{BlockWindowCacheStore, BlockWindowHeap},
block_window_cache::{BlockWindowCacheStore, BlockWindowCacheWriter, BlockWindowHeap},
daa::DbDaaStore,
depth::DbDepthStore,
ghostdag::{DbGhostdagStore, GhostdagData, GhostdagStoreReader},
25 changes: 21 additions & 4 deletions consensus/src/pipeline/virtual_processor/processor.rs
@@ -16,7 +16,7 @@ use crate::{
stores::{
acceptance_data::{AcceptanceDataStoreReader, DbAcceptanceDataStore},
block_transactions::{BlockTransactionsStoreReader, DbBlockTransactionsStore},
block_window_cache::BlockWindowCacheStore,
block_window_cache::{BlockWindowCacheStore, BlockWindowCacheWriter},
daa::DbDaaStore,
depth::{DbDepthStore, DepthStoreReader},
ghostdag::{DbGhostdagStore, GhostdagData, GhostdagStoreReader},
@@ -43,7 +43,7 @@ use crate::{
processes::{
coinbase::CoinbaseManager,
ghostdag::ordering::SortableBlock,
transaction_validator::{errors::TxResult, transaction_validator_populated::TxValidationFlags, TransactionValidator},
transaction_validator::{errors::TxResult, tx_validation_in_utxo_context::TxValidationFlags, TransactionValidator},
window::WindowManager,
},
};
@@ -807,7 +807,11 @@
args: &TransactionValidationArgs,
) -> TxResult<()> {
self.transaction_validator.validate_tx_in_isolation(&mutable_tx.tx)?;
self.transaction_validator.utxo_free_tx_validation(&mutable_tx.tx, virtual_daa_score, virtual_past_median_time)?;
self.transaction_validator.validate_tx_in_header_context_with_args(
&mutable_tx.tx,
virtual_daa_score,
virtual_past_median_time,
)?;
self.validate_mempool_transaction_in_utxo_context(mutable_tx, virtual_utxo_view, virtual_daa_score, args)?;
Ok(())
}
@@ -896,7 +900,11 @@ impl VirtualStateProcessor {
// No need to validate the transaction in isolation since we rely on the mining manager to submit transactions
// which were previously validated through `validate_mempool_transaction_and_populate`, hence we only perform
// in-context validations
self.transaction_validator.utxo_free_tx_validation(tx, virtual_state.daa_score, virtual_state.past_median_time)?;
self.transaction_validator.validate_tx_in_header_context_with_args(
tx,
virtual_state.daa_score,
virtual_state.past_median_time,
)?;
let ValidatedTransaction { calculated_fee, .. } =
self.validate_transaction_in_utxo_context(tx, utxo_view, virtual_state.daa_score, TxValidationFlags::Full)?;
Ok(calculated_fee)
@@ -1202,6 +1210,15 @@ impl VirtualStateProcessor {
true
}
}

/// Executes `op` within the thread pool associated with this processor.
pub fn install<OP, R>(&self, op: OP) -> R
where
OP: FnOnce() -> R + Send,
R: Send,
{
self.thread_pool.install(op)
}
}

enum MergesetIncreaseResult {
@@ -7,7 +7,7 @@ use crate::{
model::stores::{block_transactions::BlockTransactionsStoreReader, daa::DaaStoreReader, ghostdag::GhostdagData},
processes::transaction_validator::{
errors::{TxResult, TxRuleError},
transaction_validator_populated::TxValidationFlags,
tx_validation_in_utxo_context::TxValidationFlags,
},
};
use kaspa_consensus_core::{
10 changes: 4 additions & 6 deletions consensus/src/processes/pruning_proof/mod.rs
@@ -7,7 +7,6 @@ use std::{
hash_map::Entry::{self},
VecDeque,
},
ops::Deref,
sync::{atomic::AtomicBool, Arc},
};

@@ -279,12 +278,11 @@ impl PruningProofManager {
// PRUNE SAFETY: called either via consensus under the prune guard or by the pruning processor (hence no pruning in parallel)

for anticone_block in anticone.iter().copied() {
let window = self
.window_manager
.block_window(&self.ghostdag_store.get_data(anticone_block).unwrap(), WindowType::FullDifficultyWindow)
.unwrap();
let ghostdag = self.ghostdag_store.get_data(anticone_block).unwrap();
let window = self.window_manager.block_window(&ghostdag, WindowType::DifficultyWindow).unwrap();
let cover = self.window_manager.consecutive_cover_for_window(ghostdag, &window);

for hash in window.deref().iter().map(|block| block.0.hash) {
for hash in cover {
if let Entry::Vacant(e) = daa_window_blocks.entry(hash) {
e.insert(TrustedHeader {
header: self.headers_store.get_header(hash).unwrap(),
12 changes: 6 additions & 6 deletions consensus/src/processes/reachability/inquirer.rs
@@ -156,21 +156,21 @@ pub fn hint_virtual_selected_parent(store: &mut (impl ReachabilityStore + ?Sized
)
}

/// Checks if the `this` block is a strict chain ancestor of the `queried` block (aka `this ∈ chain(queried)`).
/// Checks if the `this` block is a strict chain ancestor of the `queried` block (i.e., `this ∈ chain(queried)`).
/// Note that this results in `false` if `this == queried`
pub fn is_strict_chain_ancestor_of(store: &(impl ReachabilityStoreReader + ?Sized), this: Hash, queried: Hash) -> Result<bool> {
Ok(store.get_interval(this)?.strictly_contains(store.get_interval(queried)?))
}

/// Checks if `this` block is a chain ancestor of `queried` block (aka `this ∈ chain(queried) ∪ {queried}`).
/// Checks if `this` block is a chain ancestor of `queried` block (i.e., `this ∈ chain(queried) ∪ {queried}`).
/// Note that we use the graph theory convention here which defines that a block is also an ancestor of itself.
pub fn is_chain_ancestor_of(store: &(impl ReachabilityStoreReader + ?Sized), this: Hash, queried: Hash) -> Result<bool> {
Ok(store.get_interval(this)?.contains(store.get_interval(queried)?))
}

/// Returns true if `this` is a DAG ancestor of `queried` (aka `queried ∈ future(this) ∪ {this}`).
/// Returns true if `this` is a DAG ancestor of `queried` (i.e., `queried ∈ future(this) ∪ {this}`).
/// Note: this method will return true if `this == queried`.
/// The complexity of this method is O(log(|future_covering_set(this)|))
/// The complexity of this method is `O(log(|future_covering_set(this)|))`
pub fn is_dag_ancestor_of(store: &(impl ReachabilityStoreReader + ?Sized), this: Hash, queried: Hash) -> Result<bool> {
// First, check if `this` is a chain ancestor of queried
if is_chain_ancestor_of(store, this, queried)? {
@@ -184,7 +184,7 @@ pub fn is_dag_ancestor_of(store: &(impl ReachabilityStoreReader + ?Sized), this:
}
}

/// Finds the child of `ancestor` which is also a chain ancestor of `descendant`.
/// Finds the tree child of `ancestor` which is also a chain ancestor of `descendant`.
pub fn get_next_chain_ancestor(store: &(impl ReachabilityStoreReader + ?Sized), descendant: Hash, ancestor: Hash) -> Result<Hash> {
if descendant == ancestor {
// The next ancestor does not exist
@@ -200,7 +200,7 @@ pub fn get_next_chain_ancestor(store: &(impl ReachabilityStoreReader + ?Sized),
}

/// Note: it is important to keep the unchecked version for internal module use,
/// since in some scenarios during reindexing `descendant` might have a modified
/// since in some scenarios during reindexing `ancestor` might have a modified
/// interval which was not propagated yet.
pub(super) fn get_next_chain_ancestor_unchecked(
store: &(impl ReachabilityStoreReader + ?Sized),
6 changes: 3 additions & 3 deletions consensus/src/processes/transaction_validator/mod.rs
@@ -1,7 +1,7 @@
pub mod errors;
pub mod transaction_validator_populated;
mod tx_validation_in_isolation;
pub mod tx_validation_not_utxo_related;
pub mod tx_validation_in_header_context;
pub mod tx_validation_in_isolation;
pub mod tx_validation_in_utxo_context;
use std::sync::Arc;

use crate::model::stores::ghostdag;