Skip to content

Commit

Permalink
AcctIdx: only advance age on thread 0 (solana-labs#25943)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffwashington authored Jun 15, 2022
1 parent 8a3d48b commit ae37359
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 21 deletions.
10 changes: 7 additions & 3 deletions runtime/src/accounts_index_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,16 @@ impl BgThreads {
storage: &Arc<BucketMapHolder<T>>,
in_mem: &[Arc<InMemAccountsIndex<T>>],
threads: usize,
can_advance_age: bool,
) -> Self {
// stop signal used for THIS batch of bg threads
let exit = Arc::new(AtomicBool::default());
let handles = Some(
(0..threads)
.into_iter()
.map(|_| {
.map(|idx| {
// the first thread we start is special
let can_advance_age = can_advance_age && idx == 0;
let storage_ = Arc::clone(storage);
let exit_ = Arc::clone(&exit);
let in_mem_ = in_mem.to_vec();
Expand All @@ -71,7 +74,7 @@ impl BgThreads {
Builder::new()
.name("solana-idx-flusher".to_string())
.spawn(move || {
storage_.background(exit_, in_mem_);
storage_.background(exit_, in_mem_, can_advance_age);
})
.unwrap()
})
Expand Down Expand Up @@ -113,6 +116,7 @@ impl<T: IndexValue> AccountsIndexStorage<T> {
&self.storage,
&self.in_mem,
Self::num_threads(),
false, // cannot advance age from any of these threads
));
}
self.storage.set_startup(value);
Expand Down Expand Up @@ -157,7 +161,7 @@ impl<T: IndexValue> AccountsIndexStorage<T> {
.collect::<Vec<_>>();

Self {
_bg_threads: BgThreads::new(&storage, &in_mem, threads),
_bg_threads: BgThreads::new(&storage, &in_mem, threads, true),
storage,
in_mem,
startup_worker_threads: Mutex::default(),
Expand Down
33 changes: 23 additions & 10 deletions runtime/src/bucket_map_holder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,13 @@ impl<T: IndexValue> BucketMapHolder<T> {
self.age.load(Ordering::Acquire)
}

pub fn bucket_flushed_at_current_age(&self) {
pub fn bucket_flushed_at_current_age(&self, can_advance_age: bool) {
let count_buckets_flushed = 1 + self.count_buckets_flushed.fetch_add(1, Ordering::AcqRel);
self.maybe_advance_age_internal(
self.all_buckets_flushed_at_current_age_internal(count_buckets_flushed),
);
if can_advance_age {
self.maybe_advance_age_internal(
self.all_buckets_flushed_at_current_age_internal(count_buckets_flushed),
);
}
}

/// have all buckets been flushed at the current age?
Expand Down Expand Up @@ -297,7 +299,12 @@ impl<T: IndexValue> BucketMapHolder<T> {
}

// intended to execute in a bg thread
pub fn background(&self, exit: Arc<AtomicBool>, in_mem: Vec<Arc<InMemAccountsIndex<T>>>) {
pub fn background(
&self,
exit: Arc<AtomicBool>,
in_mem: Vec<Arc<InMemAccountsIndex<T>>>,
can_advance_age: bool,
) {
let bins = in_mem.len();
let flush = self.disk.is_some();
let mut throttling_wait_ms = None;
Expand All @@ -312,6 +319,10 @@ impl<T: IndexValue> BucketMapHolder<T> {
.remaining_until_next_interval(self.age_interval_ms()),
self.stats.remaining_until_next_interval(),
);
if !can_advance_age {
// if this thread cannot advance age, then make sure we don't sleep 0
wait = wait.max(1);
}
if let Some(throttling_wait_ms) = throttling_wait_ms {
self.stats
.bg_throttling_wait_us
Expand All @@ -327,7 +338,9 @@ impl<T: IndexValue> BucketMapHolder<T> {
.bg_waiting_us
.fetch_add(m.as_us(), Ordering::Relaxed);
// likely some time has elapsed. May have been waiting for age time interval to elapse.
self.maybe_advance_age();
if can_advance_age {
self.maybe_advance_age();
}
}
throttling_wait_ms = None;

Expand All @@ -339,7 +352,7 @@ impl<T: IndexValue> BucketMapHolder<T> {
for _ in 0..bins {
if flush {
let index = self.next_bucket_to_flush();
in_mem[index].flush();
in_mem[index].flush(can_advance_age);
}
self.stats.report_stats(self);
if self.all_buckets_flushed_at_current_age() {
Expand Down Expand Up @@ -453,11 +466,11 @@ pub mod tests {
let time = AGE_MS * 8 / 3;
let expected = (time / AGE_MS) as Age;
let now = Instant::now();
test.bucket_flushed_at_current_age(); // done with age 0
test.bucket_flushed_at_current_age(true); // done with age 0
(0..threads).into_par_iter().for_each(|_| {
while now.elapsed().as_millis() < (time as u128) {
if test.maybe_advance_age() {
test.bucket_flushed_at_current_age();
test.bucket_flushed_at_current_age(true);
}
}
});
Expand All @@ -472,7 +485,7 @@ pub mod tests {
assert_eq!(test.current_age(), 0);
for _ in 0..bins {
assert!(!test.all_buckets_flushed_at_current_age());
test.bucket_flushed_at_current_age();
test.bucket_flushed_at_current_age(true);
}
std::thread::sleep(std::time::Duration::from_millis(AGE_MS * 2));
test.maybe_advance_age();
Expand Down
16 changes: 8 additions & 8 deletions runtime/src/in_mem_accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
}

/// called after flush scans this bucket at the current age
fn set_has_aged(&self, age: Age) {
fn set_has_aged(&self, age: Age, can_advance_age: bool) {
self.last_age_flushed.store(age, Ordering::Release);
self.storage.bucket_flushed_at_current_age();
self.storage.bucket_flushed_at_current_age(can_advance_age);
}

fn last_age_flushed(&self) -> Age {
Expand Down Expand Up @@ -876,9 +876,9 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
self.stop_evictions_changes.load(Ordering::Acquire)
}

pub(crate) fn flush(&self) {
pub(crate) fn flush(&self, can_advance_age: bool) {
if let Some(flush_guard) = FlushGuard::lock(&self.flushing_active) {
self.flush_internal(&flush_guard)
self.flush_internal(&flush_guard, can_advance_age)
}
}

Expand Down Expand Up @@ -983,7 +983,7 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
}

/// synchronize the in-mem index with the disk index
fn flush_internal(&self, flush_guard: &FlushGuard) {
fn flush_internal(&self, flush_guard: &FlushGuard, can_advance_age: bool) {
let current_age = self.storage.current_age();
let iterate_for_age = self.get_should_age(current_age);
let startup = self.storage.get_startup();
Expand Down Expand Up @@ -1093,7 +1093,7 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
if iterate_for_age {
// completed iteration of the buckets at the current age
assert_eq!(current_age, self.storage.current_age());
self.set_has_aged(current_age);
self.set_has_aged(current_age, can_advance_age);
}
}
}
Expand Down Expand Up @@ -1555,13 +1555,13 @@ mod tests {
let test = new_for_test::<u64>();
assert!(test.get_should_age(test.storage.current_age()));
assert_eq!(test.storage.count_buckets_flushed(), 0);
test.set_has_aged(0);
test.set_has_aged(0, true);
assert!(!test.get_should_age(test.storage.current_age()));
assert_eq!(test.storage.count_buckets_flushed(), 1);
// simulate rest of buckets aging
for _ in 1..BINS_FOR_TESTING {
assert!(!test.storage.all_buckets_flushed_at_current_age());
test.storage.bucket_flushed_at_current_age();
test.storage.bucket_flushed_at_current_age(true);
}
assert!(test.storage.all_buckets_flushed_at_current_age());
// advance age
Expand Down

0 comments on commit ae37359

Please sign in to comment.