Skip to content

Commit

Permalink
Improving refreshable
Browse files Browse the repository at this point in the history
  • Loading branch information
zakstucke committed Jun 22, 2024
1 parent 9bb6a64 commit a4cf143
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 25 deletions.
7 changes: 7 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ itertools = "0.12"
tracing-core = "0.1"
chrono = { version = '0.4' }
chrono-humanize = { version = "0.2" }
arc-swap = "1"

# Not in default, but randomly useful in features:
strum = { version = "0.25", features = ["derive"], optional = true }
Expand Down
49 changes: 24 additions & 25 deletions rust/bitbazaar/misc/refreshable.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use std::sync::{atomic::AtomicU64, Arc};

use arc_swap::ArcSwap;
use futures::Future;
use parking_lot::Mutex;

use crate::prelude::*;

/// A data wrapper that automatically updates the data given out when deemed stale.
/// The data is set to refresh at a certain interval (triggered on access), or can be forcefully refreshed.
pub struct Refreshable<T, Fut: Future<Output = RResult<T, AnyErr>>, F: Fn() -> Fut> {
// Don't want to hold lock when giving out data, so opposite to normal pattern:
data: Mutex<Arc<T>>,
data: ArcSwap<T>,
getter: F,
last_updated_utc_ms: AtomicU64,
update_every_ms: u64,
Expand All @@ -24,46 +24,45 @@ impl<T, Fut: Future<Output = RResult<T, AnyErr>>, F: Fn() -> Fut> Refreshable<T,
pub async fn new(update_every: std::time::Duration, getter: F) -> RResult<Self, AnyErr> {
let data = getter().await?;
Ok(Self {
data: Mutex::new(Arc::new(data)),
data: ArcSwap::new(Arc::new(data)),
getter,
last_updated_utc_ms: AtomicU64::new(utc_now_ms()),
update_every_ms: update_every.as_millis() as u64,
})
}

/// Update T from outside.
pub fn set(&self, new_data: T) {
self.last_updated_utc_ms
.store(utc_now_ms(), std::sync::atomic::Ordering::Relaxed);
self.data.store(Arc::new(new_data));
}

/// Force a refresh of the data.
pub async fn force_refresh(&self) -> RResult<(), AnyErr> {
pub async fn refresh(&self) -> RResult<(), AnyErr> {
let new_data = (self.getter)().await?;
self.last_updated_utc_ms
.store(utc_now_ms(), std::sync::atomic::Ordering::Relaxed);
*self.data.lock() = Arc::new(new_data);
self.data.store(Arc::new(new_data));
Ok(())
}

/// Get the underlying data for use.
/// If the data is stale, it will be refreshed before returning.
pub async fn get(&self) -> RResult<Arc<T>, AnyErr> {
let now = utc_now_ms();
let last_updated = self
.last_updated_utc_ms
.load(std::sync::atomic::Ordering::Relaxed);

///
/// NOTE: the implementation of the guards means not too many should be alive at once, and keeping across await points should be discouraged.
/// If you need long access to the underlying data, consider cloning it.
pub async fn get(&self) -> RResult<arc_swap::Guard<Arc<T>>, AnyErr> {
// Refresh if now stale:
let replacement_data = if now - last_updated > self.update_every_ms {
let new_data = (self.getter)().await?;
self.last_updated_utc_ms
.store(now, std::sync::atomic::Ordering::Relaxed);
Some(new_data)
} else {
None
};

// Temporarily lock to access:
let mut data = self.data.lock();
if let Some(new_data) = replacement_data {
*data = Arc::new(new_data);
if utc_now_ms()
- self
.last_updated_utc_ms
.load(std::sync::atomic::Ordering::Relaxed)
> self.update_every_ms
{
self.refresh().await?;
}
Ok(data.clone())
Ok(self.data.load())
}
}

Expand Down

0 comments on commit a4cf143

Please sign in to comment.