diff --git a/src/query/storages/common/index/src/inverted_index.rs b/src/query/storages/common/index/src/inverted_index.rs
index 907f24f1ec6f..23d5fadbc27d 100644
--- a/src/query/storages/common/index/src/inverted_index.rs
+++ b/src/query/storages/common/index/src/inverted_index.rs
@@ -36,6 +36,8 @@
 use std::collections::BTreeMap;
 use std::collections::HashMap;
 use std::collections::HashSet;
+use std::fmt::Display;
+use std::fmt::Formatter;
 use std::io;
 use std::io::BufWriter;
 use std::io::Cursor;
@@ -242,6 +244,33 @@ impl Automaton for DfaWrapper {
     }
 }
 
+#[derive(Clone, Debug, Eq, Hash, PartialEq)]
+pub struct TermId {
+    // the `field_id` of term.
+    pub field_id: u32,
+    // the ordinal of term in FST.
+    pub term_ordinal: u64,
+}
+
+impl Display for TermId {
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+        write!(
+            f,
+            "field_id={} term_ordinal={}",
+            self.field_id, self.term_ordinal
+        )
+    }
+}
+
+impl TermId {
+    fn new(field_id: u32, term_ordinal: u64) -> Self {
+        Self {
+            field_id,
+            term_ordinal,
+        }
+    }
+}
+
 // Read term related infos.
 #[derive(Clone)]
 pub struct TermReader {
@@ -252,23 +281,21 @@ pub struct TermReader {
     // These terms are in `fst`, means that those terms exist in the block,
     // we need to use related term infos to determine the matched `doc_ids`.
     // Use `term_id` as key to get related information and avoid copy `term`.
-    term_map: HashMap<Term, u64>,
-    // key is `term_id`, value is `field_id`.
-    term_field_id_map: HashMap<u64, u32>,
+    term_map: HashMap<Term, TermId>,
     // key is `term_id`, value is `term_info`.
-    term_infos: HashMap<u64, TermInfo>,
+    term_infos: HashMap<TermId, TermInfo>,
     // key is `term_id`, value is related `BlockSegmentPostings`,
     // used to read `doc_ids` and `term_freqs`.
-    block_postings_map: HashMap<u64, BlockSegmentPostings>,
+    block_postings_map: HashMap<TermId, BlockSegmentPostings>,
     // key is `term_id`, value is related `PositionReader`,
     // used to read `positions` in each docs.
-    position_reader_map: HashMap<u64, PositionReader>,
+    position_reader_map: HashMap<TermId, PositionReader>,
     // key is `term_id`, value is related `doc_ids`.
     // `doc_ids` is lazy loaded when used.
-    doc_ids: HashMap<u64, RoaringTreemap>,
+    doc_ids: HashMap<TermId, RoaringTreemap>,
     // key is `term_id`, value is related `term_freqs`.
     // `term_freqs` is lazy loaded when used.
-    term_freqs: HashMap<u64, Vec<u32>>,
+    term_freqs: HashMap<TermId, Vec<u32>>,
     // key is `field_id`, value is the `FieldNormReader`, used to read fieldnorm.
     fieldnorm_reader_map: HashMap<u32, FieldNormReader>,
     // key is `field_id`, value is the number of tokens.
@@ -280,29 +307,20 @@ impl TermReader {
         row_count: u64,
         need_position: bool,
         has_score: bool,
-        terms: HashMap<Term, (u32, u64)>,
-        term_infos: HashMap<u64, TermInfo>,
-        block_postings_map: HashMap<u64, BlockSegmentPostings>,
-        position_reader_map: HashMap<u64, PositionReader>,
+        term_map: HashMap<Term, TermId>,
+        term_infos: HashMap<TermId, TermInfo>,
+        block_postings_map: HashMap<TermId, BlockSegmentPostings>,
+        position_reader_map: HashMap<TermId, PositionReader>,
         fieldnorm_reader_map: HashMap<u32, FieldNormReader>,
         field_num_tokens_map: HashMap<u32, u64>,
     ) -> Self {
-        let term_len = terms.len();
-        let term_field_id_map = terms
-            .iter()
-            .map(|(_, (field_id, term_id))| (*term_id, *field_id))
-            .collect::<HashMap<_, _>>();
-        let term_map = terms
-            .into_iter()
-            .map(|(term, (_, term_id))| (term, term_id))
-            .collect::<HashMap<_, _>>();
+        let term_len = term_map.len();
         Self {
             row_count,
             need_position,
             has_score,
             term_map,
-            term_field_id_map,
             term_infos,
             block_postings_map,
             position_reader_map,
@@ -314,9 +332,9 @@ impl TermReader {
     }
 
     // get `doc_ids` of a `term_id`,
-    fn get_doc_ids(&mut self, term_id: u64) -> Result<&RoaringTreemap> {
+    fn get_doc_ids(&mut self, term_id: TermId) -> Result<&RoaringTreemap> {
         if let std::collections::hash_map::Entry::Vacant(doc_ids_entry) =
-            self.doc_ids.entry(term_id)
+            self.doc_ids.entry(term_id.clone())
         {
             // `doc_ids` are lazy loaded when used.
             let block_postings = self.block_postings_map.get_mut(&term_id).ok_or_else(|| {
@@ -358,7 +376,7 @@ impl TermReader {
                 block_postings.advance();
             }
             doc_ids_entry.insert(doc_ids);
-            self.term_freqs.insert(term_id, term_freqs);
+            self.term_freqs.insert(term_id.clone(), term_freqs);
         }
 
         let doc_ids = self.doc_ids.get(&term_id).unwrap();
@@ -369,11 +387,11 @@ impl TermReader {
     // which is used to read `positions`.
     fn get_position_offsets(
        &mut self,
-        term_id: u64,
+        term_id: &TermId,
         all_doc_ids: &RoaringTreemap,
     ) -> Result<HashMap<u64, (u64, u32)>> {
-        let doc_ids = self.doc_ids.get(&term_id).unwrap();
-        let term_freqs = self.term_freqs.get(&term_id).unwrap();
+        let doc_ids = self.doc_ids.get(term_id).unwrap();
+        let term_freqs = self.term_freqs.get(term_id).unwrap();
 
         let mut doc_offset = 0;
         let mut offset_and_term_freqs = HashMap::with_capacity(all_doc_ids.len() as usize);
@@ -396,11 +414,11 @@ impl TermReader {
     // get `positions` of a `term_id` in a `doc`.
     fn get_positions(
         &mut self,
-        term_id: u64,
+        term_id: &TermId,
         doc_offset: u64,
         term_freq: u32,
     ) -> Result<HashSet<u64>> {
-        let position_reader = self.position_reader_map.get_mut(&term_id).ok_or_else(|| {
+        let position_reader = self.position_reader_map.get_mut(term_id).ok_or_else(|| {
             ErrorCode::TantivyError(format!(
                 "inverted index position reader `{}` does not exist",
                 term_id
@@ -417,14 +435,10 @@ impl TermReader {
         Ok(term_poses)
     }
 
-    fn term_id(&self, term: &Term) -> Option<&u64> {
+    fn term_id(&self, term: &Term) -> Option<&TermId> {
         self.term_map.get(term)
     }
 
-    fn field_id(&self, term_id: &u64) -> Option<&u32> {
-        self.term_field_id_map.get(term_id)
-    }
-
     fn fieldnorm_id(&self, field_id: u32, doc_id: u64) -> u8 {
         if let Some(fieldnorm_reader) = self.fieldnorm_reader_map.get(&field_id) {
             fieldnorm_reader.fieldnorm_id(doc_id as u32)
@@ -433,9 +447,9 @@ impl TermReader {
         }
     }
 
-    fn term_freq(&self, term_id: u64, doc_id: u64) -> Option<&u32> {
+    fn term_freq(&self, term_id: &TermId, doc_id: u64) -> Option<&u32> {
         if let (Some(doc_ids), Some(term_freqs)) =
-            (self.doc_ids.get(&term_id), self.term_freqs.get(&term_id))
+            (self.doc_ids.get(term_id), self.term_freqs.get(term_id))
         {
             if doc_ids.contains(doc_id) {
                 // if not store `term_freq`, return 1 as default value.
@@ -506,10 +520,11 @@ impl DocIdsCollector {
         fst_map: &tantivy_fst::Map<OwnedBytes>,
         field_id: u32,
         term: &Term,
-        matched_terms: &mut HashMap<Term, (u32, u64)>,
+        matched_terms: &mut HashMap<Term, TermId>,
     ) -> bool {
-        if let Some(term_id) = fst_map.get(term.serialized_value_bytes()) {
-            matched_terms.insert(term.clone(), (field_id, term_id));
+        if let Some(term_ord) = fst_map.get(term.serialized_value_bytes()) {
+            let term_id = TermId::new(field_id, term_ord);
+            matched_terms.insert(term.clone(), term_id);
             true
         } else {
             false
@@ -536,9 +551,9 @@ impl DocIdsCollector {
         query: Box<dyn Query>,
         fst_maps: &HashMap<u32, tantivy_fst::Map<OwnedBytes>>,
         fuzziness: &Option<u8>,
-        matched_terms: &mut HashMap<Term, (u32, u64)>,
-        prefix_terms: &mut HashMap<Term, Vec<u64>>,
-        fuzziness_terms: &mut HashMap<Term, Vec<u64>>,
+        matched_terms: &mut HashMap<Term, TermId>,
+        prefix_terms: &mut HashMap<Term, Vec<TermId>>,
+        fuzziness_terms: &mut HashMap<Term, Vec<TermId>>,
     ) -> Result<bool> {
         if let Some(term_query) = query.downcast_ref::<TermQuery>() {
             let term = term_query.term();
@@ -622,10 +637,11 @@ impl DocIdsCollector {
 
                 let mut prefix_term_ids = vec![];
                 let mut stream = fst_map.search(&re).into_stream();
-                while let Some((key, term_id)) = stream.next() {
+                while let Some((key, term_ord)) = stream.next() {
                     let key_str = unsafe { std::str::from_utf8_unchecked(key) };
                     let prefix_term = Term::from_field_text(field, key_str);
-                    matched_terms.insert(prefix_term.clone(), (field_id, term_id));
+                    let term_id = TermId::new(field_id, term_ord);
+                    matched_terms.insert(prefix_term.clone(), term_id.clone());
                     prefix_term_ids.push(term_id);
                 }
                 let matched = !prefix_term_ids.is_empty();
@@ -648,10 +664,11 @@ impl DocIdsCollector {
 
                 let mut fuzz_term_ids = vec![];
                 let mut stream = fst_map.search(automaton).into_stream();
-                while let Some((key, term_id)) = stream.next() {
+                while let Some((key, term_ord)) = stream.next() {
                     let key_str = unsafe { std::str::from_utf8_unchecked(key) };
                     let fuzz_term = Term::from_field_text(field, key_str);
-                    matched_terms.insert(fuzz_term.clone(), (field_id, term_id));
+                    let term_id = TermId::new(field_id, term_ord);
+                    matched_terms.insert(fuzz_term.clone(), term_id.clone());
                     fuzz_term_ids.push(term_id);
                 }
                 let matched = !fuzz_term_ids.is_empty();
@@ -709,7 +726,7 @@ impl DocIdsCollector {
         &mut self,
         query_key: String,
         phrase_terms: Vec<(usize, Term)>,
-        prefix_term: Option<(usize, &Vec<u64>)>,
+        prefix_term: Option<(usize, &Vec<TermId>)>,
     ) -> Result<Option<RoaringTreemap>> {
         let mut query_term_poses = Vec::with_capacity(phrase_terms.len());
         for (term_pos, term) in &phrase_terms {
@@ -717,7 +734,7 @@ impl DocIdsCollector {
             let Some(term_id) = self.term_reader.term_id(term) else {
                 return Ok(None);
             };
-            query_term_poses.push((*term_pos, *term_id));
+            query_term_poses.push((*term_pos, term_id.clone()));
         }
         if query_term_poses.is_empty() {
             return Ok(None);
         }
@@ -727,9 +744,9 @@ impl DocIdsCollector {
 
         let first_term_id = &query_term_poses[0].1;
 
         let mut term_ids = HashSet::with_capacity(phrase_terms.len() + 1);
-        term_ids.insert(*first_term_id);
+        term_ids.insert(first_term_id.clone());
 
-        let first_doc_ids = self.term_reader.get_doc_ids(*first_term_id)?;
+        let first_doc_ids = self.term_reader.get_doc_ids(first_term_id.clone())?;
         let mut candidate_doc_ids = RoaringTreemap::new();
         candidate_doc_ids.bitor_assign(first_doc_ids);
@@ -738,16 +755,16 @@ impl DocIdsCollector {
 
         let mut query_term_offsets = Vec::with_capacity(query_term_poses.len() - 1);
         for (term_pos, term_id) in query_term_poses.iter().skip(1) {
             if !term_ids.contains(term_id) {
-                let doc_ids = self.term_reader.get_doc_ids(*term_id)?;
+                let doc_ids = self.term_reader.get_doc_ids(term_id.clone())?;
                 candidate_doc_ids.bitand_assign(doc_ids);
                 if candidate_doc_ids.is_empty() {
                     break;
                 }
-                term_ids.insert(*term_id);
+                term_ids.insert(term_id.clone());
             }
             let term_pos_offset = (term_pos - first_term_pos) as u64;
-            query_term_offsets.push((*term_id, term_pos_offset));
+            query_term_offsets.push((term_id.clone(), term_pos_offset));
         }
         // If the candidate `doc_ids` is empty, all docs are not matched.
         if candidate_doc_ids.is_empty() {
@@ -759,14 +776,14 @@ impl DocIdsCollector {
 
         if let Some((_, prefix_term_ids)) = prefix_term {
             let mut all_prefix_doc_ids = RoaringTreemap::new();
             for prefix_term_id in prefix_term_ids {
-                let prefix_doc_ids = self.term_reader.get_doc_ids(*prefix_term_id)?;
+                let prefix_doc_ids = self.term_reader.get_doc_ids(prefix_term_id.clone())?;
                 // If the `doc_ids` does not intersect at all, this prefix can be ignored.
                 if candidate_doc_ids.is_disjoint(prefix_doc_ids) {
                     continue;
                 }
                 all_prefix_doc_ids.bitor_assign(prefix_doc_ids);
-                term_ids.insert(*prefix_term_id);
+                term_ids.insert(prefix_term_id.clone());
             }
             // If there is no matched prefix `doc_ids`, the prefix term does not matched.
             if all_prefix_doc_ids.is_empty() {
@@ -783,7 +800,7 @@ impl DocIdsCollector {
 
         for term_id in term_ids.into_iter() {
             let offset_and_term_freqs = self
                 .term_reader
-                .get_position_offsets(term_id, &candidate_doc_ids)?;
+                .get_position_offsets(&term_id, &candidate_doc_ids)?;
             offset_and_term_freqs_map.insert(term_id, offset_and_term_freqs);
         }
@@ -796,7 +813,7 @@ impl DocIdsCollector {
         let mut term_poses_map = HashMap::new();
         for (doc_id, (first_doc_offset, first_term_freq)) in first_offset_and_term_freqs.iter() {
             let mut first_term_poses = self.term_reader.get_positions(
-                *first_term_id,
+                first_term_id,
                 *first_doc_offset,
                 *first_term_freq,
             )?;
@@ -811,7 +828,7 @@ impl DocIdsCollector {
 
                         let term_poses = self
                             .term_reader
-                            .get_positions(*term_id, *doc_offset, *term_freq)?;
+                            .get_positions(term_id, *doc_offset, *term_freq)?;
                         term_poses_map.insert(term_id, term_poses);
                     }
                     let term_poses = term_poses_map.get(term_id).unwrap();
@@ -849,7 +866,7 @@ impl DocIdsCollector {
                         offset_and_term_freqs.get(doc_id)
                     {
                         let term_poses = self.term_reader.get_positions(
-                            *prefix_term_id,
+                            prefix_term_id,
                             *doc_offset,
                             *term_freq,
                         )?;
@@ -911,13 +928,13 @@ impl DocIdsCollector {
     pub fn collect_matched_doc_ids(
         &mut self,
         query: Box<dyn Query>,
-        prefix_terms: &HashMap<Term, Vec<u64>>,
-        fuzziness_terms: &HashMap<Term, Vec<u64>>,
+        prefix_terms: &HashMap<Term, Vec<TermId>>,
+        fuzziness_terms: &HashMap<Term, Vec<TermId>>,
     ) -> Result<Option<RoaringTreemap>> {
         if let Some(term_query) = query.downcast_ref::<TermQuery>() {
             let term = term_query.term();
             if let Some(term_id) = self.term_reader.term_id(term) {
-                let doc_ids = self.term_reader.get_doc_ids(*term_id)?;
+                let doc_ids = self.term_reader.get_doc_ids(term_id.clone())?;
                 Ok(Some(doc_ids.clone()))
             } else {
                 Ok(None)
             }
@@ -1034,7 +1051,7 @@ impl DocIdsCollector {
                 };
                 // collect related terms of the original term.
                 for term_id in fuzz_term_ids {
-                    let doc_ids = self.term_reader.get_doc_ids(*term_id)?;
+                    let doc_ids = self.term_reader.get_doc_ids(term_id.clone())?;
                     all_doc_ids.bitor_assign(doc_ids);
                 }
                 if !all_doc_ids.is_empty() {
@@ -1075,22 +1092,20 @@ impl DocIdsCollector {
             .remove(&query_key)
             .unwrap_or_default();
         if let Some(term_id) = self.term_reader.term_id(&phrase_terms[0]) {
-            if let Some(field_id) = self.term_reader.field_id(term_id) {
-                let mut bm25_weight = Bm25Weight::for_terms(&self.term_reader, &phrase_terms)?;
-                if let Some(boost) = boost {
-                    // increase weight by multiply a optional boost factor
-                    bm25_weight = bm25_weight.boost_by(boost);
-                }
-                let mut scores = Vec::with_capacity(doc_ids.len() as usize);
-                for doc_id in doc_ids {
-                    let fieldnorm_id = self.term_reader.fieldnorm_id(*field_id, doc_id);
-                    let phrase_count = phrase_counts_map.remove(&doc_id).unwrap_or_default();
+            let mut bm25_weight = Bm25Weight::for_terms(&self.term_reader, &phrase_terms)?;
+            if let Some(boost) = boost {
+                // increase weight by multiply a optional boost factor
+                bm25_weight = bm25_weight.boost_by(boost);
+            }
+            let mut scores = Vec::with_capacity(doc_ids.len() as usize);
+            for doc_id in doc_ids {
+                let fieldnorm_id = self.term_reader.fieldnorm_id(term_id.field_id, doc_id);
+                let phrase_count = phrase_counts_map.remove(&doc_id).unwrap_or_default();
 
-                    let score = bm25_weight.score(fieldnorm_id, phrase_count as u32);
-                    scores.push(score.into());
-                }
-                return Ok(scores);
+                let score = bm25_weight.score(fieldnorm_id, phrase_count as u32);
+                scores.push(score.into());
             }
+            return Ok(scores);
         }
         let scores = vec![F32::from(0_f32); doc_ids.len() as usize];
         Ok(scores)
@@ -1106,23 +1121,20 @@ impl DocIdsCollector {
         if let Some(term_query) = query.downcast_ref::<TermQuery>() {
             let term = term_query.term();
             if let Some(term_id) = self.term_reader.term_id(term) {
-                if let Some(field_id) = self.term_reader.field_id(term_id) {
-                    let mut bm25_weight =
-                        Bm25Weight::for_terms(&self.term_reader, &[term.clone()])?;
-                    if let Some(boost) = boost {
-                        // increase weight by multiply a optional boost factor
-                        bm25_weight = bm25_weight.boost_by(boost);
-                    }
-                    let mut scores = Vec::with_capacity(doc_ids.len() as usize);
-                    for doc_id in doc_ids {
-                        let fieldnorm_id = self.term_reader.fieldnorm_id(*field_id, doc_id);
-                        let term_freq = self.term_reader.term_freq(*term_id, doc_id).unwrap_or(&0);
+                let mut bm25_weight = Bm25Weight::for_terms(&self.term_reader, &[term.clone()])?;
+                if let Some(boost) = boost {
+                    // increase weight by multiply a optional boost factor
+                    bm25_weight = bm25_weight.boost_by(boost);
+                }
+                let mut scores = Vec::with_capacity(doc_ids.len() as usize);
+                for doc_id in doc_ids {
+                    let fieldnorm_id = self.term_reader.fieldnorm_id(term_id.field_id, doc_id);
+                    let term_freq = self.term_reader.term_freq(term_id, doc_id).unwrap_or(&0);
 
-                        let score = bm25_weight.score(fieldnorm_id, *term_freq);
-                        scores.push(score.into());
-                    }
-                    return Ok(scores);
+                    let score = bm25_weight.score(fieldnorm_id, *term_freq);
+                    scores.push(score.into());
                 }
+                return Ok(scores);
             }
             let scores = vec![F32::from(0_f32); doc_ids.len() as usize];
             Ok(scores)
diff --git a/src/query/storages/fuse/src/io/read/inverted_index/inverted_index_reader.rs b/src/query/storages/fuse/src/io/read/inverted_index/inverted_index_reader.rs
index 2c456daccf80..3b766b4fd5c5 100644
--- a/src/query/storages/fuse/src/io/read/inverted_index/inverted_index_reader.rs
+++ b/src/query/storages/fuse/src/io/read/inverted_index/inverted_index_reader.rs
@@ -280,12 +280,12 @@ impl InvertedIndexReader {
 
             let term_dict_file = FileSlice::new(Arc::new(term_dict_data));
             let term_info_store = TermInfoStore::open(term_dict_file)?;
-            for (_, (term_field_id, term_id)) in matched_terms.iter() {
-                if field_id == term_field_id {
-                    let term_info = term_info_store.get(*term_id);
-                    term_infos.insert(*term_id, term_info);
+            for (_, term_id) in matched_terms.iter() {
+                if *field_id == term_id.field_id {
+                    let term_info = term_info_store.get(term_id.term_ordinal);
+                    term_infos.insert(term_id.clone(), term_info);
                     if let Some(term_ids) = field_term_ids.get_mut(field_id) {
-                        term_ids.insert(*term_id);
+                        term_ids.insert(term_id.clone());
                     }
                 }
             }
@@ -293,17 +293,19 @@ impl InvertedIndexReader {
         }
 
         // 5. read postings and optional positions.
-        let mut term_slice_len = if self.need_position {
+        let term_slice_len = if self.need_position {
             term_infos.len() * 2
         } else {
             term_infos.len()
         };
-        if self.has_score {
-            term_slice_len += 2 * field_ids.len();
-        }
-
-        let mut slice_columns = Vec::with_capacity(term_slice_len);
+        let field_slice_len = if self.has_score {
+            2 * field_ids.len()
+        } else {
+            0
+        };
         let mut slice_name_map = HashMap::with_capacity(term_slice_len);
+        let mut field_slice_name_map = HashMap::with_capacity(field_slice_len);
+        let mut slice_columns = Vec::with_capacity(term_slice_len + field_slice_len);
         for (field_id, term_ids) in field_term_ids.iter() {
             // if has score, need read fieldnorm to calculate the score.
             if self.has_score {
@@ -315,7 +317,7 @@ impl InvertedIndexReader {
                         ..(fieldnorm_col_meta.offset + fieldnorm_col_meta.len);
                     slice_columns.push((fieldnorm_slice_name.clone(), fieldnorm_range));
                 }
-                slice_name_map.insert(fieldnorm_slice_name, *field_id as u64);
+                field_slice_name_map.insert(fieldnorm_slice_name, *field_id);
             }
             let idx_col_name = format!("idx-{}", field_id);
             let idx_meta = inverted_index_meta_map.get(&idx_col_name).ok_or_else(|| {
@@ -331,7 +333,7 @@ impl InvertedIndexReader {
                 let len = 8;
                 let tokens_slice_range = offset..(offset + len);
                 slice_columns.push((tokens_slice_name.clone(), tokens_slice_range));
-                slice_name_map.insert(tokens_slice_name, *field_id as u64);
+                field_slice_name_map.insert(tokens_slice_name, *field_id);
             }
 
             for term_id in term_ids {
@@ -343,7 +345,7 @@ impl InvertedIndexReader {
                 let idx_slice_name =
                     format!("{}-{}", idx_col_name, term_info.postings_range.start);
                 slice_columns.push((idx_slice_name.clone(), idx_slice_range));
-                slice_name_map.insert(idx_slice_name, *term_id);
+                slice_name_map.insert(idx_slice_name, term_id.clone());
             }
 
             if self.need_position {
@@ -364,7 +366,7 @@ impl InvertedIndexReader {
                     let pos_slice_name =
                         format!("{}-{}", pos_col_name, term_info.positions_range.start);
                     slice_columns.push((pos_slice_name.clone(), pos_slice_range));
-                    slice_name_map.insert(pos_slice_name, *term_id);
+                    slice_name_map.insert(pos_slice_name, term_id.clone());
                 }
             }
         }
@@ -381,9 +383,8 @@ impl InvertedIndexReader {
         let mut block_postings_map = HashMap::with_capacity(term_infos.len());
         let mut position_reader_map = HashMap::with_capacity(term_infos.len());
         for (slice_name, mut slice_data) in slice_column_files_map.into_iter() {
-            let id = slice_name_map.remove(&slice_name).unwrap();
             if slice_name.starts_with("idx") {
-                let term_id = id;
+                let term_id = slice_name_map.remove(&slice_name).unwrap();
                 let term_info = term_infos.get(&term_id).unwrap();
                 let posting_file = FileSlice::new(Arc::new(slice_data));
                 let block_postings = BlockSegmentPostings::open(
@@ -395,16 +396,16 @@ impl InvertedIndexReader {
                 block_postings_map.insert(term_id, block_postings);
} else if slice_name.starts_with("pos") { - let term_id = id; + let term_id = slice_name_map.remove(&slice_name).unwrap(); let position_reader = PositionReader::open(slice_data)?; position_reader_map.insert(term_id, position_reader); } else if slice_name.starts_with("fieldnorm") { - let field_id = id as u32; + let field_id = field_slice_name_map.remove(&slice_name).unwrap(); let slice_file = FileSlice::new(Arc::new(slice_data)); let fieldnorm_reader = FieldNormReader::open(slice_file)?; fieldnorm_reader_map.insert(field_id, fieldnorm_reader); } else if slice_name.starts_with("tokens") { - let field_id = id as u32; + let field_id = field_slice_name_map.remove(&slice_name).unwrap(); let total_num_tokens = u64::deserialize(&mut slice_data)?; field_num_tokens_map.insert(field_id, total_num_tokens); }