Skip to content

Commit

Permalink
Making shard throughput configurable (#5183)
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton authored Jul 2, 2024
1 parent 694cd5c commit be20923
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 9 deletions.
1 change: 1 addition & 0 deletions quickwit/quickwit-common/src/rate_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub struct RateLimiterSettings {
pub refill_period: Duration,
}

#[cfg(any(test, feature = "testsuite"))]
impl Default for RateLimiterSettings {
fn default() -> Self {
// 10 MB burst limit.
Expand Down
40 changes: 34 additions & 6 deletions quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,15 +272,17 @@ pub struct IngestApiConfig {
pub max_queue_disk_usage: ByteSize,
pub replication_factor: usize,
pub content_length_limit: ByteSize,
pub shard_throughput_limit: ByteSize,
}

impl Default for IngestApiConfig {
fn default() -> Self {
Self {
max_queue_memory_usage: ByteSize::gib(2), // TODO maybe we want more?
max_queue_disk_usage: ByteSize::gib(4), // TODO maybe we want more?
max_queue_memory_usage: ByteSize::gib(2),
max_queue_disk_usage: ByteSize::gib(4),
replication_factor: 1,
content_length_limit: ByteSize::mib(10),
shard_throughput_limit: ByteSize::mib(5),
}
}
}
Expand Down Expand Up @@ -320,6 +322,12 @@ impl IngestApiConfig {
self.max_queue_disk_usage,
self.max_queue_memory_usage
);
ensure!(
self.shard_throughput_limit >= ByteSize::mib(1)
&& self.shard_throughput_limit <= ByteSize::mib(20),
"shard_throughput_limit ({:?}) must be within 1mb and 20mb",
self.shard_throughput_limit
);
Ok(())
}
}
Expand Down Expand Up @@ -538,34 +546,54 @@ mod tests {
);
}
}

#[test]
fn test_validate_ingest_api_default() {
let ingest_api_config: IngestApiConfig = serde_yaml::from_str("").unwrap();
assert!(ingest_api_config.validate().is_ok());
assert_eq!(ingest_api_config, IngestApiConfig::default());
}

#[test]
fn test_validate_ingest_api_config() {
{
let indexer_config: IngestApiConfig = serde_yaml::from_str(
let ingest_api_config: IngestApiConfig = serde_yaml::from_str(
r#"
max_queue_disk_usage: 100M
"#,
)
.unwrap();
assert_eq!(
indexer_config.validate().unwrap_err().to_string(),
ingest_api_config.validate().unwrap_err().to_string(),
"max_queue_disk_usage must be at least 256 MiB, got `100.0 MB`"
);
}
{
let indexer_config: IngestApiConfig = serde_yaml::from_str(
let ingest_api_config: IngestApiConfig = serde_yaml::from_str(
r#"
max_queue_memory_usage: 600M
max_queue_disk_usage: 500M
"#,
)
.unwrap();
assert_eq!(
indexer_config.validate().unwrap_err().to_string(),
ingest_api_config.validate().unwrap_err().to_string(),
"max_queue_disk_usage (500.0 MB) must be at least max_queue_memory_usage (600.0 \
MB)"
);
}
{
let ingest_api_config: IngestApiConfig = serde_yaml::from_str(
r#"
shard_throughput_limit: 21M
"#,
)
.unwrap();
assert_eq!(
ingest_api_config.validate().unwrap_err().to_string(),
"shard_throughput_limit (21.0 MB) must be within 1mb and 20mb"
);
}
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-ingest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ utoipa = { workspace = true }

quickwit-actors = { workspace = true }
quickwit-cluster = { workspace = true }
quickwit-common = { workspace = true }
quickwit-common = { workspace = true, features = ["testsuite"] }
quickwit-config = { workspace = true }
quickwit-doc-mapper = { workspace = true, features = ["testsuite"] }
quickwit-proto = { workspace = true }
Expand Down
1 change: 0 additions & 1 deletion quickwit/quickwit-proto/src/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ pub mod ingester;
pub mod router;

include!("../codegen/quickwit/quickwit.ingest.rs");

pub type IngestV2Result<T> = std::result::Result<T, IngestV2Error>;

#[derive(Debug, thiserror::Error, Eq, PartialEq, Serialize, Deserialize)]
Expand Down
7 changes: 6 additions & 1 deletion quickwit/quickwit-serve/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -835,9 +835,14 @@ async fn setup_ingest_v2(
// we actually rewrite the `\n-delimited format into a tiny bit larger buffer, where the
// line length is prefixed.
let burst_limit = (content_length_limit.as_u64() * 3 / 2).clamp(10_000_000, 200_000_000);

let rate_limit =
ConstantRate::bytes_per_sec(node_config.ingest_api_config.shard_throughput_limit);
let rate_limiter_settings = RateLimiterSettings {
burst_limit,
..Default::default()
rate_limit,
// Refill every 100ms.
refill_period: Duration::from_millis(100),
};

// Instantiate ingester.
Expand Down

0 comments on commit be20923

Please sign in to comment.