Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
marvin-j97 committed Nov 27, 2023
1 parent 8a1deee commit b8a8760
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 120 deletions.
13 changes: 4 additions & 9 deletions src/batch/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use log::trace;

use crate::{value::SeqNo, Tree, Value};
use crate::{Tree, Value};

/// An atomic write batch
pub struct Batch {
Expand All @@ -20,18 +20,13 @@ impl Batch {

/// Inserts a key-value pair into the batch
pub fn insert<K: Into<Vec<u8>>, V: Into<Vec<u8>>>(&mut self, key: K, value: V) {
self.data.push(Value::new(
key.into(),
value.into(),
false,
0, /* TODO: */
));
self.data
.push(Value::new(key.into(), value.into(), false, 0));
}

/// Adds a tombstone marker for a key
pub fn remove<K: Into<Vec<u8>>>(&mut self, key: K) {
self.data
.push(Value::new(key.into(), vec![], true, 0 /* TODO: */));
self.data.push(Value::new(key.into(), vec![], true, 0));
}

/// Commits the batch to the LSM-tree atomically
Expand Down
8 changes: 5 additions & 3 deletions src/commit_log/marker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,20 @@ mod tests {
use test_log::test;

#[test]
fn test_serialize_and_deserialize_success() {
fn test_serialize_and_deserialize_success() -> crate::Result<()> {
let item = Marker::Item(Value::new(vec![1, 2, 3], vec![], false, 42));

// Serialize
let mut serialized_data = Vec::new();
item.serialize(&mut serialized_data).unwrap();
item.serialize(&mut serialized_data)?;

// Deserialize
let mut reader = &serialized_data[..];
let deserialized_item = Marker::deserialize(&mut reader).unwrap();
let deserialized_item = Marker::deserialize(&mut reader)?;

assert_eq!(item, deserialized_item);

Ok(())
}

#[test]
Expand Down
4 changes: 2 additions & 2 deletions src/commit_log/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ mod tests {
use test_log::test;

#[test]
fn test_write() -> std::io::Result<()> {
fn test_write() -> crate::Result<()> {
let folder = tempdir()?.into_path();
let log_path = folder.join("commit_log");
let mut commit_log = CommitLog::new(&log_path)?;
Expand All @@ -108,7 +108,7 @@ mod tests {
Value::new(vec![7, 8, 9], vec![], false, 43),
];

commit_log.append_batch(items).unwrap();
commit_log.append_batch(items)?;
commit_log.flush()?;

let file_content = std::fs::read(log_path)?;
Expand Down
2 changes: 1 addition & 1 deletion src/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fn to_base36(mut x: u32) -> String {

/// Generate a ID for a segment
///
/// Like https://cassandra.apache.org/_/blog/Apache-Cassandra-4.1-New-SSTable-Identifiers.html
/// Like <https://cassandra.apache.org/_/blog/Apache-Cassandra-4.1-New-SSTable-Identifiers.html>
pub fn generate_segment_id() -> String {
let now = chrono::Utc::now();

Expand Down
9 changes: 7 additions & 2 deletions src/segment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ impl Segment {
metadata.id.clone(),
folder.as_ref(),
Arc::clone(&block_cache),
)
.unwrap();
)?;

Ok(Self {
metadata,
Expand Down Expand Up @@ -83,6 +82,12 @@ impl Segment {
///
/// Will return `Err` if an IO error occurs
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> crate::Result<Option<Value>> {
if !self.key_range_contains(&key) {
return Ok(None);
}

// TODO: bloom

let block_ref = self.block_index.get_latest(key.as_ref());

Ok(match block_ref {
Expand Down
64 changes: 32 additions & 32 deletions src/segment/prefix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ impl PrefixedReader {
) -> crate::Result<Self> {
let prefix = prefix.into();

//TODO: optimize upper bound
let upper_bound = block_index.get_prefix_upper_bound(&prefix);
let upper_bound = upper_bound.map(|x| x.start_key).map_or(Unbounded, Excluded);

Expand Down Expand Up @@ -107,16 +106,15 @@ mod tests {
use test_log::test;

#[test]
fn test_lots_of_prefixed() {
fn test_lots_of_prefixed() -> crate::Result<()> {
for item_count in [1, 10, 100, 1_000, 10_000] {
let folder = tempfile::tempdir().unwrap().into_path();
let folder = tempfile::tempdir()?.into_path();

let mut writer = Writer::new(Options {
path: folder.clone(),
evict_tombstones: false,
block_size: 4096,
})
.unwrap();
})?;

for x in 0_u64..item_count {
let item = Value::new(
Expand All @@ -129,7 +127,7 @@ mod tests {
false,
0,
);
writer.write(item).unwrap();
writer.write(item)?;
}

for x in 0_u64..item_count {
Expand All @@ -143,7 +141,7 @@ mod tests {
false,
0,
);
writer.write(item).unwrap();
writer.write(item)?;
}

for x in 0_u64..item_count {
Expand All @@ -157,19 +155,20 @@ mod tests {
false,
0,
);
writer.write(item).unwrap();
writer.write(item)?;
}

writer.finish().unwrap();
writer.finish()?;

let metadata = Metadata::from_writer(nanoid::nanoid!(), writer);
metadata.write_to_file().unwrap();
metadata.write_to_file()?;

let block_cache = Arc::new(BlockCache::new(usize::MAX));
let meta_index = Arc::new(
MetaIndex::from_file(metadata.id.clone(), &folder, Arc::clone(&block_cache))
.unwrap(),
);
let meta_index = Arc::new(MetaIndex::from_file(
metadata.id.clone(),
&folder,
Arc::clone(&block_cache),
)?);

let iter = Reader::new(
folder.join("blocks"),
Expand All @@ -178,8 +177,7 @@ mod tests {
Arc::clone(&meta_index),
None,
None,
)
.unwrap();
)?;
assert_eq!(iter.count() as u64, item_count * 3);

let iter = PrefixedReader::new(
Expand All @@ -188,8 +186,7 @@ mod tests {
Arc::clone(&block_cache),
Arc::clone(&meta_index),
b"a/b/".to_vec(),
)
.unwrap();
)?;

assert_eq!(iter.count() as u64, item_count);

Expand All @@ -199,23 +196,23 @@ mod tests {
Arc::clone(&block_cache),
Arc::clone(&meta_index),
b"a/b/".to_vec(),
)
.unwrap();
)?;

assert_eq!(iter.rev().count() as u64, item_count);
}

Ok(())
}

#[test]
fn test_prefixed() {
let folder = tempfile::tempdir().unwrap().into_path();
fn test_prefixed() -> crate::Result<()> {
let folder = tempfile::tempdir()?.into_path();

let mut writer = Writer::new(Options {
path: folder.clone(),
evict_tombstones: false,
block_size: 4096,
})
.unwrap();
})?;

let items = [
b"a".to_vec(),
Expand All @@ -235,18 +232,20 @@ mod tests {
.map(|(idx, key)| Value::new(key, nanoid::nanoid!(), false, idx as SeqNo));

for item in items {
writer.write(item).unwrap();
writer.write(item)?;
}

writer.finish().unwrap();
writer.finish()?;

let metadata = Metadata::from_writer(nanoid::nanoid!(), writer);
metadata.write_to_file().unwrap();
metadata.write_to_file()?;

let block_cache = Arc::new(BlockCache::new(usize::MAX));
let meta_index = Arc::new(
MetaIndex::from_file(metadata.id.clone(), &folder, Arc::clone(&block_cache)).unwrap(),
);
let meta_index = Arc::new(MetaIndex::from_file(
metadata.id.clone(),
&folder,
Arc::clone(&block_cache),
)?);

let expected = [
(b"a".to_vec(), 9),
Expand All @@ -266,10 +265,11 @@ mod tests {
Arc::clone(&block_cache),
Arc::clone(&meta_index),
prefix_key,
)
.unwrap();
)?;

assert_eq!(iter.count(), item_count);
}

Ok(())
}
}
Loading

0 comments on commit b8a8760

Please sign in to comment.