Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add jitter to retention schedule #5361

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion quickwit/quickwit-cli/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,8 @@ async fn test_cmd_update_index() {
index_metadata.index_config.retention_policy_opt,
Some(RetentionPolicy {
retention_period: String::from("1 week"),
evaluation_schedule: String::from("daily")
evaluation_schedule: String::from("daily"),
jitter_secs: None,
})
);

Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ itertools = { workspace = true }
json_comments = { workspace = true }
new_string_template = { workspace = true }
once_cell = { workspace = true }
rand = { workspace = true }
regex = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
135 changes: 129 additions & 6 deletions quickwit/quickwit-config/src/index_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use humantime::parse_duration;
use quickwit_common::uri::Uri;
use quickwit_doc_mapper::{DefaultDocMapperBuilder, DocMapper, DocMapping};
use quickwit_proto::types::IndexId;
use rand::{distributions, thread_rng, Rng};
use serde::{Deserialize, Serialize};
pub use serialize::{load_index_config_from_user_config, load_index_config_update};
use tracing::warn;
Expand Down Expand Up @@ -179,6 +180,16 @@ pub struct RetentionPolicy {
#[serde(default = "RetentionPolicy::default_schedule")]
#[serde(rename = "schedule")]
pub evaluation_schedule: String,

/// A jitter to apply to the schedule. The policy will be evaluated [0..jitter_secs] seconds
/// after the scheduled time. When many indexes use the same schedule, this can be used to
/// spread the load instead of causing a very bursty load.
///
/// If unset, a default jitter of `min(1 hour, next_next_evaluation - next_evaluation)` is
/// applied. Said otherwise, an operation may start any time between the next time it's
/// scheduled, and the time after that, but no later than 1h after the scheduled time.
#[serde(skip_serializing_if = "Option::is_none")]
pub jitter_secs: Option<u64>,
}

impl RetentionPolicy {
Expand All @@ -195,7 +206,7 @@ impl RetentionPolicy {
})
}

pub fn evaluation_schedule(&self) -> anyhow::Result<Schedule> {
fn evaluation_schedule(&self) -> anyhow::Result<Schedule> {
let evaluation_schedule = prepend_at_char(&self.evaluation_schedule);

Schedule::from_str(&evaluation_schedule).with_context(|| {
Expand All @@ -208,13 +219,24 @@ impl RetentionPolicy {

/// Returns how long to wait before the next evaluation of this policy: the time remaining
/// until the next date produced by the evaluation schedule, plus a random jitter of
/// `[0, jitter_secs]` whole seconds.
///
/// When `jitter_secs` is `None`, the jitter bound is the gap between the next two scheduled
/// dates, clamped to `[0, 3600]` seconds, or `0` if the schedule yields no second date.
///
/// # Errors
///
/// Propagates the error of `evaluation_schedule()`, and fails when the time remaining until
/// the next scheduled date cannot be converted to a `std::time::Duration`.
///
/// # Panics
///
/// Panics if the schedule yields no upcoming date.
pub fn duration_until_next_evaluation(&self) -> anyhow::Result<Duration> {
let schedule = self.evaluation_schedule()?;
let future_date = schedule
.upcoming(Utc)
let mut schedule_iter = schedule.upcoming(Utc);
let future_date = schedule_iter
.next()
.expect("Failed to obtain next evaluation date.");
let duration = (future_date - Utc::now())
let mut duration = (future_date - Utc::now())
.to_std()
.map_err(|err| anyhow::anyhow!(err.to_string()))?;
// An explicitly configured jitter wins; otherwise derive a bound from the schedule itself.
let jitter_secs = self.jitter_secs.unwrap_or_else(|| {
if let Some(next_next_date) = schedule_iter.next() {
// Never spread further than the following occurrence, and never more than one hour.
let time_between_schedules = next_next_date - future_date;
time_between_schedules.num_seconds().clamp(0, 3600) as u64
} else {
// The schedule has no occurrence after the next one, so there is no interval to
// derive a bound from. That's odd; apply no jitter.
0
}
});
// Draw a random u64 and reduce it modulo `jitter_secs + 1` to land in `[0, jitter_secs]`.
// NOTE(review): `jitter_secs + 1` overflows when a configured `jitter_secs` is `u64::MAX`
// (panics in debug builds; wraps to 0 in release builds, where the `%` then panics).
// `thread_rng().gen_range(0..=jitter_secs)` would avoid this — confirm and consider.
let jitter = thread_rng().sample::<u64, _>(distributions::Standard) % (jitter_secs + 1);
duration += Duration::from_secs(jitter);
Ok(duration)
}

Expand Down Expand Up @@ -413,6 +435,7 @@ impl crate::TestableForRegression for IndexConfig {
let retention_policy = Some(RetentionPolicy {
retention_period: "90 days".to_string(),
evaluation_schedule: "daily".to_string(),
jitter_secs: None,
});
let stable_log_config = StableLogMergePolicyConfig {
merge_factor: 9,
Expand Down Expand Up @@ -544,6 +567,7 @@ mod tests {
let expected_retention_policy = RetentionPolicy {
retention_period: "90 days".to_string(),
evaluation_schedule: "daily".to_string(),
jitter_secs: None,
};
assert_eq!(
index_config.retention_policy_opt.unwrap(),
Expand Down Expand Up @@ -719,6 +743,7 @@ mod tests {
let retention_policy = RetentionPolicy {
retention_period: "90 days".to_string(),
evaluation_schedule: "hourly".to_string(),
jitter_secs: None,
};
let retention_policy_yaml = serde_yaml::to_string(&retention_policy).unwrap();
assert_eq!(
Expand All @@ -739,6 +764,7 @@ mod tests {
let expected_retention_policy = RetentionPolicy {
retention_period: "90 days".to_string(),
evaluation_schedule: "hourly".to_string(),
jitter_secs: None,
};
assert_eq!(retention_policy, expected_retention_policy);
}
Expand All @@ -753,6 +779,7 @@ mod tests {
let expected_retention_policy = RetentionPolicy {
retention_period: "90 days".to_string(),
evaluation_schedule: "daily".to_string(),
jitter_secs: None,
};
assert_eq!(retention_policy, expected_retention_policy);
}
Expand All @@ -764,6 +791,7 @@ mod tests {
let retention_policy = RetentionPolicy {
retention_period: "1 hour".to_string(),
evaluation_schedule: "hourly".to_string(),
jitter_secs: None,
};
assert_eq!(
retention_policy.retention_period().unwrap(),
Expand All @@ -773,6 +801,7 @@ mod tests {
let retention_policy = RetentionPolicy {
retention_period: "foo".to_string(),
evaluation_schedule: "hourly".to_string(),
jitter_secs: None,
};
assert_eq!(
retention_policy.retention_period().unwrap_err().to_string(),
Expand All @@ -797,6 +826,7 @@ mod tests {
let retention_policy = RetentionPolicy {
retention_period: "1 hour".to_string(),
evaluation_schedule: "@hourly".to_string(),
jitter_secs: None,
};
assert_eq!(
retention_policy.evaluation_schedule().unwrap(),
Expand All @@ -807,6 +837,7 @@ mod tests {
let retention_policy = RetentionPolicy {
retention_period: "1 hour".to_string(),
evaluation_schedule: "hourly".to_string(),
jitter_secs: None,
};
assert_eq!(
retention_policy.evaluation_schedule().unwrap(),
Expand All @@ -817,6 +848,7 @@ mod tests {
let retention_policy = RetentionPolicy {
retention_period: "1 hour".to_string(),
evaluation_schedule: "0 * * * * *".to_string(),
jitter_secs: None,
};
let evaluation_schedule = retention_policy.evaluation_schedule().unwrap();
assert_eq!(evaluation_schedule.seconds().count(), 1);
Expand All @@ -830,20 +862,23 @@ mod tests {
let retention_policy = RetentionPolicy {
retention_period: "1 hour".to_string(),
evaluation_schedule: "hourly".to_string(),
jitter_secs: None,
};
retention_policy.validate().unwrap();
}
{
let retention_policy = RetentionPolicy {
retention_period: "foo".to_string(),
evaluation_schedule: "hourly".to_string(),
jitter_secs: None,
};
retention_policy.validate().unwrap_err();
}
{
let retention_policy = RetentionPolicy {
retention_period: "1 hour".to_string(),
evaluation_schedule: "foo".to_string(),
jitter_secs: None,
};
retention_policy.validate().unwrap_err();
}
Expand All @@ -852,10 +887,11 @@ mod tests {
#[test]
fn test_retention_schedule_duration() {
let schedule_test_helper_fn = |schedule_str: &str| {
let hourly_schedule = Schedule::from_str(&prepend_at_char(schedule_str)).unwrap();
let schedule = Schedule::from_str(&prepend_at_char(schedule_str)).unwrap();
let retention_policy = RetentionPolicy {
retention_period: "1 hour".to_string(),
evaluation_schedule: schedule_str.to_string(),
jitter_secs: Some(0),
};

let next_evaluation_duration = chrono::Duration::nanoseconds(
Expand All @@ -865,7 +901,7 @@ mod tests {
.as_nanos() as i64,
);
let next_evaluation_date = Utc::now() + next_evaluation_duration;
let expected_date = hourly_schedule.upcoming(Utc).next().unwrap();
let expected_date = schedule.upcoming(Utc).next().unwrap();
assert_eq!(next_evaluation_date.timestamp(), expected_date.timestamp());
};

Expand All @@ -875,4 +911,91 @@ mod tests {
schedule_test_helper_fn("monthly");
schedule_test_helper_fn("* * * ? * ?");
}

/// Checks that an explicitly configured jitter delays the next evaluation by at most
/// `JITTER_SECS` seconds past the scheduled date, and that a non-zero jitter is actually
/// observed for every supported schedule flavour.
#[test]
fn test_retention_schedule_durationi_with_jitter() {
    // Upper bound, in seconds, of the jitter configured on the policy under test.
    const JITTER_SECS: u64 = 30 * 60;

    let schedule_test_helper_fn = |schedule_str: &str| {
        let schedule = Schedule::from_str(&prepend_at_char(schedule_str)).unwrap();
        let retention_policy = RetentionPolicy {
            retention_period: "1 hour".to_string(),
            evaluation_schedule: schedule_str.to_string(),
            jitter_secs: Some(JITTER_SECS),
        };

        for _ in 0..11 {
            // We retry a few times in case we are unlucky and draw a null jitter. A single
            // draw is null about once in `JITTER_SECS + 1` (1801) tries, so 11 null draws in
            // a row have a probability of roughly 2^-119.
            let next_evaluation_duration = chrono::Duration::nanoseconds(
                retention_policy
                    .duration_until_next_evaluation()
                    .unwrap()
                    .as_nanos() as i64,
            );
            let next_evaluation_date = Utc::now() + next_evaluation_duration;
            // Query the schedule once and derive the upper bound from the same date, so both
            // bounds always refer to the same occurrence.
            let expected_date_early = schedule.upcoming(Utc).next().unwrap();
            let expected_date_late =
                expected_date_early + chrono::Duration::seconds(JITTER_SECS as i64);
            assert!(next_evaluation_date.timestamp() >= expected_date_early.timestamp());
            assert!(next_evaluation_date.timestamp() <= expected_date_late.timestamp());
            if next_evaluation_date.timestamp() != expected_date_early.timestamp() {
                // A non-zero jitter was applied: this schedule passes.
                return;
            }
        }
        panic!("got no jitter at all on multiple successive runs")
    };

    schedule_test_helper_fn("hourly");
    schedule_test_helper_fn("daily");
    schedule_test_helper_fn("weekly");
    schedule_test_helper_fn("monthly");
    schedule_test_helper_fn("* * * ? * ?");
}

/// Checks the default jitter (`jitter_secs: None`): the next evaluation must fall between
/// the scheduled date and that date plus the expected default bound, and a non-zero jitter
/// must actually be observed for every supported schedule flavour.
#[test]
fn test_retention_schedule_durationi_with_default_jitter() {
    let schedule_test_helper_fn = |schedule_str: &str| {
        let schedule = Schedule::from_str(&prepend_at_char(schedule_str)).unwrap();
        let retention_policy = RetentionPolicy {
            retention_period: "1 hour".to_string(),
            evaluation_schedule: schedule_str.to_string(),
            jitter_secs: None,
        };
        // The every-second schedule leaves only one second between two occurrences, so its
        // default jitter is either 0 or 1 second.
        let max_1s_delay = schedule_str.starts_with('*');
        let (limit, max_delay) = if max_1s_delay {
            // Only two outcomes are possible, so a null jitter is drawn about half of the
            // time: try many more times to keep the test reliable (128 null draws in a row
            // have a probability of roughly 2^-128).
            (128, 1)
        } else {
            // A null jitter is drawn about once in 3601 tries, so 11 null draws in a row
            // have a probability of roughly 2^-130.
            (11, 3600)
        };
        for _ in 0..limit {
            let next_evaluation_duration = chrono::Duration::nanoseconds(
                retention_policy
                    .duration_until_next_evaluation()
                    .unwrap()
                    .as_nanos() as i64,
            );
            let next_evaluation_date = Utc::now() + next_evaluation_duration;
            // Query the schedule once and derive the upper bound from the same date, so both
            // bounds always refer to the same occurrence.
            let expected_date_early = schedule.upcoming(Utc).next().unwrap();
            let expected_date_late = expected_date_early + chrono::Duration::seconds(max_delay);
            assert!(next_evaluation_date.timestamp() >= expected_date_early.timestamp());
            assert!(next_evaluation_date.timestamp() <= expected_date_late.timestamp());
            if next_evaluation_date.timestamp() != expected_date_early.timestamp() {
                // A non-zero jitter was applied: this schedule passes.
                return;
            }
        }
        panic!("got no jitter at all on multiple successive runs")
    };

    schedule_test_helper_fn("hourly");
    schedule_test_helper_fn("daily");
    schedule_test_helper_fn("weekly");
    schedule_test_helper_fn("monthly");
    schedule_test_helper_fn("* * * ? * ?");
}
}
1 change: 1 addition & 0 deletions quickwit/quickwit-config/src/index_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ mod test {
invalid_index_config.retention_policy_opt = Some(RetentionPolicy {
retention_period: "90 days".to_string(),
evaluation_schedule: "hourly".to_string(),
jitter_secs: None,
});
let validation_err = invalid_index_config
.build_and_validate(None)
Expand Down
3 changes: 3 additions & 0 deletions quickwit/quickwit-config/src/index_template/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ impl crate::TestableForRegression for IndexTemplate {
retention_policy_opt: Some(RetentionPolicy {
retention_period: "42 days".to_string(),
evaluation_schedule: "daily".to_string(),
jitter_secs: None,
}),
}
}
Expand Down Expand Up @@ -236,6 +237,7 @@ mod tests {
index_template.retention_policy_opt = Some(RetentionPolicy {
retention_period: "42 days".to_string(),
evaluation_schedule: "hourly".to_string(),
jitter_secs: None,
});
let default_index_root_uri = Uri::for_test("s3://test-bucket/indexes");

Expand Down Expand Up @@ -291,6 +293,7 @@ mod tests {
index_template.retention_policy_opt = Some(RetentionPolicy {
retention_period: "".to_string(),
evaluation_schedule: "".to_string(),
jitter_secs: None,
});
let error = index_template.validate().unwrap_err();
assert!(error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ mod tests {
index.retention_policy_opt = Some(RetentionPolicy {
retention_period: retention_period.to_string(),
evaluation_schedule: EVALUATION_SCHEDULE.to_string(),
jitter_secs: None,
})
}
index
Expand Down Expand Up @@ -346,6 +347,7 @@ mod tests {
let scheduler = RetentionPolicy {
retention_period: "".to_string(),
evaluation_schedule: EVALUATION_SCHEDULE.to_string(),
jitter_secs: None,
};

scheduler.duration_until_next_evaluation().unwrap() + Duration::from_secs(1)
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-metastore/src/tests/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ pub async fn test_metastore_update_retention_policy<
let new_retention_policy_opt = Some(RetentionPolicy {
retention_period: String::from("3 days"),
evaluation_schedule: String::from("daily"),
jitter_secs: None,
});

// set and unset retention policy multiple times
Expand Down
Loading