Skip to content

Commit

Permalink
Refine impl of IOLooper
Browse files Browse the repository at this point in the history
Signed-off-by: kexuan.yang <[email protected]>
  • Loading branch information
yangkx1024 committed Jan 16, 2025
1 parent 7f7382c commit 011d4ab
Show file tree
Hide file tree
Showing 11 changed files with 63 additions and 66 deletions.
63 changes: 34 additions & 29 deletions src/core/io_looper.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
use crate::Error::{IOError, LockError};
use crate::Result;
use std::any::Any;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::thread::JoinHandle;

use crate::Error::{IOError, LockError};
use crate::Result;
use std::time::Instant;

const LOG_TAG: &str = "MMKV:IO";

type Job = Box<dyn FnOnce(&mut dyn Any) + Send + 'static>;

enum Signal {
Normal,
Kill(Job),
Next,
Quit,
}

pub trait Callback: Send + Any {}
Expand All @@ -42,22 +41,23 @@ impl<T: Callback + 'static> IOLooper<T> {
}
}

pub fn post_and_kill<F: FnOnce(&mut T) + Send + 'static>(&mut self, task: F) {
let job: Job = Box::new(|callback| {
let callback = callback.downcast_mut::<T>().unwrap();
task(callback)
});
self.executor.queue.lock().unwrap().clear();
pub fn quit(&mut self) -> Result<()> {
self.sender
.as_ref()
.unwrap()
.send(Signal::Kill(job))
.unwrap();
drop(self.sender.take());
.take()
.map(|sender| {
sender
.send(Signal::Quit)
.map_err(|e| IOError(e.to_string()))
})
.transpose()?;
if let Some(handle) = self.executor.join_handle.take() {
debug!(LOG_TAG, "kill io thread");
handle.join().unwrap();
debug!(LOG_TAG, "waiting for remain tasks to finish");
drop(self.sender.take());
handle
.join()
.map_err(|_| IOError("io thread dead unexpected".to_string()))?;
}
Ok(())
}

pub fn post<F: FnOnce(&mut T) + Send + 'static>(&self, task: F) -> Result<()> {
Expand All @@ -75,35 +75,38 @@ impl<T: Callback + 'static> IOLooper<T> {
.as_ref()
.map(|sender| {
sender
.send(Signal::Normal)
.send(Signal::Next)
.map_err(|e| IOError(e.to_string()))
})
.ok_or(IOError("channel closed".to_string()))?
.ok_or(IOError("channel closed unexpected".to_string()))?
}

pub fn sync(&self) {
#[allow(dead_code)]
pub fn sync(&self) -> Result<()> {
use std::sync::atomic::{AtomicBool, Ordering};
let synced = Arc::new(AtomicBool::new(false));
let synced_clone = synced.clone();
self.post(move |_| {
synced.store(true, Ordering::Release);
})
.unwrap();
})?;
loop {
if synced_clone.load(Ordering::Acquire) {
break;
return Ok(());
}
}
}
}

impl<T> Drop for IOLooper<T> {
fn drop(&mut self) {
let time_start = Instant::now();
drop(self.sender.take());

if let Some(handle) = self.executor.join_handle.take() {
handle.join().unwrap();
verbose!(LOG_TAG, "io thread finished");
}
debug!(LOG_TAG, "IOLooper dropped, cost {:?}", time_start.elapsed());
}
}

Expand All @@ -116,11 +119,10 @@ impl Executor {
let signal = receiver.recv();

match signal {
Ok(Signal::Kill(job)) => {
job(&mut callback);
Ok(Signal::Quit) => {
break;
}
Ok(Signal::Normal) => {
Ok(Signal::Next) => {
let mut current_queue = queue.lock().unwrap();
std::mem::swap(&mut buffer, &mut *current_queue);
drop(current_queue);
Expand Down Expand Up @@ -177,7 +179,10 @@ mod tests {
assert_eq!(io_looper.executor.queue.lock().unwrap().len(), 2);
assert!(io_looper.executor.join_handle.is_some());
thread::sleep(Duration::from_millis(50));
io_looper.post_and_kill(|callback| callback.print("last job"));
io_looper
.post(|callback| callback.print("last job"))
.unwrap();
io_looper.quit().unwrap();
assert!(io_looper.sender.is_none());
assert!(io_looper.executor.queue.lock().unwrap().is_empty());
assert!(io_looper.executor.join_handle.is_none());
Expand Down
4 changes: 2 additions & 2 deletions src/core/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl MemoryMap {
}
}

impl<'a, F> Iter<'a, F>
impl<F> Iter<'_, F>
where
F: Fn(&[u8], u32) -> crate::Result<DecodeResult>,
{
Expand All @@ -53,7 +53,7 @@ where
}
}

impl<'a, F> Iterator for Iter<'a, F>
impl<F> Iterator for Iter<'_, F>
where
F: Fn(&[u8], u32) -> crate::Result<DecodeResult>,
{
Expand Down
32 changes: 10 additions & 22 deletions src/core/mmkv_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,11 @@ const LOG_TAG: &str = "MMKV:Core";
pub struct MmkvImpl {
kv_map: HashMap<String, Buffer>,
is_valid: bool,
io_looper: Option<IOLooper<IOWriter>>,
io_looper: IOLooper<IOWriter>,
#[cfg(feature = "encryption")]
encryptor: Encryptor,
}

impl Drop for MmkvImpl {
fn drop(&mut self) {
debug!(LOG_TAG, "drop MmkvImpl");
let time_start = Instant::now();
drop(self.io_looper.take());
debug!(LOG_TAG, "MmkvImpl dropped, cost {:?}", time_start.elapsed());
}
}

impl MmkvImpl {
pub fn new(config: Config, #[cfg(feature = "encryption")] key: &str) -> Self {
let time_start = Instant::now();
Expand All @@ -56,7 +47,7 @@ impl MmkvImpl {
let mmkv = MmkvImpl {
kv_map,
is_valid: true,
io_looper: Some(IOLooper::new(io_writer)),
io_looper: IOLooper::new(io_writer),
#[cfg(feature = "encryption")]
encryptor,
};
Expand All @@ -78,8 +69,6 @@ impl MmkvImpl {
let result = self.kv_map.insert(key.to_string(), raw_buffer.clone());
let duplicated = result.is_some();
self.io_looper
.as_ref()
.unwrap()
.post(move |writer| writer.write(raw_buffer, duplicated))
}

Expand All @@ -103,26 +92,25 @@ impl MmkvImpl {
}
let buffer = Buffer::deleted_buffer(key);
self.io_looper
.as_ref()
.unwrap()
.post(move |writer| writer.write(buffer, true))
}

pub fn clear_data(&mut self) {
pub fn clear_data(&mut self) -> Result<()> {
if !self.is_valid {
warn!(LOG_TAG, "instance already closed");
return;
return Ok(());
}
self.is_valid = false;
self.kv_map.clear();
#[cfg(feature = "encryption")]
let meta_file = self.encryptor.meta_file_path.clone();
self.io_looper.as_mut().unwrap().post_and_kill(|writer| {
self.io_looper.post(|writer| {
writer.remove_file();
#[cfg(feature = "encryption")]
let _ = fs::remove_file(meta_file);
info!(LOG_TAG, "data cleared");
});
})?;
self.io_looper.quit()
}
}

Expand Down Expand Up @@ -195,7 +183,7 @@ mod tests {
assert_eq!(mm.write_offset(), 127);

mmkv = init(config);
mmkv.clear_data();
mmkv.clear_data().unwrap();
assert!(!Path::new(file_path).exists());
}

Expand Down Expand Up @@ -238,7 +226,7 @@ mod tests {
assert_eq!(mm.write_offset(), 128);

mmkv = init(config);
mmkv.clear_data();
mmkv.clear_data().unwrap();
assert!(!Path::new(file).exists());
}

Expand Down Expand Up @@ -290,7 +278,7 @@ mod tests {
mmkv.get("test_multi_thread_mmkv_repeat_key"),
Err(KeyNotFound)
);
mmkv.clear_data();
mmkv.clear_data().unwrap();
assert!(!Path::new(file).exists());
}
}
6 changes: 3 additions & 3 deletions src/ffi/ffi_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ mod test {
unsafe {
ptr.release();
}
logger::sync()
logger::sync().unwrap()
}

#[test]
Expand All @@ -247,7 +247,7 @@ mod test {
unsafe {
ptr.release();
}
logger::sync();
logger::sync().unwrap();
}

#[test]
Expand Down Expand Up @@ -279,6 +279,6 @@ mod test {
unsafe {
ptr.release();
}
logger::sync();
logger::sync().unwrap();
}
}
2 changes: 1 addition & 1 deletion src/ffi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ pub unsafe extern "C" fn close_instance(ptr: *const c_void) {
#[no_mangle]
pub unsafe extern "C" fn clear_data(ptr: *const c_void) {
let mmkv = (ptr as *const MMKV).as_ref().unwrap();
mmkv.clear_data();
mmkv.clear_data().unwrap();
}

#[no_mangle]
Expand Down
2 changes: 1 addition & 1 deletion src/jni/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ pub unsafe extern "C" fn Java_net_yangkx_mmkv_MMKV_setLogLevel(
#[no_mangle]
pub unsafe extern "C" fn Java_net_yangkx_mmkv_MMKV_clearData(mut env: JNIEnv, obj: JClass) {
let mmkv = get_mmkv_ptr(&mut env, &obj).as_ref().unwrap();
mmkv.clear_data();
mmkv.clear_data().unwrap();
}

#[no_mangle]
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
//! mmkv.put("key1", 1).unwrap();
//! assert_eq!(mmkv.get("key1"), Ok(1));
//! // Not actually needed unless you intend to delete all data
//! mmkv.clear_data();
//! mmkv.clear_data().unwrap();
//! ```
//! For detailed API doc, see [MMKV]
pub use crate::core::buffer::{FromBytes, ProvideTypeToken, ToBytes, TypeToken};
Expand Down
2 changes: 1 addition & 1 deletion src/log/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,6 @@ pub fn set_logger(log_impl: Option<Box<dyn Logger>>) {
}

#[allow(dead_code)]
pub fn sync() {
pub fn sync() -> crate::Result<()> {
LOG_WRAPPER.io_looper.sync()
}
2 changes: 1 addition & 1 deletion src/log/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub mod logger;
/**
See [MMKV::set_logger](crate::MMKV::set_logger)
Logger should be [`Send`], cause it will be moved into io thread
Logger should be [`Send`], because it will be moved into io thread
*/
pub trait Logger: Debug + Send + Sync {
fn verbose(&self, log_str: String);
Expand Down
12 changes: 8 additions & 4 deletions src/mmkv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,16 +203,20 @@ impl MMKV {
/**
Clear all data.
*/
pub fn clear_data(&self) {
let mut mmkv_impl = self.mmkv_impl.write().unwrap();
mmkv_impl.clear_data();
pub fn clear_data(&self) -> Result<()> {
let mut mmkv_impl = self
.mmkv_impl
.write()
.map_err(|e| LockError(e.to_string()))?;
mmkv_impl.clear_data()?;
let file_path = MMKV::resolve_file_path(&self.path);
let config = Config::new(file_path.as_path(), page_size() as u64);
*mmkv_impl = MmkvImpl::new(
config,
#[cfg(feature = "encryption")]
&self.key,
);
Ok(())
}

/**
Expand Down Expand Up @@ -349,7 +353,7 @@ mod tests {
);
assert_eq!(mmkv.get("first"), Ok("one".to_string()));
assert_eq!(mmkv.get::<i32>("second"), Err(KeyNotFound));
mmkv.clear_data();
mmkv.clear_data().unwrap();
let _ = fs::remove_file("mini_mmkv");
let _ = fs::remove_file("mini_mmkv.meta");
}
Expand Down
2 changes: 1 addition & 1 deletion tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ fn integration_test() {
#[cfg(feature = "encryption")]
"88C51C536176AD8A8EE4A06F62EE897E",
);
mmkv.clear_data();
mmkv.clear_data().unwrap();
let _ = fs::remove_file("mini_mmkv");
let _ = fs::remove_file("mini_mmkv.meta");
}

0 comments on commit 011d4ab

Please sign in to comment.