refactor recluster
zhyass committed May 24, 2024
1 parent c8e3c20 commit 5647d4a
Showing 7 changed files with 332 additions and 176 deletions.
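
The heart of the change: ReclusterMutator now hands back a single ReclusterTasks value instead of a flat task list plus loose bookkeeping fields, which also lets a recluster request fall back to a compaction plan. The enum's definition is not part of this diff; the following is a minimal sketch inferred from the match arms below, with placeholder types standing in for databend's real ones.

use std::sync::Arc;

// Placeholder stand-ins for databend's real types; assumptions for
// illustration, not the definitions from this repository.
pub struct ReclusterTask;
pub struct BlockMeta;
pub struct Statistics;
pub struct PartInfoPtr;

// Sketch of ReclusterTasks as inferred from the match arms in this commit.
pub enum ReclusterTasks {
    // A recluster job: the tasks plus the bookkeeping that previously
    // traveled as separate arguments to build_recluster_physical_plan.
    Recluster {
        tasks: Vec<ReclusterTask>,
        remained_blocks: Vec<Arc<BlockMeta>>,
        removed_segment_indexes: Vec<usize>,
        removed_segment_summary: Statistics,
    },
    // A compaction job, delegated to OptimizeTableInterpreter.
    Compact(Vec<PartInfoPtr>),
}

fn main() {
    // Exercise the sketch: a compact fallback with no parts.
    let _plan_input = ReclusterTasks::Compact(Vec::new());
}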
@@ -238,15 +238,14 @@ impl OptimizeTableInterpreter {
             .await?
         {
             if !mutator.tasks.is_empty() {
+                let is_distributed = mutator.is_distributed();
                 let reclustered_block_count = mutator.recluster_blocks_count;
                 let physical_plan = build_recluster_physical_plan(
                     mutator.tasks,
                     table.get_table_info().clone(),
                     catalog.info(),
                     mutator.snapshot,
-                    mutator.remained_blocks,
-                    mutator.removed_segment_indexes,
-                    mutator.removed_segment_summary,
+                    is_distributed,
                     self.plan.need_lock,
                 )?;

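Note that the call site above now asks the mutator whether the plan is distributed instead of letting the builder derive it from tasks.len() > 1. The body of is_distributed() is not shown in this commit, so the sketch below is only a guess at the idea: with an enum-shaped task list the decision can be made per variant. The Compact rule here is invented for illustration.

// Hypothetical sketch, not databend's actual implementation.
pub struct ReclusterTask;
pub struct Part;

pub enum ReclusterTasks {
    Recluster { tasks: Vec<ReclusterTask> },
    Compact(Vec<Part>),
}

pub struct ReclusterMutator {
    pub tasks: ReclusterTasks,
}

impl ReclusterMutator {
    // The distribution decision lives with the mutator, next to the data
    // that determines it.
    pub fn is_distributed(&self) -> bool {
        match &self.tasks {
            // Run distributed only when there is more than one task.
            ReclusterTasks::Recluster { tasks } => tasks.len() > 1,
            // Invented rule for the sketch: parallel when multiple parts.
            ReclusterTasks::Compact(parts) => parts.len() > 1,
        }
    }
}

fn main() {
    let mutator = ReclusterMutator {
        tasks: ReclusterTasks::Recluster { tasks: vec![ReclusterTask, ReclusterTask] },
    };
    assert!(mutator.is_distributed());
}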
93 changes: 52 additions & 41 deletions src/query/service/src/interpreters/interpreter_table_recluster.rs
@@ -29,17 +29,16 @@ use databend_common_sql::executor::physical_plans::Exchange;
 use databend_common_sql::executor::physical_plans::FragmentKind;
 use databend_common_sql::executor::physical_plans::ReclusterSink;
 use databend_common_sql::executor::physical_plans::ReclusterSource;
-use databend_common_sql::executor::physical_plans::ReclusterTask;
 use databend_common_sql::executor::PhysicalPlan;
+use databend_common_storages_fuse::operations::ReclusterTasks;
 use databend_common_storages_fuse::FuseTable;
-use databend_storages_common_table_meta::meta::BlockMeta;
-use databend_storages_common_table_meta::meta::Statistics;
 use databend_storages_common_table_meta::meta::TableSnapshot;
 use log::error;
 use log::warn;

 use crate::interpreters::Interpreter;
 use crate::interpreters::InterpreterClusteringHistory;
+use crate::interpreters::OptimizeTableInterpreter;
 use crate::locks::LockExt;
 use crate::locks::LockManager;
 use crate::pipelines::executor::ExecutorSettings;
@@ -149,16 +148,15 @@ impl Interpreter for ReclusterTableInterpreter {
             let mutator = mutator.unwrap();
             if mutator.tasks.is_empty() {
                 break;
-            };
+            }
+            let is_distributed = mutator.is_distributed();
             block_count += mutator.recluster_blocks_count;
             let physical_plan = build_recluster_physical_plan(
                 mutator.tasks,
                 table_info,
                 catalog.info(),
                 mutator.snapshot,
-                mutator.remained_blocks,
-                mutator.removed_segment_indexes,
-                mutator.removed_segment_summary,
+                is_distributed,
                 true,
             )?;

@@ -222,46 +220,59 @@ impl Interpreter for ReclusterTableInterpreter {
     }
 }

-#[allow(clippy::too_many_arguments)]
 pub fn build_recluster_physical_plan(
-    tasks: Vec<ReclusterTask>,
+    tasks: ReclusterTasks,
     table_info: TableInfo,
     catalog_info: CatalogInfo,
     snapshot: Arc<TableSnapshot>,
-    remained_blocks: Vec<Arc<BlockMeta>>,
-    removed_segment_indexes: Vec<usize>,
-    removed_segment_summary: Statistics,
+    is_distributed: bool,
     need_lock: bool,
 ) -> Result<PhysicalPlan> {
-    let is_distributed = tasks.len() > 1;
-    let mut root = PhysicalPlan::ReclusterSource(Box::new(ReclusterSource {
-        tasks,
-        table_info: table_info.clone(),
-        catalog_info: catalog_info.clone(),
-        plan_id: u32::MAX,
-    }));
-
-    if is_distributed {
-        root = PhysicalPlan::Exchange(Exchange {
-            plan_id: 0,
-            input: Box::new(root),
-            kind: FragmentKind::Merge,
-            keys: vec![],
-            allow_adjust_parallelism: true,
-            ignore_exchange: false,
-        });
-    }
-
-    let mut plan = PhysicalPlan::ReclusterSink(Box::new(ReclusterSink {
-        input: Box::new(root),
-        table_info,
-        catalog_info,
-        snapshot,
-        remained_blocks,
-        removed_segment_indexes,
-        removed_segment_summary,
-        plan_id: u32::MAX,
-        need_lock,
-    }));
-    plan.adjust_plan_id(&mut 0);
-    Ok(plan)
+    match tasks {
+        ReclusterTasks::Recluster {
+            tasks,
+            remained_blocks,
+            removed_segment_indexes,
+            removed_segment_summary,
+        } => {
+            let mut root = PhysicalPlan::ReclusterSource(Box::new(ReclusterSource {
+                tasks,
+                table_info: table_info.clone(),
+                catalog_info: catalog_info.clone(),
+                plan_id: u32::MAX,
+            }));

+            if is_distributed {
+                root = PhysicalPlan::Exchange(Exchange {
+                    plan_id: 0,
+                    input: Box::new(root),
+                    kind: FragmentKind::Merge,
+                    keys: vec![],
+                    allow_adjust_parallelism: true,
+                    ignore_exchange: false,
+                });
+            }

+            let mut plan = PhysicalPlan::ReclusterSink(Box::new(ReclusterSink {
+                input: Box::new(root),
+                table_info,
+                catalog_info,
+                snapshot,
+                remained_blocks,
+                removed_segment_indexes,
+                removed_segment_summary,
+                plan_id: u32::MAX,
+                need_lock,
+            }));
+            plan.adjust_plan_id(&mut 0);
+            Ok(plan)
+        }
+        ReclusterTasks::Compact(parts) => OptimizeTableInterpreter::build_physical_plan(
+            parts,
+            table_info,
+            snapshot,
+            catalog_info,
+            is_distributed,
+            true,
+        ),
+    }
 }
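
For the Recluster arm the plan shape is unchanged: a ReclusterSource feeding an optional Merge exchange, capped by a ReclusterSink. A self-contained sketch of that conditional wrapping, with simplified stand-ins for the plan nodes:

// Simplified stand-ins for the PhysicalPlan nodes used above (assumptions).
#[derive(Debug)]
enum Plan {
    Source { task_count: usize },
    // A Merge exchange gathers distributed partial streams onto one node.
    Exchange { input: Box<Plan> },
    Sink { input: Box<Plan> },
}

fn build_plan(task_count: usize, is_distributed: bool) -> Plan {
    let mut root = Plan::Source { task_count };
    if is_distributed {
        // Insert the exchange only when the plan spans multiple nodes;
        // a single-task plan stays local.
        root = Plan::Exchange { input: Box::new(root) };
    }
    Plan::Sink { input: Box::new(root) }
}

fn main() {
    println!("{:?}", build_plan(4, true));  // Sink -> Exchange -> Source
    println!("{:?}", build_plan(1, false)); // Sink -> Source
}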
@@ -28,6 +28,7 @@ use databend_common_expression::TableSchemaRef;
 use databend_common_storages_fuse::io::SegmentWriter;
 use databend_common_storages_fuse::io::TableMetaLocationGenerator;
 use databend_common_storages_fuse::operations::ReclusterMutator;
+use databend_common_storages_fuse::operations::ReclusterTasks;
 use databend_common_storages_fuse::pruning::create_segment_location_vector;
 use databend_common_storages_fuse::statistics::reducers::merge_statistics_mut;
 use databend_common_storages_fuse::statistics::reducers::reduce_block_metas;
@@ -134,19 +135,23 @@ async fn test_recluster_mutator_block_select() -> Result<()> {
     )
     .await?;

-    let mut mutator = ReclusterMutator::try_create(
+    let mut mutator = ReclusterMutator::new(
         ctx,
         Arc::new(snapshot),
         schema,
         1.0,
         BlockThresholds::default(),
         cluster_key_id,
         1,
-    )?;
+        1000,
+    );
     let need_recluster = mutator.target_select(compact_segments).await?;
     assert!(need_recluster);
-    assert_eq!(mutator.tasks.len(), 1);
-    let total_block_nums = mutator.tasks.iter().map(|t| t.parts.len()).sum::<usize>();
+    let ReclusterTasks::Recluster { tasks, .. } = mutator.tasks else {
+        return Err(ErrorCode::Internal("Logical error, it's a bug"));
+    };
+    assert_eq!(tasks.len(), 1);
+    let total_block_nums = tasks.iter().map(|t| t.parts.len()).sum::<usize>();
     assert_eq!(total_block_nums, 3);

     Ok(())
@@ -247,24 +252,20 @@ async fn test_safety_for_recluster() -> Result<()> {
     .await?;

     let mut need_recluster = false;
-    let mut mutator = ReclusterMutator::try_create(
+    let mut mutator = ReclusterMutator::new(
         ctx.clone(),
         snapshot,
         schema.clone(),
         1.0,
         threshold,
         cluster_key_id,
         max_tasks,
-    )?;
-    let selected_segs =
-        ReclusterMutator::select_segments(&compact_segments, block_per_seg, 8, cluster_key_id)?;
+        block_per_seg,
+    );
+    let selected_segs = mutator.select_segments(&compact_segments, 8)?;
     if selected_segs.is_empty() {
         for compact_segment in compact_segments.into_iter() {
-            if !ReclusterMutator::segment_can_recluster(
-                &compact_segment.1.summary,
-                block_per_seg,
-                cluster_key_id,
-            ) {
+            if !mutator.segment_can_recluster(&compact_segment.1.summary) {
                 continue;
             }

@@ -283,7 +284,15 @@ async fn test_safety_for_recluster() -> Result<()> {

     eprintln!("need_recluster: {}", need_recluster);
     if need_recluster {
-        let tasks = mutator.tasks;
+        let ReclusterTasks::Recluster {
+            tasks,
+            remained_blocks,
+            removed_segment_indexes,
+            ..
+        } = mutator.tasks
+        else {
+            return Err(ErrorCode::Internal("Logical error, it's a bug"));
+        };
         assert!(tasks.len() <= max_tasks && !tasks.is_empty());
         eprintln!("tasks_num: {}, max_tasks: {}", tasks.len(), max_tasks);
         let mut blocks = Vec::new();
@@ -296,10 +305,9 @@ async fn test_safety_for_recluster() -> Result<()> {
         }
     }

-    let remained_blocks = std::mem::take(&mut mutator.remained_blocks);
     eprintln!(
         "selected segments number {}, selected blocks number {}, remained blocks number {}",
-        mutator.removed_segment_indexes.len(),
+        removed_segment_indexes.len(),
         blocks.len(),
         remained_blocks.len()
     );
@@ -310,7 +318,7 @@ async fn test_safety_for_recluster() -> Result<()> {
     let block_ids_after_target = HashSet::from_iter(blocks.into_iter());

     let mut origin_blocks_ids = HashSet::new();
-    for idx in &mutator.removed_segment_indexes {
+    for idx in &removed_segment_indexes {
         for b in &segment_infos[*idx].blocks {
             origin_blocks_ids.insert(b.location.0.clone());
         }
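Both tests now destructure mutator.tasks with let-else, bailing out if the mutator unexpectedly produced the Compact variant. A minimal self-contained sketch of the pattern, with the error type reduced to a String:

pub struct ReclusterTask;

pub enum ReclusterTasks {
    Recluster { tasks: Vec<ReclusterTask> },
    Compact(Vec<u32>), // placeholder payload
}

fn expect_recluster(t: ReclusterTasks) -> Result<Vec<ReclusterTask>, String> {
    // let-else binds the variant's fields or diverges in the else branch,
    // mirroring the tests' "Logical error, it's a bug" early return.
    let ReclusterTasks::Recluster { tasks } = t else {
        return Err("expected ReclusterTasks::Recluster".to_string());
    };
    Ok(tasks)
}

fn main() {
    let t = ReclusterTasks::Recluster { tasks: vec![ReclusterTask] };
    assert_eq!(expect_recluster(t).map(|tasks| tasks.len()), Ok(1));
}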
@@ -144,7 +144,7 @@ impl BlockCompactMutator {
         for (segment_idx, compact_segment) in segment_infos.into_iter() {
             let segments_vec = checker.add(segment_idx, compact_segment);
             for segments in segments_vec {
-                self.generate_part(segments, &mut parts, &mut checker);
+                checker.generate_part(segments, &mut parts);
             }

             let residual_segment_cnt = checker.segments.len();
@@ -177,11 +177,7 @@ impl BlockCompactMutator {
         }

         // finalize the compaction.
-        self.generate_part(
-            std::mem::take(&mut checker.segments),
-            &mut parts,
-            &mut checker,
-        );
+        checker.finalize(&mut parts);

         // Status.
         let elapsed_time = start.elapsed();
@@ -295,28 +291,9 @@ impl BlockCompactMutator {
             }
         }
     }
-
-    fn generate_part(
-        &mut self,
-        segments: Vec<(SegmentIndex, Arc<CompactSegmentInfo>)>,
-        parts: &mut Vec<PartInfoPtr>,
-        checker: &mut SegmentCompactChecker,
-    ) {
-        if !segments.is_empty() && checker.check_for_compact(&segments) {
-            let mut segment_indices = Vec::with_capacity(segments.len());
-            let mut compact_segments = Vec::with_capacity(segments.len());
-            for (idx, segment) in segments.into_iter() {
-                segment_indices.push(idx);
-                compact_segments.push(segment);
-            }
-
-            let lazy_part = CompactLazyPartInfo::create(segment_indices, compact_segments);
-            parts.push(lazy_part);
-        }
-    }
 }

-struct SegmentCompactChecker {
+pub struct SegmentCompactChecker {
     segments: Vec<(SegmentIndex, Arc<CompactSegmentInfo>)>,
     total_block_count: u64,
     block_threshold: u64,
@@ -327,7 +304,7 @@ struct SegmentCompactChecker {
 }

 impl SegmentCompactChecker {
-    fn new(block_threshold: u64, cluster_key_id: Option<u32>) -> Self {
+    pub fn new(block_threshold: u64, cluster_key_id: Option<u32>) -> Self {
         Self {
             segments: vec![],
             total_block_count: 0,
@@ -361,7 +338,7 @@ impl SegmentCompactChecker {
         true
     }

-    fn add(
+    pub fn add(
         &mut self,
         idx: SegmentIndex,
         segment: Arc<CompactSegmentInfo>,
@@ -386,6 +363,29 @@ impl SegmentCompactChecker {
         self.segments.push((idx, segment));
         vec![std::mem::take(&mut self.segments)]
     }
+
+    pub fn generate_part(
+        &mut self,
+        segments: Vec<(SegmentIndex, Arc<CompactSegmentInfo>)>,
+        parts: &mut Vec<PartInfoPtr>,
+    ) {
+        if !segments.is_empty() && self.check_for_compact(&segments) {
+            let mut segment_indices = Vec::with_capacity(segments.len());
+            let mut compact_segments = Vec::with_capacity(segments.len());
+            for (idx, segment) in segments.into_iter() {
+                segment_indices.push(idx);
+                compact_segments.push(segment);
+            }
+
+            let lazy_part = CompactLazyPartInfo::create(segment_indices, compact_segments);
+            parts.push(lazy_part);
+        }
+    }
+
+    pub fn finalize(&mut self, parts: &mut Vec<PartInfoPtr>) {
+        let final_segments = std::mem::take(&mut self.segments);
+        self.generate_part(final_segments, parts);
+    }
 }

 struct CompactTaskBuilder {
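The batching logic now lives entirely on SegmentCompactChecker: call sites flush groups through checker.generate_part and drain the tail with checker.finalize, rather than reaching into checker.segments with std::mem::take themselves. A reduced model of that flow — segments shrunk to plain block counts and the eligibility check dropped — to show the ownership move:

// Reduced model of the checker loop; block counts stand in for databend's
// CompactSegmentInfo, and Vec<u64> groups stand in for CompactLazyPartInfo.
struct Checker {
    segments: Vec<u64>,
    block_threshold: u64,
}

impl Checker {
    // Buffer a segment; emit a group once enough blocks have accumulated.
    fn add(&mut self, block_count: u64) -> Vec<Vec<u64>> {
        self.segments.push(block_count);
        if self.segments.iter().sum::<u64>() >= self.block_threshold {
            return vec![std::mem::take(&mut self.segments)];
        }
        vec![]
    }

    fn generate_part(&mut self, segments: Vec<u64>, parts: &mut Vec<Vec<u64>>) {
        if !segments.is_empty() {
            parts.push(segments);
        }
    }

    // finalize drains whatever is still pending; the caller no longer does
    // the mem::take on checker internals.
    fn finalize(&mut self, parts: &mut Vec<Vec<u64>>) {
        let rest = std::mem::take(&mut self.segments);
        self.generate_part(rest, parts);
    }
}

fn main() {
    let mut checker = Checker { segments: vec![], block_threshold: 10 };
    let mut parts = Vec::new();
    for block_count in [4, 7, 2, 3] {
        for group in checker.add(block_count) {
            checker.generate_part(group, &mut parts);
        }
    }
    checker.finalize(&mut parts);
    println!("{parts:?}"); // [[4, 7], [2, 3]]
}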
@@ -17,7 +17,9 @@ mod recluster_mutator;
 mod segment_compact_mutator;

 pub use block_compact_mutator::BlockCompactMutator;
+pub use block_compact_mutator::SegmentCompactChecker;
 pub use recluster_mutator::ReclusterMutator;
+pub use recluster_mutator::ReclusterTasks;
 pub use segment_compact_mutator::SegmentCompactMutator;
 pub use segment_compact_mutator::SegmentCompactionState;
 pub use segment_compact_mutator::SegmentCompactor;