Skip to content

Commit

Permalink
Validate documents during persist (#5046)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Jun 19, 2024
1 parent cbe87f6 commit ca97595
Show file tree
Hide file tree
Showing 52 changed files with 1,853 additions and 430 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/quickwit-codegen/example/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ utoipa = { workspace = true }

quickwit-actors = { workspace = true }
quickwit-common = { workspace = true }
quickwit-proto ={ workspace = true }
quickwit-proto = { workspace = true }

[dev-dependencies]
mockall = { workspace = true }
Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-common/src/thread_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl ThreadPool {
/// but is not running yet "cancellable".
pub fn run_cpu_intensive<F, R>(
&self,
cpu_heavy_task: F,
cpu_intensive_fn: F,
) -> impl Future<Output = Result<R, Panicked>>
where
F: FnOnce() -> R + Send + 'static,
Expand All @@ -103,7 +103,7 @@ impl ThreadPool {
let _guard = span.enter();
let mut ongoing_task_guard = GaugeGuard::from_gauge(&ongoing_tasks);
ongoing_task_guard.add(1i64);
let result = cpu_heavy_task();
let result = cpu_intensive_fn();
let _ = tx.send(result);
});
rx.map_err(|_| Panicked)
Expand All @@ -118,7 +118,7 @@ impl ThreadPool {
///
/// Disclaimer: The function will not be executed if the Future is dropped.
#[must_use = "run_cpu_intensive will not run if the future it returns is dropped"]
pub fn run_cpu_intensive<F, R>(cpu_heavy_task: F) -> impl Future<Output = Result<R, Panicked>>
pub fn run_cpu_intensive<F, R>(cpu_intensive_fn: F) -> impl Future<Output = Result<R, Panicked>>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
Expand All @@ -129,7 +129,7 @@ where
let num_threads: usize = (crate::num_cpus() / 3).max(2);
ThreadPool::new("small_tasks", Some(num_threads))
})
.run_cpu_intensive(cpu_heavy_task)
.run_cpu_intensive(cpu_intensive_fn)
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
Expand Down
38 changes: 23 additions & 15 deletions quickwit/quickwit-config/src/index_template/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ mod serialize;

use anyhow::ensure;
use quickwit_common::uri::Uri;
use quickwit_proto::types::IndexId;
use quickwit_proto::types::{DocMappingUid, IndexId};
use serde::{Deserialize, Serialize};
pub use serialize::{IndexTemplateV0_8, VersionedIndexTemplate};

Expand Down Expand Up @@ -68,10 +68,14 @@ impl IndexTemplate {
.unwrap_or(default_index_root_uri)
.join(&index_id)?;

// Ensure that the doc mapping UID is truly unique per index.
let mut doc_mapping = self.doc_mapping.clone();
doc_mapping.doc_mapping_uid = DocMappingUid::random();

let index_config = IndexConfig {
index_id,
index_uri,
doc_mapping: self.doc_mapping.clone(),
doc_mapping,
indexing_settings: self.indexing_settings.clone(),
search_settings: self.search_settings.clone(),
retention_policy_opt: self.retention_policy_opt.clone(),
Expand Down Expand Up @@ -235,33 +239,37 @@ mod tests {
});
let default_index_root_uri = Uri::for_test("s3://test-bucket/indexes");

let index_config = index_template
.apply_template("test-index".to_string(), &default_index_root_uri)
let index_config_foo = index_template
.apply_template("test-index-foo".to_string(), &default_index_root_uri)
.unwrap();

assert_eq!(index_config.index_id, "test-index");
assert_eq!(index_config.index_uri, "ram:///indexes/test-index");
assert_eq!(index_config_foo.index_id, "test-index-foo");
assert_eq!(index_config_foo.index_uri, "ram:///indexes/test-index-foo");

assert_eq!(index_config.doc_mapping.timestamp_field.unwrap(), "ts");
assert_eq!(index_config.indexing_settings.commit_timeout_secs, 42);
assert_eq!(index_config_foo.doc_mapping.timestamp_field.unwrap(), "ts");
assert_eq!(index_config_foo.indexing_settings.commit_timeout_secs, 42);
assert_eq!(
index_config.search_settings.default_search_fields,
index_config_foo.search_settings.default_search_fields,
["message"]
);
let retention_policy = index_config.retention_policy_opt.unwrap();
let retention_policy = index_config_foo.retention_policy_opt.unwrap();
assert_eq!(retention_policy.retention_period, "42 days");
assert_eq!(retention_policy.evaluation_schedule, "hourly");

index_template.index_root_uri = None;

let index_config = index_template
.apply_template("test-index".to_string(), &default_index_root_uri)
let index_config_bar = index_template
.apply_template("test-index-bar".to_string(), &default_index_root_uri)
.unwrap();

assert_eq!(index_config.index_id, "test-index");
assert_eq!(index_config_bar.index_id, "test-index-bar");
assert_eq!(
index_config.index_uri,
"s3://test-bucket/indexes/test-index"
index_config_bar.index_uri,
"s3://test-bucket/indexes/test-index-bar"
);
assert_ne!(
index_config_foo.doc_mapping.doc_mapping_uid,
index_config_bar.doc_mapping.doc_mapping_uid
);
}

Expand Down
1 change: 0 additions & 1 deletion quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2401,7 +2401,6 @@ mod tests {
control_plane_mailbox.ask(callback).await.unwrap();

let control_plane_debug_info = control_plane_mailbox.ask(GetDebugInfo).await.unwrap();
println!("{:?}", control_plane_debug_info);
let shard =
&control_plane_debug_info["shard_table"]["test-index:00000000000000000000000000"][0];
assert_eq!(shard["shard_id"], "00000000000000000000");
Expand Down
Loading

0 comments on commit ca97595

Please sign in to comment.