Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve IndexWriter customisation via builder #2562

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ fastdivide = "0.4.0"
itertools = "0.13.0"
measure_time = "0.9.0"
arc-swap = "1.5.0"
bon = "3.3.1"

columnar = { version = "0.3", path = "./columnar", package = "tantivy-columnar" }
sstable = { version = "0.3", path = "./sstable", package = "tantivy-sstable", optional = true }
Expand Down
62 changes: 32 additions & 30 deletions src/index/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ use crate::directory::MmapDirectory;
use crate::directory::{Directory, ManagedDirectory, RamDirectory, INDEX_WRITER_LOCK};
use crate::error::{DataCorruption, TantivyError};
use crate::index::{IndexMeta, SegmentId, SegmentMeta, SegmentMetaInventory};
use crate::indexer::index_writer::{MAX_NUM_THREAD, MEMORY_BUDGET_NUM_BYTES_MIN};
use crate::indexer::index_writer::{
IndexWriterOptions, MAX_NUM_THREAD, MEMORY_BUDGET_NUM_BYTES_MIN,
};
use crate::indexer::segment_updater::save_metas;
use crate::indexer::{IndexWriter, SingleSegmentIndexWriter};
use crate::reader::{IndexReader, IndexReaderBuilder};
Expand All @@ -24,8 +26,6 @@ use crate::schema::{Field, FieldType, Schema};
use crate::tokenizer::{TextAnalyzer, TokenizerManager};
use crate::SegmentReader;

const DEFAULT_NUM_MERGE_THREADS: usize = 4;

fn load_metas(
directory: &dyn Directory,
inventory: &SegmentMetaInventory,
Expand Down Expand Up @@ -521,30 +521,24 @@ impl Index {
load_metas(self.directory(), &self.inventory)
}

/// Open a new index writer. Attempts to acquire a lockfile.
/// Open a new index writer with the given options. Attempts to acquire a lockfile.
///
/// The lockfile should be deleted on drop, but it is possible
/// that due to a panic or other error, a stale lockfile will be
/// left in the index directory. If you are sure that no other
/// `IndexWriter` on the system is accessing the index directory,
/// it is safe to manually delete the lockfile.
///
/// - `num_threads` defines the number of indexing workers that should work at the same time.
///
/// - `overall_memory_budget_in_bytes` sets the amount of memory allocated for all indexing
/// thread.
///
/// Each thread will receive a budget of `overall_memory_budget_in_bytes / num_threads`.
/// - `options` defines the writer configuration which includes things like buffer sizes,
/// indexer threads, etc...
///
/// # Errors
/// If the lockfile already exists, returns `Error::DirectoryLockBusy` or an `Error::IoError`.
/// If the lockfile already exists, returns `TantivyError::LockFailure`.
/// If the memory arena per thread is too small or too big, returns
/// `TantivyError::InvalidArgument`
pub fn writer_with_num_threads_and_num_merge_threads<D: Document>(
fulmicoton marked this conversation as resolved.
Show resolved Hide resolved
pub fn writer_with_options<D: Document>(
&self,
num_threads: usize,
overall_memory_budget_in_bytes: usize,
num_merge_threads: usize,
options: IndexWriterOptions,
) -> crate::Result<IndexWriter<D>> {
let directory_lock = self
.directory
Expand All @@ -560,32 +554,40 @@ impl Index {
),
)
})?;
let memory_arena_in_bytes_per_thread = overall_memory_budget_in_bytes / num_threads;
IndexWriter::new(
self,
num_threads,
memory_arena_in_bytes_per_thread,
directory_lock,
num_merge_threads,
)

IndexWriter::new(self, options, directory_lock)
}

/// Creates a multithreaded writer with 4 merge threads.
/// Open a new index writer. Attempts to acquire a lockfile.
///
/// The lockfile should be deleted on drop, but it is possible
/// that due to a panic or other error, a stale lockfile will be
/// left in the index directory. If you are sure that no other
/// `IndexWriter` on the system is accessing the index directory,
/// it is safe to manually delete the lockfile.
///
/// - `num_threads` defines the number of indexing workers that should work at the same time.
///
/// - `overall_memory_budget_in_bytes` sets the amount of memory allocated for all indexing
/// thread.
///
/// Each thread will receive a budget of `overall_memory_budget_in_bytes / num_threads`.
///
/// # Errors
/// If the lockfile already exists, returns `Error::FileAlreadyExists`.
/// If the lockfile already exists, returns `Error::DirectoryLockBusy` or an `Error::IoError`.
/// If the memory arena per thread is too small or too big, returns
/// `TantivyError::InvalidArgument`
pub fn writer_with_num_threads<D: Document>(
&self,
num_threads: usize,
overall_memory_budget_in_bytes: usize,
) -> crate::Result<IndexWriter<D>> {
self.writer_with_num_threads_and_num_merge_threads(
num_threads,
overall_memory_budget_in_bytes,
DEFAULT_NUM_MERGE_THREADS,
)
let memory_arena_in_bytes_per_thread = overall_memory_budget_in_bytes / num_threads;
ChillFish8 marked this conversation as resolved.
Show resolved Hide resolved
let options = IndexWriterOptions::builder()
.num_worker_threads(num_threads)
.memory_budget_per_thread(memory_arena_in_bytes_per_thread)
.build();
self.writer_with_options(options)
}

/// Helper to create an index writer for tests.
Expand Down
89 changes: 64 additions & 25 deletions src/indexer/index_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,23 @@ fn error_in_index_worker_thread(context: &str) -> TantivyError {
))
}

#[derive(Clone, bon::Builder)]
/// A builder for creating a new [IndexWriter] for an index.
pub struct IndexWriterOptions {
#[builder(default = MEMORY_BUDGET_NUM_BYTES_MIN)]
/// The memory budget per indexer thread.
///
/// When an indexer thread has buffered this much data in memory
/// it will flush the segment to disk (although this is not searchable until commit is called.)
memory_budget_per_thread: usize,
#[builder(default = 1)]
/// The number of indexer worker threads to use.
num_worker_threads: usize,
ChillFish8 marked this conversation as resolved.
Show resolved Hide resolved
#[builder(default = 4)]
/// Defines the number of merger threads to use.
num_merge_threads: usize,
}

/// `IndexWriter` is the user entry-point to add document to an index.
///
/// It manages a small number of indexing thread, as well as a shared
Expand All @@ -58,8 +75,7 @@ pub struct IndexWriter<D: Document = TantivyDocument> {

index: Index,

// The memory budget per thread, after which a commit is triggered.
memory_budget_in_bytes_per_thread: usize,
options: IndexWriterOptions,

workers_join_handle: Vec<JoinHandle<crate::Result<()>>>,

Expand All @@ -70,9 +86,6 @@ pub struct IndexWriter<D: Document = TantivyDocument> {

worker_id: usize,

num_threads: usize,
num_merge_threads: usize,

delete_queue: DeleteQueue,

stamper: Stamper,
Expand Down Expand Up @@ -266,24 +279,27 @@ impl<D: Document> IndexWriter<D> {
/// `TantivyError::InvalidArgument`
pub(crate) fn new(
index: &Index,
num_threads: usize,
memory_budget_in_bytes_per_thread: usize,
options: IndexWriterOptions,
directory_lock: DirectoryLock,
num_merge_threads: usize,
) -> crate::Result<Self> {
if memory_budget_in_bytes_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
if options.memory_budget_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
let err_msg = format!(
"The memory arena in bytes per thread needs to be at least \
{MEMORY_BUDGET_NUM_BYTES_MIN}."
);
return Err(TantivyError::InvalidArgument(err_msg));
}
if memory_budget_in_bytes_per_thread >= MEMORY_BUDGET_NUM_BYTES_MAX {
if options.memory_budget_per_thread >= MEMORY_BUDGET_NUM_BYTES_MAX {
let err_msg = format!(
"The memory arena in bytes per thread cannot exceed {MEMORY_BUDGET_NUM_BYTES_MAX}"
);
return Err(TantivyError::InvalidArgument(err_msg));
}
if options.num_worker_threads == 0 {
let err_msg = "At least one worker thread is required, got 0".to_string();
return Err(TantivyError::InvalidArgument(err_msg));
}

let (document_sender, document_receiver) =
crossbeam_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);

Expand All @@ -297,23 +313,20 @@ impl<D: Document> IndexWriter<D> {
index.clone(),
stamper.clone(),
&delete_queue.cursor(),
num_merge_threads,
options.num_merge_threads,
)?;

let mut index_writer = Self {
_directory_lock: Some(directory_lock),

memory_budget_in_bytes_per_thread,
options: options.clone(),
index: index.clone(),
index_writer_status: IndexWriterStatus::from(document_receiver),
operation_sender: document_sender,

segment_updater,

workers_join_handle: vec![],
num_threads,

num_merge_threads,

delete_queue,

Expand Down Expand Up @@ -406,7 +419,7 @@ impl<D: Document> IndexWriter<D> {

let mut delete_cursor = self.delete_queue.cursor();

let mem_budget = self.memory_budget_in_bytes_per_thread;
let mem_budget = self.options.memory_budget_per_thread;
let index = self.index.clone();
let join_handle: JoinHandle<crate::Result<()>> = thread::Builder::new()
.name(format!("thrd-tantivy-index{}", self.worker_id))
Expand Down Expand Up @@ -459,7 +472,7 @@ impl<D: Document> IndexWriter<D> {
}

fn start_workers(&mut self) -> crate::Result<()> {
for _ in 0..self.num_threads {
for _ in 0..self.options.num_worker_threads {
self.add_indexing_worker()?;
}
Ok(())
Expand Down Expand Up @@ -561,13 +574,7 @@ impl<D: Document> IndexWriter<D> {
.take()
.expect("The IndexWriter does not have any lock. This is a bug, please report.");

let new_index_writer = IndexWriter::new(
&self.index,
self.num_threads,
self.memory_budget_in_bytes_per_thread,
directory_lock,
self.num_merge_threads,
)?;
let new_index_writer = IndexWriter::new(&self.index, self.options.clone(), directory_lock)?;

// the current `self` is dropped right away because of this call.
//
Expand Down Expand Up @@ -821,7 +828,7 @@ mod tests {
use crate::directory::error::LockError;
use crate::error::*;
use crate::indexer::index_writer::MEMORY_BUDGET_NUM_BYTES_MIN;
use crate::indexer::NoMergePolicy;
use crate::indexer::{IndexWriterOptions, NoMergePolicy};
use crate::query::{QueryParser, TermQuery};
use crate::schema::{
self, Facet, FacetOptions, IndexRecordOption, IpAddrOptions, JsonObjectOptions,
Expand Down Expand Up @@ -2542,4 +2549,36 @@ mod tests {
index_writer.commit().unwrap();
Ok(())
}

#[test]
fn test_writer_options_validation() {
let mut schema_builder = Schema::builder();
let field = schema_builder.add_bool_field("example", STORED);
let index = Index::create_in_ram(schema_builder.build());

let opt_wo_threads = IndexWriterOptions::builder().num_worker_threads(0).build();
let result = index.writer_with_options::<TantivyDocument>(opt_wo_threads);
assert!(result.is_err(), "Writer should reject 0 thread count");
assert!(matches!(result, Err(TantivyError::InvalidArgument(_))));

let opt_with_low_memory = IndexWriterOptions::builder()
.memory_budget_per_thread(10 << 10)
.build();
let result = index.writer_with_options::<TantivyDocument>(opt_with_low_memory);
assert!(
result.is_err(),
"Writer should reject options with too low memory size"
);
assert!(matches!(result, Err(TantivyError::InvalidArgument(_))));

let opt_with_low_memory = IndexWriterOptions::builder()
.memory_budget_per_thread(5 << 30)
.build();
let result = index.writer_with_options::<TantivyDocument>(opt_with_low_memory);
assert!(
result.is_err(),
"Writer should reject options with too high memory size"
);
assert!(matches!(result, Err(TantivyError::InvalidArgument(_))));
}
}
2 changes: 1 addition & 1 deletion src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ mod stamper;
use crossbeam_channel as channel;
use smallvec::SmallVec;

pub use self::index_writer::IndexWriter;
pub use self::index_writer::{IndexWriter, IndexWriterOptions};
pub use self::log_merge_policy::LogMergePolicy;
pub use self::merge_operation::MergeOperation;
pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy};
Expand Down
Loading