Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang committed Feb 5, 2025
1 parent 771325e commit f532f61
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 46 deletions.
19 changes: 5 additions & 14 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1188,21 +1188,12 @@ pub(super) async fn handle_create_table_plan(
.collect();

for col in &mut columns {
if col.is_generated() {
continue;
// Keep the default column alignd with external table.

Check warning on line 1191 in src/frontend/src/handler/create_table.rs

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"alignd" should be "aligned".
// Note the generated columns have not been initialized yet. If a column is not found here, it should be a generated column.
if let Some(external_column_desc) = external_columns.get(col.name()) {
col.column_desc.generated_or_default_column =
external_column_desc.generated_or_default_column.clone();
}
let external_column_desc =
*external_columns.get(col.name()).ok_or_else(|| {
ErrorCode::ConnectorError(
format!(
"Column '{}' not found in the upstream database",
col.name()
)
.into(),
)
})?;
col.column_desc.generated_or_default_column =
external_column_desc.generated_or_default_column.clone();
}
(columns, pk_names)
}
Expand Down
88 changes: 56 additions & 32 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use risingwave_pb::ddl_service::{
};
use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
use risingwave_pb::meta::table_fragments::PbFragment;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::stream_node::{self, NodeBody};
use risingwave_pb::stream_plan::update_mutation::PbMergeUpdate;
use risingwave_pb::stream_plan::{
Dispatcher, DispatcherType, FragmentTypeFlag, MergeNode, PbStreamFragmentGraph,
Expand Down Expand Up @@ -656,46 +656,70 @@ impl DdlController {
table_fragments.fragments
)
})?;

assert_eq!(
stream_scan_fragment.actors.len(),
1,
"Stream scan fragment should have only one actor"
);
for actor in &stream_scan_fragment.actors {
if let Some(NodeBody::StreamCdcScan(ref stream_cdc_scan)) =
actor.nodes.as_ref().unwrap().node_body
&& let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
{
let options_with_secret = WithOptionsSecResolved::new(
cdc_table_desc.connect_properties.clone(),
cdc_table_desc.secret_refs.clone(),
);
let mut props = ConnectorProperties::extract(options_with_secret, true)?;
props.init_from_pb_cdc_table_desc(cdc_table_desc);
// try creating a split enumerator to validate
let _enumerator = props
.create_split_enumerator(SourceEnumeratorContext::dummy().into())
.await?;
tracing::debug!(?table.id, "validate cdc table success");
} else {
for input in &actor.nodes.as_ref().unwrap().input {
if let Some(NodeBody::StreamCdcScan(ref stream_cdc_scan)) = input.node_body
&& let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
let mut found_cdc_scan = false;
match &actor.nodes.as_ref().unwrap().node_body {
Some(NodeBody::StreamCdcScan(_)) => {
if Self::validate_cdc_table_inner(
&actor.nodes.as_ref().unwrap().node_body,
table.id,
)
.await?
{
let options_with_secret = WithOptionsSecResolved::new(
cdc_table_desc.connect_properties.clone(),
cdc_table_desc.secret_refs.clone(),
);
let mut props = ConnectorProperties::extract(options_with_secret, true)?;
props.init_from_pb_cdc_table_desc(cdc_table_desc);
// try creating a split enumerator to validate
let _enumerator = props
.create_split_enumerator(SourceEnumeratorContext::dummy().into())
.await?;
tracing::debug!(?table.id, "validate cdc table success");
found_cdc_scan = true;
}
}
// When there's generated columns, the cdc scan node is wrapped in a project node
Some(NodeBody::Project(_)) => {
for input in &actor.nodes.as_ref().unwrap().input {
if Self::validate_cdc_table_inner(&input.node_body, table.id).await? {
found_cdc_scan = true;
}
}
}
_ => {
bail!("Unexpected node body for stream cdc scan");
}
};
if !found_cdc_scan {
bail!("No stream cdc scan node found in stream scan fragment");
}
}
Ok(())
}

async fn validate_cdc_table_inner(
node_body: &Option<stream_node::NodeBody>,
table_id: u32,
) -> MetaResult<bool> {
if let Some(NodeBody::StreamCdcScan(ref stream_cdc_scan)) = node_body
&& let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
{
let options_with_secret = WithOptionsSecResolved::new(
cdc_table_desc.connect_properties.clone(),
cdc_table_desc.secret_refs.clone(),
);

let mut props = ConnectorProperties::extract(options_with_secret, true)?;
props.init_from_pb_cdc_table_desc(cdc_table_desc);

// Try creating a split enumerator to validate
let _enumerator = props
.create_split_enumerator(SourceEnumeratorContext::dummy().into())
.await?;

tracing::debug!(?table_id, "validate cdc table success");
Ok(true)
} else {
Ok(false)
}
}

// Here we modify the union node of the downstream table by the TableFragments of the to-be-created sink upstream.
// The merge in the union has already been set up in the frontend and will be filled with specific upstream actors in this function.
// Meanwhile, the Dispatcher corresponding to the upstream of the merge will also be added to the replace table context here.
Expand Down

0 comments on commit f532f61

Please sign in to comment.