From 10e5086343841007cb11fd935b28dd82686ddd15 Mon Sep 17 00:00:00 2001 From: Alessandro Decina Date: Tue, 7 May 2024 01:26:55 +0800 Subject: [PATCH] program cache: reduce contention (#1192) * program cache: reduce contention Before this change we used to take the write lock to extract(). This means that even in the ideal case (all programs are already cached), the cache was contended by all batches and all operations were serialized. With this change we now take the write lock only when we store a new entry in the cache, and take the read lock to extract(). This means that in the common case where most/all programs are cached, there is no contention and all batches progress in parallel. This improves node replay perf by 20-25% on current mnb traffic. * ProgramCache: remove SecondLevel structure --- program-runtime/src/loaded_programs.rs | 86 +++++++++++--------------- svm/src/transaction_processor.rs | 73 +++++++++++----------- 2 files changed, 72 insertions(+), 87 deletions(-) diff --git a/program-runtime/src/loaded_programs.rs b/program-runtime/src/loaded_programs.rs index 5f2a88061d162c..d1a98b9149bc44 100644 --- a/program-runtime/src/loaded_programs.rs +++ b/program-runtime/src/loaded_programs.rs @@ -21,7 +21,7 @@ use { saturating_add_assign, }, std::{ - collections::HashMap, + collections::{hash_map::Entry, HashMap}, fmt::{Debug, Formatter}, sync::{ atomic::{AtomicU64, Ordering}, @@ -227,7 +227,7 @@ pub struct ProgramCacheStats { pub prunes_orphan: AtomicU64, /// a program got pruned because it was not recompiled for the next epoch pub prunes_environment: AtomicU64, - /// the [SecondLevel] was empty because all slot versions got pruned + /// a program had no entries because all slot versions got pruned pub empty_entries: AtomicU64, } @@ -578,18 +578,6 @@ impl LoadingTaskWaiter { } } -/// Contains all the program versions at a specific address. -#[derive(Debug, Default)] -struct SecondLevel { - /// List of all versions (across all forks) of a program sorted by the slot in which they were modified - slot_versions: Vec>, - /// `Some` if there is currently a cooperative loading task for this program address - /// - /// It is possible that multiple TX batches from different slots need different versions of a program. - /// However, that can only be figured out once a program is loaded and its deployment slot is known. - cooperative_loading_lock: Option<(Slot, std::thread::ThreadId)>, -} - /// This structure is the global cache of loaded, verified and compiled programs. /// /// It ... @@ -608,8 +596,18 @@ struct SecondLevel { pub struct ProgramCache { /// A two level index: /// - /// The first level is for the address at which programs are deployed and the second level for the slot (and thus also fork). - entries: HashMap, + /// - the first level is for the address at which programs are deployed + /// - the second level for the slot (and thus also fork), sorted by slot number. + entries: HashMap>>, + /// The entries that are getting loaded and have not yet finished loading. + /// + /// The key is the program address, the value is a tuple of the slot in which the program is + /// being loaded and the thread ID doing the load. + /// + /// It is possible that multiple TX batches from different slots need different versions of a + /// program. The deployment slot of a program is only known after load tho, + /// so all loads for a given program key are serialized. + loading_entries: Mutex>, /// The slot of the last rerooting pub latest_root_slot: Slot, /// The epoch of the last rerooting @@ -776,6 +774,7 @@ impl ProgramCache { pub fn new(root_slot: Slot, root_epoch: Epoch) -> Self { Self { entries: HashMap::new(), + loading_entries: Mutex::new(HashMap::new()), latest_root_slot: root_slot, latest_root_epoch: root_epoch, environments: ProgramRuntimeEnvironments::default(), @@ -819,7 +818,7 @@ impl ProgramCache { &entry.program, ProgramCacheEntryType::DelayVisibility )); - let slot_versions = &mut self.entries.entry(key).or_default().slot_versions; + let slot_versions = &mut self.entries.entry(key).or_default(); match slot_versions.binary_search_by(|at| { at.effective_slot .cmp(&entry.effective_slot) @@ -860,9 +859,7 @@ impl ProgramCache { pub fn prune_by_deployment_slot(&mut self, slot: Slot) { for second_level in self.entries.values_mut() { - second_level - .slot_versions - .retain(|entry| entry.deployment_slot != slot); + second_level.retain(|entry| entry.deployment_slot != slot); } self.remove_programs_with_no_entries(); } @@ -890,8 +887,7 @@ impl ProgramCache { // Remove entries un/re/deployed on orphan forks let mut first_ancestor_found = false; let mut first_ancestor_env = None; - second_level.slot_versions = second_level - .slot_versions + *second_level = second_level .iter() .rev() .filter(|entry| { @@ -940,7 +936,7 @@ impl ProgramCache { }) .cloned() .collect(); - second_level.slot_versions.reverse(); + second_level.reverse(); } self.remove_programs_with_no_entries(); debug_assert!(self.latest_root_slot <= new_root_slot); @@ -974,7 +970,7 @@ impl ProgramCache { /// Extracts a subset of the programs relevant to a transaction batch /// and returns which program accounts the accounts DB needs to load. pub fn extract( - &mut self, + &self, search_for: &mut Vec<(Pubkey, (ProgramCacheMatchCriteria, u64))>, loaded_programs_for_tx_batch: &mut ProgramCacheForTxBatch, is_first_round: bool, @@ -983,8 +979,8 @@ impl ProgramCache { let locked_fork_graph = self.fork_graph.as_ref().unwrap().read().unwrap(); let mut cooperative_loading_task = None; search_for.retain(|(key, (match_criteria, usage_count))| { - if let Some(second_level) = self.entries.get_mut(key) { - for entry in second_level.slot_versions.iter().rev() { + if let Some(second_level) = self.entries.get(key) { + for entry in second_level.iter().rev() { if entry.deployment_slot <= self.latest_root_slot || matches!( locked_fork_graph.relationship( @@ -1033,15 +1029,14 @@ impl ProgramCache { } } if cooperative_loading_task.is_none() { - // We have not selected a task so far - let second_level = self.entries.entry(*key).or_default(); - if second_level.cooperative_loading_lock.is_none() { - // Select this missing entry which is not selected by any other TX batch yet - cooperative_loading_task = Some((*key, *usage_count)); - second_level.cooperative_loading_lock = Some(( + let mut loading_entries = self.loading_entries.lock().unwrap(); + let entry = loading_entries.entry(*key); + if let Entry::Vacant(entry) = entry { + entry.insert(( loaded_programs_for_tx_batch.slot, std::thread::current().id(), )); + cooperative_loading_task = Some((*key, *usage_count)); } } true @@ -1066,12 +1061,8 @@ impl ProgramCache { key: Pubkey, loaded_program: Arc, ) -> bool { - let second_level = self.entries.entry(key).or_default(); - debug_assert_eq!( - second_level.cooperative_loading_lock, - Some((slot, std::thread::current().id())) - ); - second_level.cooperative_loading_lock = None; + let loading_thread = self.loading_entries.lock().unwrap().remove(&key); + debug_assert_eq!(loading_thread, Some((slot, std::thread::current().id()))); // Check that it will be visible to our own fork once inserted if loaded_program.deployment_slot > self.latest_root_slot && !matches!( @@ -1107,7 +1098,6 @@ impl ProgramCache { .iter() .flat_map(|(id, second_level)| { second_level - .slot_versions .iter() .filter_map(move |program| match program.program { ProgramCacheEntryType::Loaded(_) => { @@ -1132,19 +1122,16 @@ impl ProgramCache { self.entries .iter() .flat_map(|(id, second_level)| { - second_level - .slot_versions - .iter() - .map(|program| (*id, program.clone())) + second_level.iter().map(|program| (*id, program.clone())) }) .collect() } - /// Returns the `slot_versions` of the second level for the given program id. + /// Returns the slot versions for the given program id. pub fn get_slot_versions_for_tests(&self, key: &Pubkey) -> &[Arc] { self.entries .get(key) - .map(|second_level| second_level.slot_versions.as_ref()) + .map(|second_level| second_level.as_ref()) .unwrap_or(&[]) } @@ -1205,7 +1192,6 @@ impl ProgramCache { fn unload_program_entry(&mut self, program: &Pubkey, remove_entry: &Arc) { let second_level = self.entries.get_mut(program).expect("Cache lookup failed"); let candidate = second_level - .slot_versions .iter_mut() .find(|entry| entry == &remove_entry) .expect("Program entry not found"); @@ -1237,10 +1223,8 @@ impl ProgramCache { fn remove_programs_with_no_entries(&mut self) { let num_programs_before_removal = self.entries.len(); - self.entries.retain(|_, second_level| { - !second_level.slot_versions.is_empty() - || second_level.cooperative_loading_lock.is_some() - }); + self.entries + .retain(|_key, second_level| !second_level.is_empty()); if self.entries.len() < num_programs_before_removal { self.stats.empty_entries.fetch_add( num_programs_before_removal.saturating_sub(self.entries.len()) as u64, @@ -2072,7 +2056,7 @@ mod tests { keys.iter() .filter_map(|key| { let visible_entry = cache.entries.get(key).and_then(|second_level| { - second_level.slot_versions.iter().rev().find(|entry| { + second_level.iter().rev().find(|entry| { matches!( locked_fork_graph.relationship(entry.deployment_slot, loading_slot), BlockRelation::Equal | BlockRelation::Ancestor, diff --git a/svm/src/transaction_processor.rs b/svm/src/transaction_processor.rs index 2b75da5d97c250..d1f72e29e361ab 100644 --- a/svm/src/transaction_processor.rs +++ b/svm/src/transaction_processor.rs @@ -387,11 +387,10 @@ impl TransactionBatchProcessor { .collect(); let mut loaded_programs_for_txs = None; - let mut program_to_store = None; loop { - let (program_to_load, task_cookie, task_waiter) = { + let (program_to_store, task_cookie, task_waiter) = { // Lock the global cache. - let mut program_cache = self.program_cache.write().unwrap(); + let program_cache = self.program_cache.read().unwrap(); // Initialize our local cache. let is_first_round = loaded_programs_for_txs.is_none(); if is_first_round { @@ -401,49 +400,51 @@ impl TransactionBatchProcessor { &program_cache, )); } - // Submit our last completed loading task. - if let Some((key, program)) = program_to_store.take() { - loaded_programs_for_txs.as_mut().unwrap().loaded_missing = true; - if program_cache.finish_cooperative_loading_task(self.slot, key, program) - && limit_to_load_programs - { - // This branch is taken when there is an error in assigning a program to a - // cache slot. It is not possible to mock this error for SVM unit - // tests purposes. - let mut ret = ProgramCacheForTxBatch::new_from_cache( - self.slot, - self.epoch, - &program_cache, - ); - ret.hit_max_limit = true; - return ret; - } - } // Figure out which program needs to be loaded next. let program_to_load = program_cache.extract( &mut missing_programs, loaded_programs_for_txs.as_mut().unwrap(), is_first_round, ); + + let program_to_store = program_to_load.map(|(key, count)| { + // Load, verify and compile one program. + let program = load_program_with_pubkey( + callback, + &program_cache, + &key, + self.slot, + self.epoch, + &self.epoch_schedule, + false, + ) + .expect("called load_program_with_pubkey() with nonexistent account"); + program.tx_usage_counter.store(count, Ordering::Relaxed); + (key, program) + }); + let task_waiter = Arc::clone(&program_cache.loading_task_waiter); - (program_to_load, task_waiter.cookie(), task_waiter) + (program_to_store, task_waiter.cookie(), task_waiter) // Unlock the global cache again. }; - if let Some((key, count)) = program_to_load { - // Load, verify and compile one program. - let program = load_program_with_pubkey( - callback, - &self.program_cache.read().unwrap(), - &key, - self.slot, - self.epoch, - &self.epoch_schedule, - false, - ) - .expect("called load_program_with_pubkey() with nonexistent account"); - program.tx_usage_counter.store(count, Ordering::Relaxed); - program_to_store = Some((key, program)); + if let Some((key, program)) = program_to_store { + let mut program_cache = self.program_cache.write().unwrap(); + // Submit our last completed loading task. + if program_cache.finish_cooperative_loading_task(self.slot, key, program) + && limit_to_load_programs + { + // This branch is taken when there is an error in assigning a program to a + // cache slot. It is not possible to mock this error for SVM unit + // tests purposes. + let mut ret = ProgramCacheForTxBatch::new_from_cache( + self.slot, + self.epoch, + &program_cache, + ); + ret.hit_max_limit = true; + return ret; + } } else if missing_programs.is_empty() { break; } else {