diff --git a/rs/index/src/ivf/builder.rs b/rs/index/src/ivf/builder.rs index 9bbbc7b6..9a6dd3ed 100644 --- a/rs/index/src/ivf/builder.rs +++ b/rs/index/src/ivf/builder.rs @@ -32,6 +32,7 @@ pub struct IvfBuilder { vectors: Box>, centroids: Box>, posting_lists: Box PostingListStorage<'a>>, + doc_id_mapping: Box>, } impl IvfBuilder { @@ -70,11 +71,23 @@ impl IvfBuilder { config.num_clusters, )); + let doc_id_mapping_path = + format!("{}/builder_doc_id_mapping_storage", config.base_directory); + create_dir(&doc_id_mapping_path)?; + + let doc_id_mapping = Box::new(FileBackedAppendableVectorStorage::::new( + doc_id_mapping_path, + config.memory_size, + config.file_size, + 1, + )); + Ok(Self { config, vectors, centroids, posting_lists, + doc_id_mapping, }) } @@ -86,6 +99,10 @@ impl IvfBuilder { &*self.vectors } + pub fn doc_id_mapping(&self) -> &dyn VectorStorage { + &*self.doc_id_mapping + } + pub fn centroids(&self) -> &dyn VectorStorage { &*self.centroids } @@ -112,6 +129,14 @@ impl IvfBuilder { Ok(()) } + /// Appends `doc_id` to the doc id mapping and returns its position as the generated id. + /// Fails if that position does not fit in a `u32`. + pub fn generate_id(&mut self, doc_id: u64) -> Result { + let generated_id = u32::try_from(self.doc_id_mapping.len())?; + self.doc_id_mapping.append(std::slice::from_ref(&doc_id))?; + Ok(generated_id) + } + fn find_nearest_centroids( vector: &[f32], centroids: &dyn VectorStorage, diff --git a/rs/index/src/ivf/index.rs b/rs/index/src/ivf/index.rs index 41eb6b76..64847b13 100644 --- a/rs/index/src/ivf/index.rs +++ b/rs/index/src/ivf/index.rs @@ -47,10 +47,13 @@ impl Ivf { .get_centroid(i as usize) .with_context(|| format!("Failed to get centroid at index {}", i))?; let dist = L2DistanceCalculator::calculate(&vector, &centroid); distances.push((i as usize, dist)); } distances.select_nth_unstable_by(num_probes - 1, |a, b| a.1.total_cmp(&b.1)); - Ok(distances.into_iter().map(|(idx, _)| idx).collect()) + let mut nearest_centroids: Vec<(usize, f32)> = 
distances.into_iter().take(num_probes).collect(); + nearest_centroids.sort_by(|a, b| a.1.total_cmp(&b.1)); + Ok(nearest_centroids.into_iter().map(|(idx, _)| idx).collect()) } } @@ -134,11 +137,13 @@ mod tests { fn create_fixed_file_index_storage( file_path: &String, + doc_id_mapping: &Vec, centroids: &Vec>, posting_lists: &Vec>, ) -> Result { let mut file = File::create(file_path.clone())?; + let num_vectors = doc_id_mapping.len(); let num_clusters = centroids.len(); if num_clusters != posting_lists.len() { return Err(anyhow!( @@ -149,6 +154,7 @@ mod tests { } // Create a test header + let doc_id_mapping_len = size_of::() * (num_vectors + 1); let num_features = centroids[0].len(); let centroids_len = size_of::() + num_features * num_clusters * size_of::(); @@ -158,7 +164,11 @@ mod tests { offset += size_of::(); assert!(file.write_all(&(num_clusters as u32).to_le_bytes()).is_ok()); offset += size_of::(); - assert!(file.write_all(&7u64.to_le_bytes()).is_ok()); + assert!(file.write_all(&(num_vectors as u64).to_le_bytes()).is_ok()); + offset += size_of::(); + assert!(file + .write_all(&(doc_id_mapping_len as u64).to_le_bytes()) + .is_ok()); offset += size_of::(); assert!(file .write_all(&(centroids_len as u64).to_le_bytes()) @@ -175,6 +185,14 @@ mod tests { assert!(file.write_all(&pad).is_ok()); offset += pad.len(); + // Write doc_id_mapping + assert!(file.write_all(&(num_vectors as u64).to_le_bytes()).is_ok()); + offset += size_of::(); + for doc_id in doc_id_mapping.iter() { + assert!(file.write_all(&(*doc_id as u64).to_le_bytes()).is_ok()); + offset += size_of::(); + } + // Write centroids assert!(file.write_all(&(num_clusters as u64).to_le_bytes()).is_ok()); offset += size_of::(); @@ -230,9 +248,16 @@ mod tests { .expect("FixedFileVectorStorage should be created"); let file_path = format!("{}/index", base_dir); + let doc_id_mapping = vec![100, 101, 102]; let centroids = vec![vec![1.5, 2.5, 3.5], vec![5.5, 6.5, 7.5]]; let 
posting_lists = vec![vec![0], vec![1, 2]]; - assert!(create_fixed_file_index_storage(&file_path, &centroids, &posting_lists).is_ok()); + assert!(create_fixed_file_index_storage( + &file_path, + &doc_id_mapping, + &centroids, + &posting_lists + ) + .is_ok()); let index_storage = FixedIndexFile::new(file_path).expect("FixedIndexFile should be created"); @@ -256,15 +281,22 @@ mod tests { .to_str() .expect("Failed to convert temporary directory path to string") .to_string(); - let file_path = format!("{}/centroids", base_dir); + let file_path = format!("{}/index", base_dir); let vector = vec![3.0, 4.0, 5.0]; + let doc_id_mapping = vec![100, 101, 102]; let centroids = vec![ vec![1.0, 2.0, 3.0], vec![4.0, 5.0, 6.0], vec![7.0, 8.0, 9.0], ]; let posting_lists = vec![vec![0], vec![1], vec![2]]; - assert!(create_fixed_file_index_storage(&file_path, &centroids, &posting_lists).is_ok()); + assert!(create_fixed_file_index_storage( + &file_path, + &doc_id_mapping, + &centroids, + &posting_lists + ) + .is_ok()); let index_storage = FixedIndexFile::new(file_path).expect("FixedIndexFile should be created"); let num_probes = 2; @@ -298,9 +330,16 @@ mod tests { .expect("FixedFileVectorStorage should be created"); let file_path = format!("{}/index", base_dir); + let doc_id_mapping = vec![100, 101, 102, 103]; let centroids = vec![vec![1.5, 2.5, 3.5], vec![5.5, 6.5, 7.5]]; let posting_lists = vec![vec![0, 3], vec![1, 2]]; - assert!(create_fixed_file_index_storage(&file_path, &centroids, &posting_lists).is_ok()); + assert!(create_fixed_file_index_storage( + &file_path, + &doc_id_mapping, + &centroids, + &posting_lists + ) + .is_ok()); let index_storage = FixedIndexFile::new(file_path).expect("FixedIndexFile should be created"); @@ -340,9 +379,16 @@ mod tests { .expect("FixedFileVectorStorage should be created"); let file_path = format!("{}/index", base_dir); + let doc_id_mapping = vec![100]; let centroids = vec![vec![100.0, 200.0, 300.0]]; let posting_lists = vec![vec![0]]; - 
assert!(create_fixed_file_index_storage(&file_path, ¢roids, &posting_lists).is_ok()); + assert!(create_fixed_file_index_storage( + &file_path, + &doc_id_mapping, + ¢roids, + &posting_lists + ) + .is_ok()); let index_storage = FixedIndexFile::new(file_path).expect("FixedIndexFile should be created"); diff --git a/rs/index/src/ivf/reader.rs b/rs/index/src/ivf/reader.rs index 72c93f7d..6739f3aa 100644 --- a/rs/index/src/ivf/reader.rs +++ b/rs/index/src/ivf/reader.rs @@ -68,10 +68,13 @@ mod tests { }) .expect("Failed to create builder"); // Generate 1000 vectors of f32, dimension 4 - for _ in 0..num_vectors { + for i in 0..num_vectors { builder .add_vector(generate_random_vector(num_features)) .expect("Vector should be added"); + builder + .generate_id((i + 100) as u64) + .expect("Id should be generated"); } assert!(builder.build().is_ok()); @@ -118,6 +121,19 @@ mod tests { index.index_storage.header().centroids_len, (num_clusters * num_features * size_of::() + size_of::()) as u64 ); + // Verify doc_id_mapping content + for i in 0..num_vectors { + let ref_id = builder + .doc_id_mapping() + .get(i as u32) + .expect("Failed to read doc_id from FileBackedAppendableVectorStorage"); + let read_id = index + .index_storage + .get_doc_id(i) + .expect("Failed to read doc_id from FixedFileVectorStorage"); + assert_eq!(ref_id.len(), 1); + assert_eq!((*ref_id)[0], read_id); + } // Verify centroid content for i in 0..num_clusters { let ref_vector = builder diff --git a/rs/index/src/ivf/writer.rs b/rs/index/src/ivf/writer.rs index 02962963..d928a01c 100644 --- a/rs/index/src/ivf/writer.rs +++ b/rs/index/src/ivf/writer.rs @@ -35,6 +35,19 @@ impl IvfWriter { )); } + // Write doc_id_mapping + let doc_id_mapping_len = self + .write_doc_id_mapping(ivf_builder) + .context("Failed to write doc_id_mapping")?; + let expected_bytes_written = std::mem::size_of::() * (num_vectors + 1); + if doc_id_mapping_len != expected_bytes_written { + return Err(anyhow!( + "Expected to write {} bytes in 
doc_id_mapping storage, but wrote {}", + expected_bytes_written, + doc_id_mapping_len, + )); + } + // Write centroids let centroids_len = self .write_centroids(ivf_builder) @@ -69,6 +82,7 @@ impl IvfWriter { num_features: num_features as u32, num_clusters: num_clusters as u32, num_vectors: num_vectors as u64, + doc_id_mapping_len: doc_id_mapping_len as u64, centroids_len: centroids_len as u64, posting_lists_len: posting_lists_len as u64, }; @@ -86,6 +100,15 @@ impl IvfWriter { Ok(bytes_written) } + fn write_doc_id_mapping(&self, ivf_builder: &IvfBuilder) -> Result { + let path = format!("{}/doc_id_mapping", self.base_directory); + let mut file = File::create(path)?; + let mut writer = BufWriter::new(&mut file); + + let bytes_written = ivf_builder.doc_id_mapping().write(&mut writer)?; + Ok(bytes_written) + } + fn write_centroids(&self, ivf_builder: &IvfBuilder) -> Result { let path = format!("{}/centroids", self.base_directory); let mut file = File::create(path)?; @@ -113,6 +136,7 @@ impl IvfWriter { written += wrap_write(writer, &header.num_features.to_le_bytes())?; written += wrap_write(writer, &header.num_clusters.to_le_bytes())?; written += wrap_write(writer, &header.num_vectors.to_le_bytes())?; + written += wrap_write(writer, &header.doc_id_mapping_len.to_le_bytes())?; written += wrap_write(writer, &header.centroids_len.to_le_bytes())?; written += wrap_write(writer, &header.posting_lists_len.to_le_bytes())?; Ok(written) @@ -120,6 +144,7 @@ impl IvfWriter { /// Combine all individual files into one final index file. Keep vectors file separate. 
fn combine_files(&self, header: &Header) -> Result { + let doc_id_mapping_path = format!("{}/doc_id_mapping", self.base_directory); let centroids_path = format!("{}/centroids", self.base_directory); let posting_lists_path = format!("{}/posting_lists", self.base_directory); @@ -133,6 +158,9 @@ impl IvfWriter { // Compute padding for alignment to 8 bytes written += Self::write_pad(written, &mut combined_buffer_writer, 8)?; + written += append_file_to_writer(&doc_id_mapping_path, &mut combined_buffer_writer)?; + + // No need for padding, doc_id_mapping is always 8-byte aligned written += append_file_to_writer(¢roids_path, &mut combined_buffer_writer)?; // Pad again in case num_features and num_clusters are both odd @@ -143,6 +171,7 @@ impl IvfWriter { .flush() .context("Failed to flush combined buffer")?; + remove_file(format!("{}/doc_id_mapping", self.base_directory))?; remove_file(format!("{}/centroids", self.base_directory))?; remove_file(format!("{}/posting_lists", self.base_directory))?; @@ -202,6 +231,7 @@ mod tests { // Create test files create_test_file(&base_directory, "centroids", &[5, 6, 7, 8])?; create_test_file(&base_directory, "posting_lists", &[9, 10, 11, 12])?; + create_test_file(&base_directory, "doc_id_mapping", &[100, 101, 102, 103])?; // Create a test header let header = Header { @@ -209,6 +239,7 @@ mod tests { num_features: 10, num_clusters: 5, num_vectors: 4, + doc_id_mapping_len: 4, centroids_len: 4, posting_lists_len: 4, }; @@ -231,6 +262,7 @@ mod tests { 10, 0, 0, 0, // num_features (little-endian) 5, 0, 0, 0, // num_clusters (little-endian) 4, 0, 0, 0, 0, 0, 0, 0, // num_vectors (little-endian) + 4, 0, 0, 0, 0, 0, 0, 0, // doc_id_mapping_len (little-endian) 4, 0, 0, 0, 0, 0, 0, 0, // centroids_len (little-endian) 4, 0, 0, 0, 0, 0, 0, 0, // posting_lists_len (little-endian) ]; @@ -246,11 +278,19 @@ mod tests { ); // Verify the content of the files + // doc_id_mapping let offset = expected_header.len(); - 
assert_eq!(&combined_content[offset..offset + 4], [5, 6, 7, 8]); + assert_eq!(&combined_content[offset..offset + 4], [100, 101, 102, 103]); - // Check for padding after centroids + // centroids let mut next_offset = offset + 4; + assert_eq!( + &combined_content[next_offset..next_offset + 4], + [5, 6, 7, 8] + ); + + // Check for padding after centroids + next_offset += 4; while next_offset % 8 != 0 { assert_eq!(combined_content[next_offset], 0); next_offset += 1; @@ -315,11 +355,15 @@ mod tests { .expect("Failed to create builder"); // Generate 1000 vectors of f32, dimension 4 let mut original_vectors = Vec::new(); - for _ in 0..num_vectors { + for i in 0..num_vectors { let vector = generate_random_vector(num_features); original_vectors.push(vector.clone()); builder.add_vector(vector).expect("Vector should be added"); + builder + .generate_id((i + 100) as u64) + .expect("Id should be generated"); } + assert_eq!(builder.doc_id_mapping().len(), 1000); assert!(builder.build().is_ok()); @@ -328,6 +372,7 @@ mod tests { // Check if files were created and removed correctly assert!(fs::metadata(format!("{}/vectors", base_directory)).is_ok()); assert!(fs::metadata(format!("{}/index", base_directory)).is_ok()); + assert!(fs::metadata(format!("{}/doc_id_mapping", base_directory)).is_err()); assert!(fs::metadata(format!("{}/centroids", base_directory)).is_err()); assert!(fs::metadata(format!("{}/posting_lists", base_directory)).is_err()); @@ -374,6 +419,14 @@ mod tests { .expect("Failed to read num_vectors"); assert_eq!(stored_num_vectors, num_vectors as u64); + let doc_id_mapping_len = index_reader + .read_u64::() + .expect("Failed to read doc_id_mapping_len"); + assert_eq!( + doc_id_mapping_len, + (std::mem::size_of::() * (num_vectors + 1)) as u64 + ); + let centroids_len = index_reader .read_u64::() .expect("Failed to read centroids_len"); @@ -386,6 +439,9 @@ mod tests { .metadata() .expect("Failed to get file metadata") .len(); - assert_eq!(file_size, 33 + 7 + centroids_len 
+ posting_lists_len); // 33 bytes for header + 7 padding + assert_eq!( + file_size, + 41 + 7 + doc_id_mapping_len + centroids_len + posting_lists_len + ); // 41 bytes for header + 7 padding } } diff --git a/rs/index/src/posting_list/combined_file.rs b/rs/index/src/posting_list/combined_file.rs index 707f81d5..1e901d86 100644 --- a/rs/index/src/posting_list/combined_file.rs +++ b/rs/index/src/posting_list/combined_file.rs @@ -19,6 +19,7 @@ pub struct Header { pub num_features: u32, pub num_clusters: u32, pub num_vectors: u64, + pub doc_id_mapping_len: u64, pub centroids_len: u64, pub posting_lists_len: u64, } @@ -28,6 +29,7 @@ pub struct FixedIndexFile { mmap: Mmap, header: Header, + doc_id_mapping_offset: usize, centroid_offset: usize, posting_list_metadata_offset: usize, } @@ -38,7 +40,12 @@ impl FixedIndexFile { .read(true) .open(file_path.clone())?; let mmap = unsafe { Mmap::map(&file) }?; - let (header, centroid_offset) = Self::read_header(&mmap)?; + let (header, doc_id_mapping_offset) = Self::read_header(&mmap)?; + + let centroid_offset = Self::align_to_next_boundary( + doc_id_mapping_offset + header.doc_id_mapping_len as usize, + 8, + ); let posting_list_metadata_offset = Self::align_to_next_boundary(centroid_offset + header.centroids_len as usize, 8) @@ -47,6 +54,7 @@ impl FixedIndexFile { _marker: PhantomData, mmap, header, + doc_id_mapping_offset, centroid_offset, posting_list_metadata_offset, }) @@ -66,6 +74,8 @@ impl FixedIndexFile { offset += 4; let num_vectors = LittleEndian::read_u64(&buffer[offset..]); offset += 8; + let doc_id_mapping_len = LittleEndian::read_u64(&buffer[offset..]); + offset += 8; let centroids_len = LittleEndian::read_u64(&buffer[offset..]); offset += 8; let posting_lists_len = LittleEndian::read_u64(&buffer[offset..]); @@ -76,6 +86,7 @@ impl FixedIndexFile { num_features, num_clusters, num_vectors, + doc_id_mapping_len, centroids_len, posting_lists_len, }; @@ -91,6 +102,19 @@ impl FixedIndexFile { (current_position + mask) & !mask 
} + pub fn get_doc_id(&self, index: usize) -> Result { + if index >= self.header.num_vectors as usize { + return Err(anyhow!("Index out of bound")); + } + + let start = self.doc_id_mapping_offset + + size_of::() // Read another u64 which encodes num_vectors (when combining with + // doc_id_mapping storage) + + index * size_of::(); + let slice = &self.mmap[start..start + size_of::()]; + Ok(u64::from_le_bytes(slice.try_into()?)) + } + pub fn get_centroid(&self, index: usize) -> Result<&[f32]> { if index >= self.header.num_clusters as usize { return Err(anyhow!("Index out of bound")); @@ -157,6 +181,7 @@ mod tests { 4, 0, 0, 0, // num_features (little-endian) 2, 0, 0, 0, // num_clusters (little-endian) 4, 0, 0, 0, 0, 0, 0, 0, // num_vectors (little-endian) + 40, 0, 0, 0, 0, 0, 0, 0, // doc_id_mapping_len (little-endian) 40, 0, 0, 0, 0, 0, 0, 0, // centroids_len (little-endian) 9, 0, 0, 0, 0, 0, 0, 0, // posting_lists_len - garbage (little-endian) ]; @@ -167,6 +192,14 @@ mod tests { } assert!(file.write_all(&header).is_ok()); + let doc_id_mapping: Vec = vec![100, 200, 300, 400]; + let num_vectors = vec![4, 0, 0, 0, 0, 0, 0, 0]; + assert!(file.write_all(&num_vectors).is_ok()); + assert!(file + .write_all(transmute_slice_to_u8(&doc_id_mapping)) + .is_ok()); + // No need for padding here + let centroids: Vec> = vec![vec![1.0, 2.0, 3.0, 4.0], vec![5.0, 6.0, 7.0, 8.0]]; let num_clusters = vec![2, 0, 0, 0, 0, 0, 0, 0]; assert!(file.write_all(&num_clusters).is_ok()); @@ -192,9 +225,36 @@ mod tests { assert_eq!(combined_file.header.num_features, 4); assert_eq!(combined_file.header.num_clusters, 2); assert_eq!(combined_file.header.num_vectors, 4); + assert_eq!(combined_file.header.doc_id_mapping_len, 40); assert_eq!(combined_file.header.centroids_len, 40); assert_eq!(combined_file.header.posting_lists_len, 9); + assert_eq!( + combined_file + .get_doc_id(0) + .expect("Failed to read doc_id_mapping"), + doc_id_mapping[0] + ); + assert_eq!( + combined_file + .get_doc_id(1) + 
.expect("Failed to read doc_id_mapping"), + doc_id_mapping[1] + ); + assert_eq!( + combined_file + .get_doc_id(2) + .expect("Failed to read doc_id_mapping"), + doc_id_mapping[2] + ); + assert_eq!( + combined_file + .get_doc_id(3) + .expect("Failed to read doc_id_mapping"), + doc_id_mapping[3] + ); + assert!(combined_file.get_doc_id(4).is_err()); + assert_eq!( combined_file .get_centroid(0) diff --git a/rs/index_writer/src/index_writer.rs b/rs/index_writer/src/index_writer.rs index 8c2f26e3..66b2f9db 100644 --- a/rs/index_writer/src/index_writer.rs +++ b/rs/index_writer/src/index_writer.rs @@ -128,6 +128,7 @@ impl IndexWriter { while input.has_next() { let row = input.next(); ivf_builder.add_vector(row.data.to_vec())?; + ivf_builder.generate_id(row.id)?; if row.id % 10000 == 0 { debug!("Inserted {} rows", row.id); }