Skip to content

Commit

Permalink
fixes for queue length and push/pop operations
Browse files Browse the repository at this point in the history
  • Loading branch information
ksenia-vazhdaeva committed Aug 19, 2024
1 parent 4702b9b commit f648ca9
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 47 deletions.
155 changes: 111 additions & 44 deletions queue_rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ pub mod blocking;
mod fs;
pub mod nonblocking;

use anyhow::Result;
use anyhow::{anyhow, Result};
use rocksdb::{Options, DB};
use std::cmp::Ordering;

pub fn version() -> &'static str {
env!("CARGO_PKG_VERSION")
Expand All @@ -17,6 +18,7 @@ pub struct PersistentQueueWithCapacity {
write_index: u64,
read_index: u64,
max_elements: u64,
empty: bool,
}

const U64_BYTE_LEN: usize = 8;
Expand All @@ -35,6 +37,12 @@ const MAX_ALLOWED_INDEX: u64 = u64::MAX - 100;

impl PersistentQueueWithCapacity {
pub fn new(path: &str, max_elements: usize, mut db_opts: Options) -> Result<Self> {
if max_elements > MAX_ALLOWED_INDEX as usize {
return Err(anyhow!(
"max_elements can't be greater than {}",
MAX_ALLOWED_INDEX
));
}
db_opts.create_if_missing(true);
db_opts.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(U64_BYTE_LEN));

Expand Down Expand Up @@ -70,13 +78,16 @@ impl PersistentQueueWithCapacity {
None => 0u64,
};

let empty = db.get(Self::index_to_key(read_index))?.is_none();

Ok(Self {
db,
path: path.to_string(),
write_index,
read_index,
space_stat,
max_elements: max_elements as u64,
empty,
})
}

Expand All @@ -93,88 +104,106 @@ impl PersistentQueueWithCapacity {
}

pub fn len(&self) -> usize {
(if self.write_index >= self.read_index {
self.write_index - self.read_index
if self.empty {
0
} else {
MAX_ALLOWED_INDEX - self.read_index + self.write_index
}) as usize
(match self.write_index.cmp(&self.read_index) {
Ordering::Less => MAX_ALLOWED_INDEX - self.read_index + self.write_index,
Ordering::Equal => MAX_ALLOWED_INDEX,
Ordering::Greater => self.write_index - self.read_index,
}) as usize
}
}

pub fn payload_size(&self) -> u64 {
self.space_stat
}

pub fn is_empty(&self) -> bool {
self.write_index == self.read_index
self.empty
}

pub fn push(&mut self, values: &[&[u8]]) -> Result<()> {
if values.is_empty() {
return Ok(());
}
if self.len() + values.len() > self.max_elements as usize {
return Err(anyhow::anyhow!("Queue is full"));
}

let mut batch = rocksdb::WriteBatch::default();
let mut write_index = self.write_index;

for value in values {
batch.put(Self::index_to_key(self.write_index), value);
self.write_index += 1;
if self.write_index == MAX_ALLOWED_INDEX {
self.write_index = 0;
batch.put(Self::index_to_key(write_index), value);
write_index += 1;
if write_index == MAX_ALLOWED_INDEX {
write_index = 0;
}
}

batch.put(
Self::index_to_key(WRITE_INDEX_CELL),
self.write_index.to_le_bytes(),
write_index.to_le_bytes(),
);

self.space_stat += values.iter().map(|v| v.len() as u64).sum::<u64>();
let space_stat = self.space_stat + values.iter().map(|v| v.len() as u64).sum::<u64>();

batch.put(
Self::index_to_key(SPACE_STAT_CELL),
self.space_stat.to_le_bytes(),
space_stat.to_le_bytes(),
);

self.db.write(batch)?;

self.empty = false;
self.write_index = write_index;
self.space_stat = space_stat;

Ok(())
}

pub fn pop(&mut self, mut max_elts: usize) -> Result<Vec<Vec<u8>>> {
let mut res = Vec::with_capacity(max_elts);
let mut batch = rocksdb::WriteBatch::default();
let mut read_index = self.read_index;
loop {
let key = Self::index_to_key(self.read_index);
let key = Self::index_to_key(read_index);
let value = self.db.get(key)?;
if let Some(v) = value {
batch.delete(key);
res.push(v);
self.read_index += 1;
if self.read_index == MAX_ALLOWED_INDEX {
self.read_index = 0;
read_index += 1;
if read_index == MAX_ALLOWED_INDEX {
read_index = 0;
}
max_elts -= 1;
} else {
break;
}

if self.read_index != self.write_index && max_elts > 0 {
if read_index != self.write_index && max_elts > 0 {
continue;
} else {
break;
}
}
if !res.is_empty() {
self.space_stat -= res.iter().map(|v| v.len() as u64).sum::<u64>();
let empty = read_index == self.write_index;
let space_stat = self.space_stat - res.iter().map(|v| v.len() as u64).sum::<u64>();
batch.put(
Self::index_to_key(SPACE_STAT_CELL),
self.space_stat.to_le_bytes(),
space_stat.to_le_bytes(),
);
batch.put(
Self::index_to_key(READ_INDEX_CELL),
self.read_index.to_le_bytes(),
read_index.to_le_bytes(),
);
self.db.write(batch)?;

self.read_index = read_index;
self.space_stat = space_stat;
self.empty = empty;
}

Ok(res)
Expand All @@ -189,7 +218,12 @@ mod tests {
let path = "/tmp/test1".to_string();
_ = PersistentQueueWithCapacity::remove_db(&path);
{
let mut db = PersistentQueueWithCapacity::new(&path, 10, Options::default()).unwrap();
let mut db = PersistentQueueWithCapacity::new(
&path,
MAX_ALLOWED_INDEX as usize,
Options::default(),
)
.unwrap();
db.push(&[&[1, 2, 3]]).unwrap();
db.push(&[&[4, 5, 6]]).unwrap();
assert_eq!(db.len(), 2);
Expand All @@ -213,6 +247,7 @@ mod tests {
assert_eq!(db.write_index, 1);
let data = db.pop(1).unwrap();
assert!(db.is_empty());
assert_eq!(db.len(), 0);
assert_eq!(data, vec![vec![7, 8, 9]]);
}
PersistentQueueWithCapacity::remove_db(&path).unwrap();
Expand All @@ -236,31 +271,32 @@ mod tests {
fn test_read_with_close() {
let path = "/tmp/test3".to_string();
_ = PersistentQueueWithCapacity::remove_db(&path);
let size = MAX_ALLOWED_INDEX as usize;
{
let mut db = PersistentQueueWithCapacity::new(&path, 10, Options::default()).unwrap();
let mut db = PersistentQueueWithCapacity::new(&path, size, Options::default()).unwrap();
db.push(&[&[1, 2, 3]]).unwrap();
db.push(&[&[4, 5, 6]]).unwrap();
db.push(&[&[7, 8, 9]]).unwrap();
}

{
let mut db = PersistentQueueWithCapacity::new(&path, 10, Options::default()).unwrap();
let mut db = PersistentQueueWithCapacity::new(&path, size, Options::default()).unwrap();
assert_eq!(db.payload_size(), 9);
let res = db.pop(1).unwrap();
assert_eq!(res, vec![vec![1, 2, 3]]);
assert_eq!(db.payload_size(), 6);
}

{
let mut db = PersistentQueueWithCapacity::new(&path, 10, Options::default()).unwrap();
let mut db = PersistentQueueWithCapacity::new(&path, size, Options::default()).unwrap();
let res = db.pop(1).unwrap();
assert_eq!(res, vec![vec![4, 5, 6]]);
let res = db.pop(1).unwrap();
assert_eq!(res, vec![vec![7, 8, 9]]);
}

{
let mut db = PersistentQueueWithCapacity::new(&path, 10, Options::default()).unwrap();
let mut db = PersistentQueueWithCapacity::new(&path, size, Options::default()).unwrap();
let res = db.pop(1).unwrap();
assert!(res.is_empty());
}
Expand All @@ -271,30 +307,61 @@ mod tests {
fn push_pop_many() {
let path = "/tmp/test_push_pop_many".to_string();
_ = PersistentQueueWithCapacity::remove_db(&path);
let mut queue = PersistentQueueWithCapacity::new(&path, 1000, Options::default()).unwrap();
let mut queue = PersistentQueueWithCapacity::new(&path, 3, Options::default()).unwrap();
queue
.push(&[
&[1u8, 2u8, 3u8],
&[4u8, 5u8, 6u8],
&[7u8, 8u8, 9u8],
&[10u8, 11u8, 12u8],
])
.push(&[&[1u8, 2u8, 3u8], &[4u8, 5u8, 6u8], &[7u8, 8u8, 9u8]])
.unwrap();
let res = queue.pop(3).unwrap();
assert_eq!(
res,
vec![
vec![1u8, 2u8, 3u8],
vec![4u8, 5u8, 6u8],
vec![7u8, 8u8, 9u8]
]
);
let res = queue.pop(3).unwrap();
assert_eq!(res, vec![vec![10u8, 11u8, 12u8]]);

let res = queue.pop(2).unwrap();

assert_eq!(res, vec![vec![1u8, 2u8, 3u8], vec![4u8, 5u8, 6u8]]);
let res = queue.pop(1).unwrap();
assert_eq!(res, vec![vec![7u8, 8u8, 9u8]]);

let res = queue.pop(1).unwrap();
assert!(res.is_empty());

_ = PersistentQueueWithCapacity::remove_db(&path);
}

#[test]
fn push_pop_max_elements() {
let path = "/tmp/test_push_pop_max_elements".to_string();
_ = PersistentQueueWithCapacity::remove_db(&path);
let size = MAX_ALLOWED_INDEX as usize;
let mut queue = PersistentQueueWithCapacity::new(&path, size, Options::default()).unwrap();

let values = vec!["a".as_bytes(); size];
queue.push(&values).unwrap();

assert_eq!(queue.read_index, 0);
assert_eq!(queue.write_index, 0);
assert_eq!(queue.empty, false);
assert_eq!(queue.len(), size);

let res = queue.pop(size).unwrap();

assert_eq!(res, values);
assert_eq!(queue.read_index, 0);
assert_eq!(queue.write_index, 0);
assert_eq!(queue.empty, true);
assert_eq!(queue.len(), 0);

_ = PersistentQueueWithCapacity::remove_db(&path);
}

#[test]
fn new_invalid_max_elements() {
let result = PersistentQueueWithCapacity::new(
"/tmp/test_invalid_max_elements",
(MAX_ALLOWED_INDEX + 1) as usize,
Options::default(),
);

assert_eq!(
result.is_err_and(|e| e.to_string()
== format!("max_elements can't be greater than {}", MAX_ALLOWED_INDEX)),
true
);
}
}
6 changes: 3 additions & 3 deletions queue_rs/src/nonblocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ mod tests {
let path = "/tmp/test_fresh_healthy".to_string();
_ = crate::PersistentQueueWithCapacity::remove_db(&path);
let queue =
super::PersistentQueueWithCapacity::new(&path, 1000, 1000, rocksdb::Options::default())
super::PersistentQueueWithCapacity::new(&path, 3, 1000, rocksdb::Options::default())
.unwrap();
assert!(queue.is_healthy());
let resp = queue.len().unwrap().get().unwrap();
Expand All @@ -220,7 +220,7 @@ mod tests {
let path = "/tmp/test_push_pop".to_string();
_ = crate::PersistentQueueWithCapacity::remove_db(&path);
let queue =
super::PersistentQueueWithCapacity::new(&path, 1000, 1000, rocksdb::Options::default())
super::PersistentQueueWithCapacity::new(&path, 3, 1000, rocksdb::Options::default())
.unwrap();
assert!(queue.is_healthy());

Expand Down Expand Up @@ -249,7 +249,7 @@ mod tests {
let path = "/tmp/test_size".to_string();
_ = crate::PersistentQueueWithCapacity::remove_db(&path);
let queue =
super::PersistentQueueWithCapacity::new(&path, 1000, 1000, rocksdb::Options::default())
super::PersistentQueueWithCapacity::new(&path, 3, 1000, rocksdb::Options::default())
.unwrap();
let size_query = queue.disk_size().unwrap();
let size = size_query.get().unwrap();
Expand Down

0 comments on commit f648ca9

Please sign in to comment.