diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 0468eae16e1..00e685d444b 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -5841,6 +5841,7 @@ dependencies = [ "quickwit-common", "quickwit-doc-mapper", "quickwit-proto", + "rand 0.8.5", "regex", "serde", "serde_json", diff --git a/quickwit/quickwit-cli/tests/cli.rs b/quickwit/quickwit-cli/tests/cli.rs index 63a7b00d29c..a0edff19860 100644 --- a/quickwit/quickwit-cli/tests/cli.rs +++ b/quickwit/quickwit-cli/tests/cli.rs @@ -578,7 +578,8 @@ async fn test_cmd_update_index() { index_metadata.index_config.retention_policy_opt, Some(RetentionPolicy { retention_period: String::from("1 week"), - evaluation_schedule: String::from("daily") + evaluation_schedule: String::from("daily"), + jitter_secs: None, }) ); diff --git a/quickwit/quickwit-config/Cargo.toml b/quickwit/quickwit-config/Cargo.toml index a3f300742ea..558e9df0ef1 100644 --- a/quickwit/quickwit-config/Cargo.toml +++ b/quickwit/quickwit-config/Cargo.toml @@ -24,6 +24,7 @@ itertools = { workspace = true } json_comments = { workspace = true } new_string_template = { workspace = true } once_cell = { workspace = true } +rand = { workspace = true } regex = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/quickwit/quickwit-config/src/index_config/mod.rs b/quickwit/quickwit-config/src/index_config/mod.rs index 08ebee23ff5..3b347270037 100644 --- a/quickwit/quickwit-config/src/index_config/mod.rs +++ b/quickwit/quickwit-config/src/index_config/mod.rs @@ -31,6 +31,7 @@ use humantime::parse_duration; use quickwit_common::uri::Uri; use quickwit_doc_mapper::{DefaultDocMapperBuilder, DocMapper, DocMapping}; use quickwit_proto::types::IndexId; +use rand::{distributions, thread_rng, Rng}; use serde::{Deserialize, Serialize}; pub use serialize::{load_index_config_from_user_config, load_index_config_update}; use tracing::warn; @@ -179,6 +180,16 @@ pub struct RetentionPolicy { #[serde(default = "RetentionPolicy::default_schedule")] #[serde(rename = "schedule")] pub evaluation_schedule: String, + + /// A jitter to apply to the schedule. The policy will be evaluated [0..jitter_second] seconds + /// after the scheduled time. When many indexes use the same schedule, this can be used to + /// spread the load instead of causing a very bursty load.o + /// + /// If unset, a default jitter of `min(1 hour, next_next_evaluation - next_evaluation)` is + /// applied. Said otherwise, an operation may start any time between the next time it's + /// scheduled, and the time after that, but no later than 1h after the scheduled time. + #[serde(skip_serializing_if = "Option::is_none")] + pub jitter_secs: Option, } impl RetentionPolicy { @@ -195,7 +206,7 @@ impl RetentionPolicy { }) } - pub fn evaluation_schedule(&self) -> anyhow::Result { + fn evaluation_schedule(&self) -> anyhow::Result { let evaluation_schedule = prepend_at_char(&self.evaluation_schedule); Schedule::from_str(&evaluation_schedule).with_context(|| { @@ -208,13 +219,24 @@ impl RetentionPolicy { pub fn duration_until_next_evaluation(&self) -> anyhow::Result { let schedule = self.evaluation_schedule()?; - let future_date = schedule - .upcoming(Utc) + let mut schedule_iter = schedule.upcoming(Utc); + let future_date = schedule_iter .next() .expect("Failed to obtain next evaluation date."); - let duration = (future_date - Utc::now()) + let mut duration = (future_date - Utc::now()) .to_std() .map_err(|err| anyhow::anyhow!(err.to_string()))?; + let jitter_secs = self.jitter_secs.unwrap_or_else(|| { + if let Some(next_next_date) = schedule_iter.next() { + let time_between_schedules = next_next_date - future_date; + time_between_schedules.num_seconds().clamp(0, 3600) as u64 + } else { + // we don't know when the schedule is. That's odd. Let's allow no jitter + 0 + } + }); + let jitter = thread_rng().sample::(distributions::Standard) % (jitter_secs + 1); + duration += Duration::from_secs(jitter); Ok(duration) } @@ -413,6 +435,7 @@ impl crate::TestableForRegression for IndexConfig { let retention_policy = Some(RetentionPolicy { retention_period: "90 days".to_string(), evaluation_schedule: "daily".to_string(), + jitter_secs: None, }); let stable_log_config = StableLogMergePolicyConfig { merge_factor: 9, @@ -544,6 +567,7 @@ mod tests { let expected_retention_policy = RetentionPolicy { retention_period: "90 days".to_string(), evaluation_schedule: "daily".to_string(), + jitter_secs: None, }; assert_eq!( index_config.retention_policy_opt.unwrap(), @@ -719,6 +743,7 @@ mod tests { let retention_policy = RetentionPolicy { retention_period: "90 days".to_string(), evaluation_schedule: "hourly".to_string(), + jitter_secs: None, }; let retention_policy_yaml = serde_yaml::to_string(&retention_policy).unwrap(); assert_eq!( @@ -739,6 +764,7 @@ mod tests { let expected_retention_policy = RetentionPolicy { retention_period: "90 days".to_string(), evaluation_schedule: "hourly".to_string(), + jitter_secs: None, }; assert_eq!(retention_policy, expected_retention_policy); } @@ -753,6 +779,7 @@ mod tests { let expected_retention_policy = RetentionPolicy { retention_period: "90 days".to_string(), evaluation_schedule: "daily".to_string(), + jitter_secs: None, }; assert_eq!(retention_policy, expected_retention_policy); } @@ -764,6 +791,7 @@ mod tests { let retention_policy = RetentionPolicy { retention_period: "1 hour".to_string(), evaluation_schedule: "hourly".to_string(), + jitter_secs: None, }; assert_eq!( retention_policy.retention_period().unwrap(), @@ -773,6 +801,7 @@ mod tests { let retention_policy = RetentionPolicy { retention_period: "foo".to_string(), evaluation_schedule: "hourly".to_string(), + jitter_secs: None, }; assert_eq!( retention_policy.retention_period().unwrap_err().to_string(), @@ -797,6 +826,7 @@ mod tests { let retention_policy = RetentionPolicy { retention_period: "1 hour".to_string(), evaluation_schedule: "@hourly".to_string(), + jitter_secs: None, }; assert_eq!( retention_policy.evaluation_schedule().unwrap(), @@ -807,6 +837,7 @@ mod tests { let retention_policy = RetentionPolicy { retention_period: "1 hour".to_string(), evaluation_schedule: "hourly".to_string(), + jitter_secs: None, }; assert_eq!( retention_policy.evaluation_schedule().unwrap(), @@ -817,6 +848,7 @@ mod tests { let retention_policy = RetentionPolicy { retention_period: "1 hour".to_string(), evaluation_schedule: "0 * * * * *".to_string(), + jitter_secs: None, }; let evaluation_schedule = retention_policy.evaluation_schedule().unwrap(); assert_eq!(evaluation_schedule.seconds().count(), 1); @@ -830,6 +862,7 @@ mod tests { let retention_policy = RetentionPolicy { retention_period: "1 hour".to_string(), evaluation_schedule: "hourly".to_string(), + jitter_secs: None, }; retention_policy.validate().unwrap(); } @@ -837,6 +870,7 @@ mod tests { let retention_policy = RetentionPolicy { retention_period: "foo".to_string(), evaluation_schedule: "hourly".to_string(), + jitter_secs: None, }; retention_policy.validate().unwrap_err(); } @@ -844,6 +878,7 @@ mod tests { let retention_policy = RetentionPolicy { retention_period: "1 hour".to_string(), evaluation_schedule: "foo".to_string(), + jitter_secs: None, }; retention_policy.validate().unwrap_err(); } @@ -852,10 +887,11 @@ mod tests { #[test] fn test_retention_schedule_duration() { let schedule_test_helper_fn = |schedule_str: &str| { - let hourly_schedule = Schedule::from_str(&prepend_at_char(schedule_str)).unwrap(); + let schedule = Schedule::from_str(&prepend_at_char(schedule_str)).unwrap(); let retention_policy = RetentionPolicy { retention_period: "1 hour".to_string(), evaluation_schedule: schedule_str.to_string(), + jitter_secs: Some(0), }; let next_evaluation_duration = chrono::Duration::nanoseconds( @@ -865,7 +901,7 @@ mod tests { .as_nanos() as i64, ); let next_evaluation_date = Utc::now() + next_evaluation_duration; - let expected_date = hourly_schedule.upcoming(Utc).next().unwrap(); + let expected_date = schedule.upcoming(Utc).next().unwrap(); assert_eq!(next_evaluation_date.timestamp(), expected_date.timestamp()); }; @@ -875,4 +911,91 @@ mod tests { schedule_test_helper_fn("monthly"); schedule_test_helper_fn("* * * ? * ?"); } + + #[test] + fn test_retention_schedule_durationi_with_jitter() { + let schedule_test_helper_fn = |schedule_str: &str| { + let schedule = Schedule::from_str(&prepend_at_char(schedule_str)).unwrap(); + let retention_policy = RetentionPolicy { + retention_period: "1 hour".to_string(), + evaluation_schedule: schedule_str.to_string(), + jitter_secs: Some(60 * 30), + }; + + for _ in 0..11 { + // we run this a few times in case we are unlucky and pick a null jitter. + // This happens in one in 3601 tries, 11 unlucky tries in a row is as likely as + // finding the right aes128 key to decrypt some message at random on 1st try. + let next_evaluation_duration = chrono::Duration::nanoseconds( + retention_policy + .duration_until_next_evaluation() + .unwrap() + .as_nanos() as i64, + ); + let next_evaluation_date = Utc::now() + next_evaluation_duration; + let expected_date_early = schedule.upcoming(Utc).next().unwrap(); + let expected_date_late = + schedule.upcoming(Utc).next().unwrap() + chrono::Duration::seconds(30 * 60); + assert!(next_evaluation_date.timestamp() >= expected_date_early.timestamp()); + assert!(next_evaluation_date.timestamp() <= expected_date_late.timestamp()); + if next_evaluation_date.timestamp() != expected_date_early.timestamp() { + return; + } + } + panic!("got no jitter at all on multiple successive runs") + }; + + schedule_test_helper_fn("hourly"); + schedule_test_helper_fn("daily"); + schedule_test_helper_fn("weekly"); + schedule_test_helper_fn("monthly"); + schedule_test_helper_fn("* * * ? * ?"); + } + + #[test] + fn test_retention_schedule_durationi_with_default_jitter() { + let schedule_test_helper_fn = |schedule_str: &str| { + let schedule = Schedule::from_str(&prepend_at_char(schedule_str)).unwrap(); + let retention_policy = RetentionPolicy { + retention_period: "1 hour".to_string(), + evaluation_schedule: schedule_str.to_string(), + jitter_secs: None, + }; + let max_1s_delay = schedule_str.starts_with('*'); + let (limit, max_delay) = if max_1s_delay { + // one of our policies only allow 2 start dates, to make the test reliable, try a + // few more times + (128, 1) + } else { + (11, 3600) + }; + for _ in 0..limit { + // we run this a few times in case we are unlucky and pick a null jitter. + // This happens in one in 3601 tries, 11 unlucky tries in a row is as likely as + // finding the right aes128 key to decrypt some message at random on 1st try. + let next_evaluation_duration = chrono::Duration::nanoseconds( + retention_policy + .duration_until_next_evaluation() + .unwrap() + .as_nanos() as i64, + ); + let next_evaluation_date = Utc::now() + next_evaluation_duration; + let expected_date_early = schedule.upcoming(Utc).next().unwrap(); + let expected_date_late = + schedule.upcoming(Utc).next().unwrap() + chrono::Duration::seconds(max_delay); + assert!(dbg!(next_evaluation_date.timestamp()) >= expected_date_early.timestamp()); + assert!(next_evaluation_date.timestamp() <= expected_date_late.timestamp()); + if next_evaluation_date.timestamp() != expected_date_early.timestamp() { + return; + } + } + panic!("got no jitter at all on multiple successive runs") + }; + + schedule_test_helper_fn("hourly"); + schedule_test_helper_fn("daily"); + schedule_test_helper_fn("weekly"); + schedule_test_helper_fn("monthly"); + schedule_test_helper_fn("* * * ? * ?"); + } } diff --git a/quickwit/quickwit-config/src/index_config/serialize.rs b/quickwit/quickwit-config/src/index_config/serialize.rs index f789158197e..c5155fc1760 100644 --- a/quickwit/quickwit-config/src/index_config/serialize.rs +++ b/quickwit/quickwit-config/src/index_config/serialize.rs @@ -313,6 +313,7 @@ mod test { invalid_index_config.retention_policy_opt = Some(RetentionPolicy { retention_period: "90 days".to_string(), evaluation_schedule: "hourly".to_string(), + jitter_secs: None, }); let validation_err = invalid_index_config .build_and_validate(None) diff --git a/quickwit/quickwit-config/src/index_template/mod.rs b/quickwit/quickwit-config/src/index_template/mod.rs index b1c3b0d36c1..fb766704358 100644 --- a/quickwit/quickwit-config/src/index_template/mod.rs +++ b/quickwit/quickwit-config/src/index_template/mod.rs @@ -177,6 +177,7 @@ impl crate::TestableForRegression for IndexTemplate { retention_policy_opt: Some(RetentionPolicy { retention_period: "42 days".to_string(), evaluation_schedule: "daily".to_string(), + jitter_secs: None, }), } } @@ -236,6 +237,7 @@ mod tests { index_template.retention_policy_opt = Some(RetentionPolicy { retention_period: "42 days".to_string(), evaluation_schedule: "hourly".to_string(), + jitter_secs: None, }); let default_index_root_uri = Uri::for_test("s3://test-bucket/indexes"); @@ -291,6 +293,7 @@ mod tests { index_template.retention_policy_opt = Some(RetentionPolicy { retention_period: "".to_string(), evaluation_schedule: "".to_string(), + jitter_secs: None, }); let error = index_template.validate().unwrap_err(); assert!(error diff --git a/quickwit/quickwit-janitor/src/actors/retention_policy_executor.rs b/quickwit/quickwit-janitor/src/actors/retention_policy_executor.rs index 998ce33afbf..6654c957d8f 100644 --- a/quickwit/quickwit-janitor/src/actors/retention_policy_executor.rs +++ b/quickwit/quickwit-janitor/src/actors/retention_policy_executor.rs @@ -313,6 +313,7 @@ mod tests { index.retention_policy_opt = Some(RetentionPolicy { retention_period: retention_period.to_string(), evaluation_schedule: EVALUATION_SCHEDULE.to_string(), + jitter_secs: None, }) } index @@ -346,6 +347,7 @@ mod tests { let scheduler = RetentionPolicy { retention_period: "".to_string(), evaluation_schedule: EVALUATION_SCHEDULE.to_string(), + jitter_secs: None, }; scheduler.duration_until_next_evaluation().unwrap() + Duration::from_secs(1) diff --git a/quickwit/quickwit-metastore/src/tests/index.rs b/quickwit/quickwit-metastore/src/tests/index.rs index c01947ba0ba..d589a4b1f0a 100644 --- a/quickwit/quickwit-metastore/src/tests/index.rs +++ b/quickwit/quickwit-metastore/src/tests/index.rs @@ -113,6 +113,7 @@ pub async fn test_metastore_update_retention_policy< let new_retention_policy_opt = Some(RetentionPolicy { retention_period: String::from("3 days"), evaluation_schedule: String::from("daily"), + jitter_secs: None, }); // set and unset retention policy multiple times