Skip to content

Commit

Permalink
fix: merge into split logic
Browse files Browse the repository at this point in the history
for merge-into stmt with both matched and not-matched branch, right join
is used,
after got the joined result-set, a column of the result-set is chosen as
the "split" column,
i.e. for rows of the split column, those are nulls are recognized as
no-matched, and others as matched.

before this PR, an arbitrary column of target table is picked as the
"split" column, which is unsafe, since
the "arbitrary column" may have NULL values originally, which may lead
to incorrectly treat a matched row
as unmatched, and unexpected result (data duplication).
  • Loading branch information
dantengsky committed May 27, 2024
1 parent 28c543e commit 2ec12ac
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ impl MergeIntoInterpreter {

let insert_only = matches!(merge_type, MergeIntoType::InsertOnly);

let mut row_id_idx = if !insert_only && !target_build_optimization {
let mut row_id_idx = if !insert_only {
match meta_data
.read()
.row_id_index_by_table_index(*target_table_idx)
Expand Down
17 changes: 2 additions & 15 deletions src/query/sql/src/planner/binder/merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ use crate::IndexType;
use crate::ScalarBinder;
use crate::ScalarExpr;
use crate::Visibility;
use crate::DUMMY_COLUMN_INDEX;

#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum MergeIntoType {
Expand Down Expand Up @@ -438,20 +437,8 @@ impl Binder {
.await?,
);
}
let mut split_idx = DUMMY_COLUMN_INDEX;
// find any target table column index for merge_into_split
for column in self
.metadata
.read()
.columns_by_table_index(table_index)
.iter()
{
if column.index() != row_id_index {
split_idx = column.index();
break;
}
}
assert!(split_idx != DUMMY_COLUMN_INDEX);

let split_idx = row_id_index;

Ok(MergeInto {
catalog: catalog_name.to_string(),
Expand Down
1 change: 0 additions & 1 deletion src/query/sql/src/planner/optimizer/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,6 @@ async fn optimize_merge_into(opt_ctx: OptimizerContext, plan: Box<MergeInto>) ->
== 0
&& flag
{
new_columns_set.remove(&plan.row_id_index);
opt_ctx.table_ctx.set_merge_into_join(MergeIntoJoin {
merge_into_join_type: MergeIntoJoinType::Left,
is_distributed: false,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
statement ok
create or replace database m_test;

statement ok
use m_test;

statement ok
create table t(a string, b string, c string, d string, k string);

statement ok
create table s(a string, b string, c string, d string, k string);

statement ok
insert into t(k) values('k');

statement ok
insert into s(k) values('k');


query II
merge into t using s on t.k = s.k when matched then update * when not matched then insert *;
----
0 1

query TTTTT
select * from t;
----
NULL NULL NULL NULL k

0 comments on commit 2ec12ac

Please sign in to comment.