Skip to content

Commit

Permalink
feat: recluster unclustered blocks (#15623)
Browse files Browse the repository at this point in the history
* refactor recluster

* add tests

* fix
  • Loading branch information
zhyass authored Jun 4, 2024
1 parent 6e59dd7 commit e980c27
Show file tree
Hide file tree
Showing 11 changed files with 434 additions and 177 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,15 +231,14 @@ impl OptimizeTableInterpreter {
.await?
{
if !mutator.tasks.is_empty() {
let is_distributed = mutator.is_distributed();
let reclustered_block_count = mutator.recluster_blocks_count;
let physical_plan = build_recluster_physical_plan(
mutator.tasks,
table.get_table_info().clone(),
catalog.info(),
mutator.snapshot,
mutator.remained_blocks,
mutator.removed_segment_indexes,
mutator.removed_segment_summary,
is_distributed,
)?;

build_res =
Expand Down
90 changes: 50 additions & 40 deletions src/query/service/src/interpreters/interpreter_table_recluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,17 @@ use databend_common_sql::executor::physical_plans::Exchange;
use databend_common_sql::executor::physical_plans::FragmentKind;
use databend_common_sql::executor::physical_plans::ReclusterSink;
use databend_common_sql::executor::physical_plans::ReclusterSource;
use databend_common_sql::executor::physical_plans::ReclusterTask;
use databend_common_sql::executor::PhysicalPlan;
use databend_common_sql::plans::LockTableOption;
use databend_common_storages_fuse::operations::ReclusterTasks;
use databend_common_storages_fuse::FuseTable;
use databend_storages_common_table_meta::meta::BlockMeta;
use databend_storages_common_table_meta::meta::Statistics;
use databend_storages_common_table_meta::meta::TableSnapshot;
use log::error;
use log::warn;

use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterClusteringHistory;
use crate::interpreters::OptimizeTableInterpreter;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::PipelineBuildResult;
Expand Down Expand Up @@ -148,16 +147,15 @@ impl Interpreter for ReclusterTableInterpreter {
let mutator = mutator.unwrap();
if mutator.tasks.is_empty() {
break;
};
}
let is_distributed = mutator.is_distributed();
block_count += mutator.recluster_blocks_count;
let physical_plan = build_recluster_physical_plan(
mutator.tasks,
table.get_table_info().clone(),
catalog.info(),
mutator.snapshot,
mutator.remained_blocks,
mutator.removed_segment_indexes,
mutator.removed_segment_summary,
is_distributed,
)?;

let mut build_res =
Expand Down Expand Up @@ -217,44 +215,56 @@ impl Interpreter for ReclusterTableInterpreter {
}
}

#[allow(clippy::too_many_arguments)]
pub fn build_recluster_physical_plan(
tasks: Vec<ReclusterTask>,
tasks: ReclusterTasks,
table_info: TableInfo,
catalog_info: CatalogInfo,
snapshot: Arc<TableSnapshot>,
remained_blocks: Vec<Arc<BlockMeta>>,
removed_segment_indexes: Vec<usize>,
removed_segment_summary: Statistics,
is_distributed: bool,
) -> Result<PhysicalPlan> {
let is_distributed = tasks.len() > 1;
let mut root = PhysicalPlan::ReclusterSource(Box::new(ReclusterSource {
tasks,
table_info: table_info.clone(),
catalog_info: catalog_info.clone(),
plan_id: u32::MAX,
}));
match tasks {
ReclusterTasks::Recluster {
tasks,
remained_blocks,
removed_segment_indexes,
removed_segment_summary,
} => {
let mut root = PhysicalPlan::ReclusterSource(Box::new(ReclusterSource {
tasks,
table_info: table_info.clone(),
catalog_info: catalog_info.clone(),
plan_id: u32::MAX,
}));

if is_distributed {
root = PhysicalPlan::Exchange(Exchange {
plan_id: 0,
input: Box::new(root),
kind: FragmentKind::Merge,
keys: vec![],
allow_adjust_parallelism: true,
ignore_exchange: false,
});
if is_distributed {
root = PhysicalPlan::Exchange(Exchange {
plan_id: 0,
input: Box::new(root),
kind: FragmentKind::Merge,
keys: vec![],
allow_adjust_parallelism: true,
ignore_exchange: false,
});
}
let mut plan = PhysicalPlan::ReclusterSink(Box::new(ReclusterSink {
input: Box::new(root),
table_info,
catalog_info,
snapshot,
remained_blocks,
removed_segment_indexes,
removed_segment_summary,
plan_id: u32::MAX,
}));
plan.adjust_plan_id(&mut 0);
Ok(plan)
}
ReclusterTasks::Compact(parts) => OptimizeTableInterpreter::build_physical_plan(
parts,
table_info,
snapshot,
catalog_info,
is_distributed,
),
}
let mut plan = PhysicalPlan::ReclusterSink(Box::new(ReclusterSink {
input: Box::new(root),
table_info,
catalog_info,
snapshot,
remained_blocks,
removed_segment_indexes,
removed_segment_summary,
plan_id: u32::MAX,
}));
plan.adjust_plan_id(&mut 0);
Ok(plan)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;

use databend_common_base::base::tokio;
use databend_common_catalog::plan::PartInfoType;
use databend_common_catalog::plan::Partitions;
use databend_common_catalog::table::CompactionLimits;
use databend_common_catalog::table::Table;
use databend_common_exception::Result;
Expand All @@ -33,9 +34,11 @@ use databend_query::schedulers::build_query_pipeline_without_render_result_set;
use databend_query::sessions::QueryContext;
use databend_query::sessions::TableContext;
use databend_query::test_kits::*;
use databend_storages_common_table_meta::meta::Location;
use databend_storages_common_table_meta::meta::SegmentInfo;
use databend_storages_common_table_meta::meta::Statistics;
use databend_storages_common_table_meta::meta::TableSnapshot;
use opendal::Operator;
use rand::thread_rng;
use rand::Rng;
use uuid::Uuid;
Expand Down Expand Up @@ -195,6 +198,7 @@ async fn test_safety() -> Result<()> {
threshold,
cluster_key_id,
5,
false,
)
.await?;

Expand Down Expand Up @@ -239,53 +243,72 @@ async fn test_safety() -> Result<()> {
eprintln!("no target select");
continue;
}
assert!(selections.partitions_type() != PartInfoType::LazyLevel);

let mut actual_blocks_number = 0;
let mut compact_segment_indices = HashSet::new();
let mut actual_block_ids = HashSet::new();
for part in selections.partitions.into_iter() {
let part = CompactBlockPartInfo::from_part(&part)?;
match part {
CompactBlockPartInfo::CompactExtraInfo(extra) => {
compact_segment_indices.insert(extra.segment_index);
compact_segment_indices.extend(extra.removed_segment_indexes.iter());
actual_blocks_number += extra.unchanged_blocks.len();
for b in &extra.unchanged_blocks {
actual_block_ids.insert(b.1.location.clone());
}
verify_compact_tasks(
ctx.get_application_level_data_operator()?.operator(),
selections,
locations,
HashSet::new(),
)
.await?;
}

Ok(())
}

pub async fn verify_compact_tasks(
dal: Operator,
parts: Partitions,
locations: Vec<Location>,
expected_segment_indices: HashSet<usize>,
) -> Result<()> {
assert!(parts.partitions_type() != PartInfoType::LazyLevel);

let mut actual_blocks_number = 0;
let mut compact_segment_indices = HashSet::new();
let mut actual_block_ids = HashSet::new();
for part in parts.partitions.into_iter() {
let part = CompactBlockPartInfo::from_part(&part)?;
match part {
CompactBlockPartInfo::CompactExtraInfo(extra) => {
compact_segment_indices.insert(extra.segment_index);
compact_segment_indices.extend(extra.removed_segment_indexes.iter());
actual_blocks_number += extra.unchanged_blocks.len();
for b in &extra.unchanged_blocks {
actual_block_ids.insert(b.1.location.clone());
}
CompactBlockPartInfo::CompactTaskInfo(task) => {
compact_segment_indices.insert(task.index.segment_idx);
actual_blocks_number += task.blocks.len();
for b in &task.blocks {
actual_block_ids.insert(b.location.clone());
}
}
CompactBlockPartInfo::CompactTaskInfo(task) => {
compact_segment_indices.insert(task.index.segment_idx);
actual_blocks_number += task.blocks.len();
for b in &task.blocks {
actual_block_ids.insert(b.location.clone());
}
}
}
}

eprintln!("compact_segment_indices: {:?}", compact_segment_indices);
let mut except_blocks_number = 0;
let mut except_block_ids = HashSet::new();
for idx in compact_segment_indices.into_iter() {
let loc = locations.get(idx).unwrap();
let compact_segment = SegmentsIO::read_compact_segment(
ctx.get_application_level_data_operator()?.operator(),
loc.clone(),
TestFixture::default_table_schema(),
false,
)
.await?;
let segment = SegmentInfo::try_from(compact_segment)?;
except_blocks_number += segment.blocks.len();
for b in &segment.blocks {
except_block_ids.insert(b.location.clone());
}
eprintln!("compact_segment_indices: {:?}", compact_segment_indices);
if !expected_segment_indices.is_empty() {
assert_eq!(expected_segment_indices, compact_segment_indices);
}
let mut except_blocks_number = 0;
let mut except_block_ids = HashSet::new();
for idx in compact_segment_indices.into_iter() {
let loc = locations.get(idx).unwrap();
let compact_segment = SegmentsIO::read_compact_segment(
dal.clone(),
loc.clone(),
TestFixture::default_table_schema(),
false,
)
.await?;
let segment = SegmentInfo::try_from(compact_segment)?;
except_blocks_number += segment.blocks.len();
for b in &segment.blocks {
except_block_ids.insert(b.location.clone());
}
assert_eq!(except_blocks_number, actual_blocks_number);
assert_eq!(except_block_ids, actual_block_ids);
}

assert_eq!(except_blocks_number, actual_blocks_number);
assert_eq!(except_block_ids, actual_block_ids);
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ mod deletion;
mod recluster_mutator;
mod segments_compact_mutator;

pub use block_compact_mutator::verify_compact_tasks;
pub use segments_compact_mutator::compact_segment;
pub use segments_compact_mutator::CompactSegmentTestFixture;
Loading

0 comments on commit e980c27

Please sign in to comment.