diff --git a/Cargo.toml b/Cargo.toml index 41058dcfb5..81c6f71c21 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,7 @@ fastdivide = "0.4.0" itertools = "0.13.0" measure_time = "0.9.0" arc-swap = "1.5.0" +bon = "3.3.1" columnar = { version = "0.3", path = "./columnar", package = "tantivy-columnar" } sstable = { version = "0.3", path = "./sstable", package = "tantivy-sstable", optional = true } diff --git a/src/index/index.rs b/src/index/index.rs index 94882951ed..e1a508a797 100644 --- a/src/index/index.rs +++ b/src/index/index.rs @@ -15,7 +15,9 @@ use crate::directory::MmapDirectory; use crate::directory::{Directory, ManagedDirectory, RamDirectory, INDEX_WRITER_LOCK}; use crate::error::{DataCorruption, TantivyError}; use crate::index::{IndexMeta, SegmentId, SegmentMeta, SegmentMetaInventory}; -use crate::indexer::index_writer::{MAX_NUM_THREAD, MEMORY_BUDGET_NUM_BYTES_MIN}; +use crate::indexer::index_writer::{ + IndexWriterOptions, MAX_NUM_THREAD, MEMORY_BUDGET_NUM_BYTES_MIN, +}; use crate::indexer::segment_updater::save_metas; use crate::indexer::{IndexWriter, SingleSegmentIndexWriter}; use crate::reader::{IndexReader, IndexReaderBuilder}; @@ -24,8 +26,6 @@ use crate::schema::{Field, FieldType, Schema}; use crate::tokenizer::{TextAnalyzer, TokenizerManager}; use crate::SegmentReader; -const DEFAULT_NUM_MERGE_THREADS: usize = 4; - fn load_metas( directory: &dyn Directory, inventory: &SegmentMetaInventory, @@ -521,7 +521,7 @@ impl Index { load_metas(self.directory(), &self.inventory) } - /// Open a new index writer. Attempts to acquire a lockfile. + /// Open a new index writer with the given options. Attempts to acquire a lockfile. /// /// The lockfile should be deleted on drop, but it is possible /// that due to a panic or other error, a stale lockfile will be @@ -529,22 +529,16 @@ impl Index { /// `IndexWriter` on the system is accessing the index directory, /// it is safe to manually delete the lockfile. /// - /// - `num_threads` defines the number of indexing workers that should work at the same time. - /// - /// - `overall_memory_budget_in_bytes` sets the amount of memory allocated for all indexing - /// thread. - /// - /// Each thread will receive a budget of `overall_memory_budget_in_bytes / num_threads`. + /// - `options` defines the writer configuration which includes things like buffer sizes, + /// indexer threads, etc... /// /// # Errors - /// If the lockfile already exists, returns `Error::DirectoryLockBusy` or an `Error::IoError`. + /// If the lockfile already exists, returns `TantivyError::LockFailure`. /// If the memory arena per thread is too small or too big, returns /// `TantivyError::InvalidArgument` - pub fn writer_with_num_threads_and_num_merge_threads( + pub fn writer_with_options( &self, - num_threads: usize, - overall_memory_budget_in_bytes: usize, - num_merge_threads: usize, + options: IndexWriterOptions, ) -> crate::Result> { let directory_lock = self .directory @@ -560,20 +554,27 @@ impl Index { ), ) })?; - let memory_arena_in_bytes_per_thread = overall_memory_budget_in_bytes / num_threads; - IndexWriter::new( - self, - num_threads, - memory_arena_in_bytes_per_thread, - directory_lock, - num_merge_threads, - ) + + IndexWriter::new(self, options, directory_lock) } - /// Creates a multithreaded writer with 4 merge threads. + /// Open a new index writer. Attempts to acquire a lockfile. + /// + /// The lockfile should be deleted on drop, but it is possible + /// that due to a panic or other error, a stale lockfile will be + /// left in the index directory. If you are sure that no other + /// `IndexWriter` on the system is accessing the index directory, + /// it is safe to manually delete the lockfile. + /// + /// - `num_threads` defines the number of indexing workers that should work at the same time. + /// + /// - `overall_memory_budget_in_bytes` sets the amount of memory allocated for all indexing + /// thread. + /// + /// Each thread will receive a budget of `overall_memory_budget_in_bytes / num_threads`. /// /// # Errors - /// If the lockfile already exists, returns `Error::FileAlreadyExists`. + /// If the lockfile already exists, returns `Error::DirectoryLockBusy` or an `Error::IoError`. /// If the memory arena per thread is too small or too big, returns /// `TantivyError::InvalidArgument` pub fn writer_with_num_threads( @@ -581,11 +582,12 @@ impl Index { num_threads: usize, overall_memory_budget_in_bytes: usize, ) -> crate::Result> { - self.writer_with_num_threads_and_num_merge_threads( - num_threads, - overall_memory_budget_in_bytes, - DEFAULT_NUM_MERGE_THREADS, - ) + let memory_arena_in_bytes_per_thread = overall_memory_budget_in_bytes / num_threads; + let options = IndexWriterOptions::builder() + .num_worker_threads(num_threads) + .memory_budget_per_thread(memory_arena_in_bytes_per_thread) + .build(); + self.writer_with_options(options) } /// Helper to create an index writer for tests. diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index c7c8ba740e..6bbb6acc4d 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -45,6 +45,23 @@ fn error_in_index_worker_thread(context: &str) -> TantivyError { )) } +#[derive(Clone, bon::Builder)] +/// A builder for creating a new [IndexWriter] for an index. +pub struct IndexWriterOptions { + #[builder(default = MEMORY_BUDGET_NUM_BYTES_MIN)] + /// The memory budget per indexer thread. + /// + /// When an indexer thread has buffered this much data in memory + /// it will flush the segment to disk (although this is not searchable until commit is called.) + memory_budget_per_thread: usize, + #[builder(default = 1)] + /// The number of indexer worker threads to use. + num_worker_threads: usize, + #[builder(default = 4)] + /// Defines the number of merger threads to use. + num_merge_threads: usize, +} + /// `IndexWriter` is the user entry-point to add document to an index. /// /// It manages a small number of indexing thread, as well as a shared @@ -58,8 +75,7 @@ pub struct IndexWriter { index: Index, - // The memory budget per thread, after which a commit is triggered. - memory_budget_in_bytes_per_thread: usize, + options: IndexWriterOptions, workers_join_handle: Vec>>, @@ -70,9 +86,6 @@ pub struct IndexWriter { worker_id: usize, - num_threads: usize, - num_merge_threads: usize, - delete_queue: DeleteQueue, stamper: Stamper, @@ -266,24 +279,27 @@ impl IndexWriter { /// `TantivyError::InvalidArgument` pub(crate) fn new( index: &Index, - num_threads: usize, - memory_budget_in_bytes_per_thread: usize, + options: IndexWriterOptions, directory_lock: DirectoryLock, - num_merge_threads: usize, ) -> crate::Result { - if memory_budget_in_bytes_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN { + if options.memory_budget_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN { let err_msg = format!( "The memory arena in bytes per thread needs to be at least \ {MEMORY_BUDGET_NUM_BYTES_MIN}." ); return Err(TantivyError::InvalidArgument(err_msg)); } - if memory_budget_in_bytes_per_thread >= MEMORY_BUDGET_NUM_BYTES_MAX { + if options.memory_budget_per_thread >= MEMORY_BUDGET_NUM_BYTES_MAX { let err_msg = format!( "The memory arena in bytes per thread cannot exceed {MEMORY_BUDGET_NUM_BYTES_MAX}" ); return Err(TantivyError::InvalidArgument(err_msg)); } + if options.num_worker_threads == 0 { + let err_msg = "At least one worker thread is required, got 0".to_string(); + return Err(TantivyError::InvalidArgument(err_msg)); + } + let (document_sender, document_receiver) = crossbeam_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS); @@ -297,13 +313,13 @@ impl IndexWriter { index.clone(), stamper.clone(), &delete_queue.cursor(), - num_merge_threads, + options.num_merge_threads, )?; let mut index_writer = Self { _directory_lock: Some(directory_lock), - memory_budget_in_bytes_per_thread, + options: options.clone(), index: index.clone(), index_writer_status: IndexWriterStatus::from(document_receiver), operation_sender: document_sender, @@ -311,9 +327,6 @@ impl IndexWriter { segment_updater, workers_join_handle: vec![], - num_threads, - - num_merge_threads, delete_queue, @@ -406,7 +419,7 @@ impl IndexWriter { let mut delete_cursor = self.delete_queue.cursor(); - let mem_budget = self.memory_budget_in_bytes_per_thread; + let mem_budget = self.options.memory_budget_per_thread; let index = self.index.clone(); let join_handle: JoinHandle> = thread::Builder::new() .name(format!("thrd-tantivy-index{}", self.worker_id)) @@ -459,7 +472,7 @@ impl IndexWriter { } fn start_workers(&mut self) -> crate::Result<()> { - for _ in 0..self.num_threads { + for _ in 0..self.options.num_worker_threads { self.add_indexing_worker()?; } Ok(()) @@ -561,13 +574,7 @@ impl IndexWriter { .take() .expect("The IndexWriter does not have any lock. This is a bug, please report."); - let new_index_writer = IndexWriter::new( - &self.index, - self.num_threads, - self.memory_budget_in_bytes_per_thread, - directory_lock, - self.num_merge_threads, - )?; + let new_index_writer = IndexWriter::new(&self.index, self.options.clone(), directory_lock)?; // the current `self` is dropped right away because of this call. // @@ -821,7 +828,7 @@ mod tests { use crate::directory::error::LockError; use crate::error::*; use crate::indexer::index_writer::MEMORY_BUDGET_NUM_BYTES_MIN; - use crate::indexer::NoMergePolicy; + use crate::indexer::{IndexWriterOptions, NoMergePolicy}; use crate::query::{QueryParser, TermQuery}; use crate::schema::{ self, Facet, FacetOptions, IndexRecordOption, IpAddrOptions, JsonObjectOptions, @@ -2542,4 +2549,36 @@ mod tests { index_writer.commit().unwrap(); Ok(()) } + + #[test] + fn test_writer_options_validation() { + let mut schema_builder = Schema::builder(); + let field = schema_builder.add_bool_field("example", STORED); + let index = Index::create_in_ram(schema_builder.build()); + + let opt_wo_threads = IndexWriterOptions::builder().num_worker_threads(0).build(); + let result = index.writer_with_options::(opt_wo_threads); + assert!(result.is_err(), "Writer should reject 0 thread count"); + assert!(matches!(result, Err(TantivyError::InvalidArgument(_)))); + + let opt_with_low_memory = IndexWriterOptions::builder() + .memory_budget_per_thread(10 << 10) + .build(); + let result = index.writer_with_options::(opt_with_low_memory); + assert!( + result.is_err(), + "Writer should reject options with too low memory size" + ); + assert!(matches!(result, Err(TantivyError::InvalidArgument(_)))); + + let opt_with_low_memory = IndexWriterOptions::builder() + .memory_budget_per_thread(5 << 30) + .build(); + let result = index.writer_with_options::(opt_with_low_memory); + assert!( + result.is_err(), + "Writer should reject options with too high memory size" + ); + assert!(matches!(result, Err(TantivyError::InvalidArgument(_)))); + } } diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index c583a4b5c5..278e5aca15 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -31,7 +31,7 @@ mod stamper; use crossbeam_channel as channel; use smallvec::SmallVec; -pub use self::index_writer::IndexWriter; +pub use self::index_writer::{IndexWriter, IndexWriterOptions}; pub use self::log_merge_policy::LogMergePolicy; pub use self::merge_operation::MergeOperation; pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy};