Skip to content

Commit

Permalink
program cache: index v2: extract
Browse files Browse the repository at this point in the history
  • Loading branch information
buffalojoec committed May 11, 2024
1 parent eaa2b11 commit 3b68e5c
Showing 1 changed file with 154 additions and 17 deletions.
171 changes: 154 additions & 17 deletions program-runtime/src/loaded_programs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use {
invoke_context::{BuiltinFunctionWithContext, InvokeContext},
timings::ExecuteDetailsTimings,
},
itertools::Itertools,
log::{debug, error, log_enabled, trace},
percentage::PercentageInteger,
rand::{thread_rng, Rng},
Expand All @@ -21,6 +22,7 @@ use {
saturating_add_assign,
},
std::{
borrow::Borrow,
collections::{hash_map::Entry, HashMap, HashSet},
fmt::{Debug, Formatter},
sync::{
Expand Down Expand Up @@ -616,6 +618,16 @@ struct IndexV2 {
/// A lightweight index designed for fast lookups of entries by their
/// deployment slot.
deployment_slot_index: HashMap<Slot, HashSet<IndexV2Key>>,
/// The entries that are getting loaded and have not yet finished loading.
///
/// The key is the program address, the value is a tuple of the slot in
/// which the program is being loaded and the thread ID doing the load.
///
/// It is possible that multiple TX batches from different slots need
/// different versions of a program. The deployment slot of a program is
/// only known after load tho, so all loads for a given program key are
/// serialized.
loading_entries: Mutex<HashMap<Pubkey, (Slot, std::thread::ThreadId)>>,
}

impl IndexV2 {
Expand All @@ -625,9 +637,44 @@ impl IndexV2 {
entries: HashMap::new(),
address_index: HashMap::new(),
deployment_slot_index: HashMap::new(),
loading_entries: Mutex::new(HashMap::new()),
}
}

// Find the first matching entry whose deployment slot is both:
// * Less than or equal to the current root slot.
// * An ancestor or equal of the provided slot.
fn find_qualified_entries_for_extraction<FG: ForkGraph>(
&self,
address: &Pubkey,
latest_root_slot: Slot,
transaction_batch_slot: Slot,
fork_graph: &FG,
) -> Vec<Arc<ProgramCacheEntry>> {
let mut qualified_entries = Vec::new();
for slot in self
.deployment_slot_index
.keys()
.filter(|slot| {
**slot <= latest_root_slot
|| matches!(
fork_graph.relationship(**slot, transaction_batch_slot),
BlockRelation::Ancestor | BlockRelation::Equal
)
})
.sorted_by(|a, b| b.cmp(a))
{
if let Some(keys) = self.deployment_slot_index.get(slot) {
for key in keys.iter().filter(|key| key.address == *address) {
if let Some(entry) = self.entries.get(key) {
qualified_entries.push(entry.clone());
}
}
}
}
qualified_entries
}

/// Finds and returns a mutable reference to a _loaded_ program entry.
/// Lookup time is O(1).
fn get_entry_mut(&mut self, entry_key: &IndexV2Key) -> Option<&mut Arc<ProgramCacheEntry>> {
Expand Down Expand Up @@ -1279,7 +1326,69 @@ impl<FG: ForkGraph> ProgramCache<FG> {
true
});
}
IndexImplementation::V2(_) => unimplemented!(),
IndexImplementation::V2(index_v2) => {
search_for.retain(|(key, (match_criteria, usage_count))| {
// We need to find an entry whose deployment slot is both:
// * Less than or equal to the current root slot.
// * An ancestor of the slot of the transaction batch.
for qualified_entry in index_v2.find_qualified_entries_for_extraction::<FG>(
key,
self.latest_root_slot,
loaded_programs_for_tx_batch.slot,
locked_fork_graph.borrow(),
) {
let entry_to_return = if loaded_programs_for_tx_batch.slot
>= qualified_entry.effective_slot
&& Self::matches_environment(
&qualified_entry,
&loaded_programs_for_tx_batch.environments,
) {
if !Self::matches_criteria(&qualified_entry, match_criteria) {
break;
}
if let ProgramCacheEntryType::Unloaded(_environment) =
&qualified_entry.program
{
break;
}
qualified_entry.clone()
} else if qualified_entry.is_implicit_delay_visibility_tombstone(
loaded_programs_for_tx_batch.slot,
) {
// Found a program entry on the current fork, but it's not effective
// yet. It indicates that the program has delayed visibility. Return
// the tombstone to reflect that.
Arc::new(ProgramCacheEntry::new_tombstone(
qualified_entry.deployment_slot,
qualified_entry.account_owner,
ProgramCacheEntryType::DelayVisibility,
))
} else {
continue;
};
entry_to_return.update_access_slot(loaded_programs_for_tx_batch.slot);
entry_to_return
.tx_usage_counter
.fetch_add(*usage_count, Ordering::Relaxed);
loaded_programs_for_tx_batch
.entries
.insert(*key, entry_to_return);
return false;
}
if cooperative_loading_task.is_none() {
let mut loading_entries = index_v2.loading_entries.lock().unwrap();
let entry = loading_entries.entry(*key);
if let Entry::Vacant(entry) = entry {
entry.insert((
loaded_programs_for_tx_batch.slot,
std::thread::current().id(),
));
cooperative_loading_task = Some((*key, *usage_count));
}
}
true
});
}
}
drop(locked_fork_graph);
if is_first_round {
Expand Down Expand Up @@ -1325,7 +1434,27 @@ impl<FG: ForkGraph> ProgramCache<FG> {
self.loading_task_waiter.notify();
was_occupied
}
IndexImplementation::V2(_) => unimplemented!(),
IndexImplementation::V2(index_v2) => {
let loading_thread = index_v2.loading_entries.get_mut().unwrap().remove(&key);
debug_assert_eq!(loading_thread, Some((slot, std::thread::current().id())));
// Check that it will be visible to our own fork once inserted
if loaded_program.deployment_slot > self.latest_root_slot
&& !matches!(
self.fork_graph
.as_ref()
.unwrap()
.read()
.unwrap()
.relationship(loaded_program.deployment_slot, slot),
BlockRelation::Equal | BlockRelation::Ancestor
)
{
self.stats.lost_insertions.fetch_add(1, Ordering::Relaxed);
}
let was_occupied = self.assign_program(key, loaded_program);
self.loading_task_waiter.notify();
was_occupied
}
}
}

Expand Down Expand Up @@ -2414,11 +2543,15 @@ mod tests {
working_slot: Slot,
) -> bool {
assert_eq!(extracted.slot, working_slot);
extracted
let res = extracted
.entries
.get(program)
.map(|entry| entry.deployment_slot == deployment_slot)
.unwrap_or(false)
.unwrap_or(false);
if !res {
println!("Deployment slot not found: {:?}", deployment_slot);
}
res
}

fn match_missing(
Expand All @@ -2429,9 +2562,10 @@ mod tests {
missing.iter().any(|(key, _)| key == program) == expected_result
}

#[test]
fn test_fork_extract_and_prune() {
let mut cache = new_mock_cache::<TestForkGraphSpecific>(false);
#[test_case(false ; "use_index_v1")]
// #[test_case(true ; "use_index_v2")] // Can't do this without `prune`.
fn test_fork_extract_and_prune(use_index_v2: bool) {
let mut cache = new_mock_cache::<TestForkGraphSpecific>(use_index_v2);

// Fork graph created for the test
// 0
Expand Down Expand Up @@ -2624,9 +2758,10 @@ mod tests {
assert!(match_slot(&extracted, &program4, 15, 23));
}

#[test]
fn test_extract_using_deployment_slot() {
let mut cache = new_mock_cache::<TestForkGraphSpecific>(false);
#[test_case(false ; "use_index_v1")]
#[test_case(true ; "use_index_v2")]
fn test_extract_using_deployment_slot(use_index_v2: bool) {
let mut cache = new_mock_cache::<TestForkGraphSpecific>(use_index_v2);

// Fork graph created for the test
// 0
Expand Down Expand Up @@ -2681,9 +2816,10 @@ mod tests {
assert!(match_slot(&extracted, &program2, 11, 12));
}

#[test]
fn test_extract_unloaded() {
let mut cache = new_mock_cache::<TestForkGraphSpecific>(false);
#[test_case(false ; "use_index_v1")]
#[test_case(true ; "use_index_v2")]
fn test_extract_unloaded(use_index_v2: bool) {
let mut cache = new_mock_cache::<TestForkGraphSpecific>(use_index_v2);

// Fork graph created for the test
// 0
Expand Down Expand Up @@ -2757,9 +2893,10 @@ mod tests {
assert!(match_missing(&missing, &program3, true));
}

#[test]
fn test_extract_nonexistent() {
let mut cache = new_mock_cache::<TestForkGraphSpecific>(false);
#[test_case(false ; "use_index_v1")]
#[test_case(true ; "use_index_v2")]
fn test_extract_nonexistent(use_index_v2: bool) {
let mut cache = new_mock_cache::<TestForkGraphSpecific>(use_index_v2);
let fork_graph = TestForkGraphSpecific::default();
let fork_graph = Arc::new(RwLock::new(fork_graph));
cache.set_fork_graph(fork_graph);
Expand Down Expand Up @@ -2861,7 +2998,7 @@ mod tests {
}

#[test_case(false ; "index v1")]
// #[test_case(true ; "index v2")] // Can't do this yet without `extract`.
#[test_case(true ; "index v2")]
fn test_prune_by_deployment_slot(use_index_v2: bool) {
let mut cache = new_mock_cache::<TestForkGraphSpecific>(use_index_v2);

Expand Down

0 comments on commit 3b68e5c

Please sign in to comment.