Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed May 25, 2024
1 parent 5647d4a commit 41952fc
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ pub fn build_recluster_physical_plan(
snapshot,
catalog_info,
is_distributed,
true,
need_lock,
),
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;

use databend_common_base::base::tokio;
use databend_common_catalog::plan::PartInfoType;
use databend_common_catalog::plan::Partitions;
use databend_common_catalog::table::CompactionLimits;
use databend_common_catalog::table::Table;
use databend_common_exception::Result;
Expand All @@ -33,9 +34,11 @@ use databend_query::schedulers::build_query_pipeline_without_render_result_set;
use databend_query::sessions::QueryContext;
use databend_query::sessions::TableContext;
use databend_query::test_kits::*;
use databend_storages_common_table_meta::meta::Location;
use databend_storages_common_table_meta::meta::SegmentInfo;
use databend_storages_common_table_meta::meta::Statistics;
use databend_storages_common_table_meta::meta::TableSnapshot;
use opendal::Operator;
use rand::thread_rng;
use rand::Rng;
use uuid::Uuid;
Expand Down Expand Up @@ -196,6 +199,7 @@ async fn test_safety() -> Result<()> {
threshold,
cluster_key_id,
5,
false,
)
.await?;

Expand Down Expand Up @@ -240,53 +244,72 @@ async fn test_safety() -> Result<()> {
eprintln!("no target select");
continue;
}
assert!(selections.partitions_type() != PartInfoType::LazyLevel);

let mut actual_blocks_number = 0;
let mut compact_segment_indices = HashSet::new();
let mut actual_block_ids = HashSet::new();
for part in selections.partitions.into_iter() {
let part = CompactBlockPartInfo::from_part(&part)?;
match part {
CompactBlockPartInfo::CompactExtraInfo(extra) => {
compact_segment_indices.insert(extra.segment_index);
compact_segment_indices.extend(extra.removed_segment_indexes.iter());
actual_blocks_number += extra.unchanged_blocks.len();
for b in &extra.unchanged_blocks {
actual_block_ids.insert(b.1.location.clone());
}
verify_compact_tasks(
ctx.get_data_operator()?.operator(),
selections,
locations,
HashSet::new(),
)
.await?;
}

Ok(())
}

pub async fn verify_compact_tasks(
dal: Operator,
parts: Partitions,
locations: Vec<Location>,
expected_segment_indices: HashSet<usize>,
) -> Result<()> {
assert!(parts.partitions_type() != PartInfoType::LazyLevel);

let mut actual_blocks_number = 0;
let mut compact_segment_indices = HashSet::new();
let mut actual_block_ids = HashSet::new();
for part in parts.partitions.into_iter() {
let part = CompactBlockPartInfo::from_part(&part)?;
match part {
CompactBlockPartInfo::CompactExtraInfo(extra) => {
compact_segment_indices.insert(extra.segment_index);
compact_segment_indices.extend(extra.removed_segment_indexes.iter());
actual_blocks_number += extra.unchanged_blocks.len();
for b in &extra.unchanged_blocks {
actual_block_ids.insert(b.1.location.clone());
}
CompactBlockPartInfo::CompactTaskInfo(task) => {
compact_segment_indices.insert(task.index.segment_idx);
actual_blocks_number += task.blocks.len();
for b in &task.blocks {
actual_block_ids.insert(b.location.clone());
}
}
CompactBlockPartInfo::CompactTaskInfo(task) => {
compact_segment_indices.insert(task.index.segment_idx);
actual_blocks_number += task.blocks.len();
for b in &task.blocks {
actual_block_ids.insert(b.location.clone());
}
}
}
}

eprintln!("compact_segment_indices: {:?}", compact_segment_indices);
let mut except_blocks_number = 0;
let mut except_block_ids = HashSet::new();
for idx in compact_segment_indices.into_iter() {
let loc = locations.get(idx).unwrap();
let compact_segment = SegmentsIO::read_compact_segment(
ctx.get_data_operator()?.operator(),
loc.clone(),
TestFixture::default_table_schema(),
false,
)
.await?;
let segment = SegmentInfo::try_from(compact_segment)?;
except_blocks_number += segment.blocks.len();
for b in &segment.blocks {
except_block_ids.insert(b.location.clone());
}
eprintln!("compact_segment_indices: {:?}", compact_segment_indices);
if !expected_segment_indices.is_empty() {
assert_eq!(expected_segment_indices, compact_segment_indices);
}
let mut except_blocks_number = 0;
let mut except_block_ids = HashSet::new();
for idx in compact_segment_indices.into_iter() {
let loc = locations.get(idx).unwrap();
let compact_segment = SegmentsIO::read_compact_segment(
dal.clone(),
loc.clone(),
TestFixture::default_table_schema(),
false,
)
.await?;
let segment = SegmentInfo::try_from(compact_segment)?;
except_blocks_number += segment.blocks.len();
for b in &segment.blocks {
except_block_ids.insert(b.location.clone());
}
assert_eq!(except_blocks_number, actual_blocks_number);
assert_eq!(except_block_ids, actual_block_ids);
}

assert_eq!(except_blocks_number, actual_blocks_number);
assert_eq!(except_block_ids, actual_block_ids);
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ mod deletion;
mod recluster_mutator;
mod segments_compact_mutator;

pub use block_compact_mutator::verify_compact_tasks;
pub use segments_compact_mutator::compact_segment;
pub use segments_compact_mutator::CompactSegmentTestFixture;
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use rand::thread_rng;
use rand::Rng;
use uuid::Uuid;

use crate::storages::fuse::operations::mutation::verify_compact_tasks;
use crate::storages::fuse::operations::mutation::CompactSegmentTestFixture;

#[tokio::test(flavor = "multi_thread")]
Expand Down Expand Up @@ -203,13 +204,25 @@ async fn test_safety_for_recluster() -> Result<()> {
number_of_segments, number_of_blocks,
);

let unclustered: bool = rand.gen();
let mut unclustered_segment_indices = HashSet::new();
if unclustered {
unclustered_segment_indices = block_number_of_segments
.iter()
.rev()
.enumerate()
.filter(|(_, &num)| num % 4 == 0)
.map(|(index, _)| index)
.collect();
}
let (locations, _, segment_infos) = CompactSegmentTestFixture::gen_segments(
ctx.clone(),
block_number_of_segments,
rows_per_blocks,
threshold,
Some(cluster_key_id),
block_per_seg,
unclustered,
)
.await?;

Expand Down Expand Up @@ -241,7 +254,7 @@ async fn test_safety_for_recluster() -> Result<()> {
}

let ctx: Arc<dyn TableContext> = ctx.clone();
let segment_locations = create_segment_location_vector(locations, None);
let segment_locations = create_segment_location_vector(locations.clone(), None);
let compact_segments = FuseTable::segment_pruning(
&ctx,
schema.clone(),
Expand Down Expand Up @@ -284,46 +297,58 @@ async fn test_safety_for_recluster() -> Result<()> {

eprintln!("need_recluster: {}", need_recluster);
if need_recluster {
let ReclusterTasks::Recluster {
tasks,
remained_blocks,
removed_segment_indexes,
..
} = mutator.tasks
else {
return Err(ErrorCode::Internal("Logical error, it's a bug"));
};
assert!(tasks.len() <= max_tasks && !tasks.is_empty());
eprintln!("tasks_num: {}, max_tasks: {}", tasks.len(), max_tasks);
let mut blocks = Vec::new();
for task in tasks.into_iter() {
let parts = task.parts.partitions;
assert!(task.total_bytes <= recluster_block_size);
for part in parts.into_iter() {
let fuse_part = FuseBlockPartInfo::from_part(&part)?;
blocks.push(fuse_part.location.clone());
match mutator.tasks {
ReclusterTasks::Recluster {
tasks,
remained_blocks,
removed_segment_indexes,
..
} => {
assert!(unclustered_segment_indices.is_empty());
assert!(tasks.len() <= max_tasks && !tasks.is_empty());
eprintln!("tasks_num: {}, max_tasks: {}", tasks.len(), max_tasks);
let mut blocks = Vec::new();
for task in tasks.into_iter() {
let parts = task.parts.partitions;
assert!(task.total_bytes <= recluster_block_size);
for part in parts.into_iter() {
let fuse_part = FuseBlockPartInfo::from_part(&part)?;
blocks.push(fuse_part.location.clone());
}
}

eprintln!(
"selected segments number {}, selected blocks number {}, remained blocks number {}",
removed_segment_indexes.len(),
blocks.len(),
remained_blocks.len()
);
for remain in remained_blocks {
blocks.push(remain.location.0.clone());
}

let block_ids_after_target = HashSet::from_iter(blocks.into_iter());

let mut origin_blocks_ids = HashSet::new();
for idx in &removed_segment_indexes {
for b in &segment_infos[*idx].blocks {
origin_blocks_ids.insert(b.location.0.clone());
}
}
assert_eq!(block_ids_after_target, origin_blocks_ids);
}
}

eprintln!(
"selected segments number {}, selected blocks number {}, remained blocks number {}",
removed_segment_indexes.len(),
blocks.len(),
remained_blocks.len()
);
for remain in remained_blocks {
blocks.push(remain.location.0.clone());
}

let block_ids_after_target = HashSet::from_iter(blocks.into_iter());

let mut origin_blocks_ids = HashSet::new();
for idx in &removed_segment_indexes {
for b in &segment_infos[*idx].blocks {
origin_blocks_ids.insert(b.location.0.clone());
ReclusterTasks::Compact(parts) => {
assert!(unclustered);
assert!(!unclustered_segment_indices.is_empty());
verify_compact_tasks(
ctx.get_data_operator()?.operator(),
parts,
locations,
unclustered_segment_indices,
)
.await?;
}
}
assert_eq!(block_ids_after_target, origin_blocks_ids);
};
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ impl CompactSegmentTestFixture {
BlockThresholds::default(),
cluster_key_id,
block_per_seg as usize,
false,
)
.await?;
let mut summary = Statistics::default();
Expand All @@ -707,6 +708,7 @@ impl CompactSegmentTestFixture {
thresholds: BlockThresholds,
cluster_key_id: Option<u32>,
block_per_seg: usize,
unclustered: bool,
) -> Result<(Vec<Location>, Vec<BlockMeta>, Vec<SegmentInfo>)> {
let location_gen = TableMetaLocationGenerator::with_prefix("test/".to_owned());
let data_accessor = ctx.get_data_operator()?.operator();
Expand All @@ -731,7 +733,7 @@ impl CompactSegmentTestFixture {

let col_stats = gen_columns_statistics(&block, None, &schema)?;

let cluster_stats = if num_blocks % 5 == 0 {
let cluster_stats = if unclustered && num_blocks % 4 == 0 {
None
} else {
cluster_key_id.map(|v| {
Expand Down Expand Up @@ -1013,6 +1015,7 @@ async fn test_compact_segment_with_cluster() -> Result<()> {
BlockThresholds::default(),
Some(cluster_key_id),
block_per_seg as usize,
false,
)
.await?;
let mut summary = Statistics::default();
Expand Down
Loading

0 comments on commit 41952fc

Please sign in to comment.