Skip to content

Commit

Permalink
fix mocking layer when using multiple databases (azuqua#67)
Browse files Browse the repository at this point in the history
  • Loading branch information
alecembke-okta authored Oct 28, 2019
1 parent de95b6c commit f3f20a0
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 87 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fred"
version = "1.2.2"
version = "1.2.3"
authors = ["Alec Embke <[email protected]>"]
edition = "2018"
description = "A Redis client for Rust built on Futures and Tokio."
Expand Down
12 changes: 6 additions & 6 deletions examples/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,19 @@ const INTERFACE: &'static str = "127.0.0.1";
const PORT: u16 = 3000;

#[derive(Clone)]
pub struct HttpInterface<'a> {
client: &'a RedisClient
pub struct HttpInterface {
client: RedisClient
}

impl<'a> HttpInterface<'a> {
impl HttpInterface {

pub fn new(client: &'a RedisClient) -> HttpInterface<'a> {
pub fn new(client: RedisClient) -> HttpInterface {
HttpInterface { client }
}

}

impl<'a> Service for HttpInterface<'a> {
impl Service for HttpInterface {
type Request = Request;
type Response = Response;
type Error = hyper::Error;
Expand Down Expand Up @@ -116,7 +116,7 @@ fn main() {
// give the service its own clone of the client
let http_client = client.clone();
let server = Http::new().bind(&addr, move || {
Ok(HttpInterface::new(&http_client))
Ok(HttpInterface::new(http_client.clone()))
});

let server = match server {
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ extern crate tokio_timer_patched as tokio_timer;
extern crate tokio_io;
extern crate rand;

#[macro_use]
extern crate lazy_static;

#[macro_use]
extern crate log;
extern crate pretty_env_logger;
Expand Down
72 changes: 57 additions & 15 deletions src/mocks/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use std::rc::Rc;
use std::collections::BTreeMap;

use crate::protocol::types::*;
use std::ops::{DerefMut, Deref};
use std::sync::Arc;


pub fn log_unimplemented(command: &RedisCommand) -> Result<Frame, RedisError> {
Expand Down Expand Up @@ -97,7 +99,7 @@ pub fn set(data: &mut DataSet, mut args: Vec<RedisValue>) -> Result<Frame, Redis
data.key_types.insert(key.clone(), KeyType::Data);

let now = Instant::now();
let _ = data.expirations.borrow_mut().add(&key, ExpireLog {
let _ = data.expirations.write().deref_mut().add(&key, ExpireLog {
after: now + Duration::from_millis(count as u64),
internal: Some((now, (key.clone())))
})?;
Expand Down Expand Up @@ -134,7 +136,7 @@ pub fn set(data: &mut DataSet, mut args: Vec<RedisValue>) -> Result<Frame, Redis
data.key_types.insert(key.clone(), KeyType::Data);

let now = Instant::now();
let _ = data.expirations.borrow_mut().add(&key, ExpireLog {
let _ = data.expirations.write().deref_mut().add(&key, ExpireLog {
after: now + Duration::from_millis(count as u64),
internal: Some((now, (key.clone())))
})?;
Expand Down Expand Up @@ -201,7 +203,7 @@ pub fn get(data: &mut DataSet, mut args: Vec<RedisValue>) -> Result<Frame, Redis
}

pub fn del(data: &mut DataSet, mut args: Vec<RedisValue>) -> Result<Frame, RedisError> {
let keys: Vec<(KeyType, Rc<RedisKey>)> = args.into_iter().filter_map(|s| {
let keys: Vec<(KeyType, Arc<RedisKey>)> = args.into_iter().filter_map(|s| {
let k = match s {
RedisValue::String(s) => s,
RedisValue::Integer(i) => i.to_string(),
Expand All @@ -216,15 +218,15 @@ pub fn del(data: &mut DataSet, mut args: Vec<RedisValue>) -> Result<Frame, Redis

Some((kind, k))
})
.collect();
.collect();

let mut deleted = 0;
for (kind, key) in keys.into_iter() {
if data.keys.remove(&key) {
deleted += 1;
}
let _ = data.key_types.remove(&key);
let _ = data.expirations.borrow_mut().del(&key);
let _ = data.expirations.write().deref_mut().del(&key);

match kind {
KeyType::Data => { data.data.remove(&key); },
Expand Down Expand Up @@ -260,7 +262,7 @@ pub fn expire(data: &mut DataSet, mut args: Vec<RedisValue>) -> Result<Frame, Re
let now = Instant::now();
let _key = key.clone();

let _ = data.expirations.borrow_mut().add(&_key, ExpireLog {
let _ = data.expirations.write().deref_mut().add(&_key, ExpireLog {
after: now + Duration::from_millis(ms as u64),
internal: Some((now, (_key.clone())))
})?;
Expand All @@ -281,7 +283,7 @@ pub fn persist(data: &mut DataSet, mut args: Vec<RedisValue>) -> Result<Frame, R
};
let key = utils::get_key(&*data, key);

let count = data.expirations.borrow_mut().del(&key)?;
let count = data.expirations.write().deref_mut().del(&key)?;
Ok(Frame::Integer(count as i64))
}

Expand Down Expand Up @@ -379,7 +381,7 @@ pub fn hset(data: &mut DataSet, mut args: Vec<RedisValue>) -> Result<Frame, Redi
1
};

let _ = inner.insert(Rc::new(field), value);
let _ = inner.insert(Arc::new(field), value);

Ok(Frame::Integer(res))
}
Expand Down Expand Up @@ -629,13 +631,53 @@ pub fn ping(data: &mut DataSet, mut args: Vec<RedisValue>) -> Result<Frame, Redi
Ok(Frame::SimpleString("PONG".into()))
}

pub fn flushall(data: &mut DataSet, mut args: Vec<RedisValue>) -> Result<Frame, RedisError> {
data.data.clear();
data.maps.clear();
data.sets.clear();
data.key_types.clear();
data.keys.clear();
utils::clear_expirations(&data.expirations);
pub fn flushall(mut args: Vec<RedisValue>) -> Result<Frame, RedisError> {
let global_data = utils::global_data_set();

for data_ref in global_data.read().deref().values() {
let mut data_guard = data_ref.write();
let mut data = data_guard.deref_mut();

data.data.clear();
data.maps.clear();
data.sets.clear();
data.key_types.clear();
data.keys.clear();
utils::clear_expirations(&data.expirations);
}

utils::ok()
}

pub fn smembers(data: &mut DataSet, mut args: Vec<RedisValue>) -> Result<Frame, RedisError> {
args.reverse();

let key = match args.pop() {
Some(RedisValue::String(s)) => s,
Some(RedisValue::Integer(i)) => i.to_string(),
_ => return Err(RedisError::new(
RedisErrorKind::InvalidArgument, "Invalid key."
))
};
let key = utils::get_key(&*data, key);

let members = match data.sets.get(&key) {
Some(m) => m,
None => return Ok(Frame::Array(vec![]))
};

let mut out = Vec::with_capacity(members.len());
for member in members.iter() {
out.push(Frame::BulkString(member.key.as_bytes().to_vec()));
}

Ok(Frame::Array(out))
}

pub fn publish(data: &mut DataSet, mut args: Vec<RedisValue>) -> Result<Frame, RedisError> {
Ok(Frame::Integer(1))
}

pub fn subscribe(data: &mut DataSet, mut args: Vec<RedisValue>) -> Result<Frame, RedisError> {
Ok(Frame::Integer(1))
}
43 changes: 30 additions & 13 deletions src/mocks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,30 @@ pub fn create_commands_ft(handle: Handle, inner: Arc<RedisClientInner>) -> Box<F
let expire_tx = tx.clone();
multiplexer_utils::set_command_tx(&inner, tx);

let data = DataSet::default();
let expirations = data.expirations.clone();
let data = utils::global_data_set();

let expire_ft = inner.timer.interval(Duration::from_secs(1)).map_err(|_| ()).for_each(move |_| {
trace!("Starting to scan for expired keys.");
let global_data = utils::global_data_set();

let expired = {
let mut expiration_ref = expirations.borrow_mut();
for data_ref in global_data.read().deref().values() {
let data_guard = data_ref.read();
let data = data_guard.deref();
let expirations = data.expirations.clone();

let expired = expiration_ref.find_expired();
expiration_ref.cleanup();
let expired = {
let mut expiration_guard = expirations.write();
let mut expiration_ref = expiration_guard.deref_mut();

expired
};
let expired = expiration_ref.find_expired();
expiration_ref.cleanup();

trace!("Cleaning up mock {} expired keys", expired.len());
utils::cleanup_keys(&expire_tx, expired);
expired
};

trace!("Cleaning up mock {} expired keys", expired.len());
utils::cleanup_keys(&expire_tx, expired);
}

Ok::<(), ()>(())
});
Expand All @@ -72,10 +79,20 @@ pub fn create_commands_ft(handle: Handle, inner: Arc<RedisClientInner>) -> Box<F
Ok::<(), ()>(())
});

Box::new(rx.from_err::<RedisError>().fold((handle, inner, data, None), |(handle, inner, mut data, err), mut command| {
Box::new(rx.from_err::<RedisError>().fold((handle, inner, 0, None), |(handle, inner, mut db, err), mut command| {
debug!("{} Handling redis command {:?}", n!(inner), command.kind);
client_utils::decr_atomic(&inner.cmd_buffer_len);

if command.kind == RedisCommandKind::Select {
db = command.args.first().map(|v| {
match v {
RedisValue::Integer(i) => *i as u8,
_ => panic!("Invalid redis database in mock layer.")
}
}).unwrap_or(db);
}
let data = utils::global_data_set_db(db);

if command.kind.is_close() {
debug!("{} Recv close command on the command stream.", n!(inner));

Expand All @@ -97,12 +114,12 @@ pub fn create_commands_ft(handle: Handle, inner: Arc<RedisClientInner>) -> Box<F
return Err(RedisError::new_canceled());
}

let result = utils::handle_command(&inner, &mut data, command);
let result = utils::handle_command(&inner, &data, command);
if let Some(resp_tx) = resp_tx {
let _ = resp_tx.send(result);
}

Ok((handle, inner, data, err))
Ok((handle, inner, db, err))
}
})
.map(|(_, _, _, err)| err)
Expand Down
42 changes: 22 additions & 20 deletions src/mocks/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,20 @@ use futures::{
Stream
};
use std::time::Instant;
use std::sync::Arc;
use parking_lot::RwLock;

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ExpireLog {
/// Timestamp of when to clean up, in ms.
pub after: Instant,
/// Timestamp of set operation, reference to the key. This is set by the library.
pub internal: Option<(Instant, Rc<RedisKey>)>
pub internal: Option<(Instant, Arc<RedisKey>)>
}

impl ExpireLog {

pub fn set_internal(&mut self, set: Instant, key: &Rc<RedisKey>) {
pub fn set_internal(&mut self, set: Instant, key: &Arc<RedisKey>) {
self.internal = Some((set, key.clone()));
}

Expand All @@ -70,7 +72,7 @@ impl ExpireLog {
}
}

pub fn get_key(&self) -> Option<&Rc<RedisKey>> {
pub fn get_key(&self) -> Option<&Arc<RedisKey>> {
match self.internal {
Some((_, ref key)) => Some(key),
None => None
Expand Down Expand Up @@ -121,9 +123,9 @@ impl fmt::Display for KeyType {
/// Uses a map of "dirty" logs to batch up slower operations on the heap.
#[derive(Debug, Clone)]
pub struct Expirations {
pub expirations: BTreeMap<Rc<RedisKey>, Rc<ExpireLog>>,
pub sorted: BinaryHeap<Rc<ExpireLog>>,
pub dirty: BTreeMap<Rc<RedisKey>, Rc<ExpireLog>>
pub expirations: BTreeMap<Arc<RedisKey>, Arc<ExpireLog>>,
pub sorted: BinaryHeap<Arc<ExpireLog>>,
pub dirty: BTreeMap<Arc<RedisKey>, Arc<ExpireLog>>
}

impl Expirations {
Expand All @@ -137,12 +139,12 @@ impl Expirations {
}

/// Add or update an expire log in the data set.
pub fn add(&mut self, key: &Rc<RedisKey>, mut expiration: ExpireLog) -> Result<(), RedisError> {
pub fn add(&mut self, key: &Arc<RedisKey>, mut expiration: ExpireLog) -> Result<(), RedisError> {
if !expiration.has_internal() {
expiration.set_internal(Instant::now(), key);
}

let expiration = Rc::new(expiration);
let expiration = Arc::new(expiration);

if let Some(old) = self.expirations.insert(key.clone(), expiration.clone()) {
// move old value to deleted set for lazy deletion later
Expand All @@ -160,7 +162,7 @@ impl Expirations {
Ok(())
}

pub fn del(&mut self, key: &Rc<RedisKey>) -> Result<usize, RedisError> {
pub fn del(&mut self, key: &Arc<RedisKey>) -> Result<usize, RedisError> {
let old = match self.expirations.remove(key) {
Some(old) => old,
None => return Ok(0)
Expand All @@ -174,9 +176,9 @@ impl Expirations {
self.dirty.len()
}

pub fn find_expired(&mut self) -> Vec<Rc<ExpireLog>> {
pub fn find_expired(&mut self) -> Vec<Arc<ExpireLog>> {
let now = Instant::now();
let mut out: Vec<Rc<ExpireLog>> = Vec::new();
let mut out: Vec<Arc<ExpireLog>> = Vec::new();

while self.sorted.len() > 0 {
let youngest = match self.sorted.pop() {
Expand Down Expand Up @@ -227,7 +229,7 @@ impl Expirations {

// do a full pass over the binary heap to remove things from the `dirty` map
pub fn cleanup(&mut self) {
let mut new_sorted: BinaryHeap<Rc<ExpireLog>> = BinaryHeap::new();
let mut new_sorted: BinaryHeap<Arc<ExpireLog>> = BinaryHeap::new();

for expire in self.sorted.drain() {
let expire_key = match expire.get_key() {
Expand Down Expand Up @@ -260,13 +262,13 @@ impl Expirations {
}

pub struct DataSet {
pub keys: BTreeSet<Rc<RedisKey>>,
pub key_types: BTreeMap<Rc<RedisKey>, KeyType>,
pub data: BTreeMap<Rc<RedisKey>, RedisValue>,
pub maps: BTreeMap<Rc<RedisKey>, BTreeMap<Rc<RedisKey>, RedisValue>>,
pub sets: BTreeMap<Rc<RedisKey>, BTreeSet<RedisKey>>,
pub lists: BTreeMap<Rc<RedisKey>, VecDeque<RedisValue>>,
pub expirations: Rc<RefCell<Expirations>>,
pub keys: BTreeSet<Arc<RedisKey>>,
pub key_types: BTreeMap<Arc<RedisKey>, KeyType>,
pub data: BTreeMap<Arc<RedisKey>, RedisValue>,
pub maps: BTreeMap<Arc<RedisKey>, BTreeMap<Arc<RedisKey>, RedisValue>>,
pub sets: BTreeMap<Arc<RedisKey>, BTreeSet<RedisKey>>,
pub lists: BTreeMap<Arc<RedisKey>, VecDeque<RedisValue>>,
pub expirations: Arc<RwLock<Expirations>>,
}

impl Default for DataSet {
Expand All @@ -279,7 +281,7 @@ impl Default for DataSet {
maps: BTreeMap::new(),
sets: BTreeMap::new(),
lists: BTreeMap::new(),
expirations: Rc::new(RefCell::new(Expirations::new()))
expirations: Arc::new(RwLock::new(Expirations::new()))
}
}

Expand Down
Loading

0 comments on commit f3f20a0

Please sign in to comment.