Validate block_body_in_isolation txs in parallel #603

Open · wants to merge 8 commits into base: master
4 changes: 2 additions & 2 deletions consensus/core/src/errors/block.rs
@@ -100,8 +100,8 @@ pub enum RuleError {
#[error("transaction in isolation validation failed for tx {0}: {1}")]
TxInIsolationValidationFailed(TransactionId, TxRuleError),

#[error("block exceeded mass limit of {0}")]
ExceedsMassLimit(u64),
#[error("block {0} with mass {0} exceeds limit of {1}")]
ExceedsMassLimit(Hash, u64, u64),

#[error("transaction {0} has mass field of {1} but mass should be at least {2}")]
MassFieldTooLow(TransactionId, u64, u64),
225 changes: 149 additions & 76 deletions consensus/src/pipeline/body_processor/body_validation_in_isolation.rs
@@ -1,22 +1,95 @@
use std::{collections::HashSet, sync::Arc};
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
ops::Add,
sync::Arc,
};

use super::BlockBodyProcessor;
use crate::errors::{BlockProcessResult, RuleError};
use kaspa_consensus_core::{block::Block, merkle::calc_hash_merkle_root, tx::TransactionOutpoint};
use kaspa_consensus_core::mass::MassCalculator;
use kaspa_consensus_core::{
block::Block,
merkle::calc_hash_merkle_root,
tx::{Transaction, TransactionId, TransactionIndexType, TransactionInput, TransactionOutpoint, COINBASE_TRANSACTION_INDEX},
};
use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};

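/// Immutable per-block data gathered in a single sequential pass, so that the
/// per-transaction checks below can run in parallel without shared mutable state.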
struct BlockBodyValidationContext {
pub max_block_mass: u64,
pub storage_mass_activation: bool,
pub total_calculated_mass: u64,
pub calculated_mass: Vec<u64>,
pub existing_outpoints_count: HashMap<TransactionOutpoint, usize>,
pub number_of_input_outpoints: usize,
pub block_created_outpoints: HashSet<TransactionOutpoint>,
pub transaction_ids_count: HashMap<TransactionId, usize>,
}

impl BlockBodyValidationContext {
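/// Walks the block's transactions once, counting transaction ids and input outpoints,
/// collecting the outpoints created by the block, and computing each tx's compute mass.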
fn new(block: &Block, max_block_mass: u64, storage_mass_activation: bool, mass_calculator: Arc<MassCalculator>) -> Arc<Self> {
let mut transaction_ids_count = HashMap::<TransactionId, usize>::with_capacity(block.transactions.len());
let mut existing_outpoints_count = HashMap::<TransactionOutpoint, usize>::new();
let mut block_created_outpoints = HashSet::new();
let mut calculated_mass = Vec::<u64>::with_capacity(block.transactions.len());
let mut total_calculated_mass = 0u64;
let mut number_of_input_outpoints = 0;
for tx in block.transactions.iter() {
let tx_id = tx.id();
match transaction_ids_count.entry(tx_id) {
Entry::Occupied(mut entry) => {
entry.insert(entry.get().add(1));
}
Entry::Vacant(entry) => {
entry.insert(1);
}
};
number_of_input_outpoints += tx.inputs.len();
for input in tx.inputs.iter() {
match existing_outpoints_count.entry(input.previous_outpoint) {
Entry::Occupied(mut entry) => {
entry.insert(entry.get().add(1));
}
Entry::Vacant(entry) => {
entry.insert(1);
}
};
}
block_created_outpoints.extend(
(0..tx.outputs.len()).map(|index| TransactionOutpoint { transaction_id: tx_id, index: index as TransactionIndexType }),
);
let calculated_tx_mass = mass_calculator.calc_tx_compute_mass(tx);
calculated_mass.push(calculated_tx_mass);
total_calculated_mass = total_calculated_mass.saturating_add(calculated_tx_mass);
}
Arc::new(Self {
max_block_mass,
storage_mass_activation,
number_of_input_outpoints,
total_calculated_mass,
calculated_mass,
existing_outpoints_count,
block_created_outpoints,
transaction_ids_count,
})
}
}

impl BlockBodyProcessor {
pub fn validate_body_in_isolation(self: &Arc<Self>, block: &Block) -> BlockProcessResult<u64> {
let storage_mass_activated = self.storage_mass_activation.is_active(block.header.daa_score);

Self::check_has_transactions(block)?;
Self::check_hash_merkle_root(block, storage_mass_activated)?;
Self::check_only_one_coinbase(block)?;
self.check_transactions_in_isolation(block)?;
let mass = self.check_block_mass(block, storage_mass_activated)?;
self.check_duplicate_transactions(block)?;
self.check_block_double_spends(block)?;
self.check_no_chained_transactions(block)?;
Ok(mass)
let bbvc = &BlockBodyValidationContext::new(
block,
self.max_block_mass,
storage_mass_activated,
Arc::new(self.mass_calculator.clone()),
);
Self::check_duplicate_transactions(bbvc, block)?;
Self::check_input_double_spends(bbvc)?;
Self::check_transactions_full(self, bbvc, block)?;
Self::check_block_mass(bbvc, block)?;
Ok(bbvc.total_calculated_mass)
}

fn check_has_transactions(block: &Block) -> BlockProcessResult<()> {
@@ -28,6 +101,33 @@ impl BlockBodyProcessor {
Ok(())
}

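/// Runs the per-transaction checks in parallel on the processor's thread pool:
/// the context-based checks plus the stateless in-isolation validation.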
fn check_transactions_full(&self, bbvc: &Arc<BlockBodyValidationContext>, block: &Block) -> BlockProcessResult<()> {
self.thread_pool.install(|| {
block.transactions.par_iter().enumerate().try_for_each(|(index, tx)| {
Self::validate_transaction_with_context(&(bbvc.clone()), tx, index as TransactionIndexType)?;
self.validate_transaction_in_isolation(tx)
// TODO: the tx hash may be cached from this point onward. Consider caching it here.
// i.e. something like: tx.finalize_hash(bbvc.storage_mass_activation)
// and then retrieve via tx.hash() in the future.
})
})
}

fn validate_transaction_in_isolation(&self, tx: &Transaction) -> BlockProcessResult<()> {
self.transaction_validator.validate_tx_in_isolation(tx).map_err(|err| RuleError::TxInIsolationValidationFailed(tx.id(), err))
}

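/// Checks a single transaction against the pre-computed block context; reads only
/// immutable state, so it is safe to call concurrently for different transactions.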
fn validate_transaction_with_context(
bbvc: &Arc<BlockBodyValidationContext>,
tx: &Transaction,
index: TransactionIndexType,
) -> BlockProcessResult<()> {
Self::check_coinbase(tx, index)?;
Self::check_transaction_mass(bbvc, tx, index)?;
Self::check_transaction_inputs_with_context(bbvc, tx)?;
Ok(())
}

fn check_hash_merkle_root(block: &Block, storage_mass_activated: bool) -> BlockProcessResult<()> {
let calculated = calc_hash_merkle_root(block.transactions.iter(), storage_mass_activated);
if calculated != block.header.hash_merkle_root {
@@ -36,91 +136,64 @@ impl BlockBodyProcessor {
Ok(())
}

fn check_only_one_coinbase(block: &Block) -> BlockProcessResult<()> {
if !block.transactions[0].is_coinbase() {
return Err(RuleError::FirstTxNotCoinbase);
}

if let Some(i) = block.transactions[1..].iter().position(|tx| tx.is_coinbase()) {
return Err(RuleError::MultipleCoinbases(i));
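/// Per-index coinbase check: the first transaction must be a coinbase and no other
/// transaction may be one (replaces the sequential check_only_one_coinbase).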
fn check_coinbase(tx: &Transaction, index: TransactionIndexType) -> BlockProcessResult<()> {
if index as usize == COINBASE_TRANSACTION_INDEX {
if !tx.is_coinbase() {
Err(RuleError::FirstTxNotCoinbase)
} else {
Ok(())
}
} else if tx.is_coinbase() {
Err(RuleError::MultipleCoinbases(index as usize))
} else {
Ok(())
}
}

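/// Rejects the block if the aggregated mass computed during context construction
/// exceeds the configured block mass limit.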
fn check_block_mass(bbvc: &Arc<BlockBodyValidationContext>, block: &Block) -> BlockProcessResult<()> {
if bbvc.total_calculated_mass > bbvc.max_block_mass {
return Err(RuleError::ExceedsMassLimit(block.hash(), bbvc.total_calculated_mass, bbvc.max_block_mass));
};
Ok(())
}

fn check_transactions_in_isolation(self: &Arc<Self>, block: &Block) -> BlockProcessResult<()> {
for tx in block.transactions.iter() {
if let Err(e) = self.transaction_validator.validate_tx_in_isolation(tx) {
return Err(RuleError::TxInIsolationValidationFailed(tx.id(), e));
}
fn check_transaction_mass(
bbvc: &Arc<BlockBodyValidationContext>,
tx: &Transaction,
index: TransactionIndexType,
) -> BlockProcessResult<()> {
if bbvc.storage_mass_activation && tx.mass() < bbvc.calculated_mass[index as usize] {
return Err(RuleError::MassFieldTooLow(tx.id(), tx.mass(), bbvc.calculated_mass[index as usize]));
}
Ok(())
}

fn check_block_mass(self: &Arc<Self>, block: &Block, storage_mass_activated: bool) -> BlockProcessResult<u64> {
let mut total_mass: u64 = 0;
if storage_mass_activated {
for tx in block.transactions.iter() {
// This is only the compute part of the mass, the storage part cannot be computed here
let calculated_tx_compute_mass = self.mass_calculator.calc_tx_compute_mass(tx);
let committed_contextual_mass = tx.mass();
// We only check the lower-bound here, a precise check of the mass commitment
// is done when validating the tx in context
if committed_contextual_mass < calculated_tx_compute_mass {
return Err(RuleError::MassFieldTooLow(tx.id(), committed_contextual_mass, calculated_tx_compute_mass));
}
// Sum over the committed masses
total_mass = total_mass.saturating_add(committed_contextual_mass);
if total_mass > self.max_block_mass {
return Err(RuleError::ExceedsMassLimit(self.max_block_mass));
}
}
} else {
for tx in block.transactions.iter() {
let calculated_tx_mass = self.mass_calculator.calc_tx_compute_mass(tx);
total_mass = total_mass.saturating_add(calculated_tx_mass);
if total_mass > self.max_block_mass {
return Err(RuleError::ExceedsMassLimit(self.max_block_mass));
}
}
}
Ok(total_mass)
fn check_transaction_inputs_with_context(bbvc: &Arc<BlockBodyValidationContext>, tx: &Transaction) -> BlockProcessResult<()> {
tx.inputs.iter().try_for_each(|input| Self::check_no_chained_inputs(bbvc, input))
}

fn check_block_double_spends(self: &Arc<Self>, block: &Block) -> BlockProcessResult<()> {
let mut existing = HashSet::new();
for input in block.transactions.iter().flat_map(|tx| &tx.inputs) {
if !existing.insert(input.previous_outpoint) {
return Err(RuleError::DoubleSpendInSameBlock(input.previous_outpoint));
}
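/// A double spend within the block means two inputs share a previous outpoint, so the
/// number of distinct outpoints is strictly smaller than the total number of inputs.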
fn check_input_double_spends(bbvc: &Arc<BlockBodyValidationContext>) -> BlockProcessResult<()> {
if bbvc.existing_outpoints_count.len() < bbvc.number_of_input_outpoints {
return Err(RuleError::DoubleSpendInSameBlock(
*bbvc.existing_outpoints_count.iter().find(|(_, count)| **count > 1).unwrap().0,
));
}
Ok(())
}

fn check_no_chained_transactions(self: &Arc<Self>, block: &Block) -> BlockProcessResult<()> {
let mut block_created_outpoints = HashSet::new();
for tx in block.transactions.iter() {
for index in 0..tx.outputs.len() {
block_created_outpoints.insert(TransactionOutpoint { transaction_id: tx.id(), index: index as u32 });
}
}

for input in block.transactions.iter().flat_map(|tx| &tx.inputs) {
if block_created_outpoints.contains(&input.previous_outpoint) {
return Err(RuleError::ChainedTransaction(input.previous_outpoint));
}
fn check_no_chained_inputs(bbvc: &Arc<BlockBodyValidationContext>, input: &TransactionInput) -> BlockProcessResult<()> {
if bbvc.block_created_outpoints.contains(&input.previous_outpoint) {
return Err(RuleError::ChainedTransaction(input.previous_outpoint));
}
Ok(())
}

fn check_duplicate_transactions(self: &Arc<Self>, block: &Block) -> BlockProcessResult<()> {
let mut ids = HashSet::new();
for tx in block.transactions.iter() {
if !ids.insert(tx.id()) {
return Err(RuleError::DuplicateTransactions(tx.id()));
}
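/// Fewer distinct transaction ids than transactions implies a duplicate; report an id
/// that was counted more than once.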
fn check_duplicate_transactions(bbvc: &Arc<BlockBodyValidationContext>, block: &Block) -> BlockProcessResult<()> {
if bbvc.transaction_ids_count.len() < block.transactions.len() {
return Err(RuleError::DuplicateTransactions(
*bbvc.transaction_ids_count.iter().find(|(_, count)| **count > 1).unwrap().0,
));
}

Ok(())
}
}
@@ -415,7 +488,7 @@ mod tests {
txs[1].inputs[0].sig_op_count = 255;
txs[1].inputs[1].sig_op_count = 255;
block.header.hash_merkle_root = calc_hash_merkle_root(txs.iter());
assert_match!(body_processor.validate_body_in_isolation(&block.to_immutable()), Err(RuleError::ExceedsMassLimit(_)));
assert_match!(body_processor.validate_body_in_isolation(&block.to_immutable()), Err(RuleError::ExceedsMassLimit(_, _, _)));

let mut block = example_block.clone();
let txs = &mut block.transactions;