Skip to content

Commit

Permalink
Read pending segment on startup
Browse files Browse the repository at this point in the history
  • Loading branch information
hicder committed Feb 1, 2025
1 parent 1e889ad commit bc5acd8
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 2 deletions.
34 changes: 34 additions & 0 deletions rs/index/src/collection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ mod tests {
use rand::Rng;
use tempdir::TempDir;

use super::reader::CollectionReader;
use crate::collection::Collection;
use crate::optimizers::noop::NoopOptimizer;
use crate::segment::{BoxedImmutableSegment, MockedSegment, Segment};
Expand Down Expand Up @@ -825,4 +826,37 @@ mod tests {
stopped.store(true, std::sync::atomic::Ordering::Relaxed);
Ok(())
}

/// Checks that a collection left with one pending segment can be read back
/// through `CollectionReader` and still reports exactly one pending entry.
#[test]
fn test_collection_reader() -> Result<()> {
    let temp_dir = TempDir::new("test_collection")?;
    let base_directory: String = temp_dir.path().to_str().unwrap().to_string();
    let segment_config = CollectionConfig::default_test_config();

    // Write the collection config into the base directory.
    let config_file = std::fs::File::create(format!("{}/collection_config.json", base_directory))?;
    serde_json::to_writer_pretty(config_file, &segment_config)?;

    // Writer side lives in its own scope so this handle is dropped before
    // the directory is read back below.
    {
        let writer = Arc::new(Collection::new(base_directory.clone(), segment_config).unwrap());

        writer.insert_for_users(&[0], 1, &[1.0, 2.0, 3.0, 4.0])?;
        writer.flush()?;

        // A single flush is expected to yield a single segment.
        let flushed_names = writer.get_all_segment_names();
        assert_eq!(flushed_names.len(), 1);

        // Starting optimization registers one pending segment wrapping the
        // one flushed segment.
        let pending_name = writer.init_optimizing(&flushed_names)?;
        let writer_toc = writer.get_current_toc();
        assert_eq!(writer_toc.pending.len(), 1);
        assert_eq!(writer_toc.pending.get(&pending_name).unwrap().len(), 1);
    }

    // Reader side: the pending entry must still be present after reading.
    let reopened = CollectionReader::new(base_directory).read()?;
    assert_eq!(reopened.get_current_toc().pending.len(), 1);
    Ok(())
}
}
49 changes: 47 additions & 2 deletions rs/index/src/collection/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use utils::io::get_latest_version;
use super::{Collection, TableOfContent};
use crate::multi_spann::reader::MultiSpannReader;
use crate::segment::immutable_segment::ImmutableSegment;
use crate::segment::BoxedImmutableSegment;
use crate::segment::pending_segment::PendingSegment;
use crate::segment::{BoxedImmutableSegment, Segment};

pub struct CollectionReader {
path: String,
Expand All @@ -34,9 +35,14 @@ impl CollectionReader {
let toc_path = format!("{}/version_{}", self.path, latest_version);
let toc: TableOfContent = serde_json::from_reader(std::fs::File::open(toc_path)?)?;

// let collection = Arc::new(Collection::new(self.path.clone()));
// Read the segments
let mut segments: Vec<BoxedImmutableSegment> = vec![];
for name in &toc.toc {
// We read pending segments later
if toc.pending.contains_key(name) {
continue;
}

let spann_path = format!("{}/{}", self.path, name);
let spann_reader = MultiSpannReader::new(spann_path);
match collection_config.quantization_type {
Expand All @@ -55,6 +61,45 @@ impl CollectionReader {
};
}

// Recreate every pending segment: wipe its on-disk directory, then rebuild it from its inner segments
let pending_segment_names = toc.pending.keys().collect::<Vec<&String>>();
for pending_segment_name in pending_segment_names {
let pending_segment_path = format!("{}/{}", self.path, pending_segment_name);
std::fs::remove_dir_all(&pending_segment_path).unwrap();
std::fs::create_dir_all(&pending_segment_path).unwrap();

// Get the inner segments
let inner_segment_names = toc.pending.get(pending_segment_name).unwrap();
let mut inner_segments: Vec<BoxedImmutableSegment> = vec![];
for inner_segment_name in inner_segment_names {
for segment in &segments {
if segment.name() == *inner_segment_name {
inner_segments.push(segment.clone());
break;
}
}
}

match collection_config.quantization_type {
QuantizerType::ProductQuantizer => {
segments.push(BoxedImmutableSegment::PendingProductQuantizationSegment(
Arc::new(RwLock::new(PendingSegment::new(
inner_segments,
pending_segment_path,
))),
));
}
QuantizerType::NoQuantizer => {
segments.push(BoxedImmutableSegment::PendingNoQuantizationSegment(
Arc::new(RwLock::new(PendingSegment::new(
inner_segments,
pending_segment_path,
))),
));
}
}
}

let collection = Arc::new(Collection::init_from(
self.path.clone(),
latest_version,
Expand Down
5 changes: 5 additions & 0 deletions rs/index/src/segment/pending_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ impl<Q: Quantizer> PendingSegment<Q> {

// Caller must hold the read lock before calling this function.
pub fn build_index(&self) -> Result<()> {
if self.use_internal_index {
// We shouldn't build the index if it already exists.
return Err(anyhow::anyhow!("Index already exists"));
}

let current_directory = format!("{}/{}", self.parent_directory, self.name);
let reader = MultiSpannReader::new(current_directory);
let index = reader.read::<Q>()?;
Expand Down

0 comments on commit bc5acd8

Please sign in to comment.