Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixes for queue length and push/pop operations #4

Merged
merged 1 commit into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 111 additions & 44 deletions queue_rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ pub mod blocking;
mod fs;
pub mod nonblocking;

use anyhow::Result;
use anyhow::{anyhow, Result};
use rocksdb::{Options, DB};
use std::cmp::Ordering;

pub fn version() -> &'static str {
env!("CARGO_PKG_VERSION")
Expand All @@ -17,6 +18,7 @@ pub struct PersistentQueueWithCapacity {
write_index: u64,
read_index: u64,
max_elements: u64,
empty: bool,
}

const U64_BYTE_LEN: usize = 8;
Expand All @@ -35,6 +37,12 @@ const MAX_ALLOWED_INDEX: u64 = u64::MAX - 100;

impl PersistentQueueWithCapacity {
pub fn new(path: &str, max_elements: usize, mut db_opts: Options) -> Result<Self> {
if max_elements > MAX_ALLOWED_INDEX as usize {
return Err(anyhow!(
"max_elements can't be greater than {}",
MAX_ALLOWED_INDEX
));
}
db_opts.create_if_missing(true);
db_opts.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(U64_BYTE_LEN));

Expand Down Expand Up @@ -70,13 +78,16 @@ impl PersistentQueueWithCapacity {
None => 0u64,
};

let empty = db.get(Self::index_to_key(read_index))?.is_none();

Ok(Self {
db,
path: path.to_string(),
write_index,
read_index,
space_stat,
max_elements: max_elements as u64,
empty,
})
}

Expand All @@ -93,88 +104,106 @@ impl PersistentQueueWithCapacity {
}

pub fn len(&self) -> usize {
(if self.write_index >= self.read_index {
self.write_index - self.read_index
if self.empty {
0
} else {
MAX_ALLOWED_INDEX - self.read_index + self.write_index
}) as usize
(match self.write_index.cmp(&self.read_index) {
Ordering::Less => MAX_ALLOWED_INDEX - self.read_index + self.write_index,
Ordering::Equal => MAX_ALLOWED_INDEX,
Ordering::Greater => self.write_index - self.read_index,
}) as usize
}
}

pub fn payload_size(&self) -> u64 {
self.space_stat
}

pub fn is_empty(&self) -> bool {
self.write_index == self.read_index
self.empty
}

pub fn push(&mut self, values: &[&[u8]]) -> Result<()> {
if values.is_empty() {
return Ok(());
}
if self.len() + values.len() > self.max_elements as usize {
return Err(anyhow::anyhow!("Queue is full"));
}

let mut batch = rocksdb::WriteBatch::default();
let mut write_index = self.write_index;

for value in values {
batch.put(Self::index_to_key(self.write_index), value);
self.write_index += 1;
if self.write_index == MAX_ALLOWED_INDEX {
self.write_index = 0;
batch.put(Self::index_to_key(write_index), value);
write_index += 1;
if write_index == MAX_ALLOWED_INDEX {
write_index = 0;
}
}

batch.put(
Self::index_to_key(WRITE_INDEX_CELL),
self.write_index.to_le_bytes(),
write_index.to_le_bytes(),
);

self.space_stat += values.iter().map(|v| v.len() as u64).sum::<u64>();
let space_stat = self.space_stat + values.iter().map(|v| v.len() as u64).sum::<u64>();

batch.put(
Self::index_to_key(SPACE_STAT_CELL),
self.space_stat.to_le_bytes(),
space_stat.to_le_bytes(),
);

self.db.write(batch)?;

self.empty = false;
self.write_index = write_index;
self.space_stat = space_stat;

Ok(())
}

pub fn pop(&mut self, mut max_elts: usize) -> Result<Vec<Vec<u8>>> {
let mut res = Vec::with_capacity(max_elts);
let mut batch = rocksdb::WriteBatch::default();
let mut read_index = self.read_index;
loop {
let key = Self::index_to_key(self.read_index);
let key = Self::index_to_key(read_index);
let value = self.db.get(key)?;
if let Some(v) = value {
batch.delete(key);
res.push(v);
self.read_index += 1;
if self.read_index == MAX_ALLOWED_INDEX {
self.read_index = 0;
read_index += 1;
if read_index == MAX_ALLOWED_INDEX {
read_index = 0;
}
max_elts -= 1;
} else {
break;
}

if self.read_index != self.write_index && max_elts > 0 {
if read_index != self.write_index && max_elts > 0 {
continue;
} else {
break;
}
}
if !res.is_empty() {
self.space_stat -= res.iter().map(|v| v.len() as u64).sum::<u64>();
let empty = read_index == self.write_index;
let space_stat = self.space_stat - res.iter().map(|v| v.len() as u64).sum::<u64>();
batch.put(
Self::index_to_key(SPACE_STAT_CELL),
self.space_stat.to_le_bytes(),
space_stat.to_le_bytes(),
);
batch.put(
Self::index_to_key(READ_INDEX_CELL),
self.read_index.to_le_bytes(),
read_index.to_le_bytes(),
);
self.db.write(batch)?;

self.read_index = read_index;
self.space_stat = space_stat;
self.empty = empty;
}

Ok(res)
Expand All @@ -189,7 +218,12 @@ mod tests {
let path = "/tmp/test1".to_string();
_ = PersistentQueueWithCapacity::remove_db(&path);
{
let mut db = PersistentQueueWithCapacity::new(&path, 10, Options::default()).unwrap();
let mut db = PersistentQueueWithCapacity::new(
&path,
MAX_ALLOWED_INDEX as usize,
Options::default(),
)
.unwrap();
db.push(&[&[1, 2, 3]]).unwrap();
db.push(&[&[4, 5, 6]]).unwrap();
assert_eq!(db.len(), 2);
Expand All @@ -213,6 +247,7 @@ mod tests {
assert_eq!(db.write_index, 1);
let data = db.pop(1).unwrap();
assert!(db.is_empty());
assert_eq!(db.len(), 0);
assert_eq!(data, vec![vec![7, 8, 9]]);
}
PersistentQueueWithCapacity::remove_db(&path).unwrap();
Expand All @@ -236,31 +271,32 @@ mod tests {
fn test_read_with_close() {
let path = "/tmp/test3".to_string();
_ = PersistentQueueWithCapacity::remove_db(&path);
let size = MAX_ALLOWED_INDEX as usize;
{
let mut db = PersistentQueueWithCapacity::new(&path, 10, Options::default()).unwrap();
let mut db = PersistentQueueWithCapacity::new(&path, size, Options::default()).unwrap();
db.push(&[&[1, 2, 3]]).unwrap();
db.push(&[&[4, 5, 6]]).unwrap();
db.push(&[&[7, 8, 9]]).unwrap();
}

{
let mut db = PersistentQueueWithCapacity::new(&path, 10, Options::default()).unwrap();
let mut db = PersistentQueueWithCapacity::new(&path, size, Options::default()).unwrap();
assert_eq!(db.payload_size(), 9);
let res = db.pop(1).unwrap();
assert_eq!(res, vec![vec![1, 2, 3]]);
assert_eq!(db.payload_size(), 6);
}

{
let mut db = PersistentQueueWithCapacity::new(&path, 10, Options::default()).unwrap();
let mut db = PersistentQueueWithCapacity::new(&path, size, Options::default()).unwrap();
let res = db.pop(1).unwrap();
assert_eq!(res, vec![vec![4, 5, 6]]);
let res = db.pop(1).unwrap();
assert_eq!(res, vec![vec![7, 8, 9]]);
}

{
let mut db = PersistentQueueWithCapacity::new(&path, 10, Options::default()).unwrap();
let mut db = PersistentQueueWithCapacity::new(&path, size, Options::default()).unwrap();
let res = db.pop(1).unwrap();
assert!(res.is_empty());
}
Expand All @@ -271,30 +307,61 @@ mod tests {
fn push_pop_many() {
let path = "/tmp/test_push_pop_many".to_string();
_ = PersistentQueueWithCapacity::remove_db(&path);
let mut queue = PersistentQueueWithCapacity::new(&path, 1000, Options::default()).unwrap();
let mut queue = PersistentQueueWithCapacity::new(&path, 3, Options::default()).unwrap();
queue
.push(&[
&[1u8, 2u8, 3u8],
&[4u8, 5u8, 6u8],
&[7u8, 8u8, 9u8],
&[10u8, 11u8, 12u8],
])
.push(&[&[1u8, 2u8, 3u8], &[4u8, 5u8, 6u8], &[7u8, 8u8, 9u8]])
.unwrap();
let res = queue.pop(3).unwrap();
assert_eq!(
res,
vec![
vec![1u8, 2u8, 3u8],
vec![4u8, 5u8, 6u8],
vec![7u8, 8u8, 9u8]
]
);
let res = queue.pop(3).unwrap();
assert_eq!(res, vec![vec![10u8, 11u8, 12u8]]);

let res = queue.pop(2).unwrap();

assert_eq!(res, vec![vec![1u8, 2u8, 3u8], vec![4u8, 5u8, 6u8]]);
let res = queue.pop(1).unwrap();
assert_eq!(res, vec![vec![7u8, 8u8, 9u8]]);

let res = queue.pop(1).unwrap();
assert!(res.is_empty());

_ = PersistentQueueWithCapacity::remove_db(&path);
}

#[test]
fn push_pop_max_elements() {
let path = "/tmp/test_push_pop_max_elements".to_string();
_ = PersistentQueueWithCapacity::remove_db(&path);
let size = MAX_ALLOWED_INDEX as usize;
let mut queue = PersistentQueueWithCapacity::new(&path, size, Options::default()).unwrap();

let values = vec!["a".as_bytes(); size];
queue.push(&values).unwrap();

assert_eq!(queue.read_index, 0);
assert_eq!(queue.write_index, 0);
assert_eq!(queue.empty, false);
assert_eq!(queue.len(), size);

let res = queue.pop(size).unwrap();

assert_eq!(res, values);
assert_eq!(queue.read_index, 0);
assert_eq!(queue.write_index, 0);
assert_eq!(queue.empty, true);
assert_eq!(queue.len(), 0);

_ = PersistentQueueWithCapacity::remove_db(&path);
}

#[test]
fn new_invalid_max_elements() {
let result = PersistentQueueWithCapacity::new(
"/tmp/test_invalid_max_elements",
(MAX_ALLOWED_INDEX + 1) as usize,
Options::default(),
);

assert_eq!(
result.is_err_and(|e| e.to_string()
== format!("max_elements can't be greater than {}", MAX_ALLOWED_INDEX)),
true
);
}
}
6 changes: 3 additions & 3 deletions queue_rs/src/nonblocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ mod tests {
let path = "/tmp/test_fresh_healthy".to_string();
_ = crate::PersistentQueueWithCapacity::remove_db(&path);
let queue =
super::PersistentQueueWithCapacity::new(&path, 1000, 1000, rocksdb::Options::default())
super::PersistentQueueWithCapacity::new(&path, 3, 1000, rocksdb::Options::default())
.unwrap();
assert!(queue.is_healthy());
let resp = queue.len().unwrap().get().unwrap();
Expand All @@ -220,7 +220,7 @@ mod tests {
let path = "/tmp/test_push_pop".to_string();
_ = crate::PersistentQueueWithCapacity::remove_db(&path);
let queue =
super::PersistentQueueWithCapacity::new(&path, 1000, 1000, rocksdb::Options::default())
super::PersistentQueueWithCapacity::new(&path, 3, 1000, rocksdb::Options::default())
.unwrap();
assert!(queue.is_healthy());

Expand Down Expand Up @@ -249,7 +249,7 @@ mod tests {
let path = "/tmp/test_size".to_string();
_ = crate::PersistentQueueWithCapacity::remove_db(&path);
let queue =
super::PersistentQueueWithCapacity::new(&path, 1000, 1000, rocksdb::Options::default())
super::PersistentQueueWithCapacity::new(&path, 3, 1000, rocksdb::Options::default())
.unwrap();
let size_query = queue.disk_size().unwrap();
let size = size_query.get().unwrap();
Expand Down
Loading