Skip to content

Commit

Permalink
fix(cdc): fix wrong default column matching (#20348)
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang authored Feb 5, 2025
1 parent 51a8b08 commit d1eccfb
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 12 deletions.
29 changes: 29 additions & 0 deletions e2e_test/source_legacy/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,13 @@ CREATE TABLE test_my_default_value (
PRIMARY KEY (id)
) FROM mysql_mytest TABLE 'mytest.test_my_default_value';

statement ok
CREATE TABLE test_my_default_value_disorderd (
city varchar,
id int,
PRIMARY KEY (id)
) FROM mysql_mytest TABLE 'mytest.test_my_default_value';

statement ok
SET RW_IMPLICIT_FLUSH=true;

Expand All @@ -200,6 +207,13 @@ select * from test_my_default_value;
----
2 jack Shanghai

statement ok
INSERT INTO test_my_default_value_disorderd (id) VALUES (2);

query II
select * from test_my_default_value_disorderd;
----
Shanghai 2

statement ok
create table kt1 (*) from mysql_source table 'kdb.kt1';
Expand Down Expand Up @@ -650,6 +664,13 @@ CREATE TABLE test_pg_default_value (
PRIMARY KEY (id)
) FROM pg_source TABLE 'public.test_default_value';

statement ok
CREATE TABLE test_pg_default_value_disorderd (
city varchar,
id int,
PRIMARY KEY (id)
) FROM pg_source TABLE 'public.test_default_value';

statement ok
SET RW_IMPLICIT_FLUSH=true;

Expand All @@ -661,6 +682,14 @@ select * from test_pg_default_value;
----
1 noris Shanghai

statement ok
INSERT INTO test_pg_default_value_disorderd (id) VALUES (1);

query II
select * from test_pg_default_value_disorderd;
----
Shanghai 1

### BEGIN reset the password to the original one
onlyif can-use-recover
statement ok
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ CREATE TABLE shared_orders (
) from mssql_source table 'mydb.dbo.orders';

# column name mismatch
statement error INVALID_ARGUMENT: Column 'wrong_order_date' not found in the upstream database
statement error Column 'wrong_order_date' not found in the upstream database
CREATE TABLE shared_orders (
order_id INT,
wrong_order_date BIGINT,
Expand Down
26 changes: 15 additions & 11 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use risingwave_common::license::Feature;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_common::{bail, bail_not_implemented};
Expand Down Expand Up @@ -1182,20 +1181,25 @@ pub(super) async fn handle_create_table_plan(
let table: ExternalTableImpl = ExternalTableImpl::connect(config)
.await
.context("failed to auto derive table schema")?;
let external_columns: Vec<_> = table
let external_columns: HashMap<&str, &ColumnDesc> = table
.column_descs()
.iter()
.cloned()
.map(|column_desc| ColumnCatalog {
column_desc,
is_hidden: false,
})
.map(|column_desc| (column_desc.name.as_str(), column_desc))
.collect();
for (col, external_col) in
columns.iter_mut().zip_eq_fast(external_columns.into_iter())
{

for col in &mut columns {
let external_column_desc =
*external_columns.get(col.name()).ok_or_else(|| {
ErrorCode::ConnectorError(
format!(
"Column '{}' not found in the upstream database",
col.name()
)
.into(),
)
})?;
col.column_desc.generated_or_default_column =
external_col.column_desc.generated_or_default_column;
external_column_desc.generated_or_default_column.clone();
}
(columns, pk_names)
}
Expand Down

0 comments on commit d1eccfb

Please sign in to comment.