Skip to content

Commit

Permalink
use fst for sstable index (#2268)
Browse files Browse the repository at this point in the history
* read path for new fst based index

* implement BlockAddrStoreWriter

* extract slop/derivation computation

* use better linear approximator and allow negative correction to approximator

* document format and reorder some fields

* optimize single block sstable size

* plug backward compat
  • Loading branch information
trinity-1686a authored Dec 4, 2023
1 parent 0b56c88 commit 9ebc5ed
Show file tree
Hide file tree
Showing 10 changed files with 1,086 additions and 323 deletions.
4 changes: 2 additions & 2 deletions columnar/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ fn test_dataframe_writer_str() {
assert_eq!(columnar.num_columns(), 1);
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("my_string").unwrap();
assert_eq!(cols.len(), 1);
assert_eq!(cols[0].num_bytes(), 87);
assert_eq!(cols[0].num_bytes(), 73);
}

#[test]
Expand All @@ -40,7 +40,7 @@ fn test_dataframe_writer_bytes() {
assert_eq!(columnar.num_columns(), 1);
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("my_string").unwrap();
assert_eq!(cols.len(), 1);
assert_eq!(cols[0].num_bytes(), 87);
assert_eq!(cols[0].num_bytes(), 73);
}

#[test]
Expand Down
16 changes: 8 additions & 8 deletions src/fastfield/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ mod tests {
}
let file = directory.open_read(path).unwrap();

assert_eq!(file.len(), 93);
assert_eq!(file.len(), 80);
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
let column = fast_field_readers
.u64("field")
Expand Down Expand Up @@ -181,7 +181,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 121);
assert_eq!(file.len(), 108);
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
let col = fast_field_readers
.u64("field")
Expand Down Expand Up @@ -214,7 +214,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 94);
assert_eq!(file.len(), 81);
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
let fast_field_reader = fast_field_readers
.u64("field")
Expand Down Expand Up @@ -246,7 +246,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 4489);
assert_eq!(file.len(), 4476);
{
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
let col = fast_field_readers
Expand Down Expand Up @@ -279,7 +279,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 265);
assert_eq!(file.len(), 252);

{
let fast_field_readers = FastFieldReaders::open(file, schema).unwrap();
Expand Down Expand Up @@ -773,7 +773,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 102);
assert_eq!(file.len(), 84);
let fast_field_readers = FastFieldReaders::open(file, schema).unwrap();
let bool_col = fast_field_readers.bool("field_bool").unwrap();
assert_eq!(bool_col.first(0), Some(true));
Expand Down Expand Up @@ -805,7 +805,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 114);
assert_eq!(file.len(), 96);
let readers = FastFieldReaders::open(file, schema).unwrap();
let bool_col = readers.bool("field_bool").unwrap();
for i in 0..25 {
Expand All @@ -830,7 +830,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 104);
assert_eq!(file.len(), 86);
let fastfield_readers = FastFieldReaders::open(file, schema).unwrap();
let col = fastfield_readers.bool("field_bool").unwrap();
assert_eq!(col.first(0), None);
Expand Down
1 change: 1 addition & 0 deletions sstable/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ description = "sstables for tantivy"

[dependencies]
common = {version= "0.6", path="../common", package="tantivy-common"}
tantivy-bitpacker = { version= "0.5", path="../bitpacker" }
tantivy-fst = "0.5"
# experimental gives us access to Decompressor::upper_bound
zstd = { version = "0.13", features = ["experimental"] }
Expand Down
86 changes: 62 additions & 24 deletions sstable/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,33 +89,71 @@ Note: as the SSTable does not support redundant keys, there is no ambiguity betw

### SSTFooter
```
+-------+-------+-----+-------------+---------+---------+
| Block | Block | ... | IndexOffset | NumTerm | Version |
+-------+-------+-----+-------------+---------+---------+
|----( # of blocks)---|
+-----+----------------+-------------+-------------+---------+---------+
| Fst | BlockAddrStore | StoreOffset | IndexOffset | NumTerm | Version |
+-----+----------------+-------------+-------------+---------+---------+
```
- Block(SSTBlock): uses IndexValue for its Values format
- Fst(Fst): finite state transducer mapping keys to a block number
- BlockAddrStore(BlockAddrStore): store mapping a block number to its BlockAddr
- StoreOffset(u64): Offset to start of the BlockAddrStore. If zero, see the SingleBlockSStable section
- IndexOffset(u64): Offset to the start of the SSTFooter
- NumTerm(u64): number of terms in the sstable
- Version(u32): Currently equal to 2
- Version(u32): Currently equal to 3

### IndexValue
```
+------------+----------+-------+-------+-----+
| EntryCount | StartPos | Entry | Entry | ... |
+------------+----------+-------+-------+-----+
|---( # of entries)---|
```
### Fst

- EntryCount(VInt): number of entries
- StartPos(VInt): the start pos of the first (data) block referenced by this (index) block
- Entry (IndexEntry)
Fst is in the format of tantivy\_fst

### Entry
```
+----------+--------------+
| BlockLen | FirstOrdinal |
+----------+--------------+
```
- BlockLen(VInt): length of the block
- FirstOrdinal(VInt): ordinal of the first element in the given block
### BlockAddrStore

+---------+-----------+-----------+-----+-----------+-----------+-----+
| MetaLen | BlockMeta | BlockMeta | ... | BlockData | BlockData | ... |
+---------+-----------+-----------+-----+-----------+-----------+-----+
|---------(N blocks)----------|---------(N blocks)----------|

- MetaLen(u64): length of the BlockMeta section
- BlockMeta(BlockAddrBlockMetadata): metadata to seek through BlockData
- BlockData(CompactedBlockAddr): bitpacked per block metadata

### BlockAddrBlockMetadata

+--------+------------+--------------+------------+--------------+-------------------+-----------------+----------+
| Offset | RangeStart | FirstOrdinal | RangeSlope | OrdinalSlope | FirstOrdinalNBits | RangeStartNBits | BlockLen |
+--------+------------+--------------+------------+--------------+-------------------+-----------------+----------+

- Offset(u64): offset of the corresponding BlockData in the datastream
- RangeStart(u64): the start position of the first block
- FirstOrdinal(u64): the first ordinal of the first block
- RangeSlope(u32): slope predicted for start range evolution (see computation in BlockData)
- OrdinalSlope(u64): slope predicted for first ordinal evolution (see computation in BlockData)
- FirstOrdinalNBits(u8): number of bits per ordinal in datastream (see computation in BlockData)
- RangeStartNBits(u8): number of bits per range start in datastream (see computation in BlockData)

### BlockData

+-----------------+-------------------+---------------+
| RangeStartDelta | FirstOrdinalDelta | FinalRangeEnd |
+-----------------+-------------------+---------------+
|------(BlockLen repetitions)---------|

- RangeStartDelta(var): RangeStartNBits *bits* of little endian number. See below for decoding
- FirstOrdinalDelta(var): FirstOrdinalNBits *bits* of little endian number. See below for decoding
- FinalRangeEnd(var): RangeStartNBits *bits* of integer. See below for decoding

converting a BlockData of index Index and a BlockAddrBlockMetadata to an actual block address is done as follow:
range\_prediction := RangeStart + Index * RangeSlop;
range\_derivation := RangeStartDelta - (1 << (RangeStartNBits-1));
range\_start := range\_prediction + range\_derivation

The same computation can be done for ordinal.

Note that `range_derivation` can take negative value. `RangeStartDelta` is just its translation to a positive range.


## SingleBlockSStable

The format used for the index is meant to be compact, however it has a constant cost of around 70
bytes, which isn't negligible for a table containing very few keys.
To limit the impact of that constant cost, single block sstable omit the Fst and BlockAddrStore from
their index. Instead a block with first ordinal of 0, range start of 0 and range end of IndexOffset
is implicitly used for every operations.
44 changes: 44 additions & 0 deletions sstable/benches/ord_to_term.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,31 @@ pub fn criterion_benchmark(c: &mut Criterion) {
assert!(dict.ord_to_term(19_000_000, &mut res).unwrap());
})
});
c.bench_function("term_ord_suffix", |b| {
b.iter(|| {
assert_eq!(
dict.term_ord(b"prefix.00186A0.suffix").unwrap().unwrap(),
100_000
);
assert_eq!(
dict.term_ord(b"prefix.121EAC0.suffix").unwrap().unwrap(),
19_000_000
);
})
});
c.bench_function("open_and_term_ord_suffix", |b| {
b.iter(|| {
let dict = Dictionary::<MonotonicU64SSTable>::open(slice.clone()).unwrap();
assert_eq!(
dict.term_ord(b"prefix.00186A0.suffix").unwrap().unwrap(),
100_000
);
assert_eq!(
dict.term_ord(b"prefix.121EAC0.suffix").unwrap().unwrap(),
19_000_000
);
})
});
}
{
let slice = make_test_sstable("");
Expand All @@ -59,6 +84,25 @@ pub fn criterion_benchmark(c: &mut Criterion) {
assert!(dict.ord_to_term(19_000_000, &mut res).unwrap());
})
});
c.bench_function("term_ord", |b| {
b.iter(|| {
assert_eq!(dict.term_ord(b"prefix.00186A0").unwrap().unwrap(), 100_000);
assert_eq!(
dict.term_ord(b"prefix.121EAC0").unwrap().unwrap(),
19_000_000
);
})
});
c.bench_function("open_and_term_ord", |b| {
b.iter(|| {
let dict = Dictionary::<MonotonicU64SSTable>::open(slice.clone()).unwrap();
assert_eq!(dict.term_ord(b"prefix.00186A0").unwrap().unwrap(), 100_000);
assert_eq!(
dict.term_ord(b"prefix.121EAC0").unwrap().unwrap(),
19_000_000
);
})
});
}
}

Expand Down
48 changes: 34 additions & 14 deletions sstable/src/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ use common::{BinarySerializable, OwnedBytes};
use tantivy_fst::automaton::AlwaysMatch;
use tantivy_fst::Automaton;

use crate::sstable_index_v3::SSTableIndexV3Empty;
use crate::streamer::{Streamer, StreamerBuilder};
use crate::{BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, TermOrdinal, VoidSSTable};
use crate::{
BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, SSTableIndexV3, TermOrdinal, VoidSSTable,
};

/// An SSTable is a sorted map that associates sorted `&[u8]` keys
/// to any kind of typed values.
Expand Down Expand Up @@ -180,24 +183,41 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
pub fn open(term_dictionary_file: FileSlice) -> io::Result<Self> {
let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(20);
let mut footer_len_bytes: OwnedBytes = footer_len_slice.read_bytes()?;

let index_offset = u64::deserialize(&mut footer_len_bytes)?;
let num_terms = u64::deserialize(&mut footer_len_bytes)?;
let version = u32::deserialize(&mut footer_len_bytes)?;
if version != crate::SSTABLE_VERSION {
return Err(io::Error::new(
io::ErrorKind::Other,
format!(
"Unsuported sstable version, expected {version}, found {}",
crate::SSTABLE_VERSION,
),
));
}

let (sstable_slice, index_slice) = main_slice.split(index_offset as usize);
let sstable_index_bytes = index_slice.read_bytes()?;
let sstable_index = SSTableIndex::load(sstable_index_bytes)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption"))?;

let sstable_index = match version {
2 => SSTableIndex::V2(
crate::sstable_index_v2::SSTableIndex::load(sstable_index_bytes).map_err(|_| {
io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption")
})?,
),
3 => {
let (sstable_index_bytes, mut footerv3_len_bytes) = sstable_index_bytes.rsplit(8);
let store_offset = u64::deserialize(&mut footerv3_len_bytes)?;
if store_offset != 0 {
SSTableIndex::V3(
SSTableIndexV3::load(sstable_index_bytes, store_offset).map_err(|_| {
io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption")
})?,
)
} else {
// if store_offset is zero, there is no index, so we build a pseudo-index
// assuming a single block of sstable covering everything.
SSTableIndex::V3Empty(SSTableIndexV3Empty::load(index_offset as usize))
}
}
_ => {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("Unsuported sstable version, expected one of [2, 3], found {version}"),
))
}
};

Ok(Dictionary {
sstable_slice,
sstable_index,
Expand Down
17 changes: 8 additions & 9 deletions sstable/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ pub mod merge;
mod streamer;
pub mod value;

mod sstable_index;
pub use sstable_index::{BlockAddr, SSTableIndex, SSTableIndexBuilder};
mod sstable_index_v3;
pub use sstable_index_v3::{BlockAddr, SSTableIndex, SSTableIndexBuilder, SSTableIndexV3};
mod sstable_index_v2;
pub(crate) mod vint;
pub use dictionary::Dictionary;
pub use streamer::{Streamer, StreamerBuilder};
Expand All @@ -28,7 +29,7 @@ use crate::value::{RangeValueReader, RangeValueWriter};
pub type TermOrdinal = u64;

const DEFAULT_KEY_CAPACITY: usize = 50;
const SSTABLE_VERSION: u32 = 2;
const SSTABLE_VERSION: u32 = 3;

/// Given two byte string returns the length of
/// the longest common prefix.
Expand Down Expand Up @@ -304,7 +305,8 @@ where

let offset = wrt.written_bytes();

self.index_builder.serialize(&mut wrt)?;
let fst_len: u64 = self.index_builder.serialize(&mut wrt)?;
wrt.write_all(&fst_len.to_le_bytes())?;
wrt.write_all(&offset.to_le_bytes())?;
wrt.write_all(&self.num_terms.to_le_bytes())?;

Expand Down Expand Up @@ -385,13 +387,10 @@ mod test {
16, 17, 33, 18, 19, 17, 20, // data block
0, 0, 0, 0, // no more block
// index
8, 0, 0, 0, // size of index block
0, // compression
1, 0, 12, 0, 32, 17, 20, // index block
0, 0, 0, 0, // no more index block
0, 0, 0, 0, 0, 0, 0, 0, // fst lenght
16, 0, 0, 0, 0, 0, 0, 0, // index start offset
3, 0, 0, 0, 0, 0, 0, 0, // num term
2, 0, 0, 0, // version
3, 0, 0, 0, // version
]
);
let buffer = OwnedBytes::new(buffer);
Expand Down
Loading

0 comments on commit 9ebc5ed

Please sign in to comment.