program cache: reduce contention (#1192)
* program cache: reduce contention

Before this change, we took the write lock to extract(). This meant
that even in the ideal case (all programs already cached), the cache
was contended by all batches and all operations were serialized.

With this change, we take the write lock only when we store a new
entry in the cache, and only the read lock to extract(). This means
that in the common case where most or all programs are cached, there
is no contention and all batches progress in parallel.

This improves node replay performance by 20-25% on current
mainnet-beta (mnb) traffic.
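
A minimal sketch of the locking pattern described above, using stand-in types
rather than the actual ProgramCache API (Cache, Key and Entry here are
placeholders; the real cache keys on Pubkey and stores Arc<ProgramCacheEntry>):
cache hits only need the shared read lock, and the exclusive write lock is
taken only to store a newly loaded entry.

```rust
use std::{
    collections::HashMap,
    sync::{Arc, RwLock},
};

// Stand-in types for illustration only.
type Key = u64;
type Entry = Arc<String>;

struct Cache {
    entries: RwLock<HashMap<Key, Entry>>,
}

impl Cache {
    // Hot path: a cache hit only needs the shared read lock, so many
    // transaction batches can extract programs concurrently.
    fn extract(&self, key: &Key) -> Option<Entry> {
        self.entries.read().unwrap().get(key).cloned()
    }

    // Cold path: taken only on a miss, once the program has been loaded;
    // this is the only place the exclusive write lock is held.
    fn store(&self, key: Key, entry: Entry) {
        self.entries.write().unwrap().insert(key, entry);
    }
}

fn main() {
    let cache = Cache {
        entries: RwLock::new(HashMap::new()),
    };
    cache.store(1, Arc::new("compiled program".to_string()));
    assert!(cache.extract(&1).is_some());
}
```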

* ProgramCache: remove SecondLevel structure
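
As the diff below shows, removing SecondLevel splits its two responsibilities:
the slot versions move directly into the `entries` map, and the
cooperative-loading bookkeeping moves into a separate Mutex-guarded map, so
extract() can run on `&self`. A compilable sketch of that structural change,
with placeholder stubs for the real types (Slot, Pubkey, ProgramCacheEntry are
stand-ins here, not the actual definitions):

```rust
use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
    thread::ThreadId,
};

// Placeholder stubs standing in for the real solana-program-runtime types.
type Slot = u64;
type Pubkey = [u8; 32];
#[allow(dead_code)]
struct ProgramCacheEntry;

// Before: each program address mapped to a SecondLevel bundling the slot
// versions with the cooperative-loading lock, so even a read-only extract()
// needed exclusive (`&mut self`) access to the cache.
#[allow(dead_code)]
struct SecondLevel {
    slot_versions: Vec<Arc<ProgramCacheEntry>>,
    cooperative_loading_lock: Option<(Slot, ThreadId)>,
}

// After: slot versions live directly in the index, and in-flight loads are
// tracked in a separate interior-mutability map, so extract() works on `&self`.
#[allow(dead_code)]
struct ProgramCacheSketch {
    entries: HashMap<Pubkey, Vec<Arc<ProgramCacheEntry>>>,
    loading_entries: Mutex<HashMap<Pubkey, (Slot, ThreadId)>>,
}

fn main() {}
```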
alessandrod authored May 6, 2024
1 parent aa6c69a commit 10e5086
Showing 2 changed files with 72 additions and 87 deletions.
86 changes: 35 additions & 51 deletions program-runtime/src/loaded_programs.rs
@@ -21,7 +21,7 @@ use {
saturating_add_assign,
},
std::{
collections::HashMap,
collections::{hash_map::Entry, HashMap},
fmt::{Debug, Formatter},
sync::{
atomic::{AtomicU64, Ordering},
@@ -227,7 +227,7 @@ pub struct ProgramCacheStats {
pub prunes_orphan: AtomicU64,
/// a program got pruned because it was not recompiled for the next epoch
pub prunes_environment: AtomicU64,
/// the [SecondLevel] was empty because all slot versions got pruned
/// a program had no entries because all slot versions got pruned
pub empty_entries: AtomicU64,
}

@@ -578,18 +578,6 @@ impl LoadingTaskWaiter {
}
}

/// Contains all the program versions at a specific address.
#[derive(Debug, Default)]
struct SecondLevel {
/// List of all versions (across all forks) of a program sorted by the slot in which they were modified
slot_versions: Vec<Arc<ProgramCacheEntry>>,
/// `Some` if there is currently a cooperative loading task for this program address
///
/// It is possible that multiple TX batches from different slots need different versions of a program.
/// However, that can only be figured out once a program is loaded and its deployment slot is known.
cooperative_loading_lock: Option<(Slot, std::thread::ThreadId)>,
}

/// This structure is the global cache of loaded, verified and compiled programs.
///
/// It ...
@@ -608,8 +596,18 @@ struct SecondLevel {
pub struct ProgramCache<FG: ForkGraph> {
/// A two level index:
///
/// The first level is for the address at which programs are deployed and the second level for the slot (and thus also fork).
entries: HashMap<Pubkey, SecondLevel>,
/// - the first level is for the address at which programs are deployed
/// - the second level for the slot (and thus also fork), sorted by slot number.
entries: HashMap<Pubkey, Vec<Arc<ProgramCacheEntry>>>,
/// The entries that are getting loaded and have not yet finished loading.
///
/// The key is the program address, the value is a tuple of the slot in which the program is
/// being loaded and the thread ID doing the load.
///
/// It is possible that multiple TX batches from different slots need different versions of a
/// program. However, the deployment slot of a program is only known once it has been loaded,
/// so all loads for a given program key are serialized.
loading_entries: Mutex<HashMap<Pubkey, (Slot, std::thread::ThreadId)>>,
/// The slot of the last rerooting
pub latest_root_slot: Slot,
/// The epoch of the last rerooting
@@ -776,6 +774,7 @@ impl<FG: ForkGraph> ProgramCache<FG> {
pub fn new(root_slot: Slot, root_epoch: Epoch) -> Self {
Self {
entries: HashMap::new(),
loading_entries: Mutex::new(HashMap::new()),
latest_root_slot: root_slot,
latest_root_epoch: root_epoch,
environments: ProgramRuntimeEnvironments::default(),
@@ -819,7 +818,7 @@ impl<FG: ForkGraph> ProgramCache<FG> {
&entry.program,
ProgramCacheEntryType::DelayVisibility
));
let slot_versions = &mut self.entries.entry(key).or_default().slot_versions;
let slot_versions = &mut self.entries.entry(key).or_default();
match slot_versions.binary_search_by(|at| {
at.effective_slot
.cmp(&entry.effective_slot)
@@ -860,9 +859,7 @@ impl<FG: ForkGraph> ProgramCache<FG> {

pub fn prune_by_deployment_slot(&mut self, slot: Slot) {
for second_level in self.entries.values_mut() {
second_level
.slot_versions
.retain(|entry| entry.deployment_slot != slot);
second_level.retain(|entry| entry.deployment_slot != slot);
}
self.remove_programs_with_no_entries();
}
@@ -890,8 +887,7 @@ impl<FG: ForkGraph> ProgramCache<FG> {
// Remove entries un/re/deployed on orphan forks
let mut first_ancestor_found = false;
let mut first_ancestor_env = None;
second_level.slot_versions = second_level
.slot_versions
*second_level = second_level
.iter()
.rev()
.filter(|entry| {
@@ -940,7 +936,7 @@ impl<FG: ForkGraph> ProgramCache<FG> {
})
.cloned()
.collect();
second_level.slot_versions.reverse();
second_level.reverse();
}
self.remove_programs_with_no_entries();
debug_assert!(self.latest_root_slot <= new_root_slot);
@@ -974,7 +970,7 @@ impl<FG: ForkGraph> ProgramCache<FG> {
/// Extracts a subset of the programs relevant to a transaction batch
/// and returns which program accounts the accounts DB needs to load.
pub fn extract(
&mut self,
&self,
search_for: &mut Vec<(Pubkey, (ProgramCacheMatchCriteria, u64))>,
loaded_programs_for_tx_batch: &mut ProgramCacheForTxBatch,
is_first_round: bool,
@@ -983,8 +979,8 @@ impl<FG: ForkGraph> ProgramCache<FG> {
let locked_fork_graph = self.fork_graph.as_ref().unwrap().read().unwrap();
let mut cooperative_loading_task = None;
search_for.retain(|(key, (match_criteria, usage_count))| {
if let Some(second_level) = self.entries.get_mut(key) {
for entry in second_level.slot_versions.iter().rev() {
if let Some(second_level) = self.entries.get(key) {
for entry in second_level.iter().rev() {
if entry.deployment_slot <= self.latest_root_slot
|| matches!(
locked_fork_graph.relationship(
@@ -1033,15 +1029,14 @@ impl<FG: ForkGraph> ProgramCache<FG> {
}
}
if cooperative_loading_task.is_none() {
// We have not selected a task so far
let second_level = self.entries.entry(*key).or_default();
if second_level.cooperative_loading_lock.is_none() {
// Select this missing entry which is not selected by any other TX batch yet
cooperative_loading_task = Some((*key, *usage_count));
second_level.cooperative_loading_lock = Some((
let mut loading_entries = self.loading_entries.lock().unwrap();
let entry = loading_entries.entry(*key);
if let Entry::Vacant(entry) = entry {
entry.insert((
loaded_programs_for_tx_batch.slot,
std::thread::current().id(),
));
cooperative_loading_task = Some((*key, *usage_count));
}
}
true
@@ -1066,12 +1061,8 @@ impl<FG: ForkGraph> ProgramCache<FG> {
key: Pubkey,
loaded_program: Arc<ProgramCacheEntry>,
) -> bool {
let second_level = self.entries.entry(key).or_default();
debug_assert_eq!(
second_level.cooperative_loading_lock,
Some((slot, std::thread::current().id()))
);
second_level.cooperative_loading_lock = None;
let loading_thread = self.loading_entries.lock().unwrap().remove(&key);
debug_assert_eq!(loading_thread, Some((slot, std::thread::current().id())));
// Check that it will be visible to our own fork once inserted
if loaded_program.deployment_slot > self.latest_root_slot
&& !matches!(
@@ -1107,7 +1098,6 @@ impl<FG: ForkGraph> ProgramCache<FG> {
.iter()
.flat_map(|(id, second_level)| {
second_level
.slot_versions
.iter()
.filter_map(move |program| match program.program {
ProgramCacheEntryType::Loaded(_) => {
@@ -1132,19 +1122,16 @@ impl<FG: ForkGraph> ProgramCache<FG> {
self.entries
.iter()
.flat_map(|(id, second_level)| {
second_level
.slot_versions
.iter()
.map(|program| (*id, program.clone()))
second_level.iter().map(|program| (*id, program.clone()))
})
.collect()
}

/// Returns the `slot_versions` of the second level for the given program id.
/// Returns the slot versions for the given program id.
pub fn get_slot_versions_for_tests(&self, key: &Pubkey) -> &[Arc<ProgramCacheEntry>] {
self.entries
.get(key)
.map(|second_level| second_level.slot_versions.as_ref())
.map(|second_level| second_level.as_ref())
.unwrap_or(&[])
}

@@ -1205,7 +1192,6 @@ impl<FG: ForkGraph> ProgramCache<FG> {
fn unload_program_entry(&mut self, program: &Pubkey, remove_entry: &Arc<ProgramCacheEntry>) {
let second_level = self.entries.get_mut(program).expect("Cache lookup failed");
let candidate = second_level
.slot_versions
.iter_mut()
.find(|entry| entry == &remove_entry)
.expect("Program entry not found");
@@ -1237,10 +1223,8 @@ impl<FG: ForkGraph> ProgramCache<FG> {

fn remove_programs_with_no_entries(&mut self) {
let num_programs_before_removal = self.entries.len();
self.entries.retain(|_, second_level| {
!second_level.slot_versions.is_empty()
|| second_level.cooperative_loading_lock.is_some()
});
self.entries
.retain(|_key, second_level| !second_level.is_empty());
if self.entries.len() < num_programs_before_removal {
self.stats.empty_entries.fetch_add(
num_programs_before_removal.saturating_sub(self.entries.len()) as u64,
@@ -2072,7 +2056,7 @@ mod tests {
keys.iter()
.filter_map(|key| {
let visible_entry = cache.entries.get(key).and_then(|second_level| {
second_level.slot_versions.iter().rev().find(|entry| {
second_level.iter().rev().find(|entry| {
matches!(
locked_fork_graph.relationship(entry.deployment_slot, loading_slot),
BlockRelation::Equal | BlockRelation::Ancestor,
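The extract() change above replaces the per-entry cooperative_loading_lock with
a claim on the shared loading_entries map: the first batch to find the slot for
a given program key vacant becomes the cooperative loader, and everyone else
waits and retries. A self-contained sketch of that claim step (try_claim_load
and the stub types are hypothetical names for illustration, not the actual API):

```rust
use std::{
    collections::{hash_map::Entry, HashMap},
    sync::Mutex,
    thread::{self, ThreadId},
};

// Placeholder stubs for the real types.
type Slot = u64;
type Pubkey = [u8; 32];

// Returns true if this thread claimed the load for `key`, mirroring the
// Entry::Vacant check performed under the loading_entries mutex in extract().
fn try_claim_load(
    loading_entries: &Mutex<HashMap<Pubkey, (Slot, ThreadId)>>,
    key: Pubkey,
    slot: Slot,
) -> bool {
    let mut guard = loading_entries.lock().unwrap();
    match guard.entry(key) {
        Entry::Vacant(vacant) => {
            // This thread becomes the cooperative loader for `key`.
            vacant.insert((slot, thread::current().id()));
            true
        }
        // Another batch is already loading this program; caller waits and retries.
        Entry::Occupied(_) => false,
    }
}

fn main() {
    let loading = Mutex::new(HashMap::new());
    let key = [0u8; 32];
    assert!(try_claim_load(&loading, key, 42));
    assert!(!try_claim_load(&loading, key, 42));
}
```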
73 changes: 37 additions & 36 deletions svm/src/transaction_processor.rs
@@ -387,11 +387,10 @@ impl<FG: ForkGraph> TransactionBatchProcessor<FG> {
.collect();

let mut loaded_programs_for_txs = None;
let mut program_to_store = None;
loop {
let (program_to_load, task_cookie, task_waiter) = {
let (program_to_store, task_cookie, task_waiter) = {
// Lock the global cache.
let mut program_cache = self.program_cache.write().unwrap();
let program_cache = self.program_cache.read().unwrap();
// Initialize our local cache.
let is_first_round = loaded_programs_for_txs.is_none();
if is_first_round {
@@ -401,49 +400,51 @@ impl<FG: ForkGraph> TransactionBatchProcessor<FG> {
&program_cache,
));
}
// Submit our last completed loading task.
if let Some((key, program)) = program_to_store.take() {
loaded_programs_for_txs.as_mut().unwrap().loaded_missing = true;
if program_cache.finish_cooperative_loading_task(self.slot, key, program)
&& limit_to_load_programs
{
// This branch is taken when there is an error in assigning a program to a
// cache slot. It is not possible to mock this error for SVM unit
// test purposes.
let mut ret = ProgramCacheForTxBatch::new_from_cache(
self.slot,
self.epoch,
&program_cache,
);
ret.hit_max_limit = true;
return ret;
}
}
// Figure out which program needs to be loaded next.
let program_to_load = program_cache.extract(
&mut missing_programs,
loaded_programs_for_txs.as_mut().unwrap(),
is_first_round,
);

let program_to_store = program_to_load.map(|(key, count)| {
// Load, verify and compile one program.
let program = load_program_with_pubkey(
callback,
&program_cache,
&key,
self.slot,
self.epoch,
&self.epoch_schedule,
false,
)
.expect("called load_program_with_pubkey() with nonexistent account");
program.tx_usage_counter.store(count, Ordering::Relaxed);
(key, program)
});

let task_waiter = Arc::clone(&program_cache.loading_task_waiter);
(program_to_load, task_waiter.cookie(), task_waiter)
(program_to_store, task_waiter.cookie(), task_waiter)
// Unlock the global cache again.
};

if let Some((key, count)) = program_to_load {
// Load, verify and compile one program.
let program = load_program_with_pubkey(
callback,
&self.program_cache.read().unwrap(),
&key,
self.slot,
self.epoch,
&self.epoch_schedule,
false,
)
.expect("called load_program_with_pubkey() with nonexistent account");
program.tx_usage_counter.store(count, Ordering::Relaxed);
program_to_store = Some((key, program));
if let Some((key, program)) = program_to_store {
let mut program_cache = self.program_cache.write().unwrap();
// Submit our last completed loading task.
if program_cache.finish_cooperative_loading_task(self.slot, key, program)
&& limit_to_load_programs
{
// This branch is taken when there is an error in assigning a program to a
// cache slot. It is not possible to mock this error for SVM unit
// test purposes.
let mut ret = ProgramCacheForTxBatch::new_from_cache(
self.slot,
self.epoch,
&program_cache,
);
ret.hit_max_limit = true;
return ret;
}
} else if missing_programs.is_empty() {
break;
} else {
