Skip to content

Commit

Permalink
WIP: Implement mpmc queue
Browse files Browse the repository at this point in the history
  • Loading branch information
ksenia-vazhdaeva committed Aug 15, 2024
1 parent 4702b9b commit 32327ec
Show file tree
Hide file tree
Showing 4 changed files with 639 additions and 37 deletions.
3 changes: 3 additions & 0 deletions queue_rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@ crossbeam-channel = "0.5.8"
[dependencies.rocksdb]
version = "0.22"
default-features = false

[dev-dependencies]
tempfile = "3.12.0"
52 changes: 15 additions & 37 deletions queue_rs/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
pub mod blocking;
mod fs;
pub mod mpmc;
pub mod nonblocking;
mod utilities;

use crate::utilities::{index_to_key, u64_from_byte_vec};
use anyhow::Result;
use rocksdb::{Options, DB};

Expand Down Expand Up @@ -40,33 +43,21 @@ impl PersistentQueueWithCapacity {

let db = DB::open(&db_opts, path)?;

let write_index_opt = db.get(Self::index_to_key(WRITE_INDEX_CELL))?;
let write_index_opt = db.get(index_to_key(WRITE_INDEX_CELL))?;
let write_index = match write_index_opt {
Some(v) => {
let mut buf = [0u8; U64_BYTE_LEN];
buf.copy_from_slice(&v);
u64::from_le_bytes(buf)
}
Some(v) => u64_from_byte_vec(&v),
None => 0u64,
};

let read_index_opt = db.get(Self::index_to_key(READ_INDEX_CELL))?;
let read_index_opt = db.get(index_to_key(READ_INDEX_CELL))?;
let read_index = match read_index_opt {
Some(v) => {
let mut buf = [0u8; U64_BYTE_LEN];
buf.copy_from_slice(&v);
u64::from_le_bytes(buf)
}
Some(v) => u64_from_byte_vec(&v),
None => 0u64,
};

let space_stat_opt = db.get(Self::index_to_key(SPACE_STAT_CELL))?;
let space_stat_opt = db.get(index_to_key(SPACE_STAT_CELL))?;
let space_stat = match space_stat_opt {
Some(v) => {
let mut buf = [0u8; U64_BYTE_LEN];
buf.copy_from_slice(&v);
u64::from_le_bytes(buf)
}
Some(v) => u64_from_byte_vec(&v),
None => 0u64,
};

Expand All @@ -80,10 +71,6 @@ impl PersistentQueueWithCapacity {
})
}

fn index_to_key(index: u64) -> [u8; U64_BYTE_LEN] {
index.to_le_bytes()
}

pub fn remove_db(path: &str) -> Result<()> {
Ok(DB::destroy(&Options::default(), path)?)
}
Expand Down Expand Up @@ -116,24 +103,21 @@ impl PersistentQueueWithCapacity {
let mut batch = rocksdb::WriteBatch::default();

for value in values {
batch.put(Self::index_to_key(self.write_index), value);
batch.put(index_to_key(self.write_index), value);
self.write_index += 1;
if self.write_index == MAX_ALLOWED_INDEX {
self.write_index = 0;
}
}

batch.put(
Self::index_to_key(WRITE_INDEX_CELL),
index_to_key(WRITE_INDEX_CELL),
self.write_index.to_le_bytes(),
);

self.space_stat += values.iter().map(|v| v.len() as u64).sum::<u64>();

batch.put(
Self::index_to_key(SPACE_STAT_CELL),
self.space_stat.to_le_bytes(),
);
batch.put(index_to_key(SPACE_STAT_CELL), self.space_stat.to_le_bytes());

self.db.write(batch)?;

Expand All @@ -144,7 +128,7 @@ impl PersistentQueueWithCapacity {
let mut res = Vec::with_capacity(max_elts);
let mut batch = rocksdb::WriteBatch::default();
loop {
let key = Self::index_to_key(self.read_index);
let key = index_to_key(self.read_index);
let value = self.db.get(key)?;
if let Some(v) = value {
batch.delete(key);
Expand All @@ -166,14 +150,8 @@ impl PersistentQueueWithCapacity {
}
if !res.is_empty() {
self.space_stat -= res.iter().map(|v| v.len() as u64).sum::<u64>();
batch.put(
Self::index_to_key(SPACE_STAT_CELL),
self.space_stat.to_le_bytes(),
);
batch.put(
Self::index_to_key(READ_INDEX_CELL),
self.read_index.to_le_bytes(),
);
batch.put(index_to_key(SPACE_STAT_CELL), self.space_stat.to_le_bytes());
batch.put(index_to_key(READ_INDEX_CELL), self.read_index.to_le_bytes());
self.db.write(batch)?;
}

Expand Down
Loading

0 comments on commit 32327ec

Please sign in to comment.