
Commit

fix: merge into split logic
For a merge-into statement with both matched and not-matched branches,
a right join is used.

After obtaining the joined result set, split it into matched and
not-matched parts using the `row_id` column of the target table rather
than an arbitrary target-table column, since an arbitrary column may
contain NULL values.
dantengsky committed May 26, 2024
1 parent 9e1d397 commit 904173d
Showing 12 changed files with 97 additions and 76 deletions.
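
To illustrate the split rule from the commit message, here is a minimal, self-contained sketch in plain Rust (the `Option<u64>` slice and the `split_by_row_id` helper are made up for illustration and are not the actual Databend `DataBlock`/`Column` types): after the join, a row whose target-side `row_id` is NULL can only belong to the not-matched branch, while an ordinary target column may be NULL even for a matched row.

// Hypothetical stand-in for the target-side row_id values of one joined block;
// None means the source row found no matching target row.
fn split_by_row_id(row_ids: &[Option<u64>]) -> (Vec<usize>, Vec<usize>) {
    let mut matched = Vec::new();
    let mut not_matched = Vec::new();
    for (idx, row_id) in row_ids.iter().enumerate() {
        match row_id {
            // row_id is never NULL for an existing target row, so a present
            // value reliably marks the MATCHED branch.
            Some(_) => matched.push(idx),
            // A NULL row_id can only mean "no target row joined": NOT MATCHED.
            None => not_matched.push(idx),
        }
    }
    (matched, not_matched)
}

fn main() {
    // Three joined rows: two source rows matched target rows 7 and 9, one
    // matched nothing. A regular nullable target column could also be None
    // for row 0, which is why it is unsafe as the split key.
    let row_ids = vec![Some(7), None, Some(9)];
    let (matched, not_matched) = split_by_row_id(&row_ids);
    assert_eq!(matched, vec![0, 2]);
    assert_eq!(not_matched, vec![1]);
}
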
4 changes: 2 additions & 2 deletions Cargo.lock


2 changes: 1 addition & 1 deletion scripts/ci/deploy/config/databend-query-node-1.toml
@@ -73,7 +73,7 @@ join_spilling_memory_ratio = 60
[log]

[log.file]
level = "info"
level = "DEBUG"
format = "text"
dir = "./.databend/logs_1"
prefix_filter = ""
53 changes: 29 additions & 24 deletions src/query/service/src/interpreters/interpreter_merge_into.rs
@@ -16,7 +16,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::u64::MAX;

use databend_common_catalog::merge_into_join::MergeIntoJoin;
use databend_common_catalog::table::TableExt;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
@@ -50,7 +49,6 @@ use databend_common_sql::IndexType;
use databend_common_sql::ScalarExpr;
use databend_common_sql::TypeCheck;
use databend_common_sql::DUMMY_COLUMN_INDEX;
use databend_common_sql::DUMMY_TABLE_INDEX;
use databend_common_storages_factory::Table;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_fuse::TableContext;
@@ -143,11 +141,11 @@ impl MergeIntoInterpreter {
distributed,
change_join_order,
split_idx,
row_id_index,
row_id_index: _,
can_try_update_column_only,
..
} = &self.plan;
let mut columns_set = columns_set.clone();
let columns_set = columns_set.clone();
let table = self.ctx.get_table(catalog, database, table_name).await?;
let fuse_table = table.as_any().downcast_ref::<FuseTable>().ok_or_else(|| {
ErrorCode::Unimplemented(format!(
@@ -181,26 +179,33 @@ impl MergeIntoInterpreter {
// for `target_build_optimization` we don't need to read rowId column. for now, there are two cases we don't read rowid:
// I. InsertOnly, the MergeIntoType is InsertOnly
// II. target build optimization for this pr. the MergeIntoType is MergeIntoType
let mut target_build_optimization =
matches!(self.plan.merge_type, MergeIntoType::FullOperation)
&& !self.plan.columns_set.contains(&self.plan.row_id_index);
if target_build_optimization {
assert!(*change_join_order && !*distributed);
// so if `target_build_optimization` is true, it means the optimizer enable this rule.
// but we need to check if it's parquet format or native format. for now,we just support
// parquet. (we will support native in the next pr).
if fuse_table.is_native() {
target_build_optimization = false;
// and we need to add row_id back and forbidden target_build_optimization
columns_set.insert(*row_id_index);
let merge_into_join = self.ctx.get_merge_into_join();
self.ctx.set_merge_into_join(MergeIntoJoin {
target_tbl_idx: DUMMY_TABLE_INDEX,
is_distributed: merge_into_join.is_distributed,
merge_into_join_type: merge_into_join.merge_into_join_type,
});
}
}

// Temporarily commented out, due to issue https://github.com/datafuselabs/databend/issues/15643
// might be uncommented or removed during recovering the target build feature of merg-into
//
// let mut target_build_optimization =
// matches!(self.plan.merge_type, MergeIntoType::FullOperation)
// && !self.plan.columns_set.contains(&self.plan.row_id_index);

// if target_build_optimization {
// assert!(*change_join_order && !*distributed);
// // so if `target_build_optimization` is true, it means the optimizer enable this rule.
// // but we need to check if it's parquet format or native format. for now,we just support
// // parquet. (we will support native in the next pr).
// if fuse_table.is_native() {
// target_build_optimization = false;
// // and we need to add row_id back and forbidden target_build_optimization
// columns_set.insert(*row_id_index);
// let merge_into_join = self.ctx.get_merge_into_join();
// self.ctx.set_merge_into_join(MergeIntoJoin {
// target_tbl_idx: DUMMY_TABLE_INDEX,
// is_distributed: merge_into_join.is_distributed,
// merge_into_join_type: merge_into_join.merge_into_join_type,
// });
// }
// }

let target_build_optimization = false;

// check mutability
let check_table = self.ctx.get_table(catalog, database, table_name).await?;
@@ -49,7 +49,6 @@ use databend_common_storages_fuse::operations::TransformDistributedMergeIntoBloc
use databend_common_storages_fuse::operations::TransformSerializeBlock;
use databend_common_storages_fuse::operations::TransformSerializeSegment;
use databend_common_storages_fuse::FuseTable;
use log::info;

use crate::pipelines::processors::transforms::AccumulateRowNumber;
use crate::pipelines::processors::transforms::ExtractHashTableByRowNumber;
@@ -379,7 +378,6 @@ impl PipelineBuilder {
if let Some(split_idx) = merge_into_split_idx {
let mut items = Vec::with_capacity(self.main_pipeline.output_len());
let output_len = self.main_pipeline.output_len();
info!("split_idx {split_idx}, main_pipeline output len {output_len}");
for _ in 0..output_len {
let merge_into_split_processor =
MergeIntoSplitProcessor::create(*split_idx as u32)?;
2 changes: 1 addition & 1 deletion src/query/settings/src/settings_default.rs
@@ -291,7 +291,7 @@ impl DefaultSettings {
range: Some(SettingRange::Numeric(0..=u64::MAX)),
}),
("disable_merge_into_join_reorder", DefaultSettingValue {
value: UserSettingValue::UInt64(0),
value: UserSettingValue::UInt64(1),
desc: "Disable merge into join reorder optimization.",
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=1)),
36 changes: 21 additions & 15 deletions src/query/sql/src/planner/binder/merge_into.rs
@@ -58,7 +58,6 @@ use crate::IndexType;
use crate::ScalarBinder;
use crate::ScalarExpr;
use crate::Visibility;
use crate::DUMMY_COLUMN_INDEX;

#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum MergeIntoType {
@@ -438,20 +437,27 @@ impl Binder {
.await?,
);
}
let mut split_idx = DUMMY_COLUMN_INDEX;
// find any target table column index for merge_into_split
for column in self
.metadata
.read()
.columns_by_table_index(table_index)
.iter()
{
if column.index() != row_id_index {
split_idx = column.index();
break;
}
}
assert!(split_idx != DUMMY_COLUMN_INDEX);

let split_idx = row_id_index;
// Temporarily commented out, due to issue https://github.com/datafuselabs/databend/issues/15643
// might be un-commented or removed during recovering the target build feature of merge-into
//
//
// let mut split_idx = DUMMY_COLUMN_INDEX;
// // find any target table column index for merge_into_split
// for column in self
// .metadata
// .read()
// .columns_by_table_index(table_index)
// .iter()
// {
// if column.index() != row_id_index {
// split_idx = column.index();
// break;
// }
// }
// assert!(split_idx != DUMMY_COLUMN_INDEX);
//

Ok(MergeInto {
catalog: catalog_name.to_string(),
6 changes: 5 additions & 1 deletion src/query/sql/src/planner/planner.rs
@@ -173,7 +173,11 @@ impl Planner {
.with_enable_join_reorder(unsafe { !settings.get_disable_join_reorder()? })
.with_enable_dphyp(settings.get_enable_dphyp()?)
.with_enable_merge_into_join_reorder(
!settings.get_disable_merge_into_join_reorder()?,
// Temporarily commented out, due to issue https://github.com/datafuselabs/databend/issues/15643

// might be uncommented or removed during recovering the target build feature of merge-into
// ! settings.get_disable_merge_into_join_reorder()?,
false,
);

let optimized_plan = optimize(opt_ctx, plan).await?;
@@ -196,7 +196,7 @@ impl MatchedAggregator {
.insert(offset as usize)
{
return Err(ErrorCode::UnresolvableConflict(
"multi rows from source match one and the same row in the target_table multi times",
"multi rows from source match one and the same row in the target_table multi times (update)",
));
}
}
@@ -331,7 +331,7 @@ impl MatchedAggregator {
< update_modified_offsets.len() + delete_modified_offsets.len()
{
return Err(ErrorCode::UnresolvableConflict(
"multi rows from source match one and the same row in the target_table multi times",
"multi rows from source match one and the same row in the target_table multi times (other)",
));
}

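For context on the two UnresolvableConflict messages adjusted above: a merge is ambiguous when more than one source row modifies the same target row, and the aggregator detects this by recording each matched target-row offset in a set. Below is a minimal standalone sketch of that idea (simplified types and a made-up `check_unique_matches` helper, not the actual Databend code, which additionally cross-checks the update and delete offset sets):

use std::collections::HashSet;

// Record each matched target-row offset; hitting the same offset twice means
// two source rows tried to modify one target row, which MERGE cannot resolve.
fn check_unique_matches(matched_offsets: &[usize]) -> Result<(), String> {
    let mut seen = HashSet::new();
    for &offset in matched_offsets {
        if !seen.insert(offset) {
            return Err(format!(
                "multi rows from source match one and the same row {offset} in the target_table multi times"
            ));
        }
    }
    Ok(())
}

fn main() {
    assert!(check_unique_matches(&[0, 3, 5]).is_ok());
    assert!(check_unique_matches(&[0, 3, 3]).is_err());
}
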
@@ -365,7 +365,6 @@ impl MatchedAggregator {
mutation_logs.push(segment_mutation_log);
}
}
info!("CHECK: mutation logs {:#?}", mutation_logs);
let elapsed_time = start.elapsed().as_millis() as u64;
metrics_inc_merge_into_apply_milliseconds(elapsed_time);
Ok(Some(MutationLogs {
@@ -383,10 +382,6 @@ impl AggregationContext {
block_meta: &BlockMeta,
modified_offsets: HashSet<usize>,
) -> Result<Option<MutationLogEntry>> {
info!(
"CHECK: apply update and delete to segment idx {}, block idx {}",
segment_idx, block_idx,
);
let progress_values = ProgressValues {
rows: modified_offsets.len(),
bytes: 0,
@@ -463,9 +458,10 @@ impl AggregationContext {
let new_block_raw_data = serialized.block_raw_data;
let data_accessor = self.data_accessor.clone();

// TODO BUGGY, inverted index not saved

// write block data
write_data(new_block_raw_data, &data_accessor, &new_block_location).await?;

// write bloom index data
if let Some(bloom_state) = serialized.bloom_index_state {
write_data(bloom_state.data, &data_accessor, &bloom_state.location.0).await?;
}
@@ -19,7 +19,6 @@ use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::types::DataType;
use databend_common_expression::DataBlock;
use log::info;

pub struct MergeIntoSplitMutator {
pub split_idx: u32,
@@ -38,7 +37,6 @@ impl MergeIntoSplitMutator {
// get row_id do check duplicate and get filter
let filter: Bitmap = match &split_column.value {
databend_common_expression::Value::Scalar(scalar) => {
info!("split block using scalar value");
// fast judge
if scalar.is_null() {
return Ok((DataBlock::empty(), block.clone()));
@@ -48,10 +46,6 @@ }
}
databend_common_expression::Value::Column(column) => match column {
databend_common_expression::Column::Nullable(nullable_column) => {
info!(
"CHECK: split block using column's validity: {:#?}",
nullable_column
);
nullable_column.validity.clone()
}
_ => {
@@ -36,7 +36,6 @@ use databend_common_pipeline_core::PipeItem;
use databend_common_sql::evaluator::BlockOperator;
use databend_common_storage::MergeStatus;
use itertools::Itertools;
use log::info;

use crate::operations::merge_into::mutator::SplitByExprMutator;
use crate::operations::BlockMetaIndex;
@@ -162,7 +161,6 @@ impl Processor for MergeIntoNotMatchedProcessor {
if no_need_add_status {
// no need to give source schema, the data block's schema is complete, so we won'f fill default
// field values.The computed field will be processed in `TransformResortAddOnWithoutSourceSchema`.
info!("CHECK: target build optimized, passing data block directly to downstream");
self.output_data.push(data_block);
return Ok(());
}
@@ -184,7 +182,6 @@ impl Processor for MergeIntoNotMatchedProcessor {
deleted_rows: 0,
});

info!("CHECK: appending data block {:#?} ", satisfied_block);
self.output_data
.push(op.op.execute(&self.func_ctx, satisfied_block)?)
}
@@ -180,24 +180,14 @@ impl Processor for MergeIntoSplitProcessor {
metrics_inc_merge_into_split_milliseconds(elapsed_time);

if !matched_block.is_empty() {
info!("got {} row matched data", matched_block.num_rows(),);
metrics_inc_merge_into_matched_rows(matched_block.num_rows() as u32);
info!(
"CHECK: matched data, row number {}, col number {}: {:#?}",
matched_block.num_rows(),
matched_block.num_columns(),
matched_block.columns()
);
self.output_data_matched_data = Some(matched_block);
}

if !not_matched_block.is_empty() {
info!("got {} row unmatched data", not_matched_block.num_rows(),);
metrics_inc_merge_into_unmatched_rows(not_matched_block.num_rows() as u32);
info!(
"CHECK: not matched data, row number {}, col number {} : {:#?}",
not_matched_block.num_rows(),
not_matched_block.num_columns(),
not_matched_block.columns()
);
self.output_data_not_matched_data = Some(not_matched_block);
}
}
@@ -0,0 +1,31 @@
statement ok
create or replace database m_test;

statement ok
use m_test;

statement ok
create table t(a string, b string, c string, d string, k string);

statement ok
create table s(a string, b string, c string, d string, k string);

statement ok
insert into t(k) values('k');

statement ok
insert into s(k) values('k');


# expects 1 row updated, 0 row inserted
query II
merge into t using s on t.k = s.k
when matched then update *
when not matched then insert *;
----
0 1

query TTTTT
select * from t;
----
NULL NULL NULL NULL k
