Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory leak in get_with and friend methods of future::Cache #330

Merged
merged 5 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# Moka Cache — Change Log

## Version 0.12.1

### Fixed

- Fixed memory leak in `future::Cache` that occurred when `get_with()`,
`entry().or_insert_with()`, and similar methods were used ([#329][gh-issue-0329]).
- This bug was introduced in `v0.12.0`. Versions prior to `v0.12.0` do not
have this bug.


## Version 0.12.0

> **Note**
Expand Down Expand Up @@ -702,6 +712,7 @@ The minimum supported Rust version (MSRV) is now 1.51.0 (Mar 25, 2021).
[gh-Swatinem]: https://github.com/Swatinem
[gh-tinou98]: https://github.com/tinou98

[gh-issue-0329]: https://github.com/moka-rs/moka/issues/329/
[gh-issue-0322]: https://github.com/moka-rs/moka/issues/322/
[gh-issue-0255]: https://github.com/moka-rs/moka/issues/255/
[gh-issue-0252]: https://github.com/moka-rs/moka/issues/252/
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "moka"
version = "0.12.0"
version = "0.12.1"
edition = "2018"
# Rust 1.65 was released on Nov 3, 2022.
rust-version = "1.65"
Expand Down
2 changes: 1 addition & 1 deletion src/future/base_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ where

// TODO: Instead using Arc<AtomicU8> to check if the actual operation was
// insert or update, check the return value of insert_with_or_modify. If it
// is_some, the value was inserted, otherwise the value was updated.
// is_some, the value was updated, otherwise the value was inserted.

// Since the cache (cht::SegmentedHashMap) employs optimistic locking
// strategy, insert_with_or_modify() may get an insert/modify operation
Expand Down
61 changes: 38 additions & 23 deletions src/future/value_initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ where
S: BuildHasher,
{
is_waiter_value_set: bool,
cht_key: (Arc<K>, TypeId),
hash: u64,
w_key: (Arc<K>, TypeId),
w_hash: u64,
waiters: TrioArc<WaiterMap<K, V, S>>,
write_lock: RwLockWriteGuard<'a, WaiterValue<V>>,
}
Expand All @@ -71,15 +71,15 @@ where
S: BuildHasher,
{
fn new(
cht_key: (Arc<K>, TypeId),
hash: u64,
w_key: (Arc<K>, TypeId),
w_hash: u64,
waiters: TrioArc<WaiterMap<K, V, S>>,
write_lock: RwLockWriteGuard<'a, WaiterValue<V>>,
) -> Self {
Self {
is_waiter_value_set: false,
cht_key,
hash,
w_key,
w_hash,
waiters,
write_lock,
}
Expand All @@ -103,7 +103,7 @@ where
// has been aborted. Remove our waiter to prevent the issue described in
// https://github.com/moka-rs/moka/issues/59
*self.write_lock = WaiterValue::EnclosingFutureAborted;
remove_waiter(&self.waiters, self.cht_key.clone(), self.hash);
remove_waiter(&self.waiters, self.w_key.clone(), self.w_hash);
self.is_waiter_value_set = true;
}
}
Expand Down Expand Up @@ -147,8 +147,8 @@ where
#[allow(clippy::too_many_arguments)]
pub(crate) async fn try_init_or_read<'a, C, I, O, E>(
&'a self,
key: &Arc<K>,
hash: u64,
c_key: &Arc<K>,
c_hash: u64,
type_id: TypeId,
cache: &C,
ignore_if: Arc<Mutex<Option<I>>>,
Expand All @@ -169,35 +169,35 @@ where
const MAX_RETRIES: usize = 200;
let mut retries = 0;

let cht_key = (Arc::clone(key), type_id);
let (w_key, w_hash) = waiter_key_hash(&self.waiters, c_key, type_id);

loop {
let waiter = TrioArc::new(RwLock::new(WaiterValue::Computing));
let lock = waiter.write().await;

match try_insert_waiter(&self.waiters, cht_key.clone(), hash, &waiter) {
match try_insert_waiter(&self.waiters, w_key.clone(), w_hash, &waiter) {
None => {
// Our waiter was inserted.

// Create a guard. This will ensure to remove our waiter when the
// enclosing future has been aborted:
// https://github.com/moka-rs/moka/issues/59
let mut waiter_guard = WaiterGuard::new(
cht_key.clone(),
hash,
w_key.clone(),
w_hash,
TrioArc::clone(&self.waiters),
lock,
);

// Check if the value has already been inserted by other thread.
if let Some(value) = cache
.get_without_recording(key, hash, ignore_if.lock().await.as_mut())
.get_without_recording(c_key, c_hash, ignore_if.lock().await.as_mut())
.await
{
// Yes. Set the waiter value, remove our waiter, and return
// the existing value.
waiter_guard.set_waiter_value(WaiterValue::Ready(Ok(value.clone())));
remove_waiter(&self.waiters, cht_key, hash);
remove_waiter(&self.waiters, w_key, w_hash);
return InitResult::ReadExisting(value);
}

Expand All @@ -209,7 +209,7 @@ where
Ok(value) => {
let (waiter_val, init_res) = match post_init(value) {
Ok(value) => {
cache.insert(Arc::clone(key), hash, value.clone()).await;
cache.insert(Arc::clone(c_key), c_hash, value.clone()).await;
(
WaiterValue::Ready(Ok(value.clone())),
InitResult::Initialized(value),
Expand All @@ -224,14 +224,14 @@ where
}
};
waiter_guard.set_waiter_value(waiter_val);
remove_waiter(&self.waiters, cht_key, hash);
remove_waiter(&self.waiters, w_key, w_hash);
return init_res;
}
// Panicked.
Err(payload) => {
waiter_guard.set_waiter_value(WaiterValue::InitFuturePanicked);
// Remove the waiter so that others can retry.
remove_waiter(&self.waiters, cht_key, hash);
remove_waiter(&self.waiters, w_key, w_hash);
resume_unwind(payload);
}
} // The lock will be unlocked here.
Expand Down Expand Up @@ -311,27 +311,42 @@ where
}

#[inline]
fn remove_waiter<K, V, S>(waiter_map: &WaiterMap<K, V, S>, cht_key: (Arc<K>, TypeId), hash: u64)
fn remove_waiter<K, V, S>(waiter_map: &WaiterMap<K, V, S>, w_key: (Arc<K>, TypeId), w_hash: u64)
where
(Arc<K>, TypeId): Eq + Hash,
S: BuildHasher,
{
waiter_map.remove(hash, |k| k == &cht_key);
waiter_map.remove(w_hash, |k| k == &w_key);
}

#[inline]
fn try_insert_waiter<K, V, S>(
waiter_map: &WaiterMap<K, V, S>,
cht_key: (Arc<K>, TypeId),
hash: u64,
w_key: (Arc<K>, TypeId),
w_hash: u64,
waiter: &Waiter<V>,
) -> Option<Waiter<V>>
where
(Arc<K>, TypeId): Eq + Hash,
S: BuildHasher,
{
let waiter = TrioArc::clone(waiter);
waiter_map.insert_if_not_present(cht_key, hash, waiter)
waiter_map.insert_if_not_present(w_key, w_hash, waiter)
}

#[inline]
fn waiter_key_hash<K, V, S>(
waiter_map: &WaiterMap<K, V, S>,
c_key: &Arc<K>,
type_id: TypeId,
) -> ((Arc<K>, TypeId), u64)
where
(Arc<K>, TypeId): Eq + Hash,
S: BuildHasher,
{
let w_key = (Arc::clone(c_key), type_id);
let w_hash = waiter_map.hash(&w_key);
(w_key, w_hash)
}

fn panic_if_retry_exhausted_for_panicking(retries: usize, max: usize) {
Expand Down
28 changes: 14 additions & 14 deletions src/sync/value_initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,21 +68,21 @@ where
const MAX_RETRIES: usize = 200;
let mut retries = 0;

let (cht_key, hash) = self.cht_key_hash(key, type_id);
let (w_key, w_hash) = self.waiter_key_hash(key, type_id);

loop {
let waiter = TrioArc::new(RwLock::new(None));
let mut lock = waiter.write();

match self.try_insert_waiter(cht_key.clone(), hash, &waiter) {
match self.try_insert_waiter(w_key.clone(), w_hash, &waiter) {
None => {
// Our waiter was inserted.
// Check if the value has already been inserted by other thread.
if let Some(value) = get() {
// Yes. Set the waiter value, remove our waiter, and return
// the existing value.
*lock = Some(Ok(value.clone()));
self.remove_waiter(cht_key, hash);
self.remove_waiter(w_key, w_hash);
return InitResult::ReadExisting(value);
}

Expand All @@ -106,14 +106,14 @@ where
}
};
*lock = waiter_val;
self.remove_waiter(cht_key, hash);
self.remove_waiter(w_key, w_hash);
return init_res;
}
// Panicked.
Err(payload) => {
*lock = None;
// Remove the waiter so that others can retry.
self.remove_waiter(cht_key, hash);
self.remove_waiter(w_key, w_hash);
resume_unwind(payload);
}
} // The write lock will be unlocked here.
Expand Down Expand Up @@ -184,25 +184,25 @@ where
}

#[inline]
fn remove_waiter(&self, cht_key: (Arc<K>, TypeId), hash: u64) {
self.waiters.remove(hash, |k| k == &cht_key);
fn remove_waiter(&self, w_key: (Arc<K>, TypeId), w_hash: u64) {
self.waiters.remove(w_hash, |k| k == &w_key);
}

#[inline]
fn try_insert_waiter(
&self,
cht_key: (Arc<K>, TypeId),
hash: u64,
w_key: (Arc<K>, TypeId),
w_hash: u64,
waiter: &Waiter<V>,
) -> Option<Waiter<V>> {
let waiter = TrioArc::clone(waiter);
self.waiters.insert_if_not_present(cht_key, hash, waiter)
self.waiters.insert_if_not_present(w_key, w_hash, waiter)
}

#[inline]
fn cht_key_hash(&self, key: &Arc<K>, type_id: TypeId) -> ((Arc<K>, TypeId), u64) {
let cht_key = (Arc::clone(key), type_id);
let hash = self.waiters.hash(&cht_key);
(cht_key, hash)
fn waiter_key_hash(&self, c_key: &Arc<K>, type_id: TypeId) -> ((Arc<K>, TypeId), u64) {
let w_key = (Arc::clone(c_key), type_id);
let w_hash = self.waiters.hash(&w_key);
(w_key, w_hash)
}
}
2 changes: 1 addition & 1 deletion src/sync_base/base_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ where

// TODO: Instead using Arc<AtomicU8> to check if the actual operation was
// insert or update, check the return value of insert_with_or_modify. If it
// is_some, the value was inserted, otherwise the value was updated.
// is_some, the value was updated, otherwise the value was inserted.

// Since the cache (cht::SegmentedHashMap) employs optimistic locking
// strategy, insert_with_or_modify() may get an insert/modify operation
Expand Down
Loading