diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index b90f795d236..044a868f354 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -30,7 +30,7 @@ use quickwit_actors::{ use quickwit_common::pubsub::EventBroker; use quickwit_common::temp_dir::TempDirectory; use quickwit_common::KillSwitch; -use quickwit_config::{IndexingSettings, SourceConfig}; +use quickwit_config::{IndexingSettings, RetentionPolicy, SourceConfig}; use quickwit_doc_mapper::DocMapper; use quickwit_ingest::IngesterPool; use quickwit_proto::indexing::IndexingPipelineId; @@ -367,6 +367,7 @@ impl IndexingPipeline { UploaderType::IndexUploader, self.params.metastore.clone(), self.params.merge_policy.clone(), + self.params.retention_policy.clone(), self.params.split_store.clone(), SplitsUpdateMailbox::Sequencer(sequencer_mailbox), self.params.max_concurrent_split_uploads_index, @@ -585,6 +586,7 @@ pub struct IndexingPipelineParams { // Merge-related parameters pub merge_policy: Arc, + pub retention_policy: Option, pub merge_planner_mailbox: Mailbox, pub max_concurrent_split_uploads_merge: usize, @@ -717,6 +719,7 @@ mod tests { storage, split_store, merge_policy: default_merge_policy(), + retention_policy: None, queues_dir_path: PathBuf::from("./queues"), max_concurrent_split_uploads_index: 4, max_concurrent_split_uploads_merge: 5, @@ -831,6 +834,7 @@ mod tests { storage, split_store, merge_policy: default_merge_policy(), + retention_policy: None, max_concurrent_split_uploads_index: 4, max_concurrent_split_uploads_merge: 5, cooperative_indexing_permits: None, @@ -908,6 +912,7 @@ mod tests { metastore: metastore.clone(), split_store: split_store.clone(), merge_policy: default_merge_policy(), + retention_policy: None, max_concurrent_split_uploads: 2, merge_io_throughput_limiter_opt: None, merge_scheduler_service: universe.get_or_spawn_one(), @@ -930,6 +935,7 @@ mod tests { storage, split_store, merge_policy: default_merge_policy(), + retention_policy: None, max_concurrent_split_uploads_index: 4, max_concurrent_split_uploads_merge: 5, cooperative_indexing_permits: None, @@ -1057,6 +1063,7 @@ mod tests { storage, split_store, merge_policy: default_merge_policy(), + retention_policy: None, max_concurrent_split_uploads_index: 4, max_concurrent_split_uploads_merge: 5, cooperative_indexing_permits: None, diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 5a180840a1e..697bfb57b62 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -287,6 +287,7 @@ impl IndexingService { })?; let merge_policy = crate::merge_policy::merge_policy_from_settings(&index_config.indexing_settings); + let retention_policy = index_config.retention_policy_opt.clone(); let split_store = IndexingSplitStore::new(storage.clone(), self.local_split_store.clone()); let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings) @@ -301,6 +302,7 @@ impl IndexingService { split_store: split_store.clone(), merge_scheduler_service: self.merge_scheduler_service.clone(), merge_policy: merge_policy.clone(), + retention_policy: retention_policy.clone(), merge_io_throughput_limiter_opt: self.merge_io_throughput_limiter_opt.clone(), max_concurrent_split_uploads: self.max_concurrent_split_uploads, event_broker: self.event_broker.clone(), @@ -329,6 +331,7 @@ impl IndexingService { // Merge-related parameters merge_policy, + retention_policy, max_concurrent_split_uploads_merge, merge_planner_mailbox, diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs index acb0f00c3e3..97c57a79b31 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs @@ -29,6 +29,7 @@ use quickwit_common::io::{IoControls, Limiter}; use quickwit_common::pubsub::EventBroker; use quickwit_common::temp_dir::TempDirectory; use quickwit_common::KillSwitch; +use quickwit_config::RetentionPolicy; use quickwit_doc_mapper::DocMapper; use quickwit_metastore::{ ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitMetadata, @@ -286,6 +287,7 @@ impl MergePipeline { UploaderType::MergeUploader, self.params.metastore.clone(), self.params.merge_policy.clone(), + self.params.retention_policy.clone(), self.params.split_store.clone(), merge_publisher_mailbox.into(), self.params.max_concurrent_split_uploads, @@ -572,6 +574,7 @@ pub struct MergePipelineParams { pub merge_scheduler_service: Mailbox, pub split_store: IndexingSplitStore, pub merge_policy: Arc, + pub retention_policy: Option, pub max_concurrent_split_uploads: usize, //< TODO share with the indexing pipeline. pub merge_io_throughput_limiter_opt: Option, pub event_broker: EventBroker, @@ -635,6 +638,7 @@ mod tests { merge_scheduler_service: universe.get_or_spawn_one(), split_store, merge_policy: default_merge_policy(), + retention_policy: None, max_concurrent_split_uploads: 2, merge_io_throughput_limiter_opt: None, event_broker: Default::default(), diff --git a/quickwit/quickwit-indexing/src/actors/uploader.rs b/quickwit/quickwit-indexing/src/actors/uploader.rs index f7d09dc3fd7..daa5caa140a 100644 --- a/quickwit/quickwit-indexing/src/actors/uploader.rs +++ b/quickwit/quickwit-indexing/src/actors/uploader.rs @@ -31,6 +31,7 @@ use once_cell::sync::OnceCell; use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity}; use quickwit_common::pubsub::EventBroker; use quickwit_common::spawn_named_task; +use quickwit_config::RetentionPolicy; use quickwit_metastore::checkpoint::IndexCheckpointDelta; use quickwit_metastore::{SplitMetadata, StageSplitsRequestExt}; use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient, StageSplitsRequest}; @@ -166,6 +167,7 @@ pub struct Uploader { uploader_type: UploaderType, metastore: MetastoreServiceClient, merge_policy: Arc, + retention_policy: Option, split_store: IndexingSplitStore, split_update_mailbox: SplitsUpdateMailbox, max_concurrent_split_uploads: usize, @@ -174,10 +176,12 @@ pub struct Uploader { } impl Uploader { + #[allow(clippy::too_many_arguments)] pub fn new( uploader_type: UploaderType, metastore: MetastoreServiceClient, merge_policy: Arc, + retention_policy: Option, split_store: IndexingSplitStore, split_update_mailbox: SplitsUpdateMailbox, max_concurrent_split_uploads: usize, @@ -187,6 +191,7 @@ impl Uploader { uploader_type, metastore, merge_policy, + retention_policy, split_store, split_update_mailbox, max_concurrent_split_uploads, @@ -300,6 +305,7 @@ impl Handler for Uploader { let index_uid = batch.index_uid(); let ctx_clone = ctx.clone(); let merge_policy = self.merge_policy.clone(); + let retention_policy = self.retention_policy.clone(); debug!(split_ids=?split_ids, "start-stage-and-store-splits"); let event_broker = self.event_broker.clone(); spawn_named_task( @@ -324,6 +330,7 @@ impl Handler for Uploader { )?; let split_metadata = create_split_metadata( &merge_policy, + retention_policy.as_ref(), &packaged_split.split_attrs, packaged_split.tags.clone(), split_streamer.footer_range.start..split_streamer.footer_range.end, @@ -535,6 +542,7 @@ mod tests { UploaderType::IndexUploader, MetastoreServiceClient::from_mock(mock_metastore), merge_policy, + None, split_store, SplitsUpdateMailbox::Sequencer(sequencer_mailbox), 4, @@ -650,6 +658,7 @@ mod tests { UploaderType::IndexUploader, MetastoreServiceClient::from_mock(mock_metastore), merge_policy, + None, split_store, SplitsUpdateMailbox::Sequencer(sequencer_mailbox), 4, @@ -797,6 +806,7 @@ mod tests { UploaderType::IndexUploader, MetastoreServiceClient::from_mock(mock_metastore), merge_policy, + None, split_store, SplitsUpdateMailbox::Publisher(publisher_mailbox), 4, @@ -870,6 +880,7 @@ mod tests { UploaderType::IndexUploader, MetastoreServiceClient::from_mock(mock_metastore), default_merge_policy(), + None, split_store, SplitsUpdateMailbox::Sequencer(sequencer_mailbox), 4, @@ -974,6 +985,7 @@ mod tests { UploaderType::IndexUploader, MetastoreServiceClient::from_mock(mock_metastore), merge_policy, + None, split_store, SplitsUpdateMailbox::Publisher(publisher_mailbox), 4, diff --git a/quickwit/quickwit-indexing/src/merge_policy/mod.rs b/quickwit/quickwit-indexing/src/merge_policy/mod.rs index e916c9b6ffc..02f2249c5dc 100644 --- a/quickwit/quickwit-indexing/src/merge_policy/mod.rs +++ b/quickwit/quickwit-indexing/src/merge_policy/mod.rs @@ -396,7 +396,7 @@ pub mod tests { source_id: "test_source".to_string(), }; let split_attrs = merge_split_attrs(pipeline_id, merged_split_id, splits).unwrap(); - create_split_metadata(merge_policy, &split_attrs, tags, 0..0) + create_split_metadata(merge_policy, None, &split_attrs, tags, 0..0) } fn apply_merge( diff --git a/quickwit/quickwit-indexing/src/models/split_attrs.rs b/quickwit/quickwit-indexing/src/models/split_attrs.rs index 5ac0de40ff3..217f1bc331d 100644 --- a/quickwit/quickwit-indexing/src/models/split_attrs.rs +++ b/quickwit/quickwit-indexing/src/models/split_attrs.rs @@ -21,8 +21,9 @@ use std::collections::BTreeSet; use std::fmt; use std::ops::{Range, RangeInclusive}; use std::sync::Arc; +use std::time::Duration; -use quickwit_metastore::SplitMetadata; +use quickwit_metastore::{SplitMaturity, SplitMetadata}; use quickwit_proto::types::{DocMappingUid, IndexUid, NodeId, SourceId, SplitId}; use tantivy::DateTime; use time::OffsetDateTime; @@ -92,13 +93,27 @@ impl fmt::Debug for SplitAttrs { pub fn create_split_metadata( merge_policy: &Arc, + retention_policy: Option<&quickwit_config::RetentionPolicy>, split_attrs: &SplitAttrs, tags: BTreeSet, footer_offsets: Range, ) -> SplitMetadata { let create_timestamp = OffsetDateTime::now_utc().unix_timestamp(); - let maturity = + + let time_range = split_attrs + .time_range + .as_ref() + .map(|range| range.start().into_timestamp_secs()..=range.end().into_timestamp_secs()); + + let mut maturity = merge_policy.split_maturity(split_attrs.num_docs as usize, split_attrs.num_merge_ops); + if let Some(max_maturity) = max_maturity_before_end_of_retention( + retention_policy, + create_timestamp, + time_range.as_ref().map(|time_range| *time_range.end()), + ) { + maturity = maturity.min(max_maturity); + } SplitMetadata { node_id: split_attrs.node_id.to_string(), index_uid: split_attrs.index_uid.clone(), @@ -107,10 +122,7 @@ pub fn create_split_metadata( split_id: split_attrs.split_id.clone(), partition_id: split_attrs.partition_id, num_docs: split_attrs.num_docs as usize, - time_range: split_attrs - .time_range - .as_ref() - .map(|range| range.start().into_timestamp_secs()..=range.end().into_timestamp_secs()), + time_range, uncompressed_docs_size_in_bytes: split_attrs.uncompressed_docs_size_in_bytes, create_timestamp, maturity, @@ -120,3 +132,80 @@ pub fn create_split_metadata( num_merge_ops: split_attrs.num_merge_ops, } } + +/// reduce the maturity period of a split based on retention policy, so that it doesn't get merged +/// after it expires. +fn max_maturity_before_end_of_retention( + retention_policy: Option<&quickwit_config::RetentionPolicy>, + create_timestamp: i64, + time_range_end: Option, +) -> Option { + let time_range_end = time_range_end? as u64; + let retention_period_s = retention_policy?.retention_period().ok()?.as_secs(); + + let maturity = if let Some(maturation_period_s) = + (time_range_end + retention_period_s).checked_sub(create_timestamp as u64) + { + SplitMaturity::Immature { + maturation_period: Duration::from_secs(maturation_period_s), + } + } else { + // this split could be deleted as soon as it is created. Ideally we would + // handle that sooner. + SplitMaturity::Mature + }; + Some(maturity) +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use quickwit_metastore::SplitMaturity; + + use super::max_maturity_before_end_of_retention; + + #[test] + fn test_max_maturity_before_end_of_retention() { + let retention_policy = quickwit_config::RetentionPolicy { + evaluation_schedule: "daily".to_string(), + retention_period: "300 sec".to_string(), + }; + let create_timestamp = 1000; + + // this should be deleted asap, not subject to merge + assert_eq!( + max_maturity_before_end_of_retention( + Some(&retention_policy), + create_timestamp, + Some(200), + ), + Some(SplitMaturity::Mature) + ); + + // retention ends at 750 + 300 = 1050, which is 50s from now + assert_eq!( + max_maturity_before_end_of_retention( + Some(&retention_policy), + create_timestamp, + Some(750), + ), + Some(SplitMaturity::Immature { + maturation_period: Duration::from_secs(50) + }) + ); + + // no retention policy + assert_eq!( + max_maturity_before_end_of_retention(None, create_timestamp, Some(850),), + None, + ); + + // no timestamp_range.end but a retention policy, that's odd, don't change anything about + // the maturity period + assert_eq!( + max_maturity_before_end_of_retention(Some(&retention_policy), create_timestamp, None,), + None, + ); + } +} diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs index 5c25c7ae1c7..452e47bcb5b 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs @@ -181,6 +181,7 @@ impl DeleteTaskPipeline { UploaderType::DeleteUploader, self.metastore.clone(), merge_policy, + index_config.retention_policy_opt.clone(), split_store.clone(), SplitsUpdateMailbox::Publisher(publisher_mailbox), self.max_concurrent_split_uploads, diff --git a/quickwit/quickwit-metastore/src/split_metadata.rs b/quickwit/quickwit-metastore/src/split_metadata.rs index 53608f307d8..9af86c0a5de 100644 --- a/quickwit/quickwit-metastore/src/split_metadata.rs +++ b/quickwit/quickwit-metastore/src/split_metadata.rs @@ -344,7 +344,7 @@ impl FromStr for SplitState { /// or `Immature` with a given maturation period. /// The maturity is determined by the `MergePolicy`. #[serde_as] -#[derive(Clone, Copy, Debug, Default, Eq, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Copy, Debug, Default, Eq, Serialize, Deserialize, PartialEq, PartialOrd, Ord)] #[serde(tag = "type")] #[serde(rename_all = "snake_case")] pub enum SplitMaturity { @@ -439,4 +439,21 @@ mod tests { assert_eq!(format!("{:?}", split_metadata), expected_output); } + + #[test] + fn test_spit_maturity_order() { + assert!( + SplitMaturity::Mature + < SplitMaturity::Immature { + maturation_period: Duration::from_secs(0) + } + ); + assert!( + SplitMaturity::Immature { + maturation_period: Duration::from_secs(0) + } < SplitMaturity::Immature { + maturation_period: Duration::from_secs(1) + } + ); + } }