Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exist queries match subpath fields #2558

Merged
merged 3 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 106 additions & 22 deletions columnar/src/columnar/reader/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{fmt, io, mem};

use common::file_slice::FileSlice;
use common::json_path_writer::JSON_PATH_SEGMENT_SEP;
use common::BinarySerializable;
use sstable::{Dictionary, RangeSSTable};

Expand Down Expand Up @@ -76,6 +77,19 @@ fn read_all_columns_in_stream(
Ok(results)
}

fn column_dictionary_prefix_for_column_name(column_name: &str) -> String {
// Each column is a associated to a given `column_key`,
// that starts by `column_name\0column_header`.
//
// Listing the columns associated to the given column name is therefore equivalent to
// listing `column_key` with the prefix `column_name\0`.
format!("{}{}", column_name, '\0')
}

fn column_dictionary_prefix_for_subpath(root_path: &str) -> String {
format!("{}{}", root_path, JSON_PATH_SEGMENT_SEP as char)
}

impl ColumnarReader {
/// Opens a new Columnar file.
pub fn open<F>(file_slice: F) -> io::Result<ColumnarReader>
Expand Down Expand Up @@ -144,32 +158,14 @@ impl ColumnarReader {
Ok(self.iter_columns()?.collect())
}

fn stream_for_column_range(&self, column_name: &str) -> sstable::StreamerBuilder<RangeSSTable> {
// Each column is a associated to a given `column_key`,
// that starts by `column_name\0column_header`.
//
// Listing the columns associated to the given column name is therefore equivalent to
// listing `column_key` with the prefix `column_name\0`.
//
// This is in turn equivalent to searching for the range
// `[column_name,\0`..column_name\1)`.
// TODO can we get some more generic `prefix(..)` logic in the dictionary.
let mut start_key = column_name.to_string();
start_key.push('\0');
let mut end_key = column_name.to_string();
end_key.push(1u8 as char);
self.column_dictionary
.range()
.ge(start_key.as_bytes())
.lt(end_key.as_bytes())
}

pub async fn read_columns_async(
&self,
column_name: &str,
) -> io::Result<Vec<DynamicColumnHandle>> {
let prefix = column_dictionary_prefix_for_column_name(column_name);
let stream = self
.stream_for_column_range(column_name)
.column_dictionary
.prefix_range(prefix)
.into_stream_async()
.await?;
read_all_columns_in_stream(stream, &self.column_data, self.format_version)
Expand All @@ -180,7 +176,35 @@ impl ColumnarReader {
/// There can be more than one column associated to a given column name, provided they have
/// different types.
pub fn read_columns(&self, column_name: &str) -> io::Result<Vec<DynamicColumnHandle>> {
let stream = self.stream_for_column_range(column_name).into_stream()?;
let prefix = column_dictionary_prefix_for_column_name(column_name);
let stream = self.column_dictionary.prefix_range(prefix).into_stream()?;
read_all_columns_in_stream(stream, &self.column_data, self.format_version)
}

pub async fn read_subpath_columns_async(
&self,
root_path: &str,
) -> io::Result<Vec<DynamicColumnHandle>> {
let prefix = column_dictionary_prefix_for_subpath(root_path);
let stream = self
.column_dictionary
.prefix_range(prefix)
.into_stream_async()
.await?;
read_all_columns_in_stream(stream, &self.column_data, self.format_version)
}

/// Get all inner columns for a given JSON prefix, i.e columns for which the name starts
/// with the prefix then contain the [`JSON_PATH_SEGMENT_SEP`].
///
/// There can be more than one column associated to each path within the JSON structure,
/// provided they have different types.
pub fn read_subpath_columns(&self, root_path: &str) -> io::Result<Vec<DynamicColumnHandle>> {
let prefix = column_dictionary_prefix_for_subpath(root_path);
let stream = self
.column_dictionary
.prefix_range(prefix.as_bytes())
.into_stream()?;
read_all_columns_in_stream(stream, &self.column_data, self.format_version)
}

Expand All @@ -192,6 +216,8 @@ impl ColumnarReader {

#[cfg(test)]
mod tests {
use common::json_path_writer::JSON_PATH_SEGMENT_SEP;

use crate::{ColumnType, ColumnarReader, ColumnarWriter};

#[test]
Expand Down Expand Up @@ -224,6 +250,64 @@ mod tests {
assert_eq!(columns[0].1.column_type(), ColumnType::U64);
}

#[test]
fn test_read_columns() {
let mut columnar_writer = ColumnarWriter::default();
columnar_writer.record_column_type("col", ColumnType::U64, false);
columnar_writer.record_numerical(1, "col", 1u64);
let mut buffer = Vec::new();
columnar_writer.serialize(2, &mut buffer).unwrap();
let columnar = ColumnarReader::open(buffer).unwrap();
{
let columns = columnar.read_columns("col").unwrap();
assert_eq!(columns.len(), 1);
assert_eq!(columns[0].column_type(), ColumnType::U64);
}
{
let columns = columnar.read_columns("other").unwrap();
assert_eq!(columns.len(), 0);
}
}

#[test]
fn test_read_subpath_columns() {
let mut columnar_writer = ColumnarWriter::default();
columnar_writer.record_str(
0,
&format!("col1{}subcol1", JSON_PATH_SEGMENT_SEP as char),
"hello",
);
columnar_writer.record_numerical(
0,
&format!("col1{}subcol2", JSON_PATH_SEGMENT_SEP as char),
1i64,
);
columnar_writer.record_str(1, "col1", "hello");
columnar_writer.record_str(0, "col2", "hello");
let mut buffer = Vec::new();
columnar_writer.serialize(2, &mut buffer).unwrap();

let columnar = ColumnarReader::open(buffer).unwrap();
{
let columns = columnar.read_subpath_columns("col1").unwrap();
assert_eq!(columns.len(), 2);
assert_eq!(columns[0].column_type(), ColumnType::Str);
assert_eq!(columns[1].column_type(), ColumnType::I64);
}
{
let columns = columnar.read_subpath_columns("col1.subcol1").unwrap();
assert_eq!(columns.len(), 0);
}
{
let columns = columnar.read_subpath_columns("col2").unwrap();
assert_eq!(columns.len(), 0);
}
{
let columns = columnar.read_subpath_columns("other").unwrap();
assert_eq!(columns.len(), 0);
}
}

#[test]
#[should_panic(expected = "Input type forbidden")]
fn test_list_columns_strict_typing_panics_on_wrong_types() {
Expand Down
1 change: 0 additions & 1 deletion columnar/src/columnar/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ impl ColumnarWriter {
.map(|(column_name, addr)| (column_name, ColumnType::DateTime, addr)),
);
columns.sort_unstable_by_key(|(column_name, col_type, _)| (*column_name, *col_type));

let (arena, buffers, dictionaries) = (&self.arena, &mut self.buffers, &self.dictionaries);
let mut symbol_byte_buffer: Vec<u8> = Vec::new();
for (column_name, column_type, addr) in columns {
Expand Down
46 changes: 43 additions & 3 deletions src/fastfield/readers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ impl FastFieldReaders {
Ok(dynamic_column.into())
}

/// Returning a `dynamic_column_handle`.
/// Returns a `dynamic_column_handle`.
pub fn dynamic_column_handle(
&self,
field_name: &str,
Expand All @@ -234,7 +234,7 @@ impl FastFieldReaders {
Ok(dynamic_column_handle_opt)
}

/// Returning all `dynamic_column_handle`.
/// Returns all `dynamic_column_handle` that match the given field name.
pub fn dynamic_column_handles(
&self,
field_name: &str,
Expand All @@ -250,6 +250,22 @@ impl FastFieldReaders {
Ok(dynamic_column_handles)
}

/// Returns all `dynamic_column_handle` that are inner fields of the provided JSON path.
pub fn dynamic_subpath_column_handles(
&self,
root_path: &str,
) -> crate::Result<Vec<DynamicColumnHandle>> {
let Some(resolved_field_name) = self.resolve_field(root_path)? else {
return Ok(Vec::new());
};
let dynamic_column_handles = self
.columnar
.read_subpath_columns(&resolved_field_name)?
.into_iter()
.collect();
Ok(dynamic_column_handles)
}

#[doc(hidden)]
pub async fn list_dynamic_column_handles(
&self,
Expand All @@ -265,6 +281,21 @@ impl FastFieldReaders {
Ok(columns)
}

#[doc(hidden)]
pub async fn list_subpath_dynamic_column_handles(
&self,
root_path: &str,
) -> crate::Result<Vec<DynamicColumnHandle>> {
let Some(resolved_field_name) = self.resolve_field(root_path)? else {
return Ok(Vec::new());
};
let columns = self
.columnar
.read_subpath_columns_async(&resolved_field_name)
.await?;
Ok(columns)
}

/// Returns the `u64` column used to represent any `u64`-mapped typed (String/Bytes term ids,
/// i64, u64, f64, DateTime).
///
Expand Down Expand Up @@ -476,6 +507,15 @@ mod tests {
.iter()
.any(|column| column.column_type() == ColumnType::Str));

println!("*** {:?}", fast_fields.columnar().list_columns());
let json_columns = fast_fields.dynamic_column_handles("json").unwrap();
assert_eq!(json_columns.len(), 0);

let json_subcolumns = fast_fields.dynamic_subpath_column_handles("json").unwrap();
assert_eq!(json_subcolumns.len(), 3);

let foo_subcolumns = fast_fields
.dynamic_subpath_column_handles("json.foo")
.unwrap();
assert_eq!(foo_subcolumns.len(), 0);
}
}
76 changes: 73 additions & 3 deletions src/indexer/segment_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ mod tests {
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};

use columnar::ColumnType;
use tempfile::TempDir;

use crate::collector::{Count, TopDocs};
Expand All @@ -431,15 +432,15 @@ mod tests {
use crate::query::{PhraseQuery, QueryParser};
use crate::schema::{
Document, IndexRecordOption, OwnedValue, Schema, TextFieldIndexing, TextOptions, Value,
DATE_TIME_PRECISION_INDEXED, STORED, STRING, TEXT,
DATE_TIME_PRECISION_INDEXED, FAST, STORED, STRING, TEXT,
};
use crate::store::{Compressor, StoreReader, StoreWriter};
use crate::time::format_description::well_known::Rfc3339;
use crate::time::OffsetDateTime;
use crate::tokenizer::{PreTokenizedString, Token};
use crate::{
DateTime, Directory, DocAddress, DocSet, Index, IndexWriter, TantivyDocument, Term,
TERMINATED,
DateTime, Directory, DocAddress, DocSet, Index, IndexWriter, SegmentReader,
TantivyDocument, Term, TERMINATED,
};

#[test]
Expand Down Expand Up @@ -841,6 +842,75 @@ mod tests {
assert_eq!(searcher.search(&phrase_query, &Count).unwrap(), 0);
}

#[test]
fn test_json_fast() {
let mut schema_builder = Schema::builder();
let json_field = schema_builder.add_json_field("json", FAST);
let schema = schema_builder.build();
let json_val: serde_json::Value = serde_json::from_str(
r#"{
"toto": "titi",
"float": -0.2,
"bool": true,
"unsigned": 1,
"signed": -2,
"complexobject": {
"field.with.dot": 1
},
"date": "1985-04-12T23:20:50.52Z",
"my_arr": [2, 3, {"my_key": "two tokens"}, 4]
}"#,
)
.unwrap();
let doc = doc!(json_field=>json_val.clone());
let index = Index::create_in_ram(schema.clone());
let mut writer = index.writer_for_tests().unwrap();
writer.add_document(doc).unwrap();
writer.commit().unwrap();
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let segment_reader = searcher.segment_reader(0u32);

fn assert_type(reader: &SegmentReader, field: &str, typ: ColumnType) {
let cols = reader.fast_fields().dynamic_column_handles(field).unwrap();
assert_eq!(cols.len(), 1, "{}", field);
assert_eq!(cols[0].column_type(), typ, "{}", field);
}
assert_type(segment_reader, "json.toto", ColumnType::Str);
assert_type(segment_reader, "json.float", ColumnType::F64);
assert_type(segment_reader, "json.bool", ColumnType::Bool);
assert_type(segment_reader, "json.unsigned", ColumnType::I64);
assert_type(segment_reader, "json.signed", ColumnType::I64);
assert_type(
segment_reader,
"json.complexobject.field\\.with\\.dot",
ColumnType::I64,
);
assert_type(segment_reader, "json.date", ColumnType::DateTime);
assert_type(segment_reader, "json.my_arr", ColumnType::I64);
assert_type(segment_reader, "json.my_arr.my_key", ColumnType::Str);

fn assert_empty(reader: &SegmentReader, field: &str) {
let cols = reader.fast_fields().dynamic_column_handles(field).unwrap();
assert_eq!(cols.len(), 0);
}
assert_empty(segment_reader, "unknown");
assert_empty(segment_reader, "json");
assert_empty(segment_reader, "json.toto.titi");

let sub_columns = segment_reader
.fast_fields()
.dynamic_subpath_column_handles("json")
.unwrap();
assert_eq!(sub_columns.len(), 9);

let subsub_columns = segment_reader
.fast_fields()
.dynamic_subpath_column_handles("json.complexobject")
.unwrap();
assert_eq!(subsub_columns.len(), 1);
}

#[test]
fn test_json_term_with_numeric_merge_panic_regression_bug_2283() {
// https://github.com/quickwit-oss/tantivy/issues/2283
Expand Down
Loading
Loading