From f648ca9f468aa60504c964e41509654f57ca0d6e Mon Sep 17 00:00:00 2001 From: Ksenia Vazhdaeva Date: Mon, 19 Aug 2024 12:41:08 +0700 Subject: [PATCH] fixes for queue length and push/pop operations --- queue_rs/src/lib.rs | 155 ++++++++++++++++++++++++++---------- queue_rs/src/nonblocking.rs | 6 +- 2 files changed, 114 insertions(+), 47 deletions(-) diff --git a/queue_rs/src/lib.rs b/queue_rs/src/lib.rs index 64c9c17..ca848e0 100644 --- a/queue_rs/src/lib.rs +++ b/queue_rs/src/lib.rs @@ -2,8 +2,9 @@ pub mod blocking; mod fs; pub mod nonblocking; -use anyhow::Result; +use anyhow::{anyhow, Result}; use rocksdb::{Options, DB}; +use std::cmp::Ordering; pub fn version() -> &'static str { env!("CARGO_PKG_VERSION") @@ -17,6 +18,7 @@ pub struct PersistentQueueWithCapacity { write_index: u64, read_index: u64, max_elements: u64, + empty: bool, } const U64_BYTE_LEN: usize = 8; @@ -35,6 +37,12 @@ const MAX_ALLOWED_INDEX: u64 = u64::MAX - 100; impl PersistentQueueWithCapacity { pub fn new(path: &str, max_elements: usize, mut db_opts: Options) -> Result { + if max_elements > MAX_ALLOWED_INDEX as usize { + return Err(anyhow!( + "max_elements can't be greater than {}", + MAX_ALLOWED_INDEX + )); + } db_opts.create_if_missing(true); db_opts.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(U64_BYTE_LEN)); @@ -70,6 +78,8 @@ impl PersistentQueueWithCapacity { None => 0u64, }; + let empty = db.get(Self::index_to_key(read_index))?.is_none(); + Ok(Self { db, path: path.to_string(), @@ -77,6 +87,7 @@ impl PersistentQueueWithCapacity { read_index, space_stat, max_elements: max_elements as u64, + empty, }) } @@ -93,11 +104,15 @@ impl PersistentQueueWithCapacity { } pub fn len(&self) -> usize { - (if self.write_index >= self.read_index { - self.write_index - self.read_index + if self.empty { + 0 } else { - MAX_ALLOWED_INDEX - self.read_index + self.write_index - }) as usize + (match self.write_index.cmp(&self.read_index) { + Ordering::Less => MAX_ALLOWED_INDEX - self.read_index + self.write_index, + Ordering::Equal => MAX_ALLOWED_INDEX, + Ordering::Greater => self.write_index - self.read_index, + }) as usize + } } pub fn payload_size(&self) -> u64 { @@ -105,76 +120,90 @@ impl PersistentQueueWithCapacity { } pub fn is_empty(&self) -> bool { - self.write_index == self.read_index + self.empty } pub fn push(&mut self, values: &[&[u8]]) -> Result<()> { + if values.is_empty() { + return Ok(()); + } if self.len() + values.len() > self.max_elements as usize { return Err(anyhow::anyhow!("Queue is full")); } let mut batch = rocksdb::WriteBatch::default(); + let mut write_index = self.write_index; for value in values { - batch.put(Self::index_to_key(self.write_index), value); - self.write_index += 1; - if self.write_index == MAX_ALLOWED_INDEX { - self.write_index = 0; + batch.put(Self::index_to_key(write_index), value); + write_index += 1; + if write_index == MAX_ALLOWED_INDEX { + write_index = 0; } } batch.put( Self::index_to_key(WRITE_INDEX_CELL), - self.write_index.to_le_bytes(), + write_index.to_le_bytes(), ); - self.space_stat += values.iter().map(|v| v.len() as u64).sum::(); + let space_stat = self.space_stat + values.iter().map(|v| v.len() as u64).sum::(); batch.put( Self::index_to_key(SPACE_STAT_CELL), - self.space_stat.to_le_bytes(), + space_stat.to_le_bytes(), ); self.db.write(batch)?; + self.empty = false; + self.write_index = write_index; + self.space_stat = space_stat; + Ok(()) } pub fn pop(&mut self, mut max_elts: usize) -> Result>> { let mut res = Vec::with_capacity(max_elts); let mut batch = rocksdb::WriteBatch::default(); + let mut read_index = self.read_index; loop { - let key = Self::index_to_key(self.read_index); + let key = Self::index_to_key(read_index); let value = self.db.get(key)?; if let Some(v) = value { batch.delete(key); res.push(v); - self.read_index += 1; - if self.read_index == MAX_ALLOWED_INDEX { - self.read_index = 0; + read_index += 1; + if read_index == MAX_ALLOWED_INDEX { + read_index = 0; } max_elts -= 1; } else { break; } - if self.read_index != self.write_index && max_elts > 0 { + if read_index != self.write_index && max_elts > 0 { continue; } else { break; } } if !res.is_empty() { - self.space_stat -= res.iter().map(|v| v.len() as u64).sum::(); + let empty = read_index == self.write_index; + let space_stat = self.space_stat - res.iter().map(|v| v.len() as u64).sum::(); batch.put( Self::index_to_key(SPACE_STAT_CELL), - self.space_stat.to_le_bytes(), + space_stat.to_le_bytes(), ); batch.put( Self::index_to_key(READ_INDEX_CELL), - self.read_index.to_le_bytes(), + read_index.to_le_bytes(), ); self.db.write(batch)?; + + self.read_index = read_index; + self.space_stat = space_stat; + self.empty = empty; } Ok(res) @@ -189,7 +218,12 @@ mod tests { let path = "/tmp/test1".to_string(); _ = PersistentQueueWithCapacity::remove_db(&path); { - let mut db = PersistentQueueWithCapacity::new(&path, 10, Options::default()).unwrap(); + let mut db = PersistentQueueWithCapacity::new( + &path, + MAX_ALLOWED_INDEX as usize, + Options::default(), + ) + .unwrap(); db.push(&[&[1, 2, 3]]).unwrap(); db.push(&[&[4, 5, 6]]).unwrap(); assert_eq!(db.len(), 2); @@ -213,6 +247,7 @@ mod tests { assert_eq!(db.write_index, 1); let data = db.pop(1).unwrap(); assert!(db.is_empty()); + assert_eq!(db.len(), 0); assert_eq!(data, vec![vec![7, 8, 9]]); } PersistentQueueWithCapacity::remove_db(&path).unwrap(); @@ -236,15 +271,16 @@ mod tests { fn test_read_with_close() { let path = "/tmp/test3".to_string(); _ = PersistentQueueWithCapacity::remove_db(&path); + let size = MAX_ALLOWED_INDEX as usize; { - let mut db = PersistentQueueWithCapacity::new(&path, 10, Options::default()).unwrap(); + let mut db = PersistentQueueWithCapacity::new(&path, size, Options::default()).unwrap(); db.push(&[&[1, 2, 3]]).unwrap(); db.push(&[&[4, 5, 6]]).unwrap(); db.push(&[&[7, 8, 9]]).unwrap(); } { - let mut db = PersistentQueueWithCapacity::new(&path, 10, Options::default()).unwrap(); + let mut db = PersistentQueueWithCapacity::new(&path, size, Options::default()).unwrap(); assert_eq!(db.payload_size(), 9); let res = db.pop(1).unwrap(); assert_eq!(res, vec![vec![1, 2, 3]]); @@ -252,7 +288,7 @@ mod tests { } { - let mut db = PersistentQueueWithCapacity::new(&path, 10, Options::default()).unwrap(); + let mut db = PersistentQueueWithCapacity::new(&path, size, Options::default()).unwrap(); let res = db.pop(1).unwrap(); assert_eq!(res, vec![vec![4, 5, 6]]); let res = db.pop(1).unwrap(); @@ -260,7 +296,7 @@ mod tests { } { - let mut db = PersistentQueueWithCapacity::new(&path, 10, Options::default()).unwrap(); + let mut db = PersistentQueueWithCapacity::new(&path, size, Options::default()).unwrap(); let res = db.pop(1).unwrap(); assert!(res.is_empty()); } @@ -271,30 +307,61 @@ mod tests { fn push_pop_many() { let path = "/tmp/test_push_pop_many".to_string(); _ = PersistentQueueWithCapacity::remove_db(&path); - let mut queue = PersistentQueueWithCapacity::new(&path, 1000, Options::default()).unwrap(); + let mut queue = PersistentQueueWithCapacity::new(&path, 3, Options::default()).unwrap(); queue - .push(&[ - &[1u8, 2u8, 3u8], - &[4u8, 5u8, 6u8], - &[7u8, 8u8, 9u8], - &[10u8, 11u8, 12u8], - ]) + .push(&[&[1u8, 2u8, 3u8], &[4u8, 5u8, 6u8], &[7u8, 8u8, 9u8]]) .unwrap(); - let res = queue.pop(3).unwrap(); - assert_eq!( - res, - vec![ - vec![1u8, 2u8, 3u8], - vec![4u8, 5u8, 6u8], - vec![7u8, 8u8, 9u8] - ] - ); - let res = queue.pop(3).unwrap(); - assert_eq!(res, vec![vec![10u8, 11u8, 12u8]]); + + let res = queue.pop(2).unwrap(); + + assert_eq!(res, vec![vec![1u8, 2u8, 3u8], vec![4u8, 5u8, 6u8]]); + let res = queue.pop(1).unwrap(); + assert_eq!(res, vec![vec![7u8, 8u8, 9u8]]); let res = queue.pop(1).unwrap(); assert!(res.is_empty()); _ = PersistentQueueWithCapacity::remove_db(&path); } + + #[test] + fn push_pop_max_elements() { + let path = "/tmp/test_push_pop_max_elements".to_string(); + _ = PersistentQueueWithCapacity::remove_db(&path); + let size = MAX_ALLOWED_INDEX as usize; + let mut queue = PersistentQueueWithCapacity::new(&path, size, Options::default()).unwrap(); + + let values = vec!["a".as_bytes(); size]; + queue.push(&values).unwrap(); + + assert_eq!(queue.read_index, 0); + assert_eq!(queue.write_index, 0); + assert_eq!(queue.empty, false); + assert_eq!(queue.len(), size); + + let res = queue.pop(size).unwrap(); + + assert_eq!(res, values); + assert_eq!(queue.read_index, 0); + assert_eq!(queue.write_index, 0); + assert_eq!(queue.empty, true); + assert_eq!(queue.len(), 0); + + _ = PersistentQueueWithCapacity::remove_db(&path); + } + + #[test] + fn new_invalid_max_elements() { + let result = PersistentQueueWithCapacity::new( + "/tmp/test_invalid_max_elements", + (MAX_ALLOWED_INDEX + 1) as usize, + Options::default(), + ); + + assert_eq!( + result.is_err_and(|e| e.to_string() + == format!("max_elements can't be greater than {}", MAX_ALLOWED_INDEX)), + true + ); + } } diff --git a/queue_rs/src/nonblocking.rs b/queue_rs/src/nonblocking.rs index dff8a48..09fcdf7 100644 --- a/queue_rs/src/nonblocking.rs +++ b/queue_rs/src/nonblocking.rs @@ -207,7 +207,7 @@ mod tests { let path = "/tmp/test_fresh_healthy".to_string(); _ = crate::PersistentQueueWithCapacity::remove_db(&path); let queue = - super::PersistentQueueWithCapacity::new(&path, 1000, 1000, rocksdb::Options::default()) + super::PersistentQueueWithCapacity::new(&path, 3, 1000, rocksdb::Options::default()) .unwrap(); assert!(queue.is_healthy()); let resp = queue.len().unwrap().get().unwrap(); @@ -220,7 +220,7 @@ mod tests { let path = "/tmp/test_push_pop".to_string(); _ = crate::PersistentQueueWithCapacity::remove_db(&path); let queue = - super::PersistentQueueWithCapacity::new(&path, 1000, 1000, rocksdb::Options::default()) + super::PersistentQueueWithCapacity::new(&path, 3, 1000, rocksdb::Options::default()) .unwrap(); assert!(queue.is_healthy()); @@ -249,7 +249,7 @@ mod tests { let path = "/tmp/test_size".to_string(); _ = crate::PersistentQueueWithCapacity::remove_db(&path); let queue = - super::PersistentQueueWithCapacity::new(&path, 1000, 1000, rocksdb::Options::default()) + super::PersistentQueueWithCapacity::new(&path, 3, 1000, rocksdb::Options::default()) .unwrap(); let size_query = queue.disk_size().unwrap(); let size = size_query.get().unwrap();