diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index e3d530c427563..61603a7b1e1e7 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -1188,21 +1188,12 @@ pub(super) async fn handle_create_table_plan( .collect(); for col in &mut columns { - if col.is_generated() { - continue; + // Keep the default column alignd with external table. + // Note the generated columns have not been initialized yet. If a column is not found here, it should be a generated column. + if let Some(external_column_desc) = external_columns.get(col.name()) { + col.column_desc.generated_or_default_column = + external_column_desc.generated_or_default_column.clone(); } - let external_column_desc = - *external_columns.get(col.name()).ok_or_else(|| { - ErrorCode::ConnectorError( - format!( - "Column '{}' not found in the upstream database", - col.name() - ) - .into(), - ) - })?; - col.column_desc.generated_or_default_column = - external_column_desc.generated_or_default_column.clone(); } (columns, pk_names) } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 6e9707fef8fff..5593dee9e4fb3 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -49,7 +49,7 @@ use risingwave_pb::ddl_service::{ }; use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; use risingwave_pb::meta::table_fragments::PbFragment; -use risingwave_pb::stream_plan::stream_node::NodeBody; +use risingwave_pb::stream_plan::stream_node::{self, NodeBody}; use risingwave_pb::stream_plan::update_mutation::PbMergeUpdate; use risingwave_pb::stream_plan::{ Dispatcher, DispatcherType, FragmentTypeFlag, MergeNode, PbStreamFragmentGraph, @@ -656,46 +656,70 @@ impl DdlController { table_fragments.fragments ) })?; - + assert_eq!( + stream_scan_fragment.actors.len(), + 1, + "Stream scan fragment should have only one actor" + ); for actor in &stream_scan_fragment.actors { - if let Some(NodeBody::StreamCdcScan(ref stream_cdc_scan)) = - actor.nodes.as_ref().unwrap().node_body - && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc - { - let options_with_secret = WithOptionsSecResolved::new( - cdc_table_desc.connect_properties.clone(), - cdc_table_desc.secret_refs.clone(), - ); - let mut props = ConnectorProperties::extract(options_with_secret, true)?; - props.init_from_pb_cdc_table_desc(cdc_table_desc); - // try creating a split enumerator to validate - let _enumerator = props - .create_split_enumerator(SourceEnumeratorContext::dummy().into()) - .await?; - tracing::debug!(?table.id, "validate cdc table success"); - } else { - for input in &actor.nodes.as_ref().unwrap().input { - if let Some(NodeBody::StreamCdcScan(ref stream_cdc_scan)) = input.node_body - && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc + let mut found_cdc_scan = false; + match &actor.nodes.as_ref().unwrap().node_body { + Some(NodeBody::StreamCdcScan(_)) => { + if Self::validate_cdc_table_inner( + &actor.nodes.as_ref().unwrap().node_body, + table.id, + ) + .await? { - let options_with_secret = WithOptionsSecResolved::new( - cdc_table_desc.connect_properties.clone(), - cdc_table_desc.secret_refs.clone(), - ); - let mut props = ConnectorProperties::extract(options_with_secret, true)?; - props.init_from_pb_cdc_table_desc(cdc_table_desc); - // try creating a split enumerator to validate - let _enumerator = props - .create_split_enumerator(SourceEnumeratorContext::dummy().into()) - .await?; - tracing::debug!(?table.id, "validate cdc table success"); + found_cdc_scan = true; + } + } + // When there's generated columns, the cdc scan node is wrapped in a project node + Some(NodeBody::Project(_)) => { + for input in &actor.nodes.as_ref().unwrap().input { + if Self::validate_cdc_table_inner(&input.node_body, table.id).await? { + found_cdc_scan = true; + } } } + _ => { + bail!("Unexpected node body for stream cdc scan"); + } + }; + if !found_cdc_scan { + bail!("No stream cdc scan node found in stream scan fragment"); } } Ok(()) } + async fn validate_cdc_table_inner( + node_body: &Option, + table_id: u32, + ) -> MetaResult { + if let Some(NodeBody::StreamCdcScan(ref stream_cdc_scan)) = node_body + && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc + { + let options_with_secret = WithOptionsSecResolved::new( + cdc_table_desc.connect_properties.clone(), + cdc_table_desc.secret_refs.clone(), + ); + + let mut props = ConnectorProperties::extract(options_with_secret, true)?; + props.init_from_pb_cdc_table_desc(cdc_table_desc); + + // Try creating a split enumerator to validate + let _enumerator = props + .create_split_enumerator(SourceEnumeratorContext::dummy().into()) + .await?; + + tracing::debug!(?table_id, "validate cdc table success"); + Ok(true) + } else { + Ok(false) + } + } + // Here we modify the union node of the downstream table by the TableFragments of the to-be-created sink upstream. // The merge in the union has already been set up in the frontend and will be filled with specific upstream actors in this function. // Meanwhile, the Dispatcher corresponding to the upstream of the merge will also be added to the replace table context here.