diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 1f609660..6ee1067d 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -107,6 +107,12 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c" +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "arrayref" version = "0.3.7" @@ -355,6 +361,7 @@ name = "bitbazaar" version = "0.1.3" dependencies = [ "aes-gcm-siv", + "arc-swap", "async-semaphore", "axum-extra", "bincode", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index b469e217..2187dbf4 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -36,6 +36,7 @@ itertools = "0.12" tracing-core = "0.1" chrono = { version = '0.4' } chrono-humanize = { version = "0.2" } +arc-swap = "1" # Not in default, but randomly useful in features: strum = { version = "0.25", features = ["derive"], optional = true } diff --git a/rust/bitbazaar/misc/refreshable.rs b/rust/bitbazaar/misc/refreshable.rs index 37e0e465..5026a78a 100644 --- a/rust/bitbazaar/misc/refreshable.rs +++ b/rust/bitbazaar/misc/refreshable.rs @@ -1,7 +1,7 @@ use std::sync::{atomic::AtomicU64, Arc}; +use arc_swap::ArcSwap; use futures::Future; -use parking_lot::Mutex; use crate::prelude::*; @@ -9,7 +9,7 @@ use crate::prelude::*; /// The data is set to refresh at a certain interval (triggered on access), or can be forcefully refreshed. pub struct Refreshable>, F: Fn() -> Fut> { // Don't want to hold lock when giving out data, so opposite to normal pattern: - data: Mutex>, + data: ArcSwap, getter: F, last_updated_utc_ms: AtomicU64, update_every_ms: u64, @@ -24,46 +24,45 @@ impl>, F: Fn() -> Fut> Refreshable RResult { let data = getter().await?; Ok(Self { - data: Mutex::new(Arc::new(data)), + data: ArcSwap::new(Arc::new(data)), getter, last_updated_utc_ms: AtomicU64::new(utc_now_ms()), update_every_ms: update_every.as_millis() as u64, }) } + /// Update T from outside. + pub fn set(&self, new_data: T) { + self.last_updated_utc_ms + .store(utc_now_ms(), std::sync::atomic::Ordering::Relaxed); + self.data.store(Arc::new(new_data)); + } + /// Force a refresh of the data. - pub async fn force_refresh(&self) -> RResult<(), AnyErr> { + pub async fn refresh(&self) -> RResult<(), AnyErr> { let new_data = (self.getter)().await?; self.last_updated_utc_ms .store(utc_now_ms(), std::sync::atomic::Ordering::Relaxed); - *self.data.lock() = Arc::new(new_data); + self.data.store(Arc::new(new_data)); Ok(()) } /// Get the underlying data for use. /// If the data is stale, it will be refreshed before returning. - pub async fn get(&self) -> RResult, AnyErr> { - let now = utc_now_ms(); - let last_updated = self - .last_updated_utc_ms - .load(std::sync::atomic::Ordering::Relaxed); - + /// + /// NOTE: the implementation of the guards means not too many should be alive at once, and keeping across await points should be discouraged. + /// If you need long access to the underlying data, consider cloning it. + pub async fn get(&self) -> RResult>, AnyErr> { // Refresh if now stale: - let replacement_data = if now - last_updated > self.update_every_ms { - let new_data = (self.getter)().await?; - self.last_updated_utc_ms - .store(now, std::sync::atomic::Ordering::Relaxed); - Some(new_data) - } else { - None - }; - - // Temporarily lock to access: - let mut data = self.data.lock(); - if let Some(new_data) = replacement_data { - *data = Arc::new(new_data); + if utc_now_ms() + - self + .last_updated_utc_ms + .load(std::sync::atomic::Ordering::Relaxed) + > self.update_every_ms + { + self.refresh().await?; } - Ok(data.clone()) + Ok(self.data.load()) } }