Skip to content

Commit

Permalink
Creating a background thread for key expiration
Browse files Browse the repository at this point in the history
  • Loading branch information
RayanRal committed Apr 23, 2024
1 parent 298c1d9 commit 220bf84
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 10 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ path = "src/client/client.rs"
[dependencies]
rayon = "1.10"
env_logger = "0.11.3"
log = "0.4"
log = "0.4"
priority-queue = "2.0.2"
2 changes: 1 addition & 1 deletion src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ fn main() {
// let port = &args[2];
// let mut stream = TcpStream::connect(format!("{ip}:{port}"))?;

let mut stream = TcpStream::connect("127.0.0.1:7878").expect("Failed to connect to server");
let stream = TcpStream::connect("127.0.0.1:7878").expect("Failed to connect to server");
let stream_clone = stream.try_clone().unwrap();
let mut reader = BufReader::new(stream);
let mut writer = BufWriter::new(stream_clone);
Expand Down
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ mod server {
pub mod control_plane;

pub mod commands;

pub mod evictor;
}


Expand Down
50 changes: 43 additions & 7 deletions src/server/cache.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,61 @@
use std::cmp::Reverse;
use std::collections::HashMap;
use std::ops::Add;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, SystemTime};
use log::info;
use priority_queue::PriorityQueue;


pub struct Cache {
hash_map: HashMap<String, String>,
hash_map: Arc<Mutex<HashMap<String, String>>>,
ttl_queue: Arc<Mutex<PriorityQueue<String, Reverse<SystemTime>>>>,
}

impl Cache {
pub fn new() -> Cache {
return Cache {
hash_map: HashMap::new(),
let hash_map: Arc<Mutex<HashMap<String, String>>> = Arc::new(Mutex::new(HashMap::new()));
let ttl_queue: Arc<Mutex<PriorityQueue<String, Reverse<SystemTime>>>> = Arc::new(Mutex::new(PriorityQueue::new()));

let hash_map_clone = hash_map.clone();
let ttl_queue_clone = ttl_queue.clone();

thread::spawn(move || {
loop {
thread::sleep(Duration::from_secs(2));

let cur_time = SystemTime::now();
while let Some((key, expiration_time)) = ttl_queue_clone.lock().unwrap().peek() {
if expiration_time.0 >= cur_time {
info!("It's not yet time to expire {key}");
break;
}
info!("{key} expired, removing");
hash_map_clone.lock().unwrap().remove(key);
ttl_queue_clone.lock().unwrap().pop();
}
}
});


let cache = Cache {
hash_map,
ttl_queue,
};
return cache;
}
pub fn put(&mut self, key: &String, value: &String) {
self.hash_map.insert(key.to_string(), value.to_string());
self.hash_map.lock().unwrap().insert(key.to_string(), value.to_string());
let expiration_time = SystemTime::now().add(Duration::from_secs(15));
self.ttl_queue.lock().unwrap().push(key.to_string(), Reverse(expiration_time));
}

pub fn get(&self, key: &String) -> Option<&String> {
return self.hash_map.get(key);
pub fn get(&self, key: &String) -> Option<String> {
return self.hash_map.lock().unwrap().get(key).map(|s| s.clone());
}

pub fn exists(&self, key: &String) -> bool {
return self.hash_map.contains_key(key);
return self.hash_map.lock().unwrap().contains_key(key);
}
}
2 changes: 1 addition & 1 deletion src/server/control_plane.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use log::warn;
use log::{info, warn};
use crate::server::cache::Cache;
use crate::server::commands;
use crate::server::commands::CommandEnum;
Expand Down
Empty file added src/server/evictor.rs
Empty file.

0 comments on commit 220bf84

Please sign in to comment.