diff --git a/consensus/src/consensus/mod.rs b/consensus/src/consensus/mod.rs index 2382e9e5d..99719d4ac 100644 --- a/consensus/src/consensus/mod.rs +++ b/consensus/src/consensus/mod.rs @@ -767,12 +767,13 @@ impl ConsensusApi for Consensus { let mut pruning_utxoset_write = self.pruning_utxoset_stores.write(); pruning_utxoset_write.utxo_set.write_many(utxoset_chunk).unwrap(); - // Parallelize processing - let inner_multiset = + // Parallelize processing using the context of an existing thread pool. + let inner_multiset = self.virtual_processor.install(|| { utxoset_chunk.par_iter().map(|(outpoint, entry)| MuHash::from_utxo(outpoint, entry)).reduce(MuHash::new, |mut a, b| { a.combine(&b); a - }); + }) + }); current_multiset.combine(&inner_multiset); } diff --git a/consensus/src/pipeline/virtual_processor/processor.rs b/consensus/src/pipeline/virtual_processor/processor.rs index c217b34f5..a8e1f7f2f 100644 --- a/consensus/src/pipeline/virtual_processor/processor.rs +++ b/consensus/src/pipeline/virtual_processor/processor.rs @@ -1210,6 +1210,15 @@ impl VirtualStateProcessor { true } } + + /// Executes `op` within the thread pool associated with this processor. + pub fn install(&self, op: OP) -> R + where + OP: FnOnce() -> R + Send, + R: Send, + { + self.thread_pool.install(op) + } } enum MergesetIncreaseResult {